- Add complete Project Hub & Workspace Interface implementation - New data models: Project, ModuleKit, Workspace - 3 fixture projects (cybermind.fr, cybermood.eu, secubox-c3) - 4 module kits (Security, Network, Automation, Media) - Workspace routes with project switching and kit installation - 4 workspace tabs: Overview, Module Kits, Devices, Composer - New navigation item: Workspace (7th section) - Remove all glowing effects from UI - Remove Command Center widget glow and backdrop blur - Remove device status indicator glow - Remove toggle button glow effects - Extend DataStore with 13 new methods for workspace management - Add 270+ lines of workspace-specific CSS with responsive layouts - Create workspace templates and result partials 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
269 lines
8.0 KiB
Python
269 lines
8.0 KiB
Python
"""
WebSocket endpoints for real-time Command Center data streaming.

Handles real-time metrics, threat feeds, traffic stats, and command execution
streaming to connected clients.
"""
|
|
|
|
from fastapi import WebSocket, WebSocketDisconnect
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Set, TYPE_CHECKING
|
|
from datetime import datetime
|
|
|
|
if TYPE_CHECKING:
|
|
from .services import DataStore
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CommandCenterManager:
    """Manages WebSocket connections for the Command Center.

    Connections are kept in a set, so registering or removing the same
    socket more than once is harmless.
    """

    def __init__(self):
        self.active_connections: Set[WebSocket] = set()

    async def connect(self, websocket: WebSocket):
        """Accept and register a new WebSocket connection.

        Args:
            websocket: Connection to accept and start tracking
        """
        await websocket.accept()
        self.active_connections.add(websocket)
        logger.info(
            "Command Center client connected. Total clients: %d",
            len(self.active_connections),
        )

    def disconnect(self, websocket: WebSocket):
        """Remove a WebSocket connection.

        Uses ``discard``, so a connection that is not registered is ignored.

        Args:
            websocket: Connection to stop tracking
        """
        self.active_connections.discard(websocket)
        logger.info(
            "Command Center client disconnected. Total clients: %d",
            len(self.active_connections),
        )

    async def broadcast(self, message: dict):
        """Broadcast a message to all connected clients.

        A connection whose send raises is logged and dropped from the
        registry; the remaining connections still receive the message.

        Args:
            message: JSON-serialisable payload passed to ``send_json``
        """
        dead_connections = set()

        # Iterate over a snapshot: every ``await`` below yields to the event
        # loop, where connect()/disconnect() may add or remove entries.
        # Looping over the live set would then fail with
        # "Set changed size during iteration".
        for connection in tuple(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception as e:
                logger.error("Failed to send to client: %s", e)
                dead_connections.add(connection)

        # Clean up dead connections (in place, so clients that connected
        # while the broadcast was running are kept)
        self.active_connections -= dead_connections
|
|
|
|
|
|
# Global manager instance: one registry per process, used by
# command_center_websocket to register and unregister its connection.
manager = CommandCenterManager()
|
|
|
|
|
|
async def stream_metrics(ws: WebSocket, store: "DataStore", *, interval: float = 2.0):
    """
    Stream system metrics to a client, every 2 seconds by default.

    Each cycle asks the store's backend for ``get_system_metrics()`` and
    sends the result as a ``'metrics'`` message stamped with the current
    time. The loop ends when a send or fetch raises ``WebSocketDisconnect``;
    any other failure is logged and the next cycle runs as usual.

    Args:
        ws: WebSocket connection
        store: DataStore instance for backend access
        interval: Seconds to wait between two updates
    """
    while True:
        try:
            backend = store.get_backend()
            metrics = await backend.get_system_metrics()

            await ws.send_json({
                'type': 'metrics',
                'timestamp': datetime.now().timestamp(),
                'data': metrics,
            })
        except WebSocketDisconnect:
            break
        except Exception as e:
            # Continue streaming despite errors
            logger.error("Error streaming metrics: %s", e)

        await asyncio.sleep(interval)
|
|
|
|
|
|
async def stream_threats(ws: WebSocket, store: "DataStore", *, interval: float = 5.0):
    """
    Stream the threat feed to a client, every 5 seconds by default.

    Each cycle asks the store's backend for ``get_threat_feed()`` and sends
    the result as a ``'threats'`` message stamped with the current time.
    The loop ends when a send or fetch raises ``WebSocketDisconnect``; any
    other failure is logged and the next cycle runs as usual.

    Args:
        ws: WebSocket connection
        store: DataStore instance for backend access
        interval: Seconds to wait between two updates
    """
    while True:
        try:
            backend = store.get_backend()
            threats = await backend.get_threat_feed()

            await ws.send_json({
                'type': 'threats',
                'timestamp': datetime.now().timestamp(),
                'data': threats,
            })
        except WebSocketDisconnect:
            break
        except Exception as e:
            # Keep the feed alive: a failed cycle is only logged
            logger.error("Error streaming threats: %s", e)

        await asyncio.sleep(interval)
|
|
|
|
|
|
async def stream_traffic(ws: WebSocket, store: "DataStore", *, interval: float = 3.0):
    """
    Stream traffic stats to a client, every 3 seconds by default.

    Each cycle asks the store's backend for ``get_traffic_stats()`` and
    sends the result as a ``'traffic'`` message stamped with the current
    time. The loop ends when a send or fetch raises ``WebSocketDisconnect``;
    any other failure is logged and the next cycle runs as usual.

    Args:
        ws: WebSocket connection
        store: DataStore instance for backend access
        interval: Seconds to wait between two updates
    """
    while True:
        try:
            backend = store.get_backend()
            traffic = await backend.get_traffic_stats()

            await ws.send_json({
                'type': 'traffic',
                'timestamp': datetime.now().timestamp(),
                'data': traffic,
            })
        except WebSocketDisconnect:
            break
        except Exception as e:
            # Keep the feed alive: a failed cycle is only logged
            logger.error("Error streaming traffic: %s", e)

        await asyncio.sleep(interval)
|
|
|
|
|
|
async def handle_command_execution(ws: WebSocket, command: str, store: "DataStore"):
    """
    Execute a command and stream output back to client.

    Sends a ``'running'`` acknowledgment, asks the store's backend for the
    command output, then sends a ``'completed'`` message carrying it. If
    anything other than a disconnect fails, the error is logged and reported
    to the client as an ``'error'`` message instead.

    Args:
        ws: WebSocket connection
        command: Command string to execute
        store: DataStore instance for backend access

    Raises:
        WebSocketDisconnect: If the client goes away while the command is
            being handled; nothing further is sent in that case.
    """
    # NOTE(review): ``command`` is passed to the backend unchanged. Presumably
    # the backend restricts what may run; confirm before exposing this
    # endpoint to untrusted clients.
    try:
        # Send execution start acknowledgment
        started_at = datetime.now().strftime("%H:%M:%S")
        await ws.send_json({
            'type': 'command_output',
            'data': {
                'command': command,
                'output': f'[{started_at}] Executing: {command}\n',
                'status': 'running'
            }
        })

        # Execute command via backend
        backend = store.get_backend()
        output = await backend.get_command_output(command)

        # Send command output
        await ws.send_json({
            'type': 'command_output',
            'data': {
                'command': command,
                'output': output,
                'status': 'completed'
            }
        })

    except WebSocketDisconnect:
        # The client is gone, so there is nobody left to report to. Let the
        # caller's disconnect handling run instead of sending an error frame
        # to a closed socket.
        raise

    except Exception as e:
        logger.error("Error executing command '%s': %s", command, e)
        await ws.send_json({
            'type': 'command_output',
            'data': {
                'command': command,
                'output': f'[ERROR] {str(e)}\n',
                'status': 'error'
            }
        })
|
|
|
|
|
|
async def command_center_websocket(websocket: WebSocket, store: "DataStore"):
    """
    Main WebSocket endpoint handler for Command Center.

    Manages connection lifecycle and spawns background streaming tasks
    for metrics, threats, and traffic data. While the connection is open it
    dispatches client messages by their ``'type'`` field:

    - ``'subscribe'``: acknowledged with the list of available streams (the
      streams themselves are already running).
    - ``'execute'``: runs ``data['command']`` via handle_command_execution,
      or replies with an error when no command is given.
    - ``'ping'``: answered with ``'pong'``.
    - anything else: answered with an ``'error'`` message.

    On exit, for any reason, the connection is unregistered and the
    streaming tasks are cancelled and awaited.

    Args:
        websocket: FastAPI WebSocket connection
        store: DataStore instance
    """
    await manager.connect(websocket)

    # Create background streaming tasks
    stream_tasks = [
        asyncio.create_task(stream_metrics(websocket, store)),
        asyncio.create_task(stream_threats(websocket, store)),
        asyncio.create_task(stream_traffic(websocket, store)),
    ]

    try:
        # Listen for client messages
        while True:
            # Receive JSON message from client
            data = await websocket.receive_json()

            if not isinstance(data, dict):
                # Valid JSON that is not an object (a list, a number, ...) has
                # no 'type' to dispatch on; report it rather than letting
                # ``data.get`` fail and tear the connection down.
                logger.warning("Ignoring non-object message: %r", data)
                await websocket.send_json({
                    'type': 'error',
                    'message': 'Message must be a JSON object'
                })
                continue

            message_type = data.get('type')

            if message_type == 'subscribe':
                # Client subscribing to streams (already active via background tasks)
                logger.info("Client subscribed to streams: %s", data.get('streams', []))
                await websocket.send_json({
                    'type': 'subscribed',
                    'streams': ['metrics', 'threats', 'traffic', 'commands']
                })

            elif message_type == 'execute':
                # Execute command
                command = data.get('command', '')
                if command:
                    await handle_command_execution(websocket, command, store)
                else:
                    await websocket.send_json({
                        'type': 'error',
                        'message': 'No command specified'
                    })

            elif message_type == 'ping':
                # Heartbeat/keepalive
                await websocket.send_json({'type': 'pong'})

            else:
                logger.warning("Unknown message type: %s", message_type)
                await websocket.send_json({
                    'type': 'error',
                    'message': f'Unknown message type: {message_type}'
                })

    except WebSocketDisconnect:
        logger.info("Client disconnected normally")
    except Exception as e:
        logger.error("WebSocket error: %s", e)
    finally:
        # Clean up
        manager.disconnect(websocket)

        # Cancel background streaming tasks
        for task in stream_tasks:
            task.cancel()

        # Wait for all three to finish. return_exceptions=True collects each
        # task's CancelledError (or any other failure) as a result, so one
        # task's outcome cannot stop the others from being awaited.
        await asyncio.gather(*stream_tasks, return_exceptions=True)

        logger.info("WebSocket connection closed and tasks cleaned up")
|