feat(p2p): Implement distributed catalog with Gitea sync and health probing
Distributed Catalog:
- Implement catalog_push_gitea() to push node catalogs via Gitea REST API
- Add catalog_push_merged_gitea() for merged catalog sync
- Create /api/factory/catalog-sync POST endpoint for triggering sync
- Catalogs pushed to catalog/nodes/{hostname}.json in Gitea repo
Health Probing:
- Add get_service_health() with cached latency measurement
- HTTP probe with curl to measure response time
- Fallback to /proc/net/tcp port check
- 60-second cache TTL to keep catalog endpoint fast
Files:
- factory.sh: Gitea REST API integration for catalog push
- catalog: Health probing with latency measurement
- catalog-sync: New CGI endpoint for sync operations
- Makefile: Install catalog-sync endpoint
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
29f55ec6bc
commit
c33a9650e7
@ -69,6 +69,7 @@ define Package/secubox-p2p/install
|
||||
$(INSTALL_BIN) ./root/www/api/factory/pubkey $(1)/www/api/factory/
|
||||
$(INSTALL_BIN) ./root/www/api/factory/mesh-services $(1)/www/api/factory/
|
||||
$(INSTALL_BIN) ./root/www/api/factory/catalog $(1)/www/api/factory/
|
||||
$(INSTALL_BIN) ./root/www/api/factory/catalog-sync $(1)/www/api/factory/
|
||||
|
||||
# Factory Web UI
|
||||
$(INSTALL_DIR) $(1)/www/factory
|
||||
|
||||
@ -385,11 +385,15 @@ catalog_sync() {
|
||||
catalog_merge
|
||||
|
||||
# Push to Gitea if enabled
|
||||
local gitea_result=""
|
||||
if [ "$(uci -q get secubox-p2p.gitea.enabled)" = "1" ]; then
|
||||
catalog_push_gitea
|
||||
# Push local node catalog
|
||||
gitea_result=$(catalog_push_gitea)
|
||||
# Push merged catalog
|
||||
catalog_push_merged_gitea
|
||||
fi
|
||||
|
||||
echo "{\"synced\":$synced,\"failed\":$failed}"
|
||||
echo "{\"synced\":$synced,\"failed\":$failed,\"gitea\":\"$gitea_result\"}"
|
||||
}
|
||||
|
||||
# Merge all catalogs into unified view (CRDT union)
|
||||
@ -425,19 +429,116 @@ catalog_merge() {
|
||||
echo "$merged_services" > "$MERGED_CATALOG"
|
||||
}
|
||||
|
||||
# Push catalog to Gitea repository
|
||||
# Push catalog to Gitea repository via REST API
|
||||
catalog_push_gitea() {
|
||||
local gitea_enabled=$(uci -q get secubox-p2p.gitea.enabled)
|
||||
[ "$gitea_enabled" = "1" ] || return 0
|
||||
[ "$gitea_enabled" = "1" ] || { echo "gitea_disabled"; return 0; }
|
||||
|
||||
local server_url=$(uci -q get secubox-p2p.gitea.server_url)
|
||||
local access_token=$(uci -q get secubox-p2p.gitea.access_token)
|
||||
local repo_owner=$(uci -q get secubox-p2p.gitea.repo_owner)
|
||||
local repo_name=$(uci -q get secubox-p2p.gitea.repo_name)
|
||||
|
||||
if [ -z "$server_url" ] || [ -z "$access_token" ] || [ -z "$repo_owner" ] || [ -z "$repo_name" ]; then
|
||||
echo "gitea_not_configured"
|
||||
return 1
|
||||
fi
|
||||
|
||||
local node_name=$(uci -q get system.@system[0].hostname || hostname)
|
||||
|
||||
# This would push to secubox-catalog repo in Gitea
|
||||
# For now, just log the action
|
||||
logger -t factory "Catalog sync: would push to Gitea nodes/${node_name}.json"
|
||||
# Check if local catalog exists
|
||||
[ -f "$LOCAL_CATALOG" ] || {
|
||||
catalog_generate_local >/dev/null 2>&1
|
||||
}
|
||||
[ -f "$LOCAL_CATALOG" ] || { echo "no_local_catalog"; return 1; }
|
||||
|
||||
# Future: use ubus call to trigger actual git push
|
||||
# ubus call luci.secubox-p2p push_catalog_gitea "{\"file\":\"nodes/${node_name}.json\"}" 2>/dev/null
|
||||
# Prepare catalog content for Gitea API
|
||||
# Base64 encode the catalog JSON
|
||||
local content=$(cat "$LOCAL_CATALOG" | base64 -w0 2>/dev/null || cat "$LOCAL_CATALOG" | base64 2>/dev/null | tr -d '\n')
|
||||
local file_path="catalog/nodes/${node_name}.json"
|
||||
local commit_msg="Catalog sync: ${node_name} $(date -Iseconds 2>/dev/null || date '+%Y-%m-%dT%H:%M:%S')"
|
||||
|
||||
# Check if file exists to get SHA for update
|
||||
local existing_sha=""
|
||||
local existing=$(curl -s --connect-timeout 5 --max-time 10 \
|
||||
-H "Authorization: token $access_token" \
|
||||
"${server_url}/api/v1/repos/${repo_owner}/${repo_name}/contents/${file_path}" 2>/dev/null)
|
||||
|
||||
if echo "$existing" | grep -q '"sha"'; then
|
||||
existing_sha=$(echo "$existing" | jsonfilter -e '@.sha' 2>/dev/null)
|
||||
fi
|
||||
|
||||
# Build API request body
|
||||
local request_body
|
||||
if [ -n "$existing_sha" ]; then
|
||||
# Update existing file
|
||||
request_body="{\"content\":\"${content}\",\"message\":\"${commit_msg}\",\"sha\":\"${existing_sha}\"}"
|
||||
else
|
||||
# Create new file
|
||||
request_body="{\"content\":\"${content}\",\"message\":\"${commit_msg}\"}"
|
||||
fi
|
||||
|
||||
# Push to Gitea
|
||||
local response=$(curl -s --connect-timeout 5 --max-time 15 \
|
||||
-X PUT \
|
||||
-H "Authorization: token $access_token" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$request_body" \
|
||||
"${server_url}/api/v1/repos/${repo_owner}/${repo_name}/contents/${file_path}" 2>/dev/null)
|
||||
|
||||
if echo "$response" | grep -q '"sha"'; then
|
||||
local new_sha=$(echo "$response" | jsonfilter -e '@.content.sha' 2>/dev/null)
|
||||
logger -t factory "Catalog pushed to Gitea: ${file_path} (sha: ${new_sha})"
|
||||
echo "pushed:${file_path}"
|
||||
return 0
|
||||
else
|
||||
local error_msg=$(echo "$response" | jsonfilter -e '@.message' 2>/dev/null || echo "unknown_error")
|
||||
logger -t factory "Catalog push failed: $error_msg"
|
||||
echo "error:$error_msg"
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
|
||||
# Push merged catalog to Gitea
|
||||
catalog_push_merged_gitea() {
|
||||
local gitea_enabled=$(uci -q get secubox-p2p.gitea.enabled)
|
||||
[ "$gitea_enabled" = "1" ] || return 0
|
||||
|
||||
local server_url=$(uci -q get secubox-p2p.gitea.server_url)
|
||||
local access_token=$(uci -q get secubox-p2p.gitea.access_token)
|
||||
local repo_owner=$(uci -q get secubox-p2p.gitea.repo_owner)
|
||||
local repo_name=$(uci -q get secubox-p2p.gitea.repo_name)
|
||||
|
||||
[ -z "$server_url" ] || [ -z "$access_token" ] && return 1
|
||||
[ -f "$MERGED_CATALOG" ] || return 1
|
||||
|
||||
local content=$(cat "$MERGED_CATALOG" | base64 -w0 2>/dev/null || cat "$MERGED_CATALOG" | base64 2>/dev/null | tr -d '\n')
|
||||
local file_path="catalog/catalog.json"
|
||||
local commit_msg="Merged catalog $(date -Iseconds 2>/dev/null || date '+%Y-%m-%dT%H:%M:%S')"
|
||||
|
||||
# Get existing SHA if file exists
|
||||
local existing_sha=""
|
||||
local existing=$(curl -s --connect-timeout 5 --max-time 10 \
|
||||
-H "Authorization: token $access_token" \
|
||||
"${server_url}/api/v1/repos/${repo_owner}/${repo_name}/contents/${file_path}" 2>/dev/null)
|
||||
|
||||
if echo "$existing" | grep -q '"sha"'; then
|
||||
existing_sha=$(echo "$existing" | jsonfilter -e '@.sha' 2>/dev/null)
|
||||
fi
|
||||
|
||||
local request_body
|
||||
if [ -n "$existing_sha" ]; then
|
||||
request_body="{\"content\":\"${content}\",\"message\":\"${commit_msg}\",\"sha\":\"${existing_sha}\"}"
|
||||
else
|
||||
request_body="{\"content\":\"${content}\",\"message\":\"${commit_msg}\"}"
|
||||
fi
|
||||
|
||||
curl -s --connect-timeout 5 --max-time 15 \
|
||||
-X PUT \
|
||||
-H "Authorization: token $access_token" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$request_body" \
|
||||
"${server_url}/api/v1/repos/${repo_owner}/${repo_name}/contents/${file_path}" >/dev/null 2>&1
|
||||
}
|
||||
|
||||
# Get merged catalog JSON
|
||||
|
||||
@ -95,6 +95,61 @@ normalize_backend_name() {
|
||||
echo "$backend" | sed -e 's/^backend_//' -e 's/^srv_//' -e 's/^be_//' -e 's/_/-/g'
|
||||
}
|
||||
|
||||
# Health cache directory
|
||||
HEALTH_CACHE_DIR="/tmp/secubox-catalog-health"
|
||||
HEALTH_CACHE_TTL=60 # Cache TTL in seconds
|
||||
|
||||
# Get cached health or probe service
|
||||
get_service_health() {
|
||||
local svc_name="$1"
|
||||
local port="$2"
|
||||
local ip="${3:-127.0.0.1}"
|
||||
|
||||
local cache_file="$HEALTH_CACHE_DIR/${svc_name}.health"
|
||||
local now=$(date +%s)
|
||||
|
||||
# Check cache
|
||||
if [ -f "$cache_file" ]; then
|
||||
local cache_time=$(stat -c %Y "$cache_file" 2>/dev/null || echo 0)
|
||||
local age=$((now - cache_time))
|
||||
if [ $age -lt $HEALTH_CACHE_TTL ]; then
|
||||
cat "$cache_file"
|
||||
return
|
||||
fi
|
||||
fi
|
||||
|
||||
# Probe service (quick TCP connect test)
|
||||
local latency=0
|
||||
local health_status="unknown"
|
||||
|
||||
if [ -n "$port" ] && [ "$port" != "0" ]; then
|
||||
# Use curl to measure HTTP response time
|
||||
local start_ms=$(($(date +%s%N 2>/dev/null || echo "0") / 1000000))
|
||||
if curl -s --connect-timeout 1 --max-time 2 -o /dev/null "http://${ip}:${port}/" 2>/dev/null; then
|
||||
local end_ms=$(($(date +%s%N 2>/dev/null || echo "0") / 1000000))
|
||||
if [ $start_ms -gt 0 ] && [ $end_ms -gt 0 ]; then
|
||||
latency=$((end_ms - start_ms))
|
||||
fi
|
||||
health_status="healthy"
|
||||
else
|
||||
# Fallback: check if port is open via /proc/net/tcp
|
||||
local port_hex=$(printf '%04X' "$port" 2>/dev/null)
|
||||
if grep -qi ":${port_hex} " /proc/net/tcp 2>/dev/null; then
|
||||
health_status="degraded"
|
||||
latency=0
|
||||
else
|
||||
health_status="unhealthy"
|
||||
latency=0
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
# Cache the result
|
||||
mkdir -p "$HEALTH_CACHE_DIR"
|
||||
echo "${health_status}|${latency}" > "$cache_file"
|
||||
echo "${health_status}|${latency}"
|
||||
}
|
||||
|
||||
# Get service status and port from init scripts
|
||||
get_service_info() {
|
||||
local svc_name="$1"
|
||||
@ -208,6 +263,12 @@ build_catalog() {
|
||||
|
||||
endpoints="$endpoints]"
|
||||
|
||||
# Get health with latency measurement
|
||||
local health_data=$(get_service_health "$svc_name" "$port")
|
||||
local health_status=$(echo "$health_data" | cut -d'|' -f1)
|
||||
local health_latency=$(echo "$health_data" | cut -d'|' -f2)
|
||||
[ -z "$health_latency" ] && health_latency=0
|
||||
|
||||
# Output service entry
|
||||
cat << SVCEOF
|
||||
{
|
||||
@ -219,8 +280,8 @@ build_catalog() {
|
||||
"endpoints": $endpoints,
|
||||
"health": {
|
||||
"last_check": "$UPDATED",
|
||||
"status": "$([ \"$status\" = \"running\" ] && echo healthy || echo unhealthy)",
|
||||
"latency_ms": 0
|
||||
"status": "${health_status:-unknown}",
|
||||
"latency_ms": ${health_latency:-0}
|
||||
}
|
||||
}
|
||||
SVCEOF
|
||||
@ -263,6 +324,12 @@ SVCEOF
|
||||
|
||||
endpoints="$endpoints]"
|
||||
|
||||
# Get health with latency
|
||||
local health_data=$(get_service_health "$svc_name" "$svc_port")
|
||||
local health_status=$(echo "$health_data" | cut -d'|' -f1)
|
||||
local health_latency=$(echo "$health_data" | cut -d'|' -f2)
|
||||
[ -z "$health_latency" ] && health_latency=0
|
||||
|
||||
cat << SVCEOF
|
||||
{
|
||||
"name": "$svc_name",
|
||||
@ -273,8 +340,8 @@ SVCEOF
|
||||
"endpoints": $endpoints,
|
||||
"health": {
|
||||
"last_check": "$UPDATED",
|
||||
"status": "$([ \"$svc_status\" = \"running\" ] && echo healthy || echo unhealthy)",
|
||||
"latency_ms": 0
|
||||
"status": "${health_status:-unknown}",
|
||||
"latency_ms": ${health_latency:-0}
|
||||
}
|
||||
}
|
||||
SVCEOF
|
||||
|
||||
85
package/secubox/secubox-p2p/root/www/api/factory/catalog-sync
Executable file
85
package/secubox/secubox-p2p/root/www/api/factory/catalog-sync
Executable file
@ -0,0 +1,85 @@
|
||||
#!/bin/sh
|
||||
# Factory Catalog Sync - Trigger distributed catalog synchronization
|
||||
# POST endpoint to sync catalogs with mesh peers
|
||||
|
||||
echo "Content-Type: application/json"
|
||||
echo "Access-Control-Allow-Origin: *"
|
||||
echo "Access-Control-Allow-Methods: POST, OPTIONS"
|
||||
echo "Access-Control-Allow-Headers: Content-Type"
|
||||
echo ""
|
||||
|
||||
# Handle CORS preflight
|
||||
if [ "$REQUEST_METHOD" = "OPTIONS" ]; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Only allow POST
|
||||
if [ "$REQUEST_METHOD" != "POST" ]; then
|
||||
echo '{"success":false,"error":"method_not_allowed","message":"Use POST to trigger catalog sync"}'
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Load factory library
|
||||
. /usr/lib/secubox/factory.sh 2>/dev/null
|
||||
|
||||
# Optionally parse request body for options
|
||||
read -r body 2>/dev/null || body="{}"
|
||||
|
||||
# Options from request
|
||||
push_gitea=$(echo "$body" | jsonfilter -e '@.push_gitea' 2>/dev/null || echo "true")
|
||||
pull_peers=$(echo "$body" | jsonfilter -e '@.pull_peers' 2>/dev/null || echo "true")
|
||||
specific_peer=$(echo "$body" | jsonfilter -e '@.peer' 2>/dev/null)
|
||||
|
||||
# Initialize catalog directory
|
||||
catalog_init
|
||||
|
||||
# Generate local catalog first
|
||||
local_result=$(catalog_generate_local 2>&1)
|
||||
|
||||
synced=0
|
||||
failed=0
|
||||
gitea_result=""
|
||||
|
||||
# Pull from specific peer or all peers
|
||||
if [ -n "$specific_peer" ]; then
|
||||
# Sync with specific peer only
|
||||
result=$(catalog_pull_peer "$specific_peer" 2>&1)
|
||||
if echo "$result" | grep -q "^pulled:"; then
|
||||
synced=1
|
||||
else
|
||||
failed=1
|
||||
fi
|
||||
elif [ "$pull_peers" = "true" ]; then
|
||||
# Full sync with all peers
|
||||
sync_output=$(catalog_sync 2>&1)
|
||||
synced=$(echo "$sync_output" | jsonfilter -e '@.synced' 2>/dev/null || echo "0")
|
||||
failed=$(echo "$sync_output" | jsonfilter -e '@.failed' 2>/dev/null || echo "0")
|
||||
gitea_result=$(echo "$sync_output" | jsonfilter -e '@.gitea' 2>/dev/null || echo "")
|
||||
else
|
||||
# Just regenerate local catalog and merge
|
||||
catalog_merge
|
||||
if [ "$push_gitea" = "true" ] && [ "$(uci -q get secubox-p2p.gitea.enabled)" = "1" ]; then
|
||||
gitea_result=$(catalog_push_gitea 2>&1)
|
||||
catalog_push_merged_gitea 2>&1
|
||||
fi
|
||||
fi
|
||||
|
||||
# Get summary stats
|
||||
node_name=$(uci -q get system.@system[0].hostname || hostname)
|
||||
updated=$(date -Iseconds 2>/dev/null || date '+%Y-%m-%dT%H:%M:%S')
|
||||
peer_count=$(ls -1 "$CATALOG_DIR/peers/"*.json 2>/dev/null | wc -l)
|
||||
|
||||
# Output result
|
||||
cat << EOF
|
||||
{
|
||||
"success": true,
|
||||
"node_name": "$node_name",
|
||||
"updated": "$updated",
|
||||
"peers_synced": $synced,
|
||||
"peers_failed": $failed,
|
||||
"peer_catalogs": $peer_count,
|
||||
"gitea": "$gitea_result",
|
||||
"local_catalog": "$LOCAL_CATALOG",
|
||||
"merged_catalog": "$MERGED_CATALOG"
|
||||
}
|
||||
EOF
|
||||
Loading…
Reference in New Issue
Block a user