Overview
Every SOC drowns in raw IOCs — IP addresses, domains, file hashes, and URLs that arrive from dozens of feeds with no context, no score, and no priority. This pipeline solves that by:
- Pulling indicators from multiple threat intel sources on a schedule
- Enriching each IOC with reputation data from multiple vendors
- Deduplicating and scoring based on confidence and severity
- Pushing only high-confidence, actionable indicators to SIEM and EDR
The result is a clean, scored, deduplicated feed that fires detection rules rather than flooding analysts with noise.
Architecture
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
┌─────────────────────────────────────────────────┐
│ Ingestion Layer │
│ MISP ──┐ │
│ OTX ──┼──► Normaliser ──► Deduplication DB │
│ Feeds ──┘ │ │
└───────────────────────────────────┼──────────────┘
▼
┌─────────────────────────────────────────────────┐
│ Enrichment Layer │
│ VirusTotal ──┐ │
│ Shodan ──┼──► Scorer ──► Enriched IOC │
│ AbuseIPDB ──┘ │
└───────────────────────────────────┼──────────────┘
▼
┌─────────────────────────────────────────────────┐
│ Output Layer │
│ Splunk / Sentinel ◄──┬──── High confidence │
│ CrowdStrike EDR ◄──┘ (score ≥ 70) │
│ MISP (feedback) ◄────── All enriched IOCs │
└─────────────────────────────────────────────────┘
|
Tech Stack
| Component |
Tool |
| Language |
Python 3.11 |
| Threat Intel |
MISP, AlienVault OTX, Abuse.ch |
| Enrichment |
VirusTotal API v3, Shodan, AbuseIPDB |
| Storage |
SQLite (dedup cache) |
| Output |
Splunk HEC, Sentinel REST API, CrowdStrike IOC API |
| Scheduler |
Cron (every 4 hours) |
Replication Guide
Step 1 — Install dependencies
1
|
pip install pymisp OTXv2 vt-py shodan requests pyyaml
|
Step 2 — Create a config.yaml file:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# Pipeline configuration. Replace every placeholder with a real credential
# and keep this file out of version control.
misp:
  url: "https://your-misp-instance.local"
  key: "YOUR_MISP_API_KEY"
otx:
  key: "YOUR_OTX_API_KEY"
virustotal:
  key: "YOUR_VT_API_KEY"
shodan:
  key: "YOUR_SHODAN_API_KEY"
abuseipdb:
  key: "YOUR_ABUSEIPDB_API_KEY"
outputs:
  splunk_hec_url: "https://splunk:8088/services/collector"
  splunk_hec_token: "YOUR_HEC_TOKEN"
  crowdstrike_client_id: "YOUR_CS_CLIENT_ID"
  crowdstrike_client_secret: "YOUR_CS_SECRET"
|
Step 3 — Ingest IOCs from sources
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
from pymisp import PyMISP
from OTXv2 import OTXv2, IndicatorTypes
def fetch_misp_iocs(config, publish_timestamp='1d'):
    """Collect indicators from MISP events matching a publish-time window.

    Args:
        config: Mapping providing ``config['misp']['url']`` and
            ``config['misp']['key']``.
        publish_timestamp: Value forwarded to ``PyMISP.search`` as
            ``publish_timestamp``. Defaults to ``'1d'``.

    Returns:
        A list of dicts with the keys ``value``, ``type``, ``source``
        (always ``'MISP'``) and ``tags`` (tag names of the attribute).

    Raises:
        RuntimeError: If the search returns an error payload rather than
            a list of events.
    """
    wanted_types = frozenset(('ip-dst', 'domain', 'md5', 'sha256', 'url'))

    misp = PyMISP(config['misp']['url'], config['misp']['key'])
    events = misp.search(publish_timestamp=publish_timestamp, to_ids=True)

    # NOTE(review): PyMISP appears to hand back a dict carrying an 'errors'
    # key when the request fails; iterating that dict would blow up with an
    # unhelpful TypeError further down, so surface it explicitly. Confirm
    # against the PyMISP version in use.
    if isinstance(events, dict) and 'errors' in events:
        raise RuntimeError(f"MISP search failed: {events['errors']}")

    iocs = []
    for event in events:
        # An event without attributes may omit the key entirely.
        for attr in event['Event'].get('Attribute', []):
            if attr['type'] not in wanted_types:
                continue
            iocs.append({
                'value': attr['value'],
                'type': attr['type'],
                'source': 'MISP',
                'tags': [t['name'] for t in attr.get('Tag', [])],
            })
    return iocs
