Wazuh MCP Server: Bridging SIEM and AI for Next-Generation Security Operations
Executive Summary
The Wazuh MCP Server represents a paradigm shift in security operations, seamlessly bridging the gap between traditional SIEM capabilities and modern AI-powered analysis. By implementing the Model Context Protocol (MCP), this solution enables security teams to leverage natural language interactions with their security data, dramatically reducing the time from detection to response.
The Challenge in Modern SOCs
Security Operations Centers (SOCs) face unprecedented challenges:
- Data Overload: Processing millions of daily security events
- Complex Queries: Writing intricate API calls and custom scripts
- Skill Gap: Requiring deep platform expertise for effective analysis
- Response Time: Manual investigation processes slowing incident response
- Alert Fatigue: Struggling to identify critical threats among noise
What is the Wazuh MCP Server?
The Wazuh MCP Server is a production-ready Python server that acts as an intelligent bridge between Wazuh SIEM and Large Language Models (LLMs). It exposes Wazuh’s comprehensive API capabilities as standardized “tools” that AI models can call, enabling natural language interactions with security data.
Core Architecture
graph TB subgraph "Security Operations Platform" LLM[LLM/AI Model] <--> MCP[Wazuh MCP Server] MCP <--> WM[Wazuh Manager] WM --> A1[Agent 001] WM --> A2[Agent 002] WM --> A3[Agent 003]
subgraph "MCP Components" MCP --> AUTH[JWT Authentication] MCP --> POOL[Connection Pool] MCP --> TOOLS[Tool Registry] MCP --> SSE[SSE Transport] end
subgraph "Security Data" WM --> PROC[Processes] WM --> NET[Network Ports] WM --> PKG[Packages] WM --> SCAT[SCA Results] end end
Key Features & Capabilities
1. Production-Ready Infrastructure
- Enterprise Security: JWT token management with automatic refresh
- High Performance: HTTP/2 support with connection pooling
- Reliability: Comprehensive error handling and retry mechanisms
- Scalability: Asynchronous architecture supporting concurrent requests
2. Comprehensive Tool Suite
# Available Security ToolsWAZUH_MCP_TOOLS = { 'AuthenticateTool': 'Secure authentication with Wazuh API', 'GetAgentsTool': 'Retrieve and analyze agent status', 'GetAgentPortsTool': 'Network port analysis', 'GetAgentPackagesTool': 'Installed software inventory', 'GetAgentProcessesTool': 'Running process analysis', 'ListRulesTool': 'Security rule exploration', 'GetRuleFileContentTool': 'Rule content inspection', 'GetRuleFilesTool': 'Rule file management', 'GetAgentSCATool': 'Security Configuration Assessment'}
3. Flexible Configuration
# Configuration Optionswazuh: url: "https://wazuh-manager:55000" username: "admin" password: "${WAZUH_PASSWORD}" ssl_verify: true timeout: 30 max_retries: 3
server: host: "0.0.0.0" port: 8000 log_level: "INFO"
security: read_only_mode: false allowed_tools: ["GetAgents", "GetAgentPorts"] disabled_tools: []
Real-World Implementation
Installation & Setup
# Python 3.11+ requiredpython -m venv wazuh-mcp-envsource wazuh-mcp-env/bin/activate
# Install from GitHubpip install git+https://github.com/socfortress/wazuh-mcp-server.git
# Configure environmentcat > .env << EOFWAZUH_PROD_URL=https://wazuh.company.com:55000WAZUH_PROD_USERNAME=adminWAZUH_PROD_PASSWORD=SecurePassword123!WAZUH_PROD_SSL_VERIFY=trueEOF
# Launch serverpython -m wazuh_mcp_server
Integration with LangChain
from langchain_openai import ChatOpenAIfrom langchain.agents import create_openai_tools_agentfrom mcp_langchain import MultiServerMCPClient
# Initialize MCP clientmcp_client = MultiServerMCPClient({ "wazuh-mcp-server": { "transport": "sse", "url": "http://127.0.0.1:8000/sse/", "headers": { "Authorization": f"Bearer {JWT_TOKEN}" } }})
# Create AI agent with Wazuh toolsllm = ChatOpenAI(model="gpt-4", temperature=0)tools = await mcp_client.get_tools()agent = create_openai_tools_agent(llm, tools)
# Natural language security queriesasync def analyze_security(): response = await agent.ainvoke({ "input": "Show me all Windows agents with suspicious PowerShell processes" }) return response
Transformative Use Cases
1. Threat Hunting Automation
# Traditional approach: Complex API callsresponse = requests.get( f"{WAZUH_URL}/agents", headers={"Authorization": f"Bearer {token}"})agents = response.json()["data"]["items"]for agent in agents: if agent["os"]["platform"] == "windows": processes = requests.get( f"{WAZUH_URL}/syscollector/{agent['id']}/processes", headers=headers ) # Complex filtering logic...
# MCP approach: Natural language"Find all Windows agents running processes with encoded PowerShell commands"
2. Incident Investigation
# AI-powered investigation workflowinvestigation_queries = [ "List all agents that communicated with IP 192.168.1.100 in the last 24 hours", "Show me processes spawned by those agents during that timeframe", "Check if any of those processes have unusual network connections", "Identify any persistence mechanisms established"]
for query in investigation_queries: result = await agent.ainvoke({"input": query}) analyze_findings(result)
3. Compliance Auditing
# Automated compliance checkscompliance_check = """For all production servers:1. Verify antivirus is installed and running2. Check if all critical patches are applied3. Ensure logging is enabled and functional4. Validate firewall configurationsGenerate a compliance report with any deviations"""
report = await agent.ainvoke({"input": compliance_check})
Security Architecture
Authentication Flow
sequenceDiagram participant Client participant MCP Server participant Wazuh API
Client->>MCP Server: Request with credentials MCP Server->>Wazuh API: Authenticate Wazuh API-->>MCP Server: JWT Token MCP Server-->>Client: Session established
loop Every request Client->>MCP Server: API call with token MCP Server->>MCP Server: Validate token alt Token valid MCP Server->>Wazuh API: Forward request Wazuh API-->>MCP Server: Response MCP Server-->>Client: Processed data else Token expired MCP Server->>Wazuh API: Refresh token Wazuh API-->>MCP Server: New token MCP Server->>Wazuh API: Retry request end end
Security Best Practices
-
Network Segmentation
- Deploy MCP server in DMZ or security zone
- Use TLS/SSL for all communications
- Implement firewall rules restricting access
-
Access Control
- Enable read-only mode for analysis tasks
- Implement tool-level permissions
- Use service accounts with minimal privileges
-
Audit & Monitoring
- Log all MCP server interactions
- Monitor for anomalous query patterns
- Set up alerts for authentication failures
Performance Optimization
Connection Pooling Strategy
# Optimized connection configurationCONNECTION_POOL = { 'max_connections': 20, 'max_keepalive_connections': 10, 'keepalive_expiry': 300, # 5 minutes 'http2': True, 'retries': { 'max_attempts': 3, 'backoff_factor': 0.5 }}
Query Optimization
# Batch operations for efficiencyasync def batch_agent_analysis(agent_ids): tasks = [] for agent_id in agent_ids: tasks.append( agent.ainvoke({ "input": f"Analyze security posture of agent {agent_id}" }) ) results = await asyncio.gather(*tasks) return consolidate_results(results)
Advanced Integration Patterns
1. SOAR Platform Integration
# Integrate with security orchestrationclass WazuhMCPConnector: def __init__(self, mcp_url): self.client = MultiServerMCPClient({ "wazuh": {"url": mcp_url, "transport": "sse"} })
async def investigate_alert(self, alert_data): # AI-driven investigation context = f"Investigate alert: {alert_data}" findings = await self.client.query(context)
# Automated response if findings['severity'] == 'critical': await self.isolate_host(findings['agent_id']) await self.create_incident_ticket(findings)
return findings
2. Continuous Threat Hunting
# Automated threat hunting workflowclass ThreatHunter: def __init__(self, mcp_client): self.mcp = mcp_client self.hunting_queries = self.load_hunting_playbook()
async def hunt(self): while True: for query in self.hunting_queries: result = await self.mcp.query(query) if self.is_suspicious(result): await self.deep_dive_investigation(result)
await asyncio.sleep(3600) # Hunt every hour
3. Intelligent Alert Enrichment
# AI-powered alert enrichmentasync def enrich_alert(alert): enrichment_prompt = f""" Alert: {alert['rule']['description']} Agent: {alert['agent']['name']}
Please provide: 1. Threat context and potential impact 2. Related IOCs to investigate 3. Recommended response actions 4. Similar historical incidents """
enriched_data = await mcp_agent.ainvoke({ "input": enrichment_prompt })
return merge_enrichment(alert, enriched_data)
Monitoring & Observability
Metrics Collection
# Prometheus metrics for MCP serverfrom prometheus_client import Counter, Histogram, Gauge
mcp_requests_total = Counter( 'mcp_requests_total', 'Total MCP requests', ['method', 'tool', 'status'])
mcp_request_duration = Histogram( 'mcp_request_duration_seconds', 'MCP request duration', ['method', 'tool'])
wazuh_agents_active = Gauge( 'wazuh_agents_active', 'Number of active Wazuh agents')
Logging Configuration
# Structured logging setupimport structlog
logger = structlog.get_logger()
logger.info( "mcp_request", tool="GetAgentProcesses", agent_id="001", user="analyst1", duration=0.234, status="success")
Troubleshooting Guide
Common Issues & Solutions
- Connection Timeout
# Increase timeout settingsexport WAZUH_TIMEOUT=60python -m wazuh_mcp_server --timeout 60
- SSL Certificate Verification
# For self-signed certificates (dev only)export WAZUH_PROD_SSL_VERIFY=false
# For custom CA certificatesexport SSL_CERT_FILE=/path/to/ca-bundle.crt
- Rate Limiting
# Implement rate limitingfrom aiohttp import ClientSessionfrom aiolimiter import AsyncLimiter
rate_limiter = AsyncLimiter(10, 1) # 10 requests per second
async def rate_limited_request(url): async with rate_limiter: async with ClientSession() as session: return await session.get(url)
Future Roadmap
Upcoming Features
- Vector Database Integration: Store and search security events semantically
- Multi-tenancy Support: Isolate data between different organizations
- Custom Tool Development: SDK for creating domain-specific tools
- ML Model Integration: Direct integration with security-focused ML models
- Streaming Support: Real-time event streaming with WebSockets
Community Contributions
The project welcomes contributions in:
- Additional tool implementations
- LLM framework integrations
- Performance optimizations
- Documentation improvements
- Security enhancements
Conclusion
The Wazuh MCP Server fundamentally transforms how security teams interact with their SIEM data. By bridging the gap between traditional security tools and modern AI capabilities, it enables:
- Faster Investigation: Natural language queries eliminate complex scripting
- Democratized Access: Junior analysts can perform advanced analysis
- Automated Workflows: AI-driven investigation and response
- Reduced MTTR: Accelerated threat detection and response
- Enhanced Coverage: Continuous, intelligent threat hunting
As security threats evolve, the combination of Wazuh’s comprehensive monitoring and AI’s analytical capabilities positions organizations to stay ahead of adversaries while optimizing their security operations.
Resources
Get Started
Ready to revolutionize your security operations? Deploy the Wazuh MCP Server today and experience the future of AI-powered security analysis.
# Quick startpip install git+https://github.com/socfortress/wazuh-mcp-server.gitpython -m wazuh_mcp_server
Join the community and contribute to the evolution of intelligent security operations.