diff --git a/package/secubox/luci-app-mitmproxy/root/usr/libexec/rpcd/luci.mitmproxy b/package/secubox/luci-app-mitmproxy/root/usr/libexec/rpcd/luci.mitmproxy index d03c3672..9317e2fd 100755 --- a/package/secubox/luci-app-mitmproxy/root/usr/libexec/rpcd/luci.mitmproxy +++ b/package/secubox/luci-app-mitmproxy/root/usr/libexec/rpcd/luci.mitmproxy @@ -531,6 +531,34 @@ get_threat_stats() { EOFJ } +get_subdomain_metrics() { + local metrics_file="/tmp/secubox-subdomain-metrics.json" + local subdomain_metrics="" + + # Try to get metrics from LXC container + if command -v lxc-attach >/dev/null 2>&1; then + subdomain_metrics=$(lxc-attach -n "$LXC_NAME" -- cat /tmp/secubox-subdomain-metrics.json 2>/dev/null) + fi + + # Fall back to host path + if [ -z "$subdomain_metrics" ]; then + [ -f "$metrics_file" ] && subdomain_metrics=$(cat "$metrics_file" 2>/dev/null) + fi + + # Default empty metrics + if [ -z "$subdomain_metrics" ]; then + subdomain_metrics='{"updated":null,"subdomains":{}}' + fi + + cat < tuple: + """ + Extract subdomain and base domain from host. + Returns (subdomain, base_domain) tuple. + Examples: + 'api.example.com' -> ('api', 'example.com') + 'www.blog.example.com' -> ('www.blog', 'example.com') + 'example.com' -> ('', 'example.com') + '192.168.1.1' -> ('', '192.168.1.1') + """ + if not host: + return ('', 'unknown') + + # Remove port if present + host = host.split(':')[0].lower() + + # Check if it's an IP address + if re.match(r'^\d+\.\d+\.\d+\.\d+$', host): + return ('', host) + + parts = host.split('.') + + # Handle common TLDs (2-part TLDs like co.uk, com.au, etc.) + two_part_tlds = ['co.uk', 'com.au', 'co.nz', 'org.uk', 'net.au', 'gov.uk', + 'com.br', 'co.jp', 'co.kr', 'co.in', 'org.au'] + + # Check for 2-part TLD + if len(parts) >= 3: + potential_tld = '.'.join(parts[-2:]) + if potential_tld in two_part_tlds: + base = '.'.join(parts[-3:]) + subdomain = '.'.join(parts[:-3]) if len(parts) > 3 else '' + return (subdomain, base) + + # Standard case: last 2 parts are base domain + if len(parts) >= 2: + base = '.'.join(parts[-2:]) + subdomain = '.'.join(parts[:-2]) if len(parts) > 2 else '' + return (subdomain, base) + + return ('', host) + + def _update_subdomain_metrics(self, entry: dict): + """Update per-subdomain metrics""" + host = entry.get('host', 'unknown') + subdomain, base_domain = self._extract_subdomain(host) + + # Use full subdomain identifier (subdomain.base or just base) + if subdomain: + full_subdomain = f"{subdomain}.{base_domain}" + else: + full_subdomain = base_domain + + metrics = self.subdomain_metrics[full_subdomain] + + # Basic counts + metrics['requests'] += 1 + metrics['last_seen'] = entry.get('timestamp') + + # Protocol (detect from scheme or port) + path = entry.get('path', '') + # Check if HTTPS (from routing or headers) + is_https = entry.get('headers', {}).get('x-forwarded-proto') == 'https' + protocol = 'https' if is_https else 'http' + metrics['protocols'][protocol] += 1 + + # HTTP method + method = entry.get('method', 'GET') + metrics['methods'][method] += 1 + + # Country + country = entry.get('country', 'XX') + metrics['countries'][country] += 1 + + # Track URI (normalize path, limit to first segment) + if path: + # Get first path segment for grouping + path_parts = path.split('?')[0].split('/') + if len(path_parts) > 1 and path_parts[1]: + normalized_path = '/' + path_parts[1] + if len(path_parts) > 2: + normalized_path += '/...' + else: + normalized_path = '/' + metrics['top_uris'][normalized_path] += 1 + + # Threat tracking + scan_data = entry.get('scan', {}) + if scan_data.get('is_scan'): + metrics['threats'] += 1 + threat_type = scan_data.get('type', 'unknown') + metrics['threat_types'][threat_type] += 1 + + # Write metrics periodically (every 50 requests per subdomain) + if metrics['requests'] % 50 == 0: + self._write_subdomain_metrics() + + def _write_subdomain_metrics(self): + """Write subdomain metrics to file""" + try: + # Convert defaultdicts to regular dicts for JSON serialization + output = {} + for subdomain, metrics in self.subdomain_metrics.items(): + output[subdomain] = { + 'requests': metrics['requests'], + 'threats': metrics['threats'], + 'protocols': dict(metrics['protocols']), + 'methods': dict(metrics['methods']), + 'status_codes': dict(metrics['status_codes']), + 'countries': dict(metrics['countries']), + 'threat_types': dict(metrics['threat_types']), + 'last_seen': metrics['last_seen'], + # Keep only top 20 URIs + 'top_uris': dict(sorted( + metrics['top_uris'].items(), + key=lambda x: x[1], + reverse=True + )[:20]) + } + + with open(SUBDOMAIN_METRICS_FILE, 'w') as f: + json.dump({ + 'updated': datetime.utcnow().isoformat() + 'Z', + 'subdomains': output + }, f) + except Exception as e: + ctx.log.error(f"Failed to write subdomain metrics: {e}") + def _load_autoban_config(self): """Load auto-ban configuration from host""" try: @@ -1284,6 +1427,7 @@ class SecuBoxAnalytics: # Update statistics self._update_stats(entry) + self._update_subdomain_metrics(entry) # Log and alert based on severity if scan_result.get('is_scan'): @@ -1423,6 +1567,14 @@ class SecuBoxAnalytics: if cache_hit is not None: ctx.log.debug(f"CACHE {'HIT' if cache_hit else 'MISS'}: {entry['path']} ({entry['response_time_ms']}ms)") + # Update subdomain status code metrics + host = entry.get('host', 'unknown') + subdomain, base_domain = self._extract_subdomain(host) + full_subdomain = f"{subdomain}.{base_domain}" if subdomain else base_domain + if full_subdomain in self.subdomain_metrics: + status_bucket = f"{response.status_code // 100}xx" + self.subdomain_metrics[full_subdomain]['status_codes'][status_bucket] += 1 + # Log failed auth attempts (4xx on auth paths) if entry['is_auth_attempt'] and 400 <= response.status_code < 500: ctx.log.warn(f"AUTH FAILED: {entry['client_ip']} ({entry['country']}) - {response.status_code}")