
SIEM Alert Tuning Automation

Automated pipeline for analysing SIEM alert fidelity, identifying false positive patterns, generating suppression recommendations, and tracking detection health over time.

Overview

A poorly tuned SIEM is worse than no SIEM. When 95% of alerts are false positives, analysts stop investigating — the real threats hide in the noise. Alert tuning is one of the highest-leverage investments a SOC can make, but it’s traditionally done manually, slowly, and inconsistently.

This project automates the full tuning cycle:

  1. Collect alert metadata from the SIEM over a rolling window
  2. Calculate fidelity metrics per rule (false positive rate, escalation rate, closure reason)
  3. Identify suppression candidates and threshold adjustment recommendations
  4. Generate a tuning report with specific, actionable changes
  5. Track rule health over time as a time series
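The cycle above can be sketched as one driver function. This is a hypothetical skeleton, not the project's actual entry point — the step functions are injected as parameters so the loop can be exercised without a live SIEM connection:

```python
def run_tuning_cycle(fetch, score, classify, days_back=30):
    """One pass of the tuning loop: collect, score, classify.

    `fetch`, `score`, and `classify` are injected so the cycle can be
    tested in isolation; in production they'd be the Splunk collector
    and the scoring functions from the replication guide.
    """
    rows = fetch(days_back)
    for row in rows:
        row["tuning_score"] = score(row)
        row["action"] = classify(
            row["tuning_score"], float(row["fp_rate"]), float(row["total_alerts"])
        )
    return rows

# Dry run with a stubbed fetch and trivial scorers
demo = run_tuning_cycle(
    fetch=lambda days: [{"rule_name": "demo", "fp_rate": "92.0", "total_alerts": "900"}],
    score=lambda row: 80,
    classify=lambda score, fp, vol: "DISABLE" if score >= 70 else "MONITOR",
)
print(demo[0]["action"])  # DISABLE
```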

Key Metrics

| Metric | Definition | Review threshold |
|---|---|---|
| False Positive Rate | Closed as “not an incident” / total alerts | > 80% |
| Escalation Rate | Escalated to incident / total alerts | < 5% |
| MTTR per rule | Avg time to close alerts from this rule | > 4 hours |
| Alert Volume | Alerts per day from this rule | > 50/day |
| Analyst Sentiment | Avg analyst rating (1–5) | < 2.5 |
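A minimal stdlib sketch of how the first two metrics fall out of closed-alert records (field names mirror the Splunk query in Step 2, but the records themselves are illustrative):

```python
def fidelity_metrics(alerts):
    """Compute FP rate and escalation rate for one rule's alerts."""
    total = len(alerts)
    fps = sum(1 for a in alerts
              if a["status"] == "closed" and a["resolution"] == "false_positive")
    escalated = sum(1 for a in alerts if a["status"] in ("escalated", "in_progress"))
    return {
        "fp_rate": round(fps / total * 100, 1) if total else 0.0,
        "escalation_rate": round(escalated / total * 100, 1) if total else 0.0,
    }

alerts = (
    [{"status": "closed", "resolution": "false_positive"}] * 9
    + [{"status": "escalated", "resolution": None}]
)
print(fidelity_metrics(alerts))  # {'fp_rate': 90.0, 'escalation_rate': 10.0}
```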

Replication Guide

Step 1 — Install dependencies

pip install requests pandas matplotlib jinja2 openpyxl splunk-sdk

Step 2 — Pull alert data from Splunk

import splunklib.client as client
import splunklib.results as results
import pandas as pd

def fetch_alert_data(splunk_host, splunk_port, username, password, days_back=30):
    service = client.connect(
        host=splunk_host, port=splunk_port,
        username=username, password=password
    )

    query = f"""
    search index=notable
    | eval closed_as_fp=if(status="closed" AND resolution="false_positive", 1, 0)
    | eval escalated=if(status="escalated" OR status="in_progress", 1, 0)
    | eval close_time_hours=round((close_time - create_time)/3600, 2)
    | stats
        count as total_alerts,
        sum(closed_as_fp) as false_positives,
        sum(escalated) as escalations,
        avg(close_time_hours) as avg_close_hours,
        avg(analyst_rating) as avg_rating
      by rule_name
    | eval fp_rate=round(false_positives/total_alerts*100, 1)
    | eval escalation_rate=round(escalations/total_alerts*100, 1)
    | sort -total_alerts
    | head 200
    """

    job = service.jobs.oneshot(
        query,
        earliest_time=f"-{days_back}d",
        latest_time="now",
        output_mode="json",  # JSONResultsReader requires JSON output
    )
    reader = results.JSONResultsReader(job)

    rows = [result for result in reader if isinstance(result, dict)]
    return pd.DataFrame(rows)
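One gotcha: Splunk returns every field as a string, so coerce the numeric columns before doing arithmetic on the DataFrame. A sketch (column names match the query above):

```python
import pandas as pd

NUMERIC_COLS = ["total_alerts", "false_positives", "escalations",
                "avg_close_hours", "avg_rating", "fp_rate", "escalation_rate"]

def coerce_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """Convert Splunk's string fields to numbers; unparseable values become NaN."""
    for col in NUMERIC_COLS:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
    return df

df = coerce_numeric(pd.DataFrame([
    {"rule_name": "r1", "total_alerts": "42", "fp_rate": "91.5"}
]))
print(df["total_alerts"].iloc[0] + 1)  # 43
```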

Step 3 — Score each rule

def score_rule(row):
    """
    Score: 0 = perfect, 100 = needs immediate tuning.
    """
    score = 0

    fp_rate = float(row.get('fp_rate', 0))
    esc_rate = float(row.get('escalation_rate', 0))
    volume = float(row.get('total_alerts', 0))
    avg_close = float(row.get('avg_close_hours', 0))
    avg_rating = float(row.get('avg_rating', 3))

    # High FP rate is the primary signal
    if fp_rate > 90: score += 40
    elif fp_rate > 80: score += 30
    elif fp_rate > 60: score += 20

    # Low escalation rate compounds FP concerns
    if esc_rate < 2: score += 20
    elif esc_rate < 5: score += 10

    # High volume with low escalation = noisy rule
    daily_volume = volume / 30  # assumes the default 30-day window; keep in sync with days_back
    if daily_volume > 100 and esc_rate < 5: score += 20
    elif daily_volume > 50: score += 10

    # Slow close time = analysts ignoring the alert
    if avg_close > 8: score += 10
    elif avg_close > 4: score += 5

    # Low analyst rating
    if avg_rating < 2: score += 10
    elif avg_rating < 3: score += 5

    return min(score, 100)

def classify_action(score, fp_rate, volume):
    if score >= 70 or fp_rate > 95:
        return 'DISABLE'
    elif score >= 50 or fp_rate > 80:
        return 'TUNE_THRESHOLD'
    elif score >= 30:
        return 'ADD_SUPPRESSION'
    else:
        return 'MONITOR'
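Worked example: for a hypothetical rule at 94.2% FP rate, 1.1% escalation, and ~312 alerts/day, the bands stack up to 80, which crosses the DISABLE line. The bands from `score_rule` are restated compactly here so the snippet runs standalone:

```python
def band_score(fp_rate, esc_rate, daily_volume, avg_close=1.0, avg_rating=3.0):
    """Compact restatement of score_rule's bands for a single rule."""
    score = 0
    score += 40 if fp_rate > 90 else 30 if fp_rate > 80 else 20 if fp_rate > 60 else 0
    score += 20 if esc_rate < 2 else 10 if esc_rate < 5 else 0
    score += 20 if daily_volume > 100 and esc_rate < 5 else 10 if daily_volume > 50 else 0
    score += 10 if avg_close > 8 else 5 if avg_close > 4 else 0
    score += 10 if avg_rating < 2 else 5 if avg_rating < 3 else 0
    return min(score, 100)

# 40 (FP > 90) + 20 (escalation < 2) + 20 (noisy: > 100/day with < 5% escalated) = 80
print(band_score(94.2, 1.1, 312))  # 80
```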

Step 4 — Identify suppression patterns

def find_suppression_candidates(splunk_service, rule_name, days_back=30):
    """
    For rules with high FP rates, find common field values in closed FP alerts.
    These become candidates for suppression filters.
    """
    query = f"""
    search index=notable rule_name="{rule_name}" status=closed resolution=false_positive
    | stats count by src_user, src_ip, dest_ip, dest_port, process_name
    | sort -count
    | head 20
    """
    job = splunk_service.jobs.oneshot(
        query, earliest_time=f"-{days_back}d", output_mode="json"
    )
    reader = results.JSONResultsReader(job)

    candidates = [result for result in reader if isinstance(result, dict)]

    # Approximate the total FP count by summing the grouped counts
    total_fps = sum(int(c.get('count', 0)) for c in candidates)

    # Only suggest suppression if a single value accounts for > 20% of FPs
    suppression_rules = []
    for c in candidates:
        count = int(c.get('count', 0))
        if total_fps and count / total_fps > 0.20:
            for field in ['src_user', 'src_ip', 'process_name']:
                if c.get(field) and c[field] != 'unknown':
                    suppression_rules.append({
                        'field': field,
                        'value': c[field],
                        'fp_coverage': f"{count/total_fps*100:.1f}%",
                        'count': count
                    })

    return suppression_rules
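The candidates can then be rendered as an SPL exclusion to append to the rule's search. This is a sketch of one way to express it — adapt it to however your correlation searches handle exclusions (e.g. Enterprise Security suppression lookups):

```python
def build_suppression_clause(suppression_rules):
    """Render suppression candidates as a NOT (...) clause for SPL."""
    terms = [f'{r["field"]}="{r["value"]}"' for r in suppression_rules]
    if not terms:
        return ""
    return "NOT (" + " OR ".join(terms) + ")"

clause = build_suppression_clause([
    {"field": "src_user", "value": "svc_backup"},
    {"field": "src_ip", "value": "10.10.5.22"},
])
print(clause)  # NOT (src_user="svc_backup" OR src_ip="10.10.5.22")
```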

Step 5 — Generate tuning report

from jinja2 import Template
import datetime

REPORT_TEMPLATE = """
# SIEM Alert Tuning Report
**Generated:** {{ generated_at }}
**Period:** Last {{ days_back }} days
**Total rules analysed:** {{ total_rules }}

---

## Executive Summary

| Category | Count |
|---|---|
| Rules to DISABLE | {{ disable_count }} |
| Rules to TUNE | {{ tune_count }} |
| Rules to SUPPRESS | {{ suppress_count }} |
| Healthy rules | {{ healthy_count }} |

Estimated alert volume reduction if all recommendations are applied: **{{ volume_reduction }}%**

---

## Rules Requiring Immediate Action

{% for rule in critical_rules %}
### {{ rule.rule_name }}
- **FP Rate:** {{ rule.fp_rate }}%
- **Daily Volume:** {{ rule.daily_volume }}
- **Escalation Rate:** {{ rule.escalation_rate }}%
- **Recommended Action:** {{ rule.action }}

{% if rule.suppression_candidates %}
**Suggested Suppression Filters:**
{% for s in rule.suppression_candidates %}
- `{{ s.field }} = "{{ s.value }}"` — covers {{ s.fp_coverage }} of FPs
{% endfor %}
{% endif %}
---
{% endfor %}
"""

def generate_report(df, suppression_map, days_back=30):
    df['tuning_score'] = df.apply(score_rule, axis=1)
    df['action'] = df.apply(
        lambda r: classify_action(r['tuning_score'], float(r['fp_rate']), float(r['total_alerts'])),
        axis=1
    )
    df['daily_volume'] = (df['total_alerts'].astype(float) / days_back).round(1)

    critical = df[df['action'].isin(['DISABLE', 'TUNE_THRESHOLD'])].to_dict('records')
    for rule in critical:
        rule['suppression_candidates'] = suppression_map.get(rule['rule_name'], [])

    total_volume = df['total_alerts'].astype(float).sum()
    disabled_volume = df[df['action'] == 'DISABLE']['total_alerts'].astype(float).sum()
    volume_reduction = int(disabled_volume / total_volume * 100) if total_volume else 0

    template = Template(REPORT_TEMPLATE)
    return template.render(
        generated_at=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M UTC'),
        days_back=days_back,
        total_rules=len(df),
        disable_count=len(df[df['action'] == 'DISABLE']),
        tune_count=len(df[df['action'] == 'TUNE_THRESHOLD']),
        suppress_count=len(df[df['action'] == 'ADD_SUPPRESSION']),
        healthy_count=len(df[df['action'] == 'MONITOR']),
        volume_reduction=volume_reduction,
        critical_rules=critical
    )

Step 6 — Track rule health over time

import sqlite3
import datetime

def record_rule_health_snapshot(df, db_path='rule_health.db'):
    """Store a daily snapshot to track improvement over time."""
    conn = sqlite3.connect(db_path)
    conn.execute('''
        CREATE TABLE IF NOT EXISTS rule_health (
            snapshot_date TEXT,
            rule_name TEXT,
            total_alerts INTEGER,
            fp_rate REAL,
            escalation_rate REAL,
            tuning_score INTEGER,
            action TEXT
        )
    ''')

    today = datetime.date.today().isoformat()
    for _, row in df.iterrows():
        conn.execute('''
            INSERT INTO rule_health VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            today,
            row['rule_name'],
            int(float(row['total_alerts'])),
            float(row['fp_rate']),
            float(row['escalation_rate']),
            int(row['tuning_score']),
            row['action']
        ))

    conn.commit()
    conn.close()
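With snapshots accumulating, a simple query gives each rule's FP-rate trend over time. A sketch — it takes an open connection rather than a path, so it also works against an in-memory database:

```python
import sqlite3

def fp_rate_trend(conn, rule_name):
    """Return (snapshot_date, fp_rate) pairs for one rule, oldest first."""
    cur = conn.execute(
        "SELECT snapshot_date, fp_rate FROM rule_health "
        "WHERE rule_name = ? ORDER BY snapshot_date",
        (rule_name,),
    )
    return cur.fetchall()

# Demo against an in-memory database with two weekly snapshots
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rule_health (snapshot_date TEXT, rule_name TEXT, "
             "total_alerts INTEGER, fp_rate REAL, escalation_rate REAL, "
             "tuning_score INTEGER, action TEXT)")
conn.executemany("INSERT INTO rule_health VALUES (?, ?, ?, ?, ?, ?, ?)", [
    ("2024-01-01", "Brute Force", 300, 94.2, 1.1, 80, "DISABLE"),
    ("2024-01-08", "Brute Force", 120, 61.0, 4.0, 40, "ADD_SUPPRESSION"),
])
print(fp_rate_trend(conn, "Brute Force"))  # [('2024-01-01', 94.2), ('2024-01-08', 61.0)]
```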

Step 7 — Schedule and distribute

# Run every Monday morning, send report to SOC team
0 7 * * 1 /usr/bin/python3 /opt/siem-tuning/main.py \
  --output /reports/tuning_$(date +\%Y-\%W).md \
  --email soc-team@company.ae
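The `--email` flag implies a delivery step inside `main.py`; a stdlib sketch using `smtplib`/`email` (the relay host and sender address are placeholders):

```python
import smtplib
from email.message import EmailMessage

def build_report_email(report_md, recipient, sender="siem-tuning@company.ae"):
    """Wrap the rendered Markdown report in a plain-text email."""
    msg = EmailMessage()
    msg["Subject"] = "Weekly SIEM Alert Tuning Report"
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content(report_md)
    return msg

msg = build_report_email("# SIEM Alert Tuning Report\n...", "soc-team@company.ae")
# with smtplib.SMTP("smtp.company.ae") as s:  # placeholder relay host
#     s.send_message(msg)
print(msg["To"])  # soc-team@company.ae
```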

Example Report Output

## Rules Requiring Immediate Action

### Brute Force Login Detected
- FP Rate: 94.2%
- Daily Volume: 312 alerts
- Escalation Rate: 1.1%
- Recommended Action: DISABLE

Suggested Suppression Filters:
- src_user = "svc_backup" — covers 67.3% of FPs
- src_ip = "10.10.5.22" — covers 18.4% of FPs

### Outbound Connection to Rare Country
- FP Rate: 88.7%
- Daily Volume: 47 alerts
- Escalation Rate: 3.2%
- Recommended Action: TUNE_THRESHOLD

Suggested Suppression Filters:
- dest_ip = "151.101.0.0/16" (Fastly CDN) — covers 52.1% of FPs

Contact me at contact@malsayegh.ae to discuss building an alert tuning programme for your SIEM.
