OpenSearch Multi-Tenancy Setup
Multi-tenancy in OpenSearch enables organizations to serve multiple customers or teams from a single cluster while maintaining strict data isolation and security boundaries. This guide provides a complete implementation for production-ready multi-tenant OpenSearch deployments.
Multi-Tenancy Architecture Overview
OpenSearch supports multiple approaches to multi-tenancy, each with different trade-offs in terms of isolation, performance, and resource utilization:
graph TB    subgraph "Multi-Tenancy Models"        subgraph "Index-per-Tenant"            T1_Index[Tenant1 Index]            T2_Index[Tenant2 Index]            T3_Index[Tenant3 Index]        end
        subgraph "Alias-per-Tenant"            Shared_Index[Shared Index]            T1_Alias[Tenant1 Alias]            T2_Alias[Tenant2 Alias]            T3_Alias[Tenant3 Alias]        end
        subgraph "Cluster-per-Tenant"            T1_Cluster[Tenant1 Cluster]            T2_Cluster[Tenant2 Cluster]            T3_Cluster[Tenant3 Cluster]        end    end
    subgraph "Security Layer"        Auth[Authentication]        RBAC[Role-Based Access]        FLS[Field Level Security]        DLS[Document Level Security]    end
    subgraph "Resource Management"        ISM[Index State Management]        Quotas[Resource Quotas]        Monitoring[Tenant Monitoring]    end
    T1_Alias --> Shared_Index    T2_Alias --> Shared_Index    T3_Alias --> Shared_Index
    Auth --> RBAC    RBAC --> FLS    RBAC --> DLS
    style T1_Index fill:#f96,stroke:#333,stroke-width:2px    style RBAC fill:#9f9,stroke:#333,stroke-width:2px

