Skip to content

Security Correlation System Architecture: Database Design and Deployment Patterns

Published: at 06:00 PM

Security Correlation System Architecture: Database Design and Deployment Patterns

Modern security operations require sophisticated correlation systems that can process, analyze, and correlate vast amounts of security data in real-time. This guide provides a comprehensive overview of system architecture patterns, database design principles, and deployment strategies for building effective security correlation platforms.

Table of Contents

Open Table of Contents

System Architecture Overview

High-Level Architecture

The security correlation system follows a multi-tier architecture designed for scalability, reliability, and real-time processing:

graph TB
    subgraph "Data Sources"
        A1[Network Logs]
        A2[Endpoint Logs]
        A3[Application Logs]
        A4[Threat Intelligence]
        A5[Vulnerability Scanners]
    end

    subgraph "Ingestion Layer"
        B1[Log Collectors]
        B2[API Gateways]
        B3[Message Queues]
    end

    subgraph "Processing Layer"
        C1[Stream Processors]
        C2[Correlation Engine]
        C3[ML/AI Analytics]
        C4[Rule Engine]
    end

    subgraph "Storage Layer"
        D1[Time Series DB]
        D2[Graph Database]
        D3[Document Store]
        D4[Data Warehouse]
    end

    subgraph "Analysis Layer"
        E1[Real-time Analytics]
        E2[Historical Analysis]
        E3[Threat Hunting]
        E4[Incident Response]
    end

    subgraph "Presentation Layer"
        F1[Dashboards]
        F2[Alerting System]
        F3[Reports]
        F4[APIs]
    end

    A1 --> B1
    A2 --> B1
    A3 --> B2
    A4 --> B2
    A5 --> B3

    B1 --> C1
    B2 --> C1
    B3 --> C2

    C1 --> D1
    C2 --> D2
    C3 --> D3
    C4 --> D4

    D1 --> E1
    D2 --> E2
    D3 --> E3
    D4 --> E4

    E1 --> F1
    E2 --> F2
    E3 --> F3
    E4 --> F4

Component Interaction Flow

sequenceDiagram
    participant DS as Data Source
    participant IC as Ingestion Controller
    participant SP as Stream Processor
    participant CE as Correlation Engine
    participant DB as Database
    participant AL as Alert System
    participant UI as User Interface

    DS->>IC: Send raw logs
    IC->>SP: Forward for processing
    SP->>SP: Parse and normalize
    SP->>CE: Send processed events
    CE->>CE: Apply correlation rules
    CE->>DB: Store events and correlations
    CE->>AL: Generate alerts
    AL->>UI: Display alerts
    UI->>DB: Query historical data
    DB->>UI: Return analysis results

Database Schema Design

Core Entity Relationships

