Wazuh Vulnerability Dashboard Tool - Python Script for API Integration
This tool replaces the vulnerability dashboard in Wazuh by connecting to the Wazuh manager API, retrieving vulnerability data, and displaying it with various filtering options.
Python Script
"""Tool to replace vulnerability dashboard in Wazuh.
Connects to Wazuh manager API, retrieves vulnerability data, and displays it.
Ensure server socket and credentials under Configuration are correct!
Output can be filtered by using the commandline switches:
-n , --name : Only return data for specified package
-s , --severity : Only return vulnerabilities of that severity
-c , --cve : Only return vulnerabilities with that CVE
-g , --group : Only return vulnerabilities for group members
--short : Only display summary of vulnerabilities
Use caution when supplying arguments as no sanitizing is done!
Referenced from:
https://documentation.wazuh.com/current/user-manual/api/index.html
Written in Python 3.10 using Visual Studio Code.
Stephen Vose 2022
"""
def get_args() -> tuple[str, str, bool]:
"""Take arguments from command line for filtering output."""
import argparse
parser = argparse.ArgumentParser(
usage='vuln_dash.py [-h] [-n, --name NAME] [-s, --severity SEVERITY]\n\
[-c, --cve CVE] [-g, --group GROUP] [--short]')
parser.add_argument('-n', '--name',
default="",
dest='name_arg',
help='Filter output by package name',
type=str
)
parser.add_argument('-s', '--severity',
default="",
dest='sev_arg',
help='Filter output by severity',
type=str
)
parser.add_argument('-c', '--cve',
default="",
dest='cve_arg',
help='Filter output by CVE',
type=str
)
parser.add_argument('-g', '--group',
default="",
dest='group_arg',
help='Filter agents by group',
type=str
)
parser.add_argument('--short',
action='store_true',
help='Return only summary of agent vulnerabilities',
)
args = parser.parse_args()
# Format string for appending to API request
if args.name_arg != "":
arg_name = str('&name=' + args.name_arg)
else:
arg_name = ""
if args.sev_arg != "":
arg_sev = str('&severity=' + args.sev_arg)
else:
arg_sev = ""
if args.cve_arg != "":
arg_cve = str('&cve=' + args.cve_arg)
else:
arg_cve = ""
if args.group_arg != "":
arg_group = str('&group=' + args.group_arg)
else:
arg_group = ""
arg_string = arg_name + arg_sev + arg_cve
arg_short = bool(args.short)
return arg_string, arg_group, arg_short
def api_calls(filters: str, group: str) -> tuple[list, list, dict]:
    """Set up and make API calls to server for retrieval of information.

    Authenticates against the manager with HTTP basic auth to obtain a JWT,
    lists the active agents (optionally restricted to a group) and then
    requests the vulnerabilities of every agent.

    Certificate verification is deliberately disabled (``verify=False``) so
    that servers using self-signed certificates can be queried.

    Args:
        filters: Pre-formatted query-string fragment (``&key=value...``)
            appended to each per-agent vulnerability request. May be empty.
        group: Pre-formatted ``&group=...`` fragment appended to the agent
            listing request. May be empty.

    Returns:
        A tuple ``(active_agents, agent_names, vulns)``: the list of agent
        ids, the list of agent names in the same order, and a dict mapping
        each agent id to the decoded JSON response of its vulnerability
        query.

    Raises:
        SystemExit: With status 1 when the required modules are missing,
            the server answers with an HTTP error, the response is not the
            expected JSON, or the server cannot be reached.
    """
    import sys
    from base64 import b64encode
    try:
        import requests
        import urllib3
    except ModuleNotFoundError:
        print('Error: ensure modules "urllib3" and "requests" are installed')
        sys.exit(1)

    # Disable insecure https warnings (for self-signed SSL certificates)
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Configuration
    protocol = 'https'
    host = 'X.X.X.X'  # Supply server address
    port = 55000
    user = 'XXXXX'  # Supply API credentials
    password = 'XXXXX'
    timeout = 30  # Seconds to wait on the server before giving up

    base_url = f"{protocol}://{host}:{port}"
    basic_auth = f"{user}:{password}".encode()
    login_headers = {'Content-Type': 'application/json',
                     'Authorization': f'Basic {b64encode(basic_auth).decode()}'
                     }

    def fetch(url: str, headers: dict) -> dict:
        """GET *url* and return its decoded JSON body; raise on HTTP errors."""
        response = requests.get(url, headers=headers, verify=False,
                                timeout=timeout)
        response.raise_for_status()
        return response.json()

    try:
        # API call with login to get auth token
        login = fetch(f"{base_url}/security/user/authenticate", login_headers)
        token = login['data']['token']

        # New authorization header with the JWT token we got
        requests_headers = {'Content-Type': 'application/json',
                            'Authorization': f'Bearer {token}'}

        # Retrieve list of active agents from the server for querying vulns,
        # filter output by group if specified.
        agents = fetch(
            f"{base_url}/agents/?status=active&select=id,name{group}",
            requests_headers)['data']['affected_items']
        active_agents = [agent['id'] for agent in agents]
        agent_names = [agent['name'] for agent in agents]

        # Retrieve list of vulnerabilities for each active agent, filtering
        # with any supplied arguments; format {&argtype=[arg]}.
        # The URL is built from adjacent literals rather than a backslash
        # continuation so no stray whitespace can end up inside it.
        vulns = {
            agent_id: fetch(
                f"{base_url}/vulnerability/{agent_id}?limit=10000"
                f"&select=cve,name,severity,condition{filters}",
                requests_headers)
            for agent_id in active_agents
        }
    # Catch problems talking to server
    except requests.exceptions.HTTPError as err:
        # The error body is usually JSON with 'title' and 'detail', but do
        # not rely on it: fall back to the exception text otherwise.
        try:
            problem = err.response.json()
            message = f"{problem['title']}, {problem['detail']}"
        except (ValueError, KeyError, TypeError, AttributeError):
            message = str(err)
        print(f"An error occurred: {message}")
        sys.exit(1)
    except (ValueError, KeyError) as err:
        # Body was not JSON, or lacked the fields this tool reads.
        print("Unexpected response from server:", repr(err))
        sys.exit(1)
    except requests.exceptions.RequestException as err:
        # Connection failures and timeouts.
        print("Connection error:", str(err))
        sys.exit(1)
    return active_agents, agent_names, vulns
