Compare commits

...

1 Commits

Author SHA1 Message Date
55626e510b wip(toolbox): Phase 11.C backend schema + GeoIP fold + evidence helper (ref #508)
Checkpoint — backend evidence layer scaffolded.  Frontend wire + PDF
generator + addon consent probe land in follow-up commits.

  - social.py schema : social_edges.consent_state column ; social_nodes
    gains country_iso / asn_org / eu_inside / pre_consent_hits.
  - Idempotent _migrate_phase11c() ALTERs the pre-existing tables on
    2.6.0 → 2.6.x upgrades (no destructive recreate).
  - _EU_EEA_ISO whitelist (27 EU + 3 EFTA + UK adequacy) + is_eu_iso().
  - _geo_for() LRU-cached (4096) wrapper around the existing geo
    module ; fold time populates the GeoIP fields on every node row.
  - record_edge() accepts consent_state (default 'none_seen') ; fold
    accumulates pre_consent_hits into the per-node aggregate.
  - evidence(mac_hash) helper : returns the two legal-grade buckets
    (pre_consent + extra_eu) consumed by the PDF in the next commit.

Pivoting to admin tab routing per user request — Phase 11.C resumes
after that lands.
2026-06-10 08:16:38 +02:00

View File

@ -69,7 +69,12 @@ CREATE TABLE IF NOT EXISTS social_edges (
src_site TEXT NOT NULL,
tracker_domain TEXT NOT NULL,
cookie_id_hash TEXT NOT NULL,
ja4_hash TEXT
ja4_hash TEXT,
-- Phase 11.C (#508) — consent state at the moment the edge was
-- recorded. Computed by the addon based on whether a consent
-- platform cookie (OneTrust/Didomi/Quantcast/Sourcepoint) has
-- already been observed for this peer × site pair.
consent_state TEXT NOT NULL DEFAULT 'none_seen'
);
CREATE INDEX IF NOT EXISTS idx_social_edges_mac_ts
ON social_edges(client_mac_hash, ts);
@ -83,6 +88,16 @@ CREATE TABLE IF NOT EXISTS social_nodes (
first_seen INTEGER NOT NULL,
last_seen INTEGER NOT NULL,
sites_jsonl TEXT NOT NULL DEFAULT '[]',
-- Phase 11.C (#508) — GeoIP-derived metadata populated at fold
-- time so reads + PDF rendering don't have to do per-row mmdb
-- lookups. eu_inside is 1 when country_iso is in the EU/EEA whitelist.
country_iso TEXT,
asn_org TEXT,
eu_inside INTEGER NOT NULL DEFAULT 1,
-- Number of edges recorded against this (peer, tracker) BEFORE a
-- consent cookie was observed. >0 = legal-grade evidence of
-- tracker firing before consent (RGPD art. 6.1.a + 7).
pre_consent_hits INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (client_mac_hash, tracker_domain)
);
@ -103,9 +118,79 @@ def _conn() -> sqlite3.Connection:
c = sqlite3.connect(str(DB_PATH), timeout=5.0, isolation_level=None)
c.row_factory = sqlite3.Row
c.executescript(_SCHEMA)
_migrate_phase11c(c)
return c
# ───── Phase 11.C migrations — additive columns on pre-existing tables ─────
# CREATE TABLE IF NOT EXISTS skips creation if the table already exists, so
# the new columns won't auto-appear on a 2.6.0 → 2.6.2 upgrade. Idempotent
# ALTERs : we probe the column list first to skip the duplicate-column
# error case (which would raise on every connection otherwise).
# (table, column, column declaration) triples applied by _migrate_phase11c().
# The declarations mirror the column definitions in the CREATE TABLE schema.
_PHASE11C_MIGRATIONS = (
    ("social_edges", "consent_state", "TEXT NOT NULL DEFAULT 'none_seen'"),
    ("social_nodes", "country_iso", "TEXT"),
    ("social_nodes", "asn_org", "TEXT"),
    ("social_nodes", "eu_inside", "INTEGER NOT NULL DEFAULT 1"),
    ("social_nodes", "pre_consent_hits", "INTEGER NOT NULL DEFAULT 0"),
)


def _migrate_phase11c(c: sqlite3.Connection) -> None:
    """Add the Phase 11.C columns to tables created by an older schema.

    Idempotent and best-effort: columns that already exist are skipped,
    and a failure is logged as a warning instead of being raised.

    Each migration is attempted independently, so one failing ALTER
    (for example a duplicate-column error when two connections migrate
    at the same time) does not prevent the remaining columns from being
    added.

    Args:
        c: Open SQLite connection to migrate in place.
    """
    # One PRAGMA per table rather than one per column.
    columns_by_table = {}
    for table, col, decl in _PHASE11C_MIGRATIONS:
        try:
            known = columns_by_table.get(table)
            if known is None:
                # Field 1 of a table_info row is the column name; positional
                # access works whether or not row_factory is sqlite3.Row.
                known = {
                    r[1] for r in c.execute(f"PRAGMA table_info({table})")
                }
                columns_by_table[table] = known
            if not known:
                # table_info yields no rows for a table that does not
                # exist: there is nothing to ALTER, so move on.
                continue
            if col in known:
                continue
            c.execute(f"ALTER TABLE {table} ADD COLUMN {col} {decl}")
            known.add(col)
        except Exception as e:  # pragma: no cover
            log.warning("Phase 11.C migration failed: %s", e)
# ───── EU / EEA whitelist (RGPD scope, art. 45 + Schengen extension) ─────
# Codes are ISO 3166-1 alpha-2. Source : EU member state list + the
# EEA-EFTA states (NO, IS, LI ; CH is not part of this set) + UK
# (adequacy decision in force as of writing). The Phase C "extra_eu"
# flag is set when GeoIP says the tracker's country ISO is NOT in this
# set.
# Country codes treated as "inside" for the extra-EU evidence flag.
_EU_EEA_ISO: frozenset = frozenset({
    "AT", "BE", "BG", "CY", "CZ", "DE", "DK", "EE", "ES", "FI", "FR",
    "GR", "HR", "HU", "IE", "IT", "LT", "LU", "LV", "MT", "NL", "PL",
    "PT", "RO", "SE", "SI", "SK",  # 27 EU
    "IS", "LI", "NO",  # EFTA / EEA
    "GB",  # UK adequacy decision
})


def is_eu_iso(iso: Optional[str]) -> bool:
    """Return True when `iso` is a country code listed in `_EU_EEA_ISO`.

    The comparison is case-insensitive and ignores surrounding
    whitespace. `None` and the empty string return False.
    """
    if not iso:
        return False
    # Normalise before the lookup so a padded or lower-case code is not
    # misclassified as outside the set.
    return iso.strip().upper() in _EU_EEA_ISO
# Lightweight cache around the existing `geo` module so the fold loop
# doesn't pay the lookup cost per repeated tracker_domain. Bounded to
# 4096 entries (well above any realistic distinct tracker count seen
# in a 7d retention window).
import functools as _functools


@_functools.lru_cache(maxsize=4096)
def _geo_lookup_cached(host: str) -> Tuple[Optional[str], Optional[str]]:
    """Cached GeoIP lookup backing `_geo_for`.

    Lets import and lookup errors propagate. `lru_cache` does not store
    a result when the wrapped call raises, so a failure here is retried
    on the next call instead of being memoised.
    """
    from secubox_toolbox import geo as _g  # type: ignore

    info = _g.lookup(host) or {}
    iso = (info.get("country_iso") or "").upper() or None
    # The ASN organisation name is truncated to 64 characters.
    asn = (info.get("asn_org") or "")[:64] or None
    return iso, asn


def _geo_for(host: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (country_iso, asn_org) for a tracker host.

    Best-effort. Falls back to (None, None) when the GeoIP module isn't
    importable or when the underlying lookup raises. Only results of
    lookups that completed are cached (up to 4096 hosts); the error
    fallback is never cached, so hosts seen while the geo module was
    unavailable are resolved again once it becomes available.
    """
    try:
        return _geo_lookup_cached(host)
    except Exception:
        # Deliberate best-effort: GeoIP enrichment must never break the
        # caller.
        return None, None


# Keep the lru_cache introspection helpers reachable under the public name.
_geo_for.cache_info = _geo_lookup_cached.cache_info  # type: ignore[attr-defined]
_geo_for.cache_clear = _geo_lookup_cached.cache_clear  # type: ignore[attr-defined]
def cookie_id_hash(tracker_domain: str, cookie_name: str, cookie_value: str) -> str:
"""Stable short hash for an observed tracker identifier.
@ -147,13 +232,14 @@ def _record_edge_sync(
tracker_domain: str,
cookie_id_hash_val: str,
ja4_hash: Optional[str],
consent_state: str,
) -> None:
try:
with _conn() as c:
c.execute(
"INSERT INTO social_edges(ts, client_mac_hash, src_site, "
"tracker_domain, cookie_id_hash, ja4_hash) "
"VALUES (?, ?, ?, ?, ?, ?)",
"tracker_domain, cookie_id_hash, ja4_hash, consent_state) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(
int(time.time()),
client_mac_hash,
@ -161,6 +247,7 @@ def _record_edge_sync(
tracker_domain,
cookie_id_hash_val,
ja4_hash,
consent_state or "none_seen",
),
)
except Exception as e: # pragma: no cover — best-effort
@ -174,9 +261,15 @@ def record_edge(
tracker_domain: str,
cookie_id_hash_val: str,
ja4_hash: Optional[str] = None,
consent_state: str = "none_seen",
) -> None:
"""Submit one edge off-thread. Best-effort, never raises into the
addon, never blocks the mitmproxy asyncio loop."""
addon, never blocks the mitmproxy asyncio loop.
`consent_state` is one of {none_seen, pre_consent, post_consent} as
computed by the addon based on the per-peer × per-site consent
cookie observation log (Phase 11.C).
"""
if not (client_mac_hash and src_site and tracker_domain and cookie_id_hash_val):
return
try:
@ -187,6 +280,7 @@ def record_edge(
tracker_domain,
cookie_id_hash_val,
ja4_hash,
consent_state,
)
except RuntimeError:
# Executor shut down (interpreter teardown) — silent drop.
@ -208,7 +302,7 @@ def fold_recent(window_seconds: int = 300) -> Tuple[int, int]:
with _conn() as c:
edges = c.execute(
"SELECT client_mac_hash, src_site, tracker_domain, "
"cookie_id_hash, ja4_hash, ts "
"cookie_id_hash, ja4_hash, ts, consent_state "
"FROM social_edges WHERE ts >= ?",
(since,),
).fetchall()
@ -233,12 +327,20 @@ def fold_recent(window_seconds: int = 300) -> Tuple[int, int]:
key_n = (mac, trk)
n = per_node.setdefault(
key_n,
{"hits": 0, "first_seen": ts, "last_seen": ts, "sites": set()},
{
"hits": 0,
"first_seen": ts,
"last_seen": ts,
"sites": set(),
"pre_consent_hits": 0,
},
)
n["hits"] += 1
n["first_seen"] = min(n["first_seen"], ts)
n["last_seen"] = max(n["last_seen"], ts)
n["sites"].add(site)
if e["consent_state"] == "pre_consent":
n["pre_consent_hits"] += 1
# Per-site tracker index (for link fold below)
per_site_trackers.setdefault((mac, site), set()).add(trk)
@ -248,8 +350,13 @@ def fold_recent(window_seconds: int = 300) -> Tuple[int, int]:
# Persist nodes
for (mac, trk), n in per_node.items():
# Merge into existing row if present
# Phase 11.C : enrich with GeoIP at fold time so reads
# + PDF rendering never block on mmdb lookups.
country_iso, asn_org = _geo_for(trk)
eu_inside = 1 if is_eu_iso(country_iso) else 0
cur = c.execute(
"SELECT hits, first_seen, sites_jsonl FROM social_nodes "
"SELECT hits, first_seen, sites_jsonl, pre_consent_hits "
"FROM social_nodes "
"WHERE client_mac_hash = ? AND tracker_domain = ?",
(mac, trk),
).fetchone()
@ -261,17 +368,22 @@ def fold_recent(window_seconds: int = 300) -> Tuple[int, int]:
except Exception:
existing_sites = set()
sites = sorted(existing_sites | n["sites"])
pre = (cur["pre_consent_hits"] or 0) + n["pre_consent_hits"]
c.execute(
"UPDATE social_nodes SET hits = ?, first_seen = ?, "
"last_seen = ?, sites_jsonl = ? "
"last_seen = ?, sites_jsonl = ?, country_iso = ?, "
"asn_org = ?, eu_inside = ?, pre_consent_hits = ? "
"WHERE client_mac_hash = ? AND tracker_domain = ?",
(hits, first, n["last_seen"], json.dumps(sites), mac, trk),
(hits, first, n["last_seen"], json.dumps(sites),
country_iso, asn_org, eu_inside, pre, mac, trk),
)
else:
c.execute(
"INSERT INTO social_nodes(client_mac_hash, "
"tracker_domain, hits, first_seen, last_seen, "
"sites_jsonl) VALUES (?, ?, ?, ?, ?, ?)",
"sites_jsonl, country_iso, asn_org, eu_inside, "
"pre_consent_hits) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
mac,
trk,
@ -279,6 +391,10 @@ def fold_recent(window_seconds: int = 300) -> Tuple[int, int]:
n["first_seen"],
n["last_seen"],
json.dumps(sorted(n["sites"])),
country_iso,
asn_org,
eu_inside,
n["pre_consent_hits"],
),
)
nodes_touched += 1
@ -502,6 +618,74 @@ def aggregate(hours: int = 24) -> Dict:
return out
def _evidence_sites(raw) -> list:
    """Decode a `sites_jsonl` column value, or return [] when it is invalid."""
    try:
        return json.loads(raw)
    except Exception:
        return []


def evidence(mac_hash: str, since_seconds: int = 86400) -> Dict:
    """Phase 11.C evidence helper — returns the legal-grade slice
    consumed by the bilingual PDF report.

    Two evidence buckets, both fact-only (no interpretation) :

      - ``pre_consent`` : (tracker_domain, sites, pre_consent_hits,
        country_iso, asn_org). Trackers that fired BEFORE a consent
        cookie was observed for that peer × site. Direct RGPD art. 7
        + art. 6.1.a evidence.
      - ``extra_eu`` : (tracker_domain, country_iso, asn_org,
        sites). Trackers resolving to non-EU/EEA countries. Note :
        we report the fact, not SCC absence (we can't prove a
        negative). RGPD art. 44+ evidence.

    Args:
        mac_hash: Hashed client identifier to report on. An empty value
            returns both buckets empty without touching the database.
        since_seconds: Look-back window, applied to ``last_seen``.
            Values below 3600 are raised to 3600 (one hour minimum).

    Returns:
        ``{"pre_consent": [...], "extra_eu": [...]}`` ; each bucket holds
        at most 100 dict rows. Database errors are logged as a warning
        and whatever was collected before the failure is returned.
    """
    from contextlib import closing

    since = int(time.time()) - max(since_seconds, 3600)
    out: Dict = {"pre_consent": [], "extra_eu": []}
    if not mac_hash:
        return out
    try:
        # sqlite3's own `with conn:` only scopes a transaction and leaves
        # the connection open ; closing() releases it when we are done.
        with closing(_conn()) as c:
            pre_rows = c.execute(
                "SELECT tracker_domain, hits, pre_consent_hits, sites_jsonl, "
                "country_iso, asn_org, last_seen "
                "FROM social_nodes "
                "WHERE client_mac_hash = ? AND last_seen >= ? "
                "AND pre_consent_hits > 0 "
                "ORDER BY pre_consent_hits DESC, hits DESC LIMIT 100",
                (mac_hash, since),
            ).fetchall()
            for r in pre_rows:
                out["pre_consent"].append({
                    "tracker_domain": r["tracker_domain"],
                    "hits": r["hits"],
                    "pre_consent_hits": r["pre_consent_hits"],
                    "sites": _evidence_sites(r["sites_jsonl"]),
                    "country_iso": r["country_iso"],
                    "asn_org": r["asn_org"],
                    "last_seen": r["last_seen"],
                })
            # Rows without a resolved country are excluded : an unknown
            # location is not evidence of an extra-EU transfer.
            extra_rows = c.execute(
                "SELECT tracker_domain, hits, sites_jsonl, country_iso, "
                "asn_org, last_seen "
                "FROM social_nodes "
                "WHERE client_mac_hash = ? AND last_seen >= ? "
                "AND eu_inside = 0 AND country_iso IS NOT NULL "
                "ORDER BY hits DESC LIMIT 100",
                (mac_hash, since),
            ).fetchall()
            for r in extra_rows:
                out["extra_eu"].append({
                    "tracker_domain": r["tracker_domain"],
                    "hits": r["hits"],
                    "sites": _evidence_sites(r["sites_jsonl"]),
                    "country_iso": r["country_iso"],
                    "asn_org": r["asn_org"],
                    "last_seen": r["last_seen"],
                })
    except Exception as e:  # pragma: no cover
        log.warning("evidence query failed: %s", e)
    return out
def purge_older_than(days: int = 7) -> int:
"""Drop raw edges older than `days`. The aggregate node/link tables
stay : they represent the durable fold. Operator-side wipe goes