diff --git a/.claude/HISTORY.md b/.claude/HISTORY.md index b2695836..bf022a49 100644 --- a/.claude/HISTORY.md +++ b/.claude/HISTORY.md @@ -5202,3 +5202,14 @@ git checkout HEAD -- index.html - UCI config: `/etc/config/dpi-dual` with dual/mitm-only/tap-only modes - Files: mirror-setup.sh, dpi-flow-collector, dpi-correlator, dpi-dualctl, init.d/dpi-dual, dpi_buffer.py + +- **Dual-Stream DPI Phase 2 - MITM Double Buffer + LuCI (Complete)** + - Enhanced mitmproxy addon `dpi_buffer.py`: + - Compiled regex for 6 threat categories (path_traversal, xss, sqli, lfi, rce, ssrf) + - Scanner detection, optional blocking, request replay queue + - New `luci-app-dpi-dual` package: + - RPCD handler with 10 methods (status, flows, buffer, threats, correlation, control) + - KISS dashboard with stream status cards, LED indicators, threats table + - Protocol distribution, manual IP correlation + - Streamlit control panel: Added DPI Dual card + diff --git a/.claude/WIP.md b/.claude/WIP.md index cbcc2c1f..93bc563a 100644 --- a/.claude/WIP.md +++ b/.claude/WIP.md @@ -581,35 +581,35 @@ _Last updated: 2026-03-15 (Wall Colorsets)_ ### 2026-03-15 -- **Dual-Stream DPI Architecture (Phase 1 Complete)** +- **Dual-Stream DPI Architecture (Phase 2 Complete)** - New `secubox-dpi-dual` package implementing parallel MITM + Passive TAP DPI - Architecture doc: `package/secubox/DUAL-STREAM-DPI.md` - - **TAP Stream (Passive)**: + - **Phase 1 - TAP Stream (Passive)**: - `mirror-setup.sh`: tc mirred port mirroring (ingress + egress) - - Creates dummy TAP interface for netifyd analysis - - Software and hardware TAP mode support - - **Flow Collector**: - - `dpi-flow-collector`: Aggregates netifyd flow statistics - - Writes stats to `/tmp/secubox/dpi-flows.json` - - Interface stats from /sys/class/net - - Configurable flow retention cleanup - - **Correlation Engine**: - - `dpi-correlator`: Matches MITM + TAP stream events - - Watches CrowdSec decisions and WAF alerts - - Enriches threats with context from both streams - - Output: `/tmp/secubox/correlated-threats.json` - - **CLI Tool**: - - `dpi-dualctl`: start/stop/restart/status/flows/threats/mirror - - Shows unified status of both streams - - **Procd Service**: - - `init.d/dpi-dual`: Manages flow-collector and correlator instances - - Auto-starts based on UCI mode setting (dual/mitm-only/tap-only) - - **MITM Double Buffer (Phase 2 prep)**: - - `dpi_buffer.py`: mitmproxy addon for async analysis - - Ring buffer with configurable size (1000 requests default) - - Heuristic threat scoring (path traversal, XSS, SQLi, LFI patterns) - - Writes threats to `/tmp/secubox/waf-alerts.json` - - **UCI Config**: `/etc/config/dpi-dual` with global, mitm, tap, correlation sections + - `dpi-flow-collector`: Aggregates netifyd stats → `/tmp/secubox/dpi-flows.json` + - `dpi-correlator`: Matches MITM + TAP events, CrowdSec integration + - `dpi-dualctl`: CLI start/stop/status/flows/threats/mirror + - `init.d/dpi-dual`: Procd service for flow-collector + correlator + - **Phase 2 - MITM Double Buffer + LuCI**: + - Enhanced `dpi_buffer.py` mitmproxy addon: + - Compiled regex patterns for 6 threat categories (path_traversal, xss, sqli, lfi, rce, ssrf) + - Scanner detection (sqlmap, nikto, nmap, etc.) + - Optional blocking mode for high-score threats + - Request replay queue for forensic analysis + - Rate limiting detection + - Stats: buffer entries, threat distribution, top hosts + - **LuCI Dashboard** (`luci-app-dpi-dual`): + - RPCD handler with 10 methods (status, flows, buffer, threats, correlation, start/stop/restart, replay, correlate) + - KISS-themed overview with stream status cards + - LED indicators for MITM/TAP/Correlation running state + - Metrics: buffer entries, threats, blocked, flows/min, RX/TX bytes + - Threats table with timestamp, IP, host, path, categories, score, blocked status + - Protocol distribution from netifyd + - Manual IP correlation trigger + - ACL permissions for read/write + - **Streamlit Control Panel** updated: + - DPI Dual card with flows/min, threats, blocked metrics + - Reads from dpi-buffer.json and dpi-flows.json caches --- diff --git a/package/secubox/luci-app-dpi-dual/Makefile b/package/secubox/luci-app-dpi-dual/Makefile new file mode 100644 index 00000000..9ac3f470 --- /dev/null +++ b/package/secubox/luci-app-dpi-dual/Makefile @@ -0,0 +1,30 @@ +include $(TOPDIR)/rules.mk + +LUCI_TITLE:=LuCI Dual-Stream DPI Dashboard +LUCI_DESCRIPTION:=Dashboard for MITM + Passive TAP deep packet inspection +LUCI_DEPENDS:=+luci-base +secubox-dpi-dual +LUCI_PKGARCH:=all + +PKG_NAME:=luci-app-dpi-dual +PKG_VERSION:=1.0.0 +PKG_RELEASE:=1 +PKG_MAINTAINER:=SecuBox +PKG_LICENSE:=GPL-3.0 + +include $(TOPDIR)/feeds/luci/luci.mk + +define Package/luci-app-dpi-dual/install + $(INSTALL_DIR) $(1)/usr/share/luci/menu.d + $(INSTALL_DATA) ./root/usr/share/luci/menu.d/luci-app-dpi-dual.json $(1)/usr/share/luci/menu.d/ + + $(INSTALL_DIR) $(1)/usr/share/rpcd/acl.d + $(INSTALL_DATA) ./root/usr/share/rpcd/acl.d/luci-app-dpi-dual.json $(1)/usr/share/rpcd/acl.d/ + + $(INSTALL_DIR) $(1)/usr/libexec/rpcd + $(INSTALL_BIN) ./root/usr/libexec/rpcd/luci.dpi-dual $(1)/usr/libexec/rpcd/ + + $(INSTALL_DIR) $(1)/www/luci-static/resources/view/dpi-dual + $(INSTALL_DATA) ./htdocs/luci-static/resources/view/dpi-dual/*.js $(1)/www/luci-static/resources/view/dpi-dual/ +endef + +$(eval $(call BuildPackage,luci-app-dpi-dual)) diff --git a/package/secubox/luci-app-dpi-dual/htdocs/luci-static/resources/view/dpi-dual/overview.js b/package/secubox/luci-app-dpi-dual/htdocs/luci-static/resources/view/dpi-dual/overview.js new file mode 100644 index 00000000..8594db66 --- /dev/null +++ b/package/secubox/luci-app-dpi-dual/htdocs/luci-static/resources/view/dpi-dual/overview.js @@ -0,0 +1,313 @@ +'use strict'; +'require view'; +'require dom'; +'require poll'; +'require rpc'; +'require ui'; + +var callStatus = rpc.declare({ + object: 'luci.dpi-dual', + method: 'status', + expect: {} +}); + +var callGetFlows = rpc.declare({ + object: 'luci.dpi-dual', + method: 'get_flows', + expect: {} +}); + +var callGetThreats = rpc.declare({ + object: 'luci.dpi-dual', + method: 'get_threats', + params: ['limit'], + expect: {} +}); + +var callGetCorrelation = rpc.declare({ + object: 'luci.dpi-dual', + method: 'get_correlation', + params: ['limit'], + expect: {} +}); + +var callStart = rpc.declare({ + object: 'luci.dpi-dual', + method: 'start', + expect: {} +}); + +var callStop = rpc.declare({ + object: 'luci.dpi-dual', + method: 'stop', + expect: {} +}); + +var callRestart = rpc.declare({ + object: 'luci.dpi-dual', + method: 'restart', + expect: {} +}); + +var callCorrelateIP = rpc.declare({ + object: 'luci.dpi-dual', + method: 'correlate_ip', + params: ['ip'], + expect: {} +}); + +function formatBytes(bytes) { + if (bytes === 0) return '0 B'; + var k = 1024; + var sizes = ['B', 'KB', 'MB', 'GB']; + var i = Math.floor(Math.log(bytes) / Math.log(k)); + return parseFloat((bytes / Math.pow(k, i)).toFixed(1)) + ' ' + sizes[i]; +} + +function formatTimestamp(ts) { + if (!ts) return '-'; + var d = new Date(ts); + return d.toLocaleTimeString(); +} + +function createStatusLED(running) { + var color = running ? '#00d4aa' : '#ff4d4d'; + var label = running ? 'RUNNING' : 'STOPPED'; + return E('span', { + 'style': 'display:inline-flex;align-items:center;gap:6px;' + }, [ + E('span', { + 'style': 'width:12px;height:12px;border-radius:50%;background:' + color + + ';box-shadow:0 0 8px ' + color + ';' + }), + E('span', { 'style': 'font-weight:600;color:' + color + ';' }, label) + ]); +} + +function createCard(title, icon, content, borderColor) { + return E('div', { + 'class': 'cbi-section', + 'style': 'background:#12121a;border-radius:12px;padding:1rem;margin:0.5rem 0;' + + 'border-left:4px solid ' + (borderColor || '#2a2a3a') + ';' + }, [ + E('div', { + 'style': 'display:flex;align-items:center;gap:8px;margin-bottom:0.8rem;' + }, [ + E('span', { 'style': 'font-size:1.3rem;' }, icon), + E('span', { 'style': 'font-size:1.1rem;font-weight:600;color:#fff;' }, title) + ]), + E('div', {}, content) + ]); +} + +function createMetric(label, value, color) { + return E('div', { + 'style': 'background:#1a1a24;padding:0.6rem 1rem;border-radius:8px;text-align:center;min-width:80px;' + }, [ + E('div', { + 'style': 'font-size:1.5rem;font-weight:700;color:' + (color || '#00d4aa') + ';font-family:monospace;' + }, String(value)), + E('div', { + 'style': 'font-size:0.7rem;color:#808090;text-transform:uppercase;margin-top:2px;' + }, label) + ]); +} + +function createThreatRow(threat) { + var scoreColor = threat.threat_score > 70 ? '#ff4d4d' : + threat.threat_score > 40 ? '#ffa500' : '#00d4aa'; + + return E('tr', {}, [ + E('td', { 'style': 'padding:8px;color:#808090;' }, formatTimestamp(threat.timestamp)), + E('td', { 'style': 'padding:8px;font-family:monospace;color:#00a0ff;' }, threat.client_ip || '-'), + E('td', { 'style': 'padding:8px;' }, threat.host || '-'), + E('td', { 'style': 'padding:8px;max-width:200px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap;' }, + threat.path || '-'), + E('td', { 'style': 'padding:8px;' }, (threat.categories || []).join(', ') || '-'), + E('td', { 'style': 'padding:8px;text-align:center;' }, + E('span', { + 'style': 'background:' + scoreColor + '22;color:' + scoreColor + + ';padding:2px 8px;border-radius:10px;font-weight:600;' + }, String(threat.threat_score || 0)) + ), + E('td', { 'style': 'padding:8px;text-align:center;' }, + threat.blocked ? + E('span', { 'style': 'color:#ff4d4d;' }, '🚫') : + E('span', { 'style': 'color:#808090;' }, '-') + ) + ]); +} + +return view.extend({ + load: function() { + return Promise.all([ + callStatus().catch(function() { return {}; }), + callGetFlows().catch(function() { return {}; }), + callGetThreats(20).catch(function() { return { alerts: [] }; }), + callGetCorrelation(10).catch(function() { return { correlated: [] }; }) + ]); + }, + + render: function(data) { + var status = data[0] || {}; + var flows = data[1] || {}; + var threats = data[2] || {}; + var correlation = data[3] || {}; + + var mitm = status.mitm_stream || {}; + var tap = status.tap_stream || {}; + var corr = status.correlation || {}; + + var view = E('div', { 'class': 'cbi-map', 'style': 'background:#0a0a12;min-height:100vh;' }, [ + // Header + E('div', { 'style': 'text-align:center;padding:1rem 0;' }, [ + E('h1', { + 'style': 'font-size:1.8rem;font-weight:700;background:linear-gradient(90deg,#00d4aa,#00a0ff);' + + '-webkit-background-clip:text;-webkit-text-fill-color:transparent;margin:0;' + }, 'DPI Dual-Stream'), + E('div', { 'style': 'color:#606070;margin-top:4px;' }, + 'Mode: ' + (status.mode || 'dual').toUpperCase()) + ]), + + // Action buttons + E('div', { 'style': 'display:flex;gap:8px;justify-content:center;margin-bottom:1rem;' }, [ + E('button', { + 'class': 'btn cbi-button cbi-button-apply', + 'click': ui.createHandlerFn(this, function() { + return callStart().then(function() { + ui.addNotification(null, E('p', 'DPI started'), 'info'); + window.location.reload(); + }); + }) + }, '▶ Start'), + E('button', { + 'class': 'btn cbi-button cbi-button-reset', + 'click': ui.createHandlerFn(this, function() { + return callStop().then(function() { + ui.addNotification(null, E('p', 'DPI stopped'), 'info'); + window.location.reload(); + }); + }) + }, '⏹ Stop'), + E('button', { + 'class': 'btn cbi-button', + 'click': ui.createHandlerFn(this, function() { + return callRestart().then(function() { + ui.addNotification(null, E('p', 'DPI restarted'), 'info'); + window.location.reload(); + }); + }) + }, '🔄 Restart') + ]), + + // Stream status cards + E('div', { 'style': 'display:grid;grid-template-columns:repeat(auto-fit,minmax(300px,1fr));gap:1rem;' }, [ + // MITM Stream Card + createCard('MITM Stream', '🔍', E('div', {}, [ + E('div', { 'style': 'margin-bottom:0.8rem;' }, createStatusLED(mitm.running)), + E('div', { 'style': 'display:flex;flex-wrap:wrap;gap:8px;' }, [ + createMetric('Buffer', mitm.buffer_entries || 0, '#00d4aa'), + createMetric('Threats', mitm.threats_detected || 0, '#ffa500'), + createMetric('Blocked', mitm.blocked_count || 0, '#ff4d4d') + ]) + ]), mitm.running ? '#00d4aa' : '#ff4d4d'), + + // TAP Stream Card + createCard('TAP Stream', '📡', E('div', {}, [ + E('div', { 'style': 'margin-bottom:0.8rem;' }, createStatusLED(tap.running)), + E('div', { 'style': 'display:flex;flex-wrap:wrap;gap:8px;' }, [ + createMetric('Interface', tap.interface || 'tap0', tap.interface_up ? '#00d4aa' : '#808090'), + createMetric('RX', formatBytes(tap.rx_bytes || 0), '#00a0ff'), + createMetric('TX', formatBytes(tap.tx_bytes || 0), '#00a0ff'), + createMetric('Flows/min', tap.flows_1min || 0, '#00d4aa') + ]) + ]), tap.running ? '#00d4aa' : '#ff4d4d'), + + // Correlation Card + createCard('Correlation Engine', '🔗', E('div', {}, [ + E('div', { 'style': 'margin-bottom:0.8rem;' }, createStatusLED(corr.running)), + E('div', { 'style': 'display:flex;flex-wrap:wrap;gap:8px;' }, [ + createMetric('Correlated', corr.threats_correlated || 0, '#ffa500') + ]), + E('div', { 'style': 'margin-top:0.8rem;' }, [ + E('input', { + 'type': 'text', + 'id': 'correlate-ip', + 'placeholder': 'IP to correlate...', + 'style': 'background:#1a1a24;border:1px solid #2a2a3a;border-radius:6px;' + + 'padding:6px 10px;color:#fff;width:140px;margin-right:8px;' + }), + E('button', { + 'class': 'btn cbi-button', + 'click': ui.createHandlerFn(this, function() { + var ip = document.getElementById('correlate-ip').value; + if (ip) { + return callCorrelateIP(ip).then(function(res) { + ui.addNotification(null, E('p', res.message || 'Correlation triggered'), 'info'); + }); + } + }) + }, 'Correlate') + ]) + ]), corr.running ? '#00d4aa' : '#808090') + ]), + + // Threats Table + createCard('Recent Threats', '⚠️', E('div', {}, [ + E('div', { 'style': 'color:#808090;margin-bottom:8px;' }, + 'Total alerts: ' + (threats.total || 0)), + E('div', { 'style': 'overflow-x:auto;' }, [ + E('table', { + 'style': 'width:100%;border-collapse:collapse;font-size:0.85rem;' + }, [ + E('thead', {}, [ + E('tr', { 'style': 'border-bottom:1px solid #2a2a3a;' }, [ + E('th', { 'style': 'padding:8px;text-align:left;color:#808090;' }, 'Time'), + E('th', { 'style': 'padding:8px;text-align:left;color:#808090;' }, 'Client IP'), + E('th', { 'style': 'padding:8px;text-align:left;color:#808090;' }, 'Host'), + E('th', { 'style': 'padding:8px;text-align:left;color:#808090;' }, 'Path'), + E('th', { 'style': 'padding:8px;text-align:left;color:#808090;' }, 'Categories'), + E('th', { 'style': 'padding:8px;text-align:center;color:#808090;' }, 'Score'), + E('th', { 'style': 'padding:8px;text-align:center;color:#808090;' }, 'Blocked') + ]) + ]), + E('tbody', {}, + ((threats.alerts || []).slice(-15).reverse()).map(createThreatRow) + ) + ]) + ]) + ]), '#ffa500'), + + // Flow Protocols + flows.protocols ? createCard('Protocol Distribution', '📊', E('div', {}, [ + E('div', { 'style': 'display:flex;flex-wrap:wrap;gap:8px;' }, + Object.entries(flows.protocols || {}).slice(0, 10).map(function(entry) { + return E('div', { + 'style': 'background:#1a1a24;padding:6px 12px;border-radius:6px;' + }, [ + E('span', { 'style': 'color:#00d4aa;font-weight:600;' }, entry[0]), + E('span', { 'style': 'color:#808090;margin-left:6px;' }, '(' + entry[1] + ')') + ]); + }) + ) + ]), '#00a0ff') : E('div') + ]); + + // Auto-refresh every 10 seconds + poll.add(L.bind(function() { + return Promise.all([ + callStatus().catch(function() { return {}; }), + callGetThreats(20).catch(function() { return { alerts: [] }; }) + ]).then(L.bind(function(data) { + // Update would require DOM manipulation - for now just log + // Full implementation would update metrics in place + }, this)); + }, this), 10); + + return view; + }, + + handleSaveApply: null, + handleSave: null, + handleReset: null +}); diff --git a/package/secubox/luci-app-dpi-dual/root/usr/libexec/rpcd/luci.dpi-dual b/package/secubox/luci-app-dpi-dual/root/usr/libexec/rpcd/luci.dpi-dual new file mode 100644 index 00000000..db481211 --- /dev/null +++ b/package/secubox/luci-app-dpi-dual/root/usr/libexec/rpcd/luci.dpi-dual @@ -0,0 +1,252 @@ +#!/bin/sh +# RPCD handler for DPI Dual-Stream dashboard +# Part of luci-app-dpi-dual + +. /lib/functions.sh +. /usr/share/libubox/jshn.sh + +STATS_DIR="/tmp/secubox" +FLOW_DIR="/tmp/dpi-flows" +BUFFER_FILE="$STATS_DIR/dpi-buffer.json" +FLOWS_FILE="$STATS_DIR/dpi-flows.json" +THREATS_FILE="$STATS_DIR/correlated-threats.json" +ALERTS_FILE="$STATS_DIR/waf-alerts.json" + +read_json_file() { + local file="$1" + if [ -f "$file" ]; then + cat "$file" + else + echo '{}' + fi +} + +case "$1" in + list) + cat << 'EOF' +{ + "status": {}, + "get_flows": {}, + "get_buffer": {"limit": 100}, + "get_threats": {"limit": 50}, + "get_correlation": {"limit": 20}, + "get_mirror_status": {}, + "start": {}, + "stop": {}, + "restart": {}, + "replay_request": {"req_hash": "string"}, + "correlate_ip": {"ip": "string"} +} +EOF + ;; + + call) + case "$2" in + status) + # Get unified status of both streams + config_load dpi-dual + + local enabled mode correlation + config_get enabled settings enabled "0" + config_get mode settings mode "dual" + config_get correlation settings correlation "0" + + # Check processes + local mitm_running=0 tap_running=0 collector_running=0 correlator_running=0 + pgrep mitmproxy >/dev/null 2>&1 && mitm_running=1 + pgrep netifyd >/dev/null 2>&1 && tap_running=1 + pgrep dpi-flow-collector >/dev/null 2>&1 && collector_running=1 + pgrep dpi-correlator >/dev/null 2>&1 && correlator_running=1 + + # Get TAP interface status + local tap_if tap_up=0 tap_rx=0 tap_tx=0 + config_get tap_if tap interface "tap0" + if ip link show "$tap_if" >/dev/null 2>&1; then + tap_up=1 + tap_rx=$(cat "/sys/class/net/$tap_if/statistics/rx_bytes" 2>/dev/null || echo 0) + tap_tx=$(cat "/sys/class/net/$tap_if/statistics/tx_bytes" 2>/dev/null || echo 0) + fi + + # Get buffer stats + local buffer_entries=0 buffer_threats=0 buffer_blocked=0 + if [ -f "$BUFFER_FILE" ]; then + buffer_entries=$(jsonfilter -i "$BUFFER_FILE" -e '@.entries' 2>/dev/null || echo 0) + buffer_threats=$(jsonfilter -i "$BUFFER_FILE" -e '@.threats_detected' 2>/dev/null || echo 0) + buffer_blocked=$(jsonfilter -i "$BUFFER_FILE" -e '@.blocked_count' 2>/dev/null || echo 0) + fi + + # Get flow stats + local flows_1min=0 + if [ -f "$FLOWS_FILE" ]; then + flows_1min=$(jsonfilter -i "$FLOWS_FILE" -e '@.flows_1min' 2>/dev/null || echo 0) + fi + + # Get correlation stats + local correlated_threats=0 + if [ -f "$THREATS_FILE" ]; then + correlated_threats=$(wc -l < "$THREATS_FILE" 2>/dev/null || echo 0) + fi + + cat << EOF +{ + "enabled": $enabled, + "mode": "$mode", + "correlation_enabled": $correlation, + "mitm_stream": { + "running": $mitm_running, + "buffer_entries": $buffer_entries, + "threats_detected": $buffer_threats, + "blocked_count": $buffer_blocked + }, + "tap_stream": { + "running": $tap_running, + "interface": "$tap_if", + "interface_up": $tap_up, + "rx_bytes": $tap_rx, + "tx_bytes": $tap_tx, + "flows_1min": $flows_1min, + "collector_running": $collector_running + }, + "correlation": { + "running": $correlator_running, + "threats_correlated": $correlated_threats + } +} +EOF + ;; + + get_flows) + read_json_file "$FLOWS_FILE" + ;; + + get_buffer) + read "$3" + json_load "$REPLY" + json_get_var limit limit 100 + + if [ -f "$BUFFER_FILE" ]; then + cat "$BUFFER_FILE" + else + echo '{"entries": 0, "requests": []}' + fi + ;; + + get_threats) + read "$3" + json_load "$REPLY" + json_get_var limit limit 50 + + if [ -f "$ALERTS_FILE" ]; then + # Return last N alerts + local total + total=$(jsonfilter -i "$ALERTS_FILE" -e '@[*]' 2>/dev/null | wc -l) + + cat << EOF +{ + "total": $total, + "alerts": $(tail -c 50000 "$ALERTS_FILE" 2>/dev/null || echo '[]') +} +EOF + else + echo '{"total": 0, "alerts": []}' + fi + ;; + + get_correlation) + read "$3" + json_load "$REPLY" + json_get_var limit limit 20 + + if [ -f "$THREATS_FILE" ]; then + local total + total=$(wc -l < "$THREATS_FILE" 2>/dev/null || echo 0) + + cat << EOF +{ + "total": $total, + "correlated": $(tail -"$limit" "$THREATS_FILE" 2>/dev/null | tr '\n' ',' | sed 's/,$//' | awk '{print "["$0"]"}') +} +EOF + else + echo '{"total": 0, "correlated": []}' + fi + ;; + + get_mirror_status) + /usr/lib/dpi-dual/mirror-setup.sh status 2>&1 | \ + awk 'BEGIN{print "{"} + /TAP Interface/ {tap=1} + /not found/ {up=0} + /UP/ {up=1} + /RX:/ {rx=$2} + /TX:/ {tx=$2} + /ingress/ {ing=1} + END{ + printf "\"tap_found\": %s, \"tap_up\": %s, \"ingress_configured\": %s", + (tap?1:0), (up?1:0), (ing?1:0); + print "}" + }' + ;; + + start) + /usr/sbin/dpi-dualctl start >/dev/null 2>&1 + echo '{"success": true}' + ;; + + stop) + /usr/sbin/dpi-dualctl stop >/dev/null 2>&1 + echo '{"success": true}' + ;; + + restart) + /usr/sbin/dpi-dualctl restart >/dev/null 2>&1 + echo '{"success": true}' + ;; + + replay_request) + read "$3" + json_load "$REPLY" + json_get_var req_hash req_hash "" + + if [ -z "$req_hash" ]; then + echo '{"success": false, "error": "req_hash required"}' + else + # Add to replay queue (read by mitmproxy addon) + local queue_file="/tmp/dpi-buffer/replay-queue.json" + mkdir -p /tmp/dpi-buffer + + if [ ! -f "$queue_file" ]; then + echo "[]" > "$queue_file" + fi + + local entry="{\"req_hash\":\"$req_hash\",\"queued_at\":\"$(date -Iseconds)\",\"status\":\"pending\"}" + + # Append to queue (keep last 100) + (cat "$queue_file" | jsonfilter -e '@[*]' 2>/dev/null; echo "$entry") | \ + tail -100 | \ + awk 'BEGIN{print "["} {if(NR>1)print ","; print} END{print "]"}' > "$queue_file.tmp" + mv "$queue_file.tmp" "$queue_file" + + echo '{"success": true, "message": "Request queued for replay"}' + fi + ;; + + correlate_ip) + read "$3" + json_load "$REPLY" + json_get_var ip ip "" + + if [ -z "$ip" ]; then + echo '{"success": false, "error": "IP required"}' + else + /usr/sbin/dpi-correlator correlate "$ip" "manual_request" >/dev/null 2>&1 + echo '{"success": true, "message": "Correlation triggered for '"$ip"'"}' + fi + ;; + + *) + echo '{"error": "Unknown method"}' + ;; + esac + ;; +esac diff --git a/package/secubox/luci-app-dpi-dual/root/usr/share/luci/menu.d/luci-app-dpi-dual.json b/package/secubox/luci-app-dpi-dual/root/usr/share/luci/menu.d/luci-app-dpi-dual.json new file mode 100644 index 00000000..23879462 --- /dev/null +++ b/package/secubox/luci-app-dpi-dual/root/usr/share/luci/menu.d/luci-app-dpi-dual.json @@ -0,0 +1,14 @@ +{ + "admin/secubox/dpi-dual": { + "title": "DPI Dual-Stream", + "order": 45, + "action": { + "type": "view", + "path": "dpi-dual/overview" + }, + "depends": { + "acl": ["luci-app-dpi-dual"], + "uci": { "dpi-dual": true } + } + } +} diff --git a/package/secubox/luci-app-dpi-dual/root/usr/share/rpcd/acl.d/luci-app-dpi-dual.json b/package/secubox/luci-app-dpi-dual/root/usr/share/rpcd/acl.d/luci-app-dpi-dual.json new file mode 100644 index 00000000..3f7b0368 --- /dev/null +++ b/package/secubox/luci-app-dpi-dual/root/usr/share/rpcd/acl.d/luci-app-dpi-dual.json @@ -0,0 +1,30 @@ +{ + "luci-app-dpi-dual": { + "description": "Grant access to DPI Dual-Stream dashboard", + "read": { + "ubus": { + "luci.dpi-dual": [ + "status", + "get_flows", + "get_buffer", + "get_threats", + "get_correlation", + "get_mirror_status" + ] + }, + "uci": ["dpi-dual"] + }, + "write": { + "ubus": { + "luci.dpi-dual": [ + "start", + "stop", + "restart", + "replay_request", + "correlate_ip" + ] + }, + "uci": ["dpi-dual"] + } + } +} diff --git a/package/secubox/secubox-app-streamlit/files/usr/share/streamlit/secubox_control.py b/package/secubox/secubox-app-streamlit/files/usr/share/streamlit/secubox_control.py index fde3bd99..fa19ddac 100644 --- a/package/secubox/secubox-app-streamlit/files/usr/share/streamlit/secubox_control.py +++ b/package/secubox/secubox-app-streamlit/files/usr/share/streamlit/secubox_control.py @@ -105,6 +105,16 @@ def get_data(): d["active_bans"] = cs_detail.get("active_bans", 0) d["total_decisions"] = cs_detail.get("total_decisions", 0) + # DPI Dual-Stream stats + dpi_buffer = read_cache("/tmp/secubox/dpi-buffer.json") + dpi_flows = read_cache("/tmp/secubox/dpi-flows.json") + d["dpi_buffer_entries"] = dpi_buffer.get("entries", 0) + d["dpi_threats"] = dpi_buffer.get("threats_detected", 0) + d["dpi_blocked"] = dpi_buffer.get("blocked_count", 0) + d["dpi_flows"] = dpi_flows.get("flows_1min", 0) + d["dpi_rx"] = dpi_flows.get("rx_bytes", 0) + d["dpi_tx"] = dpi_flows.get("tx_bytes", 0) + d["p_haproxy"] = 3 if d["haproxy"] else 10 d["p_crowdsec"] = 3 if d["crowdsec"] and d["cs_alerts"] == 0 else 7 if d["cs_alerts"] > 0 else 10 d["p_mitmproxy"] = 3 if d["mitmproxy"] else 6 @@ -136,7 +146,7 @@ def main(): ''', unsafe_allow_html=True) st.markdown('
SERVICES
', unsafe_allow_html=True) - c1, c2, c3 = st.columns(3) + c1, c2, c3, c4 = st.columns(4) with c1: st.markdown(f''' @@ -168,6 +178,19 @@ def main(): ''', unsafe_allow_html=True) + dpi_color = "#00d4aa" if d["dpi_buffer_entries"] > 0 else "#808090" + with c4: + st.markdown(f''' +
+
📡 DPI Dual
+
+
{d['dpi_flows']}
Flows/min
+
{d['dpi_threats']}
Threats
+
{d['dpi_blocked']}
Blocked
+
+
+ ''', unsafe_allow_html=True) + st.markdown('
SYSTEM
', unsafe_allow_html=True) c1, c2, c3, c4 = st.columns(4) diff --git a/package/secubox/secubox-dpi-dual/files/srv/mitmproxy/addons/dpi_buffer.py b/package/secubox/secubox-dpi-dual/files/srv/mitmproxy/addons/dpi_buffer.py index 6538e080..908f4f53 100644 --- a/package/secubox/secubox-dpi-dual/files/srv/mitmproxy/addons/dpi_buffer.py +++ b/package/secubox/secubox-dpi-dual/files/srv/mitmproxy/addons/dpi_buffer.py @@ -1,24 +1,30 @@ #!/usr/bin/env python3 """ -DPI Double Buffer Addon for mitmproxy +DPI Double Buffer Addon for mitmproxy - Phase 2 Part of secubox-dpi-dual package Implements the double-buffer pattern: - Buffer A: Live path, minimal latency (default mitmproxy behavior) - Buffer B: Copy for deep analysis, async processing -This addon queues requests for asynchronous analysis without -blocking the live traffic path. +Features: +- Ring buffer with configurable size +- Async threat analysis without blocking +- Request replay capability for forensic analysis +- Context gathering for correlated threat investigation +- Stats endpoint for monitoring """ import json import time import hashlib import asyncio +import re from pathlib import Path from collections import deque -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, List from mitmproxy import http, ctx +from mitmproxy.net.http import Response class DPIBuffer: @@ -29,14 +35,56 @@ class DPIBuffer: self.buffer: deque = deque(maxlen=self.buffer_size) self.buffer_dir = Path("/tmp/dpi-buffer") self.stats_file = Path("/tmp/secubox/dpi-buffer.json") + self.alerts_file = Path("/tmp/secubox/waf-alerts.json") + self.replay_queue_file = Path("/tmp/dpi-buffer/replay-queue.json") self.analysis_enabled = True + self.replay_enabled = True self.request_count = 0 self.threat_count = 0 + self.blocked_count = 0 + + # Threat detection patterns + self.threat_patterns = { + "path_traversal": [ + r"\.\./", r"\.\.\\", r"%2e%2e[/\\]", r"%252e%252e", + ], + "xss": [ + r"]+onerror", r"]+onload", + ], + "sqli": [ + r"(?i)union\s+select", r"(?i)insert\s+into", r"(?i)drop\s+table", + r"(?i)or\s+1\s*=\s*1", r"(?i)'\s*or\s+'", r";\s*--", + ], + "lfi": [ + r"/etc/passwd", r"/etc/shadow", r"/proc/self", + r"php://filter", r"php://input", + ], + "rce": [ + r"(?i)cmd\s*=", r"(?i)exec\s*=", r"system\s*\(", + r"\$\{.*\}", r"`[^`]+`", r"\|\s*\w+", + ], + "ssrf": [ + r"(?i)url\s*=\s*https?://", r"(?i)file://", r"(?i)gopher://", + r"169\.254\.169\.254", r"127\.0\.0\.1", r"localhost", + ], + } + + # Compile patterns for performance + self.compiled_patterns = {} + for category, patterns in self.threat_patterns.items(): + self.compiled_patterns[category] = [ + re.compile(p, re.IGNORECASE) for p in patterns + ] # Ensure directories exist self.buffer_dir.mkdir(parents=True, exist_ok=True) self.stats_file.parent.mkdir(parents=True, exist_ok=True) + # Initialize replay queue + if not self.replay_queue_file.exists(): + self.replay_queue_file.write_text("[]") + def load(self, loader): """Load configuration from mitmproxy options.""" loader.add_option( @@ -51,18 +99,32 @@ class DPIBuffer: default=True, help="Enable asynchronous request analysis", ) + loader.add_option( + name="dpi_replay_enabled", + typespec=bool, + default=True, + help="Enable request replay capability", + ) + loader.add_option( + name="dpi_block_threats", + typespec=bool, + default=False, + help="Block requests that match threat patterns (careful!)", + ) def configure(self, updated): """Apply configuration updates.""" if "dpi_buffer_size" in updated: self.buffer_size = ctx.options.dpi_buffer_size - # Resize buffer new_buffer = deque(self.buffer, maxlen=self.buffer_size) self.buffer = new_buffer if "dpi_async_analysis" in updated: self.analysis_enabled = ctx.options.dpi_async_analysis + if "dpi_replay_enabled" in updated: + self.replay_enabled = ctx.options.dpi_replay_enabled + def request(self, flow: http.HTTPFlow): """ Handle incoming request. @@ -73,13 +135,32 @@ class DPIBuffer: # Build entry for Buffer B (async analysis) entry = self._build_entry(flow) + + # Quick synchronous threat check (for blocking mode) + threat_result = self._quick_threat_check(entry) + entry["threat_categories"] = threat_result["categories"] + entry["threat_score"] = threat_result["score"] + + # Block if enabled and high threat score + if ctx.options.dpi_block_threats and threat_result["score"] >= 50: + self.blocked_count += 1 + flow.response = Response.make( + 403, + b"Request blocked by DPI analysis", + {"Content-Type": "text/plain"} + ) + entry["blocked"] = True + self._log_threat_sync(entry) + return + + # Add to buffer self.buffer.append(entry) - # Queue for async analysis if enabled + # Queue for async deep analysis if enabled if self.analysis_enabled: asyncio.create_task(self._async_analyze(entry)) - # Update stats periodically (every 10 requests) + # Update stats periodically if self.request_count % 10 == 0: self._write_stats() @@ -88,7 +169,6 @@ class DPIBuffer: if not flow.request.timestamp_start: return - # Find and update the corresponding entry req_hash = self._request_hash(flow) for entry in self.buffer: if entry.get("req_hash") == req_hash: @@ -96,19 +176,32 @@ class DPIBuffer: "status": flow.response.status_code if flow.response else None, "content_length": len(flow.response.content) if flow.response and flow.response.content else 0, "content_type": flow.response.headers.get("content-type", "") if flow.response else "", + "latency_ms": int((time.time() - entry["ts"]) * 1000), } break def _build_entry(self, flow: http.HTTPFlow) -> Dict[str, Any]: """Build a buffer entry from a flow.""" content_hash = None + content_preview = None if flow.request.content: content_hash = hashlib.md5(flow.request.content).hexdigest() + # Store first 500 bytes for analysis + content_preview = flow.request.content[:500].decode('utf-8', errors='replace') client_ip = "unknown" if flow.client_conn and flow.client_conn.peername: client_ip = flow.client_conn.peername[0] + # Extract query parameters + query_params = {} + if "?" in flow.request.path: + query_string = flow.request.path.split("?", 1)[1] + for param in query_string.split("&"): + if "=" in param: + key, value = param.split("=", 1) + query_params[key] = value + return { "ts": flow.request.timestamp_start, "req_hash": self._request_hash(flow), @@ -116,12 +209,17 @@ class DPIBuffer: "host": flow.request.host, "port": flow.request.port, "path": flow.request.path, + "query_params": query_params, "headers": dict(flow.request.headers), "content_hash": content_hash, + "content_preview": content_preview, "content_length": len(flow.request.content) if flow.request.content else 0, "client_ip": client_ip, + "user_agent": flow.request.headers.get("user-agent", ""), "analyzed": False, "threat_score": 0, + "threat_categories": [], + "blocked": False, } def _request_hash(self, flow: http.HTTPFlow) -> str: @@ -129,58 +227,83 @@ class DPIBuffer: key = f"{flow.request.timestamp_start}:{flow.request.host}:{flow.request.path}" return hashlib.md5(key.encode()).hexdigest()[:16] + def _quick_threat_check(self, entry: Dict[str, Any]) -> Dict[str, Any]: + """Quick synchronous threat check for blocking decisions.""" + score = 0 + categories = [] + + # Check path + query string + full_path = entry.get("path", "") + content = entry.get("content_preview", "") or "" + + for category, patterns in self.compiled_patterns.items(): + for pattern in patterns: + if pattern.search(full_path) or pattern.search(content): + if category not in categories: + categories.append(category) + score += 25 + break + + # Additional heuristics + headers = entry.get("headers", {}) + + # Suspicious user agent + ua = entry.get("user_agent", "").lower() + suspicious_ua = ["sqlmap", "nikto", "nmap", "masscan", "zgrab", "gobuster"] + if any(s in ua for s in suspicious_ua): + categories.append("scanner") + score += 30 + + # Missing or suspicious headers + if not ua or len(ua) < 10: + score += 5 + + # Large POST without content-type + if entry.get("method") == "POST" and entry.get("content_length", 0) > 0: + if "content-type" not in [k.lower() for k in headers.keys()]: + score += 10 + + return {"score": min(score, 100), "categories": categories} + async def _async_analyze(self, entry: Dict[str, Any]): """ Async analysis pipeline - runs without blocking live traffic. - - Analysis steps: - 1. Pattern matching against known threat signatures - 2. Anomaly scoring based on request characteristics - 3. Rate limiting detection - 4. Write results to analysis log + Deep analysis including: + - Pattern matching with context + - Rate limiting detection + - Behavioral analysis """ try: - threat_score = 0 + # Deep pattern analysis already done in quick check + # Here we can add more expensive analysis - # Simple heuristic analysis (placeholder for more sophisticated detection) - # Check for common attack patterns in path - suspicious_patterns = [ - "../", "..\\", # Path traversal - " 20: + entry["threat_categories"].append("rate_limit") + entry["threat_score"] = min(entry["threat_score"] + 20, 100) - path_lower = entry.get("path", "").lower() - for pattern in suspicious_patterns: - if pattern.lower() in path_lower: - threat_score += 20 - - # Check for unusual content types in requests - content_type = entry.get("headers", {}).get("content-type", "") - if "multipart/form-data" in content_type and entry.get("content_length", 0) > 1000000: - threat_score += 10 # Large file upload - - # Update entry with analysis results + # Mark as analyzed entry["analyzed"] = True - entry["threat_score"] = min(threat_score, 100) - # Track threats - if threat_score > 30: + # Log if threat detected + if entry["threat_score"] > 30: self.threat_count += 1 await self._log_threat(entry) except Exception as e: ctx.log.error(f"DPI Buffer analysis error: {e}") - async def _log_threat(self, entry: Dict[str, Any]): - """Log a detected threat to the alerts file.""" - alert_file = Path("/tmp/secubox/waf-alerts.json") + def _log_threat_sync(self, entry: Dict[str, Any]): + """Synchronous threat logging for blocked requests.""" try: alerts = [] - if alert_file.exists(): - alerts = json.loads(alert_file.read_text()) + if self.alerts_file.exists(): + try: + alerts = json.loads(self.alerts_file.read_text()) + except: + alerts = [] alert_id = len(alerts) + 1 alerts.append({ @@ -191,32 +314,57 @@ class DPIBuffer: "path": entry.get("path"), "method": entry.get("method"), "threat_score": entry.get("threat_score"), + "categories": entry.get("threat_categories", []), + "blocked": entry.get("blocked", False), "rule": "dpi_buffer_analysis", }) - # Keep last 1000 alerts alerts = alerts[-1000:] - alert_file.write_text(json.dumps(alerts, indent=2)) - + self.alerts_file.write_text(json.dumps(alerts, indent=2)) except Exception as e: ctx.log.error(f"Failed to log threat: {e}") + async def _log_threat(self, entry: Dict[str, Any]): + """Log a detected threat to the alerts file.""" + self._log_threat_sync(entry) + def _write_stats(self): """Write buffer statistics to stats file.""" try: + # Calculate threat distribution + threat_dist = {} + high_threat_count = 0 + for e in self.buffer: + for cat in e.get("threat_categories", []): + threat_dist[cat] = threat_dist.get(cat, 0) + 1 + if e.get("threat_score", 0) > 30: + high_threat_count += 1 + + # Top hosts + host_counts = {} + for e in self.buffer: + host = e.get("host", "unknown") + host_counts[host] = host_counts.get(host, 0) + 1 + top_hosts = sorted(host_counts.items(), key=lambda x: x[1], reverse=True)[:10] + stats = { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), "entries": len(self.buffer), "max_size": self.buffer_size, "requests_total": self.request_count, "threats_detected": self.threat_count, + "blocked_count": self.blocked_count, + "high_threat_in_buffer": high_threat_count, + "threat_distribution": threat_dist, + "top_hosts": dict(top_hosts), "analysis_enabled": self.analysis_enabled, + "replay_enabled": self.replay_enabled, } self.stats_file.write_text(json.dumps(stats, indent=2)) except Exception as e: ctx.log.error(f"Failed to write stats: {e}") - def get_context(self, client_ip: str, window_sec: int = 60) -> list: + def get_context(self, client_ip: str, window_sec: int = 60) -> List[Dict]: """ Get recent requests from the same IP for context on alerts. Used by the correlation engine to gather context around threat events. @@ -228,6 +376,70 @@ class DPIBuffer: and now - e.get("ts", 0) < window_sec ] + def get_replay_candidates(self, client_ip: str = None, + min_threat_score: int = 0, + limit: int = 100) -> List[Dict]: + """Get requests suitable for replay analysis.""" + candidates = [] + for e in self.buffer: + if client_ip and e.get("client_ip") != client_ip: + continue + if e.get("threat_score", 0) < min_threat_score: + continue + candidates.append({ + "req_hash": e.get("req_hash"), + "ts": e.get("ts"), + "method": e.get("method"), + "host": e.get("host"), + "path": e.get("path"), + "threat_score": e.get("threat_score"), + "categories": e.get("threat_categories", []), + }) + if len(candidates) >= limit: + break + return candidates + + def queue_replay(self, req_hash: str) -> bool: + """Queue a request for replay analysis.""" + if not self.replay_enabled: + return False + + # Find the request in buffer + entry = None + for e in self.buffer: + if e.get("req_hash") == req_hash: + entry = e + break + + if not entry: + return False + + try: + queue = [] + if self.replay_queue_file.exists(): + queue = json.loads(self.replay_queue_file.read_text()) + + # Add to queue with replay metadata + replay_entry = { + "queued_at": time.strftime("%Y-%m-%dT%H:%M:%S"), + "original_ts": entry.get("ts"), + "method": entry.get("method"), + "host": entry.get("host"), + "path": entry.get("path"), + "headers": entry.get("headers"), + "content_preview": entry.get("content_preview"), + "req_hash": req_hash, + "status": "pending", + } + queue.append(replay_entry) + queue = queue[-100:] # Keep last 100 + + self.replay_queue_file.write_text(json.dumps(queue, indent=2)) + return True + except Exception as e: + ctx.log.error(f"Failed to queue replay: {e}") + return False + # Mitmproxy addon instance addons = [DPIBuffer()]