secubox-openwrt/package/secubox/luci-app-media-flow/root/usr/bin/media-flow-ndpid-collector
CyberMind-FR 30926404dc feat(media-flow): Add nDPId integration for local DPI streaming detection
- Add media-flow-ndpid-collector script for collecting streaming data from nDPId
- Update RPCD backend to detect and use nDPId as primary DPI source
- Update frontend dashboard to show DPI source indicator (nDPId/netifyd/none)
- Add active streams table displaying real-time streaming activity
- Update init.d script to auto-detect and use best available collector
- Remove hard dependency on netifyd, make DPI engines optional
- Bump version to 0.6.0

nDPId provides local deep packet inspection without requiring cloud
subscription, enabling accurate streaming service detection (Netflix,
YouTube, Spotify, etc.) with quality estimation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 13:51:15 +01:00

195 lines
6.1 KiB
Bash

#!/bin/sh
#
# Media Flow nDPId Collector
# Collects streaming service data from nDPId flows and stores in history
# Uses nDPId's local DPI detection (no cloud subscription required)
#
# Copyright (C) 2025 CyberMind.fr
# --- Files used by the collector (all under /tmp) ---
# Input: per-flow data and aggregated per-application data.
NDPID_FLOWS=/tmp/ndpid-flows.json
NDPID_APPS=/tmp/ndpid-apps.json
# Output: current streaming entries (cache) and accumulated history.
MEDIA_CACHE=/tmp/media-flow-ndpid-cache.json
HISTORY_FILE=/tmp/media-flow-history.json
# PID file used as a single-instance guard.
LOCK_FILE=/tmp/media-flow-ndpid-collector.lock
# Upper bound on the number of entries kept in the history file.
MAX_ENTRIES=1000

# --- Streaming service patterns ---
# Extended-regex alternations matched (case-insensitively) against the
# application names found in the nDPId data. Each list is assembled in two
# steps purely to keep the lines short.
STREAMING_VIDEO="YouTube|Netflix|Disney|AmazonVideo|PrimeVideo|Twitch|HboMax|Hulu"
STREAMING_VIDEO="${STREAMING_VIDEO}|Vimeo|Peacock|Paramount|Crunchyroll|DailyMotion|Vevo|Plex|AppleTV"

STREAMING_AUDIO="Spotify|AppleMusic|Deezer|SoundCloud|Tidal"
STREAMING_AUDIO="${STREAMING_AUDIO}|Pandora|AmazonMusic|YouTubeMusic|iHeartRadio|Audible"

STREAMING_VISIO="Zoom|Teams|GoogleMeet|Discord|Skype|Webex"
STREAMING_VISIO="${STREAMING_VISIO}|FaceTime|WhatsApp|Signal|Telegram|Slack|GoToMeeting"

# Union of the three lists, used to pre-filter flows and applications.
STREAMING_ALL="$STREAMING_VIDEO|$STREAMING_AUDIO|$STREAMING_VISIO"
# Check if already running
#
# $LOCK_FILE holds the PID of the running collector. If that PID is still
# alive (kill -0 succeeds) this invocation exits quietly with status 0;
# otherwise the lock is considered stale and is overwritten with our own PID.
if [ -f "$LOCK_FILE" ]; then
    pid=$(cat "$LOCK_FILE" 2>/dev/null)
    if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
        exit 0
    fi
fi
echo $$ > "$LOCK_FILE"

# Single quotes defer expansion until the trap fires, and the inner double
# quotes keep the path as one word.
trap 'rm -f "$LOCK_FILE"' EXIT
# Some sh implementations do not run the EXIT trap when the shell is killed
# by a signal; turning the common ones into a normal exit makes sure the
# lock file is removed in that case too.
trap 'exit 129' HUP
trap 'exit 130' INT
trap 'exit 143' TERM
# Check if enabled
# When the option cannot be read (uci fails), the value defaults to "1",
# i.e. the collector runs unless it is explicitly set to something else.
enabled=$(uci -q get media_flow.global.enabled 2>/dev/null || echo "1")
[ "$enabled" != "1" ] && exit 0

# Check if nDPId data is available
if [ ! -f "$NDPID_FLOWS" ]; then
    # Fall back to checking if ndpid is running; with neither a flows file
    # nor a running ndpid process there is nothing to do.
    if ! pgrep -x ndpid > /dev/null 2>&1; then
        exit 0
    fi
fi

