TheHive and Cortex together enable security teams to automate the analysis and enrichment of observables, reducing manual effort and accelerating incident response. TheHive serves as a centralized platform for managing and coordinating incident response efforts, providing a unified interface for collaboration.
In the first part of this blog series, we covered the installation and integration of TheHive and Cortex. In this second part, we go further and explore how to integrate Wazuh (a SIEM tool) with TheHive. We also create some cases from Wazuh alerts.
For the uninitiated, Wazuh is an open-source Security Information and Event Management (SIEM) tool. It provides security information, log management, vulnerability detection, intrusion detection, and much more.
Integrate Wazuh with TheHive
The integration was tested on Wazuh v4.5 and TheHive v4.1.18-1.
Prerequisites
This part first walks through creating an organization and its users in TheHive, and then demonstrates the integration with Wazuh.
Configure Wazuh Manager:
Log in to your Wazuh server console and perform the following steps:
Install TheHive Python module:
sudo /var/ossec/framework/python/bin/pip3 install thehive4py==1.8.1
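We create the custom integration script by pasting the following Python code into /var/ossec/integrations/custom-w2thive.py. The file name must match the name used in the later steps (custom-w2thive), because the wrapper script looks for a .py file with its own name.
----------------------------- custom-w2thive.py ----------------------------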
#!/var/ossec/framework/python/bin/python3
import json
import sys
import os
import re
import logging
import uuid
from thehive4py.api import TheHiveApi
from thehive4py.models import Alert, AlertArtifact
#start user config
# Global vars
#threshold for wazuh rules level
lvl_threshold=0
#threshold for suricata rules level
suricata_lvl_threshold=3
debug_enabled = False
#info about created alert
info_enabled = True
#end user config
# Set paths
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
log_file = '{0}/logs/integrations.log'.format(pwd)
logger = logging.getLogger(__name__)
#set logging level
logger.setLevel(logging.WARNING)
if info_enabled:
    logger.setLevel(logging.INFO)
if debug_enabled:
    logger.setLevel(logging.DEBUG)
# create the logging file handler
fh = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
def main(args):
    logger.debug('#start main')
    logger.debug('#get alert file location')
    alert_file_location = args[1]
    logger.debug('#get TheHive url')
    thive = args[3]
    logger.debug('#get TheHive api key')
    thive_api_key = args[2]
    thive_api = TheHiveApi(thive, thive_api_key )
    logger.debug('#open alert file')
    w_alert = json.load(open(alert_file_location))
    logger.debug('#alert data')
    logger.debug(str(w_alert))
    logger.debug('#gen json to dot-key-text')
    alt = pr(w_alert,'',[])
    logger.debug('#formatting description')
    format_alt = md_format(alt)
    logger.debug('#search artifacts')
    artifacts_dict = artifact_detect(format_alt)
    alert = generate_alert(format_alt, artifacts_dict, w_alert)
    logger.debug('#threshold filtering')
    if w_alert['rule']['groups']==['ids','suricata']:
        #checking the existence of the data.alert.severity field
        if 'data' in w_alert.keys():
            if 'alert' in w_alert['data']:
                #checking the level of the source event
                if int(w_alert['data']['alert']['severity'])<=suricata_lvl_threshold:
                    send_alert(alert, thive_api)
    elif int(w_alert['rule']['level'])>=lvl_threshold:
        #if the event is different from suricata AND suricata-event-type: alert check lvl_threshold
        send_alert(alert, thive_api)
def pr(data,prefix, alt):
    #flatten the nested alert into 'dot.key|||value' strings
    for key,value in data.items():
        if hasattr(value,'keys'):
            pr(value,prefix+'.'+str(key),alt=alt)
        else:
            alt.append((prefix+'.'+str(key)+'|||'+str(value)))
    return alt

def md_format(alt,format_alt=''):
    md_title_dict = {}
    #sorted with first key
    for now in alt:
        now = now[1:]
        #fix first key last symbol
        dot = now.split('|||')[0].find('.')
        if dot==-1:
            md_title_dict[now.split('|||')[0]] =[now]
        else:
            if now[0:dot] in md_title_dict.keys():
                (md_title_dict[now[0:dot]]).append(now)
            else:
                md_title_dict[now[0:dot]]=[now]
    #one markdown table per top-level key
    for now in md_title_dict.keys():
        format_alt+='### '+now.capitalize()+'\n'+'| key | val |\n| ------ | ------ |\n'
        for let in md_title_dict[now]:
            key,val = let.split('|||')[0],let.split('|||')[1]
            format_alt+='| **' + key + '** | ' + val + ' |\n'
    return format_alt
def generate_alert(format_alt, artifacts_dict,w_alert):
    #generate alert sourceRef
    sourceRef = str(uuid.uuid4())[0:6]
    artifacts = []
    if 'agent' in w_alert.keys():
        if 'ip' not in w_alert['agent'].keys():
            w_alert['agent']['ip']='no agent ip'
    else:
        #'ip' is set here as well, because the tags below always read it
        w_alert['agent'] = {'id':'no agent id', 'name':'no agent name', 'ip':'no agent ip'}
    for key,value in artifacts_dict.items():
        for val in value:
            artifacts.append(AlertArtifact(dataType=key, data=val))
    alert = Alert(title=w_alert['rule']['description'],
                  tlp=2,
                  tags=['wazuh',
                        'rule='+w_alert['rule']['id'],
                        'agent_name='+w_alert['agent']['name'],
                        'agent_id='+w_alert['agent']['id'],
                        'agent_ip='+w_alert['agent']['ip'],],
                  description=format_alt ,
                  type='wazuh_alert',
                  source='wazuh',
                  sourceRef=sourceRef,
                  artifacts=artifacts,)
    return alert
def send_alert(alert, thive_api):
    response = thive_api.create_alert(alert)
    if response.status_code == 201:
        logger.info('Create TheHive alert: '+ str(response.json()['id']))
    else:
        logger.error('Error create TheHive alert: {}/{}'.format(response.status_code, response.text))

if __name__ == "__main__":
    try:
        logger.debug('debug mode') # if debug enabled
        # Main function
        main(sys.argv)
    except Exception:
        logger.exception('EGOR')
--------------------------------------------------------------------------------
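Note that main() calls artifact_detect(), but the listing above does not define it. A minimal sketch of such a helper follows, assuming its job is to pull IP addresses, URLs, and domains out of the formatted description with regular expressions. The patterns and the returned keys (ip, url, domain) are assumptions rather than part of the listing; adjust them to the observables you care about. To use it, paste the function into the integration script above generate_alert().

```python
import re

# Hypothetical helper: the listing calls artifact_detect() without defining it.
# Both patterns are illustrative assumptions and deliberately simple.
IP_RE = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')   # does not validate octet ranges
URL_RE = re.compile(r'https?://[^\s|<>"\']+')        # stops at whitespace and table pipes


def artifact_detect(format_alt):
    """Return {dataType: [values]} for observables found in the description."""
    urls = sorted(set(URL_RE.findall(format_alt)))
    ips = sorted(set(IP_RE.findall(format_alt)))
    # Host part of each URL: text between '//' and the next '/'.
    domains = sorted({u.split('//', 1)[1].split('/', 1)[0] for u in urls})
    return {'ip': ips, 'url': urls, 'domain': domains}
```

The return shape matches what generate_alert() iterates over: each key becomes the dataType of an AlertArtifact and each list item becomes its data.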
Next, we create a shell wrapper script at /var/ossec/integrations/custom-w2thive. It executes the .py script created in the previous step.
----------------------------------- custom-w2thive ---------------------------
#!/bin/sh
# Copyright (C) 2015-2020, Wazuh Inc.
# Created by Wazuh, Inc..
# This program is free software; you can redistribute it and/or modify it under the terms of GP>
WPYTHON_BIN="framework/python/bin/python3"
SCRIPT_PATH_NAME="$0"
DIR_NAME="$(cd $(dirname ${SCRIPT_PATH_NAME}); pwd -P)"
SCRIPT_NAME="$(basename ${SCRIPT_PATH_NAME})"
case ${DIR_NAME} in
    */active-response/bin | */wodles*)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd ${DIR_NAME}/../..; pwd)"
        fi
        PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py"
    ;;
    */bin)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd ${DIR_NAME}/..; pwd)"
        fi
        PYTHON_SCRIPT="${WAZUH_PATH}/framework/scripts/${SCRIPT_NAME}.py"
    ;;
    */integrations)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd ${DIR_NAME}/..; pwd)"
        fi
        PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py"
    ;;
esac

${WAZUH_PATH}/${WPYTHON_BIN} ${PYTHON_SCRIPT} $@
--------------------------------------------------------------------------------
We change the permissions and ownership of both files so that Wazuh can access and run them:
sudo chmod 755 /var/ossec/integrations/custom-w2thive.py
sudo chmod 755 /var/ossec/integrations/custom-w2thive
sudo chown root:wazuh /var/ossec/integrations/custom-w2thive.py
sudo chown root:wazuh /var/ossec/integrations/custom-w2thive
(On Wazuh releases before 4.3, the group is named ossec instead of wazuh.)
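Before wiring the script into the manager, it can help to run it by hand. As main() shows, the script reads the alert file path from argument 1, the API key from argument 2, and TheHive URL from argument 3. The sketch below writes a sample alert to a temporary file and builds a matching command line. The sample alert fields, the placeholder key and URL, and the helper name build_test_command are illustrative assumptions.

```python
import json
import tempfile


def build_test_command(script, api_key, hive_url, alert):
    """Write `alert` to a temp JSON file and return the argv to run `script`."""
    with tempfile.NamedTemporaryFile('w', suffix='.alert', delete=False) as fh:
        json.dump(alert, fh)
    # Argument order mirrors main(): alert file, API key, TheHive URL.
    return [script, fh.name, api_key, hive_url]


# Made-up alert containing only the fields the integration script reads.
sample_alert = {
    'rule': {'id': '5710', 'level': 5, 'description': 'Test alert',
             'groups': ['syslog', 'sshd']},
    'agent': {'id': '001', 'name': 'test-agent', 'ip': '10.0.0.5'},
}

cmd = build_test_command('/var/ossec/integrations/custom-w2thive',
                         'API', 'http://TheHive_Server_IP:9000', sample_alert)
print(' '.join(cmd))
# To actually send the alert, run the printed command on the Wazuh server
# with a real API key and URL, e.g. subprocess.run(cmd, check=True).
```

If the run succeeds, the script logs a "Create TheHive alert" line; otherwise it logs the HTTP status and response text.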
To allow Wazuh to run the integration script, we add the following lines to the manager configuration file located at /var/ossec/etc/ossec.conf. We insert the IP address for TheHive server along with the API key that was generated earlier.
-----------------------------------------------------------------------------------
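<ossec_config>
  <integration>
    <name>custom-w2thive</name>
    <hook_url>http://TheHive_Server_IP:9000</hook_url>
    <api_key>API</api_key>
    <alert_format>json</alert_format>
  </integration>
</ossec_config>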
-----------------------------------------------------------------------------------
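The settings above can be sanity-checked before restarting the manager. The following sketch assumes the standard Wazuh integration layout, an integration element with name, hook_url, api_key, and alert_format children; the function name check_integration and the sample values are hypothetical.

```python
import xml.etree.ElementTree as ET

REQUIRED = ('name', 'hook_url', 'api_key', 'alert_format')


def check_integration(xml_text):
    """Return a list of problems found in a single <integration> block."""
    node = ET.fromstring(xml_text)
    problems = []
    for tag in REQUIRED:
        child = node.find(tag)
        if child is None or not (child.text or '').strip():
            problems.append('missing <{}>'.format(tag))
    name = (node.findtext('name') or '').strip()
    # Wazuh expects custom integration names to begin with "custom-".
    if name and not name.startswith('custom-'):
        problems.append('custom integration names must start with "custom-"')
    fmt = (node.findtext('alert_format') or '').strip()
    # The integration script parses the alert file with json.load().
    if fmt and fmt != 'json':
        problems.append('the script expects alert_format json')
    return problems


sample = """
<integration>
  <name>custom-w2thive</name>
  <hook_url>http://TheHive_Server_IP:9000</hook_url>
  <api_key>API</api_key>
  <alert_format>json</alert_format>
</integration>
"""
print(check_integration(sample))
```

An empty list means the block has all four fields in the expected form. The check takes one integration block as input rather than the whole ossec.conf file.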
Restart the manager to apply the changes:
sudo systemctl restart wazuh-manager
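After the restart, the script's own log is a quick place to confirm that alerts are being forwarded. Given how log_file is built in the script, entries should land in /var/ossec/logs/integrations.log, assuming the default /var/ossec install path. The sketch below counts successes and failures by matching the messages logged in send_alert(); the function name summarize_log is hypothetical.

```python
from collections import Counter


def summarize_log(lines):
    """Count alert creations and failures logged by the integration script."""
    counts = Counter()
    for line in lines:
        if 'Error create TheHive alert' in line:
            counts['failed'] += 1
        elif 'Create TheHive alert' in line:
            counts['created'] += 1
    return counts


if __name__ == '__main__':
    try:
        # Path assumes the default Wazuh install location.
        with open('/var/ossec/logs/integrations.log', errors='replace') as fh:
            print(dict(summarize_log(fh)))
    except OSError as exc:
        print('could not read log: {}'.format(exc))
```

A growing failed count usually points at the URL or API key, since the error line includes the HTTP status code and response text returned by TheHive.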
Log in to TheHive with the test user account; the Wazuh-generated alerts appear under the “Alerts” tab:
Create case:
Conclusion:
The integration of TheHive and Wazuh provides a comprehensive solution for security operations and incident response. Both TheHive and Wazuh are actively developed projects and continue to be improved over time. Regular updates and new features help the integrated solution remain effective against evolving cyber threats.