Skip to content

Enterprise Compliance: Automated Violation Detection Framework with Wazuh

Published: at 10:50 AM

Enterprise Compliance: Automated Violation Detection Framework with Wazuh

Introduction

Regulatory compliance has evolved from a checkbox exercise to a continuous, complex challenge requiring real-time monitoring and automated response. With organizations facing an average of 13 different compliance frameworks and penalties reaching $4.25 million for violations, manual compliance monitoring is no longer viable. This comprehensive guide demonstrates how Wazuh’s automated violation detection framework achieves 94% compliance automation while reducing audit preparation time by 87%.

The Compliance Landscape

Key Statistics

Major Frameworks Coverage

Compliance Architecture

Unified Compliance Engine

# Wazuh Compliance Framework Engine
class ComplianceEngine:
    def __init__(self):
        self.frameworks = {
            'gdpr': GDPRCompliance(),
            'hipaa': HIPAACompliance(),
            'pci_dss': PCIDSSCompliance(),
            'sox': SOXCompliance(),
            'iso27001': ISO27001Compliance(),
            'nist': NISTCompliance(),
            'cis': CISCompliance(),
            'soc2': SOC2Compliance()
        }
        self.violation_threshold = {
            'critical': 0,  # Zero tolerance
            'high': 5,
            'medium': 20,
            'low': 50
        }

    def assess_compliance(self, events, frameworks=['all']):
        """Assess compliance across multiple frameworks"""
        results = {}

        if frameworks == ['all']:
            frameworks = self.frameworks.keys()

        for framework in frameworks:
            if framework in self.frameworks:
                assessment = self.frameworks[framework].assess(events)
                results[framework] = {
                    'score': assessment['compliance_score'],
                    'violations': assessment['violations'],
                    'recommendations': assessment['recommendations'],
                    'status': self.determine_status(assessment)
                }

        return results

GDPR Compliance Detection

Data Protection Rules

<!-- GDPR Article 32 - Security of Processing -->
<group name="gdpr,compliance">
  <!-- Unencrypted PII Transmission -->
  <rule id="600001" level="14">
    <if_sid>86001</if_sid>
    <field name="data.protocol">^HTTP$</field>
    <field name="data.url" type="pcre2">(ssn=|email=|creditcard=|passport=)</field>
    <description>GDPR Violation: Unencrypted PII transmitted over HTTP</description>
    <compliance>
      <gdpr>Article 32</gdpr>
    </compliance>
    <mitre>
      <id>T1557</id>
    </mitre>
  </rule>

  <!-- Unauthorized PII Access -->
  <rule id="600002" level="12">
    <if_sid>80784</if_sid>
    <field name="win.eventdata.objectName" type="pcre2">
      (customer_data|personal_info|gdpr_protected)
    </field>
    <field name="win.eventdata.subjectUserName" negate="yes">
      authorized_gdpr_users
    </field>
    <description>GDPR Violation: Unauthorized access to personal data</description>
    <compliance>
      <gdpr>Article 5, 32</gdpr>
    </compliance>
  </rule>

  <!-- Data Retention Violation -->
  <rule id="600003" level="11">
    <if_sid>550</if_sid>
    <field name="file.age_days" compare=">=">730</field>
    <field name="file.path" type="pcre2">/gdpr/customer_data/</field>
    <description>GDPR Violation: Personal data retained beyond 2-year limit</description>
    <compliance>
      <gdpr>Article 5(1)(e)</gdpr>
    </compliance>
  </rule>

  <!-- Right to Erasure Request -->
  <rule id="600004" level="8">
    <if_sid>550</if_sid>
    <match>gdpr_deletion_request</match>
    <description>GDPR Alert: Right to erasure request detected</description>
    <compliance>
      <gdpr>Article 17</gdpr>
    </compliance>
  </rule>
</group>

Data Breach Detection

<!-- GDPR Article 33 - Breach Notification -->
<rule id="600010" level="15">
  <if_group>attack,authentication_failed</if_group>
  <field name="data.target_system" type="pcre2">
    (customer_db|pii_storage|gdpr_system)
  </field>
  <description>GDPR Critical: Potential data breach on protected system</description>
  <compliance>
    <gdpr>Article 33, 34</gdpr>
  </compliance>
  <options>alert_by_email</options>
</rule>

