The Complete Guide to Amazon CloudWatch: Comprehensive Monitoring and Observability for AWS
Amazon CloudWatch is AWS’s comprehensive monitoring and observability service that provides data and actionable insights to monitor applications, respond to system-wide performance changes, and optimize resource utilization. This guide covers everything from basic metrics collection to advanced monitoring strategies.
Table of Contents
- [Introduction to CloudWatch](#introduction)
- [Core Components](#core-components)
- [CloudWatch Metrics](#cloudwatch-metrics)
- [CloudWatch Logs](#cloudwatch-logs)
- [CloudWatch Alarms](#cloudwatch-alarms)
- [CloudWatch Dashboards](#cloudwatch-dashboards)
- [CloudWatch Events/EventBridge](#cloudwatch-events)
- [Advanced Features](#advanced-features)
- [Best Practices](#best-practices)
- [Cost Optimization](#cost-optimization)
- [Security Considerations](#security)
- [Troubleshooting](#troubleshooting)
Introduction to CloudWatch {#introduction}
Amazon CloudWatch is a monitoring service for AWS cloud resources and applications. It provides real-time monitoring, custom metrics, log aggregation, and automated actions based on defined thresholds.
Key Benefits
- Unified Monitoring: Single platform for metrics, logs, and events
- Real-time Insights: Near real-time data collection and visualization
- Automated Actions: Trigger actions based on metric thresholds
- Cost-effective: Pay only for what you use
- Integration: Native integration with most AWS services, which publish metrics and logs to CloudWatch automatically
Core Components {#core-components}
1. Metrics
Quantitative data points collected over time intervals.
2. Logs
Text-based log data from applications and AWS services.
3. Alarms
Notifications and automated actions based on metric thresholds.
4. Dashboards
Customizable visualization of metrics and logs.
5. Events
System events from AWS services and custom applications.
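All of these components are exposed through the AWS SDKs and CLI. As a minimal sketch for the Python examples that follow, the boto3 clients used throughout this guide can be created like this (the variable names `cloudwatch`, `logs_client`, and `events_client` are simply the conventions this guide uses):

```python
import boto3

# Clients used throughout this guide; region and credentials come from your AWS configuration
cloudwatch = boto3.client('cloudwatch')   # metrics, alarms, dashboards, anomaly detectors
logs_client = boto3.client('logs')        # log groups, log streams, Logs Insights queries
events_client = boto3.client('events')    # CloudWatch Events / EventBridge rules and custom events
```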
CloudWatch Metrics {#cloudwatch-metrics}
Basic Metrics Collection
```python
import boto3
import time
from datetime import datetime, timedelta

# Initialize CloudWatch client
cloudwatch = boto3.client('cloudwatch')

def put_custom_metric(metric_name, value, unit='Count', namespace='MyApp'):
    """Send a custom metric to CloudWatch."""
    try:
        response = cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Value': value,
                    'Unit': unit,
                    'Timestamp': datetime.utcnow()
                }
            ]
        )
        print(f"Custom metric {metric_name} sent successfully")
        return response
    except Exception as e:
        print(f"Error sending metric: {e}")

# Example usage
put_custom_metric('UserLogins', 25, 'Count', 'WebApp')
put_custom_metric('ResponseTime', 120.5, 'Milliseconds', 'WebApp')
```
Advanced Metrics with Dimensions
```python
def put_metric_with_dimensions(metric_name, value, dimensions, namespace='MyApp'):
    """Send a metric with dimensions for better filtering and aggregation."""
    try:
        response = cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Value': value,
                    'Unit': 'Count',
                    'Dimensions': dimensions,
                    'Timestamp': datetime.utcnow()
                }
            ]
        )
        return response
    except Exception as e:
        print(f"Error sending metric with dimensions: {e}")

# Example with dimensions
dimensions = [
    {'Name': 'Environment', 'Value': 'Production'},
    {'Name': 'Region', 'Value': 'us-east-1'},
    {'Name': 'Service', 'Value': 'UserService'}
]

put_metric_with_dimensions('APIRequests', 100, dimensions)
```
Retrieving Metrics
```python
def get_metric_statistics(metric_name, namespace, start_time, end_time, period=300):
    """Retrieve metric statistics from CloudWatch."""
    try:
        response = cloudwatch.get_metric_statistics(
            Namespace=namespace,
            MetricName=metric_name,
            StartTime=start_time,
            EndTime=end_time,
            Period=period,
            Statistics=['Average', 'Maximum', 'Minimum', 'Sum', 'SampleCount']
        )

        datapoints = sorted(response['Datapoints'], key=lambda x: x['Timestamp'])

        for point in datapoints:
            print(f"Time: {point['Timestamp']}, Average: {point['Average']}")

        return datapoints
    except Exception as e:
        print(f"Error retrieving metrics: {e}")

# Get metrics for the last hour
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)

get_metric_statistics('CPUUtilization', 'AWS/EC2', start_time, end_time)
```
CloudWatch Logs {#cloudwatch-logs}
Log Groups and Streams Management
```python
import boto3
import json
from datetime import datetime

logs_client = boto3.client('logs')

def create_log_group(log_group_name):
    """Create a CloudWatch log group."""
    try:
        response = logs_client.create_log_group(
            logGroupName=log_group_name,
            tags={
                'Environment': 'Production',
                'Application': 'MyApp'
            }
        )
        print(f"Log group {log_group_name} created successfully")
        return response
    except logs_client.exceptions.ResourceAlreadyExistsException:
        print(f"Log group {log_group_name} already exists")
    except Exception as e:
        print(f"Error creating log group: {e}")

def create_log_stream(log_group_name, log_stream_name):
    """Create a log stream within a log group."""
    try:
        response = logs_client.create_log_stream(
            logGroupName=log_group_name,
            logStreamName=log_stream_name
        )
        print(f"Log stream {log_stream_name} created successfully")
        return response
    except Exception as e:
        print(f"Error creating log stream: {e}")

# Create log infrastructure
create_log_group('/aws/myapp/production')
create_log_stream('/aws/myapp/production', 'web-server-001')
```
Sending Logs
```python
def send_log_events(log_group_name, log_stream_name, log_messages):
    """Send log events to CloudWatch Logs."""
    try:
        # Look up the sequence token if the stream already exists.
        # Newer versions of the API no longer require sequence tokens,
        # but passing one is still accepted.
        try:
            response = logs_client.describe_log_streams(
                logGroupName=log_group_name,
                logStreamNamePrefix=log_stream_name
            )
            sequence_token = None
            if response['logStreams']:
                sequence_token = response['logStreams'][0].get('uploadSequenceToken')
        except Exception:
            sequence_token = None

        # Prepare log events
        log_events = []
        for message in log_messages:
            log_events.append({
                'timestamp': int(datetime.utcnow().timestamp() * 1000),
                'message': json.dumps(message) if isinstance(message, dict) else str(message)
            })

        # Send logs
        kwargs = {
            'logGroupName': log_group_name,
            'logStreamName': log_stream_name,
            'logEvents': log_events
        }

        if sequence_token:
            kwargs['sequenceToken'] = sequence_token

        response = logs_client.put_log_events(**kwargs)
        print("Log events sent successfully")
        return response

    except Exception as e:
        print(f"Error sending log events: {e}")

# Send structured logs
log_messages = [
    {
        'level': 'INFO',
        'message': 'User login successful',
        'user_id': 'user123',
        'ip_address': '192.168.1.100',
        'timestamp': datetime.utcnow().isoformat()
    },
    {
        'level': 'ERROR',
        'message': 'Database connection failed',
        'error_code': 'DB_CONN_001',
        'retry_count': 3
    }
]

send_log_events('/aws/myapp/production', 'web-server-001', log_messages)
```
Log Queries with CloudWatch Insights
```python
def run_log_insights_query(log_group_name, query_string, start_time, end_time):
    """Run a CloudWatch Logs Insights query and wait for the results."""
    try:
        # Start query
        response = logs_client.start_query(
            logGroupName=log_group_name,
            startTime=int(start_time.timestamp()),
            endTime=int(end_time.timestamp()),
            queryString=query_string
        )

        query_id = response['queryId']
        print(f"Query started with ID: {query_id}")

        # Poll for results
        import time
        while True:
            result = logs_client.get_query_results(queryId=query_id)

            if result['status'] == 'Complete':
                print("Query completed successfully")
                for record in result['results']:
                    print({field['field']: field['value'] for field in record})
                return result['results']
            elif result['status'] == 'Failed':
                print("Query failed")
                break
            else:
                print(f"Query status: {result['status']}")
                time.sleep(2)

    except Exception as e:
        print(f"Error running insights query: {e}")

# Example queries
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)

# Query for errors
error_query = """
fields @timestamp, @message
| filter @message like /ERROR/
| sort @timestamp desc
| limit 20
"""

run_log_insights_query('/aws/myapp/production', error_query, start_time, end_time)

# Query for average response time (assumes JSON logs that expose a response_time field)
performance_query = """
fields @timestamp, response_time
| filter ispresent(response_time)
| stats avg(response_time) by bin(5m)
"""

run_log_insights_query('/aws/myapp/production', performance_query, start_time, end_time)
```
CloudWatch Alarms {#cloudwatch-alarms}
Creating Metric Alarms
```python
def create_metric_alarm(alarm_name, metric_name, namespace, threshold,
                        comparison_operator='GreaterThanThreshold'):
    """Create a CloudWatch alarm for a specific metric."""
    try:
        response = cloudwatch.put_metric_alarm(
            AlarmName=alarm_name,
            ComparisonOperator=comparison_operator,
            EvaluationPeriods=2,
            MetricName=metric_name,
            Namespace=namespace,
            Period=300,
            Statistic='Average',
            Threshold=threshold,
            ActionsEnabled=True,
            AlarmActions=[
                'arn:aws:sns:us-east-1:123456789012:my-alarm-topic'
            ],
            AlarmDescription=f'Alarm for {metric_name}',
            Unit='Percent' if 'Utilization' in metric_name else 'Count'
        )

        print(f"Alarm {alarm_name} created successfully")
        return response

    except Exception as e:
        print(f"Error creating alarm: {e}")

# Create CPU utilization alarm
create_metric_alarm(
    'HighCPUUtilization',
    'CPUUtilization',
    'AWS/EC2',
    80.0,
    'GreaterThanThreshold'
)

# Create custom metric alarm
create_metric_alarm(
    'HighErrorRate',
    'ErrorCount',
    'MyApp',
    10.0,
    'GreaterThanThreshold'
)
```
Composite Alarms
```python
def create_composite_alarm(alarm_name, alarm_rule):
    """Create a composite alarm based on multiple conditions."""
    try:
        response = cloudwatch.put_composite_alarm(
            AlarmName=alarm_name,
            AlarmRule=alarm_rule,
            ActionsEnabled=True,
            AlarmActions=[
                'arn:aws:sns:us-east-1:123456789012:critical-alerts'
            ],
            AlarmDescription='Composite alarm for critical system health'
        )

        print(f"Composite alarm {alarm_name} created successfully")
        return response

    except Exception as e:
        print(f"Error creating composite alarm: {e}")

# Create composite alarm
alarm_rule = """
(ALARM("HighCPUUtilization") OR ALARM("HighMemoryUtilization"))
AND ALARM("HighErrorRate")
"""

create_composite_alarm('CriticalSystemHealth', alarm_rule)
```
Anomaly Detection
```python
def create_anomaly_detector(metric_name, namespace, dimensions=None):
    """Create an anomaly detector for a metric."""
    try:
        detector_config = {
            'Namespace': namespace,
            'MetricName': metric_name,
            'Stat': 'Average'
        }

        if dimensions:
            detector_config['Dimensions'] = dimensions

        response = cloudwatch.put_anomaly_detector(**detector_config)

        print(f"Anomaly detector created for {metric_name}")
        return response

    except Exception as e:
        print(f"Error creating anomaly detector: {e}")

def create_anomaly_alarm(alarm_name, metric_name, namespace, dimensions=None):
    """Create an alarm based on anomaly detection."""
    try:
        metric_config = {
            'Id': 'm1',
            'MetricStat': {
                'Metric': {
                    'Namespace': namespace,
                    'MetricName': metric_name
                },
                'Period': 300,
                'Stat': 'Average'
            },
            'ReturnData': True
        }

        if dimensions:
            metric_config['MetricStat']['Metric']['Dimensions'] = dimensions

        response = cloudwatch.put_metric_alarm(
            AlarmName=alarm_name,
            ComparisonOperator='LessThanLowerOrGreaterThanUpperThreshold',
            EvaluationPeriods=2,
            Metrics=[
                metric_config,
                {
                    # The anomaly detection band is expressed as a metric math
                    # expression; the alarm compares m1 against this band.
                    'Id': 'ad1',
                    'Expression': 'ANOMALY_DETECTION_BAND(m1, 2)',
                    'Label': f'{metric_name} (expected)',
                    'ReturnData': True
                }
            ],
            ThresholdMetricId='ad1',
            ActionsEnabled=True,
            AlarmActions=[
                'arn:aws:sns:us-east-1:123456789012:anomaly-alerts'
            ],
            AlarmDescription=f'Anomaly detection alarm for {metric_name}'
        )

        print(f"Anomaly alarm {alarm_name} created successfully")
        return response

    except Exception as e:
        print(f"Error creating anomaly alarm: {e}")

# Create anomaly detection
create_anomaly_detector('ResponseTime', 'MyApp')
create_anomaly_alarm('ResponseTimeAnomaly', 'ResponseTime', 'MyApp')
```
CloudWatch Dashboards {#cloudwatch-dashboards}
Creating Custom Dashboards
```python
import json

def create_dashboard(dashboard_name, dashboard_body):
    """Create a CloudWatch dashboard."""
    try:
        response = cloudwatch.put_dashboard(
            DashboardName=dashboard_name,
            DashboardBody=json.dumps(dashboard_body)
        )

        print(f"Dashboard {dashboard_name} created successfully")
        return response

    except Exception as e:
        print(f"Error creating dashboard: {e}")

# Define dashboard configuration
dashboard_config = {
    "widgets": [
        {
            "type": "metric",
            "x": 0,
            "y": 0,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["AWS/EC2", "CPUUtilization", "InstanceId", "i-1234567890abcdef0"],
                    ["AWS/EC2", "NetworkIn", "InstanceId", "i-1234567890abcdef0"],
                    ["AWS/EC2", "NetworkOut", "InstanceId", "i-1234567890abcdef0"]
                ],
                "period": 300,
                "stat": "Average",
                "region": "us-east-1",
                "title": "EC2 Instance Metrics",
                "yAxis": {
                    "left": {
                        "min": 0,
                        "max": 100
                    }
                }
            }
        },
        {
            "type": "log",
            "x": 0,
            "y": 6,
            "width": 24,
            "height": 6,
            "properties": {
                "query": "SOURCE '/aws/lambda/my-function'\n| fields @timestamp, @message\n| sort @timestamp desc\n| limit 20",
                "region": "us-east-1",
                "title": "Recent Lambda Logs",
                "view": "table"
            }
        },
        {
            "type": "metric",
            "x": 12,
            "y": 0,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["MyApp", "UserLogins", "Environment", "Production"],
                    ["MyApp", "ErrorCount", "Environment", "Production"],
                    ["MyApp", "ResponseTime", "Environment", "Production"]
                ],
                "period": 300,
                "stat": "Sum",
                "region": "us-east-1",
                "title": "Application Metrics"
            }
        }
    ]
}

create_dashboard('MyApplicationDashboard', dashboard_config)
```
CloudWatch Events/EventBridge {#cloudwatch-events}
Creating Event Rules
```python
import boto3
import json

events_client = boto3.client('events')

def create_event_rule(rule_name, event_pattern, targets):
    """Create a CloudWatch Events (EventBridge) rule and attach targets."""
    try:
        # Create the rule
        response = events_client.put_rule(
            Name=rule_name,
            EventPattern=json.dumps(event_pattern),
            State='ENABLED',
            Description=f'Event rule for {rule_name}'
        )

        rule_arn = response['RuleArn']
        print(f"Event rule {rule_name} created: {rule_arn}")

        # Add targets to the rule
        events_client.put_targets(
            Rule=rule_name,
            Targets=targets
        )

        print(f"Targets added to rule {rule_name}")
        return response

    except Exception as e:
        print(f"Error creating event rule: {e}")

# Create rule for EC2 instance state changes
ec2_event_pattern = {
    "source": ["aws.ec2"],
    "detail-type": ["EC2 Instance State-change Notification"],
    "detail": {
        "state": ["running", "stopped", "terminated"]
    }
}

ec2_targets = [
    {
        'Id': '1',
        'Arn': 'arn:aws:sns:us-east-1:123456789012:ec2-notifications',
        'InputTransformer': {
            'InputPathsMap': {
                'instance': '$.detail.instance-id',
                'state': '$.detail.state'
            },
            'InputTemplate': '{"instance": "<instance>", "state": "<state>"}'
        }
    }
]

create_event_rule('EC2StateChangeRule', ec2_event_pattern, ec2_targets)
```
Custom Application Events
```python
def send_custom_event(source, detail_type, detail):
    """Send a custom event to EventBridge."""
    try:
        response = events_client.put_events(
            Entries=[
                {
                    'Source': source,
                    'DetailType': detail_type,
                    'Detail': json.dumps(detail),
                    'Time': datetime.utcnow()
                }
            ]
        )

        print("Custom event sent successfully")
        return response

    except Exception as e:
        print(f"Error sending custom event: {e}")

# Send custom application event
custom_detail = {
    'user_id': 'user123',
    'action': 'purchase',
    'amount': 99.99,
    'product_id': 'prod456',
    'timestamp': datetime.utcnow().isoformat()
}

send_custom_event('myapp.orders', 'Order Completed', custom_detail)
```
Advanced Features {#advanced-features}
Cross-Account Monitoring
```python
def setup_cross_account_dashboard(dashboard_name, source_account_widgets):
    """Create a dashboard with metrics from multiple accounts."""
    dashboard_config = {
        "widgets": []
    }

    for widget in source_account_widgets:
        account_id = widget.pop('account_id')

        # In the dashboard metric syntax, rendering options such as the
        # source accountId go in a trailing options object on each metric.
        for metric in widget['properties']['metrics']:
            if len(metric) >= 2:
                metric.append({'accountId': account_id})

        dashboard_config['widgets'].append(widget)

    return create_dashboard(dashboard_name, dashboard_config)

# Example cross-account widget configuration
cross_account_widgets = [
    {
        'account_id': '123456789012',
        'type': 'metric',
        'x': 0,
        'y': 0,
        'width': 12,
        'height': 6,
        'properties': {
            'metrics': [
                ['AWS/EC2', 'CPUUtilization', 'InstanceId', 'i-1234567890abcdef0']
            ],
            'period': 300,
            'stat': 'Average',
            'region': 'us-east-1',
            'title': 'Cross-Account EC2 Metrics'
        }
    }
]

setup_cross_account_dashboard('CrossAccountDashboard', cross_account_widgets)
```
Custom Widgets with Lambda
```python
def create_custom_widget_lambda():
    """Return the source for a Lambda function backing a custom CloudWatch widget."""
    lambda_code = '''
import json
import boto3
from datetime import datetime, timedelta

def lambda_handler(event, context):
    # Extract widget parameters; widgetContext is passed as an object,
    # with the dashboard time range in epoch milliseconds
    widget_context = event.get('widgetContext', {})
    time_range = widget_context.get('timeRange', {})

    cloudwatch = boto3.client('cloudwatch')

    # Example: calculate a cost-efficiency metric over the widget time range
    now = datetime.utcnow()
    end_time = datetime.utcfromtimestamp(time_range['end'] / 1000) if 'end' in time_range else now
    start_time = datetime.utcfromtimestamp(time_range['start'] / 1000) if 'start' in time_range else now - timedelta(hours=1)

    # Get CPU utilization
    cpu_response = cloudwatch.get_metric_statistics(
        Namespace='AWS/EC2',
        MetricName='CPUUtilization',
        StartTime=start_time,
        EndTime=end_time,
        Period=3600,
        Statistics=['Average']
    )

    # Calculate efficiency score (optimal at 80% CPU)
    datapoints = cpu_response['Datapoints']
    avg_cpu = sum(point['Average'] for point in datapoints) / len(datapoints) if datapoints else 0
    efficiency_score = min(avg_cpu / 80 * 100, 100)

    # Return widget data
    return {
        'statusCode': 200,
        'body': json.dumps({
            'efficiency_score': efficiency_score,
            'timestamp': datetime.utcnow().isoformat(),
            'period': f"{start_time.isoformat()} to {end_time.isoformat()}"
        })
    }
'''

    return lambda_code

# Custom widget configuration
custom_widget_config = {
    "type": "custom",
    "x": 0,
    "y": 0,
    "width": 6,
    "height": 6,
    "properties": {
        "endpoint": "arn:aws:lambda:us-east-1:123456789012:function:custom-widget-function",
        "title": "Resource Efficiency Score",
        "updateOn": {
            "refresh": True,
            "resize": True,
            "timeRange": True
        }
    }
}
```
Best Practices {#best-practices}
Monitoring Strategy
```python
class CloudWatchMonitoringStrategy:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.logs_client = boto3.client('logs')

    def implement_layered_monitoring(self):
        """Implement a layered monitoring strategy."""
        layers = {
            'infrastructure': self.setup_infrastructure_monitoring(),
            'application': self.setup_application_monitoring(),
            'business': self.setup_business_monitoring(),
            'user_experience': self.setup_ux_monitoring()
        }

        return layers

    def setup_ux_monitoring(self):
        """Placeholder for user-experience monitoring (e.g. synthetic canaries or RUM)."""
        return []

    def setup_infrastructure_monitoring(self):
        """Monitor infrastructure components."""
        infrastructure_metrics = [
            {'metric': 'CPUUtilization', 'threshold': 80, 'namespace': 'AWS/EC2'},
            {'metric': 'MemoryUtilization', 'threshold': 85, 'namespace': 'AWS/EC2'},
            {'metric': 'DiskSpaceUtilization', 'threshold': 90, 'namespace': 'AWS/EC2'},
            {'metric': 'NetworkPacketsIn', 'threshold': 10000, 'namespace': 'AWS/EC2'},
            {'metric': 'DatabaseConnections', 'threshold': 80, 'namespace': 'AWS/RDS'},
            {'metric': 'FreeStorageSpace', 'threshold': 2000000000, 'namespace': 'AWS/RDS',
             'comparison': 'LessThanThreshold'}
        ]

        for metric in infrastructure_metrics:
            alarm_name = f"Infrastructure-{metric['metric']}-Alert"
            comparison = metric.get('comparison', 'GreaterThanThreshold')

            self.cloudwatch.put_metric_alarm(
                AlarmName=alarm_name,
                ComparisonOperator=comparison,
                EvaluationPeriods=2,
                MetricName=metric['metric'],
                Namespace=metric['namespace'],
                Period=300,
                Statistic='Average',
                Threshold=metric['threshold'],
                ActionsEnabled=True,
                AlarmActions=[
                    'arn:aws:sns:us-east-1:123456789012:infrastructure-alerts'
                ],
                AlarmDescription=f'Infrastructure monitoring for {metric["metric"]}'
            )

        return infrastructure_metrics

    def setup_application_monitoring(self):
        """Monitor application-level metrics."""
        app_metrics = [
            {'metric': 'ResponseTime', 'threshold': 1000, 'unit': 'Milliseconds'},
            {'metric': 'ErrorRate', 'threshold': 5, 'unit': 'Percent'},
            {'metric': 'ThroughputTPS', 'threshold': 100, 'unit': 'Count/Second',
             'comparison': 'LessThanThreshold'},
            {'metric': 'MemoryLeaks', 'threshold': 1, 'unit': 'Count'},
            {'metric': 'FailedTransactions', 'threshold': 10, 'unit': 'Count'}
        ]

        for metric in app_metrics:
            alarm_name = f"Application-{metric['metric']}-Alert"
            comparison = metric.get('comparison', 'GreaterThanThreshold')

            self.cloudwatch.put_metric_alarm(
                AlarmName=alarm_name,
                ComparisonOperator=comparison,
                EvaluationPeriods=3,
                MetricName=metric['metric'],
                Namespace='MyApp',
                Period=60,
                Statistic='Average',
                Threshold=metric['threshold'],
                ActionsEnabled=True,
                AlarmActions=[
                    'arn:aws:sns:us-east-1:123456789012:application-alerts'
                ],
                AlarmDescription=f'Application monitoring for {metric["metric"]}'
            )

        return app_metrics

    def setup_business_monitoring(self):
        """Monitor business KPIs."""
        business_metrics = [
            {'metric': 'DailyActiveUsers', 'threshold': 1000, 'comparison': 'LessThanThreshold'},
            {'metric': 'ConversionRate', 'threshold': 2.5, 'unit': 'Percent', 'comparison': 'LessThanThreshold'},
            {'metric': 'RevenuePerHour', 'threshold': 500, 'comparison': 'LessThanThreshold'},
            {'metric': 'CustomerSatisfactionScore', 'threshold': 4.0, 'comparison': 'LessThanThreshold'},
            {'metric': 'ChurnRate', 'threshold': 5, 'unit': 'Percent'}
        ]

        for metric in business_metrics:
            alarm_name = f"Business-{metric['metric']}-Alert"
            comparison = metric.get('comparison', 'GreaterThanThreshold')

            self.cloudwatch.put_metric_alarm(
                AlarmName=alarm_name,
                ComparisonOperator=comparison,
                EvaluationPeriods=1,
                MetricName=metric['metric'],
                Namespace='Business/KPIs',
                Period=3600,  # Hourly evaluation
                Statistic='Average',
                Threshold=metric['threshold'],
                ActionsEnabled=True,
                AlarmActions=[
                    'arn:aws:sns:us-east-1:123456789012:business-alerts'
                ],
                AlarmDescription=f'Business KPI monitoring for {metric["metric"]}'
            )

        return business_metrics

# Initialize monitoring strategy
monitoring = CloudWatchMonitoringStrategy()
monitoring.implement_layered_monitoring()
```
Efficient Log Management
```python
class LogManagementBestPractices:
    def __init__(self):
        self.logs_client = boto3.client('logs')

    def setup_log_retention_policies(self, log_groups_config):
        """Set appropriate retention policies for different log types."""
        # Values must be one of the retention periods CloudWatch Logs accepts
        retention_policies = {
            'application_logs': 30,    # 30 days for application logs
            'access_logs': 90,         # 90 days for access logs
            'audit_logs': 2557,        # ~7 years for audit logs
            'debug_logs': 7,           # 7 days for debug logs
            'error_logs': 180,         # 6 months for error logs
            'security_logs': 1096      # ~3 years for security logs
        }

        for log_group, log_type in log_groups_config.items():
            if log_type in retention_policies:
                try:
                    self.logs_client.put_retention_policy(
                        logGroupName=log_group,
                        retentionInDays=retention_policies[log_type]
                    )
                    print(f"Retention policy set for {log_group}: "
                          f"{retention_policies[log_type]} days")
                except Exception as e:
                    print(f"Error setting retention policy for {log_group}: {e}")

    def setup_log_filters(self, log_group_name):
        """Create metric filters for important log patterns."""
        filters = [
            {
                'filter_name': 'ErrorFilter',
                'filter_pattern': '[timestamp, request_id, level="ERROR", ...]',
                'metric_name': 'ErrorCount',
                'metric_namespace': 'LogMetrics',
                'metric_value': '1'
            },
            {
                'filter_name': 'WarningFilter',
                'filter_pattern': '[timestamp, request_id, level="WARN", ...]',
                'metric_name': 'WarningCount',
                'metric_namespace': 'LogMetrics',
                'metric_value': '1'
            },
            {
                'filter_name': 'ResponseTimeFilter',
                'filter_pattern': '[timestamp, request_id, level, method, url, response_time]',
                'metric_name': 'ResponseTime',
                'metric_namespace': 'LogMetrics',
                'metric_value': '$response_time'
            }
        ]

        for filter_config in filters:
            try:
                self.logs_client.put_metric_filter(
                    logGroupName=log_group_name,
                    filterName=filter_config['filter_name'],
                    filterPattern=filter_config['filter_pattern'],
                    metricTransformations=[
                        {
                            'metricName': filter_config['metric_name'],
                            'metricNamespace': filter_config['metric_namespace'],
                            'metricValue': filter_config['metric_value']
                        }
                    ]
                )
                print(f"Metric filter {filter_config['filter_name']} created for {log_group_name}")
            except Exception as e:
                print(f"Error creating metric filter {filter_config['filter_name']}: {e}")

# Example usage
log_manager = LogManagementBestPractices()

# Set retention policies
log_groups_config = {
    '/aws/myapp/application': 'application_logs',
    '/aws/myapp/access': 'access_logs',
    '/aws/myapp/audit': 'audit_logs',
    '/aws/myapp/debug': 'debug_logs',
    '/aws/myapp/errors': 'error_logs',
    '/aws/myapp/security': 'security_logs'
}

log_manager.setup_log_retention_policies(log_groups_config)
log_manager.setup_log_filters('/aws/myapp/application')
```
Cost Optimization {#cost-optimization}
CloudWatch Cost Management
```python
class CloudWatchCostOptimization:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.logs_client = boto3.client('logs')
        self.ce_client = boto3.client('ce')  # Cost Explorer

    def analyze_cloudwatch_costs(self, start_date, end_date):
        """Analyze CloudWatch costs and identify optimization opportunities."""
        try:
            response = self.ce_client.get_cost_and_usage(
                TimePeriod={
                    'Start': start_date.strftime('%Y-%m-%d'),
                    'End': end_date.strftime('%Y-%m-%d')
                },
                Granularity='MONTHLY',
                Metrics=['BlendedCost'],
                GroupBy=[
                    {
                        'Type': 'DIMENSION',
                        'Key': 'SERVICE'
                    }
                ],
                Filter={
                    'Dimensions': {
                        'Key': 'SERVICE',
                        'Values': ['Amazon CloudWatch', 'Amazon CloudWatch Logs']
                    }
                }
            )

            cost_analysis = {}
            for result in response['ResultsByTime']:
                for group in result['Groups']:
                    service = group['Keys'][0]
                    cost = float(group['Metrics']['BlendedCost']['Amount'])
                    cost_analysis[service] = cost_analysis.get(service, 0) + cost

            print("CloudWatch Cost Analysis:")
            for service, cost in cost_analysis.items():
                print(f"{service}: ${cost:.2f}")

            return cost_analysis

        except Exception as e:
            print(f"Error analyzing costs: {e}")
            return {}

    def optimize_log_groups(self):
        """Identify and optimize expensive log groups."""
        try:
            paginator = self.logs_client.get_paginator('describe_log_groups')

            optimization_recommendations = []

            for page in paginator.paginate():
                for log_group in page['logGroups']:
                    log_group_name = log_group['logGroupName']

                    # Check storage size
                    storage_bytes = log_group.get('storedBytes', 0)
                    storage_gb = storage_bytes / (1024**3)

                    # Check retention policy
                    retention_days = log_group.get('retentionInDays', 'Never expire')

                    recommendations = []

                    if storage_gb > 10:  # More than 10 GB
                        recommendations.append("Large storage size - consider retention policy")

                    if retention_days == 'Never expire':
                        recommendations.append("No retention policy - data stored indefinitely")

                    if isinstance(retention_days, int) and retention_days > 365:
                        recommendations.append("Long retention period - review necessity")

                    if recommendations:
                        optimization_recommendations.append({
                            'log_group': log_group_name,
                            'storage_gb': storage_gb,
                            'retention_days': retention_days,
                            'recommendations': recommendations
                        })

            # Sort by storage size (largest first)
            optimization_recommendations.sort(key=lambda x: x['storage_gb'], reverse=True)

            print("Log Group Optimization Recommendations:")
            for rec in optimization_recommendations[:10]:  # Top 10
                print(f"\nLog Group: {rec['log_group']}")
                print(f"Storage: {rec['storage_gb']:.2f} GB")
                print(f"Retention: {rec['retention_days']}")
                print("Recommendations:")
                for r in rec['recommendations']:
                    print(f"  - {r}")

            return optimization_recommendations

        except Exception as e:
            print(f"Error optimizing log groups: {e}")
            return []

    def optimize_metric_usage(self):
        """Analyze and optimize custom metric usage."""
        try:
            # Get list of custom metrics
            paginator = self.cloudwatch.get_paginator('list_metrics')

            metric_usage = {}
            total_custom_metrics = 0

            for page in paginator.paginate():
                for metric in page['Metrics']:
                    namespace = metric['Namespace']

                    # Focus on custom metrics (non-AWS namespaces)
                    if not namespace.startswith('AWS/'):
                        total_custom_metrics += 1
                        metric_usage[namespace] = metric_usage.get(namespace, 0) + 1

            print(f"Total Custom Metrics: {total_custom_metrics}")
            print("\nCustom Metrics by Namespace:")

            sorted_namespaces = sorted(metric_usage.items(), key=lambda x: x[1], reverse=True)
            for namespace, count in sorted_namespaces:
                estimated_cost = count * 0.30  # $0.30 per metric per month
                print(f"{namespace}: {count} metrics (Est. ${estimated_cost:.2f}/month)")

            # Recommendations
            recommendations = []
            if total_custom_metrics > 100:
                recommendations.append("High number of custom metrics - review necessity")

            for namespace, count in sorted_namespaces:
                if count > 50:
                    recommendations.append(
                        f"Namespace '{namespace}' has many metrics ({count}) - consider consolidation"
                    )

            if recommendations:
                print("\nOptimization Recommendations:")
                for rec in recommendations:
                    print(f"  - {rec}")

            return {
                'total_metrics': total_custom_metrics,
                'by_namespace': dict(sorted_namespaces),
                'recommendations': recommendations
            }

        except Exception as e:
            print(f"Error analyzing metric usage: {e}")
            return {}

# Cost optimization analysis
cost_optimizer = CloudWatchCostOptimization()

# Analyze costs for the last 3 months
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=90)

cost_optimizer.analyze_cloudwatch_costs(start_date, end_date)
cost_optimizer.optimize_log_groups()
cost_optimizer.optimize_metric_usage()
```
Security Considerations {#security}
IAM Best Practices for CloudWatch
```yaml
# CloudWatch IAM Policy Template
Version: '2012-10-17'
Statement:
  # Read-only access to metrics and dashboards
  - Effect: Allow
    Action:
      - cloudwatch:GetMetricStatistics
      - cloudwatch:ListMetrics
      - cloudwatch:GetDashboard
      - cloudwatch:ListDashboards
      - cloudwatch:DescribeAlarms
      - cloudwatch:DescribeAlarmHistory
    Resource: '*'

  # Limited write access for custom metrics
  - Effect: Allow
    Action:
      - cloudwatch:PutMetricData
    Resource: '*'
    Condition:
      StringLike:
        'cloudwatch:namespace':
          - 'MyApp/*'
          - 'Custom/*'

  # Log access restrictions
  - Effect: Allow
    Action:
      - logs:CreateLogGroup
      - logs:CreateLogStream
      - logs:PutLogEvents
      - logs:DescribeLogGroups
      - logs:DescribeLogStreams
    Resource:
      - 'arn:aws:logs:*:*:log-group:/aws/myapp/*'
      - 'arn:aws:logs:*:*:log-group:/custom/*'

  # Alarm management, limited to alarms with a known name prefix
  - Effect: Allow
    Action:
      - cloudwatch:PutMetricAlarm
      - cloudwatch:DeleteAlarms
    Resource: 'arn:aws:cloudwatch:*:*:alarm:MyApp-*'
```
Secure Logging Practices
```python
import hashlib
import hmac
import json
from datetime import datetime

import boto3

class SecureCloudWatchLogging:
    def __init__(self, secret_key):
        self.secret_key = secret_key
        self.logs_client = boto3.client('logs')

    def sanitize_log_data(self, log_data):
        """Remove sensitive information from log data."""
        sensitive_fields = [
            'password', 'api_key', 'token', 'secret',
            'ssn', 'credit_card', 'email', 'phone'
        ]

        if isinstance(log_data, dict):
            sanitized = {}
            for key, value in log_data.items():
                if any(sensitive in key.lower() for sensitive in sensitive_fields):
                    sanitized[key] = '[REDACTED]'
                elif isinstance(value, dict):
                    sanitized[key] = self.sanitize_log_data(value)
                elif isinstance(value, list):
                    sanitized[key] = [
                        self.sanitize_log_data(item) if isinstance(item, dict) else item
                        for item in value
                    ]
                else:
                    sanitized[key] = value
            return sanitized

        return log_data

    def add_integrity_check(self, log_data):
        """Add an HMAC integrity hash to log data."""
        log_json = json.dumps(log_data, sort_keys=True)
        signature = hmac.new(
            self.secret_key.encode('utf-8'),
            log_json.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

        log_data['_integrity_hash'] = signature
        return log_data

    def secure_log(self, log_group_name, log_stream_name, log_data):
        """Send a secure log with sanitization and integrity check."""
        try:
            # Sanitize data
            sanitized_data = self.sanitize_log_data(log_data.copy())

            # Add metadata
            sanitized_data['_timestamp'] = datetime.utcnow().isoformat()
            sanitized_data['_log_level'] = log_data.get('level', 'INFO')

            # Add integrity check
            secure_data = self.add_integrity_check(sanitized_data)

            # Send to CloudWatch
            response = self.logs_client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(datetime.utcnow().timestamp() * 1000),
                        'message': json.dumps(secure_data)
                    }
                ]
            )

            return response

        except Exception as e:
            print(f"Error in secure logging: {e}")

# Usage example
secure_logger = SecureCloudWatchLogging('your-secret-key-here')

# Example log with sensitive data
log_entry = {
    'user_id': 'user123',
    'action': 'login',
    'password': 'secret123',   # Will be redacted
    'api_key': 'abc123',       # Will be redacted
    'ip_address': '192.168.1.100',
    'timestamp': datetime.utcnow().isoformat()
}

secure_logger.secure_log('/aws/myapp/secure', 'auth-service', log_entry)
```
Troubleshooting {#troubleshooting}
Common Issues and Solutions
```python
class CloudWatchTroubleshooter:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.logs_client = boto3.client('logs')

    def diagnose_metric_issues(self, namespace, metric_name, start_time, end_time):
        """Diagnose issues with metric collection."""
        issues = []

        try:
            # Check if metrics exist
            metrics = self.cloudwatch.list_metrics(
                Namespace=namespace,
                MetricName=metric_name
            )

            if not metrics['Metrics']:
                issues.append("No metrics found - check metric name and namespace")
                return issues

            # Check for data points
            response = self.cloudwatch.get_metric_statistics(
                Namespace=namespace,
                MetricName=metric_name,
                StartTime=start_time,
                EndTime=end_time,
                Period=300,
                Statistics=['Sum']
            )

            if not response['Datapoints']:
                issues.append("No data points found - check time range and metric publishing")

            # Check for gaps in data
            datapoints = sorted(response['Datapoints'], key=lambda x: x['Timestamp'])
            for i in range(1, len(datapoints)):
                time_diff = (datapoints[i]['Timestamp'] - datapoints[i - 1]['Timestamp']).total_seconds()
                if time_diff > 600:  # More than a 10-minute gap
                    issues.append(
                        f"Data gap detected between {datapoints[i - 1]['Timestamp']} "
                        f"and {datapoints[i]['Timestamp']}"
                    )

            # Check metric dimensions
            unique_dimensions = set()
            for metric in metrics['Metrics']:
                dimension_set = frozenset((d['Name'], d['Value']) for d in metric.get('Dimensions', []))
                unique_dimensions.add(dimension_set)

            if len(unique_dimensions) > 10:
                issues.append(
                    f"High cardinality detected: {len(unique_dimensions)} unique dimension combinations"
                )

        except Exception as e:
            issues.append(f"Error diagnosing metrics: {e}")

        return issues

    def diagnose_alarm_issues(self, alarm_name):
        """Diagnose alarm configuration issues."""
        issues = []

        try:
            response = self.cloudwatch.describe_alarms(
                AlarmNames=[alarm_name]
            )

            if not response['MetricAlarms']:
                issues.append("Alarm not found")
                return issues

            alarm = response['MetricAlarms'][0]

            # Check alarm state
            if alarm['StateValue'] == 'INSUFFICIENT_DATA':
                issues.append("Alarm has insufficient data - check metric availability")

            # Check evaluation periods and period
            if alarm['EvaluationPeriods'] * alarm['Period'] < 600:
                issues.append("Evaluation period too short - may cause false alarms")

            # Check if actions are enabled
            if not alarm['ActionsEnabled']:
                issues.append("Alarm actions are disabled")

            # Check if there are actions configured
            if not alarm.get('AlarmActions') and not alarm.get('OKActions'):
                issues.append("No actions configured for alarm")

            # Get alarm history
            history = self.cloudwatch.describe_alarm_history(
                AlarmName=alarm_name,
                MaxRecords=10
            )

            # Check for frequent state changes
            state_changes = [
                h for h in history['AlarmHistoryItems']
                if h['HistoryItemType'] == 'StateUpdate'
            ]
            if len(state_changes) > 5:
                issues.append("Alarm changing states frequently - review threshold and evaluation criteria")

        except Exception as e:
            issues.append(f"Error diagnosing alarm: {e}")

        return issues

    def diagnose_log_issues(self, log_group_name):
        """Diagnose log ingestion issues."""
        issues = []

        try:
            # Check if log group exists
            response = self.logs_client.describe_log_groups(
                logGroupNamePrefix=log_group_name
            )

            matching_groups = [
                lg for lg in response['logGroups']
                if lg['logGroupName'] == log_group_name
            ]
            if not matching_groups:
                issues.append("Log group does not exist")
                return issues

            log_group = matching_groups[0]

            # Check log streams
            streams_response = self.logs_client.describe_log_streams(
                logGroupName=log_group_name,
                orderBy='LastEventTime',
                descending=True,
                limit=10
            )

            if not streams_response['logStreams']:
                issues.append("No log streams found")
            else:
                # Check for recent activity (timestamps are epoch milliseconds, UTC)
                latest_stream = streams_response['logStreams'][0]
                if 'lastEventTime' in latest_stream:
                    last_event_time = datetime.utcfromtimestamp(latest_stream['lastEventTime'] / 1000)
                    time_since_last = datetime.utcnow() - last_event_time

                    if time_since_last.total_seconds() > 3600:  # More than 1 hour
                        issues.append(f"No recent log events (last event: {last_event_time})")

                # Check for stuck streams
                stuck_streams = 0
                for stream in streams_response['logStreams']:
                    if 'lastEventTime' in stream and 'lastIngestionTime' in stream:
                        event_time = stream['lastEventTime']
                        ingestion_time = stream['lastIngestionTime']
                        if ingestion_time - event_time > 300000:  # More than 5 minutes delay
                            stuck_streams += 1

                if stuck_streams > 0:
                    issues.append(f"{stuck_streams} log streams have ingestion delays")

            # Check retention policy
            if 'retentionInDays' not in log_group:
                issues.append("No retention policy set - logs will be stored indefinitely")

        except Exception as e:
            issues.append(f"Error diagnosing logs: {e}")

        return issues

    def run_comprehensive_diagnosis(self, resources):
        """Run a comprehensive diagnosis on multiple resources."""
        diagnosis_report = {
            'timestamp': datetime.utcnow().isoformat(),
            'resources': {}
        }

        for resource in resources:
            resource_type = resource['type']
            resource_name = resource['name']

            if resource_type == 'metric':
                issues = self.diagnose_metric_issues(
                    resource['namespace'],
                    resource['metric_name'],
                    resource['start_time'],
                    resource['end_time']
                )
            elif resource_type == 'alarm':
                issues = self.diagnose_alarm_issues(resource_name)
            elif resource_type == 'log_group':
                issues = self.diagnose_log_issues(resource_name)
            else:
                issues = [f"Unknown resource type: {resource_type}"]

            diagnosis_report['resources'][resource_name] = {
                'type': resource_type,
                'issues': issues,
                'status': 'healthy' if not issues else 'issues_detected'
            }

        return diagnosis_report

# Example usage
troubleshooter = CloudWatchTroubleshooter()

# Define resources to diagnose
resources_to_check = [
    {
        'type': 'metric',
        'name': 'CPUUtilization',
        'namespace': 'AWS/EC2',
        'metric_name': 'CPUUtilization',
        'start_time': datetime.utcnow() - timedelta(hours=2),
        'end_time': datetime.utcnow()
    },
    {
        'type': 'alarm',
        'name': 'HighCPUUtilization'
    },
    {
        'type': 'log_group',
        'name': '/aws/myapp/production'
    }
]

# Run diagnosis
diagnosis = troubleshooter.run_comprehensive_diagnosis(resources_to_check)

print("CloudWatch Diagnosis Report:")
print(json.dumps(diagnosis, indent=2, default=str))
```
Conclusion
Amazon CloudWatch provides comprehensive monitoring and observability capabilities for AWS infrastructure and applications. Key takeaways:
Essential Features:
- Metrics: Collect and monitor quantitative data from AWS services and custom applications
- Logs: Centralized log management with powerful query capabilities
- Alarms: Automated notifications and actions based on thresholds
- Dashboards: Visual monitoring and real-time insights
- Events: React to system changes and custom application events
Best Practices:
- Implement layered monitoring (infrastructure, application, business)
- Use appropriate log retention policies
- Set up meaningful alarms with proper thresholds
- Leverage anomaly detection for dynamic thresholds
- Optimize costs through metric and log management
Advanced Capabilities:
- Cross-account monitoring
- Custom widgets with Lambda
- Composite alarms for complex scenarios
- Log Insights for sophisticated log analysis
- Integration with EventBridge for event-driven architectures
CloudWatch forms the foundation of observability in AWS, enabling proactive monitoring, rapid troubleshooting, and data-driven decision making. Proper implementation ensures system reliability, performance optimization, and cost control while maintaining security and compliance standards.