#!/usr/bin/env python3
"""
VPS Monitor Backend — central server.

Aggregates the data reported by every monitoring agent and exposes a REST API
for the frontend.
"""

import asyncio
import json
import os
import secrets
import sqlite3
import time
from collections import deque
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Annotated

import aiohttp
import bcrypt as _bcrypt
from fastapi import Depends, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from pydantic import BaseModel

# ─── Config ───────────────────────────────────────────────────────────────────

DB_FILE = Path(os.getenv("DB_FILE", "data/monitor.db"))
SECRET_FILE = Path(os.getenv("SECRET_FILE", "data/.jwt_secret"))
AGENT_TIMEOUT = int(os.getenv("AGENT_TIMEOUT", "5"))
JWT_ALGORITHM = "HS256"
JWT_EXPIRE_MIN = int(os.getenv("JWT_EXPIRE_MINUTES", "1440"))  # 24 h

DB_FILE.parent.mkdir(parents=True, exist_ok=True)

# ─── In-memory stats ring buffer ──────────────────────────────────────────────

_STATS_MAX_POINTS = 120  # 10 min of history at a 5 s collection interval
# VPS id -> most recent system-stat points (bounded deques filled by the collector).
_stats_history: dict[str, deque] = {}


# JWT key: env var > persisted file > freshly generated key (then persisted).
def _load_jwt_secret() -> str:
    """Return the key used to sign and verify JWTs.

    Resolution order: the non-empty ``JWT_SECRET`` environment variable, then
    the non-empty content of ``SECRET_FILE``, otherwise a new random key that
    is written to ``SECRET_FILE`` (mode 0600) so it survives restarts.
    """
    env = os.getenv("JWT_SECRET")
    if env:
        return env
    try:
        stored = SECRET_FILE.read_text().strip()
    except FileNotFoundError:
        stored = ""
    if stored:
        return stored
    # Missing *or empty* file: never sign tokens with an empty key.
    secret = secrets.token_hex(32)
    SECRET_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Create the file with 0600 up front so the key is never briefly readable
    # by other users (write-then-chmod leaves a window under the usual umask).
    fd = os.open(SECRET_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        fh.write(secret)
    # os.open's mode is not applied when the (empty) file already existed.
    SECRET_FILE.chmod(0o600)
    return secret


JWT_SECRET = _load_jwt_secret()
bearer_scheme = HTTPBearer()


# ─── Models ───────────────────────────────────────────────────────────────────
# Note: pydantic copies mutable defaults such as ``[]`` per instance.

class VpsConfig(BaseModel):
    id: str
    name: str
    host: str
    port: int = 8001
    api_key: str
    description: str = ""
    tags: list[str] = []


class ActionRequest(BaseModel):
    action: str  # start | stop | restart


class VpsUpdateRequest(BaseModel):
    name: str
    host: str
    port: int = 8001
    api_key: str = ""  # empty = keep the existing key
    description: str = ""
    tags: list[str] = []


class ComposeUpdateRequest(BaseModel):
    project: str


class RegisterRequest(BaseModel):
    username: str
    password: str


class LoginRequest(BaseModel):
    username: str
    password: str


# ─── SQLite ───────────────────────────────────────────────────────────────────

@contextmanager
def get_db():
    """Yield a fresh SQLite connection; commit on success, roll back on error.

    The connection is always closed afterwards. Rows use ``sqlite3.Row`` so
    columns can be read by name.
    """
    conn = sqlite3.connect(DB_FILE, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def init_db() -> None:
    """Create the tables and index if missing, and migrate older ``vps`` tables."""
    with get_db() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                username TEXT PRIMARY KEY,
                password TEXT NOT NULL,
                role     TEXT NOT NULL DEFAULT 'admin'
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS vps (
                id          TEXT PRIMARY KEY,
                name        TEXT NOT NULL,
                host        TEXT NOT NULL,
                port        INTEGER NOT NULL DEFAULT 8001,
                api_key     TEXT NOT NULL,
                description TEXT NOT NULL DEFAULT '',
                tags        TEXT NOT NULL DEFAULT '[]'
            )
        """)
        # Migration: databases created before tags existed lack the column.
        # Inspect the schema instead of swallowing every ALTER TABLE error.
        columns = {row["name"] for row in conn.execute("PRAGMA table_info(vps)")}
        if "tags" not in columns:
            conn.execute("ALTER TABLE vps ADD COLUMN tags TEXT NOT NULL DEFAULT '[]'")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS vps_stats (
                id               INTEGER PRIMARY KEY AUTOINCREMENT,
                vps_id           TEXT    NOT NULL,
                ts               INTEGER NOT NULL,
                cpu              REAL    NOT NULL,
                ram_percent      REAL    NOT NULL,
                ram_used         INTEGER NOT NULL,
                ram_total        INTEGER NOT NULL,
                net_sent_per_sec REAL    NOT NULL DEFAULT 0,
                net_recv_per_sec REAL    NOT NULL DEFAULT 0,
                net_bytes_sent   INTEGER NOT NULL DEFAULT 0,
                net_bytes_recv   INTEGER NOT NULL DEFAULT 0
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_vps_stats
            ON vps_stats(vps_id, ts DESC)
        """)


init_db()


# ─── Persistence ──────────────────────────────────────────────────────────────

def load_users() -> list[dict]:
    """Return every user row as a dict (username, password hash, role)."""
    with get_db() as conn:
        return [dict(r) for r in conn.execute("SELECT * FROM users").fetchall()]


def get_user(username: str) -> dict | None:
    """Return the user row for *username*, or ``None`` if it does not exist."""
    with get_db() as conn:
        row = conn.execute(
            "SELECT * FROM users WHERE username = ?", (username,)
        ).fetchone()
        return dict(row) if row is not None else None


def add_user(user: dict) -> None:
    """Insert a user; *user* must provide ``username``, ``password`` and ``role``."""
    with get_db() as conn:
        conn.execute(
            "INSERT INTO users (username, password, role) VALUES (?, ?, ?)",
            (user["username"], user["password"], user["role"]),
        )


def _decode_tags(raw: str | None) -> list:
    """Decode the JSON ``tags`` column; anything unreadable or non-list -> ``[]``."""
    try:
        tags = json.loads(raw or "[]")
    except (json.JSONDecodeError, TypeError):
        return []
    return tags if isinstance(tags, list) else []


def load_vps() -> list[dict]:
    """Return every configured VPS as a dict, with ``tags`` decoded to a list."""
    with get_db() as conn:
        rows = [dict(r) for r in conn.execute("SELECT * FROM vps").fetchall()]
    for row in rows:
        row["tags"] = _decode_tags(row.get("tags"))
    return rows


def insert_vps(vps: dict) -> None:
    """Insert a VPS row; raises ``sqlite3.IntegrityError`` if the id already exists."""
    with get_db() as conn:
        conn.execute(
            "INSERT INTO vps (id, name, host, port, api_key, description, tags) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (vps["id"], vps["name"], vps["host"], vps["port"], vps["api_key"],
             vps.get("description", ""), json.dumps(vps.get("tags", []))),
        )


