1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import splunklib.client as client
import splunklib.results as results
import pandas as pd
def fetch_alert_data(splunk_host, splunk_port, username, password, days_back=30):
service = client.connect(
host=splunk_host, port=splunk_port,
username=username, password=password
)
query = f"""
search index=notable
| eval closed_as_fp=if(status="closed" AND resolution="false_positive", 1, 0)
| eval escalated=if(status="escalated" OR status="in_progress", 1, 0)
| eval close_time_hours=round((close_time - create_time)/3600, 2)
| stats
count as total_alerts,
sum(closed_as_fp) as false_positives,
sum(escalated) as escalations,
avg(close_time_hours) as avg_close_hours,
avg(analyst_rating) as avg_rating
by rule_name
| eval fp_rate=round(false_positives/total_alerts*100, 1)
| eval escalation_rate=round(escalations/total_alerts*100, 1)
| sort -total_alerts
| head 200
"""
job = service.jobs.oneshot(query, earliest_time=f"-{days_back}d", latest_time="now")
reader = results.JSONResultsReader(job)
rows = [result for result in reader if isinstance(result, dict)]
return pd.DataFrame(rows)
|