- Add media-flow-ndpid-collector script for collecting streaming data from nDPId
- Update RPCD backend to detect and use nDPId as primary DPI source
- Update frontend dashboard to show DPI source indicator (nDPId/netifyd/none)
- Add active streams table displaying real-time streaming activity
- Update init.d script to auto-detect and use best available collector
- Remove hard dependency on netifyd, make DPI engines optional
- Bump version to 0.6.0

nDPId provides local deep packet inspection without requiring a cloud subscription, enabling accurate streaming service detection (Netflix, YouTube, Spotify, etc.) with quality estimation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
433 lines
12 KiB
Bash
Executable File
433 lines
12 KiB
Bash
Executable File
#!/bin/sh
# RPCD backend for Media Flow
# Provides ubus interface: luci.media-flow
# Supports both nDPId (local DPI) and netifyd as data sources
# nDPId provides local application detection without cloud subscription

. /lib/functions.sh
. /usr/share/libubox/jshn.sh

# JSON array of past streaming sessions; seeded with '[]' by init_storage and
# read by the history/statistics methods.
HISTORY_FILE="/tmp/media-flow-history.json"

# Working directory created by init_storage.
STATS_DIR="/tmp/media-flow-stats"

# nDPId flow list; its presence (plus a running ndpid process) selects nDPId as
# the DPI source. Presumably written by the nDPId collector -- not visible here.
NDPID_FLOWS="/tmp/ndpid-flows.json"

# nDPId application list returned by get_ndpid_apps.
NDPID_APPS="/tmp/ndpid-apps.json"

# Active media streams reported by the status and get_active_streams methods.
MEDIA_CACHE="/tmp/media-flow-ndpid-cache.json"

