Wazuh Snapshot Migration Guide: Data Analysis, Rule Engine, and Windows Monitoring
This comprehensive guide covers Wazuh’s data analysis workflow, rule engine architecture, snapshot migration procedures, and Windows monitoring capabilities using WMI and Icinga. Understanding these components is crucial for effective SIEM deployment and management.
Table of Contents
Wazuh Data Analysis Workflow
Complete Data Flow Architecture
The Wazuh data analysis process follows a structured pipeline from log collection to alert generation:
graph TD A[Log Collection & Preprocessing] --> B[Decoder Selection] B --> C[JSON Decoder] B --> D[Dynamic Fields Decoder] B --> E[Sibling Decoders] B --> F[Custom Decoders] C --> G[Normalized Data] D --> G E --> G F --> G G --> H[Rule Matching Engine] H --> I[Default Rules] H --> J[Custom Rules] H --> K[Classification Rules] I --> L[Alert Generation] J --> L K --> L L --> M[Output & Notifications]
Log Collection and Preprocessing
The first stage involves gathering logs from various sources:
Agent Log Collection:
<!-- ossec.conf on agent --><localfile> <log_format>json</log_format> <location>/var/log/application.log</location></localfile>
<localfile> <log_format>syslog</log_format> <location>/var/log/syslog</location></localfile>
<localfile> <log_format>eventlog</log_format> <location>Security</location></localfile>
Log Forwarding and Aggregation:
<!-- Manager configuration --><global> <logall>yes</logall> <logall_json>yes</logall_json> <integrity_checking>yes</integrity_checking></global>
<remote> <connection>syslog</connection> <port>514</port> <protocol>udp</protocol></remote>
Decoder Selection and Processing
Wazuh uses multiple decoder types to handle different log formats:
JSON Decoder Configuration
<decoder name="json"> <program_name>^json_decoder$</program_name> <plugin_decoder>JSON_Decoder</plugin_decoder></decoder>
<decoder name="application-json"> <parent>json</parent> <program_name>^application$</program_name> <json_null_field>null</json_null_field> <json_array_structure>array</json_array_structure></decoder>
Dynamic Fields Decoder
<decoder name="dynamic-fields"> <program_name>^dynamic$</program_name> <regex offset="after_parent">^(\S+)\s+(\S+)\s+(.+)$</regex> <order>field1,field2,extra_data</order></decoder>
<decoder name="dynamic-fields-child"> <parent>dynamic-fields</parent> <regex>^(\w+)=(\S+)</regex> <order>key,value</order></decoder>
Sibling Decoders for Related Events
<decoder name="login-attempt"> <program_name>^ssh$</program_name> <regex>^(\w+)\s+(\d+)\s+(\S+)\s+(.+)$</regex> <order>status,pid,user,srcip</order></decoder>
<decoder name="login-success"> <parent>login-attempt</parent> <regex>Accepted</regex> <fts>name,user,srcip</fts></decoder>
<decoder name="login-failure"> <parent>login-attempt</parent> <regex>Failed</regex> <fts>name,user,srcip</fts></decoder>
Rule Engine Architecture
Rule Definition Structure
Wazuh rules are defined in XML format with specific matching criteria:
<group name="security,authentication"> <!-- Default Rules --> <rule id="5500" level="3"> <if_sid>5400</if_sid> <match>^authentication success</match> <description>User authentication success.</description> <group>authentication_success,</group> </rule>
<!-- Custom Rules --> <rule id="100001" level="7"> <if_sid>5400</if_sid> <match>^authentication failure</match> <same_source_ip /> <frequency>5</frequency> <timeframe>300</timeframe> <description>Multiple authentication failures from same source.</description> <group>authentication_failures,multiple_failures,</group> </rule>
<!-- Classification Rules --> <rule id="100002" level="10"> <if_matched_sid>100001</if_matched_sid> <same_source_ip /> <frequency>3</frequency> <timeframe>600</timeframe> <description>Persistent authentication attacks detected.</description> <group>authentication_attack,brute_force,</group> </rule></group>
Rule Types and Hierarchy
Default Rules
<!-- System default rules --><rule id="1001" level="1"> <category>ids</category> <decoded_as>sshd</decoded_as> <description>SSH messages grouped.</description></rule>
<rule id="5700" level="8"> <if_sid>5500</if_sid> <match>^Failed|^error|^bad|^invalid</match> <description>SSH authentication failure.</description> <group>authentication_failed,</group></rule>
Custom Rules with Advanced Logic
<rule id="100100" level="12"> <if_sid>5700</if_sid> <srcip>!^192.168.</srcip> <same_source_ip /> <frequency>10</frequency> <timeframe>240</timeframe> <description>SSH brute force attack from external source.</description> <group>attack,brute_force,</group> <options>no_email_alert</options></rule>
Classification Rules for Event Correlation
<rule id="100200" level="15"> <if_matched_sid>100100</if_matched_sid> <same_source_ip /> <description>Critical: Coordinated attack detected.</description> <group>attack,coordinated,critical,</group></rule>
Complete Ruleset Configuration
A comprehensive ruleset configuration combines decoders and rules:
<?xml version="1.0" encoding="UTF-8"?><root> <!-- Decoders Section --> <decoder_section> <decoder name="json"> <program_name>^json_app$</program_name> <plugin_decoder>JSON_Decoder</plugin_decoder> </decoder>
<decoder name="dynamic-fields"> <program_name>^app$</program_name> <regex>^(\S+)\s+(\S+)\s+(.+)$</regex> <order>timestamp,level,message</order> </decoder>
<decoder name="sibling-auth"> <program_name>^auth$</program_name> <regex>^(\w+)\s+(\S+)\s+(\S+)$</regex> <order>action,user,result</order> </decoder>
<decoder name="custom-app"> <program_name>^myapp$</program_name> <regex>^Event:\s+(\w+)\s+User:\s+(\S+)\s+IP:\s+(\S+)$</regex> <order>event_type,username,client_ip</order> </decoder> </decoder_section>
<!-- Rules Section --> <rules_section> <!-- Default Rules --> <rule id="1000" level="0"> <description>Generic rule for application events.</description> </rule>
<rule id="1001" level="3"> <if_sid>1000</if_sid> <program_name>^myapp$</program_name> <description>MyApp events.</description> </rule>
<!-- Custom Rules --> <rule id="100001" level="7"> <if_sid>1001</if_sid> <field name="event_type">^login_failure$</field> <description>Application login failure.</description> <group>authentication_failed,</group> </rule>
<rule id="100002" level="10"> <if_matched_sid>100001</if_matched_sid> <same_field>client_ip</same_field> <frequency>5</frequency> <timeframe>300</timeframe> <description>Multiple login failures from same IP.</description> <group>brute_force,authentication_attack,</group> </rule>
<!-- Classification Rules --> <rule id="100003" level="12"> <if_matched_sid>100002</if_matched_sid> <same_field>client_ip</same_field> <frequency>3</frequency> <timeframe>600</timeframe> <description>Persistent brute force attack detected.</description> <group>attack,persistent,critical,</group> </rule> </rules_section></root>
Snapshot Migration Procedures
Migration Strategy Overview
When migrating Wazuh data between clusters or performing system upgrades:
#!/bin/bash# Wazuh Migration Script
SOURCE_MANAGER="192.168.1.100"TARGET_MANAGER="192.168.1.200"MIGRATION_DATE=$(date +%Y%m%d_%H%M%S)
# Phase 1: Pre-migration Assessmentecho "=== Phase 1: Assessment ==="ssh root@$SOURCE_MANAGER " wazuh-control status du -sh /var/ossec/ df -h /var/ossec/"
# Phase 2: Create Snapshotecho "=== Phase 2: Creating Snapshot ==="ssh root@$SOURCE_MANAGER " /var/ossec/bin/wazuh-control stop tar -czf /tmp/wazuh-backup-$MIGRATION_DATE.tar.gz \ -C /var/ossec/ \ etc/ logs/ queue/ var/ ruleset/ /var/ossec/bin/wazuh-control start"
# Phase 3: Transfer Dataecho "=== Phase 3: Data Transfer ==="scp root@$SOURCE_MANAGER:/tmp/wazuh-backup-$MIGRATION_DATE.tar.gz \ /tmp/wazuh-backup-$MIGRATION_DATE.tar.gz
scp /tmp/wazuh-backup-$MIGRATION_DATE.tar.gz \ root@$TARGET_MANAGER:/tmp/
# Phase 4: Restore on Targetecho "=== Phase 4: Restore ==="ssh root@$TARGET_MANAGER " /var/ossec/bin/wazuh-control stop cd /var/ossec/ tar -xzf /tmp/wazuh-backup-$MIGRATION_DATE.tar.gz chown -R wazuh:wazuh /var/ossec/ /var/ossec/bin/wazuh-control start"
# Phase 5: Validationecho "=== Phase 5: Validation ==="ssh root@$TARGET_MANAGER " /var/ossec/bin/wazuh-control status tail -f /var/ossec/logs/ossec.log"
Agent Re-registration Script
#!/bin/bash# Agent re-registration for new manager
NEW_MANAGER_IP="192.168.1.200"AGENT_CONFIG="/var/ossec/etc/ossec.conf"
# Backup current configurationcp $AGENT_CONFIG ${AGENT_CONFIG}.backup
# Update manager IPsed -i "s/<address>.*<\/address>/<address>$NEW_MANAGER_IP<\/address>/" $AGENT_CONFIG
# Remove agent keyrm -f /var/ossec/etc/client.keys
# Restart agentsystemctl restart wazuh-agent
# Check connectiontail -f /var/ossec/logs/ossec.log | grep -i "connected\|error"
Windows Monitoring with WMI
WMI Configuration for Remote Monitoring
Windows Target Configuration
Enable WMI remote access on target Windows systems:
# Enable WMI through Windows Firewallnetsh advfirewall firewall set rule group="Windows Management Instrumentation (WMI)" new enable=yes
# Configure DCOM permissionspowershell -Command "Set-ItemProperty -Path 'HKLM:\SOFTWARE\Microsoft\Ole' -Name 'EnableDCOM' -Value 'Y'"
# Disable Remote UAC filteringreg add HKLM\SOFTWARE\Microsoft\Windows\CurrentVersion\Policies\System /v LocalAccountTokenFilterPolicy /t REG_DWORD /d 1 /f
# Restart WMI servicenet stop winmgmtnet start winmgmt
Linux WMI Client Setup
Install and configure WMI tools on Linux monitoring server:
# Install dependenciessudo yum install -y gcc python3-devel libffi-devel openssl-devel
# Install Python WMI librariespip3 install wmi-client-wrapper pywinrm
# Test WMI connectivitypython3 << EOFimport wmic = wmi.WMI(computer="192.168.1.27", user="Administrator", password="Password123")for process in c.Win32_Process(): print(f"PID: {process.ProcessId}, Name: {process.Name}")EOF
Advanced WMI Queries
System Information Queries
#!/usr/bin/env python3import wmiimport jsonfrom datetime import datetime
class WindowsMonitor: def __init__(self, host, username, password): self.connection = wmi.WMI( computer=host, user=username, password=password )
def get_system_info(self): """Get comprehensive system information""" try: # Operating System Details os_info = [] for os in self.connection.Win32_OperatingSystem(): os_info.append({ "caption": os.Caption, "version": os.Version, "build_number": os.BuildNumber, "total_memory": int(os.TotalVisibleMemorySize) * 1024, "free_memory": int(os.FreePhysicalMemory) * 1024, "last_boot": os.LastBootUpTime })
# CPU Information cpu_info = [] for cpu in self.connection.Win32_Processor(): cpu_info.append({ "name": cpu.Name, "cores": cpu.NumberOfCores, "logical_processors": cpu.NumberOfLogicalProcessors, "load_percentage": cpu.LoadPercentage })
# Disk Information disk_info = [] for disk in self.connection.Win32_LogicalDisk(): if disk.Size: disk_info.append({ "drive": disk.Caption, "filesystem": disk.FileSystem, "size": int(disk.Size), "free_space": int(disk.FreeSpace), "used_percentage": round((1 - int(disk.FreeSpace) / int(disk.Size)) * 100, 2) })
return { "timestamp": datetime.now().isoformat(), "operating_system": os_info, "processors": cpu_info, "disks": disk_info }
except Exception as e: return {"error": str(e)}
def get_running_processes(self): """Get list of running processes""" processes = [] try: for process in self.connection.Win32_Process(): processes.append({ "process_id": process.ProcessId, "name": process.Name, "executable_path": process.ExecutablePath, "command_line": process.CommandLine, "working_set_size": process.WorkingSetSize }) return { "timestamp": datetime.now().isoformat(), "processes": processes } except Exception as e: return {"error": str(e)}
def get_security_events(self, hours=24): """Get Windows Security Event Log entries""" events = [] try: query = f""" SELECT * FROM Win32_NTLogEvent WHERE Logfile = 'Security' AND TimeGenerated > '{(datetime.now() - timedelta(hours=hours)).strftime("%Y%m%d%H%M%S")}.000000+000' """
for event in self.connection.query(query): events.append({ "event_id": event.EventCode, "time_generated": event.TimeGenerated, "source_name": event.SourceName, "message": event.Message, "user": event.User, "computer_name": event.ComputerName })
return { "timestamp": datetime.now().isoformat(), "events": events } except Exception as e: return {"error": str(e)}
# Usage exampleif __name__ == "__main__": monitor = WindowsMonitor("192.168.1.27", "Administrator", "Password123")
# Get system information system_info = monitor.get_system_info() print(json.dumps(system_info, indent=2))
# Get running processes processes = monitor.get_running_processes() print(json.dumps(processes, indent=2))
Go-based WMI Monitoring
Using the go-msrpc library for efficient WMI queries:
package main
import ( "context" "encoding/json" "fmt" "log" "time"
"github.com/oiweiwei/go-msrpc/msrpc/dcom/wmi" "github.com/oiweiwei/go-msrpc/msrpc/dcom")
type WindowsMetrics struct { Timestamp time.Time `json:"timestamp"` Processes []Process `json:"processes"` System SystemInfo `json:"system"`}
type Process struct { ProcessID uint32 `json:"process_id"` Name string `json:"name"` ExecutablePath string `json:"executable_path"`}
type SystemInfo struct { Caption string `json:"caption"` Version string `json:"version"` TotalMemory uint64 `json:"total_memory"` FreeMemory uint64 `json:"free_memory"`}
func main() { config := &dcom.Config{ Username: "Administrator", Password: "Password123", Domain: "WORKGROUP", Server: "192.168.1.27", }
ctx := context.Background()
// Connect to WMI conn, err := wmi.NewConnection(ctx, config) if err != nil { log.Fatal(err) } defer conn.Close()
// Query running processes processQuery := "SELECT ProcessId, Name, ExecutablePath FROM Win32_Process" processResult, err := conn.Query(ctx, processQuery) if err != nil { log.Fatal(err) }
var processes []Process for processResult.Next() { var p Process if err := processResult.Scan(&p.ProcessID, &p.Name, &p.ExecutablePath); err != nil { continue } processes = append(processes, p) }
// Query system information systemQuery := "SELECT Caption, Version, TotalVisibleMemorySize, FreePhysicalMemory FROM Win32_OperatingSystem" systemResult, err := conn.Query(ctx, systemQuery) if err != nil { log.Fatal(err) }
var system SystemInfo if systemResult.Next() { systemResult.Scan(&system.Caption, &system.Version, &system.TotalMemory, &system.FreeMemory) }
// Create metrics object metrics := WindowsMetrics{ Timestamp: time.Now(), Processes: processes, System: system, }
// Output as JSON output, _ := json.MarshalIndent(metrics, "", " ") fmt.Println(string(output))}
Icinga Integration
Installing Check WMI Plus on Linux
Comprehensive installation for agentless Windows monitoring:
#!/bin/bash# Check WMI Plus Installation Script
# Install dependenciessudo apt-get updatesudo apt-get install -y libconfig-inifiles-perl libdatetime-perl \ libscalar-list-utils-perl libnumber-format-perl libjson-perl \ libgetopt-long-descriptive-perl
# Download and install Check WMI Pluscd /tmpwget https://github.com/willixix/WMI-Plus/releases/latest/download/check_wmi_plus.tar.gztar -xzf check_wmi_plus.tar.gz
# Install pluginsudo cp check_wmi_plus/check_wmi_plus.pl /usr/local/bin/sudo cp -r check_wmi_plus/etc /etc/check_wmi_plussudo chmod +x /usr/local/bin/check_wmi_plus.pl
# Configure plugin pathssudo sed -i 's|/usr/lib/nagios/plugins|/usr/local/lib/nagios/plugins|g' /usr/local/bin/check_wmi_plus.pl
# Test installation/usr/local/bin/check_wmi_plus.pl -d -d | head -25
Icinga Configuration for Windows Monitoring
Command Definitions
object CheckCommand "check_wmi" { import "plugin-check-command" command = [ PluginDir + "/check_wmi_plus.pl" ] arguments = { "-H" = { value = "$host.address$" description = "Target host address" } "-u" = { value = "$wmi_username$" description = "WMI username" } "-p" = { value = "$wmi_password$" description = "WMI password" } "-m" = { value = "$check_mode$" description = "WMI check mode" } "-w" = { value = "$wmi_warn$" description = "Warning threshold" } "-c" = { value = "$wmi_crit$" description = "Critical threshold" } "-a" = { value = "$wmi_arg1$" description = "First argument" } "-o" = { value = "$wmi_arg2$" description = "Second argument" } }}
Service Templates
# Windows service templatetemplate Service "windows-service" { import "generic-service" check_command = "check_wmi" check_interval = 5m retry_interval = 1m vars.wmi_username = "monitoring" vars.wmi_password = "MonitoringPass123"}
Service Definitions
# CPU Utilization Checkapply Service "CPU Utilization" { import "windows-service" vars.check_mode = "checkcpu" vars.wmi_warn = "80" vars.wmi_crit = "90" assign where host.vars.os == "Windows"}
# Memory Usage Checkapply Service "Memory Usage" { import "windows-service" vars.check_mode = "checkmem" vars.wmi_warn = "85" vars.wmi_crit = "95" assign where host.vars.os == "Windows"}
# Disk Space Checkapply Service "Disk Space C:" { import "windows-service" vars.check_mode = "checkvolsize" vars.wmi_arg1 = "C:" vars.wmi_warn = "80" vars.wmi_crit = "90" assign where host.vars.os == "Windows"}
# Windows Services Checkapply Service "Critical Services" { import "windows-service" vars.check_mode = "checkservice" vars.wmi_arg1 = "Spooler,BITS,Themes" assign where host.vars.os == "Windows"}
# Event Log Monitoringapply Service "Event Log - System" { import "windows-service" vars.check_mode = "checkeventlog" vars.wmi_arg1 = "system" vars.wmi_arg2 = "2" # Hours to check vars.wmi_warn = "5" # Warning events vars.wmi_crit = "10" # Critical events assign where host.vars.os == "Windows"}
Host Configuration
object Host "windows-server-01" { import "generic-host" address = "192.168.1.27" vars.os = "Windows" vars.wmi_username = "monitoring" vars.wmi_password = "MonitoringPass123"}
Remote Windows Management
Advanced WinExe Usage
Execute commands remotely on Windows systems:
#!/bin/bash# Windows Remote Management Script
TARGET_HOST="192.168.1.27"USERNAME="Administrator"PASSWORD="Password123"CREDENTIALS="$USERNAME%$PASSWORD"
# System Information Gatheringecho "=== System Information ==="winexe -U "$CREDENTIALS" //$TARGET_HOST "systeminfo"
# Process Monitoringecho "=== Running Processes ==="winexe -U "$CREDENTIALS" //$TARGET_HOST "tasklist /fo csv"
# Service Statusecho "=== Service Status ==="winexe -U "$CREDENTIALS" //$TARGET_HOST "sc query type= service state= all"
# Network Configurationecho "=== Network Configuration ==="winexe -U "$CREDENTIALS" //$TARGET_HOST "ipconfig /all"
# Event Log Analysisecho "=== Recent Security Events ==="winexe -U "$CREDENTIALS" //$TARGET_HOST "wevtutil qe Security /c:10 /rd:true /f:text"
# Performance Countersecho "=== Performance Metrics ==="winexe -U "$CREDENTIALS" //$TARGET_HOST "typeperf \"\\Processor(_Total)\\% Processor Time\" \"\\Memory\\Available MBytes\" -sc 1"
# Registry Queriesecho "=== Registry Information ==="winexe -U "$CREDENTIALS" //$TARGET_HOST "reg query HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion /v ProductName"
# File System Informationecho "=== Disk Usage ==="winexe -U "$CREDENTIALS" //$TARGET_HOST "dir C:\\ /-c"
# Windows Updatesecho "=== Installed Updates ==="winexe -U "$CREDENTIALS" //$TARGET_HOST "wmic qfe list brief"
# Security Configurationecho "=== Security Policies ==="winexe -U "$CREDENTIALS" //$TARGET_HOST "secedit /export /cfg C:\\temp\\secpol.inf && type C:\\temp\\secpol.inf"
PowerShell Remote Execution
# Execute PowerShell commands remotelywinexe -U "$CREDENTIALS" //$TARGET_HOST "powershell -Command \"Get-Process | Sort-Object CPU -Descending | Select-Object -First 10 | Format-Table -AutoSize\""
# Get Windows featureswinexe -U "$CREDENTIALS" //$TARGET_HOST "powershell -Command \"Get-WindowsFeature | Where-Object {\\$_.InstallState -eq 'Installed'} | Select-Object Name,InstallState\""
# Check Windows Defender statuswinexe -U "$CREDENTIALS" //$TARGET_HOST "powershell -Command \"Get-MpComputerStatus | Select-Object AntivirusEnabled,RealTimeProtectionEnabled,IoavProtectionEnabled\""
Advanced Querying Techniques
Complex WQL Queries
Advanced Windows Management Instrumentation Query Language examples:
-- Monitor process creation eventsSELECT * FROM Win32_ProcessStartTrace
-- Query processes with high CPU usageSELECT ProcessId, Name, PercentProcessorTimeFROM Win32_PerfRawData_PerfProc_ProcessWHERE Name <> '_Total'AND Name <> 'Idle'
-- Check for specific security eventsSELECT * FROM Win32_NTLogEventWHERE Logfile = 'Security'AND EventCode IN (4624, 4625, 4648, 4720, 4722, 4724, 4725, 4726, 4728, 4732, 4756)AND TimeGenerated > '20250128000000.000000+000'
-- Monitor file system changesSELECT * FROM Win32_VolumeChangeEvent
-- Query installed softwareSELECT Name, Version, InstallDateFROM Win32_ProductWHERE Name LIKE '%Microsoft%'
-- Check network connectionsSELECT LocalAddress, LocalPort, RemoteAddress, RemotePort, StateFROM Win32_PerfRawData_Tcpip_NetworkInterface
-- Monitor service status changesSELECT * FROM Win32_ServiceControlEvent
-- Query system performanceSELECT Name, Frequency, LoadPercentage, NumberOfCores, NumberOfLogicalProcessorsFROM Win32_Processor
-- Check disk performanceSELECT Name, DiskReadBytesPerSec, DiskWriteBytesPerSec, CurrentDiskQueueLengthFROM Win32_PerfRawData_PerfDisk_PhysicalDiskWHERE Name <> '_Total'
-- Monitor memory usageSELECT TotalVisibleMemorySize, FreePhysicalMemory, TotalVirtualMemorySize, FreeVirtualMemoryFROM Win32_OperatingSystem
Automated Monitoring Script
#!/usr/bin/env python3"""Comprehensive Windows monitoring with Wazuh integration"""
import jsonimport wmiimport timeimport loggingimport argparsefrom datetime import datetime, timedeltafrom pathlib import Path
class WazuhWindowsMonitor: def __init__(self, host, username, password, output_dir="/var/ossec/logs/wmi"): self.host = host self.username = username self.password = password self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True)
# Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(self.output_dir / 'wmi_monitor.log'), logging.StreamHandler() ] ) self.logger = logging.getLogger(__name__)
try: self.connection = wmi.WMI( computer=host, user=username, password=password ) self.logger.info(f"Connected to {host}") except Exception as e: self.logger.error(f"Failed to connect to {host}: {e}") raise
def collect_metrics(self): """Collect comprehensive Windows metrics""" metrics = { "timestamp": datetime.now().isoformat(), "host": self.host, "wazuh_integration": True }
try: # System Information metrics["system"] = self._get_system_info()
# Process Information metrics["processes"] = self._get_process_info()
# Service Status metrics["services"] = self._get_service_status()
# Security Events metrics["security_events"] = self._get_security_events()
# Performance Metrics metrics["performance"] = self._get_performance_metrics()
# Network Information metrics["network"] = self._get_network_info()
return metrics
except Exception as e: self.logger.error(f"Error collecting metrics: {e}") return {"error": str(e), "timestamp": datetime.now().isoformat()}
def _get_system_info(self): """Get system information""" try: os_info = self.connection.Win32_OperatingSystem()[0] computer_info = self.connection.Win32_ComputerSystem()[0]
return { "os_name": os_info.Caption, "os_version": os_info.Version, "build_number": os_info.BuildNumber, "computer_name": computer_info.Name, "domain": computer_info.Domain, "total_memory": int(os_info.TotalVisibleMemorySize) * 1024, "free_memory": int(os_info.FreePhysicalMemory) * 1024, "uptime": os_info.LastBootUpTime } except Exception as e: self.logger.error(f"Error getting system info: {e}") return {"error": str(e)}
def _get_process_info(self): """Get running process information""" try: processes = [] for process in self.connection.Win32_Process(): processes.append({ "pid": process.ProcessId, "name": process.Name, "executable": process.ExecutablePath, "command_line": process.CommandLine }) return processes[:50] # Limit to top 50 processes except Exception as e: self.logger.error(f"Error getting process info: {e}") return {"error": str(e)}
def _get_service_status(self): """Get Windows service status""" try: services = [] critical_services = [ "Spooler", "BITS", "Themes", "Dhcp", "Dnscache", "EventLog", "PlugPlay", "RpcSs", "Schedule", "W32Time" ]
for service in self.connection.Win32_Service(): if service.Name in critical_services: services.append({ "name": service.Name, "display_name": service.DisplayName, "state": service.State, "start_mode": service.StartMode, "status": service.Status }) return services except Exception as e: self.logger.error(f"Error getting service status: {e}") return {"error": str(e)}
def _get_security_events(self, hours=1): """Get recent security events""" try: events = [] cutoff_time = (datetime.now() - timedelta(hours=hours)).strftime("%Y%m%d%H%M%S")
query = f""" SELECT EventCode, TimeGenerated, SourceName, Message, User FROM Win32_NTLogEvent WHERE Logfile = 'Security' AND EventCode IN (4624, 4625, 4648, 4720, 4722) AND TimeGenerated > '{cutoff_time}.000000+000' """
for event in self.connection.query(query): events.append({ "event_id": event.EventCode, "time": event.TimeGenerated, "source": event.SourceName, "user": event.User, "message": event.Message[:200] if event.Message else None })
return events[:20] # Limit to last 20 events except Exception as e: self.logger.error(f"Error getting security events: {e}") return {"error": str(e)}
def _get_performance_metrics(self): """Get system performance metrics""" try: # CPU Information cpu_info = self.connection.Win32_Processor()[0]
# Memory Information os_info = self.connection.Win32_OperatingSystem()[0]
# Disk Information disks = [] for disk in self.connection.Win32_LogicalDisk(): if disk.Size: disks.append({ "drive": disk.Caption, "total_size": int(disk.Size), "free_space": int(disk.FreeSpace), "used_percent": round((1 - int(disk.FreeSpace) / int(disk.Size)) * 100, 2) })
return { "cpu_cores": cpu_info.NumberOfCores, "cpu_logical_processors": cpu_info.NumberOfLogicalProcessors, "memory_total": int(os_info.TotalVisibleMemorySize) * 1024, "memory_free": int(os_info.FreePhysicalMemory) * 1024, "memory_used_percent": round((1 - int(os_info.FreePhysicalMemory) / int(os_info.TotalVisibleMemorySize)) * 100, 2), "disks": disks } except Exception as e: self.logger.error(f"Error getting performance metrics: {e}") return {"error": str(e)}
def _get_network_info(self): """Get network configuration""" try: adapters = [] for adapter in self.connection.Win32_NetworkAdapterConfiguration(): if adapter.IPAddress: adapters.append({ "description": adapter.Description, "ip_addresses": adapter.IPAddress, "subnet_masks": adapter.IPSubnet, "default_gateways": adapter.DefaultIPGateway, "dns_servers": adapter.DNSServerSearchOrder, "dhcp_enabled": adapter.DHCPEnabled }) return adapters except Exception as e: self.logger.error(f"Error getting network info: {e}") return {"error": str(e)}
def generate_wazuh_log(self, metrics): """Generate Wazuh-compatible log entry""" log_entry = { "wazuh": { "agent": { "name": self.host, "ip": self.host } }, "windows": metrics, "@timestamp": datetime.now().isoformat() }
# Write to Wazuh log format log_file = self.output_dir / f"windows_wmi_{datetime.now().strftime('%Y%m%d')}.json" with open(log_file, 'a') as f: f.write(json.dumps(log_entry) + '\n')
return log_entry
def run_monitoring(self, interval=300): """Run continuous monitoring""" self.logger.info(f"Starting monitoring for {self.host} with {interval}s interval")
while True: try: metrics = self.collect_metrics() log_entry = self.generate_wazuh_log(metrics)
# Print summary if "system" in metrics: self.logger.info( f"Metrics collected - Memory: {metrics['performance']['memory_used_percent']:.1f}% " f"Processes: {len(metrics['processes'])} " f"Security Events: {len(metrics['security_events'])}" )
time.sleep(interval)
except KeyboardInterrupt: self.logger.info("Monitoring stopped by user") break except Exception as e: self.logger.error(f"Monitoring error: {e}") time.sleep(60) # Wait before retrying
def main(): parser = argparse.ArgumentParser(description='Wazuh Windows WMI Monitor') parser.add_argument('--host', required=True, help='Target Windows host') parser.add_argument('--username', required=True, help='Windows username') parser.add_argument('--password', required=True, help='Windows password') parser.add_argument('--interval', type=int, default=300, help='Monitoring interval in seconds') parser.add_argument('--output-dir', default='/var/ossec/logs/wmi', help='Output directory') parser.add_argument('--one-shot', action='store_true', help='Run once and exit')
args = parser.parse_args()
monitor = WazuhWindowsMonitor( host=args.host, username=args.username, password=args.password, output_dir=args.output_dir )
if args.one_shot: metrics = monitor.collect_metrics() log_entry = monitor.generate_wazuh_log(metrics) print(json.dumps(log_entry, indent=2)) else: monitor.run_monitoring(args.interval)
if __name__ == "__main__": main()
Conclusion
This comprehensive guide covers the essential aspects of Wazuh data analysis, rule engine configuration, snapshot migration, and Windows monitoring integration. The combination of these technologies provides a robust security monitoring solution with:
- Complete data flow visibility from log collection to alert generation
- Flexible rule engine supporting default, custom, and classification rules
- Reliable migration procedures for system upgrades and data transfer
- Comprehensive Windows monitoring using WMI and remote management tools
- Seamless integration with monitoring platforms like Icinga
For production deployments, ensure proper security configurations, regular backups, and comprehensive monitoring of all components. Regular testing of migration procedures and monitoring capabilities helps maintain system reliability and security effectiveness.
Remember to adapt configurations to your specific environment requirements and security policies.