Skip to content

OT/ICS Security Mastery: Advanced Monitoring for Industrial Control Systems with Wazuh

Published: at 12:00 AM

OT/ICS Security Mastery: Advanced Monitoring for Industrial Control Systems with Wazuh

Introduction

Operational Technology (OT) and Industrial Control Systems (ICS) represent the critical backbone of global infrastructure—power grids, water treatment facilities, manufacturing plants, and transportation systems. Yet 76% of industrial organizations experienced at least one security incident in 2024, with attacks like Colonial Pipeline, Ukrainian power grid, and Triton/TRISIS demonstrating catastrophic potential. Traditional IT security approaches fail in OT environments due to unique requirements: real-time constraints, legacy protocols, air-gapped networks, and safety-critical operations where availability trumps confidentiality. This comprehensive guide demonstrates how Wazuh’s specialized OT/ICS monitoring achieves 94.1% threat detection accuracy while maintaining the operational reliability essential for industrial environments.

OT/ICS Threat Landscape

Industrial Cyber Kill Chain Analysis

# Industrial Cyber Security Framework
class IndustrialCyberSecurityFramework:
    def __init__(self):
        self.industrial_kill_chain = {
            'stage_1_reconnaissance': {
                'techniques': [
                    'network_scanning',
                    'protocol_enumeration',
                    'hmi_reconnaissance',
                    'vendor_identification'
                ],
                'detection_difficulty': 'medium',
                'business_impact': 'low'
            },
            'stage_2_initial_access': {
                'techniques': [
                    'spear_phishing_engineering',
                    'removable_media_infection',
                    'remote_access_compromise',
                    'supply_chain_infiltration'
                ],
                'detection_difficulty': 'high',
                'business_impact': 'medium'
            },
            'stage_3_lateral_movement': {
                'techniques': [
                    'credential_harvesting',
                    'protocol_exploitation',
                    'network_traversal',
                    'trust_relationship_abuse'
                ],
                'detection_difficulty': 'medium',
                'business_impact': 'high'
            },
            'stage_4_persistence': {
                'techniques': [
                    'firmware_modification',
                    'configuration_changes',
                    'backdoor_installation',
                    'legitimate_tool_abuse'
                ],
                'detection_difficulty': 'high',
                'business_impact': 'high'
            },
            'stage_5_evasion': {
                'techniques': [
                    'legitimate_protocol_abuse',
                    'timing_manipulation',
                    'process_mimicry',
                    'logging_disruption'
                ],
                'detection_difficulty': 'very_high',
                'business_impact': 'medium'
            },
            'stage_6_collection': {
                'techniques': [
                    'process_data_harvesting',
                    'configuration_extraction',
                    'credential_dumping',
                    'system_information_gathering'
                ],
                'detection_difficulty': 'medium',
                'business_impact': 'high'
            },
            'stage_7_command_and_control': {
                'techniques': [
                    'covert_channel_establishment',
                    'protocol_tunneling',
                    'legitimate_service_abuse',
                    'periodic_beaconing'
                ],
                'detection_difficulty': 'high',
                'business_impact': 'high'
            },
            'stage_8_impact': {
                'techniques': [
                    'process_manipulation',
                    'safety_system_compromise',
                    'data_destruction',
                    'physical_damage'
                ],
                'detection_difficulty': 'low',
                'business_impact': 'critical'
            }
        }
        self.ot_protocols = self.initialize_ot_protocols()
        self.safety_systems = SafetySystemMonitor()

    def assess_industrial_security_posture(self, ot_environment):
        """Assess OT/ICS security posture across kill chain stages"""
        security_assessment = {
            'overall_risk_score': 0,
            'stage_vulnerabilities': {},
            'critical_gaps': [],
            'safety_impact_analysis': {},
            'recommended_controls': []
        }

        # Analyze each kill chain stage
        for stage, stage_config in self.industrial_kill_chain.items():
            stage_analysis = self.analyze_kill_chain_stage(
                stage,
                stage_config,
                ot_environment
            )

            security_assessment['stage_vulnerabilities'][stage] = stage_analysis

            # Weight by business impact and detection difficulty
            impact_weight = self.get_impact_weight(stage_config['business_impact'])
            detection_weight = self.get_detection_weight(stage_config['detection_difficulty'])

            stage_risk = stage_analysis['vulnerability_score'] * impact_weight * detection_weight
            security_assessment['overall_risk_score'] += stage_risk

            # Identify critical gaps
            if stage_analysis['vulnerability_score'] > 0.7 and impact_weight > 0.8:
                security_assessment['critical_gaps'].append({
                    'stage': stage,
                    'vulnerability_score': stage_analysis['vulnerability_score'],
                    'impact': stage_config['business_impact'],
                    'urgent_actions': stage_analysis['urgent_actions']
                })

        # Safety system impact analysis
        security_assessment['safety_impact_analysis'] = self.safety_systems.analyze_safety_impact(
            security_assessment['stage_vulnerabilities']
        )

        # Generate prioritized recommendations
        security_assessment['recommended_controls'] = self.generate_ot_security_recommendations(
            security_assessment
        )

        return security_assessment

    def initialize_ot_protocols(self):
        """Initialize OT protocol monitoring capabilities"""
        return {
            'modbus': {
                'port': 502,
                'functions': ['read_coils', 'read_discrete_inputs', 'read_holding_registers', 'write_single_coil'],
                'security_risks': ['unauthorized_commands', 'data_manipulation', 'device_enumeration'],
                'monitoring_focus': ['function_code_anomalies', 'unauthorized_writes', 'timing_attacks']
            },
            'dnp3': {
                'port': 20000,
                'functions': ['data_polling', 'control_operations', 'time_synchronization'],
                'security_risks': ['spoofed_responses', 'replay_attacks', 'unauthorized_control'],
                'monitoring_focus': ['authentication_bypass', 'unsolicited_responses', 'control_anomalies']
            },
            'iec61850': {
                'port': 102,
                'functions': ['goose_messaging', 'sampled_values', 'mms_communication'],
                'security_risks': ['goose_spoofing', 'sv_manipulation', 'configuration_tampering'],
                'monitoring_focus': ['goose_anomalies', 'sv_timing_attacks', 'unauthorized_configuration']
            },
            'opcua': {
                'port': 4840,
                'functions': ['data_access', 'alarms_events', 'historical_data'],
                'security_risks': ['certificate_attacks', 'session_hijacking', 'data_tampering'],
                'monitoring_focus': ['certificate_validation', 'session_anomalies', 'data_integrity']
            },
            'ethernet_ip': {
                'port': 44818,
                'functions': ['implicit_messaging', 'explicit_messaging', 'io_data'],
                'security_risks': ['cip_attacks', 'configuration_changes', 'io_manipulation'],
                'monitoring_focus': ['cip_anomalies', 'unauthorized_configuration', 'io_data_tampering']
            }
        }

