#!/usr/bin/env python3
"""
VPS Monitor Backend — central server.

Aggregates the data reported by every agent and exposes a REST API for the
frontend.
"""

import asyncio
import json
import os
import secrets
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Annotated
from urllib.parse import quote, urlencode

import aiohttp
import bcrypt as _bcrypt
from fastapi import Depends, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from pydantic import BaseModel

# ─── Config ───────────────────────────────────────────────────────────────────

CONFIG_FILE = Path(os.getenv("CONFIG_FILE", "data/vps.json"))
USERS_FILE = Path(os.getenv("USERS_FILE", "data/users.json"))
SECRET_FILE = Path(os.getenv("SECRET_FILE", "data/.jwt_secret"))
AGENT_TIMEOUT = int(os.getenv("AGENT_TIMEOUT", "5"))
JWT_ALGORITHM = "HS256"
JWT_EXPIRE_MIN = int(os.getenv("JWT_EXPIRE_MINUTES", "1440"))  # 24 h


def _ensure_storage() -> None:
    """Create the data directories and seed the JSON stores with empty lists.

    Each of the three files can be relocated independently through its own
    environment variable, so every parent directory is created, not only the
    one holding CONFIG_FILE.
    """
    for path in (CONFIG_FILE, USERS_FILE, SECRET_FILE):
        path.parent.mkdir(parents=True, exist_ok=True)
    for path in (CONFIG_FILE, USERS_FILE):
        if not path.exists():
            path.write_text("[]", encoding="utf-8")


_ensure_storage()


