#!/bin/sh
# DPI LAN Flow Collector - Real-time passive flow analysis
# No MITM, no caching - pure nDPI flow monitoring on br-lan
# Part of secubox-dpi-dual package
#
# Usage: $0 {start|status|aggregate}

. /lib/functions.sh

config_load dpi-dual

STATS_DIR=""
LAN_IF=""
AGGREGATE_INTERVAL=""
CLIENT_RETENTION=""
NETIFYD_INSTANCE=""

# Real-time data files (paths derived from STATS_DIR in load_config)
CLIENTS_FILE=""
FLOWS_FILE=""
PROTOCOLS_FILE=""
DESTINATIONS_FILE=""

# Load UCI settings (with defaults) into globals and derive the JSON
# output paths under STATS_DIR.
load_config() {
    config_get STATS_DIR settings stats_dir "/tmp/secubox"
    config_get LAN_IF lan interface "br-lan"
    config_get AGGREGATE_INTERVAL lan aggregate_interval "5"
    config_get CLIENT_RETENTION lan client_retention "3600"
    config_get NETIFYD_INSTANCE lan netifyd_instance "lan"

    CLIENTS_FILE="$STATS_DIR/lan-clients.json"
    FLOWS_FILE="$STATS_DIR/lan-flows.json"
    PROTOCOLS_FILE="$STATS_DIR/lan-protocols.json"
    DESTINATIONS_FILE="$STATS_DIR/lan-destinations.json"
}

init_dirs() {
    mkdir -p "$STATS_DIR"
}

# Parse one netifyd JSON flow event and update the per-client,
# per-destination and per-protocol accumulator files.
#   $1 - one line of netifyd JSON output
parse_flow_event() {
    local line="$1"
    local flow_type local_ip other_ip proto app
    local bytes_in bytes_out local_port other_port

    # Only "flow" events carry the fields we track.
    flow_type=$(echo "$line" | jsonfilter -e '@.type' 2>/dev/null)
    [ "$flow_type" != "flow" ] && return

    local_ip=$(echo "$line" | jsonfilter -e '@.flow.local_ip' 2>/dev/null)
    other_ip=$(echo "$line" | jsonfilter -e '@.flow.other_ip' 2>/dev/null)
    proto=$(echo "$line" | jsonfilter -e '@.flow.detected_protocol_name' 2>/dev/null)
    app=$(echo "$line" | jsonfilter -e '@.flow.detected_application_name' 2>/dev/null)
    bytes_in=$(echo "$line" | jsonfilter -e '@.flow.local_bytes' 2>/dev/null || echo 0)
    bytes_out=$(echo "$line" | jsonfilter -e '@.flow.other_bytes' 2>/dev/null || echo 0)
    local_port=$(echo "$line" | jsonfilter -e '@.flow.local_port' 2>/dev/null || echo 0)
    other_port=$(echo "$line" | jsonfilter -e '@.flow.other_port' 2>/dev/null || echo 0)

    # FIX: jsonfilter may exit 0 while printing nothing, so the
    # "|| echo 0" fallback alone is not enough; empty strings would
    # break the $(( )) arithmetic downstream. Default them explicitly.
    bytes_in="${bytes_in:-0}"
    bytes_out="${bytes_out:-0}"
    local_port="${local_port:-0}"
    other_port="${other_port:-0}"

    [ -z "$local_ip" ] && return

    # Determine direction (LAN client -> external destination)
    local client_ip=""
    local dest_ip=""
    local dest_port=""

    # Check if local_ip is in a private LAN range
    # (192.168.x.x, 10.x.x.x, 172.16-31.x.x)
    case "$local_ip" in
        192.168.*|10.*|172.1[6-9].*|172.2[0-9].*|172.3[0-1].*)
            client_ip="$local_ip"
            dest_ip="$other_ip"
            dest_port="$other_port"
            ;;
        *)
            # other_ip is the LAN client
            client_ip="$other_ip"
            dest_ip="$local_ip"
            dest_port="$local_port"
            ;;
    esac

    [ -z "$client_ip" ] && return

    # Update real-time tracking files
    update_client_stats "$client_ip" "$bytes_in" "$bytes_out" "$proto" "$app"
    update_destination_stats "$dest_ip" "$dest_port" "$proto" "$bytes_in" "$bytes_out"
    update_protocol_stats "$proto" "$app" "$bytes_in" "$bytes_out"
}

