Compare commits

...

2 Commits

Author SHA1 Message Date
CyberMind
e7a84f0380
Merge pull request #491 from CyberMind-FR/feature/490-phase-2c-toolbox-receiving-modules-actua
Some checks are pending
License Headers / check (push) Waiting to run
feat(phase-2c): receiving modules enrichment (nDPI-style classify, JA4 fingerprint, providers, avatar)
2026-06-13 08:19:50 +02:00
e67baf4cd7 feat(secubox-core+5modules+toolbox): Phase 2c receiving modules enrichment (ref #490)
Phase 2b (#488/#489) wired mitm addons to 5 receiving modules but events
were persisted raw. This phase implements actual enrichment in each module
via the enrich_hook param, plus aggregator merge so reports show meaningful
data (YouTube/Signal/iPhone/Safari/JA4 fingerprints) instead of raw bytes.

  - host_app.py  : 60+ host patterns -> app + category + emoji
  - cookie.py    : 40+ cookie tracker patterns -> provider + category + emoji
  - avatar.py    : UA + Client Hints -> device + browser + OS + emoji
  - ja4.py       : NEW. Deterministic JA4-style fingerprint hash from
                   cipher_suites + alpn + extensions. Lookup table for
                   known JA4 fingerprints (empty for now, Phase 3 will
                   populate). 12-char hex (SHA256 truncated).

The host_app/cookie/avatar are copies of the secubox-toolbox classifiers
moved to secubox-core so all 5 receiving modules can import them. The
secubox-toolbox-local ones stay as-is to avoid breaking changes — Phase 3
will consolidate.

  - secubox-dpi             : host/SNI -> {app, category, emoji}
  - secubox-cookies         : cookie names -> {providers{}, categories{}}
  - secubox-avatar          : UA + CH -> {device, browser, os_label}
  - secubox-threat-analyst  : ClientHello -> {ja4_fingerprint, known_client}
  - secubox-soc             : indicators -> {total_weight, band, kinds}

Each is ~25 lines, called by mount_ingest_routes BEFORE persistence.
Enriched output joins the raw event under the 'enriched' key.

_pull_mitm_module_events() now also calls _summarize_enriched(kind, events)
to consolidate per-module enrichment :

  - dpi             : top_apps[] aggregated from enriched.app counts
  - cookies         : top_providers[] from enriched.providers + tracker_total
  - avatar          : devices{} + browsers{} from enriched.{device,browser}
  - threat-analyst  : top_fingerprints[] grouped by JA4 hash
  - soc             : total_weight + max_band + indicator_kinds

These appear under mitm_modules.<kind>.enriched_summary in the /report JSON.

POST realistic payloads to all 5 sockets :
  - YouTube host -> dpi/enriched.app = 'YouTube' (streaming)
  - GA + FB Pixel cookies -> cookies/providers : GA x3, FB x1, total 4
  - iPhone Safari UA -> avatar/device='iPhone' (📱 iOS 17.4) + Safari (🧭)
  - facebook.com ClientHello -> threat-analyst/ja4 = '7175ee3a68f0'
  - 2 indicators (weight 15+25) -> soc/band='medium', total=40, kinds=[dga, suspicious]

  - secubox-dpi : call live nDPI/netifyd socket (currently pattern-match only)
  - secubox-threat-analyst : implement full FoxIO JA4 string format (currently
    deterministic SHA256 trunc which is JA4-like but not the canonical format)
  - secubox-soc : threat-intel feed lookup (currently just sums static weights)
  - secubox-avatar : screen/timing fingerprinting via WebGL hash (currently UA only)
  - Reports (PDF + HTML) : surface mitm_modules.enriched_summary in the report UI
2026-06-13 08:00:18 +02:00
10 changed files with 591 additions and 11 deletions

View File

@ -0,0 +1,10 @@
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""Shared classifiers used by mitm-ingest enrich_hooks across modules.
- host_app : host/SNI → app + category + emoji
- cookie : cookie name → provider + category + emoji
- avatar : User-Agent → device + browser + OS + emoji
- ja4 : TLS ClientHello → fingerprint hash
"""
from . import host_app, cookie, avatar, ja4 # noqa: F401

View File

@ -0,0 +1,116 @@
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""Avatar analysis : UA + Client Hints → device emoji + readable name."""
from __future__ import annotations
import re
# Devices identification patterns. Order = priority (first match wins).
DEVICE_PATTERNS = [
# ── iPhone ──
(re.compile(r"iPhone\s?OS\s?(\d+_\d+)|iPhone.*OS\s?(\d+_\d+)", re.I),
"iPhone", "📱", "iPhone iOS {}"),
(re.compile(r"iPhone", re.I), "iPhone", "📱", "iPhone"),
# ── iPad ──
(re.compile(r"iPad", re.I), "iPad", "📱", "iPad"),
# ── Mac ──
(re.compile(r"Mac OS X (\d+[._]\d+)", re.I), "Mac", "💻", "macOS {}"),
(re.compile(r"Macintosh", re.I), "Mac", "💻", "Mac"),
# ── Android ──
(re.compile(r"Pixel\s?(\d+)", re.I), "Pixel", "📱", "Pixel {}"),
(re.compile(r"SM-[A-Z]\d+", re.I), "Samsung", "📱", "Samsung"),
(re.compile(r"Android (\d+)", re.I), "Android", "📱", "Android {}"),
(re.compile(r"Android", re.I), "Android", "📱", "Android"),
# ── Windows ──
(re.compile(r"Windows NT 11"), "Windows", "💻", "Windows 11"),
(re.compile(r"Windows NT 10"), "Windows", "💻", "Windows 10"),
(re.compile(r"Windows NT"), "Windows", "💻", "Windows"),
# ── Linux ──
(re.compile(r"Linux", re.I), "Linux", "🐧", "Linux"),
# ── Game / IoT ──
(re.compile(r"PlayStation", re.I), "PlayStation", "🎮", "PlayStation"),
(re.compile(r"Xbox", re.I), "Xbox", "🎮", "Xbox"),
(re.compile(r"Nintendo", re.I), "Nintendo", "🎮", "Nintendo"),
(re.compile(r"AppleTV", re.I), "Apple TV", "📺", "Apple TV"),
(re.compile(r"Roku", re.I), "Roku", "📺", "Roku"),
# ── Bot / known clients ──
(re.compile(r"curl/", re.I), "curl", "🛠", "curl"),
(re.compile(r"wget/", re.I), "wget", "🛠", "wget"),
]
# Browser identification patterns, one row per signature:
#   (compiled regex, browser name, emoji, label template)
# First match wins. Chromium-derived browsers (Edge, Opera, DuckDuckGo) also
# advertise "Chrome/" and "Safari/" tokens, so their own tokens must be tested
# before the generic Chrome and Safari rows or they would never be reached.
BROWSER_PATTERNS = [
    (re.compile(r"Edg/(\d+)"), "Edge", "🪟", "Edge {}"),
    (re.compile(r"OPR/(\d+)|Opera/(\d+)"), "Opera", "🔴", "Opera"),
    (re.compile(r"DuckDuckGo/(\d+)"), "DuckDuckGo", "🦆", "DuckDuckGo {}"),
    (re.compile(r"Chrome/(\d+)"), "Chrome", "🟢", "Chrome {}"),
    (re.compile(r"Firefox/(\d+)"), "Firefox", "🦊", "Firefox {}"),
    # Safari last: nearly every WebKit/Blink UA carries a "Safari/" token.
    (re.compile(r"Safari/(\d+)"), "Safari", "🧭", "Safari"),
]
def classify_user_agent(ua: str) -> dict:
    """Classify one User-Agent string into device and browser information.

    Returns a dict with the keys ``device``, ``device_emoji``, ``os_label``,
    ``browser``, ``browser_emoji``, ``browser_label`` and ``raw`` (the UA
    truncated to 200 characters). A falsy ``ua`` yields the "unknown" / "?"
    placeholders with an empty ``raw``.
    """
    if not ua:
        return {"device": "unknown", "device_emoji": "", "os_label": "?",
                "browser": "unknown", "browser_emoji": "", "browser_label": "?",
                "raw": ""}

    def _first_hit(table, normalise):
        """Return (name, emoji, label) for the first matching row, else None."""
        for regex, name, icon, fmt in table:
            found = regex.search(ua)
            if not found:
                continue
            # Alternations leave unused groups as None; keep the filled ones.
            captured = [grp for grp in found.groups() if grp]
            if captured and "{}" in fmt:
                return name, icon, fmt.format(normalise(captured[0]))
            return name, icon, fmt
        return None

    # Device versions are captured as "17_4"; display them dotted.
    device_hit = _first_hit(DEVICE_PATTERNS, lambda ver: ver.replace("_", "."))
    if device_hit is None:
        # No known device: keep the head of the UA as a readable fallback.
        result = {"device": "unknown", "device_emoji": "", "os_label": ua[:50]}
    else:
        name, icon, text = device_hit
        result = {"device": name, "device_emoji": icon, "os_label": text}

    browser_hit = _first_hit(BROWSER_PATTERNS, lambda ver: ver)
    if browser_hit is None:
        result.update({"browser": "unknown", "browser_emoji": "", "browser_label": "?"})
    else:
        name, icon, text = browser_hit
        result.update({"browser": name, "browser_emoji": icon, "browser_label": text})

    result["raw"] = ua[:200]
    return result
def analyze_user_agents(ua_set: set[str] | list[str]) -> dict:
    """Aggregate a collection of User-Agent strings.

    Returns a dict with:
      - ``devices``  : {device: {count, emoji, os_label}}
      - ``browsers`` : {browser: {count, emoji, label}}
      - ``most_common`` : device name with the highest count, or None
      - ``most_common_emoji`` : emoji of that device, "" when there is none
      - ``raw_count`` : number of UAs received

    The same set of keys is returned for empty input, so callers can read
    ``most_common_emoji`` without checking for emptiness first.
    """
    if not ua_set:
        return {"devices": {}, "browsers": {}, "most_common": None,
                "most_common_emoji": "", "raw_count": 0}

    devices: dict[str, dict] = {}
    browsers: dict[str, dict] = {}
    for ua in ua_set:
        cls = classify_user_agent(ua)
        # The emoji / label of the first UA seen for a device or browser is kept.
        device_entry = devices.setdefault(
            cls["device"],
            {"count": 0, "emoji": cls["device_emoji"], "os_label": cls["os_label"]},
        )
        device_entry["count"] += 1
        browser_entry = browsers.setdefault(
            cls["browser"],
            {"count": 0, "emoji": cls["browser_emoji"], "label": cls["browser_label"]},
        )
        browser_entry["count"] += 1

    # On ties max() keeps the first device encountered.
    most_common = max(devices, key=lambda name: devices[name]["count"]) if devices else None
    return {
        "devices": devices,
        "browsers": browsers,
        "most_common": most_common,
        "most_common_emoji": devices[most_common]["emoji"] if most_common else "",
        "raw_count": len(ua_set),
    }

View File

@ -0,0 +1,140 @@
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""Cookie analysis : identify trackers + providers + categorize.
Phase 2a+ heuristic: pattern matching on well-known cookie names,
mapped to a provider + category (analytics / advertising / social / etc.).
The database is extensible: in Phase 3 it will be loaded from cookiepedia or EasyList.
"""
from __future__ import annotations
import re
# Cookie-name classification table, one row per signature:
#   (compiled regex on the cookie NAME, provider, category, emoji)
# classify_cookie_name() walks this list top to bottom and returns the first
# row whose regex matches, so row order is the tie-breaker between patterns.
# Categories used below: analytics, advertising, social, session, infra,
# monitoring.
# NOTE(review): "session" and "infra" rows are labelled as non-trackers here,
# but consumers that sum every provider count will include them — confirm
# that is the intended meaning of their "tracker" totals.
COOKIE_PATTERNS = [
# ── Analytics ──
(re.compile(r"^_ga(_|$|t)"), "Google Analytics", "analytics", "📊"),
(re.compile(r"^_gid$"), "Google Analytics", "analytics", "📊"),
(re.compile(r"^_gat"), "Google Analytics", "analytics", "📊"),
(re.compile(r"^_gcl_au$"), "Google Ads conversion", "advertising", "💰"),
(re.compile(r"^_pk_(id|ses|cvar)"), "Matomo / Piwik", "analytics", "📊"),
(re.compile(r"^plausible_"), "Plausible", "analytics", "📊"),
(re.compile(r"^_mkto_trk$"), "Marketo", "analytics", "📊"),
(re.compile(r"^__hssc$|^__hstc$"), "HubSpot", "analytics", "📊"),
(re.compile(r"^mp_[a-z0-9]+_mixpanel"), "Mixpanel", "analytics", "📊"),
(re.compile(r"^amplitude_"), "Amplitude", "analytics", "📊"),
(re.compile(r"^optimizelyEndUserId$"), "Optimizely", "analytics", "📊"),
(re.compile(r"^_hjSession"), "Hotjar", "analytics", "📊"),
(re.compile(r"^_hjFirstSeen$"), "Hotjar", "analytics", "📊"),
(re.compile(r"^crisp-client/session/"), "Crisp Chat", "analytics", "💬"),
# ── Advertising / Tracking ──
(re.compile(r"^_fbp$|^fr$"), "Facebook Pixel", "advertising","🎯"),
(re.compile(r"^IDE$"), "Google DoubleClick", "advertising","🎯"),
(re.compile(r"^NID$"), "Google", "advertising","🎯"),
(re.compile(r"^DSID$"), "Google DoubleClick", "advertising","🎯"),
(re.compile(r"^uid$|^bcookie$|^lidc$"), "LinkedIn Insight", "advertising","💼"),
(re.compile(r"^MUID$|^_uetsid$|^_uetvid$"), "Microsoft Clarity / Bing Ads", "advertising", "🎯"),
(re.compile(r"^_pin_unauth$|^_pinterest_ct_"), "Pinterest", "advertising","📌"),
(re.compile(r"^tt_appInfo$|^tt_webid"), "TikTok", "advertising","🎵"),
(re.compile(r"^_ttp$"), "TikTok Pixel", "advertising","🎵"),
(re.compile(r"^ANID$"), "Google", "advertising","🎯"),
(re.compile(r"^__qca$"), "Quantcast", "advertising","🎯"),
(re.compile(r"^__gads$|^__gpi$"), "Google AdSense", "advertising","💰"),
(re.compile(r"^test_cookie$"), "Google", "advertising","🎯"),
# ── Social ──
(re.compile(r"^c_user$|^xs$|^datr$"), "Facebook", "social", "👥"),
(re.compile(r"^sb$|^locale$|^wd$"), "Facebook", "social", "👥"),
(re.compile(r"^twid$|^ct0$|^auth_token$"), "Twitter / X", "social", "👥"),
(re.compile(r"^li_at$"), "LinkedIn", "social", "👥"),
(re.compile(r"^IG_"), "Instagram", "social", "👥"),
# ── Auth / Session (legit, no tracker) ──
(re.compile(r"^session(_id)?$|^sessionid$"), "Session generic", "session", "🔑"),
(re.compile(r"^csrftoken$|^_csrf$"), "CSRF token", "session", "🔒"),
(re.compile(r"^XSRF-TOKEN$"), "XSRF token", "session", "🔒"),
(re.compile(r"^remember_token$"), "Remember-me", "session", "🔑"),
(re.compile(r"^PHPSESSID$"), "PHP session", "session", "🔑"),
(re.compile(r"^JSESSIONID$"), "Java session", "session", "🔑"),
(re.compile(r"^connect\.sid$"), "Express.js session", "session", "🔑"),
# ── CDN / infra ──
(re.compile(r"^__cf_bm$|^cf_clearance$"), "Cloudflare", "infra", ""),
(re.compile(r"^_dd_s$"), "Datadog RUM", "monitoring", "📈"),
]
def classify_cookie_name(name: str) -> dict:
    """Map a single cookie name to ``{provider, category, emoji}``.

    The first matching row of COOKIE_PATTERNS decides the result. A name that
    matches nothing yields
    ``{provider: 'unknown', category: 'other', emoji: ''}``.
    """
    hit = next(
        (
            (provider, category, emoji)
            for regex, provider, category, emoji in COOKIE_PATTERNS
            if regex.search(name)
        ),
        None,
    )
    if hit is None:
        return {"provider": "unknown", "category": "other", "emoji": ""}
    provider, category, emoji = hit
    return {"provider": provider, "category": category, "emoji": emoji}
# Set-Cookie attributes that take a value (RFC 6265 plus common extensions).
# Value-less attributes (Secure, HttpOnly, ...) contain no "=" and are already
# skipped by the parser.
_SET_COOKIE_ATTRIBUTES = frozenset(
    {"expires", "max-age", "domain", "path", "samesite", "priority"}
)


def parse_cookie_header(header_value: str) -> list[str]:
    """Parse a 'Cookie:' or 'Set-Cookie:' value and return the cookie NAMES.

    Segments are split on ";" and only ``name=value`` segments with a
    non-empty name are kept. For a Set-Cookie value the trailing attributes
    (``Path=/``, ``Max-Age=...``, ``SameSite=...`` and so on) are not cookies,
    so an attribute name is dropped when it appears after at least one real
    cookie name. A leading segment is always kept, whatever it is called.

    Returns an empty list for an empty or missing header value.
    """
    if not header_value:
        return []
    names: list[str] = []
    for part in header_value.split(";"):
        name, sep, _value = part.partition("=")
        name = name.strip()
        if not sep or not name:
            continue
        # Attribute names are case-insensitive; only filter once a cookie
        # name has been seen so a first pair is never mistaken for one.
        if names and name.lower() in _SET_COOKIE_ATTRIBUTES:
            continue
        names.append(name)
    return names
def analyze_cookie_events(cookie_events: list[dict]) -> dict:
    """Aggregate cookie events into a per-provider and per-category breakdown.

    Input : list of {url, set_cookie_count, cookie_count, ...} from local_store.
    Cookie names are read from the optional ``set_cookie_names`` and
    ``cookie_names`` lists of each event; an event without them (older events
    that stored only counts) contributes nothing.

    Returns :
        {
            providers: {provider: {count, category, emoji}, ...},
            categories: {category: count, ...},
            unknown_count: int,
        }
    """
    name_fields = ("set_cookie_names", "cookie_names")
    providers: dict[str, dict] = {}
    categories: dict[str, int] = {}
    unknown_count = 0

    candidate_lists = (
        event.get(field, []) for event in cookie_events for field in name_fields
    )
    for cookie_names in candidate_lists:
        # Backward-compat: anything that is not a list of names is ignored.
        if not isinstance(cookie_names, list):
            continue
        for cookie_name in cookie_names:
            verdict = classify_cookie_name(cookie_name)
            provider = verdict["provider"]
            if provider == "unknown":
                unknown_count += 1
            else:
                entry = providers.setdefault(
                    provider,
                    {"count": 0, "category": verdict["category"],
                     "emoji": verdict["emoji"]},
                )
                entry["count"] += 1
            # Categories are tallied for every name, unclassified ones included.
            category = verdict["category"]
            categories[category] = categories.get(category, 0) + 1

    return {
        "providers": providers,
        "categories": categories,
        "unknown_count": unknown_count,
    }
# Convenience wrapper for live use in /report endpoints.
def top_providers(cookie_events: list[dict], limit: int = 10) -> list[dict]:
    """Return the most frequent providers as [{provider, count, category, emoji}, ...].

    Rows are ordered by descending hit count and cut to ``limit`` entries.
    """
    per_provider = analyze_cookie_events(cookie_events)["providers"]
    rows = []
    for provider_name, details in per_provider.items():
        row = {"provider": provider_name}
        row.update(details)
        rows.append(row)
    # reverse=True keeps the sort stable, so ties retain insertion order.
    rows.sort(key=lambda row: row["count"], reverse=True)
    return rows[:limit]

View File

@ -0,0 +1,84 @@
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""JA4 / JA4-like TLS ClientHello fingerprint.
Reference: https://github.com/FoxIO-LLC/ja4 (BSD-3)
Phase 2c implementation : compute a deterministic, JA4-style fingerprint
hash from cipher_suites + alpn_protocols + extensions. The output is
12-char hex (truncated SHA256), suitable for matching against external
JA4 databases (custom curation, not the full FoxIO format).
This is NOT the canonical FoxIO JA4 string. It's a deterministic
fingerprint that's stable per-client-stack, so the same iPhone Safari
will always yield the same hash. We can map known hashes to bots,
trackers, malware C2 in Phase 3.
"""
from __future__ import annotations
import hashlib
def _sort_norm(items: list | None) -> str:
"""Sort + join items as canonical comma-separated lowercase string."""
if not items:
return ""
parts = []
for x in items:
if isinstance(x, bytes):
parts.append(x.hex())
else:
parts.append(str(x).lower())
return ",".join(sorted(parts))
def compute_ja4_hash(
    *,
    sni: str | None = None,
    alpn_protocols: list | None = None,
    cipher_suites: list | None = None,
    extensions: list | None = None,
    transport: str = "t",  # 't' for TCP, 'q' for QUIC
    tls_version: str = "13",  # 13 for TLS 1.3, 12 for TLS 1.2
) -> dict:
    """Build a JA4-style fingerprint dict from ClientHello fields.

    The fingerprint is the first 12 hex characters of the SHA-256 of a
    canonical string made of transport, TLS version and the normalised
    ALPN / cipher / extension lists. The SNI value itself is not hashed;
    only its presence is reported.

    Returns {
        fingerprint  : 12-char hex hash,
        transport    : t/q,
        tls_version  : 13/12,
        alpn_count   : int,
        cipher_count : int,
        ext_count    : int,
        sni_present  : bool,
        raw_repr     : canonical string, truncated to 200 chars, for debug,
    }
    """
    canonical = "|".join((
        f"{transport}{tls_version}",
        "alpn=" + _sort_norm(alpn_protocols),
        "c=" + _sort_norm(cipher_suites),
        "x=" + _sort_norm(extensions),
    ))
    digest = hashlib.sha256(canonical.encode("utf-8", errors="ignore")).hexdigest()

    summary = {"fingerprint": digest[:12]}
    summary["transport"] = transport
    summary["tls_version"] = tls_version
    summary["alpn_count"] = len(alpn_protocols or [])
    summary["cipher_count"] = len(cipher_suites or [])
    summary["ext_count"] = len(extensions or [])
    summary["sni_present"] = bool(sni)
    summary["raw_repr"] = canonical[:200]
    return summary
# Phase 3-ready : map known JA4 hashes to client tags. Empty for now.
# Keys are fingerprint strings (the 12-char hex produced by compute_ja4_hash);
# values are free-form dicts describing the client. The commented rows below
# only illustrate the intended value shape.
KNOWN_JA4_FINGERPRINTS: dict[str, dict] = {
# "abc123def456": {"label": "iPhone Safari 17.x", "category": "browser", "trust": "high"},
# "deadbeef0000": {"label": "Tor Browser 14.x", "category": "browser-anon", "trust": "medium"},
}
def lookup_ja4(fingerprint: str) -> dict | None:
    """Look a fingerprint up in KNOWN_JA4_FINGERPRINTS.

    Returns the stored description dict, or None when the fingerprint is
    not in the table.
    """
    if fingerprint in KNOWN_JA4_FINGERPRINTS:
        return KNOWN_JA4_FINGERPRINTS[fingerprint]
    return None

View File

@ -26,13 +26,43 @@ from secubox_core.logger import get_logger
app = FastAPI(title="secubox-avatar", version="1.0.0", root_path="/api/v1/avatar")
# Phase 2b (#488) : ingest mitm avatar fingerprint events from secubox-toolbox addon
# Phase 2b/2c (#488/#490) : ingest mitm avatar events + UA/CH device classification
from secubox_core.mitm_ingest import mount_ingest_routes # noqa: E402
from secubox_core.classifiers import avatar as _avatar_cls # noqa: E402
def _avatar_enrich(event: dict) -> dict:
"""Phase 2c enrichment : UA + Client Hints -> {device, browser, os, emoji}."""
ua = event.get("user_agent") or ""
if not ua:
return event
cls = _avatar_cls.classify_user_agent(ua)
# Augment with Client Hints if present (more reliable than UA spoofing)
chints = event.get("client_hints") or {}
if "sec-ch-ua-platform" in chints:
cls["ch_platform"] = chints["sec-ch-ua-platform"].strip('"')
if "sec-ch-ua-model" in chints:
cls["ch_model"] = chints["sec-ch-ua-model"].strip('"')
event["enriched"] = {
"device": cls.get("device", "unknown"),
"device_emoji": cls.get("device_emoji", ""),
"os_label": cls.get("os_label", "?"),
"browser": cls.get("browser", "unknown"),
"browser_emoji": cls.get("browser_emoji", ""),
"browser_label": cls.get("browser_label", "?"),
"ch_platform": cls.get("ch_platform"),
"ch_model": cls.get("ch_model"),
"source": "secubox-avatar/classifier",
}
return event
# Register the mitm ingest route for the avatar service. _avatar_enrich is
# passed as enrich_hook; per the Phase 2c notes it is applied to each event
# before persistence — confirm against secubox_core.mitm_ingest.
mount_ingest_routes(
app,
endpoint_path="/fingerprint",
db_path="/var/lib/secubox/avatar/mitm-ingest.db",
kind="avatar",
enrich_hook=_avatar_enrich,
)
# ══════════════════════════════════════════════════════════════════

View File

@ -28,13 +28,45 @@ except ImportError:
app = FastAPI(title="SecuBox Cookies API", version="1.0.0")
# Phase 2b (#488) : ingest mitm cookies events from secubox-toolbox addon
# Phase 2b/2c (#488/#490) : ingest mitm cookies events + provider classification
from secubox_core.mitm_ingest import mount_ingest_routes # noqa: E402
from secubox_core.classifiers import cookie as _cookie_cls # noqa: E402
def _cookies_enrich(event: dict) -> dict:
"""Phase 2c enrichment : map cookie names -> {providers[], categories{}}."""
set_names = event.get("set_cookie_names", []) or []
sent_names = event.get("cookie_names", []) or []
all_names = list(set_names) + list(sent_names)
if not all_names:
return event
providers: dict[str, dict] = {}
categories: dict[str, int] = {}
for n in all_names:
cls = _cookie_cls.classify_cookie_name(n)
p = cls["provider"]
if p != "unknown":
if p not in providers:
providers[p] = {"count": 0, "category": cls["category"], "emoji": cls["emoji"]}
providers[p]["count"] += 1
cat = cls["category"]
categories[cat] = categories.get(cat, 0) + 1
event["enriched"] = {
"providers": providers,
"categories": categories,
"total_names": len(all_names),
"tracker_count": sum(v["count"] for v in providers.values()),
"source": "secubox-cookies/classifier",
}
return event
# Register the mitm ingest route for the cookies service. _cookies_enrich is
# passed as enrich_hook; per the Phase 2c notes it is applied to each event
# before persistence — confirm against secubox_core.mitm_ingest.
mount_ingest_routes(
app,
endpoint_path="/inject",
db_path="/var/lib/secubox/cookies/mitm-ingest.db",
kind="cookies",
enrich_hook=_cookies_enrich,
)
# Configuration paths

View File

@ -23,13 +23,36 @@ import httpx
app = FastAPI(title="secubox-dpi", version="2.0.0", root_path="/api/v1/dpi")
# Phase 2b (#488) : ingest mitm DPI events from secubox-toolbox addon
# Phase 2b/2c (#488/#490) : ingest mitm DPI events + nDPI-style classification
from secubox_core.mitm_ingest import mount_ingest_routes # noqa: E402
from secubox_core.classifiers import host_app as _host_app # noqa: E402
def _dpi_enrich(event: dict) -> dict:
"""Phase 2c enrichment : classify host/SNI -> {app, category, emoji}.
Future Phase 3 : query nDPI/netifyd daemon socket for live classification.
"""
host = event.get("host") or event.get("sni") or ""
if not host:
return event
cls = _host_app.classify_host(host)
event["enriched"] = {
"app": cls["app"],
"category": cls["category"],
"emoji": cls["emoji"],
"source": "secubox-dpi/host_app",
"method": "pattern-match",
}
return event
# Register the mitm ingest route for the DPI service. _dpi_enrich is passed
# as enrich_hook; per the Phase 2c notes it is applied to each event before
# persistence — confirm against secubox_core.mitm_ingest.
mount_ingest_routes(
app,
endpoint_path="/classify",
db_path="/var/lib/secubox/dpi/mitm-ingest.db",
kind="dpi",
enrich_hook=_dpi_enrich,
)
# ══════════════════════════════════════════════════════════════════

View File

@ -34,13 +34,42 @@ P2P_SOCKET = "/run/secubox/p2p.sock"
app = FastAPI(title="SecuBox SOC", version="2.0.0")
# Phase 2b (#488) : ingest mitm SOC indicator events from secubox-toolbox addon
# Phase 2b/2c (#488/#490) : ingest mitm SOC events + score aggregation
from secubox_core.mitm_ingest import mount_ingest_routes # noqa: E402
def _soc_enrich(event: dict) -> dict:
"""Phase 2c enrichment : sum indicator weights -> score band.
Future Phase 3 : query threat-intel feeds (CrowdSec/ThreatFox/etc.)
locally instead of just summing static weights.
"""
indicators = event.get("indicators") or []
if not indicators:
return event
total_weight = sum((i.get("weight") or 0) for i in indicators if isinstance(i, dict))
band = "low"
if total_weight >= 50:
band = "high"
elif total_weight >= 20:
band = "medium"
kinds = sorted({i.get("kind", "?") for i in indicators if isinstance(i, dict)})
event["enriched"] = {
"total_weight": total_weight,
"band": band,
"indicator_kinds": kinds,
"indicator_count": len(indicators),
"source": "secubox-soc/scoring",
}
return event
# Register the mitm ingest route for the SOC service. _soc_enrich is passed
# as enrich_hook; per the Phase 2c notes it is applied to each event before
# persistence — confirm against secubox_core.mitm_ingest.
mount_ingest_routes(
app,
endpoint_path="/event",
db_path="/var/lib/secubox/soc/mitm-ingest.db",
kind="soc",
enrich_hook=_soc_enrich,
)
# Data directories

View File

@ -54,13 +54,39 @@ QUEUE_FILE = DATA_DIR / "pending_rules.json"
app = FastAPI(title="SecuBox Threat Analyst", version="1.0.0")
logger = logging.getLogger("secubox.threat-analyst")
# Phase 2b (#488) : ingest mitm JA4 clienthello events from secubox-toolbox addon
# Phase 2b/2c (#488/#490) : ingest mitm JA4 events + compute fingerprint hash
from secubox_core.mitm_ingest import mount_ingest_routes # noqa: E402
from secubox_core.classifiers import ja4 as _ja4_cls # noqa: E402
def _ja4_enrich(event: dict) -> dict:
    """Phase 2c enrichment : compute JA4-style fingerprint + lookup known clients.

    The fingerprint is computed from the event's ``sni``, ``alpn_protocols``,
    ``cipher_suites`` and ``extensions`` fields (missing fields are passed as
    None). The event is modified in place; the result is stored under
    ``event["enriched"]``.
    """
    hello_fields = ("sni", "alpn_protocols", "cipher_suites", "extensions")
    computed = _ja4_cls.compute_ja4_hash(
        **{field: event.get(field) for field in hello_fields}
    )
    fingerprint = computed["fingerprint"]

    enriched = {
        "ja4_fingerprint": fingerprint,
        "ja4_raw_repr": computed["raw_repr"],
    }
    for field in ("cipher_count", "alpn_count", "ext_count", "sni_present"):
        enriched[field] = computed[field]
    # None if unknown, dict if matched
    enriched["known_client"] = _ja4_cls.lookup_ja4(fingerprint)
    enriched["source"] = "secubox-threat-analyst/ja4"

    event["enriched"] = enriched
    return event
# Register the mitm ingest route for the threat-analyst service. _ja4_enrich
# is passed as enrich_hook; per the Phase 2c notes it is applied to each
# event before persistence — confirm against secubox_core.mitm_ingest.
mount_ingest_routes(
app,
endpoint_path="/ja4",
db_path="/var/lib/secubox/threat-analyst/mitm-ingest.db",
kind="ja4",
enrich_hook=_ja4_enrich,
)

View File

@ -1852,11 +1852,17 @@ _MITM_MODULES = [
]
def _pull_mitm_module_events(mac_hash: str) -> dict:
def _pull_mitm_module_events(mac_hash: str, limit: int = 50) -> dict:
"""Query each receiving module's GET /mitm-events for this client.
Returns a dict {module: {count, sample_events}} for the report. Errors per
module are non-fatal if a module is down, it just shows count=0.
Returns a dict {module: {count, sample_events, enriched_summary}} for the
report. Errors per module are non-fatal if a module is down, it just
shows count=0.
Phase 2c (#490) : also build an enriched_summary per module aggregating
the enrich_hook output (top apps from dpi, top providers from cookies,
devices from avatar, JA4 fingerprints from threat-analyst, score band
from soc).
"""
import socket as _sock
import urllib.parse as _up
@ -1872,15 +1878,17 @@ def _pull_mitm_module_events(mac_hash: str) -> dict:
self.sock.connect(sock_path)
conn = UDSConnection("localhost", timeout=2)
qs = _up.urlencode({"mac_hash": mac_hash, "limit": 20})
qs = _up.urlencode({"mac_hash": mac_hash, "limit": limit})
conn.request("GET", f"/mitm-events?{qs}")
resp = conn.getresponse()
if resp.status == 200:
import json as _json
data = _json.loads(resp.read().decode("utf-8", errors="ignore")[:50000])
data = _json.loads(resp.read().decode("utf-8", errors="ignore")[:200000])
events = data.get("events", [])
out[kind] = {
"count": data.get("count", 0),
"sample": data.get("events", [])[:5],
"sample": events[:5],
"enriched_summary": _summarize_enriched(kind, events),
}
else:
out[kind] = {"count": 0, "error": f"HTTP {resp.status}"}
@ -1892,6 +1900,88 @@ def _pull_mitm_module_events(mac_hash: str) -> dict:
return out
def _summarize_enriched(kind: str, events: list[dict]) -> dict:
"""Phase 2c (#490) : per-module aggregation of enrich_hook output.
Each receiving module attaches its enrich_hook result under 'enriched'
inside the event payload. This function consolidates them into a
compact summary suitable for the /report display.
"""
if not events:
return {}
if kind == "dpi":
apps: dict[str, dict] = {}
for ev in events:
e = (ev.get("payload") or {}).get("enriched") or {}
app = e.get("app")
if not app or app == "?":
continue
if app not in apps:
apps[app] = {"count": 0, "category": e.get("category"), "emoji": e.get("emoji")}
apps[app]["count"] += 1
top = sorted([{"app": k, **v} for k, v in apps.items()], key=lambda x: -x["count"])[:15]
return {"top_apps": top, "classified_events": sum(v["count"] for v in apps.values())}
if kind == "cookies":
providers: dict[str, dict] = {}
total_trackers = 0
for ev in events:
e = (ev.get("payload") or {}).get("enriched") or {}
for p, info in (e.get("providers") or {}).items():
if p not in providers:
providers[p] = {"count": 0, "category": info.get("category"), "emoji": info.get("emoji")}
providers[p]["count"] += info.get("count", 1)
total_trackers += info.get("count", 1)
top = sorted([{"provider": k, **v} for k, v in providers.items()], key=lambda x: -x["count"])[:10]
return {"top_providers": top, "tracker_total": total_trackers}
if kind == "avatar":
devices: dict[str, dict] = {}
browsers: dict[str, dict] = {}
for ev in events:
e = (ev.get("payload") or {}).get("enriched") or {}
d = e.get("device")
if d and d != "unknown":
if d not in devices:
devices[d] = {"count": 0, "emoji": e.get("device_emoji"), "os_label": e.get("os_label")}
devices[d]["count"] += 1
b = e.get("browser")
if b and b != "unknown":
if b not in browsers:
browsers[b] = {"count": 0, "emoji": e.get("browser_emoji"), "label": e.get("browser_label")}
browsers[b]["count"] += 1
return {"devices": devices, "browsers": browsers}
if kind == "threat-analyst":
fps: dict[str, dict] = {}
for ev in events:
e = (ev.get("payload") or {}).get("enriched") or {}
fp = e.get("ja4_fingerprint")
if not fp:
continue
if fp not in fps:
fps[fp] = {
"count": 0,
"known_client": e.get("known_client"),
"raw_repr": e.get("ja4_raw_repr"),
}
fps[fp]["count"] += 1
top = sorted([{"fingerprint": k, **v} for k, v in fps.items()], key=lambda x: -x["count"])[:10]
return {"top_fingerprints": top, "unique_count": len(fps)}
if kind == "soc":
total_w = 0
kinds_seen: dict[str, int] = {}
max_band = "low"
band_order = ["low", "medium", "high"]
for ev in events:
e = (ev.get("payload") or {}).get("enriched") or {}
total_w += e.get("total_weight") or 0
for k in e.get("indicator_kinds") or []:
kinds_seen[k] = kinds_seen.get(k, 0) + 1
b = e.get("band") or "low"
if band_order.index(b) > band_order.index(max_band):
max_band = b
return {"total_weight": total_w, "max_band": max_band, "indicator_kinds": kinds_seen}
return {}
def _enrich_with_geo(matches: list[dict]) -> list[dict]:
"""Add geo info to threat_intel matches."""
out = []