재시도 모드 설정 (Configuring a retry mode)
가능한 재시도 모드 목록 (Available retry modes)
- legacy
- 기본 값
- Older (v1)
- 기본적으로 최대 5번 재시도 수행
- max_attempts 값을 설정하여 변경 가능
- 재시도를 지원하는 오류와 예외의 목록이 v2 대비 적음
- standard
- Updated retry handler (v2)
- 기본적으로 최대 3번 재시도 수행
- max_attempts 값을 설정하여 변경 가능
- 설정한 max_attempts와 back-off 알고리즘에 따라 자동으로 재시도를 수행
- legacy 모드 (v1) 대비 더 많은 오류와 예외에 대한 재시도 지원
- adaptive
- 실험적 모드
Boto3 코드 예시
import boto3 from botocore.config import Config config = Config( retries = { 'max_attempts': 10, 'mode': 'standard' } ) ec2 = boto3.client('ec2', config=config)
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
재시도 검증 (Validating retry attempts)
Legacy 모드인 경우:
botocore.retryhandler
- No retry needed
- Retry needed, action of: <action_value>
- Reached the maximum number of retry attempts: <attempt_num>
Standard 또는 adaptive 모드인 경우:
botocore.retryhandler
- Not retrying request
- Retry needed, retrying request after deay of: <delay_value>
- Retry needed but retry quota reached, not retrying request
Boto3의 event system 사용 하기
needs-retry
needs-retry.service-name.operation-name
최근 수행한 요청을 재시도 해야하는 경우 발생.
import boto3 s3 = boto3.client('s3') # Access the event system on the S3 client event_system = s3.meta.events # Create a handler that determines retry behavior. def needs_retry_handler(**kwargs): # Implement custom retry logic if some_condition: return None else: return some_delay # Register the function to an event event_system.register('needs-retry', needs_retry_handler) s3.list_buckets()
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/events.html
만약 SQS client를 사용중이고, send_message_batch
함수를 사용중일 때, retry 에 관한 event를 받고 싶다면, 다음 이벤트를 등록하면 된다.
needs-retry.sqs.SendMessageBatch
만약 sqs에 대한 retry 이벤트 전체를 구독하려 한다면,
needs-retry.sqs
를 사용할 수도 있다.
import boto3 import logging import sys from botocore.config import Config from botocore.exceptions import ReadTimeoutError, ConnectTimeoutError, EndpointConnectionError # 로그 설정: 표준 출력으로 기록 (Lambda 환경 등에서 CloudWatch Logs로 전송됨) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", stream=sys.stdout ) logger = logging.getLogger(__name__) # boto3 클라이언트에 사용할 Config 객체 retry_config = Config( retries={ 'max_attempts': 5, # 최초 시도 포함 총 5회 시도 (즉, 1회 시도 후 4회 재시도) 'mode': 'standard' # 표준 재시도 로직: 지수 백오프 및 지터 포함 }, connect_timeout=30, # 연결 타임아웃 30초 read_timeout=30 # 응답 타임아웃 30초 ) # SQS 클라이언트 생성 sqs_client = boto3.client('sqs', config=retry_config) # needs-retry 이벤트 핸들러 등록: 재시도 이벤트 발생 시 실패한 메시지(Failed 필드)가 있으면 로그로 출력 def log_retry_event(**kwargs): # kwargs는 재시도와 관련된 상세정보(예: current attempt, last exception, response 등)를 포함할 수 있습니다. if 'response' in kwargs: response = kwargs['response'] # SendMessageBatch의 경우, response에 Failed 리스트가 있을 수 있음 if isinstance(response, dict) and response.get('Failed'): logger.warning("Retry event triggered. Failed messages: %s", response['Failed']) logger.info("SendMessageBatch retry event: %s", kwargs) # kwargs에는 operation, attempts, event_name 등의 추가 정보가 있을 수 있다. operation = kwargs.get('operation', 'Unknwon operation') operation = kwargs.get('attempts', -1) operation = kwargs.get('event_name', 'Unknwon event') # 원본 요청 정보를 얻고 싶은 경우, request_dict를 참고할 수 있다. body = kwargs.get('request_dict', {}).get('body', '') sqs_client.meta.events.register('needs-retry.sqs.SendMessageBatch', log_retry_event) def send_sqs_batch_with_logging(queue_url, messages): """ SQS에 메시지를 배치로 전송합니다. boto3의 내장 재시도 로직(Standard 모드)과 함께, 재시도 이벤트 발생 시 실패한 메시지 정보를 로그로 남깁니다. :param queue_url: SQS 대기열 URL :param messages: 각 메시지는 'Id'와 'MessageBody'를 포함하는 dict 목록 (최대 10개) :return: send_message_batch 응답 (성공/실패에 대한 정보 포함) """ try: response = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=messages) return response except (ReadTimeoutError, ConnectTimeoutError, EndpointConnectionError) as e: logger.error("Timeout or connection error during send_message_batch: %s", e) return None except Exception as e: logger.error("Unexpected error during send_message_batch: %s", e) return None if __name__ == '__main__': # 실제 사용 중인 SQS 대기열 URL을 입력하세요. queue_url = "https://sqs.<region>.amazonaws.com/<account_id>/<queue_name>" # 예시 메시지들 (각 메시지의 'Id'는 배치 내에서 고유해야 합니다.) messages = [ {"Id": "msg1", "MessageBody": "Hello, this is message 1"}, {"Id": "msg2", "MessageBody": "Hello, this is message 2"} ] result = send_sqs_batch_with_logging(queue_url, messages) if result: logger.info("SQS batch send succeeded:") logger.info(result) if result.get("Failed"): logger.warning("Some messages failed even after retries: %s", result["Failed"]) else: logger.error("SQS batch send failed.")