OpenSearch Multi-Tenancy Setup
Multi-tenancy in OpenSearch enables organizations to serve multiple customers or teams from a single cluster while maintaining strict data isolation and security boundaries. This guide provides a complete implementation for production-ready multi-tenant OpenSearch deployments.
Multi-Tenancy Architecture Overview
OpenSearch supports multiple approaches to multi-tenancy, each with different trade-offs in terms of isolation, performance, and resource utilization:
graph TB
subgraph "Multi-Tenancy Models"
subgraph "Index-per-Tenant"
T1_Index[Tenant1 Index]
T2_Index[Tenant2 Index]
T3_Index[Tenant3 Index]
end
subgraph "Alias-per-Tenant"
Shared_Index[Shared Index]
T1_Alias[Tenant1 Alias]
T2_Alias[Tenant2 Alias]
T3_Alias[Tenant3 Alias]
end
subgraph "Cluster-per-Tenant"
T1_Cluster[Tenant1 Cluster]
T2_Cluster[Tenant2 Cluster]
T3_Cluster[Tenant3 Cluster]
end
end
subgraph "Security Layer"
Auth[Authentication]
RBAC[Role-Based Access]
FLS[Field Level Security]
DLS[Document Level Security]
end
subgraph "Resource Management"
ISM[Index State Management]
Quotas[Resource Quotas]
Monitoring[Tenant Monitoring]
end
T1_Alias --> Shared_Index
T2_Alias --> Shared_Index
T3_Alias --> Shared_Index
Auth --> RBAC
RBAC --> FLS
RBAC --> DLS
style T1_Index fill:#f96,stroke:#333,stroke-width:2px
style RBAC fill:#9f9,stroke:#333,stroke-width:2px
Index-per-Tenant Implementation
The index-per-tenant model provides the strongest isolation between tenants:
Index Template Configuration
PUT _index_template/tenant-template
{
"index_patterns": ["tenant-*"],
"priority": 100,
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.refresh_interval": "5s",
"index.max_result_window": 10000,
"index.codec": "best_compression",
"index.routing.allocation.total_shards_per_node": 2
},
"mappings": {
"dynamic": "strict",
"properties": {
"tenant_id": {
"type": "keyword",
"index": false
},
"created_at": {
"type": "date",
"format": "epoch_millis"
},
"updated_at": {
"type": "date",
"format": "epoch_millis"
},
"data": {
"type": "object",
"enabled": true
}
}
},
"aliases": {}
},
"composed_of": ["component-template-security", "component-template-lifecycle"]
}
Tenant Provisioning Script
#!/usr/bin/env python3
# provision_tenant.py
import os
import json
import hashlib
import requests
from datetime import datetime
from opensearchpy import OpenSearch
class TenantProvisioner:
    """Provisions a complete OpenSearch tenant.

    For each tenant this creates: a dedicated index (with aliases), admin
    and read-only security roles, an admin user, an ISM lifecycle policy,
    and (placeholder) monitoring resources.
    """

    def __init__(self, host='localhost', port=9200, auth=('admin', 'admin')):
        """Connect to an OpenSearch cluster over TLS.

        Args:
            host: Cluster hostname.
            port: REST API port.
            auth: (username, password) pair for HTTP basic auth.
        """
        self.client = OpenSearch(
            hosts=[{'host': host, 'port': port}],
            http_auth=auth,
            use_ssl=True,
            # NOTE(review): certificate verification is disabled for
            # convenience; enable verify_certs with a CA bundle in production.
            verify_certs=False,
            ssl_show_warn=False
        )
        self.security_api_url = f"https://{host}:{port}/_plugins/_security/api"
        self.auth = auth

    def create_tenant(self, tenant_id, tenant_name, admin_user):
        """Create a new tenant with all necessary configurations.

        Args:
            tenant_id: Unique tenant identifier, used in index and role names.
            tenant_name: Human-readable name (currently informational only).
            admin_user: Username to create as the tenant administrator.
        """
        print(f"Creating tenant: {tenant_id}")
        # Create tenant index
        index_name = f"tenant-{tenant_id}"
        self._create_index(index_name, tenant_id)
        # Create tenant roles
        self._create_tenant_roles(tenant_id)
        # Create tenant admin user
        self._create_tenant_admin(tenant_id, admin_user)
        # Create index lifecycle policy
        self._create_lifecycle_policy(tenant_id)
        # Create monitoring dashboards
        self._create_monitoring_resources(tenant_id)
        print(f"Tenant {tenant_id} created successfully")

    def _create_index(self, index_name, tenant_id):
        """Create the tenant-specific index with a custom analyzer and
        both a write alias and a filtered search alias."""
        body = {
            "settings": {
                "index.blocks.read_only_allow_delete": False,
                "analysis": {
                    "analyzer": {
                        "tenant_analyzer": {
                            "type": "custom",
                            "tokenizer": "standard",
                            "filter": ["lowercase", "stop", "snowball"]
                        }
                    }
                }
            },
            "aliases": {
                f"{tenant_id}-current": {},
                # The search alias filters on tenant_id so shared-index
                # queries can never leak other tenants' documents.
                f"{tenant_id}-search": {
                    "filter": {
                        "term": {"tenant_id": tenant_id}
                    }
                }
            }
        }
        self.client.indices.create(index=index_name, body=body)

    def _create_tenant_roles(self, tenant_id):
        """Create admin and read-only roles scoped to this tenant's indices."""
        # Tenant admin role: full CRUD plus index management on tenant-* only.
        admin_role = {
            "cluster_permissions": [
                "cluster_monitor",
                "indices_monitor"
            ],
            "index_permissions": [{
                "index_patterns": [f"tenant-{tenant_id}*"],
                "allowed_actions": [
                    "crud",
                    "create_index",
                    "manage",
                    "manage_aliases",
                    "delete",
                    "index",
                    "read",
                    "write",
                    "search"
                ]
            }],
            "tenant_permissions": [{
                "tenant_patterns": [tenant_id],
                "allowed_actions": ["kibana_all_write"]
            }]
        }
        # Tenant read-only role: search/read access only.
        readonly_role = {
            "cluster_permissions": ["cluster_monitor"],
            "index_permissions": [{
                "index_patterns": [f"tenant-{tenant_id}*"],
                "allowed_actions": ["read", "search"]
            }],
            "tenant_permissions": [{
                "tenant_patterns": [tenant_id],
                "allowed_actions": ["kibana_all_read"]
            }]
        }
        # Create roles via Security API
        self._security_api_put(f"roles/{tenant_id}_admin", admin_role)
        self._security_api_put(f"roles/{tenant_id}_readonly", readonly_role)

    def _create_tenant_admin(self, tenant_id, admin_user):
        """Create the tenant administrator user bound to the admin role."""
        user_data = {
            "password": self._generate_password(tenant_id),
            "opendistro_security_roles": [f"{tenant_id}_admin"],
            "backend_roles": [f"tenant_{tenant_id}"],
            "attributes": {
                # tenant_id attribute is what DLS filters key on elsewhere.
                "tenant_id": tenant_id,
                "created_at": datetime.now().isoformat()
            }
        }
        self._security_api_put(f"internalusers/{admin_user}", user_data)

    def _create_lifecycle_policy(self, tenant_id):
        """Create an ISM policy: hot (rollover) -> warm (shrink) -> delete."""
        policy = {
            "policy": {
                "description": f"Lifecycle policy for tenant {tenant_id}",
                "default_state": "hot",
                "states": [
                    {
                        "name": "hot",
                        "actions": [
                            {
                                # Roll over weekly or at 50gb, whichever first.
                                "rollover": {
                                    "min_index_age": "7d",
                                    "min_size": "50gb"
                                }
                            }
                        ],
                        "transitions": [
                            {
                                "state_name": "warm",
                                "conditions": {
                                    "min_index_age": "30d"
                                }
                            }
                        ]
                    },
                    {
                        "name": "warm",
                        "actions": [
                            {
                                "replica_count": {
                                    "number_of_replicas": 1
                                }
                            },
                            {
                                "shrink": {
                                    "number_of_shards": 1
                                }
                            }
                        ],
                        "transitions": [
                            {
                                "state_name": "delete",
                                "conditions": {
                                    "min_index_age": "90d"
                                }
                            }
                        ]
                    },
                    {
                        "name": "delete",
                        "actions": [
                            {
                                "delete": {}
                            }
                        ]
                    }
                ]
            }
        }
        # The ISM plugin has no dedicated client helper, so go through the
        # low-level transport. (The unused `response =` binding was removed.)
        self.client.transport.perform_request(
            'PUT',
            f'/_plugins/_ism/policies/{tenant_id}_lifecycle',
            body=policy
        )

    def _create_monitoring_resources(self, tenant_id):
        """Create monitoring dashboards and visualizations (placeholder)."""
        # Placeholder payloads; the actual call depends on the Dashboards
        # version and configuration, so nothing is sent yet.
        pattern_id = f"{tenant_id}-pattern"
        pattern_body = {
            "title": f"tenant-{tenant_id}*",
            "timeFieldName": "created_at"
        }
        # This would integrate with OpenSearch Dashboards API
        # Implementation depends on Dashboards version and configuration

    def _generate_password(self, tenant_id):
        """Generate a cryptographically secure random password.

        The previous scheme (SHA-256 of tenant id + wall-clock timestamp)
        was predictable; the secrets module draws from the OS CSPRNG.
        The tenant_id parameter is kept for interface compatibility.
        """
        import secrets  # local import: only needed here
        return secrets.token_urlsafe(16)

    def _security_api_put(self, endpoint, data):
        """Make a PUT request to the Security API and return the JSON body.

        Raises:
            requests.HTTPError: If the API responds with an error status.
        """
        response = requests.put(
            f"{self.security_api_url}/{endpoint}",
            json=data,
            auth=self.auth,
            # NOTE(review): TLS verification disabled; fix in production.
            verify=False
        )
        response.raise_for_status()
        return response.json()