class ProcessInfo():
    """Class to extract and compile a specific type of data from bulk
    vulnerability information.

    Attributes are set in ``__init__``:
        type_filter: Key of the vulnerability field being summarised
            (the caller uses 'severity', 'name' and 'cve').
        item_tuple: List of ``(count, value)`` tuples sorted in descending
            order, as produced by ``process_info``.
    """

    def __init__(self, agent_list: list, vuln_dict: dict, type_filter: str):
        """Store the field name and compile the occurrence list right away.

        Args:
            agent_list: Agent ids to read from ``vuln_dict``.
            vuln_dict: Mapping of agent id to a response dict containing
                ``['data']['affected_items']``.
            type_filter: Field of each vulnerability entry to tally.
        """
        self.type_filter = type_filter
        self.item_tuple = self.process_info(agent_list, vuln_dict)

    def process_info(self, agent_list, vuln_dict) -> list:
        """Process vulnerability metadata from each
        agent and compile a list of unique entries, with counts.

        When the field being tallied is 'name', the entry's 'condition'
        is appended to the value (separated by ', ').

        Returns:
            ``(count, value)`` tuples sorted in descending order.

        Raises:
            SystemExit: After printing 'No results found.' when no
                vulnerability entries exist for any of the agents.
        """
        import sys
        from collections import Counter

        # Counter tallies in one pass instead of re-scanning the full list
        # once per unique value.
        tally = Counter()
        for agent in agent_list:
            for vuln in vuln_dict[agent]['data']['affected_items']:
                value = vuln[self.type_filter]
                # Append vulnerability condition to name results.
                if self.type_filter == 'name':
                    value = value + ', ' + vuln['condition']
                tally[value] += 1

        # End program early if no matches detected
        if not tally:
            print('No results found.')
            sys.exit()
        return sorted(((count, value) for value, count in tally.items()),
                      reverse=True)

    def print_results(self, arg_short: bool) -> None:
        """Format and display compiled information.

        Args:
            arg_short: When True only the first 25 entries are listed;
                otherwise every entry is listed.
        """
        if not arg_short:
            print(f'\nList of results by {self.type_filter}:\n')
            rows = self.item_tuple
        else:
            print(f'\nTop results by {self.type_filter}:\n')
            rows = self.item_tuple[:25]
        for row in rows:
            print(*row, sep='\t')
        # NOTE(review): the source's indentation was lost; the total is
        # assumed to be printed in both modes - confirm.
        print('\nTotal number detected:', len(self.item_tuple))
