#!/bin/sh
#
# Media Flow nDPId Collector
# Collects streaming service data from nDPId flows and stores it in a history
# file. Uses nDPId's local DPI detection (no cloud subscription required).
#
# Files read:    $NDPID_FLOWS, $NDPID_APPS, uci section media_flow.global
# Files written: $MEDIA_CACHE, $HISTORY_FILE, /tmp/media-flow-apps.json
# Requires:      jq (sections that need it are skipped when it is missing)
#
# Copyright (C) 2025 CyberMind.fr

HISTORY_FILE="/tmp/media-flow-history.json"
NDPID_FLOWS="/tmp/ndpid-flows.json"
NDPID_APPS="/tmp/ndpid-apps.json"
MEDIA_CACHE="/tmp/media-flow-ndpid-cache.json"
MAX_ENTRIES=1000
LOCK_FILE="/tmp/media-flow-ndpid-collector.lock"

# Streaming service patterns (extended regex alternations) matched
# case-insensitively against the application name reported in the flow data.
# These are the single source of truth: the jq programs below receive them as
# arguments instead of carrying their own copies.
STREAMING_VIDEO="YouTube|Netflix|Disney|AmazonVideo|PrimeVideo|Twitch|HboMax|Hulu|Vimeo|Peacock|Paramount|Crunchyroll|DailyMotion|Vevo|Plex|AppleTV"
STREAMING_AUDIO="Spotify|AppleMusic|Deezer|SoundCloud|Tidal|Pandora|AmazonMusic|YouTubeMusic|iHeartRadio|Audible"
STREAMING_VISIO="Zoom|Teams|GoogleMeet|Discord|Skype|Webex|FaceTime|WhatsApp|Signal|Telegram|Slack|GoToMeeting"
STREAMING_ALL="${STREAMING_VIDEO}|${STREAMING_AUDIO}|${STREAMING_VISIO}"

#######################################
# Categorize a streaming service by application name.
# Globals:   STREAMING_AUDIO, STREAMING_VIDEO, STREAMING_VISIO (read)
# Arguments: $1 - application name
# Outputs:   "audio", "video", "visio" or "other" on stdout
#######################################
categorize_service() {
    local app="$1"

    # Audio is tested first: "YouTubeMusic" also matches the video
    # alternative "YouTube" and would otherwise be reported as video.
    if printf '%s\n' "$app" | grep -qiE "$STREAMING_AUDIO"; then
        echo "audio"
    elif printf '%s\n' "$app" | grep -qiE "$STREAMING_VIDEO"; then
        echo "video"
    elif printf '%s\n' "$app" | grep -qiE "$STREAMING_VISIO"; then
        echo "visio"
    else
        echo "other"
    fi
}

#######################################
# Estimate a quality label from a bandwidth figure.
# Arguments: $1 - bandwidth in kbps (non-negative integer; anything else is
#                 treated as 0 instead of making the numeric tests fail)
#            $2 - category: "audio", "visio", or anything else for video tiers
# Outputs:   quality label on stdout
#######################################
estimate_quality() {
    local kbps="$1"
    local category="$2"

    case "$kbps" in
        '' | *[!0-9]*) kbps=0 ;;
    esac

    if [ "$category" = "audio" ]; then
        # Audio quality tiers (kbps)
        if [ "$kbps" -lt 96 ]; then
            echo "Low"
        elif [ "$kbps" -lt 192 ]; then
            echo "Normal"
        elif [ "$kbps" -lt 320 ]; then
            echo "High"
        else
            echo "Lossless"
        fi
    elif [ "$category" = "visio" ]; then
        # Video call quality tiers (kbps)
        if [ "$kbps" -lt 500 ]; then
            echo "Audio Only"
        elif [ "$kbps" -lt 1500 ]; then
            echo "SD"
        elif [ "$kbps" -lt 3000 ]; then
            echo "HD"
        else
            echo "FHD"
        fi
    else
        # Video streaming quality tiers (kbps)
        if [ "$kbps" -lt 1000 ]; then
            echo "SD"
        elif [ "$kbps" -lt 3000 ]; then
            echo "HD"
        elif [ "$kbps" -lt 8000 ]; then
            echo "FHD"
        else
            echo "4K"
        fi
    fi
}

