Overview
Organisations rarely know they have been breached until weeks or months after the fact — often when credentials appear for sale on dark web markets or when data dumps surface on paste sites. This project builds a continuous monitoring pipeline that watches for:
- Leaked credentials matching target domains (e.g. @company.ae)
- Mentions of the organisation name or brand on dark web forums
- Compromised employee data in public breach databases
- New paste site uploads containing relevant keywords
When a match is found, an alert is generated with context, severity, and recommended actions — before the leaked data is weaponised.
Architecture
┌────────────────────────────────────────────────┐
│  Monitoring Sources                            │
│  Paste Sites (Pastebin, Riseup, Ghostbin) ──┐  │
│  Have I Been Pwned API                      ├──► Collector
│  Telegram Leak Channels                   ──┘  │
└───────────────────────────────┬────────────────┘
                                │
                                ▼
┌────────────────────────────────────────────────┐
│  Keyword Matching Engine                       │
│  Domain patterns → @targetdomain.ae            │
│  Brand keywords  → "TargetOrg", "Target Inc"   │
│  Executive names → hashed for privacy          │
└───────────────────────────────┬────────────────┘
                                │
                                ▼
┌────────────────────────────────────────────────┐
│  Enrichment & Triage                           │
│  Credential validity check (safe, passive)     │
│  Breach source identification                  │
│  Severity scoring                              │
└───────────────────────────────┬────────────────┘
                                │
                                ▼
┌────────────────────────────────────────────────┐
│  Alerting                                      │
│  Teams / Email notification                    │
│  MISP event creation                           │
│  ServiceNow incident (if critical)             │
└────────────────────────────────────────────────┘
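The Keyword Matching Engine stage in the diagram could be sketched roughly as follows. This is a minimal illustration, not the project's actual matcher: the function names (`build_domain_pattern`, `hash_name`, `match_text`) are hypothetical, and it assumes "hashed for privacy" means executive names are stored as SHA-256 digests of the lowercased name and compared against every adjacent two-word pair in the text.

```python
import hashlib
import re

def build_domain_pattern(domains):
    """Compile a regex matching e-mail addresses at any target domain."""
    alternatives = "|".join(re.escape(d) for d in domains)
    # The lookahead rejects look-alikes such as user@company.ae.evil.com
    return re.compile(
        rf"[A-Za-z0-9._%+-]+@(?:{alternatives})(?![A-Za-z0-9-]|\.[A-Za-z0-9])",
        re.IGNORECASE,
    )

def hash_name(name):
    """Normalise a name (lowercase, single spaces) and return its SHA-256 hex digest."""
    normalised = " ".join(name.lower().split())
    return hashlib.sha256(normalised.encode("utf-8")).hexdigest()

def match_text(text, domain_pattern, brand_keywords, executive_hashes):
    """Return a list of (match_type, value) tuples found in text."""
    hits = [("credential_match", m.group(0).lower()) for m in domain_pattern.finditer(text)]
    lowered = text.lower()
    hits += [("keyword_mention", kw) for kw in brand_keywords if kw.lower() in lowered]
    # Assumption: executive names are two words, so hash each adjacent word pair
    words = re.findall(r"[^\W\d_]+", text)
    for first, second in zip(words, words[1:]):
        digest = hash_name(f"{first} {second}")
        if digest in executive_hashes:
            hits.append(("executive_match", digest))
    return hits
```

Storing only digests keeps the watch list free of plain names, at the cost of exact matching only: a misspelt or abbreviated name will not be detected.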
Tech Stack
| Component | Tool |
|---|---|
| Language | Python 3.11 |
| Paste monitoring | requests, Pastebin scraping, RSS feeds |
| Breach data | Have I Been Pwned API v3 |
| Telegram monitoring | telethon (Telegram MTProto) |
| Storage | SQLite (alert deduplication) |
| Alerting | Microsoft Teams Webhook, MISP API |
| Scheduler | Cron (every 30 minutes) |
Note: All monitoring described here is passive and read-only. No credentials are tested against live systems.
Replication Guide
Step 1 — Install dependencies
# sqlite3 is part of the Python standard library, so it is not installed via pip
pip install requests feedparser telethon pymisp python-dotenv pyyaml
Step 2 — Configure targets
Create a targets.yaml file listing the domains, keywords, and executive addresses to watch:
domains:
  - "company.ae"
  - "company.com"
keywords:
  - "Company Name"
  - "CompanyAbbreviation"
executive_emails:
  - "ceo@company.ae"
  - "cfo@company.ae"
# Severity thresholds
severity:
  credential_match: HIGH
  keyword_mention: MEDIUM
  executive_match: CRITICAL
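One way the `severity` block might be applied, shown as a sketch only: when a single item triggers several match types, report the highest level. The dict below mirrors the YAML after parsing (for example with `yaml.safe_load`). The ranking order and the `score_severity` name are assumptions, not part of the original configuration.

```python
# Mirrors the `severity` block of targets.yaml after parsing
SEVERITY_BY_MATCH_TYPE = {
    "credential_match": "HIGH",
    "keyword_mention": "MEDIUM",
    "executive_match": "CRITICAL",
}

# Assumed ordering, lowest to highest
SEVERITY_RANK = {"MEDIUM": 0, "HIGH": 1, "CRITICAL": 2}

def score_severity(match_types, severity_map=SEVERITY_BY_MATCH_TYPE):
    """Return the highest severity among the given match types, or None if none are known."""
    levels = [severity_map[t] for t in match_types if t in severity_map]
    if not levels:
        return None
    return max(levels, key=SEVERITY_RANK.__getitem__)
```

For example, a paste that mentions the brand and also contains an executive address would be reported once, as CRITICAL, rather than as two separate alerts.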
Step 3 — Monitor paste sites
Pastebin and several open paste sites expose RSS feeds that update in near real-time.
import feedparser

# Example feed URLs; confirm each endpoint exists and permits automated access
PASTE_FEEDS = [
    "https://pastebin.com/archive/rss",
    "https://riseup.net/paste/rss",
]

def monitor_paste_feeds(keywords):
    """Return one match dict per (feed entry, keyword) hit."""
    matches = []
    for feed_url in PASTE_FEEDS:
        feed = feedparser.parse(feed_url)
        for entry in feed.entries:
            # Search title and summary together, case-insensitively
            content = f"{entry.get('title', '')} {entry.get('summary', '')}".lower()
            for keyword in keywords:
                if keyword.lower() in content:
                    matches.append({
                        'source': feed_url,
                        'title': entry.get('title', ''),
                        'link': entry.get('link', ''),
                        'keyword': keyword,
                        'published': entry.get('published', 'unknown'),
                    })
    return matches
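Feed summaries usually carry only a snippet. Once the full text of a paste is available, credential dumps can be picked out line by line. The sketch below assumes the common `email:password` layout (with `:`, `;` or `|` as separator). `extract_credentials` is a hypothetical helper. It records only the length of each secret, so plaintext passwords never reach alerts or logs.

```python
import re

def extract_credentials(text, domains):
    """Find `email<sep>secret` lines for target domains and return redacted records."""
    domain_alt = "|".join(re.escape(d) for d in domains)
    line_re = re.compile(
        rf"^\s*([A-Za-z0-9._%+-]+@(?:{domain_alt}))\s*[:;|]\s*(\S+)\s*$",
        re.IGNORECASE | re.MULTILINE,
    )
    records = []
    for email, secret in line_re.findall(text):
        records.append({
            "email": email.lower(),
            # Store the length only; the secret itself is discarded
            "secret_length": len(secret),
        })
    return records
```

The list of affected addresses is enough to drive a password reset, which is why the sketch does not retain the leaked values.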
Step 4 — Check breach databases via HIBP
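Have I Been Pwned’s Domain Search API returns every breached account for a given domain, keyed by email alias with the names of the breaches each appears in, without exposing the actual passwords. It requires an API key, and the domain must first be verified as yours in the HIBP dashboard.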
import requests

def check_hibp_domain(domain, api_key):
    """Return {email_alias: [breach_names]} for a domain, or {} if nothing is found."""
    headers = {
        'hibp-api-key': api_key,
        'user-agent': 'DarkWebMonitor/1.0',
    }
    url = f"https://haveibeenpwned.com/api/v3/breacheddomain/{domain}"
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code == 200:
        return response.json()
    if response.status_code == 404:
        return {}  # No breaches found
    raise RuntimeError(f"HIBP API error: {response.status_code}")

def process_hibp_results(domain, breaches, known_breaches_db):
    """Filter out breaches already alerted on.

    known_breaches_db is any container of breach names that supports `in`.
    """
    new_breaches = {}
    for email_prefix, breach_list in breaches.items():
        new = [b for b in breach_list if b not in known_breaches_db]
        if new:
            new_breaches[f"{email_prefix}@{domain}"] = new
    return new_breaches
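HIBP rate-limits API keys and answers with HTTP 429 plus a `retry-after` header when the limit is exceeded. A small retry wrapper, sketched here under the assumption that waiting and retrying is acceptable for a 30-minute job, keeps a single throttled call from failing the run. `call_with_backoff` is a hypothetical helper that works with any callable returning an object with `.status_code` and `.headers`.

```python
import time

def call_with_backoff(send, max_attempts=4, sleep=time.sleep):
    """Call send() until it returns a non-429 response or attempts run out.

    send is a zero-argument callable returning an object with
    .status_code and .headers (a requests.Response fits).
    """
    for attempt in range(1, max_attempts + 1):
        response = send()
        if response.status_code != 429:
            return response
        if attempt == max_attempts:
            break
        # Honour the server's retry-after hint (seconds); otherwise back off exponentially
        retry_after = response.headers.get("retry-after")
        try:
            delay = float(retry_after)
        except (TypeError, ValueError):
            delay = 2 ** attempt
        sleep(delay)
    return response
```

With `requests`, usage might look like `call_with_backoff(lambda: requests.get(url, headers=headers, timeout=30))`.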
Step 5 — Monitor Telegram leak channels
Many threat actors announce data dumps in public Telegram channels before listing them for sale. telethon allows passive read-only monitoring of public channels.
from telethon import TelegramClient

MONITORED_CHANNELS = [
    # Add public channel usernames or IDs here
    # e.g. 'channel_username'
]

async def monitor_telegram(api_id, api_hash, keywords, session_name='monitor'):
    """Scan the latest 100 messages of each monitored channel for keywords."""
    matches = []
    async with TelegramClient(session_name, api_id, api_hash) as client:
        for channel in MONITORED_CHANNELS:
            async for message in client.iter_messages(channel, limit=100):
                if not message.text:
                    continue
                text = message.text.lower()
                for keyword in keywords:
                    if keyword.lower() in text:
                        matches.append({
                            'source': f'Telegram:{channel}',
                            'message': message.text[:500],
                            'date': str(message.date),
                            'keyword': keyword,
                        })
    return matches
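Re-reading the latest 100 messages on every run rescans old content. Telethon's `iter_messages` accepts a `min_id` argument that excludes messages at or below a given ID, so a per-channel checkpoint can limit each run to new messages. The sketch below stores checkpoints in SQLite. The table and function names are illustrative assumptions.

```python
import sqlite3

def init_checkpoints(conn):
    """Create the checkpoint table if it does not exist."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "channel TEXT PRIMARY KEY, last_id INTEGER NOT NULL)"
    )
    conn.commit()

def get_last_id(conn, channel):
    """Return the last processed message ID for a channel, or 0 if none is stored."""
    row = conn.execute(
        "SELECT last_id FROM checkpoints WHERE channel = ?", (channel,)
    ).fetchone()
    return row[0] if row else 0

def save_last_id(conn, channel, message_id):
    """Store message_id unless a newer checkpoint is already saved."""
    conn.execute(
        "INSERT INTO checkpoints (channel, last_id) VALUES (?, ?) "
        "ON CONFLICT(channel) DO UPDATE SET last_id = MAX(last_id, excluded.last_id)",
        (channel, message_id),
    )
    conn.commit()
```

Inside the monitoring loop this would be used as `client.iter_messages(channel, min_id=get_last_id(conn, channel))`, followed by `save_last_id` with the highest message ID seen.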
Step 6 — Deduplicate alerts
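import hashlib
import sqlite3

def init_alert_db(path='alerts.db'):
    """Open the alert database and create the alerts table if needed."""
    conn = sqlite3.connect(path)
    conn.execute('''
        CREATE TABLE IF NOT EXISTS alerts (
            hash TEXT PRIMARY KEY,
            source TEXT,
            keyword TEXT,
            severity TEXT,
            created_at TEXT
        )
    ''')
    conn.commit()
    return conn

def is_new_alert(conn, source, keyword, item_ref=''):
    """Return (is_new, alert_hash) for a match.

    item_ref is an added, optional argument (for example the paste link or
    message text). Without it, every later match of the same keyword from the
    same source would be treated as a duplicate.
    """
    # Join with a separator so ('ab', 'c') and ('a', 'bc') hash differently
    key = '|'.join((source, keyword, item_ref))
    alert_hash = hashlib.sha256(key.encode()).hexdigest()
    row = conn.execute('SELECT hash FROM alerts WHERE hash = ?', (alert_hash,)).fetchone()
    return row is None, alert_hash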
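Deduplication only works if each alert's hash is written back once the alert has been sent. A minimal sketch of that write, assuming the `alerts` table from this step already exists (`record_alert` is a hypothetical name):

```python
import sqlite3
from datetime import datetime, timezone

def record_alert(conn, alert_hash, source, keyword, severity):
    """Insert the alert; return True if stored, False if the hash already existed."""
    cursor = conn.execute(
        "INSERT OR IGNORE INTO alerts (hash, source, keyword, severity, created_at) "
        "VALUES (?, ?, ?, ?, ?)",
        (alert_hash, source, keyword, severity, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()
    return cursor.rowcount == 1
```

`INSERT OR IGNORE` makes the call safe to repeat: a second insert of the same hash changes nothing and returns False.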
Step 7 — Send Teams alert
import requests

def send_teams_alert(webhook_url, match, severity):
    """Post a MessageCard describing the match to a Teams incoming webhook."""
    colour = {'CRITICAL': 'FF0000', 'HIGH': 'FF6600', 'MEDIUM': 'FFCC00'}.get(severity, 'CCCCCC')
    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "themeColor": colour,
        "summary": f"[{severity}] Dark Web Match Detected",
        "sections": [{
            "activityTitle": f"**[{severity}] Dark Web Monitoring Alert**",
            "facts": [
                {"name": "Source", "value": match['source']},
                {"name": "Keyword", "value": match['keyword']},
                # Paste matches carry 'published'; Telegram matches carry 'date'
                {"name": "Detected", "value": match.get('published', match.get('date', 'Unknown'))},
                {"name": "Link", "value": match.get('link', 'N/A')},
            ],
            "text": match.get('message', match.get('title', ''))[:300],
        }],
    }
    response = requests.post(webhook_url, json=payload, timeout=30)
    response.raise_for_status()  # Surface delivery failures instead of ignoring them
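The architecture also lists MISP event creation, which the steps do not show. As a rough sketch, the function below builds a JSON body in the shape MISP's REST API accepts for a new event and performs no network I/O. The severity-to-threat-level mapping and the choice of attributes are assumptions; `build_misp_event` is a hypothetical name.

```python
# MISP threat_level_id values: 1 = High, 2 = Medium, 3 = Low, 4 = Undefined
THREAT_LEVEL = {"CRITICAL": 1, "HIGH": 1, "MEDIUM": 2}

def build_misp_event(match, severity):
    """Build the JSON body for a MISP event describing one match."""
    attributes = [
        {"type": "text", "category": "Other",
         "value": f"Matched keyword: {match['keyword']}"},
    ]
    if match.get("link"):
        attributes.append(
            {"type": "link", "category": "External analysis", "value": match["link"]}
        )
    return {
        "Event": {
            "info": f"[{severity}] Dark web match from {match['source']}",
            "distribution": 0,  # 0 = your organisation only
            "threat_level_id": THREAT_LEVEL.get(severity, 4),
            "analysis": 0,      # 0 = initial
            "Attribute": attributes,
        }
    }
```

The resulting dict could then be submitted through PyMISP or posted to the instance's event endpoint with the API key. Check the exact call against the documentation for the MISP version in use.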
Step 8 — Schedule the monitor
# Run every 30 minutes
*/30 * * * * /usr/bin/python3 /opt/dark-web-monitor/main.py >> /var/log/dark-web-monitor.log 2>&1
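The cron entry runs a main.py that the steps do not show. One possible shape for its core loop is sketched below with the collectors, the deduplication check, and the notifier passed in as callables, so that one failing source does not stop the others. `run_once` and its parameters are hypothetical and are not taken from the original project.

```python
def run_once(collectors, is_new, notify):
    """Run every collector once, alert on unseen matches, and return the number of alerts sent.

    collectors: iterable of zero-argument callables, each returning a list of match dicts
    is_new:     callable(match) -> bool, True if the match has not been alerted on
    notify:     callable(match) -> None, delivers the alert
    """
    sent = 0
    for collect in collectors:
        try:
            matches = collect()
        except Exception as exc:  # one failing source should not stop the others
            name = getattr(collect, "__name__", repr(collect))
            print(f"collector {name} failed: {exc}")
            continue
        for match in matches:
            if is_new(match):
                notify(match)
                sent += 1
    return sent
```

Because the cron line redirects stdout to /var/log/dark-web-monitor.log, the printed failure messages end up in that log.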
Example Alert Output
{
  "alert_id": "DWM-2026-0142",
  "severity": "HIGH",
  "source": "Pastebin RSS",
  "keyword_matched": "company.ae",
  "context": "Fresh dump - 2,400 accounts from company.ae breach - includes plaintext passwords",
  "link": "https://pastebin.com/xxxxxxxx",
  "detected_at": "2026-02-20T07:14:00Z",
  "recommended_actions": [
    "Force password reset for all @company.ae accounts",
    "Notify affected users",
    "Correlate with recent login anomalies in SIEM",
    "Submit paste URL to VirusTotal and URLScan"
  ]
}
Contact me at contact@malsayegh.ae if you want to discuss implementation or expand coverage to additional sources.