def agent_counts(agent_list: list, names_list: list, vuln_dict: dict) -> None:
    """Shows results per agent, separated by severity.

    Prints a tab-separated header followed by one row per agent with the
    number of 'Critical', 'High', 'Medium' and 'Low' entries found in that
    agent's ``affected_items``, the ``total_affected_items`` value reported
    in the agent's data, and the agent name.

    Args:
        agent_list: Agent ids, used as keys into ``vuln_dict``.
        names_list: Agent names, in the same order as ``agent_list``.
        vuln_dict: Mapping of agent id to a response dict containing
            ``['data']['affected_items']`` and
            ``['data']['total_affected_items']``.
    """
    from collections import Counter

    print('Agent\tCrit\tHigh\tMedium\tLow\tTotal\tName')
    for agent_id, agent_name in zip(agent_list, names_list):
        data = vuln_dict[agent_id]['data']
        # One pass over the entries; severities that do not occur read as 0.
        by_severity = Counter(
            vuln['severity'] for vuln in data['affected_items'])
        print(agent_id,
              by_severity['Critical'],
              by_severity['High'],
              by_severity['Medium'],
              by_severity['Low'],
              data['total_affected_items'],
              agent_name,
              sep='\t')
def main():
    """Parse the switches, query the server and print every report."""
    # Command-line switches, already formatted as query fragments.
    filter_query, group_query, short_mode = get_args()

    # Fetch agents and their vulnerabilities from the server API.
    agent_ids, agent_names, vulns_by_agent = api_calls(filter_query,
                                                       group_query)

    # One summary object per field; all are compiled before any output.
    reports = [ProcessInfo(agent_ids, vulns_by_agent, field)
               for field in ('severity', 'name', 'cve')]

    # Per-agent overview first.
    print('\nList of agent IDs and number of vulnerabilities on each:\n')
    if short_mode:
        # Reduced output for '--short': skip agents with nothing reported.
        for agent_id, agent_name in zip(agent_ids, agent_names):
            total = vulns_by_agent[agent_id]['data']['total_affected_items']
            if total != 0:
                print(f'{agent_id}\t{total}\t{agent_name}')
    else:
        agent_counts(agent_ids, agent_names, vulns_by_agent)

    # Then the occurrence lists, in the order severity, name, CVE.
    for report in reports:
        report.print_results(short_mode)


if __name__ == "__main__":
    main()
Usage Guide
To use this vulnerability dashboard tool for Wazuh, follow these steps:
1. Prerequisites
Ensure you have Python 3.10 installed on your system.
2. Install Dependencies
Install the required dependencies:
- requests
- urllib3
You can install these using pip:
pip install requests urllib3
3. Configure the Script
Set up the configuration by modifying these values in the script:
- Replace X.X.X.X with your Wazuh manager’s IP address
- Replace XXXXX with your API username and password in the respective fields
4. Save the Script
Save the script as vuln_dash.py (or any preferred name).
5. Run the Script
Run the script from the command line. You can use various options to filter and customize the output:
Basic usage:
python vuln_dash.py
Filter by package name:
python vuln_dash.py -n packagename
Filter by severity:
python vuln_dash.py -s Critical
Filter by CVE:
python vuln_dash.py -c CVE-2023-12345
Filter by agent group:
python vuln_dash.py -g groupname
Display only a summary:
python vuln_dash.py --short
Combine multiple filters:
python vuln_dash.py -n packagename -s High -g groupname --short
6. Understanding the Output
The script will connect to your Wazuh manager, retrieve vulnerability data, and display the results based on your specified filters.
The output will include:
- A list of agent IDs and the number of vulnerabilities on each
- Vulnerability occurrences grouped by severity
- Vulnerability occurrences grouped by package name
- Vulnerability occurrences grouped by CVE
Important Notes
- Use caution when supplying arguments, as the script doesn’t sanitize inputs
- Ensure you have the necessary permissions to access the Wazuh API
- The script disables SSL certificate verification and suppresses the related warnings so that self-signed certificates are accepted
Features
- Flexible Filtering: Filter vulnerabilities by package name, severity, CVE, or agent group
- Summary View: Use the --short flag for a condensed view of results
- Comprehensive Output: View vulnerabilities organized by severity, package name, and CVE
- API Integration: Direct integration with Wazuh Manager API for real-time data
- Agent-based Analysis: View vulnerability counts per agent with severity breakdown
This tool can be very useful for quickly assessing the vulnerability status across your Wazuh-monitored environment, especially when you need to focus on specific types of vulnerabilities or particular agent groups.