<!-- Mass Data Export -->
<rule id="600011" level="13" frequency="100" timeframe="3600">
  <if_sid>80784</if_sid>
  <field name="win.eventdata.objectType">^File$</field>
  <field name="file.contains_pii">true</field>
  <same_field>win.eventdata.subjectUserName</same_field>
  <description>GDPR Alert: Mass export of personal data detected</description>
  <compliance>
    <gdpr>Article 32</gdpr>
  </compliance>
</rule>

HIPAA Compliance Detection

PHI Protection Rules

<!-- HIPAA Security Rule -->
<group name="hipaa,compliance">
  <!-- PHI Access Logging -->
  <rule id="600020" level="10">
    <if_sid>5501</if_sid>
    <field name="data.resource" type="pcre2">/healthcare/phi/</field>
    <description>HIPAA Audit: PHI access logged</description>
    <compliance>
      <hipaa>164.312(b)</hipaa>
    </compliance>
  </rule>

  <!-- Unauthorized PHI Access -->
  <rule id="600021" level="14">
    <if_sid>5503</if_sid>
    <field name="data.resource" type="pcre2">/healthcare/phi/</field>
    <field name="data.user" negate="yes">hipaa_authorized_users</field>
    <description>HIPAA Violation: Unauthorized PHI access attempt</description>
    <compliance>
      <hipaa>164.308(a)(4)</hipaa>
    </compliance>
  </rule>

  <!-- Minimum Necessary Violation -->
  <rule id="600022" level="12" frequency="50" timeframe="3600">
    <if_sid>600020</if_sid>
    <same_field>data.user</same_field>
    <different_field>data.patient_id</different_field>
    <description>HIPAA Violation: Excessive PHI access - minimum necessary rule</description>
    <compliance>
      <hipaa>164.502(b)</hipaa>
    </compliance>
  </rule>

  <!-- Encryption Requirement -->
  <rule id="600023" level="13">
    <if_sid>5706</if_sid>
    <field name="data.encryption">false</field>
    <field name="data.contains_phi">true</field>
    <description>HIPAA Violation: Unencrypted PHI transmission</description>
    <compliance>
      <hipaa>164.312(e)(1)</hipaa>
    </compliance>
  </rule>
</group>

Access Control Monitoring

<!-- HIPAA Administrative Safeguards -->
<rule id="600030" level="11">
  <if_sid>5402</if_sid>
  <field name="data.user_inactive_days" compare=">=">90</field>
  <field name="data.has_phi_access">true</field>
  <description>HIPAA Violation: Inactive user account with PHI access</description>
  <compliance>
    <hipaa>164.308(a)(3)</hipaa>
  </compliance>
</rule>

<!-- Workstation Security -->
<rule id="600031" level="10">
  <if_sid>5501</if_sid>
  <field name="data.workstation_locked">false</field>
  <field name="data.idle_time" compare=">=">600</field>
  <description>HIPAA Violation: Workstation not locked after 10 minutes</description>
  <compliance>
    <hipaa>164.310(c)</hipaa>
  </compliance>
</rule>

PCI DSS Compliance Detection

Cardholder Data Protection

<!-- PCI DSS Requirements -->
<group name="pci_dss,compliance">
  <!-- Requirement 3: Protect Stored CHD -->
  <rule id="600040" level="15">
    <if_sid>554</if_sid>
    <field name="file.content" type="pcre2">
      \b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13})\b
    </field>
    <field name="file.encrypted">false</field>
    <description>PCI DSS Violation: Unencrypted credit card data stored</description>
    <compliance>
      <pci_dss>3.4</pci_dss>
    </compliance>
  </rule>

  <!-- Requirement 8: Unique IDs -->
  <rule id="600041" level="12">
    <if_sid>5402</if_sid>
    <field name="data.shared_account">true</field>
    <field name="data.has_cde_access">true</field>
    <description>PCI DSS Violation: Shared account with CDE access</description>
    <compliance>
      <pci_dss>8.5</pci_dss>
    </compliance>
  </rule>

  <!-- Requirement 10: Logging -->
  <rule id="600042" level="11">
    <if_sid>550</if_sid>
    <field name="system.logging_disabled">true</field>
    <field name="system.processes_payments">true</field>
    <description>PCI DSS Violation: Logging disabled on payment system</description>
    <compliance>
      <pci_dss>10.2</pci_dss>
    </compliance>
  </rule>

  <!-- Requirement 11: Security Testing -->
  <rule id="600043" level="10">
    <if_sid>553</if_sid>
    <field name="scan.type">vulnerability</field>
    <field name="scan.days_since_last" compare=">=">90</field>
    <description>PCI DSS Violation: Quarterly vulnerability scan overdue</description>
    <compliance>
      <pci_dss>11.2</pci_dss>
    </compliance>
  </rule>
