Compare commits


13 Commits

Author SHA1 Message Date
1139ce103e docs: record #649 selective SNI-splice (Lever A) ship + soak/flip next (PR #650)
2026-06-18 11:30:02 +02:00
CyberMind
173b67c495
Merge pull request #650 from CyberMind-FR/feature/649-perf-toolbox-selective-sni-splice-passth
perf(toolbox): selective SNI-splice — passthrough pure-asset flows, MITM only what needs L7 (Lever A)
2026-06-18 11:26:59 +02:00
e8cebe1662 fix(toolbox): refresh never-set on filters.json mtime too (fortknox via WebUI) (ref #649) 2026-06-18 11:26:35 +02:00
3b0423189e feat(toolbox): register tls_splice first in mitm-wg chain + changelog 2.6.54 (ref #649) 2026-06-18 11:23:59 +02:00
742897700c perf(toolbox): offload splice obs to bg thread + skip decided hosts; add fail-safe/recorder tests (ref #649) 2026-06-18 11:22:47 +02:00
296cebc69e feat(toolbox): autolearn promotes never-HTML hosts to splice-learned (ref #649) 2026-06-18 11:18:36 +02:00
4160167e5c feat(toolbox): tls_splice addon — SNI-splice at ClientHello + obs recorder (ref #649) 2026-06-18 11:17:39 +02:00
d836179a72 feat(toolbox): splice_host_obs table + record/never_html helpers (ref #649) 2026-06-18 11:16:39 +02:00
9d3630574f feat(toolbox): curated media SNI-splice seed (ref #649) 2026-06-18 11:11:43 +02:00
8664c84893 feat(toolbox): SNI-splice classifier (seed/learned/never) (ref #649) 2026-06-18 11:11:30 +02:00
96e04bbe0f feat(toolbox): tls_splice filter toggle off|observe|on (ref #649) 2026-06-18 11:10:53 +02:00
09e16f35a1 docs: implementation plan for toolbox selective SNI-splice (#649) 2026-06-18 11:09:02 +02:00
7834a29724 docs: spec for toolbox selective SNI-splice (Lever A, #649) 2026-06-18 11:04:26 +02:00
17 changed files with 1434 additions and 3 deletions


@@ -3,6 +3,28 @@
---
## 2026-06-18 — #649 selective SNI-splice (Lever A) shipped dark (PR #650, toolbox 2.6.54)
- **Architecture decision.** Asked "do we need a full mitm for R3 HTTPS?" Answer:
outbound HTTPS interception intrinsically needs per-host cert forging (the
WAF/own-cert analogy doesn't transfer) — so we keep a forging MITM but only
decrypt flows we'd actually modify. Plan = A-then-B: **A** = selective
SNI-splice (this), **B** = Go/Rust core (strategic, later). WAF deferred.
- **Lever A.** New `tls_splice` addon (first in the mitm-wg chain) decides at the
TLS ClientHello, from the SNI alone, whether to MITM or **splice** (raw
passthrough — no forge/decrypt/parse/16-addons). Policy: curated media-only seed
(googlevideo/ytimg/fbcdn/twimg/scdn…, deliberately NOT generic CDN edges) +
autolearn-promoted never-HTML hosts (`splice_host_obs` table, ≥20 obs,
html_hits==0). Never splices trackers/fortknox/no-SNI/media_cache-on. Learning
obs recorded off the event loop (bg thread), only for undecided hosts.
- **Dark-launch.** Ships `tls_splice=observe` (classify + log would-splice, still
MITM — zero behavior change); `on` flip is post-soak; `off` kill-switch.
- **Built TDD** (7 tasks, 102 tests), two-stage reviews per task + whole-branch
review (APPROVED; closed a hot-path sync-SQLite issue → bg-thread offload, and a
fortknox-WebUI never-set refresh gap). **Deployed gk2 2.6.54**, rolling restart
of the 4 workers, addon loads clean, 0 runtime errors, dark default confirmed.
Next: soak → review → flip `on`.
## 2026-06-18 — #623 systemic shared-parent clobber resolved at source (PR #648)
- **Root cause corrected.** The recurring `/var/{lib,log,cache,…}/secubox` parent


@@ -34,8 +34,23 @@ All merged to master + deployed on gk2. Details in HISTORY 2026-06-18.
thundering-herd); live covered by `dirs-guard.timer`; lands with the next
CI build / reflash.
- ✅ **#649 Lever A — selective SNI-splice (PR #650, toolbox 2.6.54 LIVE dark)**.
New `tls_splice` addon (first in mitm-wg chain) splices pure-asset flows at the
TLS ClientHello — curated media seed (googlevideo/ytimg/fbcdn/twimg/scdn…)
+ autolearn-promoted never-HTML hosts, so GIL-bound R3 workers skip
forge/decrypt/parse/16-addons on no-L7-value flows. Ships `tls_splice=observe`
(DARK: classify+log, still MITM). Deployed gk2, addon loads clean, 0 runtime
errors. Answer to "do we need full mitm?": YES for outbound HTTPS (per-host cert
forging is intrinsic) — but only decrypt what we modify. Lever B (Go/Rust core)
= strategic follow-up. WAF = later.
### ⬜ Next Up
- **#649 SOAK → FLIP** — review `would-splice` logs + `/run/secubox/splice.json`
on real traffic for a soak window, confirm no first-party/HTML host is
classified, then flip `tls_splice=on` in `/etc/secubox/toolbox/filters.json`
(hot-reload). Before flip: the fortknox-via-WebUI refresh gap is already fixed.
- **Lever B (#649 follow-up)** — Go/Rust forging-proxy core if A isn't enough.
- **Anti-Track v2 ARMING** (USER decision, gated): soak observe-only, then flip
`privacy_enforce=true`; regenerate `data/cdn-allowlist.txt` from the public
ranges before `privacy_ip_drop`; run `unbound-checkconf` before `privacy_dns_feed`.


@@ -0,0 +1,675 @@
# Toolbox Selective SNI-Splice (Lever A) — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax.
**Goal:** Splice (raw TCP passthrough) pure-asset HTTPS flows at the TLS ClientHello by SNI, so the GIL-bound mitm-wg workers only forge/decrypt/parse/run-addons on flows that need L7 work.
**Architecture:** New `tls_splice` addon (`tls_clienthello` hook) consults a pure classifier (`splice.py`) over a curated media seed + autolearn-promoted never-HTML hosts; ships `tls_splice=observe` (dark) → flip to `on`. New `splice_host_obs` table feeds the learning. WAF + Go/Rust core out of scope.
**Tech Stack:** Python, mitmproxy 11 addon API, SQLite (WAL), pytest.
**Spec:** `docs/superpowers/specs/2026-06-18-toolbox-selective-sni-splice.md`
All paths below are under `packages/secubox-toolbox/`. Run tests from that dir with `python -m pytest` (fallback `python3 -m pytest`).
---
### Task 1: filters `tls_splice` toggle (off|observe|on, default observe)
**Files:** Modify `secubox_toolbox/filters.py`; Test `tests/test_filters_splice.py` (create)
- [ ] **Step 1: Failing test**
```python
# tests/test_filters_splice.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import json

from secubox_toolbox import filters


def test_default_is_observe(monkeypatch, tmp_path):
    monkeypatch.setattr(filters, "FILTERS_PATH", str(tmp_path / "f.json"))
    assert filters.get_filters(force=True)["tls_splice"] == "observe"


def test_bad_value_falls_back(monkeypatch, tmp_path):
    fp = tmp_path / "f.json"
    fp.write_text(json.dumps({"tls_splice": "bogus"}))
    monkeypatch.setattr(filters, "FILTERS_PATH", str(fp))
    assert filters.get_filters(force=True)["tls_splice"] == "observe"


def test_set_filters_accepts_valid(monkeypatch, tmp_path):
    monkeypatch.setattr(filters, "FILTERS_PATH", str(tmp_path / "f.json"))
    out = filters.set_filters({"tls_splice": "on"})
    assert out["tls_splice"] == "on"
    out = filters.set_filters({"tls_splice": "nope"})
    assert out["tls_splice"] == "on"  # invalid ignored, prior kept
```
Run: `python -m pytest tests/test_filters_splice.py -v` → FAIL (`tls_splice` missing).
- [ ] **Step 2: Implement**
In `secubox_toolbox/filters.py`:
- Add to `DEFAULTS` (after the `"autolearn": True,` line):
```python
"tls_splice": "observe", # #649 off | observe | on (asset SNI-splice)
```
- After `_VALID_PROTECTIVE = ("off", "alert", "spoof")` add:
```python
_VALID_SPLICE = ("off", "observe", "on")
```
- In `get_filters`, after the `protective` validation block (line ~67-68) add:
```python
if out.get("tls_splice") not in _VALID_SPLICE:
    out["tls_splice"] = DEFAULTS["tls_splice"]
```
- In `set_filters`, add a branch (after the `protective` branch):
```python
elif k == "tls_splice" and v in _VALID_SPLICE:
    cur["tls_splice"] = v
```
- [ ] **Step 3: Pass** `python -m pytest tests/test_filters_splice.py -v` → PASS.
- [ ] **Step 4: Commit** `git add secubox_toolbox/filters.py tests/test_filters_splice.py && git commit -m "feat(toolbox): tls_splice filter toggle off|observe|on (ref #649)"`
---
### Task 2: `splice.py` classifier (pure)
**Files:** Create `secubox_toolbox/splice.py`, `tests/test_splice_classify.py`
- [ ] **Step 1: Failing test**
```python
# tests/test_splice_classify.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
from secubox_toolbox import splice


def test_host_matches_exact_and_subdomain():
    pats = {"googlevideo.com", "fbcdn.net"}
    assert splice.host_matches("googlevideo.com", pats)
    assert splice.host_matches("r1---sn-x.googlevideo.com", pats)
    assert not splice.host_matches("notgooglevideo.com", pats)  # no false prefix
    assert not splice.host_matches("example.com", pats)


def test_should_splice_seed_and_learned():
    seed = {"googlevideo.com"}
    learned = {"cdn.example.net"}
    never = set()
    assert splice.should_splice("x.googlevideo.com", seed, learned, never)
    assert splice.should_splice("cdn.example.net", seed, learned, never)
    assert not splice.should_splice("news.example.com", seed, learned, never)


def test_never_wins():
    seed = {"evil-cdn.com"}
    never = {"evil-cdn.com"}
    assert not splice.should_splice("evil-cdn.com", seed, set(), never)


def test_empty_sni_or_sets():
    assert not splice.should_splice("", {"a.com"}, set(), set())
    assert not splice.should_splice("a.com", set(), set(), set())


def test_load_seed_strips_comments(tmp_path):
    f = tmp_path / "seed.conf"
    f.write_text("# header\ngooglevideo.com # yt\n\n fbcdn.net\n")
    s = splice.load_splice_seed(str(f))
    assert s == {"googlevideo.com", "fbcdn.net"}
```
Run: `python -m pytest tests/test_splice_classify.py -v` → FAIL (no module).
- [ ] **Step 2: Implement** `secubox_toolbox/splice.py`
```python
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""SecuBox-Deb :: toolbox :: SNI-splice classifier (#649).

Pure helpers deciding, from the TLS SNI alone, whether a flow is a pure-asset
flow we can splice (raw passthrough, no MITM). Seed + learned, minus a never-set
(trackers we block/poison, fortknox sites). Suffix match so CDN shards match.
"""
from __future__ import annotations

import os
from typing import Set


def _load_lines(path: str) -> Set[str]:
    out: Set[str] = set()
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.split("#", 1)[0].strip().lower()
                if line:
                    out.add(line)
    except Exception:
        pass
    return out


def load_splice_seed(path: str) -> Set[str]:
    return _load_lines(path)


def load_learned_splice(path: str) -> Set[str]:
    return _load_lines(path)


def host_matches(host: str, patterns: Set[str]) -> bool:
    """True if host == pattern or host is a subdomain of pattern."""
    h = (host or "").lower().strip(".")
    if not h or not patterns:
        return False
    if h in patterns:
        return True
    for p in patterns:
        if h.endswith("." + p):
            return True
    return False


def should_splice(sni: str, seed: Set[str], learned: Set[str],
                  never: Set[str]) -> bool:
    s = (sni or "").lower().strip(".")
    if not s:
        return False
    if host_matches(s, never):  # never wins (trackers / fortknox)
        return False
    return host_matches(s, seed) or host_matches(s, learned)
```
- [ ] **Step 3: Pass** `python -m pytest tests/test_splice_classify.py -v` → PASS.
- [ ] **Step 4: Commit** `git add secubox_toolbox/splice.py tests/test_splice_classify.py && git commit -m "feat(toolbox): SNI-splice classifier (seed/learned/never) (ref #649)"`
---
### Task 3: curated media seed conf
**Files:** Create `conf/tls-splice-seed.conf`
- [ ] **Step 1: Create** `conf/tls-splice-seed.conf`
```
# SecuBox toolbox :: SNI-splice seed (#649)
# MEDIA/ASSET-SPECIFIC hosts only — NEVER generic CDN edges (cloudfront/fastly/
# akamai-edge) which also serve HTML apps; splicing those would blind the MITM
# to real pages. Suffix-matched (subdomains included). One host suffix per line.
googlevideo.com # YouTube video streams (largest single hog)
ytimg.com # YouTube thumbnails
gstatic.com # Google static assets
ggpht.com # Google user content / avatars
fbcdn.net # Facebook / Instagram media
cdninstagram.com # Instagram media
twimg.com # Twitter / X media
licdn.com # LinkedIn media
sndcdn.com # SoundCloud audio
scdn.co # Spotify audio
mzstatic.com # Apple media / artwork
```
- [ ] **Step 2: Commit** `git add conf/tls-splice-seed.conf && git commit -m "feat(toolbox): curated media SNI-splice seed (ref #649)"`
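The seed header forbids generic CDN edges, so a small guard could keep a later edit from adding one by accident. The sketch below is hypothetical and not part of the plan: the `GENERIC_CDN_EDGES` list and the helper names are assumptions chosen for illustration, and `parse_seed` only mirrors the comment-stripping rules that Task 2 tests for `load_splice_seed`.

```python
from __future__ import annotations

# Hypothetical guard, not part of the plan: flags generic CDN edge suffixes
# that the seed header says must never be spliced. The list is an assumption.
GENERIC_CDN_EDGES = {"cloudfront.net", "fastly.net", "akamaiedge.net"}


def parse_seed(text: str) -> set[str]:
    """Strip '#' comments and blank lines, lower-case each entry."""
    out = set()
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip().lower()
        if line:
            out.add(line)
    return out


def generic_edges_in_seed(text: str) -> set[str]:
    """Return seed entries that equal, or are subdomains of, a generic CDN edge."""
    bad = set()
    for entry in parse_seed(text):
        for edge in GENERIC_CDN_EDGES:
            if entry == edge or entry.endswith("." + edge):
                bad.add(entry)
    return bad


sample = "googlevideo.com  # YouTube\nd111.cloudfront.net\n# fastly.net\n"
print(generic_edges_in_seed(sample))
```

In a test, the same helper could read `conf/tls-splice-seed.conf` and assert that the returned set is empty.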
---
### Task 4: `splice_host_obs` table + store helpers
**Files:** Modify `secubox_toolbox/store.py`; Test `tests/test_splice_obs.py`
- [ ] **Step 1: Failing test**
```python
# tests/test_splice_obs.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
from pathlib import Path

from secubox_toolbox import store


def _fresh(tmp_path, monkeypatch):
    monkeypatch.setattr(store, "DB_PATH", Path(tmp_path) / "t.db")


def test_record_and_never_html(tmp_path, monkeypatch):
    _fresh(tmp_path, monkeypatch)
    for _ in range(20):
        store.record_splice_obs("cdn.assets.net", is_html=False)
    for _ in range(20):
        store.record_splice_obs("www.site.com", is_html=False)
    store.record_splice_obs("www.site.com", is_html=True)  # served HTML once
    hosts = store.never_html_hosts(min_hits=20)
    assert "cdn.assets.net" in hosts
    assert "www.site.com" not in hosts  # html_hits > 0 → excluded


def test_sampling_cap(tmp_path, monkeypatch):
    _fresh(tmp_path, monkeypatch)
    for _ in range(100):
        store.record_splice_obs("x.net", is_html=False)
    # capped at 50 — never grows unbounded
    import sqlite3
    with store._conn() as c:
        hits = c.execute("SELECT hits FROM splice_host_obs WHERE host='x.net'").fetchone()[0]
    assert hits == 50
```
Run: `python -m pytest tests/test_splice_obs.py -v` → FAIL.
- [ ] **Step 2: Implement** in `secubox_toolbox/store.py`
Add to the `SCHEMA` string (before its closing `"""`):
```sql
CREATE TABLE IF NOT EXISTS splice_host_obs (
    host TEXT PRIMARY KEY,
    hits INTEGER NOT NULL DEFAULT 0,
    html_hits INTEGER NOT NULL DEFAULT 0,
    last_seen REAL
);
```
Add these functions (anywhere after `_conn`):
```python
_SPLICE_OBS_CAP = 50  # stop counting once we have enough signal per host


def record_splice_obs(host: str, is_html: bool) -> None:
    """Observe a MITM'd flow's host + whether it served text/html. Sampling-capped
    so writes stay bounded. Best-effort (never raises into the proxy path)."""
    h = (host or "").lower().strip(".")
    if not h:
        return
    try:
        with _conn() as c:
            c.execute(
                "INSERT INTO splice_host_obs(host, hits, html_hits, last_seen) "
                "VALUES(?, 1, ?, ?) "
                "ON CONFLICT(host) DO UPDATE SET "
                "  hits = MIN(hits + 1, ?), "
                "  html_hits = html_hits + ?, "
                "  last_seen = ? "
                "WHERE splice_host_obs.hits < ?",
                (h, 1 if is_html else 0, time.time(),
                 _SPLICE_OBS_CAP, 1 if is_html else 0, time.time(), _SPLICE_OBS_CAP),
            )
    except Exception as e:
        log.debug("splice obs failed: %s", e)


def never_html_hosts(min_hits: int = 20) -> list[str]:
    """Hosts observed >= min_hits times that NEVER served text/html."""
    try:
        with _conn() as c:
            rows = c.execute(
                "SELECT host FROM splice_host_obs WHERE hits >= ? AND html_hits = 0",
                (min_hits,),
            ).fetchall()
        return [r[0] for r in rows]
    except Exception:
        return []
```
NOTE: confirm that rows returned through `_conn()` are indexable by `[0]`. This
holds for the default row factory and also when `row_factory` is `sqlite3.Row`.
In `record_splice_obs`, the `WHERE ... hits < cap` guard and the `MIN(...)` clamp
are redundant: either one alone enforces the cap. If the combined form misbehaves
and the test (which asserts `hits == 50`) shows the cap exceeded, keep the `MIN`
form and drop the trailing `WHERE` clause. Without the `WHERE` guard every
observation still performs a write and `html_hits` keeps counting past the cap.
- [ ] **Step 3: Pass** `python -m pytest tests/test_splice_obs.py -v` → PASS. (If the ON CONFLICT cap logic misbehaves on the SQLite version, simplify to: read current hits, `if hits < cap` then increment, keeping the test green.)
- [ ] **Step 4: Commit** `git add secubox_toolbox/store.py tests/test_splice_obs.py && git commit -m "feat(toolbox): splice_host_obs table + record/never_html helpers (ref #649)"`
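The read-then-write fallback mentioned in Step 3 could look roughly like the sketch below. It is a minimal illustration under assumptions, not the shipped code: `record_obs_fallback` is a hypothetical name, the connection is passed in explicitly instead of coming from `_conn()`, and the demo runs against an in-memory database with the Task 4 schema.

```python
import sqlite3
import time

CAP = 50  # mirrors _SPLICE_OBS_CAP from the plan

SCHEMA = """
CREATE TABLE IF NOT EXISTS splice_host_obs (
    host TEXT PRIMARY KEY,
    hits INTEGER NOT NULL DEFAULT 0,
    html_hits INTEGER NOT NULL DEFAULT 0,
    last_seen REAL
);
"""


def record_obs_fallback(con: sqlite3.Connection, host: str, is_html: bool,
                        cap: int = CAP) -> None:
    """Hypothetical no-UPSERT variant: read current hits, then insert or bump."""
    h = (host or "").lower().strip(".")
    if not h:
        return
    html = 1 if is_html else 0
    now = time.time()
    with con:  # commits on success, rolls back on error
        row = con.execute(
            "SELECT hits FROM splice_host_obs WHERE host = ?", (h,)).fetchone()
        if row is None:
            # OR IGNORE: another writer may have inserted the host meanwhile
            con.execute(
                "INSERT OR IGNORE INTO splice_host_obs"
                "(host, hits, html_hits, last_seen) VALUES(?, 1, ?, ?)",
                (h, html, now))
        elif row[0] < cap:
            # the hits < ? guard is repeated in SQL so the cap cannot overshoot
            con.execute(
                "UPDATE splice_host_obs SET hits = hits + 1, "
                "html_hits = html_hits + ?, last_seen = ? "
                "WHERE host = ? AND hits < ?",
                (html, now, h, cap))


con = sqlite3.connect(":memory:")
con.executescript(SCHEMA)
for _ in range(100):
    record_obs_fallback(con, "x.net", is_html=False)
hits = con.execute(
    "SELECT hits FROM splice_host_obs WHERE host = 'x.net'").fetchone()[0]
print(hits)
```

The trade-off against the `ON CONFLICT` form is one extra `SELECT` per observation. If two writers race on a brand-new host, the losing `INSERT OR IGNORE` drops that single observation, which seems acceptable for a sampling counter.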
---
### Task 5: `tls_splice.py` addon
**Files:** Create `mitmproxy_addons/tls_splice.py`, `tests/test_tls_splice_addon.py`
- [ ] **Step 1: Failing test**
```python
# tests/test_tls_splice_addon.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import sys, pathlib, importlib, json, types

ADDON_DIR = pathlib.Path(__file__).resolve().parents[1] / "mitmproxy_addons"
sys.path.insert(0, str(ADDON_DIR))

from secubox_toolbox import filters


def _addon(monkeypatch, tmp_path, mode):
    fp = tmp_path / "f.json"
    fp.write_text(json.dumps({"tls_splice": mode}))
    monkeypatch.setattr(filters, "FILTERS_PATH", str(fp))
    filters.get_filters(force=True)
    import tls_splice
    importlib.reload(tls_splice)
    a = tls_splice.TlsSplice()
    a._seed = {"googlevideo.com"}
    a._learned = set()
    a._never = set()
    monkeypatch.setattr(a, "_refresh_sets", lambda: None)
    return tls_splice, a


def _ch(sni):
    d = types.SimpleNamespace()
    d.client_hello = types.SimpleNamespace(sni=sni)
    d.context = types.SimpleNamespace(client=types.SimpleNamespace(peername=("10.99.1.2", 5)))
    d.ignore_connection = False
    return d


def test_on_splices_seed_host(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "on")
    d = _ch("r1.googlevideo.com")
    a.tls_clienthello(d)
    assert d.ignore_connection is True


def test_observe_does_not_splice(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "observe")
    d = _ch("r1.googlevideo.com")
    a.tls_clienthello(d)
    assert d.ignore_connection is False


def test_off_returns_early(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "off")
    d = _ch("r1.googlevideo.com")
    a.tls_clienthello(d)
    assert d.ignore_connection is False


def test_non_seed_not_spliced(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "on")
    d = _ch("news.example.com")
    a.tls_clienthello(d)
    assert d.ignore_connection is False


def test_no_sni_not_spliced(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "on")
    d = _ch(None)
    a.tls_clienthello(d)
    assert d.ignore_connection is False
```
Run: `python -m pytest tests/test_tls_splice_addon.py -v` → FAIL.
- [ ] **Step 2: Implement** `mitmproxy_addons/tls_splice.py`
```python
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""SecuBox-Deb :: toolbox :: selective SNI-splice (#649, Lever A).

At the TLS ClientHello, splice (raw passthrough, no forge/decrypt/parse/addons)
pure-asset flows decided from the SNI. Modes (filters.tls_splice):
  off      — never splice (legacy: MITM everything)
  observe  — classify + log/count "would-splice", but still MITM (dark-launch)
  on       — actually splice
Also records per-host content-type observations (MITM'd flows) to feed the
autolearn never-HTML promotion. Registered FIRST in the mitm-wg addon chain.
"""
from __future__ import annotations

import json
import logging
import os
import sys
import time

if "/usr/lib/secubox/toolbox" not in sys.path:
    sys.path.insert(0, "/usr/lib/secubox/toolbox")

from secubox_toolbox import splice as _splice  # noqa: E402
from secubox_toolbox.filters import get_filters as _gf  # noqa: E402

try:
    from secubox_toolbox import store as _store  # noqa: E402
except Exception:  # pragma: no cover
    _store = None

log = logging.getLogger("secubox.toolbox.addons")

SEED_PATH = os.environ.get("SECUBOX_SPLICE_SEED",
                           "/usr/lib/secubox/toolbox/conf/tls-splice-seed.conf")
LEARNED_PATH = os.environ.get("SECUBOX_SPLICE_LEARNED",
                              "/var/lib/secubox/toolbox/splice-learned.txt")
PURE_PATH = os.environ.get("SECUBOX_PURE_TRACKERS",
                           "/var/lib/secubox/toolbox/pure-trackers.txt")
STATS = "/run/secubox/splice.json"

_counts = {"spliced": 0, "would_splice": 0, "mitm": 0, "since": int(time.time())}
_last_flush = 0.0


class TlsSplice:
    def __init__(self) -> None:
        self._seed: set = set()
        self._learned: set = set()
        self._never: set = set()
        self._mtimes: tuple = ()
        self._refresh_sets()

    def _refresh_sets(self) -> None:
        """Reload seed/learned/never sets when any backing file changes."""
        try:
            mtimes = tuple(
                os.stat(p).st_mtime if os.path.exists(p) else 0.0
                for p in (SEED_PATH, LEARNED_PATH, PURE_PATH))
        except Exception:
            mtimes = ()
        if mtimes == self._mtimes and self._seed:
            return
        self._seed = _splice.load_splice_seed(SEED_PATH)
        self._learned = _splice.load_learned_splice(LEARNED_PATH)
        never = _splice.load_learned_splice(PURE_PATH)  # pure trackers
        try:
            for s in _gf().get("fortknox_sites", []) or []:
                never.add(str(s).lower().strip("."))
        except Exception:
            pass
        self._never = never
        self._mtimes = mtimes

    def tls_clienthello(self, data) -> None:
        try:
            mode = _gf().get("tls_splice", "observe")
            if mode == "off":
                return
            # media_cache wants to see asset flows → don't splice when it's on
            if _gf().get("media_cache"):
                return
            sni = getattr(data.client_hello, "sni", None)
            if not sni:
                return
            self._refresh_sets()
            if not _splice.should_splice(sni, self._seed, self._learned, self._never):
                return
            if mode == "on":
                data.ignore_connection = True
                _counts["spliced"] += 1
            else:  # observe
                _counts["would_splice"] += 1
                log.info("tls-splice would-splice %s", sni)
            self._flush()
        except Exception as e:  # never break a connection
            log.debug("tls_splice clienthello error: %s", e)

    def response(self, flow) -> None:
        """Record host content-type on MITM'd flows (learning signal)."""
        if _store is None:
            return
        try:
            if _gf().get("tls_splice", "observe") == "off":
                return
            host = flow.request.pretty_host or ""
            ct = (flow.response.headers.get("content-type", "") or "").lower()
            _store.record_splice_obs(host, is_html=("text/html" in ct))
        except Exception:
            pass

    def _flush(self) -> None:
        global _last_flush
        now = time.time()
        if (now - _last_flush) < 5:
            return
        _last_flush = now
        try:
            os.makedirs(os.path.dirname(STATS), exist_ok=True)
            with open(STATS, "w", encoding="utf-8") as f:
                json.dump({**_counts, "updated": int(now)}, f)
        except Exception:
            pass


addons = [TlsSplice()]
```
- [ ] **Step 3: Pass** `python -m pytest tests/test_tls_splice_addon.py -v` → PASS.
- [ ] **Step 4: Commit** `git add mitmproxy_addons/tls_splice.py tests/test_tls_splice_addon.py && git commit -m "feat(toolbox): tls_splice addon — SNI-splice at ClientHello + obs recorder (ref #649)"`
---
### Task 6: autolearn `_splice_feed` promotion
**Files:** Modify `sbin/secubox-toolbox-autolearn`; Test `tests/test_autolearn_splice.py`
- [ ] **Step 1: Failing test**
```python
# tests/test_autolearn_splice.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import os, sqlite3, importlib.util, pathlib


def _load_autolearn():
    p = pathlib.Path(__file__).resolve().parents[1] / "sbin" / "secubox-toolbox-autolearn"
    spec = importlib.util.spec_from_loader("autolearn", loader=None)
    mod = importlib.util.module_from_spec(spec)
    exec(compile(p.read_text(), str(p), "exec"), mod.__dict__)
    return mod


def test_splice_feed_promotes_never_html(tmp_path, monkeypatch):
    db = tmp_path / "t.db"
    con = sqlite3.connect(db)
    con.executescript(
        "CREATE TABLE splice_host_obs(host TEXT PRIMARY KEY, hits INT, html_hits INT, last_seen REAL);"
        "INSERT INTO splice_host_obs VALUES('cdn.assets.net',25,0,0);"
        "INSERT INTO splice_host_obs VALUES('html.site.com',25,3,0);"
        "INSERT INTO splice_host_obs VALUES('low.hits.net',5,0,0);")
    con.commit()
    con.close()
    out = tmp_path / "splice-learned.txt"
    monkeypatch.setenv("SECUBOX_AUTOLEARN_DB", str(db))
    monkeypatch.setenv("SECUBOX_SPLICE_LEARNED_OUT", str(out))
    al = _load_autolearn()
    n = al._splice_feed()
    learned = set(out.read_text().split())
    assert "cdn.assets.net" in learned      # never-HTML, >=20 hits
    assert "html.site.com" not in learned   # served HTML
    assert "low.hits.net" not in learned    # too few hits
    assert n == 1
```
Run: `python -m pytest tests/test_autolearn_splice.py -v` → FAIL.
- [ ] **Step 2: Implement** in `sbin/secubox-toolbox-autolearn`
Add near the other env paths (after `PURE_OUT`):
```python
SPLICE_LEARNED_OUT = os.environ.get(
    "SECUBOX_SPLICE_LEARNED_OUT",
    "/var/lib/secubox/toolbox/splice-learned.txt")
SPLICE_MIN_HITS = int(os.environ.get("SECUBOX_SPLICE_MIN_HITS", "20"))
SPLICE_MAX = 2000
```
Add the function (near `_dns_feed`):
```python
def _splice_feed() -> int:
    """Promote hosts that NEVER served text/html over >= SPLICE_MIN_HITS
    observations into the learned-splice file (registrable-folded, capped).
    Gated: skip when tls_splice == 'off'. Returns count written, or -1 if gated."""
    try:
        from secubox_toolbox.filters import get_filters
        if get_filters().get("tls_splice", "observe") == "off":
            return -1
    except Exception:
        pass
    try:
        con = sqlite3.connect(DB, timeout=5)
        rows = con.execute(
            "SELECT host FROM splice_host_obs WHERE hits >= ? AND html_hits = 0",
            (SPLICE_MIN_HITS,)).fetchall()
        con.close()
    except Exception as e:
        sys.stderr.write(f"autolearn: splice query failed: {e}\n")
        return -1
    hosts = sorted({(r[0] or "").lower().strip(".") for r in rows if r[0]})[:SPLICE_MAX]
    try:
        os.makedirs(os.path.dirname(SPLICE_LEARNED_OUT), exist_ok=True)
        tmp = SPLICE_LEARNED_OUT + ".tmp"
        with open(tmp, "w", encoding="utf-8") as fh:
            fh.write("\n".join(hosts) + ("\n" if hosts else ""))
        os.replace(tmp, SPLICE_LEARNED_OUT)
    except Exception as e:
        sys.stderr.write(f"autolearn: splice write failed: {e}\n")
        return -1
    return len(hosts)
```
Call it from the script's main body (where `_dns_feed` etc. are invoked — find the bottom main section and add):
```python
try:
    _n_splice = _splice_feed()
    sys.stderr.write(f"autolearn: {_n_splice} splice hosts learned\n")
except Exception as e:
    sys.stderr.write(f"autolearn: splice feed error: {e}\n")
```
(Place it alongside the existing feed calls; keep it best-effort so it never aborts the run.)
- [ ] **Step 3: Pass** `python -m pytest tests/test_autolearn_splice.py -v` → PASS.
- [ ] **Step 4: Commit** `git add sbin/secubox-toolbox-autolearn tests/test_autolearn_splice.py && git commit -m "feat(toolbox): autolearn promotes never-HTML hosts to splice-learned (ref #649)"`
---
### Task 7: wiring (launch chain, debian/rules, changelog)
**Files:** Modify `sbin/secubox-toolbox-mitm-wg-launch`, `debian/rules`, `debian/changelog`
- [ ] **Step 1: Register addon FIRST** in `sbin/secubox-toolbox-mitm-wg-launch`
In the `for addon in ... ; do` list (currently begins `inject_xff utiq_defense ...`), prepend `tls_splice`:
```bash
for addon in tls_splice inject_xff utiq_defense protective_mode privacy_guard ad_ghost media_cache local_store social_graph inject_banner dpi cookies avatar ja4 soc_relay cert_pin_detect media_stats; do
```
(Its only acting hook is `tls_clienthello`, which fires before any requestheaders
addon regardless of order — so this doesn't disturb inject_xff's first-at-
requestheaders contract; placing it first is just clarity.)
- [ ] **Step 2: Install the seed conf** in `debian/rules`
Find the block that installs `conf/` (the bypass-seed install, near `conf/mitm-bypass-seed.conf`) and ensure the whole `conf/` dir (or the new file) lands at `/usr/lib/secubox/toolbox/conf/`. If there's an explicit per-file copy, add:
```make
install -m 0644 conf/tls-splice-seed.conf $(DESTDIR)/usr/lib/secubox/toolbox/conf/
```
(If `conf/` is copied wholesale, no change needed — verify with `grep -n "conf/" debian/rules`.)
- [ ] **Step 3: Bump changelog** — new top entry in `debian/changelog`, version after the current top (`head -1 debian/changelog`):
```
secubox-toolbox (2.6.54-1~bookworm1) bookworm; urgency=medium

  * feat(#649): selective SNI-splice (Lever A). New tls_splice addon splices
    pure-asset flows (curated media seed + autolearn-promoted never-HTML hosts)
    at the TLS ClientHello — no forge/decrypt/parse/addons on those — so the
    GIL-bound R3 workers only do L7 work on flows that need it. Ships
    tls_splice=observe (dark: classify + log, still MITM); flip to `on` after
    soak. Kill-switch `off`. Trackers/fortknox/no-SNI/media_cache never spliced.

 -- Gerald KERMA <devel@cybermind.fr>  Thu, 18 Jun 2026 14:00:00 +0200
```
(Use the actual next version; if top is 2.6.53 → 2.6.54.)
- [ ] **Step 4: Full suite** `python -m pytest tests/ -q` → all green (new + existing).
- [ ] **Step 5: bash -n** `bash -n sbin/secubox-toolbox-mitm-wg-launch && python3 -c "import ast; ast.parse(open('sbin/secubox-toolbox-autolearn').read())"` → no errors.
- [ ] **Step 6: Commit** `git add sbin/secubox-toolbox-mitm-wg-launch debian/rules debian/changelog && git commit -m "feat(toolbox): wire tls_splice addon + seed install + changelog 2.6.54 (ref #649)"`
---
## Self-Review notes
- Spec coverage: filters (T1), classifier (T2), seed (T3), obs/learning store (T4), addon incl. dark-launch modes + media_cache guard + recorder (T5), autolearn promotion (T6), wiring (T7). All spec sections mapped.
- Threshold consistency: obs sampling cap 50 (T4) ≥ promotion min_hits 20 (T4 test, T6) — a host can reach 20 never-HTML hits within the 50 cap. Consistent.
- `should_splice` never-set includes pure-trackers + fortknox (T2/T5). media_cache guard is in the addon (T5), not the pure classifier — keeps `splice.py` pure.
- Dark default `observe` (T1) means deploy is behavior-neutral until flip; matches spec rollout.
- Risk noted in T4 Step 2: verify the `ON CONFLICT ... MIN()/WHERE` cap on the target SQLite; fallback (read-then-write) given if needed to keep the test green.
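The SQLite risk in the last note can be checked ahead of time on the target host. `ON CONFLICT ... DO UPDATE` (UPSERT) was added in SQLite 3.24.0, so one way to verify is to compare the linked library version and replay the Task 4 statement against an in-memory database. This is a minimal sketch: `upsert_supported` and `cap_holds` are hypothetical helper names, and the SQL string is copied from `record_splice_obs`.

```python
import sqlite3
import time

CAP = 50  # same value as _SPLICE_OBS_CAP in Task 4

# Statement copied from record_splice_obs (Task 4).
UPSERT = (
    "INSERT INTO splice_host_obs(host, hits, html_hits, last_seen) "
    "VALUES(?, 1, ?, ?) "
    "ON CONFLICT(host) DO UPDATE SET "
    "  hits = MIN(hits + 1, ?), "
    "  html_hits = html_hits + ?, "
    "  last_seen = ? "
    "WHERE splice_host_obs.hits < ?"
)


def upsert_supported() -> bool:
    """True if the SQLite library linked into Python understands UPSERT."""
    return sqlite3.sqlite_version_info >= (3, 24, 0)


def cap_holds(n: int = 100) -> bool:
    """Replay n never-HTML observations and check hits stops at the cap."""
    con = sqlite3.connect(":memory:")
    con.execute(
        "CREATE TABLE splice_host_obs(host TEXT PRIMARY KEY, "
        "hits INTEGER NOT NULL DEFAULT 0, "
        "html_hits INTEGER NOT NULL DEFAULT 0, last_seen REAL)")
    for _ in range(n):
        now = time.time()
        con.execute(UPSERT, ("x.net", 0, now, CAP, 0, now, CAP))
    hits = con.execute(
        "SELECT hits FROM splice_host_obs WHERE host = 'x.net'").fetchone()[0]
    con.close()
    return hits == min(n, CAP)


if upsert_supported():
    print("cap holds:", cap_holds())
else:
    print("SQLite too old for UPSERT; use the read-then-write fallback")
```

Running this with the same Python interpreter the workers use matters, since `sqlite3.sqlite_version_info` reports the library linked into that interpreter, not the `sqlite3` CLI.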


@@ -0,0 +1,159 @@
# Toolbox selective SNI-splice (Lever A) — design
- **Date:** 2026-06-18 · **Package:** `secubox-toolbox` · **Issue:** #649
- **Status:** Design approved (adaptive seed+learned, dark-launch). Pending plan.
- **Parent:** lighter-MITM plan, A-then-B. This is **Lever A** (stay in mitmproxy,
decrypt only what we modify). Lever B (Go/Rust core) is a later strategic call.
WAF is explicitly out of scope here ("maybe later").
## Problem
R3 web loading is slow because the 4 `secubox-toolbox-mitm-wg-worker@` processes
are GIL-bound (~1 core total, each pinned ~25-30% = single-thread ceiling) and
**forge a cert + terminate TLS + parse HTTP + run 16 addons on every flow**, then
most addons bail. Heavy asset/video/CDN flows (e.g. YouTube `googlevideo`)
dominate that CPU for **zero privacy value** — there's nothing to inspect or
rewrite in image/video/audio bytes. (#646 measured the ceiling.)
## Goal
Run the expensive L7 path only on flows we'd actually inspect/modify. **Splice**
(raw TCP passthrough, no forge/TLS/parse/addons) the pure-asset flows, decided at
the TLS ClientHello from the **SNI** alone (the only thing known pre-decrypt).
Non-goals: removing the MITM (outbound HTTPS interception intrinsically needs
per-host cert forging — see issue); WAF; the Go/Rust rewrite.
## Mechanism
A new addon `mitmproxy_addons/tls_splice.py`, registered **FIRST** in
`sbin/secubox-toolbox-mitm-wg-launch` (before `inject_xff`), implements:
```python
def tls_clienthello(self, data):
    mode = _filters().get("tls_splice", "observe")  # off | observe | on
    if mode == "off":
        return
    sni = (data.client_hello.sni or "").lower()
    if not sni:  # no SNI → never splice blind
        return
    if splice.should_splice(sni, self._seed, self._learned, self._never):
        if mode == "on":
            data.ignore_connection = True  # SPLICE: raw passthrough
            _bump("spliced")
        else:  # observe: classify + log, still MITM
            _bump("would_splice")
            log.info("would-splice %s", sni)
```
`data.ignore_connection = True` is mitmproxy's documented splice (no TLS
interception). `tls_clienthello` / `data.client_hello.sni` are already used by
`ja4.py` and `local_store.py`, so the API is present in our mitmproxy 11.
The same addon also records a lightweight **learning observation** on the response
hook of MITM'd flows (see Learning), so the learned-splice set can grow. (Spliced
flows produce no response hook — once a host is promoted, its observation freezes;
acceptable, the seed is media-only and the toggle is a kill-switch.)
## Classifier — `secubox_toolbox/splice.py` (pure, testable)
```python
def load_splice_seed(path) -> set[str]: ...     # suffix patterns from conf (+ comments stripped)
def load_learned_splice(path) -> set[str]: ...  # learned hostnames (autolearn output)
def host_matches(host, patterns) -> bool: ...   # host == p or host endswith "."+p

def should_splice(sni, seed, learned, never) -> bool:
    # never wins (defensive): trackers we block/poison, fortknox sites
    if host_matches(sni, never):
        return False
    return host_matches(sni, seed) or host_matches(sni, learned)
```
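The suffix rule in `host_matches` can also be written as a walk over the host's own labels. The sketch below is an illustration only: `host_matches_walk` is a hypothetical name, not part of `splice.py`, and it assumes patterns are stored lower-cased without leading or trailing dots. Each candidate suffix is a single set lookup, so the cost depends on the number of labels in the host rather than on the size of the pattern set.

```python
from typing import Set


def host_matches_walk(host: str, patterns: Set[str]) -> bool:
    """True if host equals a pattern or is a subdomain of one.

    Hypothetical variant of host_matches: tries every label-aligned
    suffix of the host as a set lookup.
    """
    h = (host or "").lower().strip(".")
    if not h or not patterns:
        return False
    labels = h.split(".")
    # "a.b.c" -> try "a.b.c", then "b.c", then "c"
    for i in range(len(labels)):
        if ".".join(labels[i:]) in patterns:
            return True
    return False
```

Because only label-aligned suffixes are tried, `notgooglevideo.com` cannot match the pattern `googlevideo.com`, which is the same guarantee the `"." + p` check gives.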
- `never` = pure-trackers (`pure-trackers.txt`, already maintained by Anti-Track
2a) ∪ `fortknox_sites` (from filters). Even a CDN-fronted tracker stays MITM'd.
- Suffix match so `r1---sn-x.googlevideo.com` matches seed `googlevideo.com`.
- The seed/learned/never sets are loaded once per worker and **mtime-refreshed**
(mirror `_common._wg_hash_of`'s cache pattern) so autolearn updates land without
a restart, but per-connection lookups stay O(1) set hits.
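The mtime-refresh idea can be sketched as a small cache that re-reads a host file only when its modification time changes. This is an assumption-laden illustration, not the `_common._wg_hash_of` code (which is not shown in this change): `MtimeCachedSet` and `load_lines` are hypothetical names.

```python
import os
from typing import Callable, Set


def load_lines(path: str) -> Set[str]:
    """Read one host per line, stripping '#' comments and blank lines."""
    out: Set[str] = set()
    with open(path, "r", encoding="utf-8") as fh:
        for raw in fh:
            line = raw.split("#", 1)[0].strip().lower()
            if line:
                out.add(line)
    return out


class MtimeCachedSet:
    """Reload a host set from disk only when the file's mtime changes."""

    def __init__(self, path: str, loader: Callable[[str], Set[str]]) -> None:
        self._path = path
        self._loader = loader
        self._mtime: float = -1.0       # sentinel: nothing loaded yet
        self._items: Set[str] = set()

    def get(self) -> Set[str]:
        try:
            mtime = os.stat(self._path).st_mtime
        except OSError:
            mtime = 0.0                 # missing file is treated as an empty set
        if mtime != self._mtime:
            self._items = self._loader(self._path) if mtime else set()
            self._mtime = mtime
        return self._items
```

A caller would keep one `MtimeCachedSet` per backing file and call `get()` on each connection; the common case is one `os.stat` and no file read.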
## Seed — `conf/tls-splice-seed.conf`
Curated, **media/asset-specific only** (NOT generic CDN edges like cloudfront/
fastly/akamai-edge, which also serve HTML apps — splicing those would blind us to
real pages). v1 set:
```
googlevideo.com # YouTube video (the single biggest hog)
ytimg.com # YT thumbnails
gstatic.com # Google static assets
ggpht.com # Google user content
fbcdn.net # Facebook/IG media
cdninstagram.com
twimg.com # Twitter/X media
licdn.com # LinkedIn media
sndcdn.com # SoundCloud audio
scdn.co # Spotify audio
mzstatic.com # Apple media
```
A later version could let operators extend the set through an operator splice file
(same 3-way merge idea as the bypass lists); v1 ships only the seed + learned.
## Learning — never-HTML promotion
New table (SQLite, WAL already on):
```sql
CREATE TABLE IF NOT EXISTS splice_host_obs (
host TEXT PRIMARY KEY, hits INTEGER NOT NULL DEFAULT 0,
html_hits INTEGER NOT NULL DEFAULT 0, last_seen REAL
);
```
- `tls_splice.py` response hook (MITM'd flows only) upserts: `hits += 1`,
`html_hits += 1` if `Content-Type` contains `text/html`. **Sampling cap:** stop
counting once `hits >= 50` per host (bounds write amplification; 50 is enough
signal). Cheap: one upsert, no body read.
- `sbin/secubox-toolbox-autolearn` gains `_splice_feed()`: promote hosts with
`hits >= 20 AND html_hits == 0` (never served HTML over ≥20 observations) to
`/var/lib/secubox/toolbox/splice-learned.txt` (atomic write, `os.replace`).
Gated on `tls_splice != "off"`. Registrable-folded, deduped, capped (e.g. 2000).
- Demotion: not automatic (spliced hosts stop being observed). The media-only seed
+ the never-set + the kill-switch toggle bound the risk; a host that wrongly got
spliced is removed by clearing the learned file or toggling off.
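The sampling cap and the promotion rule can be exercised against an in-memory SQLite database. This is a minimal sketch rather than the shipped helpers: `make_db`, `record`, and `promotable` are hypothetical names, the thresholds are the 50 and 20 quoted in the text, and it assumes an SQLite build with upsert support (3.24 or later).

```python
import sqlite3
import time
from typing import List

CAP = 50        # sampling cap per host
MIN_HITS = 20   # promotion threshold


def make_db() -> sqlite3.Connection:
    con = sqlite3.connect(":memory:")
    con.execute(
        "CREATE TABLE IF NOT EXISTS splice_host_obs ("
        " host TEXT PRIMARY KEY, hits INTEGER NOT NULL DEFAULT 0,"
        " html_hits INTEGER NOT NULL DEFAULT 0, last_seen REAL)")
    return con


def record(con: sqlite3.Connection, host: str, is_html: bool) -> None:
    """One upsert per observation; rows at the cap are left untouched."""
    html = 1 if is_html else 0
    now = time.time()
    con.execute(
        "INSERT INTO splice_host_obs(host, hits, html_hits, last_seen) "
        "VALUES(?, 1, ?, ?) "
        "ON CONFLICT(host) DO UPDATE SET "
        " hits = hits + 1, html_hits = html_hits + ?, last_seen = ? "
        "WHERE splice_host_obs.hits < ?",
        (host, html, now, html, now, CAP))


def promotable(con: sqlite3.Connection) -> List[str]:
    """Hosts seen at least MIN_HITS times that never served text/html."""
    rows = con.execute(
        "SELECT host FROM splice_host_obs "
        "WHERE hits >= ? AND html_hits = 0 ORDER BY host",
        (MIN_HITS,)).fetchall()
    return [r[0] for r in rows]
```

A single HTML response is enough to keep a host out of the promoted set, which is the conservative direction for a rule that removes visibility.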
## Config — `filters.json`
Add `tls_splice` ∈ `{off, observe, on}`, **default `observe`** (dark-launch:
classify + log would-splice, but still MITM — zero behavior change until flipped).
- `filters.py`: add `"tls_splice": "observe"` to `DEFAULTS`; add
`_VALID_SPLICE = {"off","observe","on"}` and validate (mirror `protective`).
- `set_filters`: accept `tls_splice` only if in `_VALID_SPLICE`.
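The two validation rules (fall back to the default on read, ignore invalid values on write) can be sketched independently of `filters.py`. The names `coerce_splice` and `apply_patch` below are hypothetical and only illustrate the intended behavior.

```python
from typing import Any, Dict

DEFAULT_SPLICE = "observe"
VALID_SPLICE = ("off", "observe", "on")


def coerce_splice(raw: Dict[str, Any]) -> str:
    """Read path: any missing or unknown value becomes the default."""
    value = raw.get("tls_splice", DEFAULT_SPLICE)
    return value if value in VALID_SPLICE else DEFAULT_SPLICE


def apply_patch(current: Dict[str, Any], patch: Dict[str, Any]) -> Dict[str, Any]:
    """Write path: an invalid value is ignored and the prior value is kept."""
    out = dict(current)
    value = patch.get("tls_splice")
    if value in VALID_SPLICE:
        out["tls_splice"] = value
    return out
```

Keeping the read path tolerant means a hand-edited `filters.json` with a typo degrades to the dark-launch mode instead of silently enabling splicing.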
## Counters / observability
`tls_splice.py` flushes `/run/secubox/splice.json`
(`{spliced, would_splice, mitm, since, updated}`) every ~5 s (mirror
`ad_ghost._flush`). Optional future UI tile; not required for v1.
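A throttled counter flush can be sketched as follows. This is not `ad_ghost._flush` (which is not part of this change); `StatsFlusher` is a hypothetical name, and writing through a temporary file plus `os.replace` is a choice made for this sketch so that readers never see a half-written JSON file.

```python
import json
import os
import time
from typing import Dict, Optional


class StatsFlusher:
    """Keep counters in memory and write them out at most once per interval."""

    def __init__(self, path: str, interval: float = 5.0) -> None:
        self._path = path
        self._interval = interval
        self._last = 0.0
        self.counts: Dict[str, int] = {
            "spliced": 0, "would_splice": 0, "mitm": 0,
            "since": int(time.time()),
        }

    def bump(self, key: str) -> None:
        self.counts[key] = self.counts.get(key, 0) + 1

    def flush(self, now: Optional[float] = None) -> bool:
        """Return True if the file was written, False if throttled."""
        now = time.time() if now is None else now
        if (now - self._last) < self._interval:
            return False
        self._last = now
        parent = os.path.dirname(self._path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        tmp = self._path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as fh:
            json.dump({**self.counts, "updated": int(now)}, fh)
        os.replace(tmp, self._path)     # atomic swap on POSIX filesystems
        return True
```

Calling `flush()` from the hot path is cheap in the throttled case: one clock read and a comparison.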
## Tradeoff (explicit)
Spliced flows are invisible to DPI / media-stats / social-graph / media-cache.
Acceptable for pure asset CDNs (no privacy signal in media bytes; assets aren't
HTML so no banner/ad-ghost lost). **media_cache interaction:** when
`media_cache` is enabled, do NOT splice (media_cache needs to see those flows):
the addon returns before classifying if `filters.media_cache` is true, so the
flow stays MITM'd. (v1: media_cache defaults off, so this is a guard for the
opt-in case.)
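The guards combine into a small decision table. The function below is a sketch for reasoning about the cases, with `decide` as a hypothetical name; `classified` stands for the result of `should_splice` on the SNI.

```python
from typing import Optional


def decide(mode: str, media_cache: bool, sni: Optional[str],
           classified: bool) -> str:
    """Return 'mitm', 'would_splice', or 'splice' for one ClientHello."""
    # Any guard that fails keeps the flow under MITM (the fail-safe outcome).
    if mode == "off" or media_cache or not sni or not classified:
        return "mitm"
    return "splice" if mode == "on" else "would_splice"
```

Only one path yields `splice`: mode `on`, media_cache off, an SNI present, and a positive classification. Every other combination leaves the connection intercepted.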
## Safety / rollout
1. Ships `tls_splice=observe` (dark). Soak, review `/run/secubox/splice.json` +
"would-splice" logs against real traffic, confirm no first-party/HTML host is
classified, THEN flip to `on`.
2. No SNI → MITM. `never` set wins. media_cache-on → MITM.
3. Kill-switch: `tls_splice=off` reverts new connections to today's behavior as
soon as the filters hot-reload picks it up (5 s cache).
4. Deploy = rolling sequential restart of the 4 `mitm-wg-worker@` (3/4 capacity
during the roll), no mass restart.
## Tests
- `splice.py`: `host_matches` suffix logic (exact, subdomain, non-match, no false
prefix match e.g. `notgooglevideo.com`); `should_splice` (seed hit, learned hit,
never wins over seed, no-SNI→False, empty sets→False).
- filters: `tls_splice` validates {off,observe,on}, bad value → default; round-trips
via set_filters.
- learning: `_splice_feed` promotes `hits>=20 & html_hits==0`, excludes
`html_hits>0` and `hits<20` (monkeypatch DB rows).
- addon: `tls_clienthello` sets `ignore_connection` only when mode==on AND
should_splice; observe mode never sets it; off mode returns early. (Fake
ClientHelloData with `.client_hello.sni`.)
## Files
- Create `secubox_toolbox/splice.py`, `mitmproxy_addons/tls_splice.py`,
`conf/tls-splice-seed.conf`, tests.
- Modify `secubox_toolbox/filters.py` (toggle), `sbin/secubox-toolbox-mitm-wg-launch`
(register addon first + ship seed path), `sbin/secubox-toolbox-autolearn`
(`_splice_feed`), `secubox_toolbox/store.py` or `social.py` (obs table),
`debian/rules` (install seed conf), `debian/changelog`.


@ -0,0 +1,15 @@
# SecuBox toolbox :: SNI-splice seed (#649)
# MEDIA/ASSET-SPECIFIC hosts only — NEVER generic CDN edges (cloudfront/fastly/
# akamai-edge) which also serve HTML apps; splicing those would blind the MITM
# to real pages. Suffix-matched (subdomains included). One host suffix per line.
googlevideo.com # YouTube video streams (largest single hog)
ytimg.com # YouTube thumbnails
gstatic.com # Google static assets
ggpht.com # Google user content / avatars
fbcdn.net # Facebook / Instagram media
cdninstagram.com # Instagram media
twimg.com # Twitter / X media
licdn.com # LinkedIn media
sndcdn.com # SoundCloud audio
scdn.co # Spotify audio
mzstatic.com # Apple media / artwork


@ -1,3 +1,17 @@
secubox-toolbox (2.6.54-1~bookworm1) bookworm; urgency=medium

  * feat(#649): selective SNI-splice (Lever A). New tls_splice addon (first in
    the mitm-wg chain) splices pure-asset flows at the TLS ClientHello — curated
    media seed (googlevideo/ytimg/fbcdn/twimg/scdn…) + autolearn-promoted
    never-HTML hosts — so the GIL-bound R3 workers skip
    forge/decrypt/parse/16-addons on flows with no L7 value. Ships
    tls_splice=observe (DARK: classify + log would-splice, still MITM); flip
    to `on` after soak; `off` kill-switch.
    Never splices trackers (pure-trackers)/fortknox/no-SNI/media_cache-on.
    Learning obs recorded off the event loop (bg thread), only for undecided
    hosts. New splice_host_obs table; autolearn _splice_feed promotion.

 -- Gerald KERMA <devel@cybermind.fr>  Thu, 18 Jun 2026 14:30:00 +0200

secubox-toolbox (2.6.53-1~bookworm1) bookworm; urgency=medium
* perf(#646): adaptive Accept-Encoding strip in inject_banner. Keep gzip/br by


@ -0,0 +1,148 @@
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""SecuBox-Deb :: toolbox :: selective SNI-splice (#649, Lever A).

At the TLS ClientHello, splice (raw passthrough, no forge/decrypt/parse/addons)
pure-asset flows decided from the SNI. Modes (filters.tls_splice):
  off      never splice (legacy: MITM everything)
  observe  classify + log/count "would-splice", but still MITM (dark-launch)
  on       actually splice

Also records per-host content-type observations (MITM'd flows) to feed the
autolearn never-HTML promotion. Registered FIRST in the mitm-wg addon chain.
"""
from __future__ import annotations

import concurrent.futures as _futures
import json
import logging
import os
import sys
import time

if "/usr/lib/secubox/toolbox" not in sys.path:
    sys.path.insert(0, "/usr/lib/secubox/toolbox")

from secubox_toolbox import splice as _splice  # noqa: E402
from secubox_toolbox.filters import get_filters as _gf, FILTERS_PATH as _FILTERS_PATH  # noqa: E402
try:
    from secubox_toolbox import store as _store  # noqa: E402
except Exception:  # pragma: no cover
    _store = None

log = logging.getLogger("secubox.toolbox.addons")

SEED_PATH = os.environ.get("SECUBOX_SPLICE_SEED",
                           "/usr/lib/secubox/toolbox/conf/tls-splice-seed.conf")
LEARNED_PATH = os.environ.get("SECUBOX_SPLICE_LEARNED",
                              "/var/lib/secubox/toolbox/splice-learned.txt")
PURE_PATH = os.environ.get("SECUBOX_PURE_TRACKERS",
                           "/var/lib/secubox/toolbox/pure-trackers.txt")
STATS = "/run/secubox/splice.json"

_counts = {"spliced": 0, "would_splice": 0, "mitm": 0, "since": int(time.time())}
_last_flush = 0.0

# Learning observations are written off the proxy event loop (mirror
# local_store): the response hook must return instantly. Single worker thread
# serialises writes to the shared SQLite.
_obs_executor = _futures.ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="sbx_splice_obs")


class TlsSplice:
    def __init__(self) -> None:
        self._seed: set = set()
        self._learned: set = set()
        self._never: set = set()
        self._mtimes: tuple = ()
        self._refresh_sets()

    def _refresh_sets(self) -> None:
        """Reload seed/learned/never sets when any backing file changes.

        Includes FILTERS_PATH so a fortknox site added via the WebUI lands in
        the never-set promptly (else a newly-protected site could still be
        spliced once in `on` mode until another file's mtime moved)."""
        try:
            mtimes = tuple(
                os.stat(p).st_mtime if os.path.exists(p) else 0.0
                for p in (SEED_PATH, LEARNED_PATH, PURE_PATH, _FILTERS_PATH))
        except Exception:
            mtimes = ()
        if mtimes == self._mtimes and self._seed:
            return
        self._seed = _splice.load_splice_seed(SEED_PATH)
        self._learned = _splice.load_learned_splice(LEARNED_PATH)
        never = _splice.load_learned_splice(PURE_PATH)  # pure trackers
        try:
            for s in _gf().get("fortknox_sites", []) or []:
                never.add(str(s).lower().strip("."))
        except Exception:
            pass
        self._never = never
        self._mtimes = mtimes

    def tls_clienthello(self, data) -> None:
        try:
            mode = _gf().get("tls_splice", "observe")
            if mode == "off":
                return
            # media_cache wants to see asset flows → don't splice when it's on
            if _gf().get("media_cache"):
                return
            sni = getattr(data.client_hello, "sni", None)
            if not sni:
                return
            self._refresh_sets()
            if not _splice.should_splice(sni, self._seed, self._learned, self._never):
                return
            if mode == "on":
                data.ignore_connection = True
                _counts["spliced"] += 1
            else:  # observe
                _counts["would_splice"] += 1
                log.info("tls-splice would-splice %s", sni)
            self._flush()
        except Exception as e:  # never break a connection
            log.debug("tls_splice clienthello error: %s", e)

    def response(self, flow) -> None:
        """Record host content-type on MITM'd flows (learning signal).

        Off the event loop (bg thread) so the hook returns instantly. Skips
        hosts already decided (seed/learned/never), which need no more signal,
        so the DB is touched only for the still-unclassified long tail.
        """
        if _store is None:
            return
        try:
            if _gf().get("tls_splice", "observe") == "off":
                return
            host = (flow.request.pretty_host or "").lower().strip(".")
            if not host:
                return
            # Already-decided hosts gain nothing from more observations.
            if (_splice.host_matches(host, self._seed)
                    or _splice.host_matches(host, self._learned)
                    or _splice.host_matches(host, self._never)):
                return
            ct = (flow.response.headers.get("content-type", "") or "").lower()
            _obs_executor.submit(_store.record_splice_obs, host, "text/html" in ct)
        except Exception:
            pass

    def _flush(self) -> None:
        global _last_flush
        now = time.time()
        if (now - _last_flush) < 5:
            return
        _last_flush = now
        try:
            os.makedirs(os.path.dirname(STATS), exist_ok=True)
            with open(STATS, "w", encoding="utf-8") as f:
                json.dump({**_counts, "updated": int(now)}, f)
        except Exception:
            pass


addons = [TlsSplice()]


@ -25,6 +25,11 @@ OUT = os.environ.get("SECUBOX_AUTOLEARN_OUT",
"/var/lib/secubox/toolbox/learned-trackers.txt")
PURE_OUT = os.environ.get("SECUBOX_AUTOLEARN_PURE_OUT",
"/var/lib/secubox/toolbox/pure-trackers.txt")
SPLICE_LEARNED_OUT = os.environ.get(
"SECUBOX_SPLICE_LEARNED_OUT",
"/var/lib/secubox/toolbox/splice-learned.txt")
SPLICE_MIN_HITS = int(os.environ.get("SECUBOX_SPLICE_MIN_HITS", "20"))
SPLICE_MAX = 2000
MIN_SITES = 2 # cross-site threshold for operator-grade trackers
MAX_ENTRIES = 8000
COOKIE_XSITE_TOP_N = int(os.environ.get("SECUBOX_COOKIE_XSITE_TOP_N", "5"))
@ -94,6 +99,38 @@ def _dns_feed(pure_hosts) -> int:
return sum(1 for l in lines if "local-zone:" in l)
def _splice_feed() -> int:
    """Promote hosts that NEVER served text/html over >= SPLICE_MIN_HITS
    observations into the learned-splice file (registrable-folded, capped).
    Gated: skip when tls_splice == 'off'. Returns count written, or -1 if gated."""
    try:
        from secubox_toolbox.filters import get_filters
        if get_filters().get("tls_splice", "observe") == "off":
            return -1
    except Exception:
        pass
    try:
        con = sqlite3.connect(DB, timeout=5)
        rows = con.execute(
            "SELECT host FROM splice_host_obs WHERE hits >= ? AND html_hits = 0",
            (SPLICE_MIN_HITS,)).fetchall()
        con.close()
    except Exception as e:
        sys.stderr.write(f"autolearn: splice query failed: {e}\n")
        return -1
    hosts = sorted({(r[0] or "").lower().strip(".") for r in rows if r[0]})[:SPLICE_MAX]
    try:
        os.makedirs(os.path.dirname(SPLICE_LEARNED_OUT), exist_ok=True)
        tmp = SPLICE_LEARNED_OUT + ".tmp"
        with open(tmp, "w", encoding="utf-8") as fh:
            fh.write("\n".join(hosts) + ("\n" if hosts else ""))
        os.replace(tmp, SPLICE_LEARNED_OUT)
    except Exception as e:
        sys.stderr.write(f"autolearn: splice write failed: {e}\n")
        return -1
    return len(hosts)

def main() -> int:
learned: set[str] = set()
try:
@ -180,6 +217,11 @@ def main() -> int:
except Exception as e:
sys.stderr.write(f"autolearn: pure write failed: {e}\n")
dns_zones = _dns_feed(pout)
try:
_n_splice = _splice_feed()
sys.stderr.write(f"autolearn: {_n_splice} splice hosts learned\n")
except Exception as e:
sys.stderr.write(f"autolearn: splice feed error: {e}\n")
sys.stderr.write(
f"autolearn: {len(out)} hosts learned ({ti} threat-intel + "
f"{len(out) - ti} classified cross-site) @ {int(time.time())}"


@ -95,8 +95,12 @@ if [ -n "$IGNORE_REGEX" ]; then
fi
# Addons :
# - inject_xff (Phase 7 #498) MUST be FIRST — sets X-Forwarded-For at
# requestheaders so other addons and the upstream see the real peer IP
# - tls_splice (#649) runs at tls_clienthello (BEFORE any requestheaders
# addon) — it splices pure-asset flows so the rest of the chain never even
# sees them. Listed first for clarity; its hook phase makes ordering vs the
# requestheaders addons irrelevant, so inject_xff stays first-at-requestheaders.
# - inject_xff (Phase 7 #498) MUST be FIRST among requestheaders addons — sets
# X-Forwarded-For at requestheaders so other addons and the upstream see the real peer IP
# - utiq_defense (Phase 8 #500) runs at requestheaders too ; placed
# EARLY so a R1 block short-circuits the flow before downstream
# addons spend cycles on it
@ -111,7 +115,7 @@ fi
# ad_ghost (#566) runs right after protective_mode: for R3+/R4 it 204s known
# ad/tracker hosts (bandwidth save) at request time and injects ad-hiding CSS
# on HTML responses. Gated by the modular filter config (toolbox WebUI).
for addon in inject_xff utiq_defense protective_mode privacy_guard ad_ghost media_cache local_store social_graph inject_banner dpi cookies avatar ja4 soc_relay cert_pin_detect media_stats; do
for addon in tls_splice inject_xff utiq_defense protective_mode privacy_guard ad_ghost media_cache local_store social_graph inject_banner dpi cookies avatar ja4 soc_relay cert_pin_detect media_stats; do
ARGS+=(-s "$ADDON_DIR/${addon}.py")
done


@ -25,6 +25,7 @@ DEFAULTS: Dict = {
"media_cache": False, # #577 shared media proxy-cache (opt-in)
"stream_inject": True, # #620/#630 stream loader inject (TTFB) — default on
"autolearn": True, # #589 also block auto-learned bad hosts
"tls_splice": "observe", # #649 off | observe | on (asset SNI-splice)
# ── Anti-Track v2 (#633) — ships dark; arm after observe-only soak ──
"privacy_enforce": False, # master switch; off = observe-only
"privacy_poison": True, # forge stable fake id for loadbearing trackers
@ -41,6 +42,7 @@ DEFAULTS: Dict = {
}
_VALID_PROTECTIVE = ("off", "alert", "spoof")
_VALID_SPLICE = ("off", "observe", "on")
_cache: Dict = {}
_cache_ts: float = 0.0
@ -66,6 +68,8 @@ def get_filters(force: bool = False) -> Dict:
pass
if out.get("protective") not in _VALID_PROTECTIVE:
out["protective"] = DEFAULTS["protective"]
if out.get("tls_splice") not in _VALID_SPLICE:
out["tls_splice"] = DEFAULTS["tls_splice"]
_cache = out
_cache_ts = now
return out
@ -83,6 +87,8 @@ def set_filters(patch: Dict) -> Dict:
if ck in DEFAULTS["ad_ghost_categories"]})
elif k == "protective" and v in _VALID_PROTECTIVE:
cur["protective"] = v
elif k == "tls_splice" and v in _VALID_SPLICE:
cur["tls_splice"] = v
elif k == "fortknox_sites" and isinstance(v, list):
cur["fortknox_sites"] = [str(s).strip().lower() for s in v if str(s).strip()]
elif k in ("banner", "ad_ghost", "ad_ghost_block", "media_cache", "autolearn",


@ -0,0 +1,56 @@
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""SecuBox-Deb :: toolbox :: SNI-splice classifier (#649).

Pure helpers deciding, from the TLS SNI alone, whether a flow is a pure-asset
flow we can splice (raw passthrough, no MITM). Seed ∪ learned, minus a never-set
(trackers we block/poison, fortknox sites). Suffix match so CDN shards match.
"""
from __future__ import annotations

import os
from typing import Set


def _load_lines(path: str) -> Set[str]:
    out: Set[str] = set()
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.split("#", 1)[0].strip().lower()
                if line:
                    out.add(line)
    except Exception:
        pass
    return out


def load_splice_seed(path: str) -> Set[str]:
    return _load_lines(path)


def load_learned_splice(path: str) -> Set[str]:
    return _load_lines(path)


def host_matches(host: str, patterns: Set[str]) -> bool:
    """True if host == pattern or host is a subdomain of pattern."""
    h = (host or "").lower().strip(".")
    if not h or not patterns:
        return False
    if h in patterns:
        return True
    for p in patterns:
        if h.endswith("." + p):
            return True
    return False


def should_splice(sni: str, seed: Set[str], learned: Set[str],
                  never: Set[str]) -> bool:
    s = (sni or "").lower().strip(".")
    if not s:
        return False
    if host_matches(s, never):  # never wins (trackers / fortknox)
        return False
    return host_matches(s, seed) or host_matches(s, learned)


@ -14,6 +14,12 @@ log = logging.getLogger("secubox.toolbox")
DB_PATH = Path("/var/lib/secubox/toolbox/toolbox.db")
SCHEMA = """
CREATE TABLE IF NOT EXISTS splice_host_obs (
host TEXT PRIMARY KEY,
hits INTEGER NOT NULL DEFAULT 0,
html_hits INTEGER NOT NULL DEFAULT 0,
last_seen REAL
);
CREATE TABLE IF NOT EXISTS consents (
mac_hash TEXT PRIMARY KEY,
ts INTEGER NOT NULL,
@ -56,6 +62,45 @@ def _conn() -> sqlite3.Connection:
return c
_SPLICE_OBS_CAP = 50  # stop counting once we have enough signal per host


def record_splice_obs(host: str, is_html: bool) -> None:
    """Observe a MITM'd flow's host + whether it served text/html. Sampling-capped
    so writes stay bounded. Best-effort (never raises into the proxy path)."""
    h = (host or "").lower().strip(".")
    if not h:
        return
    try:
        with _conn() as c:
            c.execute(
                "INSERT INTO splice_host_obs(host, hits, html_hits, last_seen) "
                "VALUES(?, 1, ?, ?) "
                "ON CONFLICT(host) DO UPDATE SET "
                " hits = MIN(hits + 1, ?), "
                " html_hits = html_hits + ?, "
                " last_seen = ? "
                "WHERE splice_host_obs.hits < ?",
                (h, 1 if is_html else 0, time.time(),
                 _SPLICE_OBS_CAP, 1 if is_html else 0, time.time(), _SPLICE_OBS_CAP),
            )
    except Exception as e:
        log.debug("splice obs failed: %s", e)


def never_html_hosts(min_hits: int = 20) -> list[str]:
    """Hosts observed >= min_hits times that NEVER served text/html."""
    try:
        with _conn() as c:
            rows = c.execute(
                "SELECT host FROM splice_host_obs WHERE hits >= ? AND html_hits = 0",
                (min_hits,),
            ).fetchall()
        return [r[0] for r in rows]
    except Exception:
        return []

def record_consent(mac_hash: str, ip: str, ua: str | None, ttl_seconds: int) -> None:
with _conn() as c:
c.execute(


@ -0,0 +1,32 @@
# tests/test_autolearn_splice.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import os, sqlite3, importlib.util, pathlib


def _load_autolearn():
    p = pathlib.Path(__file__).resolve().parents[1] / "sbin" / "secubox-toolbox-autolearn"
    spec = importlib.util.spec_from_loader("autolearn", loader=None)
    mod = importlib.util.module_from_spec(spec)
    exec(compile(p.read_text(), str(p), "exec"), mod.__dict__)
    return mod


def test_splice_feed_promotes_never_html(tmp_path, monkeypatch):
    db = tmp_path / "t.db"
    con = sqlite3.connect(db)
    con.executescript(
        "CREATE TABLE splice_host_obs(host TEXT PRIMARY KEY, hits INT, html_hits INT, last_seen REAL);"
        "INSERT INTO splice_host_obs VALUES('cdn.assets.net',25,0,0);"
        "INSERT INTO splice_host_obs VALUES('html.site.com',25,3,0);"
        "INSERT INTO splice_host_obs VALUES('low.hits.net',5,0,0);")
    con.commit(); con.close()
    out = tmp_path / "splice-learned.txt"
    monkeypatch.setenv("SECUBOX_AUTOLEARN_DB", str(db))
    monkeypatch.setenv("SECUBOX_SPLICE_LEARNED_OUT", str(out))
    al = _load_autolearn()
    n = al._splice_feed()
    learned = set(out.read_text().split())
    assert "cdn.assets.net" in learned       # never-HTML, >=20 hits
    assert "html.site.com" not in learned    # served HTML
    assert "low.hits.net" not in learned     # too few hits
    assert n == 1


@ -0,0 +1,23 @@
# tests/test_filters_splice.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import json
from secubox_toolbox import filters


def test_default_is_observe(monkeypatch, tmp_path):
    monkeypatch.setattr(filters, "FILTERS_PATH", str(tmp_path / "f.json"))
    assert filters.get_filters(force=True)["tls_splice"] == "observe"


def test_bad_value_falls_back(monkeypatch, tmp_path):
    fp = tmp_path / "f.json"; fp.write_text(json.dumps({"tls_splice": "bogus"}))
    monkeypatch.setattr(filters, "FILTERS_PATH", str(fp))
    assert filters.get_filters(force=True)["tls_splice"] == "observe"


def test_set_filters_accepts_valid(monkeypatch, tmp_path):
    monkeypatch.setattr(filters, "FILTERS_PATH", str(tmp_path / "f.json"))
    out = filters.set_filters({"tls_splice": "on"})
    assert out["tls_splice"] == "on"
    out = filters.set_filters({"tls_splice": "nope"})
    assert out["tls_splice"] == "on"  # invalid ignored, prior kept


@ -0,0 +1,35 @@
# tests/test_splice_classify.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
from secubox_toolbox import splice


def test_host_matches_exact_and_subdomain():
    pats = {"googlevideo.com", "fbcdn.net"}
    assert splice.host_matches("googlevideo.com", pats)
    assert splice.host_matches("r1---sn-x.googlevideo.com", pats)
    assert not splice.host_matches("notgooglevideo.com", pats)  # no false prefix
    assert not splice.host_matches("example.com", pats)


def test_should_splice_seed_and_learned():
    seed = {"googlevideo.com"}; learned = {"cdn.example.net"}; never = set()
    assert splice.should_splice("x.googlevideo.com", seed, learned, never)
    assert splice.should_splice("cdn.example.net", seed, learned, never)
    assert not splice.should_splice("news.example.com", seed, learned, never)


def test_never_wins():
    seed = {"evil-cdn.com"}; never = {"evil-cdn.com"}
    assert not splice.should_splice("evil-cdn.com", seed, set(), never)


def test_empty_sni_or_sets():
    assert not splice.should_splice("", {"a.com"}, set(), set())
    assert not splice.should_splice("a.com", set(), set(), set())


def test_load_seed_strips_comments(tmp_path):
    f = tmp_path / "seed.conf"
    f.write_text("# header\ngooglevideo.com # yt\n\n fbcdn.net\n")
    s = splice.load_splice_seed(str(f))
    assert s == {"googlevideo.com", "fbcdn.net"}


@ -0,0 +1,31 @@
# tests/test_splice_obs.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
from pathlib import Path
from secubox_toolbox import store


def _fresh(tmp_path, monkeypatch):
    monkeypatch.setattr(store, "DB_PATH", Path(tmp_path) / "t.db")


def test_record_and_never_html(tmp_path, monkeypatch):
    _fresh(tmp_path, monkeypatch)
    for _ in range(20):
        store.record_splice_obs("cdn.assets.net", is_html=False)
    for _ in range(20):
        store.record_splice_obs("www.site.com", is_html=False)
    store.record_splice_obs("www.site.com", is_html=True)  # served HTML once
    hosts = store.never_html_hosts(min_hits=20)
    assert "cdn.assets.net" in hosts
    assert "www.site.com" not in hosts  # html_hits > 0 → excluded


def test_sampling_cap(tmp_path, monkeypatch):
    _fresh(tmp_path, monkeypatch)
    for _ in range(100):
        store.record_splice_obs("x.net", is_html=False)
    # capped at 50 — never grows unbounded
    import sqlite3
    with store._conn() as c:
        hits = c.execute("SELECT hits FROM splice_host_obs WHERE host='x.net'").fetchone()[0]
    assert hits == 50


@ -0,0 +1,109 @@
# tests/test_tls_splice_addon.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import sys, pathlib, importlib, json, types

ADDON_DIR = pathlib.Path(__file__).resolve().parents[1] / "mitmproxy_addons"
sys.path.insert(0, str(ADDON_DIR))
from secubox_toolbox import filters


def _addon(monkeypatch, tmp_path, mode):
    fp = tmp_path / "f.json"; fp.write_text(json.dumps({"tls_splice": mode}))
    monkeypatch.setattr(filters, "FILTERS_PATH", str(fp)); filters.get_filters(force=True)
    import tls_splice; importlib.reload(tls_splice)
    a = tls_splice.TlsSplice()
    a._seed = {"googlevideo.com"}; a._learned = set(); a._never = set()
    monkeypatch.setattr(a, "_refresh_sets", lambda: None)
    return tls_splice, a


def _ch(sni):
    d = types.SimpleNamespace()
    d.client_hello = types.SimpleNamespace(sni=sni)
    d.context = types.SimpleNamespace(client=types.SimpleNamespace(peername=("10.99.1.2", 5)))
    d.ignore_connection = False
    return d


def test_on_splices_seed_host(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "on")
    d = _ch("r1.googlevideo.com"); a.tls_clienthello(d)
    assert d.ignore_connection is True


def test_observe_does_not_splice(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "observe")
    d = _ch("r1.googlevideo.com"); a.tls_clienthello(d)
    assert d.ignore_connection is False


def test_off_returns_early(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "off")
    d = _ch("r1.googlevideo.com"); a.tls_clienthello(d)
    assert d.ignore_connection is False


def test_non_seed_not_spliced(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "on")
    d = _ch("news.example.com"); a.tls_clienthello(d)
    assert d.ignore_connection is False


def test_no_sni_not_spliced(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "on")
    d = _ch(None); a.tls_clienthello(d)
    assert d.ignore_connection is False


def test_clienthello_exception_falls_through(monkeypatch, tmp_path):
    """A broken ClientHelloData must NOT raise and must NOT splice (→ MITM)."""
    _, a = _addon(monkeypatch, tmp_path, "on")

    class _Boom:
        @property
        def sni(self):
            raise RuntimeError("malformed client hello")

    d = types.SimpleNamespace()
    d.client_hello = _Boom()
    d.ignore_connection = False
    a.tls_clienthello(d)  # must not raise
    assert d.ignore_connection is False  # fail-safe → MITM


def test_response_records_undecided_host(monkeypatch, tmp_path):
    tls_splice, a = _addon(monkeypatch, tmp_path, "observe")
    calls = []
    monkeypatch.setattr(tls_splice, "_obs_executor",
                        types.SimpleNamespace(submit=lambda fn, *args: calls.append(args)))
    monkeypatch.setattr(tls_splice, "_store", types.SimpleNamespace(record_splice_obs=lambda *a: None))
    f = types.SimpleNamespace(
        request=types.SimpleNamespace(pretty_host="news.example.com"),
        response=types.SimpleNamespace(headers={"content-type": "text/html; charset=utf-8"}))
    a.response(f)
    assert calls == [("news.example.com", True)]  # undecided host, is_html=True


def test_response_skips_decided_host(monkeypatch, tmp_path):
    tls_splice, a = _addon(monkeypatch, tmp_path, "observe")
    calls = []
    monkeypatch.setattr(tls_splice, "_obs_executor",
                        types.SimpleNamespace(submit=lambda fn, *args: calls.append(args)))
    monkeypatch.setattr(tls_splice, "_store", types.SimpleNamespace(record_splice_obs=lambda *a: None))
    f = types.SimpleNamespace(
        request=types.SimpleNamespace(pretty_host="r1.googlevideo.com"),  # in seed
        response=types.SimpleNamespace(headers={"content-type": "video/mp4"}))
    a.response(f)
    assert calls == []  # already-decided (seed) → no observation write


def test_response_off_mode_skips(monkeypatch, tmp_path):
    tls_splice, a = _addon(monkeypatch, tmp_path, "off")
    calls = []
    monkeypatch.setattr(tls_splice, "_obs_executor",
                        types.SimpleNamespace(submit=lambda fn, *args: calls.append(args)))
    monkeypatch.setattr(tls_splice, "_store", types.SimpleNamespace(record_splice_obs=lambda *a: None))
    f = types.SimpleNamespace(
        request=types.SimpleNamespace(pretty_host="news.example.com"),
        response=types.SimpleNamespace(headers={"content-type": "text/html"}))
    a.response(f)
    assert calls == []  # off → recorder disabled