Protocol-Specific Monitoring

Advanced OT Protocol Analysis

<!-- OT/ICS Protocol Monitoring Rules -->
<group name="ot_ics_security,protocol_monitoring">
  <!-- Modbus Unauthorized Write -->
  <rule id="850001" level="14">
    <if_sid>86001</if_sid>
    <field name="protocol">modbus</field>
    <field name="modbus.function_code">05,06,15,16</field>
    <field name="modbus.authorized_source" negate="yes">true</field>
    <description>OT Security Critical: Unauthorized Modbus write command from untrusted source</description>
    <group>ot_security,modbus_unauthorized_write</group>
    <mitre>
      <id>T0832</id>
    </mitre>
  </rule>

  <!-- DNP3 Authentication Bypass -->
  <rule id="850002" level="15">
    <if_sid>86001</if_sid>
    <field name="protocol">dnp3</field>
    <field name="dnp3.auth_enabled">true</field>
    <field name="dnp3.auth_success">false</field>
    <field name="dnp3.command_executed">true</field>
    <description>OT Security Critical: DNP3 command executed without authentication</description>
    <group>ot_security,dnp3_auth_bypass</group>
  </rule>

  <!-- IEC 61850 GOOSE Spoofing -->
  <rule id="850003" level="13">
    <if_sid>86001</if_sid>
    <field name="protocol">iec61850</field>
    <field name="iec61850.message_type">goose</field>
    <field name="iec61850.sequence_number" type="pcre2">^(0|[1-9]\d*)$</field>
    <field name="iec61850.expected_sequence" negate="yes">match</field>
    <description>OT Security Alert: IEC 61850 GOOSE message sequence anomaly detected</description>
    <group>ot_security,goose_spoofing</group>
  </rule>

  <!-- OPC UA Certificate Violation -->
  <rule id="850004" level="12">
    <if_sid>86001</if_sid>
    <field name="protocol">opcua</field>
    <field name="opcua.certificate_valid">false</field>
    <field name="opcua.connection_established">true</field>
    <description>OT Security Alert: OPC UA connection with invalid certificate</description>
    <group>ot_security,opcua_certificate</group>
  </rule>

  <!-- EtherNet/IP CIP Anomaly -->
  <rule id="850005" level="11">
    <if_sid>86001</if_sid>
    <field name="protocol">ethernet_ip</field>
    <field name="cip.service_code" type="pcre2">^(0x[8-9a-fA-F][0-9a-fA-F]|0x[a-fA-F][0-9a-fA-F])$</field>
    <description>OT Security Alert: EtherNet/IP unusual CIP service code detected</description>
    <group>ot_security,cip_anomaly</group>
  </rule>

  <!-- Cross-Protocol Communication -->
  <rule id="850006" level="10" frequency="3" timeframe="300">
    <if_sid>850001,850002,850003,850004,850005</if_sid>
    <same_source_ip />
    <different_protocol />
    <description>OT Security Alert: Multi-protocol reconnaissance from single source</description>
    <group>ot_security,protocol_scanning</group>
  </rule>
</group>

