Blog categories

Comments

[AWS Boto3] Retry 설정

[AWS Boto3] Retry 설정

재시도 모드 설정 (Configuring a retry mode)

가능한 재시도 모드 목록 (Available retry modes)

  • legacy
    • 기본 값
    • Older (v1)
    • 기본적으로 최대 5번 재시도 수행
      • max_attempts 값을 설정하여 변경 가능
    • 재시도를 지원하는 오류와 예외의 목록이 v2 대비 적음
  • standard
    • Updated retry handler (v2)
    • 기본적으로 최대 3번 재시도 수행
      • max_attempts 값을 설정하여 변경 가능
    • 설정한 max_attempts와 back-off 알고리즘에 따라 자동으로 재시도를 수행
    • legacy 모드 (v1) 대비 더 많은 오류와 예외에 대한 재시도 지원
  • adaptive
    • 실험적 모드

Boto3 코드 예시

import boto3
from botocore.config import Config

config = Config(
   retries = {
      'max_attempts': 10,
      'mode': 'standard'
   }
)

ec2 = boto3.client('ec2', config=config)

https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html

재시도 검증 (Validating retry attempts)

Legacy 모드인 경우:

botocore.retryhandler

  • No retry needed
  • Retry needed, action of: <action_value>
  • Reached the maximum number of retry attempts: <attempt_num>

Standard 또는 adaptive 모드인 경우:

botocore.retryhandler

  • Not retrying request
  • Retry needed, retrying request after deay of: <delay_value>
  • Retry needed but retry quota reached, not retrying request

Boto3의 event system 사용 하기

needs-retry

needs-retry.service-name.operation-name

최근 수행한 요청을 재시도 해야하는 경우 발생.

import boto3

s3 = boto3.client('s3')

# Access the event system on the S3 client
event_system = s3.meta.events

# Create a handler that determines retry behavior.
def needs_retry_handler(**kwargs):
    # Implement custom retry logic
    if some_condition:
        return None
    else:
        return some_delay

# Register the function to an event
event_system.register('needs-retry', needs_retry_handler)

s3.list_buckets()

https://boto3.amazonaws.com/v1/documentation/api/latest/guide/events.html

만약 SQS client를 사용중이고, send_message_batch 함수를 사용중일 때, retry 에 관한 event를 받고 싶다면, 다음 이벤트를 등록하면 된다.

needs-retry.sqs.SendMessageBatch

만약 sqs에 대한 retry 이벤트 전체를 구독하려 한다면,

needs-retry.sqs

를 사용할 수도 있다.

import boto3
import logging
import sys
from botocore.config import Config
from botocore.exceptions import ReadTimeoutError, ConnectTimeoutError, EndpointConnectionError

# 로그 설정: 표준 출력으로 기록 (Lambda 환경 등에서 CloudWatch Logs로 전송됨)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    stream=sys.stdout
)
logger = logging.getLogger(__name__)

# boto3 클라이언트에 사용할 Config 객체
retry_config = Config(
    retries={
        'max_attempts': 5,    # 최초 시도 포함 총 5회 시도 (즉, 1회 시도 후 4회 재시도)
        'mode': 'standard'    # 표준 재시도 로직: 지수 백오프 및 지터 포함
    },
    connect_timeout=30,       # 연결 타임아웃 30초
    read_timeout=30           # 응답 타임아웃 30초
)

# SQS 클라이언트 생성
sqs_client = boto3.client('sqs', config=retry_config)

# needs-retry 이벤트 핸들러 등록: 재시도 이벤트 발생 시 실패한 메시지(Failed 필드)가 있으면 로그로 출력
def log_retry_event(**kwargs):
    # kwargs는 재시도와 관련된 상세정보(예: current attempt, last exception, response 등)를 포함할 수 있습니다.
    if 'response' in kwargs:
        response = kwargs['response']
        # SendMessageBatch의 경우, response에 Failed 리스트가 있을 수 있음
        if isinstance(response, dict) and response.get('Failed'):
            logger.warning("Retry event triggered. Failed messages: %s", response['Failed'])
    logger.info("SendMessageBatch retry event: %s", kwargs)
    # kwargs에는 operation, attempts, event_name 등의 추가 정보가 있을 수 있다.
    operation = kwargs.get('operation', 'Unknwon operation')
    operation = kwargs.get('attempts', -1)
    operation = kwargs.get('event_name', 'Unknwon event')

    # 원본 요청 정보를 얻고 싶은 경우, request_dict를 참고할 수 있다.
    body = kwargs.get('request_dict', {}).get('body', '')
sqs_client.meta.events.register('needs-retry.sqs.SendMessageBatch', log_retry_event)

def send_sqs_batch_with_logging(queue_url, messages):
    """
    SQS에 메시지를 배치로 전송합니다.
    boto3의 내장 재시도 로직(Standard 모드)과 함께, 재시도 이벤트 발생 시 실패한 메시지 정보를 로그로 남깁니다.
    
    :param queue_url: SQS 대기열 URL
    :param messages: 각 메시지는 'Id'와 'MessageBody'를 포함하는 dict 목록 (최대 10개)
    :return: send_message_batch 응답 (성공/실패에 대한 정보 포함)
    """
    try:
        response = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=messages)
        return response
    except (ReadTimeoutError, ConnectTimeoutError, EndpointConnectionError) as e:
        logger.error("Timeout or connection error during send_message_batch: %s", e)
        return None
    except Exception as e:
        logger.error("Unexpected error during send_message_batch: %s", e)
        return None

if __name__ == '__main__':
    # 실제 사용 중인 SQS 대기열 URL을 입력하세요.
    queue_url = "https://sqs.<region>.amazonaws.com/<account_id>/<queue_name>"
    
    # 예시 메시지들 (각 메시지의 'Id'는 배치 내에서 고유해야 합니다.)
    messages = [
        {"Id": "msg1", "MessageBody": "Hello, this is message 1"},
        {"Id": "msg2", "MessageBody": "Hello, this is message 2"}
    ]
    
    result = send_sqs_batch_with_logging(queue_url, messages)
    if result:
        logger.info("SQS batch send succeeded:")
        logger.info(result)
        if result.get("Failed"):
            logger.warning("Some messages failed even after retries: %s", result["Failed"])
    else:
        logger.error("SQS batch send failed.")

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다

div#stuning-header .dfd-stuning-header-bg-container {background-image: url(https://tech.sangron.com/wp-content/uploads/sites/2/2018/02/python_wallpaper_background.jpg);background-color: #3f3f3f;background-size: cover;background-position: top center;background-attachment: initial;background-repeat: no-repeat;}#stuning-header div.page-title-inner {min-height: 350px;}