erDiagram
    EVENT ||--o{ EVENT_ATTRIBUTE : contains
    EVENT ||--o{ CORRELATION : participates_in
    EVENT }|--|| SOURCE : originates_from
    EVENT }|--|| EVENT_TYPE : classified_as

    CORRELATION ||--o{ CORRELATION_RULE : follows
    CORRELATION ||--o{ ALERT : generates

    ALERT }|--|| SEVERITY : has
    ALERT ||--o{ INCIDENT : escalates_to

    INCIDENT ||--o{ RESPONSE_ACTION : triggers
    INCIDENT }|--|| USER : assigned_to

    USER ||--o{ ROLE : has
    ROLE ||--o{ PERMISSION : grants

    THREAT_INTEL ||--o{ IOC : contains
    IOC ||--o{ EVENT : matches

    ASSET ||--o{ EVENT : generates
    ASSET }|--|| ASSET_TYPE : categorized_as

    EVENT {
        uuid event_id PK
        timestamp event_time
        string source_ip
        string dest_ip
        int source_port
        int dest_port
        string protocol
        string event_type
        text raw_data
        json normalized_data
        uuid source_id FK
        uuid event_type_id FK
        timestamp created_at
        timestamp updated_at
    }

    EVENT_ATTRIBUTE {
        uuid attribute_id PK
        uuid event_id FK
        string attribute_name
        string attribute_value
        string data_type
        timestamp created_at
    }

    SOURCE {
        uuid source_id PK
        string source_name
        string source_type
        string ip_address
        json configuration
        boolean is_active
        timestamp last_seen
        timestamp created_at
    }

    EVENT_TYPE {
        uuid event_type_id PK
        string type_name
        string category
        text description
        json schema
        int severity_default
        timestamp created_at
    }

    CORRELATION {
        uuid correlation_id PK
        string correlation_name
        uuid rule_id FK
        timestamp start_time
        timestamp end_time
        int event_count
        float confidence_score
        json metadata
        timestamp created_at
    }

    CORRELATION_RULE {
        uuid rule_id PK
        string rule_name
        text rule_definition
        string rule_type
        int time_window
        int threshold
        boolean is_active
        timestamp created_at
        timestamp updated_at
    }

    ALERT {
        uuid alert_id PK
        uuid correlation_id FK
        uuid severity_id FK
        string alert_title
        text alert_description
        string status
        json details
        timestamp triggered_at
        timestamp acknowledged_at
        uuid acknowledged_by FK
    }

    SEVERITY {
        uuid severity_id PK
        string severity_name
        int severity_level
        string color_code
        text description
    }

    INCIDENT {
        uuid incident_id PK
        string incident_number
        string title
        text description
        uuid assigned_to FK
        string status
        string priority
        timestamp created_at
        timestamp resolved_at
    }

    USER {
        uuid user_id PK
        string username
        string email
        string full_name
        boolean is_active
        timestamp last_login
        timestamp created_at
    }

    ROLE {
        uuid role_id PK
        string role_name
        text description
        timestamp created_at
    }

    PERMISSION {
        uuid permission_id PK
        string permission_name
        string resource
        string action
        text description
    }

    THREAT_INTEL {
        uuid intel_id PK
        string source_name
        string intel_type
        float confidence_score
        timestamp valid_from
        timestamp valid_until
        json metadata
        timestamp created_at
    }

    IOC {
        uuid ioc_id PK
        uuid intel_id FK
        string ioc_type
        string ioc_value
        string status
        float confidence_score
        timestamp first_seen
        timestamp last_seen
    }

    ASSET {
        uuid asset_id PK
        string asset_name
        uuid asset_type_id FK
        string ip_address
        string mac_address
        string hostname
        string operating_system
        json metadata
        boolean is_critical
        timestamp created_at
        timestamp updated_at
    }

    ASSET_TYPE {
        uuid asset_type_id PK
        string type_name
        text description
        json attributes_schema
        timestamp created_at
    }

    RESPONSE_ACTION {
        uuid action_id PK
        uuid incident_id FK
        string action_type
        text description
        string status
        uuid performed_by FK
        timestamp performed_at
        json results
    }

Database Implementation SQL

-- Events table with partitioning for performance
CREATE TABLE events (
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    event_time TIMESTAMPTZ NOT NULL,
    source_ip INET,
    dest_ip INET,
    source_port INTEGER,
    dest_port INTEGER,
    protocol VARCHAR(20),
    event_type VARCHAR(100) NOT NULL,
    raw_data TEXT,
    normalized_data JSONB,
    source_id UUID REFERENCES sources(source_id),
    event_type_id UUID REFERENCES event_types(event_type_id),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
) PARTITION BY RANGE (event_time);

-- Create monthly partitions
CREATE TABLE events_2024_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- Indexes for performance
CREATE INDEX idx_events_time ON events (event_time);
CREATE INDEX idx_events_source_ip ON events USING HASH (source_ip);
CREATE INDEX idx_events_dest_ip ON events USING HASH (dest_ip);
CREATE INDEX idx_events_type ON events (event_type);
CREATE INDEX idx_events_normalized_data ON events USING GIN (normalized_data);

-- Correlation rules table
CREATE TABLE correlation_rules (
    rule_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    rule_name VARCHAR(255) NOT NULL UNIQUE,
    rule_definition TEXT NOT NULL,
    rule_type VARCHAR(50) NOT NULL,
    time_window INTEGER NOT NULL, -- in seconds
    threshold INTEGER DEFAULT 1,
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Alerts table with status tracking
CREATE TABLE alerts (
    alert_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    correlation_id UUID REFERENCES correlations(correlation_id),
    severity_id UUID REFERENCES severities(severity_id),
    alert_title VARCHAR(255) NOT NULL,
    alert_description TEXT,
    status VARCHAR(50) DEFAULT 'open',
    details JSONB,
    triggered_at TIMESTAMPTZ DEFAULT NOW(),
    acknowledged_at TIMESTAMPTZ,
    acknowledged_by UUID REFERENCES users(user_id)
);

-- Function to update updated_at timestamp
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ language 'plpgsql';

-- Trigger for automatic timestamp updates
CREATE TRIGGER update_events_updated_at BEFORE UPDATE ON events
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

Deployment Architecture

Cloud-Native Deployment

graph TB
    subgraph "Load Balancer"
        LB[Application Load Balancer]
    end

    subgraph "API Gateway Cluster"
        AG1[API Gateway 1]
        AG2[API Gateway 2]
        AG3[API Gateway 3]
    end

    subgraph "Application Tier"
        subgraph "Correlation Service"
            CS1[Correlation Service 1]
            CS2[Correlation Service 2]
            CS3[Correlation Service 3]
        end

        subgraph "Analytics Service"
            AS1[Analytics Service 1]
            AS2[Analytics Service 2]
        end

        subgraph "Alert Service"
            ALS1[Alert Service 1]
            ALS2[Alert Service 2]
        end
    end

    subgraph "Message Queue Cluster"
        MQ1[Message Queue 1]
        MQ2[Message Queue 2]
        MQ3[Message Queue 3]
    end

    subgraph "Database Cluster"
        subgraph "Primary DB"
            DB1[PostgreSQL Primary]
        end

        subgraph "Read Replicas"
            DB2[PostgreSQL Replica 1]
            DB3[PostgreSQL Replica 2]
        end

        subgraph "Time Series DB"
            TS1[InfluxDB Cluster]
        end

        subgraph "Search Engine"
            ES1[Elasticsearch Cluster]
        end
    end

    subgraph "Caching Layer"
        RC1[Redis Cluster 1]
        RC2[Redis Cluster 2]
    end

    subgraph "Monitoring"
        MON1[Prometheus]
        MON2[Grafana]
        MON3[Alert Manager]
    end

    LB --> AG1
    LB --> AG2
    LB --> AG3

    AG1 --> CS1
    AG2 --> CS2
    AG3 --> CS3

    CS1 --> MQ1
    CS2 --> MQ2
    CS3 --> MQ3

    CS1 --> RC1
    AS1 --> RC2

    CS1 --> DB1
    AS1 --> DB2
    ALS1 --> DB3

    CS1 --> TS1
    AS1 --> ES1

    CS1 --> MON1
    AS1 --> MON1
    ALS1 --> MON1

Container Orchestration

# docker-compose.yml for development environment
version: "3.8"

services:
  # Load Balancer
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - correlation-api
      - analytics-api

  # Correlation Service
  correlation-api:
    build:
      context: ./correlation-service
      dockerfile: Dockerfile
    ports:
      - "8001:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@postgres:5432/correlation_db
      - REDIS_URL=redis://redis:6379/0
      - MESSAGE_QUEUE_URL=amqp://rabbitmq:5672
    depends_on:
      - postgres
      - redis
      - rabbitmq
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 1G
          cpus: "0.5"

  # Analytics Service
  analytics-api:
    build:
      context: ./analytics-service
      dockerfile: Dockerfile
    ports:
      - "8002:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@postgres:5432/correlation_db
      - ELASTICSEARCH_URL=http://elasticsearch:9200
    depends_on:
      - postgres
      - elasticsearch

  # Database
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=correlation_db
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"

  # Redis Cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  # Message Queue
  rabbitmq:
    image: rabbitmq:3-management-alpine
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=password
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

  # Elasticsearch
  elasticsearch:
    image: elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9200:9200"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data

  # InfluxDB for time series data
  influxdb:
    image: influxdb:2.7-alpine
    environment:
      - INFLUXDB_DB=metrics
      - INFLUXDB_ADMIN_USER=admin
      - INFLUXDB_ADMIN_PASSWORD=password
    ports:
      - "8086:8086"
    volumes:
      - influxdb_data:/var/lib/influxdb2

  # Monitoring
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  postgres_data:
  redis_data:
  rabbitmq_data:
  elasticsearch_data:
  influxdb_data:
  prometheus_data:
  grafana_data:

Kubernetes Deployment

# correlation-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: correlation-service
  labels:
    app: correlation-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: correlation-service
  template:
    metadata:
      labels:
        app: correlation-service
    spec:
      containers:
        - name: correlation-service
          image: correlation-service:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: database-url
            - name: REDIS_URL
              value: "redis://redis-service:6379/0"
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: correlation-service
spec:
  selector:
    app: correlation-service
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: correlation-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
    - host: correlation.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: correlation-service
                port:
                  number: 80

Use Case Implementation

Real-Time Threat Detection

graph LR
    subgraph "Data Ingestion"
        A1[Firewall Logs]
        A2[IDS/IPS Alerts]
        A3[Endpoint Logs]
        A4[DNS Logs]
    end

    subgraph "Correlation Rules"
        B1[Brute Force Detection]
        B2[Lateral Movement]
        B3[Data Exfiltration]
        B4[Malware Communication]
    end

    subgraph "Analysis Engine"
        C1[Pattern Matching]
        C2[Statistical Analysis]
        C3[Machine Learning]
        C4[Threat Intelligence]
    end

    subgraph "Response Actions"
        D1[Alert Generation]
        D2[Incident Creation]
        D3[Automated Response]
        D4[Threat Hunting]
    end

    A1 --> B1
    A2 --> B2
    A3 --> B3
    A4 --> B4

    B1 --> C1
    B2 --> C2
    B3 --> C3
    B4 --> C4

    C1 --> D1
    C2 --> D2
    C3 --> D3
    C4 --> D4

User Behavior Analytics

stateDiagram-v2
    [*] --> Normal_Behavior
    Normal_Behavior --> Anomaly_Detected: Threshold Exceeded
    Anomaly_Detected --> Risk_Assessment: Analyze Context
    Risk_Assessment --> Low_Risk: Score < 30
    Risk_Assessment --> Medium_Risk: Score 30-70
    Risk_Assessment --> High_Risk: Score > 70

    Low_Risk --> Normal_Behavior: Log Event
    Medium_Risk --> Alert_Generated: Notify SOC
    High_Risk --> Incident_Created: Escalate

    Alert_Generated --> Investigation: Analyst Review
    Incident_Created --> Response_Initiated: Immediate Action

    Investigation --> False_Positive: No Threat
    Investigation --> Confirmed_Threat: Threat Validated

    False_Positive --> Normal_Behavior: Update Baseline
    Confirmed_Threat --> Response_Initiated: Take Action

    Response_Initiated --> [*]: Case Closed

Performance Optimization

Database Optimization

-- Partitioning strategy for events table
CREATE TABLE events_y2024m01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- Indexes for common query patterns
CREATE INDEX CONCURRENTLY idx_events_source_ip_time
    ON events (source_ip, event_time DESC);

CREATE INDEX CONCURRENTLY idx_events_dest_ip_time
    ON events (dest_ip, event_time DESC);

-- Materialized views for aggregated data
CREATE MATERIALIZED VIEW hourly_event_summary AS
SELECT
    date_trunc('hour', event_time) as hour,
    event_type,
    COUNT(*) as event_count,
    COUNT(DISTINCT source_ip) as unique_sources
FROM events
WHERE event_time >= NOW() - INTERVAL '7 days'
GROUP BY date_trunc('hour', event_time), event_type;

-- Refresh materialized view periodically
CREATE OR REPLACE FUNCTION refresh_hourly_summary()
RETURNS void AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY hourly_event_summary;
END;
$$ LANGUAGE plpgsql;

-- Schedule refresh every hour
SELECT cron.schedule('refresh-hourly-summary', '0 * * * *',
                     'SELECT refresh_hourly_summary();');

Caching Strategy

# Redis caching for correlation rules
import redis
import json
from datetime import timedelta

class CorrelationRuleCache:
    def __init__(self, redis_url):
        self.redis_client = redis.from_url(redis_url)
        self.cache_ttl = timedelta(hours=1)

    def get_active_rules(self):
        """Get active correlation rules from cache"""
        cached_rules = self.redis_client.get("active_correlation_rules")
        if cached_rules:
            return json.loads(cached_rules)

        # Fetch from database if not in cache
        rules = self._fetch_rules_from_db()
        self.redis_client.setex(
            "active_correlation_rules",
            self.cache_ttl,
            json.dumps(rules)
        )
        return rules

    def invalidate_cache(self):
        """Invalidate correlation rules cache"""
        self.redis_client.delete("active_correlation_rules")

    def _fetch_rules_from_db(self):
        # Database query to fetch active rules
        pass

Stream Processing Optimization

# Apache Kafka consumer for real-time event processing
from kafka import KafkaConsumer
import json
import concurrent.futures
from typing import List, Dict

class EventStreamProcessor:
    def __init__(self, kafka_brokers: List[str], topic: str):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=kafka_brokers,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            group_id='correlation-processor',
            auto_offset_reset='latest',
            max_poll_records=1000,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000
        )
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    def process_events(self):
        """Process events from Kafka stream"""
        batch = []
        batch_size = 100

        for message in self.consumer:
            batch.append(message.value)

            if len(batch) >= batch_size:
                # Process batch asynchronously
                self.thread_pool.submit(self._process_batch, batch.copy())
                batch.clear()

    def _process_batch(self, events: List[Dict]):
        """Process a batch of events"""
        # Normalize events
        normalized_events = [self._normalize_event(event) for event in events]

        # Store in database
        self._store_events(normalized_events)

        # Apply correlation rules
        self._apply_correlation_rules(normalized_events)

    def _normalize_event(self, event: Dict) -> Dict:
        """Normalize event data"""
        # Event normalization logic
        pass

    def _store_events(self, events: List[Dict]):
        """Bulk insert events to database"""
        # Bulk database insertion
        pass

    def _apply_correlation_rules(self, events: List[Dict]):
        """Apply correlation rules to events"""
        # Correlation logic
        pass

Security Considerations

Data Protection

# Encryption for sensitive data
from cryptography.fernet import Fernet
import base64
import os

class DataEncryption:
    def __init__(self):
        # Generate or load encryption key
        self.key = self._get_or_create_key()
        self.cipher_suite = Fernet(self.key)

    def encrypt_sensitive_data(self, data: str) -> str:
        """Encrypt sensitive data before storing"""
        encrypted_data = self.cipher_suite.encrypt(data.encode())
        return base64.b64encode(encrypted_data).decode()

    def decrypt_sensitive_data(self, encrypted_data: str) -> str:
        """Decrypt sensitive data when retrieving"""
        encrypted_bytes = base64.b64decode(encrypted_data.encode())
        decrypted_data = self.cipher_suite.decrypt(encrypted_bytes)
        return decrypted_data.decode()

    def _get_or_create_key(self) -> bytes:
        """Get encryption key from environment or generate new one"""
        key_env = os.environ.get('ENCRYPTION_KEY')
        if key_env:
            return base64.b64decode(key_env.encode())
        else:
            # Generate new key (store securely)
            return Fernet.generate_key()

Access Control

# Role-based access control
from functools import wraps
from flask import request, jsonify, g
import jwt

def require_permission(permission: str):
    """Decorator to check user permissions"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            token = request.headers.get('Authorization', '').replace('Bearer ', '')

            try:
                payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
                user_permissions = payload.get('permissions', [])

                if permission not in user_permissions:
                    return jsonify({'error': 'Insufficient permissions'}), 403

                g.current_user = payload
                return f(*args, **kwargs)

            except jwt.InvalidTokenError:
                return jsonify({'error': 'Invalid token'}), 401

        return decorated_function
    return decorator

# Usage example
@app.route('/api/correlations', methods=['GET'])
@require_permission('view_correlations')
def get_correlations():
    """Get correlation data with permission check"""
    return jsonify(correlations)

Audit Logging

# Comprehensive audit logging
import logging
import json
from datetime import datetime
from flask import request, g

class AuditLogger:
    def __init__(self):
        self.logger = logging.getLogger('audit')
        handler = logging.FileHandler('/var/log/security-correlation/audit.log')
        formatter = logging.Formatter('%(asctime)s - %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_access(self, resource: str, action: str, result: str):
        """Log access attempts"""
        audit_record = {
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': getattr(g, 'current_user', {}).get('user_id'),
            'username': getattr(g, 'current_user', {}).get('username'),
            'resource': resource,
            'action': action,
            'result': result,
            'ip_address': request.remote_addr,
            'user_agent': request.user_agent.string
        }

        self.logger.info(json.dumps(audit_record))

    def log_data_access(self, table: str, operation: str, record_count: int):
        """Log database access"""
        audit_record = {
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': getattr(g, 'current_user', {}).get('user_id'),
            'operation_type': 'database_access',
            'table': table,
            'operation': operation,
            'record_count': record_count
        }

        self.logger.info(json.dumps(audit_record))

Conclusion

Building an effective security correlation system requires careful consideration of architecture, database design, deployment strategies, and security measures. The patterns and implementations provided in this guide offer a solid foundation for creating scalable, secure, and efficient correlation platforms.

Key takeaways:

By following these architectural patterns and best practices, organizations can build robust security correlation systems that effectively detect, analyze, and respond to security threats in real-time.