feat(mitmproxy): Add subdomain-based WAF metrics tracking

- Track requests, threats, protocols per subdomain
- Record HTTP methods, status codes, top URIs, countries
- New RPCD method: subdomain_metrics
- Metrics auto-saved to /tmp/secubox-subdomain-metrics.json
- Add wan_setup/wan_clear to ACL write permissions

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
CyberMind-FR 2026-02-08 11:15:04 +01:00
parent e6b65679a4
commit 46af1ccdd1
3 changed files with 186 additions and 3 deletions

View File

@ -531,6 +531,34 @@ get_threat_stats() {
EOFJ
}
get_subdomain_metrics() {
local metrics_file="/tmp/secubox-subdomain-metrics.json"
local subdomain_metrics=""
# Try to get metrics from LXC container
if command -v lxc-attach >/dev/null 2>&1; then
subdomain_metrics=$(lxc-attach -n "$LXC_NAME" -- cat /tmp/secubox-subdomain-metrics.json 2>/dev/null)
fi
# Fall back to host path
if [ -z "$subdomain_metrics" ]; then
[ -f "$metrics_file" ] && subdomain_metrics=$(cat "$metrics_file" 2>/dev/null)
fi
# Default empty metrics
if [ -z "$subdomain_metrics" ]; then
subdomain_metrics='{"updated":null,"subdomains":{}}'
fi
cat <<EOFJ
{
"success": true,
"metrics": $subdomain_metrics,
"timestamp": "$(date -Iseconds)"
}
EOFJ
}
clear_alerts() {
# Clear the host-visible threats log file
local log_file="/srv/mitmproxy/threats.log"
@ -667,7 +695,7 @@ wan_clear() {
}
list_methods() { cat <<'EOFM'
{"status":{},"status_cached":{},"settings":{},"save_settings":{"mode":"str","enabled":"bool","proxy_port":"int","web_port":"int","apply_now":"bool","wan_protection_enabled":"bool","wan_interface":"str"},"set_mode":{"mode":"str","apply_now":"bool"},"setup_firewall":{},"clear_firewall":{},"wan_setup":{},"wan_clear":{},"install":{},"start":{},"stop":{},"restart":{},"alerts":{},"threat_stats":{},"clear_alerts":{},"haproxy_enable":{},"haproxy_disable":{},"sync_routes":{}}
{"status":{},"status_cached":{},"settings":{},"save_settings":{"mode":"str","enabled":"bool","proxy_port":"int","web_port":"int","apply_now":"bool","wan_protection_enabled":"bool","wan_interface":"str"},"set_mode":{"mode":"str","apply_now":"bool"},"setup_firewall":{},"clear_firewall":{},"wan_setup":{},"wan_clear":{},"install":{},"start":{},"stop":{},"restart":{},"alerts":{},"threat_stats":{},"subdomain_metrics":{},"clear_alerts":{},"haproxy_enable":{},"haproxy_disable":{},"sync_routes":{}}
EOFM
}
@ -690,6 +718,7 @@ case "$1" in
restart) do_restart ;;
alerts) get_alerts ;;
threat_stats) get_threat_stats ;;
subdomain_metrics) get_subdomain_metrics ;;
clear_alerts) clear_alerts ;;
haproxy_enable) haproxy_enable ;;
haproxy_disable) haproxy_disable ;;

View File

@ -3,7 +3,7 @@
"description": "Grant access to mitmproxy",
"read": {
"ubus": {
"luci.mitmproxy": ["status", "settings", "alerts", "threat_stats"]
"luci.mitmproxy": ["status", "settings", "alerts", "threat_stats", "subdomain_metrics"]
},
"uci": ["mitmproxy"]
},
@ -18,6 +18,8 @@
"set_mode",
"setup_firewall",
"clear_firewall",
"wan_setup",
"wan_clear",
"clear_alerts",
"haproxy_enable",
"haproxy_disable",

View File

