
Phishing Analysis Automation

Automated pipeline for analysing reported phishing emails — header parsing, URL detonation, attachment sandboxing, and verdict delivery back to the analyst and ticketing system.

Overview

Phishing is one of the most common initial access vectors, and manual analysis is often the biggest bottleneck in the response process. An analyst receiving a phishing report has to:

  • Parse email headers to trace routing
  • Extract all URLs and check reputation
  • Extract attachments and check hashes
  • Detonate suspicious files in a sandbox
  • Write up a verdict and update the ticket

This project automates every step. A reported email goes in, a fully analysed verdict comes out — in under 60 seconds.


Architecture

User Reports Email
        │
        ▼
┌───────────────────┐
│  Email Ingestion  │  ← Mailbox polling (IMAP) or SOAR trigger
└────────┬──────────┘
         ▼
┌───────────────────────────────────┐
│         Parsing Layer             │
│  • Header extraction              │
│  • URL extraction                 │
│    (regex + BeautifulSoup)        │
│  • Attachment extraction          │
└────────┬──────────────────────────┘
         ▼
┌───────────────────────────────────┐
│        Enrichment Layer           │
│  • URL reputation (VirusTotal)    │
│  • Hash lookup (VirusTotal)       │
│  • Domain WHOIS & age check       │
│  • Screenshot capture (URLScan)   │
└────────┬──────────────────────────┘
         ▼
┌───────────────────────────────────┐
│        Sandbox Layer              │
│  • File detonation                │
│    (Any.run / Cuckoo)             │
│  • Behaviour analysis             │
│  • Network IOC extraction         │
└────────┬──────────────────────────┘
         ▼
┌───────────────────────────────────┐
│         Verdict Engine            │
│  • Score aggregation              │
│  • Malicious / Suspicious / Clean │
└────────┬──────────────────────────┘
         ▼
┌─────────────────────────────────────────────┐
│              Output                         │
│  ServiceNow ticket updated with full report │
│  Analyst notified via Teams                 │
│  IOCs pushed to MISP if malicious           │
└─────────────────────────────────────────────┘
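
The layered flow above can be sketched as a single driver function. The stage functions here are placeholders standing in for the real parsing, enrichment, sandboxing, and verdict code shown in the replication guide below:

```python
def run_pipeline(msg, stages):
    """Run a reported email through ordered (name, fn) stages,
    accumulating each stage's output under its name in one analysis
    dict. Each stage sees everything produced before it, which is how
    the verdict engine gets headers, URLs, and attachments at once."""
    analysis = {'message': msg}
    for name, fn in stages:
        analysis[name] = fn(analysis)
    return analysis
```

In the real pipeline the stages list would be something like `[('headers', ...), ('urls', ...), ('attachments', ...), ('verdict', ...)]`, wiring together the functions from Steps 3 to 7.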

Tech Stack

Component        Tool
─────────        ────
Language         Python 3.11
Email parsing    imaplib, email, extract-msg
URL extraction   beautifulsoup4, re
Reputation       VirusTotal API v3, URLScan.io
Sandboxing       Any.run API / Cuckoo REST API
WHOIS            python-whois
Ticketing        ServiceNow REST API
Notifications    Microsoft Teams Webhook

Replication Guide

Step 1 — Install dependencies

pip install beautifulsoup4 vt-py python-whois requests extract-msg

Step 2 — Connect to the reporting mailbox

import imaplib
import email

def fetch_reported_emails(host, user, password, folder='INBOX'):
    mail = imaplib.IMAP4_SSL(host)
    mail.login(user, password)
    mail.select(folder)

    _, message_ids = mail.search(None, 'UNSEEN')
    emails = []

    for msg_id in message_ids[0].split():
        _, data = mail.fetch(msg_id, '(RFC822)')
        raw = data[0][1]
        msg = email.message_from_bytes(raw)
        emails.append(msg)
        mail.store(msg_id, '+FLAGS', '\\Seen')

    mail.logout()
    return emails

Step 3 — Parse headers

from email.utils import parseaddr

def parse_headers(msg):
    headers = {
        'from':        msg.get('From'),
        'reply_to':    msg.get('Reply-To'),
        'return_path': msg.get('Return-Path'),
        'x_originating_ip': msg.get('X-Originating-IP'),
        'received':    msg.get_all('Received'),
        'spf':         msg.get('Received-SPF'),
        'dkim':        msg.get('DKIM-Signature'),
        'dmarc':       msg.get('Authentication-Results'),
        'subject':     msg.get('Subject'),
        'date':        msg.get('Date'),
    }
    # Flag mismatches — common phishing indicator. Compare the actual
    # addresses, not the raw header strings, so a display-name
    # difference alone doesn't produce a false positive.
    from_addr = parseaddr(headers['from'] or '')[1].lower()
    reply_addr = parseaddr(headers['reply_to'] or '')[1].lower()
    headers['from_reply_mismatch'] = bool(reply_addr) and from_addr != reply_addr
    return headers
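
The overview mentions tracing routing: the Received headers collected above can be mined for the IP of each hop. A rough sketch; real-world Received formats vary widely, so treat this as a starting point rather than a full parser:

```python
import re

def trace_routing(received_headers):
    """Pull bracketed IPv4 addresses out of Received headers.
    Headers are typically ordered newest-first, so the last entry
    returned is closest to the original sender."""
    hops = []
    for header in received_headers or []:
        match = re.search(r'\[(\d{1,3}(?:\.\d{1,3}){3})\]', header)
        if match:
            hops.append(match.group(1))
    return hops
```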

Step 4 — Extract URLs and attachments

import re
from bs4 import BeautifulSoup
import hashlib

def extract_urls(msg):
    urls = set()
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type == 'text/html':
            soup = BeautifulSoup(part.get_payload(decode=True), 'html.parser')
            for tag in soup.find_all('a', href=True):
                urls.add(tag['href'])
        elif content_type == 'text/plain':
            text = part.get_payload(decode=True).decode(errors='ignore')
            urls.update(re.findall(r'https?://[^\s<>"]+', text))
    return list(urls)

def extract_attachments(msg):
    attachments = []
    for part in msg.walk():
        if part.get_content_disposition() == 'attachment':
            data = part.get_payload(decode=True)
            if data is None:  # skip parts with no decodable body
                continue
            attachments.append({
                'filename': part.get_filename() or 'unnamed',
                'data': data,
                'md5': hashlib.md5(data).hexdigest(),
                'sha256': hashlib.sha256(data).hexdigest(),
                'size': len(data)
            })
    return attachments
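
Not every attachment is worth a sandbox run. A cheap pre-triage step is to flag files by extension before detonation; the extension set below is illustrative, not exhaustive, so tune it to your environment:

```python
import os

# Extensions that commonly carry phishing payloads. Illustrative list.
RISKY_EXTENSIONS = {'.exe', '.scr', '.js', '.vbs', '.iso', '.lnk',
                    '.docm', '.xlsm', '.html'}

def flag_risky_attachments(attachments):
    """Mark each attachment dict from extract_attachments with a
    'detonate' flag based on its file extension."""
    for att in attachments:
        ext = os.path.splitext(att['filename'] or '')[1].lower()
        att['detonate'] = ext in RISKY_EXTENSIONS
    return attachments
```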

Step 5 — Enrich URLs via VirusTotal and URLScan

import vt
import requests

def check_url_virustotal(url, api_key):
    client = vt.Client(api_key)
    url_id = vt.url_id(url)
    try:
        obj = client.get_object(f"/urls/{url_id}")
        stats = obj.last_analysis_stats
        score = stats.get('malicious', 0)
        return {'url': url, 'vt_malicious': score, 'vt_total': sum(stats.values())}
    except Exception:
        return {'url': url, 'vt_malicious': 0, 'vt_error': True}
    finally:
        client.close()

def screenshot_urlscan(url, api_key):
    headers = {'API-Key': api_key, 'Content-Type': 'application/json'}
    response = requests.post(
        'https://urlscan.io/api/v1/scan/',
        headers=headers,
        json={'url': url, 'visibility': 'private'}
    )
    if response.status_code == 200:
        return response.json().get('result')
    return None
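
The sandbox layer from the architecture diagram has no step of its own in this guide; a minimal submission might look like the sketch below. The `/tasks/create/file` endpoint and bearer-token auth follow a typical Cuckoo REST API deployment; adapt it if you use Any.run or a differently configured sandbox.

```python
import requests

def detonate_file(filename, data, cuckoo_url, token):
    """Submit an attachment to a Cuckoo instance for detonation.
    Returns the task id on success, or None on failure; the caller
    polls for the behaviour report separately."""
    resp = requests.post(
        f"{cuckoo_url}/tasks/create/file",
        headers={'Authorization': f'Bearer {token}'},
        files={'file': (filename, data)},
    )
    if resp.status_code == 200:
        return resp.json().get('task_id')
    return None
```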

Step 6 — Check domain age (newly registered = high risk)

import whois
from datetime import datetime, timezone

def check_domain_age(domain):
    try:
        w = whois.whois(domain)
        creation = w.creation_date
        if isinstance(creation, list):
            creation = creation[0]
        if creation is None:  # some registrars return no creation date
            return None
        age_days = (datetime.now(timezone.utc) - creation.replace(tzinfo=timezone.utc)).days
        return age_days
    except Exception:
        return None
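
check_domain_age expects a bare domain, but Step 4 yields full URLs. A small standard-library helper bridges the two, dropping any credentials or port embedded in the netloc:

```python
from urllib.parse import urlparse

def domain_from_url(url):
    """Reduce a URL to its bare host for the WHOIS lookup,
    e.g. 'http://evil.com:8080/login' -> 'evil.com'."""
    netloc = urlparse(url).netloc
    return netloc.split('@')[-1].split(':')[0].lower()
```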

Step 7 — Build the verdict

def calculate_verdict(analysis):
    score = 0

    # Header checks
    if analysis['headers']['from_reply_mismatch']:
        score += 20
    if not analysis['headers']['spf'] or 'fail' in str(analysis['headers']['spf']).lower():
        score += 15
    if not analysis['headers']['dkim']:
        score += 10

    # URL checks
    for url_result in analysis['urls']:
        if url_result.get('vt_malicious', 0) > 3:
            score += 30
        if url_result.get('domain_age_days') and url_result['domain_age_days'] < 30:
            score += 20

    # Attachment checks
    for att in analysis['attachments']:
        if att.get('vt_malicious', 0) > 5:
            score += 40

    if score >= 60:
        return 'MALICIOUS', score
    elif score >= 30:
        return 'SUSPICIOUS', score
    else:
        return 'CLEAN', score
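
Because the score is additive, several flagged URLs or attachments can push it past 100 even though the report prints "Score: N/100". A clamping variant of the threshold mapping keeps the two consistent; a sketch, mirroring the thresholds above:

```python
def verdict_from_score(score):
    """Map an aggregated risk score to a verdict label, clamping to
    the 0-100 range the ticket report template prints."""
    score = max(0, min(score, 100))
    if score >= 60:
        return 'MALICIOUS', score
    if score >= 30:
        return 'SUSPICIOUS', score
    return 'CLEAN', score
```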

Step 8 — Update ServiceNow ticket

def update_servicenow_ticket(ticket_id, analysis, verdict, score, sn_url, sn_user, sn_pass):
    notes = f"""
**Automated Phishing Analysis Report**

**Verdict:** {verdict} (Score: {score}/100)

**Headers:**
- From/Reply-To mismatch: {analysis['headers']['from_reply_mismatch']}
- SPF: {analysis['headers']['spf']}
- DKIM present: {bool(analysis['headers']['dkim'])}

**URLs found:** {len(analysis['urls'])}
{chr(10).join([f"- {u['url']} → VT malicious: {u.get('vt_malicious', 0)}" for u in analysis['urls']])}

**Attachments:** {len(analysis['attachments'])}
{chr(10).join([f"- {a['filename']} (SHA256: {a['sha256']}) → VT malicious: {a.get('vt_malicious', 0)}" for a in analysis['attachments']])}
"""
    requests.patch(
        f"{sn_url}/api/now/table/incident/{ticket_id}",
        auth=(sn_user, sn_pass),
        json={'work_notes': notes, 'state': '2'}
    )
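
The Teams notification from the Output layer can be a short call against an incoming webhook. Incoming webhooks accept a simple text payload; richer cards need the MessageCard or Adaptive Card format. `webhook_url` is whatever your Teams connector gives you:

```python
import requests

def notify_teams(webhook_url, verdict, score, ticket_id):
    """Post a one-line analysis summary to a Teams incoming webhook.
    Returns True if Teams accepted the message."""
    text = (f"Phishing analysis complete: {verdict} (score {score}). "
            f"Ticket {ticket_id} updated.")
    resp = requests.post(webhook_url, json={'text': text})
    return resp.status_code == 200
```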

Example Verdict Output

{
  "verdict": "MALICIOUS",
  "score": 85,
  "headers": {
    "from": "security@paypa1-verify.com",
    "reply_to": "collect@malicious-domain.ru",
    "from_reply_mismatch": true,
    "spf": "fail",
    "dkim": null
  },
  "urls": [
    {
      "url": "http://paypa1-verify.com/login",
      "vt_malicious": 18,
      "vt_total": 90,
      "domain_age_days": 3
    }
  ],
  "attachments": [
    {
      "filename": "invoice_2024.exe",
      "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
      "vt_malicious": 61
    }
  ],
  "analysis_time_seconds": 47
}

Contact me at contact@malsayegh.ae to discuss adapting this pipeline for your environment.
