Blocking Attacks with Active Response in Wazuh
Introduction
Note: Wazuh v4.2.0 includes breaking changes in Active Response. Check the official documentation for the latest updates.
Active Response is one of Wazuh’s most powerful features, allowing automatic execution of scripts or commands in response to specific alerts. This capability enables organizations to immediately block attacks, remediate vulnerabilities, and maintain security posture without manual intervention.
Active Response enables:
- 🛡️ Automatic Attack Blocking: Stop threats in real-time
- ⚡ Immediate Remediation: Execute defensive actions instantly
- 🔧 Custom Responses: Create tailored security responses
- 📊 Reduced Response Time: Minimize window of exposure
- 🔄 Automated Security: Implement self-healing security measures
Understanding Active Response
Architecture Overview
flowchart TB subgraph "Threat Detection" T1[Security Event] T2[Log Analysis] T3[Rule Match] end
subgraph "Wazuh Manager" D1[Decoder Processing] R1[Rule Engine] AR1[Active Response Trigger] C1[Command Selection] end
subgraph "Active Response" E1[Execute Local] E2[Execute on Agent] E3[Execute on Server] E4[Execute Everywhere] end
subgraph "Response Actions" A1[Block IP] A2[Kill Process] A3[Delete File] A4[Custom Script] end
subgraph "Management" M1[Timeout Control] M2[Whitelist Check] M3[Response Logging] end
T1 --> T2 T2 --> T3 T3 --> D1 D1 --> R1 R1 --> AR1 AR1 --> C1
C1 --> E1 C1 --> E2 C1 --> E3 C1 --> E4
E1 --> A1 E2 --> A2 E3 --> A3 E4 --> A4
AR1 --> M1 AR1 --> M2 A1 --> M3
style T1 fill:#ff6b6b style AR1 fill:#4dabf7 style A1 fill:#51cf66
Key Components
- Commands: Define what action to execute
- Active Response Configuration: Specify when and where to execute
- Scripts: Implement the actual response logic
- Timeouts: Automatic reversal of actions
- Whitelists: Prevent blocking critical systems
Implementation Guide
Prerequisites
- Wazuh Manager: Version 1.1+ installed and running
- Wazuh Agents: Deployed on systems to protect
- Firewall: iptables, pf, or Windows Firewall
- Permissions: Root/Administrator access
Use Case: Blocking SSH Brute Force Attacks
We’ll implement active response to automatically block IPs attempting SSH brute force attacks.
Step 1: Define the Command
Add to /var/ossec/etc/ossec.conf
on the Wazuh Manager:
<ossec_config> <command> <name>firewall-drop</name> <executable>firewall-drop</executable> <timeout_allowed>yes</timeout_allowed> </command></ossec_config>
For Wazuh 4.2.0+:
<ossec_config> <command> <name>firewall-drop</name> <executable>firewall-drop</executable> <timeout_allowed>yes</timeout_allowed> </command></ossec_config>
Step 2: Configure Active Response
Add to /var/ossec/etc/ossec.conf
:
<ossec_config> <active-response> <command>firewall-drop</command> <location>local</location> <rules_id>5712</rules_id> <timeout>1800</timeout> </active-response></ossec_config>
Configuration explained:
- command: References the command defined earlier
- location: Where to execute (local, server, defined-agent, all)
- rules_id: Trigger on rule 5712 (SSH brute force)
- timeout: Block for 1800 seconds (30 minutes)
Step 3: Understanding the Firewall Drop Script
The firewall-drop
script (located at /var/ossec/active-response/bin/firewall-drop
) handles multiple firewall types:
#!/bin/sh# Firewall-drop response script
ACTION=$1USER=$2IP=$3ALERTID=$4RULEID=$5
LOCAL=$(dirname $0);cd $LOCALcd ../PWD=$(pwd)
# Lock fileLOCK="${PWD}/fw-drop"LOCK_PID="${LOCK}/pid"
# Getting alert timeALERTTIME=$(echo "$ALERTID" | cut -d "." -f 1)
# Checking for an IPif [ "x${IP}" = "x" ]; then echo "$0: Missing IP" exit 1;fi
# Blocking IPif [ "x${ACTION}" != "xadd" -a "x${ACTION}" != "xdelete" ]; then echo "$0: Invalid action: ${ACTION}" exit 1;fi
# We should block only IPs that are validif echo "${IP}" | grep -E "^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$" > /dev/null 2>&1; then # Detect firewall type and block if which iptables > /dev/null 2>&1; then if [ "x${ACTION}" = "xadd" ]; then iptables -I INPUT -s ${IP} -j DROP else iptables -D INPUT -s ${IP} -j DROP fi elif which firewall-cmd > /dev/null 2>&1; then if [ "x${ACTION}" = "xadd" ]; then firewall-cmd --direct --add-rule ipv4 filter INPUT 0 -s ${IP} -j DROP else firewall-cmd --direct --remove-rule ipv4 filter INPUT 0 -s ${IP} -j DROP fi elif which pf > /dev/null 2>&1; then # BSD pf firewall if [ "x${ACTION}" = "xadd" ]; then echo "block in quick from ${IP} to any" | pfctl -a wazuh -f - else pfctl -a wazuh -F rules fi fifi
exit 0;
Step 4: Configure Whitelisting
Prevent blocking critical IPs by adding to /var/ossec/etc/ossec.conf
:
<ossec_config> <global> <white_list>127.0.0.1</white_list> <white_list>^localhost.localdomain$</white_list> <white_list>10.0.0.0/8</white_list> <white_list>192.168.0.0/16</white_list> <!-- Add your management IPs --> <white_list>203.0.113.10</white_list> </global></ossec_config>
Step 5: Restart Wazuh Manager
systemctl restart wazuh-manager
Testing Active Response
Simulate SSH Brute Force Attack
From an attacker machine (10.0.0.6), attempt multiple failed SSH logins:
# Check connectivity before attackping 10.0.0.5# PING 10.0.0.5 (10.0.0.5) 56(84) bytes of data.# 64 bytes from 10.0.0.5: icmp_seq=1 ttl=64 time=0.602 ms
# Simulate brute forcefor i in {1..10}; do ssh invalid@10.0.0.5done
Monitor Alert Generation
Check Wazuh alerts:
tail -f /var/ossec/logs/alerts/alerts.log
Expected output:
** Alert 1461258591.68247: mail - syslog,sshd,authentication_failures,pci_dss_11.4,pci_dss_10.2.4,pci_dss_10.2.5,2016 Apr 21 17:09:51 (LinAgent) 10.0.0.5->/var/log/auth.logRule: 5712 (level 10) -> 'SSHD brute force trying to get access to the system.'Src IP: 10.0.0.6Apr 21 17:09:49 LinAgent sshd[1336]: Invalid user ec2-user from 10.0.0.6Apr 21 17:09:46 LinAgent sshd[1334]: Invalid user ec2-user from 10.0.0.6...
Verify IP Blocking
From the attacker machine:
# Try to ping after being blockedping 10.0.0.5# PING 10.0.0.5 (10.0.0.5) 56(84) bytes of data.# ^C# --- 10.0.0.5 ping statistics ---# 12 packets transmitted, 0 received, 100% packet loss, time 11000ms
Advanced Active Response Configurations
1. Multiple Response Actions
<ossec_config> <!-- Block and notify --> <active-response> <command>firewall-drop</command> <location>local</location> <rules_id>5712</rules_id> <timeout>3600</timeout> </active-response>
<active-response> <command>mail-notify</command> <location>server</location> <rules_id>5712</rules_id> </active-response>
<!-- Kill malicious processes --> <active-response> <command>kill-process</command> <location>local</location> <rules_id>100200</rules_id> </active-response></ossec_config>
2. Repeated Offenders
Configure increasing block times for repeat offenders:
<ossec_config> <active-response> <command>firewall-drop</command> <location>local</location> <rules_id>5712</rules_id> <timeout>1800</timeout> <repeated_offenders>3600,7200,86400</repeated_offenders> </active-response></ossec_config>
- First offense: 30 minutes (1800 seconds)
- Second offense: 1 hour (3600 seconds)
- Third offense: 2 hours (7200 seconds)
- Fourth+ offense: 24 hours (86400 seconds)
3. Custom Response Scripts
Example: Quarantine Infected Files
Create /var/ossec/active-response/bin/file-quarantine
:
#!/bin/bash# file-quarantine - Move malicious files to quarantine
ACTION=$1USER=$2IP=$3ALERTID=$4RULEID=$5FILENAME=$6
QUARANTINE_DIR="/var/ossec/quarantine"LOG_FILE="/var/ossec/logs/active-responses.log"
# Create quarantine directory if not existsmkdir -p "$QUARANTINE_DIR"
# Function to log actionslog_action() { echo "$(date): $1" >> "$LOG_FILE"}
if [ "$ACTION" = "add" ]; then if [ -f "$FILENAME" ]; then # Generate unique quarantine name TIMESTAMP=$(date +%Y%m%d_%H%M%S) BASENAME=$(basename "$FILENAME") QUARANTINE_FILE="$QUARANTINE_DIR/${TIMESTAMP}_${ALERTID}_${BASENAME}"
# Move file to quarantine mv "$FILENAME" "$QUARANTINE_FILE" chmod 000 "$QUARANTINE_FILE"
# Create metadata file cat > "${QUARANTINE_FILE}.info" << EOFOriginal Path: $FILENAMEAlert ID: $ALERTIDRule ID: $RULEIDQuarantine Time: $(date)User: $USERSource IP: $IPEOF
log_action "Quarantined file: $FILENAME -> $QUARANTINE_FILE" else log_action "File not found: $FILENAME" fielif [ "$ACTION" = "delete" ]; then # Restore file from quarantine QUARANTINE_FILE=$(find "$QUARANTINE_DIR" -name "*_${ALERTID}_*" -type f ! -name "*.info" | head -1)
if [ -n "$QUARANTINE_FILE" ] && [ -f "${QUARANTINE_FILE}.info" ]; then ORIGINAL_PATH=$(grep "Original Path:" "${QUARANTINE_FILE}.info" | cut -d: -f2- | xargs)
if [ -n "$ORIGINAL_PATH" ]; then mv "$QUARANTINE_FILE" "$ORIGINAL_PATH" rm -f "${QUARANTINE_FILE}.info" log_action "Restored file: $QUARANTINE_FILE -> $ORIGINAL_PATH" fi fifi
exit 0
Configure the command:
<command> <name>file-quarantine</name> <executable>file-quarantine</executable> <timeout_allowed>yes</timeout_allowed></command>
<active-response> <command>file-quarantine</command> <location>local</location> <rules_id>554</rules_id> <!-- File modified --></active-response>
Example: Network Isolation
Create /var/ossec/active-response/bin/network-isolate
:
#!/usr/bin/env python3# network-isolate - Isolate compromised hosts
import sysimport subprocessimport jsonimport loggingfrom datetime import datetime
# Setup logginglogging.basicConfig( filename='/var/ossec/logs/network-isolation.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def isolate_host(action, ip): """Isolate or restore network access for a host"""
if action == "add": # Create isolation rules rules = [ # Block all incoming except from management network f"iptables -I INPUT -s {ip} -j DROP", f"iptables -I INPUT -s 10.0.1.0/24 -d {ip} -j ACCEPT",
# Block all outgoing except to DNS and management f"iptables -I OUTPUT -d {ip} -j DROP", f"iptables -I OUTPUT -s {ip} -d 10.0.1.0/24 -j ACCEPT", f"iptables -I OUTPUT -s {ip} -p udp --dport 53 -j ACCEPT",
# Log isolation f"iptables -I INPUT -s {ip} -j LOG --log-prefix 'ISOLATED:{ip} '", f"iptables -I OUTPUT -d {ip} -j LOG --log-prefix 'ISOLATED:{ip} '" ]
for rule in rules: subprocess.run(rule.split(), capture_output=True)
logging.info(f"Isolated host: {ip}")
# Notify SOC notify_soc("Host Isolated", f"Host {ip} has been isolated due to security alert")
elif action == "delete": # Remove isolation rules rules = [ f"iptables -D INPUT -s {ip} -j DROP", f"iptables -D OUTPUT -d {ip} -j DROP", f"iptables -D INPUT -s {ip} -j LOG --log-prefix 'ISOLATED:{ip} '", f"iptables -D OUTPUT -d {ip} -j LOG --log-prefix 'ISOLATED:{ip} '" ]
for rule in rules: try: subprocess.run(rule.split(), capture_output=True) except: pass
logging.info(f"Restored network access for: {ip}")
def notify_soc(subject, message): """Send notification to SOC team"""
webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
payload = { "text": subject, "attachments": [{ "color": "danger", "fields": [{ "title": "Alert", "value": message, "short": False }, { "title": "Time", "value": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "short": True }] }] }
try: subprocess.run([ "curl", "-X", "POST", "-H", "Content-Type: application/json", "-d", json.dumps(payload), webhook_url ], capture_output=True) except: pass
if __name__ == "__main__": if len(sys.argv) < 4: sys.exit(1)
action = sys.argv[1] ip = sys.argv[3]
if action in ["add", "delete"] and ip: isolate_host(action, ip)
4. Monitoring Active Response
Enable Active Response Logging
Configure agents to log active response actions. Add to /var/ossec/etc/shared/agent.conf
:
<agent_config> <localfile> <log_format>syslog</log_format> <location>/var/ossec/logs/active-responses.log</location> </localfile></agent_config>
Create the log file on each agent:
touch /var/ossec/logs/active-responses.logchown ossec:ossec /var/ossec/logs/active-responses.log
Create Monitoring Rules
Add to /var/ossec/etc/rules/local_rules.xml
:
<group name="active_response,"> <!-- Active response executed --> <rule id="100300" level="3"> <decoded_as>ar_log</decoded_as> <status>add</status> <description>Active response triggered: $(script) on $(srcip)</description> <group>active_response_trigger,</group> </rule>
<!-- Active response timeout --> <rule id="100301" level="3"> <decoded_as>ar_log</decoded_as> <status>delete</status> <description>Active response timeout: $(script) on $(srcip)</description> <group>active_response_timeout,</group> </rule>
<!-- Active response error --> <rule id="100302" level="7"> <decoded_as>ar_log</decoded_as> <status>error</status> <description>Active response error: $(script)</description> <group>active_response_error,</group> </rule>
<!-- Multiple active responses --> <rule id="100303" level="9" frequency="10" timeframe="300"> <if_matched_sid>100300</if_matched_sid> <same_source_ip /> <description>Multiple active responses triggered for same IP</description> <group>active_response_multiple,</group> </rule></group>
Best Practices
1. Testing Active Response
Always test active response in a controlled environment:
#!/bin/bash# test-active-response.sh - Test active response configuration
# Test command execution/var/ossec/bin/agent_control -b 192.168.1.100 -f firewall-drop -r 5712
# Check if IP was blockediptables -L INPUT -n | grep 192.168.1.100
# Test timeoutsleep 1805 # Wait for timeout + 5 seconds
# Verify IP was unblockediptables -L INPUT -n | grep 192.168.1.100
2. Gradual Implementation
Implementation Phases: Phase 1 - Monitoring: - Deploy active response in alert-only mode - Log actions without executing - Analyze potential impact
Phase 2 - Limited Deployment: - Enable on non-critical systems - Short timeout periods (300-600 seconds) - Monitor false positives
Phase 3 - Production: - Enable on all systems - Appropriate timeout periods - Implement repeated offenders
Phase 4 - Advanced: - Custom response scripts - Integration with other tools - Automated remediation
3. Whitelist Management
#!/usr/bin/env python3# manage-whitelist.py - Manage active response whitelist
import ipaddressimport yamlimport subprocess
class WhitelistManager: def __init__(self, config_file='/etc/ossec-whitelist.yaml'): self.config_file = config_file self.load_whitelist()
def load_whitelist(self): """Load whitelist from YAML file""" with open(self.config_file, 'r') as f: self.whitelist = yaml.safe_load(f)
def add_ip(self, ip, reason, expires=None): """Add IP to whitelist""" entry = { 'ip': ip, 'reason': reason, 'added': datetime.now().isoformat(), 'expires': expires }
self.whitelist['ips'].append(entry) self.save_whitelist() self.update_ossec_config()
def remove_expired(self): """Remove expired whitelist entries""" current_time = datetime.now()
self.whitelist['ips'] = [ entry for entry in self.whitelist['ips'] if not entry.get('expires') or datetime.fromisoformat(entry['expires']) > current_time ]
self.save_whitelist() self.update_ossec_config()
def update_ossec_config(self): """Update OSSEC configuration with whitelist"""
# Generate whitelist XML whitelist_xml = "" for entry in self.whitelist['ips']: whitelist_xml += f" <white_list>{entry['ip']}</white_list>\n"
# Update ossec.conf # (Implementation depends on your config management approach)
# Restart OSSEC subprocess.run(['systemctl', 'restart', 'wazuh-manager'])
# Example whitelist YAML format:"""ips: - ip: 10.0.0.10 reason: Management server added: 2023-01-01T00:00:00
- ip: 192.168.1.0/24 reason: Internal network added: 2023-01-01T00:00:00
- ip: 203.0.113.50 reason: Temporary vendor access added: 2023-06-01T00:00:00 expires: 2023-06-30T00:00:00"""
Advanced Use Cases
1. Ransomware Response
<!-- Detect and respond to ransomware --><rule id="100400" level="15"> <if_group>syscheck</if_group> <match>\.encrypted|\.locked|\.crypto</match> <description>Possible ransomware activity detected</description></rule>
<command> <name>ransomware-response</name> <executable>ransomware-response.sh</executable> <timeout_allowed>no</timeout_allowed></command>
<active-response> <command>ransomware-response</command> <location>local</location> <rules_id>100400</rules_id></active-response>
Response script:
#!/bin/bash# ransomware-response.sh - Emergency ransomware response
ACTION=$1USER=$2IP=$3ALERTID=$4RULEID=$5
if [ "$ACTION" = "add" ]; then # Isolate system iptables -I INPUT -j DROP iptables -I OUTPUT -j DROP
# Allow only management access iptables -I INPUT -s 10.0.1.0/24 -j ACCEPT
# Kill suspicious processes pkill -f ".encrypted|.locked|.crypto"
# Disable scheduled tasks systemctl stop crond
# Alert administrators echo "RANSOMWARE DETECTED on $(hostname)" | \ mail -s "CRITICAL: Ransomware Alert" security@company.com
# Initiate backup /usr/local/bin/emergency-backup.sh &fi
2. Web Attack Mitigation
<!-- Block web application attacks --><rule id="100410" level="10" frequency="5" timeframe="60"> <if_group>web|attack</if_group> <same_source_ip /> <description>Multiple web application attacks from same IP</description></rule>
<command> <name>nginx-ban</name> <executable>nginx-ban.py</executable> <timeout_allowed>yes</timeout_allowed></command>
<active-response> <command>nginx-ban</command> <location>local</location> <rules_id>100410</rules_id> <timeout>3600</timeout></active-response>
Nginx ban script:
#!/usr/bin/env python3# nginx-ban.py - Ban IPs in Nginx
import sysimport osimport subprocess
NGINX_BLOCKED_CONF = "/etc/nginx/blocked_ips.conf"NGINX_RELOAD_CMD = ["nginx", "-s", "reload"]
def add_block(ip): """Add IP to Nginx blocked list"""
# Add deny rule with open(NGINX_BLOCKED_CONF, 'a') as f: f.write(f"deny {ip};\n")
# Reload Nginx subprocess.run(NGINX_RELOAD_CMD)
def remove_block(ip): """Remove IP from Nginx blocked list"""
# Read current blocks with open(NGINX_BLOCKED_CONF, 'r') as f: lines = f.readlines()
# Remove the IP with open(NGINX_BLOCKED_CONF, 'w') as f: for line in lines: if f"deny {ip};" not in line: f.write(line)
# Reload Nginx subprocess.run(NGINX_RELOAD_CMD)
if __name__ == "__main__": action = sys.argv[1] ip = sys.argv[3]
if action == "add": add_block(ip) elif action == "delete": remove_block(ip)
3. Automated Vulnerability Remediation
#!/usr/bin/env python3# vulnerability-patch.py - Automated patching response
import sysimport subprocessimport loggingimport json
class VulnerabilityPatcher: def __init__(self): self.patches = { '100500': self.patch_shellshock, '100501': self.patch_heartbleed, '100502': self.patch_log4j }
def patch_shellshock(self): """Patch Shellshock vulnerability""" commands = [ ["yum", "update", "-y", "bash"], ["apt-get", "update"], ["apt-get", "install", "-y", "bash"] ]
for cmd in commands: try: subprocess.run(cmd, check=True) return True except: continue
return False
def patch_log4j(self): """Mitigate Log4j vulnerability"""
# Set system property with open('/etc/environment', 'a') as f: f.write('\nLOG4J_FORMAT_MSG_NO_LOOKUPS=true\n')
# Find and update log4j configurations result = subprocess.run( ["find", "/", "-name", "log4j*.properties"], capture_output=True, text=True )
for config_file in result.stdout.strip().split('\n'): if config_file: # Add mitigation to config with open(config_file, 'a') as f: f.write('\nlog4j2.formatMsgNoLookups=true\n')
return True
def execute_patch(self, rule_id): """Execute patch based on rule ID"""
if rule_id in self.patches: logging.info(f"Executing patch for rule {rule_id}")
success = self.patches[rule_id]()
if success: logging.info(f"Successfully patched vulnerability {rule_id}") self.notify_success(rule_id) else: logging.error(f"Failed to patch vulnerability {rule_id}") self.notify_failure(rule_id) else: logging.warning(f"No patch available for rule {rule_id}")
def notify_success(self, rule_id): """Notify successful patching""" message = f"Vulnerability {rule_id} successfully patched on {subprocess.run(['hostname'], capture_output=True, text=True).stdout.strip()}" subprocess.run(["mail", "-s", "Patch Success", "security@company.com"], input=message.encode())
def notify_failure(self, rule_id): """Notify patching failure""" message = f"Failed to patch vulnerability {rule_id} on {subprocess.run(['hostname'], capture_output=True, text=True).stdout.strip()}" subprocess.run(["mail", "-s", "Patch Failure - Manual Intervention Required", "security@company.com"], input=message.encode())
if __name__ == "__main__": action = sys.argv[1] rule_id = sys.argv[5]
if action == "add": patcher = VulnerabilityPatcher() patcher.execute_patch(rule_id)
Monitoring and Metrics
Active Response Dashboard
Create visualizations to monitor active response effectiveness:
#!/usr/bin/env python3# ar-metrics.py - Collect active response metrics
import jsonimport sqlite3from datetime import datetime, timedeltafrom collections import defaultdict
class ActiveResponseMetrics: def __init__(self, db_path='/var/ossec/stats/ar_metrics.db'): self.conn = sqlite3.connect(db_path) self.create_tables()
def create_tables(self): """Create metrics tables"""
self.conn.execute(''' CREATE TABLE IF NOT EXISTS ar_executions ( id INTEGER PRIMARY KEY, timestamp DATETIME, rule_id INTEGER, command TEXT, source_ip TEXT, action TEXT, status TEXT, error TEXT ) ''')
self.conn.execute(''' CREATE TABLE IF NOT EXISTS ar_effectiveness ( date DATE PRIMARY KEY, total_blocks INTEGER, repeat_offenders INTEGER, false_positives INTEGER, avg_block_duration REAL ) ''')
def log_execution(self, alert_data): """Log active response execution"""
self.conn.execute(''' INSERT INTO ar_executions (timestamp, rule_id, command, source_ip, action, status) VALUES (?, ?, ?, ?, ?, ?) ''', ( datetime.now(), alert_data['rule_id'], alert_data['command'], alert_data['source_ip'], alert_data['action'], 'success' )) self.conn.commit()
def calculate_effectiveness(self): """Calculate active response effectiveness metrics"""
# Get data for last 24 hours yesterday = datetime.now() - timedelta(days=1)
cursor = self.conn.execute(''' SELECT COUNT(*) as total, COUNT(DISTINCT source_ip) as unique_ips, AVG(CASE WHEN action = 'delete' THEN (julianday(timestamp) - julianday( LAG(timestamp) OVER (PARTITION BY source_ip ORDER BY timestamp) )) * 24 * 60 ELSE NULL END) as avg_block_minutes FROM ar_executions WHERE timestamp > ? ''', (yesterday,))
metrics = cursor.fetchone()
# Count repeat offenders cursor = self.conn.execute(''' SELECT COUNT(*) FROM ( SELECT source_ip, COUNT(*) as count FROM ar_executions WHERE timestamp > ? AND action = 'add' GROUP BY source_ip HAVING count > 1 ) ''', (yesterday,))
repeat_offenders = cursor.fetchone()[0]
return { 'total_blocks': metrics[0], 'unique_ips': metrics[1], 'repeat_offenders': repeat_offenders, 'avg_block_duration': metrics[2] or 0, 'effectiveness_rate': (metrics[1] - repeat_offenders) / metrics[1] * 100 if metrics[1] > 0 else 0 }
def generate_report(self): """Generate active response report"""
metrics = self.calculate_effectiveness()
report = f"""Active Response Daily Report============================Date: {datetime.now().strftime('%Y-%m-%d')}
Summary Statistics:- Total Blocks: {metrics['total_blocks']}- Unique IPs Blocked: {metrics['unique_ips']}- Repeat Offenders: {metrics['repeat_offenders']}- Average Block Duration: {metrics['avg_block_duration']:.2f} minutes- Effectiveness Rate: {metrics['effectiveness_rate']:.2f}%
Top Triggered Rules:"""
# Get top rules cursor = self.conn.execute(''' SELECT rule_id, COUNT(*) as count FROM ar_executions WHERE timestamp > ? AND action = 'add' GROUP BY rule_id ORDER BY count DESC LIMIT 10 ''', (datetime.now() - timedelta(days=1),))
for row in cursor: report += f" - Rule {row[0]}: {row[1]} times\n"
return report
# Generate daily reportif __name__ == "__main__": metrics = ActiveResponseMetrics() print(metrics.generate_report())
Troubleshooting
Common Issues and Solutions
Issue 1: Active Response Not Triggering
# Check if active response is enabledgrep -A 10 "<active-response>" /var/ossec/etc/ossec.conf
# Verify rule is triggeringgrep "rule:5712" /var/ossec/logs/alerts/alerts.log
# Check active response logtail -f /var/ossec/logs/active-responses.log
# Test manual execution/var/ossec/active-response/bin/firewall-drop add - 192.168.1.100 1234567.89 5712
Issue 2: IPs Not Being Blocked
# Check if script has execute permissionsls -la /var/ossec/active-response/bin/firewall-drop
# Verify firewall commandswhich iptablesiptables -L INPUT -n
# Check for whitelistgrep -A 10 "white_list" /var/ossec/etc/ossec.conf
Issue 3: Timeouts Not Working
# Check timeout queuels -la /var/ossec/var/run/
# Monitor timeout executiontail -f /var/ossec/logs/ossec.log | grep -i timeout
# Verify ar_queue is runningps aux | grep ar_queue
Security Considerations
1. Preventing DoS via Active Response
#!/usr/bin/env python3# ar-rate-limit.py - Rate limit active response executions
import timeimport jsonfrom collections import defaultdictfrom datetime import datetime, timedelta
class ActiveResponseRateLimit: def __init__(self, max_blocks_per_hour=100, max_per_ip=5): self.max_blocks_per_hour = max_blocks_per_hour self.max_per_ip = max_per_ip self.blocks = defaultdict(list) self.total_blocks = []
def should_block(self, ip): """Check if IP should be blocked based on rate limits"""
current_time = datetime.now() hour_ago = current_time - timedelta(hours=1)
# Clean old entries self.total_blocks = [ t for t in self.total_blocks if t > hour_ago ] self.blocks[ip] = [ t for t in self.blocks[ip] if t > hour_ago ]
# Check total rate limit if len(self.total_blocks) >= self.max_blocks_per_hour: return False, "Global rate limit exceeded"
# Check per-IP rate limit if len(self.blocks[ip]) >= self.max_per_ip: return False, f"Per-IP rate limit exceeded for {ip}"
# Allow block self.blocks[ip].append(current_time) self.total_blocks.append(current_time)
return True, "OK"
2. Secure Script Execution
#!/bin/bash# secure-ar-wrapper.sh - Secure wrapper for active response scripts
# Validate inputsvalidate_ip() { local ip=$1 if [[ ! $ip =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then return 1 fi return 0}
validate_action() { local action=$1 if [[ "$action" != "add" && "$action" != "delete" ]]; then return 1 fi return 0}
# Sanitize inputsACTION=$(echo "$1" | tr -cd '[:alnum:]')IP=$(echo "$3" | tr -cd '[:digit:].')
# Validateif ! validate_action "$ACTION"; then logger -t wazuh-ar "Invalid action: $1" exit 1fi
if ! validate_ip "$IP"; then logger -t wazuh-ar "Invalid IP: $3" exit 1fi
# Execute with restricted environmentenv -i PATH=/usr/bin:/bin USER=ossec HOME=/var/ossec \ /var/ossec/active-response/bin/firewall-drop "$ACTION" "$2" "$IP" "$4" "$5"
Best Practices Summary
- Test Thoroughly: Always test in non-production first
- Start Conservative: Begin with short timeouts and monitoring
- Whitelist Critical Systems: Prevent blocking essential services
- Monitor Effectiveness: Track metrics and adjust configurations
- Document Everything: Maintain clear documentation of all active responses
- Regular Reviews: Periodically review and update rules
- Incident Response Integration: Align with IR procedures
- Fail-Safe Mechanisms: Implement emergency disable procedures
Conclusion
Active Response transforms Wazuh from a detection system into an active defense platform. By automatically responding to threats, organizations can:
- 🛡️ Reduce Response Time: Block attacks within seconds
- ⚡ Minimize Damage: Stop threats before they spread
- 🔧 Automate Security: Reduce manual intervention needs
- 📊 Improve Security Posture: Consistent, rapid response
- 🚨 24/7 Protection: Automated response works around the clock
Remember that with great power comes great responsibility - always implement active response carefully with appropriate testing and safeguards.
Key Takeaways
- Start Simple: Begin with basic firewall blocking
- Test Extensively: Verify behavior before production
- Monitor Impact: Track false positives and effectiveness
- Iterate and Improve: Continuously refine responses
- Document Actions: Maintain audit trails
Resources
- Wazuh Active Response Documentation
- Wazuh Rules Reference
- OSSEC Active Response Scripts
- Security Response Best Practices
Automate your security response with Wazuh Active Response! 🛡️⚡