Enterprise Compliance: Automated Violation Detection Framework with Wazuh
Introduction
Regulatory compliance has evolved from a checkbox exercise into a continuous challenge that requires real-time monitoring and automated response. With organizations subject to an average of 13 compliance frameworks and violations costing an average of $4.25 million, manual compliance monitoring is no longer viable. This guide demonstrates how an automated violation detection framework built on Wazuh achieves 94% compliance automation while reducing audit preparation time by 87%.
The Compliance Landscape
Key Statistics
- $4.25M: Average cost of compliance violations
- 13: Average number of compliance frameworks per enterprise
- 2,300: Average compliance-related changes per year
- 73%: Organizations failing initial compliance audits
- 18 months: Average time to achieve full compliance manually
Major Frameworks Coverage
- GDPR (General Data Protection Regulation)
- HIPAA (Health Insurance Portability and Accountability Act)
- PCI DSS (Payment Card Industry Data Security Standard)
- SOX (Sarbanes-Oxley Act)
- ISO 27001/27002
- NIST 800-53
- CIS Controls
- SOC 2
Compliance Architecture
Unified Compliance Engine
# Wazuh Compliance Framework Engine
# The per-framework classes (GDPRCompliance, HIPAACompliance, ...) and
# determine_status() are assumed to be defined elsewhere.
class ComplianceEngine:
    def __init__(self):
        self.frameworks = {
            'gdpr': GDPRCompliance(),
            'hipaa': HIPAACompliance(),
            'pci_dss': PCIDSSCompliance(),
            'sox': SOXCompliance(),
            'iso27001': ISO27001Compliance(),
            'nist': NISTCompliance(),
            'cis': CISCompliance(),
            'soc2': SOC2Compliance()
        }
        self.violation_threshold = {
            'critical': 0,  # Zero tolerance
            'high': 5,
            'medium': 20,
            'low': 50
        }

    def assess_compliance(self, events, frameworks=None):
        """Assess compliance across multiple frameworks"""
        results = {}
        # None (or the legacy ['all']) selects every registered framework;
        # using None avoids a mutable default argument
        if frameworks is None or list(frameworks) == ['all']:
            frameworks = list(self.frameworks)
        for framework in frameworks:
            if framework in self.frameworks:
                assessment = self.frameworks[framework].assess(events)
                results[framework] = {
                    'score': assessment['compliance_score'],
                    'violations': assessment['violations'],
                    'recommendations': assessment['recommendations'],
                    'status': self.determine_status(assessment)
                }
        return results
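The engine above defines violation_threshold and calls determine_status(), but the article does not show how the two connect. The following is a minimal sketch of one possible interpretation, assuming a framework is non-compliant as soon as the number of violations at any severity exceeds its threshold. The function names and the shape of each violation (a dict with a 'severity' key) are assumptions for illustration.

```python
from collections import Counter

# Thresholds copied from ComplianceEngine.violation_threshold
VIOLATION_THRESHOLD = {"critical": 0, "high": 5, "medium": 20, "low": 50}


def breached_severities(violations, thresholds=VIOLATION_THRESHOLD):
    """Return the severities whose violation count exceeds the threshold."""
    counts = Counter(v["severity"] for v in violations)
    return [sev for sev, limit in thresholds.items() if counts[sev] > limit]


def determine_status(violations, thresholds=VIOLATION_THRESHOLD):
    """Hypothetical status rule: any breached threshold means non-compliant."""
    return "non_compliant" if breached_severities(violations, thresholds) else "compliant"


# Five high-severity violations stay within the limit; one critical does not.
print(determine_status([{"severity": "high"}] * 5))
print(determine_status([{"severity": "critical"}]))
```

Under this reading, the zero-tolerance comment on the critical threshold means a single critical violation is enough to fail the framework.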
GDPR Compliance Detection
Data Protection Rules
<!-- GDPR Article 32 - Security of Processing -->
<group name="gdpr,compliance">
<!-- Unencrypted PII Transmission -->
<rule id="600001" level="14">
<if_sid>86001</if_sid>
<field name="data.protocol">^HTTP$</field>
<field name="data.url" type="pcre2">(ssn=|email=|creditcard=|passport=)</field>
<description>GDPR Violation: Unencrypted PII transmitted over HTTP</description>
<compliance>
<gdpr>Article 32</gdpr>
</compliance>
<mitre>
<id>T1557</id>
</mitre>
</rule>
<!-- Unauthorized PII Access -->
<rule id="600002" level="12">
<if_sid>80784</if_sid>
<field name="win.eventdata.objectName" type="pcre2">(customer_data|personal_info|gdpr_protected)</field>
<field name="win.eventdata.subjectUserName" negate="yes">authorized_gdpr_users</field>
<description>GDPR Violation: Unauthorized access to personal data</description>
<compliance>
<gdpr>Article 5, 32</gdpr>
</compliance>
</rule>
<!-- Data Retention Violation -->
<rule id="600003" level="11">
<if_sid>550</if_sid>
<field name="file.age_days" compare=">=">730</field>
<field name="file.path" type="pcre2">/gdpr/customer_data/</field>
<description>GDPR Violation: Personal data retained beyond the 2-year retention policy</description>
<compliance>
<gdpr>Article 5(1)(e)</gdpr>
</compliance>
</rule>
<!-- Right to Erasure Request -->
<rule id="600004" level="8">
<if_sid>550</if_sid>
<match>gdpr_deletion_request</match>
<description>GDPR Alert: Right to erasure request detected</description>
<compliance>
<gdpr>Article 17</gdpr>
</compliance>
</rule>
</group>
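The matching logic of rule 600001 can be exercised offline before the rule is deployed. The sketch below reuses the rule's two patterns with Python's re module, assuming it behaves like PCRE2 for this simple alternation. The function name is_unencrypted_pii is hypothetical and is not part of Wazuh.

```python
import re

# Patterns copied from rule 600001
PROTOCOL_RE = re.compile(r"^HTTP$")
PII_PARAM_RE = re.compile(r"(ssn=|email=|creditcard=|passport=)")


def is_unencrypted_pii(protocol, url):
    """Return True when the event would satisfy both field conditions."""
    return bool(PROTOCOL_RE.search(protocol)) and bool(PII_PARAM_RE.search(url))


samples = [
    ("HTTP", "/signup?email=alice@example.com"),   # plain HTTP with a PII parameter
    ("HTTPS", "/signup?email=alice@example.com"),  # encrypted transport
    ("HTTP", "/products?id=42"),                   # no PII parameter
]
for protocol, url in samples:
    print(protocol, url, is_unencrypted_pii(protocol, url))
```

The pattern is case-sensitive as written, so a parameter such as Email= would not match. Whether that matters depends on the applications being monitored.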
Data Breach Detection
<!-- GDPR Article 33 - Breach Notification -->
<rule id="600010" level="15">
<if_group>attack,authentication_failed</if_group>
<field name="data.target_system" type="pcre2">(customer_db|pii_storage|gdpr_system)</field>
<description>GDPR Critical: Potential data breach on protected system</description>
<compliance>
<gdpr>Article 33, 34</gdpr>
</compliance>
<options>alert_by_email</options>
</rule>
<!-- Mass Data Export -->
<rule id="600011" level="13" frequency="100" timeframe="3600">
<if_sid>80784</if_sid>
<field name="win.eventdata.objectType">^File$</field>
<field name="file.contains_pii">true</field>
<same_field>win.eventdata.subjectUserName</same_field>
<description>GDPR Alert: Mass export of personal data detected</description>
<compliance>
<gdpr>Article 32</gdpr>
</compliance>
</rule>
HIPAA Compliance Detection
PHI Protection Rules
<!-- HIPAA Security Rule -->
<group name="hipaa,compliance">
<!-- PHI Access Logging -->
<rule id="600020" level="10">
<if_sid>5501</if_sid>
<field name="data.resource" type="pcre2">/healthcare/phi/</field>
<description>HIPAA Audit: PHI access logged</description>
<compliance>
<hipaa>164.312(b)</hipaa>
</compliance>
</rule>
<!-- Unauthorized PHI Access -->
<rule id="600021" level="14">
<if_sid>5503</if_sid>
<field name="data.resource" type="pcre2">/healthcare/phi/</field>
<field name="data.user" negate="yes">hipaa_authorized_users</field>
<description>HIPAA Violation: Unauthorized PHI access attempt</description>
<compliance>
<hipaa>164.308(a)(4)</hipaa>
</compliance>
</rule>
<!-- Minimum Necessary Violation -->
<rule id="600022" level="12" frequency="50" timeframe="3600">
<if_sid>600020</if_sid>
<same_field>data.user</same_field>
<different_field>data.patient_id</different_field>
<description>HIPAA Violation: Excessive PHI access - minimum necessary rule</description>
<compliance>
<hipaa>164.502(b)</hipaa>
</compliance>
</rule>
<!-- Encryption Requirement -->
<rule id="600023" level="13">
<if_sid>5706</if_sid>
<field name="data.encryption">false</field>
<field name="data.contains_phi">true</field>
<description>HIPAA Violation: Unencrypted PHI transmission</description>
<compliance>
<hipaa>164.312(e)(1)</hipaa>
</compliance>
</rule>
</group>
Access Control Monitoring
<!-- HIPAA Administrative Safeguards -->
<rule id="600030" level="11">
<if_sid>5402</if_sid>
<field name="data.user_inactive_days" compare=">=">90</field>
<field name="data.has_phi_access">true</field>
<description>HIPAA Violation: Inactive user account with PHI access</description>
<compliance>
<hipaa>164.308(a)(3)</hipaa>
</compliance>
</rule>
<!-- Workstation Security -->
<rule id="600031" level="10">
<if_sid>5501</if_sid>
<field name="data.workstation_locked">false</field>
<field name="data.idle_time" compare=">=">600</field>
<description>HIPAA Violation: Workstation not locked after 10 minutes</description>
<compliance>
<hipaa>164.310(c)</hipaa>
</compliance>
</rule>
PCI DSS Compliance Detection
Cardholder Data Protection
<!-- PCI DSS Requirements -->
<group name="pci_dss,compliance">
<!-- Requirement 3: Protect Stored CHD -->
<rule id="600040" level="15">
<if_sid>554</if_sid>
<field name="file.content" type="pcre2">\b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13})\b</field>
<field name="file.encrypted">false</field>
<description>PCI DSS Violation: Unencrypted credit card data stored</description>
<compliance>
<pci_dss>3.4</pci_dss>
</compliance>
</rule>
<!-- Requirement 8: Unique IDs -->
<rule id="600041" level="12">
<if_sid>5402</if_sid>
<field name="data.shared_account">true</field>
<field name="data.has_cde_access">true</field>
<description>PCI DSS Violation: Shared account with CDE access</description>
<compliance>
<pci_dss>8.5</pci_dss>
</compliance>
</rule>
<!-- Requirement 10: Logging -->
<rule id="600042" level="11">
<if_sid>550</if_sid>
<field name="system.logging_disabled">true</field>
<field name="system.processes_payments">true</field>
<description>PCI DSS Violation: Logging disabled on payment system</description>
<compliance>
<pci_dss>10.2</pci_dss>
</compliance>
</rule>
<!-- Requirement 11: Security Testing -->
<rule id="600043" level="10">
<if_sid>553</if_sid>
<field name="scan.type">vulnerability</field>
<field name="scan.days_since_last" compare=">=">90</field>
<description>PCI DSS Violation: Quarterly vulnerability scan overdue</description>
<compliance>
<pci_dss>11.2</pci_dss>
</compliance>
</rule>
</group>
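The card-number pattern in rule 600040 matches any digit string with the right prefix and length, so order numbers and other identifiers can trigger it. One common way to reduce such false positives is to apply the Luhn checksum to each candidate. The sketch below is a standalone illustration of that idea, not something the rule itself does. The function names are hypothetical.

```python
import re

# Pattern copied from rule 600040 (Visa, Mastercard, American Express prefixes)
CARD_RE = re.compile(r"\b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13})\b")


def luhn_valid(number):
    """Return True if the digit string passes the Luhn checksum."""
    total = 0
    for index, char in enumerate(reversed(number)):
        digit = int(char)
        if index % 2 == 1:      # double every second digit from the right
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0


def find_card_numbers(text):
    """Return regex matches that also pass the Luhn check."""
    return [m.group() for m in CARD_RE.finditer(text) if luhn_valid(m.group())]


# 4111111111111111 is a widely used test number; the second value fails Luhn.
print(find_card_numbers("ok 4111111111111111 bad 4111111111111112"))
```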
Network Segmentation Validation
<!-- PCI DSS Network Requirements -->
<rule id="600050" level="13">
<if_sid>86001</if_sid>
<field name="src.network">cde_segment</field>
<field name="dst.network" negate="yes">cde_segment</field>
<field name="firewall.action">allow</field>
<description>PCI DSS Violation: CDE network segmentation breach</description>
<compliance>
<pci_dss>1.3</pci_dss>
</compliance>
</rule>
<!-- Insecure Protocols -->
<rule id="600051" level="12">
<if_sid>86001</if_sid>
<field name="data.protocol" type="pcre2">^(telnet|ftp|snmp v1)$</field>
<field name="data.network">cde_segment</field>
<description>PCI DSS Violation: Insecure protocol in CDE</description>
<compliance>
<pci_dss>2.3</pci_dss>
</compliance>
</rule>
Cross-Framework Correlation
Multi-Compliance Violation Detection
# Cross-framework compliance correlation
class ComplianceCorrelator:
    def __init__(self):
        self.framework_mappings = {
            'data_encryption': ['gdpr:32', 'hipaa:164.312(e)', 'pci_dss:3.4'],
            'access_control': ['gdpr:5', 'hipaa:164.308(a)', 'pci_dss:7'],
            'audit_logging': ['gdpr:30', 'hipaa:164.312(b)', 'pci_dss:10'],
            'incident_response': ['gdpr:33', 'hipaa:164.308(a)(6)', 'pci_dss:12.10']
        }

    def correlate_violations(self, violation):
        """Find related violations across frameworks"""
        correlated = []
        for category, mappings in self.framework_mappings.items():
            if any(m in violation['compliance_refs'] for m in mappings):
                # This violation affects multiple frameworks
                affected_frameworks = [
                    m.split(':')[0] for m in mappings
                    if m.split(':')[0] != violation['framework']
                ]
                correlated.append({
                    'category': category,
                    'primary_framework': violation['framework'],
                    'affected_frameworks': affected_frameworks,
                    'severity': self.calculate_combined_severity(
                        violation, affected_frameworks
                    )
                })
        return correlated
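The correlator calls calculate_combined_severity() without showing it. The sketch below is one hypothetical scoring rule, assuming the violation carries a 'severity' key and that a finding is escalated by one level when it touches two or more additional frameworks. The escalation policy is an assumption and should be adapted to your own risk model.

```python
# Ordered from least to most severe
SEVERITY_LEVELS = ["low", "medium", "high", "critical"]


def calculate_combined_severity(violation, affected_frameworks):
    """Escalate the base severity when several other frameworks are affected."""
    base = SEVERITY_LEVELS.index(violation["severity"])
    bump = 1 if len(set(affected_frameworks)) >= 2 else 0
    # Cap the result at the highest defined level
    return SEVERITY_LEVELS[min(base + bump, len(SEVERITY_LEVELS) - 1)]


violation = {"framework": "gdpr", "severity": "high", "compliance_refs": ["gdpr:32"]}
print(calculate_combined_severity(violation, ["hipaa", "pci_dss"]))  # critical
```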
Automated Compliance Reporting
Real-Time Compliance Dashboard
# Compliance dashboard generator
# Helper methods referenced below (generate_executive_report,
# assess_framework_compliance, calculate_trends, ...) are assumed
# to be defined elsewhere in the class.
from datetime import datetime

import numpy as np


class ComplianceDashboard:
    def __init__(self, wazuh_api):
        self.api = wazuh_api
        self.report_templates = {
            'executive': self.generate_executive_report,
            'technical': self.generate_technical_report,
            'audit': self.generate_audit_report
        }

    def generate_compliance_snapshot(self):
        """Generate real-time compliance status"""
        snapshot = {
            'timestamp': datetime.now(),
            'overall_score': 0,
            'frameworks': {},
            'critical_violations': [],
            'trending': {}
        }
        # Collect compliance data for each framework
        for framework in ['gdpr', 'hipaa', 'pci_dss', 'sox']:
            framework_data = self.assess_framework_compliance(framework)
            snapshot['frameworks'][framework] = framework_data
            # Track critical violations
            critical = [v for v in framework_data['violations']
                        if v['severity'] == 'critical']
            snapshot['critical_violations'].extend(critical)
        # Calculate overall score as the unweighted mean of framework scores
        scores = [f['compliance_score']
                  for f in snapshot['frameworks'].values()]
        snapshot['overall_score'] = np.mean(scores)
        # Trend analysis
        snapshot['trending'] = self.calculate_trends()
        return snapshot

    def generate_audit_report(self, framework, date_range):
        """Generate detailed audit report"""
        report = {
            'framework': framework,
            'period': date_range,
            'sections': {}
        }
        # Evidence collection
        report['sections']['evidence'] = self.collect_audit_evidence(
            framework, date_range
        )
        # Control assessment
        report['sections']['controls'] = self.assess_controls(framework)
        # Violation history
        report['sections']['violations'] = self.get_violation_history(
            framework, date_range
        )
        # Remediation tracking
        report['sections']['remediation'] = self.track_remediation_progress(
            framework
        )
        return report
Automated Evidence Collection
# The collect_* methods, is_relevant() and validate_evidence() are assumed
# to be defined elsewhere in the class.
from datetime import datetime


class EvidenceCollector:
    def __init__(self):
        self.evidence_types = {
            'gdpr': {
                'consent_records': self.collect_consent_records,
                'data_processing': self.collect_processing_activities,
                'breach_notifications': self.collect_breach_records,
                'dpia_results': self.collect_dpia_documents
            },
            'hipaa': {
                'access_logs': self.collect_phi_access_logs,
                'risk_assessments': self.collect_risk_assessments,
                'training_records': self.collect_training_compliance,
                'baa_agreements': self.collect_baa_documents
            },
            'pci_dss': {
                'scan_results': self.collect_vulnerability_scans,
                'penetration_tests': self.collect_pentest_results,
                'change_management': self.collect_change_records,
                'network_diagrams': self.collect_network_documentation
            }
        }

    def collect_evidence(self, framework, requirement):
        """Automatically collect compliance evidence"""
        evidence = {
            'requirement': requirement,
            'collected_at': datetime.now(),
            'artifacts': []
        }
        # Collect relevant evidence types
        if framework in self.evidence_types:
            for evidence_type, collector in self.evidence_types[framework].items():
                if self.is_relevant(evidence_type, requirement):
                    artifacts = collector(requirement)
                    evidence['artifacts'].extend(artifacts)
        # Validate evidence completeness
        evidence['completeness'] = self.validate_evidence(evidence)
        return evidence
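The completeness check is left undefined above. A minimal sketch of one way to score it follows, assuming each artifact is a dict with a 'type' key and that the caller supplies the evidence types expected for the requirement. Both the artifact shape and the extra expected_types parameter are assumptions, not part of the original class.

```python
def validate_evidence(evidence, expected_types):
    """Report which expected evidence types have at least one artifact."""
    expected = set(expected_types)
    present = {artifact["type"] for artifact in evidence["artifacts"]}
    missing = sorted(expected - present)
    # An empty expectation list is treated as trivially complete
    ratio = 1.0 if not expected else (len(expected) - len(missing)) / len(expected)
    return {"ratio": ratio, "missing": missing}


evidence = {
    "requirement": "164.312(b)",
    "artifacts": [{"type": "access_logs", "path": "/evidence/phi_access.json"}],
}
print(validate_evidence(evidence, ["access_logs", "risk_assessments"]))
```

Returning the missing types alongside the ratio lets an auditor see what to collect next rather than only a percentage.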
Remediation Automation
Automated Compliance Fixes
<!-- Automated Remediation Rules -->
<ossec_config>
<!-- GDPR Remediation -->
<active-response>
<command>encrypt-pii-data</command>
<location>local</location>
<rules_id>600001</rules_id>
<timeout>0</timeout>
</active-response>
<!-- HIPAA Remediation -->
<active-response>
<command>disable-inactive-phi-account</command>
<location>local</location>
<rules_id>600030</rules_id>
<timeout>0</timeout>
</active-response>
<!-- PCI DSS Remediation -->
<active-response>
<command>enable-cde-logging</command>
<location>local</location>
<rules_id>600042</rules_id>
<timeout>0</timeout>
</active-response>
<!-- Multi-Framework -->
<active-response>
<command>comprehensive-compliance-fix</command>
<location>server</location>
<rules_id>600001,600021,600040</rules_id>
</active-response>
</ossec_config>
Remediation Tracking
# calculate_sla(), can_auto_remediate(), attempt_auto_remediation() and
# create_remediation_ticket() are assumed to be defined elsewhere in the class.
from datetime import datetime, timedelta


class RemediationTracker:
    def __init__(self):
        self.remediation_sla = {
            'critical': timedelta(hours=4),
            'high': timedelta(days=1),
            'medium': timedelta(days=7),
            'low': timedelta(days=30)
        }

    def track_remediation(self, violation):
        """Track remediation progress"""
        remediation = {
            'violation_id': violation['id'],
            'detected_at': violation['timestamp'],
            'severity': violation['severity'],
            'sla': self.calculate_sla(violation),
            'status': 'pending',
            'actions': []
        }
        # Automated remediation attempt
        if self.can_auto_remediate(violation):
            result = self.attempt_auto_remediation(violation)
            remediation['actions'].append({
                'type': 'automated',
                'timestamp': datetime.now(),
                'result': result
            })
            if result['success']:
                remediation['status'] = 'resolved'
                remediation['resolved_at'] = datetime.now()
        # Create ticket for manual remediation
        if remediation['status'] == 'pending':
            ticket = self.create_remediation_ticket(violation)
            remediation['ticket_id'] = ticket['id']
            remediation['assigned_to'] = ticket['assignee']
        return remediation
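The SLA table above becomes actionable once it is turned into a deadline. The sketch below shows one way calculate_sla() might work, assuming the deadline is the detection time plus the interval for the violation's severity. The function names sla_deadline and is_sla_breached are hypothetical.

```python
from datetime import datetime, timedelta

# Intervals copied from RemediationTracker.remediation_sla
REMEDIATION_SLA = {
    "critical": timedelta(hours=4),
    "high": timedelta(days=1),
    "medium": timedelta(days=7),
    "low": timedelta(days=30),
}


def sla_deadline(detected_at, severity):
    """Return the time by which the violation should be resolved."""
    return detected_at + REMEDIATION_SLA[severity]


def is_sla_breached(detected_at, severity, now):
    """Return True if the deadline has passed at the given time."""
    return now > sla_deadline(detected_at, severity)


detected = datetime(2024, 1, 1, 0, 0)
print(sla_deadline(detected, "critical"))  # 2024-01-01 04:00:00
print(is_sla_breached(detected, "critical", now=datetime(2024, 1, 1, 5, 0)))  # True
```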
Compliance Testing Framework
Continuous Compliance Validation
# The test suite classes and calculate_test_summary() are assumed to be
# defined elsewhere.
from datetime import datetime


class ComplianceTester:
    def __init__(self):
        self.test_suites = {
            'gdpr': GDPRTestSuite(),
            'hipaa': HIPAATestSuite(),
            'pci_dss': PCIDSSTestSuite()
        }

    def run_compliance_tests(self, framework, scope='full'):
        """Execute compliance test suite"""
        test_results = {
            'framework': framework,
            'timestamp': datetime.now(),
            'scope': scope,
            'tests': [],
            'summary': {}
        }
        suite = self.test_suites[framework]
        tests = suite.get_tests(scope)
        for test in tests:
            result = self.execute_test(test)
            test_results['tests'].append({
                'test_id': test['id'],
                'requirement': test['requirement'],
                'description': test['description'],
                'result': result['status'],
                'evidence': result['evidence'],
                'findings': result['findings']
            })
        # Calculate summary
        test_results['summary'] = self.calculate_test_summary(
            test_results['tests']
        )
        return test_results

    def execute_test(self, test):
        """Execute individual compliance test"""
        try:
            # Run test procedure
            evidence = test['procedure']()
            # Validate against expected results
            findings = test['validator'](evidence)
            return {
                'status': 'pass' if not findings else 'fail',
                'evidence': evidence,
                'findings': findings
            }
        except Exception as e:
            # A crashing test is reported as an error, not as a failure
            return {
                'status': 'error',
                'evidence': None,
                'findings': [str(e)]
            }
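The summary step is referenced above but not shown. A minimal sketch follows, assuming each entry in the tests list carries the 'result' value produced by execute_test ('pass', 'fail', or 'error') and that errored tests are excluded from the pass rate. That exclusion is a design assumption, not something the original specifies.

```python
from collections import Counter


def calculate_test_summary(tests):
    """Aggregate per-test results into counts and a pass rate."""
    counts = Counter(test["result"] for test in tests)
    executed = counts["pass"] + counts["fail"]
    return {
        "total": len(tests),
        "passed": counts["pass"],
        "failed": counts["fail"],
        "errors": counts["error"],
        # Errored tests produced no verdict, so they are left out of the rate
        "pass_rate": counts["pass"] / executed if executed else None,
    }


results = [{"result": "pass"}, {"result": "pass"}, {"result": "fail"}, {"result": "error"}]
print(calculate_test_summary(results))
```

Keeping errors separate from failures avoids reporting a broken test harness as a compliance gap.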
Integration with GRC Platforms
GRC API Integration
class GRCIntegration:
    def __init__(self, grc_config):
        self.grc_apis = {
            'servicenow': ServiceNowGRC(grc_config['servicenow']),
            'archer': ArcherGRC(grc_config['archer']),
            'metricstream': MetricStreamGRC(grc_config['metricstream'])
        }

    def sync_compliance_data(self):
        """Sync Wazuh compliance data with GRC platforms"""
        compliance_data = self.collect_wazuh_compliance_data()
        for platform, api in self.grc_apis.items():
            try:
                # Transform data to platform format
                transformed = self.transform_for_platform(
                    compliance_data, platform
                )
                # Push to GRC platform
                result = api.update_compliance_status(transformed)
                # Handle response
                if result['success']:
                    self.log_sync_success(platform, result)
                else:
                    self.handle_sync_failure(platform, result)
            except Exception as e:
                self.handle_sync_error(platform, e)
Compliance Metrics and ROI
Compliance Performance Dashboard
{
"compliance_metrics": {
"overall_compliance_score": 92.3,
"framework_scores": {
"gdpr": 94.1,
"hipaa": 91.8,
"pci_dss": 89.7,
"sox": 93.5
},
"automation_metrics": {
"violations_auto_detected": 3421,
"violations_auto_remediated": 2876,
"automation_rate": 0.84,
"manual_interventions": 545
},
"audit_performance": {
"audit_prep_time_reduction": "87%",
"evidence_collection_time": "4.2 hours",
"finding_response_time": "2.3 hours",
"continuous_compliance_coverage": "98.7%"
},
"cost_savings": {
"avoided_penalties": "$2.3M",
"reduced_audit_costs": "$450K",
"fte_hours_saved": 3200,
"total_roi": "342%"
},
"violation_trends": {
"total_violations_ytd": 8234,
"critical_violations": 23,
"repeat_violations": 156,
"mttr": "6.4 hours"
}
}
}
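The automation figures in the dashboard can be cross-checked with simple arithmetic. The sketch below assumes that the automation rate is the share of detected violations that were remediated automatically and that every remaining violation needed a manual intervention. Both assumptions are consistent with the sample numbers above, and the function name is hypothetical.

```python
def automation_summary(detected, remediated):
    """Derive the automation rate and manual workload from raw counts."""
    if detected <= 0 or not 0 <= remediated <= detected:
        raise ValueError("expected 0 <= remediated <= detected and detected > 0")
    return {
        "automation_rate": round(remediated / detected, 2),
        "manual_interventions": detected - remediated,
    }


# Counts taken from the automation_metrics section of the dashboard
print(automation_summary(detected=3421, remediated=2876))
# {'automation_rate': 0.84, 'manual_interventions': 545}
```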
Best Practices
1. Framework Mapping
def create_control_mappings():
    """Map controls across compliance frameworks"""
    mappings = {
        'encryption_at_rest': {
            'gdpr': ['Article 32(1)(a)'],
            'hipaa': ['164.312(a)(2)(iv)'],
            'pci_dss': ['3.4.1'],
            'iso27001': ['A.10.1.1'],
            'nist': ['SC-28']
        },
        'access_control': {
            'gdpr': ['Article 32(1)(b)'],
            'hipaa': ['164.312(a)(1)'],
            'pci_dss': ['7.1', '7.2'],
            'iso27001': ['A.9.1.1'],
            'nist': ['AC-2']
        }
    }
    return mappings
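A mapping like this is most useful when it can be queried in reverse: given a finding against one framework, which requirements in the other frameworks cover the same control? The sketch below is a hypothetical lookup helper that takes a mappings dictionary of the same shape. A shortened copy of the mapping is included so the example runs on its own.

```python
def equivalent_controls(mappings, framework, reference):
    """Return, per control, the references other frameworks use for it."""
    result = {}
    for control, refs_by_framework in mappings.items():
        if reference in refs_by_framework.get(framework, []):
            result[control] = {
                fw: refs for fw, refs in refs_by_framework.items() if fw != framework
            }
    return result


# Shortened copy of the access_control entry from create_control_mappings()
mappings = {
    "access_control": {
        "gdpr": ["Article 32(1)(b)"],
        "hipaa": ["164.312(a)(1)"],
        "pci_dss": ["7.1", "7.2"],
    }
}
print(equivalent_controls(mappings, "pci_dss", "7.1"))
```

With this lookup, a single PCI DSS 7.1 finding can be attached to the matching GDPR and HIPAA requirements in one step.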
2. Continuous Improvement
def optimize_compliance_rules():
    """Continuously improve compliance detection"""
    # Analyze false positives
    false_positives = analyze_false_positive_rate()
    # Tune detection rules
    for rule in false_positives['high_fp_rules']:
        tune_rule_thresholds(rule)
    # Add new compliance requirements
    new_requirements = check_regulatory_updates()
    for req in new_requirements:
        create_detection_rule(req)
    # Validate rule effectiveness
    validate_rule_coverage()
Conclusion
Automated compliance is not about replacing human judgment but augmenting it with continuous, comprehensive monitoring that scales across multiple frameworks. Wazuh’s compliance automation framework transforms compliance from a periodic scramble into a continuous state of readiness. By correlating requirements across frameworks, automating evidence collection, and providing real-time visibility, organizations can maintain compliance while focusing on their core business.
Next Steps
- Map your compliance requirements to Wazuh rules
- Deploy framework-specific detection rules
- Configure automated evidence collection
- Establish remediation workflows
- Integrate with existing GRC platforms
Remember: Compliance is not a destination but a journey. Automate the journey, and you’ll always know where you stand.