Complete Guide to Amazon SNS: Scalable Notification Service

Amazon Simple Notification Service (SNS) is a fully managed messaging service that enables you to decouple microservices, distributed systems, and serverless applications. SNS provides topics for high-throughput, push-based, many-to-many messaging.

Overview#

SNS follows the publish-subscribe (pub-sub) messaging pattern. Publishers send messages to topics, and SNS delivers a copy of each message to every subscribed endpoint. This lets you fan out one message to multiple subscribers, including Amazon SQS queues, AWS Lambda functions, HTTP endpoints, email addresses, and mobile push notifications.
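
To make the fan-out idea concrete, here is a minimal in-memory sketch of the pub-sub pattern. It is not the SNS API: `InMemoryTopicBroker` and its methods are hypothetical names used only for illustration, and real SNS adds durability, retries, and protocol-specific delivery on top of this basic shape.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class InMemoryTopicBroker:
    """Toy stand-in for a pub-sub broker (hypothetical, not the SNS API)."""

    def __init__(self) -> None:
        # topic name -> list of subscriber callbacks
        self._subscribers: Dict[str, List[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[str], None]) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: str) -> int:
        """Deliver the message to every subscriber and return the delivery count."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(message)
        return len(handlers)


broker = InMemoryTopicBroker()
queue_inbox: List[str] = []
email_inbox: List[str] = []

# Two independent subscribers on the same topic
broker.subscribe("orders", queue_inbox.append)
broker.subscribe("orders", email_inbox.append)

# One publish call fans out to both subscribers
delivered = broker.publish("orders", "order_created")
```

The publisher never references the subscribers directly, which is the decoupling that the pattern provides.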

Key Benefits#

1. High Throughput#

  • High publish throughput on standard topics, subject to per-Region service quotas
  • Automatic scaling based on demand
  • Global message delivery
  • No capacity planning required

2. Multiple Delivery Protocols#

  • HTTP/HTTPS endpoints
  • Email and SMS
  • Amazon SQS queues
  • AWS Lambda functions
  • Mobile push notifications

3. Reliability#

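  • Messages stored redundantly across multiple Availability Zones; standard topics deliver at least once
  • Dead-letter queues for failed deliveries
  • Message filtering on all topics; deduplication on FIFO topics
  • Cross-Region delivery to supported endpoints such as SQS queues and Lambda functions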

4. Security#

  • Encryption in transit and at rest
  • IAM access control
  • Message attribute-based access control
  • VPC endpoint support

Core Concepts#

1. Topics and Subscriptions#

# Basic SNS Topic
NotificationTopic:
  Type: AWS::SNS::Topic
  Properties:
    TopicName: MyApplicationNotifications
    DisplayName: My Application Notifications
    KmsMasterKeyId: !Ref SNSKMSKey
    Tags:
      - Key: Application
        Value: MyApp
      - Key: Environment
        Value: Production

# Email subscription
EmailSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref NotificationTopic
    Protocol: email
    Endpoint: admin@example.com
    FilterPolicy:
      event_type:
        - error
        - critical

# Lambda subscription
LambdaSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref NotificationTopic
    Protocol: lambda
    Endpoint: !GetAtt NotificationProcessor.Arn
    FilterPolicy:
      source:
        - application
        - system

2. Message Publishing#

import boto3
import json
from datetime import datetime, timezone

sns = boto3.client('sns')


def publish_simple_message(topic_arn, message, subject=None):
    """
    Publish a simple text message
    """
    params = {'TopicArn': topic_arn, 'Message': message}
    # boto3 rejects Subject=None, so include it only when provided
    if subject is not None:
        params['Subject'] = subject
    response = sns.publish(**params)
    return response['MessageId']


def publish_structured_message(topic_arn, message_data):
    """
    Publish a structured message with different formats for different protocols
    """
    # With MessageStructure='json', every value must be a string and
    # the 'default' key is required
    message = {
        'default': json.dumps(message_data),
        'email': f"Alert: {message_data.get('title', 'Notification')}\n\n{message_data.get('description', '')}",
        'sms': f"Alert: {message_data.get('title', '')[:50]}",
        'lambda': json.dumps(message_data),
        'sqs': json.dumps(message_data)
    }
    response = sns.publish(
        TopicArn=topic_arn,
        Message=json.dumps(message),
        MessageStructure='json',
        MessageAttributes={
            'event_type': {
                'DataType': 'String',
                'StringValue': message_data.get('event_type', 'info')
            },
            'source': {
                'DataType': 'String',
                'StringValue': message_data.get('source', 'application')
            },
            'timestamp': {
                'DataType': 'String',
                'StringValue': datetime.now(timezone.utc).isoformat()
            }
        }
    )
    return response['MessageId']


# Example usage
topic_arn = 'arn:aws:sns:us-east-1:123456789012:MyApplicationNotifications'

# Simple message
message_id = publish_simple_message(
    topic_arn,
    "System maintenance completed successfully",
    "Maintenance Update"
)

# Structured message
message_id = publish_structured_message(topic_arn, {
    'title': 'High CPU Usage Alert',
    'description': 'EC2 instance i-1234567890abcdef0 has exceeded 90% CPU usage',
    'event_type': 'warning',
    'source': 'monitoring',
    'instance_id': 'i-1234567890abcdef0',
    'cpu_usage': 92.5
})
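
Message attributes must be passed in the `DataType`/`StringValue` shape shown above, which gets repetitive. The helper below is a small sketch that converts a plain dictionary into that shape; `build_message_attributes` is a hypothetical name, and the type mapping (numbers to `Number`, lists to `String.Array`, everything else to `String`) is one reasonable convention rather than anything SNS requires.

```python
import json
from typing import Any, Dict


def build_message_attributes(values: Dict[str, Any]) -> Dict[str, Dict[str, str]]:
    """Convert plain Python values into the MessageAttributes structure."""
    attributes: Dict[str, Dict[str, str]] = {}
    for name, value in values.items():
        if isinstance(value, bool):
            # bool is a subclass of int, so handle it before the numeric check
            attributes[name] = {'DataType': 'String', 'StringValue': str(value).lower()}
        elif isinstance(value, (int, float)):
            # Number attributes are still transmitted as strings
            attributes[name] = {'DataType': 'Number', 'StringValue': str(value)}
        elif isinstance(value, (list, tuple)):
            # String.Array carries a JSON-encoded array
            attributes[name] = {'DataType': 'String.Array', 'StringValue': json.dumps(list(value))}
        else:
            attributes[name] = {'DataType': 'String', 'StringValue': str(value)}
    return attributes


example_attributes = build_message_attributes({
    'event_type': 'warning',
    'cpu_usage': 92.5,
    'regions': ['us-east-1', 'us-west-2'],
})
```

The result can be passed directly as the `MessageAttributes` argument of `publish`. Sending numeric values as `Number` matters if subscribers use numeric filter conditions.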

3. Message Filtering#

# Subscription with message filtering
# A message must match every attribute key in the policy (AND);
# within one key, any listed value may match (OR)
FilteredSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref NotificationTopic
    Protocol: sqs
    Endpoint: !GetAtt CriticalAlertsQueue.Arn
    FilterPolicy:
      event_type:
        - error
        - critical
      source:
        - application
        - database
      severity:
        - high
        - critical
      region:
        - us-east-1
        - us-west-2

# Numeric filtering
# Numeric conditions use the form {"numeric": [operator, value]} and
# apply to attributes published with DataType Number
NumericFilterSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref MetricsTopic
    Protocol: lambda
    Endpoint: !GetAtt AlertProcessor.Arn
    FilterPolicy:
      cpu_usage:
        - numeric: [">", 80]
      memory_usage:
        - numeric: [">=", 90]
      error_rate:
        - numeric: ["<", 5]
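
It can help to check a filter policy locally before deploying it. The sketch below emulates a small subset of SNS filter semantics: exact value matching and `{"numeric": [operator, value, ...]}` conditions, with keys combined by AND and values within a key combined by OR. `matches_filter_policy` is a hypothetical helper, and it deliberately ignores other operators (prefix, anything-but, exists), so treat it as an approximation of the service behavior.

```python
import operator
from typing import Any, Dict, List

# Comparison operators accepted in numeric conditions
_NUMERIC_OPS = {
    '=': operator.eq,
    '<': operator.lt,
    '<=': operator.le,
    '>': operator.gt,
    '>=': operator.ge,
}


def _matches_condition(condition: Any, value: Any) -> bool:
    """Check one policy condition against one attribute value."""
    if isinstance(condition, dict) and 'numeric' in condition:
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        spec = condition['numeric']
        # spec is a flat list of operator/operand pairs, e.g. ['>', 0, '<=', 100]
        pairs = zip(spec[0::2], spec[1::2])
        return all(_NUMERIC_OPS[op](value, operand) for op, operand in pairs)
    # Plain values are matched exactly
    return condition == value


def matches_filter_policy(policy: Dict[str, List[Any]], attributes: Dict[str, Any]) -> bool:
    """Return True when the attributes satisfy every key in the policy."""
    for name, conditions in policy.items():
        if name not in attributes:
            return False
        if not any(_matches_condition(c, attributes[name]) for c in conditions):
            return False
    return True


policy = {
    'event_type': ['error', 'critical'],
    'cpu_usage': [{'numeric': ['>', 80]}],
}
accepted = matches_filter_policy(policy, {'event_type': 'error', 'cpu_usage': 92.5})
rejected = matches_filter_policy(policy, {'event_type': 'info', 'cpu_usage': 92.5})
```

Here `accepted` is true because both keys match, while `rejected` is false because `info` is not one of the allowed `event_type` values.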

Advanced Features#

1. FIFO Topics#

# FIFO SNS Topic for ordered message delivery
FIFOTopic:
  Type: AWS::SNS::Topic
  Properties:
    TopicName: OrderProcessing.fifo
    FifoTopic: true
    ContentBasedDeduplication: true
    KmsMasterKeyId: !Ref SNSKMSKey

# FIFO SQS subscription
FIFOSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref FIFOTopic
    Protocol: sqs
    Endpoint: !GetAtt OrderQueue.Arn

# Publishing to FIFO topic
def publish_to_fifo_topic(topic_arn, message, message_group_id, deduplication_id=None):
    """
    Publish message to FIFO topic with ordering
    """
    params = {
        'TopicArn': topic_arn,
        'Message': json.dumps(message),
        'MessageGroupId': message_group_id
    }
    # Add deduplication ID if not using content-based deduplication
    if deduplication_id:
        params['MessageDeduplicationId'] = deduplication_id
    response = sns.publish(**params)
    return response['MessageId']


# Example: Order processing messages
order_data = {
    'order_id': 'ORD-123456',
    'customer_id': 'CUST-789',
    'action': 'created',
    'timestamp': datetime.utcnow().isoformat()
}
message_id = publish_to_fifo_topic(
    'arn:aws:sns:us-east-1:123456789012:OrderProcessing.fifo',
    order_data,
    message_group_id=f"customer-{order_data['customer_id']}",
    deduplication_id=f"{order_data['order_id']}-{order_data['action']}"
)
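
The deduplication behavior can be pictured with a short local model. The sketch below assumes what the SNS documentation describes for FIFO topics: with content-based deduplication, the deduplication ID is a SHA-256 hash of the message body, and duplicates are suppressed within a five-minute interval. `content_deduplication_id` and `DeduplicationWindow` are hypothetical names, and this is an illustration of the idea rather than the service's implementation.

```python
import hashlib
import json
import time
from typing import Dict, Optional

DEDUP_WINDOW_SECONDS = 300  # five-minute deduplication interval


def content_deduplication_id(message_body: str) -> str:
    """Derive a deduplication ID from the message body."""
    return hashlib.sha256(message_body.encode('utf-8')).hexdigest()


class DeduplicationWindow:
    """Tracks recently seen deduplication IDs (local model only)."""

    def __init__(self, window_seconds: int = DEDUP_WINDOW_SECONDS) -> None:
        self.window_seconds = window_seconds
        self._seen: Dict[str, float] = {}

    def accept(self, dedup_id: str, now: Optional[float] = None) -> bool:
        """Return True if the message is new within the window."""
        now = time.time() if now is None else now
        # Forget IDs whose window has expired
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.window_seconds}
        if dedup_id in self._seen:
            return False
        self._seen[dedup_id] = now
        return True


# sort_keys gives a stable body, so equal payloads hash to the same ID
body = json.dumps({'order_id': 'ORD-123456', 'action': 'created'}, sort_keys=True)
dedup_id = content_deduplication_id(body)
window = DeduplicationWindow()
first = window.accept(dedup_id, now=0)
duplicate = window.accept(dedup_id, now=100)
after_window = window.accept(dedup_id, now=301)
```

One consequence worth noting: a timestamp inside the message body changes the hash on every publish, so retries of the same logical event are not deduplicated by content. In that case an explicit `MessageDeduplicationId`, as in the example above, is the safer choice.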

2. Mobile Push Notifications#

# Platform application for mobile push
# Assumption: your template tooling provides a platform-application resource type.
# Verify it in the CloudFormation resource reference; if it is unavailable,
# create the application with the CreatePlatformApplication API instead.
iOSPlatformApplication:
  Type: AWS::SNS::PlatformApplication
  Properties:
    PlatformApplicationName: MyApp-iOS
    Platform: APNS
    Attributes:
      PlatformCredential: !Ref APNSPrivateKey
      PlatformPrincipal: !Ref APNSCertificate

AndroidPlatformApplication:
  Type: AWS::SNS::PlatformApplication
  Properties:
    PlatformApplicationName: MyApp-Android
    Platform: GCM
    Attributes:
      PlatformCredential: !Ref FCMServerKey

# Mobile push notification management
class MobilePushManager:
    def __init__(self, region='us-east-1'):
        self.sns = boto3.client('sns', region_name=region)

    def create_platform_endpoint(self, platform_application_arn, token, user_data=None):
        """
        Create platform endpoint for device
        """
        attributes = {}
        if user_data:
            attributes['CustomUserData'] = json.dumps(user_data)
        response = self.sns.create_platform_endpoint(
            PlatformApplicationArn=platform_application_arn,
            Token=token,
            Attributes=attributes
        )
        return response['EndpointArn']

    def send_push_notification(self, endpoint_arn, message, badge_count=None, sound=None):
        """
        Send push notification to specific device
        """
        # Construct platform-specific payload
        ios_payload = {
            'aps': {
                'alert': message,
                'sound': sound or 'default'
            }
        }
        if badge_count is not None:
            ios_payload['aps']['badge'] = badge_count
        android_payload = {
            'data': {
                'message': message,
                'timestamp': datetime.utcnow().isoformat()
            }
        }
        # Create message structure; each platform payload is itself a JSON string
        message_structure = {
            'default': message,
            'APNS': json.dumps(ios_payload),
            'GCM': json.dumps(android_payload)
        }
        response = self.sns.publish(
            TargetArn=endpoint_arn,
            Message=json.dumps(message_structure),
            MessageStructure='json'
        )
        return response['MessageId']

    def send_bulk_notification(self, platform_application_arn, message, tokens):
        """
        Send notification to multiple devices
        """
        message_ids = []
        for token in tokens:
            try:
                endpoint_arn = self.create_platform_endpoint(platform_application_arn, token)
                message_id = self.send_push_notification(endpoint_arn, message)
                message_ids.append(message_id)
            except Exception as e:
                print(f"Failed to send notification to token {token}: {e}")
        return message_ids


# Usage example
push_manager = MobilePushManager()

# Send notification
endpoint_arn = push_manager.create_platform_endpoint(
    'arn:aws:sns:us-east-1:123456789012:app/APNS/MyApp-iOS',
    'device-token-here',
    user_data={'user_id': '12345', 'preferences': {'notifications': True}}
)
message_id = push_manager.send_push_notification(
    endpoint_arn,
    'You have a new message!',
    badge_count=1,
    sound='notification.wav'
)

3. Message Delivery Status#

# Delivery status logging
DeliveryStatusLogging:
  Type: AWS::SNS::Topic
  Properties:
    TopicName: MyTopicWithLogging
    DeliveryStatusLogging:
      - Protocol: lambda
        SuccessFeedbackRoleArn: !GetAtt SNSLoggingRole.Arn
        SuccessFeedbackSampleRate: 100
        FailureFeedbackRoleArn: !GetAtt SNSLoggingRole.Arn
      - Protocol: sqs
        SuccessFeedbackRoleArn: !GetAtt SNSLoggingRole.Arn
        SuccessFeedbackSampleRate: 50
        FailureFeedbackRoleArn: !GetAtt SNSLoggingRole.Arn

SNSLoggingRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service: sns.amazonaws.com
          Action: sts:AssumeRole
    Policies:
      - PolicyName: SNSLogsAccess
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - logs:CreateLogGroup
                - logs:CreateLogStream
                - logs:PutLogEvents
              Resource: '*'

Integration Patterns#

1. SNS to SQS Fanout#

# Fanout pattern with multiple SQS queues
PrimaryTopic:
  Type: AWS::SNS::Topic
  Properties:
    TopicName: OrderEvents

# Multiple processing queues
# Each queue also needs an AWS::SQS::QueuePolicy that allows sns.amazonaws.com
# to call sqs:SendMessage for this topic (not shown here)
OrderProcessingQueue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: OrderProcessing
    MessageRetentionPeriod: 1209600 # 14 days
    VisibilityTimeout: 300

EmailNotificationQueue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: EmailNotifications
    MessageRetentionPeriod: 345600 # 4 days

InventoryUpdateQueue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: InventoryUpdates
    MessageRetentionPeriod: 1209600 # 14 days

# Subscriptions
OrderProcessingSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref PrimaryTopic
    Protocol: sqs
    Endpoint: !GetAtt OrderProcessingQueue.Arn

EmailSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref PrimaryTopic
    Protocol: sqs
    Endpoint: !GetAtt EmailNotificationQueue.Arn
    FilterPolicy:
      event_type:
        - order_created
        - order_shipped

InventorySubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !Ref PrimaryTopic
    Protocol: sqs
    Endpoint: !GetAtt InventoryUpdateQueue.Arn
    FilterPolicy:
      event_type:
        - order_created
        - order_cancelled
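
On the consuming side, an SQS message delivered by SNS wraps the published payload in a JSON envelope unless raw message delivery is enabled on the subscription. The sketch below shows one way a queue consumer might unwrap it. `unwrap_sns_envelope` is a hypothetical helper; the envelope fields it reads (`Type`, `Message`, `MessageAttributes`) follow the SNS notification format.

```python
import json
from typing import Any, Dict, Tuple


def unwrap_sns_envelope(sqs_body: str) -> Tuple[Any, Dict[str, Any]]:
    """Return (payload, attributes) from the body of an SQS message."""
    try:
        envelope = json.loads(sqs_body)
    except json.JSONDecodeError:
        # Plain text body: nothing to unwrap
        return sqs_body, {}
    if not (isinstance(envelope, dict) and envelope.get('Type') == 'Notification'):
        # Raw message delivery or a non-SNS producer: the body is the payload
        return envelope, {}
    raw_message = envelope['Message']
    try:
        payload = json.loads(raw_message)
    except json.JSONDecodeError:
        payload = raw_message
    # Envelope attributes look like {'name': {'Type': 'String', 'Value': '...'}}
    attributes = {
        name: attr.get('Value')
        for name, attr in envelope.get('MessageAttributes', {}).items()
    }
    return payload, attributes


sample_body = json.dumps({
    'Type': 'Notification',
    'MessageId': 'example-id',
    'TopicArn': 'arn:aws:sns:us-east-1:123456789012:OrderEvents',
    'Message': json.dumps({'order_id': 'ORD-1'}),
    'MessageAttributes': {'event_type': {'Type': 'String', 'Value': 'order_created'}},
})
payload, attributes = unwrap_sns_envelope(sample_body)
```

Handling both wrapped and raw bodies lets the same consumer keep working if raw message delivery is switched on later.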

2. SNS to Lambda Integration#

# Lambda function processing SNS messages
import json

# handle_processing_error and the process_*, send_*, and update_* helpers are
# application-specific and assumed to be defined elsewhere


def lambda_handler(event, context):
    """
    Process SNS messages in Lambda
    """
    processed_messages = []
    for record in event['Records']:
        # Extract SNS message
        sns_message = record['Sns']
        message_id = sns_message['MessageId']
        topic_arn = sns_message['TopicArn']
        try:
            # Parse message body. SNS has already resolved the lambda-specific
            # payload, so the record carries no MessageStructure field.
            try:
                actual_message = json.loads(sns_message['Message'])
            except json.JSONDecodeError:
                # Plain-text message: wrap it so downstream code gets a dict
                actual_message = {'text': sns_message['Message']}
            # Extract message attributes
            attributes = sns_message.get('MessageAttributes', {})
            event_type = attributes.get('event_type', {}).get('Value', 'unknown')
            # Process based on event type
            result = process_message(actual_message, event_type, message_id)
            processed_messages.append(result)
        except Exception as e:
            print(f"Error processing message {message_id} from {topic_arn}: {e}")
            # Send to dead letter queue or error handling
            handle_processing_error(message_id, sns_message, str(e))
    return {
        'statusCode': 200,
        'body': json.dumps({
            'processed_count': len(processed_messages),
            'results': processed_messages
        })
    }


def process_message(message, event_type, message_id):
    """
    Process individual message based on type
    """
    if event_type == 'order_created':
        return process_order_creation(message)
    elif event_type == 'user_signup':
        return process_user_signup(message)
    elif event_type == 'payment_failed':
        return process_payment_failure(message)
    else:
        return process_generic_message(message)


def process_order_creation(message):
    """
    Process new order creation
    """
    order_id = message.get('order_id')
    customer_id = message.get('customer_id')
    # Send order confirmation
    send_order_confirmation(order_id, customer_id)
    # Update analytics
    update_order_analytics(message)
    return {'action': 'order_processed', 'order_id': order_id}
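
A handler like this is easier to develop with a synthetic event. The sketch below builds a dictionary in the shape Lambda passes for SNS invocations, so routing logic can be exercised without deploying anything. `make_sns_event` and `extract_event_types` are hypothetical helpers, the ARN and message ID are placeholders, and only the fields used in this guide are populated.

```python
import json
from typing import Any, Dict, List, Optional


def make_sns_event(message: Any,
                   attributes: Optional[Dict[str, str]] = None,
                   topic_arn: str = 'arn:aws:sns:us-east-1:123456789012:ExampleTopic') -> Dict[str, Any]:
    """Build a minimal SNS-shaped Lambda event for local testing."""
    return {
        'Records': [{
            'EventSource': 'aws:sns',
            'EventVersion': '1.0',
            'Sns': {
                'Type': 'Notification',
                'MessageId': 'test-message-id',
                'TopicArn': topic_arn,
                # Message is always a string in the real event
                'Message': message if isinstance(message, str) else json.dumps(message),
                # Lambda events use 'Type' and 'Value' keys for attributes
                'MessageAttributes': {
                    name: {'Type': 'String', 'Value': value}
                    for name, value in (attributes or {}).items()
                },
            },
        }]
    }


def extract_event_types(event: Dict[str, Any]) -> List[str]:
    """Read the event_type attribute of each record, defaulting to 'unknown'."""
    return [
        record['Sns'].get('MessageAttributes', {}).get('event_type', {}).get('Value', 'unknown')
        for record in event['Records']
    ]


test_event = make_sns_event({'order_id': 'ORD-1'}, {'event_type': 'order_created'})
event_types = extract_event_types(test_event)
```

Passing `test_event` to a handler with a dummy context object (for example `None`) is usually enough for unit tests of the parsing and routing paths.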

3. Cross-Account Topic Access#

# Cross-account SNS topic policy
CrossAccountTopicPolicy:
  Type: AWS::SNS::TopicPolicy
  Properties:
    Topics:
      - !Ref SharedTopic
    PolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Sid: AllowCrossAccountPublish
          Effect: Allow
          Principal:
            AWS:
              - 'arn:aws:iam::ACCOUNT-B:root'
              - 'arn:aws:iam::ACCOUNT-C:root'
          Action:
            - sns:Publish
          Resource: !Ref SharedTopic
        - Sid: AllowCrossAccountSubscribe
          Effect: Allow
          Principal:
            AWS:
              - 'arn:aws:iam::ACCOUNT-B:root'
          Action:
            - sns:Subscribe
            - sns:Receive
          Resource: !Ref SharedTopic
          # sns:Protocol is evaluated on subscription requests, so the protocol
          # restriction belongs on this statement rather than on sns:Publish
          Condition:
            StringEquals:
              'sns:Protocol':
                - sqs
                - lambda

Monitoring and Troubleshooting#

1. CloudWatch Metrics#

# SNS CloudWatch Alarms
FailedDeliveryAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: SNS-Failed-Deliveries
    AlarmDescription: SNS notifications failed to deliver
    MetricName: NumberOfNotificationsFailed
    Namespace: AWS/SNS
    Statistic: Sum
    Period: 300
    EvaluationPeriods: 1
    Threshold: 1
    ComparisonOperator: GreaterThanOrEqualToThreshold
    Dimensions:
      - Name: TopicName
        Value: !GetAtt NotificationTopic.TopicName

# PublishSize measures message size in bytes, not delivery latency
LargeMessageAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: SNS-Large-Publish-Size
    MetricName: PublishSize
    Namespace: AWS/SNS
    Statistic: Average
    Period: 300
    EvaluationPeriods: 2
    Threshold: 10000 # bytes (about 10 KB)
    ComparisonOperator: GreaterThanThreshold
    Dimensions:
      - Name: TopicName
        Value: !GetAtt NotificationTopic.TopicName

2. Message Tracking#

# SNS message tracking and analytics
import boto3
from datetime import datetime, timedelta


class SNSMonitor:
    def __init__(self, region='us-east-1'):
        self.sns = boto3.client('sns', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)

    def _sum_metric(self, metric_name, topic_name, start_time, end_time):
        """
        Sum one AWS/SNS metric for a topic over the time range
        """
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/SNS',
            MetricName=metric_name,
            Dimensions=[{'Name': 'TopicName', 'Value': topic_name}],
            StartTime=start_time,
            EndTime=end_time,
            Period=3600,
            Statistics=['Sum']
        )
        return sum(point['Sum'] for point in response['Datapoints'])

    def get_topic_metrics(self, topic_arn, start_time=None, end_time=None):
        """
        Get comprehensive metrics for SNS topic
        """
        if not start_time:
            start_time = datetime.utcnow() - timedelta(hours=24)
        if not end_time:
            end_time = datetime.utcnow()
        topic_name = topic_arn.split(':')[-1]
        # Topic-level metrics are reported under the TopicName dimension;
        # there is no per-protocol dimension to break deliveries down by
        metric_names = {
            'published': 'NumberOfMessagesPublished',
            'delivered': 'NumberOfNotificationsDelivered',
            'failed': 'NumberOfNotificationsFailed',
            'filtered_out': 'NumberOfNotificationsFilteredOut',
        }
        return {
            key: self._sum_metric(name, topic_name, start_time, end_time)
            for key, name in metric_names.items()
        }

    def analyze_subscription_health(self, topic_arn):
        """
        Analyze health of topic subscriptions
        """
        health_report = []
        # Paginate so topics with many subscriptions are fully covered
        paginator = self.sns.get_paginator('list_subscriptions_by_topic')
        for page in paginator.paginate(TopicArn=topic_arn):
            for subscription in page['Subscriptions']:
                sub_arn = subscription['SubscriptionArn']
                protocol = subscription['Protocol']
                endpoint = subscription['Endpoint']
                # Get subscription attributes; unconfirmed subscriptions have
                # no real ARN yet, so the lookup fails and is reported as an error
                try:
                    attrs = self.sns.get_subscription_attributes(
                        SubscriptionArn=sub_arn
                    )['Attributes']
                    health_report.append({
                        'subscription_arn': sub_arn,
                        'protocol': protocol,
                        'endpoint': endpoint,
                        'confirmed': attrs.get('PendingConfirmation', 'false') == 'false',
                        'filter_policy': attrs.get('FilterPolicy'),
                        'delivery_policy': attrs.get('DeliveryPolicy')
                    })
                except Exception as e:
                    health_report.append({
                        'subscription_arn': sub_arn,
                        'protocol': protocol,
                        'endpoint': endpoint,
                        'error': str(e)
                    })
        return health_report
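
Raw counts are easier to act on once they are turned into a rate. The sketch below takes delivered and failed notification totals, such as sums collected from CloudWatch, and derives a failure rate plus a simple health flag. `delivery_failure_rate`, `is_unhealthy`, and the 1% default threshold are illustrative assumptions, not values recommended by AWS.

```python
def delivery_failure_rate(delivered: float, failed: float) -> float:
    """Fraction of delivery attempts that failed (0.0 when there were none)."""
    attempts = delivered + failed
    return failed / attempts if attempts else 0.0


def is_unhealthy(delivered: float, failed: float, threshold: float = 0.01) -> bool:
    """Flag a topic whose failure rate exceeds the threshold."""
    return delivery_failure_rate(delivered, failed) > threshold


# Example totals for a 24-hour window (made-up numbers)
rate = delivery_failure_rate(delivered=95, failed=5)
needs_attention = is_unhealthy(delivered=95, failed=5)
```

Guarding against zero attempts avoids a division error on idle topics, which are common outside business hours.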

Security Best Practices#

1. Access Control#

# SNS topic with fine-grained access control
SecureTopic:
  Type: AWS::SNS::Topic
  Properties:
    KmsMasterKeyId: !Ref SNSKMSKey

SNSAccessPolicy:
  Type: AWS::SNS::TopicPolicy
  Properties:
    Topics:
      - !Ref SecureTopic
    PolicyDocument:
      Version: '2012-10-17'
      Statement:
        # The principal is already one specific role. aws:SourceAccount is only
        # set on requests made by AWS services, so it is not used as a condition here.
        - Sid: AllowPublishFromLambda
          Effect: Allow
          Principal:
            AWS: !GetAtt LambdaExecutionRole.Arn
          Action:
            - sns:Publish
          Resource: !Ref SecureTopic
        - Sid: AllowSubscribeFromSameAccount
          Effect: Allow
          Principal:
            AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
          Action:
            - sns:Subscribe
            - sns:Receive
          Resource: !Ref SecureTopic
        # Rejects requests that do not use TLS (encryption in transit)
        - Sid: DenyInsecureTransport
          Effect: Deny
          Principal: '*'
          Action: sns:Publish
          Resource: !Ref SecureTopic
          Condition:
            Bool:
              'aws:SecureTransport': false

2. Message Encryption#

# KMS key for SNS encryption
SNSKMSKey:
  Type: AWS::KMS::Key
  Properties:
    Description: KMS key for SNS encryption
    KeyPolicy:
      Version: '2012-10-17'
      Statement:
        # Account root keeps administrative control of the key
        - Effect: Allow
          Principal:
            AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
          Action: "kms:*"
          Resource: "*"
        # SNS needs these actions to encrypt and decrypt topic messages
        - Effect: Allow
          Principal:
            Service: sns.amazonaws.com
          Action:
            - kms:Decrypt
            - kms:GenerateDataKey
          Resource: "*"

Cost Optimization#

1. Message Batching#

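import json
import uuid


def batch_publish_messages(topic_arn, messages, batch_size=10):
    """
    Publish messages with the PublishBatch API (up to 10 messages per request)
    """
    if not 1 <= batch_size <= 10:
        raise ValueError('batch_size must be between 1 and 10')
    results = []
    for start in range(0, len(messages), batch_size):
        batch = messages[start:start + batch_size]
        batch_id = str(uuid.uuid4())  # shared by every message in this request
        entries = [
            {
                'Id': str(index),  # must be unique within the request
                'Message': json.dumps(message),
                'MessageAttributes': {
                    'batch_id': {
                        'DataType': 'String',
                        'StringValue': batch_id
                    }
                }
            }
            for index, message in enumerate(batch)
        ]
        # Uses the module-level sns client created earlier
        response = sns.publish_batch(
            TopicArn=topic_arn,
            PublishBatchRequestEntries=entries
        )
        results.extend(item['MessageId'] for item in response.get('Successful', []))
        # A batch can partially fail, so report the rejected entries
        for failure in response.get('Failed', []):
            print(f"Entry {failure['Id']} failed: {failure['Code']}")
    return results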

2. Regional Optimization#

def get_optimal_region_for_subscribers(subscribers):
    """
    Determine optimal region based on subscriber locations
    """
    region_counts = {}
    for subscriber in subscribers:
        # Parse ARNs or endpoints to determine regions
        if subscriber.startswith('arn:aws:'):
            region = subscriber.split(':')[3]
            region_counts[region] = region_counts.get(region, 0) + 1
    # Return region with most subscribers
    return max(region_counts.items(), key=lambda x: x[1])[0] if region_counts else 'us-east-1'

Best Practices#

1. Message Design#

  • Keep messages small and focused
  • Use message attributes for routing
  • Implement idempotent message processing
  • Include correlation IDs for tracing
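
Idempotent processing matters because standard topics can deliver the same message more than once. A minimal sketch of the idea follows: remember the IDs of messages that were handled and skip repeats. `IdempotentProcessor` is a hypothetical class, and the in-memory set is an assumption made for brevity; a real consumer would keep processed IDs in a shared store that survives restarts.

```python
from typing import Any, Callable, List, Set


class IdempotentProcessor:
    """Runs a handler at most once per message ID (in-memory sketch)."""

    def __init__(self, handler: Callable[[Any], None]) -> None:
        self._handler = handler
        self._processed: Set[str] = set()

    def process(self, message_id: str, payload: Any) -> bool:
        """Return True if the handler ran, False if the message was a duplicate."""
        if message_id in self._processed:
            return False
        self._handler(payload)
        # Record the ID only after success so failed messages can be retried
        self._processed.add(message_id)
        return True


handled: List[Any] = []
processor = IdempotentProcessor(handled.append)
first_run = processor.process('msg-1', {'order_id': 'ORD-1'})
second_run = processor.process('msg-1', {'order_id': 'ORD-1'})
```

After both calls, `handled` contains the payload once: the second delivery is recognized by its ID and skipped.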

2. Subscription Management#

  • Use filter policies to reduce unnecessary deliveries
  • Implement proper error handling and retries
  • Monitor subscription health regularly
  • Clean up unused subscriptions

3. Security#

  • Always encrypt sensitive data
  • Use IAM roles instead of access keys
  • Implement least privilege access
  • Enable delivery status logging

4. Performance#

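  • Keep messages well under the SNS message size limit; store large payloads externally (for example in Amazon S3) and publish a reference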
  • Implement exponential backoff for retries
  • Consider FIFO topics for ordered processing
  • Monitor and optimize delivery patterns
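
Exponential backoff can be sketched in a few lines. The example below retries a callable with a delay cap that doubles on each attempt and applies full jitter (a random fraction of the cap) to avoid synchronized retries. `retry_with_backoff`, `backoff_delays`, and the default values are illustrative assumptions; the `sleep` and `rng` parameters exist only so the behavior can be tested without waiting.

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar('T')


def backoff_delays(max_attempts: int, base_delay: float = 0.5, max_delay: float = 30.0) -> List[float]:
    """Delay caps for each attempt: base, 2x base, 4x base, ... up to max_delay."""
    return [min(max_delay, base_delay * (2 ** attempt)) for attempt in range(max_attempts)]


def retry_with_backoff(operation: Callable[[], T],
                       max_attempts: int = 5,
                       base_delay: float = 0.5,
                       max_delay: float = 30.0,
                       sleep: Callable[[float], None] = time.sleep,
                       rng: Callable[[], float] = random.random) -> T:
    """Call operation until it succeeds or max_attempts is reached."""
    if max_attempts < 1:
        raise ValueError('max_attempts must be at least 1')
    delays = backoff_delays(max_attempts, base_delay, max_delay)
    for attempt, cap in enumerate(delays):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(rng() * cap)  # full jitter: wait a random fraction of the cap
    raise RuntimeError('unreachable')


# Demo: an operation that fails twice, then succeeds
calls = {'count': 0}
recorded_sleeps: List[float] = []


def flaky_publish() -> str:
    calls['count'] += 1
    if calls['count'] < 3:
        raise ConnectionError('temporary failure')
    return 'message-id'


result = retry_with_backoff(flaky_publish, sleep=recorded_sleeps.append, rng=lambda: 1.0)
```

In production code it is usually better to catch only errors known to be transient (throttling, timeouts) rather than every exception, and to rely on the AWS SDK's built-in retry settings where they are sufficient.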

Additional Resources#

Complete Guide to Amazon SNS: Scalable Notification Service
https://mranv.pages.dev/posts/complete-guide-to-amazon-sns/
Author: Anubhav Gain
Published at: 2024-08-20
License: CC BY-NC-SA 4.0