secubox-openwrt/secubox-tools/webui/app/backends/metrics.py
CyberMind-FR 0d6aaa1111 feat(webui): add Project Hub workspace and remove Command Center glow effects
- Add complete Project Hub & Workspace Interface implementation
  - New data models: Project, ModuleKit, Workspace
  - 3 fixture projects (cybermind.fr, cybermood.eu, secubox-c3)
  - 4 module kits (Security, Network, Automation, Media)
  - Workspace routes with project switching and kit installation
  - 4 workspace tabs: Overview, Module Kits, Devices, Composer
  - New navigation item: Workspace (7th section)

- Remove all glowing effects from UI
  - Remove Command Center widget glow and backdrop blur
  - Remove device status indicator glow
  - Remove toggle button glow effects

- Extend DataStore with 13 new methods for workspace management
- Add 270+ lines of workspace-specific CSS with responsive layouts
- Create workspace templates and result partials

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-03 08:10:22 +01:00

233 lines
8.5 KiB
Python

"""
Real-time metrics collection for Command Center.
Provides simulated data for virtualized backend and fallback data
for HTTP backend when real metrics aren't available.
"""
from typing import Dict, Any
import random
import time
from datetime import datetime
class MetricsCollector:
    """Generate simulated system, threat and traffic metrics for visualization.

    Every method is a static coroutine that builds its payload with the
    ``random`` module, so the returned values differ from call to call.
    """

    @staticmethod
    async def get_system_metrics() -> Dict[str, Any]:
        """
        Collect CPU, memory, and network metrics.

        Returns:
            dict: System metrics with the keys ``cpu`` (usage, per-core
            usage, temperature), ``memory`` (used/total/percent/available),
            ``network`` (byte counters, rates, connections), ``uptime``
            (seconds) and ``load_avg`` (1, 5 and 15 minute values).
        """
        # Single source of truth for the simulated core count; it drives both
        # the per-core list and the load-average scaling below.
        core_count = 4

        # Simulate realistic CPU usage with some variability (15 to 85 percent)
        cpu_usage = random.randint(15, 75) + random.random() * 10

        # Per-core usage: jitter around the overall figure, clamped to 0-100
        cores = [
            max(0, min(100, cpu_usage + random.randint(-20, 20)))
            for _ in range(core_count)
        ]

        # Memory stats
        total_memory = 4096  # 4GB
        used_memory = random.randint(1024, 3072)
        memory_percent = (used_memory / total_memory) * 100

        # Network throughput (bytes per second)
        rx_rate = random.randint(50, 500) * 1024  # 50KB-500KB/s download
        tx_rate = random.randint(20, 200) * 1024  # 20KB-200KB/s upload

        # Load average is derived from CPU usage so the figures stay coherent
        base_load = cpu_usage / 100 * core_count

        return {
            'cpu': {
                'usage': round(cpu_usage, 2),
                'cores': [round(c, 2) for c in cores],
                'temperature': random.randint(45, 75)  # Celsius
            },
            'memory': {
                'used': used_memory,
                'total': total_memory,
                'percent': round(memory_percent, 2),
                'available': total_memory - used_memory
            },
            'network': {
                'rx_bytes': random.randint(1000000, 10000000),
                'tx_bytes': random.randint(500000, 5000000),
                'rx_rate': rx_rate,
                'tx_rate': tx_rate,
                'connections': random.randint(10, 50)
            },
            'uptime': random.randint(100000, 500000),  # seconds
            'load_avg': [
                round(base_load, 2),  # 1min
                round(base_load * 0.9, 2),  # 5min
                round(base_load * 0.8, 2)  # 15min
            ]
        }

    @staticmethod
    async def get_threat_feed() -> Dict[str, Any]:
        """
        Collect security threat events from firewall/IDS.

        The events are simulated: each one is a copy of a randomly chosen
        template stamped with the current local time.

        Returns:
            dict: ``recent_events`` (0-3 events, newest first), ``counters``
            (blocked/allowed/quarantined totals), ``threat_level``
            (``'moderate'`` when more than one event was generated, otherwise
            ``'low'``) and ``last_update`` (ISO 8601 timestamp).
        """
        # Threat event templates; the addresses are randomized once per call
        threat_templates = [
            {
                'ip': f'192.168.{random.randint(1, 254)}.{random.randint(1, 254)}',
                'severity': 'high',
                'action': 'blocked',
                'reason': 'SSH brute force attempt'
            },
            {
                'ip': f'10.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}',
                'severity': 'medium',
                'action': 'quarantined',
                'reason': 'Port scan detected'
            },
            {
                'ip': f'172.{random.randint(16, 31)}.{random.randint(0, 255)}.{random.randint(1, 254)}',
                'severity': 'high',
                'action': 'blocked',
                'reason': 'SQL injection attempt'
            },
            {
                'ip': f'192.168.{random.randint(1, 254)}.{random.randint(1, 254)}',
                'severity': 'medium',
                'action': 'blocked',
                'reason': 'Malformed HTTP request'
            },
            {
                'ip': f'10.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}',
                'severity': 'low',
                'action': 'allowed',
                'reason': 'Rate limit exceeded'
            },
            {
                'ip': f'203.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}',
                'severity': 'high',
                'action': 'blocked',
                'reason': 'Known botnet IP'
            }
        ]

        # Generate 0-3 recent events
        num_events = random.randint(0, 3)
        recent_events = []
        for _ in range(num_events):
            # Copy so that stamping the event never mutates the template
            event = random.choice(threat_templates).copy()
            now = datetime.now()
            event['time'] = now.strftime('%H:%M:%S')
            event['timestamp'] = now.isoformat()
            recent_events.append(event)

        # Sort by time (most recent first)
        recent_events.sort(key=lambda x: x['timestamp'], reverse=True)

        return {
            'recent_events': recent_events,
            'counters': {
                'blocked': random.randint(150, 250),
                'allowed': random.randint(4500, 5500),
                'quarantined': random.randint(0, 8)
            },
            'threat_level': 'moderate' if num_events > 1 else 'low',
            'last_update': datetime.now().isoformat()
        }

    @staticmethod
    async def get_traffic_stats() -> Dict[str, Any]:
        """
        Collect network traffic classification and statistics.

        Returns:
            dict: ``protocols`` (integer percentage per protocol, always
            summing to exactly 100), ``top_domains`` (3-5 entries sorted by
            ``bytes`` descending), ``total_bandwidth`` (sum of the listed
            domains' bytes), ``active_connections`` and ``last_update``
            (ISO 8601 timestamp).
        """
        # Protocol distribution (percentages)
        total = 100
        https_percent = random.randint(50, 70)
        dns_percent = random.randint(10, 20)
        # https + dns + ssh can be drawn as high as 105, so cap ssh to the
        # room that is actually left (https + dns never exceeds 90).
        ssh_percent = min(
            random.randint(5, 15),
            total - https_percent - dns_percent
        )
        # Whatever remains is split between 'other' (at most 5 points) and
        # 'http'; deriving both from the remainder keeps the total at 100
        # instead of unconditionally adding 5 on top.
        remainder = total - https_percent - dns_percent - ssh_percent
        other_percent = min(5, remainder)
        http_percent = remainder - other_percent

        protocols = {
            'https': https_percent,
            'dns': dns_percent,
            'ssh': ssh_percent,
            'http': http_percent,
            'other': other_percent
        }

        # Top domain templates
        domain_templates = [
            {'domain': 'api.openai.com', 'category': 'cloud'},
            {'domain': 'github.com', 'category': 'development'},
            {'domain': 'stackoverflow.com', 'category': 'development'},
            {'domain': 'reddit.com', 'category': 'social'},
            {'domain': 'youtube.com', 'category': 'media'},
            {'domain': 'cloudflare.com', 'category': 'cdn'},
            {'domain': 'google.com', 'category': 'search'},
            {'domain': 'aws.amazon.com', 'category': 'cloud'},
            {'domain': 'microsoft.com', 'category': 'software'},
            {'domain': 'docker.io', 'category': 'containers'}
        ]

        # Select 3-5 distinct random domains
        num_domains = random.randint(3, 5)
        selected_domains = random.sample(domain_templates, num_domains)

        # Attach random traffic counters to each selected domain
        top_domains = [
            {
                **domain,
                'bytes': random.randint(100000, 5000000),
                'packets': random.randint(100, 10000),
                'connections': random.randint(1, 20)
            }
            for domain in selected_domains
        ]

        # Sort by bytes (descending)
        top_domains.sort(key=lambda x: x['bytes'], reverse=True)

        return {
            'protocols': protocols,
            'top_domains': top_domains,
            'total_bandwidth': sum(d['bytes'] for d in top_domains),
            'active_connections': random.randint(15, 50),
            'last_update': datetime.now().isoformat()
        }

    @staticmethod
    async def get_command_output(command: str) -> str:
        """
        Generate simulated command output.

        The command is never executed; only its prefix is inspected to pick
        one of the canned responses. The fallback message echoes ``command``
        verbatim, so callers that render the result as HTML should escape it.

        Args:
            command: The command line to simulate

        Returns:
            str: Simulated output prefixed with a ``[HH:MM:SS]`` timestamp
            and terminated by a newline
        """
        timestamp = datetime.now().strftime('[%H:%M:%S]')

        # Prefix order matters only in that the first matching branch wins
        if command.startswith('uname'):
            return f"{timestamp} Linux secubox 5.15.0-openwrt #0 SMP PREEMPT\n"
        if command.startswith('uptime'):
            return f"{timestamp} 12:34:56 up 3 days, 5:23, 2 users, load average: 0.45, 0.32, 0.28\n"
        if command.startswith('free'):
            return f"{timestamp}\n total used free shared buff/cache available\nMem: 4096000 2048000 1024000 128000 1024000 1920000\nSwap: 0 0 0\n"
        if command.startswith('df'):
            return f"{timestamp}\nFilesystem Size Used Avail Use% Mounted on\n/dev/root 32G 12G 18G 40% /\n"
        if command.startswith('ps'):
            return f"{timestamp}\n PID TTY TIME CMD\n 1 ? 00:00:01 init\n 412 ? 00:00:00 uhttpd\n 528 ? 00:00:00 dnsmasq\n"
        return f"{timestamp} Command '{command}' executed successfully\n"