@ -28,6 +28,8 @@ STATS_FILE = "/tmp/secubox-mitm-stats.json"
AUTOBAN_FILE = "/data/autoban-requests.log"
# Auto-ban config file (written by host from UCI)
AUTOBAN_CONFIG = "/data/autoban.json"
# Subdomain metrics file
SUBDOMAIN_METRICS_FILE = "/tmp/secubox-subdomain-metrics.json"
# ============================================================================
# THREAT DETECTION PATTERNS
@ -490,10 +492,23 @@ class SecuBoxAnalytics:
# Attempt tracking for sensitivity-based auto-ban
# Structure: {ip: [(timestamp, severity, reason), ...]}
self.threat_attempts = defaultdict(list)
# Subdomain metrics tracking
# Structure: {subdomain: {requests, threats, protocols: {http, https}, top_uris: {path: count}}}
self.subdomain_metrics = defaultdict(lambda: {
'requests': 0,
'threats': 0,
'protocols': defaultdict(int),
'methods': defaultdict(int),
'status_codes': defaultdict(int),
'top_uris': defaultdict(int),
'threat_types': defaultdict(int),
'countries': defaultdict(int),
'last_seen': None
})
self._load_geoip()
self._load_blocked_ips()
self._load_autoban_config()
ctx.log.info("SecuBox Analytics addon v2.2 loaded - Enhanced threat detection with sensitivity-based auto-ban")
ctx.log.info("SecuBox Analytics addon v2.3 loaded - Enhanced threat detection with subdomain metrics")
def _load_geoip(self):
"""Load GeoIP database if available"""
@ -518,6 +533,134 @@ class SecuBoxAnalytics:
except Exception as e:
ctx.log.debug(f"Could not load blocked IPs: {e}")
def _extract_subdomain(self, host: str) -> tuple:
"""
Extract subdomain and base domain from host.
Returns (subdomain, base_domain) tuple.
Examples:
'api.example.com' -> ('api', 'example.com')
'www.blog.example.com' -> ('www.blog', 'example.com')
'example.com' -> ('', 'example.com')
'192.168.1.1' -> ('', '192.168.1.1')
"""
if not host:
return ('', 'unknown')
# Remove port if present
host = host.split(':')[0].lower()
# Check if it's an IP address
if re.match(r'^\d+\.\d+\.\d+\.\d+$', host):
return ('', host)
parts = host.split('.')
# Handle common TLDs (2-part TLDs like co.uk, com.au, etc.)
two_part_tlds = ['co.uk', 'com.au', 'co.nz', 'org.uk', 'net.au', 'gov.uk',
'com.br', 'co.jp', 'co.kr', 'co.in', 'org.au']
# Check for 2-part TLD
if len(parts) >= 3:
potential_tld = '.'.join(parts[-2:])
if potential_tld in two_part_tlds:
base = '.'.join(parts[-3:])
subdomain = '.'.join(parts[:-3]) if len(parts) > 3 else ''
return (subdomain, base)
# Standard case: last 2 parts are base domain
if len(parts) >= 2:
base = '.'.join(parts[-2:])
subdomain = '.'.join(parts[:-2]) if len(parts) > 2 else ''
return (subdomain, base)
return ('', host)
def _update_subdomain_metrics(self, entry: dict):
"""Update per-subdomain metrics"""
host = entry.get('host', 'unknown')
subdomain, base_domain = self._extract_subdomain(host)
# Use full subdomain identifier (subdomain.base or just base)
if subdomain:
full_subdomain = f"{subdomain}.{base_domain}"
else:
full_subdomain = base_domain
metrics = self.subdomain_metrics[full_subdomain]
# Basic counts
metrics['requests'] += 1
metrics['last_seen'] = entry.get('timestamp')
# Protocol (detect from scheme or port)
path = entry.get('path', '')
# Check if HTTPS (from routing or headers)
is_https = entry.get('headers', {}).get('x-forwarded-proto') == 'https'
protocol = 'https' if is_https else 'http'
metrics['protocols'][protocol] += 1
# HTTP method
method = entry.get('method', 'GET')
metrics['methods'][method] += 1
# Country
country = entry.get('country', 'XX')
metrics['countries'][country] += 1
# Track URI (normalize path, limit to first segment)
if path:
# Get first path segment for grouping
path_parts = path.split('?')[0].split('/')
if len(path_parts) > 1 and path_parts[1]:
normalized_path = '/' + path_parts[1]
if len(path_parts) > 2:
normalized_path += '/...'
else:
normalized_path = '/'
metrics['top_uris'][normalized_path] += 1
# Threat tracking
scan_data = entry.get('scan', {})
if scan_data.get('is_scan'):
metrics['threats'] += 1
threat_type = scan_data.get('type', 'unknown')
metrics['threat_types'][threat_type] += 1
# Write metrics periodically (every 50 requests per subdomain)
if metrics['requests'] % 50 == 0:
self._write_subdomain_metrics()
def _write_subdomain_metrics(self):
"""Write subdomain metrics to file"""
try:
# Convert defaultdicts to regular dicts for JSON serialization
output = {}
for subdomain, metrics in self.subdomain_metrics.items():
output[subdomain] = {
'requests': metrics['requests'],
'threats': metrics['threats'],
'protocols': dict(metrics['protocols']),
'methods': dict(metrics['methods']),
'status_codes': dict(metrics['status_codes']),
'countries': dict(metrics['countries']),
'threat_types': dict(metrics['threat_types']),
'last_seen': metrics['last_seen'],
# Keep only top 20 URIs
'top_uris': dict(sorted(
metrics['top_uris'].items(),
key=lambda x: x[1],
reverse=True
)[:20])
}
with open(SUBDOMAIN_METRICS_FILE, 'w') as f:
json.dump({
'updated': datetime.utcnow().isoformat() + 'Z',
'subdomains': output
}, f)
except Exception as e:
ctx.log.error(f"Failed to write subdomain metrics: {e}")
def _load_autoban_config(self):
"""Load auto-ban configuration from host"""
try:
@ -1284,6 +1427,7 @@ class SecuBoxAnalytics:
# Update statistics
self._update_stats(entry)
self._update_subdomain_metrics(entry)
# Log and alert based on severity
if scan_result.get('is_scan'):
@ -1423,6 +1567,14 @@ class SecuBoxAnalytics:
if cache_hit is not None:
ctx.log.debug(f"CACHE {'HIT' if cache_hit else 'MISS'}: {entry['path']} ({entry['response_time_ms']}ms)")
# Update subdomain status code metrics
host = entry.get('host', 'unknown')
subdomain, base_domain = self._extract_subdomain(host)
full_subdomain = f"{subdomain}.{base_domain}" if subdomain else base_domain
if full_subdomain in self.subdomain_metrics:
status_bucket = f"{response.status_code // 100}xx"
self.subdomain_metrics[full_subdomain]['status_codes'][status_bucket] += 1
# Log failed auth attempts (4xx on auth paths)
if entry['is_auth_attempt'] and 400 <= response.status_code < 500:
ctx.log.warn(f"AUTH FAILED: {entry['client_ip']} ({entry['country']}) - {response.status_code}")