# Usage example
# Provisions a tenant named "acme-corp" with an "acme_admin" administrator
# against a local cluster using the default admin credentials.
if __name__ == "__main__":
    provisioner = TenantProvisioner()
    provisioner.create_tenant("acme-corp", "ACME Corporation", "acme_admin")
Document-Level Security Implementation
For shared index multi-tenancy, document-level security (DLS) provides data isolation:
graph LR
subgraph "Shared Index"
Doc1[Document 1<br/>tenant: A]
Doc2[Document 2<br/>tenant: B]
Doc3[Document 3<br/>tenant: A]
Doc4[Document 4<br/>tenant: C]
end
subgraph "DLS Filters"
FilterA[Tenant A Filter<br/>tenant_id: A]
FilterB[Tenant B Filter<br/>tenant_id: B]
FilterC[Tenant C Filter<br/>tenant_id: C]
end
subgraph "User Views"
UserA[User A<br/>Sees: Doc1, Doc3]
UserB[User B<br/>Sees: Doc2]
UserC[User C<br/>Sees: Doc4]
end
FilterA --> Doc1
FilterA --> Doc3
FilterB --> Doc2
FilterC --> Doc4
Doc1 --> UserA
Doc3 --> UserA
Doc2 --> UserB
Doc4 --> UserC
style FilterA fill:#f96,stroke:#333,stroke-width:2px
style UserA fill:#9f9,stroke:#333,stroke-width:2px
DLS Role Configuration
PUT _plugins/_security/api/roles/tenant_dls_role
{
"cluster_permissions": ["cluster_monitor"],
"index_permissions": [{
"index_patterns": ["shared-data-*"],
"dls": "{\"term\": {\"tenant_id\": \"${attr.internal.tenant_id}\"}}",
"allowed_actions": [
"read",
"write",
"delete",
"search",
"index"
]
}]
}
Field-Level Security Configuration
PUT _plugins/_security/api/roles/tenant_fls_role
{
"cluster_permissions": ["cluster_monitor"],
"index_permissions": [{
"index_patterns": ["shared-data-*"],
"dls": "{\"term\": {\"tenant_id\": \"${attr.internal.tenant_id}\"}}",
"fls": [
"~internal_*",
"~system_*",
"~admin_notes"
],
"allowed_actions": ["read", "search"]
}]
}
Kibana/OpenSearch Dashboards Multi-Tenancy
Tenant Space Configuration
# opensearch_dashboards.yml
# Service account used by Dashboards to talk to OpenSearch.
# NOTE(review): "kibanaserver" is the well-known default credential pair —
# change it before any production rollout.
opensearch.username: "kibanaserver"
opensearch.password: "kibanaserver"
# Forward the tenant-selection and auth headers through to OpenSearch.
opensearch.requestHeadersWhitelist: ["securitytenant", "Authorization"]
opensearch_security.multitenancy.enabled: true
opensearch_security.multitenancy.tenants.enable_global: true
opensearch_security.multitenancy.tenants.enable_private: true
# Order in which tenants are offered to a user at login.
opensearch_security.multitenancy.tenants.preferred: ["Private", "Global"]
opensearch_security.readonly_mode.roles: ["readonly_role"]
# Tenant branding
opensearch_security.multitenancy.custom_branding:
  tenant_a:
    logo: "/assets/tenant_a_logo.svg"
    favicon: "/assets/tenant_a_favicon.ico"
    title: "Tenant A Analytics"
  tenant_b:
    logo: "/assets/tenant_b_logo.svg"
    favicon: "/assets/tenant_b_favicon.ico"
    title: "Tenant B Dashboard"
