Day 94 - Zero Trust Security in Multi-Cloud Environments

Published: at 03:30 AM

As organizations embrace multi-cloud strategies, traditional perimeter-based security models become obsolete. Zero Trust architecture, with its “never trust, always verify” principle, provides the framework needed to secure modern distributed infrastructure. Today, we’ll implement Zero Trust security across AWS, Azure, and GCP.

Understanding Zero Trust in Multi-Cloud

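Zero Trust eliminates the concept of trusted networks, devices, or users. In a multi-cloud environment, this means no request is trusted by default, whichever cloud it originates from or targets; each one is verified against identity, device, and context.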

Core Components of Multi-Cloud Zero Trust

1. Identity as the New Perimeter

Identity becomes the primary security perimeter in Zero Trust architecture.

Unified Identity Management Across Clouds

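# Multi-Cloud Identity Federation with Python
import json
import os
from datetime import datetime, timedelta

import boto3
import jwt
from azure.identity import DefaultAzureCredential
# IAMClient lives in the iam_admin_v1 module of the google-cloud-iam package
from google.cloud import iam_admin_v1 as iam
from google.oauth2 import service_account

# Placeholder settings (assumptions, not part of any SDK): supply real values
AWS_ACCOUNT_ID = os.environ.get('AWS_ACCOUNT_ID', '123456789012')
# OIDC issuer without the https:// prefix, as it appears in the provider ARN
OIDC_PROVIDER = os.environ.get('OIDC_PROVIDER', 'oidc.example.com')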

class MultiCloudIdentityManager:
    def __init__(self):
        # AWS
        self.aws_sts = boto3.client('sts')
        self.aws_iam = boto3.client('iam')

        # Azure
        self.azure_credential = DefaultAzureCredential()

        # GCP
        self.gcp_iam = iam.IAMClient()

    def create_federated_identity(self, user_email, roles):
        """Create federated identity across all clouds"""

        identity_config = {
            'user': user_email,
            'created_at': datetime.utcnow().isoformat(),
            'roles': roles,
            'clouds': {}
        }

        # AWS Identity
        aws_role = self._create_aws_federated_role(user_email, roles.get('aws', []))
        identity_config['clouds']['aws'] = {
            'role_arn': aws_role['Arn'],
            'trust_policy': aws_role['TrustPolicy']
        }

        # Azure Identity
        azure_principal = self._create_azure_service_principal(user_email, roles.get('azure', []))
        identity_config['clouds']['azure'] = {
            'principal_id': azure_principal['id'],
            'tenant_id': azure_principal['tenant_id']
        }

        # GCP Identity
        gcp_sa = self._create_gcp_service_account(user_email, roles.get('gcp', []))
        identity_config['clouds']['gcp'] = {
            'service_account': gcp_sa['email'],
            'project_id': gcp_sa['project_id']
        }

        return identity_config

    def _create_aws_federated_role(self, user_email, permissions):
        """Create AWS role with OIDC federation"""

        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {
                    "Federated": f"arn:aws:iam::{AWS_ACCOUNT_ID}:oidc-provider/{OIDC_PROVIDER}"
                },
                "Action": "sts:AssumeRoleWithWebIdentity",
                "Condition": {
                    "StringEquals": {
                        f"{OIDC_PROVIDER}:sub": user_email,
                        f"{OIDC_PROVIDER}:aud": "sts.amazonaws.com"
                    }
                }
            }]
        }

        # Create role
        role_name = f"federated-{user_email.replace('@', '-').replace('.', '-')}"

        try:
            response = self.aws_iam.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy),
                Description=f'Federated role for {user_email}',
                MaxSessionDuration=3600,  # 1 hour
                Tags=[
                    {'Key': 'Environment', 'Value': 'production'},
                    {'Key': 'ZeroTrust', 'Value': 'enabled'}
                ]
            )

            # Attach policies based on permissions
            for permission in permissions:
                self.aws_iam.attach_role_policy(
                    RoleName=role_name,
                    PolicyArn=self._get_aws_policy_arn(permission)
                )

            return {
                'Arn': response['Role']['Arn'],
                'TrustPolicy': trust_policy
            }

        except Exception as e:
            print(f"Error creating AWS role: {e}")
            raise
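
The role name above is derived directly from the email address, and IAM limits role names to 64 characters, so a long address could make `create_role` fail. A minimal sketch of one way to handle that, assuming a hypothetical helper `federated_role_name` that truncates and appends a short hash; lowercasing the address is a design choice here, not something the original code does.

```python
import hashlib
import re

MAX_ROLE_NAME = 64  # IAM role names are limited to 64 characters


def federated_role_name(user_email: str) -> str:
    """Derive a deterministic IAM role name from an email address."""
    email = user_email.lower()
    # Replace anything outside letters, digits, and hyphens with a hyphen
    slug = re.sub(r"[^a-z0-9-]", "-", email)
    name = f"federated-{slug}"
    if len(name) <= MAX_ROLE_NAME:
        return name
    # Too long: truncate and append a short hash so distinct emails stay distinct
    digest = hashlib.sha256(email.encode()).hexdigest()[:8]
    return f"{name[:MAX_ROLE_NAME - 9]}-{digest}"


print(federated_role_name("alice@example.com"))  # federated-alice-example-com
```

Because the name is deterministic, re-running provisioning for the same user resolves to the same role instead of creating a duplicate.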

Conditional Access Policies

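# Azure Conditional Access Policy
# Illustrative declarative form only: Conditional Access has no Kubernetes-style
# manifest. Real policies are managed in the Microsoft Entra admin center or via
# the Microsoft Graph conditionalAccessPolicy resource, whose fields this mirrors.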
apiVersion: authorization.azure.com/v1
kind: ConditionalAccessPolicy
metadata:
  name: zero-trust-multi-cloud
spec:
  displayName: "Zero Trust Multi-Cloud Access"
  state: enabled
  conditions:
    users:
      includeUsers:
        - All
      excludeGroups:
        - emergency-access
    applications:
      includeApplications:
        - All
    locations:
      includeLocations:
        - All
      excludeLocations:
        - trusted-locations
    platforms:
      includePlatforms:
        - all
    signInRiskLevels:
      - high
      - medium
  grantControls:
    operator: AND
    builtInControls:
      - mfa
      - compliantDevice
      - approvedApplication
    customAuthenticationFactors: []
  sessionControls:
    signInFrequency:
      value: 1
      type: hours
    persistentBrowser:
      mode: never
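
To make the policy's logic concrete, here is a small sketch of how its conditions combine for a single sign-in. The `SignIn` class and `required_controls` function are hypothetical names for illustration; the real evaluation happens inside the identity provider, and this only mirrors the exclusions, risk levels, and AND-ed grant controls listed above.

```python
from dataclasses import dataclass


@dataclass
class SignIn:
    user_groups: set[str]
    location: str
    risk_level: str  # "low", "medium", or "high"


# Values copied from the policy above
EXCLUDED_GROUPS = {"emergency-access"}
EXCLUDED_LOCATIONS = {"trusted-locations"}
RISK_LEVELS = {"high", "medium"}
GRANT_CONTROLS = ["mfa", "compliantDevice", "approvedApplication"]


def required_controls(sign_in: SignIn) -> list[str]:
    """Return the controls a sign-in must satisfy, or [] if the policy does not apply."""
    if sign_in.user_groups & EXCLUDED_GROUPS:
        return []  # break-glass accounts are excluded
    if sign_in.location in EXCLUDED_LOCATIONS:
        return []
    if sign_in.risk_level not in RISK_LEVELS:
        return []
    return list(GRANT_CONTROLS)  # operator AND: every control is required


print(required_controls(SignIn({"staff"}, "cafe-wifi", "high")))
```

Note that this policy only fires for medium and high sign-in risk; low-risk sign-ins would need a separate policy if they should also require MFA.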

2. Microsegmentation Across Clouds

Implement granular network segmentation that spans cloud boundaries.

# Terraform - Multi-Cloud Network Segmentation
module "zero_trust_network" {
  source = "./modules/zero-trust-network"

  # AWS VPC Configuration
  aws_vpc = {
    cidr_block = "10.0.0.0/16"
    enable_dns_hostnames = true
    enable_flow_logs = true

    # Microsegmentation with Security Groups
    security_groups = {
      web_tier = {
        description = "Web tier security group"
        ingress_rules = [
          {
            from_port   = 443
            to_port     = 443
            protocol    = "tcp"
            cidr_blocks = ["0.0.0.0/0"]  # Public HTTPS
          }
        ]
        egress_rules = [
          {
            from_port       = 8080
            to_port         = 8080
            protocol        = "tcp"
            security_groups = ["app_tier"]  # Only to app tier
          }
        ]
      }

      app_tier = {
        description = "Application tier security group"
        ingress_rules = [
          {
            from_port       = 8080  # App port, matching the Azure and GCP rules
            to_port         = 8080
            protocol        = "tcp"
            security_groups = ["web_tier"]  # Only from web tier
          }
        ]
        egress_rules = [
          {
            from_port       = 5432
            to_port         = 5432
            protocol        = "tcp"
            security_groups = ["data_tier"]  # Only to data tier
          }
        ]
      }

      data_tier = {
        description = "Data tier security group"
        ingress_rules = [
          {
            from_port       = 5432
            to_port         = 5432
            protocol        = "tcp"
            security_groups = ["app_tier"]  # Only from app tier
          }
        ]
        egress_rules = []  # No outbound connections
      }
    }
  }

  # Azure Network Security Groups
  azure_network = {
    address_space = ["10.1.0.0/16"]

    subnets = {
      web = {
        address_prefix = "10.1.1.0/24"
        network_security_group = {
          security_rules = [
            {
              name                       = "HTTPS"
              priority                   = 100
              direction                  = "Inbound"
              access                     = "Allow"
              protocol                   = "Tcp"
              source_port_range          = "*"
              destination_port_range     = "443"
              source_address_prefix      = "*"
              destination_address_prefix = "*"
            }
          ]
        }
      }

      app = {
        address_prefix = "10.1.2.0/24"
        network_security_group = {
          security_rules = [
            {
              name                       = "FromWebTier"
              priority                   = 100
              direction                  = "Inbound"
              access                     = "Allow"
              protocol                   = "Tcp"
              source_port_range          = "*"
              destination_port_range     = "8080"
              source_address_prefix      = "10.1.1.0/24"
              destination_address_prefix = "*"
            }
          ]
        }
      }
    }
  }

  # GCP VPC with Firewall Rules
  gcp_network = {
    auto_create_subnetworks = false

    subnets = [
      {
        name          = "web-subnet"
        ip_cidr_range = "10.2.1.0/24"
        region        = "us-central1"
      },
      {
        name          = "app-subnet"
        ip_cidr_range = "10.2.2.0/24"
        region        = "us-central1"
      }
    ]

    firewall_rules = [
      {
        name        = "allow-web-to-app"
        direction   = "INGRESS"
        priority    = 1000

        source_ranges = ["10.2.1.0/24"]
        target_tags   = ["app-tier"]

        allow = [{
          protocol = "tcp"
          ports    = ["8080"]
        }]
      },
      {
        name        = "deny-all-else"
        direction   = "INGRESS"
        priority    = 65534

        source_ranges = ["0.0.0.0/0"]

        deny = [{
          protocol = "all"
        }]
      }
    ]
  }
}
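
The segmentation rules above amount to a default-deny allow-list of tier-to-tier flows. A rough sketch of that idea as a check you could run in tests, using the Azure and GCP subnet ranges from the configuration; `ALLOWED_FLOWS` and `flow_allowed` are hypothetical names, and the sketch assumes the GCP `app-tier` instances live in `app-subnet`.

```python
import ipaddress

# (source CIDR, destination CIDR, port) taken from the Azure and GCP rules above
ALLOWED_FLOWS = [
    ("10.1.1.0/24", "10.1.2.0/24", 8080),  # Azure web -> app
    ("10.2.1.0/24", "10.2.2.0/24", 8080),  # GCP web -> app
]


def flow_allowed(src_ip: str, dst_ip: str, port: int) -> bool:
    """Default deny: a flow passes only if it matches an explicit allow entry."""
    src = ipaddress.ip_address(src_ip)
    dst = ipaddress.ip_address(dst_ip)
    for src_cidr, dst_cidr, allowed_port in ALLOWED_FLOWS:
        if (
            src in ipaddress.ip_network(src_cidr)
            and dst in ipaddress.ip_network(dst_cidr)
            and port == allowed_port
        ):
            return True
    return False


print(flow_allowed("10.1.1.5", "10.1.2.9", 8080))  # same-cloud web -> app
print(flow_allowed("10.1.1.5", "10.2.2.9", 8080))  # cross-cloud, not listed
```

Keeping the allow-list as data makes it easy to assert that no cross-cloud or cross-tier path exists unless someone added it deliberately.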

3. Policy-as-Code with Open Policy Agent (OPA)

Implement consistent policy enforcement across all clouds.

# zero_trust_policies.rego
package zero_trust.authorization

import future.keywords.if
import future.keywords.in
import future.keywords.contains

default allow = false

# Multi-Cloud Resource Access Policy
allow if {
    # Verify JWT token
    token_valid

    # Check user authentication
    input.user.authenticated == true
    input.user.mfa_verified == true

    # Verify device compliance
    input.device.compliant == true
    input.device.managed == true

    # Check network context
    network_trusted

    # Verify least privilege
    required_permission in user_permissions
}

# Token validation
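token_valid if {
    # io.jwt.decode_verify returns [valid, header, payload]
    [valid, _, payload] := io.jwt.decode_verify(
        input.token,
        {"secret": data.jwt_secret}
    )
    valid == true

    # Check token expiration (exp is in seconds, time.now_ns() in nanoseconds)
    payload.exp > time.now_ns() / 1000000000

    # Verify issuer
    payload.iss == data.trusted_issuer
}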

# Network trust evaluation
network_trusted if {
    # Check if request is from internal network
    net.cidr_contains("10.0.0.0/8", input.source_ip)
} else if {
    # Or if VPN connected
    input.network.vpn_connected == true
    input.network.vpn_compliance == true
} else if {
    # Or if using verified ZTNA
    input.network.ztna_verified == true
}

# Permission checking (a partial set: "contains" is needed when the rule uses "if")
user_permissions contains permission if {
    some role in input.user.roles
    some permission in data.rbac[role].permissions
}

required_permission := permission if {
    # Map action to permission
    action_map := {
        "read": "viewer",
        "write": "editor",
        "delete": "admin"
    }
    permission := action_map[input.action]
}

# Cloud-specific policies
aws_resource_allowed if {
    input.cloud == "aws"
    input.resource.type in data.aws_allowed_resources
    input.resource.tags.environment == input.user.allowed_environments[_]
}

azure_resource_allowed if {
    input.cloud == "azure"
    input.resource.type in data.azure_allowed_resources
    input.resource.resource_group in input.user.allowed_resource_groups
}

gcp_resource_allowed if {
    input.cloud == "gcp"
    input.resource.type in data.gcp_allowed_resources
    input.resource.project_id in input.user.allowed_projects
}
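
The least-privilege part of the policy maps an action to a required permission and checks it against the union of permissions granted by the user's roles. A minimal Python mirror of that logic can help when unit-testing the data documents before loading them into OPA; the `rbac` contents below are made-up sample data, and the function names are hypothetical.

```python
ACTION_TO_PERMISSION = {"read": "viewer", "write": "editor", "delete": "admin"}


def user_permissions(roles: list[str], rbac: dict) -> set[str]:
    """Union of the permissions granted by each of the user's roles."""
    perms: set[str] = set()
    for role in roles:
        perms.update(rbac.get(role, {}).get("permissions", []))
    return perms


def permission_granted(input_doc: dict, rbac: dict) -> bool:
    """True if the action maps to a permission that one of the user's roles grants."""
    required = ACTION_TO_PERMISSION.get(input_doc["action"])
    if required is None:
        return False  # unknown actions are denied, like an undefined Rego rule
    return required in user_permissions(input_doc["user"]["roles"], rbac)


# Sample data shaped like data.rbac in the policy (contents are illustrative)
rbac = {
    "developer": {"permissions": ["viewer", "editor"]},
    "auditor": {"permissions": ["viewer"]},
}

print(permission_granted({"action": "write", "user": {"roles": ["developer"]}}, rbac))
print(permission_granted({"action": "delete", "user": {"roles": ["developer"]}}, rbac))
```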

4. Service Mesh for Zero Trust Networking

Deploy Istio service mesh across multi-cloud environments.

# Istio Multi-Cloud Configuration
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
metadata:
  name: control-plane
spec:
  values:
    pilot:
      env:
        PILOT_ENABLE_WORKLOAD_ENTRY_AUTOREGISTRATION: true
        PILOT_ENABLE_CROSS_CLUSTER_WORKLOAD_ENTRY: true
    global:
      meshID: mesh1
      multiCluster:
        clusterName: aws-cluster
      network: network1

  components:
    pilot:
      k8s:
        env:
          - name: PILOT_TRACE_SAMPLING
            value: "100"
        resources:
          requests:
            cpu: 1000m
            memory: 1024Mi

    # Ingress gateway (mesh-wide mTLS is enforced by the PeerAuthentication resource)
    ingressGateways:
      - name: istio-ingressgateway
        enabled: true
        k8s:
          service:
            type: LoadBalancer
            ports:
              - port: 15021
                targetPort: 15021
                name: status-port
              - port: 443
                targetPort: 8443
                name: https
---
# PeerAuthentication for mTLS
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: istio-system
spec:
  mtls:
    mode: STRICT
---
# AuthorizationPolicy for Zero Trust
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: zero-trust-policy
  namespace: production
spec:
  selector:
    matchLabels:
      app: api-service
  action: ALLOW
  rules:
    - from:
        - source:
            principals: ["cluster.local/ns/production/sa/frontend"]
      to:
        - operation:
            methods: ["GET", "POST"]
            paths: ["/api/v1/*"]
      when:
        - key: request.headers[x-user-id]
          values: ["*"]
        # request.auth.* conditions require a RequestAuthentication policy
        - key: request.auth.claims[verified]
          values: ["true"]
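
To reason about which requests the AuthorizationPolicy admits, it can help to model its matching rules outside the mesh. The sketch below covers only the `from` and `to` clauses (not the `when` conditions) and follows Istio's documented string matching: exact, prefix (`abc*`), suffix (`*abc`), and presence (`*`). The names `istio_match` and `rule_matches` are hypothetical.

```python
def istio_match(pattern: str, value: str) -> bool:
    """Exact, prefix ("abc*"), suffix ("*abc"), or presence ("*") match."""
    if pattern == "*":
        return value != ""
    if pattern.endswith("*"):
        return value.startswith(pattern[:-1])
    if pattern.startswith("*"):
        return value.endswith(pattern[1:])
    return pattern == value


# Values copied from the AuthorizationPolicy above
RULE = {
    "principals": ["cluster.local/ns/production/sa/frontend"],
    "methods": ["GET", "POST"],
    "paths": ["/api/v1/*"],
}


def rule_matches(principal: str, method: str, path: str) -> bool:
    """True if the request satisfies the rule's source and operation clauses."""
    return (
        any(istio_match(p, principal) for p in RULE["principals"])
        and method in RULE["methods"]
        and any(istio_match(p, path) for p in RULE["paths"])
    )


frontend = "cluster.local/ns/production/sa/frontend"
print(rule_matches(frontend, "GET", "/api/v1/users"))
print(rule_matches(frontend, "DELETE", "/api/v1/users"))
```

Because the policy's action is ALLOW and it selects `api-service`, any request to that workload that matches no rule is denied.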

5. Continuous Verification and Monitoring

Implement real-time verification of all access attempts.

# Real-time Zero Trust Monitoring System
import asyncio
from elasticsearch import AsyncElasticsearch
from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime
import numpy as np
from sklearn.ensemble import IsolationForest

class ZeroTrustMonitor:
    def __init__(self):
        # Elasticsearch for log aggregation
        self.es = AsyncElasticsearch(['http://localhost:9200'])

        # Kafka for real-time event streaming
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

        # ML model for anomaly detection
        self.anomaly_detector = IsolationForest(
            contamination=0.1,
            random_state=42
        )

        # Risk scoring thresholds
        self.risk_thresholds = {
            'low': 0.3,
            'medium': 0.6,
            'high': 0.8,
            'critical': 0.95
        }

    async def monitor_access_attempt(self, access_event):
        """Monitor and score each access attempt"""

        # Calculate risk score
        risk_score = await self.calculate_risk_score(access_event)

        # Add risk score to event
        access_event['risk_score'] = risk_score
        access_event['risk_level'] = self.get_risk_level(risk_score)
        access_event['timestamp'] = datetime.utcnow().isoformat()

        # Store in Elasticsearch
        await self.es.index(
            index=f"zero-trust-{datetime.utcnow().strftime('%Y.%m.%d')}",
            body=access_event
        )

        # Stream to Kafka for real-time processing
        self.producer.send('zero-trust-events', access_event)

        # Take action based on risk
        if risk_score > self.risk_thresholds['high']:
            await self.handle_high_risk_event(access_event)

        return {
            'allowed': risk_score < self.risk_thresholds['medium'],
            'risk_score': risk_score,
            'risk_level': access_event['risk_level'],
            'additional_verification_required': risk_score > self.risk_thresholds['low']
        }

    async def calculate_risk_score(self, event):
        """Calculate risk score using multiple factors"""

        risk_factors = []

        # User behavior analysis
        user_risk = await self.analyze_user_behavior(event['user_id'])
        risk_factors.append(user_risk * 0.3)

        # Device trust score
        device_risk = self.calculate_device_risk(event['device'])
        risk_factors.append(device_risk * 0.2)

        # Network context
        network_risk = self.evaluate_network_risk(event['network'])
        risk_factors.append(network_risk * 0.2)

        # Resource sensitivity
        resource_risk = self.assess_resource_sensitivity(event['resource'])
        risk_factors.append(resource_risk * 0.2)

        # Time-based anomaly
        time_risk = self.detect_time_anomaly(event)
        risk_factors.append(time_risk * 0.1)

        # Combine risk factors
        total_risk = sum(risk_factors)

        # Apply ML anomaly detection
        anomaly_score = self.detect_anomaly(event)
        if anomaly_score == -1:  # Anomaly detected
            total_risk = min(total_risk * 1.5, 1.0)

        return total_risk

    async def analyze_user_behavior(self, user_id):
        """Analyze user behavior patterns"""

        # Query historical user behavior
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"user_id": user_id}},
                        {"range": {"timestamp": {"gte": "now-30d"}}}
                    ]
                }
            },
            "aggs": {
                "login_times": {
                    "date_histogram": {
                        "field": "timestamp",
                        "calendar_interval": "hour"
                    }
                },
                "accessed_resources": {
                    "cardinality": {
                        "field": "resource.id"
                    }
                },
                "failed_attempts": {
                    "filter": {
                        "term": {"success": False}
                    }
                }
            }
        }

        result = await self.es.search(index="zero-trust-*", body=query)

        # Calculate behavior risk score
        failed_ratio = result['aggregations']['failed_attempts']['doc_count'] / max(result['hits']['total']['value'], 1)
        resource_diversity = result['aggregations']['accessed_resources']['value']

        # Higher risk for unusual patterns
        if failed_ratio > 0.1:
            return 0.8
        elif resource_diversity > 100:  # Accessing too many different resources
            return 0.6
        else:
            return 0.2

    def detect_anomaly(self, event):
        """Use ML to detect anomalous access patterns"""

        # Feature extraction
        features = [
            event.get('user_risk_score', 0),
            event.get('device_trust_score', 0),
            1 if event.get('network', {}).get('vpn_connected') else 0,
            len(event.get('resource', {}).get('tags', [])),
            event.get('request_size', 0),
            event.get('response_time', 0)
        ]

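        # The model must be fitted on historical events before use; until then,
        # treat every event as normal rather than raising NotFittedError
        if not hasattr(self.anomaly_detector, 'estimators_'):
            return 1

        # Predict anomaly (-1 = anomaly, 1 = normal)
        prediction = self.anomaly_detector.predict([features])
        return prediction[0]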

    async def handle_high_risk_event(self, event):
        """Handle high-risk access attempts"""

        # Send alert
        alert = {
            'severity': 'HIGH',
            'event': event,
            'timestamp': datetime.utcnow().isoformat(),
            'actions_required': [
                'Block access attempt',
                'Notify security team',
                'Initiate step-up authentication',
                'Log for forensic analysis'
            ]
        }

        # Send to security team
        self.producer.send('security-alerts', alert)

        # Block access
        await self.block_access(event['session_id'])

        # Trigger incident response
        if event['risk_score'] > self.risk_thresholds['critical']:
            await self.trigger_incident_response(event)
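
The scoring arithmetic is easier to follow with a worked example. This sketch reuses the weights and thresholds from the monitor above as plain functions; the factor values are invented for illustration, and `combine_risk` and `decide` are hypothetical names rather than methods of the class.

```python
WEIGHTS = {"user": 0.3, "device": 0.2, "network": 0.2, "resource": 0.2, "time": 0.1}
THRESHOLDS = {"low": 0.3, "medium": 0.6, "high": 0.8, "critical": 0.95}


def combine_risk(factors: dict[str, float], anomaly: bool = False) -> float:
    """Weighted sum of per-factor risks, boosted by 1.5x (capped at 1.0) on anomaly."""
    total = sum(WEIGHTS[name] * factors[name] for name in WEIGHTS)
    if anomaly:
        total = min(total * 1.5, 1.0)
    return total


def decide(score: float) -> dict[str, bool]:
    """Mirror the decisions made in monitor_access_attempt."""
    return {
        "allowed": score < THRESHOLDS["medium"],
        "additional_verification_required": score > THRESHOLDS["low"],
        "blocked_and_alerted": score > THRESHOLDS["high"],
    }


factors = {"user": 0.8, "device": 0.5, "network": 0.5, "resource": 0.5, "time": 0.0}

score = combine_risk(factors)                  # 0.24 + 0.10 + 0.10 + 0.10 + 0.00, about 0.54
print(round(score, 2), decide(score))          # allowed, but step-up verification required

boosted = combine_risk(factors, anomaly=True)  # about 0.81 after the 1.5x boost
print(round(boosted, 2), decide(boosted))      # crosses the high threshold: blocked
```

The same request moves from "allowed with step-up" to "blocked" purely because the anomaly detector flagged it, which shows how much weight the 1.5x multiplier carries.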

6. Automated Incident Response

Implement automated responses to security events.

# Automated Zero Trust Incident Response
from datetime import datetime

class IncidentResponseOrchestrator:
    def __init__(self):
        # Only two of the four playbook handlers are shown in this excerpt
        self.response_playbooks = {
            'suspicious_login': self.handle_suspicious_login,
            'privilege_escalation': self.handle_privilege_escalation,
            'data_exfiltration': self.handle_data_exfiltration,
            'lateral_movement': self.handle_lateral_movement
        }

    async def respond_to_incident(self, incident):
        """Orchestrate incident response"""

        incident_type = self.classify_incident(incident)

        # Execute appropriate playbook
        if incident_type in self.response_playbooks:
            response = await self.response_playbooks[incident_type](incident)
        else:
            response = await self.handle_unknown_incident(incident)

        # Log response
        await self.log_incident_response(incident, response)

        return response

    async def handle_suspicious_login(self, incident):
        """Handle suspicious login attempts"""

        user_id = incident['user_id']
        session_id = incident['session_id']

        actions = []

        # 1. Terminate session
        await self.terminate_session(session_id)
        actions.append('Session terminated')

        # 2. Disable user account temporarily
        await self.disable_user_account(user_id, duration_minutes=30)
        actions.append('User account disabled for 30 minutes')

        # 3. Force password reset
        await self.force_password_reset(user_id)
        actions.append('Password reset required')

        # 4. Revoke all tokens
        await self.revoke_user_tokens(user_id)
        actions.append('All tokens revoked')

        # 5. Send notification
        await self.notify_user(user_id, {
            'type': 'security_alert',
            'message': 'Suspicious login detected. Your account has been temporarily locked.',
            'actions_required': ['Reset password', 'Verify identity']
        })

        return {
            'incident_id': incident['id'],
            'response_time': datetime.utcnow().isoformat(),
            'actions_taken': actions,
            'status': 'contained'
        }

    async def handle_lateral_movement(self, incident):
        """Handle detected lateral movement"""

        source_ip = incident['source_ip']
        compromised_resources = incident['accessed_resources']

        actions = []

        # 1. Isolate affected resources
        for resource in compromised_resources:
            await self.isolate_resource(resource)
            actions.append(f'Isolated resource: {resource["id"]}')

        # 2. Block source IP across all clouds
        await self.block_ip_multicloud(source_ip)
        actions.append(f'Blocked IP {source_ip} across all clouds')

        # 3. Snapshot for forensics
        for resource in compromised_resources:
            snapshot_id = await self.create_forensic_snapshot(resource)
            actions.append(f'Created forensic snapshot: {snapshot_id}')

        # 4. Deploy honeypot
        honeypot = await self.deploy_honeypot(incident['attack_pattern'])
        actions.append(f'Deployed honeypot: {honeypot["id"]}')

        # 5. Enhance monitoring
        await self.enhance_monitoring(compromised_resources)
        actions.append('Enhanced monitoring on affected resources')

        return {
            'incident_id': incident['id'],
            'response_time': datetime.utcnow().isoformat(),
            'actions_taken': actions,
            'status': 'investigating',
            'forensics_enabled': True
        }
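
The orchestrator's core pattern is a dispatch table from incident type to an async playbook, with a fallback for anything unclassified. A stripped-down, runnable sketch of just that pattern, with stub handlers in place of real containment actions; the `type` field and the `escalated` status are assumptions made for the example.

```python
import asyncio


async def handle_suspicious_login(incident: dict) -> dict:
    # Stub: a real playbook would terminate sessions, revoke tokens, and notify
    return {"incident_id": incident["id"], "status": "contained"}


async def handle_unknown_incident(incident: dict) -> dict:
    # Stub fallback: hand anything unclassified to a human
    return {"incident_id": incident["id"], "status": "escalated"}


PLAYBOOKS = {"suspicious_login": handle_suspicious_login}


async def respond(incident: dict) -> dict:
    """Run the playbook registered for the incident type, or the fallback."""
    handler = PLAYBOOKS.get(incident["type"], handle_unknown_incident)
    return await handler(incident)


result = asyncio.run(respond({"id": "inc-1", "type": "suspicious_login"}))
print(result)
```

Registering handlers in a dictionary keeps new playbooks additive: supporting another incident type means adding one entry, not editing the dispatcher.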

7. Compliance and Audit

Maintain continuous compliance across all cloud environments.

# Cloud Custodian - Multi-Cloud Compliance Policies
policies:
  # AWS Zero Trust Compliance
  - name: aws-zero-trust-iam-mfa
    resource: aws.iam-user
    filters:
      - type: mfa-device
        value: empty
    actions:
      - type: remove-keys
        age: 0
      - type: notify
        template: zero-trust-violation
        subject: "Zero Trust Violation: MFA Not Enabled"

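  - name: aws-zero-trust-sg-ingress
    resource: aws.security-group
    filters:
      - type: ingress
        Cidr:
          value: "0.0.0.0/0"
        # Match world-open rules on any port other than public HTTPS
        OnlyPorts: [443]
    actions:
      # Strip only the offending rules rather than deleting the whole group
      - type: remove-permissions
        ingress: matched
      - type: notify
        template: zero-trust-violation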

  # Azure Zero Trust Compliance
  - name: azure-zero-trust-network-watcher
    resource: azure.networkinterface
    filters:
      - type: network-flow-logs
        enabled: false
    actions:
      - type: set-flow-logs
        enabled: true

  - name: azure-zero-trust-storage-encryption
    resource: azure.storage
    filters:
      - not:
          - type: storage-encryption
            enabled: true
    actions:
      - type: set-encryption
        enabled: true

  # GCP Zero Trust Compliance
  - name: gcp-zero-trust-iam-audit
    resource: gcp.project
    filters:
      - type: iam-policy
        doc:
          bindings:
            - members: ["allUsers", "allAuthenticatedUsers"]
    actions:
      - type: set-iam-policy
        remove-bindings:
          - members: ["allUsers", "allAuthenticatedUsers"]

  - name: gcp-zero-trust-vpc-flow-logs
    resource: gcp.subnet
    filters:
      - type: flow-logs
        enabled: false
    actions:
      - type: set-flow-logs
        config:
          enable: true
          aggregationInterval: INTERVAL_5_SEC
          flowSampling: 1.0
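
The security-group policy boils down to one question: does a group expose any port to `0.0.0.0/0` beyond what is intentionally public? A small sketch of that check over dictionaries shaped like the `IpPermissions` entries returned by EC2's `describe_security_groups`; the allow-list of port 443 is an assumption based on the web tier defined earlier, and `open_ingress_violations` is a hypothetical helper.

```python
ALLOWED_PUBLIC_PORTS = {443}  # assumption: only HTTPS may face the internet


def open_ingress_violations(security_group: dict) -> list[tuple[int, int]]:
    """Return (from_port, to_port) for world-open rules outside the allow-list."""
    violations = []
    for perm in security_group.get("IpPermissions", []):
        open_to_world = any(
            r.get("CidrIp") == "0.0.0.0/0" for r in perm.get("IpRanges", [])
        )
        if not open_to_world:
            continue
        # FromPort/ToPort are absent when the rule covers all traffic
        from_port = perm.get("FromPort", 0)
        to_port = perm.get("ToPort", 65535)
        if not all(p in ALLOWED_PUBLIC_PORTS for p in range(from_port, to_port + 1)):
            violations.append((from_port, to_port))
    return violations


sample_group = {
    "GroupId": "sg-example",
    "IpPermissions": [
        {"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
         "IpRanges": [{"CidrIp": "0.0.0.0/0"}]},
        {"IpProtocol": "tcp", "FromPort": 22, "ToPort": 22,
         "IpRanges": [{"CidrIp": "0.0.0.0/0"}]},
        {"IpProtocol": "tcp", "FromPort": 5432, "ToPort": 5432,
         "IpRanges": [{"CidrIp": "10.0.0.0/16"}]},
    ],
}

print(open_ingress_violations(sample_group))  # [(22, 22)]
```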

Best Practices for Multi-Cloud Zero Trust

1. Start with Identity

2. Embrace Automation

3. Monitor Everything

4. Plan for Failure

Conclusion

Implementing Zero Trust in multi-cloud environments requires a fundamental shift in security thinking. By eliminating implicit trust, continuously verifying every transaction, and enforcing least privilege access, organizations can maintain security across distributed cloud infrastructure.

Key takeaways:

Zero Trust isn’t a product but a journey. Start with high-value assets, gradually expand coverage, and continuously improve based on lessons learned. In today’s threat landscape, Zero Trust isn’t optional—it’s essential for securing multi-cloud environments.