Complete Guide to Amazon SNS: Scalable Notification Service
Anubhav Gain
2024-08-20
Amazon Simple Notification Service (SNS) is a fully managed messaging service that enables you to decouple microservices, distributed systems, and serverless applications. SNS provides topics for high-throughput, push-based, many-to-many messaging.
Overview
SNS follows the publish-subscribe (pub-sub) messaging pattern. Publishers send messages to topics, and SNS delivers these messages to all subscribed endpoints. This lets you fan out messages to multiple subscribers, including Amazon SQS queues, AWS Lambda functions, HTTP endpoints, email addresses, and mobile push notifications.
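To make the fan-out semantics concrete, here is a toy in-memory model of a topic. It is not the SNS API: `InMemoryTopic` and its methods are hypothetical names, and real SNS delivers asynchronously, with retries, to remote endpoints.

```python
from typing import Any, Callable, Dict, List, Optional

# A subscriber receives (message, attributes)
Handler = Callable[[str, Dict[str, str]], Any]


class InMemoryTopic:
    """Toy pub/sub topic: every published message is copied to every subscriber."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._subscribers: List[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        self._subscribers.append(handler)

    def publish(self, message: str, attributes: Optional[Dict[str, str]] = None) -> int:
        """Deliver synchronously to all subscribers and return the delivery count."""
        attributes = attributes or {}
        for handler in self._subscribers:
            handler(message, attributes)  # real SNS delivers asynchronously, with retries
        return len(self._subscribers)


# Usage: one publish fans out to a "queue" and an "audit function"
queue: List[str] = []
audit_log: List[str] = []

topic = InMemoryTopic("OrderEvents")
topic.subscribe(lambda msg, attrs: queue.append(msg))
topic.subscribe(lambda msg, attrs: audit_log.append(f"{attrs.get('event_type')}: {msg}"))

delivered = topic.publish("order ORD-1 created", {"event_type": "order_created"})
print(delivered)  # 2
```

The publisher never knows how many subscribers exist; adding a third consumer is one more `subscribe` call, which is the decoupling the pattern provides.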
Key Benefits
1. High Throughput
- Handle millions of messages per second
- Automatic scaling based on demand
- Global message delivery
- No capacity planning required
2. Multiple Delivery Protocols
- HTTP/HTTPS endpoints
- Email and SMS
- Amazon SQS queues
- AWS Lambda functions
- Mobile push notifications
3. Reliability
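- Durable storage of published messages across multiple Availability Zones
- Automatic delivery retries, plus dead-letter queues (SQS) for failed deliveries
- Message filtering, plus deduplication on FIFO topics
- Cross-Region delivery to SQS queues and Lambda functions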
4. Security
- Encryption in transit and at rest
- IAM access control
- Resource-based topic policies for fine-grained access
- VPC endpoint support
Core Concepts
1. Topics and Subscriptions
```yaml
# Basic SNS topic
NotificationTopic:
  Type: AWS::SNS::Topic
  Properties:
    TopicName: MyApplicationNotifications
    DisplayName: My Application Notifications
    KmsMasterKeyId: !Ref SNSKMSKey
    Tags:
      - Key: Application
        Value: MyApp
      - Key: Environment
        Value: Production

# Email subscription (the recipient must confirm it before deliveries start)
EmailSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref NotificationTopic
    Protocol: email
    Endpoint: admin@example.com
    FilterPolicy:
      event_type:
        - error
        - critical

# Lambda subscription
LambdaSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref NotificationTopic
    Protocol: lambda
    Endpoint: !GetAtt NotificationProcessor.Arn
    FilterPolicy:
      source:
        - application
        - system

# SNS also needs permission to invoke the function
NotificationProcessorInvokePermission:
  Type: AWS::Lambda::Permission
  Properties:
    Action: lambda:InvokeFunction
    FunctionName: !Ref NotificationProcessor
    Principal: sns.amazonaws.com
    SourceArn: !Ref NotificationTopic
```
2. Message Publishing
```python
import json
from datetime import datetime, timezone

import boto3

sns = boto3.client('sns')


def publish_simple_message(topic_arn, message, subject=None):
    """Publish a plain-text message; Subject is used by email subscriptions."""
    params = {'TopicArn': topic_arn, 'Message': message}
    if subject:
        # boto3 rejects Subject=None, so only send it when provided
        params['Subject'] = subject
    response = sns.publish(**params)
    return response['MessageId']


def publish_structured_message(topic_arn, message_data):
    """Publish one message with a different body for each delivery protocol."""
    message = {
        'default': json.dumps(message_data),  # required fallback for other protocols
        'email': f"Alert: {message_data.get('title', 'Notification')}\n\n{message_data.get('description', '')}",
        'sms': f"Alert: {message_data.get('title', '')[:50]}",
        'lambda': json.dumps(message_data),
        'sqs': json.dumps(message_data),
    }

    response = sns.publish(
        TopicArn=topic_arn,
        Message=json.dumps(message),
        MessageStructure='json',
        MessageAttributes={
            # Attributes are what subscription filter policies match against
            'event_type': {
                'DataType': 'String',
                'StringValue': message_data.get('event_type', 'info'),
            },
            'source': {
                'DataType': 'String',
                'StringValue': message_data.get('source', 'application'),
            },
            'timestamp': {
                'DataType': 'String',
                'StringValue': datetime.now(timezone.utc).isoformat(),
            },
        },
    )
    return response['MessageId']


# Example usage
topic_arn = 'arn:aws:sns:us-east-1:123456789012:MyApplicationNotifications'

# Simple message
message_id = publish_simple_message(
    topic_arn,
    "System maintenance completed successfully",
    "Maintenance Update",
)

# Structured message
message_id = publish_structured_message(topic_arn, {
    'title': 'High CPU Usage Alert',
    'description': 'EC2 instance i-1234567890abcdef0 has exceeded 90% CPU usage',
    'event_type': 'warning',
    'source': 'monitoring',
    'instance_id': 'i-1234567890abcdef0',
    'cpu_usage': 92.5,
})
```
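With `MessageStructure='json'`, the `Message` must be a JSON object that includes a `default` key and whose values are strings; SNS uses `default` for any protocol without its own key. A small helper can check this before calling `publish`, together with the 256 KB (262,144 byte) message size limit. This is a sketch: `build_message_structure` is a hypothetical name, not part of boto3.

```python
import json
from typing import Dict

MAX_MESSAGE_BYTES = 262_144  # SNS limit: 256 KB of UTF-8 encoded message text


def build_message_structure(bodies: Dict[str, str]) -> str:
    """Serialize per-protocol bodies for publish(MessageStructure='json')."""
    if 'default' not in bodies:
        raise ValueError("MessageStructure='json' requires a 'default' key")
    for protocol, body in bodies.items():
        if not isinstance(body, str):
            # Each value must itself be a string (serialize dicts with json.dumps)
            raise TypeError(f"body for {protocol!r} must be a string")
    message = json.dumps(bodies)
    if len(message.encode('utf-8')) > MAX_MESSAGE_BYTES:
        raise ValueError("message exceeds the 256 KB SNS limit")
    return message


# Usage
payload = {'title': 'High CPU Usage Alert', 'cpu_usage': 92.5}
message = build_message_structure({
    'default': json.dumps(payload),
    'email': 'Alert: High CPU Usage Alert',
    'sqs': json.dumps(payload),
})
```

The returned string is what you would pass as `Message=` alongside `MessageStructure='json'`.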
3. Message Filtering
```yaml
# Subscription with message filtering
FilteredSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref NotificationTopic
    Protocol: sqs
    Endpoint: !GetAtt CriticalAlertsQueue.Arn
    FilterPolicy:
      event_type:
        - error
        - critical
      source:
        - application
        - database
      severity:
        - high
        - critical
      region:
        - us-east-1
        - us-west-2

# Numeric filtering: every key must match (AND); values listed under one key are ORed.
# Numeric conditions only match attributes published with DataType "Number".
NumericFilterSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref MetricsTopic
    Protocol: lambda
    Endpoint: !GetAtt AlertProcessor.Arn
    FilterPolicy:
      cpu_usage:
        - numeric: [">", 80]
      memory_usage:
        - numeric: [">=", 90]
      error_rate:
        - numeric: ["<", 5]
```
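Filter policy matching boils down to: every key in the policy must match (AND), and a key matches if any of its listed conditions matches the attribute value (OR). The sketch below is a simplified local evaluator (hypothetical names) that handles only exact string values and `numeric` conditions; SNS also supports operators such as `prefix`, `anything-but`, and `exists`, plus payload-based filtering, none of which are modeled here.

```python
import operator
from typing import Any, Dict, List

_OPS = {'=': operator.eq, '<': operator.lt, '<=': operator.le,
        '>': operator.gt, '>=': operator.ge}


def _condition_matches(condition: Any, value: Any) -> bool:
    if isinstance(condition, dict) and 'numeric' in condition:
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        spec = condition['numeric']  # e.g. [">", 80] or [">", 0, "<=", 5]
        pairs = zip(spec[0::2], spec[1::2])
        return all(_OPS[op](value, bound) for op, bound in pairs)
    return condition == value  # exact string match


def matches_filter_policy(policy: Dict[str, List[Any]], attributes: Dict[str, Any]) -> bool:
    """Simplified check: all keys must match; any condition within a key may match."""
    for key, conditions in policy.items():
        if key not in attributes:
            return False
        if not any(_condition_matches(c, attributes[key]) for c in conditions):
            return False
    return True


# Usage: thresholds mirroring NumericFilterSubscription
policy = {
    'cpu_usage': [{'numeric': ['>', 80]}],
    'memory_usage': [{'numeric': ['>=', 90]}],
    'error_rate': [{'numeric': ['<', 5]}],
}
print(matches_filter_policy(policy, {'cpu_usage': 92.5, 'memory_usage': 90, 'error_rate': 1}))  # True
print(matches_filter_policy(policy, {'cpu_usage': 92.5, 'memory_usage': 85, 'error_rate': 1}))  # False
```

The second message is dropped because one key (`memory_usage`) fails, even though the other two match.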
Advanced Features
1. FIFO Topics
```yaml
# FIFO SNS topic for ordered message delivery
FIFOTopic:
  Type: AWS::SNS::Topic
  Properties:
    TopicName: OrderProcessing.fifo  # FIFO topic names must end in .fifo
    FifoTopic: true
    ContentBasedDeduplication: true
    KmsMasterKeyId: !Ref SNSKMSKey

# FIFO SQS subscription
FIFOSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref FIFOTopic
    Protocol: sqs
    Endpoint: !GetAtt OrderQueue.Arn
```
```python
import json
from datetime import datetime, timezone

import boto3

sns = boto3.client('sns')


# Publishing to a FIFO topic
def publish_to_fifo_topic(topic_arn, message, message_group_id, deduplication_id=None):
    """Publish to a FIFO topic; ordering is preserved within each message group."""
    params = {
        'TopicArn': topic_arn,
        'Message': json.dumps(message),
        'MessageGroupId': message_group_id,
    }

    # An explicit ID is required unless the topic uses content-based deduplication
    if deduplication_id:
        params['MessageDeduplicationId'] = deduplication_id

    response = sns.publish(**params)
    return response['MessageId']


# Example: order-processing messages
order_data = {
    'order_id': 'ORD-123456',
    'customer_id': 'CUST-789',
    'action': 'created',
    'timestamp': datetime.now(timezone.utc).isoformat(),
}

message_id = publish_to_fifo_topic(
    'arn:aws:sns:us-east-1:123456789012:OrderProcessing.fifo',
    order_data,
    # All events for one customer share a group, so they are delivered in order
    message_group_id=f"customer-{order_data['customer_id']}",
    deduplication_id=f"{order_data['order_id']}-{order_data['action']}",
)
```
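With `ContentBasedDeduplication: true`, SNS derives the deduplication ID from a SHA-256 hash of the message body, so byte-identical bodies published within the 5-minute deduplication interval are treated as duplicates. The sketch below (hypothetical helper `content_dedup_id`) reproduces the idea locally to show why a field such as the `timestamp` in `order_data` defeats content-based deduplication, which is one reason to pass an explicit `MessageDeduplicationId`.

```python
import hashlib
import json
from typing import Any, Dict


def content_dedup_id(message: Dict[str, Any]) -> str:
    """Hash the serialized body with SHA-256, mirroring content-based deduplication."""
    body = json.dumps(message)
    return hashlib.sha256(body.encode('utf-8')).hexdigest()


first = {'order_id': 'ORD-123456', 'action': 'created', 'timestamp': '2024-08-20T10:00:00+00:00'}
retry = {'order_id': 'ORD-123456', 'action': 'created', 'timestamp': '2024-08-20T10:00:01+00:00'}

# Same body -> same ID; a changed timestamp -> a different ID (no deduplication)
print(content_dedup_id(first) == content_dedup_id(dict(first)))  # True
print(content_dedup_id(first) == content_dedup_id(retry))        # False
```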
2. Mobile Push Notifications
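```python
import boto3

sns = boto3.client('sns', region_name='us-east-1')

# CloudFormation has no AWS::SNS::PlatformApplication resource type, so platform
# applications are created through the SNS API. The credential file names below
# are placeholders for your own APNs and FCM secrets.
with open('apns-private-key.pem') as f:
    apns_private_key = f.read()
with open('apns-certificate.pem') as f:
    apns_certificate = f.read()
with open('fcm-server-key.txt') as f:
    fcm_server_key = f.read().strip()

ios_app_arn = sns.create_platform_application(
    Name='MyApp-iOS',
    Platform='APNS',  # use APNS_SANDBOX for development certificates
    Attributes={
        'PlatformCredential': apns_private_key,  # private key
        'PlatformPrincipal': apns_certificate,   # SSL certificate
    },
)['PlatformApplicationArn']

android_app_arn = sns.create_platform_application(
    Name='MyApp-Android',
    Platform='GCM',  # Firebase Cloud Messaging uses the GCM platform name in SNS
    Attributes={'PlatformCredential': fcm_server_key},
)['PlatformApplicationArn']
```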
```python
import json
from datetime import datetime, timezone

import boto3


# Mobile push notification management
class MobilePushManager:
    def __init__(self, region='us-east-1'):
        self.sns = boto3.client('sns', region_name=region)

    def create_platform_endpoint(self, platform_application_arn, token, user_data=None):
        """Register a device token and return its endpoint ARN."""
        params = {
            'PlatformApplicationArn': platform_application_arn,
            'Token': token,
        }
        if user_data:
            # Arbitrary string stored with the endpoint
            params['CustomUserData'] = json.dumps(user_data)

        response = self.sns.create_platform_endpoint(**params)
        return response['EndpointArn']

    def send_push_notification(self, endpoint_arn, message, badge_count=None, sound=None):
        """Send a push notification to a specific device endpoint."""
        # Platform-specific payloads
        ios_payload = {'aps': {'alert': message, 'sound': sound or 'default'}}
        if badge_count is not None:
            ios_payload['aps']['badge'] = badge_count

        android_payload = {
            'data': {
                'message': message,
                'timestamp': datetime.now(timezone.utc).isoformat(),
            }
        }

        # Each platform key holds its payload as a JSON string;
        # APNS_SANDBOX is used by development (sandbox) endpoints
        message_structure = {
            'default': message,
            'APNS': json.dumps(ios_payload),
            'APNS_SANDBOX': json.dumps(ios_payload),
            'GCM': json.dumps(android_payload),
        }

        response = self.sns.publish(
            TargetArn=endpoint_arn,
            Message=json.dumps(message_structure),
            MessageStructure='json',
        )
        return response['MessageId']

    def send_bulk_notification(self, platform_application_arn, message, tokens):
        """Send one notification to many device tokens (one publish per device)."""
        message_ids = []
        for token in tokens:
            try:
                # CreatePlatformEndpoint is idempotent for a token with the same attributes
                endpoint_arn = self.create_platform_endpoint(platform_application_arn, token)
                message_ids.append(self.send_push_notification(endpoint_arn, message))
            except Exception as e:
                print(f"Failed to send notification to token {token}: {e}")
        return message_ids


# Usage example
push_manager = MobilePushManager()

endpoint_arn = push_manager.create_platform_endpoint(
    'arn:aws:sns:us-east-1:123456789012:app/APNS/MyApp-iOS',
    'device-token-here',
    user_data={'user_id': '12345', 'preferences': {'notifications': True}},
)

message_id = push_manager.send_push_notification(
    endpoint_arn,
    'You have a new message!',
    badge_count=1,
    sound='notification.wav',
)
```
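Each platform key in a push `MessageStructure` must hold that platform's payload serialized as a JSON string, so the payload ends up JSON-encoded twice. Pulling the message construction into a pure function makes it easy to unit test without calling SNS. A sketch, assuming the same APNs `aps` and FCM `data` payload shapes as `send_push_notification`; `build_push_message` is a hypothetical name.

```python
import json
from typing import Any, Dict, Optional


def build_push_message(message: str, badge_count: Optional[int] = None,
                       sound: Optional[str] = None) -> str:
    """Return the Message string for publish(MessageStructure='json')."""
    aps: Dict[str, Any] = {'alert': message, 'sound': sound or 'default'}
    if badge_count is not None:
        aps['badge'] = badge_count
    ios_payload = json.dumps({'aps': aps})
    android_payload = json.dumps({'data': {'message': message}})

    # Outer JSON object: platform key -> payload string (double encoding)
    return json.dumps({
        'default': message,
        'APNS': ios_payload,
        'APNS_SANDBOX': ios_payload,
        'GCM': android_payload,
    })


structure = json.loads(build_push_message('You have a new message!', badge_count=1))
apns = json.loads(structure['APNS'])
print(apns['aps']['badge'])  # 1
```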
3. Message Delivery Status
```yaml
# Topic with delivery status logging
DeliveryStatusLogging:
  Type: AWS::SNS::Topic
  Properties:
    TopicName: MyTopicWithLogging
    DeliveryStatusLogging:
      - Protocol: lambda
        SuccessFeedbackRoleArn: !GetAtt SNSLoggingRole.Arn
        SuccessFeedbackSampleRate: 100  # log every successful delivery
        FailureFeedbackRoleArn: !GetAtt SNSLoggingRole.Arn
      - Protocol: sqs
        SuccessFeedbackRoleArn: !GetAtt SNSLoggingRole.Arn
        SuccessFeedbackSampleRate: 50   # log a 50% sample of successes
        FailureFeedbackRoleArn: !GetAtt SNSLoggingRole.Arn

# Role that SNS assumes to write delivery logs to CloudWatch Logs
SNSLoggingRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service: sns.amazonaws.com
          Action: sts:AssumeRole
    Policies:
      - PolicyName: SNSLogsAccess
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - logs:CreateLogGroup
                - logs:CreateLogStream
                - logs:PutLogEvents
              Resource: '*'
```
Integration Patterns
1. SNS to SQS Fanout
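```yaml
# Fan-out pattern with multiple SQS queues
PrimaryTopic:
  Type: AWS::SNS::Topic
  Properties:
    TopicName: OrderEvents

# Processing queues
OrderProcessingQueue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: OrderProcessing
    MessageRetentionPeriod: 1209600  # 14 days
    VisibilityTimeout: 300           # seconds

EmailNotificationQueue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: EmailNotifications
    MessageRetentionPeriod: 345600  # 4 days

InventoryUpdateQueue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: InventoryUpdates
    MessageRetentionPeriod: 1209600  # 14 days

# Without a queue policy, SNS is not allowed to send to the queues
FanoutQueuePolicy:
  Type: AWS::SQS::QueuePolicy
  Properties:
    Queues:
      - !Ref OrderProcessingQueue
      - !Ref EmailNotificationQueue
      - !Ref InventoryUpdateQueue
    PolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service: sns.amazonaws.com
          Action: sqs:SendMessage
          Resource:
            - !GetAtt OrderProcessingQueue.Arn
            - !GetAtt EmailNotificationQueue.Arn
            - !GetAtt InventoryUpdateQueue.Arn
          Condition:
            ArnEquals:
              aws:SourceArn: !Ref PrimaryTopic

# Subscriptions
OrderProcessingSubscription:  # no filter policy: receives every event
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref PrimaryTopic
    Protocol: sqs
    Endpoint: !GetAtt OrderProcessingQueue.Arn

EmailSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref PrimaryTopic
    Protocol: sqs
    Endpoint: !GetAtt EmailNotificationQueue.Arn
    FilterPolicy:
      event_type:
        - order_created
        - order_shipped

InventorySubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref PrimaryTopic
    Protocol: sqs
    Endpoint: !GetAtt InventoryUpdateQueue.Arn
    FilterPolicy:
      event_type:
        - order_created
        - order_cancelled
```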
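Unless `RawMessageDelivery` is enabled on the subscription, each SQS message body is the SNS JSON envelope: the original payload sits in `Message`, and attributes appear under `MessageAttributes` as `{'Type': ..., 'Value': ...}` pairs. A queue consumer therefore has to unwrap it. The sketch below covers only that parsing step (no SQS receive/delete loop); `unwrap_sns_envelope` is a hypothetical helper and the sample envelope is abbreviated.

```python
import json
from typing import Any, Dict, Tuple


def unwrap_sns_envelope(sqs_body: str) -> Tuple[Any, Dict[str, str]]:
    """Return (payload, attributes) from an SNS notification delivered to SQS."""
    envelope = json.loads(sqs_body)
    if envelope.get('Type') != 'Notification':
        raise ValueError(f"unexpected SNS message type: {envelope.get('Type')}")
    attributes = {
        name: attr['Value']
        for name, attr in envelope.get('MessageAttributes', {}).items()
    }
    try:
        payload = json.loads(envelope['Message'])
    except json.JSONDecodeError:
        payload = envelope['Message']  # plain-text publish
    return payload, attributes


# Abbreviated envelope (signature and URL fields omitted)
body = json.dumps({
    'Type': 'Notification',
    'MessageId': 'example-message-id',
    'TopicArn': 'arn:aws:sns:us-east-1:123456789012:OrderEvents',
    'Message': json.dumps({'order_id': 'ORD-123456'}),
    'MessageAttributes': {'event_type': {'Type': 'String', 'Value': 'order_created'}},
})
payload, attributes = unwrap_sns_envelope(body)
print(payload['order_id'], attributes['event_type'])  # ORD-123456 order_created
```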
2. SNS to Lambda Integration
```python
# Lambda function processing SNS messages
import json

# handle_processing_error, process_user_signup, process_payment_failure,
# process_generic_message, send_order_confirmation and update_order_analytics
# are application-specific helpers not shown here.


def lambda_handler(event, context):
    """Process SNS notifications delivered to Lambda."""
    processed_messages = []

    for record in event['Records']:
        # Extract the SNS notification
        sns_message = record['Sns']
        message_id = sns_message['MessageId']
        topic_arn = sns_message['TopicArn']

        try:
            # For MessageStructure='json' publishes, SNS has already selected the
            # 'lambda' (or 'default') body, so Message is the final payload
            raw_body = sns_message['Message']
            try:
                actual_message = json.loads(raw_body)
            except json.JSONDecodeError:
                actual_message = {'text': raw_body}  # plain-text publish

            # Attributes arrive as {'name': {'Type': ..., 'Value': ...}}
            attributes = sns_message.get('MessageAttributes', {})
            event_type = attributes.get('event_type', {}).get('Value', 'unknown')

            # Process based on event type
            result = process_message(actual_message, event_type, message_id)
            processed_messages.append(result)

        except Exception as e:
            print(f"Error processing message {message_id} from {topic_arn}: {e}")
            # Catching here means Lambda will not retry; re-raise instead to use
            # Lambda's asynchronous retries and an on-failure destination or DLQ
            handle_processing_error(message_id, sns_message, str(e))

    # SNS invokes Lambda asynchronously, so this return value is not sent back to SNS
    return {
        'statusCode': 200,
        'body': json.dumps({
            'processed_count': len(processed_messages),
            'results': processed_messages,
        }),
    }


def process_message(message, event_type, message_id):
    """Route a message to its handler based on event type."""
    if event_type == 'order_created':
        return process_order_creation(message)
    elif event_type == 'user_signup':
        return process_user_signup(message)
    elif event_type == 'payment_failed':
        return process_payment_failure(message)
    else:
        return process_generic_message(message)


def process_order_creation(message):
    """Process a newly created order."""
    order_id = message.get('order_id')
    customer_id = message.get('customer_id')

    # Send order confirmation email
    send_order_confirmation(order_id, customer_id)

    # Update analytics
    update_order_analytics(message)

    return {'action': 'order_processed', 'order_id': order_id}
```
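Because SNS invokes the function with a fixed event shape, the parsing logic can be tested locally with a hand-built event. The sketch below assumes the `Records[].Sns` layout of the SNS-to-Lambda event (abbreviated: signature fields omitted); `make_sns_event` and `parse_sns_record` are hypothetical helpers, and the parser falls back to raw text for non-JSON bodies.

```python
import json
from typing import Any, Dict, Optional, Tuple


def make_sns_event(message: str, event_type: Optional[str] = None) -> Dict[str, Any]:
    """Build a minimal SNS-to-Lambda event (signature fields omitted)."""
    attributes = {}
    if event_type:
        attributes['event_type'] = {'Type': 'String', 'Value': event_type}
    return {
        'Records': [{
            'EventSource': 'aws:sns',
            'Sns': {
                'Type': 'Notification',
                'MessageId': 'test-message-id',
                'TopicArn': 'arn:aws:sns:us-east-1:123456789012:OrderEvents',
                'Message': message,
                'MessageAttributes': attributes,
            },
        }]
    }


def parse_sns_record(record: Dict[str, Any]) -> Tuple[Any, str]:
    """Return (payload, event_type), falling back to raw text for non-JSON bodies."""
    sns = record['Sns']
    try:
        payload = json.loads(sns['Message'])
    except json.JSONDecodeError:
        payload = {'text': sns['Message']}
    event_type = sns.get('MessageAttributes', {}).get('event_type', {}).get('Value', 'unknown')
    return payload, event_type


event = make_sns_event(json.dumps({'order_id': 'ORD-123456'}), 'order_created')
print(parse_sns_record(event['Records'][0]))  # ({'order_id': 'ORD-123456'}, 'order_created')
```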
3. Cross-Account Topic Access
```yaml
# Cross-account SNS topic policy
CrossAccountTopicPolicy:
  Type: AWS::SNS::TopicPolicy
  Properties:
    Topics:
      - !Ref SharedTopic
    PolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Sid: AllowCrossAccountPublish
          Effect: Allow
          Principal:
            AWS:
              - 'arn:aws:iam::ACCOUNT-B:root'
              - 'arn:aws:iam::ACCOUNT-C:root'
          Action:
            - sns:Publish
          Resource: !Ref SharedTopic
        - Sid: AllowCrossAccountSubscribe
          Effect: Allow
          Principal:
            AWS:
              - 'arn:aws:iam::ACCOUNT-B:root'
          Action:
            - sns:Subscribe
            - sns:Receive
          Resource: !Ref SharedTopic
          # sns:Protocol is evaluated on Subscribe requests, not on Publish
          Condition:
            StringEquals:
              'sns:Protocol':
                - sqs
                - lambda
```
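If the list of partner accounts changes often, generating the policy document in code can be easier than hand-editing YAML. A sketch, assuming a hypothetical `build_cross_account_policy` helper and made-up account IDs: publishing is granted to the publisher accounts, and subscribing is restricted with the `sns:Protocol` condition key, which applies to `Subscribe` requests. The resulting JSON could then be applied with the SNS `set_topic_attributes` call using `AttributeName='Policy'`.

```python
import json
from typing import Any, Dict, List


def build_cross_account_policy(topic_arn: str, publisher_accounts: List[str],
                               subscriber_accounts: List[str],
                               protocols: List[str]) -> Dict[str, Any]:
    """Build an SNS topic policy granting cross-account publish and subscribe."""
    def principals(accounts: List[str]) -> List[str]:
        return [f"arn:aws:iam::{account}:root" for account in accounts]

    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Sid': 'AllowCrossAccountPublish',
                'Effect': 'Allow',
                'Principal': {'AWS': principals(publisher_accounts)},
                'Action': 'sns:Publish',
                'Resource': topic_arn,
            },
            {
                'Sid': 'AllowCrossAccountSubscribe',
                'Effect': 'Allow',
                'Principal': {'AWS': principals(subscriber_accounts)},
                'Action': ['sns:Subscribe', 'sns:Receive'],
                'Resource': topic_arn,
                # Limit which endpoint protocols subscribers may use
                'Condition': {'StringEquals': {'sns:Protocol': protocols}},
            },
        ],
    }


policy = build_cross_account_policy(
    'arn:aws:sns:us-east-1:123456789012:SharedTopic',
    publisher_accounts=['111111111111', '222222222222'],
    subscriber_accounts=['111111111111'],
    protocols=['sqs', 'lambda'],
)
policy_json = json.dumps(policy)  # value for AttributeName='Policy'
```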
Monitoring and Troubleshooting
1. CloudWatch Metrics
```yaml
# SNS CloudWatch alarms
FailedDeliveryAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: SNS-Failed-Deliveries
    AlarmDescription: SNS notifications failed to deliver to subscribers
    MetricName: NumberOfNotificationsFailed
    Namespace: AWS/SNS
    Statistic: Sum
    Period: 300
    EvaluationPeriods: 1
    Threshold: 1
    ComparisonOperator: GreaterThanOrEqualToThreshold
    Dimensions:
      - Name: TopicName
        Value: !GetAtt NotificationTopic.TopicName

# PublishSize measures message size in bytes, not delivery latency
LargeMessageAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: SNS-Large-Publish-Size
    MetricName: PublishSize
    Namespace: AWS/SNS
    Statistic: Average
    Period: 300
    EvaluationPeriods: 2
    Threshold: 10000  # bytes (~10 KB)
    ComparisonOperator: GreaterThanThreshold
    Dimensions:
      - Name: TopicName
        Value: !GetAtt NotificationTopic.TopicName
```
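On a busy topic a single failed delivery can be noise, so a ratio is often more useful than a raw count. As a sketch (hypothetical helpers; datapoints shaped like the `Datapoints` list CloudWatch `get_metric_statistics` returns with `Statistics=['Sum']`), an approximate failure rate is failed / (delivered + failed). It is only approximate because, for HTTP/S endpoints, `NumberOfNotificationsFailed` also counts retried attempts.

```python
from typing import Any, Dict, List


def sum_datapoints(datapoints: List[Dict[str, Any]]) -> float:
    return sum(point['Sum'] for point in datapoints)


def delivery_failure_rate(delivered: List[Dict[str, Any]],
                          failed: List[Dict[str, Any]]) -> float:
    """Approximate failed / (delivered + failed); 0.0 when nothing was attempted."""
    delivered_total = sum_datapoints(delivered)
    failed_total = sum_datapoints(failed)
    attempts = delivered_total + failed_total
    return failed_total / attempts if attempts else 0.0


rate = delivery_failure_rate(
    delivered=[{'Sum': 480.0}, {'Sum': 495.0}],
    failed=[{'Sum': 20.0}, {'Sum': 5.0}],
)
print(f"{rate:.1%}")  # 2.5%
```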
2. Message Tracking
```python
# SNS message tracking and analytics
from datetime import datetime, timedelta, timezone

import boto3


class SNSMonitor:
    def __init__(self, region='us-east-1'):
        self.sns = boto3.client('sns', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)

    def _sum_metric(self, topic_name, metric_name, start_time, end_time):
        """Sum one AWS/SNS metric for a topic over the time window."""
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/SNS',
            MetricName=metric_name,
            Dimensions=[{'Name': 'TopicName', 'Value': topic_name}],
            StartTime=start_time,
            EndTime=end_time,
            Period=3600,
            Statistics=['Sum'],
        )
        return sum(point['Sum'] for point in response['Datapoints'])

    def get_topic_metrics(self, topic_arn, start_time=None, end_time=None):
        """Get published, delivered, failed and filtered-out counts for a topic."""
        end_time = end_time or datetime.now(timezone.utc)
        start_time = start_time or end_time - timedelta(hours=24)
        topic_name = topic_arn.split(':')[-1]

        # These metrics are reported per TopicName; there is no per-protocol dimension
        return {
            'published': self._sum_metric(topic_name, 'NumberOfMessagesPublished', start_time, end_time),
            'delivered': self._sum_metric(topic_name, 'NumberOfNotificationsDelivered', start_time, end_time),
            'failed': self._sum_metric(topic_name, 'NumberOfNotificationsFailed', start_time, end_time),
            'filtered_out': self._sum_metric(topic_name, 'NumberOfNotificationsFilteredOut', start_time, end_time),
        }

    def analyze_subscription_health(self, topic_arn):
        """Report confirmation status and policies for every subscription."""
        health_report = []
        paginator = self.sns.get_paginator('list_subscriptions_by_topic')

        for page in paginator.paginate(TopicArn=topic_arn):
            for subscription in page['Subscriptions']:
                sub_arn = subscription['SubscriptionArn']
                entry = {
                    'subscription_arn': sub_arn,
                    'protocol': subscription['Protocol'],
                    'endpoint': subscription['Endpoint'],
                }

                # Unconfirmed subscriptions have no real ARN to query yet
                if sub_arn == 'PendingConfirmation':
                    entry['confirmed'] = False
                    health_report.append(entry)
                    continue

                try:
                    attrs = self.sns.get_subscription_attributes(
                        SubscriptionArn=sub_arn
                    )['Attributes']
                    entry.update({
                        'confirmed': attrs.get('PendingConfirmation') == 'false',
                        'filter_policy': attrs.get('FilterPolicy'),
                        'delivery_policy': attrs.get('DeliveryPolicy'),
                        'redrive_policy': attrs.get('RedrivePolicy'),  # DLQ, if configured
                    })
                except Exception as e:
                    entry['error'] = str(e)

                health_report.append(entry)

        return health_report
```
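Unconfirmed subscriptions appear in `list_subscriptions_by_topic` results with the literal `SubscriptionArn` value `PendingConfirmation`, which makes a quick first health signal. A small pure function (hypothetical name `summarize_subscriptions`) can summarize that response offline; the sample data below is made up.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize_subscriptions(subscriptions: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Count subscriptions per protocol and list endpoints awaiting confirmation."""
    return {
        'by_protocol': dict(Counter(sub['Protocol'] for sub in subscriptions)),
        'pending_confirmation': [
            sub['Endpoint'] for sub in subscriptions
            if sub['SubscriptionArn'] == 'PendingConfirmation'
        ],
    }


sample = [
    {'SubscriptionArn': 'arn:aws:sns:us-east-1:123456789012:MyApplicationNotifications:abc',
     'Protocol': 'sqs', 'Endpoint': 'arn:aws:sqs:us-east-1:123456789012:CriticalAlerts'},
    {'SubscriptionArn': 'PendingConfirmation',
     'Protocol': 'email', 'Endpoint': 'admin@example.com'},
]
print(summarize_subscriptions(sample))
```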
Security Best Practices
1. Access Control
```yaml
# SNS topic with fine-grained access control
SecureTopic:
  Type: AWS::SNS::Topic
  Properties:
    KmsMasterKeyId: !Ref SNSKMSKey

SNSAccessPolicy:
  Type: AWS::SNS::TopicPolicy
  Properties:
    Topics:
      - !Ref SecureTopic
    PolicyDocument:
      Version: '2012-10-17'
      Statement:
        # Naming the role ARN already limits publishing to that role;
        # aws:SourceAccount is only set for service-to-service calls, so it is not used here
        - Sid: AllowPublishFromLambda
          Effect: Allow
          Principal:
            AWS: !GetAtt LambdaExecutionRole.Arn
          Action:
            - sns:Publish
          Resource: !Ref SecureTopic
        - Sid: AllowSubscribeFromSameAccount
          Effect: Allow
          Principal:
            AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
          Action:
            - sns:Subscribe
            - sns:Receive
          Resource: !Ref SecureTopic
        # Reject publish requests that do not use TLS (encryption in transit)
        - Sid: DenyInsecureTransport
          Effect: Deny
          Principal: '*'
          Action: sns:Publish
          Resource: !Ref SecureTopic
          Condition:
            Bool:
              'aws:SecureTransport': 'false'
```
2. Message Encryption
```yaml
# KMS key for SNS encryption
SNSKMSKey:
  Type: AWS::KMS::Key
  Properties:
    Description: KMS key for SNS encryption
    KeyPolicy:
      Statement:
        # Account administrators keep full control of the key
        - Effect: Allow
          Principal:
            AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
          Action: "kms:*"
          Resource: "*"
        # Allow the SNS service principal to use the key
        - Effect: Allow
          Principal:
            Service: sns.amazonaws.com
          Action:
            - kms:Decrypt
            - kms:GenerateDataKey
          Resource: "*"
```
Cost Optimization
1. Message Batching
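```python
import json
import uuid

import boto3

sns = boto3.client('sns')


def batch_publish_messages(topic_arn, messages, batch_size=10):
    """
    Publish messages with the PublishBatch API (up to 10 entries per request)
    to cut the number of API calls.
    Returns (message_ids, failed_entries).
    """
    if not 1 <= batch_size <= 10:
        raise ValueError("PublishBatch accepts between 1 and 10 entries per request")

    message_ids, failures = [], []
    for start in range(0, len(messages), batch_size):
        batch = messages[start:start + batch_size]
        batch_id = str(uuid.uuid4())  # shared by every message in this batch

        entries = [
            {
                'Id': str(index),  # must be unique within one request
                'Message': json.dumps(message),
                'MessageAttributes': {
                    'batch_id': {'DataType': 'String', 'StringValue': batch_id}
                },
            }
            for index, message in enumerate(batch)
        ]

        response = sns.publish_batch(TopicArn=topic_arn, PublishBatchRequestEntries=entries)
        message_ids.extend(entry['MessageId'] for entry in response.get('Successful', []))
        # PublishBatch can partially succeed; failed entries should be retried or logged
        failures.extend(response.get('Failed', []))

    return message_ids, failures
```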
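When batching with the `PublishBatch` API, each request has two limits: at most 10 entries, and an aggregate payload of at most 256 KB (262,144 bytes). Fixed-size slicing only enforces the first. A greedy splitter that respects both could look like the sketch below; `split_for_publish_batch` is a hypothetical name, and for simplicity it counts only the serialized `Message` bodies toward the size, ignoring attribute bytes.

```python
import json
from typing import Any, List

MAX_ENTRIES = 10
MAX_BATCH_BYTES = 262_144  # 256 KB aggregate per PublishBatch request


def split_for_publish_batch(messages: List[Any]) -> List[List[str]]:
    """Greedily group serialized messages so each group fits one PublishBatch call."""
    batches: List[List[str]] = []
    current: List[str] = []
    current_bytes = 0
    for message in messages:
        body = json.dumps(message)
        size = len(body.encode('utf-8'))
        if size > MAX_BATCH_BYTES:
            raise ValueError("a single message exceeds 256 KB")
        # Start a new group when adding this body would break either limit
        if current and (len(current) == MAX_ENTRIES or current_bytes + size > MAX_BATCH_BYTES):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(body)
        current_bytes += size
    if current:
        batches.append(current)
    return batches


# 25 small messages -> groups of 10, 10 and 5
print([len(b) for b in split_for_publish_batch([{'n': i} for i in range(25)])])  # [10, 10, 5]
```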
2. Regional Optimization
```python
def get_optimal_region_for_subscribers(subscribers):
    """
    Pick the Region that hosts the most ARN-based subscribers (SQS, Lambda, ...).
    Endpoints that are not ARNs (email, HTTP, SMS) carry no Region and are skipped.
    """
    region_counts = {}

    for subscriber in subscribers:
        # ARN format: arn:partition:service:region:account-id:resource
        if subscriber.startswith('arn:aws:'):
            region = subscriber.split(':')[3]
            region_counts[region] = region_counts.get(region, 0) + 1

    # Return the Region with the most subscribers, defaulting to us-east-1
    return max(region_counts.items(), key=lambda x: x[1])[0] if region_counts else 'us-east-1'
```
Best Practices
1. Message Design
- Keep messages small and focused
- Use message attributes for routing
- Implement idempotent message processing
- Include correlation IDs for tracing
2. Subscription Management
- Use filter policies to reduce unnecessary deliveries
- Implement proper error handling and retries
- Monitor subscription health regularly
- Clean up unused subscriptions
3. Security
- Always encrypt sensitive data
- Use IAM roles instead of access keys
- Implement least privilege access
- Enable delivery status logging
4. Performance
- Keep payloads under the 256 KB message limit; store larger payloads in Amazon S3 and publish a reference
- Implement exponential backoff for retries
- Consider FIFO topics for ordered processing
- Monitor and optimize delivery patterns
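Standard topics deliver at least once, so a subscriber can receive the same message more than once. The sketch below combines two of the message-design practices: attaching a correlation ID as a message attribute, and processing idempotently by message ID. The `correlation_id` attribute name and the in-memory `seen` set are assumptions for illustration; production code would typically record processed IDs in a durable store, such as a database table with a TTL.

```python
import uuid
from typing import Any, Callable, Dict, Set


def with_correlation_id(attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Add a correlation_id message attribute (SNS attribute format) if absent."""
    attrs = dict(attributes)
    attrs.setdefault('correlation_id', {'DataType': 'String', 'StringValue': str(uuid.uuid4())})
    return attrs


class IdempotentProcessor:
    """Run a handler at most once per message ID (in-memory, single process)."""

    def __init__(self, handler: Callable[[str], Any]) -> None:
        self.handler = handler
        self.seen: Set[str] = set()

    def process(self, message_id: str, body: str) -> bool:
        """Return True if the handler ran, False if the message was a duplicate."""
        if message_id in self.seen:
            return False
        self.handler(body)
        self.seen.add(message_id)  # record only after success so failures can be retried
        return True


handled = []
processor = IdempotentProcessor(handled.append)
processor.process('msg-1', 'order created')
processor.process('msg-1', 'order created')  # redelivery is skipped
print(handled)  # ['order created']
```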
Additional Resources
https://mranv.pages.dev/posts/complete-guide-to-amazon-sns/