# Accumulate per-client byte/flow counters into a temp JSON file.
#   $1 client IP, $2 bytes in, $3 bytes out, $4 protocol, $5 application
update_client_stats() {
    local client_ip="$1"
    local bytes_in="${2:-0}"
    local bytes_out="${3:-0}"
    local proto="$4"
    local app="$5"
    local timestamp
    timestamp=$(date +%s)

    local client_file="$STATS_DIR/client_${client_ip}.tmp"

    # Read existing stats (defaults when the file does not exist yet)
    local existing_bytes_in=0
    local existing_bytes_out=0
    local existing_flows=0
    local first_seen="$timestamp"

    if [ -f "$client_file" ]; then
        existing_bytes_in=$(jsonfilter -i "$client_file" -e '@.bytes_in' 2>/dev/null || echo 0)
        existing_bytes_out=$(jsonfilter -i "$client_file" -e '@.bytes_out' 2>/dev/null || echo 0)
        existing_flows=$(jsonfilter -i "$client_file" -e '@.flows' 2>/dev/null || echo 0)
        first_seen=$(jsonfilter -i "$client_file" -e '@.first_seen' 2>/dev/null || echo "$timestamp")
        # Guard against empty extractions (corrupt/partial file)
        existing_bytes_in="${existing_bytes_in:-0}"
        existing_bytes_out="${existing_bytes_out:-0}"
        existing_flows="${existing_flows:-0}"
        first_seen="${first_seen:-$timestamp}"
    fi

    # Accumulate
    bytes_in=$((existing_bytes_in + bytes_in))
    bytes_out=$((existing_bytes_out + bytes_out))
    existing_flows=$((existing_flows + 1))

    # Write updated stats
    cat > "$client_file" << EOF
{"ip":"$client_ip","bytes_in":$bytes_in,"bytes_out":$bytes_out,"flows":$existing_flows,"last_proto":"$proto","last_app":"$app","first_seen":$first_seen,"last_seen":$timestamp}
EOF
}

# Accumulate per-destination byte/hit counters into a temp JSON file.
# Internal (RFC1918/loopback) destinations are skipped.
#   $1 dest IP, $2 dest port, $3 protocol, $4 bytes in, $5 bytes out
update_destination_stats() {
    local dest_ip="$1"
    local dest_port="${2:-0}"
    local proto="$3"
    local bytes_in="${4:-0}"
    local bytes_out="${5:-0}"

    # Skip internal destinations
    case "$dest_ip" in
        192.168.*|10.*|172.1[6-9].*|172.2[0-9].*|172.3[0-1].*|127.*)
            return
            ;;
    esac

    local timestamp
    timestamp=$(date +%s)
    # Flatten IP+port into a filesystem-safe key ('.' and ':' -> '_')
    local dest_key
    dest_key=$(echo "${dest_ip}_${dest_port}" | tr '.:' '__')
    local dest_file="$STATS_DIR/dest_${dest_key}.tmp"

    local existing_bytes=0
    local existing_hits=0

    if [ -f "$dest_file" ]; then
        existing_bytes=$(jsonfilter -i "$dest_file" -e '@.bytes' 2>/dev/null || echo 0)
        existing_hits=$(jsonfilter -i "$dest_file" -e '@.hits' 2>/dev/null || echo 0)
        existing_bytes="${existing_bytes:-0}"
        existing_hits="${existing_hits:-0}"
    fi

    bytes_total=$((bytes_in + bytes_out + existing_bytes))
    existing_hits=$((existing_hits + 1))

    cat > "$dest_file" << EOF
{"ip":"$dest_ip","port":$dest_port,"proto":"$proto","bytes":$bytes_total,"hits":$existing_hits,"last_seen":$timestamp}
EOF
}

