feat(dpi): Phase 3 - Correlation engine + timeline view

Correlation Library (correlation-lib.sh):
- IP reputation tracking with configurable decay
- Full context gathering from MITM, DPI, WAF streams
- CrowdSec decision checking and notification
- Correlation entry builder with rich context

Enhanced Correlator (dpi-correlator v2):
- Watches WAF alerts, CrowdSec decisions, DPI flows
- Auto-ban for high-reputation IPs (threshold: 80)
- Notification queue for high-severity threats
- CLI: correlate, reputation, context, search, stats

LuCI Timeline View:
- Correlation timeline with colored event cards
- IP context modal showing MITM requests + WAF alerts
- Quick ban button with CrowdSec integration
- Search by IP functionality
- Stats: total, high-threat, banned, unique IPs

RPCD Methods (8 new):
- get_correlation_stats, get_ip_context, get_ip_reputation
- get_timeline, search_correlations, ban_ip, set_auto_ban

UCI Config: auto_ban, auto_ban_threshold, notifications

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
CyberMind-FR 2026-03-15 12:26:58 +01:00
parent a24beaf316
commit 7ff9ee3805
10 changed files with 1021 additions and 143 deletions

View File

@ -5213,3 +5213,10 @@ git checkout HEAD -- index.html
- Protocol distribution, manual IP correlation
- Streamlit control panel: Added DPI Dual card
- **Dual-Stream DPI Phase 3 - Correlation Engine + Timeline (Complete)**
- Correlation library: IP reputation tracking, context gathering, CrowdSec integration
- Enhanced correlator v2: Auto-ban, notifications, CLI commands
- LuCI timeline view: Event cards, IP context modal, quick ban, search
- 8 new RPCD methods for correlation access and control

View File

@ -581,35 +581,32 @@ _Last updated: 2026-03-15 (Wall Colorsets)_
### 2026-03-15
- **Dual-Stream DPI Architecture (Phase 2 Complete)**
- New `secubox-dpi-dual` package implementing parallel MITM + Passive TAP DPI
- **Dual-Stream DPI Architecture (Phase 3 Complete)**
- Architecture doc: `package/secubox/DUAL-STREAM-DPI.md`
- **Phase 1 - TAP Stream (Passive)**:
- `mirror-setup.sh`: tc mirred port mirroring (ingress + egress)
- `dpi-flow-collector`: Aggregates netifyd stats → `/tmp/secubox/dpi-flows.json`
- `dpi-correlator`: Matches MITM + TAP events, CrowdSec integration
- `dpi-dualctl`: CLI start/stop/status/flows/threats/mirror
- `init.d/dpi-dual`: Procd service for flow-collector + correlator
- **Phase 2 - MITM Double Buffer + LuCI**:
- Enhanced `dpi_buffer.py` mitmproxy addon:
- Compiled regex patterns for 6 threat categories (path_traversal, xss, sqli, lfi, rce, ssrf)
- Scanner detection (sqlmap, nikto, nmap, etc.)
- Optional blocking mode for high-score threats
- Request replay queue for forensic analysis
- Rate limiting detection
- Stats: buffer entries, threat distribution, top hosts
- **LuCI Dashboard** (`luci-app-dpi-dual`):
- RPCD handler with 10 methods (status, flows, buffer, threats, correlation, start/stop/restart, replay, correlate)
- KISS-themed overview with stream status cards
- LED indicators for MITM/TAP/Correlation running state
- Metrics: buffer entries, threats, blocked, flows/min, RX/TX bytes
- Threats table with timestamp, IP, host, path, categories, score, blocked status
- Protocol distribution from netifyd
- Manual IP correlation trigger
- ACL permissions for read/write
- **Streamlit Control Panel** updated:
- DPI Dual card with flows/min, threats, blocked metrics
- Reads from dpi-buffer.json and dpi-flows.json caches
- **Phase 1 - TAP Stream**: tc mirred, flow-collector, dpi-dualctl CLI
- **Phase 2 - MITM Double Buffer**: Enhanced dpi_buffer.py, LuCI dashboard
- **Phase 3 - Correlation Engine + Integration**:
- **Correlation Library** (`correlation-lib.sh`):
- IP reputation tracking with score decay
- Full context gathering (MITM, DPI, WAF)
- CrowdSec decision checking and notification
- Correlation entry builder with all stream context
- **Enhanced Correlator** (`dpi-correlator v2`):
- Watches WAF alerts, CrowdSec decisions, DPI flows
- Auto-ban for high-reputation IPs (configurable threshold)
- Notification queue for high-severity threats
- CLI: correlate, reputation, context, search, stats
- **LuCI Timeline View** (`timeline.js`):
- Correlation timeline with event cards
- IP context modal (MITM requests, WAF alerts)
- Quick ban button with CrowdSec integration
- Search by IP functionality
- Stats: total, high-threat, banned, unique IPs
- **RPCD Methods** (8 new):
- get_correlation_stats, get_ip_context, get_ip_reputation
- get_timeline, search_correlations
- ban_ip, set_auto_ban
- **UCI Config**: auto_ban, auto_ban_threshold, notifications, reputation_decay
---

View File