Automated Tenant Dashboard Setup
#!/usr/bin/env python3
# setup_tenant_dashboards.py
import requests
import json
from typing import Dict, List
class DashboardManager:
    """Creates per-tenant saved objects (index pattern, dashboard, and
    visualizations) through the OpenSearch Dashboards saved-objects API.

    All requests are scoped to a tenant via the ``securitytenant`` header.
    """

    def __init__(self, dashboards_url: str, auth: tuple):
        """Remember the Dashboards base URL, credentials, and common headers."""
        self.url = dashboards_url
        self.auth = auth
        self.headers = {
            'Content-Type': 'application/json',
            'osd-xsrf': 'true'
        }

    def create_tenant_space(self, tenant_id: str):
        """Create a new tenant space in OpenSearch Dashboards"""
        # Create saved objects for tenant
        self._create_index_pattern(tenant_id)
        self._create_default_dashboard(tenant_id)
        self._create_default_visualizations(tenant_id)

    def _post_saved_object(self, object_type: str, tenant_id: str, payload: dict):
        """POST one saved object of the given type into the tenant's space.

        Every saved-object creation hits the same endpoint family, so the
        request plumbing lives here once.
        """
        response = requests.post(
            f"{self.url}/api/saved_objects/{object_type}",
            headers={**self.headers, 'securitytenant': tenant_id},
            json=payload,
            auth=self.auth,
            verify=False
        )
        return response.json()

    def _create_index_pattern(self, tenant_id: str):
        """Create index pattern for tenant"""
        attributes = {
            "title": f"tenant-{tenant_id}*",
            "timeFieldName": "created_at",
            "fields": "[]",
            "fieldFormatMap": "{}"
        }
        return self._post_saved_object("index-pattern", tenant_id, {"attributes": attributes})

    def _create_default_dashboard(self, tenant_id: str):
        """Create default dashboard for tenant"""
        # Two side-by-side visualization panels on a 48-unit-wide grid.
        panels = [
            {
                "id": "1",
                "type": "visualization",
                "gridData": {
                    "x": 0,
                    "y": 0,
                    "w": 24,
                    "h": 15
                }
            },
            {
                "id": "2",
                "type": "visualization",
                "gridData": {
                    "x": 24,
                    "y": 0,
                    "w": 24,
                    "h": 15
                }
            }
        ]
        options = {
            "darkTheme": False,
            "hidePanelTitles": False,
            "useMargins": True
        }
        search_source = {
            "query": {
                "language": "kuery",
                "query": ""
            },
            "filter": []
        }
        dashboard = {
            "attributes": {
                "title": f"{tenant_id} Overview Dashboard",
                "hits": 0,
                "description": f"Main dashboard for {tenant_id}",
                "panelsJSON": json.dumps(panels),
                "optionsJSON": json.dumps(options),
                "version": 1,
                "timeRestore": True,
                "timeTo": "now",
                "timeFrom": "now-7d",
                "refreshInterval": {
                    "pause": True,
                    "value": 0
                },
                "kibanaSavedObjectMeta": {
                    "searchSourceJSON": json.dumps(search_source)
                }
            }
        }
        return self._post_saved_object("dashboard", tenant_id, dashboard)

    def _create_default_visualizations(self, tenant_id: str):
        """Create default visualizations for tenant"""
        # Line chart of document count bucketed on the created_at field.
        aggs = [
            {
                "id": "1",
                "enabled": True,
                "type": "count",
                "params": {},
                "schema": "metric"
            },
            {
                "id": "2",
                "enabled": True,
                "type": "date_histogram",
                "params": {
                    "field": "created_at",
                    "interval": "auto",
                    "min_doc_count": 0,
                    "extended_bounds": {}
                },
                "schema": "segment"
            }
        ]
        vis_state = {
            "title": f"{tenant_id} - Documents Over Time",
            "type": "line",
            "aggs": aggs
        }
        time_series_viz = {
            "attributes": {
                "title": f"{tenant_id} - Documents Over Time",
                "visState": json.dumps(vis_state),
                "uiStateJSON": "{}",
                "kibanaSavedObjectMeta": {
                    "searchSourceJSON": json.dumps({
                        "index": f"tenant-{tenant_id}*",
                        "query": {
                            "match_all": {}
                        }
                    })
                }
            }
        }
        return self._post_saved_object("visualization", tenant_id, time_series_viz)
