feat(dpi): Phase 2 - MITM double buffer + LuCI dashboard

MITM Double Buffer (dpi_buffer.py):
- Compiled regex patterns for 6 threat categories
- Scanner detection (sqlmap, nikto, nmap, etc.)
- Optional blocking mode for high-score threats
- Request replay queue for forensic analysis
- Rate limiting detection
- Stats: buffer entries, threat distribution, top hosts

LuCI Dashboard (luci-app-dpi-dual):
- RPCD handler with 10 methods
- KISS-themed overview with stream status cards
- LED indicators for MITM/TAP/Correlation
- Threats table with score and blocked status
- Protocol distribution from netifyd
- Manual IP correlation trigger

Streamlit Control Panel:
- Added DPI Dual card with flows/threats/blocked metrics

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
CyberMind-FR 2026-03-15 12:20:56 +01:00
parent 58a51eb271
commit a24beaf316
9 changed files with 959 additions and 74 deletions
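The double-buffer idea in this commit — a live path that stays fast while a bounded copy of each request feeds asynchronous analysis — reduces to a ring buffer. A rough sketch (class and names are illustrative, not the addon's actual API):

```python
from collections import deque

class DoubleBuffer:
    """Buffer A is the live pass-through path; Buffer B is a bounded copy."""
    def __init__(self, size=1000):
        self.buffer = deque(maxlen=size)  # oldest entries drop off automatically

    def record(self, entry):
        # Appending to a bounded deque is O(1); the live request is never delayed
        self.buffer.append(entry)
        return entry

buf = DoubleBuffer(size=3)
for i in range(5):
    buf.record({"req": i})
# Ring semantics: only the 3 most recent entries survive
print([e["req"] for e in buf.buffer])  # → [2, 3, 4]
```

This is why `dpi_buffer.py` can resize the buffer on the fly: rebuilding a `deque` with a new `maxlen` keeps the newest entries and discards the rest.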


@@ -5202,3 +5202,14 @@ git checkout HEAD -- index.html
- UCI config: `/etc/config/dpi-dual` with dual/mitm-only/tap-only modes
- Files: mirror-setup.sh, dpi-flow-collector, dpi-correlator, dpi-dualctl, init.d/dpi-dual, dpi_buffer.py
- **Dual-Stream DPI Phase 2 - MITM Double Buffer + LuCI (Complete)**
- Enhanced mitmproxy addon `dpi_buffer.py`:
- Compiled regex for 6 threat categories (path_traversal, xss, sqli, lfi, rce, ssrf)
- Scanner detection, optional blocking, request replay queue
- New `luci-app-dpi-dual` package:
- RPCD handler with 10 methods (status, flows, buffer, threats, correlation, control)
- KISS dashboard with stream status cards, LED indicators, threats table
- Protocol distribution, manual IP correlation
- Streamlit control panel: Added DPI Dual card


@@ -581,35 +581,35 @@ _Last updated: 2026-03-15 (Wall Colorsets)_
### 2026-03-15
- **Dual-Stream DPI Architecture (Phase 2 Complete)**
- New `secubox-dpi-dual` package implementing parallel MITM + Passive TAP DPI
- Architecture doc: `package/secubox/DUAL-STREAM-DPI.md`
- **Phase 1 - TAP Stream (Passive)**:
- `mirror-setup.sh`: tc mirred port mirroring (ingress + egress)
- Creates dummy TAP interface for netifyd analysis
- Software and hardware TAP mode support
- **UCI Config**: `/etc/config/dpi-dual` with global, mitm, tap, correlation sections
- `dpi-flow-collector`: Aggregates netifyd stats → `/tmp/secubox/dpi-flows.json`
- `dpi-correlator`: Matches MITM + TAP events, CrowdSec integration
- `dpi-dualctl`: CLI start/stop/status/flows/threats/mirror
- `init.d/dpi-dual`: Procd service for flow-collector + correlator
- **Phase 2 - MITM Double Buffer + LuCI**:
- Enhanced `dpi_buffer.py` mitmproxy addon:
- Compiled regex patterns for 6 threat categories (path_traversal, xss, sqli, lfi, rce, ssrf)
- Scanner detection (sqlmap, nikto, nmap, etc.)
- Optional blocking mode for high-score threats
- Request replay queue for forensic analysis
- Rate limiting detection
- Stats: buffer entries, threat distribution, top hosts
- **LuCI Dashboard** (`luci-app-dpi-dual`):
- RPCD handler with 10 methods (status, flows, buffer, threats, correlation, start/stop/restart, replay, correlate)
- KISS-themed overview with stream status cards
- LED indicators for MITM/TAP/Correlation running state
- Metrics: buffer entries, threats, blocked, flows/min, RX/TX bytes
- Threats table with timestamp, IP, host, path, categories, score, blocked status
- Protocol distribution from netifyd
- Manual IP correlation trigger
- ACL permissions for read/write
- **Streamlit Control Panel** updated:
- DPI Dual card with flows/min, threats, blocked metrics
- Reads from dpi-buffer.json and dpi-flows.json caches
---
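The quick threat check described in the Phase 2 notes — compiled patterns per category, a flat score per matching category, capped at 100 — can be approximated like this (pattern set abbreviated to three categories for brevity; the real addon uses six):

```python
import re

PATTERNS = {
    "path_traversal": [r"\.\./", r"%2e%2e[/\\]"],
    "sqli": [r"(?i)union\s+select", r"(?i)or\s+1\s*=\s*1"],
    "xss": [r"<script", r"onerror\s*="],
}
# Compile once at startup; matching compiled patterns per request is much cheaper
COMPILED = {cat: [re.compile(p, re.IGNORECASE) for p in pats]
            for cat, pats in PATTERNS.items()}

def quick_check(path: str) -> dict:
    score, categories = 0, []
    for cat, pats in COMPILED.items():
        if any(p.search(path) for p in pats):
            categories.append(cat)
            score += 25  # each matching category contributes a flat 25 points
    return {"score": min(score, 100), "categories": categories}

print(quick_check("/download?file=../../etc/passwd' OR 1=1"))
```

A path that trips both the traversal and SQLi categories scores 50, which already clears the addon's blocking threshold when `dpi_block_threats` is enabled.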


@@ -0,0 +1,30 @@
include $(TOPDIR)/rules.mk
LUCI_TITLE:=LuCI Dual-Stream DPI Dashboard
LUCI_DESCRIPTION:=Dashboard for MITM + Passive TAP deep packet inspection
LUCI_DEPENDS:=+luci-base +secubox-dpi-dual
LUCI_PKGARCH:=all
PKG_NAME:=luci-app-dpi-dual
PKG_VERSION:=1.0.0
PKG_RELEASE:=1
PKG_MAINTAINER:=SecuBox <secubox@gk2.net>
PKG_LICENSE:=GPL-3.0
include $(TOPDIR)/feeds/luci/luci.mk
define Package/luci-app-dpi-dual/install
$(INSTALL_DIR) $(1)/usr/share/luci/menu.d
$(INSTALL_DATA) ./root/usr/share/luci/menu.d/luci-app-dpi-dual.json $(1)/usr/share/luci/menu.d/
$(INSTALL_DIR) $(1)/usr/share/rpcd/acl.d
$(INSTALL_DATA) ./root/usr/share/rpcd/acl.d/luci-app-dpi-dual.json $(1)/usr/share/rpcd/acl.d/
$(INSTALL_DIR) $(1)/usr/libexec/rpcd
$(INSTALL_BIN) ./root/usr/libexec/rpcd/luci.dpi-dual $(1)/usr/libexec/rpcd/
$(INSTALL_DIR) $(1)/www/luci-static/resources/view/dpi-dual
$(INSTALL_DATA) ./htdocs/luci-static/resources/view/dpi-dual/*.js $(1)/www/luci-static/resources/view/dpi-dual/
endef
$(eval $(call BuildPackage,luci-app-dpi-dual))


@@ -0,0 +1,313 @@
'use strict';
'require view';
'require dom';
'require poll';
'require rpc';
'require ui';
var callStatus = rpc.declare({
object: 'luci.dpi-dual',
method: 'status',
expect: {}
});
var callGetFlows = rpc.declare({
object: 'luci.dpi-dual',
method: 'get_flows',
expect: {}
});
var callGetThreats = rpc.declare({
object: 'luci.dpi-dual',
method: 'get_threats',
params: ['limit'],
expect: {}
});
var callGetCorrelation = rpc.declare({
object: 'luci.dpi-dual',
method: 'get_correlation',
params: ['limit'],
expect: {}
});
var callStart = rpc.declare({
object: 'luci.dpi-dual',
method: 'start',
expect: {}
});
var callStop = rpc.declare({
object: 'luci.dpi-dual',
method: 'stop',
expect: {}
});
var callRestart = rpc.declare({
object: 'luci.dpi-dual',
method: 'restart',
expect: {}
});
var callCorrelateIP = rpc.declare({
object: 'luci.dpi-dual',
method: 'correlate_ip',
params: ['ip'],
expect: {}
});
function formatBytes(bytes) {
if (bytes === 0) return '0 B';
var k = 1024;
var sizes = ['B', 'KB', 'MB', 'GB'];
var i = Math.floor(Math.log(bytes) / Math.log(k));
return parseFloat((bytes / Math.pow(k, i)).toFixed(1)) + ' ' + sizes[i];
}
function formatTimestamp(ts) {
if (!ts) return '-';
var d = new Date(ts);
return d.toLocaleTimeString();
}
function createStatusLED(running) {
var color = running ? '#00d4aa' : '#ff4d4d';
var label = running ? 'RUNNING' : 'STOPPED';
return E('span', {
'style': 'display:inline-flex;align-items:center;gap:6px;'
}, [
E('span', {
'style': 'width:12px;height:12px;border-radius:50%;background:' + color +
';box-shadow:0 0 8px ' + color + ';'
}),
E('span', { 'style': 'font-weight:600;color:' + color + ';' }, label)
]);
}
function createCard(title, icon, content, borderColor) {
return E('div', {
'class': 'cbi-section',
'style': 'background:#12121a;border-radius:12px;padding:1rem;margin:0.5rem 0;' +
'border-left:4px solid ' + (borderColor || '#2a2a3a') + ';'
}, [
E('div', {
'style': 'display:flex;align-items:center;gap:8px;margin-bottom:0.8rem;'
}, [
E('span', { 'style': 'font-size:1.3rem;' }, icon),
E('span', { 'style': 'font-size:1.1rem;font-weight:600;color:#fff;' }, title)
]),
E('div', {}, content)
]);
}
function createMetric(label, value, color) {
return E('div', {
'style': 'background:#1a1a24;padding:0.6rem 1rem;border-radius:8px;text-align:center;min-width:80px;'
}, [
E('div', {
'style': 'font-size:1.5rem;font-weight:700;color:' + (color || '#00d4aa') + ';font-family:monospace;'
}, String(value)),
E('div', {
'style': 'font-size:0.7rem;color:#808090;text-transform:uppercase;margin-top:2px;'
}, label)
]);
}
function createThreatRow(threat) {
var scoreColor = threat.threat_score > 70 ? '#ff4d4d' :
threat.threat_score > 40 ? '#ffa500' : '#00d4aa';
return E('tr', {}, [
E('td', { 'style': 'padding:8px;color:#808090;' }, formatTimestamp(threat.timestamp)),
E('td', { 'style': 'padding:8px;font-family:monospace;color:#00a0ff;' }, threat.client_ip || '-'),
E('td', { 'style': 'padding:8px;' }, threat.host || '-'),
E('td', { 'style': 'padding:8px;max-width:200px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap;' },
threat.path || '-'),
E('td', { 'style': 'padding:8px;' }, (threat.categories || []).join(', ') || '-'),
E('td', { 'style': 'padding:8px;text-align:center;' },
E('span', {
'style': 'background:' + scoreColor + '22;color:' + scoreColor +
';padding:2px 8px;border-radius:10px;font-weight:600;'
}, String(threat.threat_score || 0))
),
E('td', { 'style': 'padding:8px;text-align:center;' },
threat.blocked ?
E('span', { 'style': 'color:#ff4d4d;' }, '🚫') :
E('span', { 'style': 'color:#808090;' }, '-')
)
]);
}
return view.extend({
load: function() {
return Promise.all([
callStatus().catch(function() { return {}; }),
callGetFlows().catch(function() { return {}; }),
callGetThreats(20).catch(function() { return { alerts: [] }; }),
callGetCorrelation(10).catch(function() { return { correlated: [] }; })
]);
},
render: function(data) {
var status = data[0] || {};
var flows = data[1] || {};
var threats = data[2] || {};
var correlation = data[3] || {};
var mitm = status.mitm_stream || {};
var tap = status.tap_stream || {};
var corr = status.correlation || {};
var view = E('div', { 'class': 'cbi-map', 'style': 'background:#0a0a12;min-height:100vh;' }, [
// Header
E('div', { 'style': 'text-align:center;padding:1rem 0;' }, [
E('h1', {
'style': 'font-size:1.8rem;font-weight:700;background:linear-gradient(90deg,#00d4aa,#00a0ff);' +
'-webkit-background-clip:text;-webkit-text-fill-color:transparent;margin:0;'
}, 'DPI Dual-Stream'),
E('div', { 'style': 'color:#606070;margin-top:4px;' },
'Mode: ' + (status.mode || 'dual').toUpperCase())
]),
// Action buttons
E('div', { 'style': 'display:flex;gap:8px;justify-content:center;margin-bottom:1rem;' }, [
E('button', {
'class': 'btn cbi-button cbi-button-apply',
'click': ui.createHandlerFn(this, function() {
return callStart().then(function() {
ui.addNotification(null, E('p', 'DPI started'), 'info');
window.location.reload();
});
})
}, '▶ Start'),
E('button', {
'class': 'btn cbi-button cbi-button-reset',
'click': ui.createHandlerFn(this, function() {
return callStop().then(function() {
ui.addNotification(null, E('p', 'DPI stopped'), 'info');
window.location.reload();
});
})
}, '⏹ Stop'),
E('button', {
'class': 'btn cbi-button',
'click': ui.createHandlerFn(this, function() {
return callRestart().then(function() {
ui.addNotification(null, E('p', 'DPI restarted'), 'info');
window.location.reload();
});
})
}, '🔄 Restart')
]),
// Stream status cards
E('div', { 'style': 'display:grid;grid-template-columns:repeat(auto-fit,minmax(300px,1fr));gap:1rem;' }, [
// MITM Stream Card
createCard('MITM Stream', '🔍', E('div', {}, [
E('div', { 'style': 'margin-bottom:0.8rem;' }, createStatusLED(mitm.running)),
E('div', { 'style': 'display:flex;flex-wrap:wrap;gap:8px;' }, [
createMetric('Buffer', mitm.buffer_entries || 0, '#00d4aa'),
createMetric('Threats', mitm.threats_detected || 0, '#ffa500'),
createMetric('Blocked', mitm.blocked_count || 0, '#ff4d4d')
])
]), mitm.running ? '#00d4aa' : '#ff4d4d'),
// TAP Stream Card
createCard('TAP Stream', '📡', E('div', {}, [
E('div', { 'style': 'margin-bottom:0.8rem;' }, createStatusLED(tap.running)),
E('div', { 'style': 'display:flex;flex-wrap:wrap;gap:8px;' }, [
createMetric('Interface', tap.interface || 'tap0', tap.interface_up ? '#00d4aa' : '#808090'),
createMetric('RX', formatBytes(tap.rx_bytes || 0), '#00a0ff'),
createMetric('TX', formatBytes(tap.tx_bytes || 0), '#00a0ff'),
createMetric('Flows/min', tap.flows_1min || 0, '#00d4aa')
])
]), tap.running ? '#00d4aa' : '#ff4d4d'),
// Correlation Card
createCard('Correlation Engine', '🔗', E('div', {}, [
E('div', { 'style': 'margin-bottom:0.8rem;' }, createStatusLED(corr.running)),
E('div', { 'style': 'display:flex;flex-wrap:wrap;gap:8px;' }, [
createMetric('Correlated', corr.threats_correlated || 0, '#ffa500')
]),
E('div', { 'style': 'margin-top:0.8rem;' }, [
E('input', {
'type': 'text',
'id': 'correlate-ip',
'placeholder': 'IP to correlate...',
'style': 'background:#1a1a24;border:1px solid #2a2a3a;border-radius:6px;' +
'padding:6px 10px;color:#fff;width:140px;margin-right:8px;'
}),
E('button', {
'class': 'btn cbi-button',
'click': ui.createHandlerFn(this, function() {
var ip = document.getElementById('correlate-ip').value;
if (ip) {
return callCorrelateIP(ip).then(function(res) {
ui.addNotification(null, E('p', res.message || 'Correlation triggered'), 'info');
});
}
})
}, 'Correlate')
])
]), corr.running ? '#00d4aa' : '#808090')
]),
// Threats Table
createCard('Recent Threats', '⚠️', E('div', {}, [
E('div', { 'style': 'color:#808090;margin-bottom:8px;' },
'Total alerts: ' + (threats.total || 0)),
E('div', { 'style': 'overflow-x:auto;' }, [
E('table', {
'style': 'width:100%;border-collapse:collapse;font-size:0.85rem;'
}, [
E('thead', {}, [
E('tr', { 'style': 'border-bottom:1px solid #2a2a3a;' }, [
E('th', { 'style': 'padding:8px;text-align:left;color:#808090;' }, 'Time'),
E('th', { 'style': 'padding:8px;text-align:left;color:#808090;' }, 'Client IP'),
E('th', { 'style': 'padding:8px;text-align:left;color:#808090;' }, 'Host'),
E('th', { 'style': 'padding:8px;text-align:left;color:#808090;' }, 'Path'),
E('th', { 'style': 'padding:8px;text-align:left;color:#808090;' }, 'Categories'),
E('th', { 'style': 'padding:8px;text-align:center;color:#808090;' }, 'Score'),
E('th', { 'style': 'padding:8px;text-align:center;color:#808090;' }, 'Blocked')
])
]),
E('tbody', {},
((threats.alerts || []).slice(-15).reverse()).map(createThreatRow)
)
])
])
]), '#ffa500'),
// Flow Protocols
flows.protocols ? createCard('Protocol Distribution', '📊', E('div', {}, [
E('div', { 'style': 'display:flex;flex-wrap:wrap;gap:8px;' },
Object.entries(flows.protocols || {}).slice(0, 10).map(function(entry) {
return E('div', {
'style': 'background:#1a1a24;padding:6px 12px;border-radius:6px;'
}, [
E('span', { 'style': 'color:#00d4aa;font-weight:600;' }, entry[0]),
E('span', { 'style': 'color:#808090;margin-left:6px;' }, '(' + entry[1] + ')')
]);
})
)
]), '#00a0ff') : E('div')
]);
// Auto-refresh every 10 seconds
poll.add(L.bind(function() {
return Promise.all([
callStatus().catch(function() { return {}; }),
callGetThreats(20).catch(function() { return { alerts: [] }; })
]).then(L.bind(function(data) {
// Update would require DOM manipulation - for now just log
// Full implementation would update metrics in place
}, this));
}, this), 10);
return view;
},
handleSaveApply: null,
handleSave: null,
handleReset: null
});


@@ -0,0 +1,252 @@
#!/bin/sh
# RPCD handler for DPI Dual-Stream dashboard
# Part of luci-app-dpi-dual
. /lib/functions.sh
. /usr/share/libubox/jshn.sh
STATS_DIR="/tmp/secubox"
FLOW_DIR="/tmp/dpi-flows"
BUFFER_FILE="$STATS_DIR/dpi-buffer.json"
FLOWS_FILE="$STATS_DIR/dpi-flows.json"
THREATS_FILE="$STATS_DIR/correlated-threats.json"
ALERTS_FILE="$STATS_DIR/waf-alerts.json"
read_json_file() {
local file="$1"
if [ -f "$file" ]; then
cat "$file"
else
echo '{}'
fi
}
case "$1" in
list)
cat << 'EOF'
{
"status": {},
"get_flows": {},
"get_buffer": {"limit": 100},
"get_threats": {"limit": 50},
"get_correlation": {"limit": 20},
"get_mirror_status": {},
"start": {},
"stop": {},
"restart": {},
"replay_request": {"req_hash": "string"},
"correlate_ip": {"ip": "string"}
}
EOF
;;
call)
case "$2" in
status)
# Get unified status of both streams
config_load dpi-dual
config_get enabled settings enabled "0"
config_get mode settings mode "dual"
config_get correlation settings correlation "0"
# Check processes (plain assignments: "local" is only valid inside functions in ash)
mitm_running=0; tap_running=0; collector_running=0; correlator_running=0
pgrep mitmproxy >/dev/null 2>&1 && mitm_running=1
pgrep netifyd >/dev/null 2>&1 && tap_running=1
pgrep -f dpi-flow-collector >/dev/null 2>&1 && collector_running=1
pgrep -f dpi-correlator >/dev/null 2>&1 && correlator_running=1
# Get TAP interface status
tap_up=0; tap_rx=0; tap_tx=0
config_get tap_if tap interface "tap0"
if ip link show "$tap_if" >/dev/null 2>&1; then
tap_up=1
tap_rx=$(cat "/sys/class/net/$tap_if/statistics/rx_bytes" 2>/dev/null || echo 0)
tap_tx=$(cat "/sys/class/net/$tap_if/statistics/tx_bytes" 2>/dev/null || echo 0)
fi
# Get buffer stats
buffer_entries=0; buffer_threats=0; buffer_blocked=0
if [ -f "$BUFFER_FILE" ]; then
buffer_entries=$(jsonfilter -i "$BUFFER_FILE" -e '@.entries' 2>/dev/null || echo 0)
buffer_threats=$(jsonfilter -i "$BUFFER_FILE" -e '@.threats_detected' 2>/dev/null || echo 0)
buffer_blocked=$(jsonfilter -i "$BUFFER_FILE" -e '@.blocked_count' 2>/dev/null || echo 0)
fi
# Get flow stats
flows_1min=0
if [ -f "$FLOWS_FILE" ]; then
flows_1min=$(jsonfilter -i "$FLOWS_FILE" -e '@.flows_1min' 2>/dev/null || echo 0)
fi
# Get correlation stats
correlated_threats=0
if [ -f "$THREATS_FILE" ]; then
correlated_threats=$(wc -l < "$THREATS_FILE" 2>/dev/null || echo 0)
fi
cat << EOF
{
"enabled": $enabled,
"mode": "$mode",
"correlation_enabled": $correlation,
"mitm_stream": {
"running": $mitm_running,
"buffer_entries": $buffer_entries,
"threats_detected": $buffer_threats,
"blocked_count": $buffer_blocked
},
"tap_stream": {
"running": $tap_running,
"interface": "$tap_if",
"interface_up": $tap_up,
"rx_bytes": $tap_rx,
"tx_bytes": $tap_tx,
"flows_1min": $flows_1min,
"collector_running": $collector_running
},
"correlation": {
"running": $correlator_running,
"threats_correlated": $correlated_threats
}
}
EOF
;;
get_flows)
read_json_file "$FLOWS_FILE"
;;
get_buffer)
read -r input
json_load "$input"
json_get_var limit limit 100
if [ -f "$BUFFER_FILE" ]; then
cat "$BUFFER_FILE"
else
echo '{"entries": 0, "requests": []}'
fi
;;
get_threats)
read -r input
json_load "$input"
json_get_var limit limit 50
if [ -f "$ALERTS_FILE" ]; then
# Addon caps this file at 1000 alerts, so returning it whole stays valid JSON
total=$(jsonfilter -i "$ALERTS_FILE" -e '@[*]' 2>/dev/null | wc -l)
cat << EOF
{
"total": $total,
"alerts": $(cat "$ALERTS_FILE" 2>/dev/null || echo '[]')
}
EOF
else
echo '{"total": 0, "alerts": []}'
fi
;;
get_correlation)
read -r input
json_load "$input"
json_get_var limit limit 20
if [ -f "$THREATS_FILE" ]; then
total=$(wc -l < "$THREATS_FILE" 2>/dev/null || echo 0)
cat << EOF
{
"total": $total,
"correlated": [$(tail -n "$limit" "$THREATS_FILE" 2>/dev/null | tr '\n' ',' | sed 's/,$//')]
}
EOF
else
echo '{"total": 0, "correlated": []}'
fi
;;
get_mirror_status)
/usr/lib/dpi-dual/mirror-setup.sh status 2>&1 | \
awk 'BEGIN{print "{"}
/TAP Interface/ {tap=1}
/not found/ {up=0}
/UP/ {up=1}
/RX:/ {rx=$2}
/TX:/ {tx=$2}
/ingress/ {ing=1}
END{
printf "\"tap_found\": %s, \"tap_up\": %s, \"ingress_configured\": %s",
(tap?1:0), (up?1:0), (ing?1:0);
print "}"
}'
;;
start)
/usr/sbin/dpi-dualctl start >/dev/null 2>&1
echo '{"success": true}'
;;
stop)
/usr/sbin/dpi-dualctl stop >/dev/null 2>&1
echo '{"success": true}'
;;
restart)
/usr/sbin/dpi-dualctl restart >/dev/null 2>&1
echo '{"success": true}'
;;
replay_request)
read -r input
json_load "$input"
json_get_var req_hash req_hash ""
if [ -z "$req_hash" ]; then
echo '{"success": false, "error": "req_hash required"}'
else
# Add to replay queue (read by mitmproxy addon)
queue_file="/tmp/dpi-buffer/replay-queue.json"
mkdir -p /tmp/dpi-buffer
if [ ! -f "$queue_file" ]; then
echo "[]" > "$queue_file"
fi
entry="{\"req_hash\":\"$req_hash\",\"queued_at\":\"$(date -Iseconds)\",\"status\":\"pending\"}"
# Append to queue (keep last 100)
(cat "$queue_file" | jsonfilter -e '@[*]' 2>/dev/null; echo "$entry") | \
tail -100 | \
awk 'BEGIN{print "["} {if(NR>1)print ","; print} END{print "]"}' > "$queue_file.tmp"
mv "$queue_file.tmp" "$queue_file"
echo '{"success": true, "message": "Request queued for replay"}'
fi
;;
correlate_ip)
read -r input
json_load "$input"
json_get_var ip ip ""
if [ -z "$ip" ]; then
echo '{"success": false, "error": "IP required"}'
else
/usr/sbin/dpi-correlator correlate "$ip" "manual_request" >/dev/null 2>&1
echo '{"success": true, "message": "Correlation triggered for '"$ip"'"}'
fi
;;
*)
echo '{"error": "Unknown method"}'
;;
esac
;;
esac


@@ -0,0 +1,14 @@
{
"admin/secubox/dpi-dual": {
"title": "DPI Dual-Stream",
"order": 45,
"action": {
"type": "view",
"path": "dpi-dual/overview"
},
"depends": {
"acl": ["luci-app-dpi-dual"],
"uci": { "dpi-dual": true }
}
}
}


@@ -0,0 +1,30 @@
{
"luci-app-dpi-dual": {
"description": "Grant access to DPI Dual-Stream dashboard",
"read": {
"ubus": {
"luci.dpi-dual": [
"status",
"get_flows",
"get_buffer",
"get_threats",
"get_correlation",
"get_mirror_status"
]
},
"uci": ["dpi-dual"]
},
"write": {
"ubus": {
"luci.dpi-dual": [
"start",
"stop",
"restart",
"replay_request",
"correlate_ip"
]
},
"uci": ["dpi-dual"]
}
}
}


@@ -105,6 +105,16 @@ def get_data():
d["active_bans"] = cs_detail.get("active_bans", 0)
d["total_decisions"] = cs_detail.get("total_decisions", 0)
# DPI Dual-Stream stats
dpi_buffer = read_cache("/tmp/secubox/dpi-buffer.json")
dpi_flows = read_cache("/tmp/secubox/dpi-flows.json")
d["dpi_buffer_entries"] = dpi_buffer.get("entries", 0)
d["dpi_threats"] = dpi_buffer.get("threats_detected", 0)
d["dpi_blocked"] = dpi_buffer.get("blocked_count", 0)
d["dpi_flows"] = dpi_flows.get("flows_1min", 0)
d["dpi_rx"] = dpi_flows.get("rx_bytes", 0)
d["dpi_tx"] = dpi_flows.get("tx_bytes", 0)
d["p_haproxy"] = 3 if d["haproxy"] else 10
d["p_crowdsec"] = 3 if d["crowdsec"] and d["cs_alerts"] == 0 else 7 if d["cs_alerts"] > 0 else 10
d["p_mitmproxy"] = 3 if d["mitmproxy"] else 6
@@ -136,7 +146,7 @@ def main():
''', unsafe_allow_html=True)
st.markdown('<div class="section-title">SERVICES</div>', unsafe_allow_html=True)
c1, c2, c3, c4 = st.columns(4)
with c1:
st.markdown(f'''
@@ -168,6 +178,19 @@ def main():
</div>
''', unsafe_allow_html=True)
dpi_color = "#00d4aa" if d["dpi_buffer_entries"] > 0 else "#808090"
with c4:
st.markdown(f'''
<div class="status-card" style="border-left-color:{dpi_color};">
<div class="card-header"><span class="card-title">📡 DPI Dual</span></div>
<div class="metric-row">
<div class="metric-item"><div class="metric-value">{d['dpi_flows']}</div><div class="metric-label">Flows/min</div></div>
<div class="metric-item"><div class="metric-value">{d['dpi_threats']}</div><div class="metric-label">Threats</div></div>
<div class="metric-item"><div class="metric-value">{d['dpi_blocked']}</div><div class="metric-label">Blocked</div></div>
</div>
</div>
''', unsafe_allow_html=True)
st.markdown('<div class="section-title">SYSTEM</div>', unsafe_allow_html=True)
c1, c2, c3, c4 = st.columns(4)


@@ -1,24 +1,30 @@
#!/usr/bin/env python3
"""
DPI Double Buffer Addon for mitmproxy - Phase 2
Part of secubox-dpi-dual package
Implements the double-buffer pattern:
- Buffer A: Live path, minimal latency (default mitmproxy behavior)
- Buffer B: Copy for deep analysis, async processing
This addon queues requests for asynchronous analysis without
blocking the live traffic path.
Features:
- Ring buffer with configurable size
- Async threat analysis without blocking
- Request replay capability for forensic analysis
- Context gathering for correlated threat investigation
- Stats endpoint for monitoring
"""
import json
import time
import hashlib
import asyncio
import re
from pathlib import Path
from collections import deque
from typing import Optional, Dict, Any, List
from mitmproxy import http, ctx
from mitmproxy.http import Response
class DPIBuffer:
@@ -29,14 +35,56 @@ class DPIBuffer:
self.buffer: deque = deque(maxlen=self.buffer_size)
self.buffer_dir = Path("/tmp/dpi-buffer")
self.stats_file = Path("/tmp/secubox/dpi-buffer.json")
self.alerts_file = Path("/tmp/secubox/waf-alerts.json")
self.replay_queue_file = Path("/tmp/dpi-buffer/replay-queue.json")
self.analysis_enabled = True
self.replay_enabled = True
self.request_count = 0
self.threat_count = 0
self.blocked_count = 0
# Threat detection patterns
self.threat_patterns = {
"path_traversal": [
r"\.\./", r"\.\.\\", r"%2e%2e[/\\]", r"%252e%252e",
],
"xss": [
r"<script", r"javascript:", r"onerror\s*=", r"onload\s*=",
r"<img[^>]+onerror", r"<svg[^>]+onload",
],
"sqli": [
r"(?i)union\s+select", r"(?i)insert\s+into", r"(?i)drop\s+table",
r"(?i)or\s+1\s*=\s*1", r"(?i)'\s*or\s+'", r";\s*--",
],
"lfi": [
r"/etc/passwd", r"/etc/shadow", r"/proc/self",
r"php://filter", r"php://input",
],
"rce": [
r"(?i)cmd\s*=", r"(?i)exec\s*=", r"system\s*\(",
r"\$\{.*\}", r"`[^`]+`", r"\|\s*\w+",
],
"ssrf": [
r"(?i)url\s*=\s*https?://", r"(?i)file://", r"(?i)gopher://",
r"169\.254\.169\.254", r"127\.0\.0\.1", r"localhost",
],
}
# Compile patterns for performance
self.compiled_patterns = {}
for category, patterns in self.threat_patterns.items():
self.compiled_patterns[category] = [
re.compile(p, re.IGNORECASE) for p in patterns
]
# Ensure directories exist
self.buffer_dir.mkdir(parents=True, exist_ok=True)
self.stats_file.parent.mkdir(parents=True, exist_ok=True)
# Initialize replay queue
if not self.replay_queue_file.exists():
self.replay_queue_file.write_text("[]")
def load(self, loader):
"""Load configuration from mitmproxy options."""
loader.add_option(
@@ -51,18 +99,32 @@ class DPIBuffer:
default=True,
help="Enable asynchronous request analysis",
)
loader.add_option(
name="dpi_replay_enabled",
typespec=bool,
default=True,
help="Enable request replay capability",
)
loader.add_option(
name="dpi_block_threats",
typespec=bool,
default=False,
help="Block requests that match threat patterns (careful!)",
)
def configure(self, updated):
"""Apply configuration updates."""
if "dpi_buffer_size" in updated:
self.buffer_size = ctx.options.dpi_buffer_size
# Resize buffer
new_buffer = deque(self.buffer, maxlen=self.buffer_size)
self.buffer = new_buffer
if "dpi_async_analysis" in updated:
self.analysis_enabled = ctx.options.dpi_async_analysis
if "dpi_replay_enabled" in updated:
self.replay_enabled = ctx.options.dpi_replay_enabled
def request(self, flow: http.HTTPFlow):
"""
Handle incoming request.
@@ -73,13 +135,32 @@ class DPIBuffer:
# Build entry for Buffer B (async analysis)
entry = self._build_entry(flow)
# Quick synchronous threat check (for blocking mode)
threat_result = self._quick_threat_check(entry)
entry["threat_categories"] = threat_result["categories"]
entry["threat_score"] = threat_result["score"]
# Block if enabled and high threat score
if ctx.options.dpi_block_threats and threat_result["score"] >= 50:
self.blocked_count += 1
flow.response = Response.make(
403,
b"Request blocked by DPI analysis",
{"Content-Type": "text/plain"}
)
entry["blocked"] = True
self._log_threat_sync(entry)
return
# Add to buffer
self.buffer.append(entry)
# Queue for async deep analysis if enabled
if self.analysis_enabled:
asyncio.create_task(self._async_analyze(entry))
# Update stats periodically (every 10 requests)
if self.request_count % 10 == 0:
self._write_stats()
@@ -88,7 +169,6 @@ class DPIBuffer:
if not flow.request.timestamp_start:
return
# Find and update the corresponding entry
req_hash = self._request_hash(flow)
for entry in self.buffer:
if entry.get("req_hash") == req_hash:
@@ -96,19 +176,32 @@ class DPIBuffer:
"status": flow.response.status_code if flow.response else None,
"content_length": len(flow.response.content) if flow.response and flow.response.content else 0,
"content_type": flow.response.headers.get("content-type", "") if flow.response else "",
"latency_ms": int((time.time() - entry["ts"]) * 1000),
}
break
def _build_entry(self, flow: http.HTTPFlow) -> Dict[str, Any]:
"""Build a buffer entry from a flow."""
content_hash = None
content_preview = None
if flow.request.content:
content_hash = hashlib.md5(flow.request.content).hexdigest()
# Store first 500 bytes for analysis
content_preview = flow.request.content[:500].decode('utf-8', errors='replace')
client_ip = "unknown"
if flow.client_conn and flow.client_conn.peername:
client_ip = flow.client_conn.peername[0]
# Extract query parameters
query_params = {}
if "?" in flow.request.path:
query_string = flow.request.path.split("?", 1)[1]
for param in query_string.split("&"):
if "=" in param:
key, value = param.split("=", 1)
query_params[key] = value
return {
"ts": flow.request.timestamp_start,
"req_hash": self._request_hash(flow),
@@ -116,12 +209,17 @@ class DPIBuffer:
"host": flow.request.host,
"port": flow.request.port,
"path": flow.request.path,
"query_params": query_params,
"headers": dict(flow.request.headers),
"content_hash": content_hash,
"content_preview": content_preview,
"content_length": len(flow.request.content) if flow.request.content else 0,
"client_ip": client_ip,
"user_agent": flow.request.headers.get("user-agent", ""),
"analyzed": False,
"threat_score": 0,
"threat_categories": [],
"blocked": False,
}
def _request_hash(self, flow: http.HTTPFlow) -> str:
@@ -129,58 +227,83 @@ class DPIBuffer:
key = f"{flow.request.timestamp_start}:{flow.request.host}:{flow.request.path}"
return hashlib.md5(key.encode()).hexdigest()[:16]
def _quick_threat_check(self, entry: Dict[str, Any]) -> Dict[str, Any]:
"""Quick synchronous threat check for blocking decisions."""
score = 0
categories = []
# Check path + query string
full_path = entry.get("path", "")
content = entry.get("content_preview", "") or ""
for category, patterns in self.compiled_patterns.items():
for pattern in patterns:
if pattern.search(full_path) or pattern.search(content):
if category not in categories:
categories.append(category)
score += 25
break
# Additional heuristics
headers = entry.get("headers", {})
# Suspicious user agent
ua = entry.get("user_agent", "").lower()
suspicious_ua = ["sqlmap", "nikto", "nmap", "masscan", "zgrab", "gobuster"]
if any(s in ua for s in suspicious_ua):
categories.append("scanner")
score += 30
# Missing or suspicious headers
if not ua or len(ua) < 10:
score += 5
# Large POST without content-type
if entry.get("method") == "POST" and entry.get("content_length", 0) > 0:
if "content-type" not in [k.lower() for k in headers.keys()]:
score += 10
return {"score": min(score, 100), "categories": categories}
async def _async_analyze(self, entry: Dict[str, Any]):
"""
Async analysis pipeline - runs without blocking live traffic.
Deep analysis including:
- Pattern matching with context
- Rate limiting detection
- Behavioral analysis
"""
try:
# Deep pattern analysis already done in quick check
# Here we can add more expensive analysis
# Check for rate limiting patterns
client_ip = entry.get("client_ip")
recent_from_ip = self.get_context(client_ip, window_sec=10)
if len(recent_from_ip) > 20:
entry["threat_categories"].append("rate_limit")
entry["threat_score"] = min(entry["threat_score"] + 20, 100)
# Check for unusual content types in requests
content_type = entry.get("headers", {}).get("content-type", "")
if "multipart/form-data" in content_type and entry.get("content_length", 0) > 1000000:
entry["threat_score"] = min(entry["threat_score"] + 10, 100)  # Large file upload
# Mark as analyzed
entry["analyzed"] = True
# Log if threat detected
if entry["threat_score"] > 30:
self.threat_count += 1
await self._log_threat(entry)
except Exception as e:
ctx.log.error(f"DPI Buffer analysis error: {e}")
    def _log_threat_sync(self, entry: Dict[str, Any]):
        """Synchronous threat logging for blocked requests."""
        try:
            alerts = []
            if self.alerts_file.exists():
                try:
                    alerts = json.loads(self.alerts_file.read_text())
                except (json.JSONDecodeError, OSError):
                    alerts = []
            alert_id = len(alerts) + 1
            alerts.append({
@@ -191,32 +314,57 @@ class DPIBuffer:
"path": entry.get("path"),
"method": entry.get("method"),
"threat_score": entry.get("threat_score"),
"categories": entry.get("threat_categories", []),
"blocked": entry.get("blocked", False),
"rule": "dpi_buffer_analysis",
})
# Keep last 1000 alerts
alerts = alerts[-1000:]
alert_file.write_text(json.dumps(alerts, indent=2))
self.alerts_file.write_text(json.dumps(alerts, indent=2))
except Exception as e:
ctx.log.error(f"Failed to log threat: {e}")
    async def _log_threat(self, entry: Dict[str, Any]):
        """Log a detected threat to the alerts file."""
        self._log_threat_sync(entry)

    def _write_stats(self):
        """Write buffer statistics to the stats file."""
        try:
            # Threat distribution and high-score count across the buffer
            threat_dist = {}
            high_threat_count = 0
            for e in self.buffer:
                for cat in e.get("threat_categories", []):
                    threat_dist[cat] = threat_dist.get(cat, 0) + 1
                if e.get("threat_score", 0) > 30:
                    high_threat_count += 1

            # Top hosts by request count
            host_counts = {}
            for e in self.buffer:
                host = e.get("host", "unknown")
                host_counts[host] = host_counts.get(host, 0) + 1
            top_hosts = sorted(host_counts.items(), key=lambda x: x[1], reverse=True)[:10]

            stats = {
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
                "entries": len(self.buffer),
                "max_size": self.buffer_size,
                "requests_total": self.request_count,
                "threats_detected": self.threat_count,
                "blocked_count": self.blocked_count,
                "high_threat_in_buffer": high_threat_count,
                "threat_distribution": threat_dist,
                "top_hosts": dict(top_hosts),
                "analysis_enabled": self.analysis_enabled,
                "replay_enabled": self.replay_enabled,
            }
            self.stats_file.write_text(json.dumps(stats, indent=2))
        except Exception as e:
            ctx.log.error(f"Failed to write stats: {e}")
    def get_context(self, client_ip: str, window_sec: int = 60) -> List[Dict]:
        """
        Get recent requests from the same IP for context on alerts.
        Used by the correlation engine to gather context around threat events.
        """
        now = time.time()
        return [
            e for e in self.buffer
            if e.get("client_ip") == client_ip
            and now - e.get("ts", 0) < window_sec
        ]
    def get_replay_candidates(self, client_ip: str = None,
                              min_threat_score: int = 0,
                              limit: int = 100) -> List[Dict]:
        """Get requests suitable for replay analysis."""
        candidates = []
        for e in self.buffer:
            if client_ip and e.get("client_ip") != client_ip:
                continue
            if e.get("threat_score", 0) < min_threat_score:
                continue
            candidates.append({
                "req_hash": e.get("req_hash"),
                "ts": e.get("ts"),
                "method": e.get("method"),
                "host": e.get("host"),
                "path": e.get("path"),
                "threat_score": e.get("threat_score"),
                "categories": e.get("threat_categories", []),
            })
            if len(candidates) >= limit:
                break
        return candidates

    def queue_replay(self, req_hash: str) -> bool:
        """Queue a request for replay analysis."""
        if not self.replay_enabled:
            return False

        # Find the request in the buffer
        entry = None
        for e in self.buffer:
            if e.get("req_hash") == req_hash:
                entry = e
                break
        if not entry:
            return False

        try:
            queue = []
            if self.replay_queue_file.exists():
                queue = json.loads(self.replay_queue_file.read_text())

            # Add to queue with replay metadata
            replay_entry = {
                "queued_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
                "original_ts": entry.get("ts"),
                "method": entry.get("method"),
                "host": entry.get("host"),
                "path": entry.get("path"),
                "headers": entry.get("headers"),
                "content_preview": entry.get("content_preview"),
                "req_hash": req_hash,
                "status": "pending",
            }
            queue.append(replay_entry)
            queue = queue[-100:]  # Keep last 100
            self.replay_queue_file.write_text(json.dumps(queue, indent=2))
            return True
        except Exception as e:
            ctx.log.error(f"Failed to queue replay: {e}")
            return False
# Mitmproxy addon instance
addons = [DPIBuffer()]
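For reference, a minimal sketch of consuming the alerts file this addon writes (a JSON list of alert objects, as in `_log_threat_sync` above). This is an illustrative consumer, not part of the addon; the `summarize_alerts` helper and the sample file name are hypothetical, while the real addon writes to `/tmp/secubox/waf-alerts.json`:

```python
import json
from pathlib import Path
from collections import Counter

def summarize_alerts(alert_file: Path, min_score: int = 30) -> dict:
    """Summarize a DPI buffer alerts file: total alerts plus a
    per-category count of alerts at or above min_score."""
    if not alert_file.exists():
        return {"total": 0, "by_category": {}}
    alerts = json.loads(alert_file.read_text())
    cats = Counter(
        c
        for a in alerts
        if a.get("threat_score", 0) >= min_score
        for c in a.get("categories", [])
    )
    return {"total": len(alerts), "by_category": dict(cats)}

# Exercise the helper against a sample file with the same shape
# as the entries appended in _log_threat_sync
sample = Path("sample-alerts.json")
sample.write_text(json.dumps([
    {"path": "/etc/passwd", "threat_score": 55,
     "categories": ["lfi"], "blocked": True},
    {"path": "/index.html", "threat_score": 5, "categories": []},
]))
print(summarize_alerts(sample))  # {'total': 2, 'by_category': {'lfi': 1}}
```

The same pattern works for the stats and replay-queue files, which are also plain JSON documents rewritten in full on each update.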