# Accumulate per-protocol/application byte/flow counters.
#   $1 protocol, $2 application, $3 bytes in, $4 bytes out
update_protocol_stats() {
    local proto="$1"
    local app="$2"
    local bytes_in="${3:-0}"
    local bytes_out="${4:-0}"

    [ -z "$proto" ] && proto="Unknown"
    [ -z "$app" ] && app="Unknown"

    # Flatten proto+app into a filesystem-safe key
    local proto_key
    proto_key=$(echo "${proto}_${app}" | tr ' /:' '___')
    local proto_file="$STATS_DIR/proto_${proto_key}.tmp"

    local existing_bytes=0
    local existing_flows=0

    if [ -f "$proto_file" ]; then
        existing_bytes=$(jsonfilter -i "$proto_file" -e '@.bytes' 2>/dev/null || echo 0)
        existing_flows=$(jsonfilter -i "$proto_file" -e '@.flows' 2>/dev/null || echo 0)
        existing_bytes="${existing_bytes:-0}"
        existing_flows="${existing_flows:-0}"
    fi

    bytes_total=$((bytes_in + bytes_out + existing_bytes))
    existing_flows=$((existing_flows + 1))

    cat > "$proto_file" << EOF
{"protocol":"$proto","application":"$app","bytes":$bytes_total,"flows":$existing_flows}
EOF
}

# Aggregate the per-entity temp files into the summary JSON files and
# expire client/destination entries older than CLIENT_RETENTION.
aggregate_stats() {
    local timestamp
    local cutoff
    timestamp=$(date -Iseconds)
    cutoff=$(($(date +%s) - CLIENT_RETENTION))

    # Aggregate clients
    # FIX: a redirection ("2>/dev/null") inside a 'for' word list is a
    # shell syntax error; the "[ -f ]" guard already handles the
    # unmatched-glob literal, so the redirection is removed.
    {
        printf '{"timestamp":"%s","clients":[' "$timestamp"
        local first=1
        for f in "$STATS_DIR"/client_*.tmp; do
            [ -f "$f" ] || continue
            local last_seen
            last_seen=$(jsonfilter -i "$f" -e '@.last_seen' 2>/dev/null || echo 0)
            last_seen="${last_seen:-0}"
            # Skip (and delete) expired entries
            [ "$last_seen" -lt "$cutoff" ] && { rm -f "$f"; continue; }
            [ $first -eq 0 ] && printf ','
            cat "$f"
            first=0
        done
        printf ']}'
    } > "$CLIENTS_FILE"

    # Aggregate destinations (all active entries; no truncation here)
    {
        printf '{"timestamp":"%s","destinations":[' "$timestamp"
        local first=1
        for f in "$STATS_DIR"/dest_*.tmp; do
            [ -f "$f" ] || continue
            local last_seen
            last_seen=$(jsonfilter -i "$f" -e '@.last_seen' 2>/dev/null || echo 0)
            last_seen="${last_seen:-0}"
            [ "$last_seen" -lt "$cutoff" ] && { rm -f "$f"; continue; }
            [ $first -eq 0 ] && printf ','
            cat "$f"
            first=0
        done
        printf ']}'
    } > "$DESTINATIONS_FILE"

    # Aggregate protocols (no retention/expiry on protocol entries)
    {
        printf '{"timestamp":"%s","protocols":[' "$timestamp"
        local first=1
        for f in "$STATS_DIR"/proto_*.tmp; do
            [ -f "$f" ] || continue
            [ $first -eq 0 ] && printf ','
            cat "$f"
            first=0
        done
        printf ']}'
    } > "$PROTOCOLS_FILE"

    # Write summary flows file
    local total_clients total_dests total_protos
    total_clients=$(ls -1 "$STATS_DIR"/client_*.tmp 2>/dev/null | wc -l)
    total_dests=$(ls -1 "$STATS_DIR"/dest_*.tmp 2>/dev/null | wc -l)
    total_protos=$(ls -1 "$STATS_DIR"/proto_*.tmp 2>/dev/null | wc -l)

    # Get interface byte/packet counters from sysfs
    local rx_bytes=0 tx_bytes=0 rx_packets=0 tx_packets=0
    if [ -d "/sys/class/net/$LAN_IF/statistics" ]; then
        rx_bytes=$(cat "/sys/class/net/$LAN_IF/statistics/rx_bytes" 2>/dev/null || echo 0)
        tx_bytes=$(cat "/sys/class/net/$LAN_IF/statistics/tx_bytes" 2>/dev/null || echo 0)
        rx_packets=$(cat "/sys/class/net/$LAN_IF/statistics/rx_packets" 2>/dev/null || echo 0)
        tx_packets=$(cat "/sys/class/net/$LAN_IF/statistics/tx_packets" 2>/dev/null || echo 0)
    fi

    cat > "$FLOWS_FILE" << EOF
{
    "timestamp": "$timestamp",
    "mode": "lan_passive",
    "interface": "$LAN_IF",
    "active_clients": $total_clients,
    "unique_destinations": $total_dests,
    "detected_protocols": $total_protos,
    "rx_bytes": $rx_bytes,
    "tx_bytes": $tx_bytes,
    "rx_packets": $rx_packets,
    "tx_packets": $tx_packets
}
EOF
}

