
The Complete Guide to Amazon CloudWatch: Comprehensive Monitoring and Observability for AWS#

Amazon CloudWatch is AWS’s comprehensive monitoring and observability service that provides data and actionable insights to monitor applications, respond to system-wide performance changes, and optimize resource utilization. This guide covers everything from basic metrics collection to advanced monitoring strategies.

Table of Contents#

  1. Introduction to CloudWatch
  2. Core Components
  3. CloudWatch Metrics
  4. CloudWatch Logs
  5. CloudWatch Alarms
  6. CloudWatch Dashboards
  7. CloudWatch Events/EventBridge
  8. Advanced Features
  9. Best Practices
  10. Cost Optimization
  11. Security Considerations
  12. Troubleshooting

Introduction to CloudWatch {#introduction}#

Amazon CloudWatch is a monitoring service for AWS cloud resources and applications. It provides real-time monitoring, custom metrics, log aggregation, and automated actions based on defined thresholds.

Key Benefits#

  • Unified Monitoring: Single platform for metrics, logs, and events
  • Real-time Insights: Near real-time data collection and visualization
  • Automated Actions: Trigger actions based on metric thresholds
  • Cost-effective: Pay only for what you use
  • Integration: Native integration with most AWS services

Core Components {#core-components}#

1. Metrics#

Quantitative data points collected over time intervals.

2. Logs#

Text-based log data from applications and AWS services.

3. Alarms#

Notifications and automated actions based on metric thresholds.

4. Dashboards#

Customizable visualization of metrics and logs.

5. Events#

System events from AWS services and custom applications.
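
Each of these components is exposed through its own API in the AWS SDKs. As a quick orientation for the examples that follow, here is a minimal sketch of the boto3 clients this guide uses (it assumes credentials and a default region are already configured, for example via environment variables):

import boto3

# Clients used throughout this guide
cloudwatch = boto3.client('cloudwatch')    # metrics, alarms, dashboards
logs_client = boto3.client('logs')         # log groups, streams, Logs Insights
events_client = boto3.client('events')     # CloudWatch Events / EventBridge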

CloudWatch Metrics {#cloudwatch-metrics}#

Basic Metrics Collection#

import boto3
import time
from datetime import datetime, timedelta

# Initialize CloudWatch client
cloudwatch = boto3.client('cloudwatch')

def put_custom_metric(metric_name, value, unit='Count', namespace='MyApp'):
    """
    Send a custom metric to CloudWatch.
    """
    try:
        response = cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Value': value,
                    'Unit': unit,
                    'Timestamp': datetime.utcnow()
                }
            ]
        )
        print(f"Custom metric {metric_name} sent successfully")
        return response
    except Exception as e:
        print(f"Error sending metric: {e}")

# Example usage
put_custom_metric('UserLogins', 25, 'Count', 'WebApp')
put_custom_metric('ResponseTime', 120.5, 'Milliseconds', 'WebApp')
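
If a metric fires many times between flushes, it is usually cheaper to aggregate client-side and publish a single statistic set instead of one datapoint per event. A minimal sketch reusing the `cloudwatch` client above (the values are illustrative):

def put_aggregated_metric(metric_name, values, unit='Milliseconds', namespace='WebApp'):
    """Publish pre-aggregated values as a single StatisticValues datapoint."""
    cloudwatch.put_metric_data(
        Namespace=namespace,
        MetricData=[
            {
                'MetricName': metric_name,
                'StatisticValues': {
                    'SampleCount': len(values),
                    'Sum': sum(values),
                    'Minimum': min(values),
                    'Maximum': max(values)
                },
                'Unit': unit,
                'Timestamp': datetime.utcnow()
            }
        ]
    )

# Example: one API call instead of five separate datapoints
put_aggregated_metric('ResponseTime', [110.2, 98.7, 130.5, 120.0, 105.3])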

Advanced Metrics with Dimensions#

def put_metric_with_dimensions(metric_name, value, dimensions, namespace='MyApp'):
    """
    Send a metric with dimensions for better filtering and aggregation.
    """
    try:
        response = cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Value': value,
                    'Unit': 'Count',
                    'Dimensions': dimensions,
                    'Timestamp': datetime.utcnow()
                }
            ]
        )
        return response
    except Exception as e:
        print(f"Error sending metric with dimensions: {e}")

# Example with dimensions
dimensions = [
    {'Name': 'Environment', 'Value': 'Production'},
    {'Name': 'Region', 'Value': 'us-east-1'},
    {'Name': 'Service', 'Value': 'UserService'}
]
put_metric_with_dimensions('APIRequests', 100, dimensions)

Retrieving Metrics#

def get_metric_statistics(metric_name, namespace, start_time, end_time, period=300):
    """
    Retrieve metric statistics from CloudWatch.
    """
    try:
        response = cloudwatch.get_metric_statistics(
            Namespace=namespace,
            MetricName=metric_name,
            StartTime=start_time,
            EndTime=end_time,
            Period=period,
            Statistics=['Average', 'Maximum', 'Minimum', 'Sum', 'SampleCount']
        )
        datapoints = sorted(response['Datapoints'], key=lambda x: x['Timestamp'])
        for point in datapoints:
            print(f"Time: {point['Timestamp']}, Average: {point['Average']}")
        return datapoints
    except Exception as e:
        print(f"Error retrieving metrics: {e}")

# Get metrics for the last hour
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)
get_metric_statistics('CPUUtilization', 'AWS/EC2', start_time, end_time)
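
GetMetricStatistics works well for a single metric, but when you need many series at once (or metric math), the batch-oriented GetMetricData API is usually faster and cheaper. A minimal sketch, assuming the `cloudwatch` client from above and a placeholder EC2 instance ID:

def get_metric_data(metric_name, namespace, dimensions, start_time, end_time, period=300):
    """Fetch one metric series via the batch-oriented GetMetricData API."""
    response = cloudwatch.get_metric_data(
        MetricDataQueries=[
            {
                'Id': 'q1',
                'MetricStat': {
                    'Metric': {
                        'Namespace': namespace,
                        'MetricName': metric_name,
                        'Dimensions': dimensions
                    },
                    'Period': period,
                    'Stat': 'Average'
                },
                'ReturnData': True
            }
        ],
        StartTime=start_time,
        EndTime=end_time
    )
    result = response['MetricDataResults'][0]
    return list(zip(result['Timestamps'], result['Values']))

# Example: average CPU for one instance over the last hour (placeholder instance ID)
get_metric_data('CPUUtilization', 'AWS/EC2',
                [{'Name': 'InstanceId', 'Value': 'i-1234567890abcdef0'}],
                datetime.utcnow() - timedelta(hours=1), datetime.utcnow())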

CloudWatch Logs {#cloudwatch-logs}#

Log Groups and Streams Management#

import boto3
import json
from datetime import datetime

logs_client = boto3.client('logs')

def create_log_group(log_group_name):
    """
    Create a CloudWatch log group.
    """
    try:
        response = logs_client.create_log_group(
            logGroupName=log_group_name,
            tags={
                'Environment': 'Production',
                'Application': 'MyApp'
            }
        )
        print(f"Log group {log_group_name} created successfully")
        return response
    except logs_client.exceptions.ResourceAlreadyExistsException:
        print(f"Log group {log_group_name} already exists")
    except Exception as e:
        print(f"Error creating log group: {e}")

def create_log_stream(log_group_name, log_stream_name):
    """
    Create a log stream within a log group.
    """
    try:
        response = logs_client.create_log_stream(
            logGroupName=log_group_name,
            logStreamName=log_stream_name
        )
        print(f"Log stream {log_stream_name} created successfully")
        return response
    except Exception as e:
        print(f"Error creating log stream: {e}")

# Create log infrastructure
create_log_group('/aws/myapp/production')
create_log_stream('/aws/myapp/production', 'web-server-001')

Sending Logs#

def send_log_events(log_group_name, log_stream_name, log_messages):
    """
    Send log events to CloudWatch Logs.
    """
    try:
        # Look up the sequence token if the stream already exists
        # (recent versions of the API no longer require it, but it is still accepted)
        try:
            response = logs_client.describe_log_streams(
                logGroupName=log_group_name,
                logStreamNamePrefix=log_stream_name
            )
            sequence_token = None
            if response['logStreams']:
                sequence_token = response['logStreams'][0].get('uploadSequenceToken')
        except Exception:
            sequence_token = None

        # Prepare log events
        log_events = []
        for message in log_messages:
            log_events.append({
                'timestamp': int(datetime.utcnow().timestamp() * 1000),
                'message': json.dumps(message) if isinstance(message, dict) else str(message)
            })

        # Send logs
        kwargs = {
            'logGroupName': log_group_name,
            'logStreamName': log_stream_name,
            'logEvents': log_events
        }
        if sequence_token:
            kwargs['sequenceToken'] = sequence_token
        response = logs_client.put_log_events(**kwargs)
        print("Log events sent successfully")
        return response
    except Exception as e:
        print(f"Error sending log events: {e}")

# Send structured logs
log_messages = [
    {
        'level': 'INFO',
        'message': 'User login successful',
        'user_id': 'user123',
        'ip_address': '192.168.1.100',
        'timestamp': datetime.utcnow().isoformat()
    },
    {
        'level': 'ERROR',
        'message': 'Database connection failed',
        'error_code': 'DB_CONN_001',
        'retry_count': 3
    }
]
send_log_events('/aws/myapp/production', 'web-server-001', log_messages)

Log Queries with CloudWatch Insights#

def run_log_insights_query(log_group_name, query_string, start_time, end_time):
    """
    Run a CloudWatch Logs Insights query and wait for the results.
    """
    try:
        # Start query
        response = logs_client.start_query(
            logGroupName=log_group_name,
            startTime=int(start_time.timestamp()),
            endTime=int(end_time.timestamp()),
            queryString=query_string
        )
        query_id = response['queryId']
        print(f"Query started with ID: {query_id}")

        # Poll for results
        import time
        while True:
            result = logs_client.get_query_results(queryId=query_id)
            if result['status'] == 'Complete':
                print("Query completed successfully")
                for record in result['results']:
                    print({field['field']: field['value'] for field in record})
                return result['results']
            elif result['status'] == 'Failed':
                print("Query failed")
                break
            else:
                print(f"Query status: {result['status']}")
                time.sleep(2)
    except Exception as e:
        print(f"Error running insights query: {e}")

# Example queries
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)

# Query for errors
error_query = """
fields @timestamp, @message
| filter @message like /ERROR/
| sort @timestamp desc
| limit 20
"""
run_log_insights_query('/aws/myapp/production', error_query, start_time, end_time)

# Average response time in 5-minute bins
# (assumes JSON log events that include a numeric response_time field)
performance_query = """
fields @timestamp, response_time
| filter ispresent(response_time)
| stats avg(response_time) by bin(5m)
"""
run_log_insights_query('/aws/myapp/production', performance_query, start_time, end_time)

CloudWatch Alarms {#cloudwatch-alarms}#

Creating Metric Alarms#

def create_metric_alarm(alarm_name, metric_name, namespace, threshold, comparison_operator='GreaterThanThreshold'):
    """
    Create a CloudWatch alarm for a specific metric.
    """
    try:
        response = cloudwatch.put_metric_alarm(
            AlarmName=alarm_name,
            ComparisonOperator=comparison_operator,
            EvaluationPeriods=2,
            MetricName=metric_name,
            Namespace=namespace,
            Period=300,
            Statistic='Average',
            Threshold=threshold,
            ActionsEnabled=True,
            AlarmActions=[
                'arn:aws:sns:us-east-1:123456789012:my-alarm-topic'
            ],
            AlarmDescription=f'Alarm for {metric_name}',
            Unit='Percent' if 'Utilization' in metric_name else 'Count'
        )
        print(f"Alarm {alarm_name} created successfully")
        return response
    except Exception as e:
        print(f"Error creating alarm: {e}")

# Create CPU utilization alarm
create_metric_alarm(
    'HighCPUUtilization',
    'CPUUtilization',
    'AWS/EC2',
    80.0,
    'GreaterThanThreshold'
)

# Create custom metric alarm
create_metric_alarm(
    'HighErrorRate',
    'ErrorCount',
    'MyApp',
    10.0,
    'GreaterThanThreshold'
)

Composite Alarms#

def create_composite_alarm(alarm_name, alarm_rule):
    """
    Create a composite alarm based on multiple conditions.
    """
    try:
        response = cloudwatch.put_composite_alarm(
            AlarmName=alarm_name,
            AlarmRule=alarm_rule,
            ActionsEnabled=True,
            AlarmActions=[
                'arn:aws:sns:us-east-1:123456789012:critical-alerts'
            ],
            AlarmDescription='Composite alarm for critical system health'
        )
        print(f"Composite alarm {alarm_name} created successfully")
        return response
    except Exception as e:
        print(f"Error creating composite alarm: {e}")

# Create composite alarm
alarm_rule = """
(ALARM("HighCPUUtilization") OR ALARM("HighMemoryUtilization"))
AND ALARM("HighErrorRate")
"""
create_composite_alarm('CriticalSystemHealth', alarm_rule)

Anomaly Detection#

def create_anomaly_detector(metric_name, namespace, dimensions=None):
    """
    Create an anomaly detector for a metric.
    """
    try:
        detector_config = {
            'Namespace': namespace,
            'MetricName': metric_name,
            'Stat': 'Average'
        }
        if dimensions:
            detector_config['Dimensions'] = dimensions
        response = cloudwatch.put_anomaly_detector(**detector_config)
        print(f"Anomaly detector created for {metric_name}")
        return response
    except Exception as e:
        print(f"Error creating anomaly detector: {e}")

def create_anomaly_alarm(alarm_name, metric_name, namespace, dimensions=None):
    """
    Create an alarm based on anomaly detection.
    """
    try:
        metric_config = {
            'Id': 'm1',
            'MetricStat': {
                'Metric': {
                    'Namespace': namespace,
                    'MetricName': metric_name
                },
                'Period': 300,
                'Stat': 'Average'
            }
        }
        if dimensions:
            metric_config['MetricStat']['Metric']['Dimensions'] = dimensions

        response = cloudwatch.put_metric_alarm(
            AlarmName=alarm_name,
            ComparisonOperator='LessThanLowerOrGreaterThanUpperThreshold',
            EvaluationPeriods=2,
            Metrics=[
                metric_config,
                {
                    'Id': 'ad1',
                    # Anomaly detection band (2 standard deviations) around m1
                    'Expression': 'ANOMALY_DETECTION_BAND(m1, 2)'
                }
            ],
            ThresholdMetricId='ad1',
            ActionsEnabled=True,
            AlarmActions=[
                'arn:aws:sns:us-east-1:123456789012:anomaly-alerts'
            ],
            AlarmDescription=f'Anomaly detection alarm for {metric_name}'
        )
        print(f"Anomaly alarm {alarm_name} created successfully")
        return response
    except Exception as e:
        print(f"Error creating anomaly alarm: {e}")

# Create anomaly detection
create_anomaly_detector('ResponseTime', 'MyApp')
create_anomaly_alarm('ResponseTimeAnomaly', 'ResponseTime', 'MyApp')

CloudWatch Dashboards {#cloudwatch-dashboards}#

Creating Custom Dashboards#

import json

def create_dashboard(dashboard_name, dashboard_body):
    """
    Create a CloudWatch dashboard.
    """
    try:
        response = cloudwatch.put_dashboard(
            DashboardName=dashboard_name,
            DashboardBody=json.dumps(dashboard_body)
        )
        print(f"Dashboard {dashboard_name} created successfully")
        return response
    except Exception as e:
        print(f"Error creating dashboard: {e}")

# Define dashboard configuration
dashboard_config = {
    "widgets": [
        {
            "type": "metric",
            "x": 0,
            "y": 0,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["AWS/EC2", "CPUUtilization", "InstanceId", "i-1234567890abcdef0"],
                    ["AWS/EC2", "NetworkIn", "InstanceId", "i-1234567890abcdef0"],
                    ["AWS/EC2", "NetworkOut", "InstanceId", "i-1234567890abcdef0"]
                ],
                "period": 300,
                "stat": "Average",
                "region": "us-east-1",
                "title": "EC2 Instance Metrics",
                "yAxis": {
                    "left": {
                        "min": 0,
                        "max": 100
                    }
                }
            }
        },
        {
            "type": "log",
            "x": 0,
            "y": 6,
            "width": 24,
            "height": 6,
            "properties": {
                "query": "SOURCE '/aws/lambda/my-function'\n| fields @timestamp, @message\n| sort @timestamp desc\n| limit 20",
                "region": "us-east-1",
                "title": "Recent Lambda Logs",
                "view": "table"
            }
        },
        {
            "type": "metric",
            "x": 12,
            "y": 0,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["MyApp", "UserLogins", "Environment", "Production"],
                    ["MyApp", "ErrorCount", "Environment", "Production"],
                    ["MyApp", "ResponseTime", "Environment", "Production"]
                ],
                "period": 300,
                "stat": "Sum",
                "region": "us-east-1",
                "title": "Application Metrics"
            }
        }
    ]
}
create_dashboard('MyApplicationDashboard', dashboard_config)

CloudWatch Events/EventBridge {#cloudwatch-events}#

Creating Event Rules#

import boto3

events_client = boto3.client('events')

def create_event_rule(rule_name, event_pattern, targets):
    """
    Create a CloudWatch Events (EventBridge) rule.
    """
    try:
        # Create the rule
        response = events_client.put_rule(
            Name=rule_name,
            EventPattern=json.dumps(event_pattern),
            State='ENABLED',
            Description=f'Event rule for {rule_name}'
        )
        rule_arn = response['RuleArn']
        print(f"Event rule {rule_name} created: {rule_arn}")

        # Add targets to the rule
        events_client.put_targets(
            Rule=rule_name,
            Targets=targets
        )
        print(f"Targets added to rule {rule_name}")
        return response
    except Exception as e:
        print(f"Error creating event rule: {e}")

# Create rule for EC2 instance state changes
ec2_event_pattern = {
    "source": ["aws.ec2"],
    "detail-type": ["EC2 Instance State-change Notification"],
    "detail": {
        "state": ["running", "stopped", "terminated"]
    }
}
ec2_targets = [
    {
        'Id': '1',
        'Arn': 'arn:aws:sns:us-east-1:123456789012:ec2-notifications',
        'InputTransformer': {
            'InputPathsMap': {
                'instance': '$.detail.instance-id',
                'state': '$.detail.state'
            },
            'InputTemplate': '{"instance": "<instance>", "state": "<state>"}'
        }
    }
]
create_event_rule('EC2StateChangeRule', ec2_event_pattern, ec2_targets)

Custom Application Events#

def send_custom_event(source, detail_type, detail):
    """
    Send a custom event to EventBridge.
    """
    try:
        response = events_client.put_events(
            Entries=[
                {
                    'Source': source,
                    'DetailType': detail_type,
                    'Detail': json.dumps(detail),
                    'Time': datetime.utcnow()
                }
            ]
        )
        print("Custom event sent successfully")
        return response
    except Exception as e:
        print(f"Error sending custom event: {e}")

# Send custom application event
custom_detail = {
    'user_id': 'user123',
    'action': 'purchase',
    'amount': 99.99,
    'product_id': 'prod456',
    'timestamp': datetime.utcnow().isoformat()
}
send_custom_event('myapp.orders', 'Order Completed', custom_detail)
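
A rule can then react to these custom events exactly like the EC2 example above; a minimal sketch that reuses create_event_rule with a placeholder Lambda function ARN as the target:

# Match the custom order events published above and forward them to a
# hypothetical order-processing Lambda function
order_event_pattern = {
    "source": ["myapp.orders"],
    "detail-type": ["Order Completed"]
}
order_targets = [
    {
        'Id': '1',
        'Arn': 'arn:aws:lambda:us-east-1:123456789012:function:process-orders'
    }
]
create_event_rule('OrderCompletedRule', order_event_pattern, order_targets)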

Advanced Features {#advanced-features}#

Cross-Account Monitoring#

def setup_cross_account_dashboard(dashboard_name, source_account_widgets):
    """
    Create a dashboard with metrics from multiple accounts.
    """
    dashboard_config = {
        "widgets": []
    }
    for widget in source_account_widgets:
        account_id = widget.pop('account_id')
        # Attach the source account ID to each metric specification
        # (rendering options such as accountId belong in a trailing options object)
        for metric in widget['properties']['metrics']:
            if len(metric) >= 2:
                metric.append({
                    'accountId': account_id
                })
        dashboard_config['widgets'].append(widget)
    return create_dashboard(dashboard_name, dashboard_config)

# Example cross-account widget configuration
cross_account_widgets = [
    {
        'account_id': '123456789012',
        'type': 'metric',
        'x': 0,
        'y': 0,
        'width': 12,
        'height': 6,
        'properties': {
            'metrics': [
                ['AWS/EC2', 'CPUUtilization', 'InstanceId', 'i-1234567890abcdef0']
            ],
            'period': 300,
            'stat': 'Average',
            'region': 'us-east-1',
            'title': 'Cross-Account EC2 Metrics'
        }
    }
]
setup_cross_account_dashboard('CrossAccountDashboard', cross_account_widgets)

Custom Widgets with Lambda#

def create_custom_widget_lambda():
    """
    Return the source for a Lambda function backing a custom CloudWatch widget.
    """
    lambda_code = '''
import json
import boto3
from datetime import datetime, timedelta

def lambda_handler(event, context):
    # Extract widget parameters
    widget_context = json.loads(event.get('widgetContext', '{}'))
    time_range = widget_context.get('timeRange', {})

    # Calculate custom metrics
    cloudwatch = boto3.client('cloudwatch')

    # Example: calculate a cost-efficiency metric
    end_time = datetime.fromisoformat(time_range.get('end', datetime.utcnow().isoformat()))
    start_time = datetime.fromisoformat(time_range.get('start', (datetime.utcnow() - timedelta(hours=1)).isoformat()))

    # Get CPU utilization
    cpu_response = cloudwatch.get_metric_statistics(
        Namespace='AWS/EC2',
        MetricName='CPUUtilization',
        StartTime=start_time,
        EndTime=end_time,
        Period=3600,
        Statistics=['Average']
    )

    # Calculate efficiency score
    avg_cpu = sum(point['Average'] for point in cpu_response['Datapoints']) / len(cpu_response['Datapoints']) if cpu_response['Datapoints'] else 0
    efficiency_score = min(avg_cpu / 80 * 100, 100)  # Optimal at 80% CPU

    # Return widget data
    return {
        'statusCode': 200,
        'body': json.dumps({
            'efficiency_score': efficiency_score,
            'timestamp': datetime.utcnow().isoformat(),
            'period': f"{start_time.isoformat()} to {end_time.isoformat()}"
        })
    }
'''
    return lambda_code

# Custom widget configuration
custom_widget_config = {
    "type": "custom",
    "x": 0,
    "y": 0,
    "width": 6,
    "height": 6,
    "properties": {
        "endpoint": "arn:aws:lambda:us-east-1:123456789012:function:custom-widget-function",
        "title": "Resource Efficiency Score",
        "updateOn": {
            "refresh": True,
            "resize": True,
            "timeRange": True
        }
    }
}

Best Practices {#best-practices}#

Monitoring Strategy#

class CloudWatchMonitoringStrategy:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.logs_client = boto3.client('logs')

    def implement_layered_monitoring(self):
        """
        Implement a comprehensive, layered monitoring strategy.
        """
        layers = {
            'infrastructure': self.setup_infrastructure_monitoring(),
            'application': self.setup_application_monitoring(),
            'business': self.setup_business_monitoring(),
            'user_experience': self.setup_ux_monitoring()
        }
        return layers

    def setup_infrastructure_monitoring(self):
        """
        Monitor infrastructure components.
        """
        infrastructure_metrics = [
            {'metric': 'CPUUtilization', 'threshold': 80, 'namespace': 'AWS/EC2'},
            {'metric': 'MemoryUtilization', 'threshold': 85, 'namespace': 'AWS/EC2'},
            {'metric': 'DiskSpaceUtilization', 'threshold': 90, 'namespace': 'AWS/EC2'},
            {'metric': 'NetworkPacketsIn', 'threshold': 10000, 'namespace': 'AWS/EC2'},
            {'metric': 'DatabaseConnections', 'threshold': 80, 'namespace': 'AWS/RDS'},
            {'metric': 'FreeStorageSpace', 'threshold': 2000000000, 'namespace': 'AWS/RDS', 'comparison': 'LessThanThreshold'}
        ]
        for metric in infrastructure_metrics:
            alarm_name = f"Infrastructure-{metric['metric']}-Alert"
            comparison = metric.get('comparison', 'GreaterThanThreshold')
            self.cloudwatch.put_metric_alarm(
                AlarmName=alarm_name,
                ComparisonOperator=comparison,
                EvaluationPeriods=2,
                MetricName=metric['metric'],
                Namespace=metric['namespace'],
                Period=300,
                Statistic='Average',
                Threshold=metric['threshold'],
                ActionsEnabled=True,
                AlarmActions=[
                    'arn:aws:sns:us-east-1:123456789012:infrastructure-alerts'
                ],
                AlarmDescription=f'Infrastructure monitoring for {metric["metric"]}'
            )
        return infrastructure_metrics

    def setup_application_monitoring(self):
        """
        Monitor application-level metrics.
        """
        app_metrics = [
            {'metric': 'ResponseTime', 'threshold': 1000, 'unit': 'Milliseconds'},
            {'metric': 'ErrorRate', 'threshold': 5, 'unit': 'Percent'},
            {'metric': 'ThroughputTPS', 'threshold': 100, 'unit': 'Count/Second', 'comparison': 'LessThanThreshold'},
            {'metric': 'MemoryLeaks', 'threshold': 1, 'unit': 'Count'},
            {'metric': 'FailedTransactions', 'threshold': 10, 'unit': 'Count'}
        ]
        for metric in app_metrics:
            alarm_name = f"Application-{metric['metric']}-Alert"
            comparison = metric.get('comparison', 'GreaterThanThreshold')
            self.cloudwatch.put_metric_alarm(
                AlarmName=alarm_name,
                ComparisonOperator=comparison,
                EvaluationPeriods=3,
                MetricName=metric['metric'],
                Namespace='MyApp',
                Period=60,
                Statistic='Average',
                Threshold=metric['threshold'],
                ActionsEnabled=True,
                AlarmActions=[
                    'arn:aws:sns:us-east-1:123456789012:application-alerts'
                ],
                AlarmDescription=f'Application monitoring for {metric["metric"]}'
            )
        return app_metrics

    def setup_business_monitoring(self):
        """
        Monitor business KPIs.
        """
        business_metrics = [
            {'metric': 'DailyActiveUsers', 'threshold': 1000, 'comparison': 'LessThanThreshold'},
            {'metric': 'ConversionRate', 'threshold': 2.5, 'unit': 'Percent', 'comparison': 'LessThanThreshold'},
            {'metric': 'RevenuePerHour', 'threshold': 500, 'comparison': 'LessThanThreshold'},
            {'metric': 'CustomerSatisfactionScore', 'threshold': 4.0, 'comparison': 'LessThanThreshold'},
            {'metric': 'ChurnRate', 'threshold': 5, 'unit': 'Percent'}
        ]
        for metric in business_metrics:
            alarm_name = f"Business-{metric['metric']}-Alert"
            comparison = metric.get('comparison', 'GreaterThanThreshold')
            self.cloudwatch.put_metric_alarm(
                AlarmName=alarm_name,
                ComparisonOperator=comparison,
                EvaluationPeriods=1,
                MetricName=metric['metric'],
                Namespace='Business/KPIs',
                Period=3600,  # Hourly evaluation
                Statistic='Average',
                Threshold=metric['threshold'],
                ActionsEnabled=True,
                AlarmActions=[
                    'arn:aws:sns:us-east-1:123456789012:business-alerts'
                ],
                AlarmDescription=f'Business KPI monitoring for {metric["metric"]}'
            )
        return business_metrics

    def setup_ux_monitoring(self):
        """
        Placeholder for user-experience monitoring
        (e.g. CloudWatch RUM or Synthetics canaries).
        """
        return []

# Initialize monitoring strategy
monitoring = CloudWatchMonitoringStrategy()
monitoring.implement_layered_monitoring()

Efficient Log Management#

class LogManagementBestPractices:
    def __init__(self):
        self.logs_client = boto3.client('logs')

    def setup_log_retention_policies(self, log_groups_config):
        """
        Set appropriate retention policies for different log types.
        """
        retention_policies = {
            'application_logs': 30,    # 30 days for application logs
            'access_logs': 90,         # 90 days for access logs
            'audit_logs': 2557,        # ~7 years for audit logs (closest allowed value)
            'debug_logs': 7,           # 7 days for debug logs
            'error_logs': 180,         # 6 months for error logs
            'security_logs': 1096      # ~3 years for security logs (closest allowed value)
        }
        for log_group, log_type in log_groups_config.items():
            if log_type in retention_policies:
                try:
                    self.logs_client.put_retention_policy(
                        logGroupName=log_group,
                        retentionInDays=retention_policies[log_type]
                    )
                    print(f"Retention policy set for {log_group}: {retention_policies[log_type]} days")
                except Exception as e:
                    print(f"Error setting retention policy for {log_group}: {e}")

    def setup_log_filters(self, log_group_name):
        """
        Create metric filters for important log patterns.
        """
        filters = [
            {
                'filter_name': 'ErrorFilter',
                'filter_pattern': '[timestamp, request_id, level="ERROR", ...]',
                'metric_name': 'ErrorCount',
                'metric_namespace': 'LogMetrics',
                'metric_value': '1'
            },
            {
                'filter_name': 'WarningFilter',
                'filter_pattern': '[timestamp, request_id, level="WARN", ...]',
                'metric_name': 'WarningCount',
                'metric_namespace': 'LogMetrics',
                'metric_value': '1'
            },
            {
                'filter_name': 'ResponseTimeFilter',
                'filter_pattern': '[timestamp, request_id, level, method, url, response_time]',
                'metric_name': 'ResponseTime',
                'metric_namespace': 'LogMetrics',
                'metric_value': '$response_time'
            }
        ]
        for filter_config in filters:
            try:
                self.logs_client.put_metric_filter(
                    logGroupName=log_group_name,
                    filterName=filter_config['filter_name'],
                    filterPattern=filter_config['filter_pattern'],
                    metricTransformations=[
                        {
                            'metricName': filter_config['metric_name'],
                            'metricNamespace': filter_config['metric_namespace'],
                            'metricValue': filter_config['metric_value']
                        }
                    ]
                )
                print(f"Metric filter {filter_config['filter_name']} created for {log_group_name}")
            except Exception as e:
                print(f"Error creating metric filter {filter_config['filter_name']}: {e}")

# Example usage
log_manager = LogManagementBestPractices()

# Set retention policies
log_groups_config = {
    '/aws/myapp/application': 'application_logs',
    '/aws/myapp/access': 'access_logs',
    '/aws/myapp/audit': 'audit_logs',
    '/aws/myapp/debug': 'debug_logs',
    '/aws/myapp/errors': 'error_logs',
    '/aws/myapp/security': 'security_logs'
}
log_manager.setup_log_retention_policies(log_groups_config)
log_manager.setup_log_filters('/aws/myapp/application')

Cost Optimization {#cost-optimization}#

CloudWatch Cost Management#

class CloudWatchCostOptimization:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.logs_client = boto3.client('logs')
        self.ce_client = boto3.client('ce')  # Cost Explorer

    def analyze_cloudwatch_costs(self, start_date, end_date):
        """
        Analyze CloudWatch costs and identify optimization opportunities.
        """
        try:
            response = self.ce_client.get_cost_and_usage(
                TimePeriod={
                    'Start': start_date.strftime('%Y-%m-%d'),
                    'End': end_date.strftime('%Y-%m-%d')
                },
                Granularity='MONTHLY',
                Metrics=['BlendedCost'],
                GroupBy=[
                    {
                        'Type': 'DIMENSION',
                        'Key': 'SERVICE'
                    }
                ],
                Filter={
                    'Dimensions': {
                        'Key': 'SERVICE',
                        'Values': ['Amazon CloudWatch', 'Amazon CloudWatch Logs']
                    }
                }
            )
            cost_analysis = {}
            for result in response['ResultsByTime']:
                for group in result['Groups']:
                    service = group['Keys'][0]
                    cost = float(group['Metrics']['BlendedCost']['Amount'])
                    cost_analysis[service] = cost_analysis.get(service, 0) + cost
            print("CloudWatch Cost Analysis:")
            for service, cost in cost_analysis.items():
                print(f"{service}: ${cost:.2f}")
            return cost_analysis
        except Exception as e:
            print(f"Error analyzing costs: {e}")
            return {}

    def optimize_log_groups(self):
        """
        Identify and optimize expensive log groups.
        """
        try:
            paginator = self.logs_client.get_paginator('describe_log_groups')
            optimization_recommendations = []
            for page in paginator.paginate():
                for log_group in page['logGroups']:
                    log_group_name = log_group['logGroupName']

                    # Check storage size
                    storage_bytes = log_group.get('storedBytes', 0)
                    storage_gb = storage_bytes / (1024**3)

                    # Check retention policy
                    retention_days = log_group.get('retentionInDays', 'Never expire')

                    recommendations = []
                    if storage_gb > 10:  # More than 10 GB
                        recommendations.append("Large storage size - consider retention policy")
                    if retention_days == 'Never expire':
                        recommendations.append("No retention policy - data stored indefinitely")
                    if isinstance(retention_days, int) and retention_days > 365:
                        recommendations.append("Long retention period - review necessity")

                    if recommendations:
                        optimization_recommendations.append({
                            'log_group': log_group_name,
                            'storage_gb': storage_gb,
                            'retention_days': retention_days,
                            'recommendations': recommendations
                        })

            # Sort by storage size (largest first)
            optimization_recommendations.sort(key=lambda x: x['storage_gb'], reverse=True)
            print("Log Group Optimization Recommendations:")
            for rec in optimization_recommendations[:10]:  # Top 10
                print(f"\nLog Group: {rec['log_group']}")
                print(f"Storage: {rec['storage_gb']:.2f} GB")
                print(f"Retention: {rec['retention_days']}")
                print("Recommendations:")
                for r in rec['recommendations']:
                    print(f" - {r}")
            return optimization_recommendations
        except Exception as e:
            print(f"Error optimizing log groups: {e}")
            return []

    def optimize_metric_usage(self):
        """
        Analyze and optimize custom metric usage.
        """
        try:
            # Get list of custom metrics
            paginator = self.cloudwatch.get_paginator('list_metrics')
            metric_usage = {}
            total_custom_metrics = 0
            for page in paginator.paginate():
                for metric in page['Metrics']:
                    namespace = metric['Namespace']
                    # Focus on custom metrics (non-AWS namespaces)
                    if not namespace.startswith('AWS/'):
                        total_custom_metrics += 1
                        metric_usage[namespace] = metric_usage.get(namespace, 0) + 1

            print(f"Total Custom Metrics: {total_custom_metrics}")
            print("\nCustom Metrics by Namespace:")
            sorted_namespaces = sorted(metric_usage.items(), key=lambda x: x[1], reverse=True)
            for namespace, count in sorted_namespaces:
                estimated_cost = count * 0.30  # $0.30 per metric per month (first pricing tier)
                print(f"{namespace}: {count} metrics (Est. ${estimated_cost:.2f}/month)")

            # Recommendations
            recommendations = []
            if total_custom_metrics > 100:
                recommendations.append("High number of custom metrics - review necessity")
            for namespace, count in sorted_namespaces:
                if count > 50:
                    recommendations.append(f"Namespace '{namespace}' has many metrics ({count}) - consider consolidation")
            if recommendations:
                print("\nOptimization Recommendations:")
                for rec in recommendations:
                    print(f" - {rec}")
            return {
                'total_metrics': total_custom_metrics,
                'by_namespace': dict(sorted_namespaces),
                'recommendations': recommendations
            }
        except Exception as e:
            print(f"Error analyzing metric usage: {e}")
            return {}

# Cost optimization analysis
cost_optimizer = CloudWatchCostOptimization()

# Analyze costs for the last 3 months
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=90)
cost_optimizer.analyze_cloudwatch_costs(start_date, end_date)
cost_optimizer.optimize_log_groups()
cost_optimizer.optimize_metric_usage()

Security Considerations {#security}#

IAM Best Practices for CloudWatch#

# CloudWatch IAM Policy Template (YAML form)
Version: '2012-10-17'
Statement:
  # Read-only access to metrics and dashboards
  - Effect: Allow
    Action:
      - cloudwatch:GetMetricStatistics
      - cloudwatch:ListMetrics
      - cloudwatch:GetDashboard
      - cloudwatch:ListDashboards
      - cloudwatch:DescribeAlarms
      - cloudwatch:DescribeAlarmHistory
    Resource: '*'
  # Limited write access for custom metrics
  - Effect: Allow
    Action:
      - cloudwatch:PutMetricData
    Resource: '*'
    Condition:
      StringLike:
        'cloudwatch:namespace':
          - 'MyApp/*'
          - 'Custom/*'
  # Log access restrictions
  - Effect: Allow
    Action:
      - logs:CreateLogGroup
      - logs:CreateLogStream
      - logs:PutLogEvents
      - logs:DescribeLogGroups
      - logs:DescribeLogStreams
    Resource:
      - 'arn:aws:logs:*:*:log-group:/aws/myapp/*'
      - 'arn:aws:logs:*:*:log-group:/custom/*'
  # Alarm management (limited)
  - Effect: Allow
    Action:
      - cloudwatch:PutMetricAlarm
      - cloudwatch:DeleteAlarms
    Resource: '*'
    Condition:
      StringLike:
        'cloudwatch:AlarmName': 'MyApp-*'
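
However the template is managed, the policy ultimately has to be attached to a principal as a JSON document. A minimal boto3 sketch of attaching the custom-metrics statement as an inline role policy (the role and policy names are placeholders):

import json
import boto3

iam = boto3.client('iam')

# The custom-metrics statement from above, expressed as a JSON-serializable dict
policy_document = {
    'Version': '2012-10-17',
    'Statement': [
        {
            'Effect': 'Allow',
            'Action': ['cloudwatch:PutMetricData'],
            'Resource': '*',
            'Condition': {'StringLike': {'cloudwatch:namespace': 'MyApp/*'}}
        }
    ]
}

# Attach as an inline policy to an existing role (placeholder names)
iam.put_role_policy(
    RoleName='MyAppInstanceRole',
    PolicyName='MyAppCloudWatchAccess',
    PolicyDocument=json.dumps(policy_document)
)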

Secure Logging Practices#

import hashlib
import hmac
import json
import boto3
from datetime import datetime

class SecureCloudWatchLogging:
    def __init__(self, secret_key):
        self.secret_key = secret_key
        self.logs_client = boto3.client('logs')

    def sanitize_log_data(self, log_data):
        """
        Remove sensitive information from log data.
        """
        sensitive_fields = [
            'password', 'api_key', 'token', 'secret',
            'ssn', 'credit_card', 'email', 'phone'
        ]
        if isinstance(log_data, dict):
            sanitized = {}
            for key, value in log_data.items():
                if any(sensitive in key.lower() for sensitive in sensitive_fields):
                    sanitized[key] = '[REDACTED]'
                elif isinstance(value, dict):
                    sanitized[key] = self.sanitize_log_data(value)
                elif isinstance(value, list):
                    sanitized[key] = [self.sanitize_log_data(item) if isinstance(item, dict) else item for item in value]
                else:
                    sanitized[key] = value
            return sanitized
        return log_data

    def add_integrity_check(self, log_data):
        """
        Add an HMAC integrity check to log data.
        """
        log_json = json.dumps(log_data, sort_keys=True)
        signature = hmac.new(
            self.secret_key.encode('utf-8'),
            log_json.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        log_data['_integrity_hash'] = signature
        return log_data

    def secure_log(self, log_group_name, log_stream_name, log_data):
        """
        Send a secure log entry with sanitization and an integrity check.
        """
        try:
            # Sanitize data
            sanitized_data = self.sanitize_log_data(log_data.copy())

            # Add metadata
            sanitized_data['_timestamp'] = datetime.utcnow().isoformat()
            sanitized_data['_log_level'] = log_data.get('level', 'INFO')

            # Add integrity check
            secure_data = self.add_integrity_check(sanitized_data)

            # Send to CloudWatch
            response = self.logs_client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(datetime.utcnow().timestamp() * 1000),
                        'message': json.dumps(secure_data)
                    }
                ]
            )
            return response
        except Exception as e:
            print(f"Error in secure logging: {e}")

# Usage example
secure_logger = SecureCloudWatchLogging('your-secret-key-here')

# Example log with sensitive data
log_entry = {
    'user_id': 'user123',
    'action': 'login',
    'password': 'secret123',   # Will be redacted
    'api_key': 'abc123',       # Will be redacted
    'ip_address': '192.168.1.100',
    'timestamp': datetime.utcnow().isoformat()
}
secure_logger.secure_log('/aws/myapp/secure', 'auth-service', log_entry)

Troubleshooting {#troubleshooting}#

Common Issues and Solutions#

class CloudWatchTroubleshooter:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.logs_client = boto3.client('logs')

    def diagnose_metric_issues(self, namespace, metric_name, start_time, end_time):
        """
        Diagnose issues with metric collection.
        """
        issues = []
        try:
            # Check if metrics exist
            metrics = self.cloudwatch.list_metrics(
                Namespace=namespace,
                MetricName=metric_name
            )
            if not metrics['Metrics']:
                issues.append("No metrics found - check metric name and namespace")
                return issues

            # Check for data points
            response = self.cloudwatch.get_metric_statistics(
                Namespace=namespace,
                MetricName=metric_name,
                StartTime=start_time,
                EndTime=end_time,
                Period=300,
                Statistics=['Sum']
            )
            if not response['Datapoints']:
                issues.append("No data points found - check time range and metric publishing")

            # Check for gaps in data
            datapoints = sorted(response['Datapoints'], key=lambda x: x['Timestamp'])
            for i in range(1, len(datapoints)):
                time_diff = (datapoints[i]['Timestamp'] - datapoints[i-1]['Timestamp']).total_seconds()
                if time_diff > 600:  # More than a 10-minute gap
                    issues.append(f"Data gap detected between {datapoints[i-1]['Timestamp']} and {datapoints[i]['Timestamp']}")

            # Check metric dimensions
            unique_dimensions = set()
            for metric in metrics['Metrics']:
                dimension_set = frozenset((d['Name'], d['Value']) for d in metric.get('Dimensions', []))
                unique_dimensions.add(dimension_set)
            if len(unique_dimensions) > 10:
                issues.append(f"High cardinality detected: {len(unique_dimensions)} unique dimension combinations")
        except Exception as e:
            issues.append(f"Error diagnosing metrics: {e}")
        return issues

    def diagnose_alarm_issues(self, alarm_name):
        """
        Diagnose alarm configuration issues.
        """
        issues = []
        try:
            response = self.cloudwatch.describe_alarms(
                AlarmNames=[alarm_name]
            )
            if not response['MetricAlarms']:
                issues.append("Alarm not found")
                return issues
            alarm = response['MetricAlarms'][0]

            # Check alarm state
            if alarm['StateValue'] == 'INSUFFICIENT_DATA':
                issues.append("Alarm has insufficient data - check metric availability")

            # Check evaluation periods and period
            if alarm['EvaluationPeriods'] * alarm['Period'] < 600:
                issues.append("Evaluation period too short - may cause false alarms")

            # Check if actions are enabled
            if not alarm['ActionsEnabled']:
                issues.append("Alarm actions are disabled")

            # Check if there are actions configured
            if not alarm.get('AlarmActions') and not alarm.get('OKActions'):
                issues.append("No actions configured for alarm")

            # Get alarm history
            history = self.cloudwatch.describe_alarm_history(
                AlarmName=alarm_name,
                MaxRecords=10
            )

            # Check for frequent state changes
            state_changes = [h for h in history['AlarmHistoryItems'] if h['HistoryItemType'] == 'StateUpdate']
            if len(state_changes) > 5:
                issues.append("Alarm changing states frequently - review threshold and evaluation criteria")
        except Exception as e:
            issues.append(f"Error diagnosing alarm: {e}")
        return issues

    def diagnose_log_issues(self, log_group_name):
        """
        Diagnose log ingestion issues.
        """
        issues = []
        try:
            # Check if log group exists
            response = self.logs_client.describe_log_groups(
                logGroupNamePrefix=log_group_name
            )
            matching_groups = [lg for lg in response['logGroups'] if lg['logGroupName'] == log_group_name]
            if not matching_groups:
                issues.append("Log group does not exist")
                return issues
            log_group = matching_groups[0]

            # Check log streams
            streams_response = self.logs_client.describe_log_streams(
                logGroupName=log_group_name,
                orderBy='LastEventTime',
                descending=True,
                limit=10
            )
            if not streams_response['logStreams']:
                issues.append("No log streams found")
            else:
                # Check for recent activity
                latest_stream = streams_response['logStreams'][0]
                if 'lastEventTime' in latest_stream:
                    last_event_time = datetime.fromtimestamp(latest_stream['lastEventTime'] / 1000)
                    time_since_last = datetime.utcnow() - last_event_time
                    if time_since_last.total_seconds() > 3600:  # More than 1 hour
                        issues.append(f"No recent log events (last event: {last_event_time})")

                # Check for stuck streams
                stuck_streams = 0
                for stream in streams_response['logStreams']:
                    if 'lastEventTime' in stream and 'lastIngestionTime' in stream:
                        event_time = stream['lastEventTime']
                        ingestion_time = stream['lastIngestionTime']
                        if ingestion_time - event_time > 300000:  # More than 5 minutes delay
                            stuck_streams += 1
                if stuck_streams > 0:
                    issues.append(f"{stuck_streams} log streams have ingestion delays")

            # Check retention policy
            if 'retentionInDays' not in log_group:
                issues.append("No retention policy set - logs will be stored indefinitely")
        except Exception as e:
            issues.append(f"Error diagnosing logs: {e}")
        return issues

    def run_comprehensive_diagnosis(self, resources):
        """
        Run a comprehensive diagnosis across multiple resources.
        """
        diagnosis_report = {
            'timestamp': datetime.utcnow().isoformat(),
            'resources': {}
        }
        for resource in resources:
            resource_type = resource['type']
            resource_name = resource['name']
            if resource_type == 'metric':
                issues = self.diagnose_metric_issues(
                    resource['namespace'],
                    resource['metric_name'],
                    resource['start_time'],
                    resource['end_time']
                )
            elif resource_type == 'alarm':
                issues = self.diagnose_alarm_issues(resource_name)
            elif resource_type == 'log_group':
                issues = self.diagnose_log_issues(resource_name)
            else:
                issues = [f"Unknown resource type: {resource_type}"]
            diagnosis_report['resources'][resource_name] = {
                'type': resource_type,
                'issues': issues,
                'status': 'healthy' if not issues else 'issues_detected'
            }
        return diagnosis_report

# Example usage
troubleshooter = CloudWatchTroubleshooter()

# Define resources to diagnose
resources_to_check = [
    {
        'type': 'metric',
        'name': 'CPUUtilization',
        'namespace': 'AWS/EC2',
        'metric_name': 'CPUUtilization',
        'start_time': datetime.utcnow() - timedelta(hours=2),
        'end_time': datetime.utcnow()
    },
    {
        'type': 'alarm',
        'name': 'HighCPUUtilization'
    },
    {
        'type': 'log_group',
        'name': '/aws/myapp/production'
    }
]

# Run diagnosis
diagnosis = troubleshooter.run_comprehensive_diagnosis(resources_to_check)
print("CloudWatch Diagnosis Report:")
print(json.dumps(diagnosis, indent=2, default=str))

Conclusion#

Amazon CloudWatch provides comprehensive monitoring and observability capabilities for AWS infrastructure and applications. Key takeaways:

Essential Features:#

  • Metrics: Collect and monitor quantitative data from AWS services and custom applications
  • Logs: Centralized log management with powerful query capabilities
  • Alarms: Automated notifications and actions based on thresholds
  • Dashboards: Visual monitoring and real-time insights
  • Events: React to system changes and custom application events

Best Practices:#

  • Implement layered monitoring (infrastructure, application, business)
  • Use appropriate log retention policies
  • Set up meaningful alarms with proper thresholds
  • Leverage anomaly detection for dynamic thresholds
  • Optimize costs through metric and log management

Advanced Capabilities:#

  • Cross-account monitoring
  • Custom widgets with Lambda
  • Composite alarms for complex scenarios
  • Log Insights for sophisticated log analysis
  • Integration with EventBridge for event-driven architectures

CloudWatch forms the foundation of observability in AWS, enabling proactive monitoring, rapid troubleshooting, and data-driven decision making. Proper implementation ensures system reliability, performance optimization, and cost control while maintaining security and compliance standards.

Author: Anubhav Gain · Published: 2025-09-04 · License: CC BY-NC-SA 4.0
Source: https://mranv.pages.dev/posts/complete-guide-amazon-cloudwatch-monitoring/