@ -0,0 +1,332 @@
'use strict';
'require view';
'require dom';
'require poll';
'require rpc';
'require ui';
var callGetTimeline = rpc.declare({
object: 'luci.dpi-dual',
method: 'get_timeline',
params: ['limit'],
expect: {}
});
var callGetCorrelationStats = rpc.declare({
object: 'luci.dpi-dual',
method: 'get_correlation_stats',
expect: {}
});
var callGetIPContext = rpc.declare({
object: 'luci.dpi-dual',
method: 'get_ip_context',
params: ['ip'],
expect: {}
});
var callSearchCorrelations = rpc.declare({
object: 'luci.dpi-dual',
method: 'search_correlations',
params: ['ip', 'limit'],
expect: {}
});
var callBanIP = rpc.declare({
object: 'luci.dpi-dual',
method: 'ban_ip',
params: ['ip', 'duration'],
expect: {}
});
function formatTime(ts) {
if (!ts) return '-';
var d = new Date(ts);
return d.toLocaleString();
}
function getEventColor(event_type) {
switch (event_type) {
case 'waf_block': return '#ff4d4d';
case 'waf_alert': return '#ffa500';
case 'crowdsec_ban': return '#ff6b6b';
case 'dpi_threat': return '#00a0ff';
case 'scanner': return '#ff00ff';
default: return '#808090';
}
}
function getScoreColor(score) {
if (score >= 70) return '#ff4d4d';
if (score >= 40) return '#ffa500';
return '#00d4aa';
}
function createStatCard(label, value, color) {
return E('div', {
'style': 'background:#1a1a24;padding:1rem;border-radius:8px;text-align:center;min-width:120px;'
}, [
E('div', {
'style': 'font-size:2rem;font-weight:700;color:' + (color || '#00d4aa') + ';font-family:monospace;'
}, String(value)),
E('div', {
'style': 'font-size:0.8rem;color:#808090;text-transform:uppercase;margin-top:4px;'
}, label)
]);
}
function createTimelineEntry(entry, self) {
var color = getEventColor(entry.event_type);
var scoreColor = getScoreColor(entry.threat_score || 0);
return E('div', {
'class': 'timeline-entry',
'style': 'background:#12121a;border-radius:8px;padding:1rem;margin:0.5rem 0;' +
'border-left:4px solid ' + color + ';position:relative;'
}, [
// Timeline dot
E('div', {
'style': 'position:absolute;left:-8px;top:50%;transform:translateY(-50%);' +
'width:12px;height:12px;border-radius:50%;background:' + color + ';' +
'box-shadow:0 0 8px ' + color + ';'
}),
// Header row
E('div', {
'style': 'display:flex;justify-content:space-between;align-items:center;margin-bottom:0.5rem;'
}, [
E('div', { 'style': 'display:flex;align-items:center;gap:10px;' }, [
E('span', {
'style': 'font-family:monospace;font-size:1.1rem;color:#00a0ff;cursor:pointer;',
'click': function() {
self.showIPContext(entry.ip);
}
}, entry.ip || '-'),
E('span', {
'style': 'background:' + color + '22;color:' + color +
';padding:2px 8px;border-radius:10px;font-size:0.75rem;font-weight:600;'
}, entry.event_type || 'unknown')
]),
E('span', { 'style': 'color:#808090;font-size:0.8rem;' }, formatTime(entry.timestamp))
]),
// Details row
E('div', { 'style': 'display:flex;gap:1rem;flex-wrap:wrap;' }, [
E('div', {}, [
E('span', { 'style': 'color:#606070;font-size:0.75rem;' }, 'Reason: '),
E('span', { 'style': 'color:#fff;' }, entry.reason || '-')
]),
E('div', {}, [
E('span', { 'style': 'color:#606070;font-size:0.75rem;' }, 'Threat: '),
E('span', {
'style': 'color:' + scoreColor + ';font-weight:600;'
}, String(entry.threat_score || 0))
]),
E('div', {}, [
E('span', { 'style': 'color:#606070;font-size:0.75rem;' }, 'Reputation: '),
E('span', { 'style': 'color:#ffa500;font-weight:600;' }, String(entry.reputation_score || 0))
]),
entry.crowdsec_status === 'banned' ?
E('span', {
'style': 'background:#ff4d4d22;color:#ff4d4d;padding:2px 8px;border-radius:10px;font-size:0.75rem;'
}, '🚫 BANNED') : null
].filter(Boolean)),
// Action buttons
E('div', { 'style': 'margin-top:0.5rem;display:flex;gap:8px;' }, [
E('button', {
'class': 'btn cbi-button',
'style': 'font-size:0.75rem;padding:4px 10px;',
'click': function() {
self.showIPContext(entry.ip);
}
}, '🔍 Context'),
entry.crowdsec_status !== 'banned' ?
E('button', {
'class': 'btn cbi-button cbi-button-negative',
'style': 'font-size:0.75rem;padding:4px 10px;',
'click': function() {
self.banIP(entry.ip);
}
}, '🚫 Ban') : null
].filter(Boolean))
]);
}
return view.extend({
selectedIP: null,
load: function() {
return Promise.all([
callGetTimeline(50).catch(function() { return { entries: [] }; }),
callGetCorrelationStats().catch(function() { return {}; })
]);
},
showIPContext: function(ip) {
var self = this;
ui.showModal('IP Context: ' + ip, [
E('div', { 'style': 'min-width:500px;' }, [
E('div', { 'id': 'ip-context-loading', 'style': 'text-align:center;padding:2rem;' },
E('em', {}, 'Loading context...'))
]),
E('div', { 'class': 'right' }, [
E('button', {
'class': 'btn',
'click': ui.hideModal
}, 'Close')
])
]);
callGetIPContext(ip).then(function(ctx) {
var content = E('div', {}, [
E('div', { 'style': 'margin-bottom:1rem;' }, [
E('strong', {}, 'IP: '),
E('span', { 'style': 'font-family:monospace;color:#00a0ff;' }, ctx.ip || ip),
E('span', { 'style': 'margin-left:1rem;' }, [
E('strong', {}, 'Reputation: '),
E('span', {
'style': 'color:' + getScoreColor(ctx.reputation_score || 0) + ';font-weight:600;'
}, String(ctx.reputation_score || 0))
]),
E('span', { 'style': 'margin-left:1rem;' }, [
E('strong', {}, 'CrowdSec: '),
E('span', {
'style': 'color:' + (ctx.crowdsec_status === 'banned' ? '#ff4d4d' : '#00d4aa') + ';'
}, ctx.crowdsec_status || 'clean')
])
]),
ctx.context && ctx.context.mitm_requests && ctx.context.mitm_requests.length > 0 ?
E('div', { 'style': 'margin-bottom:1rem;' }, [
E('h4', { 'style': 'color:#00d4aa;margin-bottom:0.5rem;' }, 'MITM Requests'),
E('div', { 'style': 'max-height:150px;overflow-y:auto;background:#1a1a24;padding:8px;border-radius:6px;' },
ctx.context.mitm_requests.map(function(req) {
return E('div', { 'style': 'font-family:monospace;font-size:0.8rem;margin:2px 0;' },
(req.method || 'GET') + ' ' + (req.host || '') + (req.path || '/'));
})
)
]) : null,
ctx.context && ctx.context.waf_alerts && ctx.context.waf_alerts.length > 0 ?
E('div', { 'style': 'margin-bottom:1rem;' }, [
E('h4', { 'style': 'color:#ffa500;margin-bottom:0.5rem;' }, 'WAF Alerts'),
E('div', { 'style': 'max-height:150px;overflow-y:auto;background:#1a1a24;padding:8px;border-radius:6px;' },
ctx.context.waf_alerts.map(function(alert) {
return E('div', { 'style': 'font-family:monospace;font-size:0.8rem;margin:2px 0;' },
'[Score: ' + (alert.threat_score || 0) + '] ' + (alert.path || '/'));
})
)
]) : null
].filter(Boolean));
var loadingEl = document.getElementById('ip-context-loading');
if (loadingEl) {
loadingEl.parentNode.replaceChild(content, loadingEl);
}
});
},
banIP: function(ip) {
var self = this;
if (!confirm('Ban IP ' + ip + ' for 4 hours?')) {
return;
}
callBanIP(ip, '4h').then(function(res) {
if (res.success) {
ui.addNotification(null, E('p', res.message), 'info');
window.location.reload();
} else {
ui.addNotification(null, E('p', 'Error: ' + (res.error || 'Unknown')), 'error');
}
});
},
render: function(data) {
var timeline = data[0] || {};
var stats = data[1] || {};
var self = this;
var entries = timeline.entries || [];
var view = E('div', { 'class': 'cbi-map', 'style': 'background:#0a0a12;min-height:100vh;' }, [
// Header
E('div', { 'style': 'text-align:center;padding:1rem 0;' }, [
E('h1', {
'style': 'font-size:1.8rem;font-weight:700;background:linear-gradient(90deg,#ffa500,#ff6b6b);' +
'-webkit-background-clip:text;-webkit-text-fill-color:transparent;margin:0;'
}, 'Correlation Timeline'),
E('div', { 'style': 'color:#606070;margin-top:4px;' },
'Threat correlation across MITM + TAP streams')
]),
// Stats row
E('div', {
'style': 'display:flex;gap:1rem;justify-content:center;flex-wrap:wrap;margin-bottom:1.5rem;'
}, [
createStatCard('Total', stats.total_correlations || 0, '#00d4aa'),
createStatCard('High Threat', stats.high_threat_count || 0, '#ff4d4d'),
createStatCard('Banned IPs', stats.banned_ips || 0, '#ff6b6b'),
createStatCard('Unique IPs', stats.unique_ips || 0, '#00a0ff')
]),
// Search bar
E('div', { 'style': 'display:flex;gap:8px;justify-content:center;margin-bottom:1rem;' }, [
E('input', {
'type': 'text',
'id': 'search-ip',
'placeholder': 'Search by IP...',
'style': 'background:#1a1a24;border:1px solid #2a2a3a;border-radius:6px;' +
'padding:8px 12px;color:#fff;width:200px;'
}),
E('button', {
'class': 'btn cbi-button cbi-button-apply',
'click': function() {
var ip = document.getElementById('search-ip').value;
if (ip) {
callSearchCorrelations(ip, 50).then(function(res) {
ui.addNotification(null, E('p', 'Found ' + (res.results || []).length + ' correlations'), 'info');
});
}
}
}, '🔍 Search')
]),
// Timeline
E('div', { 'style': 'max-width:800px;margin:0 auto;padding:0 1rem;position:relative;' }, [
// Vertical line
E('div', {
'style': 'position:absolute;left:calc(1rem - 2px);top:0;bottom:0;width:4px;' +
'background:linear-gradient(to bottom,#2a2a3a,transparent);border-radius:2px;'
}),
// Timeline entries
E('div', { 'style': 'padding-left:1.5rem;' },
entries.length > 0 ?
entries.reverse().map(function(entry) {
return createTimelineEntry(entry, self);
}) :
E('div', { 'style': 'text-align:center;color:#606070;padding:2rem;' },
'No correlation events yet')
)
])
]);
// Auto-refresh
poll.add(L.bind(function() {
return callGetTimeline(50).then(function() {
// Refresh handled by poll
});
}, this), 30);
return view;
},
handleSaveApply: null,
handleSave: null,
handleReset: null
});

