Overview
Phishing is the most common initial access vector, and manual analysis is the biggest bottleneck in the response process. An analyst receiving a phishing report has to:
- Parse email headers to trace routing
- Extract all URLs and check reputation
- Extract attachments and check hashes
- Detonate suspicious files in a sandbox
- Write up a verdict and update the ticket
This project automates every step. A reported email goes in, a fully analysed verdict comes out — in under 60 seconds.
Architecture
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
User Reports Email
│
▼
┌───────────────────┐
│ Email Ingestion │ ← Mailbox polling (IMAP) or SOAR trigger
└────────┬──────────┘
│
▼
┌───────────────────────────────────┐
│ Parsing Layer │
│ • Header extraction │
│ • URL extraction (regex + BeautifulSoup) │
│ • Attachment extraction │
└────────┬──────────────────────────┘
│
▼
┌───────────────────────────────────┐
│ Enrichment Layer │
│ • URL reputation (VirusTotal) │
│ • Hash lookup (VirusTotal) │
│ • Domain WHOIS & age check │
│ • Screenshot capture (URLScan) │
└────────┬──────────────────────────┘
│
▼
┌───────────────────────────────────┐
│ Sandbox Layer │
│ • File detonation (Any.run / Cuckoo) │
│ • Behaviour analysis │
│ • Network IOC extraction │
└────────┬──────────────────────────┘
│
▼
┌───────────────────────────────────┐
│ Verdict Engine │
│ • Score aggregation │
│ • Malicious / Suspicious / Clean │
└────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ Output │
│ ServiceNow ticket updated with full report │
│ Analyst notified via Teams │
│ IOCs pushed to MISP if malicious │
└─────────────────────────────────────────────┘
|
Tech Stack

| Component      | Tool                           |
|----------------|--------------------------------|
| Language       | Python 3.11                    |
| Email parsing  | imaplib, email, extract-msg    |
| URL extraction | beautifulsoup4, re             |
| Reputation     | VirusTotal API v3, URLScan.io  |
| Sandboxing     | Any.run API / Cuckoo REST API  |
| WHOIS          | python-whois                   |
| Ticketing      | ServiceNow REST API            |
| Notifications  | Microsoft Teams Webhook        |
Replication Guide
Step 1 — Install dependencies
1
|
pip install beautifulsoup4 vt-py python-whois requests extract-msg
|
Step 2 — Connect to the reporting mailbox
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import imaplib
import email
def fetch_reported_emails(host, user, password, folder='INBOX'):
    """Fetch all unread messages from the phishing-report mailbox.

    Connects over IMAPS, pulls every UNSEEN message in *folder*, parses
    each into an ``email.message.Message``, and flags it as read so the
    same report is not processed twice.

    Args:
        host: IMAP server hostname.
        user: Mailbox login name.
        password: Mailbox password.
        folder: Mailbox folder to poll (defaults to ``'INBOX'``).

    Returns:
        list[email.message.Message]: the parsed unread messages.
    """
    mail = imaplib.IMAP4_SSL(host)
    try:
        mail.login(user, password)
        mail.select(folder)
        _, message_ids = mail.search(None, 'UNSEEN')
        emails = []
        for msg_id in message_ids[0].split():
            _, data = mail.fetch(msg_id, '(RFC822)')
            raw = data[0][1]
            emails.append(email.message_from_bytes(raw))
            # Explicitly mark as read so a crash later in the pipeline
            # does not cause the same message to be re-ingested.
            mail.store(msg_id, '+FLAGS', '\\Seen')
        return emails
    finally:
        # Always tear the session down -- the original leaked the IMAP
        # connection whenever login/select/fetch raised.
        mail.logout()
|
Step 3 — Parse and analyse the headers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
def parse_headers(msg):
    """Extract the headers relevant to phishing triage from *msg*.

    Args:
        msg: An ``email.message.Message``.

    Returns:
        dict: raw header values of interest plus a computed
        ``from_reply_mismatch`` boolean (True when a Reply-To is
        present and its address differs from the From address).
    """
    from email.utils import parseaddr

    headers = {
        'from': msg.get('From'),
        'reply_to': msg.get('Reply-To'),
        'return_path': msg.get('Return-Path'),
        'x_originating_ip': msg.get('X-Originating-IP'),
        'received': msg.get_all('Received'),
        'spf': msg.get('Received-SPF'),
        'dkim': msg.get('DKIM-Signature'),
        'dmarc': msg.get('Authentication-Results'),
        'subject': msg.get('Subject'),
        'date': msg.get('Date'),
    }
    # Flag From/Reply-To mismatches -- a common phishing indicator.
    # Compare the parsed *address* part only, case-insensitively:
    # comparing raw header strings would false-positive whenever From
    # carries a display name ("Alice <a@x>") and Reply-To does not.
    if headers['reply_to'] is None:
        headers['from_reply_mismatch'] = False
    else:
        from_addr = parseaddr(headers['from'] or '')[1].lower()
        reply_addr = parseaddr(headers['reply_to'])[1].lower()
        headers['from_reply_mismatch'] = from_addr != reply_addr
    return headers
