IOC Enrichment Pipeline

An automated pipeline that ingests IOCs from MISP and AlienVault OTX, enriches them with VirusTotal and other reputation services, deduplicates and scores them, and pushes actionable indicators to SIEM and EDR platforms.

Overview

Every SOC drowns in raw IOCs — IP addresses, domains, file hashes, and URLs that arrive from dozens of feeds with no context, no score, and no priority. This pipeline solves that by:

  1. Pulling indicators from multiple threat intel sources on a schedule
  2. Enriching each IOC with reputation data from multiple vendors
  3. Deduplicating and scoring based on confidence and severity
  4. Pushing only high-confidence, actionable indicators to SIEM and EDR

The result is a clean, scored, deduplicated feed that fires detection rules rather than flooding analysts with noise.


Architecture

┌─────────────────────────────────────────────────┐
│                  Ingestion Layer                 │
│  MISP  ──┐                                       │
│  OTX   ──┼──► Normaliser ──► Deduplication DB   │
│  Feeds ──┘                        │              │
└───────────────────────────────────┼──────────────┘
┌─────────────────────────────────────────────────┐
│                 Enrichment Layer                 │
│  VirusTotal ──┐                                  │
│  Shodan      ──┼──► Scorer ──► Enriched IOC     │
│  AbuseIPDB  ──┘                                  │
└───────────────────────────────────┼──────────────┘
┌─────────────────────────────────────────────────┐
│                   Output Layer                   │
│  Splunk / Sentinel ◄──┬──── High confidence     │
│  CrowdStrike EDR   ◄──┘     (score ≥ 70)        │
│  MISP (feedback)   ◄────── All enriched IOCs    │
└─────────────────────────────────────────────────┘
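
The routing rule in the output layer can be sketched as a small function. This is a minimal illustration, assuming each enriched IOC is a dict with a `score` key. The function name `route_ioc` and the destination labels are hypothetical; only the threshold of 70 comes from the diagram.

```python
# Hypothetical sketch of the output-layer routing shown in the diagram.
HIGH_CONFIDENCE_THRESHOLD = 70  # from the diagram: score >= 70


def route_ioc(ioc, threshold=HIGH_CONFIDENCE_THRESHOLD):
    """Return the list of destinations an enriched IOC should be sent to."""
    # Every enriched IOC is fed back to MISP
    destinations = ['misp_feedback']
    # Only high-confidence IOCs reach the SIEM and EDR
    if ioc.get('score', 0) >= threshold:
        destinations += ['siem', 'edr']
    return destinations
```

Keeping the decision in one place means the threshold can be tuned without touching the individual output connectors.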

Tech Stack

Component      Tool
Language       Python 3.11
Threat Intel   MISP, AlienVault OTX, Abuse.ch
Enrichment     VirusTotal API v3, Shodan, AbuseIPDB
Storage        SQLite (dedup cache)
Output         Splunk HEC, Sentinel REST API, CrowdStrike IOC API
Scheduler      Cron (every 4 hours)

Replication Guide

Step 1 — Install dependencies

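# sqlite3 ships with Python's standard library, so it is not installed via pip.
# pyyaml is needed to read config.yaml.
pip install pymisp OTXv2 vt-py shodan requests pyyaml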

Step 2 — Configure credentials

Create a config.yaml file:

misp:
  url: "https://your-misp-instance.local"
  key: "YOUR_MISP_API_KEY"

otx:
  key: "YOUR_OTX_API_KEY"

virustotal:
  key: "YOUR_VT_API_KEY"

shodan:
  key: "YOUR_SHODAN_API_KEY"

abuseipdb:
  key: "YOUR_ABUSEIPDB_API_KEY"

outputs:
  splunk_hec_url: "https://splunk:8088/services/collector"
  splunk_hec_token: "YOUR_HEC_TOKEN"
  crowdstrike_client_id: "YOUR_CS_CLIENT_ID"
  crowdstrike_client_secret: "YOUR_CS_SECRET"
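
Before the pipeline runs, it is worth checking that no placeholder was left in the file. The following is a minimal sketch, assuming the file has already been parsed into a dict (for example with `yaml.safe_load`). `validate_config` and `REQUIRED_KEYS` are hypothetical names, not part of any library.

```python
# Hypothetical helper: checks a config dict shaped like config.yaml above.
REQUIRED_KEYS = {
    'misp': ['url', 'key'],
    'otx': ['key'],
    'virustotal': ['key'],
    'shodan': ['key'],
    'abuseipdb': ['key'],
    'outputs': ['splunk_hec_url', 'splunk_hec_token',
                'crowdstrike_client_id', 'crowdstrike_client_secret'],
}


def validate_config(config):
    """Return a list of problems; an empty list means the config looks usable."""
    problems = []
    for section, keys in REQUIRED_KEYS.items():
        values = config.get(section) or {}
        for key in keys:
            value = values.get(key)
            if not value:
                problems.append(f"missing {section}.{key}")
            elif str(value).startswith('YOUR_'):
                # The template values in config.yaml all start with YOUR_
                problems.append(f"placeholder left in {section}.{key}")
    return problems
```

Failing fast on a non-empty result avoids a cron run that silently fetches nothing because one API key was never filled in.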

Step 3 — Ingest IOCs from sources

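from datetime import datetime, timedelta, timezone

from pymisp import PyMISP
from OTXv2 import OTXv2

SUPPORTED_MISP_TYPES = {'ip-dst', 'domain', 'md5', 'sha256', 'url'}

def fetch_misp_iocs(config):
    misp = PyMISP(config['misp']['url'], config['misp']['key'])
    # Pull events published in the last 24 hours, filtered on the to_ids flag
    events = misp.search(publish_timestamp='1d', to_ids=True)
    iocs = []
    for event in events:
        # Only event-level attributes are read; attributes nested in MISP objects are skipped
        for attr in event['Event'].get('Attribute', []):
            if attr['type'] in SUPPORTED_MISP_TYPES:
                iocs.append({
                    'value': attr['value'],
                    'type': attr['type'],
                    'source': 'MISP',
                    'tags': [t['name'] for t in attr.get('Tag', [])]
                })
    return iocs

def fetch_otx_iocs(config, lookback_hours=24):
    otx = OTXv2(config['otx']['key'])
    # Use a rolling window rather than a fixed date, so each run pulls only recent pulses
    since = datetime.now(timezone.utc) - timedelta(hours=lookback_hours)
    pulses = otx.getall(modified_since=since.strftime('%Y-%m-%dT%H:%M:%S'))
    iocs = []
    for pulse in pulses:
        for indicator in pulse['indicators']:
            iocs.append({
                'value': indicator['indicator'],
                # OTX type names (e.g. 'IPv4', 'FileHash-SHA256') differ from MISP's
                'type': indicator['type'],
                'source': 'OTX',
                'tags': pulse.get('tags', [])
            })
    return iocs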
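
The two fetchers return different type vocabularies: MISP uses names like `ip-dst` and `sha256`, while OTX uses names like `IPv4` and `FileHash-SHA256`. The Normaliser box in the architecture diagram could look roughly like the sketch below. The mapping table and the `normalise_ioc` name are assumptions for illustration, and folding OTX `hostname` into `domain` is a simplification you may not want.

```python
# Assumed mapping from OTX indicator types to the MISP-style types used
# elsewhere in this pipeline; extend it for the types your feeds carry.
OTX_TO_MISP_TYPE = {
    'IPv4': 'ip-dst',
    'domain': 'domain',
    'hostname': 'domain',
    'URL': 'url',
    'FileHash-MD5': 'md5',
    'FileHash-SHA256': 'sha256',
}

SUPPORTED_TYPES = {'ip-dst', 'domain', 'md5', 'sha256', 'url'}
# Values that can be lowercased safely; URL paths may be case-sensitive
CASE_INSENSITIVE_TYPES = {'domain', 'md5', 'sha256'}


def normalise_ioc(ioc):
    """Return a copy with a unified type and cleaned value, or None if unsupported."""
    ioc_type = ioc['type']
    if ioc.get('source') == 'OTX':
        ioc_type = OTX_TO_MISP_TYPE.get(ioc_type)
    if ioc_type not in SUPPORTED_TYPES:
        return None
    value = ioc['value'].strip()
    if ioc_type in CASE_INSENSITIVE_TYPES:
        value = value.lower()
    return {**ioc, 'type': ioc_type, 'value': value}
```

Normalising before deduplication matters: without it, `Evil.example.com` from one feed and `evil.example.com` from another would hash to different cache keys.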

Step 4 — Deduplicate using SQLite cache

import sqlite3
import hashlib

def init_db():
    """Open (or create) the dedup cache and make sure the table exists."""
    conn = sqlite3.connect('ioc_cache.db')
    conn.execute('''
        CREATE TABLE IF NOT EXISTS iocs (
            hash TEXT PRIMARY KEY,
            value TEXT,
            type TEXT,
            first_seen TEXT,
            last_seen TEXT,
            score INTEGER
        )
    ''')
    conn.commit()
    return conn

def is_duplicate(conn, ioc_value):
    """Return (already_cached, hash). The SHA-256 of the value is the cache key."""
    h = hashlib.sha256(ioc_value.encode()).hexdigest()
    row = conn.execute('SELECT hash FROM iocs WHERE hash = ?', (h,)).fetchone()
    return row is not None, h
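
The lookup above only reads the cache, so something has to write to it as well. Here is one possible sketch of that write path, assuming the same `iocs` table schema. The `record_ioc` function is a hypothetical addition, and the in-memory database is used only to keep the example self-contained.

```python
import hashlib
import sqlite3
from datetime import datetime, timezone


def record_ioc(conn, ioc_value, ioc_type, score=None):
    """Insert a new IOC or refresh last_seen on a known one. Returns True if new."""
    h = hashlib.sha256(ioc_value.encode()).hexdigest()
    now = datetime.now(timezone.utc).isoformat()
    # Try to refresh an existing row first; keep the old score if none is given
    cur = conn.execute(
        'UPDATE iocs SET last_seen = ?, score = COALESCE(?, score) WHERE hash = ?',
        (now, score, h),
    )
    if cur.rowcount:  # a row was updated, so the IOC was already cached
        conn.commit()
        return False
    conn.execute(
        'INSERT INTO iocs (hash, value, type, first_seen, last_seen, score) '
        'VALUES (?, ?, ?, ?, ?, ?)',
        (h, ioc_value, ioc_type, now, now, score),
    )
    conn.commit()
    return True


# Throwaway in-memory cache with the same schema as init_db()
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE iocs (hash TEXT PRIMARY KEY, value TEXT, type TEXT, '
             'first_seen TEXT, last_seen TEXT, score INTEGER)')
first = record_ioc(conn, '185.220.101.47', 'ip-dst', 87)   # new entry
second = record_ioc(conn, '185.220.101.47', 'ip-dst')      # seen again
```

Tracking `first_seen` and `last_seen` separately is what makes fields like those in the example output possible.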

Step 5 — Enrich with VirusTotal

import vt

# Map the pipeline's IOC types to VirusTotal API v3 collections
VT_COLLECTIONS = {
    'md5': 'files',
    'sha256': 'files',
    'domain': 'domains',
    'ip-dst': 'ip_addresses',
    'url': 'urls',
}

def enrich_virustotal(ioc, api_key):
    """Return a 0-100 score: the share of VT engines that flag the IOC as malicious."""
    collection = VT_COLLECTIONS.get(ioc['type'])
    if collection is None:
        return 0  # unsupported IOC type
    # URLs are looked up by VirusTotal's URL identifier, not the raw URL
    ident = vt.url_id(ioc['value']) if ioc['type'] == 'url' else ioc['value']
    client = vt.Client(api_key)
    try:
        obj = client.get_object(f"/{collection}/{ident}")
        stats = obj.last_analysis_stats
        total = sum(stats.values())
        # Guard against objects that have no analysis results yet
        return int(stats['malicious'] / total * 100) if total else 0
    except vt.APIError:
        # Not found, quota exceeded, and similar API errors: treat as unscored
        return 0
    finally:
        client.close()
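
The VirusTotal ratio is only one input to the Scorer in the architecture diagram, which also draws on Shodan and AbuseIPDB. The post does not give the formula that combines them, so the following is purely a sketch of one common approach, a weighted average. The weights and the `combine_scores` name are hypothetical and should be tuned to your own feeds.

```python
# Hypothetical weights; the actual scoring formula is not specified.
DEFAULT_WEIGHTS = {'virustotal': 0.5, 'abuseipdb': 0.3, 'shodan': 0.2}


def combine_scores(vendor_scores, weights=DEFAULT_WEIGHTS):
    """Weighted average of 0-100 vendor scores, skipping vendors with no result."""
    # Drop vendors that returned nothing (None) or that have no weight assigned
    available = {v: s for v, s in vendor_scores.items()
                 if s is not None and v in weights}
    total_weight = sum(weights[v] for v in available)
    if total_weight == 0:
        return 0
    weighted = sum(weights[v] * s for v, s in available.items())
    # Renormalise so a missing vendor does not drag the score down
    return round(weighted / total_weight)
```

Renormalising over the vendors that actually responded keeps a file hash, for which AbuseIPDB has no data, from being penalised relative to an IP address.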

Step 6 — Push to Splunk via HEC

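import time

import requests

def push_to_splunk(iocs, hec_url, hec_token, verify=True):
    headers = {'Authorization': f'Splunk {hec_token}'}
    for ioc in iocs:
        if ioc['score'] < 70:  # Only high-confidence IOCs
            continue
        payload = {
            # Fall back to the current time if the IOC carries no timestamp
            "time": ioc.get('timestamp', time.time()),
            "sourcetype": "ioc:enriched",
            "event": {
                "ioc_value": ioc['value'],
                "ioc_type": ioc['type'],
                "score": ioc['score'],
                "source": ioc['source'],
                "tags": ioc['tags']
            }
        }
        # Keep TLS verification on; for a private CA, pass the CA bundle path as `verify`
        resp = requests.post(hec_url, headers=headers, json=payload,
                             verify=verify, timeout=10)
        resp.raise_for_status()  # surface HEC errors instead of losing events silently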
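
Sending one HTTP request per IOC gets slow once a run produces hundreds of indicators. HEC accepts several event objects stacked in a single request body, so the payloads can be batched. The sketch below only builds the body; `build_hec_batch` is a hypothetical helper, and the event fields mirror the ones used above.

```python
import json
import time


def build_hec_batch(iocs, threshold=70, sourcetype='ioc:enriched'):
    """Serialise all IOCs at or above the threshold into one HEC request body."""
    events = []
    for ioc in iocs:
        if ioc['score'] < threshold:
            continue
        events.append(json.dumps({
            'time': ioc.get('timestamp', time.time()),
            'sourcetype': sourcetype,
            'event': {
                'ioc_value': ioc['value'],
                'ioc_type': ioc['type'],
                'score': ioc['score'],
                'source': ioc['source'],
                'tags': ioc.get('tags', []),
            },
        }))
    # One JSON object per line; HEC treats each object as a separate event
    return '\n'.join(events)
```

The resulting string can be passed as `data=` to a single `requests.post` call with the same `Authorization` header. An empty string means nothing met the threshold, in which case the request can be skipped.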

Step 7 — Schedule with cron

# Run every 4 hours
0 */4 * * * /usr/bin/python3 /opt/ioc-pipeline/main.py >> /var/log/ioc-pipeline.log 2>&1
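
The cron entry calls `main.py`, which ties the steps together. Its contents are not shown in this post, so here is a rough sketch of what one cycle might look like. `run_pipeline` is a hypothetical function that takes the stage functions as arguments, which keeps it independent of any particular feed or output.

```python
def run_pipeline(fetchers, is_new, enrich, push, threshold=70):
    """Run one fetch, dedup, enrich, push cycle and return simple counters.

    fetchers: callables that each return a list of IOC dicts
    is_new:   callable(ioc) -> bool, False for IOCs already in the cache
    enrich:   callable(ioc) -> int score in the range 0-100
    push:     callable(list_of_iocs), called once with the actionable IOCs
    """
    stats = {'fetched': 0, 'new': 0, 'pushed': 0}
    actionable = []
    for fetch in fetchers:
        for ioc in fetch():
            stats['fetched'] += 1
            # Skip anything the dedup cache has already seen
            if not is_new(ioc):
                continue
            stats['new'] += 1
            ioc['score'] = enrich(ioc)
            if ioc['score'] >= threshold:
                actionable.append(ioc)
    if actionable:
        push(actionable)
    stats['pushed'] = len(actionable)
    return stats
```

Deduplicating before enrichment, as in the architecture diagram, saves API quota: an indicator seen in an earlier run never reaches the rate-limited reputation services again. Printing the returned counters gives the cron log a one-line summary per run.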

Example Output

A scored, enriched IOC pushed to Splunk looks like this:

{
  "ioc_value": "185.220.101.47",
  "ioc_type": "ip-dst",
  "score": 87,
  "source": "MISP",
  "vt_malicious_engines": 52,
  "vt_total_engines": 90,
  "shodan_open_ports": [22, 443, 9001],
  "shodan_org": "Tor-Exit-Node-Hosting",
  "abuseipdb_confidence": 94,
  "tags": ["Tor", "C2", "APT"],
  "first_seen": "2026-01-10T08:23:00Z",
  "last_seen": "2026-01-15T04:11:00Z"
}

Contact me at contact@malsayegh.ae if you want to discuss implementation details or adapt this for your environment.

All rights Reserved for malsayegh.ae