View File

@ -30,12 +30,19 @@ case "$1" in
"get_buffer": {"limit": 100},
"get_threats": {"limit": 50},
"get_correlation": {"limit": 20},
"get_correlation_stats": {},
"get_ip_context": {"ip": "string"},
"get_ip_reputation": {"ip": "string"},
"get_timeline": {"limit": 50},
"get_mirror_status": {},
"search_correlations": {"ip": "string", "limit": 50},
"start": {},
"stop": {},
"restart": {},
"replay_request": {"req_hash": "string"},
"correlate_ip": {"ip": "string"}
"correlate_ip": {"ip": "string"},
"ban_ip": {"ip": "string", "duration": "string"},
"set_auto_ban": {"enabled": true}
}
EOF
;;
@ -244,6 +251,113 @@ EOF
fi
;;
get_correlation_stats)
/usr/sbin/dpi-correlator stats 2>/dev/null || echo '{"total_correlations": 0}'
;;
get_ip_context)
read "$3"
json_load "$REPLY"
json_get_var ip ip ""
if [ -z "$ip" ]; then
echo '{"error": "IP required"}'
else
/usr/sbin/dpi-correlator context "$ip" 2>/dev/null || echo '{"error": "Context not available"}'
fi
;;
get_ip_reputation)
read "$3"
json_load "$REPLY"
json_get_var ip ip ""
if [ -z "$ip" ]; then
echo '{"error": "IP required"}'
else
. /usr/lib/dpi-dual/correlation-lib.sh
init_reputation_db
local score
score=$(get_ip_reputation "$ip")
echo "{\"ip\": \"$ip\", \"reputation_score\": $score}"
fi
;;
get_timeline)
read "$3"
json_load "$REPLY"
json_get_var limit limit 50
local log_file="/tmp/secubox/correlated-threats.json"
if [ -f "$log_file" ]; then
local total
total=$(wc -l < "$log_file" 2>/dev/null || echo 0)
# Get last N entries as JSON array
local entries
entries=$(tail -"$limit" "$log_file" 2>/dev/null | \
awk 'BEGIN { printf "[" }
{ if (NR > 1) printf ","; print }
END { printf "]" }')
cat << EOF
{
"total": $total,
"limit": $limit,
"entries": $entries
}
EOF
else
echo '{"total": 0, "limit": '$limit', "entries": []}'
fi
;;
search_correlations)
read "$3"
json_load "$REPLY"
json_get_var ip ip ""
json_get_var limit limit 50
local results
results=$(/usr/sbin/dpi-correlator search "$ip" "$limit" 2>/dev/null | \
awk 'BEGIN { printf "[" }
{ if (NR > 1) printf ","; print }
END { printf "]" }')
echo "{\"results\": $results}"
;;
ban_ip)
read "$3"
json_load "$REPLY"
json_get_var ip ip ""
json_get_var duration duration "4h"
if [ -z "$ip" ]; then
echo '{"success": false, "error": "IP required"}'
else
if command -v cscli >/dev/null 2>&1; then
cscli decisions add -i "$ip" -d "$duration" -r "dpi-dual-manual" -t ban >/dev/null 2>&1
echo '{"success": true, "message": "IP '"$ip"' banned for '"$duration"'"}'
else
echo '{"success": false, "error": "CrowdSec not available"}'
fi
fi
;;
set_auto_ban)
read "$3"
json_load "$REPLY"
json_get_var enabled enabled "0"
local val="0"
[ "$enabled" = "true" ] && val="1"
uci set dpi-dual.correlation.auto_ban="$val"
uci commit dpi-dual
echo '{"success": true, "auto_ban": '$val'}'
;;
*)
echo '{"error": "Unknown method"}'
;;