def remove_vps(vps_id: str) -> bool:
    """Delete a VPS; return ``True`` if a row was removed."""
    with get_db() as conn:
        cur = conn.execute("DELETE FROM vps WHERE id = ?", (vps_id,))
        return cur.rowcount > 0


def update_vps(vps_id: str, data: dict) -> bool:
    """Overwrite a VPS's editable fields; return ``True`` if the row existed."""
    with get_db() as conn:
        cur = conn.execute(
            "UPDATE vps SET name=?, host=?, port=?, api_key=?, description=?, tags=? WHERE id=?",
            (data["name"], data["host"], data["port"], data["api_key"],
             data["description"], json.dumps(data.get("tags", [])), vps_id),
        )
        return cur.rowcount > 0


# ─── Auth helpers ─────────────────────────────────────────────────────────────

def create_token(username: str, role: str) -> str:
    """Return a signed JWT for *username* expiring after ``JWT_EXPIRE_MIN`` minutes."""
    payload = {
        "sub": username,
        "role": role,
        "exp": datetime.now(timezone.utc) + timedelta(minutes=JWT_EXPIRE_MIN),
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)


def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(bearer_scheme)]
) -> dict:
    """FastAPI dependency: resolve the bearer JWT to the stored user row.

    Raises:
        HTTPException: 401 if the token is invalid or expired, has no
            subject, or names a user that no longer exists.
    """
    try:
        payload = jwt.decode(credentials.credentials, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Token invalide ou expiré") from None
    username = payload.get("sub")
    if not username:
        raise HTTPException(status_code=401, detail="Token invalide")
    user = get_user(username)
    if not user:
        raise HTTPException(status_code=401, detail="Utilisateur introuvable")
    return user
# ─── App ──────────────────────────────────────────────────────────────────────

# Extra stdlib imports needed only by the routes and background tasks below.
import logging  # noqa: E402
from urllib.parse import quote  # noqa: E402

logger = logging.getLogger(__name__)

_STATS_INTERVAL = 5                        # seconds between two collection cycles
_STATS_RETENTION_SECONDS = 31 * 24 * 3600  # history kept in SQLite (31 days)
# A ring-buffer point older than this is no longer shown as "current"
# (the agent's /system endpoint may have been failing for a while).
_STATS_STALE_AFTER = 3 * _STATS_INTERVAL + AGENT_TIMEOUT
COMPOSE_TIMEOUT = int(os.getenv("COMPOSE_TIMEOUT", "600"))  # compose pull + up can be slow
_BCRYPT_MAX_BYTES = 72  # bcrypt only ever uses the first 72 bytes of a password
_REGISTRATION_CLOSED = "L'enregistrement public est désactivé. Seul l'admin peut créer des comptes."

# Strong references to background tasks: the event loop only keeps weak ones,
# so an unreferenced task can be garbage-collected mid-execution.
_background_tasks: set[asyncio.Task] = set()

app = FastAPI(title="VPS Monitor Backend", version="1.0.0")


def _cors_origins() -> list[str]:
    """Parse ``CORS_ORIGINS`` (comma-separated), ignoring surrounding spaces and empty items."""
    origins = (o.strip() for o in os.getenv("CORS_ORIGINS", "*").split(","))
    return [o for o in origins if o]


app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins(),
    allow_methods=["*"],
    allow_headers=["*"],
)