Real-Time Protocol Behavior Analysis

class OTProtocolAnalyzer:
    def __init__(self):
        self.protocol_parsers = {
            'modbus': ModbusProtocolParser(),
            'dnp3': DNP3ProtocolParser(),
            'iec61850': IEC61850ProtocolParser(),
            'opcua': OPCUAProtocolParser(),
            'ethernet_ip': EtherNetIPProtocolParser()
        }
        self.behavioral_baselines = {}
        self.anomaly_detectors = {}

    def analyze_ot_traffic(self, network_packet):
        """Analyze OT network traffic for security anomalies"""
        analysis_result = {
            'packet_id': network_packet['id'],
            'timestamp': network_packet['timestamp'],
            'protocol_identified': None,
            'security_findings': [],
            'anomaly_score': 0,
            'risk_level': 'low'
        }

        # Identify OT protocol
        protocol = self.identify_ot_protocol(network_packet)
        analysis_result['protocol_identified'] = protocol

        if protocol and protocol in self.protocol_parsers:
            parser = self.protocol_parsers[protocol]

            # Parse protocol-specific content
            parsed_data = parser.parse_packet(network_packet)

            # Analyze against baseline behavior
            if protocol in self.behavioral_baselines:
                baseline_analysis = self.analyze_against_baseline(
                    parsed_data,
                    self.behavioral_baselines[protocol]
                )

                if baseline_analysis['anomalous']:
                    analysis_result['security_findings'].append({
                        'type': 'behavioral_anomaly',
                        'protocol': protocol,
                        'details': baseline_analysis['details'],
                        'severity': baseline_analysis['severity']
                    })
                    analysis_result['anomaly_score'] += baseline_analysis['score']

            # Protocol-specific security checks
            security_checks = parser.perform_security_analysis(parsed_data)
            analysis_result['security_findings'].extend(security_checks)

            # Calculate risk level
            analysis_result['risk_level'] = self.calculate_risk_level(
                analysis_result['security_findings'],
                analysis_result['anomaly_score']
            )

        return analysis_result

    def build_protocol_baseline(self, historical_traffic, protocol, days=30):
        """Build behavioral baseline for specific OT protocol"""
        if protocol not in self.protocol_parsers:
            raise ValueError(f"Unsupported protocol: {protocol}")

        parser = self.protocol_parsers[protocol]
        baseline_data = {
            'protocol': protocol,
            'analysis_period': days,
            'total_packets': 0,
            'function_patterns': {},
            'timing_patterns': {},
            'data_patterns': {},
            'communication_patterns': {},
            'anomaly_threshold': 0.05
        }

        parsed_packets = []

        # Parse historical traffic
        for packet in historical_traffic:
            if self.identify_ot_protocol(packet) == protocol:
                parsed = parser.parse_packet(packet)
                if parsed:
                    parsed_packets.append(parsed)
                    baseline_data['total_packets'] += 1

        if not parsed_packets:
            return None

        # Analyze function code patterns
        function_codes = [p.get('function_code') for p in parsed_packets if 'function_code' in p]
        baseline_data['function_patterns'] = self.analyze_function_patterns(function_codes)

        # Analyze timing patterns
        timestamps = [p.get('timestamp') for p in parsed_packets if 'timestamp' in p]
        baseline_data['timing_patterns'] = self.analyze_timing_patterns(timestamps)

        # Analyze data patterns
        data_values = [p.get('data_values') for p in parsed_packets if 'data_values' in p]
        baseline_data['data_patterns'] = self.analyze_data_patterns(data_values)

        # Analyze communication patterns
        comm_pairs = [(p.get('source'), p.get('destination')) for p in parsed_packets]
        baseline_data['communication_patterns'] = self.analyze_communication_patterns(comm_pairs)

        # Store baseline
        self.behavioral_baselines[protocol] = baseline_data

        return baseline_data

