Skip to content

Wazuh Vulnerability Dashboard Tool - Python Script for API Integration

Published: at 07:00 AM

Wazuh Vulnerability Dashboard Tool - Python Script for API Integration

This tool replaces the vulnerability dashboard in Wazuh by connecting to the Wazuh manager API, retrieving vulnerability data, and displaying it with various filtering options.

Python Script

"""Tool to replace vulnerability dashboard in Wazuh.
Connects to Wazuh manager API, retrieves vulnerability data, and displays it.
Ensure server socket and credentials under Configuration are correct!
Output can be filtered by using the commandline switches:
-n , --name :     Only return data for specified package
-s , --severity : Only return vulnerabilties of that severity
-c , --cve :      Only return vulnerabilities with that CVE
-g , --group :    Only return vulnerabilities for group members
--short :         Only display summary of vulnerabilties
Use caution when supplying arguments as no sanitizing is done!
Referenced from:
https://documentation.wazuh.com/current/user-manual/api/index.html
Written in Python 3.10 using Visual Studio Code.
Stephen Vose 2022
"""


def get_args() -> tuple[str, str, bool]:
    """Take arguments from command line for filtering output."""
    import argparse
    parser = argparse.ArgumentParser(
    usage='vuln_dash.py [-h] [-n, --name NAME] [-s, --severity SEVERITY]\n\
[-c, --cve CVE] [-g, --group GROUP] [--short]')
    parser.add_argument('-n', '--name',
                        default="",
                        dest='name_arg',
                        help='Filter output by package name',
                        type=str
                        )
    parser.add_argument('-s', '--severity',
                        default="",
                        dest='sev_arg',
                        help='Filter output by severity',
                        type=str
                        )
    parser.add_argument('-c', '--cve',
                        default="",
                        dest='cve_arg',
                        help='Filter output by CVE',
                        type=str
                        )
    parser.add_argument('-g', '--group',
                        default="",
                        dest='group_arg',
                        help='Filter agents by group',
                        type=str
                        )
    parser.add_argument('--short',
                        action='store_true',
                        help='Return only summary of agent vulnerabilities',
                        )
    args = parser.parse_args()
    # Format string for appending to API request
    if args.name_arg != "":
        arg_name = str('&name=' + args.name_arg)
    else:
        arg_name = ""
    if args.sev_arg != "":
        arg_sev = str('&severity=' + args.sev_arg)
    else:
        arg_sev = ""
    if args.cve_arg != "":
        arg_cve = str('&cve=' + args.cve_arg)
    else:
        arg_cve = ""
    if args.group_arg != "":
        arg_group = str('&group=' + args.group_arg)
    else:
        arg_group = ""
    arg_string = arg_name + arg_sev + arg_cve
    arg_short = bool(args.short)
    return arg_string, arg_group, arg_short


def api_calls(filters: str, group: str) -> tuple[list, list, dict]:
    """ Set up and make API calls to server for retrieval of information."""
    import json
    from base64 import b64encode

    try:
        import requests
        import urllib3
    except ModuleNotFoundError:
        print('Error: ensure modules "urllib3" and "requests" are installed')
        exit()

    # Disable insecure https warnings (for self-signed SSL certificates)
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Configuration
    protocol = 'https'
    host = 'X.X.X.X'  # Supply server address
    port = 55000
    user = 'XXXXX'   # Supply API credentials
    password = 'XXXXX'
    login_endpoint = 'security/user/authenticate'

    login_url = f"{protocol}://{host}:{port}/{login_endpoint}"
    basic_auth = f"{user}:{password}".encode()
    login_headers = {'Content-Type': 'application/json',
                     'Authorization': f'Basic {b64encode(basic_auth).decode()}'
                     }

    # API call with login to get auth token
    try:
        response = requests.get(login_url, headers=login_headers, verify=False)
        response.raise_for_status()
        token = json.loads(response.content.decode())['data']['token']

        # New authorization header with the JWT token we got
        requests_headers = {'Content-Type': 'application/json',
                            'Authorization': f'Bearer {token}'}

        # Retrieve list of active agents from the server for querying vulns,
        # filter output by group if specified.
        login_endpoint = f"agents/?status=active&select=id,name{group}"
        login_url = f"{protocol}://{host}:{port}/{login_endpoint}"
        response = requests.get(login_url,
            headers=requests_headers, verify=False)
        response.raise_for_status()
        jlist = json.loads(response.content.decode())
        active_agents = []
        agent_names = []
        for i in jlist['data']['affected_items']:
            active_agents.append(i['id'])
            agent_names.append(i['name'])

        # Retrieve list of vulnerabilities for each active agent, filtering
        # with any supplied arguments; format {&argtype=[arg]}
        vulns = {}
        for k in active_agents:
            response = requests.get(
                f"{protocol}://{host}:{port}/vulnerability/{k}?limit=10000\
                &select=cve,name,severity,condition{filters}",
                headers=requests_headers, verify=False
            )
            response.raise_for_status()
            vulns[k] = json.loads(response.content.decode())

    # Catch problems talking to server
    except requests.exceptions.HTTPError:
        print("An error occurred: %s, %s" % (
              json.loads(response.content)['title'],
              json.loads(response.content)['detail']))
        quit()
    except requests.exceptions.ConnectionError as e:
        print("Connection error:", str(e))
        quit()

    return active_agents, agent_names, vulns