def fetch_otx_iocs(config, modified_since='2024-01-01'):
    """Collect indicators from AlienVault OTX pulses.

    Args:
        config: Mapping providing ``config['otx']['key']``.
        modified_since: Value forwarded to ``OTXv2.getall`` as
            ``modified_since``. The default keeps the previously
            hard-coded date; a scheduled run will normally want to pass
            a recent cut-off instead so it does not re-download every
            pulse on each invocation.

    Returns:
        A list of dicts with the keys ``value``, ``type``, ``source``
        (always ``'OTX'``) and ``tags`` (the tags of the owning pulse).
    """
    otx = OTXv2(config['otx']['key'])
    pulses = otx.getall(modified_since=modified_since)

    iocs = []
    for pulse in pulses:
        pulse_tags = pulse.get('tags', [])
        # A pulse without indicators contributes nothing instead of raising.
        for indicator in pulse.get('indicators', []):
            iocs.append({
                'value': indicator['indicator'],
                # NOTE(review): the type is passed through exactly as OTX
                # reports it; it presumably differs from the MISP-style
                # names ('md5', 'ip-dst', ...) that enrich_virustotal
                # checks for -- confirm a normalisation step exists.
                'type': indicator['type'],
                'source': 'OTX',
                'tags': pulse_tags,
            })
    return iocs