Resource Isolation and Quotas
Resource Allocation Strategy
graph TB
subgraph "Cluster Resources"
CPU[CPU Cores: 32]
Memory[Memory: 128GB]
Storage[Storage: 2TB]
end
subgraph "Resource Pools"
subgraph "Premium Tenants"
PT_CPU[CPU: 16 cores]
PT_Mem[Memory: 64GB]
PT_Storage[Storage: 1TB]
end
subgraph "Standard Tenants"
ST_CPU[CPU: 12 cores]
ST_Mem[Memory: 48GB]
ST_Storage[Storage: 750GB]
end
subgraph "Basic Tenants"
BT_CPU[CPU: 4 cores]
BT_Mem[Memory: 16GB]
BT_Storage[Storage: 250GB]
end
end
CPU --> PT_CPU
CPU --> ST_CPU
CPU --> BT_CPU
Memory --> PT_Mem
Memory --> ST_Mem
Memory --> BT_Mem
Storage --> PT_Storage
Storage --> ST_Storage
Storage --> BT_Storage
style PT_CPU fill:#f96,stroke:#333,stroke-width:2px
style ST_CPU fill:#99f,stroke:#333,stroke-width:2px
style BT_CPU fill:#9f9,stroke:#333,stroke-width:2px
Implementing Resource Quotas
#!/usr/bin/env python3
# resource_quota_manager.py
import json
from opensearchpy import OpenSearch
from typing import Dict, Optional
class ResourceQuotaManager:
    """Stores per-tenant resource quotas and checks live usage against them.

    Quota documents live in a hidden index (one doc per tenant, id = tenant_id).
    """

    def __init__(self, client: "OpenSearch"):
        self.client = client
        # Hidden index holding one quota document per tenant.
        self.quota_index = ".tenant_quotas"

    def set_tenant_quota(self, tenant_id: str, quota_type: str = "standard"):
        """Set resource quota for a tenant.

        Args:
            tenant_id: Tenant to assign the quota to.
            quota_type: "premium", "standard", or "basic"; unknown values
                fall back to "basic".
        """
        quotas = {
            "premium": {
                "max_indices": 100,
                "max_shards": 500,
                "max_docs": 100000000,  # 100M documents
                "max_size_gb": 1000,
                "max_fields": 1000,
                "max_query_rate": 1000,  # queries per second
                "max_index_rate": 5000,  # docs per second
                "circuit_breaker": {
                    "request": "80%",
                    "total": "95%"
                }
            },
            "standard": {
                "max_indices": 50,
                "max_shards": 250,
                "max_docs": 50000000,  # 50M documents
                "max_size_gb": 500,
                "max_fields": 500,
                "max_query_rate": 500,
                "max_index_rate": 2500,
                "circuit_breaker": {
                    "request": "70%",
                    "total": "90%"
                }
            },
            "basic": {
                "max_indices": 10,
                "max_shards": 50,
                "max_docs": 10000000,  # 10M documents
                "max_size_gb": 100,
                "max_fields": 200,
                "max_query_rate": 100,
                "max_index_rate": 500,
                "circuit_breaker": {
                    "request": "60%",
                    "total": "85%"
                }
            }
        }
        quota = quotas.get(quota_type, quotas["basic"])
        quota["tenant_id"] = tenant_id
        quota["quota_type"] = quota_type
        # BUG FIX: record a real timestamp — the previous literal string
        # "now" would not parse as a date value in the quota document.
        from datetime import datetime
        quota["created_at"] = datetime.now().isoformat()
        # Store quota configuration
        self.client.index(
            index=self.quota_index,
            id=tenant_id,
            body=quota
        )
        # Apply index settings
        self._apply_index_settings(tenant_id, quota)

    def _apply_index_settings(self, tenant_id: str, quota: Dict):
        """Apply quota-derived settings to all of the tenant's indices."""
        settings = {
            # Result window is capped at 10000 regardless of doc quota to
            # keep deep pagination bounded.
            "index.max_result_window": min(quota["max_docs"], 10000),
            "index.max_regex_length": 1000,
            "index.max_terms_count": quota["max_fields"],
            "index.max_script_fields": 32,
            "index.requests.cache.enable": True,
            "index.queries.cache.enabled": True
        }
        # Apply to all tenant indices
        self.client.indices.put_settings(
            index=f"tenant-{tenant_id}*",
            body={"settings": settings}
        )

    def check_quota_usage(self, tenant_id: str) -> Dict:
        """Check current resource usage against the stored quota.

        Returns:
            A dict with per-metric used/limit/percentage figures plus a
            "warnings" list for any metric above 80% of its limit.
        """
        # Get quota configuration
        quota_doc = self.client.get(index=self.quota_index, id=tenant_id)
        quota = quota_doc["_source"]
        # BUG FIX: level="shards" is required — the default (indices-level)
        # stats response has no per-index "shards" section, so the shard
        # count below raised KeyError.
        stats = self.client.indices.stats(index=f"tenant-{tenant_id}*", level="shards")
        total_docs = sum(idx["primaries"]["docs"]["count"]
                         for idx in stats["indices"].values())
        total_size = sum(idx["primaries"]["store"]["size_in_bytes"]
                         for idx in stats["indices"].values())
        total_shards = sum(len(idx["shards"])
                           for idx in stats["indices"].values())
        usage = {
            "tenant_id": tenant_id,
            "quota_type": quota["quota_type"],
            "usage": {
                "indices": {
                    "used": len(stats["indices"]),
                    "limit": quota["max_indices"],
                    "percentage": (len(stats["indices"]) / quota["max_indices"]) * 100
                },
                "shards": {
                    "used": total_shards,
                    "limit": quota["max_shards"],
                    "percentage": (total_shards / quota["max_shards"]) * 100
                },
                "documents": {
                    "used": total_docs,
                    "limit": quota["max_docs"],
                    "percentage": (total_docs / quota["max_docs"]) * 100
                },
                "storage_gb": {
                    "used": total_size / (1024**3),
                    "limit": quota["max_size_gb"],
                    "percentage": (total_size / (1024**3) / quota["max_size_gb"]) * 100
                }
            },
            "warnings": []
        }
        # Add warnings for high usage
        for metric, data in usage["usage"].items():
            if data["percentage"] > 80:
                usage["warnings"].append(f"{metric} usage above 80%")
        return usage