#######################################
# Print the ISO-8601 timestamp that lies N days in the past.
# The value is derived from epoch seconds rather than from a relative date
# string such as "N days ago", which not every date implementation parses
# (BusyBox date is believed to be one of them - confirm on target).
# Arguments: $1 - number of days (non-negative integer)
# Outputs:   timestamp on stdout; nothing when it cannot be computed
# Returns:   non-zero when the timestamp cannot be computed
#######################################
retention_cutoff() {
    local days="$1"
    local now_epoch

    case "$days" in
        '' | *[!0-9]*) return 1 ;;
    esac

    now_epoch=$(date +%s) || return 1
    date -d "@$((now_epoch - days * 86400))" -Iseconds 2>/dev/null
}

#######################################
# Rewrite $HISTORY_FILE through a jq program, via a temporary file.
# The history is replaced only when jq succeeds AND produced output, so a
# failed or empty run can neither truncate the history nor leave the
# temporary file behind.
# Globals:   HISTORY_FILE (read, file rewritten)
# Arguments: jq options followed by the jq program (the input file is added)
#######################################
rewrite_history() {
    local tmp="${HISTORY_FILE}.tmp"

    if jq -c "$@" "$HISTORY_FILE" > "$tmp" 2>/dev/null && [ -s "$tmp" ]; then
        mv "$tmp" "$HISTORY_FILE"
    else
        rm -f "$tmp"
    fi
}