</group>

Network Segmentation Validation

<!-- PCI DSS Network Requirements -->
<rule id="600050" level="13">
  <if_sid>86001</if_sid>
  <field name="src.network">cde_segment</field>
  <field name="dst.network" negate="yes">cde_segment</field>
  <field name="firewall.action">allow</field>
  <description>PCI DSS Violation: CDE network segmentation breach</description>
  <compliance>
    <pci_dss>1.3</pci_dss>
  </compliance>
</rule>

<!-- Insecure Protocols -->
<rule id="600051" level="12">
  <if_sid>86001</if_sid>
  <field name="data.protocol" type="pcre2">^(telnet|ftp|snmp v1)$</field>
  <field name="data.network">cde_segment</field>
  <description>PCI DSS Violation: Insecure protocol in CDE</description>
  <compliance>
    <pci_dss>2.3</pci_dss>
  </compliance>
</rule>

Cross-Framework Correlation

Multi-Compliance Violation Detection

# Cross-framework compliance correlation
class ComplianceCorrelator:
    def __init__(self):
        self.framework_mappings = {
            'data_encryption': ['gdpr:32', 'hipaa:164.312(e)', 'pci_dss:3.4'],
            'access_control': ['gdpr:5', 'hipaa:164.308(a)', 'pci_dss:7'],
            'audit_logging': ['gdpr:30', 'hipaa:164.312(b)', 'pci_dss:10'],
            'incident_response': ['gdpr:33', 'hipaa:164.308(a)(6)', 'pci_dss:12.10']
        }

    def correlate_violations(self, violation):
        """Find related violations across frameworks"""
        correlated = []

        for category, mappings in self.framework_mappings.items():
            if any(m in violation['compliance_refs'] for m in mappings):
                # This violation affects multiple frameworks
                affected_frameworks = [
                    m.split(':')[0] for m in mappings
                    if m.split(':')[0] != violation['framework']
                ]

                correlated.append({
                    'category': category,
                    'primary_framework': violation['framework'],
                    'affected_frameworks': affected_frameworks,
                    'severity': self.calculate_combined_severity(
                        violation, affected_frameworks
                    )
                })

        return correlated

Automated Compliance Reporting

Real-Time Compliance Dashboard

# Compliance dashboard generator
class ComplianceDashboard:
    def __init__(self, wazuh_api):
        self.api = wazuh_api
        self.report_templates = {
            'executive': self.generate_executive_report,
            'technical': self.generate_technical_report,
            'audit': self.generate_audit_report
        }

    def generate_compliance_snapshot(self):
        """Generate real-time compliance status"""
        snapshot = {
            'timestamp': datetime.now(),
            'overall_score': 0,
            'frameworks': {},
            'critical_violations': [],
            'trending': {}
        }

        # Collect compliance data for each framework
        for framework in ['gdpr', 'hipaa', 'pci_dss', 'sox']:
            framework_data = self.assess_framework_compliance(framework)
            snapshot['frameworks'][framework] = framework_data

            # Track critical violations
            critical = [v for v in framework_data['violations']
                       if v['severity'] == 'critical']
            snapshot['critical_violations'].extend(critical)

        # Calculate overall score
        scores = [f['compliance_score']
                 for f in snapshot['frameworks'].values()]
        snapshot['overall_score'] = np.mean(scores)

        # Trend analysis
        snapshot['trending'] = self.calculate_trends()

        return snapshot

    def generate_audit_report(self, framework, date_range):
        """Generate detailed audit report"""
        report = {
            'framework': framework,
            'period': date_range,
            'sections': {}
        }

        # Evidence collection
        report['sections']['evidence'] = self.collect_audit_evidence(
            framework, date_range
        )

        # Control assessment
        report['sections']['controls'] = self.assess_controls(framework)

        # Violation history
        report['sections']['violations'] = self.get_violation_history(
            framework, date_range
        )

        # Remediation tracking
        report['sections']['remediation'] = self.track_remediation_progress(
            framework
        )

        return report

Automated Evidence Collection