Index-per-Tenant Implementation
The index-per-tenant model provides the strongest isolation between tenants:
Index Template Configuration
PUT _index_template/tenant-template{  "index_patterns": ["tenant-*"],  "priority": 100,  "template": {    "settings": {      "number_of_shards": 3,      "number_of_replicas": 1,      "index.refresh_interval": "5s",      "index.max_result_window": 10000,      "index.codec": "best_compression",      "index.routing.allocation.total_shards_per_node": 2    },    "mappings": {      "dynamic": "strict",      "properties": {        "tenant_id": {          "type": "keyword",          "index": false        },        "created_at": {          "type": "date",          "format": "epoch_millis"        },        "updated_at": {          "type": "date",          "format": "epoch_millis"        },        "data": {          "type": "object",          "enabled": true        }      }    },    "aliases": {}  },  "composed_of": ["component-template-security", "component-template-lifecycle"]}Tenant Provisioning Script
#!/usr/bin/env python3import osimport jsonimport hashlibimport requestsfrom datetime import datetimefrom opensearchpy import OpenSearch
class TenantProvisioner:
    """Provision OpenSearch tenants end to end.

    For each tenant this creates a dedicated index, security roles, an
    admin user, an ISM lifecycle policy, and (stubbed) monitoring
    resources, using the index-per-tenant isolation model.
    """

    def __init__(self, host='localhost', port=9200, auth=('admin', 'admin')):
        # NOTE(review): verify_certs=False disables TLS certificate
        # validation — acceptable for local development only; enable
        # verification before production use.
        self.client = OpenSearch(
            hosts=[{'host': host, 'port': port}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False
        )
        self.security_api_url = f"https://{host}:{port}/_plugins/_security/api"
        self.auth = auth

    def create_tenant(self, tenant_id, tenant_name, admin_user):
        """Create a new tenant with all necessary configurations.

        Args:
            tenant_id: Unique tenant identifier, used in index/role names.
            tenant_name: Human-readable tenant name (currently unused by
                the provisioning steps — kept for interface stability).
            admin_user: Username to create as the tenant administrator.
        """
        print(f"Creating tenant: {tenant_id}")

        # Create tenant index
        index_name = f"tenant-{tenant_id}"
        self._create_index(index_name, tenant_id)

        # Create tenant roles
        self._create_tenant_roles(tenant_id)

        # Create tenant admin user
        self._create_tenant_admin(tenant_id, admin_user)

        # Create index lifecycle policy
        self._create_lifecycle_policy(tenant_id)

        # Create monitoring dashboards
        self._create_monitoring_resources(tenant_id)

        print(f"Tenant {tenant_id} created successfully")

    def _create_index(self, index_name, tenant_id):
        """Create the tenant-specific index with analyzers and aliases."""
        body = {
            "settings": {
                "index.blocks.read_only_allow_delete": False,
                "analysis": {
                    "analyzer": {
                        "tenant_analyzer": {
                            "type": "custom",
                            "tokenizer": "standard",
                            "filter": ["lowercase", "stop", "snowball"]
                        }
                    }
                }
            },
            "aliases": {
                f"{tenant_id}-current": {},
                # NOTE(review): the tenant index template maps tenant_id
                # with "index": false, so this term filter cannot match
                # unless that mapping is changed — confirm intent.
                f"{tenant_id}-search": {
                    "filter": {
                        "term": {"tenant_id": tenant_id}
                    }
                }
            }
        }

        self.client.indices.create(index=index_name, body=body)

    def _create_tenant_roles(self, tenant_id):
        """Create admin and read-only roles for tenant access control."""

        # Tenant admin role: full CRUD plus index management on the
        # tenant's index pattern, and write access to the tenant space.
        admin_role = {
            "cluster_permissions": [
                "cluster_monitor",
                "indices_monitor"
            ],
            "index_permissions": [{
                "index_patterns": [f"tenant-{tenant_id}*"],
                "allowed_actions": [
                    "crud",
                    "create_index",
                    "manage",
                    "manage_aliases",
                    "delete",
                    "index",
                    "read",
                    "write",
                    "search"
                ]
            }],
            "tenant_permissions": [{
                "tenant_patterns": [tenant_id],
                "allowed_actions": ["kibana_all_write"]
            }]
        }

        # Tenant read-only role: search/read only, read-only tenant space.
        readonly_role = {
            "cluster_permissions": ["cluster_monitor"],
            "index_permissions": [{
                "index_patterns": [f"tenant-{tenant_id}*"],
                "allowed_actions": ["read", "search"]
            }],
            "tenant_permissions": [{
                "tenant_patterns": [tenant_id],
                "allowed_actions": ["kibana_all_read"]
            }]
        }

        # Create roles via Security API
        self._security_api_put(f"roles/{tenant_id}_admin", admin_role)
        self._security_api_put(f"roles/{tenant_id}_readonly", readonly_role)

    def _create_tenant_admin(self, tenant_id, admin_user):
        """Create the tenant administrator user.

        NOTE(review): the generated password is neither returned nor
        surfaced anywhere — confirm how credentials are distributed.
        """
        user_data = {
            "password": self._generate_password(tenant_id),
            "opendistro_security_roles": [f"{tenant_id}_admin"],
            "backend_roles": [f"tenant_{tenant_id}"],
            "attributes": {
                "tenant_id": tenant_id,
                "created_at": datetime.now().isoformat()
            }
        }

        self._security_api_put(f"internalusers/{admin_user}", user_data)

    def _create_lifecycle_policy(self, tenant_id):
        """Create an ISM policy: hot (rollover) -> warm (shrink) -> delete."""
        policy = {
            "policy": {
                "description": f"Lifecycle policy for tenant {tenant_id}",
                "default_state": "hot",
                "states": [
                    {
                        "name": "hot",
                        "actions": [
                            {
                                "rollover": {
                                    "min_index_age": "7d",
                                    "min_size": "50gb"
                                }
                            }
                        ],
                        "transitions": [
                            {
                                "state_name": "warm",
                                "conditions": {
                                    "min_index_age": "30d"
                                }
                            }
                        ]
                    },
                    {
                        "name": "warm",
                        "actions": [
                            {
                                "replica_count": {
                                    "number_of_replicas": 1
                                }
                            },
                            {
                                "shrink": {
                                    "number_of_shards": 1
                                }
                            }
                        ],
                        "transitions": [
                            {
                                "state_name": "delete",
                                "conditions": {
                                    "min_index_age": "90d"
                                }
                            }
                        ]
                    },
                    {
                        "name": "delete",
                        "actions": [
                            {
                                "delete": {}
                            }
                        ]
                    }
                ]
            }
        }

        # The ISM plugin has no dedicated client helper, so go through the
        # low-level transport.
        self.client.transport.perform_request(
            'PUT',
            f'/_plugins/_ism/policies/{tenant_id}_lifecycle',
            body=policy
        )

    def _create_monitoring_resources(self, tenant_id):
        """Create monitoring dashboards and visualizations (stub).

        The payload is prepared but not sent yet; integration depends on
        the OpenSearch Dashboards version and configuration.
        """
        # Index pattern payload for the tenant (unused until the
        # Dashboards API integration is implemented).
        pattern_id = f"{tenant_id}-pattern"
        pattern_body = {
            "title": f"tenant-{tenant_id}*",
            "timeFieldName": "created_at"
        }

        # This would integrate with OpenSearch Dashboards API
        # Implementation depends on Dashboards version and configuration

    def _generate_password(self, tenant_id):
        """Return a 16-character cryptographically random password.

        Uses the secrets module; the previous sha256-of-timestamp scheme
        was predictable and unsuitable even outside production.
        """
        import secrets
        return secrets.token_hex(8)  # 16 hex characters

    def _security_api_put(self, endpoint, data):
        """PUT a configuration document to the Security REST API.

        Raises requests.HTTPError on any non-2xx response.
        """
        # NOTE(review): verify=False mirrors the client above — tighten
        # for production. A timeout prevents hanging on an unresponsive
        # cluster.
        response = requests.put(
            f"{self.security_api_url}/{endpoint}",
            json=data,
            auth=self.auth,
            verify=False,
            timeout=30
        )
        response.raise_for_status()
        return response.json()
# Usage exampleif __name__ == "__main__":    provisioner = TenantProvisioner()    provisioner.create_tenant("acme-corp", "ACME Corporation", "acme_admin")Document-Level Security Implementation
For shared index multi-tenancy, document-level security (DLS) provides data isolation:
graph LR    subgraph "Shared Index"        Doc1[Document 1<br/>tenant: A]        Doc2[Document 2<br/>tenant: B]        Doc3[Document 3<br/>tenant: A]        Doc4[Document 4<br/>tenant: C]    end
    subgraph "DLS Filters"        FilterA[Tenant A Filter<br/>tenant_id: A]        FilterB[Tenant B Filter<br/>tenant_id: B]        FilterC[Tenant C Filter<br/>tenant_id: C]    end
    subgraph "User Views"        UserA[User A<br/>Sees: Doc1, Doc3]        UserB[User B<br/>Sees: Doc2]        UserC[User C<br/>Sees: Doc4]    end
    FilterA --> Doc1    FilterA --> Doc3    FilterB --> Doc2    FilterC --> Doc4
    Doc1 --> UserA    Doc3 --> UserA    Doc2 --> UserB    Doc4 --> UserC
    style FilterA fill:#f96,stroke:#333,stroke-width:2px    style UserA fill:#9f9,stroke:#333,stroke-width:2px

DLS Role Configuration
PUT _plugins/_security/api/roles/tenant_dls_role{  "cluster_permissions": ["cluster_monitor"],  "index_permissions": [{    "index_patterns": ["shared-data-*"],    "dls": "{\"term\": {\"tenant_id\": \"${attr.internal.tenant_id}\"}}",    "allowed_actions": [      "read",      "write",      "delete",      "search",      "index"    ]  }]}Field-Level Security Configuration
PUT _plugins/_security/api/roles/tenant_fls_role{  "cluster_permissions": ["cluster_monitor"],  "index_permissions": [{    "index_patterns": ["shared-data-*"],    "dls": "{\"term\": {\"tenant_id\": \"${attr.internal.tenant_id}\"}}",    "fls": [      "~internal_*",      "~system_*",      "~admin_notes"    ],    "allowed_actions": ["read", "search"]  }]}Kibana/OpenSearch Dashboards Multi-Tenancy
Tenant Space Configuration
opensearch.username: "kibanaserver"opensearch.password: "kibanaserver"opensearch.requestHeadersWhitelist: ["securitytenant", "Authorization"]
opensearch_security.multitenancy.enabled: trueopensearch_security.multitenancy.tenants.enable_global: trueopensearch_security.multitenancy.tenants.enable_private: trueopensearch_security.multitenancy.tenants.preferred: ["Private", "Global"]opensearch_security.readonly_mode.roles: ["readonly_role"]
# Tenant branding
opensearch_security.multitenancy.custom_branding:
  tenant_a:
    logo: "/assets/tenant_a_logo.svg"
    favicon: "/assets/tenant_a_favicon.ico"
    title: "Tenant A Analytics"
  tenant_b:
    logo: "/assets/tenant_b_logo.svg"
    favicon: "/assets/tenant_b_favicon.ico"
    title: "Tenant B Dashboard"

Automated Tenant Dashboard Setup
#!/usr/bin/env python3import requestsimport jsonfrom typing import Dict, List
class DashboardManager:    def __init__(self, dashboards_url: str, auth: tuple):        self.url = dashboards_url        self.auth = auth        self.headers = {            'Content-Type': 'application/json',            'osd-xsrf': 'true'        }
    def create_tenant_space(self, tenant_id: str):        """Create a new tenant space in OpenSearch Dashboards"""
        # Create saved objects for tenant        self._create_index_pattern(tenant_id)        self._create_default_dashboard(tenant_id)        self._create_default_visualizations(tenant_id)
    def _create_index_pattern(self, tenant_id: str):        """Create index pattern for tenant"""        pattern = {            "attributes": {                "title": f"tenant-{tenant_id}*",                "timeFieldName": "created_at",                "fields": "[]",                "fieldFormatMap": "{}"            }        }
        response = requests.post(            f"{self.url}/api/saved_objects/index-pattern",            headers={**self.headers, 'securitytenant': tenant_id},            json=pattern,            auth=self.auth,            verify=False        )
        return response.json()
    def _create_default_dashboard(self, tenant_id: str):        """Create default dashboard for tenant"""        dashboard = {            "attributes": {                "title": f"{tenant_id} Overview Dashboard",                "hits": 0,                "description": f"Main dashboard for {tenant_id}",                "panelsJSON": json.dumps([                    {                        "id": "1",                        "type": "visualization",                        "gridData": {                            "x": 0,                            "y": 0,                            "w": 24,                            "h": 15                        }                    },                    {                        "id": "2",                        "type": "visualization",                        "gridData": {                            "x": 24,                            "y": 0,                            "w": 24,                            "h": 15                        }                    }                ]),                "optionsJSON": json.dumps({                    "darkTheme": False,                    "hidePanelTitles": False,                    "useMargins": True                }),                "version": 1,                "timeRestore": True,                "timeTo": "now",                "timeFrom": "now-7d",                "refreshInterval": {                    "pause": True,                    "value": 0                },                "kibanaSavedObjectMeta": {                    "searchSourceJSON": json.dumps({                        "query": {                            "language": "kuery",                            "query": ""                        },                        "filter": []                    })                }            }        }
        response = requests.post(            f"{self.url}/api/saved_objects/dashboard",            headers={**self.headers, 'securitytenant': tenant_id},            json=dashboard,            auth=self.auth,            verify=False        )
        return response.json()
    def _create_default_visualizations(self, tenant_id: str):        """Create default visualizations for tenant"""
        # Document count over time        time_series_viz = {            "attributes": {                "title": f"{tenant_id} - Documents Over Time",                "visState": json.dumps({                    "title": f"{tenant_id} - Documents Over Time",                    "type": "line",                    "aggs": [                        {                            "id": "1",                            "enabled": True,                            "type": "count",                            "params": {},                            "schema": "metric"                        },                        {                            "id": "2",                            "enabled": True,                            "type": "date_histogram",                            "params": {                                "field": "created_at",                                "interval": "auto",                                "min_doc_count": 0,                                "extended_bounds": {}                            },                            "schema": "segment"                        }                    ]                }),                "uiStateJSON": "{}",                "kibanaSavedObjectMeta": {                    "searchSourceJSON": json.dumps({                        "index": f"tenant-{tenant_id}*",                        "query": {                            "match_all": {}                        }                    })                }            }        }
        response = requests.post(            f"{self.url}/api/saved_objects/visualization",            headers={**self.headers, 'securitytenant': tenant_id},            json=time_series_viz,            auth=self.auth,            verify=False        )
        return response.json()Resource Isolation and Quotas
Resource Allocation Strategy
graph TB    subgraph "Cluster Resources"        CPU[CPU Cores: 32]        Memory[Memory: 128GB]        Storage[Storage: 2TB]    end
    subgraph "Resource Pools"        subgraph "Premium Tenants"            PT_CPU[CPU: 16 cores]            PT_Mem[Memory: 64GB]            PT_Storage[Storage: 1TB]        end
        subgraph "Standard Tenants"            ST_CPU[CPU: 12 cores]            ST_Mem[Memory: 48GB]            ST_Storage[Storage: 750GB]        end
        subgraph "Basic Tenants"            BT_CPU[CPU: 4 cores]            BT_Mem[Memory: 16GB]            BT_Storage[Storage: 250GB]        end    end
    CPU --> PT_CPU    CPU --> ST_CPU    CPU --> BT_CPU
    Memory --> PT_Mem    Memory --> ST_Mem    Memory --> BT_Mem
    Storage --> PT_Storage    Storage --> ST_Storage    Storage --> BT_Storage
    style Premium fill:#f96,stroke:#333,stroke-width:2px    style Standard fill:#99f,stroke:#333,stroke-width:2px    style Basic fill:#9f9,stroke:#333,stroke-width:2px

Implementing Resource Quotas
#!/usr/bin/env python3import jsonfrom opensearchpy import OpenSearchfrom typing import Dict, Optional
class ResourceQuotaManager:    def __init__(self, client: OpenSearch):        self.client = client        self.quota_index = ".tenant_quotas"
    def set_tenant_quota(self, tenant_id: str, quota_type: str = "standard"):        """Set resource quota for a tenant"""
        quotas = {            "premium": {                "max_indices": 100,                "max_shards": 500,                "max_docs": 100000000,  # 100M documents                "max_size_gb": 1000,                "max_fields": 1000,                "max_query_rate": 1000,  # queries per second                "max_index_rate": 5000,  # docs per second                "circuit_breaker": {                    "request": "80%",                    "total": "95%"                }            },            "standard": {                "max_indices": 50,                "max_shards": 250,                "max_docs": 50000000,  # 50M documents                "max_size_gb": 500,                "max_fields": 500,                "max_query_rate": 500,                "max_index_rate": 2500,                "circuit_breaker": {                    "request": "70%",                    "total": "90%"                }            },            "basic": {                "max_indices": 10,                "max_shards": 50,                "max_docs": 10000000,  # 10M documents                "max_size_gb": 100,                "max_fields": 200,                "max_query_rate": 100,                "max_index_rate": 500,                "circuit_breaker": {                    "request": "60%",                    "total": "85%"                }            }        }
        quota = quotas.get(quota_type, quotas["basic"])        quota["tenant_id"] = tenant_id        quota["quota_type"] = quota_type        quota["created_at"] = "now"
        # Store quota configuration        self.client.index(            index=self.quota_index,            id=tenant_id,            body=quota        )
        # Apply index settings        self._apply_index_settings(tenant_id, quota)
    def _apply_index_settings(self, tenant_id: str, quota: Dict):        """Apply quota settings to tenant indices"""
        settings = {            "index.max_result_window": min(quota["max_docs"], 10000),            "index.max_regex_length": 1000,            "index.max_terms_count": quota["max_fields"],            "index.max_script_fields": 32,            "index.requests.cache.enable": True,            "index.queries.cache.enabled": True        }
        # Apply to all tenant indices        self.client.indices.put_settings(            index=f"tenant-{tenant_id}*",            body={"settings": settings}        )
    def check_quota_usage(self, tenant_id: str) -> Dict:        """Check current resource usage against quota"""
        # Get quota configuration        quota_doc = self.client.get(index=self.quota_index, id=tenant_id)        quota = quota_doc["_source"]
        # Get current usage        stats = self.client.indices.stats(index=f"tenant-{tenant_id}*")
        total_docs = sum(idx["primaries"]["docs"]["count"]                        for idx in stats["indices"].values())        total_size = sum(idx["primaries"]["store"]["size_in_bytes"]                        for idx in stats["indices"].values())        total_shards = sum(len(idx["shards"])                          for idx in stats["indices"].values())
        usage = {            "tenant_id": tenant_id,            "quota_type": quota["quota_type"],            "usage": {                "indices": {                    "used": len(stats["indices"]),                    "limit": quota["max_indices"],                    "percentage": (len(stats["indices"]) / quota["max_indices"]) * 100                },                "shards": {                    "used": total_shards,                    "limit": quota["max_shards"],                    "percentage": (total_shards / quota["max_shards"]) * 100                },                "documents": {                    "used": total_docs,                    "limit": quota["max_docs"],                    "percentage": (total_docs / quota["max_docs"]) * 100                },                "storage_gb": {                    "used": total_size / (1024**3),                    "limit": quota["max_size_gb"],                    "percentage": (total_size / (1024**3) / quota["max_size_gb"]) * 100                }            },            "warnings": []        }
        # Add warnings for high usage        for metric, data in usage["usage"].items():            if data["percentage"] > 80:                usage["warnings"].append(f"{metric} usage above 80%")
        return usageQuery Rate Limiting
Rate Limiter Implementation
#!/usr/bin/env python3import timeimport redisfrom functools import wrapsfrom typing import Optional, Callable
class TenantRateLimiter:
    """Per-tenant rate limiting backed by Redis.

    Query limiting uses a fixed 1-second window counter; indexing uses
    a token bucket that allows bursts of up to 10x the sustained rate.
    Both decorators fail open when Redis is unavailable.
    """

    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

    def limit_queries(self, tenant_id: str, max_qps: int):
        """Decorator enforcing a per-tenant queries-per-second limit.

        Raises RateLimitExceeded when the tenant exceeds max_qps within
        the current one-second window.
        """
        def decorator(func: Callable):
            @wraps(func)
            def wrapper(*args, **kwargs):
                key = f"rate_limit:{tenant_id}:queries"
                try:
                    current = self.redis_client.incr(key)
                    if current == 1:
                        # First hit of this window: start the 1s expiry.
                        self.redis_client.expire(key, 1)
                except redis.RedisError:
                    # Fail open: if Redis is down, allow the query.
                    # The wrapped call is kept OUTSIDE this try so a
                    # RedisError raised by func itself is not swallowed
                    # (and func is never invoked twice).
                    return func(*args, **kwargs)

                if current > max_qps:
                    raise RateLimitExceeded(
                        f"Tenant {tenant_id} exceeded query rate limit of {max_qps} QPS"
                    )

                return func(*args, **kwargs)
            return wrapper
        return decorator

    def limit_indexing(self, tenant_id: str, max_docs_per_second: int):
        """Decorator enforcing indexing limits via a token bucket.

        The wrapped function may pass docs_count=<int> to consume that
        many tokens; otherwise one token is consumed per call.
        """
        def decorator(func: Callable):
            @wraps(func)
            def wrapper(*args, **kwargs):
                bucket_key = f"token_bucket:{tenant_id}:indexing"
                timestamp_key = f"token_bucket:{tenant_id}:indexing:timestamp"

                # Token bucket parameters
                capacity = max_docs_per_second * 10  # Allow bursts
                refill_rate = max_docs_per_second

                # Treat a missing or explicit None docs_count as 1 (the
                # usage example passes docs_count=None by default, which
                # previously broke the >= comparison below).
                docs_count = kwargs.get('docs_count') or 1

                try:
                    # Get current tokens and last refill time
                    pipe = self.redis_client.pipeline()
                    pipe.get(bucket_key)
                    pipe.get(timestamp_key)
                    results = pipe.execute()

                    current_tokens = int(results[0] or capacity)
                    last_refill = float(results[1] or time.time())

                    # Refill proportionally to the time elapsed.
                    now = time.time()
                    elapsed = now - last_refill
                    tokens_to_add = int(elapsed * refill_rate)
                    new_tokens = min(capacity, current_tokens + tokens_to_add)

                    if new_tokens >= docs_count:
                        # Consume tokens and persist the new bucket state.
                        pipe = self.redis_client.pipeline()
                        pipe.set(bucket_key, new_tokens - docs_count)
                        pipe.set(timestamp_key, now)
                        pipe.expire(bucket_key, 60)
                        pipe.expire(timestamp_key, 60)
                        pipe.execute()
                        allowed = True
                    else:
                        allowed = False
                except redis.RedisError:
                    # Fail open, consistent with limit_queries.
                    allowed = True

                if not allowed:
                    raise RateLimitExceeded(
                        f"Tenant {tenant_id} exceeded indexing rate limit"
                    )
                return func(*args, **kwargs)
            return wrapper
        return decorator
class RateLimitExceeded(Exception):
    """Raised when a tenant exceeds its query or indexing rate limit."""
# Usage examplerate_limiter = TenantRateLimiter()
@rate_limiter.limit_queries("tenant-123", max_qps=100)def search_documents(query):    # Perform search    pass
@rate_limiter.limit_indexing("tenant-123", max_docs_per_second=1000)def bulk_index(documents, docs_count=None):    # Perform bulk indexing    passCross-Tenant Analytics
Aggregated Metrics Collection
#!/usr/bin/env python3from opensearchpy import OpenSearchfrom datetime import datetime, timedeltaimport json
class CrossTenantAnalytics:    def __init__(self, client: OpenSearch):        self.client = client        self.analytics_index = ".tenant_analytics"
    def collect_tenant_metrics(self):        """Collect metrics across all tenants"""
        # Get all tenant indices        all_indices = self.client.indices.get_alias(index="tenant-*")
        tenant_metrics = {}
        for index_name in all_indices:            tenant_id = index_name.split("-")[1]
            if tenant_id not in tenant_metrics:                tenant_metrics[tenant_id] = {                    "tenant_id": tenant_id,                    "timestamp": datetime.now().isoformat(),                    "indices": [],                    "total_docs": 0,                    "total_size_bytes": 0,                    "query_latency_ms": [],                    "index_rate": 0,                    "search_rate": 0                }
            # Get index stats            stats = self.client.indices.stats(index=index_name)            idx_stats = stats["indices"][index_name]
            tenant_metrics[tenant_id]["indices"].append(index_name)            tenant_metrics[tenant_id]["total_docs"] += idx_stats["primaries"]["docs"]["count"]            tenant_metrics[tenant_id]["total_size_bytes"] += idx_stats["primaries"]["store"]["size_in_bytes"]
            # Get search stats            search_stats = idx_stats["primaries"]["search"]            tenant_metrics[tenant_id]["search_rate"] = search_stats.get("query_total", 0) / max(search_stats.get("query_time_in_millis", 1) / 1000, 1)
            # Get indexing stats            indexing_stats = idx_stats["primaries"]["indexing"]            tenant_metrics[tenant_id]["index_rate"] = indexing_stats.get("index_total", 0) / max(indexing_stats.get("index_time_in_millis", 1) / 1000, 1)
        # Store metrics        for tenant_id, metrics in tenant_metrics.items():            self.client.index(                index=self.analytics_index,                body=metrics            )
        return tenant_metrics
    def generate_billing_report(self, start_date: datetime, end_date: datetime):        """Generate billing report based on resource usage"""
        query = {            "query": {                "range": {                    "timestamp": {                        "gte": start_date.isoformat(),                        "lte": end_date.isoformat()                    }                }            },            "aggs": {                "tenants": {                    "terms": {                        "field": "tenant_id.keyword",                        "size": 10000                    },                    "aggs": {                        "avg_docs": {                            "avg": {                                "field": "total_docs"                            }                        },                        "avg_storage_gb": {                            "avg": {                                "script": {                                    "source": "doc['total_size_bytes'].value / (1024.0 * 1024.0 * 1024.0)"                                }                            }                        },                        "total_searches": {                            "sum": {                                "field": "search_rate"                            }                        },                        "total_indexing": {                            "sum": {                                "field": "index_rate"                            }                        }                    }                }            }        }
        result = self.client.search(index=self.analytics_index, body=query)
        billing_report = []
        for bucket in result["aggregations"]["tenants"]["buckets"]:            tenant_id = bucket["key"]
            # Calculate costs (example pricing model)            storage_cost = bucket["avg_storage_gb"]["value"] * 0.10  # $0.10 per GB            doc_cost = (bucket["avg_docs"]["value"] / 1000000) * 1.00  # $1.00 per million docs            search_cost = (bucket["total_searches"]["value"] / 1000) * 0.01  # $0.01 per 1000 searches            index_cost = (bucket["total_indexing"]["value"] / 10000) * 0.05  # $0.05 per 10k indexing ops
            total_cost = storage_cost + doc_cost + search_cost + index_cost
            billing_report.append({                "tenant_id": tenant_id,                "period": f"{start_date.date()} to {end_date.date()}",                "usage": {                    "avg_storage_gb": round(bucket["avg_storage_gb"]["value"], 2),                    "avg_documents": int(bucket["avg_docs"]["value"]),                    "total_searches": int(bucket["total_searches"]["value"]),                    "total_indexing_ops": int(bucket["total_indexing"]["value"])                },                "costs": {                    "storage": round(storage_cost, 2),                    "documents": round(doc_cost, 2),                    "searches": round(search_cost, 2),                    "indexing": round(index_cost, 2),                    "total": round(total_cost, 2)                }            })
        return billing_reportSecurity Best Practices
Multi-Tenancy Security Checklist
graph TB    subgraph "Security Layers"        subgraph "Network Security"            TLS[TLS Encryption]            Firewall[Firewall Rules]            VPN[VPN Access]        end
        subgraph "Authentication"            SAML[SAML Integration]            OIDC[OpenID Connect]            MFA[Multi-Factor Auth]        end
        subgraph "Authorization"            RBAC[Role-Based Access]            ABAC[Attribute-Based Access]            DLS[Document-Level Security]            FLS[Field-Level Security]        end
        subgraph "Audit & Compliance"            AuditLog[Audit Logging]            Compliance[Compliance Monitoring]            DataRetention[Data Retention]        end    end
    TLS --> SAML    SAML --> RBAC    RBAC --> AuditLog
    style TLS fill:#f96,stroke:#333,stroke-width:2px    style RBAC fill:#9f9,stroke:#333,stroke-width:2px    style AuditLog fill:#99f,stroke:#333,stroke-width:2pxSecurity Configuration Script
#!/bin/bash
# Enable security features
# Writes TLS, audit, multi-tenancy, DLS/FLS and field-masking settings into
# the main OpenSearch configuration file.  The heredoc content is written
# verbatim to disk — it is data, not script, and must stay unchanged.
cat > /etc/opensearch/opensearch.yml << EOF
# Security Configuration
plugins.security.ssl.transport.pemcert_filepath: node-cert.pem
plugins.security.ssl.transport.pemkey_filepath: node-key.pem
plugins.security.ssl.transport.pemtrustedcas_filepath: root-ca.pem
plugins.security.ssl.transport.enforce_hostname_verification: true
plugins.security.ssl.http.enabled: true
plugins.security.ssl.http.pemcert_filepath: node-cert.pem
plugins.security.ssl.http.pemkey_filepath: node-key.pem
plugins.security.ssl.http.pemtrustedcas_filepath: root-ca.pem

# Audit Configuration
plugins.security.audit.type: internal_opensearch
plugins.security.audit.config.log4j.logger_name: audit
plugins.security.audit.config.log4j.level: INFO
plugins.security.audit.config.disabled_rest_categories: NONE
plugins.security.audit.config.disabled_transport_categories: NONE

# Multi-tenancy
plugins.security.restapi.roles_enabled: ["all_access", "security_rest_api_access"]
plugins.security.check_snapshot_restore_write_privileges: true
plugins.security.enable_snapshot_restore_privilege: true

# DLS/FLS
plugins.security.dls_fls.enabled: true

# Field masking
plugins.security.compliance.history.write.log_diffs: true
plugins.security.compliance.history.read.watched_fields: ["personal_data.*", "sensitive.*"]
EOF
# Configure authentication backends
# The heredoc below is written verbatim to the security plugin's config.yml:
# basic auth against internal users, SAML SSO, and LDAP group-to-role
# authorization.
# NOTE(review): the LDAP bind password is the "changeme" placeholder — replace
# it (e.g. via the OpenSearch keystore) before production use.
cat > /etc/opensearch/security/config.yml << EOF
_meta:
  type: "config"
  config_version: 2

config:
  dynamic:
    http:
      anonymous_auth_enabled: false
      xff:
        enabled: true
        internalProxies: '192\.168\.0\.0/16'

    authc:
      basic_internal_auth_domain:
        http_enabled: true
        transport_enabled: true
        order: 0
        http_authenticator:
          type: basic
          challenge: false
        authentication_backend:
          type: internal

      saml_auth_domain:
        http_enabled: true
        transport_enabled: false
        order: 1
        http_authenticator:
          type: saml
          challenge: true
          config:
            idp:
              metadata_file: saml-idp-metadata.xml
              entity_id: https://idp.company.com
            sp:
              entity_id: https://opensearch.company.com
              signature_algorithm: RSA_SHA256
            kibana_url: https://dashboards.company.com
        authentication_backend:
          type: noop

    authz:
      roles_from_myldap:
        http_enabled: true
        transport_enabled: true
        authorization_backend:
          type: ldap
          config:
            enable_ssl: true
            enable_start_tls: false
            enable_ssl_client_auth: false
            verify_hostnames: true
            hosts:
              - ldap.company.com:636
            bind_dn: cn=admin,dc=company,dc=com
            password: changeme
            userbase: ou=people,dc=company,dc=com
            usersearch: (uid={0})
            username_attribute: uid
            rolebase: ou=groups,dc=company,dc=com
            rolesearch: (member={0})
            userroleattribute: null
            userrolename: memberOf
            rolename: cn
            resolve_nested_roles: true
EOF

# Apply security configuration
# securityadmin.sh pushes the files under /etc/opensearch/security/ into the
# cluster's security index, authenticating with the admin TLS certificate.
# (-icl: ignore cluster name; -nhnv: no hostname verification — TODO confirm
# these relaxations are acceptable in your environment.)
/usr/share/opensearch/plugins/opensearch-security/tools/securityadmin.sh \
  -cd /etc/opensearch/security/ \
  -icl -nhnv \
  -cacert /etc/opensearch/root-ca.pem \
  -cert /etc/opensearch/admin-cert.pem \
  -key /etc/opensearch/admin-key.pem

Monitoring and Alerting
Tenant-Specific Monitoring
# Prometheus alerting rules for per-tenant SLO and quota monitoring.
groups:
  - name: tenant_alerts
    interval: 30s
    rules:
      # Warn before a tenant hits its storage quota (fires at >90% usage
      # sustained for 5 minutes).
      - alert: TenantQuotaExceeded
        expr: |
          (tenant_storage_used_bytes / tenant_storage_quota_bytes) > 0.9
        for: 5m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "Tenant {{ $labels.tenant_id }} approaching storage quota"
          description: "Tenant {{ $labels.tenant_id }} is using {{ $value | humanizePercentage }} of allocated storage"

      # 95th-percentile query latency above 1 second for 10 minutes.
      - alert: TenantHighQueryLatency
        expr: |
          histogram_quantile(0.95, tenant_query_latency_seconds) > 1
        for: 10m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "High query latency for tenant {{ $labels.tenant_id }}"
          description: "95th percentile query latency is {{ $value }}s"

      # Indexing rate deviating more than 2x from the trailing 1h average —
      # possible abuse or runaway ingestion; routed to the security team.
      - alert: TenantIndexingRateAnomaly
        expr: |
          abs(rate(tenant_docs_indexed[5m]) - avg_over_time(rate(tenant_docs_indexed[5m])[1h:5m]))
          / avg_over_time(rate(tenant_docs_indexed[5m])[1h:5m]) > 2
        for: 15m
        labels:
          severity: info
          team: security
        annotations:
          summary: "Unusual indexing pattern for tenant {{ $labels.tenant_id }}"
          description: "Indexing rate deviates significantly from normal pattern"

Migration and Scaling
Tenant Migration Strategy
#!/usr/bin/env python3import jsonfrom opensearchpy import OpenSearch, helpersfrom typing import List, Dict
class TenantMigrator:
    """Moves or reshapes tenant data between clusters or indices."""

    def __init__(self, source_client: OpenSearch, target_client: OpenSearch):
        self.source = source_client
        self.target = target_client

    def migrate_tenant(self, tenant_id: str, strategy: str = "reindex"):
        """Migrate tenant data between clusters or indices.

        Args:
            tenant_id: tenant whose ``tenant-<id>`` index is migrated.
            strategy: one of "reindex", "snapshot", or "split".

        Raises:
            ValueError: if the strategy is not recognised (previously an
                unknown strategy was silently ignored).
        """
        if strategy == "reindex":
            self._reindex_migration(tenant_id)
        elif strategy == "snapshot":
            self._snapshot_migration(tenant_id)
        elif strategy == "split":
            self._split_tenant(tenant_id)
        else:
            raise ValueError(f"Unknown migration strategy: {strategy}")

    def _snapshot_migration(self, tenant_id: str):
        """Migrate via snapshot/restore.

        This method was referenced by ``migrate_tenant`` but never defined,
        so calling it raised AttributeError.  Kept as an explicit stub until
        snapshot-based migration is implemented.
        """
        raise NotImplementedError("snapshot migration is not implemented yet")

    def _reindex_migration(self, tenant_id: str):
        """Migrate a tenant via the remote-reindex API, then swap the alias."""
        source_index = f"tenant-{tenant_id}"
        target_index = f"tenant-{tenant_id}-new"

        # Create target index with updated settings, copying mappings from the source.
        self.target.indices.create(
            index=target_index,
            body={
                "settings": {
                    "number_of_shards": 5,
                    "number_of_replicas": 1,
                    "refresh_interval": "-1"  # Disable refresh during migration
                },
                "mappings": self.source.indices.get_mapping(index=source_index)[source_index]["mappings"]
            }
        )

        # Perform reindex.  BUG FIX: the response was previously discarded,
        # so reading response["task"] below raised NameError.
        # NOTE(review): credentials are hard-coded here — move them to secure
        # configuration before production use.
        response = self.target.reindex(
            body={
                "source": {
                    "remote": {
                        "host": "http://source-cluster:9200",
                        "username": "migration-user",
                        "password": "migration-password"
                    },
                    "index": source_index,
                    "size": 1000
                },
                "dest": {
                    "index": target_index
                }
            },
            wait_for_completion=False
        )

        # Monitor reindex progress until the background task completes.
        task_id = response["task"]
        self._monitor_task(task_id)

        # Re-enable refresh now that the bulk copy is done (it was set to -1
        # above and was previously never restored).
        self.target.indices.put_settings(
            index=target_index,
            body={"index": {"refresh_interval": "1s"}}
        )

        # Switch alias so readers atomically move from the old index to the new one.
        self.target.indices.update_aliases(
            body={
                "actions": [
                    {"remove": {"index": source_index, "alias": f"{tenant_id}-current"}},
                    {"add": {"index": target_index, "alias": f"{tenant_id}-current"}}
                ]
            }
        )

    def _monitor_task(self, task_id, poll_interval: float = 5.0):
        """Block until the given background task reports completion.

        This helper was referenced by ``_reindex_migration`` but never
        defined, which raised AttributeError at runtime.
        """
        import time  # local import keeps this fix self-contained

        while True:
            status = self.target.tasks.get(task_id=task_id)
            if status.get("completed"):
                return
            time.sleep(poll_interval)

    def _split_tenant(self, tenant_id: str):
        """Split a large tenant into one index per data category."""
        source_index = f"tenant-{tenant_id}"

        # Analyze data distribution by category (up to 100 distinct categories).
        distribution = self.source.search(
            index=source_index,
            body={
                "size": 0,
                "aggs": {
                    "data_categories": {
                        "terms": {
                            "field": "data_type.keyword",
                            "size": 100
                        }
                    }
                }
            }
        )

        # Create a separate index per category and copy its documents over.
        for bucket in distribution["aggregations"]["data_categories"]["buckets"]:
            category = bucket["key"]
            target_index = f"tenant-{tenant_id}-{category}"

            # Create category-specific index
            self.target.indices.create(
                index=target_index,
                body={
                    "settings": {
                        "number_of_shards": 3,
                        "number_of_replicas": 1
                    }
                }
            )

            # Reindex only this category's documents.
            self.target.reindex(
                body={
                    "source": {
                        "index": source_index,
                        "query": {
                            "term": {"data_type.keyword": category}
                        }
                    },
                    "dest": {
                        "index": target_index
                    }
                },
                wait_for_completion=False
            )

Conclusion
Implementing multi-tenancy in OpenSearch requires careful planning and consideration of isolation requirements, performance characteristics, and operational complexity. The approach you choose—whether index-per-tenant, document-level security, or a hybrid model—should align with your specific use case, security requirements, and scalability needs.
Key takeaways for successful multi-tenant OpenSearch deployments:
- Choose the right isolation model based on your security and performance requirements
- Implement comprehensive access controls using OpenSearch Security features
- Monitor and enforce resource quotas to prevent noisy neighbor problems
- Automate tenant provisioning to ensure consistency and reduce operational overhead
- Plan for growth with appropriate sharding strategies and migration paths
- Implement thorough monitoring to track per-tenant metrics and ensure SLA compliance
 
With proper implementation, OpenSearch can efficiently serve thousands of tenants from a single cluster while maintaining strong isolation and performance guarantees.