# Streaming patterns for filtering
# NOTE(review): not referenced anywhere else in this script.
STREAMING_PATTERN="YouTube|Netflix|Disney|Amazon|Twitch|Hbo|Hulu|Vimeo|Peacock|Paramount|Spotify|Apple.*Music|Deezer|SoundCloud|Tidal|Zoom|Teams|Meet|Discord|Skype|Webex"
# Initialize storage
#
# Creates the stats directory and, if the history file does not exist yet,
# seeds it with an empty JSON array. An existing history file is left untouched.
# Both steps are always attempted, even if the first one fails.
#
# Globals:  STATS_DIR (read), HISTORY_FILE (read)
# Returns:  0 on success, 1 if the directory or the file could not be created.
init_storage() {
  init_storage_rc=0

  mkdir -p "$STATS_DIR" || init_storage_rc=1

  # Use an explicit `if` so an already-present history file does not turn into
  # a non-zero return status.
  if [ ! -f "$HISTORY_FILE" ]; then
    echo '[]' > "$HISTORY_FILE" || init_storage_rc=1
  fi

  return "$init_storage_rc"
}
# Detect available DPI source
#
# Prints exactly one of:
#   ndpid   - the nDPId flow file exists and an `ndpid` process is running
#   netifyd - the netifyd status file exists and a `netifyd` process is running
#   none    - neither engine is usable
# nDPId is preferred when both are available. Only the existence of the data
# files is checked, not their content.
#
# Globals:  NDPID_FLOWS (read)
#           NETIFYD_STATUS (read, optional) - overrides the netifyd status path;
#           defaults to /var/run/netifyd/status.json
get_dpi_source() {
  if [ -f "$NDPID_FLOWS" ] && pgrep -x ndpid >/dev/null 2>&1; then
    echo "ndpid"
  elif [ -f "${NETIFYD_STATUS:-/var/run/netifyd/status.json}" ] \
    && pgrep -x netifyd >/dev/null 2>&1; then
    echo "netifyd"
  else
    echo "none"
  fi
}
# Get nDPId flow data
#
# Prints the content of the nDPId flow file. Prints '[]' when the file is
# missing, empty, or cannot be read (for example because it was removed between
# the check and the read), so callers always get some output.
#
# Globals:  NDPID_FLOWS (read)
get_ndpid_data() {
  if [ -s "$NDPID_FLOWS" ] && cat "$NDPID_FLOWS" 2>/dev/null; then
    return 0
  fi
  echo '[]'
}
# Get nDPId apps data
#
# Prints the content of the nDPId application file. Prints '[]' when the file
# is missing, empty, or cannot be read, so callers always get some output.
#
# Globals:  NDPID_APPS (read)
get_ndpid_apps() {
  if [ -s "$NDPID_APPS" ] && cat "$NDPID_APPS" 2>/dev/null; then
    return 0
  fi
  echo '[]'
}
# Get netifyd status data
#
# Prints the content of the netifyd status file. Prints '{}' when the file is
# missing, empty, or cannot be read, so callers always get some output.
#
# Globals:  NETIFYD_STATUS (read, optional) - overrides the status file path;
#           defaults to /var/run/netifyd/status.json
get_netifyd_data() {
  get_netifyd_data_path="${NETIFYD_STATUS:-/var/run/netifyd/status.json}"

  if [ -s "$get_netifyd_data_path" ] && cat "$get_netifyd_data_path" 2>/dev/null; then
    return 0
  fi
  echo '{}'
}
# Build network stats from netifyd status.json
#
# Arguments: $1 - netifyd status document (JSON text)
# Outputs:   one compact JSON object on stdout with the keys
#            interfaces, flows_active, flow_count, uptime, agent_version,
#            cpu_system, cpu_user, memrss_kb
#
# Each entry of `.stats` becomes one element of `interfaces`, mapped as:
#   rx_bytes <- ip_bytes, tx_bytes <- wire_bytes,
#   rx_packets <- ip,     tx_packets <- raw,
#   tcp / udp / icmp copied as-is.
# NOTE(review): these netifyd counters do not look directional; confirm that
# the rx/tx naming is what the frontend expects.
#
# If jq fails or produces no output (e.g. empty input), an all-zero object with
# the same set of keys is printed instead, so callers can always embed the
# result in a larger JSON document.
build_network_stats_json() {
  # printf instead of echo: some shells' echo interprets backslash escapes,
  # which would corrupt JSON string escapes in the input.
  build_network_stats_out=$(printf '%s' "$1" | jq -c '{
    interfaces: (if .stats then
      [.stats | to_entries[] | {
        name: .key,
        rx_bytes: (.value.ip_bytes // 0),
        tx_bytes: (.value.wire_bytes // 0),
        rx_packets: (.value.ip // 0),
        tx_packets: (.value.raw // 0),
        tcp: (.value.tcp // 0),
        udp: (.value.udp // 0),
        icmp: (.value.icmp // 0)
      }]
    else [] end),
    flows_active: (.flows_active // 0),
    flow_count: (.flow_count // 0),
    uptime: (.uptime // 0),
    agent_version: (.agent_version // 0),
    cpu_system: (.cpu_system // 0),
    cpu_user: (.cpu_user // 0),
    memrss_kb: (.memrss_kb // 0)
  }' 2>/dev/null)

  # jq exits 0 without printing anything when its input is empty, so test the
  # output rather than the exit status.
  if [ -z "$build_network_stats_out" ]; then
    build_network_stats_out='{"interfaces":[],"flows_active":0,"flow_count":0,"uptime":0,"agent_version":0,"cpu_system":0,"cpu_user":0,"memrss_kb":0}'
  fi

  printf '%s\n' "$build_network_stats_out"
}
# ---------------------------------------------------------------------------
# RPCD entry point
#
#   <script> list            prints the method signature table
#   <script> call <method>   runs <method>; methods that take arguments read
#                            them as a single line of JSON from stdin
#
# Every method prints one JSON document on stdout. Values that come from the
# request, from UCI or from data files are validated or escaped before they
# are placed into that document.
# ---------------------------------------------------------------------------

# Succeed if $1 is a non-empty string of decimal digits.
mf_is_uint() {
  case "$1" in
    '' | *[!0-9]*) return 1 ;;
  esac
  return 0
}

# Print $1 as a JSON-safe non-negative integer (leading zeros removed, since
# JSON forbids them), or $2 when $1 is not a non-negative integer.
mf_uint() {
  if ! mf_is_uint "$1"; then
    printf '%s\n' "$2"
    return 0
  fi
  mf_uint_val=$1
  while [ "${#mf_uint_val}" -gt 1 ] && [ "${mf_uint_val#0}" != "$mf_uint_val" ]; do
    mf_uint_val=${mf_uint_val#0}
  done
  printf '%s\n' "$mf_uint_val"
}

# Print jq's `length` of the JSON document in file $1, or 0 if the file is
# missing or cannot be parsed.
mf_json_length() {
  mf_len=""
  if [ -f "$1" ]; then
    mf_len=$(jq 'length' "$1" 2>/dev/null)
  fi
  mf_uint "$mf_len" 0
}

# Print file $1 as one compact JSON array, or [] if the file is missing, empty,
# unparsable, or holds anything other than exactly one array.
mf_json_array() {
  mf_arr=""
  if [ -s "$1" ]; then
    mf_arr=$(jq -c -s 'if length == 1 and (.[0] | type) == "array" then .[0] else empty end' "$1" 2>/dev/null)
  fi
  [ -n "$mf_arr" ] || mf_arr="[]"
  printf '%s\n' "$mf_arr"
}

# Evaluate jq filter $1 against the request JSON held in $input and print the
# raw result. Prints nothing if the request is not valid JSON.
mf_arg() {
  printf '%s' "$input" | jq -r "$1" 2>/dev/null
}

# Print request field $1 as a non-negative number, or $2 if the field is
# absent or not numeric.
mf_number_arg() {
  mf_num=$(printf '%s' "$input" | jq -r --arg k "$1" '.[$k] | tonumber? // empty' 2>/dev/null)
  case "$mf_num" in
    '' | null | *[!0-9.]*) mf_num=$2 ;;
  esac
  printf '%s\n' "$mf_num"
}

# Run jq filter $1 over the history file and print its compact result, or {}
# if the file is empty/missing or the filter fails.
mf_history_object() {
  mf_obj=""
  if [ -s "$HISTORY_FILE" ]; then
    mf_obj=$(jq -c "$1" "$HISTORY_FILE" 2>/dev/null)
  fi
  [ -n "$mf_obj" ] || mf_obj="{}"
  printf '%s\n' "$mf_obj"
}

# list: method table consumed by rpcd (method name -> argument template).
mf_list() {
  cat <<'EOF'
{
"status": {},
"get_active_streams": {},
"get_network_stats": {},
"get_stream_history": {"hours": 24},
"get_stats_by_service": {},
"get_stats_by_client": {},
"get_service_details": {"service": "string"},
"set_alert": {"service": "string", "threshold_hours": 4, "action": "notify"},
"delete_alert": {"alert_id": "string"},
"list_alerts": {},
"clear_history": {},
"get_settings": {},
"set_settings": {"enabled": 1, "history_retention": 7, "refresh_interval": 5}
}
EOF
}

# status: module state, which DPI engines are running, and flow/stream counts.
mf_status() {
  init_storage

  # nDPId: flow count and version are only collected while the daemon runs
  # and its flow file exists.
  ndpid_running=0
  ndpid_version=""
  ndpid_flows=0
  if pgrep -x ndpid >/dev/null 2>&1; then
    ndpid_running=1
  fi
  if [ "$ndpid_running" = "1" ] && [ -f "$NDPID_FLOWS" ]; then
    ndpid_flows=$(mf_json_length "$NDPID_FLOWS")
    ndpid_version=$(ndpid -v 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+' | head -1)
  fi
  # The pipeline above ends in `head`, which succeeds even with no match, so
  # the default has to be applied on the value rather than the exit status.
  [ -n "$ndpid_version" ] || ndpid_version="unknown"

  # netifyd: counts and version come from its status document.
  netifyd_running=0
  netifyd_version=""
  netifyd_flows=0
  if pgrep -x netifyd >/dev/null 2>&1; then
    netifyd_running=1
  fi
  netifyd_data=$(get_netifyd_data)
  if [ "$netifyd_running" = "1" ] && [ -n "$netifyd_data" ] && [ "$netifyd_data" != "{}" ]; then
    netifyd_flows=$(mf_uint "$(printf '%s' "$netifyd_data" | jq '.flows_active // .flow_count // 0' 2>/dev/null)" 0)
    netifyd_version=$(printf '%s' "$netifyd_data" | jq -r '.agent_version // "unknown"' 2>/dev/null)
  fi
  [ -n "$netifyd_version" ] || netifyd_version="unknown"

  # The flow count reported as "active" follows the selected DPI source.
  dpi_source=$(get_dpi_source)
  if [ "$dpi_source" = "ndpid" ]; then
    flow_count=$ndpid_flows
  else
    flow_count=$netifyd_flows
  fi

  history_count=$(mf_json_length "$HISTORY_FILE")
  active_streams=$(mf_json_length "$MEDIA_CACHE")
  enabled=$(mf_uint "$(uci -q get media_flow.global.enabled 2>/dev/null)" 1)
  refresh=$(mf_uint "$(uci -q get media_flow.global.refresh_interval 2>/dev/null)" 5)

  # Built with jq so the version strings are properly JSON-escaped.
  jq -n \
    --argjson enabled "$enabled" \
    --arg dpi_source "$dpi_source" \
    --argjson ndpid_running "$ndpid_running" \
    --arg ndpid_version "$ndpid_version" \
    --argjson ndpid_flows "$ndpid_flows" \
    --argjson netifyd_running "$netifyd_running" \
    --arg netifyd_version "$netifyd_version" \
    --argjson netifyd_flows "$netifyd_flows" \
    --argjson active_flows "$flow_count" \
    --argjson active_streams "$active_streams" \
    --argjson history_entries "$history_count" \
    --argjson refresh_interval "$refresh" \
    '{
      enabled: $enabled,
      module: "media-flow",
      version: "0.6.0",
      dpi_source: $dpi_source,
      ndpid_running: $ndpid_running,
      ndpid_version: $ndpid_version,
      ndpid_flows: $ndpid_flows,
      netifyd_running: $netifyd_running,
      netifyd_version: $netifyd_version,
      netifyd_flows: $netifyd_flows,
      active_flows: $active_flows,
      active_streams: $active_streams,
      history_entries: $history_entries,
      refresh_interval: $refresh_interval
    }'
}

# get_active_streams: current streams (only available with nDPId) plus the
# total flow count of the active DPI source.
mf_get_active_streams() {
  init_storage

  dpi_source=$(get_dpi_source)
  streams="[]"
  flow_count=0

  case "$dpi_source" in
    ndpid)
      # Validate the cache so a half-written file cannot corrupt the reply.
      streams=$(mf_json_array "$MEDIA_CACHE")
      flow_count=$(mf_json_length "$NDPID_FLOWS")
      note="Streams detected via nDPId local DPI"
      ;;
    netifyd)
      flow_count=$(mf_uint "$(get_netifyd_data | jq '.flows_active // .flow_count // 0' 2>/dev/null)" 0)
      note="Application detection requires netifyd cloud subscription"
      ;;
    *)
      note="No DPI engine available"
      ;;
  esac

  # dpi_source and note are fixed strings chosen above, so they need no escaping.
  printf '{"streams": %s, "dpi_source": "%s", "note": "%s", "flow_count": %s}\n' \
    "$streams" "$dpi_source" "$note" "$flow_count"
}

# get_network_stats: interface and agent counters derived from netifyd.
mf_get_network_stats() {
  init_storage

  stats=$(build_network_stats_json "$(get_netifyd_data)")
  [ -n "$stats" ] || stats='{"interfaces":[],"flows_active":0,"flow_count":0,"uptime":0,"agent_version":0}'

  printf '{"stats": %s}\n' "$stats"
}

# get_stream_history: the recorded history plus the requested window.
# NOTE(review): `hours` is only echoed back as hours_requested; the history is
# not filtered by it (the timestamp format of history entries is not visible
# in this script).
mf_get_stream_history() {
  read -r input
  hours=$(mf_number_arg hours 24)

  init_storage

  printf '{"history": %s, "hours_requested": %s}\n' \
    "$(mf_json_array "$HISTORY_FILE")" "$hours"
}

# get_stats_by_service: per-application session count, summed bandwidth and
# duration, and the category of the first session of each application.
mf_get_stats_by_service() {
  init_storage

  printf '{"services": %s}\n' "$(mf_history_object '
    group_by(.app) |
    map({
      key: (.[0].app // "unknown" | tostring),
      value: {
        sessions: length,
        total_bandwidth_kbps: ((map(.bandwidth) | add) // 0),
        total_duration_seconds: ((map(.duration) | add) // 0),
        category: .[0].category
      }
    }) |
    from_entries
  ')"
}

# get_stats_by_client: per-client session count, summed bandwidth and
# duration, and the application with the most sessions.
mf_get_stats_by_client() {
  init_storage

  printf '{"clients": %s}\n' "$(mf_history_object '
    group_by(.client) |
    map({
      key: (.[0].client // "unknown" | tostring),
      value: {
        sessions: length,
        total_bandwidth_kbps: ((map(.bandwidth) | add) // 0),
        total_duration_seconds: ((map(.duration) | add) // 0),
        top_service: (group_by(.app) | max_by(length) | .[0].app // "unknown")
      }
    }) |
    from_entries
  ')"
}

# get_service_details: aggregate figures and the last ten sessions of one
# service. Request: {"service": "<name>"}. An all-zero record is returned when
# the service is unknown, missing from the request, or the history is unusable.
mf_get_service_details() {
  read -r input
  service=$(mf_arg '.service // ""')

  init_storage

  result=""
  if [ -n "$service" ] && [ -s "$HISTORY_FILE" ]; then
    result=$(jq -c --arg svc "$service" '
      [.[] | select(.app == $svc)] |
      {
        service: $svc,
        total_sessions: length,
        avg_bandwidth_kbps: (if length > 0
          then (((map(.bandwidth) | add) // 0) / length | floor)
          else 0 end),
        total_duration_seconds: ((map(.duration) | add) // 0),
        category: (.[0].category // "unknown"),
        typical_quality: (.[0].quality // "unknown"),
        recent_sessions: (.[-10:] | map({
          timestamp: .timestamp,
          client: .client,
          bandwidth_kbps: .bandwidth,
          duration_seconds: .duration,
          quality: .quality
        }))
      }
    ' "$HISTORY_FILE" 2>/dev/null)
  fi

  if [ -z "$result" ]; then
    # jq escapes the service name; splicing it into a string literal would
    # produce invalid JSON for names containing quotes or backslashes.
    result=$(jq -nc --arg svc "$service" '{
      service: $svc,
      total_sessions: 0,
      avg_bandwidth_kbps: 0,
      total_duration_seconds: 0,
      category: "unknown",
      typical_quality: "unknown",
      recent_sessions: []
    }')
  fi

  printf '%s\n' "$result"
}

# set_alert: create or replace the UCI alert section for a service.
# Request: {"service": "<name>", "threshold_hours": <n>, "action": "<action>"}.
# The section name is "alert_" + the service name reduced to [a-z0-9_].
mf_set_alert() {
  read -r input
  service=$(mf_arg '.service // ""')
  threshold_hours=$(mf_number_arg threshold_hours 4)
  action=$(mf_arg '.action // "notify"')
  [ -n "$action" ] || action="notify"

  if [ -z "$service" ]; then
    echo '{"success": false, "message": "Service name required"}'
    return 0
  fi

  alert_key=$(printf '%s' "$service" | tr '[:upper:]' '[:lower:]' | tr -cd 'a-z0-9_')
  if [ -z "$alert_key" ]; then
    # Without this check every such name would collapse to the same "alert_" section.
    echo '{"success": false, "message": "Service name must contain letters or digits"}'
    return 0
  fi
  alert_id="alert_${alert_key}"

  uci -q delete "media_flow.${alert_id}" 2>/dev/null
  if uci set "media_flow.${alert_id}=alert" \
    && uci set "media_flow.${alert_id}.service=${service}" \
    && uci set "media_flow.${alert_id}.threshold_hours=${threshold_hours}" \
    && uci set "media_flow.${alert_id}.action=${action}" \
    && uci set "media_flow.${alert_id}.enabled=1" \
    && uci commit media_flow; then
    jq -nc --arg svc "$service" --arg id "$alert_id" \
      '{success: true, message: ("Alert configured for " + $svc), alert_id: $id}'
  else
    echo '{"success": false, "message": "Failed to save alert"}'
  fi
}

# delete_alert: remove one alert section. Request: {"alert_id": "<section>"}.
# Only sections whose UCI type is "alert" can be removed, so other sections of
# the media_flow config (such as the settings) cannot be deleted through here.
mf_delete_alert() {
  read -r input
  alert_id=$(mf_arg '.alert_id // ""')

  if [ -z "$alert_id" ]; then
    echo '{"success": false, "message": "Alert ID required"}'
    return 0
  fi

  # Reject anything that is not a plain section name (e.g. "section.option").
  case "$alert_id" in
    *[!A-Za-z0-9_]*)
      echo '{"success": false, "message": "Alert not found"}'
      return 0
      ;;
  esac

  # `uci get <config>.<section>` prints the section type.
  if [ "$(uci -q get "media_flow.${alert_id}" 2>/dev/null)" = "alert" ]; then
    if uci delete "media_flow.${alert_id}" && uci commit media_flow; then
      echo '{"success": true, "message": "Alert deleted"}'
    else
      echo '{"success": false, "message": "Failed to delete alert"}'
    fi
  else
    echo '{"success": false, "message": "Alert not found"}'
  fi
}

# list_alerts: all UCI sections of type "alert" as a JSON array.
# Missing or non-numeric threshold_hours / enabled values fall back to 4 / 1,
# so one damaged section cannot invalidate the whole list.
mf_list_alerts() {
  alerts=$(
    uci -q show media_flow 2>/dev/null | grep '=alert$' | while IFS= read -r line; do
      # line looks like "media_flow.<section>=alert"
      section=${line#*.}
      section=${section%%=*}
      jq -nc \
        --arg id "$section" \
        --arg svc "$(uci -q get "media_flow.${section}.service")" \
        --arg th "$(uci -q get "media_flow.${section}.threshold_hours")" \
        --arg act "$(uci -q get "media_flow.${section}.action")" \
        --arg en "$(uci -q get "media_flow.${section}.enabled")" \
        '{
          id: $id,
          service: $svc,
          threshold_hours: ($th | tonumber? // 4),
          action: $act,
          enabled: ($en | tonumber? // 1)
        }'
    done | jq -sc '.' 2>/dev/null
  )
  [ -n "$alerts" ] || alerts="[]"

  printf '{"alerts": %s}\n' "$alerts"
}

# clear_history: reset the history file to an empty JSON array.
mf_clear_history() {
  if echo '[]' > "$HISTORY_FILE"; then
    echo '{"success": true, "message": "History cleared"}'
  else
    echo '{"success": false, "message": "Failed to clear history"}'
  fi
}

# get_settings: global settings from UCI, with defaults 1 / 7 / 5 for values
# that are unset or not non-negative integers.
mf_get_settings() {
  enabled=$(mf_uint "$(uci -q get media_flow.global.enabled 2>/dev/null)" 1)
  retention=$(mf_uint "$(uci -q get media_flow.global.history_retention 2>/dev/null)" 7)
  refresh=$(mf_uint "$(uci -q get media_flow.global.refresh_interval 2>/dev/null)" 5)

  printf '{"enabled": %s, "history_retention": %s, "refresh_interval": %s}\n' \
    "$enabled" "$retention" "$refresh"
}

# set_settings: store global settings in UCI.
# Request: {"enabled": 0|1, "history_retention": <n>, "refresh_interval": <n>}.
# Absent fields take the defaults 1 / 7 / 5; JSON booleans are accepted for
# `enabled`. Values that are not non-negative integers are rejected, because
# get_settings and status emit the stored values as JSON numbers.
mf_set_settings() {
  read -r input
  enabled=$(mf_arg '.enabled | if . == null or . == true then 1 elif . == false then 0 else . end')
  retention=$(mf_arg '.history_retention // 7')
  refresh=$(mf_arg '.refresh_interval // 5')

  if ! mf_is_uint "$enabled" || ! mf_is_uint "$retention" || ! mf_is_uint "$refresh"; then
    echo '{"success": false, "message": "Settings must be non-negative integers"}'
    return 0
  fi

  if uci set media_flow.global.enabled="$(mf_uint "$enabled" 1)" \
    && uci set media_flow.global.history_retention="$(mf_uint "$retention" 7)" \
    && uci set media_flow.global.refresh_interval="$(mf_uint "$refresh" 5)" \
    && uci commit media_flow; then
    echo '{"success": true, "message": "Settings saved"}'
  else
    echo '{"success": false, "message": "Failed to save settings"}'
  fi
}

# Fallback for unknown method names ($1); the name is JSON-escaped by jq.
mf_method_not_found() {
  jq -nc --arg m "$1" '{error: -32601, message: ("Method not found: " + $m)}'
}

# Dispatch on the rpcd verb ($1) and, for "call", the method name ($2).
# Any other verb produces no output.
mf_main() {
  case "$1" in
    list)
      mf_list
      ;;
    call)
      case "$2" in
        status) mf_status ;;
        get_active_streams) mf_get_active_streams ;;
        get_network_stats) mf_get_network_stats ;;
        get_stream_history) mf_get_stream_history ;;
        get_stats_by_service) mf_get_stats_by_service ;;
        get_stats_by_client) mf_get_stats_by_client ;;
        get_service_details) mf_get_service_details ;;
        set_alert) mf_set_alert ;;
        delete_alert) mf_delete_alert ;;
        list_alerts) mf_list_alerts ;;
        clear_history) mf_clear_history ;;
        get_settings) mf_get_settings ;;
        set_settings) mf_set_settings ;;
        *) mf_method_not_found "$2" ;;
      esac
      ;;
  esac
}

mf_main "$@"