Table of Contents
Open Table of Contents
Introduction
Wazuh provides a powerful RESTful API for programmatic access to its security monitoring capabilities. This guide covers implementing secure authentication, managing API tokens, configuring role-based access control (RBAC), and provides practical examples for common use cases.
Wazuh API Authentication Overview
Wazuh API supports multiple authentication methods:
- Basic Authentication - Username and password
- JWT Tokens - JSON Web Tokens for stateless authentication
- API Keys - Long-lived authentication tokens
- Integration Authentication - For external system integration
Basic Authentication Implementation
Python Client with Basic Auth
#!/usr/bin/env python3
# wazuh_api_client.py - Wazuh API client with authentication
import requests
import json
import base64
from typing import Dict, Any, Optional, List
import urllib3
from datetime import datetime, timedelta
import logging
# Disable SSL warnings for self-signed certificates
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class WazuhAPIClient:
def __init__(self, base_url: str, username: str, password: str,
verify_ssl: bool = False, timeout: int = 30):
self.base_url = base_url.rstrip('/')
self.username = username
self.password = password
self.verify_ssl = verify_ssl
self.timeout = timeout
self.session = requests.Session()
self.token: Optional[str] = None
self.token_expires: Optional[datetime] = None
# Setup logging
self.logger = logging.getLogger(__name__)
# Setup session defaults
self.session.verify = verify_ssl
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': 'WazuhAPIClient/1.0'
})
def _get_basic_auth_header(self) -> str:
"""Generate basic authentication header"""
credentials = f"{self.username}:{self.password}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
return f"Basic {encoded_credentials}"
def authenticate(self) -> bool:
"""Authenticate with Wazuh API and obtain JWT token"""
auth_url = f"{self.base_url}/security/user/authenticate"
headers = {
'Authorization': self._get_basic_auth_header()
}
try:
response = self.session.post(
auth_url,
headers=headers,
timeout=self.timeout
)
if response.status_code == 200:
data = response.json()
self.token = data['data']['token']
# JWT tokens typically expire in 15 minutes
self.token_expires = datetime.now() + timedelta(minutes=14)
# Update session headers with token
self.session.headers['Authorization'] = f"Bearer {self.token}"
self.logger.info("Successfully authenticated with Wazuh API")
return True
else:
self.logger.error(f"Authentication failed: {response.status_code} - {response.text}")
return False
except requests.exceptions.RequestException as e:
self.logger.error(f"Authentication request failed: {e}")
return False
def _ensure_authenticated(self) -> bool:
"""Ensure we have a valid token"""
if not self.token or not self.token_expires:
return self.authenticate()
if datetime.now() >= self.token_expires:
self.logger.info("Token expired, re-authenticating")
return self.authenticate()
return True
def _make_request(self, method: str, endpoint: str,
params: Dict = None, data: Dict = None) -> Dict[str, Any]:
"""Make authenticated request to Wazuh API"""
if not self._ensure_authenticated():
raise Exception("Failed to authenticate")
url = f"{self.base_url}{endpoint}"
try:
response = self.session.request(
method=method,
url=url,
params=params,
json=data,
timeout=self.timeout
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if response.status_code == 401:
# Token might be invalid, try to re-authenticate
self.token = None
if self._ensure_authenticated():
# Retry the request with new token
response = self.session.request(
method=method,
url=url,
params=params,
json=data,
timeout=self.timeout
)
response.raise_for_status()
return response.json()
self.logger.error(f"HTTP error: {e}")
raise
except requests.exceptions.RequestException as e:
self.logger.error(f"Request failed: {e}")
raise
def get(self, endpoint: str, params: Dict = None) -> Dict[str, Any]:
"""Make GET request"""
return self._make_request('GET', endpoint, params=params)
def post(self, endpoint: str, data: Dict = None, params: Dict = None) -> Dict[str, Any]:
"""Make POST request"""
return self._make_request('POST', endpoint, params=params, data=data)
def put(self, endpoint: str, data: Dict = None, params: Dict = None) -> Dict[str, Any]:
"""Make PUT request"""
return self._make_request('PUT', endpoint, params=params, data=data)
def delete(self, endpoint: str, params: Dict = None) -> Dict[str, Any]:
"""Make DELETE request"""
return self._make_request('DELETE', endpoint, params=params)
# Convenience methods for common operations
def get_agents(self, **kwargs) -> List[Dict]:
"""Get list of agents"""
response = self.get('/agents', params=kwargs)
return response.get('data', {}).get('affected_items', [])
def get_agent(self, agent_id: str) -> Dict:
"""Get specific agent information"""
response = self.get(f'/agents/{agent_id}')
affected_items = response.get('data', {}).get('affected_items', [])
return affected_items[0] if affected_items else {}
def get_rules(self, **kwargs) -> List[Dict]:
"""Get security rules"""
response = self.get('/rules', params=kwargs)
return response.get('data', {}).get('affected_items', [])
def get_alerts(self, **kwargs) -> List[Dict]:
"""Get security alerts"""
response = self.get('/security/events', params=kwargs)
return response.get('data', {}).get('affected_items', [])
def restart_agent(self, agent_id: str) -> bool:
"""Restart specific agent"""
try:
self.put(f'/agents/{agent_id}/restart')
return True
except Exception as e:
self.logger.error(f"Failed to restart agent {agent_id}: {e}")
return False
def get_system_info(self) -> Dict:
"""Get system information"""
response = self.get('/manager/status')
return response.get('data', {})
def close(self):
"""Close the session"""
self.session.close()
# Usage example
def main():
# Configure logging
logging.basicConfig(level=logging.INFO)
# Initialize client
client = WazuhAPIClient(
base_url="https://wazuh-manager:55000",
username="wazuh",
password="wazuh",
verify_ssl=False
)
try:
# Authenticate
if not client.authenticate():
print("Authentication failed")
return
# Get system information
system_info = client.get_system_info()
print(f"Wazuh Manager Status: {system_info}")
# Get agents
agents = client.get_agents()
print(f"Found {len(agents)} agents")
for agent in agents[:5]: # Show first 5 agents
print(f"Agent {agent['id']}: {agent['name']} - {agent['status']}")
# Get recent alerts
alerts = client.get_alerts(limit=10)
print(f"\nRecent alerts: {len(alerts)}")
for alert in alerts:
print(f"Alert {alert.get('id', 'N/A')}: {alert.get('rule', {}).get('description', 'N/A')}")
except Exception as e:
print(f"Error: {e}")
finally:
client.close()
if __name__ == "__main__":
main()
JWT Token Management
Advanced Token Management
# advanced_token_manager.py - Advanced JWT token management
import jwt
import json
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
import hashlib
import secrets
class TokenManager:
def __init__(self, secret_key: str = None, algorithm: str = 'HS256'):
self.secret_key = secret_key or secrets.token_urlsafe(32)
self.algorithm = algorithm
self.active_tokens: Dict[str, Dict] = {}
self.blacklisted_tokens: set = set()
def generate_token(self, user_id: str, username: str, roles: list,
expires_minutes: int = 15, additional_claims: Dict = None) -> str:
"""Generate JWT token with user information"""
now = datetime.utcnow()
exp = now + timedelta(minutes=expires_minutes)
payload = {
'user_id': user_id,
'username': username,
'roles': roles,
'iat': now,
'exp': exp,
'jti': secrets.token_urlsafe(16), # JWT ID for tracking
'iss': 'wazuh-api', # Issuer
'aud': 'wazuh-client' # Audience
}
if additional_claims:
payload.update(additional_claims)
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
# Store token metadata
self.active_tokens[payload['jti']] = {
'user_id': user_id,
'username': username,
'created_at': now,
'expires_at': exp,
'roles': roles
}
return token
def validate_token(self, token: str) -> Optional[Dict[str, Any]]:
"""Validate JWT token and return payload"""
try:
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm],
audience='wazuh-client',
issuer='wazuh-api'
)
# Check if token is blacklisted
jti = payload.get('jti')
if jti in self.blacklisted_tokens:
return None
# Check if token is in active tokens
if jti not in self.active_tokens:
return None
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def refresh_token(self, token: str, expires_minutes: int = 15) -> Optional[str]:
"""Refresh an existing token"""
payload = self.validate_token(token)
if not payload:
return None
# Generate new token with same claims
return self.generate_token(
user_id=payload['user_id'],
username=payload['username'],
roles=payload['roles'],
expires_minutes=expires_minutes
)
def revoke_token(self, token: str) -> bool:
"""Revoke a specific token"""
try:
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm],
options={"verify_exp": False} # Don't verify expiration for revocation
)
jti = payload.get('jti')
if jti:
self.blacklisted_tokens.add(jti)
if jti in self.active_tokens:
del self.active_tokens[jti]
return True
except jwt.InvalidTokenError:
pass
return False
def revoke_user_tokens(self, user_id: str) -> int:
"""Revoke all tokens for a specific user"""
revoked_count = 0
tokens_to_remove = []
for jti, token_info in self.active_tokens.items():
if token_info['user_id'] == user_id:
self.blacklisted_tokens.add(jti)
tokens_to_remove.append(jti)
revoked_count += 1
for jti in tokens_to_remove:
del self.active_tokens[jti]
return revoked_count
def cleanup_expired_tokens(self) -> int:
"""Remove expired tokens from active tokens"""
now = datetime.utcnow()
expired_tokens = []
for jti, token_info in self.active_tokens.items():
if token_info['expires_at'] < now:
expired_tokens.append(jti)
for jti in expired_tokens:
del self.active_tokens[jti]
return len(expired_tokens)
def get_active_sessions(self, user_id: str = None) -> List[Dict]:
"""Get active sessions for a user or all users"""
sessions = []
now = datetime.utcnow()
for jti, token_info in self.active_tokens.items():
if token_info['expires_at'] > now: # Only active tokens
if user_id is None or token_info['user_id'] == user_id:
sessions.append({
'jti': jti,
'user_id': token_info['user_id'],
'username': token_info['username'],
'created_at': token_info['created_at'].isoformat(),
'expires_at': token_info['expires_at'].isoformat(),
'roles': token_info['roles']
})
return sessions
class APIKeyManager:
def __init__(self):
self.api_keys: Dict[str, Dict] = {}
def generate_api_key(self, user_id: str, username: str, roles: list,
description: str = "", expires_days: int = 365) -> str:
"""Generate long-lived API key"""
api_key = f"wazuh_api_{secrets.token_urlsafe(32)}"
self.api_keys[api_key] = {
'user_id': user_id,
'username': username,
'roles': roles,
'description': description,
'created_at': datetime.utcnow(),
'expires_at': datetime.utcnow() + timedelta(days=expires_days),
'last_used': None,
'usage_count': 0,
'active': True
}
return api_key
def validate_api_key(self, api_key: str) -> Optional[Dict]:
"""Validate API key and update usage statistics"""
if api_key not in self.api_keys:
return None
key_info = self.api_keys[api_key]
if not key_info['active']:
return None
if datetime.utcnow() > key_info['expires_at']:
return None
# Update usage statistics
key_info['last_used'] = datetime.utcnow()
key_info['usage_count'] += 1
return {
'user_id': key_info['user_id'],
'username': key_info['username'],
'roles': key_info['roles']
}
def revoke_api_key(self, api_key: str) -> bool:
"""Revoke an API key"""
if api_key in self.api_keys:
self.api_keys[api_key]['active'] = False
return True
return False
def list_api_keys(self, user_id: str = None) -> List[Dict]:
"""List API keys for a user or all users"""
keys = []
for api_key, key_info in self.api_keys.items():
if user_id is None or key_info['user_id'] == user_id:
keys.append({
'key': api_key[:16] + '...', # Partial key for security
'user_id': key_info['user_id'],
'username': key_info['username'],
'description': key_info['description'],
'created_at': key_info['created_at'].isoformat(),
'expires_at': key_info['expires_at'].isoformat(),
'last_used': key_info['last_used'].isoformat() if key_info['last_used'] else None,
'usage_count': key_info['usage_count'],
'active': key_info['active']
})
return keys
Role-Based Access Control (RBAC)
RBAC Implementation
# rbac_manager.py - Role-based access control for Wazuh API
from typing import Dict, List, Set, Optional
from enum import Enum
from dataclasses import dataclass, field
import re
class Permission(Enum):
# Agent permissions
AGENTS_READ = "agents:read"
AGENTS_CREATE = "agents:create"
AGENTS_UPDATE = "agents:update"
AGENTS_DELETE = "agents:delete"
AGENTS_RESTART = "agents:restart"
# Rules permissions
RULES_READ = "rules:read"
RULES_CREATE = "rules:create"
RULES_UPDATE = "rules:update"
RULES_DELETE = "rules:delete"
# Security events permissions
EVENTS_READ = "events:read"
EVENTS_CREATE = "events:create"
EVENTS_DELETE = "events:delete"
# System permissions
SYSTEM_READ = "system:read"
SYSTEM_UPDATE = "system:update"
SYSTEM_RESTART = "system:restart"
# User management permissions
USERS_READ = "users:read"
USERS_CREATE = "users:create"
USERS_UPDATE = "users:update"
USERS_DELETE = "users:delete"
# Role management permissions
ROLES_READ = "roles:read"
ROLES_CREATE = "roles:create"
ROLES_UPDATE = "roles:update"
ROLES_DELETE = "roles:delete"
# API key permissions
API_KEYS_READ = "api_keys:read"
API_KEYS_CREATE = "api_keys:create"
API_KEYS_DELETE = "api_keys:delete"
@dataclass
class Role:
name: str
description: str
permissions: Set[Permission] = field(default_factory=set)
resource_filters: Dict[str, List[str]] = field(default_factory=dict)
def add_permission(self, permission: Permission):
self.permissions.add(permission)
def remove_permission(self, permission: Permission):
self.permissions.discard(permission)
def has_permission(self, permission: Permission) -> bool:
return permission in self.permissions
def add_resource_filter(self, resource_type: str, resource_ids: List[str]):
"""Add resource-level filtering (e.g., specific agent IDs)"""
if resource_type not in self.resource_filters:
self.resource_filters[resource_type] = []
self.resource_filters[resource_type].extend(resource_ids)
def can_access_resource(self, resource_type: str, resource_id: str) -> bool:
"""Check if role can access specific resource"""
if resource_type not in self.resource_filters:
return True # No restrictions
allowed_resources = self.resource_filters[resource_type]
return '*' in allowed_resources or resource_id in allowed_resources
@dataclass
class User:
user_id: str
username: str
email: str
roles: Set[str] = field(default_factory=set)
active: bool = True
def add_role(self, role_name: str):
self.roles.add(role_name)
def remove_role(self, role_name: str):
self.roles.discard(role_name)
def has_role(self, role_name: str) -> bool:
return role_name in self.roles
class RBACManager:
def __init__(self):
self.roles: Dict[str, Role] = {}
self.users: Dict[str, User] = {}
self._create_default_roles()
def _create_default_roles(self):
"""Create default system roles"""
# Administrator role - full access
admin_role = Role(
name="administrator",
description="Full system administrator access",
permissions=set(Permission)
)
self.roles["administrator"] = admin_role
# Read-only role
readonly_role = Role(
name="readonly",
description="Read-only access to all resources",
permissions={
Permission.AGENTS_READ,
Permission.RULES_READ,
Permission.EVENTS_READ,
Permission.SYSTEM_READ,
Permission.USERS_READ,
Permission.ROLES_READ,
Permission.API_KEYS_READ
}
)
self.roles["readonly"] = readonly_role
# Agent manager role
agent_manager_role = Role(
name="agent_manager",
description="Manage agents and view events",
permissions={
Permission.AGENTS_READ,
Permission.AGENTS_CREATE,
Permission.AGENTS_UPDATE,
Permission.AGENTS_DELETE,
Permission.AGENTS_RESTART,
Permission.EVENTS_READ,
Permission.RULES_READ
}
)
self.roles["agent_manager"] = agent_manager_role
# Security analyst role
analyst_role = Role(
name="security_analyst",
description="Analyze security events and rules",
permissions={
Permission.AGENTS_READ,
Permission.RULES_READ,
Permission.RULES_CREATE,
Permission.RULES_UPDATE,
Permission.EVENTS_READ,
Permission.EVENTS_CREATE
}
)
self.roles["security_analyst"] = analyst_role
def create_role(self, name: str, description: str,
permissions: List[Permission] = None) -> Role:
"""Create a new role"""
role = Role(name=name, description=description)
if permissions:
role.permissions.update(permissions)
self.roles[name] = role
return role
def get_role(self, name: str) -> Optional[Role]:
"""Get role by name"""
return self.roles.get(name)
def delete_role(self, name: str) -> bool:
"""Delete a role"""
if name in self.roles:
# Remove role from all users
for user in self.users.values():
user.remove_role(name)
del self.roles[name]
return True
return False
def create_user(self, user_id: str, username: str, email: str,
roles: List[str] = None) -> User:
"""Create a new user"""
user = User(user_id=user_id, username=username, email=email)
if roles:
for role_name in roles:
if role_name in self.roles:
user.add_role(role_name)
self.users[user_id] = user
return user
def get_user(self, user_id: str) -> Optional[User]:
"""Get user by ID"""
return self.users.get(user_id)
def get_user_permissions(self, user_id: str) -> Set[Permission]:
"""Get all permissions for a user"""
user = self.get_user(user_id)
if not user or not user.active:
return set()
permissions = set()
for role_name in user.roles:
role = self.get_role(role_name)
if role:
permissions.update(role.permissions)
return permissions
def user_has_permission(self, user_id: str, permission: Permission) -> bool:
"""Check if user has specific permission"""
permissions = self.get_user_permissions(user_id)
return permission in permissions
def user_can_access_resource(self, user_id: str, resource_type: str,
resource_id: str) -> bool:
"""Check if user can access specific resource"""
user = self.get_user(user_id)
if not user or not user.active:
return False
for role_name in user.roles:
role = self.get_role(role_name)
if role and role.can_access_resource(resource_type, resource_id):
return True
return False
def check_endpoint_access(self, user_id: str, method: str,
endpoint: str, resource_id: str = None) -> bool:
"""Check if user can access specific API endpoint"""
user = self.get_user(user_id)
if not user or not user.active:
return False
# Map endpoints to required permissions
endpoint_permissions = {
# Agent endpoints
r'^/agents$': {
'GET': Permission.AGENTS_READ,
'POST': Permission.AGENTS_CREATE
},
r'^/agents/\d+$': {
'GET': Permission.AGENTS_READ,
'PUT': Permission.AGENTS_UPDATE,
'DELETE': Permission.AGENTS_DELETE
},
r'^/agents/\d+/restart$': {
'PUT': Permission.AGENTS_RESTART
},
# Rules endpoints
r'^/rules$': {
'GET': Permission.RULES_READ,
'POST': Permission.RULES_CREATE
},
r'^/rules/\d+$': {
'GET': Permission.RULES_READ,
'PUT': Permission.RULES_UPDATE,
'DELETE': Permission.RULES_DELETE
},
# Events endpoints
r'^/security/events$': {
'GET': Permission.EVENTS_READ,
'POST': Permission.EVENTS_CREATE
},
# System endpoints
r'^/manager/status$': {
'GET': Permission.SYSTEM_READ
},
r'^/manager/restart$': {
'PUT': Permission.SYSTEM_RESTART
},
# User management endpoints
r'^/security/users$': {
'GET': Permission.USERS_READ,
'POST': Permission.USERS_CREATE
},
r'^/security/users/\d+$': {
'GET': Permission.USERS_READ,
'PUT': Permission.USERS_UPDATE,
'DELETE': Permission.USERS_DELETE
}
}
# Find matching endpoint pattern
required_permission = None
for pattern, methods in endpoint_permissions.items():
if re.match(pattern, endpoint):
required_permission = methods.get(method.upper())
break
if not required_permission:
return False # Endpoint not found or method not allowed
# Check if user has required permission
if not self.user_has_permission(user_id, required_permission):
return False
# Check resource-level access if resource_id is provided
if resource_id:
resource_type = self._extract_resource_type(endpoint)
if resource_type and not self.user_can_access_resource(
user_id, resource_type, resource_id
):
return False
return True
def _extract_resource_type(self, endpoint: str) -> Optional[str]:
"""Extract resource type from endpoint"""
if '/agents' in endpoint:
return 'agents'
elif '/rules' in endpoint:
return 'rules'
elif '/events' in endpoint:
return 'events'
elif '/users' in endpoint:
return 'users'
return None
def export_user_permissions(self, user_id: str) -> Dict:
"""Export user permissions for auditing"""
user = self.get_user(user_id)
if not user:
return {}
permissions = self.get_user_permissions(user_id)
return {
'user_id': user.user_id,
'username': user.username,
'email': user.email,
'active': user.active,
'roles': list(user.roles),
'permissions': [p.value for p in permissions],
'resource_filters': {
role_name: self.get_role(role_name).resource_filters
for role_name in user.roles
if self.get_role(role_name)
}
}
# RBAC Decorator for API endpoints
class require_permission:
def __init__(self, permission: Permission):
self.permission = permission
def __call__(self, func):
def wrapper(*args, **kwargs):
# Extract user_id from request context
# This would typically come from the JWT token
user_id = kwargs.get('user_id') or getattr(args[0], 'current_user_id', None)
if not user_id:
raise PermissionError("Authentication required")
rbac_manager = kwargs.get('rbac_manager') or getattr(args[0], 'rbac_manager', None)
if not rbac_manager:
raise RuntimeError("RBAC manager not available")
if not rbac_manager.user_has_permission(user_id, self.permission):
raise PermissionError(f"Permission denied: {self.permission.value}")
return func(*args, **kwargs)
return wrapper
# Usage example
def example_rbac_usage():
rbac = RBACManager()
# Create users
admin_user = rbac.create_user("1", "admin", "admin@example.com", ["administrator"])
analyst_user = rbac.create_user("2", "analyst", "analyst@example.com", ["security_analyst"])
readonly_user = rbac.create_user("3", "viewer", "viewer@example.com", ["readonly"])
# Test permissions
print(f"Admin can delete agents: {rbac.user_has_permission('1', Permission.AGENTS_DELETE)}")
print(f"Analyst can delete agents: {rbac.user_has_permission('2', Permission.AGENTS_DELETE)}")
print(f"Viewer can read agents: {rbac.user_has_permission('3', Permission.AGENTS_READ)}")
# Test endpoint access
print(f"Admin can POST /agents: {rbac.check_endpoint_access('1', 'POST', '/agents')}")
print(f"Analyst can POST /agents: {rbac.check_endpoint_access('2', 'POST', '/agents')}")
print(f"Viewer can GET /agents: {rbac.check_endpoint_access('3', 'GET', '/agents')}")
# Export permissions for auditing
admin_perms = rbac.export_user_permissions('1')
print(f"\nAdmin permissions: {len(admin_perms['permissions'])} total")
if __name__ == "__main__":
example_rbac_usage()
API Authentication Middleware
Flask/FastAPI Middleware Implementation
# auth_middleware.py - Authentication middleware for Wazuh API
from functools import wraps
from flask import Flask, request, jsonify, g
from typing import Dict, Any, Optional, Callable
import logging
class WazuhAuthMiddleware:
def __init__(self, app: Flask, token_manager, rbac_manager, api_key_manager):
self.app = app
self.token_manager = token_manager
self.rbac_manager = rbac_manager
self.api_key_manager = api_key_manager
self.logger = logging.getLogger(__name__)
# Register middleware
app.before_request(self.authenticate_request)
def authenticate_request(self):
"""Authenticate incoming requests"""
# Skip authentication for certain endpoints
if self._should_skip_auth():
return
auth_header = request.headers.get('Authorization')
if not auth_header:
return self._unauthorized("Authorization header required")
# Extract token from header
token = self._extract_token(auth_header)
if not token:
return self._unauthorized("Invalid authorization header")
# Authenticate based on token type
user_info = None
if auth_header.startswith('Bearer '):
# JWT Token authentication
user_info = self._authenticate_jwt(token)
elif auth_header.startswith('ApiKey '):
# API Key authentication
user_info = self._authenticate_api_key(token)
elif auth_header.startswith('Basic '):
# Basic authentication (for initial token acquisition)
return # Handle in specific endpoint
else:
return self._unauthorized("Unsupported authentication method")
if not user_info:
return self._unauthorized("Invalid or expired token")
# Store user info in request context
g.current_user = user_info
# Check endpoint authorization
if not self._authorize_request(user_info['user_id']):
return self._forbidden("Insufficient permissions")
def _should_skip_auth(self) -> bool:
"""Check if authentication should be skipped for this endpoint"""
skip_endpoints = [
'/security/user/authenticate',
'/health',
'/version'
]
return request.endpoint in skip_endpoints or request.path in skip_endpoints
def _extract_token(self, auth_header: str) -> Optional[str]:
"""Extract token from authorization header"""
try:
scheme, token = auth_header.split(' ', 1)
return token
except ValueError:
return None
def _authenticate_jwt(self, token: str) -> Optional[Dict[str, Any]]:
"""Authenticate JWT token"""
payload = self.token_manager.validate_token(token)
if payload:
return {
'user_id': payload['user_id'],
'username': payload['username'],
'roles': payload['roles'],
'auth_method': 'jwt'
}
return None
def _authenticate_api_key(self, api_key: str) -> Optional[Dict[str, Any]]:
"""Authenticate API key"""
key_info = self.api_key_manager.validate_api_key(api_key)
if key_info:
return {
'user_id': key_info['user_id'],
'username': key_info['username'],
'roles': key_info['roles'],
'auth_method': 'api_key'
}
return None
def _authorize_request(self, user_id: str) -> bool:
"""Check if user is authorized for this endpoint"""
method = request.method
endpoint = request.path
# Extract resource ID from path if present
resource_id = self._extract_resource_id(endpoint)
return self.rbac_manager.check_endpoint_access(
user_id, method, endpoint, resource_id
)
def _extract_resource_id(self, endpoint: str) -> Optional[str]:
"""Extract resource ID from endpoint path"""
import re
# Match patterns like /agents/123, /users/456, etc.
match = re.search(r'/(\w+)/(\d+)', endpoint)
if match:
return match.group(2)
return None
def _unauthorized(self, message: str):
"""Return unauthorized response"""
self.logger.warning(f"Unauthorized access attempt: {message}")
return jsonify({
'error': 401,
'message': message
}), 401
def _forbidden(self, message: str):
"""Return forbidden response"""
self.logger.warning(f"Forbidden access attempt: {message}")
return jsonify({
'error': 403,
'message': message
}), 403
# Decorator for additional endpoint-specific authorization
def require_role(role_name: str):
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
if not hasattr(g, 'current_user'):
return jsonify({'error': 401, 'message': 'Authentication required'}), 401
user_roles = g.current_user.get('roles', [])
if role_name not in user_roles:
return jsonify({
'error': 403,
'message': f'Role {role_name} required'
}), 403
return func(*args, **kwargs)
return wrapper
return decorator
def require_permissions(*permissions):
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
if not hasattr(g, 'current_user'):
return jsonify({'error': 401, 'message': 'Authentication required'}), 401
# This would need access to RBAC manager instance
# Implementation depends on how you inject dependencies
return func(*args, **kwargs)
return wrapper
return decorator
# Example Flask application
def create_wazuh_api_app():
app = Flask(__name__)
# Initialize managers (in real app, these would be properly configured)
from your_auth_modules import TokenManager, RBACManager, APIKeyManager
token_manager = TokenManager()
rbac_manager = RBACManager()
api_key_manager = APIKeyManager()
# Initialize middleware
auth_middleware = WazuhAuthMiddleware(
app, token_manager, rbac_manager, api_key_manager
)
@app.route('/security/user/authenticate', methods=['POST'])
def authenticate():
# Handle basic authentication and return JWT token
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Basic '):
return jsonify({'error': 400, 'message': 'Basic authentication required'}), 400
# Decode basic auth credentials
import base64
try:
credentials = base64.b64decode(auth_header[6:]).decode('utf-8')
username, password = credentials.split(':', 1)
except (ValueError, UnicodeDecodeError):
return jsonify({'error': 400, 'message': 'Invalid credentials format'}), 400
# Validate credentials (implement your own logic)
user = validate_user_credentials(username, password)
if not user:
return jsonify({'error': 401, 'message': 'Invalid credentials'}), 401
# Generate JWT token
token = token_manager.generate_token(
user_id=user['id'],
username=user['username'],
roles=user['roles']
)
return jsonify({
'data': {
'token': token,
'user': {
'id': user['id'],
'username': user['username'],
'roles': user['roles']
}
}
})
@app.route('/agents', methods=['GET'])
def get_agents():
# This endpoint automatically requires authentication via middleware
return jsonify({
'data': {
'affected_items': [
{'id': '001', 'name': 'agent1', 'status': 'active'},
{'id': '002', 'name': 'agent2', 'status': 'active'}
],
'total_affected_items': 2
}
})
@app.route('/agents', methods=['POST'])
@require_role('administrator')
def create_agent():
# Only administrators can create agents
return jsonify({
'data': {
'affected_items': [{'id': '003', 'name': 'new_agent'}],
'total_affected_items': 1
}
})
@app.route('/security/users/<user_id>/api-keys', methods=['POST'])
@require_role('administrator')
def create_api_key(user_id):
data = request.get_json()
# Generate API key
api_key = api_key_manager.generate_api_key(
user_id=user_id,
username=data.get('username'),
roles=data.get('roles', []),
description=data.get('description', ''),
expires_days=data.get('expires_days', 365)
)
return jsonify({
'data': {
'api_key': api_key,
'message': 'API key created successfully'
}
})
return app
def validate_user_credentials(username: str, password: str) -> Optional[Dict]:
"""Validate user credentials - implement your own logic"""
# This is a placeholder - implement actual credential validation
if username == "admin" and password == "admin":
return {
'id': '1',
'username': 'admin',
'roles': ['administrator']
}
return None
if __name__ == '__main__':
app = create_wazuh_api_app()
app.run(debug=True, host='0.0.0.0', port=55000)
Security Best Practices
Security Implementation Guidelines
# security_utils.py - Security utilities and best practices
import hashlib
import secrets
import time
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import re
import ipaddress
class SecurityConfig:
# Token settings
JWT_EXPIRY_MINUTES = 15
REFRESH_TOKEN_EXPIRY_DAYS = 30
API_KEY_EXPIRY_DAYS = 365
# Rate limiting
MAX_LOGIN_ATTEMPTS = 5
LOCKOUT_DURATION_MINUTES = 30
# Password policy
MIN_PASSWORD_LENGTH = 12
REQUIRE_UPPERCASE = True
REQUIRE_LOWERCASE = True
REQUIRE_NUMBERS = True
REQUIRE_SPECIAL_CHARS = True
# Session security
MAX_CONCURRENT_SESSIONS = 5
SESSION_TIMEOUT_MINUTES = 60
# Network security
ALLOWED_IP_RANGES = [
'10.0.0.0/8',
'172.16.0.0/12',
'192.168.0.0/16'
]
# Audit settings
LOG_ALL_REQUESTS = True
LOG_SENSITIVE_DATA = False
AUDIT_RETENTION_DAYS = 90
class PasswordValidator:
@staticmethod
def validate_password(password: str) -> Dict[str, bool]:
"""Validate password against security policy"""
results = {
'length': len(password) >= SecurityConfig.MIN_PASSWORD_LENGTH,
'uppercase': bool(re.search(r'[A-Z]', password)) if SecurityConfig.REQUIRE_UPPERCASE else True,
'lowercase': bool(re.search(r'[a-z]', password)) if SecurityConfig.REQUIRE_LOWERCASE else True,
'numbers': bool(re.search(r'\d', password)) if SecurityConfig.REQUIRE_NUMBERS else True,
'special_chars': bool(re.search(r'[!@#$%^&*(),.?":{}|<>]', password)) if SecurityConfig.REQUIRE_SPECIAL_CHARS else True,
'common_patterns': not PasswordValidator._has_common_patterns(password)
}
results['valid'] = all(results.values())
return results
@staticmethod
def _has_common_patterns(password: str) -> bool:
"""Check for common weak patterns"""
common_patterns = [
r'123456',
r'password',
r'admin',
r'qwerty',
r'(.)\1{3,}', # Repeated characters
r'(\d{4,})', # Sequential numbers
]
password_lower = password.lower()
for pattern in common_patterns:
if re.search(pattern, password_lower):
return True
return False
@staticmethod
def hash_password(password: str) -> str:
"""Hash password using secure method"""
import bcrypt
salt = bcrypt.gensalt(rounds=12)
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
@staticmethod
def verify_password(password: str, hash: str) -> bool:
"""Verify password against hash"""
import bcrypt
return bcrypt.checkpw(password.encode('utf-8'), hash.encode('utf-8'))
class RateLimiter:
def __init__(self):
self.attempts: Dict[str, List[float]] = {}
self.lockouts: Dict[str, float] = {}
def is_rate_limited(self, identifier: str) -> bool:
"""Check if identifier is rate limited"""
current_time = time.time()
# Check if currently locked out
if identifier in self.lockouts:
lockout_end = self.lockouts[identifier] + (SecurityConfig.LOCKOUT_DURATION_MINUTES * 60)
if current_time < lockout_end:
return True
else:
# Lockout expired
del self.lockouts[identifier]
if identifier in self.attempts:
del self.attempts[identifier]
# Check recent attempts
if identifier in self.attempts:
# Remove attempts older than 1 hour
cutoff_time = current_time - 3600
self.attempts[identifier] = [
attempt_time for attempt_time in self.attempts[identifier]
if attempt_time > cutoff_time
]
# Check if too many attempts
if len(self.attempts[identifier]) >= SecurityConfig.MAX_LOGIN_ATTEMPTS:
self.lockouts[identifier] = current_time
return True
return False
def record_attempt(self, identifier: str, success: bool = False):
"""Record login attempt"""
current_time = time.time()
if success:
# Clear attempts on successful login
if identifier in self.attempts:
del self.attempts[identifier]
if identifier in self.lockouts:
del self.lockouts[identifier]
else:
# Record failed attempt
if identifier not in self.attempts:
self.attempts[identifier] = []
self.attempts[identifier].append(current_time)
class NetworkSecurity:
@staticmethod
def is_ip_allowed(ip_address: str) -> bool:
"""Check if IP address is in allowed ranges"""
try:
ip = ipaddress.ip_address(ip_address)
for allowed_range in SecurityConfig.ALLOWED_IP_RANGES:
if ip in ipaddress.ip_network(allowed_range):
return True
return False
except ValueError:
return False
@staticmethod
def get_client_ip(request) -> str:
"""Get real client IP from request headers"""
# Check for forwarded headers
forwarded_ips = request.headers.get('X-Forwarded-For')
if forwarded_ips:
# Take the first IP in the chain
return forwarded_ips.split(',')[0].strip()
real_ip = request.headers.get('X-Real-IP')
if real_ip:
return real_ip
return request.remote_addr
class AuditLogger:
def __init__(self):
self.audit_log: List[Dict] = []
def log_authentication_event(self, user_id: str, username: str,
event_type: str, success: bool,
ip_address: str, user_agent: str = None,
additional_data: Dict = None):
"""Log authentication events"""
event = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': event_type,
'user_id': user_id,
'username': username,
'success': success,
'ip_address': ip_address,
'user_agent': user_agent,
'additional_data': additional_data or {}
}
self.audit_log.append(event)
# In production, send to SIEM or log aggregation system
self._send_to_siem(event)
def log_api_access(self, user_id: str, method: str, endpoint: str,
status_code: int, ip_address: str,
response_time_ms: float = None):
"""Log API access events"""
if not SecurityConfig.LOG_ALL_REQUESTS:
return
event = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': 'api_access',
'user_id': user_id,
'method': method,
'endpoint': endpoint,
'status_code': status_code,
'ip_address': ip_address,
'response_time_ms': response_time_ms
}
self.audit_log.append(event)
self._send_to_siem(event)
def log_security_event(self, event_type: str, severity: str,
description: str, user_id: str = None,
ip_address: str = None, additional_data: Dict = None):
"""Log security events"""
event = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': 'security_event',
'security_event_type': event_type,
'severity': severity,
'description': description,
'user_id': user_id,
'ip_address': ip_address,
'additional_data': additional_data or {}
}
self.audit_log.append(event)
self._send_to_siem(event)
def _send_to_siem(self, event: Dict):
"""Send event to SIEM system"""
# Implement SIEM integration
# This could be Wazuh itself, Splunk, ELK stack, etc.
pass
def get_security_events(self, start_time: datetime = None,
end_time: datetime = None,
event_type: str = None) -> List[Dict]:
"""Retrieve security events for analysis"""
events = self.audit_log
if start_time:
events = [e for e in events if datetime.fromisoformat(e['timestamp']) >= start_time]
if end_time:
events = [e for e in events if datetime.fromisoformat(e['timestamp']) <= end_time]
if event_type:
events = [e for e in events if e.get('event_type') == event_type]
return events
class SessionManager:
def __init__(self):
self.active_sessions: Dict[str, Dict] = {}
def create_session(self, user_id: str, token_id: str, ip_address: str,
user_agent: str = None) -> str:
"""Create new session"""
session_id = secrets.token_urlsafe(32)
# Check for too many concurrent sessions
user_sessions = [s for s in self.active_sessions.values()
if s['user_id'] == user_id]
if len(user_sessions) >= SecurityConfig.MAX_CONCURRENT_SESSIONS:
# Remove oldest session
oldest_session = min(user_sessions, key=lambda s: s['created_at'])
self.terminate_session(oldest_session['session_id'])
session = {
'session_id': session_id,
'user_id': user_id,
'token_id': token_id,
'ip_address': ip_address,
'user_agent': user_agent,
'created_at': datetime.utcnow(),
'last_activity': datetime.utcnow(),
'active': True
}
self.active_sessions[session_id] = session
return session_id
def update_session_activity(self, session_id: str) -> bool:
"""Update session last activity"""
if session_id in self.active_sessions:
self.active_sessions[session_id]['last_activity'] = datetime.utcnow()
return True
return False
def terminate_session(self, session_id: str) -> bool:
"""Terminate specific session"""
if session_id in self.active_sessions:
del self.active_sessions[session_id]
return True
return False
def terminate_user_sessions(self, user_id: str) -> int:
"""Terminate all sessions for a user"""
sessions_to_remove = [
sid for sid, session in self.active_sessions.items()
if session['user_id'] == user_id
]
for session_id in sessions_to_remove:
del self.active_sessions[session_id]
return len(sessions_to_remove)
def cleanup_expired_sessions(self) -> int:
"""Remove expired sessions"""
timeout = timedelta(minutes=SecurityConfig.SESSION_TIMEOUT_MINUTES)
cutoff_time = datetime.utcnow() - timeout
expired_sessions = [
sid for sid, session in self.active_sessions.items()
if session['last_activity'] < cutoff_time
]
for session_id in expired_sessions:
del self.active_sessions[session_id]
return len(expired_sessions)
# Example usage
def example_security_implementation():
# Password validation
password_result = PasswordValidator.validate_password("WeakPass123!")
print(f"Password validation: {password_result}")
# Rate limiting
rate_limiter = RateLimiter()
user_ip = "192.168.1.100"
for i in range(6):
if rate_limiter.is_rate_limited(user_ip):
print(f"Attempt {i+1}: Rate limited")
break
else:
print(f"Attempt {i+1}: Allowed")
rate_limiter.record_attempt(user_ip, success=False)
# Network security
allowed = NetworkSecurity.is_ip_allowed("192.168.1.100")
print(f"IP 192.168.1.100 allowed: {allowed}")
# Audit logging
audit_logger = AuditLogger()
audit_logger.log_authentication_event(
user_id="1",
username="admin",
event_type="login",
success=True,
ip_address="192.168.1.100",
user_agent="WazuhAPIClient/1.0"
)
print(f"Audit events: {len(audit_logger.audit_log)}")
if __name__ == "__main__":
example_security_implementation()
Conclusion
Implementing secure authentication for Wazuh API requires:
- Multiple Authentication Methods: Support JWT tokens, API keys, and basic auth
- Strong RBAC: Implement granular role-based access control
- Security Best Practices: Rate limiting, password policies, audit logging
- Token Management: Proper token lifecycle management and revocation
- Monitoring: Comprehensive audit logging and security event tracking
Key security considerations:
- Always use HTTPS in production
- Implement proper token expiration and refresh mechanisms
- Use strong password policies and hashing
- Monitor and log all authentication events
- Implement rate limiting to prevent brute force attacks
- Regular security audits and token cleanup
This implementation provides a robust foundation for secure Wazuh API access while maintaining flexibility for different use cases and integration requirements.