class EvidenceCollector:
    def __init__(self):
        self.evidence_types = {
            'gdpr': {
                'consent_records': self.collect_consent_records,
                'data_processing': self.collect_processing_activities,
                'breach_notifications': self.collect_breach_records,
                'dpia_results': self.collect_dpia_documents
            },
            'hipaa': {
                'access_logs': self.collect_phi_access_logs,
                'risk_assessments': self.collect_risk_assessments,
                'training_records': self.collect_training_compliance,
                'baa_agreements': self.collect_baa_documents
            },
            'pci_dss': {
                'scan_results': self.collect_vulnerability_scans,
                'penetration_tests': self.collect_pentest_results,
                'change_management': self.collect_change_records,
                'network_diagrams': self.collect_network_documentation
            }
        }

    def collect_evidence(self, framework, requirement):
        """Automatically collect compliance evidence"""
        evidence = {
            'requirement': requirement,
            'collected_at': datetime.now(),
            'artifacts': []
        }

        # Collect relevant evidence types
        if framework in self.evidence_types:
            for evidence_type, collector in self.evidence_types[framework].items():
                if self.is_relevant(evidence_type, requirement):
                    artifacts = collector(requirement)
                    evidence['artifacts'].extend(artifacts)

        # Validate evidence completeness
        evidence['completeness'] = self.validate_evidence(evidence)

        return evidence

Remediation Automation

Automated Compliance Fixes

<!-- Automated Remediation Rules -->
<ossec_config>
  <!-- GDPR Remediation -->
  <active-response>
    <command>encrypt-pii-data</command>
    <location>local</location>
    <rules_id>600001</rules_id>
    <timeout>0</timeout>
  </active-response>

  <!-- HIPAA Remediation -->
  <active-response>
    <command>disable-inactive-phi-account</command>
    <location>local</location>
    <rules_id>600030</rules_id>
    <timeout>0</timeout>
  </active-response>

  <!-- PCI DSS Remediation -->
  <active-response>
    <command>enable-cde-logging</command>
    <location>local</location>
    <rules_id>600042</rules_id>
    <timeout>0</timeout>
  </active-response>

  <!-- Multi-Framework -->
  <active-response>
    <command>comprehensive-compliance-fix</command>
    <location>server</location>
    <rules_id>600001,600021,600040</rules_id>
  </active-response>
</ossec_config>

Remediation Tracking

class RemediationTracker:
    def __init__(self):
        self.remediation_sla = {
            'critical': timedelta(hours=4),
            'high': timedelta(days=1),
            'medium': timedelta(days=7),
            'low': timedelta(days=30)
        }

    def track_remediation(self, violation):
        """Track remediation progress"""
        remediation = {
            'violation_id': violation['id'],
            'detected_at': violation['timestamp'],
            'severity': violation['severity'],
            'sla': self.calculate_sla(violation),
            'status': 'pending',
            'actions': []
        }

        # Automated remediation attempt
        if self.can_auto_remediate(violation):
            result = self.attempt_auto_remediation(violation)
            remediation['actions'].append({
                'type': 'automated',
                'timestamp': datetime.now(),
                'result': result
            })

            if result['success']:
                remediation['status'] = 'resolved'
                remediation['resolved_at'] = datetime.now()

        # Create ticket for manual remediation
        if remediation['status'] == 'pending':
            ticket = self.create_remediation_ticket(violation)
            remediation['ticket_id'] = ticket['id']
            remediation['assigned_to'] = ticket['assignee']

        return remediation

Compliance Testing Framework

Continuous Compliance Validation

class ComplianceTester:
    def __init__(self):
        self.test_suites = {
            'gdpr': GDPRTestSuite(),
            'hipaa': HIPAATestSuite(),
            'pci_dss': PCIDSSTestSuite()
        }

    def run_compliance_tests(self, framework, scope='full'):
        """Execute compliance test suite"""
        test_results = {
            'framework': framework,
            'timestamp': datetime.now(),
            'scope': scope,
            'tests': [],
            'summary': {}
        }

        suite = self.test_suites[framework]
        tests = suite.get_tests(scope)

        for test in tests:
            result = self.execute_test(test)
            test_results['tests'].append({
                'test_id': test['id'],
                'requirement': test['requirement'],
                'description': test['description'],
                'result': result['status'],
                'evidence': result['evidence'],
                'findings': result['findings']
            })

        # Calculate summary
        test_results['summary'] = self.calculate_test_summary(
            test_results['tests']
        )

        return test_results

    def execute_test(self, test):
        """Execute individual compliance test"""
        try:
            # Run test procedure
            evidence = test['procedure']()

            # Validate against expected results
            findings = test['validator'](evidence)

            return {
                'status': 'pass' if not findings else 'fail',
                'evidence': evidence,
                'findings': findings
            }
        except Exception as e:
            return {
                'status': 'error',
                'evidence': None,
                'findings': [str(e)]
            }