|
Step 4 — Extract URLs and attachments
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import re
from bs4 import BeautifulSoup
import hashlib
def extract_urls(msg):
    """Collect every URL referenced in *msg*.

    HTML parts contribute the ``href`` of each anchor tag; plain-text
    parts are scanned with a regex for http(s) links. Results are
    de-duplicated across parts.

    Args:
        msg: An ``email.message.Message`` (possibly multipart).

    Returns:
        list[str]: unique URLs found anywhere in the message body.
    """
    found = set()
    for section in msg.walk():
        kind = section.get_content_type()
        if kind == 'text/plain':
            body = section.get_payload(decode=True).decode(errors='ignore')
            for match in re.findall(r'https?://[^\s<>"]+', body):
                found.add(match)
        elif kind == 'text/html':
            markup = BeautifulSoup(section.get_payload(decode=True), 'html.parser')
            anchors = markup.find_all('a', href=True)
            found.update(anchor['href'] for anchor in anchors)
    return list(found)
def extract_attachments(msg):
    """Extract every attachment from *msg* with hashes precomputed.

    Args:
        msg: An ``email.message.Message`` (possibly multipart).

    Returns:
        list[dict]: one dict per attachment with keys ``filename``,
        ``data`` (raw bytes), ``md5``, ``sha256``, and ``size``.
    """
    attachments = []
    for part in msg.walk():
        if part.get_content_disposition() != 'attachment':
            continue
        # get_payload(decode=True) returns None for parts whose body
        # cannot be decoded; fall back to b'' so hashing never crashes
        # (the original raised TypeError inside hashlib on such parts).
        data = part.get_payload(decode=True) or b''
        attachments.append({
            'filename': part.get_filename(),
            'data': data,
            # MD5 kept alongside SHA-256 because some legacy threat
            # feeds are still keyed on MD5.
            'md5': hashlib.md5(data).hexdigest(),
            'sha256': hashlib.sha256(data).hexdigest(),
            'size': len(data),
        })
    return attachments
|
Step 5 — Enrich URLs via VirusTotal and URLScan
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import vt
import requests
def check_url_virustotal(url, api_key):
    """Look up *url*'s reputation in VirusTotal (API v3).

    Args:
        url: The URL to check.
        api_key: VirusTotal API key.

    Returns:
        dict: ``{'url', 'vt_malicious', 'vt_total'}`` on success, or
        ``{'url', 'vt_malicious': 0, 'vt_error': True}`` when the
        lookup fails (unknown URL, quota exceeded, network error, ...).
    """
    # Derive the lookup id *before* opening the client: the original
    # created the client first, so an error in url_id() leaked an
    # unclosed HTTP session.
    url_id = vt.url_id(url)
    client = vt.Client(api_key)
    try:
        obj = client.get_object(f"/urls/{url_id}")
        stats = obj.last_analysis_stats
        return {
            'url': url,
            'vt_malicious': stats.get('malicious', 0),
            'vt_total': sum(stats.values()),
        }
    except Exception:
        # Best-effort enrichment: an unknown URL or API hiccup must not
        # abort the whole analysis, so report zero detections + a flag.
        return {'url': url, 'vt_malicious': 0, 'vt_error': True}
    finally:
        client.close()
def screenshot_urlscan(url, api_key, timeout=30):
    """Submit *url* to URLScan.io for a private scan + screenshot.

    Args:
        url: URL to scan.
        api_key: URLScan.io API key.
        timeout: Seconds to wait for the submission call (optional,
            backward-compatible addition).

    Returns:
        str | None: the URLScan result URL on success, else None.
        Note the result page is populated asynchronously, so the
        report may not be viewable for a few seconds after submission.
    """
    headers = {'API-Key': api_key, 'Content-Type': 'application/json'}
    try:
        response = requests.post(
            'https://urlscan.io/api/v1/scan/',
            headers=headers,
            json={'url': url, 'visibility': 'private'},
            # requests has no default timeout; without one a stalled
            # API call would hang the whole pipeline.
            timeout=timeout,
        )
    except requests.RequestException:
        # Screenshot capture is best-effort, matching the other
        # enrichment steps -- a network error must not kill the run.
        return None
    if response.status_code == 200:
        return response.json().get('result')
    return None
|
Step 6 — Check domain age (newly registered = high risk)
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import whois
from datetime import datetime, timezone
def check_domain_age(domain):
    """Return the age of *domain* in days, or None if undeterminable.

    Newly registered domains are a strong phishing signal; the caller
    scores anything under 30 days old in the verdict engine.

    Args:
        domain: Bare domain name, e.g. ``'example.com'``.

    Returns:
        int | None: whole days since registration, or None when WHOIS
        fails or reports no usable creation date.
    """
    try:
        record = whois.whois(domain)
    except Exception:
        # WHOIS is flaky (rate limits, odd TLD servers); enrichment is
        # best-effort, so treat any failure as "age unknown".
        return None
    creation = record.creation_date
    # Some registrars return a list of dates; use the first entry.
    if isinstance(creation, list):
        creation = creation[0] if creation else None
    if not isinstance(creation, datetime):
        # Missing or unparsable creation date -- the original crashed
        # into the broad except here, silently hiding the cause.
        return None
    # Only attach UTC when the value is naive; blindly calling
    # replace(tzinfo=...) would clobber the offset of an aware value.
    if creation.tzinfo is None:
        creation = creation.replace(tzinfo=timezone.utc)
    return (datetime.now(timezone.utc) - creation).days
