mirror of
https://github.com/CyberMind-FR/secubox-deb.git
synced 2026-06-29 21:38:35 +00:00
Compare commits: ab8822e3f4...1139ce103e (13 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 1139ce103e | |
| | 173b67c495 | |
| | e8cebe1662 | |
| | 3b0423189e | |
| | 742897700c | |
| | 296cebc69e | |
| | 4160167e5c | |
| | d836179a72 | |
| | 9d3630574f | |
| | 8664c84893 | |
| | 96e04bbe0f | |
| | 09e16f35a1 | |
| | 7834a29724 | |

@@ -3,6 +3,28 @@

---

## 2026-06-18 — #649 selective SNI-splice (Lever A) shipped dark (PR #650, toolbox 2.6.54)

- **Architecture decision.** Asked "do we need a full mitm for R3 HTTPS?" Answer:
  outbound HTTPS interception intrinsically needs per-host cert forging (the
  WAF/own-cert analogy doesn't transfer), so we keep a forging MITM but only
  decrypt flows we'd actually modify. Plan = A-then-B: **A** = selective
  SNI-splice (this), **B** = Go/Rust core (strategic, later). WAF deferred.
- **Lever A.** New `tls_splice` addon (first in the mitm-wg chain) decides at the
  TLS ClientHello, from the SNI alone, whether to MITM or **splice** (raw
  passthrough: no forge/decrypt/parse/16-addons). Policy: curated media-only seed
  (googlevideo/ytimg/fbcdn/twimg/scdn…, deliberately NOT generic CDN edges) ∪
  autolearn-promoted never-HTML hosts (`splice_host_obs` table, ≥20 obs,
  html_hits==0). Never splices trackers/fortknox/no-SNI/media_cache-on. Learning
  obs are recorded off the event loop (bg thread), only for undecided hosts.
- **Dark-launch.** Ships `tls_splice=observe` (classify + log would-splice, still
  MITM, zero behavior change); `on` flip is post-soak; `off` is the kill-switch.
- **Built TDD** (7 tasks, 102 tests), two-stage reviews per task + whole-branch
  review (APPROVED; closed a hot-path sync-SQLite issue → bg-thread offload, and a
  fortknox-WebUI never-set refresh gap). **Deployed gk2 2.6.54**, rolling restart
  of the 4 workers, addon loads clean, 0 runtime errors, dark default confirmed.
  Next: soak → review → flip `on`.
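
The bg-thread offload mentioned in this entry can be pictured roughly as follows. This is a minimal sketch, not the shipped code: `ObsRecorder`, `submit`, `close` and the queue size are hypothetical names chosen for illustration, and `sink` stands in for whatever store call persists an observation. The hook only enqueues; a daemon thread drains the queue and performs the blocking write, and observations are dropped when the queue is full so the proxy path never waits.

```python
import queue
import threading
from typing import Callable, Optional, Tuple

Obs = Tuple[str, bool]  # (host, is_html)


class ObsRecorder:
    """Hypothetical helper: hand observations to a background writer thread."""

    def __init__(self, sink: Callable[[str, bool], None], maxsize: int = 1024) -> None:
        self._sink = sink
        self._q: "queue.Queue[Optional[Obs]]" = queue.Queue(maxsize=maxsize)
        self._t = threading.Thread(target=self._run, daemon=True)
        self._t.start()

    def submit(self, host: str, is_html: bool) -> bool:
        """Non-blocking enqueue; returns False if the observation was dropped."""
        try:
            self._q.put_nowait((host, is_html))
            return True
        except queue.Full:
            return False

    def _run(self) -> None:
        while True:
            item = self._q.get()
            if item is None:          # sentinel: stop the worker
                return
            try:
                self._sink(*item)     # the blocking write happens here
            except Exception:
                pass                  # best-effort: never kill the worker

    def close(self) -> None:
        """Flush pending observations and stop the worker."""
        self._q.put(None)
        self._t.join()
```

Dropping on a full queue is a deliberate trade in this sketch: the learning signal is statistical, so losing a few observations under load costs less than stalling a connection.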

## 2026-06-18 — #623 systemic shared-parent clobber resolved at source (PR #648)

- **Root cause corrected.** The recurring `/var/{lib,log,cache,…}/secubox` parent

@@ -34,8 +34,23 @@ All merged to master + deployed on gk2. Details in HISTORY 2026-06-18.
  thundering-herd); live systems are covered by `dirs-guard.timer`; lands with the
  next CI build / reflash.

- ✅ **#649 Lever A: selective SNI-splice (PR #650, toolbox 2.6.54 LIVE dark)**.
  New `tls_splice` addon (first in mitm-wg chain) splices pure-asset flows at the
  TLS ClientHello (curated media seed googlevideo/ytimg/fbcdn/twimg/scdn… ∪
  autolearn-promoted never-HTML hosts), so GIL-bound R3 workers skip
  forge/decrypt/parse/16-addons on no-L7-value flows. Ships `tls_splice=observe`
  (DARK: classify+log, still MITM). Deployed gk2, addon loads clean, 0 runtime
  errors. Answer to "do we need full mitm?": YES for outbound HTTPS (per-host cert
  forging is intrinsic), but only decrypt what we modify. Lever B (Go/Rust core)
  = strategic follow-up. WAF = later.

### ⬜ Next Up

- **#649 SOAK → FLIP**: review `would-splice` logs + `/run/secubox/splice.json`
  on real traffic for a soak window, confirm no first-party/HTML host is
  classified, then flip `tls_splice=on` in `/etc/secubox/toolbox/filters.json`
  (hot-reload). Before flip: the fortknox-via-WebUI refresh gap is already fixed.
- **Lever B (#649 follow-up)**: Go/Rust forging-proxy core if A isn't enough.
- **Anti-Track v2 ARMING** (USER decision, gated): soak observe-only, then flip
  `privacy_enforce=true`; regenerate `data/cdn-allowlist.txt` from the public
  ranges before `privacy_ip_drop`; run `unbound-checkconf` before `privacy_dns_feed`.
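
For the soak review, the counters in `/run/secubox/splice.json` can be turned into rates with a few lines of Python. This is a rough sketch under assumptions: the field names (`spliced`, `would_splice`, `since`) are taken from the addon code in the implementation plan, while `summarize` and `load_stats` are hypothetical helpers, not part of the toolbox.

```python
import json
from typing import Any, Dict


def summarize(stats: Dict[str, Any], now: float) -> Dict[str, float]:
    """Turn raw splice counters into per-hour rates for a soak review."""
    since = float(stats.get("since", now))
    hours = max((now - since) / 3600.0, 1e-9)   # avoid division by zero
    would = int(stats.get("would_splice", 0))
    spliced = int(stats.get("spliced", 0))
    return {
        "hours": hours,
        "would_splice_per_hour": would / hours,
        "spliced_per_hour": spliced / hours,
    }


def load_stats(path: str = "/run/secubox/splice.json") -> Dict[str, Any]:
    """Read the counters file; an absent or unreadable file yields {}."""
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except (OSError, ValueError):
        return {}
```

Calling `summarize(load_stats(), time.time())` on the box gives a quick read on how much traffic would move to the splice path; the rates say nothing about which hosts were classified, so the `would-splice` log lines still need a manual pass before the flip.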

@@ -0,0 +1,675 @@
# Toolbox Selective SNI-Splice (Lever A) — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax.

**Goal:** Splice (raw TCP passthrough) pure-asset HTTPS flows at the TLS ClientHello by SNI, so the GIL-bound mitm-wg workers only forge/decrypt/parse/run-addons on flows that need L7 work.

**Architecture:** New `tls_splice` addon (`tls_clienthello` hook) consults a pure classifier (`splice.py`) over a curated media seed ∪ autolearn-promoted never-HTML hosts; ships `tls_splice=observe` (dark) → flip to `on`. New `splice_host_obs` table feeds the learning. WAF + Go/Rust core out of scope.

**Tech Stack:** Python, mitmproxy 11 addon API, SQLite (WAL), pytest.

**Spec:** `docs/superpowers/specs/2026-06-18-toolbox-selective-sni-splice.md`

All paths below are under `packages/secubox-toolbox/`. Run tests from that dir with `python -m pytest` (fallback `python3 -m pytest`).

---

### Task 1: filters `tls_splice` toggle (off|observe|on, default observe)

**Files:** Modify `secubox_toolbox/filters.py`; Test `tests/test_filters_splice.py` (create)

- [ ] **Step 1: Failing test**

```python
# tests/test_filters_splice.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import json
from secubox_toolbox import filters


def test_default_is_observe(monkeypatch, tmp_path):
    monkeypatch.setattr(filters, "FILTERS_PATH", str(tmp_path / "f.json"))
    assert filters.get_filters(force=True)["tls_splice"] == "observe"


def test_bad_value_falls_back(monkeypatch, tmp_path):
    fp = tmp_path / "f.json"
    fp.write_text(json.dumps({"tls_splice": "bogus"}))
    monkeypatch.setattr(filters, "FILTERS_PATH", str(fp))
    assert filters.get_filters(force=True)["tls_splice"] == "observe"


def test_set_filters_accepts_valid(monkeypatch, tmp_path):
    monkeypatch.setattr(filters, "FILTERS_PATH", str(tmp_path / "f.json"))
    out = filters.set_filters({"tls_splice": "on"})
    assert out["tls_splice"] == "on"
    out = filters.set_filters({"tls_splice": "nope"})
    assert out["tls_splice"] == "on"  # invalid ignored, prior kept
```

Run: `python -m pytest tests/test_filters_splice.py -v` → FAIL (`tls_splice` missing).

- [ ] **Step 2: Implement**

In `secubox_toolbox/filters.py`:
- Add to `DEFAULTS` (after the `"autolearn": True,` line):
  ```python
      "tls_splice": "observe",  # #649 off | observe | on (asset SNI-splice)
  ```
- After `_VALID_PROTECTIVE = ("off", "alert", "spoof")` add:
  ```python
  _VALID_SPLICE = ("off", "observe", "on")
  ```
- In `get_filters`, after the `protective` validation block (line ~67-68) add:
  ```python
      if out.get("tls_splice") not in _VALID_SPLICE:
          out["tls_splice"] = DEFAULTS["tls_splice"]
  ```
- In `set_filters`, add a branch (after the `protective` branch):
  ```python
          elif k == "tls_splice" and v in _VALID_SPLICE:
              cur["tls_splice"] = v
  ```

- [ ] **Step 3: Pass** — `python -m pytest tests/test_filters_splice.py -v` → PASS.
- [ ] **Step 4: Commit** — `git add secubox_toolbox/filters.py tests/test_filters_splice.py && git commit -m "feat(toolbox): tls_splice filter toggle off|observe|on (ref #649)"`
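
The three tests in this task pin down one rule: an unknown `tls_splice` value never reaches callers. The sketch below restates that rule in a self-contained form. It assumes nothing about the real `filters.py` beyond the names quoted in Step 2 (`DEFAULTS`, `_VALID_SPLICE`); `normalize` and `apply_update` are hypothetical stand-ins for the read and write paths, shown only to make the fallback and keep-prior semantics concrete. Validating on both paths means a hand-edited `filters.json` and a bad API call degrade the same way, to the dark `observe` default or the previously accepted value.

```python
from typing import Any, Dict

DEFAULTS: Dict[str, Any] = {"tls_splice": "observe"}
_VALID_SPLICE = ("off", "observe", "on")


def normalize(loaded: Dict[str, Any]) -> Dict[str, Any]:
    """Read path: merge over defaults, then repair an invalid tls_splice."""
    out = {**DEFAULTS, **loaded}
    if out.get("tls_splice") not in _VALID_SPLICE:
        out["tls_splice"] = DEFAULTS["tls_splice"]
    return out


def apply_update(cur: Dict[str, Any], patch: Dict[str, Any]) -> Dict[str, Any]:
    """Write path: accept only valid values, otherwise keep the prior one."""
    new = dict(cur)
    for k, v in patch.items():
        if k == "tls_splice" and v in _VALID_SPLICE:
            new["tls_splice"] = v
    return new
```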

---

### Task 2: `splice.py` classifier (pure)

**Files:** Create `secubox_toolbox/splice.py`, `tests/test_splice_classify.py`

- [ ] **Step 1: Failing test**

```python
# tests/test_splice_classify.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
from secubox_toolbox import splice


def test_host_matches_exact_and_subdomain():
    pats = {"googlevideo.com", "fbcdn.net"}
    assert splice.host_matches("googlevideo.com", pats)
    assert splice.host_matches("r1---sn-x.googlevideo.com", pats)
    assert not splice.host_matches("notgooglevideo.com", pats)  # no false prefix
    assert not splice.host_matches("example.com", pats)


def test_should_splice_seed_and_learned():
    seed = {"googlevideo.com"}
    learned = {"cdn.example.net"}
    never = set()
    assert splice.should_splice("x.googlevideo.com", seed, learned, never)
    assert splice.should_splice("cdn.example.net", seed, learned, never)
    assert not splice.should_splice("news.example.com", seed, learned, never)


def test_never_wins():
    seed = {"evil-cdn.com"}
    never = {"evil-cdn.com"}
    assert not splice.should_splice("evil-cdn.com", seed, set(), never)


def test_empty_sni_or_sets():
    assert not splice.should_splice("", {"a.com"}, set(), set())
    assert not splice.should_splice("a.com", set(), set(), set())


def test_load_seed_strips_comments(tmp_path):
    f = tmp_path / "seed.conf"
    f.write_text("# header\ngooglevideo.com # yt\n\n fbcdn.net\n")
    s = splice.load_splice_seed(str(f))
    assert s == {"googlevideo.com", "fbcdn.net"}
```

Run: `python -m pytest tests/test_splice_classify.py -v` → FAIL (no module).

- [ ] **Step 2: Implement** `secubox_toolbox/splice.py`

```python
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""SecuBox-Deb :: toolbox :: SNI-splice classifier (#649).

Pure helpers deciding, from the TLS SNI alone, whether a flow is a pure-asset
flow we can splice (raw passthrough, no MITM). Seed ∪ learned, minus a never-set
(trackers we block/poison, fortknox sites). Suffix match so CDN shards match.
"""
from __future__ import annotations

from typing import Set


def _load_lines(path: str) -> Set[str]:
    """Read one host suffix per line; '#' starts a comment."""
    out: Set[str] = set()
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.split("#", 1)[0].strip().lower()
                if line:
                    out.add(line)
    except Exception:
        pass  # missing or unreadable file → empty set
    return out


def load_splice_seed(path: str) -> Set[str]:
    return _load_lines(path)


def load_learned_splice(path: str) -> Set[str]:
    return _load_lines(path)


def host_matches(host: str, patterns: Set[str]) -> bool:
    """True if host == pattern or host is a subdomain of pattern."""
    h = (host or "").lower().strip(".")
    if not h or not patterns:
        return False
    if h in patterns:
        return True
    for p in patterns:
        if h.endswith("." + p):
            return True
    return False


def should_splice(sni: str, seed: Set[str], learned: Set[str],
                  never: Set[str]) -> bool:
    s = (sni or "").lower().strip(".")
    if not s:
        return False
    if host_matches(s, never):  # never wins (trackers / fortknox)
        return False
    return host_matches(s, seed) or host_matches(s, learned)
```

- [ ] **Step 3: Pass** — `python -m pytest tests/test_splice_classify.py -v` → PASS.
- [ ] **Step 4: Commit** — `git add secubox_toolbox/splice.py tests/test_splice_classify.py && git commit -m "feat(toolbox): SNI-splice classifier (seed/learned/never) (ref #649)"`
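
`host_matches` above runs one `endswith` per pattern, so its cost grows with the size of the seed, learned and never sets. If that ever shows up on the ClientHello path, an alternative is to walk the host's parent domains and do one set lookup per label. The variant below is only a sketch of that idea, with the hypothetical name `host_matches_fast`; it is meant to return the same answers as `host_matches` for patterns written as plain host suffixes (no leading dot, no empty entries), which is what `_load_lines` produces.

```python
from typing import Set


def host_matches_fast(host: str, patterns: Set[str]) -> bool:
    """True if host equals a pattern or is a subdomain of one.

    Checks the host, then each parent domain, against the set: one hash
    lookup per label instead of one endswith() per pattern.
    """
    h = (host or "").lower().strip(".")
    if not h or not patterns:
        return False
    while True:
        if h in patterns:
            return True
        _, dot, rest = h.partition(".")
        if not dot:               # no more parent domains to try
            return False
        h = rest
```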

---

### Task 3: curated media seed conf

**Files:** Create `conf/tls-splice-seed.conf`

- [ ] **Step 1: Create** `conf/tls-splice-seed.conf`

```
# SecuBox toolbox :: SNI-splice seed (#649)
# MEDIA/ASSET-SPECIFIC hosts only — NEVER generic CDN edges (cloudfront/fastly/
# akamai-edge) which also serve HTML apps; splicing those would blind the MITM
# to real pages. Suffix-matched (subdomains included). One host suffix per line.
googlevideo.com     # YouTube video streams (largest single hog)
ytimg.com           # YouTube thumbnails
gstatic.com         # Google static assets
ggpht.com           # Google user content / avatars
fbcdn.net           # Facebook / Instagram media
cdninstagram.com    # Instagram media
twimg.com           # Twitter / X media
licdn.com           # LinkedIn media
sndcdn.com          # SoundCloud audio
scdn.co             # Spotify audio
mzstatic.com        # Apple media / artwork
```

- [ ] **Step 2: Commit** — `git add conf/tls-splice-seed.conf && git commit -m "feat(toolbox): curated media SNI-splice seed (ref #649)"`

---

### Task 4: `splice_host_obs` table + store helpers

**Files:** Modify `secubox_toolbox/store.py`; Test `tests/test_splice_obs.py`

- [ ] **Step 1: Failing test**

```python
# tests/test_splice_obs.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
from pathlib import Path
from secubox_toolbox import store


def _fresh(tmp_path, monkeypatch):
    monkeypatch.setattr(store, "DB_PATH", Path(tmp_path) / "t.db")


def test_record_and_never_html(tmp_path, monkeypatch):
    _fresh(tmp_path, monkeypatch)
    for _ in range(20):
        store.record_splice_obs("cdn.assets.net", is_html=False)
    for _ in range(20):
        store.record_splice_obs("www.site.com", is_html=False)
    store.record_splice_obs("www.site.com", is_html=True)  # served HTML once
    hosts = store.never_html_hosts(min_hits=20)
    assert "cdn.assets.net" in hosts
    assert "www.site.com" not in hosts  # html_hits > 0 → excluded


def test_sampling_cap(tmp_path, monkeypatch):
    _fresh(tmp_path, monkeypatch)
    for _ in range(100):
        store.record_splice_obs("x.net", is_html=False)
    # capped at 50 — never grows unbounded
    with store._conn() as c:
        hits = c.execute("SELECT hits FROM splice_host_obs WHERE host='x.net'").fetchone()[0]
    assert hits == 50
```

Run: `python -m pytest tests/test_splice_obs.py -v` → FAIL.

- [ ] **Step 2: Implement** in `secubox_toolbox/store.py`

Add to the `SCHEMA` string (before its closing `"""`):
```sql
CREATE TABLE IF NOT EXISTS splice_host_obs (
    host TEXT PRIMARY KEY,
    hits INTEGER NOT NULL DEFAULT 0,
    html_hits INTEGER NOT NULL DEFAULT 0,
    last_seen REAL
);
```

Add these functions (anywhere after `_conn`):
```python
# Relies on store.py's existing `time` import and module-level `log`.
_SPLICE_OBS_CAP = 50  # stop counting once we have enough signal per host


def record_splice_obs(host: str, is_html: bool) -> None:
    """Observe a MITM'd flow's host + whether it served text/html. Sampling-capped
    so writes stay bounded. Best-effort (never raises into the proxy path)."""
    h = (host or "").lower().strip(".")
    if not h:
        return
    try:
        with _conn() as c:
            c.execute(
                "INSERT INTO splice_host_obs(host, hits, html_hits, last_seen) "
                "VALUES(?, 1, ?, ?) "
                "ON CONFLICT(host) DO UPDATE SET "
                "  hits = MIN(hits + 1, ?), "
                "  html_hits = html_hits + ?, "
                "  last_seen = ? "
                "WHERE splice_host_obs.hits < ?",
                (h, 1 if is_html else 0, time.time(),
                 _SPLICE_OBS_CAP, 1 if is_html else 0, time.time(), _SPLICE_OBS_CAP),
            )
    except Exception as e:
        log.debug("splice obs failed: %s", e)


def never_html_hosts(min_hits: int = 20) -> list[str]:
    """Hosts observed >= min_hits times that NEVER served text/html."""
    try:
        with _conn() as c:
            rows = c.execute(
                "SELECT host FROM splice_host_obs WHERE hits >= ? AND html_hits = 0",
                (min_hits,),
            ).fetchall()
        return [r[0] for r in rows]
    except Exception:
        return []
```
NOTE: `r[0]` indexing works with both the default row factory and `sqlite3.Row`,
so `never_html_hosts` is safe either way; confirm which one `_conn()` sets. In
`record_splice_obs`, the `MIN(hits + 1, cap)` expression and the trailing
`WHERE ... hits < cap` guard are redundant for capping `hits`: either one alone
satisfies the test's `hits == 50` assertion. They differ in one respect: with the
`WHERE` guard, a host that has reached the cap also stops updating `html_hits`
and `last_seen`. If the combined form misbehaves, keep the `MIN` form and drop
the trailing `WHERE` clause.

- [ ] **Step 3: Pass** — `python -m pytest tests/test_splice_obs.py -v` → PASS. (If the ON CONFLICT cap logic misbehaves on the SQLite version, simplify to: read current hits, `if hits < cap` then increment — keep it passing the test.)
- [ ] **Step 4: Commit** — `git add secubox_toolbox/store.py tests/test_splice_obs.py && git commit -m "feat(toolbox): splice_host_obs table + record/never_html helpers (ref #649)"`
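
The cap behaviour flagged in the NOTE can be checked against the target SQLite without touching `store.py`. The script below is a standalone sketch: it copies the schema and the upsert statement from this task into an in-memory database, while `record`, `counts` and the `CAP` constant are illustrative names. It needs an SQLite with upsert support (3.24 or later). It also makes one consequence of the `WHERE` guard visible: an HTML response observed after a host has hit the cap is not counted. Whether that is acceptable is a design call for the learning policy, not something this sketch decides.

```python
import sqlite3
import time

CAP = 50

SCHEMA = """
CREATE TABLE IF NOT EXISTS splice_host_obs (
    host TEXT PRIMARY KEY,
    hits INTEGER NOT NULL DEFAULT 0,
    html_hits INTEGER NOT NULL DEFAULT 0,
    last_seen REAL
);
"""

UPSERT = (
    "INSERT INTO splice_host_obs(host, hits, html_hits, last_seen) "
    "VALUES(?, 1, ?, ?) "
    "ON CONFLICT(host) DO UPDATE SET "
    "  hits = MIN(hits + 1, ?), "
    "  html_hits = html_hits + ?, "
    "  last_seen = ? "
    "WHERE splice_host_obs.hits < ?"
)


def record(con: sqlite3.Connection, host: str, is_html: bool) -> None:
    """Apply the same upsert as record_splice_obs, with explicit parameters."""
    html = 1 if is_html else 0
    now = time.time()
    con.execute(UPSERT, (host, html, now, CAP, html, now, CAP))


def counts(con: sqlite3.Connection, host: str) -> tuple:
    """Return (hits, html_hits) for one host."""
    return con.execute(
        "SELECT hits, html_hits FROM splice_host_obs WHERE host = ?", (host,)
    ).fetchone()


con = sqlite3.connect(":memory:")
con.executescript(SCHEMA)
for _ in range(100):
    record(con, "x.net", is_html=False)
hits_after_100 = counts(con, "x.net")     # hits stop at the cap
record(con, "x.net", is_html=True)        # HTML seen after the cap
hits_after_html = counts(con, "x.net")    # the guard skips this update too
print(sqlite3.sqlite_version, hits_after_100, hits_after_html)
```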

---

### Task 5: `tls_splice.py` addon

**Files:** Create `mitmproxy_addons/tls_splice.py`, `tests/test_tls_splice_addon.py`

- [ ] **Step 1: Failing test**

```python
# tests/test_tls_splice_addon.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import sys, pathlib, importlib, json, types
ADDON_DIR = pathlib.Path(__file__).resolve().parents[1] / "mitmproxy_addons"
sys.path.insert(0, str(ADDON_DIR))
from secubox_toolbox import filters


def _addon(monkeypatch, tmp_path, mode):
    fp = tmp_path / "f.json"
    fp.write_text(json.dumps({"tls_splice": mode}))
    monkeypatch.setattr(filters, "FILTERS_PATH", str(fp))
    filters.get_filters(force=True)
    import tls_splice
    importlib.reload(tls_splice)
    a = tls_splice.TlsSplice()
    a._seed = {"googlevideo.com"}
    a._learned = set()
    a._never = set()
    monkeypatch.setattr(a, "_refresh_sets", lambda: None)
    return tls_splice, a


def _ch(sni):
    d = types.SimpleNamespace()
    d.client_hello = types.SimpleNamespace(sni=sni)
    d.context = types.SimpleNamespace(client=types.SimpleNamespace(peername=("10.99.1.2", 5)))
    d.ignore_connection = False
    return d


def test_on_splices_seed_host(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "on")
    d = _ch("r1.googlevideo.com")
    a.tls_clienthello(d)
    assert d.ignore_connection is True


def test_observe_does_not_splice(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "observe")
    d = _ch("r1.googlevideo.com")
    a.tls_clienthello(d)
    assert d.ignore_connection is False


def test_off_returns_early(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "off")
    d = _ch("r1.googlevideo.com")
    a.tls_clienthello(d)
    assert d.ignore_connection is False


def test_non_seed_not_spliced(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "on")
    d = _ch("news.example.com")
    a.tls_clienthello(d)
    assert d.ignore_connection is False


def test_no_sni_not_spliced(monkeypatch, tmp_path):
    _, a = _addon(monkeypatch, tmp_path, "on")
    d = _ch(None)
    a.tls_clienthello(d)
    assert d.ignore_connection is False
```

Run: `python -m pytest tests/test_tls_splice_addon.py -v` → FAIL.

- [ ] **Step 2: Implement** `mitmproxy_addons/tls_splice.py`

```python
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""SecuBox-Deb :: toolbox :: selective SNI-splice (#649, Lever A).

At the TLS ClientHello, splice (raw passthrough, no forge/decrypt/parse/addons)
pure-asset flows decided from the SNI. Modes (filters.tls_splice):
  off      — never splice (legacy: MITM everything)
  observe  — classify + log/count "would-splice", but still MITM (dark-launch)
  on       — actually splice
Also records per-host content-type observations (MITM'd flows) to feed the
autolearn never-HTML promotion. Registered FIRST in the mitm-wg addon chain.
"""
from __future__ import annotations

import json
import logging
import os
import sys
import time

if "/usr/lib/secubox/toolbox" not in sys.path:
    sys.path.insert(0, "/usr/lib/secubox/toolbox")

from secubox_toolbox import splice as _splice  # noqa: E402
from secubox_toolbox.filters import get_filters as _gf  # noqa: E402
try:
    from secubox_toolbox import store as _store  # noqa: E402
except Exception:  # pragma: no cover
    _store = None

log = logging.getLogger("secubox.toolbox.addons")

SEED_PATH = os.environ.get("SECUBOX_SPLICE_SEED",
                           "/usr/lib/secubox/toolbox/conf/tls-splice-seed.conf")
LEARNED_PATH = os.environ.get("SECUBOX_SPLICE_LEARNED",
                              "/var/lib/secubox/toolbox/splice-learned.txt")
PURE_PATH = os.environ.get("SECUBOX_PURE_TRACKERS",
                           "/var/lib/secubox/toolbox/pure-trackers.txt")
STATS = "/run/secubox/splice.json"

_counts = {"spliced": 0, "would_splice": 0, "mitm": 0, "since": int(time.time())}
_last_flush = 0.0


class TlsSplice:
    def __init__(self) -> None:
        self._seed: set = set()
        self._learned: set = set()
        self._never: set = set()
        self._mtimes: tuple = ()
        self._refresh_sets()

    def _refresh_sets(self) -> None:
        """Reload seed/learned/never sets when any backing file changes."""
        try:
            mtimes = tuple(
                os.stat(p).st_mtime if os.path.exists(p) else 0.0
                for p in (SEED_PATH, LEARNED_PATH, PURE_PATH))
        except Exception:
            mtimes = ()
        if mtimes == self._mtimes and self._seed:
            return
        self._seed = _splice.load_splice_seed(SEED_PATH)
        self._learned = _splice.load_learned_splice(LEARNED_PATH)
        never = _splice.load_learned_splice(PURE_PATH)  # pure trackers
        try:
            for s in _gf().get("fortknox_sites", []) or []:
                never.add(str(s).lower().strip("."))
        except Exception:
            pass
        self._never = never
        self._mtimes = mtimes

    def tls_clienthello(self, data) -> None:
        try:
            mode = _gf().get("tls_splice", "observe")
            if mode == "off":
                return
            # media_cache wants to see asset flows → don't splice when it's on
            if _gf().get("media_cache"):
                return
            sni = getattr(data.client_hello, "sni", None)
            if not sni:
                return
            self._refresh_sets()
            if not _splice.should_splice(sni, self._seed, self._learned, self._never):
                return
            if mode == "on":
                data.ignore_connection = True
                _counts["spliced"] += 1
            else:  # observe
                _counts["would_splice"] += 1
                log.info("tls-splice would-splice %s", sni)
            self._flush()
        except Exception as e:  # never break a connection
            log.debug("tls_splice clienthello error: %s", e)

    def response(self, flow) -> None:
        """Record host content-type on MITM'd flows (learning signal)."""
        if _store is None:
            return
        try:
            if _gf().get("tls_splice", "observe") == "off":
                return
            host = flow.request.pretty_host or ""
            ct = (flow.response.headers.get("content-type", "") or "").lower()
            _store.record_splice_obs(host, is_html=("text/html" in ct))
        except Exception:
            pass

    def _flush(self) -> None:
        global _last_flush
        now = time.time()
        if (now - _last_flush) < 5:
            return
        _last_flush = now
        try:
            os.makedirs(os.path.dirname(STATS), exist_ok=True)
            with open(STATS, "w", encoding="utf-8") as f:
                json.dump({**_counts, "updated": int(now)}, f)
        except Exception:
            pass


addons = [TlsSplice()]
```

- [ ] **Step 3: Pass** — `python -m pytest tests/test_tls_splice_addon.py -v` → PASS.
- [ ] **Step 4: Commit** — `git add mitmproxy_addons/tls_splice.py tests/test_tls_splice_addon.py && git commit -m "feat(toolbox): tls_splice addon — SNI-splice at ClientHello + obs recorder (ref #649)"`

---

### Task 6: autolearn `_splice_feed` promotion

**Files:** Modify `sbin/secubox-toolbox-autolearn`; Test `tests/test_autolearn_splice.py`

- [ ] **Step 1: Failing test**

```python
# tests/test_autolearn_splice.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import sqlite3, importlib.util, pathlib


def _load_autolearn():
    # The script has no .py suffix, so build an empty module and exec it in place.
    p = pathlib.Path(__file__).resolve().parents[1] / "sbin" / "secubox-toolbox-autolearn"
    spec = importlib.util.spec_from_loader("autolearn", loader=None)
    mod = importlib.util.module_from_spec(spec)
    exec(compile(p.read_text(), str(p), "exec"), mod.__dict__)
    return mod


def test_splice_feed_promotes_never_html(tmp_path, monkeypatch):
    db = tmp_path / "t.db"
    con = sqlite3.connect(db)
    con.executescript(
        "CREATE TABLE splice_host_obs(host TEXT PRIMARY KEY, hits INT, html_hits INT, last_seen REAL);"
        "INSERT INTO splice_host_obs VALUES('cdn.assets.net',25,0,0);"
        "INSERT INTO splice_host_obs VALUES('html.site.com',25,3,0);"
        "INSERT INTO splice_host_obs VALUES('low.hits.net',5,0,0);")
    con.commit()
    con.close()
    out = tmp_path / "splice-learned.txt"
    monkeypatch.setenv("SECUBOX_AUTOLEARN_DB", str(db))
    monkeypatch.setenv("SECUBOX_SPLICE_LEARNED_OUT", str(out))
    al = _load_autolearn()
    n = al._splice_feed()
    learned = set(out.read_text().split())
    assert "cdn.assets.net" in learned      # never-HTML, >=20 hits
    assert "html.site.com" not in learned   # served HTML
    assert "low.hits.net" not in learned    # too few hits
    assert n == 1
```

Run: `python -m pytest tests/test_autolearn_splice.py -v` → FAIL.

- [ ] **Step 2: Implement** in `sbin/secubox-toolbox-autolearn`

Add near the other env paths (after `PURE_OUT`):
```python
SPLICE_LEARNED_OUT = os.environ.get(
    "SECUBOX_SPLICE_LEARNED_OUT",
    "/var/lib/secubox/toolbox/splice-learned.txt")
SPLICE_MIN_HITS = int(os.environ.get("SECUBOX_SPLICE_MIN_HITS", "20"))
SPLICE_MAX = 2000
```

Add the function (near `_dns_feed`):
```python
def _splice_feed() -> int:
    """Promote hosts that NEVER served text/html over >= SPLICE_MIN_HITS
    observations into the learned-splice file (lower-cased, de-duplicated, capped).
    Gated: skip when tls_splice == 'off'. Returns count written, or -1 if gated
    or on a query/write failure."""
    try:
        from secubox_toolbox.filters import get_filters
        if get_filters().get("tls_splice", "observe") == "off":
            return -1
    except Exception:
        pass
    try:
        con = sqlite3.connect(DB, timeout=5)
        rows = con.execute(
            "SELECT host FROM splice_host_obs WHERE hits >= ? AND html_hits = 0",
            (SPLICE_MIN_HITS,)).fetchall()
        con.close()
    except Exception as e:
        sys.stderr.write(f"autolearn: splice query failed: {e}\n")
        return -1
    hosts = sorted({(r[0] or "").lower().strip(".") for r in rows if r[0]})[:SPLICE_MAX]
    try:
        os.makedirs(os.path.dirname(SPLICE_LEARNED_OUT), exist_ok=True)
        tmp = SPLICE_LEARNED_OUT + ".tmp"
        with open(tmp, "w", encoding="utf-8") as fh:
            fh.write("\n".join(hosts) + ("\n" if hosts else ""))
        os.replace(tmp, SPLICE_LEARNED_OUT)  # atomic swap: readers never see a partial file
    except Exception as e:
        sys.stderr.write(f"autolearn: splice write failed: {e}\n")
        return -1
    return len(hosts)
```

Call it from the script's main body (where `_dns_feed` etc. are invoked — find the bottom main section and add):
```python
    try:
        _n_splice = _splice_feed()
        sys.stderr.write(f"autolearn: {_n_splice} splice hosts learned\n")
    except Exception as e:
        sys.stderr.write(f"autolearn: splice feed error: {e}\n")
```
(Place it alongside the existing feed calls; keep it best-effort so it never aborts the run.)

- [ ] **Step 3: Pass** — `python -m pytest tests/test_autolearn_splice.py -v` → PASS.
- [ ] **Step 4: Commit** — `git add sbin/secubox-toolbox-autolearn tests/test_autolearn_splice.py && git commit -m "feat(toolbox): autolearn promotes never-HTML hosts to splice-learned (ref #649)"`
|
||||
|
||||
---
|
||||
|
||||
### Task 7: wiring (launch chain, debian/rules, changelog)
|
||||
|
||||
**Files:** Modify `sbin/secubox-toolbox-mitm-wg-launch`, `debian/rules`, `debian/changelog`
|
||||
|
||||
- [ ] **Step 1: Register addon FIRST** in `sbin/secubox-toolbox-mitm-wg-launch`
|
||||
|
||||
In the `for addon in ... ; do` list (currently begins `inject_xff utiq_defense ...`), prepend `tls_splice`:
|
||||
```bash
|
||||
for addon in tls_splice inject_xff utiq_defense protective_mode privacy_guard ad_ghost media_cache local_store social_graph inject_banner dpi cookies avatar ja4 soc_relay cert_pin_detect media_stats; do
|
||||
```
|
||||
(Its only acting hook is `tls_clienthello`, which fires before any requestheaders
|
||||
addon regardless of order — so this doesn't disturb inject_xff's first-at-
|
||||
requestheaders contract; placing it first is just clarity.)
|
||||
|
||||
- [ ] **Step 2: Install the seed conf** in `debian/rules`
|
||||
|
||||
Find the block that installs `conf/` (the bypass-seed install, near `conf/mitm-bypass-seed.conf`) and ensure the whole `conf/` dir (or the new file) lands at `/usr/lib/secubox/toolbox/conf/`. If there's an explicit per-file copy, add:
|
||||
```make
|
||||
install -m 0644 conf/tls-splice-seed.conf $(DESTDIR)/usr/lib/secubox/toolbox/conf/
|
||||
```
|
||||
(If `conf/` is copied wholesale, no change needed — verify with `grep -n "conf/" debian/rules`.)
|
||||
|
||||
- [ ] **Step 3: Bump changelog** — new top entry in `debian/changelog`, version after the current top (`head -1 debian/changelog`):
|
||||
```
|
||||
secubox-toolbox (2.6.54-1~bookworm1) bookworm; urgency=medium
|
||||
|
||||
* feat(#649): selective SNI-splice (Lever A). New tls_splice addon splices
|
||||
pure-asset flows (curated media seed + autolearn-promoted never-HTML hosts)
|
||||
at the TLS ClientHello — no forge/decrypt/parse/addons on those — so the
|
||||
GIL-bound R3 workers only do L7 work on flows that need it. Ships
|
||||
tls_splice=observe (dark: classify + log, still MITM); flip to `on` after
|
||||
soak. Kill-switch `off`. Trackers/fortknox/no-SNI/media_cache never spliced.
|
||||
|
||||
-- Gerald KERMA <devel@cybermind.fr> Thu, 18 Jun 2026 14:00:00 +0200
|
||||
```
|
||||
(Use the actual next version; if top is 2.6.53 → 2.6.54.)
|
||||
|
||||
- [ ] **Step 4: Full suite** — `python -m pytest tests/ -q` → all green (new + existing).
|
||||
- [ ] **Step 5: bash -n** — `bash -n sbin/secubox-toolbox-mitm-wg-launch && python3 -c "import ast; ast.parse(open('sbin/secubox-toolbox-autolearn').read())"` → no errors.
|
||||
- [ ] **Step 6: Commit** — `git add sbin/secubox-toolbox-mitm-wg-launch debian/rules debian/changelog && git commit -m "feat(toolbox): wire tls_splice addon + seed install + changelog 2.6.54 (ref #649)"`
|
||||
|
||||
---
|
||||
|
||||
## Self-Review notes
|
||||
- Spec coverage: filters (T1), classifier (T2), seed (T3), obs/learning store (T4), addon incl. dark-launch modes + media_cache guard + recorder (T5), autolearn promotion (T6), wiring (T7). All spec sections mapped.
|
||||
- Threshold consistency: obs sampling cap 50 (T4) ≥ promotion min_hits 20 (T4 test, T6) — a host can reach 20 never-HTML hits within the 50 cap. Consistent.
|
||||
- `should_splice` never-set includes pure-trackers + fortknox (T2/T5). media_cache guard is in the addon (T5), not the pure classifier — keeps `splice.py` pure.
|
||||
- Dark default `observe` (T1) means deploy is behavior-neutral until flip; matches spec rollout.
|
||||
- Risk noted in T4 Step 2: verify the `ON CONFLICT ... MIN()/WHERE` cap on the target SQLite; fallback (read-then-write) given if needed to keep the test green.
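
The read-then-write fallback mentioned in the last note could look roughly like the sketch below. It assumes the `splice_host_obs` schema from the spec; `make_db`, `record_obs_fallback` and `CAP` are illustrative names, not the shipped `store.record_splice_obs`.

```python
import sqlite3
import time

CAP = 50  # assumed to mirror the plan's per-host sampling cap


def make_db() -> sqlite3.Connection:
    """In-memory DB carrying the splice_host_obs schema from the spec."""
    con = sqlite3.connect(":memory:")
    con.execute(
        "CREATE TABLE IF NOT EXISTS splice_host_obs ("
        " host TEXT PRIMARY KEY, hits INTEGER NOT NULL DEFAULT 0,"
        " html_hits INTEGER NOT NULL DEFAULT 0, last_seen REAL)")
    return con


def record_obs_fallback(con: sqlite3.Connection, host: str, is_html: bool) -> None:
    """Read-then-write variant: no ON CONFLICT / MIN() needed."""
    h = (host or "").lower().strip(".")
    if not h:
        return
    now = time.time()
    html = 1 if is_html else 0
    with con:  # commits on success, rolls back on error
        row = con.execute(
            "SELECT hits FROM splice_host_obs WHERE host = ?", (h,)).fetchone()
        if row is None:
            # First observation for this host.
            con.execute(
                "INSERT INTO splice_host_obs(host, hits, html_hits, last_seen) "
                "VALUES(?, 1, ?, ?)", (h, html, now))
        elif row[0] < CAP:
            # Below the cap: count the hit; at the cap the row is left alone.
            con.execute(
                "UPDATE splice_host_obs SET hits = hits + 1, "
                "html_hits = html_hits + ?, last_seen = ? WHERE host = ?",
                (html, now, h))
```

Unlike a single upsert, this is not atomic across processes: two workers racing on a host's first observation could hit `sqlite3.IntegrityError`, so a caller would still want to treat the write as best-effort.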
|
||||
|
|
@ -0,0 +1,159 @@
|
|||
# Toolbox selective SNI-splice (Lever A) — design
|
||||
|
||||
- **Date:** 2026-06-18 · **Package:** `secubox-toolbox` · **Issue:** #649
|
||||
- **Status:** Design approved (adaptive seed+learned, dark-launch). Pending plan.
|
||||
- **Parent:** lighter-MITM plan, A-then-B. This is **Lever A** (stay in mitmproxy,
|
||||
decrypt only what we modify). Lever B (Go/Rust core) is a later strategic call.
|
||||
WAF is explicitly out of scope here ("maybe later").
|
||||
|
||||
## Problem
|
||||
R3 web loading is slow because the 4 `secubox-toolbox-mitm-wg-worker@` processes
|
||||
are GIL-bound (~1 core total, each pinned ~25–30% = single-thread ceiling) and
|
||||
**forge a cert + terminate TLS + parse HTTP + run 16 addons on every flow**, then
|
||||
most addons bail. Heavy asset/video/CDN flows (e.g. YouTube `googlevideo`)
|
||||
dominate that CPU for **zero privacy value** — there's nothing to inspect or
|
||||
rewrite in image/video/audio bytes. (#646 measured the ceiling.)
|
||||
|
||||
## Goal
|
||||
Run the expensive L7 path only on flows we'd actually inspect/modify. **Splice**
|
||||
(raw TCP passthrough, no forge/TLS/parse/addons) the pure-asset flows, decided at
|
||||
the TLS ClientHello from the **SNI** alone (the only thing known pre-decrypt).
|
||||
|
||||
Non-goals: removing the MITM (outbound HTTPS interception intrinsically needs
|
||||
per-host cert forging — see issue); WAF; the Go/Rust rewrite.
|
||||
|
||||
## Mechanism
|
||||
A new addon `mitmproxy_addons/tls_splice.py`, registered **FIRST** in
|
||||
`sbin/secubox-toolbox-mitm-wg-launch` (before `inject_xff`), implements:
|
||||
|
||||
```python
|
||||
def tls_clienthello(self, data):
    mode = _filters().get("tls_splice", "observe")   # off | observe | on
    if mode == "off":
        return
    sni = (data.client_hello.sni or "").lower()
    if not sni:                                       # no SNI → never splice blind
        return
    if splice.should_splice(sni, self._seed, self._learned, self._never):
        if mode == "on":
            data.ignore_connection = True             # SPLICE: raw passthrough
            _bump("spliced")
        else:                                         # observe: classify + log, still MITM
            _bump("would_splice"); log.info("would-splice %s", sni)
|
||||
```
|
||||
|
||||
`data.ignore_connection = True` is mitmproxy's documented splice (no TLS
|
||||
interception). `tls_clienthello` / `data.client_hello.sni` are already used by
|
||||
`ja4.py` and `local_store.py`, so the API is present in our mitmproxy 11.
|
||||
|
||||
The same addon also records a lightweight **learning observation** on the response
|
||||
hook of MITM'd flows (see Learning), so the learned-splice set can grow. (Spliced
|
||||
flows produce no response hook — once a host is promoted, its observation freezes;
|
||||
acceptable, the seed is media-only and the toggle is a kill-switch.)
|
||||
|
||||
## Classifier — `secubox_toolbox/splice.py` (pure, testable)
|
||||
```python
|
||||
def load_splice_seed(path) -> set[str]: ...      # suffix patterns from conf (comments stripped)
def load_learned_splice(path) -> set[str]: ...   # learned hostnames (autolearn output)
def host_matches(host, patterns) -> bool: ...    # host == p or host.endswith("." + p)
def should_splice(sni, seed, learned, never) -> bool:
    # never wins (defensive): trackers we block/poison, fortknox sites
    if host_matches(sni, never): return False
    return host_matches(sni, seed) or host_matches(sni, learned)
|
||||
```
|
||||
- `never` = pure-trackers (`pure-trackers.txt`, already maintained by Anti-Track
|
||||
2a) ∪ `fortknox_sites` (from filters). Even a CDN-fronted tracker stays MITM'd.
|
||||
- Suffix match so `r1---sn-x.googlevideo.com` matches seed `googlevideo.com`.
|
||||
- The seed/learned/never sets are loaded once per worker and **mtime-refreshed**
|
||||
(mirror `_common._wg_hash_of`'s cache pattern) so autolearn updates land without
|
||||
a restart, but per-connection lookups stay O(1) set hits.
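
The set-hit lookup described in the last bullet can be approximated by walking the host's label suffixes and testing each one against the set, rather than scanning every pattern. A minimal sketch, assuming patterns are stored lowercased without leading dots; `host_matches_by_labels` is a hypothetical name and not necessarily how `splice.host_matches` is written.

```python
def host_matches_by_labels(host: str, patterns: set[str]) -> bool:
    """True if host equals a pattern or is a subdomain of one.

    Cost is one set lookup per label in the host, independent of how many
    patterns are loaded.
    """
    h = (host or "").lower().strip(".")
    if not h or not patterns:
        return False
    labels = h.split(".")
    # Try "a.b.c", then "b.c", then "c": each candidate is a whole-label suffix,
    # so "notgooglevideo.com" can never match "googlevideo.com".
    for i in range(len(labels)):
        if ".".join(labels[i:]) in patterns:
            return True
    return False
```

The trade-off is that the number of lookups is bounded by the host's label count rather than by the size of the seed and learned sets, which matters more as the learned file approaches its cap.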
|
||||
|
||||
## Seed — `conf/tls-splice-seed.conf`
|
||||
Curated, **media/asset-specific only** (NOT generic CDN edges like cloudfront/
|
||||
fastly/akamai-edge, which also serve HTML apps — splicing those would blind us to
|
||||
real pages). v1 set:
|
||||
```
|
||||
googlevideo.com # YouTube video (the single biggest hog)
|
||||
ytimg.com # YT thumbnails
|
||||
gstatic.com # Google static assets
|
||||
ggpht.com # Google user content
|
||||
fbcdn.net # Facebook/IG media
|
||||
cdninstagram.com
|
||||
twimg.com # Twitter/X media
|
||||
licdn.com # LinkedIn media
|
||||
sndcdn.com # SoundCloud audio
|
||||
scdn.co # Spotify audio
|
||||
mzstatic.com # Apple media
|
||||
```
|
||||
Operator can extend via an operator splice file (same 3-way merge idea as the
|
||||
bypass lists), but v1 ships only the seed + learned.
|
||||
|
||||
## Learning — never-HTML promotion
|
||||
New table (SQLite, WAL already on):
|
||||
```sql
|
||||
CREATE TABLE IF NOT EXISTS splice_host_obs (
|
||||
host TEXT PRIMARY KEY, hits INTEGER NOT NULL DEFAULT 0,
|
||||
html_hits INTEGER NOT NULL DEFAULT 0, last_seen REAL
|
||||
);
|
||||
```
|
||||
- `tls_splice.py` response hook (MITM'd flows only) upserts: `hits += 1`,
|
||||
`html_hits += 1` if `Content-Type` contains `text/html`. **Sampling cap:** stop
|
||||
counting once `hits >= 50` per host (bounds write amplification; 50 is enough
|
||||
signal). Cheap: one upsert, no body read.
|
||||
- `sbin/secubox-toolbox-autolearn` gains `_splice_feed()`: promote hosts with
|
||||
`hits >= 20 AND html_hits == 0` (never served HTML over ≥20 observations) to
|
||||
`/var/lib/secubox/toolbox/splice-learned.txt` (atomic write, `os.replace`).
|
||||
Gated on `tls_splice != "off"`. Registrable-folded, deduped, capped (e.g. 2000).
|
||||
- Demotion: not automatic (spliced hosts stop being observed). The media-only seed
|
||||
+ the never-set + the kill-switch toggle bound the risk; a host that wrongly got
|
||||
spliced is removed by clearing the learned file or toggling off.
|
||||
|
||||
## Config — `filters.json`
|
||||
Add `tls_splice` ∈ `{off, observe, on}`, **default `observe`** (dark-launch:
|
||||
classify + log would-splice, but still MITM — zero behavior change until flipped).
|
||||
- `filters.py`: add `"tls_splice": "observe"` to `DEFAULTS`; add
|
||||
`_VALID_SPLICE = {"off","observe","on"}` and validate (mirror `protective`).
|
||||
- `set_filters`: accept `tls_splice` only if in `_VALID_SPLICE`.
|
||||
|
||||
## Counters / observability
|
||||
`tls_splice.py` flushes `/run/secubox/splice.json`
|
||||
(`{spliced, would_splice, mitm, since, updated}`) every ~5 s (mirror
|
||||
`ad_ghost._flush`). Optional future UI tile; not required for v1.
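
Because a future UI tile or an operator script may read the stats file while a worker rewrites it, one option is to write to a temporary file and rename it into place. A sketch under that assumption; `flush_stats` is an illustrative helper, not the addon's actual flush routine.

```python
import json
import os
import tempfile
import time


def flush_stats(path: str, counts: dict) -> None:
    """Write counters as JSON via temp file + os.replace, so a concurrent
    reader sees either the old file or the new one, never a partial write."""
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".splice-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({**counts, "updated": int(time.time())}, f)
        os.replace(tmp, path)  # atomic rename within one filesystem on POSIX
    except Exception:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise
```

The temp file is created in the target directory on purpose: `os.replace` is only atomic when source and destination live on the same filesystem.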
|
||||
|
||||
## Tradeoff (explicit)
|
||||
Spliced flows are invisible to DPI / media-stats / social-graph / media-cache.
|
||||
Acceptable for pure asset CDNs (no privacy signal in media bytes; assets aren't
|
||||
HTML so no banner/ad-ghost lost). **media_cache interaction:** when
|
||||
`media_cache` is enabled, do NOT splice (media_cache needs to see those flows) —
|
||||
the addon returns before calling `should_splice` if `filters.media_cache` is true. (v1: media_cache
|
||||
defaults off, so this is a guard for the opt-in case.)
|
||||
|
||||
## Safety / rollout
|
||||
1. Ships `tls_splice=observe` (dark). Soak, review `/run/secubox/splice.json` +
|
||||
"would-splice" logs against real traffic, confirm no first-party/HTML host is
|
||||
classified, THEN flip to `on`.
|
||||
2. No SNI → MITM. `never` set wins. media_cache-on → MITM.
|
||||
3. Kill-switch: `tls_splice=off` reverts to today's behavior instantly (filters
|
||||
hot-reload, 5 s cache).
|
||||
4. Deploy = rolling sequential restart of the 4 `mitm-wg-worker@` (3/4 capacity
|
||||
during the roll), no mass restart.
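
The soak review in step 1 could be scripted along these lines. This is a sketch that assumes the `{spliced, would_splice, mitm, since, updated}` shape described under Counters; `summarize` and `load_and_summarize` are hypothetical helpers.

```python
import json


def summarize(stats: dict) -> dict:
    """Turn raw counters into per-minute rates over the observed window."""
    spliced = int(stats.get("spliced", 0))
    would = int(stats.get("would_splice", 0))
    # Guard against a zero or negative window (file just created, clock skew).
    window = max(1, int(stats.get("updated", 0)) - int(stats.get("since", 0)))
    return {
        "window_s": window,
        "would_splice_per_min": round(would * 60 / window, 2),
        "spliced_per_min": round(spliced * 60 / window, 2),
    }


def load_and_summarize(path: str = "/run/secubox/splice.json") -> dict:
    """Read the stats file written by the addon and summarize it."""
    with open(path, encoding="utf-8") as f:
        return summarize(json.load(f))
```

In `observe` mode only `would_splice_per_min` should move; a non-zero `spliced_per_min` before the flip would indicate the mode is not what the operator expects.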
|
||||
|
||||
## Tests
|
||||
- `splice.py`: `host_matches` suffix logic (exact, subdomain, non-match, no false
|
||||
prefix match e.g. `notgooglevideo.com`); `should_splice` (seed hit, learned hit,
|
||||
never wins over seed, no-SNI→False, empty sets→False).
|
||||
- filters: `tls_splice` validates {off,observe,on}, bad value → default; round-trips
|
||||
via set_filters.
|
||||
- learning: `_splice_feed` promotes `hits>=20 & html_hits==0`, excludes
|
||||
`html_hits>0` and `hits<20` (monkeypatch DB rows).
|
||||
- addon: `tls_clienthello` sets `ignore_connection` only when mode==on AND
|
||||
should_splice; observe mode never sets it; off mode returns early. (Fake
|
||||
ClientHelloData with `.client_hello.sni`.)
|
||||
|
||||
## Files
|
||||
- Create `secubox_toolbox/splice.py`, `mitmproxy_addons/tls_splice.py`,
|
||||
`conf/tls-splice-seed.conf`, tests.
|
||||
- Modify `secubox_toolbox/filters.py` (toggle), `sbin/secubox-toolbox-mitm-wg-launch`
|
||||
(register addon first + ship seed path), `sbin/secubox-toolbox-autolearn`
|
||||
(`_splice_feed`), `secubox_toolbox/store.py` or `social.py` (obs table),
|
||||
`debian/rules` (install seed conf), `debian/changelog`.
|
||||
15
packages/secubox-toolbox/conf/tls-splice-seed.conf
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
# SecuBox toolbox :: SNI-splice seed (#649)
|
||||
# MEDIA/ASSET-SPECIFIC hosts only — NEVER generic CDN edges (cloudfront/fastly/
|
||||
# akamai-edge) which also serve HTML apps; splicing those would blind the MITM
|
||||
# to real pages. Suffix-matched (subdomains included). One host suffix per line.
|
||||
googlevideo.com # YouTube video streams (largest single hog)
|
||||
ytimg.com # YouTube thumbnails
|
||||
gstatic.com # Google static assets
|
||||
ggpht.com # Google user content / avatars
|
||||
fbcdn.net # Facebook / Instagram media
|
||||
cdninstagram.com # Instagram media
|
||||
twimg.com # Twitter / X media
|
||||
licdn.com # LinkedIn media
|
||||
sndcdn.com # SoundCloud audio
|
||||
scdn.co # Spotify audio
|
||||
mzstatic.com # Apple media / artwork
|
||||
|
|
@ -1,3 +1,17 @@
|
|||
secubox-toolbox (2.6.54-1~bookworm1) bookworm; urgency=medium

  * feat(#649): selective SNI-splice (Lever A). New tls_splice addon (first in
    the mitm-wg chain) splices pure-asset flows at the TLS ClientHello — curated
    media seed (googlevideo/ytimg/fbcdn/twimg/scdn…) ∪ autolearn-promoted
    never-HTML hosts — so the GIL-bound R3 workers skip forge/decrypt/parse/16-
    addons on flows with no L7 value. Ships tls_splice=observe (DARK: classify +
    log would-splice, still MITM); flip to `on` after soak; `off` kill-switch.
    Never splices trackers (pure-trackers)/fortknox/no-SNI/media_cache-on.
    Learning obs recorded off the event loop (bg thread), only for undecided
    hosts. New splice_host_obs table; autolearn _splice_feed promotion.

 -- Gerald KERMA <devel@cybermind.fr>  Thu, 18 Jun 2026 14:30:00 +0200

secubox-toolbox (2.6.53-1~bookworm1) bookworm; urgency=medium

  * perf(#646): adaptive Accept-Encoding strip in inject_banner. Keep gzip/br by
|
||||
|
|
|
|||
148
packages/secubox-toolbox/mitmproxy_addons/tls_splice.py
Normal file
|
|
@ -0,0 +1,148 @@
|
|||
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""SecuBox-Deb :: toolbox :: selective SNI-splice (#649, Lever A).

At the TLS ClientHello, splice (raw passthrough, no forge/decrypt/parse/addons)
pure-asset flows decided from the SNI. Modes (filters.tls_splice):
  off      — never splice (legacy: MITM everything)
  observe  — classify + log/count "would-splice", but still MITM (dark-launch)
  on       — actually splice
Also records per-host content-type observations (MITM'd flows) to feed the
autolearn never-HTML promotion. Registered FIRST in the mitm-wg addon chain.
"""
from __future__ import annotations

import concurrent.futures as _futures
import json
import logging
import os
import sys
import time

if "/usr/lib/secubox/toolbox" not in sys.path:
    sys.path.insert(0, "/usr/lib/secubox/toolbox")

from secubox_toolbox import splice as _splice  # noqa: E402
from secubox_toolbox.filters import get_filters as _gf, FILTERS_PATH as _FILTERS_PATH  # noqa: E402
try:
    from secubox_toolbox import store as _store  # noqa: E402
except Exception:  # pragma: no cover
    _store = None

log = logging.getLogger("secubox.toolbox.addons")

SEED_PATH = os.environ.get("SECUBOX_SPLICE_SEED",
                           "/usr/lib/secubox/toolbox/conf/tls-splice-seed.conf")
LEARNED_PATH = os.environ.get("SECUBOX_SPLICE_LEARNED",
                              "/var/lib/secubox/toolbox/splice-learned.txt")
PURE_PATH = os.environ.get("SECUBOX_PURE_TRACKERS",
                           "/var/lib/secubox/toolbox/pure-trackers.txt")
STATS = "/run/secubox/splice.json"

_counts = {"spliced": 0, "would_splice": 0, "mitm": 0, "since": int(time.time())}
_last_flush = 0.0

# Learning observations are written off the proxy event loop (mirror
# local_store): the response hook must return instantly. Single worker thread
# serialises writes to the shared SQLite.
_obs_executor = _futures.ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="sbx_splice_obs")


class TlsSplice:
    def __init__(self) -> None:
        self._seed: set = set()
        self._learned: set = set()
        self._never: set = set()
        self._mtimes: tuple = ()
        self._refresh_sets()

    def _refresh_sets(self) -> None:
        """Reload seed/learned/never sets when any backing file changes.

        Includes FILTERS_PATH so a fortknox site added via the WebUI lands in
        the never-set promptly (else a newly-protected site could still be
        spliced once in `on` mode until another file's mtime moved)."""
        try:
            mtimes = tuple(
                os.stat(p).st_mtime if os.path.exists(p) else 0.0
                for p in (SEED_PATH, LEARNED_PATH, PURE_PATH, _FILTERS_PATH))
        except Exception:
            mtimes = ()
        if mtimes == self._mtimes and self._seed:
            return
        self._seed = _splice.load_splice_seed(SEED_PATH)
        self._learned = _splice.load_learned_splice(LEARNED_PATH)
        never = _splice.load_learned_splice(PURE_PATH)  # pure trackers
        try:
            for s in _gf().get("fortknox_sites", []) or []:
                never.add(str(s).lower().strip("."))
        except Exception:
            pass
        self._never = never
        self._mtimes = mtimes

    def tls_clienthello(self, data) -> None:
        try:
            mode = _gf().get("tls_splice", "observe")
            if mode == "off":
                return
            # media_cache wants to see asset flows → don't splice when it's on
            if _gf().get("media_cache"):
                return
            sni = getattr(data.client_hello, "sni", None)
            if not sni:
                return
            self._refresh_sets()
            if not _splice.should_splice(sni, self._seed, self._learned, self._never):
                return
            if mode == "on":
                data.ignore_connection = True
                _counts["spliced"] += 1
            else:  # observe
                _counts["would_splice"] += 1
                log.info("tls-splice would-splice %s", sni)
            self._flush()
        except Exception as e:  # never break a connection
            log.debug("tls_splice clienthello error: %s", e)

    def response(self, flow) -> None:
        """Record host content-type on MITM'd flows (learning signal).

        Off the event loop (bg thread) so the hook returns instantly. Skips
        hosts already decided (seed/learned/never) — they need no more signal —
        so the DB is touched only for the still-unclassified long tail.
        """
        if _store is None:
            return
        try:
            if _gf().get("tls_splice", "observe") == "off":
                return
            host = (flow.request.pretty_host or "").lower().strip(".")
            if not host:
                return
            # Already-decided hosts gain nothing from more observations.
            if (_splice.host_matches(host, self._seed)
                    or _splice.host_matches(host, self._learned)
                    or _splice.host_matches(host, self._never)):
                return
            ct = (flow.response.headers.get("content-type", "") or "").lower()
            _obs_executor.submit(_store.record_splice_obs, host, "text/html" in ct)
        except Exception:
            pass

    def _flush(self) -> None:
        global _last_flush
        now = time.time()
        if (now - _last_flush) < 5:
            return
        _last_flush = now
        try:
            os.makedirs(os.path.dirname(STATS), exist_ok=True)
            with open(STATS, "w", encoding="utf-8") as f:
                json.dump({**_counts, "updated": int(now)}, f)
        except Exception:
            pass


addons = [TlsSplice()]
|
||||
|
|
@ -25,6 +25,11 @@ OUT = os.environ.get("SECUBOX_AUTOLEARN_OUT",
|
|||
"/var/lib/secubox/toolbox/learned-trackers.txt")
|
||||
PURE_OUT = os.environ.get("SECUBOX_AUTOLEARN_PURE_OUT",
|
||||
"/var/lib/secubox/toolbox/pure-trackers.txt")
|
||||
SPLICE_LEARNED_OUT = os.environ.get(
|
||||
"SECUBOX_SPLICE_LEARNED_OUT",
|
||||
"/var/lib/secubox/toolbox/splice-learned.txt")
|
||||
SPLICE_MIN_HITS = int(os.environ.get("SECUBOX_SPLICE_MIN_HITS", "20"))
|
||||
SPLICE_MAX = 2000
|
||||
MIN_SITES = 2 # cross-site threshold for operator-grade trackers
|
||||
MAX_ENTRIES = 8000
|
||||
COOKIE_XSITE_TOP_N = int(os.environ.get("SECUBOX_COOKIE_XSITE_TOP_N", "5"))
|
||||
|
|
@ -94,6 +99,38 @@ def _dns_feed(pure_hosts) -> int:
|
|||
    return sum(1 for l in lines if "local-zone:" in l)


def _splice_feed() -> int:
    """Promote hosts that NEVER served text/html over >= SPLICE_MIN_HITS
    observations into the learned-splice file (registrable-folded, capped).
    Gated: skip when tls_splice == 'off'. Returns count written, or -1 if gated."""
    try:
        from secubox_toolbox.filters import get_filters
        if get_filters().get("tls_splice", "observe") == "off":
            return -1
    except Exception:
        pass
    try:
        con = sqlite3.connect(DB, timeout=5)
        rows = con.execute(
            "SELECT host FROM splice_host_obs WHERE hits >= ? AND html_hits = 0",
            (SPLICE_MIN_HITS,)).fetchall()
        con.close()
    except Exception as e:
        sys.stderr.write(f"autolearn: splice query failed: {e}\n")
        return -1
    hosts = sorted({(r[0] or "").lower().strip(".") for r in rows if r[0]})[:SPLICE_MAX]
    try:
        os.makedirs(os.path.dirname(SPLICE_LEARNED_OUT), exist_ok=True)
        tmp = SPLICE_LEARNED_OUT + ".tmp"
        with open(tmp, "w", encoding="utf-8") as fh:
            fh.write("\n".join(hosts) + ("\n" if hosts else ""))
        os.replace(tmp, SPLICE_LEARNED_OUT)
    except Exception as e:
        sys.stderr.write(f"autolearn: splice write failed: {e}\n")
        return -1
    return len(hosts)


def main() -> int:
    learned: set[str] = set()
    try:
|
||||
|
|
@ -180,6 +217,11 @@ def main() -> int:
|
|||
    except Exception as e:
        sys.stderr.write(f"autolearn: pure write failed: {e}\n")
    dns_zones = _dns_feed(pout)
    try:
        _n_splice = _splice_feed()
        sys.stderr.write(f"autolearn: {_n_splice} splice hosts learned\n")
    except Exception as e:
        sys.stderr.write(f"autolearn: splice feed error: {e}\n")
    sys.stderr.write(
        f"autolearn: {len(out)} hosts learned ({ti} threat-intel + "
        f"{len(out) - ti} classified cross-site) @ {int(time.time())}"
|
||||
|
|
|
|||
|
|
@ -95,8 +95,12 @@ if [ -n "$IGNORE_REGEX" ]; then
|
|||
fi
|
||||
|
||||
# Addons :
|
||||
# - inject_xff (Phase 7 #498) MUST be FIRST — sets X-Forwarded-For at
|
||||
# requestheaders so other addons and the upstream see the real peer IP
|
||||
# - tls_splice (#649) runs at tls_clienthello (BEFORE any requestheaders
|
||||
# addon) — it splices pure-asset flows so the rest of the chain never even
|
||||
# sees them. Listed first for clarity; its hook phase makes ordering vs the
|
||||
# requestheaders addons irrelevant, so inject_xff stays first-at-requestheaders.
|
||||
# - inject_xff (Phase 7 #498) MUST be FIRST among requestheaders addons — sets
|
||||
# X-Forwarded-For at requestheaders so other addons and the upstream see the real peer IP
|
||||
# - utiq_defense (Phase 8 #500) runs at requestheaders too ; placed
|
||||
# EARLY so a R1 block short-circuits the flow before downstream
|
||||
# addons spend cycles on it
|
||||
|
|
@ -111,7 +115,7 @@ fi
|
|||
# ad_ghost (#566) runs right after protective_mode: for R3+/R4 it 204s known
|
||||
# ad/tracker hosts (bandwidth save) at request time and injects ad-hiding CSS
|
||||
# on HTML responses. Gated by the modular filter config (toolbox WebUI).
|
||||
for addon in inject_xff utiq_defense protective_mode privacy_guard ad_ghost media_cache local_store social_graph inject_banner dpi cookies avatar ja4 soc_relay cert_pin_detect media_stats; do
|
||||
for addon in tls_splice inject_xff utiq_defense protective_mode privacy_guard ad_ghost media_cache local_store social_graph inject_banner dpi cookies avatar ja4 soc_relay cert_pin_detect media_stats; do
|
||||
ARGS+=(-s "$ADDON_DIR/${addon}.py")
|
||||
done
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ DEFAULTS: Dict = {
|
|||
"media_cache": False, # #577 shared media proxy-cache (opt-in)
|
||||
"stream_inject": True, # #620/#630 stream loader inject (TTFB) — default on
|
||||
"autolearn": True, # #589 also block auto-learned bad hosts
|
||||
"tls_splice": "observe", # #649 off | observe | on (asset SNI-splice)
|
||||
# ── Anti-Track v2 (#633) — ships dark; arm after observe-only soak ──
|
||||
"privacy_enforce": False, # master switch; off = observe-only
|
||||
"privacy_poison": True, # forge stable fake id for loadbearing trackers
|
||||
|
|
@ -41,6 +42,7 @@ DEFAULTS: Dict = {
|
|||
}
|
||||
|
||||
_VALID_PROTECTIVE = ("off", "alert", "spoof")
|
||||
_VALID_SPLICE = ("off", "observe", "on")
|
||||
|
||||
_cache: Dict = {}
|
||||
_cache_ts: float = 0.0
|
||||
|
|
@ -66,6 +68,8 @@ def get_filters(force: bool = False) -> Dict:
|
|||
pass
|
||||
    if out.get("protective") not in _VALID_PROTECTIVE:
        out["protective"] = DEFAULTS["protective"]
    if out.get("tls_splice") not in _VALID_SPLICE:
        out["tls_splice"] = DEFAULTS["tls_splice"]
    _cache = out
    _cache_ts = now
    return out
|
||||
|
|
@ -83,6 +87,8 @@ def set_filters(patch: Dict) -> Dict:
|
|||
if ck in DEFAULTS["ad_ghost_categories"]})
|
||||
        elif k == "protective" and v in _VALID_PROTECTIVE:
            cur["protective"] = v
        elif k == "tls_splice" and v in _VALID_SPLICE:
            cur["tls_splice"] = v
        elif k == "fortknox_sites" and isinstance(v, list):
            cur["fortknox_sites"] = [str(s).strip().lower() for s in v if str(s).strip()]
|
||||
elif k in ("banner", "ad_ghost", "ad_ghost_block", "media_cache", "autolearn",
|
||||
|
|
|
|||
56
packages/secubox-toolbox/secubox_toolbox/splice.py
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
# Copyright (c) 2026 CyberMind — Gérald Kerma <devel@cybermind.fr>
"""SecuBox-Deb :: toolbox :: SNI-splice classifier (#649).

Pure helpers deciding, from the TLS SNI alone, whether a flow is a pure-asset
flow we can splice (raw passthrough, no MITM). Seed ∪ learned, minus a never-set
(trackers we block/poison, fortknox sites). Suffix match so CDN shards match.
"""
from __future__ import annotations

import os
from typing import Set


def _load_lines(path: str) -> Set[str]:
    out: Set[str] = set()
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.split("#", 1)[0].strip().lower()
                if line:
                    out.add(line)
    except Exception:
        pass
    return out


def load_splice_seed(path: str) -> Set[str]:
    return _load_lines(path)


def load_learned_splice(path: str) -> Set[str]:
    return _load_lines(path)


def host_matches(host: str, patterns: Set[str]) -> bool:
    """True if host == pattern or host is a subdomain of pattern."""
    h = (host or "").lower().strip(".")
    if not h or not patterns:
        return False
    if h in patterns:
        return True
    for p in patterns:
        if h.endswith("." + p):
            return True
    return False


def should_splice(sni: str, seed: Set[str], learned: Set[str],
                  never: Set[str]) -> bool:
    s = (sni or "").lower().strip(".")
    if not s:
        return False
    if host_matches(s, never):  # never wins (trackers / fortknox)
        return False
    return host_matches(s, seed) or host_matches(s, learned)
|
||||
|
|
@ -14,6 +14,12 @@ log = logging.getLogger("secubox.toolbox")
|
|||
DB_PATH = Path("/var/lib/secubox/toolbox/toolbox.db")
|
||||
|
||||
SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS splice_host_obs (
|
||||
host TEXT PRIMARY KEY,
|
||||
hits INTEGER NOT NULL DEFAULT 0,
|
||||
html_hits INTEGER NOT NULL DEFAULT 0,
|
||||
last_seen REAL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS consents (
|
||||
mac_hash TEXT PRIMARY KEY,
|
||||
ts INTEGER NOT NULL,
|
||||
|
|
@ -56,6 +62,45 @@ def _conn() -> sqlite3.Connection:
|
|||
    return c


_SPLICE_OBS_CAP = 50  # stop counting once we have enough signal per host


def record_splice_obs(host: str, is_html: bool) -> None:
    """Observe a MITM'd flow's host + whether it served text/html. Sampling-capped
    so writes stay bounded. Best-effort (never raises into the proxy path)."""
    h = (host or "").lower().strip(".")
    if not h:
        return
    try:
        with _conn() as c:
            c.execute(
                "INSERT INTO splice_host_obs(host, hits, html_hits, last_seen) "
                "VALUES(?, 1, ?, ?) "
                "ON CONFLICT(host) DO UPDATE SET "
                " hits = MIN(hits + 1, ?), "
                " html_hits = html_hits + ?, "
                " last_seen = ? "
                "WHERE splice_host_obs.hits < ?",
                (h, 1 if is_html else 0, time.time(),
                 _SPLICE_OBS_CAP, 1 if is_html else 0, time.time(), _SPLICE_OBS_CAP),
            )
    except Exception as e:
        log.debug("splice obs failed: %s", e)


def never_html_hosts(min_hits: int = 20) -> list[str]:
    """Hosts observed >= min_hits times that NEVER served text/html."""
    try:
        with _conn() as c:
            rows = c.execute(
                "SELECT host FROM splice_host_obs WHERE hits >= ? AND html_hits = 0",
                (min_hits,),
            ).fetchall()
        return [r[0] for r in rows]
    except Exception:
        return []


def record_consent(mac_hash: str, ip: str, ua: str | None, ttl_seconds: int) -> None:
    with _conn() as c:
        c.execute(
|
||||
|
|
|
|||
32
packages/secubox-toolbox/tests/test_autolearn_splice.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
# tests/test_autolearn_splice.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import os, sqlite3, importlib.util, pathlib


def _load_autolearn():
    p = pathlib.Path(__file__).resolve().parents[1] / "sbin" / "secubox-toolbox-autolearn"
    spec = importlib.util.spec_from_loader("autolearn", loader=None)
    mod = importlib.util.module_from_spec(spec)
    exec(compile(p.read_text(), str(p), "exec"), mod.__dict__)
    return mod


def test_splice_feed_promotes_never_html(tmp_path, monkeypatch):
    db = tmp_path / "t.db"
    con = sqlite3.connect(db)
    con.executescript(
        "CREATE TABLE splice_host_obs(host TEXT PRIMARY KEY, hits INT, html_hits INT, last_seen REAL);"
        "INSERT INTO splice_host_obs VALUES('cdn.assets.net',25,0,0);"
        "INSERT INTO splice_host_obs VALUES('html.site.com',25,3,0);"
        "INSERT INTO splice_host_obs VALUES('low.hits.net',5,0,0);")
    con.commit(); con.close()
    out = tmp_path / "splice-learned.txt"
    monkeypatch.setenv("SECUBOX_AUTOLEARN_DB", str(db))
    monkeypatch.setenv("SECUBOX_SPLICE_LEARNED_OUT", str(out))
    al = _load_autolearn()
    n = al._splice_feed()
    learned = set(out.read_text().split())
    assert "cdn.assets.net" in learned       # never-HTML, >=20 hits
    assert "html.site.com" not in learned    # served HTML
    assert "low.hits.net" not in learned     # too few hits
    assert n == 1
|
||||
23
packages/secubox-toolbox/tests/test_filters_splice.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
# tests/test_filters_splice.py
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
import json
from secubox_toolbox import filters


def test_default_is_observe(monkeypatch, tmp_path):
    monkeypatch.setattr(filters, "FILTERS_PATH", str(tmp_path / "f.json"))
    assert filters.get_filters(force=True)["tls_splice"] == "observe"


def test_bad_value_falls_back(monkeypatch, tmp_path):
    fp = tmp_path / "f.json"; fp.write_text(json.dumps({"tls_splice": "bogus"}))
    monkeypatch.setattr(filters, "FILTERS_PATH", str(fp))
    assert filters.get_filters(force=True)["tls_splice"] == "observe"


def test_set_filters_accepts_valid(monkeypatch, tmp_path):
    monkeypatch.setattr(filters, "FILTERS_PATH", str(tmp_path / "f.json"))
    out = filters.set_filters({"tls_splice": "on"})
    assert out["tls_splice"] == "on"
    out = filters.set_filters({"tls_splice": "nope"})
    assert out["tls_splice"] == "on"   # invalid ignored, prior kept
|
||||
35
packages/secubox-toolbox/tests/test_splice_classify.py
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
# tests/test_splice_classify.py
|
||||
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
|
||||
from secubox_toolbox import splice
|
||||
|
||||
|
||||
def test_host_matches_exact_and_subdomain():
|
||||
pats = {"googlevideo.com", "fbcdn.net"}
|
||||
assert splice.host_matches("googlevideo.com", pats)
|
||||
assert splice.host_matches("r1---sn-x.googlevideo.com", pats)
|
||||
assert not splice.host_matches("notgooglevideo.com", pats) # no false prefix
|
||||
assert not splice.host_matches("example.com", pats)
|
||||
|
||||
|
||||
def test_should_splice_seed_and_learned():
|
||||
seed = {"googlevideo.com"}; learned = {"cdn.example.net"}; never = set()
|
||||
assert splice.should_splice("x.googlevideo.com", seed, learned, never)
|
||||
assert splice.should_splice("cdn.example.net", seed, learned, never)
|
||||
assert not splice.should_splice("news.example.com", seed, learned, never)
|
||||
|
||||
|
||||
def test_never_wins():
|
||||
seed = {"evil-cdn.com"}; never = {"evil-cdn.com"}
|
||||
assert not splice.should_splice("evil-cdn.com", seed, set(), never)
|
||||
|
||||
|
||||
def test_empty_sni_or_sets():
|
||||
assert not splice.should_splice("", {"a.com"}, set(), set())
|
||||
assert not splice.should_splice("a.com", set(), set(), set())
|
||||
|
||||
|
||||
def test_load_seed_strips_comments(tmp_path):
|
||||
f = tmp_path / "seed.conf"
|
||||
f.write_text("# header\ngooglevideo.com # yt\n\n fbcdn.net\n")
|
||||
s = splice.load_splice_seed(str(f))
|
||||
assert s == {"googlevideo.com", "fbcdn.net"}
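For readers without the `splice` module at hand, the behaviour these tests pin down can be sketched as follows. This is a minimal illustration written only from the assertions above, not the module's actual code: the function bodies, the `parse_seed_text` helper, and the choice to apply suffix matching to the learned and never sets are all assumptions.

```python
from collections.abc import Iterable
from pathlib import Path


def host_matches(host: str, patterns: Iterable[str]) -> bool:
    """True if host equals a pattern or is a dot-separated subdomain of one."""
    host = host.lower().rstrip(".")
    # Requiring the leading dot keeps "notgooglevideo.com" from matching
    # the pattern "googlevideo.com".
    return any(host == p or host.endswith("." + p) for p in patterns)


def should_splice(sni, seed, learned, never) -> bool:
    """Splice only when the SNI is known, not vetoed, and in seed or learned."""
    if not sni:
        return False          # no SNI: nothing to decide on, keep MITM
    if host_matches(sni, never):
        return False          # the never set always wins
    return host_matches(sni, seed) or host_matches(sni, learned)


def parse_seed_text(text: str) -> set:
    """Hypothetical helper: one host per line, '#' starts a comment."""
    hosts = set()
    for line in text.splitlines():
        entry = line.split("#", 1)[0].strip().lower()
        if entry:
            hosts.add(entry)
    return hosts


def load_splice_seed(path: str) -> set:
    """Read a seed file from disk and parse it with parse_seed_text."""
    return parse_seed_text(Path(path).read_text())
```

Checking the never set before the seed is what makes `test_never_wins` hold even when the same host appears in both sets.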
|
||||
31
packages/secubox-toolbox/tests/test_splice_obs.py
Normal file
|
|
@@ -0,0 +1,31 @@
|
|||
# tests/test_splice_obs.py
|
||||
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
|
||||
from pathlib import Path
|
||||
from secubox_toolbox import store
|
||||
|
||||
|
||||
def _fresh(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(store, "DB_PATH", Path(tmp_path) / "t.db")
|
||||
|
||||
|
||||
def test_record_and_never_html(tmp_path, monkeypatch):
|
||||
_fresh(tmp_path, monkeypatch)
|
||||
for _ in range(20):
|
||||
store.record_splice_obs("cdn.assets.net", is_html=False)
|
||||
for _ in range(20):
|
||||
store.record_splice_obs("www.site.com", is_html=False)
|
||||
store.record_splice_obs("www.site.com", is_html=True) # served HTML once
|
||||
hosts = store.never_html_hosts(min_hits=20)
|
||||
assert "cdn.assets.net" in hosts
|
||||
assert "www.site.com" not in hosts # html_hits > 0 → excluded
|
||||
|
||||
|
||||
def test_sampling_cap(tmp_path, monkeypatch):
|
||||
_fresh(tmp_path, monkeypatch)
|
||||
for _ in range(100):
|
||||
store.record_splice_obs("x.net", is_html=False)
|
||||
# capped at 50 — never grows unbounded
|
||||
import sqlite3
|
||||
with store._conn() as c:
|
||||
hits = c.execute("SELECT hits FROM splice_host_obs WHERE host='x.net'").fetchone()[0]
|
||||
assert hits == 50
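One way a capped per-host counter like the one tested above could be kept in SQLite is sketched below. Only the table name `splice_host_obs` and the `hits` column appear in the tests; the `html_hits` column name is taken from the changelog wording, and the schema, the `open_db` helper, the explicit `conn` argument, and the upsert approach are assumptions for illustration. The real `store` module may well do this differently.

```python
import sqlite3

HITS_CAP = 50  # assumed from the test comment "capped at 50"


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    """Hypothetical helper: open a DB and create the observation table."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS splice_host_obs ("
        " host TEXT PRIMARY KEY,"
        " hits INTEGER NOT NULL DEFAULT 0,"
        " html_hits INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def record_splice_obs(conn: sqlite3.Connection, host: str, is_html: bool) -> None:
    """Count one observation; both counters saturate at HITS_CAP."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO splice_host_obs (host, hits, html_hits) VALUES (?, 1, ?) "
            "ON CONFLICT(host) DO UPDATE SET "
            " hits = MIN(hits + 1, ?),"
            " html_hits = MIN(html_hits + excluded.html_hits, ?)",
            (host, int(is_html), HITS_CAP, HITS_CAP),
        )


def never_html_hosts(conn: sqlite3.Connection, min_hits: int = 20) -> set:
    """Hosts seen at least min_hits times that never served HTML."""
    rows = conn.execute(
        "SELECT host FROM splice_host_obs WHERE hits >= ? AND html_hits = 0",
        (min_hits,),
    )
    return {row[0] for row in rows}
```

Because `html_hits` is updated independently of `hits`, a host that starts serving HTML after reaching the cap is still excluded on its next observation. The upsert syntax needs SQLite 3.24 or newer.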
|
||||
109
packages/secubox-toolbox/tests/test_tls_splice_addon.py
Normal file
|
|
@@ -0,0 +1,109 @@
|
|||
# tests/test_tls_splice_addon.py
|
||||
# SPDX-License-Identifier: LicenseRef-CMSD-1.0
|
||||
import sys, pathlib, importlib, json, types
|
||||
ADDON_DIR = pathlib.Path(__file__).resolve().parents[1] / "mitmproxy_addons"
|
||||
sys.path.insert(0, str(ADDON_DIR))
|
||||
from secubox_toolbox import filters
|
||||
|
||||
|
||||
def _addon(monkeypatch, tmp_path, mode):
|
||||
fp = tmp_path / "f.json"; fp.write_text(json.dumps({"tls_splice": mode}))
|
||||
monkeypatch.setattr(filters, "FILTERS_PATH", str(fp)); filters.get_filters(force=True)
|
||||
import tls_splice; importlib.reload(tls_splice)
|
||||
a = tls_splice.TlsSplice()
|
||||
a._seed = {"googlevideo.com"}; a._learned = set(); a._never = set()
|
||||
monkeypatch.setattr(a, "_refresh_sets", lambda: None)
|
||||
return tls_splice, a
|
||||
|
||||
|
||||
def _ch(sni):
|
||||
d = types.SimpleNamespace()
|
||||
d.client_hello = types.SimpleNamespace(sni=sni)
|
||||
d.context = types.SimpleNamespace(client=types.SimpleNamespace(peername=("10.99.1.2", 5)))
|
||||
d.ignore_connection = False
|
||||
return d
|
||||
|
||||
|
||||
def test_on_splices_seed_host(monkeypatch, tmp_path):
|
||||
_, a = _addon(monkeypatch, tmp_path, "on")
|
||||
d = _ch("r1.googlevideo.com"); a.tls_clienthello(d)
|
||||
assert d.ignore_connection is True
|
||||
|
||||
|
||||
def test_observe_does_not_splice(monkeypatch, tmp_path):
|
||||
_, a = _addon(monkeypatch, tmp_path, "observe")
|
||||
d = _ch("r1.googlevideo.com"); a.tls_clienthello(d)
|
||||
assert d.ignore_connection is False
|
||||
|
||||
|
||||
def test_off_returns_early(monkeypatch, tmp_path):
|
||||
_, a = _addon(monkeypatch, tmp_path, "off")
|
||||
d = _ch("r1.googlevideo.com"); a.tls_clienthello(d)
|
||||
assert d.ignore_connection is False
|
||||
|
||||
|
||||
def test_non_seed_not_spliced(monkeypatch, tmp_path):
|
||||
_, a = _addon(monkeypatch, tmp_path, "on")
|
||||
d = _ch("news.example.com"); a.tls_clienthello(d)
|
||||
assert d.ignore_connection is False
|
||||
|
||||
|
||||
def test_no_sni_not_spliced(monkeypatch, tmp_path):
|
||||
_, a = _addon(monkeypatch, tmp_path, "on")
|
||||
d = _ch(None); a.tls_clienthello(d)
|
||||
assert d.ignore_connection is False
|
||||
|
||||
|
||||
def test_clienthello_exception_falls_through(monkeypatch, tmp_path):
|
||||
"""A broken ClientHelloData must NOT raise and must NOT splice (→ MITM)."""
|
||||
_, a = _addon(monkeypatch, tmp_path, "on")
|
||||
|
||||
class _Boom:
|
||||
@property
|
||||
def sni(self):
|
||||
raise RuntimeError("malformed client hello")
|
||||
|
||||
d = types.SimpleNamespace()
|
||||
d.client_hello = _Boom()
|
||||
d.ignore_connection = False
|
||||
a.tls_clienthello(d) # must not raise
|
||||
assert d.ignore_connection is False # fail-safe → MITM
|
||||
|
||||
|
||||
def test_response_records_undecided_host(monkeypatch, tmp_path):
|
||||
tls_splice, a = _addon(monkeypatch, tmp_path, "observe")
|
||||
calls = []
|
||||
monkeypatch.setattr(tls_splice, "_obs_executor",
|
||||
types.SimpleNamespace(submit=lambda fn, *args: calls.append(args)))
|
||||
monkeypatch.setattr(tls_splice, "_store", types.SimpleNamespace(record_splice_obs=lambda *a: None))
|
||||
f = types.SimpleNamespace(
|
||||
request=types.SimpleNamespace(pretty_host="news.example.com"),
|
||||
response=types.SimpleNamespace(headers={"content-type": "text/html; charset=utf-8"}))
|
||||
a.response(f)
|
||||
assert calls == [("news.example.com", True)] # undecided host, is_html=True
|
||||
|
||||
|
||||
def test_response_skips_decided_host(monkeypatch, tmp_path):
|
||||
tls_splice, a = _addon(monkeypatch, tmp_path, "observe")
|
||||
calls = []
|
||||
monkeypatch.setattr(tls_splice, "_obs_executor",
|
||||
types.SimpleNamespace(submit=lambda fn, *args: calls.append(args)))
|
||||
monkeypatch.setattr(tls_splice, "_store", types.SimpleNamespace(record_splice_obs=lambda *a: None))
|
||||
f = types.SimpleNamespace(
|
||||
request=types.SimpleNamespace(pretty_host="r1.googlevideo.com"), # in seed
|
||||
response=types.SimpleNamespace(headers={"content-type": "video/mp4"}))
|
||||
a.response(f)
|
||||
assert calls == [] # already-decided (seed) → no observation write
|
||||
|
||||
|
||||
def test_response_off_mode_skips(monkeypatch, tmp_path):
|
||||
tls_splice, a = _addon(monkeypatch, tmp_path, "off")
|
||||
calls = []
|
||||
monkeypatch.setattr(tls_splice, "_obs_executor",
|
||||
types.SimpleNamespace(submit=lambda fn, *args: calls.append(args)))
|
||||
monkeypatch.setattr(tls_splice, "_store", types.SimpleNamespace(record_splice_obs=lambda *a: None))
|
||||
f = types.SimpleNamespace(
|
||||
request=types.SimpleNamespace(pretty_host="news.example.com"),
|
||||
response=types.SimpleNamespace(headers={"content-type": "text/html"}))
|
||||
a.response(f)
|
||||
assert calls == [] # off → recorder disabled
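The mode handling exercised by the ClientHello tests above can be summarised in a small duck-typed sketch. It relies only on the `client_hello.sni` and `ignore_connection` attributes the tests themselves use. The class name `TlsSpliceSketch`, its constructor, the `would_splice` list, and the `fake_hello` helper are hypothetical and stand in for whatever the real `TlsSplice` addon does internally.

```python
import types


class TlsSpliceSketch:
    """Illustrative stand-in for the addon's ClientHello decision."""

    def __init__(self, mode, seed, never=()):
        self.mode = mode              # "off", "observe" or "on"
        self.seed = set(seed)
        self.never = set(never)
        self.would_splice = []        # what observe mode would have spliced

    @staticmethod
    def _matches(host, patterns):
        return any(host == p or host.endswith("." + p) for p in patterns)

    def tls_clienthello(self, data):
        try:
            if self.mode == "off":
                return                            # kill-switch: touch nothing
            sni = data.client_hello.sni
            if not sni or self._matches(sni, self.never):
                return                            # no SNI or vetoed: MITM
            if not self._matches(sni, self.seed):
                return                            # undecided host: MITM
            if self.mode == "on":
                data.ignore_connection = True     # raw passthrough
            else:
                self.would_splice.append(sni)     # observe: record only
        except Exception:
            return                                # fail-safe: stay on MITM


def fake_hello(sni):
    """Hypothetical test double shaped like the object used in the tests."""
    return types.SimpleNamespace(
        client_hello=types.SimpleNamespace(sni=sni),
        ignore_connection=False,
    )
```

Wrapping the whole hook in `try/except` mirrors the fail-safe test: any error while reading the ClientHello leaves `ignore_connection` at `False`, so the flow is intercepted rather than silently passed through.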
|
||||