Query Rate Limiting
Rate Limiter Implementation
#!/usr/bin/env python3
# rate_limiter.py
import time
import redis
from functools import wraps
from typing import Optional, Callable
class TenantRateLimiter:
    """Per-tenant rate limiting backed by Redis.

    Query limiting uses a fixed one-second window; indexing uses a token
    bucket that allows short bursts. Both decorators fail open when Redis
    is unavailable so a limiter outage cannot take down the data path.
    """

    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

    def limit_queries(self, tenant_id: str, max_qps: int):
        """Decorator to enforce query rate limits per tenant.

        Raises:
            RateLimitExceeded: When the tenant exceeds max_qps in the
                current one-second window.
        """
        def decorator(func: Callable):
            @wraps(func)
            def wrapper(*args, **kwargs):
                key = f"rate_limit:{tenant_id}:queries"
                try:
                    # Fixed-window counter: the first increment in a window
                    # also sets a 1-second TTL on the key.
                    current = self.redis_client.incr(key)
                    if current == 1:
                        self.redis_client.expire(key, 1)
                    if current > max_qps:
                        raise RateLimitExceeded(
                            f"Tenant {tenant_id} exceeded query rate limit of {max_qps} QPS"
                        )
                except redis.RedisError:
                    # If Redis is down, allow the query (fail open).
                    pass
                # BUG FIX: func is now called exactly once, outside the try.
                # Previously it ran inside the try, so a RedisError raised
                # by func itself caused a second invocation.
                return func(*args, **kwargs)
            return wrapper
        return decorator

    def limit_indexing(self, tenant_id: str, max_docs_per_second: int):
        """Decorator enforcing indexing rate limits via a token bucket.

        Raises:
            RateLimitExceeded: When the bucket holds fewer tokens than the
                number of documents being indexed.
        """
        def decorator(func: Callable):
            @wraps(func)
            def wrapper(*args, **kwargs):
                bucket_key = f"token_bucket:{tenant_id}:indexing"
                timestamp_key = f"token_bucket:{tenant_id}:indexing:timestamp"
                # Token bucket parameters
                capacity = max_docs_per_second * 10  # Allow bursts
                refill_rate = max_docs_per_second
                # BUG FIX: `or 1` also covers an explicit docs_count=None
                # (as in the bulk_index usage example), which previously
                # crashed the token comparison with a TypeError.
                docs_count = kwargs.get('docs_count') or 1
                try:
                    # Get current tokens and last refill time
                    pipe = self.redis_client.pipeline()
                    pipe.get(bucket_key)
                    pipe.get(timestamp_key)
                    results = pipe.execute()
                    current_tokens = int(results[0] or capacity)
                    last_refill = float(results[1] or time.time())
                    # Refill proportionally to the elapsed time.
                    now = time.time()
                    elapsed = now - last_refill
                    tokens_to_add = int(elapsed * refill_rate)
                    # NOTE(review): this read-modify-write is not atomic
                    # across clients; use a Lua script for strict enforcement.
                    new_tokens = min(capacity, current_tokens + tokens_to_add)
                    if new_tokens < docs_count:
                        raise RateLimitExceeded(
                            f"Tenant {tenant_id} exceeded indexing rate limit"
                        )
                    # Consume tokens
                    pipe = self.redis_client.pipeline()
                    pipe.set(bucket_key, new_tokens - docs_count)
                    pipe.set(timestamp_key, now)
                    pipe.expire(bucket_key, 60)
                    pipe.expire(timestamp_key, 60)
                    pipe.execute()
                except redis.RedisError:
                    # BUG FIX: fail open on Redis errors, consistent with
                    # limit_queries (previously the exception propagated).
                    pass
                return func(*args, **kwargs)
            return wrapper
        return decorator
class RateLimitExceeded(Exception):
    """Raised when a tenant exceeds its configured query or indexing rate."""
# Usage example
# NOTE(review): instantiating at import time builds a Redis client with
# default connection settings; the TCP connection itself is opened lazily
# on first command.
rate_limiter = TenantRateLimiter()


@rate_limiter.limit_queries("tenant-123", max_qps=100)
def search_documents(query):
    # Perform search
    pass


@rate_limiter.limit_indexing("tenant-123", max_docs_per_second=1000)
def bulk_index(documents, docs_count=None):
    # Perform bulk indexing
    pass
