#!/bin/sh # RPCD backend for Media Flow # Provides ubus interface: luci.media-flow . /lib/functions.sh . /usr/share/libubox/jshn.sh # Streaming services detection patterns # Based on netifyd application detection HISTORY_FILE="/tmp/media-flow-history.json" ALERTS_FILE="/etc/config/media_flow" STATS_DIR="/tmp/media-flow-stats" # Initialize init_storage() { mkdir -p "$STATS_DIR" [ ! -f "$HISTORY_FILE" ] && echo '[]' > "$HISTORY_FILE" } # Get netifyd flows and filter streaming services get_netifyd_flows() { # Try to get flows from netifyd socket or status file if [ -S /var/run/netifyd/netifyd.sock ]; then echo "status" | nc -U /var/run/netifyd/netifyd.sock 2>/dev/null elif [ -f /var/run/netifyd/status.json ]; then cat /var/run/netifyd/status.json else echo '{}' fi } # Detect if application is a streaming service is_streaming_service() { local app="$1" # Video streaming echo "$app" | grep -qiE 'netflix|youtube|disney|primevideo|amazon.*video|twitch|hulu|hbo|vimeo' && return 0 # Audio streaming echo "$app" | grep -qiE 'spotify|apple.*music|deezer|soundcloud|tidal|pandora' && return 0 # Video conferencing echo "$app" | grep -qiE 'zoom|teams|meet|discord|skype|webex' && return 0 return 1 } # Estimate quality based on bandwidth (kbps) estimate_quality() { local bandwidth="$1" # in kbps # Video streaming quality estimation if [ "$bandwidth" -lt 1000 ]; then echo "SD" elif [ "$bandwidth" -lt 3000 ]; then echo "HD" elif [ "$bandwidth" -lt 8000 ]; then echo "FHD" else echo "4K" fi } # Get service category get_service_category() { local app="$1" echo "$app" | grep -qiE 'netflix|youtube|disney|primevideo|twitch|hulu|hbo|vimeo' && echo "video" && return echo "$app" | grep -qiE 'spotify|apple.*music|deezer|soundcloud|tidal' && echo "audio" && return echo "$app" | grep -qiE 'zoom|teams|meet|discord|skype|webex' && echo "visio" && return echo "other" } # Save stream to history save_to_history() { local app="$1" local client="$2" local bandwidth="$3" local duration="$4" init_storage local timestamp=$(date -Iseconds) local quality=$(estimate_quality "$bandwidth") local category=$(get_service_category "$app") # Append to history (keep last 1000 entries) local entry="{\"timestamp\":\"$timestamp\",\"app\":\"$app\",\"client\":\"$client\",\"bandwidth\":$bandwidth,\"duration\":$duration,\"quality\":\"$quality\",\"category\":\"$category\"}" if [ -f "$HISTORY_FILE" ]; then jq ". += [$entry] | .[-1000:]" "$HISTORY_FILE" > "${HISTORY_FILE}.tmp" 2>/dev/null && mv "${HISTORY_FILE}.tmp" "$HISTORY_FILE" fi } case "$1" in list) # List available methods json_init json_add_object "status" json_close_object json_add_object "get_active_streams" json_close_object json_add_object "get_stream_history" json_add_string "hours" "int" json_close_object json_add_object "get_stats_by_service" json_close_object json_add_object "get_stats_by_client" json_close_object json_add_object "get_service_details" json_add_string "service" "string" json_close_object json_add_object "set_alert" json_add_string "service" "string" json_add_string "threshold_hours" "int" json_add_string "action" "string" json_close_object json_add_object "list_alerts" json_close_object json_dump ;; call) case "$2" in status) init_storage json_init json_add_boolean "enabled" 1 json_add_string "module" "media-flow" json_add_string "version" "1.0.0" # Check netifyd status if pgrep -x netifyd > /dev/null 2>&1; then json_add_boolean "netifyd_running" 1 else json_add_boolean "netifyd_running" 0 fi # Count active streams local active_count=0 local flows=$(get_netifyd_flows) if [ -n "$flows" ]; then active_count=$(echo "$flows" | jq '[.flows[]? | select(.detected_application != null)] | length' 2>/dev/null || echo 0) fi json_add_int "active_streams" "$active_count" # History size local history_count=0 if [ -f "$HISTORY_FILE" ]; then history_count=$(jq 'length' "$HISTORY_FILE" 2>/dev/null || echo 0) fi json_add_int "history_entries" "$history_count" json_dump ;; get_active_streams) json_init json_add_array "streams" # Get flows from netifyd local flows=$(get_netifyd_flows) if [ -n "$flows" ]; then # Parse flows and filter streaming services echo "$flows" | jq -c '.flows[]? | select(.detected_application != null)' 2>/dev/null | while read -r flow; do local app=$(echo "$flow" | jq -r '.detected_application // "unknown"') local src_ip=$(echo "$flow" | jq -r '.src_ip // "0.0.0.0"') local dst_ip=$(echo "$flow" | jq -r '.dst_ip // "0.0.0.0"') local bytes=$(echo "$flow" | jq -r '.total_bytes // 0') local packets=$(echo "$flow" | jq -r '.total_packets // 0') # Check if it's a streaming service if is_streaming_service "$app"; then # Estimate bandwidth (rough estimation) local bandwidth=0 if [ "$packets" -gt 0 ]; then bandwidth=$((bytes * 8 / packets / 100)) # Very rough kbps estimate fi local quality=$(estimate_quality "$bandwidth") local category=$(get_service_category "$app") json_add_object json_add_string "application" "$app" json_add_string "client_ip" "$src_ip" json_add_string "server_ip" "$dst_ip" json_add_int "bandwidth_kbps" "$bandwidth" json_add_string "quality" "$quality" json_add_string "category" "$category" json_add_int "total_bytes" "$bytes" json_add_int "total_packets" "$packets" json_close_object fi done fi json_close_array json_dump ;; get_stream_history) read -r input json_load "$input" json_get_var hours hours # Default to 24 hours hours=${hours:-24} init_storage json_init json_add_array "history" if [ -f "$HISTORY_FILE" ]; then # Filter by time (last N hours) local cutoff_time=$(date -d "$hours hours ago" -Iseconds 2>/dev/null || date -Iseconds) jq -c ".[] | select(.timestamp >= \"$cutoff_time\")" "$HISTORY_FILE" 2>/dev/null | while read -r entry; do echo "$entry" done | jq -s '.' | jq -c '.[]' | while read -r entry; do local timestamp=$(echo "$entry" | jq -r '.timestamp') local app=$(echo "$entry" | jq -r '.app') local client=$(echo "$entry" | jq -r '.client') local bandwidth=$(echo "$entry" | jq -r '.bandwidth') local duration=$(echo "$entry" | jq -r '.duration') local quality=$(echo "$entry" | jq -r '.quality') local category=$(echo "$entry" | jq -r '.category') json_add_object json_add_string "timestamp" "$timestamp" json_add_string "application" "$app" json_add_string "client" "$client" json_add_int "bandwidth_kbps" "$bandwidth" json_add_int "duration_seconds" "$duration" json_add_string "quality" "$quality" json_add_string "category" "$category" json_close_object done fi json_close_array json_dump ;; get_stats_by_service) init_storage json_init json_add_object "services" if [ -f "$HISTORY_FILE" ]; then # Aggregate by service local services=$(jq -r '.[].app' "$HISTORY_FILE" 2>/dev/null | sort -u) for service in $services; do local count=$(jq "[.[] | select(.app == \"$service\")] | length" "$HISTORY_FILE" 2>/dev/null || echo 0) local total_bandwidth=$(jq "[.[] | select(.app == \"$service\")] | map(.bandwidth) | add" "$HISTORY_FILE" 2>/dev/null || echo 0) local total_duration=$(jq "[.[] | select(.app == \"$service\")] | map(.duration) | add" "$HISTORY_FILE" 2>/dev/null || echo 0) local category=$(jq -r "[.[] | select(.app == \"$service\")][0].category" "$HISTORY_FILE" 2>/dev/null || echo "other") json_add_object "$service" json_add_int "sessions" "$count" json_add_int "total_bandwidth_kbps" "$total_bandwidth" json_add_int "total_duration_seconds" "$total_duration" json_add_string "category" "$category" json_close_object done fi json_close_object json_dump ;; get_stats_by_client) init_storage json_init json_add_object "clients" if [ -f "$HISTORY_FILE" ]; then # Aggregate by client local clients=$(jq -r '.[].client' "$HISTORY_FILE" 2>/dev/null | sort -u) for client in $clients; do local count=$(jq "[.[] | select(.client == \"$client\")] | length" "$HISTORY_FILE" 2>/dev/null || echo 0) local total_bandwidth=$(jq "[.[] | select(.client == \"$client\")] | map(.bandwidth) | add" "$HISTORY_FILE" 2>/dev/null || echo 0) local total_duration=$(jq "[.[] | select(.client == \"$client\")] | map(.duration) | add" "$HISTORY_FILE" 2>/dev/null || echo 0) local top_service=$(jq -r "[.[] | select(.client == \"$client\")] | group_by(.app) | max_by(length)[0].app" "$HISTORY_FILE" 2>/dev/null || echo "unknown") json_add_object "$client" json_add_int "sessions" "$count" json_add_int "total_bandwidth_kbps" "$total_bandwidth" json_add_int "total_duration_seconds" "$total_duration" json_add_string "top_service" "$top_service" json_close_object done fi json_close_object json_dump ;; get_service_details) read -r input json_load "$input" json_get_var service service init_storage json_init json_add_string "service" "$service" if [ -f "$HISTORY_FILE" ] && [ -n "$service" ]; then local count=$(jq "[.[] | select(.app == \"$service\")] | length" "$HISTORY_FILE" 2>/dev/null || echo 0) local avg_bandwidth=$(jq "[.[] | select(.app == \"$service\")] | map(.bandwidth) | add / length" "$HISTORY_FILE" 2>/dev/null || echo 0) local total_duration=$(jq "[.[] | select(.app == \"$service\")] | map(.duration) | add" "$HISTORY_FILE" 2>/dev/null || echo 0) local category=$(jq -r "[.[] | select(.app == \"$service\")][0].category" "$HISTORY_FILE" 2>/dev/null || echo "other") local quality=$(estimate_quality "$avg_bandwidth") json_add_int "total_sessions" "$count" json_add_int "avg_bandwidth_kbps" "$avg_bandwidth" json_add_int "total_duration_seconds" "$total_duration" json_add_string "category" "$category" json_add_string "typical_quality" "$quality" # Recent sessions json_add_array "recent_sessions" jq -c "[.[] | select(.app == \"$service\")] | .[-10:][]" "$HISTORY_FILE" 2>/dev/null | while read -r session; do json_add_object json_add_string "timestamp" "$(echo "$session" | jq -r '.timestamp')" json_add_string "client" "$(echo "$session" | jq -r '.client')" json_add_int "bandwidth_kbps" "$(echo "$session" | jq -r '.bandwidth')" json_add_int "duration_seconds" "$(echo "$session" | jq -r '.duration')" json_add_string "quality" "$(echo "$session" | jq -r '.quality')" json_close_object done json_close_array else json_add_int "total_sessions" 0 json_add_int "avg_bandwidth_kbps" 0 json_add_int "total_duration_seconds" 0 json_add_string "category" "unknown" json_add_string "typical_quality" "unknown" json_add_array "recent_sessions" json_close_array fi json_dump ;; set_alert) read -r input json_load "$input" json_get_var service service json_get_var threshold_hours threshold_hours json_get_var action action # Save alert to UCI config . /lib/functions.sh # Create config if not exists touch "$ALERTS_FILE" # Add or update alert local alert_id="alert_$(echo "$service" | tr -d ' ' | tr '[:upper:]' '[:lower:]')" uci -q delete "media_flow.${alert_id}" 2>/dev/null uci set "media_flow.${alert_id}=alert" uci set "media_flow.${alert_id}.service=${service}" uci set "media_flow.${alert_id}.threshold_hours=${threshold_hours}" uci set "media_flow.${alert_id}.action=${action}" uci set "media_flow.${alert_id}.enabled=1" uci commit media_flow json_init json_add_boolean "success" 1 json_add_string "message" "Alert configured for $service" json_add_string "alert_id" "$alert_id" json_dump ;; list_alerts) json_init json_add_array "alerts" if [ -f "$ALERTS_FILE" ]; then . /lib/functions.sh config_load media_flow config_cb() { local type="$1" local name="$2" if [ "$type" = "alert" ]; then local service threshold_hours action enabled config_get service "$name" service config_get threshold_hours "$name" threshold_hours config_get action "$name" action config_get enabled "$name" enabled json_add_object json_add_string "id" "$name" json_add_string "service" "$service" json_add_int "threshold_hours" "$threshold_hours" json_add_string "action" "$action" json_add_boolean "enabled" "$enabled" json_close_object fi } config_load media_flow fi json_close_array json_dump ;; *) json_init json_add_int "error" -32601 json_add_string "message" "Method not found: $2" json_dump ;; esac ;; esac