
Dark Web Monitoring

Automated monitoring of paste sites, dark web forums, and leak channels for compromised credentials, data exposure, and threat actor mentions of target organisations.

Overview

Organisations rarely know they have been breached until weeks or months after the fact — often when credentials appear for sale on dark web markets or when data dumps surface on paste sites. This project builds a continuous monitoring pipeline that watches for:

  • Leaked credentials matching target domains (e.g. @company.ae)
  • Mentions of the organisation name or brand on dark web forums
  • Compromised employee data in public breach databases
  • New paste site uploads containing relevant keywords

When a match is found, an alert is generated with context, severity, and recommended actions — before the leaked data is weaponised.


Architecture

```
┌────────────────────────────────────────────────┐
│               Monitoring Sources               │
│  Paste Sites (Pastebin, Riseup, Ghostbin)      │
│  Have I Been Pwned API                         │
│  Telegram Leak Channels                        │
└───────────────────────────────┬────────────────┘
                                ▼ Collector
┌────────────────────────────────────────────────┐
│              Keyword Matching Engine           │
│  Domain patterns   → @targetdomain.ae          │
│  Brand keywords    → "TargetOrg", "Target Inc" │
│  Executive names   → hashed for privacy        │
└───────────────────────────────┬────────────────┘
                                ▼
┌────────────────────────────────────────────────┐
│              Enrichment & Triage               │
│  Credential validity check (safe, passive)     │
│  Breach source identification                  │
│  Severity scoring                              │
└───────────────────────────────┬────────────────┘
                                ▼
┌────────────────────────────────────────────────┐
│                   Alerting                     │
│  Teams / Email notification                    │
│  MISP event creation                           │
│  ServiceNow incident (if critical)             │
└────────────────────────────────────────────────┘
```
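The severity scoring step in the triage stage can be sketched as a simple lookup with an escalation rule. The category names mirror the `severity` thresholds in `targets.yaml` (Step 2); the escalation rule for plaintext-password dumps is an illustrative assumption, not a fixed policy.

```python
# Map match categories to severity labels (mirrors the severity
# section of targets.yaml); the escalation rule is an assumption.
SEVERITY_BY_MATCH_TYPE = {
    'executive_match': 'CRITICAL',
    'credential_match': 'HIGH',
    'keyword_mention': 'MEDIUM',
}

def score_match(match_type, has_plaintext_passwords=False):
    """Return a severity label for a triaged match."""
    severity = SEVERITY_BY_MATCH_TYPE.get(match_type, 'LOW')
    # Escalate credential matches that expose plaintext passwords.
    if has_plaintext_passwords and severity == 'HIGH':
        severity = 'CRITICAL'
    return severity
```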

Tech Stack

| Component | Tool |
| --- | --- |
| Language | Python 3.11 |
| Paste monitoring | `requests`, Pastebin scraping, RSS feeds |
| Breach data | Have I Been Pwned API v3 |
| Telegram monitoring | `telethon` (Telegram MTProto) |
| Storage | SQLite (alert deduplication) |
| Alerting | Microsoft Teams webhook, MISP API |
| Scheduler | Cron (every 30 minutes) |

Note: All monitoring described here is passive and read-only. No credentials are tested against live systems.


Replication Guide

Step 1 — Install dependencies

```shell
pip install requests feedparser telethon pymisp python-dotenv
```

(`sqlite3` ships with the Python standard library, so it is not installed via pip; `feedparser` is needed for the RSS monitoring in Step 3.)

Step 2 — Configure targets and keywords

Create a targets.yaml:

```yaml
domains:
  - "company.ae"
  - "company.com"

keywords:
  - "Company Name"
  - "CompanyAbbreviation"

executive_emails:
  - "ceo@company.ae"
  - "cfo@company.ae"

# Severity thresholds
severity:
  credential_match: HIGH
  keyword_mention: MEDIUM
  executive_match: CRITICAL
```

Step 3 — Monitor paste sites via RSS

Pastebin and several open paste sites expose RSS feeds that update in near real-time.

```python
import feedparser  # handles fetching and parsing the feeds

PASTE_FEEDS = [
    "https://pastebin.com/archive/rss",
    "https://riseup.net/paste/rss",
]

def monitor_paste_feeds(keywords):
    """Scan each paste feed for case-insensitive keyword hits."""
    matches = []
    for feed_url in PASTE_FEEDS:
        feed = feedparser.parse(feed_url)
        for entry in feed.entries:
            # Search both the title and the summary snippet
            content = f"{entry.get('title', '')} {entry.get('summary', '')}"
            for keyword in keywords:
                if keyword.lower() in content.lower():
                    matches.append({
                        'source': feed_url,
                        'title': entry.get('title', ''),
                        'link': entry.get('link', ''),
                        'keyword': keyword,
                        'published': entry.get('published', 'unknown'),
                    })
    return matches
```

Step 4 — Check breach databases via HIBP

Have I Been Pwned’s Domain Search API returns every breached account for a domain whose ownership you have verified, without exposing the actual passwords.

```python
import requests

def check_hibp_domain(domain, api_key):
    """Query HIBP's domain search endpoint for breached email aliases."""
    headers = {
        'hibp-api-key': api_key,
        'user-agent': 'DarkWebMonitor/1.0'
    }
    url = f"https://haveibeenpwned.com/api/v3/breacheddomain/{domain}"
    response = requests.get(url, headers=headers, timeout=30)

    if response.status_code == 200:
        # Returns a dict of {email_prefix: [breach_names]}
        return response.json()
    elif response.status_code == 404:
        return {}  # No breaches found for this domain
    else:
        raise RuntimeError(f"HIBP API error: {response.status_code}")

def process_hibp_results(domain, breaches, known_breaches_db):
    """Filter out breaches already alerted on."""
    new_breaches = {}
    for email_prefix, breach_list in breaches.items():
        new = [b for b in breach_list if b not in known_breaches_db]
        if new:
            new_breaches[f"{email_prefix}@{domain}"] = new
    return new_breaches
```

Step 5 — Monitor Telegram leak channels

Many threat actors announce data dumps in public Telegram channels before listing them for sale. telethon allows passive read-only monitoring of public channels.

```python
import asyncio

from telethon import TelegramClient

MONITORED_CHANNELS = [
    # Add public channel usernames or IDs here
    # e.g. 'channel_username'
]

async def monitor_telegram(api_id, api_hash, keywords, session_name='monitor'):
    """Read recent messages from public channels — no posting, no joining."""
    async with TelegramClient(session_name, api_id, api_hash) as client:
        matches = []
        for channel in MONITORED_CHANNELS:
            async for message in client.iter_messages(channel, limit=100):
                if message.text:
                    for keyword in keywords:
                        if keyword.lower() in message.text.lower():
                            matches.append({
                                'source': f'Telegram:{channel}',
                                'message': message.text[:500],
                                'date': str(message.date),
                                'keyword': keyword
                            })
        return matches

# Usage: asyncio.run(monitor_telegram(API_ID, API_HASH, keywords))
```

Step 6 — Deduplicate alerts

```python
import sqlite3
import hashlib

def init_alert_db():
    conn = sqlite3.connect('alerts.db')
    conn.execute('''
        CREATE TABLE IF NOT EXISTS alerts (
            hash TEXT PRIMARY KEY,
            source TEXT,
            keyword TEXT,
            severity TEXT,
            created_at TEXT
        )
    ''')
    conn.commit()
    return conn

def is_new_alert(conn, source, keyword):
    """Return (is_new, hash); the hash keys the deduplication table."""
    alert_hash = hashlib.sha256(f"{source}{keyword}".encode()).hexdigest()
    row = conn.execute('SELECT hash FROM alerts WHERE hash = ?', (alert_hash,)).fetchone()
    return row is None, alert_hash

def record_alert(conn, alert_hash, source, keyword, severity):
    """Persist the alert hash so the next run skips the same match."""
    conn.execute(
        'INSERT OR IGNORE INTO alerts VALUES (?, ?, ?, ?, datetime("now"))',
        (alert_hash, source, keyword, severity)
    )
    conn.commit()
```

Step 7 — Send Teams alert

```python
import requests

def send_teams_alert(webhook_url, match, severity):
    """Post a MessageCard to an incoming Teams webhook."""
    colour = {'CRITICAL': 'FF0000', 'HIGH': 'FF6600', 'MEDIUM': 'FFCC00'}.get(severity, 'CCCCCC')
    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "themeColor": colour,
        "summary": f"[{severity}] Dark Web Match Detected",
        "sections": [{
            "activityTitle": f"**[{severity}] Dark Web Monitoring Alert**",
            "facts": [
                {"name": "Source", "value": match['source']},
                {"name": "Keyword", "value": match['keyword']},
                {"name": "Detected", "value": match.get('published', 'Unknown')},
                {"name": "Link", "value": match.get('link', 'N/A')}
            ],
            "text": match.get('message', match.get('title', ''))[:300]
        }]
    }
    # requests serialises the payload and sets Content-Type: application/json
    response = requests.post(webhook_url, json=payload, timeout=10)
    response.raise_for_status()
```

Step 8 — Schedule the monitor

```shell
# Run every 30 minutes
*/30 * * * * /usr/bin/python3 /opt/dark-web-monitor/main.py >> /var/log/dark-web-monitor.log 2>&1
```

Example Alert Output

```json
{
  "alert_id": "DWM-2026-0142",
  "severity": "HIGH",
  "source": "Pastebin RSS",
  "keyword_matched": "company.ae",
  "context": "Fresh dump - 2,400 accounts from company.ae breach - includes plaintext passwords",
  "link": "https://pastebin.com/xxxxxxxx",
  "detected_at": "2026-02-20T07:14:00Z",
  "recommended_actions": [
    "Force password reset for all @company.ae accounts",
    "Notify affected users",
    "Correlate with recent login anomalies in SIEM",
    "Submit paste URL to VirusTotal and URLScan"
  ]
}
```
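A record in this shape can be assembled with a small helper. The field names and the `DWM-YYYY-NNNN` id scheme follow the example above; the sequence counter and the function name `build_alert` are assumptions for illustration.

```python
from datetime import datetime, timezone

def build_alert(seq, severity, source, keyword, context, link, actions):
    """Assemble an alert record in the shape of the example output."""
    now = datetime.now(timezone.utc)
    return {
        'alert_id': f'DWM-{now.year}-{seq:04d}',  # e.g. DWM-2026-0142
        'severity': severity,
        'source': source,
        'keyword_matched': keyword,
        'context': context,
        'link': link,
        'detected_at': now.strftime('%Y-%m-%dT%H:%M:%SZ'),
        'recommended_actions': actions,
    }
```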

Contact me at contact@malsayegh.ae if you want to discuss implementation or expand coverage to additional sources.