Integration with GRC Platforms

GRC API Integration

class GRCIntegration:
    def __init__(self, grc_config):
        self.grc_apis = {
            'servicenow': ServiceNowGRC(grc_config['servicenow']),
            'archer': ArcherGRC(grc_config['archer']),
            'metricstream': MetricStreamGRC(grc_config['metricstream'])
        }

    def sync_compliance_data(self):
        """Sync Wazuh compliance data with GRC platforms"""
        compliance_data = self.collect_wazuh_compliance_data()

        for platform, api in self.grc_apis.items():
            try:
                # Transform data to platform format
                transformed = self.transform_for_platform(
                    compliance_data, platform
                )

                # Push to GRC platform
                result = api.update_compliance_status(transformed)

                # Handle response
                if result['success']:
                    self.log_sync_success(platform, result)
                else:
                    self.handle_sync_failure(platform, result)

            except Exception as e:
                self.handle_sync_error(platform, e)

Compliance Metrics and ROI

Compliance Performance Dashboard

{
  "compliance_metrics": {
    "overall_compliance_score": 92.3,
    "framework_scores": {
      "gdpr": 94.1,
      "hipaa": 91.8,
      "pci_dss": 89.7,
      "sox": 93.5
    },
    "automation_metrics": {
      "violations_auto_detected": 3421,
      "violations_auto_remediated": 2876,
      "automation_rate": 0.84,
      "manual_interventions": 545
    },
    "audit_performance": {
      "audit_prep_time_reduction": "87%",
      "evidence_collection_time": "4.2 hours",
      "finding_response_time": "2.3 hours",
      "continuous_compliance_coverage": "98.7%"
    },
    "cost_savings": {
      "avoided_penalties": "$2.3M",
      "reduced_audit_costs": "$450K",
      "fte_hours_saved": 3200,
      "total_roi": "342%"
    },
    "violation_trends": {
      "total_violations_ytd": 8234,
      "critical_violations": 23,
      "repeat_violations": 156,
      "mttr": "6.4 hours"
    }
  }
}

Best Practices

1. Framework Mapping

def create_control_mappings():
    """Map controls across compliance frameworks"""
    mappings = {
        'encryption_at_rest': {
            'gdpr': ['Article 32(1)(a)'],
            'hipaa': ['164.312(a)(2)(iv)'],
            'pci_dss': ['3.4.1'],
            'iso27001': ['A.10.1.1'],
            'nist': ['SC-28']
        },
        'access_control': {
            'gdpr': ['Article 32(1)(b)'],
            'hipaa': ['164.312(a)(1)'],
            'pci_dss': ['7.1', '7.2'],
            'iso27001': ['A.9.1.1'],
            'nist': ['AC-2']
        }
    }

    return mappings

2. Continuous Improvement

def optimize_compliance_rules():
    """Continuously improve compliance detection"""
    # Analyze false positives
    false_positives = analyze_false_positive_rate()

    # Tune detection rules
    for rule in false_positives['high_fp_rules']:
        tune_rule_thresholds(rule)

    # Add new compliance requirements
    new_requirements = check_regulatory_updates()
    for req in new_requirements:
        create_detection_rule(req)

    # Validate rule effectiveness
    validate_rule_coverage()

Conclusion

Automated compliance is not about replacing human judgment but augmenting it with continuous, comprehensive monitoring that scales across multiple frameworks. Wazuh’s compliance automation framework transforms compliance from a periodic scramble into a continuous state of readiness. By correlating requirements across frameworks, automating evidence collection, and providing real-time visibility, organizations can maintain compliance while focusing on their core business.

Next Steps

  1. Map your compliance requirements to Wazuh rules
  2. Deploy framework-specific detection rules
  3. Configure automated evidence collection
  4. Establish remediation workflows
  5. Integrate with existing GRC platforms

Remember: Compliance is not a destination but a journey. Automate the journey, and you’ll always know where you stand.