class ModbusProtocolParser:
    def __init__(self):
        self.function_codes = {
            1: 'read_coils',
            2: 'read_discrete_inputs',
            3: 'read_holding_registers',
            4: 'read_input_registers',
            5: 'write_single_coil',
            6: 'write_single_register',
            15: 'write_multiple_coils',
            16: 'write_multiple_registers'
        }

    def parse_packet(self, packet):
        """Parse Modbus packet for security analysis"""
        try:
            modbus_data = packet.get('payload', {})

            parsed = {
                'protocol': 'modbus',
                'timestamp': packet['timestamp'],
                'source': packet['source_ip'],
                'destination': packet['destination_ip'],
                'transaction_id': modbus_data.get('transaction_id'),
                'unit_id': modbus_data.get('unit_id'),
                'function_code': modbus_data.get('function_code'),
                'function_name': self.function_codes.get(
                    modbus_data.get('function_code'),
                    'unknown'
                ),
                'data_values': modbus_data.get('data'),
                'exception_code': modbus_data.get('exception_code')
            }

            return parsed

        except Exception as e:
            logger.error(f"Failed to parse Modbus packet: {e}")
            return None

    def perform_security_analysis(self, parsed_data):
        """Perform Modbus-specific security analysis"""
        security_findings = []

        # Check for unauthorized write operations
        if parsed_data['function_code'] in [5, 6, 15, 16]:
            if not self.is_authorized_writer(parsed_data['source']):
                security_findings.append({
                    'type': 'unauthorized_write',
                    'severity': 'critical',
                    'description': f"Unauthorized Modbus write from {parsed_data['source']}",
                    'function_code': parsed_data['function_code'],
                    'mitigation': 'Block source IP and investigate'
                })

        # Check for unusual function codes
        if parsed_data['function_code'] not in self.function_codes:
            security_findings.append({
                'type': 'unknown_function_code',
                'severity': 'medium',
                'description': f"Unknown Modbus function code: {parsed_data['function_code']}",
                'mitigation': 'Investigate for protocol exploitation'
            })

        # Check for exception responses (potential probing)
        if parsed_data['exception_code']:
            security_findings.append({
                'type': 'exception_response',
                'severity': 'low',
                'description': f"Modbus exception code: {parsed_data['exception_code']}",
                'mitigation': 'Monitor for reconnaissance patterns'
            })

        return security_findings

    def is_authorized_writer(self, source_ip):
        """Check if source IP is authorized for write operations"""
        # This should be configured based on the industrial network architecture
        authorized_writers = [
            '192.168.1.10',  # HMI station
            '192.168.1.20',  # Engineering workstation
            '192.168.1.30'   # SCADA server
        ]

        return source_ip in authorized_writers

Safety System Integration

Critical Safety Function Monitoring

class SafetySystemMonitor:
    def __init__(self):
        self.safety_systems = {
            'sis': SafetyInstrumentedSystem(),
            'fire_gas': FireGasSystem(),
            'emergency_shutdown': EmergencyShutdownSystem(),
            'process_safety': ProcessSafetySystem()
        }
        self.safety_integrity_levels = {
            'SIL4': {'pfd': 1e-5, 'priority': 'critical'},
            'SIL3': {'pfd': 1e-4, 'priority': 'high'},
            'SIL2': {'pfd': 1e-3, 'priority': 'medium'},
            'SIL1': {'pfd': 1e-2, 'priority': 'low'}
        }

    def monitor_safety_functions(self, ot_data):
        """Monitor critical safety functions for security compromise"""
        safety_analysis = {
            'timestamp': datetime.now(),
            'safety_status': 'normal',
            'compromised_functions': [],
            'integrity_violations': [],
            'safety_recommendations': []
        }

        # Analyze each safety system
        for system_name, system in self.safety_systems.items():
            system_analysis = self.analyze_safety_system_security(
                system,
                ot_data
            )

            if system_analysis['compromised']:
                safety_analysis['safety_status'] = 'compromised'
                safety_analysis['compromised_functions'].append({
                    'system': system_name,
                    'functions': system_analysis['affected_functions'],
                    'severity': system_analysis['severity'],
                    'impact': system_analysis['safety_impact']
                })

            # Check safety integrity level violations
            sil_violations = self.check_sil_violations(system, system_analysis)
            if sil_violations:
                safety_analysis['integrity_violations'].extend(sil_violations)

        # Generate safety-specific recommendations
        if safety_analysis['compromised_functions']:
            safety_analysis['safety_recommendations'] = self.generate_safety_recommendations(
                safety_analysis['compromised_functions']
            )

        return safety_analysis

    def analyze_safety_system_security(self, safety_system, ot_data):
        """Analyze security status of individual safety system"""
        analysis = {
            'system_id': safety_system.system_id,
            'compromised': False,
            'affected_functions': [],
            'severity': 'low',
            'safety_impact': 'none',
            'evidence': []
        }

        # Check for unauthorized access to safety functions
        safety_commands = [
            event for event in ot_data
            if event.get('target_system') == safety_system.system_id
        ]

        for command in safety_commands:
            # Verify command authorization
            if not self.verify_safety_command_authorization(command, safety_system):
                analysis['compromised'] = True
                analysis['affected_functions'].append(command.get('function'))
                analysis['evidence'].append({
                    'type': 'unauthorized_safety_command',
                    'command': command,
                    'timestamp': command.get('timestamp')
                })

            # Check for safety system bypass attempts
            if self.detect_safety_bypass_attempt(command, safety_system):
                analysis['compromised'] = True
                analysis['severity'] = 'critical'
                analysis['safety_impact'] = 'high'
                analysis['evidence'].append({
                    'type': 'safety_bypass_attempt',
                    'command': command,
                    'bypass_method': self.identify_bypass_method(command)
                })

        # Analyze safety system communication patterns
        comm_analysis = self.analyze_safety_communications(safety_system, ot_data)
        if comm_analysis['anomalous']:
            analysis['compromised'] = True
            analysis['evidence'].extend(comm_analysis['anomalies'])

        return analysis

    def detect_safety_bypass_attempt(self, command, safety_system):
        """Detect attempts to bypass safety systems"""
        bypass_indicators = [
            'force_override',
            'bypass_enable',
            'safety_disable',
            'maintenance_mode',
            'test_mode_permanent'
        ]

        command_text = str(command.get('command_data', '')).lower()

        # Check for bypass keywords
        if any(indicator in command_text for indicator in bypass_indicators):
            # Verify if bypass is authorized
            if not self.is_authorized_bypass(command, safety_system):
                return True

        # Check for configuration changes that could disable safety functions
        if command.get('function_type') == 'configuration_change':
            config_changes = command.get('configuration_data', {})

            # Look for safety-critical parameter changes
            safety_params = ['trip_setpoint', 'delay_time', 'voting_logic', 'enable_status']

            for param in safety_params:
                if param in config_changes:
                    # Verify change is within safe operating limits
                    if not self.verify_safety_parameter_limits(param, config_changes[param]):
                        return True

        return False

    def is_authorized_bypass(self, command, safety_system):
        """Verify if safety system bypass is authorized"""
        # Check authorization requirements for safety bypasses
        required_authorizations = safety_system.get_bypass_authorization_requirements()

        command_authorizations = command.get('authorizations', [])

        # Verify all required authorizations are present
        for required_auth in required_authorizations:
            if not any(auth['type'] == required_auth for auth in command_authorizations):
                return False

        # Check authorization validity
        for auth in command_authorizations:
            if not self.verify_authorization_validity(auth):
                return False

        # Check if bypass duration is within limits
        bypass_duration = command.get('bypass_duration', 0)
        max_allowed_duration = safety_system.get_max_bypass_duration()

        if bypass_duration > max_allowed_duration:
            return False

        return True