|
Step 7 — Build the verdict
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
def calculate_verdict(analysis):
    """Aggregate all enrichment signals into a (verdict, score) pair.

    Scoring (additive, clamped to 100):
        +20  From/Reply-To mismatch
        +15  SPF missing or failed
        +10  DKIM signature absent
        +30  per URL with more than 3 VirusTotal detections
        +20  per URL on a domain registered less than 30 days ago
        +40  per attachment with more than 5 VirusTotal detections

    Args:
        analysis: dict with ``headers``, ``urls`` and ``attachments``
            keys as produced by the parsing/enrichment steps.

    Returns:
        tuple[str, int]: ``('MALICIOUS' | 'SUSPICIOUS' | 'CLEAN', score)``
        with the score clamped to the advertised 0-100 range.
    """
    score = 0
    headers = analysis['headers']
    # Header checks
    if headers['from_reply_mismatch']:
        score += 20
    if not headers['spf'] or 'fail' in str(headers['spf']).lower():
        score += 15
    if not headers['dkim']:
        score += 10
    # URL checks
    for url_result in analysis['urls']:
        if url_result.get('vt_malicious', 0) > 3:
            score += 30
        age = url_result.get('domain_age_days')
        # Compare against None, not truthiness: age == 0 means the
        # domain was registered *today* -- the original truthiness test
        # skipped exactly that most-suspicious case.
        if age is not None and age < 30:
            score += 20
    # Attachment checks
    for att in analysis['attachments']:
        if att.get('vt_malicious', 0) > 5:
            score += 40
    # Reports present the score as "/100", so clamp the aggregate.
    score = min(score, 100)
    if score >= 60:
        return 'MALICIOUS', score
    elif score >= 30:
        return 'SUSPICIOUS', score
    return 'CLEAN', score
|
Step 8 — Update ServiceNow ticket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
def _build_work_notes(analysis, verdict, score):
    """Render the analysis results as a ServiceNow work-notes string."""
    url_lines = '\n'.join(
        f"- {u['url']} → VT malicious: {u.get('vt_malicious', 0)}"
        for u in analysis['urls']
    )
    att_lines = '\n'.join(
        f"- {a['filename']} (SHA256: {a['sha256']}) → VT malicious: {a.get('vt_malicious', 0)}"
        for a in analysis['attachments']
    )
    return f"""
**Automated Phishing Analysis Report**
**Verdict:** {verdict} (Score: {score}/100)
**Headers:**
- From/Reply-To mismatch: {analysis['headers']['from_reply_mismatch']}
- SPF: {analysis['headers']['spf']}
- DKIM present: {bool(analysis['headers']['dkim'])}
**URLs found:** {len(analysis['urls'])}
{url_lines}
**Attachments:** {len(analysis['attachments'])}
{att_lines}
"""


def update_servicenow_ticket(ticket_id, analysis, verdict, score, sn_url, sn_user, sn_pass):
    """Post the analysis report to a ServiceNow incident's work notes.

    Args:
        ticket_id: sys_id of the incident record to update.
        analysis: dict produced by the parsing/enrichment steps.
        verdict: 'MALICIOUS' | 'SUSPICIOUS' | 'CLEAN'.
        score: aggregate risk score (0-100).
        sn_url: base URL of the ServiceNow instance.
        sn_user: REST API username.
        sn_pass: REST API password.

    Raises:
        requests.HTTPError: if ServiceNow rejects the update (the
            original silently ignored failed PATCHes).
    """
    notes = _build_work_notes(analysis, verdict, score)
    response = requests.patch(
        f"{sn_url}/api/now/table/incident/{ticket_id}",
        auth=(sn_user, sn_pass),
        # state '2' = In Progress -- assumed from the original code;
        # TODO confirm against this instance's incident state model.
        json={'work_notes': notes, 'state': '2'},
        # requests has no default timeout; don't let a slow instance
        # hang the pipeline.
        timeout=30,
    )
    response.raise_for_status()
|
Example Verdict Output
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
{
"verdict": "MALICIOUS",
"score": 85,
"headers": {
"from": "security@paypa1-verify.com",
"reply_to": "collect@malicious-domain.ru",
"from_reply_mismatch": true,
"spf": "fail",
"dkim": null
},
"urls": [
{
"url": "http://paypa1-verify.com/login",
"vt_malicious": 18,
"vt_total": 90,
"domain_age_days": 3
}
],
"attachments": [
{
"filename": "invoice_2024.exe",
"sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"vt_malicious": 61
}
],
"analysis_time_seconds": 47
}
|
Contact me at contact@malsayegh.ae to discuss adapting this pipeline for your environment.