Cross-Tenant Analytics
Aggregated Metrics Collection
#!/usr/bin/env python3
# cross_tenant_analytics.py
from opensearchpy import OpenSearch
from datetime import datetime, timedelta
import json
class CrossTenantAnalytics:
    """Collects per-tenant usage metrics and produces billing reports."""

    def __init__(self, client: "OpenSearch"):
        self.client = client
        # Hidden index where per-tenant metric snapshots are stored.
        self.analytics_index = ".tenant_analytics"

    @staticmethod
    def _extract_tenant_id(index_name: str) -> str:
        """Return the tenant id embedded in an index name.

        Index names follow the "tenant-<tenant_id>" convention, and the
        tenant id may itself contain hyphens (e.g. "tenant-acme-corp"), so
        only the FIRST hyphen is a separator. The previous
        split("-")[1] truncated such ids ("acme-corp" -> "acme").
        """
        return index_name.split("-", 1)[1]

    def collect_tenant_metrics(self):
        """Collect metrics across all tenants.

        Returns:
            Dict mapping tenant_id to its metrics snapshot; each snapshot
            is also indexed into the analytics index.
        """
        # Get all tenant indices
        all_indices = self.client.indices.get_alias(index="tenant-*")
        tenant_metrics = {}
        for index_name in all_indices:
            tenant_id = self._extract_tenant_id(index_name)
            if tenant_id not in tenant_metrics:
                tenant_metrics[tenant_id] = {
                    "tenant_id": tenant_id,
                    "timestamp": datetime.now().isoformat(),
                    "indices": [],
                    "total_docs": 0,
                    "total_size_bytes": 0,
                    "query_latency_ms": [],
                    "index_rate": 0,
                    "search_rate": 0
                }
            # Get index stats (one API call per index).
            stats = self.client.indices.stats(index=index_name)
            idx_stats = stats["indices"][index_name]
            tenant_metrics[tenant_id]["indices"].append(index_name)
            tenant_metrics[tenant_id]["total_docs"] += idx_stats["primaries"]["docs"]["count"]
            tenant_metrics[tenant_id]["total_size_bytes"] += idx_stats["primaries"]["store"]["size_in_bytes"]
            # Rates are queries (or docs) per second of cumulative query
            # (or indexing) time — a throughput proxy, not a true QPS.
            search_stats = idx_stats["primaries"]["search"]
            tenant_metrics[tenant_id]["search_rate"] = search_stats.get("query_total", 0) / max(search_stats.get("query_time_in_millis", 1) / 1000, 1)
            indexing_stats = idx_stats["primaries"]["indexing"]
            tenant_metrics[tenant_id]["index_rate"] = indexing_stats.get("index_total", 0) / max(indexing_stats.get("index_time_in_millis", 1) / 1000, 1)
        # Store one snapshot document per tenant.
        for tenant_id, metrics in tenant_metrics.items():
            self.client.index(
                index=self.analytics_index,
                body=metrics
            )
        return tenant_metrics

    def generate_billing_report(self, start_date: datetime, end_date: datetime):
        """Generate a billing report from stored snapshots in [start, end].

        Returns:
            List of per-tenant dicts with usage figures and cost breakdown.
        """
        query = {
            "query": {
                "range": {
                    "timestamp": {
                        "gte": start_date.isoformat(),
                        "lte": end_date.isoformat()
                    }
                }
            },
            "aggs": {
                "tenants": {
                    "terms": {
                        "field": "tenant_id.keyword",
                        "size": 10000
                    },
                    "aggs": {
                        "avg_docs": {
                            "avg": {
                                "field": "total_docs"
                            }
                        },
                        "avg_storage_gb": {
                            "avg": {
                                "script": {
                                    "source": "doc['total_size_bytes'].value / (1024.0 * 1024.0 * 1024.0)"
                                }
                            }
                        },
                        "total_searches": {
                            "sum": {
                                "field": "search_rate"
                            }
                        },
                        "total_indexing": {
                            "sum": {
                                "field": "index_rate"
                            }
                        }
                    }
                }
            }
        }
        result = self.client.search(index=self.analytics_index, body=query)
        billing_report = []
        for bucket in result["aggregations"]["tenants"]["buckets"]:
            tenant_id = bucket["key"]
            # Calculate costs (example pricing model)
            storage_cost = bucket["avg_storage_gb"]["value"] * 0.10  # $0.10 per GB
            doc_cost = (bucket["avg_docs"]["value"] / 1000000) * 1.00  # $1.00 per million docs
            search_cost = (bucket["total_searches"]["value"] / 1000) * 0.01  # $0.01 per 1000 searches
            index_cost = (bucket["total_indexing"]["value"] / 10000) * 0.05  # $0.05 per 10k indexing ops
            total_cost = storage_cost + doc_cost + search_cost + index_cost
            billing_report.append({
                "tenant_id": tenant_id,
                "period": f"{start_date.date()} to {end_date.date()}",
                "usage": {
                    "avg_storage_gb": round(bucket["avg_storage_gb"]["value"], 2),
                    "avg_documents": int(bucket["avg_docs"]["value"]),
                    "total_searches": int(bucket["total_searches"]["value"]),
                    "total_indexing_ops": int(bucket["total_indexing"]["value"])
                },
                "costs": {
                    "storage": round(storage_cost, 2),
                    "documents": round(doc_cost, 2),
                    "searches": round(search_cost, 2),
                    "indexing": round(index_cost, 2),
                    "total": round(total_cost, 2)
                }
            })
        return billing_report