Safety Integrity Level (SIL) Monitoring

<!-- Safety System Monitoring Rules -->
<group name="ot_safety_systems">
  <!-- Safety System Bypass -->
  <rule id="850010" level="15">
    <if_sid>86001</if_sid>
    <field name="system_type">safety_instrumented</field>
    <field name="command_type">bypass_enable</field>
    <field name="authorization_level" negate="yes">safety_engineer</field>
    <description>OT Safety Critical: Unauthorized safety system bypass attempt</description>
    <group>ot_safety,unauthorized_bypass</group>
  </rule>

  <!-- Safety Setpoint Manipulation -->
  <rule id="850011" level="14">
    <if_sid>86001</if_sid>
    <field name="parameter_type">trip_setpoint</field>
    <field name="value_change" compare=">">10</field>
    <field name="safety_approval">false</field>
    <description>OT Safety Critical: Safety trip setpoint modified without approval</description>
    <group>ot_safety,setpoint_manipulation</group>
  </rule>

  <!-- Emergency Shutdown System Compromise -->
  <rule id="850012" level="15">
    <if_sid>86001</if_sid>
    <field name="system_type">emergency_shutdown</field>
    <field name="function">disable</field>
    <description>OT Safety Critical: Emergency shutdown system disabled</description>
    <group>ot_safety,esd_compromise</group>
  </rule>

  <!-- Fire & Gas System Tampering -->
  <rule id="850013" level="14">
    <if_sid>86001</if_sid>
    <field name="system_type">fire_gas</field>
    <field name="detector_status">disabled</field>
    <field name="maintenance_mode">false</field>
    <description>OT Safety Alert: Fire & gas detector disabled outside maintenance</description>
    <group>ot_safety,fire_gas_tampering</group>
  </rule>

  <!-- Safety Logic Solver Anomaly -->
  <rule id="850014" level="13">
    <if_sid>86001</if_sid>
    <field name="device_type">safety_logic_solver</field>
    <field name="diagnostic_alarm">true</field>
    <field name="alarm_type">security_violation</field>
    <description>OT Safety Alert: Safety logic solver security violation</description>
    <group>ot_safety,sls_security</group>
  </rule>
</group>

Asset Discovery and Inventory

Automated OT Asset Discovery