# Watch netifyd JSON output in real-time: prefer the instance socket,
# fall back to the default socket, then to tailing the netifyd log.
watch_netifyd() {
    local netifyd_socket="/var/run/netifyd/netifyd-${NETIFYD_INSTANCE}.sock"

    # Fall back to default socket if instance-specific doesn't exist
    [ ! -S "$netifyd_socket" ] && netifyd_socket="/var/run/netifyd/netifyd.sock"

    if [ -S "$netifyd_socket" ]; then
        echo "Connecting to netifyd socket: $netifyd_socket"
        # Subscribe to flow events
        # NOTE(review): requires an 'nc' with UNIX-socket support (-U);
        # busybox nc may lack it - verify on target image.
        echo '{"type":"subscribe","channel":"flow_update"}' | \
            nc -U "$netifyd_socket" 2>/dev/null | \
            while read -r line; do
                parse_flow_event "$line"
            done
    else
        echo "Netifyd socket not found, using /var/log/netifyd.log"
        # Fallback: tail the netifyd log
        tail -F /var/log/netifyd.log 2>/dev/null | while read -r line; do
            # Extract JSON from log lines (strip the log prefix up to
            # the first '{' and restore the brace)
            case "$line" in
                *'{"type":"flow'*)
                    json_part="${line#*\{}"
                    json_part="{$json_part"
                    parse_flow_event "$json_part"
                    ;;
            esac
        done
    fi
}

# Background aggregation loop: periodically fold the temp files into
# the summary JSON files.
run_aggregator() {
    while true; do
        aggregate_stats
        sleep "$AGGREGATE_INTERVAL"
    done
}

# Entry point for 'start': init, spawn the aggregator, then watch
# netifyd in the foreground.
run_collector() {
    load_config
    init_dirs

    echo "DPI LAN Flow Collector started"
    echo "  Interface: $LAN_IF"
    echo "  Aggregate interval: ${AGGREGATE_INTERVAL}s"
    echo "  Client retention: ${CLIENT_RETENTION}s"
    echo "  Stats dir: $STATS_DIR"

    # Initialize empty files so consumers always find valid JSON
    echo '{"timestamp":"","clients":[]}' > "$CLIENTS_FILE"
    echo '{"timestamp":"","destinations":[]}' > "$DESTINATIONS_FILE"
    echo '{"timestamp":"","protocols":[]}' > "$PROTOCOLS_FILE"

    # Start background aggregator; kill it when we are signalled
    run_aggregator &
    AGGREGATOR_PID=$!
    trap "kill $AGGREGATOR_PID 2>/dev/null; exit 0" INT TERM

    # Watch netifyd in foreground
    watch_netifyd
}

# Print a human-readable status summary from the aggregated files.
status() {
    load_config

    echo "=== LAN Flow Collector Status ==="
    echo "Interface: $LAN_IF"

    if [ -f "$FLOWS_FILE" ]; then
        echo ""
        echo "Current Stats:"
        local active dests protos
        active=$(jsonfilter -i "$FLOWS_FILE" -e '@.active_clients' 2>/dev/null || echo 0)
        dests=$(jsonfilter -i "$FLOWS_FILE" -e '@.unique_destinations' 2>/dev/null || echo 0)
        protos=$(jsonfilter -i "$FLOWS_FILE" -e '@.detected_protocols' 2>/dev/null || echo 0)
        echo "  Active clients: $active"
        echo "  Unique destinations: $dests"
        echo "  Detected protocols: $protos"
    fi

    if [ -f "$CLIENTS_FILE" ]; then
        echo ""
        echo "Top Clients (by flows):"
        # NOTE(review): entries are shown in file order, not sorted by
        # flow count - confirm whether sorting is expected here.
        jsonfilter -i "$CLIENTS_FILE" -e '@.clients[*]' 2>/dev/null | head -5
    fi
}

case "$1" in
    start)
        run_collector
        ;;
    status)
        status
        ;;
    aggregate)
        load_config
        init_dirs
        aggregate_stats
        ;;
    *)
        echo "Usage: $0 {start|status|aggregate}"
        exit 1
        ;;
esac