# Initialize history file
# -s (exists and is non-empty) instead of -f: jq produces no output for a
# zero-byte input, so an empty history file would otherwise stay empty on
# every subsequent merge.
[ -s "$HISTORY_FILE" ] || echo '[]' > "$HISTORY_FILE"
# Function to categorize streaming service
#
# Globals:   STREAMING_AUDIO, STREAMING_VIDEO, STREAMING_VISIO (read)
# Arguments: $1 - application name
# Outputs:   "audio", "video", "visio" or "other" on stdout
categorize_service() {
    local app="$1"

    # Audio is tested first: the patterns are unanchored substrings, so the
    # "YouTube" alternative of the video list would otherwise claim
    # "YouTubeMusic" before the audio list is consulted.
    if printf '%s\n' "$app" | grep -qiE -e "$STREAMING_AUDIO"; then
        echo "audio"
    elif printf '%s\n' "$app" | grep -qiE -e "$STREAMING_VIDEO"; then
        echo "video"
    elif printf '%s\n' "$app" | grep -qiE -e "$STREAMING_VISIO"; then
        echo "visio"
    else
        echo "other"
    fi
}
# Function to estimate quality from bandwidth
#
# Arguments: $1 - bandwidth in kbps; a fractional part is dropped and an
#                 empty or non-numeric value is treated as 0
#            $2 - category: "audio" or "visio"; any other value uses the
#                 video streaming tiers
# Outputs:   quality label on stdout
estimate_quality() {
    local kbps="${1%%.*}"
    local category="$2"
    local t1 t2 t3 q1 q2 q3 q4

    # Without this guard an empty or non-integer value makes every numeric
    # test below fail with an error and fall through to the highest tier.
    case "$kbps" in
        '' | *[!0-9]*) kbps=0 ;;
    esac

    # Pick the three thresholds (kbps) and four labels for the category.
    case "$category" in
        audio)
            # Audio quality tiers (kbps)
            t1=96 t2=192 t3=320
            q1="Low" q2="Normal" q3="High" q4="Lossless"
            ;;
        visio)
            # Video call quality tiers (kbps)
            t1=500 t2=1500 t3=3000
            q1="Audio Only" q2="SD" q3="HD" q4="FHD"
            ;;
        *)
            # Video streaming quality tiers (kbps)
            t1=1000 t2=3000 t3=8000
            q1="SD" q2="HD" q3="FHD" q4="4K"
            ;;
    esac

    if [ "$kbps" -lt "$t1" ]; then
        echo "$q1"
    elif [ "$kbps" -lt "$t2" ]; then
        echo "$q2"
    elif [ "$kbps" -lt "$t3" ]; then
        echo "$q3"
    else
        echo "$q4"
    fi
}
# Process nDPId flows
#
# Reads the flow list in $NDPID_FLOWS, keeps the flows whose application
# matches $STREAMING_ALL, writes them to $MEDIA_CACHE and merges them into
# $HISTORY_FILE. Skipped entirely when the flows file or jq is missing.
if [ -f "$NDPID_FLOWS" ] && command -v jq >/dev/null 2>&1; then
    timestamp=$(date -Iseconds)

    # Extract streaming flows from nDPId data. The per-category patterns are
    # passed in from the STREAMING_* variables so that the pre-filter and the
    # categorisation are driven by the same lists.
    new_entries=$(jq -c \
        --arg ts "$timestamp" \
        --arg pattern "$STREAMING_ALL" \
        --arg video "$STREAMING_VIDEO" \
        --arg audio "$STREAMING_AUDIO" \
        --arg visio "$STREAMING_VISIO" '
        [.[] |
            # Skip flows without a usable application name (a non-string
            # value would make test() abort the whole filter).
            select((.app | type) == "string") |
            select(.app != "" and .app != "Unknown") |
            select(.app | test($pattern; "i")) |
            select(.state == "active" or .bytes_rx > 10000) |
            {
                timestamp: $ts,
                app: .app,
                client: (.src_ip // "unknown"),
                server: (.dst_ip // "unknown"),
                hostname: (.hostname // null),
                protocol: (.proto // "unknown"),
                bytes_rx: (.bytes_rx // 0),
                bytes_tx: (.bytes_tx // 0),
                packets: (.packets // 0),
                confidence: (.confidence // "Unknown"),
                ndpi_category: (.category // "Unknown"),
                flow_id: (.id // 0),
                state: (.state // "active")
            }
        ] |
        # Calculate bandwidth and category for each entry.
        # NOTE(review): duration is fixed at 1, so bandwidth is the sum of the
        # byte counters expressed in kilobits; it is only a per-second rate if
        # those counters are per-second values - confirm against the producer
        # of the flows file.
        map(. + {
            duration: 1,
            bandwidth: ((.bytes_rx + .bytes_tx) * 8 / 1000 | floor),
            category: (
                # Audio is tested first: the patterns are unanchored, so the
                # YouTube alternative of the video list would otherwise also
                # claim YouTubeMusic.
                if (.app | test($audio; "i")) then "audio"
                elif (.app | test($video; "i")) then "video"
                elif (.app | test($visio; "i")) then "visio"
                else "other"
                end
            )
        }) |
        # Add quality estimation (thresholds in the same unit as bandwidth)
        map(. + {
            quality: (
                if .category == "audio" then
                    (if .bandwidth < 96 then "Low" elif .bandwidth < 192 then "Normal" elif .bandwidth < 320 then "High" else "Lossless" end)
                elif .category == "visio" then
                    (if .bandwidth < 500 then "Audio Only" elif .bandwidth < 1500 then "SD" elif .bandwidth < 3000 then "HD" else "FHD" end)
                else
                    (if .bandwidth < 1000 then "SD" elif .bandwidth < 3000 then "HD" elif .bandwidth < 8000 then "FHD" else "4K" end)
                end
            )
        }) |
        # Only include flows with significant traffic
        [.[] | select(.bytes_rx > 10000 or .packets > 100)]
    ' "$NDPID_FLOWS" 2>/dev/null)

    # Save current state to cache for frontend
    if [ -n "$new_entries" ] && [ "$new_entries" != "[]" ] && [ "$new_entries" != "null" ]; then
        printf '%s\n' "$new_entries" > "$MEDIA_CACHE"

        # Merge with history. The result goes to a temporary file that only
        # replaces the history when jq succeeded and produced output.
        if jq -c --argjson new "$new_entries" --argjson max "$MAX_ENTRIES" '
            # Add new entries (flow_id and state are cache-only fields)
            . + ($new | map(del(.flow_id, .state))) |
            # Remove duplicates: one entry per client + app + calendar date
            # (the part of the timestamp before "T"); the earliest entry of
            # each group is the one that is kept.
            # NOTE(review): an earlier comment described this as a 60 second
            # window, but the key has day granularity - confirm the intent.
            unique_by(.client + .app + (.timestamp | split("T")[0])) |
            # unique_by leaves the array ordered by the dedup key; restore
            # chronological order so the trim below drops the oldest entries.
            sort_by(.timestamp) |
            # Keep only the last $max entries
            .[(0 - $max):]
        ' "$HISTORY_FILE" > "${HISTORY_FILE}.tmp" 2>/dev/null \
            && [ -s "${HISTORY_FILE}.tmp" ]; then
            mv "${HISTORY_FILE}.tmp" "$HISTORY_FILE"
        else
            rm -f "${HISTORY_FILE}.tmp"
        fi
    else
        # No active streams, save empty cache
        echo '[]' > "$MEDIA_CACHE"
    fi
fi
# Also process nDPId apps file for aggregated stats
#
# Writes the 20 streaming applications with the highest .bytes value from
# $NDPID_APPS to /tmp/media-flow-apps.json. Skipped when the apps file or jq
# is missing.
if [ -f "$NDPID_APPS" ] && command -v jq >/dev/null 2>&1; then
    apps_file="/tmp/media-flow-apps.json"

    # Extract streaming apps from aggregated data. Output goes to a temporary
    # file first: redirecting straight into the target would truncate it
    # before jq runs and leave an empty file behind whenever jq fails.
    if jq -c --arg pattern "$STREAMING_ALL" '
        [.[] |
            # Entries without a string name cannot be matched; skip them
            # instead of letting test() abort the whole filter.
            select((.name | type) == "string") |
            select(.name | test($pattern; "i"))] |
        # Largest first; a missing byte counter sorts as 0.
        sort_by(-(.bytes // 0)) |
        .[0:20]
    ' "$NDPID_APPS" > "${apps_file}.tmp" 2>/dev/null; then
        mv "${apps_file}.tmp" "$apps_file"
    else
        rm -f "${apps_file}.tmp"
    fi
fi
# Clean old entries based on retention (days)
#
# Drops history entries whose timestamp is older than
# media_flow.global.history_retention days (7 when the option cannot be
# read). A value that is not a positive integer disables pruning.
retention=$(uci -q get media_flow.global.history_retention 2>/dev/null || echo "7")
if [ "$retention" -gt 0 ] 2>/dev/null && command -v jq >/dev/null 2>&1; then
    # Compute the cutoff from epoch seconds. Relative expressions such as
    # "N days ago" are not understood by every date implementation, and
    # falling back to the current time would make the filter below discard
    # the whole history.
    now_epoch=$(date +%s)
    cutoff_epoch=$((now_epoch - retention * 86400))
    cutoff_date=$(date -d "@${cutoff_epoch}" -Iseconds 2>/dev/null)

    # If the cutoff could not be computed, skip pruning rather than guess.
    if [ -n "$cutoff_date" ]; then
        # Timestamps are compared as strings, which matches chronological
        # order as long as they share the same format and UTC offset.
        if jq -c --arg cutoff "$cutoff_date" \
            '[.[] | select(.timestamp >= $cutoff)]' \
            "$HISTORY_FILE" > "${HISTORY_FILE}.tmp" 2>/dev/null \
            && [ -s "${HISTORY_FILE}.tmp" ]; then
            mv "${HISTORY_FILE}.tmp" "$HISTORY_FILE"
        else
            rm -f "${HISTORY_FILE}.tmp"
        fi
    fi
fi
exit 0