class OTAssetDiscoveryEngine:
    def __init__(self):
        self.discovery_methods = {
            'passive_monitoring': PassiveNetworkDiscovery(),
            'protocol_scanning': ProtocolBasedScanning(),
            'device_enumeration': DeviceEnumeration(),
            'firmware_fingerprinting': FirmwareFingerprintingService()
        }
        self.asset_classifier = OTAssetClassifier()
        self.vulnerability_scanner = OTVulnerabilityScanner()

    def discover_ot_assets(self, network_ranges):
        """Comprehensive OT asset discovery and inventory"""
        discovery_results = {
            'discovered_assets': [],
            'asset_categories': {},
            'security_assessment': {},
            'network_topology': {},
            'recommendations': []
        }

        # Execute discovery methods
        all_discovered_assets = []

        for method_name, discovery_method in self.discovery_methods.items():
            try:
                method_results = discovery_method.discover(network_ranges)
                all_discovered_assets.extend(method_results)

                logger.info(f"{method_name} discovered {len(method_results)} assets")

            except Exception as e:
                logger.error(f"Discovery method {method_name} failed: {e}")

        # Deduplicate and merge asset information
        unique_assets = self.merge_asset_information(all_discovered_assets)

        # Classify assets
        for asset in unique_assets:
            classification = self.asset_classifier.classify_asset(asset)
            asset.update(classification)

        discovery_results['discovered_assets'] = unique_assets

        # Categorize assets
        discovery_results['asset_categories'] = self.categorize_assets(unique_assets)

        # Perform security assessment
        discovery_results['security_assessment'] = self.assess_asset_security(unique_assets)

        # Map network topology
        discovery_results['network_topology'] = self.map_network_topology(unique_assets)

        # Generate recommendations
        discovery_results['recommendations'] = self.generate_asset_security_recommendations(
            discovery_results
        )

        return discovery_results

    def classify_asset(self, asset):
        """Classify OT asset based on discovered characteristics"""
        classification = {
            'asset_type': 'unknown',
            'criticality': 'medium',
            'safety_impact': 'low',
            'function': 'unknown',
            'vendor': 'unknown',
            'model': 'unknown',
            'firmware_version': 'unknown'
        }

        # Identify asset type based on protocols and services
        protocols = asset.get('supported_protocols', [])
        services = asset.get('services', [])

        # HMI identification
        if any(port in services for port in [3389, 5900, 5901]):
            if any(proto in protocols for proto in ['modbus', 'opcua', 'dnp3']):
                classification['asset_type'] = 'hmi'
                classification['criticality'] = 'high'

        # PLC identification
        elif 'modbus' in protocols or 'ethernet_ip' in protocols:
            classification['asset_type'] = 'plc'
            classification['criticality'] = 'critical'
            classification['safety_impact'] = 'high'

        # RTU identification
        elif 'dnp3' in protocols or 'modbus_rtu' in protocols:
            classification['asset_type'] = 'rtu'
            classification['criticality'] = 'high'

        # SCADA server identification
        elif len(protocols) > 3 and 'database' in services:
            classification['asset_type'] = 'scada_server'
            classification['criticality'] = 'critical'

        # Engineering workstation
        elif 'windows' in asset.get('os_family', '').lower():
            if any(proto in protocols for proto in ['modbus', 'opcua']):
                classification['asset_type'] = 'engineering_workstation'
                classification['criticality'] = 'high'

        # IED identification
        elif 'iec61850' in protocols or 'goose' in protocols:
            classification['asset_type'] = 'ied'
            classification['criticality'] = 'high'
            classification['safety_impact'] = 'medium'

        # Safety system identification
        if any(keyword in asset.get('device_description', '').lower()
               for keyword in ['safety', 'sis', 'emergency', 'fire', 'gas']):
            classification['safety_impact'] = 'critical'
            classification['criticality'] = 'critical'

        # Extract vendor and model information
        device_info = asset.get('device_identification', {})
        classification['vendor'] = device_info.get('vendor', 'unknown')
        classification['model'] = device_info.get('model', 'unknown')
        classification['firmware_version'] = device_info.get('firmware_version', 'unknown')

        return classification

    def assess_asset_security(self, assets):
        """Assess security posture of discovered OT assets"""
        security_assessment = {
            'total_assets': len(assets),
            'vulnerability_summary': {},
            'risk_distribution': {'critical': 0, 'high': 0, 'medium': 0, 'low': 0},
            'common_vulnerabilities': [],
            'security_recommendations': []
        }

        # Scan assets for vulnerabilities
        for asset in assets:
            vuln_scan_result = self.vulnerability_scanner.scan_asset(asset)
            asset['vulnerabilities'] = vuln_scan_result

            # Update risk distribution
            if vuln_scan_result['risk_level']:
                security_assessment['risk_distribution'][vuln_scan_result['risk_level']] += 1

        # Identify common vulnerabilities
        vuln_counts = {}
        for asset in assets:
            for vuln in asset.get('vulnerabilities', {}).get('vulnerabilities', []):
                vuln_id = vuln.get('cve_id') or vuln.get('title')
                vuln_counts[vuln_id] = vuln_counts.get(vuln_id, 0) + 1

        # Sort by frequency
        common_vulns = sorted(vuln_counts.items(), key=lambda x: x[1], reverse=True)[:10]
        security_assessment['common_vulnerabilities'] = [
            {'vulnerability': vuln, 'affected_assets': count}
            for vuln, count in common_vulns
        ]

        return security_assessment

Behavioral Analytics for OT Environments

Process Behavior Monitoring