Security Best Practices
Multi-Tenancy Security Checklist
graph TB
subgraph "Security Layers"
subgraph "Network Security"
TLS[TLS Encryption]
Firewall[Firewall Rules]
VPN[VPN Access]
end
subgraph "Authentication"
SAML[SAML Integration]
OIDC[OpenID Connect]
MFA[Multi-Factor Auth]
end
subgraph "Authorization"
RBAC[Role-Based Access]
ABAC[Attribute-Based Access]
DLS[Document-Level Security]
FLS[Field-Level Security]
end
subgraph "Audit & Compliance"
AuditLog[Audit Logging]
Compliance[Compliance Monitoring]
DataRetention[Data Retention]
end
end
TLS --> SAML
SAML --> RBAC
RBAC --> AuditLog
style TLS fill:#f96,stroke:#333,stroke-width:2px
style RBAC fill:#9f9,stroke:#333,stroke-width:2px
style AuditLog fill:#99f,stroke:#333,stroke-width:2px
Security Configuration Script
#!/bin/bash
# secure_multi_tenant_setup.sh
#
# Writes the node security configuration (TLS, audit logging, DLS/FLS) and
# the authentication backend config (basic auth, SAML, LDAP authz), then
# loads them into the cluster with securityadmin.sh.

# Enable security features
cat > /etc/opensearch/opensearch.yml << EOF
# Security Configuration
plugins.security.ssl.transport.pemcert_filepath: node-cert.pem
plugins.security.ssl.transport.pemkey_filepath: node-key.pem
plugins.security.ssl.transport.pemtrustedcas_filepath: root-ca.pem
plugins.security.ssl.transport.enforce_hostname_verification: true
plugins.security.ssl.http.enabled: true
plugins.security.ssl.http.pemcert_filepath: node-cert.pem
plugins.security.ssl.http.pemkey_filepath: node-key.pem
plugins.security.ssl.http.pemtrustedcas_filepath: root-ca.pem
# Audit Configuration
plugins.security.audit.type: internal_opensearch
plugins.security.audit.config.log4j.logger_name: audit
plugins.security.audit.config.log4j.level: INFO
plugins.security.audit.config.disabled_rest_categories: NONE
plugins.security.audit.config.disabled_transport_categories: NONE
# Multi-tenancy
plugins.security.restapi.roles_enabled: ["all_access", "security_rest_api_access"]
plugins.security.check_snapshot_restore_write_privileges: true
plugins.security.enable_snapshot_restore_privilege: true
# DLS/FLS
plugins.security.dls_fls.enabled: true
# Field masking
plugins.security.compliance.history.write.log_diffs: true
plugins.security.compliance.history.read.watched_fields: ["personal_data.*", "sensitive.*"]
EOF

# Configure authentication backends.
# NOTE(review): the LDAP bind password below is a hardcoded placeholder
# ("changeme") — inject it from a secret store before production use.
cat > /etc/opensearch/security/config.yml << EOF
_meta:
  type: "config"
  config_version: 2
config:
  dynamic:
    http:
      anonymous_auth_enabled: false
      xff:
        enabled: true
        internalProxies: '192\.168\.0\.0/16'
    authc:
      basic_internal_auth_domain:
        http_enabled: true
        transport_enabled: true
        order: 0
        http_authenticator:
          type: basic
          challenge: false
        authentication_backend:
          type: internal
      saml_auth_domain:
        http_enabled: true
        transport_enabled: false
        order: 1
        http_authenticator:
          type: saml
          challenge: true
          config:
            idp:
              metadata_file: saml-idp-metadata.xml
              entity_id: https://idp.company.com
            sp:
              entity_id: https://opensearch.company.com
              signature_algorithm: RSA_SHA256
            kibana_url: https://dashboards.company.com
        authentication_backend:
          type: noop
    authz:
      roles_from_myldap:
        http_enabled: true
        transport_enabled: true
        authorization_backend:
          type: ldap
          config:
            enable_ssl: true
            enable_start_tls: false
            enable_ssl_client_auth: false
            verify_hostnames: true
            hosts:
              - ldap.company.com:636
            bind_dn: cn=admin,dc=company,dc=com
            password: changeme
            userbase: ou=people,dc=company,dc=com
            usersearch: (uid={0})
            username_attribute: uid
            rolebase: ou=groups,dc=company,dc=com
            rolesearch: (member={0})
            userroleattribute: null
            userrolename: memberOf
            rolename: cn
            resolve_nested_roles: true
EOF

# Apply security configuration
# -icl: ignore cluster name; -nhnv: do not verify hostnames (admin TLS cert
# is still required and validated against the CA below).
/usr/share/opensearch/plugins/opensearch-security/tools/securityadmin.sh \
-cd /etc/opensearch/security/ \
-icl -nhnv \
-cacert /etc/opensearch/root-ca.pem \
-cert /etc/opensearch/admin-cert.pem \
-key /etc/opensearch/admin-key.pem
Monitoring and Alerting
Tenant-Specific Monitoring
# tenant_monitoring_rules.yml
# Prometheus alerting rules for per-tenant quota, latency, and
# indexing-anomaly conditions, evaluated every 30 seconds.
groups:
  - name: tenant_alerts
    interval: 30s
    rules:
      # Fires when a tenant has used more than 90% of its storage quota
      # for at least 5 minutes.
      - alert: TenantQuotaExceeded
        expr: |
          (tenant_storage_used_bytes / tenant_storage_quota_bytes) > 0.9
        for: 5m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "Tenant {{ $labels.tenant_id }} approaching storage quota"
          description: "Tenant {{ $labels.tenant_id }} is using {{ $value | humanizePercentage }} of allocated storage"
      # Fires when p95 query latency stays above 1 second for 10 minutes.
      - alert: TenantHighQueryLatency
        expr: |
          histogram_quantile(0.95, tenant_query_latency_seconds) > 1
        for: 10m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "High query latency for tenant {{ $labels.tenant_id }}"
          description: "95th percentile query latency is {{ $value }}s"
      # Fires when the 5m indexing rate deviates more than 2x from the
      # trailing one-hour average — possible abuse or a runaway client.
      - alert: TenantIndexingRateAnomaly
        expr: |
          abs(rate(tenant_docs_indexed[5m]) - avg_over_time(rate(tenant_docs_indexed[5m])[1h:5m]))
          / avg_over_time(rate(tenant_docs_indexed[5m])[1h:5m]) > 2
        for: 15m
        labels:
          severity: info
          team: security
        annotations:
          summary: "Unusual indexing pattern for tenant {{ $labels.tenant_id }}"
          description: "Indexing rate deviates significantly from normal pattern"
