#!/bin/sh # RPCD backend for Media Flow # Provides ubus interface: luci.media-flow # Supports both nDPId (local DPI) and netifyd as data sources # nDPId provides local application detection without cloud subscription . /lib/functions.sh . /usr/share/libubox/jshn.sh HISTORY_FILE="/tmp/media-flow-history.json" STATS_DIR="/tmp/media-flow-stats" NDPID_FLOWS="/tmp/ndpid-flows.json" NDPID_APPS="/tmp/ndpid-apps.json" MEDIA_CACHE="/tmp/media-flow-ndpid-cache.json" # Streaming patterns for filtering STREAMING_PATTERN="YouTube|Netflix|Disney|Amazon|Twitch|Hbo|Hulu|Vimeo|Peacock|Paramount|Spotify|Apple.*Music|Deezer|SoundCloud|Tidal|Zoom|Teams|Meet|Discord|Skype|Webex" # Initialize storage init_storage() { mkdir -p "$STATS_DIR" [ ! -f "$HISTORY_FILE" ] && echo '[]' > "$HISTORY_FILE" } # Detect available DPI source get_dpi_source() { # Prefer nDPId if running # Check for ndpid process and either direct flows file or compat layer output if pgrep ndpid >/dev/null 2>&1; then # nDPId running - check for data files if [ -f "$NDPID_FLOWS" ] || [ -f "$MEDIA_CACHE" ] || [ -f /var/run/netifyd/status.json ]; then echo "ndpid" return fi # nDPId running but no data yet echo "ndpid" elif [ -f /var/run/netifyd/status.json ] && pgrep netifyd >/dev/null 2>&1; then echo "netifyd" else echo "none" fi } # Get nDPId flow data get_ndpid_data() { if [ -f "$NDPID_FLOWS" ]; then cat "$NDPID_FLOWS" else echo '[]' fi } # Get nDPId apps data get_ndpid_apps() { if [ -f "$NDPID_APPS" ]; then cat "$NDPID_APPS" else echo '[]' fi } # Get netifyd status data get_netifyd_data() { if [ -f /var/run/netifyd/status.json ]; then cat /var/run/netifyd/status.json else echo '{}' fi } # Build network stats from netifyd status.json build_network_stats_json() { netifyd_data="$1" # Extract interface stats from netifyd echo "$netifyd_data" | jq -c '{ interfaces: (if .stats then [.stats | to_entries[] | { name: .key, rx_bytes: (.value.ip_bytes // 0), tx_bytes: (.value.wire_bytes // 0), rx_packets: (.value.ip // 0), tx_packets: (.value.raw // 0), tcp: (.value.tcp // 0), udp: (.value.udp // 0), icmp: (.value.icmp // 0) }] else [] end), flows_active: (.flows_active // 0), flow_count: (.flow_count // 0), uptime: (.uptime // 0), agent_version: (.agent_version // 0), cpu_system: (.cpu_system // 0), cpu_user: (.cpu_user // 0), memrss_kb: (.memrss_kb // 0) }' 2>/dev/null || echo '{"interfaces":[],"flows_active":0,"flow_count":0,"uptime":0,"agent_version":0}' } case "$1" in list) cat <<-'EOF' { "status": {}, "get_active_streams": {}, "get_network_stats": {}, "get_stream_history": {"hours": 24}, "get_stats_by_service": {}, "get_stats_by_client": {}, "get_service_details": {"service": "string"}, "set_alert": {"service": "string", "threshold_hours": 4, "action": "notify"}, "delete_alert": {"alert_id": "string"}, "list_alerts": {}, "clear_history": {}, "get_settings": {}, "set_settings": {"enabled": 1, "history_retention": 7, "refresh_interval": 5}, "start_ndpid": {}, "stop_ndpid": {}, "start_netifyd": {}, "stop_netifyd": {} } EOF ;; call) case "$2" in status) init_storage # Check nDPId status ndpid_running=0 ndpid_version="unknown" ndpid_flows=0 pgrep ndpid > /dev/null 2>&1 && ndpid_running=1 if [ "$ndpid_running" = "1" ]; then ndpid_version=$(ndpid -v 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+' | head -1) [ -z "$ndpid_version" ] && ndpid_version="unknown" # Check for flows from compat layer status.json if [ -f /var/run/netifyd/status.json ] && [ -s /var/run/netifyd/status.json ]; then # Read flow_count using jsonfilter (busybox-friendly) ndpid_flows=$(jsonfilter -i /var/run/netifyd/status.json -e '@.flow_count' 2>/dev/null) [ -z "$ndpid_flows" ] && ndpid_flows=$(jsonfilter -i /var/run/netifyd/status.json -e '@.flows_active' 2>/dev/null) [ -z "$ndpid_flows" ] && ndpid_flows=0 elif [ -f "$NDPID_FLOWS" ] && [ -s "$NDPID_FLOWS" ]; then ndpid_flows=$(jsonfilter -i "$NDPID_FLOWS" -e '@[*]' 2>/dev/null | wc -l) [ -z "$ndpid_flows" ] && ndpid_flows=0 fi fi # Check netifyd status netifyd_running=0 pgrep netifyd > /dev/null 2>&1 && netifyd_running=1 netifyd_data=$(get_netifyd_data) netifyd_flows=0 netifyd_version="unknown" if [ "$netifyd_running" = "1" ] && [ -f /var/run/netifyd/status.json ] && [ -s /var/run/netifyd/status.json ]; then netifyd_flows=$(jsonfilter -i /var/run/netifyd/status.json -e '@.flows_active' 2>/dev/null) [ -z "$netifyd_flows" ] && netifyd_flows=$(jsonfilter -i /var/run/netifyd/status.json -e '@.flow_count' 2>/dev/null) [ -z "$netifyd_flows" ] && netifyd_flows=0 netifyd_version=$(jsonfilter -i /var/run/netifyd/status.json -e '@.agent_version' 2>/dev/null) [ -z "$netifyd_version" ] && netifyd_version="unknown" fi # Determine active DPI source dpi_source=$(get_dpi_source) # Use flow count from active source if [ "$dpi_source" = "ndpid" ]; then flow_count=$ndpid_flows else flow_count=$netifyd_flows fi history_count=0 if [ -f "$HISTORY_FILE" ]; then history_count=$(jq 'length' "$HISTORY_FILE" 2>/dev/null) [ -z "$history_count" ] && history_count=0 fi # Check if nDPId cache has active streams active_streams=0 if [ -f "$MEDIA_CACHE" ]; then active_streams=$(jq 'length' "$MEDIA_CACHE" 2>/dev/null) [ -z "$active_streams" ] && active_streams=0 fi enabled=$(uci -q get media_flow.global.enabled 2>/dev/null || echo "1") refresh=$(uci -q get media_flow.global.refresh_interval 2>/dev/null || echo "5") cat <<-EOF { "enabled": $enabled, "module": "media-flow", "version": "0.6.0", "dpi_source": "$dpi_source", "ndpid_running": $ndpid_running, "ndpid_version": "$ndpid_version", "ndpid_flows": $ndpid_flows, "netifyd_running": $netifyd_running, "netifyd_version": "$netifyd_version", "netifyd_flows": $netifyd_flows, "active_flows": $flow_count, "active_streams": $active_streams, "history_entries": $history_count, "refresh_interval": $refresh } EOF ;; get_active_streams) init_storage dpi_source=$(get_dpi_source) streams="[]" flow_count=0 if [ "$dpi_source" = "ndpid" ]; then # Get active streams - try cache first, then parse apps file directly if [ -f "$MEDIA_CACHE" ] && [ -s "$MEDIA_CACHE" ]; then streams=$(cat "$MEDIA_CACHE" 2>/dev/null || echo "[]") elif [ -f "$NDPID_APPS" ] && [ -s "$NDPID_APPS" ]; then # Parse ndpid-apps.json directly and filter streaming services # Format is array: [{"name": "TLS.YouTube", "category": "Media", "flows": 33, "bytes": 123456}, ...] if command -v jq >/dev/null 2>&1; then # jq without regex (ONIGURUMA not available on OpenWrt) # Filter streaming services by checking if name contains known patterns streams=$(jq -c ' [.[] | select( (.name | ascii_downcase | contains("youtube")) or (.name | ascii_downcase | contains("netflix")) or (.name | ascii_downcase | contains("disney")) or (.name | ascii_downcase | contains("amazon")) or (.name | ascii_downcase | contains("twitch")) or (.name | ascii_downcase | contains("hbo")) or (.name | ascii_downcase | contains("hulu")) or (.name | ascii_downcase | contains("vimeo")) or (.name | ascii_downcase | contains("peacock")) or (.name | ascii_downcase | contains("paramount")) or (.name | ascii_downcase | contains("spotify")) or (.name | ascii_downcase | contains("applemusic")) or (.name | ascii_downcase | contains("appleitunes")) or (.name | ascii_downcase | contains("deezer")) or (.name | ascii_downcase | contains("soundcloud")) or (.name | ascii_downcase | contains("tidal")) or (.name | ascii_downcase | contains("zoom")) or (.name | ascii_downcase | contains("teams")) or (.name | ascii_downcase | contains("meet")) or (.name | ascii_downcase | contains("discord")) or (.name | ascii_downcase | contains("skype")) or (.name | ascii_downcase | contains("webex")) )] | map({ app: (if (.name | contains(".")) then (.name | split(".") | .[1:] | join(".")) else .name end), category: (.category // "Media"), flows: (.flows // 0), bytes: (.bytes // 0) }) | group_by(.app) | map({ app: .[0].app, category: .[0].category, flows: (map(.flows) | add), bytes_rx: (map(.bytes) | add), bytes_tx: 0, bandwidth: ((map(.bytes) | add) / 125 | floor), quality: (if ((map(.bytes) | add) > 100000000) then "FHD" elif ((map(.bytes) | add) > 10000000) then "HD" else "SD" end), client: "LAN" }) ' "$NDPID_APPS" 2>/dev/null) || streams="[]" else # Fallback without jq - grep names from JSON array streams="[" first=1 for name in $(jq -r '.[].name' "$NDPID_APPS" 2>/dev/null | grep -iE "$STREAMING_PATTERN"); do app_name=$(echo "$name" | sed 's/^[^.]*\.//') [ "$first" = "0" ] && streams="$streams," first=0 streams="$streams{\"app\":\"$app_name\",\"category\":\"Media\",\"flows\":1,\"bandwidth\":0,\"quality\":\"HD\",\"client\":\"LAN\"}" done streams="$streams]" fi fi # Get flow count - prefer compat layer status.json if [ -f /var/run/netifyd/status.json ] && [ -s /var/run/netifyd/status.json ]; then flow_count=$(jsonfilter -i /var/run/netifyd/status.json -e '@.flow_count' 2>/dev/null) [ -z "$flow_count" ] && flow_count=$(jsonfilter -i /var/run/netifyd/status.json -e '@.flows_active' 2>/dev/null) [ -z "$flow_count" ] && flow_count=0 elif [ -f "$NDPID_FLOWS" ] && [ -s "$NDPID_FLOWS" ]; then flow_count=$(jsonfilter -i "$NDPID_FLOWS" -e '@[*]' 2>/dev/null | wc -l) [ -z "$flow_count" ] && flow_count=0 fi note="Streams detected via nDPId local DPI" elif [ "$dpi_source" = "netifyd" ]; then if [ -f /var/run/netifyd/status.json ] && [ -s /var/run/netifyd/status.json ]; then flow_count=$(jsonfilter -i /var/run/netifyd/status.json -e '@.flows_active' 2>/dev/null) [ -z "$flow_count" ] && flow_count=$(jsonfilter -i /var/run/netifyd/status.json -e '@.flow_count' 2>/dev/null) [ -z "$flow_count" ] && flow_count=0 fi note="Application detection requires netifyd cloud subscription" else note="No DPI engine available" fi cat <<-EOF { "streams": $streams, "dpi_source": "$dpi_source", "note": "$note", "flow_count": $flow_count } EOF ;; get_network_stats) init_storage netifyd_data=$(get_netifyd_data) stats=$(build_network_stats_json "$netifyd_data") echo "{\"stats\": $stats}" ;; get_stream_history) read -r input hours=$(echo "$input" | jq -r '.hours // 24' 2>/dev/null) [ -z "$hours" ] || [ "$hours" = "null" ] && hours=24 init_storage history="[]" if [ -f "$HISTORY_FILE" ]; then history=$(jq -c '.' "$HISTORY_FILE" 2>/dev/null || echo "[]") fi cat <<-EOF {"history": $history, "hours_requested": $hours} EOF ;; get_stats_by_service) init_storage services="{}" if [ -f "$HISTORY_FILE" ] && [ -s "$HISTORY_FILE" ]; then services=$(jq -c ' group_by(.app) | map({ key: .[0].app, value: { sessions: length, total_bandwidth_kbps: (map(.bandwidth) | add // 0), total_duration_seconds: (map(.duration) | add // 0), category: .[0].category } }) | from_entries ' "$HISTORY_FILE" 2>/dev/null) || services="{}" fi cat <<-EOF {"services": $services} EOF ;; get_stats_by_client) init_storage clients="{}" if [ -f "$HISTORY_FILE" ] && [ -s "$HISTORY_FILE" ]; then clients=$(jq -c ' group_by(.client) | map({ key: .[0].client, value: { sessions: length, total_bandwidth_kbps: (map(.bandwidth) | add // 0), total_duration_seconds: (map(.duration) | add // 0), top_service: (group_by(.app) | max_by(length) | .[0].app // "unknown") } }) | from_entries ' "$HISTORY_FILE" 2>/dev/null) || clients="{}" fi cat <<-EOF {"clients": $clients} EOF ;; get_service_details) read -r input service=$(echo "$input" | jq -r '.service // ""' 2>/dev/null) init_storage result='{}' if [ -n "$service" ] && [ -f "$HISTORY_FILE" ] && [ -s "$HISTORY_FILE" ]; then result=$(jq -c --arg svc "$service" ' [.[] | select(.app == $svc)] | { service: $svc, total_sessions: length, avg_bandwidth_kbps: (if length > 0 then (map(.bandwidth) | add / length | floor) else 0 end), total_duration_seconds: (map(.duration) | add // 0), category: (.[0].category // "unknown"), typical_quality: (.[0].quality // "unknown"), recent_sessions: (.[-10:] | map({ timestamp: .timestamp, client: .client, bandwidth_kbps: .bandwidth, duration_seconds: .duration, quality: .quality })) } ' "$HISTORY_FILE" 2>/dev/null) || result='{"service":"'$service'","total_sessions":0,"avg_bandwidth_kbps":0,"total_duration_seconds":0,"category":"unknown","typical_quality":"unknown","recent_sessions":[]}' else result='{"service":"'$service'","total_sessions":0,"avg_bandwidth_kbps":0,"total_duration_seconds":0,"category":"unknown","typical_quality":"unknown","recent_sessions":[]}' fi echo "$result" ;; set_alert) read -r input service=$(echo "$input" | jq -r '.service // ""' 2>/dev/null) threshold_hours=$(echo "$input" | jq -r '.threshold_hours // 4' 2>/dev/null) action=$(echo "$input" | jq -r '.action // "notify"' 2>/dev/null) if [ -z "$service" ]; then echo '{"success": false, "message": "Service name required"}' exit 0 fi alert_id="alert_$(echo "$service" | tr -d ' ' | tr '[:upper:]' '[:lower:]' | tr -cd 'a-z0-9_')" uci -q delete "media_flow.${alert_id}" 2>/dev/null uci set "media_flow.${alert_id}=alert" uci set "media_flow.${alert_id}.service=${service}" uci set "media_flow.${alert_id}.threshold_hours=${threshold_hours}" uci set "media_flow.${alert_id}.action=${action}" uci set "media_flow.${alert_id}.enabled=1" uci commit media_flow cat <<-EOF {"success": true, "message": "Alert configured for $service", "alert_id": "$alert_id"} EOF ;; delete_alert) read -r input alert_id=$(echo "$input" | jq -r '.alert_id // ""' 2>/dev/null) if [ -z "$alert_id" ]; then echo '{"success": false, "message": "Alert ID required"}' exit 0 fi if uci -q get "media_flow.${alert_id}" >/dev/null 2>&1; then uci delete "media_flow.${alert_id}" uci commit media_flow echo '{"success": true, "message": "Alert deleted"}' else echo '{"success": false, "message": "Alert not found"}' fi ;; list_alerts) alerts="[]" # Build alerts array from UCI alerts=$(uci show media_flow 2>/dev/null | grep "=alert$" | while read -r line; do section=$(echo "$line" | cut -d. -f2 | cut -d= -f1) svc=$(uci -q get "media_flow.${section}.service") threshold=$(uci -q get "media_flow.${section}.threshold_hours") act=$(uci -q get "media_flow.${section}.action") en=$(uci -q get "media_flow.${section}.enabled") [ -z "$en" ] && en="1" cat <<-ALERT {"id":"$section","service":"$svc","threshold_hours":$threshold,"action":"$act","enabled":$en} ALERT done | jq -s '.' 2>/dev/null) || alerts="[]" [ -z "$alerts" ] || [ "$alerts" = "null" ] && alerts="[]" cat <<-EOF {"alerts": $alerts} EOF ;; clear_history) echo '[]' > "$HISTORY_FILE" echo '{"success": true, "message": "History cleared"}' ;; get_settings) enabled=$(uci -q get media_flow.global.enabled 2>/dev/null || echo "1") retention=$(uci -q get media_flow.global.history_retention 2>/dev/null || echo "7") refresh=$(uci -q get media_flow.global.refresh_interval 2>/dev/null || echo "5") cat <<-EOF { "enabled": $enabled, "history_retention": $retention, "refresh_interval": $refresh } EOF ;; set_settings) read -r input enabled=$(echo "$input" | jq -r '.enabled // 1' 2>/dev/null) retention=$(echo "$input" | jq -r '.history_retention // 7' 2>/dev/null) refresh=$(echo "$input" | jq -r '.refresh_interval // 5' 2>/dev/null) uci set media_flow.global.enabled="$enabled" uci set media_flow.global.history_retention="$retention" uci set media_flow.global.refresh_interval="$refresh" uci commit media_flow echo '{"success": true, "message": "Settings saved"}' ;; start_ndpid) # Start nDPId and compatibility layer result_ndpid=0 result_compat=0 msg="" if [ -x /etc/init.d/ndpid ]; then /etc/init.d/ndpid start 2>/dev/null && result_ndpid=1 /etc/init.d/ndpid enable 2>/dev/null fi if [ -x /etc/init.d/ndpid-compat ]; then sleep 2 /etc/init.d/ndpid-compat start 2>/dev/null && result_compat=1 /etc/init.d/ndpid-compat enable 2>/dev/null fi if [ "$result_ndpid" = "1" ]; then msg="nDPId started" [ "$result_compat" = "1" ] && msg="$msg with compatibility layer" echo "{\"success\": true, \"message\": \"$msg\"}" else echo '{"success": false, "message": "nDPId not installed or failed to start"}' fi ;; stop_ndpid) /etc/init.d/ndpid-compat stop 2>/dev/null /etc/init.d/ndpid stop 2>/dev/null echo '{"success": true, "message": "nDPId stopped"}' ;; start_netifyd) if [ -x /etc/init.d/netifyd ]; then /etc/init.d/netifyd start 2>/dev/null /etc/init.d/netifyd enable 2>/dev/null echo '{"success": true, "message": "Netifyd started"}' else echo '{"success": false, "message": "Netifyd not installed"}' fi ;; stop_netifyd) /etc/init.d/netifyd stop 2>/dev/null echo '{"success": true, "message": "Netifyd stopped"}' ;; *) cat <<-EOF {"error": -32601, "message": "Method not found: $2"} EOF ;; esac ;; esac