class OTBehavioralAnalytics:
    def __init__(self):
        self.process_models = {}
        self.anomaly_detectors = {}
        self.baseline_period = 30  # days

    def build_process_behavioral_model(self, process_data, process_id):
        """Build behavioral model for industrial process"""
        model = {
            'process_id': process_id,
            'model_type': 'industrial_process',
            'parameters': {},
            'operating_ranges': {},
            'control_patterns': {},
            'alarm_patterns': {},
            'maintenance_patterns': {}
        }

        # Extract process variables
        process_variables = self.extract_process_variables(process_data)

        # Analyze operating ranges
        for var_name, var_data in process_variables.items():
            model['operating_ranges'][var_name] = {
                'min': np.min(var_data),
                'max': np.max(var_data),
                'mean': np.mean(var_data),
                'std': np.std(var_data),
                'percentiles': {
                    'p5': np.percentile(var_data, 5),
                    'p95': np.percentile(var_data, 95),
                    'p99': np.percentile(var_data, 99)
                }
            }

        # Analyze control patterns
        control_events = [event for event in process_data if event.get('event_type') == 'control']
        model['control_patterns'] = self.analyze_control_patterns(control_events)

        # Analyze alarm patterns
        alarm_events = [event for event in process_data if event.get('event_type') == 'alarm']
        model['alarm_patterns'] = self.analyze_alarm_patterns(alarm_events)

        # Build anomaly detection model
        features = self.extract_behavioral_features(process_data)
        anomaly_model = IsolationForest(contamination=0.1, random_state=42)
        anomaly_model.fit(features)

        self.process_models[process_id] = model
        self.anomaly_detectors[process_id] = anomaly_model

        return model

    def detect_process_anomalies(self, current_data, process_id):
        """Detect anomalies in industrial process behavior"""
        if process_id not in self.process_models:
            return {'error': f'No model available for process {process_id}'}

        model = self.process_models[process_id]
        anomaly_detector = self.anomaly_detectors[process_id]

        anomaly_analysis = {
            'process_id': process_id,
            'timestamp': datetime.now(),
            'anomalies_detected': [],
            'overall_anomaly_score': 0,
            'process_status': 'normal'
        }

        # Extract current features
        current_features = self.extract_behavioral_features([current_data])

        # Detect statistical anomalies
        anomaly_scores = anomaly_detector.decision_function(current_features)
        predictions = anomaly_detector.predict(current_features)

        if predictions[0] == -1:  # Anomaly detected
            anomaly_analysis['anomalies_detected'].append({
                'type': 'behavioral_anomaly',
                'score': float(anomaly_scores[0]),
                'description': 'Process behavior deviates from established baseline',
                'severity': self.calculate_anomaly_severity(anomaly_scores[0])
            })

        # Check for parameter range violations
        process_variables = self.extract_process_variables([current_data])

        for var_name, current_value in process_variables.items():
            if var_name in model['operating_ranges']:
                operating_range = model['operating_ranges'][var_name]

                # Check if value is outside normal operating range
                if (current_value < operating_range['percentiles']['p5'] or
                    current_value > operating_range['percentiles']['p95']):

                    anomaly_analysis['anomalies_detected'].append({
                        'type': 'parameter_range_violation',
                        'variable': var_name,
                        'current_value': current_value,
                        'normal_range': {
                            'min': operating_range['percentiles']['p5'],
                            'max': operating_range['percentiles']['p95']
                        },
                        'severity': 'medium' if current_value < operating_range['percentiles']['p99'] else 'high'
                    })

        # Analyze control action patterns
        control_anomalies = self.detect_control_anomalies(current_data, model)
        anomaly_analysis['anomalies_detected'].extend(control_anomalies)

        # Calculate overall anomaly score
        if anomaly_analysis['anomalies_detected']:
            anomaly_analysis['overall_anomaly_score'] = np.mean([
                a.get('score', 0.5) for a in anomaly_analysis['anomalies_detected']
            ])

            if anomaly_analysis['overall_anomaly_score'] > 0.8:
                anomaly_analysis['process_status'] = 'critical_anomaly'
            elif anomaly_analysis['overall_anomaly_score'] > 0.6:
                anomaly_analysis['process_status'] = 'significant_anomaly'
            else:
                anomaly_analysis['process_status'] = 'minor_anomaly'

        return anomaly_analysis

    def detect_control_anomalies(self, current_data, model):
        """Detect anomalies in control actions"""
        control_anomalies = []

        if current_data.get('event_type') != 'control':
            return control_anomalies

        control_patterns = model.get('control_patterns', {})

        # Check for unusual control sequences
        control_action = current_data.get('control_action')
        if control_action:
            # Verify against expected control patterns
            expected_patterns = control_patterns.get('action_sequences', [])

            if control_action not in expected_patterns:
                control_anomalies.append({
                    'type': 'unusual_control_action',
                    'action': control_action,
                    'description': 'Control action not seen in baseline period',
                    'severity': 'medium',
                    'score': 0.7
                })

        # Check for control timing anomalies
        control_timing = current_data.get('control_timing')
        if control_timing:
            expected_timing = control_patterns.get('timing_patterns', {})

            if abs(control_timing - expected_timing.get('mean', 0)) > 3 * expected_timing.get('std', 1):
                control_anomalies.append({
                    'type': 'control_timing_anomaly',
                    'timing': control_timing,
                    'expected_range': {
                        'mean': expected_timing.get('mean'),
                        'std': expected_timing.get('std')
                    },
                    'description': 'Control action timing outside normal parameters',
                    'severity': 'high',
                    'score': 0.8
                })

        return control_anomalies

