Error Handling and Retry Strategies for AWS Bedrock
This tutorial explores robust error handling and retry strategies for AWS Bedrock, with a focus on managing throttling and quota limitations.
Objectives
By the end of this tutorial, you’ll be able to:
- Identify and handle different types of AWS Bedrock errors
- Implement effective retry strategies with exponential backoff
- Create a robust error handling framework for production applications
- Optimize retry behavior around quota limitations
Prerequisites
- Understanding of AWS Bedrock and its quota system
- Familiarity with Python error handling concepts
- Basic understanding of boto3 and AWS SDK error patterns
Understanding AWS Bedrock Error Types
AWS Bedrock can return several types of errors that require different handling strategies:
1. Throttling Errors
These occur when you exceed your quota limits:
- ThrottlingException - You’ve exceeded your requests-per-minute (RPM) or tokens-per-minute (TPM) quotas
- TooManyRequestsException - Too many concurrent requests
- ServiceQuotaExceededException - Explicit quota limit exceeded
2. Validation Errors
These indicate issues with your request format:
- ValidationException - Invalid request structure or parameters
- InvalidRequestException - Malformed request
- ModelNotReadyException - Model is not ready for inference
3. Service Errors
These represent issues on the AWS side:
- ServiceUnavailableException - Temporary service unavailability
- InternalServerException - Internal error in the AWS service
- ServiceException - General service error
4. Authentication/Authorization Errors
These indicate permission issues:
- AccessDeniedException - Insufficient permissions
- UnauthorizedException - Invalid credentials
- ResourceNotFoundException - Specified resource does not exist
Step 1: Basic Error Handling Structure
Let’s start with a basic error handling structure for AWS Bedrock:
import boto3
import json
import logging

from botocore.exceptions import ClientError

from utils.profile_manager import get_profile

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def invoke_with_basic_error_handling(model_id, prompt_data):
    """
    Invoke a model with basic error handling.

    Args:
        model_id: The model identifier
        prompt_data: Dictionary with the prompt payload

    Returns:
        Model response or None if an error occurred
    """
    # Use the configured profile (defaults to 'aws' for local testing)
    profile_name = get_profile()
    session = boto3.Session(profile_name=profile_name)
    bedrock_runtime = session.client('bedrock-runtime')

    try:
        # Invoke the model
        response = bedrock_runtime.invoke_model(
            modelId=model_id,
            body=json.dumps(prompt_data)
        )

        # Process the response
        response_body = json.loads(response['body'].read())
        return response_body

    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_message = e.response['Error']['Message']

        if error_code == 'ThrottlingException':
            logger.warning(f"Request throttled: {error_message}")
        elif error_code == 'ValidationException':
            logger.error(f"Validation error: {error_message}")
        elif error_code == 'ServiceUnavailableException':
            logger.warning(f"Service unavailable: {error_message}")
        elif error_code == 'AccessDeniedException':
            logger.error(f"Access denied: {error_message}")
        else:
            logger.error(f"Error invoking model: {error_code} - {error_message}")

        return None

    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return None
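Note that boto3 has its own built-in retry layer and may retry throttled calls before your code ever sees the exception. If you implement application-level retries (as in the next step), consider limiting the SDK's retries so the two layers don't compound. A minimal sketch using botocore's standard config options:

from botocore.config import Config

# Let application code own the retry policy by limiting SDK-level retries
sdk_config = Config(retries={'max_attempts': 1, 'mode': 'standard'})
bedrock_runtime = session.client('bedrock-runtime', config=sdk_config)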
Step 2: Implementing Exponential Backoff with Jitter
For throttling errors, implement exponential backoff with jitter to spread out retry attempts:
import random
import time


def invoke_with_backoff(model_id, prompt_data, max_retries=5, base_delay=1.0):
    """
    Invoke a model with exponential backoff retry strategy.

    Args:
        model_id: The model identifier
        prompt_data: Dictionary with the prompt payload
        max_retries: Maximum number of retry attempts
        base_delay: Base delay between retries in seconds

    Returns:
        Model response or None if all retries failed
    """
    profile_name = get_profile()
    session = boto3.Session(profile_name=profile_name)
    bedrock_runtime = session.client('bedrock-runtime')

    # Retry loop
    retries = 0
    while retries <= max_retries:
        try:
            # Invoke the model
            response = bedrock_runtime.invoke_model(
                modelId=model_id,
                body=json.dumps(prompt_data)
            )

            # Process the response
            response_body = json.loads(response['body'].read())
            return response_body

        except ClientError as e:
            error_code = e.response['Error']['Code']
            error_message = e.response['Error']['Message']

            # Only retry on throttling or temporary service errors
            if error_code in ['ThrottlingException', 'TooManyRequestsException',
                              'ServiceUnavailableException', 'ServiceException']:
                retries += 1
                if retries > max_retries:
                    logger.warning(f"Maximum retries ({max_retries}) exceeded. Giving up.")
                    return None

                # Calculate backoff delay with jitter
                delay = base_delay * (2 ** (retries - 1))  # Exponential backoff
                jitter = delay * 0.2 * random.random()  # 20% jitter
                sleep_time = delay + jitter

                logger.info(f"Throttled, retrying in {sleep_time:.2f}s (attempt {retries}/{max_retries})")
                time.sleep(sleep_time)
            else:
                # Non-retryable error
                logger.error(f"Non-retryable error: {error_code} - {error_message}")
                return None

        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            return None

    return None  # Should not reach here, but just in case
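For example, invoking a Claude model via the Messages API format might look like this (the model ID and payload schema are illustrative; consult your model's documentation for the exact body format):

payload = {
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 256,
    "messages": [{"role": "user", "content": "Summarize AWS Bedrock quotas."}]
}

result = invoke_with_backoff("anthropic.claude-3-haiku-20240307-v1:0", payload)
if result is None:
    logger.error("Request failed after all retries")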
Step 3: Creating an Error Classification System
Let’s create a more sophisticated error classification system:
class BedrockErrorClassifier:
    """
    Classifies AWS Bedrock errors and provides handling recommendations.
    """

    # Error categories
    RETRYABLE = "retryable"  # Can be retried
    NON_RETRYABLE = "non-retryable"  # Should not be retried
    THROTTLING = "throttling"  # Throttling-specific errors
    VALIDATION = "validation"  # Input validation errors
    AUTHENTICATION = "authentication"  # Auth/permission errors
    SERVICE = "service"  # AWS service errors

    # Error codes mapped to categories
    ERROR_CATEGORIES = {
        # Throttling errors
        "ThrottlingException": THROTTLING,
        "TooManyRequestsException": THROTTLING,
        "ServiceQuotaExceededException": THROTTLING,
        # Validation errors
        "ValidationException": VALIDATION,
        "InvalidRequestException": VALIDATION,
        "ModelNotReadyException": VALIDATION,
        # Service errors
        "ServiceUnavailableException": SERVICE,
        "InternalServerException": SERVICE,
        "ServiceException": SERVICE,
        # Auth errors
        "AccessDeniedException": AUTHENTICATION,
        "UnauthorizedException": AUTHENTICATION,
        "ResourceNotFoundException": AUTHENTICATION
    }

    # Errors that should be retried
    RETRYABLE_CATEGORIES = {THROTTLING, SERVICE}

    @classmethod
    def classify(cls, error):
        """
        Classify a boto3 ClientError.

        Args:
            error: The boto3 ClientError

        Returns:
            Tuple of (error_code, category, is_retryable)
        """
        if not isinstance(error, ClientError):
            return "UnknownError", "unknown", False

        error_code = error.response['Error']['Code']
        category = cls.ERROR_CATEGORIES.get(error_code, "unknown")
        is_retryable = category in cls.RETRYABLE_CATEGORIES

        return error_code, category, is_retryable

    @classmethod
    def get_retry_strategy(cls, error, attempt=0):
        """
        Get recommended retry strategy for an error.

        Args:
            error: The boto3 ClientError
            attempt: Current retry attempt (0-based)

        Returns:
            Dictionary with retry recommendations
        """
        error_code, category, is_retryable = cls.classify(error)

        if not is_retryable:
            return {
                "should_retry": False,
                "reason": f"Non-retryable error category: {category}"
            }

        # Base delay for different categories
        if category == cls.THROTTLING:
            base_delay = 1.0  # Start with 1 second for throttling
        elif category == cls.SERVICE:
            base_delay = 2.0  # Start with 2 seconds for service errors
        else:
            base_delay = 0.5  # Default for other retryable errors

        # Calculate delay with exponential backoff and jitter
        delay = base_delay * (2 ** attempt)
        jitter = delay * 0.2 * random.random()  # 20% jitter
        retry_delay = min(delay + jitter, 60)  # Cap at 60 seconds

        return {
            "should_retry": True,
            "retry_delay": retry_delay,
            "category": category,
            "error_code": error_code
        }
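Used on its own, the classifier can drive an ad-hoc retry decision; a minimal sketch, assuming a bedrock_runtime client and request variables like those in Step 1:

try:
    response = bedrock_runtime.invoke_model(
        modelId=model_id,
        body=json.dumps(prompt_data)
    )
except ClientError as err:
    strategy = BedrockErrorClassifier.get_retry_strategy(err, attempt=0)
    if strategy["should_retry"]:
        time.sleep(strategy["retry_delay"])
        # ...retry the call here...
    else:
        logger.error(strategy["reason"])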
Step 4: Building a Comprehensive Retry Framework
Now let’s create a comprehensive retry framework that’s quota-aware:
class BedrockRetryer:
    """
    A comprehensive retry framework for AWS Bedrock operations.
    """

    def __init__(self, max_retries=5, respect_quota=True):
        """
        Initialize the retryer.

        Args:
            max_retries: Maximum number of retry attempts
            respect_quota: Whether to be quota-aware in retry strategy
        """
        self.max_retries = max_retries
        self.respect_quota = respect_quota
        self.error_classifier = BedrockErrorClassifier

        # Quota tracking (simplified)
        self.throttling_events = 0
        self.last_throttle_time = 0

    def execute(self, operation, *args, **kwargs):
        """
        Execute an operation with retry logic.

        Args:
            operation: Function to execute
            *args, **kwargs: Arguments to pass to the operation

        Returns:
            Result of the operation or None on failure
        """
        attempt = 0

        while attempt <= self.max_retries:
            try:
                # Check if we should apply quota-aware delay
                if self.respect_quota and self.throttling_events > 0:
                    self._apply_quota_aware_delay()

                # Execute the operation
                result = operation(*args, **kwargs)

                # Success - reset throttling counter
                self.throttling_events = max(0, self.throttling_events - 1)
                return result

            except ClientError as e:
                # Classify the error
                error_code, category, is_retryable = self.error_classifier.classify(e)

                # Log the error
                logger.warning(f"AWS error: {error_code} ({category}) - {e.response['Error']['Message']}")

                # Update throttling metrics if relevant
                if category == BedrockErrorClassifier.THROTTLING:
                    self.throttling_events += 1
                    self.last_throttle_time = time.time()

                # Check if we should retry
                attempt += 1
                if not is_retryable or attempt > self.max_retries:
                    logger.error(f"Not retrying: {'Max retries exceeded' if is_retryable else 'Non-retryable error'}")
                    return None

                # Get retry strategy
                retry_strategy = self.error_classifier.get_retry_strategy(e, attempt - 1)

                # Apply the retry delay
                delay = retry_strategy["retry_delay"]
                logger.info(f"Retrying in {delay:.2f}s (attempt {attempt}/{self.max_retries})")
                time.sleep(delay)

            except Exception as e:
                # Unexpected error
                logger.error(f"Unexpected error: {str(e)}")
                return None

        return None  # Should not reach here

    def _apply_quota_aware_delay(self):
        """Apply an additional delay based on recent throttling events"""
        now = time.time()
        time_since_last_throttle = now - self.last_throttle_time

        # If we've had throttling recently
        if time_since_last_throttle < 60 and self.throttling_events > 1:
            # Calculate a progressive delay based on throttling frequency
            adaptive_delay = min(1.0 * self.throttling_events, 5.0)
            logger.info(f"Adding quota-aware delay of {adaptive_delay:.2f}s")
            time.sleep(adaptive_delay)
Step 5: Using the Framework with AWS Bedrock
Now let’s put it all together with AWS Bedrock:
def create_bedrock_client(profile_name=None):
    """Create a boto3 client for Bedrock with the specified profile"""
    profile = profile_name or get_profile()
    session = boto3.Session(profile_name=profile)
    return session.client('bedrock-runtime')


def invoke_model_with_retries(model_id, prompt_data, max_retries=5):
    """
    Invoke an AWS Bedrock model with robust retry handling.

    Args:
        model_id: The model identifier
        prompt_data: Dictionary with the prompt payload
        max_retries: Maximum retry attempts

    Returns:
        Model response or None on failure
    """
    # Create the client
    client = create_bedrock_client()

    # Create the retryer
    retryer = BedrockRetryer(max_retries=max_retries)

    # Define the operation to retry
    def invoke_operation():
        response = client.invoke_model(
            modelId=model_id,
            body=json.dumps(prompt_data)
        )
        return json.loads(response['body'].read())

    # Execute with retry logic
    return retryer.execute(invoke_operation)
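Usage mirrors the earlier helper, with the retry framework doing the work behind the scenes (reusing the illustrative payload from Step 2):

result = invoke_model_with_retries(
    "anthropic.claude-3-haiku-20240307-v1:0",  # Illustrative model ID
    payload,
    max_retries=3
)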
Step 6: Handling Streaming Responses
Streaming responses require special error handling:
def invoke_streaming_with_retries(model_id, prompt_data, max_retries=5):
    """
    Invoke a streaming AWS Bedrock model with retry handling.

    Args:
        model_id: The model identifier
        prompt_data: Dictionary with the prompt payload
        max_retries: Maximum retry attempts

    Returns:
        Generator yielding response chunks; the generator ends early if
        the stream cannot be started or fails mid-stream
    """
    client = create_bedrock_client()
    retryer = BedrockRetryer(max_retries=max_retries)

    def start_stream():
        """Initiate the streaming response"""
        response = client.invoke_model_with_response_stream(
            modelId=model_id,
            body=json.dumps(prompt_data)
        )
        return response.get('body')

    # Get the stream (only the initial request is retried)
    stream = retryer.execute(start_stream)
    if not stream:
        logger.error("Failed to initiate streaming response")
        return  # Ends the generator; callers simply receive no chunks

    # Process the stream with error handling
    try:
        for event in stream:
            chunk = event.get('chunk')
            if chunk:
                yield json.loads(chunk.get('bytes').decode())
    except Exception as e:
        logger.error(f"Error processing stream: {str(e)}")
        # Stream errors are not retryable once the stream has started;
        # the generator simply stops yielding
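Because this function is a generator, callers consume it with a for loop, and a stream that failed to start simply yields nothing (a sketch, reusing the illustrative payload from Step 2):

for chunk in invoke_streaming_with_retries("anthropic.claude-3-haiku-20240307-v1:0", payload):
    # Each chunk is one decoded JSON event from the stream
    print(chunk)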
Step 7: Quota-Aware Request Batching
For high-throughput scenarios, implement quota-aware batching:
class BedrockBatchProcessor:
    """
    Process batches of requests with quota awareness.
    """

    def __init__(self, model_id, requests_per_minute=60, tokens_per_minute=10000):
        """
        Initialize the batch processor.

        Args:
            model_id: The model identifier
            requests_per_minute: RPM quota limit
            tokens_per_minute: TPM quota limit
        """
        self.model_id = model_id
        self.rpm_limit = requests_per_minute
        self.tpm_limit = tokens_per_minute
        self.client = create_bedrock_client()
        self.retryer = BedrockRetryer(max_retries=3)

        # Token tracking
        self.estimated_tokens_used = 0
        self.requests_made = 0
        self.window_start_time = time.time()

    def process_batch(self, prompts, token_estimator=None):
        """
        Process a batch of prompts with quota awareness.

        Args:
            prompts: List of prompt payloads
            token_estimator: Optional function to estimate tokens in a prompt

        Returns:
            List of results (or None for failed requests)
        """
        results = []

        # Reset tracking at the start of a batch
        self._reset_if_window_expired()

        for prompt in prompts:
            # Check if we should wait before proceeding
            self._apply_rate_limiting(prompt, token_estimator)

            # Process the individual request
            result = self._process_single_request(prompt)
            results.append(result)

        return results

    def _process_single_request(self, prompt):
        """Process a single request with retries"""
        def invoke_operation():
            response = self.client.invoke_model(
                modelId=self.model_id,
                body=json.dumps(prompt)
            )
            result = json.loads(response['body'].read())

            # Update token tracking
            if "anthropic" in self.model_id and "usage" in result:
                self.estimated_tokens_used += result["usage"]["input_tokens"]
                self.estimated_tokens_used += result["usage"]["output_tokens"]

            return result

        # Track the request
        self.requests_made += 1

        # Execute with retry logic
        return self.retryer.execute(invoke_operation)

    def _reset_if_window_expired(self):
        """Reset tracking if the current minute window has expired"""
        now = time.time()
        seconds_elapsed = now - self.window_start_time

        if seconds_elapsed >= 60:
            logger.info(f"Resetting quota window. Previous window: {self.requests_made} requests, "
                        f"~{self.estimated_tokens_used} tokens")
            self.window_start_time = now
            self.requests_made = 0
            self.estimated_tokens_used = 0

    def _apply_rate_limiting(self, prompt, token_estimator):
        """Apply rate limiting based on quota usage"""
        # Estimate tokens in the prompt
        estimated_request_tokens = 0
        if token_estimator:
            estimated_request_tokens = token_estimator(prompt)
        else:
            # Rough estimation if no estimator provided
            prompt_str = json.dumps(prompt)
            estimated_request_tokens = len(prompt_str.split()) * 1.3

        # Check if we're approaching RPM limit
        rpm_utilization = self.requests_made / self.rpm_limit

        # Check if we're approaching TPM limit
        tpm_utilization = (self.estimated_tokens_used + estimated_request_tokens) / self.tpm_limit

        # Use the higher utilization to determine delay
        utilization = max(rpm_utilization, tpm_utilization)

        if utilization > 0.9:
            # We're at >90% of quota, wait until next window
            seconds_in_window = time.time() - self.window_start_time
            seconds_to_wait = max(0, 60 - seconds_in_window)

            logger.info(f"Approaching quota limit ({utilization:.1%} utilized), "
                        f"waiting {seconds_to_wait:.1f}s for next window")
            if seconds_to_wait > 0:
                time.sleep(seconds_to_wait)
                self._reset_if_window_expired()
        elif utilization > 0.7:
            # We're at >70% of quota, add some delay to spread requests
            delay = utilization * 0.5  # Up to 0.5s delay at 100% utilization
            logger.info(f"Spreading requests ({utilization:.1%} utilized), adding {delay:.2f}s delay")
            time.sleep(delay)
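A rough usage sketch follows; the characters-per-token heuristic is only an approximation, and real limits should come from your account's Service Quotas values:

def rough_token_estimator(prompt):
    """Very rough heuristic: roughly 4 characters per token"""
    return len(json.dumps(prompt)) / 4

processor = BedrockBatchProcessor(
    "anthropic.claude-3-haiku-20240307-v1:0",  # Illustrative model ID
    requests_per_minute=60,
    tokens_per_minute=10000
)
prompts = [payload] * 5  # Reuse the illustrative payload from Step 2
results = processor.process_batch(prompts, token_estimator=rough_token_estimator)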
Step 8: Error Response Interpretation
Different models return errors in different formats. Let’s handle this:
def interpret_model_error(model_id, response):
    """
    Interpret model-specific error responses.

    Args:
        model_id: The model identifier
        response: The error response from the model

    Returns:
        Dictionary with error details
    """
    # Default error info
    error_info = {
        "error_type": "unknown",
        "message": "Unknown error",
        "is_retryable": False
    }

    try:
        if "anthropic" in model_id:
            if "error" in response:
                error = response["error"]
                error_info["error_type"] = error.get("type", "unknown")
                error_info["message"] = error.get("message", "Unknown error")

                # Anthropic-specific error types
                if error_info["error_type"] in ["rate_limit_exceeded", "service_unavailable"]:
                    error_info["is_retryable"] = True

        elif "meta" in model_id or "llama" in model_id:
            if "error" in response:
                error_info["error_type"] = "model_error"
                error_info["message"] = response["error"]

                # Check for retryable phrases
                retryable_phrases = ["rate limit", "capacity", "try again", "temporarily"]
                if any(phrase in response["error"].lower() for phrase in retryable_phrases):
                    error_info["is_retryable"] = True

        elif "ai21" in model_id:
            if "error" in response:
                error_info["error_type"] = response["error"].get("code", "unknown")
                error_info["message"] = response["error"].get("message", "Unknown error")

                # AI21-specific error types
                if error_info["error_type"] in ["throttling", "service_unavailable"]:
                    error_info["is_retryable"] = True

        # Add more model-specific error handling as needed

    except Exception as e:
        logger.error(f"Error interpreting model response: {str(e)}")

    return error_info
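This is useful when a call succeeds at the HTTP level but the response body itself carries an error; a sketch of checking the body before trusting it:

response_body = invoke_model_with_retries(model_id, payload)
if response_body and "error" in response_body:
    error_info = interpret_model_error(model_id, response_body)
    if error_info["is_retryable"]:
        logger.warning(f"Model-level error worth retrying: {error_info['message']}")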
Step 9: Comprehensive Error Monitoring
For production applications, implement monitoring:
class BedrockErrorMonitor:
    """
    Monitor and track errors for AWS Bedrock operations.
    """

    def __init__(self):
        """Initialize the error monitor"""
        self.error_counts = {
            BedrockErrorClassifier.THROTTLING: 0,
            BedrockErrorClassifier.VALIDATION: 0,
            BedrockErrorClassifier.SERVICE: 0,
            BedrockErrorClassifier.AUTHENTICATION: 0,
            "unknown": 0
        }
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.retry_counts = []  # Number of retries needed for each request

        # Time tracking
        self.start_time = time.time()

    def record_request(self, success, retries=0):
        """Record a request result"""
        self.total_requests += 1
        if success:
            self.successful_requests += 1
        else:
            self.failed_requests += 1
        self.retry_counts.append(retries)

    def record_error(self, error):
        """Record an error"""
        _, category, _ = BedrockErrorClassifier.classify(error)
        self.error_counts[category] = self.error_counts.get(category, 0) + 1

    def get_stats(self):
        """Get current statistics"""
        elapsed_time = time.time() - self.start_time
        minutes = elapsed_time / 60

        return {
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "success_rate": (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0,
            "error_counts": self.error_counts,
            "requests_per_minute": self.total_requests / minutes if minutes > 0 else 0,
            "average_retries": sum(self.retry_counts) / len(self.retry_counts) if self.retry_counts else 0,
            "elapsed_minutes": minutes
        }

    def log_stats(self):
        """Log the current statistics"""
        stats = self.get_stats()

        logger.info("=== Bedrock Error Monitor Statistics ===")
        logger.info(f"Total requests: {stats['total_requests']}")
        logger.info(f"Success rate: {stats['success_rate']:.1f}%")
        logger.info(f"Requests per minute: {stats['requests_per_minute']:.1f}")
        logger.info(f"Average retries: {stats['average_retries']:.2f}")
        logger.info("Error counts:")
        for category, count in stats['error_counts'].items():
            if count > 0:
                logger.info(f"  - {category}: {count}")
        logger.info("========================================")

        return stats
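Wiring the monitor into a request path is straightforward; a minimal sketch using the raw client so that the ClientError is visible to the monitor:

monitor = BedrockErrorMonitor()

try:
    response = bedrock_runtime.invoke_model(
        modelId=model_id,
        body=json.dumps(prompt_data)
    )
    monitor.record_request(success=True)
except ClientError as e:
    monitor.record_error(e)
    monitor.record_request(success=False)

monitor.log_stats()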
Error Handling Best Practices
For Throttling Errors
- Implement exponential backoff - Increase delay between retries exponentially
- Add jitter - Randomize delay times to prevent retry storms
- Track throttling frequency - Adjust strategy based on recent throttling history
- Pre-emptively rate limit - Stay under quota limits by self-limiting
- Monitor TPM and RPM - Track both metrics to identify the limiting factor
For Validation Errors
- Validate requests client-side - Check input before sending to the API
- Log validation errors in detail - Include the specific validation issue
- Don’t retry validation errors - These generally won’t succeed on retry
- Add unit tests for request formats - Ensure your request format is valid
For Service Errors
- Implement retries with increasing backoff - Services often recover
- Add a circuit breaker - Stop retrying after persistent failures (see the sketch after this list)
- Log service errors for diagnosis - Help identify patterns or regional issues
- Consider fallback services - Have a backup plan for critical operations
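A circuit breaker can be as simple as a consecutive-failure counter with a cool-down; a minimal sketch (the thresholds are illustrative):

class SimpleCircuitBreaker:
    """Stops calls for a cool-down period after repeated failures"""

    def __init__(self, failure_threshold=5, cooldown_seconds=30):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.consecutive_failures = 0
        self.opened_at = None  # Timestamp when the breaker opened

    def allow_request(self):
        """Return True if calls are currently permitted"""
        if self.opened_at is None:
            return True
        # Half-open: allow a trial request once the cool-down has elapsed
        return time.time() - self.opened_at >= self.cooldown_seconds

    def record_success(self):
        """Close the breaker after a successful call"""
        self.consecutive_failures = 0
        self.opened_at = None

    def record_failure(self):
        """Open the breaker once failures reach the threshold"""
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            self.opened_at = time.time()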
For Authentication Errors
- Validate credentials early - Test authentication at startup
- Don’t retry auth errors - These generally require human intervention
- Implement secure credential handling - Use AWS best practices
- Log auth errors at high priority - These need immediate attention
Conclusion
Effective error handling is essential for robust AWS Bedrock applications, especially when working within quota limits. By implementing proper error classification, intelligent retry strategies, and quota-aware processing, you can maximize throughput while gracefully handling temporary service limitations.
The approaches demonstrated in this tutorial can be combined with the quota optimization techniques from previous tutorials to create highly resilient AWS Bedrock applications that make the most of available resources.
Next Steps
- Implement a complete error monitoring dashboard
- Integrate these strategies with the throughput optimization techniques
- Explore adaptive quota management based on error patterns
- Develop model-specific error handling strategies