class ProcessInfo():
    """Class to extract and compile a specific type of data from bulk
    vulnerability information.
    """

    # Create class-specific variables.
    def __init__(self, agent_list: list, vuln_dict: dict, type_filter: str):
        self.type_filter = type_filter
        self.item_tuple = self.process_info(agent_list, vuln_dict)

    def process_info(self, agent_list, vuln_dict) -> list:
        """Process vulnerability metadata from each
        agent and compile a list of unique entries, with counts.
        """
        item_compendium = []
        item_occurrence = []
        for agent in agent_list:
            for vuln in vuln_dict[agent]['data']['affected_items']:
                # Append vulnerability condition to name results.
                if self.type_filter == 'name':
                    item_compendium.append(vuln[self.type_filter] +
                    ', ' + vuln['condition'])
                else:
                    item_compendium.append(vuln[self.type_filter])
        item_header = set(item_compendium)
        for item in item_header:
            item_occurrence.append(item_compendium.count(item))
        item_tuple = tuple(zip(item_occurrence, item_header))
        # End program early if no matches detected
        if len(item_tuple) == 0:
            print('No results found.')
            exit()

        return sorted(item_tuple, reverse=True)


    def print_results(self, arg_short: bool) -> None:
        """Format and display compiled information."""
        if not arg_short:
            print(f'\nList of results by {self.type_filter}:\n')
            for s in self.item_tuple:
                print(*s, sep='\t')
        else:
            print(f'\nTop results by {self.type_filter}:\n')
            for s in self.item_tuple[:25:]:
                print(*s, sep='\t')
        print('\nTotal number detected:', len(self.item_tuple))


def agent_counts(agent_list: list, names_list: list, vuln_dict: dict) -> None:
    """Shows results per agent, separated by severity."""
    print('Agent\tCrit\tHigh\tMedium\tLow\tTotal\tName')

    for a, n in zip(agent_list, names_list):
        sev_comp = []
        sev_list = []
        for vuln in vuln_dict[a]['data']['affected_items']:
            sev_comp.append(vuln['severity'])
        sev_list.append(a)
        sev_list.append(sev_comp.count('Critical'))
        sev_list.append(sev_comp.count('High'))
        sev_list.append(sev_comp.count('Medium'))
        sev_list.append(sev_comp.count('Low'))
        sev_list.append(vuln_dict[a]['data']['total_affected_items'])
        sev_list.append(n)
        print(*sev_list, sep='\t')


def main():

    # Interpret command-line switches
    arg_string, arg_group, arg_short = get_args()

    # Function to retrieve and process data from server API
    list_of_agents, names, dict_of_vulns = api_calls(arg_string, arg_group)

    # Build classes to store each type of data separately
    sev_occurrence = ProcessInfo(list_of_agents, dict_of_vulns, 'severity')
    name_occurrence = ProcessInfo(list_of_agents, dict_of_vulns, 'name')
    cve_occurrence = ProcessInfo(list_of_agents, dict_of_vulns, 'cve')

    # Output data by type
    print('\nList of agent IDs and number of vulnerabilities on each:\n')
    if not arg_short:
        agent_counts(list_of_agents, names, dict_of_vulns)
    else: # Print reduced results for '--short'
        for a, n in zip(list_of_agents, names):
            num = dict_of_vulns[a]['data']['total_affected_items']
            if num != 0:
                print(f'{a}\t{num}\t{n}')

    for result in [sev_occurrence, name_occurrence, cve_occurrence]:
        result.print_results(arg_short)


if __name__ == "__main__":
    main()

Usage Guide

To use this vulnerability dashboard tool for Wazuh, follow these steps:

1. Prerequisites

Ensure you have Python 3.10 installed on your system.

2. Install Dependencies

Install the required dependencies:

You can install these using pip:

pip install requests urllib3

3. Configure the Script

Set up the configuration by modifying these values in the script:

4. Save the Script

Save the script as vuln_dash.py (or any preferred name).

5. Run the Script

Run the script from the command line. You can use various options to filter and customize the output:

Basic usage:

python vuln_dash.py

Filter by package name:

python vuln_dash.py -n packagename

Filter by severity:

python vuln_dash.py -s Critical

Filter by CVE:

python vuln_dash.py -c CVE-2023-12345

Filter by agent group:

python vuln_dash.py -g groupname

Display only a summary:

python vuln_dash.py --short

Combine multiple filters:

python vuln_dash.py -n packagename -s High -g groupname --short

6. Understanding the Output

The script will connect to your Wazuh manager, retrieve vulnerability data, and display the results based on your specified filters.

The output will include:

Important Notes

Features

This tool can be very useful for quickly assessing the vulnerability status across your Wazuh-monitored environment, especially when you need to focus on specific types of vulnerabilities or particular agent groups.