재시도 모드 설정 (Configuring a retry mode)
가능한 재시도 모드 목록 (Available retry modes)
- legacy
- 기본 값
- Older (v1)
- 기본적으로 최대 5번 재시도 수행
- max_attempts 값을 설정하여 변경 가능
- 재시도를 지원하는 오류와 예외의 목록이 v2 대비 적음
- standard
- Updated retry handler (v2)
- 기본적으로 최대 3번 재시도 수행
- max_attempts 값을 설정하여 변경 가능
- 설정한 max_attempts와 back-off 알고리즘에 따라 자동으로 재시도를 수행
- legacy 모드 (v1) 대비 더 많은 오류와 예외에 대한 재시도 지원
- adaptive
- 실험적 모드
Boto3 코드 예시
import boto3
from botocore.config import Config
config = Config(
retries = {
'max_attempts': 10,
'mode': 'standard'
}
)
ec2 = boto3.client('ec2', config=config)
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
재시도 검증 (Validating retry attempts)
Legacy 모드인 경우:
botocore.retryhandler
- No retry needed
- Retry needed, action of: <action_value>
- Reached the maximum number of retry attempts: <attempt_num>
Standard 또는 adaptive 모드인 경우:
botocore.retryhandler
- Not retrying request
- Retry needed, retrying request after deay of: <delay_value>
- Retry needed but retry quota reached, not retrying request
Boto3의 event system 사용 하기
needs-retry
needs-retry.service-name.operation-name
최근 수행한 요청을 재시도 해야하는 경우 발생.
import boto3
s3 = boto3.client('s3')
# Access the event system on the S3 client
event_system = s3.meta.events
# Create a handler that determines retry behavior.
def needs_retry_handler(**kwargs):
# Implement custom retry logic
if some_condition:
return None
else:
return some_delay
# Register the function to an event
event_system.register('needs-retry', needs_retry_handler)
s3.list_buckets()
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/events.html
만약 SQS client를 사용중이고, send_message_batch 함수를 사용중일 때, retry 에 관한 event를 받고 싶다면, 다음 이벤트를 등록하면 된다.
needs-retry.sqs.SendMessageBatch
만약 sqs에 대한 retry 이벤트 전체를 구독하려 한다면,
needs-retry.sqs
를 사용할 수도 있다.
import boto3
import logging
import sys
from botocore.config import Config
from botocore.exceptions import ReadTimeoutError, ConnectTimeoutError, EndpointConnectionError
# 로그 설정: 표준 출력으로 기록 (Lambda 환경 등에서 CloudWatch Logs로 전송됨)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
stream=sys.stdout
)
logger = logging.getLogger(__name__)
# boto3 클라이언트에 사용할 Config 객체
retry_config = Config(
retries={
'max_attempts': 5, # 최초 시도 포함 총 5회 시도 (즉, 1회 시도 후 4회 재시도)
'mode': 'standard' # 표준 재시도 로직: 지수 백오프 및 지터 포함
},
connect_timeout=30, # 연결 타임아웃 30초
read_timeout=30 # 응답 타임아웃 30초
)
# SQS 클라이언트 생성
sqs_client = boto3.client('sqs', config=retry_config)
# needs-retry 이벤트 핸들러 등록: 재시도 이벤트 발생 시 실패한 메시지(Failed 필드)가 있으면 로그로 출력
def log_retry_event(**kwargs):
# kwargs는 재시도와 관련된 상세정보(예: current attempt, last exception, response 등)를 포함할 수 있습니다.
if 'response' in kwargs:
response = kwargs['response']
# SendMessageBatch의 경우, response에 Failed 리스트가 있을 수 있음
if isinstance(response, dict) and response.get('Failed'):
logger.warning("Retry event triggered. Failed messages: %s", response['Failed'])
logger.info("SendMessageBatch retry event: %s", kwargs)
# kwargs에는 operation, attempts, event_name 등의 추가 정보가 있을 수 있다.
operation = kwargs.get('operation', 'Unknwon operation')
operation = kwargs.get('attempts', -1)
operation = kwargs.get('event_name', 'Unknwon event')
# 원본 요청 정보를 얻고 싶은 경우, request_dict를 참고할 수 있다.
body = kwargs.get('request_dict', {}).get('body', '')
sqs_client.meta.events.register('needs-retry.sqs.SendMessageBatch', log_retry_event)
def send_sqs_batch_with_logging(queue_url, messages):
"""
SQS에 메시지를 배치로 전송합니다.
boto3의 내장 재시도 로직(Standard 모드)과 함께, 재시도 이벤트 발생 시 실패한 메시지 정보를 로그로 남깁니다.
:param queue_url: SQS 대기열 URL
:param messages: 각 메시지는 'Id'와 'MessageBody'를 포함하는 dict 목록 (최대 10개)
:return: send_message_batch 응답 (성공/실패에 대한 정보 포함)
"""
try:
response = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=messages)
return response
except (ReadTimeoutError, ConnectTimeoutError, EndpointConnectionError) as e:
logger.error("Timeout or connection error during send_message_batch: %s", e)
return None
except Exception as e:
logger.error("Unexpected error during send_message_batch: %s", e)
return None
if __name__ == '__main__':
# 실제 사용 중인 SQS 대기열 URL을 입력하세요.
queue_url = "https://sqs.<region>.amazonaws.com/<account_id>/<queue_name>"
# 예시 메시지들 (각 메시지의 'Id'는 배치 내에서 고유해야 합니다.)
messages = [
{"Id": "msg1", "MessageBody": "Hello, this is message 1"},
{"Id": "msg2", "MessageBody": "Hello, this is message 2"}
]
result = send_sqs_batch_with_logging(queue_url, messages)
if result:
logger.info("SQS batch send succeeded:")
logger.info(result)
if result.get("Failed"):
logger.warning("Some messages failed even after retries: %s", result["Failed"])
else:
logger.error("SQS batch send failed.")