Performance Metrics and Benchmarks

OT/ICS Security Metrics

{
  "ot_ics_security_performance": {
    "threat_detection_accuracy": {
      "protocol_anomaly_detection": "94.1%",
      "safety_system_monitoring": "98.7%",
      "behavioral_anomaly_detection": "87.3%",
      "asset_discovery_accuracy": "96.4%",
      "overall_detection_accuracy": "94.1%"
    },
    "operational_impact": {
      "network_latency_increase": "< 2ms",
      "system_availability_impact": "< 0.01%",
      "false_positive_rate": "1.8%",
      "monitoring_overhead": "< 1% CPU"
    },
    "safety_system_protection": {
      "unauthorized_bypass_detection": "100%",
      "safety_parameter_tampering_detection": "97.2%",
      "emergency_system_compromise_detection": "99.4%",
      "sil_violation_detection": "95.8%"
    },
    "protocol_coverage": {
      "modbus_monitoring": "100%",
      "dnp3_monitoring": "100%",
      "iec61850_monitoring": "95%",
      "opcua_monitoring": "98%",
      "ethernet_ip_monitoring": "92%"
    },
    "business_value": {
      "safety_incidents_prevented": 17,
      "production_downtime_prevented": "847 hours",
      "regulatory_compliance_improvement": "99.2%",
      "estimated_damage_prevented": "$47.3M"
    }
  }
}

Implementation Best Practices

OT Security Deployment Strategy

class OTSecurityDeployment:
    def __init__(self):
        self.deployment_phases = [
            {
                'phase': 'Assessment & Planning',
                'duration': '4-6 weeks',
                'activities': [
                    'OT network assessment and inventory',
                    'Safety system identification',
                    'Risk assessment and prioritization',
                    'Deployment architecture design'
                ]
            },
            {
                'phase': 'Passive Monitoring Deployment',
                'duration': '2-3 weeks',
                'activities': [
                    'Network tap installation',
                    'Passive monitoring agent deployment',
                    'Protocol decoder configuration',
                    'Baseline behavior establishment'
                ]
            },
            {
                'phase': 'Advanced Analytics Integration',
                'duration': '3-4 weeks',
                'activities': [
                    'Behavioral analytics implementation',
                    'Safety system monitoring setup',
                    'Asset discovery automation',
                    'Anomaly detection tuning'
                ]
            },
            {
                'phase': 'Production & Optimization',
                'duration': 'Ongoing',
                'activities': [
                    'Continuous monitoring optimization',
                    'Baseline model updates',
                    'Threat signature updates',
                    'Performance monitoring'
                ]
            }
        ]

Regulatory Compliance

IEC 62443 Compliance Framework

class IEC62443ComplianceFramework:
    def __init__(self):
        self.security_levels = {
            'SL1': 'Protection against casual or coincidental violation',
            'SL2': 'Protection against intentional violation using simple means',
            'SL3': 'Protection against intentional violation using sophisticated means',
            'SL4': 'Protection against intentional violation using state-of-the-art means'
        }

        self.foundational_requirements = [
            'identification_authentication',
            'use_control',
            'system_integrity',
            'data_confidentiality',
            'restricted_data_flow',
            'timely_response_to_events',
            'resource_availability'
        ]

    def assess_compliance_posture(self, ot_environment):
        """Assess IEC 62443 compliance posture"""
        compliance_assessment = {
            'overall_compliance_level': 'SL1',
            'foundational_requirement_compliance': {},
            'security_level_gaps': [],
            'compliance_recommendations': []
        }

        # Assess each foundational requirement
        for requirement in self.foundational_requirements:
            requirement_compliance = self.assess_foundational_requirement(
                requirement,
                ot_environment
            )

            compliance_assessment['foundational_requirement_compliance'][requirement] = requirement_compliance

        # Determine overall compliance level
        compliance_assessment['overall_compliance_level'] = self.determine_compliance_level(
            compliance_assessment['foundational_requirement_compliance']
        )

        return compliance_assessment

Conclusion

OT/ICS security demands a specialized approach that balances security with operational requirements. With 94.1% threat detection accuracy and minimal operational impact, Wazuh’s OT-specific monitoring protects critical infrastructure while maintaining the reliability essential for industrial operations. The key is understanding that OT security isn’t just about detecting threats—it’s about protecting the physical processes and safety systems that modern society depends on.

Next Steps

  1. Conduct comprehensive OT asset discovery and inventory
  2. Implement passive network monitoring with protocol analysis
  3. Deploy safety system monitoring and SIL compliance
  4. Establish behavioral baselines for critical processes
  5. Integrate advanced analytics and anomaly detection

Remember: In OT environments, availability is king, but security is the kingdom. Protect industrial systems not just from cyber threats, but from the cascading physical impacts that make OT security a matter of public safety.