# ─── HTTP helpers ─────────────────────────────────────────────────────────────

async def _agent_request(
    method: str,
    vps: dict,
    path: str,
    *,
    params: dict | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
):
    """Call an agent endpoint and return its decoded JSON body.

    Query parameters go through *params* so aiohttp URL-encodes them; callers
    must not interpolate untrusted values into *path*. *timeout* defaults to
    ``AGENT_TIMEOUT``. Raises ``aiohttp.ClientResponseError`` on 4xx/5xx.
    """
    url = f"http://{vps['host']}:{vps['port']}{path}"
    headers = {"X-API-Key": vps["api_key"]}
    client_timeout = aiohttp.ClientTimeout(total=AGENT_TIMEOUT if timeout is None else timeout)
    async with aiohttp.ClientSession() as session:
        async with session.request(
            method, url, headers=headers, params=params, json=payload, timeout=client_timeout
        ) as r:
            r.raise_for_status()
            return await r.json()


async def agent_get(vps: dict, path: str, *, params: dict | None = None, timeout: float | None = None):
    """GET *path* on the agent of *vps* and return the JSON response."""
    return await _agent_request("GET", vps, path, params=params, timeout=timeout)


async def agent_post(
    vps: dict,
    path: str,
    payload: dict | None = None,
    *,
    params: dict | None = None,
    timeout: float | None = None,
):
    """POST *payload* (JSON, optional) to *path* on the agent of *vps*."""
    return await _agent_request("POST", vps, path, params=params, payload=payload, timeout=timeout)