|
Step 4 — Deduplicate using SQLite cache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import sqlite3
import hashlib
def init_db(db_path='ioc_cache.db'):
    """Open the deduplication cache and make sure its table exists.

    Args:
        db_path: Location of the SQLite database. Defaults to
            ``'ioc_cache.db'`` in the current working directory; pass
            ``':memory:'`` for a throwaway in-memory cache.

    Returns:
        An open ``sqlite3.Connection``. The caller owns it and is
        responsible for closing it.

    Raises:
        sqlite3.Error: If the database cannot be opened or the table
            cannot be created.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS iocs (
                hash TEXT PRIMARY KEY,
                value TEXT,
                type TEXT,
                first_seen TEXT,
                last_seen TEXT,
                score INTEGER
            )
        ''')
        conn.commit()
    except sqlite3.Error:
        # Do not leak the handle when schema creation fails.
        conn.close()
        raise
    return conn
def is_duplicate(conn, ioc_value):
    """Check whether an indicator value is already in the cache.

    Args:
        conn: Open SQLite connection to a database containing the
            ``iocs`` table.
        ioc_value: Indicator value as a string.

    Returns:
        A ``(seen, digest)`` tuple: ``seen`` is True when a row keyed by
        the value's SHA-256 hex digest exists, and ``digest`` is that
        hex digest.
    """
    digest = hashlib.sha256(ioc_value.encode()).hexdigest()
    cursor = conn.execute('SELECT hash FROM iocs WHERE hash = ?', (digest,))
    already_cached = cursor.fetchone() is not None
    return already_cached, digest
|
Step 5 — Enrich with VirusTotal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import vt
def enrich_virustotal(ioc, api_key):
    """Score an indicator by its share of malicious VirusTotal verdicts.

    Args:
        ioc: Dict with at least ``type`` and ``value``. Supported types
            are ``'md5'``, ``'sha256'``, ``'domain'`` and ``'ip-dst'``.
        api_key: VirusTotal API key.

    Returns:
        An int from 0 to 100: the ``malicious`` count of
        ``last_analysis_stats`` as a percentage of all counted verdicts,
        truncated. Returns 0 for unsupported types, when no verdicts are
        recorded, and when the lookup fails for any reason (the failure
        is logged).
    """
    import logging

    # Indicator type -> VirusTotal object collection.
    collections = {
        'md5': 'files',
        'sha256': 'files',
        'domain': 'domains',
        'ip-dst': 'ip_addresses',
    }
    collection = collections.get(ioc['type'])
    if collection is None:
        # Nothing to look up, so do not bother creating a client.
        return 0

    client = vt.Client(api_key)
    try:
        obj = client.get_object(f"/{collection}/{ioc['value']}")
        stats = obj.last_analysis_stats
        total = sum(stats.values())
        if not total:
            return 0
        return int((stats['malicious'] / total) * 100)
    except Exception:
        # Enrichment stays best-effort, but a failed lookup (quota, auth,
        # unknown object) must be visible: it is otherwise
        # indistinguishable from a genuinely clean indicator.
        logging.getLogger(__name__).warning(
            "VirusTotal lookup failed for %s %r",
            ioc['type'], ioc['value'], exc_info=True,
        )
        return 0
    finally:
        client.close()
|
Step 6 — Push to Splunk via HEC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import requests
import json
def push_to_splunk(iocs, hec_url, hec_token, *, min_score=70, verify=True,
                   timeout=10):
    """Send high-confidence indicators to a Splunk HTTP Event Collector.

    One POST is issued per qualifying indicator.

    Args:
        iocs: Iterable of enriched indicator dicts. Every dict needs
            ``score``; those that qualify also need ``timestamp``,
            ``value``, ``type``, ``source`` and ``tags``.
        hec_url: HEC endpoint URL.
        hec_token: HEC token, sent as ``Authorization: Splunk <token>``.
        min_score: Lowest score that is still forwarded. Defaults to 70.
        verify: Passed to ``requests.post`` as ``verify``. Defaults to
            True so the token is never sent over an unverified TLS
            connection; pass a CA bundle path for a privately signed
            collector.
        timeout: Seconds before a request is abandoned, so a stalled
            collector cannot hang the run. Defaults to 10.

    Returns:
        None.

    Raises:
        requests.RequestException: On transport failures (connection
            error, TLS failure, timeout).
    """
    import logging

    log = logging.getLogger(__name__)
    headers = {'Authorization': f'Splunk {hec_token}'}

    for ioc in iocs:
        if ioc['score'] < min_score:
            continue

        payload = {
            "time": ioc['timestamp'],
            "sourcetype": "ioc:enriched",
            "event": {
                "ioc_value": ioc['value'],
                "ioc_type": ioc['type'],
                "score": ioc['score'],
                "source": ioc['source'],
                "tags": ioc['tags'],
            },
        }
        response = requests.post(
            hec_url,
            headers=headers,
            data=json.dumps(payload),
            verify=verify,
            timeout=timeout,
        )
        if not response.ok:
            # A rejected event (bad token, malformed payload) used to be
            # dropped without a trace; keep going but leave evidence.
            log.warning(
                "Splunk HEC rejected IOC %r: HTTP %s",
                ioc['value'], response.status_code,
            )
|
Step 7 — Schedule with cron
1
2
|
# Run every 4 hours, at minute 0 (fields: minute hour day-of-month month day-of-week).
# Both stdout and stderr are appended to /var/log/ioc-pipeline.log.
0 */4 * * * /usr/bin/python3 /opt/ioc-pipeline/main.py >> /var/log/ioc-pipeline.log 2>&1
|
Example Output
A scored, enriched IOC pushed to Splunk looks like this:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
{
"ioc_value": "185.220.101.47",
"ioc_type": "ip-dst",
"score": 87,
"source": "MISP",
"vt_malicious_engines": 52,
"vt_total_engines": 90,
"shodan_open_ports": [22, 443, 9001],
"shodan_org": "Tor-Exit-Node-Hosting",
"abuseipdb_confidence": 94,
"tags": ["Tor", "C2", "APT"],
"first_seen": "2026-01-10T08:23:00Z",
"last_seen": "2026-01-15T04:11:00Z"
}
|
Contact me at contact@malsayegh.ae if you want to discuss implementation details or adapt this for your environment.