#######################################
# Extract streaming flows from $NDPID_FLOWS, publish them to $MEDIA_CACHE and
# merge them into the history.
# Globals:   NDPID_FLOWS, STREAMING_* (read); MEDIA_CACHE, HISTORY_FILE
#            (written); MAX_ENTRIES (read)
#######################################
collect_flows() {
    local timestamp new_entries

    timestamp=$(date -Iseconds)

    # NOTE(review): "bandwidth" is (bytes_rx + bytes_tx) * 8 / 1000 with a
    # fixed duration of 1, i.e. the byte counters are treated as a one-second
    # sample. If they are cumulative per-flow counters the figure is not a
    # rate - confirm against the producer of $NDPID_FLOWS.
    new_entries=$(jq -c \
        --arg ts "$timestamp" \
        --arg pattern "$STREAMING_ALL" \
        --arg video "$STREAMING_VIDEO" \
        --arg audio "$STREAMING_AUDIO" \
        --arg visio "$STREAMING_VISIO" '
        # Audio is tested first so that YouTubeMusic is not caught by the
        # YouTube alternative of the video pattern.
        def flow_category:
            if   (.app | test($audio; "i")) then "audio"
            elif (.app | test($video; "i")) then "video"
            elif (.app | test($visio; "i")) then "visio"
            else "other"
            end;

        # Same tiers as the shell helper estimate_quality.
        def flow_quality:
            if .category == "audio" then
                (if   .bandwidth < 96  then "Low"
                 elif .bandwidth < 192 then "Normal"
                 elif .bandwidth < 320 then "High"
                 else "Lossless" end)
            elif .category == "visio" then
                (if   .bandwidth < 500  then "Audio Only"
                 elif .bandwidth < 1500 then "SD"
                 elif .bandwidth < 3000 then "HD"
                 else "FHD" end)
            else
                (if   .bandwidth < 1000 then "SD"
                 elif .bandwidth < 3000 then "HD"
                 elif .bandwidth < 8000 then "FHD"
                 else "4K" end)
            end;

        [ .[]
          | select(.app != null and .app != "" and .app != "Unknown")
          | select(.app | test($pattern; "i"))
          | select(.state == "active" or .bytes_rx > 10000)
          | {
              timestamp: $ts,
              app: .app,
              client: (.src_ip // "unknown"),
              server: (.dst_ip // "unknown"),
              hostname: (.hostname // null),
              protocol: (.proto // "unknown"),
              bytes_rx: (.bytes_rx // 0),
              bytes_tx: (.bytes_tx // 0),
              packets: (.packets // 0),
              confidence: (.confidence // "Unknown"),
              ndpi_category: (.category // "Unknown"),
              flow_id: (.id // 0),
              state: (.state // "active")
            }
        ]
        | map(. + {
            duration: 1,
            bandwidth: ((.bytes_rx + .bytes_tx) * 8 / 1000 | floor),
            category: flow_category
          })
        | map(. + { quality: flow_quality })
        # Only include flows with significant traffic
        | map(select(.bytes_rx > 10000 or .packets > 100))
    ' "$NDPID_FLOWS" 2>/dev/null)

    if [ -z "$new_entries" ] || [ "$new_entries" = "[]" ] || [ "$new_entries" = "null" ]; then
        # No active streams (or jq failed): publish an empty cache
        echo '[]' > "$MEDIA_CACHE"
        return 0
    fi

    # Save current state to cache for the frontend
    printf '%s\n' "$new_entries" > "$MEDIA_CACHE"

    # Merge with history.
    # - Duplicates are collapsed per client + app + calendar day (the part of
    #   the timestamp before "T"); the already-stored entry wins.
    #   NOTE(review): earlier comments described a 60-second window, but the
    #   key has always been the date - confirm which one is intended.
    # - unique_by leaves the array ordered by that key, so it is re-sorted by
    #   timestamp before truncation; otherwise the MAX_ENTRIES limit would
    #   discard entries by key order instead of by age.
    rewrite_history \
        --argjson new "$new_entries" \
        --argjson max "$MAX_ENTRIES" '
        . + ($new | map(del(.flow_id, .state)))
        | unique_by(.client + .app + ((.timestamp // "") | split("T")[0]))
        | sort_by(.timestamp)
        | .[(0 - $max):]
    '
}

#######################################
# Write the top 20 streaming applications (by bytes) from $NDPID_APPS to
# /tmp/media-flow-apps.json.
# Globals:   NDPID_APPS, STREAMING_ALL (read)
#######################################
collect_apps() {
    jq -c --arg pattern "$STREAMING_ALL" '
        [.[] | select(.name | test($pattern; "i"))]
        | sort_by(-.bytes)
        | .[0:20]
    ' "$NDPID_APPS" > "/tmp/media-flow-apps.json" 2>/dev/null
}

#######################################
# Drop history entries older than the configured retention (days).
# Entries are compared as strings against the cutoff timestamp, which orders
# correctly as long as all timestamps carry the same UTC offset.
# Globals:   HISTORY_FILE (rewritten)
#######################################
prune_history() {
    local retention cutoff_date

    retention=$(uci -q get media_flow.global.history_retention 2>/dev/null || echo "7")
    if [ "$retention" -gt 0 ] 2>/dev/null; then
        cutoff_date=$(retention_cutoff "$retention")
        # Without a usable cutoff, keep the history untouched. Falling back to
        # "now" would delete every stored entry.
        if [ -n "$cutoff_date" ]; then
            rewrite_history --arg cutoff "$cutoff_date" \
                '[.[] | select(.timestamp >= $cutoff)]'
        fi
    fi
}

#######################################
# Entry point. Always returns 0 so the script's exit status stays 0 on every
# path (not enabled, already running, no data, jq missing, ...).
#######################################
main() {
    local pid enabled

    # Check if already running
    if [ -f "$LOCK_FILE" ]; then
        pid=$(cat "$LOCK_FILE" 2>/dev/null)
        if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
            return 0
        fi
    fi
    echo $$ > "$LOCK_FILE"
    # Single quotes: the path is expanded (and quoted) when the trap fires.
    trap 'rm -f "$LOCK_FILE"' EXIT

    # Check if enabled (defaults to enabled when the option cannot be read)
    enabled=$(uci -q get media_flow.global.enabled 2>/dev/null || echo "1")
    [ "$enabled" = "1" ] || return 0

    # Check if nDPId data is available; without a flows file, continue only
    # when an ndpid process is running.
    if [ ! -f "$NDPID_FLOWS" ]; then
        if ! pgrep -x ndpid > /dev/null 2>&1; then
            return 0
        fi
    fi

    # Initialize history file. -s (not -f) also repairs a zero-length file,
    # for which jq would emit nothing and the history would stay empty.
    [ -s "$HISTORY_FILE" ] || echo '[]' > "$HISTORY_FILE"

    # Process nDPId flows
    if [ -f "$NDPID_FLOWS" ] && command -v jq > /dev/null 2>&1; then
        collect_flows
    fi

    # Also process nDPId apps file for aggregated stats
    if [ -f "$NDPID_APPS" ] && command -v jq > /dev/null 2>&1; then
        collect_apps
    fi

    # Clean old entries based on retention (days)
    prune_history

    return 0
}

main "$@"