def _latest_fresh_stat(vps_id: str) -> dict | None:
    """Return the newest ring-buffer point for *vps_id* unless it is missing or stale."""
    history = _stats_history.get(vps_id)
    if not history:
        return None
    last = history[-1]
    age = time.time() - datetime.fromisoformat(last["ts"]).timestamp()
    return last if age <= _STATS_STALE_AFTER else None


async def fetch_vps_status(vps: dict) -> dict:
    """Query one agent and return the full status of its VPS.

    System data (CPU, RAM, network) comes primarily from the ring buffer fed
    by the collector every few seconds, which is the same source as the stats
    modal. When the buffer is empty (first start) or its newest point is
    stale, the agent's ``/system`` endpoint is queried directly instead.
    The VPS is reported offline if ``/containers`` cannot be fetched.
    """
    try:
        containers_res = await agent_get(vps, "/containers")
    except Exception as e:
        return {
            "id": vps["id"],
            "name": vps["name"],
            "host": vps["host"],
            "description": vps.get("description", ""),
            "online": False,
            "error": str(e),
            "containers": [],
            "system": None,
            "tags": vps.get("tags", []),
        }

    last = _latest_fresh_stat(vps["id"])
    if last is not None:
        # Single source: the card and the modal show exactly the same value.
        system = {
            "cpu_percent": last["cpu"],
            "ram_used": last["ram_used"],
            "ram_total": last["ram_total"],
            "ram_percent": last["ram_percent"],
            "net_sent_per_sec": last["net_sent_per_sec"],
            "net_recv_per_sec": last["net_recv_per_sec"],
        }
    else:
        try:
            system = await agent_get(vps, "/system")
        except Exception:
            system = None

    return {
        "id": vps["id"],
        "name": vps["name"],
        "host": vps["host"],
        "description": vps.get("description", ""),
        "online": True,
        "containers": containers_res,
        "system": system,
        "tags": vps.get("tags", []),
    }


# ─── Stats collector (background tasks) ──────────────────────────────────────