# JWT key: env var > persisted file > generate and save
def _load_jwt_secret() -> str:
    """Return the JWT signing secret.

    Resolution order: the JWT_SECRET environment variable, then the content
    of SECRET_FILE, then a freshly generated random secret that is persisted
    to SECRET_FILE for subsequent starts.
    """
    env_secret = os.getenv("JWT_SECRET")
    if env_secret:
        return env_secret

    if SECRET_FILE.exists():
        stored = SECRET_FILE.read_text(encoding="utf-8").strip()
        # An empty file must not become an empty signing key: fall through
        # and regenerate instead.
        if stored:
            return stored

    secret = secrets.token_hex(32)
    # Create the file with owner-only permissions from the start, so the
    # secret is never on disk with the default (umask-derived) mode.
    fd = os.open(SECRET_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(secret)
    # The mode passed to os.open only applies to newly created files; tighten
    # a pre-existing (empty) file as well.
    SECRET_FILE.chmod(0o600)
    return secret


JWT_SECRET = _load_jwt_secret()

bearer_scheme = HTTPBearer()


# ─── Models ───────────────────────────────────────────────────────────────────

class VpsConfig(BaseModel):
    """Configuration entry for one monitored VPS, as stored in CONFIG_FILE."""

    id: str
    name: str
    host: str
    port: int = 8001
    api_key: str
    description: str = ""


class ActionRequest(BaseModel):
    """Request body for a container action."""

    action: str  # start | stop | restart


class RegisterRequest(BaseModel):
    """Request body for the initial admin registration."""

    username: str
    password: str


class LoginRequest(BaseModel):
    """Request body for a login attempt."""

    username: str
    password: str


# ─── Persistence ──────────────────────────────────────────────────────────────

def _read_json_list(path: Path) -> list[dict]:
    """Parse *path* as UTF-8 JSON and return the decoded value."""
    return json.loads(path.read_text(encoding="utf-8"))


def _write_json_list(path: Path, data: list[dict]) -> None:
    """Serialise *data* as indented JSON and write it to *path* as UTF-8."""
    path.write_text(json.dumps(data, indent=2), encoding="utf-8")


def load_vps() -> list[dict]:
    """Return the list of configured VPS entries from CONFIG_FILE."""
    return _read_json_list(CONFIG_FILE)


def save_vps(data: list[dict]) -> None:
    """Overwrite CONFIG_FILE with *data*."""
    _write_json_list(CONFIG_FILE, data)


def load_users() -> list[dict]:
    """Return the list of user records from USERS_FILE."""
    return _read_json_list(USERS_FILE)


def save_users(data: list[dict]) -> None:
    """Overwrite USERS_FILE with *data*."""
    _write_json_list(USERS_FILE, data)


# ─── Auth helpers ─────────────────────────────────────────────────────────────

def create_token(username: str, role: str) -> str:
    """Return a signed JWT for *username* that expires after JWT_EXPIRE_MIN minutes."""
    payload = {
        "sub": username,
        "role": role,
        "exp": datetime.now(timezone.utc) + timedelta(minutes=JWT_EXPIRE_MIN),
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)


def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(bearer_scheme)]
) -> dict:
    """Resolve the bearer token to the matching stored user record.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user absent from USERS_FILE.
    """
    try:
        payload = jwt.decode(credentials.credentials, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except JWTError as exc:
        raise HTTPException(status_code=401, detail="Token invalide ou expiré") from exc

    username = payload.get("sub")
    if not username:
        raise HTTPException(status_code=401, detail="Token invalide")

    user = next((u for u in load_users() if u["username"] == username), None)
    if not user:
        raise HTTPException(status_code=401, detail="Utilisateur introuvable")
    return user


def _password_matches(password: str, hashed: str) -> bool:
    """Return True when *password* matches the stored bcrypt *hashed* value.

    bcrypt raises ValueError on a malformed stored hash; that is reported as
    a failed match so that login answers 401 instead of crashing.
    """
    try:
        return _bcrypt.checkpw(password.encode(), hashed.encode())
    except ValueError:
        return False


# ─── App ──────────────────────────────────────────────────────────────────────

app = FastAPI(title="VPS Monitor Backend", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    # Strip whitespace so "a.example, b.example" matches both origins.
    allow_origins=[
        origin.strip()
        for origin in os.getenv("CORS_ORIGINS", "*").split(",")
        if origin.strip()
    ],
    allow_methods=["*"],
    allow_headers=["*"],
)


# ─── HTTP helpers ─────────────────────────────────────────────────────────────

async def _agent_request(method: str, vps: dict, path: str, payload: dict | None = None):
    """Send one authenticated request to the agent of *vps* and return its JSON body.

    *path* is appended verbatim to ``http://host:port``; callers are
    responsible for escaping any untrusted component it contains.
    """
    url = f"http://{vps['host']}:{vps['port']}{path}"
    headers = {"X-API-Key": vps["api_key"]}
    timeout = aiohttp.ClientTimeout(total=AGENT_TIMEOUT)
    async with aiohttp.ClientSession() as session:
        async with session.request(
            method, url, headers=headers, json=payload, timeout=timeout
        ) as response:
            response.raise_for_status()
            return await response.json()


async def agent_get(vps: dict, path: str):
    """GET *path* on the agent of *vps* and return the decoded JSON response."""
    return await _agent_request("GET", vps, path)


async def agent_post(vps: dict, path: str, payload: dict | None = None):
    """POST *payload* (as JSON) to *path* on the agent of *vps* and return the decoded JSON response."""
    return await _agent_request("POST", vps, path, payload)


async def fetch_vps_status(vps: dict) -> dict:
    """Query an agent and return its full state.

    Never raises: any failure while contacting the agent is reported as
    ``online: False`` with the error text and an empty container list.
    """
    status = {
        "id": vps["id"],
        "name": vps["name"],
        "host": vps["host"],
        "description": vps.get("description", ""),
    }
    try:
        containers = await agent_get(vps, "/containers")
    except Exception as e:  # best effort: one unreachable agent must not fail the caller
        status.update(online=False, error=str(e), containers=[])
    else:
        status.update(online=True, containers=containers)
    return status


def _get_vps_or_404(vps_id: str) -> dict:
    """Return the configured VPS whose id is *vps_id*, or raise a 404 HTTPException."""
    vps = next((v for v in load_vps() if v["id"] == vps_id), None)
    if not vps:
        raise HTTPException(status_code=404, detail="VPS introuvable")
    return vps


# ─── Auth routes ──────────────────────────────────────────────────────────────

@app.get("/api/auth/status")
def auth_status():
    """Tell whether any user already exists (for the frontend)."""
    return {"has_users": bool(load_users())}


@app.post("/api/auth/register", status_code=201)
def register(body: RegisterRequest):
    """Register the first user (admin). Closed afterwards."""
    users = load_users()
    if users:
        raise HTTPException(
            status_code=403,
            detail="L'enregistrement public est désactivé. Seul l'admin peut créer des comptes."
        )

    username = body.username.strip()
    # Report a blank username on its own instead of blaming the password.
    if not username:
        raise HTTPException(status_code=422, detail="Nom d'utilisateur requis")
    if len(body.password) < 6:
        raise HTTPException(status_code=422, detail="Mot de passe trop court (6 caractères min.)")

    user = {
        "username": username,
        "password": _bcrypt.hashpw(body.password.encode(), _bcrypt.gensalt()).decode(),
        "role": "admin",
    }
    users.append(user)
    save_users(users)

    token = create_token(user["username"], user["role"])
    return {"access_token": token, "token_type": "bearer", "role": user["role"]}


@app.post("/api/auth/login")
def login(body: LoginRequest):
    """Authenticate a user and return a JWT."""
    user = next((u for u in load_users() if u["username"] == body.username), None)
    if not user or not _password_matches(body.password, user["password"]):
        raise HTTPException(status_code=401, detail="Identifiants incorrects")
    token = create_token(user["username"], user["role"])
    return {"access_token": token, "token_type": "bearer", "role": user["role"]}


@app.get("/api/auth/me")
def me(current_user: Annotated[dict, Depends(get_current_user)]):
    """Return the username and role of the authenticated user."""
    return {"username": current_user["username"], "role": current_user["role"]}


# ─── VPS routes ───────────────────────────────────────────────────────────────

@app.get("/api/vps")
def list_vps(_: Annotated[dict, Depends(get_current_user)]):
    """List the configured VPS entries (without the API keys)."""
    return [
        {
            "id": v["id"],
            "name": v["name"],
            "host": v["host"],
            "description": v.get("description", ""),
        }
        for v in load_vps()
    ]


@app.post("/api/vps", status_code=201)
def add_vps(vps: VpsConfig, _: Annotated[dict, Depends(get_current_user)]):
    """Add a new VPS; answers 409 when the id is already taken."""
    data = load_vps()
    if any(v["id"] == vps.id for v in data):
        raise HTTPException(status_code=409, detail="Un VPS avec cet ID existe déjà")
    data.append(vps.model_dump())
    save_vps(data)
    return {"status": "ok", "id": vps.id}


@app.delete("/api/vps/{vps_id}")
def delete_vps(vps_id: str, _: Annotated[dict, Depends(get_current_user)]):
    """Remove a VPS from the configuration; answers 404 when the id is unknown."""
    data = load_vps()
    remaining = [v for v in data if v["id"] != vps_id]
    if len(remaining) == len(data):
        raise HTTPException(status_code=404, detail="VPS introuvable")
    save_vps(remaining)
    return {"status": "ok"}


@app.get("/api/status")
async def all_status(_: Annotated[dict, Depends(get_current_user)]):
    """Return the state of every VPS, querying the agents concurrently."""
    results = await asyncio.gather(*(fetch_vps_status(v) for v in load_vps()))
    return list(results)


@app.get("/api/vps/{vps_id}/status")
async def vps_status(vps_id: str, _: Annotated[dict, Depends(get_current_user)]):
    """Return the state of one specific VPS."""
    return await fetch_vps_status(_get_vps_or_404(vps_id))


@app.get("/api/vps/{vps_id}/containers/{container_id}/logs")
async def container_logs(
    vps_id: str,
    container_id: str,
    lines: int = 100,
    # NOTE(review): the `= None` default looks like it is only there to satisfy
    # Python's "no required parameter after a defaulted one" rule; the
    # dependency itself is declared in the annotation.
    _: Annotated[dict, Depends(get_current_user)] = None,
):
    """Fetch the logs of a container through the agent.

    Answers 404 for an unknown VPS and 502 when the agent call fails.
    """
    vps = _get_vps_or_404(vps_id)
    # container_id comes from the request URL: escape it so it cannot add
    # path segments or query parameters to the agent URL.
    safe_id = quote(container_id, safe="")
    query = urlencode({"lines": lines})
    try:
        return await agent_get(vps, f"/containers/{safe_id}/logs?{query}")
    except Exception as e:
        raise HTTPException(status_code=502, detail=str(e)) from e


@app.post("/api/vps/{vps_id}/containers/{container_id}/action")
async def container_action(
    vps_id: str,
    container_id: str,
    body: ActionRequest,
    _: Annotated[dict, Depends(get_current_user)] = None,
):
    """Run an action on a container through the agent.

    Answers 404 for an unknown VPS and 502 when the agent call fails.
    """
    vps = _get_vps_or_404(vps_id)
    # Both values are caller-controlled: escape them before building the URL.
    safe_id = quote(container_id, safe="")
    query = urlencode({"action": body.action})
    try:
        return await agent_post(vps, f"/containers/{safe_id}/action?{query}")
    except Exception as e:
        raise HTTPException(status_code=502, detail=str(e)) from e