Enterprise Custom Decoders: Advanced Log Parsing for Complex Environments
Introduction
In the heterogeneous landscape of enterprise IT, where legacy systems coexist with cutting-edge cloud services, the ability to parse and understand diverse log formats is crucial. Wazuh’s custom decoder architecture provides unparalleled flexibility in handling everything from proprietary application logs to complex multi-line formats. This comprehensive guide explores advanced decoder development techniques that transform unstructured logs into actionable security intelligence.
Understanding Wazuh Decoder Architecture
The Decoder Hierarchy
<!-- Decoder Architecture Overview -->
<decoder name="custom_app_root">
  <prematch>^CUSTOM-APP:</prematch>
  <plugin_decoder>JSON_Decoder</plugin_decoder>
</decoder>

<decoder name="custom_app_auth">
  <parent>custom_app_root</parent>
  <regex type="pcre2">"event_type":"(\w+)","user":"([^"]+)","ip":"([^"]+)","result":"(\w+)","timestamp":"([^"]+)"</regex>
  <order>event_type, user, srcip, result, timestamp</order>
</decoder>

<decoder name="custom_app_transaction">
  <parent>custom_app_root</parent>
  <regex type="pcre2">"transaction_id":"([^"]+)","amount":([\d.]+),"currency":"(\w+)","status":"(\w+)"</regex>
  <order>transaction_id, amount, currency, status</order>
</decoder>
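To see what the hierarchy extracts, here is a quick check outside Wazuh: a Python one-off that applies the custom_app_auth pattern to a sample line. The sample application name and field values are invented for illustration:

import re

# Hypothetical sample line matching the custom_app_root prematch
sample = ('CUSTOM-APP: {"event_type":"login","user":"alice","ip":"10.0.0.5",'
          '"result":"success","timestamp":"2024-05-01T12:00:00Z"}')

# The same pattern used by the custom_app_auth decoder above
pattern = (r'"event_type":"(\w+)","user":"([^"]+)","ip":"([^"]+)",'
           r'"result":"(\w+)","timestamp":"([^"]+)"')

match = re.search(pattern, sample)
if match:
    fields = dict(zip(['event_type', 'user', 'srcip', 'result', 'timestamp'],
                      match.groups()))
    print(fields)
# {'event_type': 'login', 'user': 'alice', 'srcip': '10.0.0.5',
#  'result': 'success', 'timestamp': '2024-05-01T12:00:00Z'}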
Performance-Optimized Decoder Design
# Decoder Performance Analyzer
class DecoderOptimizer:
    def __init__(self):
        self.performance_metrics = {
            'regex_complexity': self.analyze_regex_complexity,
            'prematch_efficiency': self.analyze_prematch_efficiency,
            'hierarchy_depth': self.analyze_hierarchy_depth,
            'field_extraction': self.analyze_field_extraction
        }

    def optimize_decoder(self, decoder_xml):
        """Optimize decoder for performance"""
        optimization_report = {
            'original': decoder_xml,
            'optimizations': [],
            'performance_gain': 0
        }

        # Analyze current performance
        current_metrics = self.analyze_decoder(decoder_xml)

        # Optimize prematch
        if current_metrics['prematch_score'] < 0.8:
            optimized_prematch = self.optimize_prematch(decoder_xml)
            optimization_report['optimizations'].append({
                'type': 'prematch',
                'change': optimized_prematch
            })

        # Optimize regex
        if current_metrics['regex_efficiency'] < 0.7:
            optimized_regex = self.optimize_regex(decoder_xml)
            optimization_report['optimizations'].append({
                'type': 'regex',
                'change': optimized_regex
            })

        return optimization_report
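The prematch element exists to reject non-matching lines cheaply before the expensive field-extraction regex runs. A quick way to feel the difference is to time an anchored literal against an unanchored wildcard. The sketch below uses Python's re and timeit as a stand-in for Wazuh's matcher, so absolute numbers will differ, but the relative gap shows why the skeleton above scores prematch efficiency:

import re
import timeit

# A line that does NOT match: the common case a prematch must reject fast
line = "some-other-app: unrelated event data " * 10

anchored = re.compile(r'^CUSTOM-APP:')      # fails immediately at position 0
unanchored = re.compile(r'.*CUSTOM-APP')    # scans (and backtracks over) the line

print("anchored:  ", timeit.timeit(lambda: anchored.match(line), number=100_000))
print("unanchored:", timeit.timeit(lambda: unanchored.match(line), number=100_000))
# The anchored literal rejects non-matching lines far faster, which is
# exactly the work a good prematch saves the main regex from doing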
Advanced Parsing Techniques
Dynamic Log Format Handling
<!-- Dynamic Log Parser with Multiple Format Support -->
<decoder name="dynamic_log_parser">
  <prematch>^\d{4}-\d{2}-\d{2}</prematch>
</decoder>

<!-- Format Variant 1: JSON Logs -->
<decoder name="dynamic_json">
  <parent>dynamic_log_parser</parent>
  <prematch>^[\d\-\s:]+\s*{</prematch>
  <plugin_decoder>JSON_Decoder</plugin_decoder>
  <use_own_name>yes</use_own_name>
</decoder>

<!-- Format Variant 2: Key-Value Pairs -->
<decoder name="dynamic_kv">
  <parent>dynamic_log_parser</parent>
  <regex type="pcre2">^([\d\-\s:]+)\s+(\w+)=([^,]+),\s*(\w+)=([^,]+),\s*(\w+)=([^,]+)(?:,\s*(\w+)=([^,\s]+))?</regex>
  <order>timestamp, key1, value1, key2, value2, key3, value3, key4, value4</order>
</decoder>

<!-- Format Variant 3: Pipe-Delimited -->
<decoder name="dynamic_pipe">
  <parent>dynamic_log_parser</parent>
  <prematch>^[\d\-\s:]+\s*\|</prematch>
  <regex>^([\d\-\s:]+)\|([^|]+)\|([^|]+)\|([^|]+)\|([^|]+)</regex>
  <order>timestamp, event_id, user, action, result</order>
</decoder>
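Before committing the child decoders, it can help to prototype the routing logic outside Wazuh. A minimal sketch mirroring the three prematch patterns above (the sample lines are invented):

import re

# Mirror the three child prematch patterns from the decoders above
VARIANTS = [
    ('dynamic_json', re.compile(r'^[\d\-\s:]+\s*\{')),
    ('dynamic_pipe', re.compile(r'^[\d\-\s:]+\s*\|')),
    ('dynamic_kv',   re.compile(r'^[\d\-\s:]+\s+\w+=')),
]

def detect_variant(line):
    """Return the name of the child decoder that would claim this line."""
    for name, prematch in VARIANTS:
        if prematch.match(line):
            return name
    return None

print(detect_variant('2024-05-01 12:00:00 {"user":"alice"}'))                 # dynamic_json
print(detect_variant('2024-05-01 12:00:00|4624|alice|login|success'))         # dynamic_pipe
print(detect_variant('2024-05-01 12:00:00 user=alice, op=login, result=ok'))  # dynamic_kv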
Multi-Line Log Correlation
<!-- Multi-Line Java Exception Decoder -->
<decoder name="java_exception_start">
  <program_name>java_app</program_name>
  <prematch>^Exception in thread</prematch>
  <regex>^Exception in thread "(\S+)" (\S+): (.+)$</regex>
  <order>thread_name, exception_type, exception_message</order>
</decoder>

<decoder name="java_exception_stacktrace">
  <program_name>java_app</program_name>
  <prematch>^\s+at\s+</prematch>
  <regex>^\s+at\s+(\S+)\(([^:]+):(\d+)\)</regex>
  <order>method, file, line_number</order>
  <accumulate>yes</accumulate>
</decoder>

<!-- Multi-Line SQL Query Decoder -->
<decoder name="sql_query_multiline">
  <program_name>database</program_name>
  <prematch>^BEGIN QUERY:</prematch>
  <multiline_regex>^BEGIN QUERY:\s*(\d+)\s*((?:SELECT|INSERT|UPDATE|DELETE)[\s\S]+?)END QUERY:\s*\1</multiline_regex>
  <order>query_id, sql_statement</order>
</decoder>
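Since the analysis engine processes events line by line, multi-line traces are often stitched together before they reach it. Below is a minimal pre-processing sketch that merges a Java exception header with its stack frames into a single event; the function name and the newline-escaping convention are assumptions of this sketch, not a Wazuh API:

import re

FRAME = re.compile(r'^\s+at\s+')
START = re.compile(r'^Exception in thread')

def stitch_exceptions(lines):
    """Merge an exception header and its stack frames into one event."""
    event = []
    for line in lines:
        if START.match(line):
            if event:
                yield '\\n'.join(event)   # flush the previous trace
            event = [line.rstrip()]
        elif event and FRAME.match(line):
            event.append(line.strip())    # frame joins the open trace
        else:
            if event:
                yield '\\n'.join(event)   # trace ended
                event = []
            yield line.rstrip()           # ordinary single-line event
    if event:
        yield '\\n'.join(event)

log = [
    'Exception in thread "main" java.lang.NullPointerException: oops',
    '    at com.example.App.run(App.java:42)',
    '    at com.example.App.main(App.java:10)',
    'INFO normal log line',
]
for event in stitch_exceptions(log):
    print(event)
# The trace is emitted as one event with literal \n separators, so the
# whole exception travels as a single line to the manager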
Complex Nested Structure Parsing
# Advanced Nested Parser Implementation
import re

class NestedLogParser:
    def __init__(self):
        self.parsers = {
            'xml': self.parse_xml_log,
            'json': self.parse_json_log,
            'protobuf': self.parse_protobuf_log,
            'custom': self.parse_custom_nested
        }

    # Format-specific parsers omitted from this excerpt
    def parse_xml_log(self, log_line): raise NotImplementedError
    def parse_json_log(self, log_line): raise NotImplementedError
    def parse_protobuf_log(self, log_line): raise NotImplementedError

    def parse_custom_nested(self, log_line):
        """Parse complex nested custom format"""
        # Example: [HEADER{key1:value1,key2:{subkey1:subvalue1}}]DATA[array1,array2]
        parsed = {
            'header': {},
            'data': [],
            'metadata': {}
        }

        # Extract header section
        header_match = re.search(r'\[HEADER({.+?})\]', log_line)
        if header_match:
            header_content = header_match.group(1)
            parsed['header'] = self.parse_nested_brackets(header_content)

        # Extract data section
        data_match = re.search(r'DATA\[([^\]]+)\]', log_line)
        if data_match:
            parsed['data'] = data_match.group(1).split(',')

        return parsed

    def parse_nested_brackets(self, content):
        """Recursively parse nested bracket structures"""
        result = {}
        depth = 0
        current_key = ''
        current_value = ''
        in_value = False

        for char in content:
            if char == '{':
                depth += 1
                if depth > 1:
                    current_value += char
            elif char == '}':
                depth -= 1
                if depth > 0:
                    current_value += char
                else:
                    # Complete the final key-value pair
                    if current_key:
                        result[current_key] = self.parse_value(current_value)
            elif char == ':' and depth == 1:
                in_value = True
            elif char == ',' and depth == 1:
                if current_key:
                    result[current_key] = self.parse_value(current_value)
                current_key = ''
                current_value = ''
                in_value = False
            else:
                if in_value:
                    current_value += char
                else:
                    current_key += char

        return result

    def parse_value(self, value):
        """Recurse into nested {...} values; return scalars unchanged."""
        if value.startswith('{') and value.endswith('}'):
            return self.parse_nested_brackets(value)
        return value
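Exercising parse_custom_nested on the sample format from the inline comment shows the recursion at work:

parser = NestedLogParser()
line = '[HEADER{key1:value1,key2:{subkey1:subvalue1}}]DATA[array1,array2]'
print(parser.parse_custom_nested(line))
# {'header': {'key1': 'value1', 'key2': {'subkey1': 'subvalue1'}},
#  'data': ['array1', 'array2'], 'metadata': {}}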
Enterprise Application Decoders
SAP System Decoder
<!-- SAP Security Audit Log Decoder -->
<decoder name="sap_audit_root">
  <prematch>^SAP-AUDIT:</prematch>
</decoder>

<decoder name="sap_audit_detail">
  <parent>sap_audit_root</parent>
  <regex type="pcre2">^SAP-AUDIT:\s*(\d{8})\s*(\d{6})\s*(\w+)\s*(\w+)\s*(\w+)\s*([^,]+),\s*(\w+),\s*(\d+),\s*(.+)$</regex>
  <order>date, time, client, user, transaction, terminal, return_code, message_number, message_text</order>
</decoder>

<!-- SAP RFC Call Decoder -->
<decoder name="sap_rfc">
  <parent>sap_audit_root</parent>
  <regex type="pcre2">RFC\s+CALL:\s*Function=(\w+)\s+User=(\w+)\s+Client=(\w+)\s+System=(\w+)\s+Return=(\w+)</regex>
  <order>function_module, user, client, system, return_code</order>
</decoder>
Oracle Database Decoder
<!-- Oracle Alert Log Decoder -->
<decoder name="oracle_alert">
  <prematch>^ORA-\d{5}:</prematch>
  <regex>^(ORA-\d{5}):\s*(.+)$</regex>
  <order>error_code, error_message</order>
</decoder>

<!-- Oracle Audit Trail Decoder -->
<decoder name="oracle_audit">
  <prematch>^AUDIT:</prematch>
  <regex type="pcre2">^AUDIT:\s*(\w+)\s+BY\s+(\w+)\s+AT\s+([\d\-\s:]+)\s+FROM\s+([^\s]+)\s*(?:SESSIONID:\s*(\d+))?\s*(?:RETURNCODE:\s*(\d+))?</regex>
  <order>action, username, timestamp, host, session_id, return_code</order>
</decoder>

<!-- Oracle Performance Decoder -->
<decoder name="oracle_performance">
  <prematch>^PERF-METRIC:</prematch>
  <plugin_decoder>
    <name>custom_oracle_perf</name>
    <script>parse_oracle_performance.py</script>
  </plugin_decoder>
</decoder>
Kubernetes Events Decoder
<!-- Kubernetes Event Decoder -->
<decoder name="k8s_event">
  <prematch>^K8S-EVENT:</prematch>
  <plugin_decoder>JSON_Decoder</plugin_decoder>
</decoder>

<!-- Kubernetes Audit Log Decoder -->
<decoder name="k8s_audit_root">
  <prematch>{"kind":"Event","apiVersion":"audit.k8s.io</prematch>
  <plugin_decoder>JSON_Decoder</plugin_decoder>
</decoder>

<decoder name="k8s_audit_detail">
  <parent>k8s_audit_root</parent>
  <use_own_name>yes</use_own_name>
  <regex type="pcre2">"verb":"(\w+)".*?"objectRef":\{"resource":"(\w+)","namespace":"([^"]*)".*?"name":"([^"]*)".*?"user":\{"username":"([^"]*)"</regex>
  <order>verb, resource, namespace, resource_name, username</order>
</decoder>
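Because Kubernetes audit events are JSON, the same fields the regex targets can be prototyped with an ordinary JSON parser, which is also a convenient way to validate the decoder's expectations against real events. A sketch assuming the standard audit.k8s.io/v1 event shape; the sample event is fabricated:

import json

def extract_audit_fields(raw):
    """Pull the same fields the k8s_audit_detail decoder targets."""
    event = json.loads(raw)
    obj = event.get('objectRef', {})
    return {
        'verb': event.get('verb'),
        'resource': obj.get('resource'),
        'namespace': obj.get('namespace'),
        'resource_name': obj.get('name'),
        'username': event.get('user', {}).get('username'),
    }

sample = ('{"kind":"Event","apiVersion":"audit.k8s.io/v1","verb":"delete",'
          '"user":{"username":"system:admin"},'
          '"objectRef":{"resource":"pods","namespace":"prod","name":"api-7f9c"}}')
print(extract_audit_fields(sample))
# {'verb': 'delete', 'resource': 'pods', 'namespace': 'prod',
#  'resource_name': 'api-7f9c', 'username': 'system:admin'}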
Dynamic Decoder Generation
ML-Based Decoder Creation
class MLDecoderGenerator:
    def __init__(self):
        self.pattern_learner = PatternLearningModel()
        self.field_extractor = FieldExtractionModel()

    def generate_decoder(self, log_samples):
        """Generate decoder from log samples using ML"""
        # Learn log structure
        structure = self.pattern_learner.learn_structure(log_samples)

        # Identify fields
        fields = self.field_extractor.identify_fields(log_samples)

        # Generate decoder XML
        decoder_xml = self.build_decoder_xml(structure, fields)

        # Validate and optimize
        optimized_decoder = self.optimize_decoder(decoder_xml)
        return optimized_decoder

    def learn_structure(self, samples):
        """Learn common structure from samples"""
        # Tokenize samples
        tokenized = [self.tokenize(s) for s in samples]

        # Find common patterns
        common_tokens = self.find_common_tokens(tokenized)

        # Build regex pattern
        pattern = self.build_pattern(common_tokens)

        return {
            'pattern': pattern,
            'confidence': self.calculate_confidence(pattern, samples)
        }

    def identify_fields(self, samples):
        """Identify and classify fields"""
        fields = []

        # Extract potential fields
        for sample in samples:
            extracted = self.extract_values(sample)
            for value, context in extracted:
                field_type = self.classify_field(value, context)
                fields.append({
                    'value': value,
                    'type': field_type,
                    'context': context
                })

        # Consolidate and name fields
        consolidated_fields = self.consolidate_fields(fields)
        return consolidated_fields
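The PatternLearningModel itself is not shown, but its core idea fits in a few lines: split each sample into tokens, keep a token position as a literal only when every sample agrees on it, and turn the disagreeing positions into capture groups. A deliberately simplified sketch of that idea (real models would handle variable token counts and typed fields):

import re

def tokenize(line):
    """Split a log line into whitespace-delimited tokens."""
    return line.split()

def find_common_tokens(samples):
    """Keep a literal where all samples agree; otherwise mark variable."""
    tokenized = [tokenize(s) for s in samples]
    pattern = []
    # zip truncates to the shortest sample; assumes equal token counts
    for position in zip(*tokenized):
        if len(set(position)) == 1:
            pattern.append(re.escape(position[0]))   # stable literal
        else:
            pattern.append(r'(\S+)')                 # variable field
    return r'\s+'.join(pattern)

samples = [
    'LOGIN user=alice result=success',
    'LOGIN user=bob result=failure',
]
print(find_common_tokens(samples))
# LOGIN\s+(\S+)\s+(\S+)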
Template-Based Decoder Factory
import xml.etree.ElementTree as ET

class DecoderFactory:
    def __init__(self):
        self.templates = {
            'syslog': self.syslog_template,
            'json': self.json_template,
            'cef': self.cef_template,
            'leef': self.leef_template,
            'custom_kv': self.custom_kv_template
        }

    # Templates not shown in this excerpt
    def syslog_template(self, name, fields): raise NotImplementedError
    def json_template(self, name, fields): raise NotImplementedError
    def cef_template(self, name, fields): raise NotImplementedError
    def leef_template(self, name, fields): raise NotImplementedError

    def create_decoder(self, name, log_format, fields):
        """Create decoder from template"""
        if log_format not in self.templates:
            raise ValueError(f"Unknown format: {log_format}")

        template = self.templates[log_format]
        decoder = template(name, fields)

        # Validate decoder
        self.validate_decoder(decoder)
        return decoder

    def validate_decoder(self, decoder):
        """Basic well-formedness check on the generated XML."""
        ET.fromstring(f"<decoders>{decoder}</decoders>")

    def custom_kv_template(self, name, fields):
        """Generate custom key-value decoder"""
        # Pre-compute values containing backslashes: f-string
        # expressions may not contain them before Python 3.12
        prematch = fields.get('prematch', r'\w+:')
        prefix = fields.get('prefix', '')

        # Build regex for key-value pairs
        regex_parts = []
        order_parts = []
        for field in fields['fields']:
            key = field['key']
            value_pattern = field.get('pattern', '[^,]+')
            regex_parts.append(f"{key}=({value_pattern})")
            order_parts.append(field['name'])
        kv_regex = r'[,\s]*'.join(regex_parts)

        decoder = f"""
<decoder name="{name}_root">
  <prematch>^{prematch}</prematch>
</decoder>

<decoder name="{name}_kv">
  <parent>{name}_root</parent>
  <regex type="pcre2">^{prefix}{kv_regex}</regex>
  <order>{', '.join(order_parts)}</order>
</decoder>"""
        return decoder
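Driving the factory end to end, with a hypothetical MYAPP log format:

factory = DecoderFactory()
spec = {
    'prematch': 'MYAPP:',       # hypothetical application prefix
    'prefix': r'MYAPP:\s*',
    'fields': [
        {'key': 'user',   'name': 'user'},
        {'key': 'action', 'name': 'action'},
        {'key': 'result', 'name': 'result'},
    ],
}
print(factory.create_decoder('myapp', 'custom_kv', spec))
# Emits a myapp_root/myapp_kv decoder pair whose extraction regex is
# ^MYAPP:\s*user=([^,]+)[,\s]*action=([^,]+)[,\s]*result=([^,]+)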
Performance Optimization
Decoder Benchmarking
import time

class DecoderBenchmark:
    def __init__(self, wazuh_path='/var/ossec'):
        self.wazuh_path = wazuh_path
        self.logtest_binary = f"{wazuh_path}/bin/wazuh-logtest"

    def benchmark_decoder(self, decoder_file, log_samples, iterations=1000):
        """Benchmark decoder performance"""
        results = {
            'decoder': decoder_file,
            'samples': len(log_samples),
            'iterations': iterations,
            'metrics': {}
        }

        # Load decoder
        self.load_decoder(decoder_file)

        # Benchmark parsing speed
        start_time = time.time()
        successful_parses = 0
        for _ in range(iterations):
            for sample in log_samples:
                result = self.parse_log(sample)
                if result['decoded']:
                    successful_parses += 1
        elapsed_time = time.time() - start_time

        total_parses = iterations * len(log_samples)

        # Calculate metrics
        results['metrics'] = {
            'total_time': elapsed_time,
            'avg_time_per_log': elapsed_time / total_parses,
            'logs_per_second': total_parses / elapsed_time,
            'success_rate': successful_parses / total_parses,
            'cpu_usage': self.measure_cpu_usage(),
            'memory_usage': self.measure_memory_usage()
        }
        return results

    def optimize_regex(self, regex_pattern):
        """Suggest regex rewrites for performance"""
        optimizations = []

        # Use atomic groups for better performance
        if '(?:' in regex_pattern and '(?>' not in regex_pattern:
            optimized = regex_pattern.replace('(?:', '(?>')
            optimizations.append({
                'type': 'atomic_groups',
                'original': regex_pattern,
                'optimized': optimized
            })

        # Use possessive quantifiers (PCRE2 only)
        for quantifier in ['+', '*']:
            if f'){quantifier}' in regex_pattern:
                optimized = regex_pattern.replace(f'){quantifier}', f'){quantifier}+')
                optimizations.append({
                    'type': 'possessive_quantifiers',
                    'optimized': optimized,
                    'change': f'{quantifier} -> {quantifier}+'
                })

        return optimizations
Decoder Caching Strategy
<!-- Cached Decoder Configuration -->
<ossec_config>
  <global>
    <decoder_cache>
      <enabled>yes</enabled>
      <size>10000</size>
      <ttl>3600</ttl>
    </decoder_cache>
  </global>
</ossec_config>

<!-- High-Performance Decoder with Caching Hints -->
<decoder name="cached_app_decoder">
  <prematch>^APP-CACHED:</prematch>
  <cache_hint>frequent</cache_hint>
  <regex type="pcre2" flags="CASELESS|MULTILINE">^APP-CACHED:\s*(\d+)\s*(\w+)\s*(.+)$</regex>
  <order>event_id, event_type, message</order>
</decoder>
Testing and Validation
Comprehensive Decoder Testing
import hashlib

class DecoderTester:
    def __init__(self):
        self.test_cases = []
        self.coverage_analyzer = CoverageAnalyzer()

    def add_test_case(self, log_line, expected_fields):
        """Add test case for decoder"""
        self.test_cases.append({
            'input': log_line,
            'expected': expected_fields,
            'test_id': hashlib.md5(log_line.encode()).hexdigest()[:8]
        })

    def run_tests(self, decoder_file):
        """Run comprehensive decoder tests"""
        results = {
            'decoder': decoder_file,
            'total_tests': len(self.test_cases),
            'passed': 0,
            'failed': 0,
            'coverage': 0,
            'failures': []
        }

        for test_case in self.test_cases:
            result = self.test_single_case(decoder_file, test_case)
            if result['passed']:
                results['passed'] += 1
            else:
                results['failed'] += 1
                results['failures'].append({
                    'test_id': test_case['test_id'],
                    'input': test_case['input'],
                    'expected': test_case['expected'],
                    'actual': result['actual'],
                    'error': result.get('error')
                })

        # Calculate coverage
        results['coverage'] = self.coverage_analyzer.calculate_coverage(
            decoder_file,
            self.test_cases
        )
        return results

    def generate_test_report(self, results):
        """Generate detailed test report"""
        report = f"""
# Decoder Test Report

## Summary
- Decoder: {results['decoder']}
- Total Tests: {results['total_tests']}
- Passed: {results['passed']} ({results['passed'] / results['total_tests'] * 100:.1f}%)
- Failed: {results['failed']}
- Coverage: {results['coverage']:.1f}%

## Failed Tests
"""
        for failure in results['failures']:
            report += f"""
### Test {failure['test_id']}
- Input: `{failure['input']}`
- Expected: {failure['expected']}
- Actual: {failure['actual']}
- Error: {failure.get('error') or 'Field mismatch'}
"""
        return report
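The test_single_case helper is left undefined above. One lightweight stand-in approximates the decoder locally by applying its regex and order fields with Python's re module; this skips Wazuh's real matching engine (wazuh-logtest remains authoritative) but is fast enough for CI. A sketch, assuming the file contains a single <decoder> element:

import re
import xml.etree.ElementTree as ET

def test_single_case(decoder_file, test_case):
    """Approximate decoder matching locally with Python's re module."""
    root = ET.parse(decoder_file).getroot()
    pattern = root.findtext('regex', default='')
    order = [f.strip() for f in root.findtext('order', default='').split(',')]

    match = re.search(pattern, test_case['input'])
    if not match:
        return {'passed': False, 'actual': None, 'error': 'regex did not match'}

    actual = dict(zip(order, match.groups()))
    return {'passed': actual == test_case['expected'], 'actual': actual}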
Decoder Validation Rules
import re
import xml.etree.ElementTree as ET

def validate_decoder_syntax(decoder_xml):
    """Validate decoder XML syntax and best practices"""
    validation_results = {
        'syntax': True,
        'best_practices': [],
        'warnings': [],
        'errors': []
    }

    # Parse XML
    try:
        root = ET.fromstring(decoder_xml)
    except ET.ParseError as e:
        validation_results['syntax'] = False
        validation_results['errors'].append(f"XML Parse Error: {e}")
        return validation_results

    # Check for required elements (an Element with no children is falsy,
    # so compare against None explicitly)
    if root.find('prematch') is None and root.find('parent') is None:
        validation_results['errors'].append(
            "Decoder must have either 'prematch' or 'parent'"
        )

    # Check regex complexity
    regex_elem = root.find('regex')
    if regex_elem is not None:
        regex_pattern = regex_elem.text
        complexity = calculate_regex_complexity(regex_pattern)
        if complexity > 100:
            validation_results['warnings'].append(
                f"High regex complexity ({complexity}). Consider simplifying."
            )

    # Check naming conventions
    name = root.get('name')
    if name and not re.match(r'^[a-z][a-z0-9_]*$', name):
        validation_results['warnings'].append(
            "Decoder name should be lowercase with underscores"
        )

    return validation_results
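calculate_regex_complexity is referenced above but not defined; a workable heuristic counts the constructs that drive backtracking. A sketch (the weights and the 100 threshold are arbitrary and should be tuned):

import re

def calculate_regex_complexity(pattern):
    """Heuristic complexity score: counts backtracking-prone constructs."""
    if not pattern:
        return 0
    score = len(pattern)                                  # base cost
    score += 10 * len(re.findall(r'\.\*|\.\+', pattern))  # greedy wildcards
    score += 15 * len(re.findall(r'\([^)]*[+*][^)]*\)[+*]', pattern))  # nested quantifiers
    score += 5 * pattern.count('|')                       # alternation branches
    return score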
Decoder Deployment Best Practices
Production Deployment Strategy
#!/bin/bash
# Decoder deployment script
DECODER_DIR="/var/ossec/etc/decoders"
BACKUP_DIR="/var/ossec/etc/decoders/backup"
TEST_LOG="/tmp/decoder_test.log"

deploy_decoder() {
    local decoder_file=$1
    local decoder_name
    decoder_name=$(basename "$decoder_file" .xml)

    echo "Deploying decoder: $decoder_name"

    # Backup existing decoder
    if [ -f "$DECODER_DIR/$decoder_name.xml" ]; then
        cp "$DECODER_DIR/$decoder_name.xml" \
           "$BACKUP_DIR/$decoder_name.xml.$(date +%Y%m%d%H%M%S)"
    fi

    # Deploy the candidate first so wazuh-logtest can see it
    cp "$decoder_file" "$DECODER_DIR/"

    # Validate against sample logs; roll back on failure
    if ! /var/ossec/bin/wazuh-logtest -v < "$TEST_LOG" > /tmp/validation.out 2>&1; then
        echo "Decoder validation failed; rolling back"
        cat /tmp/validation.out
        rm -f "$DECODER_DIR/$decoder_name.xml"
        latest_backup=$(ls -t "$BACKUP_DIR/$decoder_name.xml."* 2>/dev/null | head -1)
        [ -n "$latest_backup" ] && cp "$latest_backup" "$DECODER_DIR/$decoder_name.xml"
        return 1
    fi

    # Set permissions (wazuh:wazuh on Wazuh 4.x; older installs use ossec:ossec)
    chown wazuh:wazuh "$DECODER_DIR/$decoder_name.xml"
    chmod 640 "$DECODER_DIR/$decoder_name.xml"

    # Restart Wazuh manager to load the new decoder
    systemctl restart wazuh-manager

    echo "Decoder deployed successfully"
}
Monitoring Decoder Performance
from collections import defaultdict

class DecoderMonitor:
    def __init__(self, elasticsearch_client):
        self.es = elasticsearch_client
        self.metrics = {
            'parsing_failures': 0,
            'parsing_successes': 0,
            'avg_parsing_time': 0,
            'decoder_usage': defaultdict(int)
        }

    def monitor_decoder_performance(self, time_range='1h'):
        """Monitor decoder performance metrics"""
        # Query for decoder statistics
        query = {
            "query": {
                "range": {
                    "@timestamp": {
                        "gte": f"now-{time_range}"
                    }
                }
            },
            "aggs": {
                "decoder_stats": {
                    "terms": {
                        "field": "decoder.name",
                        "size": 100
                    },
                    "aggs": {
                        "avg_time": {
                            "avg": {
                                "field": "decoder.parsing_time"
                            }
                        },
                        "failure_rate": {
                            "filter": {
                                "term": {
                                    "decoder.status": "failed"
                                }
                            }
                        }
                    }
                }
            }
        }

        results = self.es.search(index="wazuh-monitoring-*", body=query)

        # Process results
        performance_report = []
        for bucket in results['aggregations']['decoder_stats']['buckets']:
            decoder_name = bucket['key']
            total_count = bucket['doc_count']
            avg_time = bucket['avg_time']['value']
            failures = bucket['failure_rate']['doc_count']
            failure_rate = failures / total_count if total_count > 0 else 0

            performance_report.append({
                'decoder': decoder_name,
                'total_processed': total_count,
                'avg_parsing_time_ms': avg_time,
                'failure_rate': failure_rate,
                'recommendation': self.get_optimization_recommendation(
                    avg_time, failure_rate
                )
            })
        return performance_report

    def get_optimization_recommendation(self, avg_time, failure_rate):
        """Rough guidance thresholds; tune to your environment."""
        if failure_rate > 0.1:
            return 'Review prematch and regex: high failure rate'
        if avg_time and avg_time > 1.0:
            return 'Simplify regex or tighten prematch: slow parsing'
        return 'OK'
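Wired to an Elasticsearch client, a periodic report is one call. The endpoint, credentials, and the 5% threshold below are placeholders, and the index pattern and field names follow the query above, so adjust them to your pipeline:

from elasticsearch import Elasticsearch

# Placeholder endpoint and credentials
es = Elasticsearch("https://localhost:9200", basic_auth=("elastic", "changeme"))
monitor = DecoderMonitor(es)

for entry in monitor.monitor_decoder_performance(time_range="24h"):
    if entry['failure_rate'] > 0.05:  # flag decoders failing on >5% of events
        print(f"{entry['decoder']}: {entry['failure_rate']:.1%} failures, "
              f"avg {entry['avg_parsing_time_ms']} ms "
              f"-> {entry['recommendation']}")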
Troubleshooting Common Issues
Decoder Debugging Techniques
class DecoderDebugger:
    def __init__(self):
        self.debug_levels = {
            'basic': self.basic_debug,
            'regex': self.regex_debug,
            'performance': self.performance_debug,
            'full': self.full_debug
        }

    def debug_parsing_failure(self, decoder_file, log_line, debug_level='basic'):
        """Debug why a log line fails to parse"""
        debug_info = {
            'decoder': decoder_file,
            'log_line': log_line,
            'analysis': {}
        }

        # Run appropriate debug level
        debug_func = self.debug_levels.get(debug_level, self.basic_debug)
        debug_info['analysis'] = debug_func(decoder_file, log_line)
        return debug_info

    def regex_debug(self, decoder_file, log_line):
        """Debug regex matching issues"""
        # Load decoder
        decoder = self.load_decoder(decoder_file)

        # Test prematch
        prematch_result = self.test_prematch(decoder['prematch'], log_line)

        # Test main regex
        regex_result = self.test_regex(decoder['regex'], log_line)

        # Provide recommendations
        recommendations = []
        if not prematch_result['matched']:
            recommendations.append({
                'issue': 'Prematch failed',
                'suggestion': f"Adjust prematch pattern: {self.suggest_prematch(log_line)}"
            })
        if not regex_result['matched']:
            recommendations.append({
                'issue': 'Regex failed',
                'suggestion': 'Check regex pattern and escaping'
            })

        return {
            'prematch_test': prematch_result,
            'regex_test': regex_result,
            'recommendations': recommendations
        }
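The suggest_prematch helper can be as simple as proposing an anchored literal prefix taken from the failing line. A sketch:

import re

def suggest_prematch(log_line, max_len=16):
    """Propose an anchored literal prefix as a prematch candidate."""
    tokens = log_line.split()
    if not tokens:
        return None
    prefix = tokens[0][:max_len]   # first token, capped in length
    return '^' + re.escape(prefix)

print(suggest_prematch('CUSTOM-APP: {"event_type":"login"}'))
# ^CUSTOM\-APP: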
Advanced Decoder Patterns
Conditional Field Extraction
<!-- Conditional Decoder Based on Field Values -->
<decoder name="conditional_decoder">
  <prematch>^EVENT:</prematch>
  <regex>^EVENT:\s*(\w+)\s*(.+)$</regex>
  <order>event_type, event_data</order>
</decoder>

<!-- Type-specific decoders -->
<decoder name="conditional_auth">
  <parent>conditional_decoder</parent>
  <field name="event_type">AUTH</field>
  <regex offset="after_parent">^(\w+)\s+(\w+)\s+([^\s]+)\s+(\w+)$</regex>
  <order>action, user, source_ip, result</order>
</decoder>

<decoder name="conditional_file">
  <parent>conditional_decoder</parent>
  <field name="event_type">FILE</field>
  <regex offset="after_parent">^(\w+)\s+([^\s]+)\s+(\d+)\s+(\w+)$</regex>
  <order>operation, file_path, size, user</order>
</decoder>
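The two-stage match is easy to sanity-check outside Wazuh: the parent pattern peels off event_type, and the type-specific pattern consumes the remainder, which is the role offset="after_parent" plays above. A quick simulation with invented sample lines:

import re

PARENT = re.compile(r'^EVENT:\s*(\w+)\s*(.+)$')
CHILDREN = {
    'AUTH': (re.compile(r'^(\w+)\s+(\w+)\s+(\S+)\s+(\w+)$'),
             ['action', 'user', 'source_ip', 'result']),
    'FILE': (re.compile(r'^(\w+)\s+(\S+)\s+(\d+)\s+(\w+)$'),
             ['operation', 'file_path', 'size', 'user']),
}

def decode(line):
    parent = PARENT.match(line)
    if not parent:
        return None
    event_type, rest = parent.groups()
    child_regex, names = CHILDREN.get(event_type, (None, None))
    if child_regex and (m := child_regex.match(rest)):
        return {'event_type': event_type, **dict(zip(names, m.groups()))}
    return {'event_type': event_type, 'event_data': rest}  # parent-only fallback

print(decode('EVENT: AUTH login alice 10.0.0.5 success'))
# {'event_type': 'AUTH', 'action': 'login', 'user': 'alice',
#  'source_ip': '10.0.0.5', 'result': 'success'}
print(decode('EVENT: FILE write /etc/passwd 2048 root'))
# {'event_type': 'FILE', 'operation': 'write', 'file_path': '/etc/passwd',
#  'size': '2048', 'user': 'root'}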
Conclusion
Custom decoders are the unsung heroes of effective log analysis, transforming raw data into structured intelligence. By mastering advanced parsing techniques, performance optimization, and dynamic decoder generation, organizations can handle any log format their complex environments throw at them. The key is not just parsing logs, but doing so efficiently, accurately, and at scale.
Next Steps
- Audit existing log sources for parsing gaps
- Develop custom decoders for proprietary applications
- Implement performance benchmarking
- Create comprehensive test suites
- Deploy monitoring for decoder performance
Remember: A well-crafted decoder is the difference between noise and insight. Invest in your parsing infrastructure—your SOC analysts will thank you.