def _insert_stat(vps_id: str, ts: int, point: dict) -> None:
    """Persist one stats point in SQLite (blocking; run it in a worker thread)."""
    with get_db() as conn:
        conn.execute("""
            INSERT INTO vps_stats
              (vps_id, ts, cpu, ram_percent, ram_used, ram_total,
               net_sent_per_sec, net_recv_per_sec, net_bytes_sent, net_bytes_recv)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            vps_id, ts,
            point["cpu"], point["ram_percent"],
            point["ram_used"], point["ram_total"],
            point["net_sent_per_sec"], point["net_recv_per_sec"],
            point["net_bytes_sent"], point["net_bytes_recv"],
        ))


async def _fetch_system_stat(vps: dict) -> None:
    """Collect one system-stats point for *vps*: ring buffer first, then SQLite.

    Best effort: an offline agent, or an agent too old to report every field,
    is logged at debug level and skipped.
    """
    try:
        data = await agent_get(vps, "/system")
        now = int(time.time())
        point = {
            "ts": datetime.fromtimestamp(now, tz=timezone.utc).isoformat(),
            "cpu": data["cpu_percent"],
            "ram_percent": data["ram_percent"],
            "ram_used": data["ram_used"],
            "ram_total": data["ram_total"],
            "net_sent_per_sec": data["net_sent_per_sec"],
            "net_recv_per_sec": data["net_recv_per_sec"],
            "net_bytes_sent": data.get("net_bytes_sent", 0),
            "net_bytes_recv": data.get("net_bytes_recv", 0),
        }
    except Exception as e:
        logger.debug("Stats collection skipped for VPS %s: %r", vps["id"], e)
        return

    # In-memory ring buffer: source of the "current value" on the VPS card.
    buf = _stats_history.setdefault(vps["id"], deque(maxlen=_STATS_MAX_POINTS))
    buf.append(point)

    # Long-term history; SQLite is blocking, so keep it off the event loop.
    try:
        await asyncio.to_thread(_insert_stat, vps["id"], now, point)
    except Exception:
        logger.exception("Could not persist stats for VPS %s", vps["id"])


async def _stats_collector() -> None:
    """Background task: collect stats for every VPS every ``_STATS_INTERVAL`` seconds.

    Failures of one cycle are logged and never stop the loop.
    """
    await asyncio.sleep(3)  # let the app finish starting up
    loop = asyncio.get_running_loop()
    while True:
        started = loop.time()
        try:
            vps_list = await asyncio.to_thread(load_vps)
            await asyncio.gather(
                *(_fetch_system_stat(v) for v in vps_list),
                return_exceptions=True,
            )
        except Exception:
            logger.exception("Stats collection cycle failed")
        # Subtract the cycle's duration so points stay ~_STATS_INTERVAL apart.
        await asyncio.sleep(max(0.0, _STATS_INTERVAL - (loop.time() - started)))


def _delete_stats_before(cutoff: int) -> None:
    """Delete every stats row older than *cutoff* (epoch seconds); blocking."""
    with get_db() as conn:
        conn.execute("DELETE FROM vps_stats WHERE ts < ?", (cutoff,))


async def _cleanup_old_stats() -> None:
    """Background task: purge stats older than the retention window, hourly.

    The first purge runs at startup so frequent restarts cannot postpone it.
    """
    while True:
        cutoff = int(time.time()) - _STATS_RETENTION_SECONDS
        try:
            await asyncio.to_thread(_delete_stats_before, cutoff)
        except Exception:
            logger.exception("Old stats cleanup failed")
        await asyncio.sleep(3600)


@app.on_event("startup")
async def startup_event() -> None:
    """Start the background tasks and keep strong references to them."""
    for coro in (_stats_collector(), _cleanup_old_stats()):
        task = asyncio.create_task(coro)
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)


@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Cancel the background tasks and wait for them to finish."""
    tasks = list(_background_tasks)
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


# ─── Auth routes ──────────────────────────────────────────────────────────────

@app.get("/api/auth/status")
def auth_status():
    """Tell the frontend whether any user exists yet."""
    return {"has_users": bool(load_users())}


@app.post("/api/auth/register", status_code=201)
def register(body: RegisterRequest):
    """Register the first (admin) user; registration is closed afterwards."""
    if load_users():
        raise HTTPException(status_code=403, detail=_REGISTRATION_CLOSED)
    username = body.username.strip()
    if not username:
        raise HTTPException(status_code=422, detail="Nom d'utilisateur requis")
    if len(body.password) < 6:
        raise HTTPException(status_code=422, detail="Mot de passe trop court (6 caractères min.)")
    password = body.password.encode()
    if len(password) > _BCRYPT_MAX_BYTES:
        # bcrypt ignores (older releases) or rejects (newer ones) longer input.
        raise HTTPException(status_code=422, detail="Mot de passe trop long (72 octets max.)")
    hashed = _bcrypt.hashpw(password, _bcrypt.gensalt()).decode()
    role = "admin"
    # Single statement, so two concurrent first registrations cannot both win.
    with get_db() as conn:
        cur = conn.execute(
            "INSERT INTO users (username, password, role) "
            "SELECT ?, ?, ? WHERE NOT EXISTS (SELECT 1 FROM users)",
            (username, hashed, role),
        )
        created = cur.rowcount > 0
    if not created:
        raise HTTPException(status_code=403, detail=_REGISTRATION_CLOSED)
    token = create_token(username, role)
    return {"access_token": token, "token_type": "bearer", "role": role}


@app.post("/api/auth/login")
def login(body: LoginRequest):
    """Authenticate a user and return a JWT."""
    # Stored usernames are stripped at registration; compare the same way.
    username = body.username.strip()
    user = next((u for u in load_users() if u["username"] == username), None)
    # Only the first 72 bytes matter to bcrypt; truncating keeps hashes created
    # by releases that truncated silently valid with releases that raise.
    password = body.password.encode()[:_BCRYPT_MAX_BYTES]
    if not user or not _bcrypt.checkpw(password, user["password"].encode()):
        raise HTTPException(status_code=401, detail="Identifiants incorrects")
    token = create_token(user["username"], user["role"])
    return {"access_token": token, "token_type": "bearer", "role": user["role"]}


@app.get("/api/auth/me")
def me(current_user: Annotated[dict, Depends(get_current_user)]):
    """Return the authenticated user's name and role."""
    return {"username": current_user["username"], "role": current_user["role"]}


# ─── VPS routes ───────────────────────────────────────────────────────────────

def _get_vps_or_404(vps_id: str) -> dict:
    """Return the configured VPS *vps_id* or raise a 404 ``HTTPException``."""
    vps = next((v for v in load_vps() if v["id"] == vps_id), None)
    if vps is None:
        raise HTTPException(status_code=404, detail="VPS introuvable")
    return vps


@app.get("/api/vps")
def list_vps(_: Annotated[dict, Depends(get_current_user)]):
    """List the configured VPS (API keys are never returned)."""
    return [
        {"id": v["id"], "name": v["name"], "host": v["host"],
         "description": v.get("description", ""), "tags": v.get("tags", [])}
        for v in load_vps()
    ]


@app.post("/api/vps", status_code=201)
def add_vps(vps: VpsConfig, _: Annotated[dict, Depends(get_current_user)]):
    """Add a new VPS; 409 if the id is already taken."""
    try:
        insert_vps(vps.model_dump())
    except sqlite3.IntegrityError:
        # Primary-key conflict, detected atomically by SQLite.
        raise HTTPException(status_code=409, detail="Un VPS avec cet ID existe déjà") from None
    return {"status": "ok", "id": vps.id}


@app.delete("/api/vps/{vps_id}")
def delete_vps(vps_id: str, _: Annotated[dict, Depends(get_current_user)]):
    """Remove a VPS from the configuration and drop its in-memory stats."""
    if not remove_vps(vps_id):
        raise HTTPException(status_code=404, detail="VPS introuvable")
    _stats_history.pop(vps_id, None)
    return {"status": "ok"}


@app.put("/api/vps/{vps_id}")
def edit_vps(vps_id: str, body: VpsUpdateRequest, _: Annotated[dict, Depends(get_current_user)]):
    """Update a VPS (name, host, port, api_key, description, tags).

    An empty ``api_key`` keeps the stored key.
    """
    existing = _get_vps_or_404(vps_id)
    data = {
        "name": body.name,
        "host": body.host,
        "port": body.port,
        "api_key": body.api_key.strip() or existing["api_key"],
        "description": body.description,
        "tags": body.tags,
    }
    if not update_vps(vps_id, data):  # deleted concurrently
        raise HTTPException(status_code=404, detail="VPS introuvable")
    return {"status": "ok"}


@app.get("/api/vps/{vps_id}/stats")
def get_vps_stats(
    vps_id: str,
    duration: int = 600,  # seconds, default 10 min
    _: Annotated[dict, Depends(get_current_user)] = None,
):
    """Return a VPS's system-stats history, downsampled automatically.

    ``duration`` (seconds, clamped to 1 min – 31 days) sets the time window.
    Rows are averaged into SQL buckets so the response holds about 300 points
    at most.
    """
    _get_vps_or_404(vps_id)

    duration = max(60, min(duration, _STATS_RETENTION_SECONDS))
    since = int(time.time()) - duration
    bucket = max(5, duration // 300)  # <= ~300 points returned

    with get_db() as conn:
        rows = conn.execute("""
            SELECT
              (ts / :b) * :b                AS ts,
              AVG(cpu)                      AS cpu,
              AVG(ram_percent)              AS ram_percent,
              CAST(AVG(ram_used) AS INTEGER) AS ram_used,
              MAX(ram_total)                AS ram_total,
              AVG(net_sent_per_sec)         AS net_sent_per_sec,
              AVG(net_recv_per_sec)         AS net_recv_per_sec,
              MAX(net_bytes_sent)           AS net_bytes_sent,
              MAX(net_bytes_recv)           AS net_bytes_recv
            FROM vps_stats
            WHERE vps_id = :vps_id AND ts >= :since
            GROUP BY (ts / :b)
            ORDER BY ts ASC
        """, {"b": bucket, "vps_id": vps_id, "since": since}).fetchall()

    return [
        {
            "ts": datetime.fromtimestamp(int(row["ts"]), tz=timezone.utc).isoformat(),
            "cpu": row["cpu"],
            "ram_percent": row["ram_percent"],
            "ram_used": row["ram_used"],
            "ram_total": row["ram_total"],
            "net_sent_per_sec": row["net_sent_per_sec"],
            "net_recv_per_sec": row["net_recv_per_sec"],
            "net_bytes_sent": row["net_bytes_sent"],
            "net_bytes_recv": row["net_bytes_recv"],
        }
        for row in rows
    ]


@app.get("/api/status")
async def all_status(_: Annotated[dict, Depends(get_current_user)]):
    """Return the status of every VPS, queried in parallel."""
    vps_list = await asyncio.to_thread(load_vps)
    results = await asyncio.gather(*(fetch_vps_status(v) for v in vps_list))
    return list(results)


@app.get("/api/vps/{vps_id}/status")
async def vps_status(vps_id: str, _: Annotated[dict, Depends(get_current_user)]):
    """Return the status of a single VPS."""
    vps = await asyncio.to_thread(_get_vps_or_404, vps_id)
    return await fetch_vps_status(vps)


@app.get("/api/vps/{vps_id}/containers/{container_id}/logs")
async def container_logs(
    vps_id: str, container_id: str, lines: int = 100,
    _: Annotated[dict, Depends(get_current_user)] = None
):
    """Fetch a container's logs through the agent (502 if the agent fails)."""
    vps = await asyncio.to_thread(_get_vps_or_404, vps_id)
    # Escape the id so characters like "?", "#" or "/" cannot alter the agent URL.
    path = f"/containers/{quote(container_id, safe='')}/logs"
    try:
        return await agent_get(vps, path, params={"lines": lines})
    except Exception as e:
        raise HTTPException(status_code=502, detail=str(e)) from e


@app.post("/api/vps/{vps_id}/containers/{container_id}/action")
async def container_action(
    vps_id: str, container_id: str, body: ActionRequest,
    _: Annotated[dict, Depends(get_current_user)] = None
):
    """Run an action (start/stop/restart) on a container through the agent."""
    vps = await asyncio.to_thread(_get_vps_or_404, vps_id)
    path = f"/containers/{quote(container_id, safe='')}/action"
    try:
        return await agent_post(vps, path, params={"action": body.action})
    except Exception as e:
        raise HTTPException(status_code=502, detail=str(e)) from e


@app.post("/api/vps/{vps_id}/compose/update")
async def compose_update(
    vps_id: str, body: ComposeUpdateRequest,
    _: Annotated[dict, Depends(get_current_user)] = None
):
    """Run ``docker compose pull`` + ``up`` on a project through the agent."""
    vps = await asyncio.to_thread(_get_vps_or_404, vps_id)
    try:
        return await agent_post(
            vps, "/compose/update",
            params={"project": body.project},
            timeout=COMPOSE_TIMEOUT,
        )
    except Exception as e:
        raise HTTPException(status_code=502, detail=str(e)) from e