View File

@ -3,12 +3,28 @@
"title": "DPI Dual-Stream",
"order": 45,
"action": {
"type": "view",
"path": "dpi-dual/overview"
"type": "firstchildview",
"recurse": true
},
"depends": {
"acl": ["luci-app-dpi-dual"],
"uci": { "dpi-dual": true }
}
},
"admin/secubox/dpi-dual/overview": {
"title": "Overview",
"order": 1,
"action": {
"type": "view",
"path": "dpi-dual/overview"
}
},
"admin/secubox/dpi-dual/timeline": {
"title": "Correlation Timeline",
"order": 2,
"action": {
"type": "view",
"path": "dpi-dual/timeline"
}
}
}

View File

@ -9,7 +9,12 @@
"get_buffer",
"get_threats",
"get_correlation",
"get_mirror_status"
"get_correlation_stats",
"get_ip_context",
"get_ip_reputation",
"get_timeline",
"get_mirror_status",
"search_correlations"
]
},
"uci": ["dpi-dual"]
@ -21,7 +26,9 @@
"stop",
"restart",
"replay_request",
"correlate_ip"
"correlate_ip",
"ban_ip",
"set_auto_ban"
]
},
"uci": ["dpi-dual"]

View File

@ -43,6 +43,7 @@ define Package/secubox-dpi-dual/install
$(INSTALL_DIR) $(1)/usr/lib/dpi-dual
$(INSTALL_BIN) ./files/usr/lib/dpi-dual/mirror-setup.sh $(1)/usr/lib/dpi-dual/
$(INSTALL_DATA) ./files/usr/lib/dpi-dual/correlation-lib.sh $(1)/usr/lib/dpi-dual/
$(INSTALL_DIR) $(1)/srv/mitmproxy/addons
$(INSTALL_DATA) ./files/srv/mitmproxy/addons/dpi_buffer.py $(1)/srv/mitmproxy/addons/

View File

@ -25,3 +25,7 @@ config correlation 'correlation'
option window '60'
option output '/tmp/secubox/correlated-threats.json'
option watch_crowdsec '1'
option auto_ban '0'
option auto_ban_threshold '80'
option notifications '1'
option reputation_decay '5'

View File