Migration and Scaling
Tenant Migration Strategy
#!/usr/bin/env python3
# tenant_migration.py
import json
from opensearchpy import OpenSearch, helpers
from typing import List, Dict
class TenantMigrator:
    """Moves tenant data between clusters or indices.

    Supported strategies: remote reindex, snapshot/restore (stub), and
    splitting an oversized tenant into per-category indices.
    """

    def __init__(self, source_client: "OpenSearch", target_client: "OpenSearch"):
        self.source = source_client
        self.target = target_client

    def migrate_tenant(self, tenant_id: str, strategy: str = "reindex"):
        """Migrate tenant data between clusters or indices.

        Args:
            tenant_id: Tenant whose data is migrated.
            strategy: One of "reindex", "snapshot", or "split".

        Raises:
            ValueError: For an unrecognized strategy (previously unknown
                strategies were silently ignored).
        """
        if strategy == "reindex":
            self._reindex_migration(tenant_id)
        elif strategy == "snapshot":
            self._snapshot_migration(tenant_id)
        elif strategy == "split":
            self._split_tenant(tenant_id)
        else:
            raise ValueError(f"Unknown migration strategy: {strategy}")

    def _snapshot_migration(self, tenant_id: str):
        """Migrate via snapshot/restore.

        Referenced by migrate_tenant but previously missing entirely, so
        strategy="snapshot" failed with AttributeError; now it fails with
        an explicit NotImplementedError until implemented.
        """
        raise NotImplementedError("Snapshot-based migration is not implemented yet")

    def _reindex_migration(self, tenant_id: str):
        """Migrate using the remote reindex API, then swap the alias."""
        source_index = f"tenant-{tenant_id}"
        target_index = f"tenant-{tenant_id}-new"
        # Create target index with updated settings, copying the source mapping.
        self.target.indices.create(
            index=target_index,
            body={
                "settings": {
                    "number_of_shards": 5,
                    "number_of_replicas": 1,
                    "refresh_interval": "-1"  # Disable refresh during migration
                },
                "mappings": self.source.indices.get_mapping(index=source_index)[source_index]["mappings"]
            }
        )
        # Perform reindex.
        # BUG FIX: the response was previously discarded, so the
        # `response["task"]` lookup below raised NameError.
        # NOTE(review): remote credentials are hardcoded here — load them
        # from configuration/secrets in production.
        response = self.target.reindex(
            body={
                "source": {
                    "remote": {
                        "host": "http://source-cluster:9200",
                        "username": "migration-user",
                        "password": "migration-password"
                    },
                    "index": source_index,
                    "size": 1000
                },
                "dest": {
                    "index": target_index
                }
            },
            wait_for_completion=False
        )
        # Monitor reindex progress until the async task completes.
        task_id = response["task"]
        self._monitor_task(task_id)
        # Switch alias so readers cut over atomically to the new index.
        self.target.indices.update_aliases(
            body={
                "actions": [
                    {"remove": {"index": source_index, "alias": f"{tenant_id}-current"}},
                    {"add": {"index": target_index, "alias": f"{tenant_id}-current"}}
                ]
            }
        )

    def _monitor_task(self, task_id):
        """Poll the tasks API until the given task reports completion.

        Previously referenced but never defined in this class.
        """
        import time
        while True:
            task = self.target.tasks.get(task_id=task_id)
            if task.get("completed"):
                return
            time.sleep(5)

    def _split_tenant(self, tenant_id: str):
        """Split a large tenant into one index per data_type category."""
        source_index = f"tenant-{tenant_id}"
        # Analyze data distribution across data_type values.
        distribution = self.source.search(
            index=source_index,
            body={
                "size": 0,
                "aggs": {
                    "data_categories": {
                        "terms": {
                            "field": "data_type.keyword",
                            "size": 100
                        }
                    }
                }
            }
        )
        # Create separate indices for each category and copy its documents.
        for bucket in distribution["aggregations"]["data_categories"]["buckets"]:
            category = bucket["key"]
            target_index = f"tenant-{tenant_id}-{category}"
            # Create category-specific index
            self.target.indices.create(
                index=target_index,
                body={
                    "settings": {
                        "number_of_shards": 3,
                        "number_of_replicas": 1
                    }
                }
            )
            # Reindex only this category's documents (async).
            self.target.reindex(
                body={
                    "source": {
                        "index": source_index,
                        "query": {
                            "term": {"data_type.keyword": category}
                        }
                    },
                    "dest": {
                        "index": target_index
                    }
                },
                wait_for_completion=False
            )
Conclusion
Implementing multi-tenancy in OpenSearch requires careful planning and consideration of isolation requirements, performance characteristics, and operational complexity. The approach you choose—whether index-per-tenant, document-level security, or a hybrid model—should align with your specific use case, security requirements, and scalability needs.
Key takeaways for successful multi-tenant OpenSearch deployments:
- Choose the right isolation model based on your security and performance requirements
- Implement comprehensive access controls using OpenSearch Security features
- Monitor and enforce resource quotas to prevent noisy neighbor problems
- Automate tenant provisioning to ensure consistency and reduce operational overhead
- Plan for growth with appropriate sharding strategies and migration paths
- Implement thorough monitoring to track per-tenant metrics and ensure SLA compliance
With proper implementation, OpenSearch can efficiently serve thousands of tenants from a single cluster while maintaining strong isolation and performance guarantees.