Wazuh Agent Status to NATS Integration - Rust POC
Wazuh Agent Status to NATS Integration - POC Document
Overview
Create a Rust-based service that monitors Wazuh agent status changes and publishes them to NATS for real-time security event distribution across XDR/OXDR platforms.
Technical Objectives
- Real-time Monitoring: Detect agent status changes (Active/Disconnected/Never Connected)
- Message Publishing: Send structured events to NATS subjects
- Security First: Implement proper authentication, encryption, and error handling
- Performance: Handle high-throughput environments with minimal latency
- Reliability: Ensure message delivery with appropriate retry mechanisms
Architecture Overview
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Wazuh Manager  │    │  Rust Service   │    │   NATS Server   │
│                 │    │                 │    │                 │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ global.db   │─┼────┼─│ DB Monitor  │ │    │ │ Subjects    │ │
│ │ client.keys │ │    │ │ Status Diff │ │    │ │ - agent.*   │ │
│ └─────────────┘ │    │ │ NATS Client │─┼────┼─│ - security.*│ │
│                 │    │ └─────────────┘ │    │ └─────────────┘ │
│  /var/ossec/    │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
Technical Requirements
Core Dependencies (Rust)
[dependencies]
# NATS client
async-nats = "0.35"

# Database access
rusqlite = { version = "0.31", features = ["bundled"] }
tokio-rusqlite = "0.5"

# Async runtime
tokio = { version = "1.0", features = ["full"] }

# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

# Error handling
anyhow = "1.0"
thiserror = "1.0"

# Logging
tracing = "0.1"
tracing-subscriber = "0.3"

# Configuration
config = "0.14"
clap = { version = "4.0", features = ["derive"] }

# File system monitoring
notify = "6.0"

# Cryptography (for message integrity)
ring = "0.17"
base64 = "0.22"

# Time handling
chrono = { version = "0.4", features = ["serde"] }
Data Models
Agent Status Event Structure
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentStatusEvent {
    pub agent_id: String,
    pub agent_name: String,
    pub previous_status: AgentStatus,
    pub current_status: AgentStatus,
    pub ip_address: Option<String>,
    pub last_keepalive: Option<DateTime<Utc>>,
    pub manager_node: String,
    pub timestamp: DateTime<Utc>,
    pub event_id: String,
    pub signature: Option<String>, // For integrity verification
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum AgentStatus {
    Active,
    Disconnected,
    NeverConnected,
    Pending,
    Unknown,
}
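The status values read from the Wazuh database have to be mapped onto the `AgentStatus` enum above before any comparison can happen. The following is a minimal sketch of that mapping. The helper name `from_db_value` is hypothetical, and the lowercase spellings it matches (`active`, `disconnected`, `never_connected`, `pending`) are an assumption about what the database stores; verify them against your Wazuh version. The serde derives are omitted so the sketch compiles with the standard library alone.

```rust
/// Mirrors the `AgentStatus` enum from the data model (serde derives omitted).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentStatus {
    Active,
    Disconnected,
    NeverConnected,
    Pending,
    Unknown,
}

impl AgentStatus {
    /// Maps a raw status string to the enum.
    /// ASSUMPTION: the lowercase spellings below are what the database holds.
    /// Anything unrecognized falls back to `Unknown` instead of failing.
    pub fn from_db_value(raw: &str) -> Self {
        match raw.trim().to_ascii_lowercase().as_str() {
            "active" => Self::Active,
            "disconnected" => Self::Disconnected,
            "never_connected" => Self::NeverConnected,
            "pending" => Self::Pending,
            _ => Self::Unknown,
        }
    }
}
```

Falling back to `Unknown` keeps the monitor running if a future Wazuh release introduces a status the service does not know yet.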
Implementation Components
1. Database Monitor (src/database/monitor.rs)
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;

pub struct WazuhDbMonitor {
    db_path: PathBuf,
    poll_interval: Duration,
    last_check: HashMap<String, AgentStatusSnapshot>,
}

// Planned API surface: signatures only, method bodies are left to the implementation.
impl WazuhDbMonitor {
    pub async fn new(db_path: impl Into<PathBuf>) -> Result<Self>;
    pub async fn start_monitoring(&mut self) -> Result<Receiver<AgentStatusEvent>>;
    pub async fn get_current_agents(&self) -> Result<Vec<AgentInfo>>;
    pub async fn detect_changes(&mut self) -> Result<Vec<AgentStatusEvent>>;
}
Key Implementation Points:
- Query /var/ossec/queue/db/global.db for the agent table
- Implement SQLite row-level change detection
- Handle database locks and connection pooling
- Monitor for new agent registrations
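The change detection described in these points can be sketched as a comparison between two snapshots taken on consecutive polls. This is only an illustration under simplifying assumptions: statuses are plain strings keyed by agent ID, and the names `StatusChange` and `detect_changes` (here a free function, not the `WazuhDbMonitor` method) are hypothetical. A previously unseen agent is reported with `previous: None`, which covers new registrations.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub struct StatusChange {
    pub agent_id: String,
    /// `None` means the agent was not present in the previous snapshot.
    pub previous: Option<String>,
    pub current: String,
}

/// Compares the previous snapshot (agent_id -> status) with a freshly polled
/// one and returns one entry per agent whose status differs or that is new.
/// Results are sorted by agent ID so the output is deterministic.
pub fn detect_changes(
    previous: &HashMap<String, String>,
    current: &HashMap<String, String>,
) -> Vec<StatusChange> {
    let mut changes: Vec<StatusChange> = current
        .iter()
        .filter(|(id, status)| previous.get(*id) != Some(*status))
        .map(|(id, status)| StatusChange {
            agent_id: id.clone(),
            previous: previous.get(id).cloned(),
            current: status.clone(),
        })
        .collect();
    changes.sort_by(|a, b| a.agent_id.cmp(&b.agent_id));
    changes
}
```

After each poll the caller would replace the stored snapshot with the current one. Agents that disappear from the table (deregistration) are not reported by this sketch and would need a second pass over `previous`.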
2. NATS Publisher (src/nats/publisher.rs)
pub struct NatsPublisher {
    client: async_nats::Client,
    subject_prefix: String,
    encryption_key: Option<Vec<u8>>,
}

// Planned API surface: signatures only, method bodies are left to the implementation.
impl NatsPublisher {
    pub async fn new(config: NatsConfig) -> Result<Self>;
    pub async fn publish_agent_event(&self, event: &AgentStatusEvent) -> Result<()>;
    pub async fn publish_batch(&self, events: Vec<AgentStatusEvent>) -> Result<()>;
    pub fn create_subject(&self, agent_id: &str, event_type: &str) -> String;
}
NATS Subject Structure:
- wazuh.agent.{agent_id}.status - Individual agent status changes
- wazuh.agent.{agent_id}.connected - Agent connection events
- wazuh.agent.{agent_id}.disconnected - Agent disconnection events
- wazuh.security.agent_events - Aggregated security events
- wazuh.manager.{node_id}.heartbeat - Manager health status
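A subject following the per-agent pattern above could be built as in the sketch below. The function names `sanitize_token` and `agent_subject` are hypothetical. Because `.` separates tokens in a NATS subject and `*` and `>` are wildcards, the sketch replaces those characters (and whitespace) inside the agent ID and event type with `_`; whether to replace or reject such input is a design choice left open here.

```rust
/// Replaces characters that are unsafe inside a single NATS subject token
/// ('.', '*', '>' and whitespace) with '_'.
fn sanitize_token(raw: &str) -> String {
    raw.chars()
        .map(|c| {
            if c == '.' || c == '*' || c == '>' || c.is_whitespace() {
                '_'
            } else {
                c
            }
        })
        .collect()
}

/// Builds `{prefix}.agent.{agent_id}.{event_type}`.
/// The prefix is used as given so it may itself contain several tokens.
pub fn agent_subject(prefix: &str, agent_id: &str, event_type: &str) -> String {
    format!(
        "{}.agent.{}.{}",
        prefix,
        sanitize_token(agent_id),
        sanitize_token(event_type)
    )
}
```

With the prefix `wazuh`, agent `001` and event type `status`, this yields `wazuh.agent.001.status`, which a subscriber can match with `wazuh.agent.*.status`.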
3. Configuration Management (src/config/mod.rs)
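#[derive(Debug, Deserialize)]
pub struct Config {
    pub wazuh: WazuhConfig,
    pub nats: NatsConfig,
    pub security: SecurityConfig,
    pub monitoring: MonitoringConfig,
}

#[derive(Debug, Deserialize)]
pub struct NatsConfig {
    pub url: String,
    pub credentials_file: Option<PathBuf>,
    pub tls_cert: Option<PathBuf>,
    pub tls_key: Option<PathBuf>,
    pub subject_prefix: String,
    pub max_reconnects: usize,
    // Note: serde deserializes std::time::Duration from a { secs, nanos }
    // structure, not from strings like "2s". A helper such as the
    // humantime-serde crate (not in the dependency list above) would be needed.
    pub reconnect_delay: Duration,
}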
4. Security Implementation (src/security/mod.rs)
pub struct MessageSigner {
    private_key: Vec<u8>,
}

// Planned API surface: signatures only, method bodies are left to the implementation.
impl MessageSigner {
    pub fn new(key_path: impl AsRef<Path>) -> Result<Self>;
    pub fn sign_message(&self, message: &str) -> Result<String>;
    pub fn verify_signature(&self, message: &str, signature: &str) -> Result<bool>;
}

pub struct EncryptionHandler {
    key: Vec<u8>,
}

impl EncryptionHandler {
    pub fn encrypt(&self, data: &[u8]) -> Result<Vec<u8>>;
    pub fn decrypt(&self, encrypted_data: &[u8]) -> Result<Vec<u8>>;
}
Core Implementation Tasks
Phase 1: Database Integration (3-4 hours)
- SQLite Connection Management
  - Implement connection pooling for /var/ossec/queue/db/global.db
  - Handle database locks and read-only access
  - Create agent status query functions
- Change Detection Algorithm
  - Implement efficient diff mechanism for agent status
  - Handle agent registration/deregistration events
  - Create timestamp-based change tracking
Phase 2: NATS Integration (2-3 hours)
- NATS Client Setup
  - Configure connection with authentication (JWT/NKeys)
  - Implement TLS encryption for transport security
  - Handle connection failures and auto-reconnection
- Message Publishing
  - Design subject hierarchy for different event types
  - Implement message serialization (JSON/Protocol Buffers)
  - Add message deduplication mechanisms
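One possible shape for the deduplication step is a bounded set of recently published event IDs held on the publisher side. The sketch below is an assumption about how this could be done, not a prescribed design; the type `RecentEventIds` and its methods are hypothetical. It evicts the oldest ID once the capacity is reached, so memory stays constant.

```rust
use std::collections::{HashSet, VecDeque};

/// Remembers the most recent `capacity` event IDs.
pub struct RecentEventIds {
    capacity: usize,
    order: VecDeque<String>, // insertion order, oldest at the front
    seen: HashSet<String>,   // fast membership test
}

impl RecentEventIds {
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity: capacity.max(1), // a capacity of 0 would remember nothing
            order: VecDeque::new(),
            seen: HashSet::new(),
        }
    }

    /// Returns true and records the ID if it was not seen recently;
    /// returns false for a duplicate.
    pub fn insert_if_new(&mut self, event_id: &str) -> bool {
        if self.seen.contains(event_id) {
            return false;
        }
        if self.order.len() == self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
        self.order.push_back(event_id.to_string());
        self.seen.insert(event_id.to_string());
        true
    }
}
```

A publisher would call `insert_if_new(&event.event_id)` and skip the publish when it returns false. If JetStream is used, server-side deduplication keyed on a message ID header is an alternative worth evaluating.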
Phase 3: Security & Reliability (2-3 hours)
- Message Integrity
  - Implement HMAC signing for message authenticity
  - Add optional message encryption for sensitive data
  - Create message versioning scheme
- Error Handling & Resilience
  - Implement exponential backoff for retries
  - Add circuit breaker for external dependencies
  - Create comprehensive logging and metrics
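The exponential backoff item can be illustrated with a small delay calculation. This is a sketch under stated assumptions: the function name `backoff_delay` is hypothetical, the delay doubles on every attempt, and it is capped at a maximum. Random jitter, which is commonly added so that many clients do not retry in lockstep, is left out to keep the example dependency-free.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt,
/// capped at `max`. Overflow in either step also resolves to `max`.
pub fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

With a base of 2 seconds and a cap of 60 seconds, the first retries wait 2s, 4s, 8s, 16s and 32s, and every later retry waits 60s.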
Phase 4: Testing & Documentation (1-2 hours)
- Unit Tests
  - Mock SQLite database interactions
  - Test NATS message publishing scenarios
  - Validate security implementations
- Integration Tests
  - Test with actual Wazuh manager setup
  - Verify NATS message delivery
  - Performance testing under load
Configuration Example
wazuh:
  db_path: "/var/ossec/queue/db/global.db"
  poll_interval: "5s"
  manager_node: "wazuh-manager-01"

nats:
  url: "nats://localhost:4222"
  credentials_file: "/etc/wazuh-nats/nats.creds"
  tls_cert: "/etc/wazuh-nats/client.crt"
  tls_key: "/etc/wazuh-nats/client.key"
  subject_prefix: "wazuh"
  max_reconnects: 10
  reconnect_delay: "2s"

security:
  signing_key_path: "/etc/wazuh-nats/signing.key"
  encrypt_messages: true
  encryption_key_path: "/etc/wazuh-nats/encryption.key"

monitoring:
  enable_metrics: true
  metrics_port: 9090
  log_level: "info"
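The configuration writes intervals as strings such as "5s" and "2s", which have to be turned into `Duration` values somewhere. A minimal hand-rolled parser could look like the sketch below. The function name `parse_duration` and the accepted suffixes (`ms`, `s`, `m`) are assumptions made for illustration; a maintained crate for human-readable durations may be a better fit in a real service.

```rust
use std::time::Duration;

/// Parses values such as "500ms", "5s" or "2m" into a `Duration`.
/// Returns `None` for a missing number, a missing unit or an unknown unit.
pub fn parse_duration(raw: &str) -> Option<Duration> {
    let raw = raw.trim();
    // Index of the first non-digit character; a bare number has none.
    let split = raw.find(|c: char| !c.is_ascii_digit())?;
    let (digits, unit) = raw.split_at(split);
    let value: u64 = digits.parse().ok()?;
    match unit {
        "ms" => Some(Duration::from_millis(value)),
        "s" => Some(Duration::from_secs(value)),
        "m" => value.checked_mul(60).map(Duration::from_secs),
        _ => None,
    }
}
```

Requiring an explicit unit means a bare `5` is rejected, which avoids silent confusion between seconds and milliseconds.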
NATS Message Examples
Agent Status Change Event
{
  "agent_id": "001",
  "agent_name": "web-server-01",
  "previous_status": "Active",
  "current_status": "Disconnected",
  "ip_address": "192.168.1.100",
  "last_keepalive": "2025-05-28T10:30:45Z",
  "manager_node": "wazuh-manager-01",
  "timestamp": "2025-05-28T10:35:00Z",
  "event_id": "evt_001_1716897300_status_change",
  "signature": "abc123def456..."
}
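The `event_id` in this example appears to follow the pattern `evt_{agent_id}_{unix_seconds}_{kind}`. That reading is inferred from the single sample value and is an assumption, as are the helper names `event_id_at` and `event_id_now` in the sketch below.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Builds an event ID of the form `evt_{agent_id}_{unix_seconds}_{kind}`.
/// ASSUMPTION: the format is inferred from the sample message.
pub fn event_id_at(agent_id: &str, unix_seconds: u64, kind: &str) -> String {
    format!("evt_{}_{}_{}", agent_id, unix_seconds, kind)
}

/// Same as `event_id_at`, using the current system time.
/// Falls back to 0 if the clock reports a time before the Unix epoch.
pub fn event_id_now(agent_id: &str, kind: &str) -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    event_id_at(agent_id, secs, kind)
}
```

An ID derived only from agent, second and kind is deterministic, which helps deduplication, but two changes for the same agent within one second would collide; a counter or random suffix could be appended if that matters.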
Batch Status Update
{
  "batch_id": "batch_1716897300",
  "manager_node": "wazuh-manager-01",
  "timestamp": "2025-05-28T10:35:00Z",
  "events": [
    { /* individual agent events */ }
  ],
  "summary": {
    "total_agents": 150,
    "active_agents": 145,
    "disconnected_agents": 5,
    "changes_detected": 3
  }
}
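The `summary` object in the batch message could be computed from the current agent statuses roughly as follows. The struct `BatchSummary` mirrors the four JSON fields; the function `summarize` and the lowercase status strings it counts are hypothetical and only serve the illustration.

```rust
#[derive(Debug, Default, PartialEq)]
pub struct BatchSummary {
    pub total_agents: usize,
    pub active_agents: usize,
    pub disconnected_agents: usize,
    pub changes_detected: usize,
}

/// Counts agents by status. Statuses other than "active" and "disconnected"
/// (for example pending agents) only contribute to `total_agents`.
pub fn summarize<'a>(
    statuses: impl IntoIterator<Item = &'a str>,
    changes_detected: usize,
) -> BatchSummary {
    let mut summary = BatchSummary {
        changes_detected,
        ..Default::default()
    };
    for status in statuses {
        summary.total_agents += 1;
        match status {
            "active" => summary.active_agents += 1,
            "disconnected" => summary.disconnected_agents += 1,
            _ => {}
        }
    }
    summary
}
```

Because other statuses are counted only in the total, `active_agents + disconnected_agents` can be smaller than `total_agents`, unlike in the sample message where the two add up exactly.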
Deployment Considerations
Security Requirements
- Filesystem Permissions: Read access to Wazuh databases
- Network Security: TLS encryption for NATS connections
- Authentication: NATS JWT/NKeys or credentials file
- Message Integrity: HMAC signatures for all published events
Performance Targets
- Latency: < 100ms from detecting a status change to NATS publish (detection itself is bounded by the polling interval)
- Throughput: Handle 1000+ agents with 5-second polling
- Resource Usage: < 50MB memory, < 1% CPU under normal load
- Reliability: 99.9% message delivery success rate
Monitoring & Observability
- Metrics: Agent count, message publish rate, error rates
- Logging: Structured logs with correlation IDs
- Health Checks: Database connectivity, NATS connection status
- Alerting: Failed message delivery, database connection issues
Evaluation Criteria
Technical Implementation (40%)
- Code quality and Rust best practices
- Proper error handling and edge cases
- Security implementation completeness
- Performance and resource efficiency
Architecture & Design (30%)
- System design and component separation
- Scalability considerations
- Security architecture decisions
- Database integration approach
Testing & Documentation (20%)
- Unit and integration test coverage
- Code documentation quality
- Configuration examples and setup guides
- Error handling scenarios
Security & Reliability (10%)
- Message integrity implementation
- Connection security measures
- Failure recovery mechanisms
- Input validation and sanitization
Bonus Challenges
Advanced Features (Optional)
- Implement message compression for high-throughput scenarios
- Add support for multiple Wazuh manager nodes
- Create a web dashboard for monitoring service health
Security Enhancements (Optional)
- Implement message rate limiting per agent
- Add anomaly detection for unusual status patterns
- Create audit logging for all published events
Operational Excellence (Optional)
- Add Prometheus metrics endpoint
- Implement graceful shutdown with message queue draining
- Create Docker containerization with security hardening
Expected Deliverables
- Source Code: Complete Rust implementation with proper project structure
- Configuration: Example configuration files and environment setup
- Documentation: README with setup instructions and API documentation
- Tests: Unit and integration tests with coverage report
- Demo: Working demonstration with sample NATS subscribers
Estimated Timeline: 8-12 hours for complete implementation
Skill Level: Intermediate to Advanced Rust developer with systems programming experience
https://mranv.pages.dev/posts/wazuh-agent-status-monitor-rust/