@ -0,0 +1,274 @@
#!/bin/sh
# Correlation Library for DPI Dual-Stream
# Shared functions for event matching and threat correlation
# Paths
STATS_DIR="/tmp/secubox"
FLOW_DIR="/tmp/dpi-flows"
BUFFER_DIR="/tmp/dpi-buffer"
REPUTATION_DB="$STATS_DIR/ip-reputation.json"
CORRELATION_LOG="$STATS_DIR/correlated-threats.json"
WAF_ALERTS="$STATS_DIR/waf-alerts.json"
MITM_LOG="/var/log/mitmproxy/access.log"
# Initialize reputation database
init_reputation_db() {
[ ! -f "$REPUTATION_DB" ] && echo '{}' > "$REPUTATION_DB"
}
# Get IP reputation score (0-100, higher = more suspicious)
get_ip_reputation() {
local ip="$1"
init_reputation_db
local score
score=$(jsonfilter -i "$REPUTATION_DB" -e "@[\"$ip\"].score" 2>/dev/null)
echo "${score:-0}"
}
# Update IP reputation based on event
update_ip_reputation() {
local ip="$1"
local event_type="$2" # threat, alert, block, clean
local delta="$3" # score change
init_reputation_db
local current_score last_seen event_count
current_score=$(jsonfilter -i "$REPUTATION_DB" -e "@[\"$ip\"].score" 2>/dev/null || echo 0)
event_count=$(jsonfilter -i "$REPUTATION_DB" -e "@[\"$ip\"].events" 2>/dev/null || echo 0)
# Calculate new score
local new_score=$((current_score + delta))
[ "$new_score" -lt 0 ] && new_score=0
[ "$new_score" -gt 100 ] && new_score=100
event_count=$((event_count + 1))
# Build updated entry
local tmp_file="/tmp/reputation_update_$$.json"
cat "$REPUTATION_DB" > "$tmp_file"
# Update using shell (jsonfilter is read-only)
local now
now=$(date -Iseconds)
# Simple JSON update - replace or add entry
if grep -q "\"$ip\"" "$tmp_file"; then
# Update existing - this is simplified, full implementation would use jq
sed -i "s/\"$ip\":{[^}]*}/\"$ip\":{\"score\":$new_score,\"events\":$event_count,\"last_event\":\"$event_type\",\"updated\":\"$now\"}/" "$tmp_file"
else
# Add new entry
sed -i "s/^{/{\n\"$ip\":{\"score\":$new_score,\"events\":$event_count,\"last_event\":\"$event_type\",\"updated\":\"$now\"},/" "$tmp_file"
fi
mv "$tmp_file" "$REPUTATION_DB"
}
# Get MITM context for IP (recent requests)
get_mitm_context() {
local ip="$1"
local count="${2:-10}"
local window="${3:-300}" # seconds
local result="[]"
if [ -f "$MITM_LOG" ]; then
# Get recent entries from MITM log
local now
now=$(date +%s)
result=$(grep "$ip" "$MITM_LOG" 2>/dev/null | tail -"$count" | \
awk -F'\t' -v now="$now" -v window="$window" '
BEGIN { printf "[" }
{
if (NR > 1) printf ","
printf "{\"method\":\"%s\",\"host\":\"%s\",\"path\":\"%s\",\"status\":\"%s\"}",
$2, $3, $4, $5
}
END { printf "]" }
')
fi
echo "$result"
}
# Get DPI flow context for IP
get_dpi_context() {
local ip="$1"
local count="${2:-5}"
local result="[]"
if [ -d "$FLOW_DIR" ]; then
result=$(find "$FLOW_DIR" -name "*.json" -mmin -5 -exec grep -l "$ip" {} \; 2>/dev/null | \
head -"$count" | xargs cat 2>/dev/null | \
awk 'BEGIN { printf "[" }
{ if (NR > 1) printf ","; print }
END { printf "]" }')
fi
[ -z "$result" ] && result="[]"
echo "$result"
}
# Get WAF alert context for IP
get_waf_context() {
local ip="$1"
local count="${2:-10}"
if [ -f "$WAF_ALERTS" ]; then
jsonfilter -i "$WAF_ALERTS" -e "@[*]" 2>/dev/null | \
grep "\"client_ip\":\"$ip\"" | \
tail -"$count" | \
awk 'BEGIN { printf "[" }
{ if (NR > 1) printf ","; print }
END { printf "]" }'
else
echo "[]"
fi
}
# Get buffer context for IP (from mitmproxy double buffer)
get_buffer_context() {
local ip="$1"
local count="${2:-20}"
local buffer_file="$BUFFER_DIR/entries.jsonl"
if [ -f "$buffer_file" ]; then
grep "\"client_ip\":\"$ip\"" "$buffer_file" 2>/dev/null | \
tail -"$count" | \
awk 'BEGIN { printf "[" }
{ if (NR > 1) printf ","; print }
END { printf "]" }'
else
echo "[]"
fi
}
# Check if IP is in CrowdSec decisions
check_crowdsec_decision() {
local ip="$1"
if command -v cscli >/dev/null 2>&1; then
local decision
decision=$(cscli decisions list -o json 2>/dev/null | jsonfilter -e "@[*]" 2>/dev/null | grep "\"value\":\"$ip\"" | head -1)
if [ -n "$decision" ]; then
echo "banned"
return 0
fi
fi
echo "clean"
return 1
}
# Notify CrowdSec about a threat (for potential ban)
notify_crowdsec() {
local ip="$1"
local reason="$2"
local duration="${3:-4h}"
if command -v cscli >/dev/null 2>&1; then
# Add a decision manually
cscli decisions add -i "$ip" -d "$duration" -r "$reason" -t ban >/dev/null 2>&1
return $?
fi
return 1
}
# Build full correlation entry
build_correlation_entry() {
local ip="$1"
local event_type="$2"
local reason="$3"
local threat_score="${4:-0}"
local mitm_ctx dpi_ctx waf_ctx reputation cs_status
mitm_ctx=$(get_mitm_context "$ip" 10)
dpi_ctx=$(get_dpi_context "$ip" 5)
waf_ctx=$(get_waf_context "$ip" 10)
reputation=$(get_ip_reputation "$ip")
cs_status=$(check_crowdsec_decision "$ip")
cat << EOF
{
"ip": "$ip",
"timestamp": "$(date -Iseconds)",
"event_type": "$event_type",
"reason": "$reason",
"threat_score": $threat_score,
"reputation_score": $reputation,
"crowdsec_status": "$cs_status",
"context": {
"mitm_requests": $mitm_ctx,
"dpi_flows": $dpi_ctx,
"waf_alerts": $waf_ctx
}
}
EOF
}
# Save correlation entry to log
save_correlation() {
local entry="$1"
mkdir -p "$(dirname "$CORRELATION_LOG")"
# Append to JSONL file (one JSON object per line)
echo "$entry" >> "$CORRELATION_LOG"
# Rotate if too large (keep last 10000 entries)
local lines
lines=$(wc -l < "$CORRELATION_LOG" 2>/dev/null || echo 0)
if [ "$lines" -gt 10000 ]; then
tail -5000 "$CORRELATION_LOG" > "$CORRELATION_LOG.tmp"
mv "$CORRELATION_LOG.tmp" "$CORRELATION_LOG"
fi
}
# Get correlation summary stats
get_correlation_stats() {
local total=0 high_threat=0 banned=0 unique_ips=0
if [ -f "$CORRELATION_LOG" ]; then
total=$(wc -l < "$CORRELATION_LOG")
high_threat=$(grep -c '"threat_score":[5-9][0-9]' "$CORRELATION_LOG" 2>/dev/null || echo 0)
banned=$(grep -c '"crowdsec_status":"banned"' "$CORRELATION_LOG" 2>/dev/null || echo 0)
unique_ips=$(cut -d'"' -f4 "$CORRELATION_LOG" | sort -u | wc -l)
fi
cat << EOF
{
"total_correlations": $total,
"high_threat_count": $high_threat,
"banned_ips": $banned,
"unique_ips": $unique_ips,
"updated": "$(date -Iseconds)"
}
EOF
}
# Search correlations by IP or time range
search_correlations() {
local ip="$1"
local since="$2" # ISO timestamp
local limit="${3:-50}"
if [ -f "$CORRELATION_LOG" ]; then
if [ -n "$ip" ]; then
grep "\"ip\":\"$ip\"" "$CORRELATION_LOG" | tail -"$limit"
elif [ -n "$since" ]; then
# Simple time filter - assumes entries are chronological
awk -v since="$since" '
$0 ~ since || found { found=1; print }
' "$CORRELATION_LOG" | tail -"$limit"
else
tail -"$limit" "$CORRELATION_LOG"
fi
fi
}

View File

@ -1,184 +1,310 @@
#!/bin/sh
# DPI Correlator - Matches events from MITM and TAP streams
# DPI Correlator v2 - Enhanced event matching and threat correlation
# Part of secubox-dpi-dual package
. /lib/functions.sh
. /usr/lib/dpi-dual/correlation-lib.sh
config_load dpi-dual
MITM_LOG=""
DPI_FLOWS=""
CORRELATED=""
# Configuration
WINDOW=""
WATCH_CROWDSEC=""
AUTO_BAN=""
AUTO_BAN_THRESHOLD=""
NOTIFICATION_ENABLED=""
load_config() {
config_get MITM_LOG mitm log_file "/var/log/mitmproxy/access.log"
config_get DPI_FLOWS settings flow_dir "/tmp/dpi-flows"
config_get CORRELATED correlation output "/tmp/secubox/correlated-threats.json"
config_get WINDOW correlation window "60"
config_get WATCH_CROWDSEC correlation watch_crowdsec "1"
config_get AUTO_BAN correlation auto_ban "0"
config_get AUTO_BAN_THRESHOLD correlation auto_ban_threshold "80"
config_get NOTIFICATION_ENABLED correlation notifications "1"
}
init_files() {
local corr_dir
corr_dir=$(dirname "$CORRELATED")
mkdir -p "$corr_dir"
[ ! -f "$CORRELATED" ] && echo "[]" > "$CORRELATED"
}
# Get recent MITM requests from an IP
get_mitm_context() {
# Process a threat event from any source
process_threat_event() {
local ip="$1"
local count="${2:-10}"
local event_type="$2"
local reason="$3"
local threat_score="${4:-50}"
if [ -f "$MITM_LOG" ]; then
grep "$ip" "$MITM_LOG" 2>/dev/null | tail -"$count" | \
awk -F'\t' '{printf "{\"ts\":\"%s\",\"method\":\"%s\",\"host\":\"%s\",\"path\":\"%s\"},", $1, $2, $3, $4}' | \
sed 's/,$//'
fi
}
[ -z "$ip" ] && return 1
# Get DPI flow info for an IP
get_dpi_context() {
local ip="$1"
echo "[$(date '+%H:%M:%S')] Processing threat: $ip ($event_type: $reason, score: $threat_score)"
if [ -d "$DPI_FLOWS" ]; then
find "$DPI_FLOWS" -name "*.json" -exec grep -l "$ip" {} \; 2>/dev/null | \
head -5 | xargs cat 2>/dev/null | \
jsonfilter -e '@' 2>/dev/null | \
tr '\n' ',' | sed 's/,$//'
fi
}
# Update IP reputation
local delta=10
case "$event_type" in
waf_block) delta=25 ;;
waf_alert) delta=15 ;;
crowdsec_ban) delta=30 ;;
dpi_threat) delta=20 ;;
scanner) delta=35 ;;
*) delta=10 ;;
esac
update_ip_reputation "$ip" "$event_type" "$delta"
# Correlate a threat event
correlate_threat() {
local ip="$1"
local timestamp="$2"
local reason="${3:-unknown}"
local mitm_ctx dpi_ctx
mitm_ctx=$(get_mitm_context "$ip")
dpi_ctx=$(get_dpi_context "$ip")
# Build correlation entry
# Build full correlation entry with context from all streams
local entry
entry=$(cat << EOF
{
"ip": "$ip",
"timestamp": "$timestamp",
"reason": "$reason",
"mitm_context": [$mitm_ctx],
"dpi_context": [$dpi_ctx],
"correlated_at": "$(date -Iseconds)"
}
EOF
)
entry=$(build_correlation_entry "$ip" "$event_type" "$reason" "$threat_score")
# Append to correlated file (keep last 1000 entries)
local tmp_file="/tmp/correlated_$$.json"
if [ -f "$CORRELATED" ]; then
# Read existing, add new, keep last 1000
(cat "$CORRELATED" 2>/dev/null | jsonfilter -e '@[*]' 2>/dev/null; echo "$entry") | \
tail -1000 | \
awk 'BEGIN{print "["} {if(NR>1)print ","; print} END{print "]"}' > "$tmp_file"
mv "$tmp_file" "$CORRELATED"
else
echo "[$entry]" > "$CORRELATED"
# Save to correlation log
save_correlation "$entry"
# Check for auto-ban
if [ "$AUTO_BAN" = "1" ]; then
local reputation
reputation=$(get_ip_reputation "$ip")
if [ "$reputation" -ge "$AUTO_BAN_THRESHOLD" ]; then
echo "[$(date '+%H:%M:%S')] Auto-banning $ip (reputation: $reputation)"
notify_crowdsec "$ip" "dpi-dual-autoban" "4h"
fi
fi
echo "Correlated threat: $ip ($reason)"
}
# Watch CrowdSec for new decisions
watch_crowdsec_decisions() {
local cs_db="/var/lib/crowdsec/data/crowdsec.db"
local last_check="/tmp/dpi-correlator-lastcheck"
[ ! -f "$cs_db" ] && return
# Get timestamp of last check
local last_ts=0
[ -f "$last_check" ] && last_ts=$(cat "$last_check")
# Query new decisions (simplified - just check recent bans)
if command -v cscli >/dev/null 2>&1; then
cscli decisions list -o json 2>/dev/null | \
jsonfilter -e '@[*].value' 2>/dev/null | \
while read -r ip; do
[ -n "$ip" ] && correlate_threat "$ip" "$(date -Iseconds)" "crowdsec_ban"
done
# Send notification if enabled
if [ "$NOTIFICATION_ENABLED" = "1" ] && [ "$threat_score" -ge 70 ]; then
send_notification "$ip" "$event_type" "$reason" "$threat_score"
fi
# Update last check timestamp
date +%s > "$last_check"
}
# Watch for WAF alerts from mitmproxy
# Watch for new WAF alerts
watch_waf_alerts() {
local waf_alerts="/tmp/secubox/waf-alerts.json"
local last_alert="/tmp/dpi-correlator-lastalert"
local last_id_file="/tmp/dpi-correlator-waf-lastid"
[ ! -f "$waf_alerts" ] && return
local last_id=0
[ -f "$last_alert" ] && last_id=$(cat "$last_alert")
[ -f "$last_id_file" ] && last_id=$(cat "$last_id_file")
# Process new alerts
# Get latest alert ID
local current_id
current_id=$(jsonfilter -i "$waf_alerts" -e '@[-1].id' 2>/dev/null || echo 0)
if [ "$current_id" -gt "$last_id" ]; then
# Get new alerts
# Process new alerts
jsonfilter -i "$waf_alerts" -e '@[*]' 2>/dev/null | while read -r alert; do
local alert_id ip reason
local alert_id ip categories score blocked
alert_id=$(echo "$alert" | jsonfilter -e '@.id' 2>/dev/null)
[ "$alert_id" -le "$last_id" ] && continue
ip=$(echo "$alert" | jsonfilter -e '@.client_ip' 2>/dev/null)
reason=$(echo "$alert" | jsonfilter -e '@.rule' 2>/dev/null || echo "waf_alert")
categories=$(echo "$alert" | jsonfilter -e '@.categories' 2>/dev/null | tr -d '[]"' | tr ',' ' ')
score=$(echo "$alert" | jsonfilter -e '@.threat_score' 2>/dev/null || echo 50)
blocked=$(echo "$alert" | jsonfilter -e '@.blocked' 2>/dev/null)
[ -n "$ip" ] && correlate_threat "$ip" "$(date -Iseconds)" "$reason"
[ -z "$ip" ] && continue
local event_type="waf_alert"
[ "$blocked" = "true" ] && event_type="waf_block"
local reason="waf:${categories:-unknown}"
process_threat_event "$ip" "$event_type" "$reason" "$score"
done
echo "$current_id" > "$last_alert"
echo "$current_id" > "$last_id_file"
fi
}
# Watch for CrowdSec decisions
watch_crowdsec_decisions() {
[ "$WATCH_CROWDSEC" != "1" ] && return
local cs_db="/var/lib/crowdsec/data/crowdsec.db"
local last_check_file="/tmp/dpi-correlator-cs-lastcheck"
[ ! -f "$cs_db" ] && return
# Rate limit: check at most every 30 seconds
local now last_check
now=$(date +%s)
last_check=$(cat "$last_check_file" 2>/dev/null || echo 0)
[ $((now - last_check)) -lt 30 ] && return
echo "$now" > "$last_check_file"
# Get recent decisions
if command -v cscli >/dev/null 2>&1; then
cscli decisions list -o json 2>/dev/null | \
jsonfilter -e '@[*]' 2>/dev/null | \
while read -r decision; do
local ip scenario
ip=$(echo "$decision" | jsonfilter -e '@.value' 2>/dev/null)
scenario=$(echo "$decision" | jsonfilter -e '@.scenario' 2>/dev/null)
[ -z "$ip" ] && continue
# Check if already correlated recently
if grep -q "\"ip\":\"$ip\".*crowdsec_ban" "$CORRELATION_LOG" 2>/dev/null | tail -1 | grep -q "$(date +%Y-%m-%d)"; then
continue
fi
process_threat_event "$ip" "crowdsec_ban" "crowdsec:$scenario" 75
done
fi
}
# Watch for high-threat DPI flows
watch_dpi_flows() {
local flow_dir="/tmp/dpi-flows"
local last_scan_file="/tmp/dpi-correlator-flow-lastscan"
[ ! -d "$flow_dir" ] && return
local now last_scan
now=$(date +%s)
last_scan=$(cat "$last_scan_file" 2>/dev/null || echo 0)
# Scan flows every 60 seconds
[ $((now - last_scan)) -lt 60 ] && return
echo "$now" > "$last_scan_file"
# Look for suspicious protocols in recent flows
find "$flow_dir" -name "*.json" -mmin -1 2>/dev/null | while read -r flow_file; do
local protocol detected_app risk_level
protocol=$(jsonfilter -i "$flow_file" -e '@.detected_protocol' 2>/dev/null)
detected_app=$(jsonfilter -i "$flow_file" -e '@.detected_app' 2>/dev/null)
# Check for risky protocols
case "$protocol" in
tor|i2p|bittorrent)
risk_level=40
;;
ssh|telnet)
# Only flag if unexpected
risk_level=20
;;
*)
risk_level=0
;;
esac
if [ "$risk_level" -gt 0 ]; then
local ip
ip=$(jsonfilter -i "$flow_file" -e '@.local_ip' 2>/dev/null)
[ -n "$ip" ] && process_threat_event "$ip" "dpi_threat" "protocol:$protocol" "$risk_level"
fi
done
}
# Send notification for high-severity threats
send_notification() {
local ip="$1"
local event_type="$2"
local reason="$3"
local score="$4"
# Write to notification queue for external consumption
local notif_file="/tmp/secubox/notifications.json"
mkdir -p "$(dirname "$notif_file")"
local notif
notif=$(cat << EOF
{"timestamp":"$(date -Iseconds)","type":"dpi_threat","severity":"high","ip":"$ip","event":"$event_type","reason":"$reason","score":$score}
EOF
)
echo "$notif" >> "$notif_file"
# Keep last 100 notifications
tail -100 "$notif_file" > "$notif_file.tmp" 2>/dev/null
mv "$notif_file.tmp" "$notif_file" 2>/dev/null
}
# Main correlation loop
run_correlator() {
load_config
init_files
init_reputation_db
echo "DPI Correlator started"
echo " Window: ${WINDOW}s"
echo " Output: $CORRELATED"
echo "DPI Correlator v2 started"
echo " Correlation window: ${WINDOW}s"
echo " Watch CrowdSec: $WATCH_CROWDSEC"
echo " Auto-ban: $AUTO_BAN (threshold: $AUTO_BAN_THRESHOLD)"
echo " Notifications: $NOTIFICATION_ENABLED"
while true; do
[ "$WATCH_CROWDSEC" = "1" ] && watch_crowdsec_decisions
watch_waf_alerts
watch_crowdsec_decisions
watch_dpi_flows
sleep 5
done
}
# CLI interface
case "$1" in
start)
run_correlator
;;
correlate)
# Manual correlation: dpi-correlator correlate <ip> [reason]
# Manual correlation: dpi-correlator correlate <ip> [event_type] [reason] [score]
load_config
init_files
[ -n "$2" ] && correlate_threat "$2" "$(date -Iseconds)" "${3:-manual}"
[ -z "$2" ] && { echo "Usage: $0 correlate <ip> [event_type] [reason] [score]"; exit 1; }
process_threat_event "$2" "${3:-manual}" "${4:-manual_request}" "${5:-50}"
;;
reputation)
# Get IP reputation: dpi-correlator reputation <ip>
[ -z "$2" ] && { echo "Usage: $0 reputation <ip>"; exit 1; }
init_reputation_db
score=$(get_ip_reputation "$2")
echo "IP: $2"
echo "Reputation Score: $score"
;;
context)
# Get full context for IP: dpi-correlator context <ip>
[ -z "$2" ] && { echo "Usage: $0 context <ip>"; exit 1; }
load_config
build_correlation_entry "$2" "context_query" "manual" 0
;;
search)
# Search correlations: dpi-correlator search [ip] [limit]
search_correlations "$2" "" "${3:-20}"
;;
stats)
get_correlation_stats
;;
status)
load_config
echo "Correlated threats: $(wc -l < "$CORRELATED" 2>/dev/null || echo 0)"
echo "Last 5 correlations:"
tail -5 "$CORRELATED" 2>/dev/null | jsonfilter -e '@[*]' 2>/dev/null || echo " (none)"
echo "=== Correlator Status ==="
if pgrep -f "dpi-correlator start" >/dev/null 2>&1; then
echo "Status: RUNNING"
else
echo "Status: STOPPED"
fi
echo ""
get_correlation_stats
;;
*)
echo "Usage: $0 {start|correlate <ip> [reason]|status}"
cat << EOF
DPI Correlator v2 - Threat Correlation Engine
Usage: $0 <command> [args]
Commands:
start Start the correlation daemon
correlate <ip> [...] Manually correlate an IP
reputation <ip> Get IP reputation score
context <ip> Get full context for an IP
search [ip] [limit] Search correlation log
stats Show correlation statistics
status Show correlator status
Configuration: /etc/config/dpi-dual (correlation section)
EOF
exit 1
;;
esac