Wazuh Anomaly Detection Use Cases: Advanced Security Monitoring
This guide explores advanced anomaly detection use cases in Wazuh, showing how to combine custom rules, integration scripts, and active responses to identify unusual patterns, behaviors, and potential security threats that traditional signature-based detection might miss.
Overview
Anomaly detection in Wazuh goes beyond simple rule matching, employing statistical analysis, behavioral profiling, and machine learning to identify:
- Unusual user behaviors
- Abnormal system activities
- Network traffic anomalies
- Application behavior deviations
- Data exfiltration attempts
- Insider threats
- Zero-day attack patterns
Core Anomaly Detection Components
1. Statistical Analysis Engine
Wazuh uses statistical analysis to establish baselines and detect deviations:
<group name="anomaly,statistical">
  <!-- Unusual login times detection -->
  <rule id="100001" level="10">
    <if_sid>5501</if_sid>
    <time>21:00 - 06:00</time>
    <description>Successful login during non-business hours</description>
    <options>no_full_log</options>
    <group>anomaly,authentication</group>
  </rule>

  <!-- Abnormal number of failed logins -->
  <rule id="100002" level="12" frequency="10" timeframe="120">
    <if_matched_sid>5503</if_matched_sid>
    <description>High number of failed login attempts - possible brute force</description>
    <mitre>
      <id>T1110</id>
    </mitre>
    <group>anomaly,authentication,brute_force</group>
  </rule>

  <!-- Unusual process execution frequency -->
  <rule id="100003" level="8" frequency="50" timeframe="60">
    <if_sid>5901</if_sid>
    <same_field>process.name</same_field>
    <description>Abnormally high process execution rate detected</description>
    <group>anomaly,process</group>
  </rule>
</group>
2. Behavioral Pattern Detection
Implement behavioral analysis for user and system activities:
#!/usr/bin/env python3
import json
import sys
import time
import numpy as np
from collections import defaultdict
from datetime import datetime, timedelta


class BehavioralAnomalyDetector:
    def __init__(self):
        self.user_baselines = defaultdict(lambda: {
            'login_times': [],
            'commands': defaultdict(int),
            'files_accessed': set(),
            'network_connections': defaultdict(int),
            'data_volume': []
        })
        self.window_size = 7          # days
        self.anomaly_threshold = 2.5  # standard deviations

    def process_alert(self, alert):
        """Process incoming alert for anomaly detection"""
        alert_data = json.loads(alert)

        # Extract relevant fields
        user = alert_data.get('data', {}).get('srcuser', 'unknown')
        timestamp = alert_data.get('timestamp', '')
        rule_id = alert_data.get('rule', {}).get('id', '')

        # Detect anomalies based on rule type
        anomalies = []

        if rule_id in ['5501', '5715']:  # Login events
            anomaly = self.detect_login_anomaly(user, timestamp)
            if anomaly:
                anomalies.append(anomaly)

        elif rule_id.startswith('59'):  # Process execution
            command = alert_data.get('data', {}).get('command', '')
            anomaly = self.detect_command_anomaly(user, command)
            if anomaly:
                anomalies.append(anomaly)

        elif rule_id in ['550', '551', '552']:  # File access
            file_path = alert_data.get('data', {}).get('path', '')
            anomaly = self.detect_file_access_anomaly(user, file_path)
            if anomaly:
                anomalies.append(anomaly)

        elif rule_id.startswith('86'):  # Network activity
            dest_ip = alert_data.get('data', {}).get('dstip', '')
            bytes_sent = alert_data.get('data', {}).get('bytes', 0)
            anomaly = self.detect_network_anomaly(user, dest_ip, bytes_sent)
            if anomaly:
                anomalies.append(anomaly)

        return anomalies

    def detect_login_anomaly(self, user, timestamp):
        """Detect unusual login patterns"""
        try:
            login_hour = datetime.fromisoformat(timestamp).hour
            baseline = self.user_baselines[user]['login_times']

            if len(baseline) > 20:  # Need sufficient data
                mean_hour = np.mean(baseline)
                std_hour = np.std(baseline)

                if abs(login_hour - mean_hour) > self.anomaly_threshold * std_hour:
                    return {
                        'type': 'unusual_login_time',
                        'user': user,
                        'severity': 'high',
                        'details': f'Login at {login_hour}:00 deviates from normal pattern',
                        'baseline_mean': mean_hour,
                        'baseline_std': std_hour
                    }

            # Update baseline
            self.user_baselines[user]['login_times'].append(login_hour)

        except Exception as e:
            sys.stderr.write(f"Error in login anomaly detection: {e}\n")

        return None

    def detect_command_anomaly(self, user, command):
        """Detect unusual command execution"""
        command_base = command.split()[0] if command else 'unknown'
        user_commands = self.user_baselines[user]['commands']

        # Check if command is rare for this user
        total_commands = sum(user_commands.values())
        command_frequency = user_commands.get(command_base, 0)

        if total_commands > 100:  # Need sufficient data
            expected_frequency = 1.0 / len(user_commands) if user_commands else 0
            actual_frequency = command_frequency / total_commands

            if actual_frequency < expected_frequency * 0.1:  # Rare command
                anomaly = {
                    'type': 'rare_command_execution',
                    'user': user,
                    'severity': 'medium',
                    'details': f'Unusual command executed: {command_base}',
                    'frequency': actual_frequency,
                    'expected': expected_frequency
                }

                # Check for suspicious patterns
                suspicious_patterns = [
                    'wget', 'curl', 'nc', 'ncat', 'ssh-keygen',
                    'base64', 'xxd', 'dd', 'tcpdump', 'nmap'
                ]

                if any(pattern in command_base.lower() for pattern in suspicious_patterns):
                    anomaly['severity'] = 'high'
                    anomaly['details'] += ' - Potentially suspicious command'

                return anomaly

        # Update baseline
        user_commands[command_base] += 1
        return None

    def detect_file_access_anomaly(self, user, file_path):
        """Detect unusual file access patterns"""
        user_files = self.user_baselines[user]['files_accessed']

        # Check for sensitive file access
        sensitive_paths = [
            '/etc/passwd', '/etc/shadow', '/etc/sudoers',
            '/.ssh/', '/var/log/', '/etc/ssl/',
            '.key', '.pem', '.crt', '.p12'
        ]

        for sensitive in sensitive_paths:
            if sensitive in file_path and file_path not in user_files:
                return {
                    'type': 'sensitive_file_access',
                    'user': user,
                    'severity': 'high',
                    'details': f'First time access to sensitive file: {file_path}',
                    'file': file_path
                }

        # Check for unusual directory traversal
        if '../' in file_path or file_path.count('/') > 10:
            return {
                'type': 'directory_traversal_attempt',
                'user': user,
                'severity': 'high',
                'details': f'Suspicious file path pattern: {file_path}',
                'file': file_path
            }

        # Update baseline
        user_files.add(file_path)
        return None

    def detect_network_anomaly(self, user, dest_ip, bytes_sent):
        """Detect unusual network behavior"""
        user_network = self.user_baselines[user]['network_connections']
        user_data = self.user_baselines[user]['data_volume']

        # Check for new destination
        if dest_ip and dest_ip not in user_network:
            # Check if it's a suspicious destination
            if self.is_suspicious_destination(dest_ip):
                return {
                    'type': 'suspicious_network_destination',
                    'user': user,
                    'severity': 'critical',
                    'details': f'Connection to suspicious IP: {dest_ip}',
                    'destination': dest_ip
                }

        # Check for data exfiltration
        if bytes_sent > 0:
            user_data.append(bytes_sent)

            if len(user_data) > 20:
                mean_bytes = np.mean(user_data)
                std_bytes = np.std(user_data)

                if bytes_sent > mean_bytes + (self.anomaly_threshold * std_bytes):
                    return {
                        'type': 'potential_data_exfiltration',
                        'user': user,
                        'severity': 'critical',
                        'details': f'Unusually large data transfer: {bytes_sent} bytes',
                        'bytes_sent': bytes_sent,
                        'baseline_mean': mean_bytes,
                        'baseline_std': std_bytes
                    }

        # Update baseline
        user_network[dest_ip] += 1
        return None

    def is_suspicious_destination(self, ip):
        """Check if IP is in suspicious ranges"""
        suspicious_ranges = [
            '10.0.0.0/8',      # Private range (unusual for external)
            '172.16.0.0/12',   # Private range
            '192.168.0.0/16',  # Private range
            # Add known malicious IPs/ranges
        ]

        # Implement IP range checking logic
        # This is a simplified example
        return any(
            ip.startswith(cidr.split('/')[0].rsplit('.', 1)[0])
            for cidr in suspicious_ranges
        )


if __name__ == "__main__":
    # Read alert from stdin
    alert = sys.stdin.read()

    detector = BehavioralAnomalyDetector()
    anomalies = detector.process_alert(alert)

    # Output anomalies for Wazuh active response
    for anomaly in anomalies:
        print(json.dumps(anomaly))
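Before wiring the script into the manager, it is worth exercising it locally. The sketch below feeds it hand-built alerts (the user, rule ID, and timestamps are illustrative, not real Wazuh output) so you can see the login baseline being built and then violated:

# Minimal local test of the detector above; the alerts are fabricated samples.
import json

def make_alert(hour):
    return json.dumps({
        'timestamp': f'2024-05-14T{hour:02d}:15:00',
        'rule': {'id': '5501'},
        'data': {'srcuser': 'jdoe'}
    })

detector = BehavioralAnomalyDetector()

# Build a baseline of daytime logins first (the detector needs >20 samples)
for _ in range(25):
    detector.process_alert(make_alert(9))

# A late-night login now deviates from the learned pattern
print(detector.process_alert(make_alert(23)))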
Use Case 1: User Behavior Analytics (UBA)
Configuration
<group name="uba,anomaly">
  <!-- Detect privilege escalation attempts -->
  <rule id="100010" level="12">
    <if_sid>5303</if_sid>
    <match>sudo</match>
    <different_user />
    <description>User executing sudo commands for the first time</description>
    <mitre>
      <id>T1548</id>
    </mitre>
    <group>privilege_escalation,anomaly</group>
  </rule>

  <!-- Detect lateral movement -->
  <rule id="100011" level="10" frequency="5" timeframe="300">
    <if_sid>5706</if_sid>
    <different_fields>dst_ip</different_fields>
    <description>User accessing multiple systems in short time - possible lateral movement</description>
    <mitre>
      <id>T1021</id>
    </mitre>
    <group>lateral_movement,anomaly</group>
  </rule>

  <!-- Detect data staging -->
  <rule id="100012" level="11">
    <decoded_as>file_integrity</decoded_as>
    <field name="file_path">tmp|temp|staging</field>
    <field name="file_size" compare="greater">104857600</field> <!-- 100MB -->
    <description>Large file created in temporary directory - possible data staging</description>
    <mitre>
      <id>T1074</id>
    </mitre>
    <group>data_staging,anomaly</group>
  </rule>
</group>
Implementation Script
#!/usr/bin/env python3
import json
import sqlite3
from datetime import datetime, timedelta
from collections import Counter
import statistics


class UserBehaviorAnalyzer:
    def __init__(self, db_path='/var/ossec/logs/uba.db'):
        self.conn = sqlite3.connect(db_path)
        self.init_database()

    def init_database(self):
        """Initialize UBA database"""
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS user_activities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                user TEXT,
                action TEXT,
                resource TEXT,
                risk_score INTEGER,
                anomaly_type TEXT
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS user_profiles (
                user TEXT PRIMARY KEY,
                normal_hours TEXT,
                common_resources TEXT,
                risk_level INTEGER,
                last_updated DATETIME
            )
        ''')
        self.conn.commit()

    def analyze_user_behavior(self, user, action, resource, timestamp):
        """Analyze user behavior and calculate risk score"""
        risk_score = 0
        anomalies = []

        # Get user profile
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT * FROM user_profiles WHERE user = ?', (user,)
        )
        profile = cursor.fetchone()

        # Check time-based anomalies
        current_hour = datetime.fromisoformat(timestamp).hour
        if profile and profile[1]:  # normal_hours
            normal_hours = json.loads(profile[1])
            if current_hour not in normal_hours:
                risk_score += 30
                anomalies.append('after_hours_activity')

        # Check resource access anomalies
        if profile and profile[2]:  # common_resources
            common_resources = json.loads(profile[2])
            if resource not in common_resources:
                risk_score += 20
                anomalies.append('unusual_resource_access')

        # Check for suspicious actions
        suspicious_actions = {
            'password_change': 25,
            'user_creation': 30,
            'permission_change': 35,
            'data_download': 20,
            'config_modification': 40
        }

        for suspicious, score in suspicious_actions.items():
            if suspicious in action.lower():
                risk_score += score
                anomalies.append(f'suspicious_action_{suspicious}')

        # Check frequency anomalies
        frequency_risk = self.check_frequency_anomaly(user, action)
        risk_score += frequency_risk

        # Store activity
        cursor.execute('''
            INSERT INTO user_activities
            (timestamp, user, action, resource, risk_score, anomaly_type)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            timestamp, user, action, resource, risk_score, ','.join(anomalies)
        ))
        self.conn.commit()

        # Update user profile
        self.update_user_profile(user, timestamp, current_hour, resource)

        return {
            'user': user,
            'risk_score': risk_score,
            'anomalies': anomalies,
            'action': action,
            'resource': resource,
            'timestamp': timestamp,
            'severity': self.calculate_severity(risk_score)
        }

    def check_frequency_anomaly(self, user, action):
        """Check for frequency-based anomalies"""
        cursor = self.conn.cursor()

        # Get recent activities
        one_hour_ago = datetime.now() - timedelta(hours=1)
        cursor.execute('''
            SELECT COUNT(*) FROM user_activities
            WHERE user = ? AND action = ? AND timestamp > ?
        ''', (user, action, one_hour_ago.isoformat()))

        recent_count = cursor.fetchone()[0]

        # Get historical average
        cursor.execute('''
            SELECT COUNT(*) FROM user_activities
            WHERE user = ? AND action = ?
            GROUP BY DATE(timestamp)
        ''', (user, action))

        daily_counts = [row[0] for row in cursor.fetchall()]

        if daily_counts:
            avg_hourly = statistics.mean(daily_counts) / 24
            if recent_count > avg_hourly * 3:  # 3x normal rate
                return 40  # High risk

        return 0

    def update_user_profile(self, user, timestamp, hour, resource):
        """Update user behavior profile"""
        cursor = self.conn.cursor()

        # Get existing profile
        cursor.execute(
            'SELECT normal_hours, common_resources FROM user_profiles WHERE user = ?',
            (user,)
        )
        result = cursor.fetchone()

        if result:
            normal_hours = set(json.loads(result[0]))
            common_resources = set(json.loads(result[1]))
        else:
            normal_hours = set()
            common_resources = set()

        # Update with new data
        normal_hours.add(hour)
        common_resources.add(resource)

        # Calculate risk level based on history
        cursor.execute('''
            SELECT AVG(risk_score) FROM user_activities
            WHERE user = ? AND timestamp > ?
        ''', (user, (datetime.now() - timedelta(days=7)).isoformat()))

        avg_risk = cursor.fetchone()[0] or 0
        risk_level = int(avg_risk / 20)  # 0-5 scale

        # Update profile
        cursor.execute('''
            INSERT OR REPLACE INTO user_profiles
            (user, normal_hours, common_resources, risk_level, last_updated)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            user,
            json.dumps(list(normal_hours)),
            json.dumps(list(common_resources)),
            risk_level,
            timestamp
        ))
        self.conn.commit()

    def calculate_severity(self, risk_score):
        """Calculate severity level from risk score"""
        if risk_score >= 80:
            return 'critical'
        elif risk_score >= 60:
            return 'high'
        elif risk_score >= 40:
            return 'medium'
        elif risk_score >= 20:
            return 'low'
        else:
            return 'info'
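A quick local run might look like the following sketch; the event values are invented for illustration, and ':memory:' keeps the test away from the production database path under /var/ossec/logs/:

# Illustrative usage only; ':memory:' avoids touching the real UBA database
analyzer = UserBehaviorAnalyzer(db_path=':memory:')

result = analyzer.analyze_user_behavior(
    user='jdoe',
    action='data_download',
    resource='/srv/reports/q4_financials.xlsx',
    timestamp='2024-05-14T22:30:00'
)

print(result['risk_score'], result['severity'], result['anomalies'])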
Use Case 2: Network Traffic Anomaly Detection
DGA (Domain Generation Algorithm) Detection
#!/usr/bin/env python3
import json
import math
import re
import sys
from collections import Counter


class DGADetector:
    def __init__(self):
        self.tld_list = [
            'com', 'net', 'org', 'info', 'biz', 'co.uk',
            'de', 'fr', 'ru', 'cn', 'jp', 'in'
        ]
        self.english_freq = {
            'a': 0.0817, 'b': 0.0149, 'c': 0.0278, 'd': 0.0425,
            'e': 0.1270, 'f': 0.0223, 'g': 0.0202, 'h': 0.0609,
            'i': 0.0697, 'j': 0.0015, 'k': 0.0077, 'l': 0.0403,
            'm': 0.0241, 'n': 0.0675, 'o': 0.0751, 'p': 0.0193,
            'q': 0.0010, 'r': 0.0599, 's': 0.0633, 't': 0.0906,
            'u': 0.0276, 'v': 0.0098, 'w': 0.0236, 'x': 0.0015,
            'y': 0.0197, 'z': 0.0007
        }

    def analyze_domain(self, domain):
        """Analyze domain for DGA characteristics"""
        # Remove TLD
        domain_parts = domain.split('.')
        if len(domain_parts) < 2:
            return None

        sld = domain_parts[-2]  # Second level domain

        features = {
            'domain': domain,
            'length': len(sld),
            'entropy': self.calculate_entropy(sld),
            'consonant_ratio': self.calculate_consonant_ratio(sld),
            'number_ratio': self.calculate_number_ratio(sld),
            'bigram_score': self.calculate_bigram_score(sld),
            'lexical_diversity': self.calculate_lexical_diversity(sld),
            'suspicious_tld': self.check_suspicious_tld(domain_parts[-1])
        }

        # Calculate DGA probability
        dga_score = self.calculate_dga_score(features)
        features['dga_score'] = dga_score
        features['is_dga'] = dga_score > 0.7

        return features

    def calculate_entropy(self, text):
        """Calculate Shannon entropy"""
        if not text:
            return 0

        prob = [float(text.count(c)) / len(text) for c in dict.fromkeys(text)]
        entropy = -sum([p * math.log(p) / math.log(2.0) for p in prob if p > 0])
        return entropy

    def calculate_consonant_ratio(self, text):
        """Calculate ratio of consonants to total characters"""
        vowels = 'aeiouAEIOU'
        consonants = sum(1 for c in text if c.isalpha() and c not in vowels)
        return consonants / len(text) if text else 0

    def calculate_number_ratio(self, text):
        """Calculate ratio of numbers to total characters"""
        numbers = sum(1 for c in text if c.isdigit())
        return numbers / len(text) if text else 0

    def calculate_bigram_score(self, text):
        """Calculate bigram frequency score"""
        if len(text) < 2:
            return 0

        bigrams = [text[i:i+2] for i in range(len(text) - 1)]

        # Common English bigrams
        common_bigrams = [
            'th', 'he', 'in', 'en', 'nt', 're', 'er', 'an',
            'ti', 'es', 'on', 'at', 'se', 'nd', 'or', 'ar'
        ]

        score = sum(1 for bg in bigrams if bg.lower() in common_bigrams)
        return score / len(bigrams)

    def calculate_lexical_diversity(self, text):
        """Calculate character diversity"""
        if not text:
            return 0
        return len(set(text)) / len(text)

    def check_suspicious_tld(self, tld):
        """Check if TLD is commonly used in DGA"""
        suspicious_tlds = ['tk', 'ml', 'ga', 'cf', 'click', 'download']
        return tld.lower() in suspicious_tlds

    def calculate_dga_score(self, features):
        """Calculate overall DGA probability score"""
        score = 0

        # High entropy indicates randomness
        if features['entropy'] > 3.5:
            score += 0.3
        elif features['entropy'] > 3.0:
            score += 0.2

        # High consonant ratio
        if features['consonant_ratio'] > 0.65:
            score += 0.2

        # Contains numbers
        if features['number_ratio'] > 0.1:
            score += 0.15

        # Low bigram score (uncommon letter combinations)
        if features['bigram_score'] < 0.1:
            score += 0.2

        # High lexical diversity
        if features['lexical_diversity'] > 0.8:
            score += 0.1

        # Suspicious TLD
        if features['suspicious_tld']:
            score += 0.15

        # Long domain name
        if features['length'] > 20:
            score += 0.1
        elif features['length'] > 15:
            score += 0.05

        return min(score, 1.0)  # Cap at 1.0


def main():
    # Read DNS query log from Wazuh
    alert = json.loads(sys.stdin.read())

    if 'data' in alert and 'dns' in alert['data']:
        domain = alert['data']['dns'].get('query', '')

        detector = DGADetector()
        result = detector.analyze_domain(domain)

        if result and result['is_dga']:
            # Generate Wazuh alert
            anomaly_alert = {
                'integration': 'dga_detector',
                'anomaly_type': 'suspicious_domain',
                'severity': 'high',
                'domain': domain,
                'dga_score': result['dga_score'],
                'features': result,
                'description': f'Possible DGA domain detected: {domain}',
                'mitre_attack': ['T1568.002']  # Dynamic Resolution: DGA
            }

            print(json.dumps(anomaly_alert))


if __name__ == '__main__':
    main()
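Because the scoring is heuristic, it is worth spot-checking it against a few benign domains and an obviously random-looking one before wiring it into an integration. The domains below are illustrative examples, not known indicators:

# Illustrative spot-check of the heuristic scoring above
detector = DGADetector()

for candidate in ['wikipedia.org', 'mail.example.com', 'xj4k9q2zvh8w3n1.tk']:
    result = detector.analyze_domain(candidate)
    if result:
        print(candidate, round(result['dga_score'], 2), result['is_dga'])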
Use Case 3: Application Behavior Anomaly Detection
Web Application Anomaly Detection
<group name="web,anomaly,webapp">
  <!-- Unusual request size -->
  <rule id="100020" level="7">
    <if_sid>31100</if_sid>
    <field name="http.request.length" compare="greater">10000</field>
    <description>Unusually large HTTP request detected</description>
    <group>web_anomaly</group>
  </rule>

  <!-- Unusual user agent -->
  <rule id="100021" level="8">
    <if_sid>31100</if_sid>
    <field name="http.user_agent">python|curl|wget|nikto|sqlmap</field>
    <description>Suspicious user agent detected - possible automated tool</description>
    <group>web_anomaly,recon</group>
  </rule>

  <!-- Parameter pollution -->
  <rule id="100022" level="9">
    <if_sid>31100</if_sid>
    <regex>\?.*([^&=]+=[^&]*&)\1{2,}</regex>
    <description>HTTP parameter pollution attempt detected</description>
    <group>web_anomaly,attack</group>
  </rule>

  <!-- Unusual response time -->
  <rule id="100023" level="6">
    <if_sid>31100</if_sid>
    <field name="http.response_time" compare="greater">5000</field>
    <description>Slow HTTP response - possible DoS or resource exhaustion</description>
    <group>web_anomaly,performance</group>
  </rule>
</group>
API Behavior Monitoring
#!/usr/bin/env python3
import json
import time
from collections import defaultdict, deque
from datetime import datetime, timedelta
import numpy as np


class APIAnomalyDetector:
    def __init__(self):
        self.endpoint_stats = defaultdict(lambda: {
            'response_times': deque(maxlen=1000),
            'status_codes': defaultdict(int),
            'request_rates': deque(maxlen=60),  # Per minute for last hour
            'unique_ips': set(),
            'payload_sizes': deque(maxlen=1000),
            'error_rate': deque(maxlen=100)
        })

        self.user_patterns = defaultdict(lambda: {
            'endpoints': defaultdict(int),
            'methods': defaultdict(int),  # HTTP method counts per user
            'request_times': deque(maxlen=1000),
            'auth_failures': 0,
            'data_volume': 0
        })

    def analyze_api_request(self, log_data):
        """Analyze API request for anomalies"""
        anomalies = []

        # Extract fields
        endpoint = log_data.get('endpoint', '')
        method = log_data.get('method', '')
        status = int(log_data.get('status', 0))
        response_time = int(log_data.get('response_time', 0))
        user = log_data.get('user', 'anonymous')
        ip = log_data.get('source_ip', '')
        payload_size = int(log_data.get('payload_size', 0))
        timestamp = log_data.get('timestamp', datetime.now().isoformat())

        # Update statistics
        stats = self.endpoint_stats[endpoint]
        stats['response_times'].append(response_time)
        stats['status_codes'][status] += 1
        stats['unique_ips'].add(ip)
        stats['payload_sizes'].append(payload_size)
        self.user_patterns[user]['methods'][method] += 1

        # Check response time anomaly
        if len(stats['response_times']) > 100:
            mean_rt = np.mean(stats['response_times'])
            std_rt = np.std(stats['response_times'])

            if response_time > mean_rt + (3 * std_rt):
                anomalies.append({
                    'type': 'slow_response',
                    'severity': 'medium',
                    'details': f'Response time {response_time}ms exceeds normal range',
                    'baseline_mean': mean_rt,
                    'baseline_std': std_rt
                })

        # Check error rate anomaly
        if status >= 400:
            stats['error_rate'].append(1)
        else:
            stats['error_rate'].append(0)

        if len(stats['error_rate']) == 100:
            error_rate = sum(stats['error_rate']) / 100
            if error_rate > 0.1:  # >10% error rate
                anomalies.append({
                    'type': 'high_error_rate',
                    'severity': 'high',
                    'details': f'Error rate {error_rate:.2%} exceeds threshold',
                    'endpoint': endpoint
                })

        # Check request pattern anomalies
        user_anomalies = self.check_user_pattern_anomalies(user, endpoint, timestamp)
        anomalies.extend(user_anomalies)

        # Check payload size anomaly
        if payload_size > 0 and len(stats['payload_sizes']) > 50:
            mean_size = np.mean(stats['payload_sizes'])
            std_size = np.std(stats['payload_sizes'])

            if payload_size > mean_size + (3 * std_size):
                anomalies.append({
                    'type': 'large_payload',
                    'severity': 'medium',
                    'details': f'Payload size {payload_size} bytes exceeds normal',
                    'baseline_mean': mean_size
                })

        # Check for API abuse patterns
        abuse_patterns = self.check_api_abuse_patterns(endpoint, method, user, ip)
        anomalies.extend(abuse_patterns)

        return anomalies

    def check_user_pattern_anomalies(self, user, endpoint, timestamp):
        """Check for anomalies in user behavior patterns"""
        anomalies = []
        user_data = self.user_patterns[user]

        # Update user data
        user_data['endpoints'][endpoint] += 1
        user_data['request_times'].append(timestamp)

        # Check for endpoint scanning
        if len(user_data['endpoints']) > 50:
            anomalies.append({
                'type': 'endpoint_scanning',
                'severity': 'high',
                'details': f'User {user} accessed {len(user_data["endpoints"])} different endpoints',
                'user': user
            })

        # Check request frequency
        if len(user_data['request_times']) > 10:
            recent_requests = [
                t for t in user_data['request_times']
                if datetime.fromisoformat(t) > datetime.now() - timedelta(minutes=1)
            ]

            if len(recent_requests) > 100:  # >100 requests per minute
                anomalies.append({
                    'type': 'high_request_rate',
                    'severity': 'high',
                    'details': f'User {user} made {len(recent_requests)} requests in last minute',
                    'user': user
                })

        return anomalies

    def check_api_abuse_patterns(self, endpoint, method, user, ip):
        """Check for API abuse patterns"""
        anomalies = []

        # Check for dangerous endpoints
        dangerous_patterns = [
            '/admin/', '/config/', '/debug/', '/internal/',
            '/api/v1/users/delete', '/api/v1/data/export'
        ]

        for pattern in dangerous_patterns:
            if pattern in endpoint:
                anomalies.append({
                    'type': 'dangerous_endpoint_access',
                    'severity': 'critical',
                    'details': f'Access to dangerous endpoint: {endpoint}',
                    'user': user,
                    'endpoint': endpoint
                })
                break

        # Check for method anomalies
        if method in ['DELETE', 'PUT'] and '/api/v1/' in endpoint:
            # Check if user typically uses these methods
            user_methods = self.user_patterns[user]['methods']
            if user_methods[method] < 5:  # Rarely uses these methods
                anomalies.append({
                    'type': 'unusual_method',
                    'severity': 'medium',
                    'details': f'Unusual {method} request from user {user}',
                    'user': user,
                    'method': method
                })

        return anomalies
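As with the other detectors, the class can be exercised directly with a dictionary shaped like the fields it reads; the request below is a fabricated example, and the endpoint deliberately hits one of the dangerous patterns:

# Fabricated API log record used only to exercise the detector above
from datetime import datetime

detector = APIAnomalyDetector()

request = {
    'endpoint': '/api/v1/data/export',
    'method': 'GET',
    'status': 200,
    'response_time': 180,
    'user': 'svc_reporting',
    'source_ip': '10.0.4.21',
    'payload_size': 2048,
    'timestamp': datetime.now().isoformat()
}

for finding in detector.analyze_api_request(request):
    print(finding['type'], finding['severity'])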
Use Case 4: Insider Threat Detection
Configuration for Insider Threat Monitoring
<group name="insider_threat,anomaly">
  <!-- After hours data access -->
  <rule id="100030" level="8">
    <if_sid>550,551,552</if_sid>
    <weekday>weekdays</weekday>
    <time>20:00 - 06:00</time>
    <field name="file.path">confidential|sensitive|secret</field>
    <description>Sensitive file accessed after business hours</description>
    <group>insider_threat,data_theft</group>
  </rule>

  <!-- Mass file download -->
  <rule id="100031" level="10" frequency="50" timeframe="300">
    <if_sid>550</if_sid>
    <same_user />
    <description>Mass file download detected - possible data exfiltration</description>
    <mitre>
      <id>T1567</id>
    </mitre>
    <group>insider_threat,exfiltration</group>
  </rule>

  <!-- USB device usage anomaly -->
  <rule id="100032" level="9">
    <decoded_as>usb_monitor</decoded_as>
    <field name="action">mount</field>
    <field name="device_type">storage</field>
    <weekday>weekdays</weekday>
    <time>18:00 - 08:00</time>
    <description>USB storage device connected after hours</description>
    <group>insider_threat,removable_media</group>
  </rule>

  <!-- Email to personal account -->
  <rule id="100033" level="9">
    <if_sid>3600</if_sid>
    <field name="mail.to">gmail.com|yahoo.com|hotmail.com|outlook.com</field>
    <field name="mail.attachments" compare="greater">0</field>
    <description>Email with attachments sent to personal account</description>
    <group>insider_threat,data_leak</group>
  </rule>
</group>
Insider Threat Scoring System
#!/usr/bin/env python3
import json
import sqlite3
from datetime import datetime, timedelta
from collections import defaultdict
import networkx as nx


class InsiderThreatScorer:
    def __init__(self, db_path='/var/ossec/logs/insider_threat.db'):
        self.conn = sqlite3.connect(db_path)
        self.init_database()
        self.risk_weights = {
            'after_hours_access': 15,
            'sensitive_file_access': 20,
            'mass_download': 30,
            'unusual_application': 10,
            'privilege_escalation': 35,
            'data_staging': 25,
            'external_transfer': 40,
            'policy_violation': 20,
            'anomalous_behavior': 15
        }

    def init_database(self):
        """Initialize threat scoring database"""
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS threat_scores (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                user TEXT,
                indicator TEXT,
                score INTEGER,
                details TEXT,
                cumulative_score INTEGER
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS user_risk_profiles (
                user TEXT PRIMARY KEY,
                current_score INTEGER,
                peak_score INTEGER,
                indicators_count INTEGER,
                first_seen DATETIME,
                last_updated DATETIME,
                risk_level TEXT
            )
        ''')
        self.conn.commit()

    def process_event(self, event):
        """Process security event and update threat score"""
        user = event.get('user', 'unknown')
        timestamp = event.get('timestamp', datetime.now().isoformat())
        indicators = self.extract_indicators(event)

        total_score = 0
        threat_details = []

        for indicator in indicators:
            score = self.risk_weights.get(indicator['type'], 10)

            # Apply contextual multipliers
            score = self.apply_context_multipliers(score, indicator, event)

            total_score += score
            threat_details.append({
                'indicator': indicator['type'],
                'score': score,
                'details': indicator.get('details', '')
            })

            # Store indicator
            self.store_indicator(user, timestamp, indicator['type'], score,
                                 indicator.get('details', ''))

        # Update user risk profile
        self.update_risk_profile(user, total_score, timestamp)

        # Check if threshold exceeded
        risk_assessment = self.assess_risk(user)

        return {
            'user': user,
            'event_score': total_score,
            'cumulative_score': risk_assessment['cumulative_score'],
            'risk_level': risk_assessment['risk_level'],
            'indicators': threat_details,
            'recommended_action': risk_assessment['recommended_action']
        }

    def extract_indicators(self, event):
        """Extract threat indicators from event"""
        indicators = []

        # Check time-based anomalies
        event_time = datetime.fromisoformat(event['timestamp'])
        if event_time.hour < 6 or event_time.hour > 20:
            if 'file' in event and 'sensitive' in event.get('file', {}).get('path', '').lower():
                indicators.append({
                    'type': 'after_hours_access',
                    'details': f'Accessed sensitive file at {event_time.hour}:00'
                })

        # Check for mass operations
        if event.get('operation_count', 0) > 50:
            indicators.append({
                'type': 'mass_download',
                'details': f'{event["operation_count"]} files accessed'
            })

        # Check for data staging
        if 'file' in event:
            path = event['file'].get('path', '')
            if any(staging in path.lower() for staging in ['temp', 'tmp', 'staging', 'export']):
                if event['file'].get('size', 0) > 100_000_000:  # 100MB
                    indicators.append({
                        'type': 'data_staging',
                        'details': f'Large file in staging area: {path}'
                    })

        # Check for external transfer
        if 'network' in event:
            dest_ip = event['network'].get('dest_ip', '')
            if not self.is_internal_ip(dest_ip):
                if event['network'].get('bytes_out', 0) > 50_000_000:  # 50MB
                    indicators.append({
                        'type': 'external_transfer',
                        'details': f'Large external transfer to {dest_ip}'
                    })

        # Check for privilege escalation
        if event.get('action') == 'privilege_escalation':
            indicators.append({
                'type': 'privilege_escalation',
                'details': event.get('details', '')
            })

        return indicators

    def apply_context_multipliers(self, base_score, indicator, event):
        """Apply contextual multipliers to risk score"""
        multiplier = 1.0

        # User role multiplier
        user_role = event.get('user_role', '')
        if user_role in ['admin', 'root', 'administrator']:
            multiplier *= 1.5
        elif user_role in ['developer', 'engineer']:
            multiplier *= 1.3

        # Repeat behavior multiplier
        if self.is_repeat_behavior(event['user'], indicator['type']):
            multiplier *= 0.7  # Lower score for consistent behavior
        else:
            multiplier *= 1.5  # Higher score for new behavior

        # Time-based multiplier
        if indicator['type'] in ['after_hours_access', 'external_transfer']:
            event_time = datetime.fromisoformat(event['timestamp'])
            if event_time.weekday() >= 5:  # Weekend
                multiplier *= 1.5

        return int(base_score * multiplier)

    def is_repeat_behavior(self, user, indicator_type):
        """Check if this is repeat behavior for the user"""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT COUNT(*) FROM threat_scores
            WHERE user = ? AND indicator = ? AND timestamp > ?
        ''', (user, indicator_type,
              (datetime.now() - timedelta(days=30)).isoformat()))

        count = cursor.fetchone()[0]
        return count > 5

    def update_risk_profile(self, user, new_score, timestamp):
        """Update user's risk profile"""
        cursor = self.conn.cursor()

        # Get current profile
        cursor.execute(
            'SELECT current_score, peak_score, indicators_count FROM user_risk_profiles WHERE user = ?',
            (user,)
        )
        result = cursor.fetchone()

        if result:
            current_score = result[0] + new_score
            peak_score = max(result[1], current_score)
            indicators_count = result[2] + 1
        else:
            current_score = new_score
            peak_score = new_score
            indicators_count = 1

        # Calculate risk level
        if current_score >= 100:
            risk_level = 'critical'
        elif current_score >= 75:
            risk_level = 'high'
        elif current_score >= 50:
            risk_level = 'medium'
        elif current_score >= 25:
            risk_level = 'low'
        else:
            risk_level = 'minimal'

        # Update profile
        cursor.execute('''
            INSERT OR REPLACE INTO user_risk_profiles
            (user, current_score, peak_score, indicators_count, first_seen, last_updated, risk_level)
            VALUES (?, ?, ?, ?,
                    COALESCE((SELECT first_seen FROM user_risk_profiles WHERE user = ?), ?),
                    ?, ?)
        ''', (user, current_score, peak_score, indicators_count,
              user, timestamp, timestamp, risk_level))

        self.conn.commit()

    def assess_risk(self, user):
        """Assess user's current risk level"""
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT current_score, risk_level, indicators_count FROM user_risk_profiles WHERE user = ?',
            (user,)
        )
        result = cursor.fetchone()

        if not result:
            return {
                'cumulative_score': 0,
                'risk_level': 'minimal',
                'recommended_action': 'continue_monitoring'
            }

        cumulative_score, risk_level, indicators_count = result

        # Determine recommended action
        if risk_level == 'critical':
            action = 'immediate_investigation'
        elif risk_level == 'high':
            action = 'priority_review'
        elif risk_level == 'medium':
            action = 'enhanced_monitoring'
        else:
            action = 'continue_monitoring'

        # Check for rapid score increase
        cursor.execute('''
            SELECT SUM(score) FROM threat_scores
            WHERE user = ? AND timestamp > ?
        ''', (user, (datetime.now() - timedelta(hours=1)).isoformat()))

        recent_score = cursor.fetchone()[0] or 0
        if recent_score > 50:
            action = 'immediate_investigation'

        return {
            'cumulative_score': cumulative_score,
            'risk_level': risk_level,
            'indicators_count': indicators_count,
            'recommended_action': action,
            'recent_activity_score': recent_score
        }

    def is_internal_ip(self, ip):
        """Check if IP is internal"""
        internal_prefixes = [
            '10.',
            '172.16.', '172.17.', '172.18.', '172.19.', '172.20.', '172.21.',
            '172.22.', '172.23.', '172.24.', '172.25.', '172.26.', '172.27.',
            '172.28.', '172.29.', '172.30.', '172.31.',
            '192.168.'
        ]
        return any(ip.startswith(prefix) for prefix in internal_prefixes)

    def store_indicator(self, user, timestamp, indicator_type, score, details):
        """Store threat indicator in database"""
        cursor = self.conn.cursor()

        # Get cumulative score
        cursor.execute(
            'SELECT current_score FROM user_risk_profiles WHERE user = ?', (user,)
        )
        result = cursor.fetchone()
        cumulative = result[0] if result else score

        cursor.execute('''
            INSERT INTO threat_scores
            (timestamp, user, indicator, score, details, cumulative_score)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (timestamp, user, indicator_type, score, details, cumulative))

        self.conn.commit()
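A contrived event like the one below shows how the scorer is driven. Every field value is invented for illustration, and ':memory:' keeps the example away from the production database path; real events would be assembled from Wazuh alerts:

# Invented event for illustration; real events would come from Wazuh alerts
scorer = InsiderThreatScorer(db_path=':memory:')

event = {
    'user': 'jdoe',
    'user_role': 'developer',
    'timestamp': '2024-05-11T23:10:00',  # a Saturday night
    'file': {'path': '/tmp/export/customer_dump.zip', 'size': 250_000_000},
    'network': {'dest_ip': '203.0.113.45', 'bytes_out': 120_000_000}
}

assessment = scorer.process_event(event)
print(assessment['event_score'], assessment['risk_level'],
      assessment['recommended_action'])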
Use Case 5: Machine Learning Integration
Anomaly Detection with Isolation Forest
#!/usr/bin/env python3
import json
import sys
import joblib
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import pandas as pd
from datetime import datetime, timedelta


class MLAnomalyDetector:
    def __init__(self, model_path='/var/ossec/models/'):
        self.model_path = model_path
        self.models = {}
        self.scalers = {}
        self.feature_extractors = {
            'network': self.extract_network_features,
            'authentication': self.extract_auth_features,
            'file_access': self.extract_file_features,
            'process': self.extract_process_features
        }
        self.load_models()

    def load_models(self):
        """Load pre-trained models"""
        for model_type in ['network', 'authentication', 'file_access', 'process']:
            try:
                self.models[model_type] = joblib.load(f'{self.model_path}/{model_type}_model.pkl')
                self.scalers[model_type] = joblib.load(f'{self.model_path}/{model_type}_scaler.pkl')
            except Exception:
                # Initialize new model if not found
                self.models[model_type] = IsolationForest(
                    contamination=0.1,
                    random_state=42,
                    n_estimators=100
                )
                self.scalers[model_type] = StandardScaler()

    def extract_network_features(self, event):
        """Extract features for network anomaly detection"""
        event_time = datetime.fromisoformat(event.get('timestamp', datetime.now().isoformat()))
        features = {
            'src_port': int(event.get('src_port', 0)),
            'dst_port': int(event.get('dst_port', 0)),
            'packet_size': int(event.get('packet_size', 0)),
            'duration': int(event.get('duration', 0)),
            'bytes_in': int(event.get('bytes_in', 0)),
            'bytes_out': int(event.get('bytes_out', 0)),
            'packets_in': int(event.get('packets_in', 0)),
            'packets_out': int(event.get('packets_out', 0)),
            'protocol_tcp': 1 if event.get('protocol') == 'tcp' else 0,
            'protocol_udp': 1 if event.get('protocol') == 'udp' else 0,
            'hour': event_time.hour,
            'is_weekend': 1 if event_time.weekday() >= 5 else 0
        }
        return features

    def extract_auth_features(self, event):
        """Extract features for authentication anomaly detection"""
        event_time = datetime.fromisoformat(event.get('timestamp', datetime.now().isoformat()))
        features = {
            'hour': event_time.hour,
            'day_of_week': event_time.weekday(),
            'is_weekend': 1 if event_time.weekday() >= 5 else 0,
            'auth_success': 1 if event.get('outcome') == 'success' else 0,
            'source_ip_octets': self.ip_to_features(event.get('source_ip', '0.0.0.0')),
            'auth_type_password': 1 if event.get('auth_type') == 'password' else 0,
            'auth_type_key': 1 if event.get('auth_type') == 'publickey' else 0,
            'auth_type_kerberos': 1 if event.get('auth_type') == 'kerberos' else 0
        }

        # Flatten IP octets
        ip_features = features.pop('source_ip_octets')
        for i, octet in enumerate(ip_features):
            features[f'ip_octet_{i}'] = octet

        return features

    def extract_file_features(self, event):
        """Extract features for file access anomaly detection"""
        file_path = event.get('file_path', '')
        event_time = datetime.fromisoformat(event.get('timestamp', datetime.now().isoformat()))
        features = {
            'hour': event_time.hour,
            'file_size': int(event.get('file_size', 0)),
            'operation_read': 1 if event.get('operation') == 'read' else 0,
            'operation_write': 1 if event.get('operation') == 'write' else 0,
            'operation_delete': 1 if event.get('operation') == 'delete' else 0,
            'is_hidden': 1 if file_path.startswith('.') or '/.' in file_path else 0,
            'is_system': 1 if any(sys_path in file_path for sys_path in ['/etc/', '/sys/', '/proc/']) else 0,
            'path_depth': file_path.count('/'),
            'extension_executable': 1 if any(file_path.endswith(ext) for ext in ['.exe', '.sh', '.bat', '.cmd']) else 0,
            'extension_config': 1 if any(file_path.endswith(ext) for ext in ['.conf', '.cfg', '.ini', '.yaml']) else 0
        }
        return features

    def extract_process_features(self, event):
        """Extract features for process anomaly detection"""
        event_time = datetime.fromisoformat(event.get('timestamp', datetime.now().isoformat()))
        features = {
            'hour': event_time.hour,
            'cpu_usage': float(event.get('cpu_usage', 0)),
            'memory_usage': float(event.get('memory_usage', 0)),
            'thread_count': int(event.get('thread_count', 1)),
            'ppid': int(event.get('ppid', 0)),
            'nice_value': int(event.get('nice', 0)),
            'is_system_process': 1 if int(event.get('uid', 1000)) < 1000 else 0,
            'has_network': 1 if event.get('network_connections', 0) > 0 else 0,
            'child_count': int(event.get('child_processes', 0)),
            'file_descriptors': int(event.get('open_files', 0))
        }
        return features

    def ip_to_features(self, ip):
        """Convert IP address to numerical features"""
        try:
            octets = [int(x) for x in ip.split('.')]
            return octets + [0] * (4 - len(octets))
        except Exception:
            return [0, 0, 0, 0]

    def detect_anomaly(self, event_type, event_data):
        """Detect anomaly using appropriate model"""
        if event_type not in self.models:
            return None

        # Extract features
        feature_extractor = self.feature_extractors.get(event_type)
        if not feature_extractor:
            return None

        features = feature_extractor(event_data)

        # Convert to array
        feature_array = np.array(list(features.values())).reshape(1, -1)

        # Scale features
        try:
            scaled_features = self.scalers[event_type].transform(feature_array)
        except Exception:
            # Fit scaler if not fitted
            scaled_features = self.scalers[event_type].fit_transform(feature_array)

        # Predict
        prediction = self.models[event_type].predict(scaled_features)[0]
        anomaly_score = self.models[event_type].score_samples(scaled_features)[0]

        if prediction == -1:  # Anomaly detected
            return {
                'is_anomaly': True,
                'anomaly_score': float(-anomaly_score),  # Convert to positive score
                'event_type': event_type,
                'features': features,
                'severity': self.calculate_severity(anomaly_score),
                'confidence': self.calculate_confidence(anomaly_score)
            }

        return {
            'is_anomaly': False,
            'anomaly_score': float(-anomaly_score),
            'event_type': event_type
        }

    def calculate_severity(self, anomaly_score):
        """Calculate severity based on anomaly score"""
        if anomaly_score < -0.5:
            return 'critical'
        elif anomaly_score < -0.3:
            return 'high'
        elif anomaly_score < -0.1:
            return 'medium'
        else:
            return 'low'

    def calculate_confidence(self, anomaly_score):
        """Calculate confidence level"""
        # Map anomaly score to confidence percentage
        confidence = min(100, max(0, (abs(anomaly_score) + 0.5) * 100))
        return round(confidence, 2)

    def update_model(self, event_type, new_data, labels=None):
        """Update model with new data (online learning)"""
        if event_type not in self.models:
            return

        # Extract features for all new data
        feature_extractor = self.feature_extractors.get(event_type)
        features_list = []

        for event in new_data:
            features = feature_extractor(event)
            features_list.append(list(features.values()))

        # Convert to array
        X = np.array(features_list)

        # Update scaler
        self.scalers[event_type].partial_fit(X)

        # Scale features
        X_scaled = self.scalers[event_type].transform(X)

        # Retrain model (in practice, you might want to use incremental learning)
        if labels is None:
            # Unsupervised retraining
            self.models[event_type].fit(X_scaled)
        else:
            # Semi-supervised if labels are provided
            # IsolationForest doesn't support this directly,
            # but you could use this for model selection
            pass

        # Save updated model
        joblib.dump(self.models[event_type], f'{self.model_path}/{event_type}_model.pkl')
        joblib.dump(self.scalers[event_type], f'{self.model_path}/{event_type}_scaler.pkl')


def main():
    # Read event from Wazuh
    event = json.loads(sys.stdin.read())

    # Determine event type
    rule_id = event.get('rule', {}).get('id', '')

    if rule_id.startswith('5'):
        event_type = 'authentication'
    elif rule_id.startswith('6'):
        event_type = 'network'
    elif rule_id.startswith('7'):
        event_type = 'file_access'
    elif rule_id.startswith('8'):
        event_type = 'process'
    else:
        event_type = None

    if event_type:
        detector = MLAnomalyDetector()
        result = detector.detect_anomaly(event_type, event.get('data', {}))

        if result and result['is_anomaly']:
            # Generate alert
            anomaly_alert = {
                'integration': 'ml_anomaly_detector',
                'anomaly': result,
                'original_event': event,
                'timestamp': datetime.now().isoformat(),
                'description': f'ML-detected anomaly in {event_type} behavior'
            }

            print(json.dumps(anomaly_alert))


if __name__ == '__main__':
    main()
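The detector expects pre-trained models under /var/ossec/models/, so an initial offline fit is needed before the integration produces meaningful scores. A minimal bootstrap might look like the sketch below; historical_events is a placeholder for data you would export from the Wazuh indexer or archive files, not a real dataset:

# One-off bootstrap sketch: fit an Isolation Forest per event type on
# historical feature vectors, then persist it where MLAnomalyDetector looks.
# `historical_events` is a placeholder for data exported from your archives.
import joblib
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

def bootstrap_model(event_type, historical_events, model_path='/var/ossec/models'):
    extractor = MLAnomalyDetector().feature_extractors[event_type]
    X = np.array([list(extractor(e).values()) for e in historical_events])

    scaler = StandardScaler().fit(X)
    model = IsolationForest(contamination=0.1, n_estimators=100,
                            random_state=42).fit(scaler.transform(X))

    joblib.dump(model, f'{model_path}/{event_type}_model.pkl')
    joblib.dump(scaler, f'{model_path}/{event_type}_scaler.pkl')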
Integration and Deployment
Wazuh Manager Configuration
<ossec_config>
  <!-- Enable anomaly detection integrations -->
  <integration>
    <name>behavioral_anomaly</name>
    <hook_url>file:///var/ossec/integrations/behavioral_anomaly.py</hook_url>
    <rule_id>5501,5503,5901,550,551,552,86001-86010</rule_id>
    <alert_format>json</alert_format>
    <options>{"threshold": 2.5, "window_days": 7}</options>
  </integration>

  <integration>
    <name>dga_detector</name>
    <hook_url>file:///var/ossec/integrations/dga_detector.py</hook_url>
    <rule_id>34001-34100</rule_id>
    <alert_format>json</alert_format>
  </integration>

  <integration>
    <name>ml_anomaly_detector</name>
    <hook_url>file:///var/ossec/integrations/ml_anomaly_detector.py</hook_url>
    <rule_id>all</rule_id>
    <alert_format>json</alert_format>
    <options>{"model_path": "/var/ossec/models/"}</options>
  </integration>

  <!-- Active response for anomalies -->
  <active-response>
    <disabled>no</disabled>
    <command>anomaly-response</command>
    <location>local</location>
    <rules_id>100001-100100</rules_id>
    <timeout>300</timeout>
  </active-response>
</ossec_config>
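Two deployment details are worth checking against your Wazuh version: recent releases expect custom integration scripts to be placed in /var/ossec/integrations with a custom- name prefix, and the integrator delivers the triggering alert as a file path in the script's first command-line argument rather than on standard input. A thin wrapper along the lines of the sketch below can bridge that convention to the stdin-based detectors in this guide; the module name and output log path are illustrative assumptions, and the log file could then be collected with a <localfile> block:

#!/usr/bin/env python3
# Wrapper sketch: read the alert file passed by the integrator, run a
# detector from this guide, and append findings to a log Wazuh can monitor.
# The import assumes the earlier script was saved as behavioral_anomaly.py.
import json
import sys

from behavioral_anomaly import BehavioralAnomalyDetector  # assumed module name

def run():
    with open(sys.argv[1]) as alert_file:
        alert = alert_file.read()

    findings = BehavioralAnomalyDetector().process_alert(alert)

    if findings:
        with open('/var/ossec/logs/anomaly_findings.json', 'a') as out:
            for finding in findings:
                out.write(json.dumps(finding) + '\n')

if __name__ == '__main__':
    run()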
Active Response Script
#!/bin/bash
ACTION=$1
USER=$2
IP=$3
ALERT_ID=$4
RULE_ID=$5

LOG_FILE="/var/ossec/logs/active-responses.log"

DATE=$(date +"%Y-%m-%d %H:%M:%S")
echo "[$DATE] Anomaly response triggered: Action=$ACTION User=$USER IP=$IP Alert=$ALERT_ID Rule=$RULE_ID" >> $LOG_FILE

case $RULE_ID in
    # High-risk anomalies - immediate action
    10003[0-9]|10004[0-9])
        if [ "$ACTION" = "add" ]; then
            # Block user account
            usermod -L "$USER" 2>/dev/null
            echo "[$DATE] User $USER account locked due to high-risk anomaly" >> $LOG_FILE

            # Kill user sessions
            pkill -KILL -u "$USER"

            # Send alert to security team
            /var/ossec/integrations/send_alert.py "Critical anomaly detected for user $USER"
        fi
        ;;

    # Medium-risk anomalies - enhanced monitoring
    10002[0-9])
        if [ "$ACTION" = "add" ]; then
            # Enable detailed logging for user
            echo "$USER" >> /var/ossec/logs/enhanced_monitoring.list

            # Increase audit logging
            auditctl -a always,exit -F arch=b64 -F uid="$USER" -S all -k anomaly_monitor
        fi
        ;;

    # Network anomalies - firewall rules
    10001[0-9])
        if [ "$ACTION" = "add" ] && [ "$IP" != "" ]; then
            # Add temporary firewall rule
            iptables -I INPUT -s "$IP" -j DROP
            echo "[$DATE] Blocked IP $IP due to network anomaly" >> $LOG_FILE

            # Schedule removal after timeout
            echo "iptables -D INPUT -s $IP -j DROP" | at now + 5 hours
        fi
        ;;
esac

exit 0
Monitoring and Tuning
Performance Monitoring
#!/usr/bin/env python3
import json
import sqlite3
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns


class AnomalyPerformanceMonitor:
    def __init__(self):
        self.conn = sqlite3.connect('/var/ossec/logs/anomaly_metrics.db')
        self.init_database()

    def init_database(self):
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS anomaly_metrics (
                timestamp DATETIME,
                detector_type TEXT,
                true_positives INTEGER,
                false_positives INTEGER,
                false_negatives INTEGER,
                processing_time REAL,
                memory_usage INTEGER
            )
        ''')
        self.conn.commit()

    def generate_performance_report(self):
        """Generate performance metrics report"""
        # Query metrics
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT
                detector_type,
                SUM(true_positives) as tp,
                SUM(false_positives) as fp,
                SUM(false_negatives) as fn,
                AVG(processing_time) as avg_time,
                MAX(memory_usage) as max_memory
            FROM anomaly_metrics
            WHERE timestamp > ?
            GROUP BY detector_type
        ''', ((datetime.now() - timedelta(days=7)).isoformat(),))

        results = cursor.fetchall()

        report = {
            'generated_at': datetime.now().isoformat(),
            'period': 'last_7_days',
            'detectors': {}
        }

        for row in results:
            detector_type, tp, fp, fn, avg_time, max_memory = row

            # Calculate metrics
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0
            f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

            report['detectors'][detector_type] = {
                'precision': round(precision, 3),
                'recall': round(recall, 3),
                'f1_score': round(f1_score, 3),
                'avg_processing_time_ms': round(avg_time, 2),
                'max_memory_mb': round(max_memory / 1024 / 1024, 2),
                'total_alerts': tp + fp
            }

        return report

    def plot_anomaly_trends(self):
        """Generate visualization of anomaly trends"""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT
                DATE(timestamp) as date,
                detector_type,
                SUM(true_positives + false_positives) as total_anomalies
            FROM anomaly_metrics
            WHERE timestamp > ?
            GROUP BY DATE(timestamp), detector_type
            ORDER BY date
        ''', ((datetime.now() - timedelta(days=30)).isoformat(),))

        data = cursor.fetchall()

        # Create plot
        plt.figure(figsize=(12, 6))

        # Process data by detector type
        detectors = {}
        for date, detector, count in data:
            if detector not in detectors:
                detectors[detector] = {'dates': [], 'counts': []}
            detectors[detector]['dates'].append(date)
            detectors[detector]['counts'].append(count)

        # Plot lines for each detector
        for detector, values in detectors.items():
            plt.plot(values['dates'], values['counts'], label=detector, marker='o')

        plt.xlabel('Date')
        plt.ylabel('Anomaly Count')
        plt.title('Anomaly Detection Trends (Last 30 Days)')
        plt.legend()
        plt.xticks(rotation=45)
        plt.tight_layout()

        # Save plot
        plt.savefig('/var/ossec/reports/anomaly_trends.png')
        plt.close()
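A small periodic job, for example driven by cron, can then emit the weekly report and refresh the trend chart. The sketch assumes analysts record true/false positive counts into anomaly_metrics.db during alert triage, since the monitor only reads what has been logged there:

# Sketch of a periodic reporting job; assumes anomaly_metrics.db is populated
# with triage outcomes (true/false positives) for each detector type.
import json

monitor = AnomalyPerformanceMonitor()

report = monitor.generate_performance_report()
print(json.dumps(report, indent=2))

monitor.plot_anomaly_trends()  # Writes /var/ossec/reports/anomaly_trends.png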
Best Practices
1. Baseline Establishment
- Allow 2-4 weeks for initial baseline creation
- Regularly update baselines to adapt to legitimate changes (see the refresh sketch after this list)
- Separate baselines for different user groups and time periods
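A baseline refresh can be scheduled as a simple job against the UBA database from Use Case 1; the sketch below is one way to do it under that assumption, rebuilding each user's normal working hours from recent activity only so that lapsed habits stop counting as "normal":

# Sketch of a scheduled baseline refresh, assuming the uba.db schema from
# Use Case 1: rebuild each user's normal_hours from the last 28 days only.
import json
import sqlite3
from datetime import datetime, timedelta

def refresh_login_baselines(db_path='/var/ossec/logs/uba.db', days=28):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cutoff = (datetime.now() - timedelta(days=days)).isoformat()

    cursor.execute(
        'SELECT user, timestamp FROM user_activities WHERE timestamp > ?',
        (cutoff,)
    )

    hours_by_user = {}
    for user, ts in cursor.fetchall():
        hours_by_user.setdefault(user, set()).add(datetime.fromisoformat(ts).hour)

    for user, hours in hours_by_user.items():
        cursor.execute(
            'UPDATE user_profiles SET normal_hours = ?, last_updated = ? WHERE user = ?',
            (json.dumps(sorted(hours)), datetime.now().isoformat(), user)
        )
    conn.commit()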
2. False Positive Reduction
# Whitelist management
import re

class AnomalyWhitelist:
    def __init__(self):
        self.whitelist = {
            'users': ['backup_user', 'monitoring_user'],
            'processes': ['backup.sh', 'health_check.py'],
            'ips': ['10.0.0.10', '10.0.0.11'],  # Monitoring servers
            'patterns': [
                r'^/var/log/.*\.log$',  # Log file access
                r'^/tmp/systemd-.*'     # System temporary files
            ]
        }

    def is_whitelisted(self, event_type, value):
        # Exact matches (users, processes, IPs)
        if value in self.whitelist.get(event_type, []):
            return True
        # File paths are matched against the regex patterns instead
        if event_type == 'patterns':
            return any(re.match(pattern, value) for pattern in self.whitelist['patterns'])
        return False
3. Tuning Recommendations
- Start with higher thresholds and gradually lower them
- Use feedback loops to improve detection accuracy
- Implement time-based and context-aware thresholds (a sketch follows this list)
- Regular review of anomaly patterns and adjustments
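One simple way to make thresholds context-aware is to adjust the standard-deviation cutoff used by the behavioral detector based on time of day and account role. The multipliers and role names below are illustrative starting points to tune, not recommended values:

# Illustrative context-aware threshold helper; multipliers and roles are
# assumptions to be tuned against your own environment.
from datetime import datetime

BASE_THRESHOLD = 2.5  # standard deviations, as in the behavioral detector

def context_threshold(user_role='user', timestamp=None):
    now = timestamp or datetime.now()
    threshold = BASE_THRESHOLD

    # Looser during weekday business hours, stricter at night and on weekends
    if 8 <= now.hour < 18 and now.weekday() < 5:
        threshold *= 1.2
    else:
        threshold *= 0.8

    # Privileged accounts warrant stricter scrutiny
    if user_role in ('admin', 'root'):
        threshold *= 0.8

    return threshold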
4. Integration Guidelines
- Test integrations in isolated environments first
- Implement gradual rollout with monitoring
- Maintain separate configurations for different environments
- Document all custom rules and modifications
Conclusion
Wazuh’s anomaly detection capabilities provide a powerful layer of security beyond traditional signature-based detection. The use cases in this guide cover:
- Behavioral Analysis: Detect deviations from normal user and system behavior
- Statistical Anomalies: Identify outliers in system metrics and patterns
- Machine Learning: Leverage AI for advanced threat detection
- Insider Threats: Monitor and score internal security risks
- Zero-Day Protection: Detect unknown threats through behavioral patterns
The key to successful anomaly detection is continuous tuning, regular baseline updates, and integration with existing security workflows. These implementations provide a foundation for building a comprehensive anomaly detection system tailored to your organization’s specific needs.