Files
ScriptVPS/vps-monitor/backend/main.py
jeanotx32 38bc430348
All checks were successful
Build and Push Docker Images / docker (push) Successful in 16s
Feat: implement dynamic agent version fetching and update mechanism
2026-06-02 20:06:39 -04:00

1287 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
VPS Monitor Backend — serveur central.
Agrège les données de tous les agents et expose une API REST pour le frontend.
"""
import asyncio
import base64
import json
import os
import re
import secrets
import sqlite3
import time
from collections import deque
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Annotated
import bcrypt as _bcrypt
import aiohttp
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from webauthn import (
generate_registration_options,
verify_registration_response,
generate_authentication_options,
verify_authentication_response,
options_to_json,
)
from webauthn.helpers.structs import (
AuthenticatorAttachment,
AuthenticatorSelectionCriteria,
UserVerificationRequirement,
ResidentKeyRequirement,
PublicKeyCredentialDescriptor,
RegistrationCredential as WebAuthnRegistrationCredential,
AuthenticatorAttestationResponse,
AuthenticationCredential as WebAuthnAuthenticationCredential,
AuthenticatorAssertionResponse,
)
# ─── Config ───────────────────────────────────────────────────────────────────
DB_FILE = Path(os.getenv("DB_FILE", "data/monitor.db"))
# Version de référence : d'abord env var explicite, sinon récupérée dynamiquement depuis le dépôt
_FORCED_AGENT_VERSION = os.getenv("EXPECTED_AGENT_VERSION", "") # si non vide, court-circuite le fetch
REPO_AGENT_URL = os.getenv(
"REPO_AGENT_URL",
"https://git.jeanbonapp.com/jeanbon/ScriptVPS/raw/branch/main/vps-monitor/agent/agent.py",
)
_latest_agent_version: str = _FORCED_AGENT_VERSION or "unknown"
def _get_expected_version() -> str:
return _latest_agent_version
# ─── Ring buffer de stats (en mémoire) ───────────────────────────────────────
_STATS_MAX_POINTS = 120 # 10 min à 5 s d'intervalle
_stats_history: dict[str, deque] = {}
SECRET_FILE = Path(os.getenv("SECRET_FILE", "data/.jwt_secret"))
AGENT_TIMEOUT = int(os.getenv("AGENT_TIMEOUT", "5"))
JWT_ALGORITHM = "HS256"
JWT_EXPIRE_MIN = int(os.getenv("JWT_EXPIRE_MINUTES", "1440")) # 24 h
DB_FILE.parent.mkdir(parents=True, exist_ok=True)
# Clé JWT : env var > fichier persisté > génération + sauvegarde
def _load_jwt_secret() -> str:
env = os.getenv("JWT_SECRET")
if env:
return env
if SECRET_FILE.exists():
return SECRET_FILE.read_text().strip()
secret = secrets.token_hex(32)
SECRET_FILE.write_text(secret)
SECRET_FILE.chmod(0o600)
return secret
JWT_SECRET = _load_jwt_secret()
# ─── WebAuthn / Passkeys config ───────────────────────────────────────────────
WEBAUTHN_RP_ID = os.getenv("WEBAUTHN_RP_ID", "localhost")
WEBAUTHN_RP_NAME = os.getenv("WEBAUTHN_RP_NAME", "VPS Monitor")
WEBAUTHN_ORIGIN = os.getenv("WEBAUTHN_ORIGIN", "http://localhost:3020")
# In-memory challenge store: session_id → {challenge, username, expires_at}
_webauthn_challenges: dict[str, dict] = {}
bearer_scheme = HTTPBearer()
# ─── Modèles ──────────────────────────────────────────────────────────────────
class VpsConfig(BaseModel):
id: str
name: str
host: str
port: int = 8001
api_key: str
description: str = ""
tags: list[str] = []
class ActionRequest(BaseModel):
action: str # start | stop | restart
class VpsUpdateRequest(BaseModel):
name: str
host: str
port: int = 8001
api_key: str = "" # vide = conserver la clé existante
description: str = ""
tags: list[str] = []
class ComposeUpdateRequest(BaseModel):
project: str
class RegisterRequest(BaseModel):
username: str
password: str
class LoginRequest(BaseModel):
username: str
password: str
class ChangePasswordRequest(BaseModel):
old_password: str
new_password: str
class SettingUpdateRequest(BaseModel):
value: str
class PurgeRequest(BaseModel):
table: str # 'vps_stats' | 'login_logs' | 'all'
period: str # 'last_24h' | 'last_7d' | 'last_30d' | 'all' | 'custom'
from_ts: int | None = None # epoch seconds, utilisé si period == 'custom'
to_ts: int | None = None # epoch seconds, utilisé si period == 'custom'
class PasskeyRegisterFinishRequest(BaseModel):
session_id: str
name: str = "Ma passkey"
credential: dict
class PasskeyLoginBeginRequest(BaseModel):
username: str | None = None
class PasskeyLoginFinishRequest(BaseModel):
session_id: str
credential: dict
# ─── SQLite ───────────────────────────────────────────────────────────────────
@contextmanager
def get_db():
conn = sqlite3.connect(DB_FILE, check_same_thread=False)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def init_db() -> None:
with get_db() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
username TEXT PRIMARY KEY,
password TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'admin'
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS vps (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
host TEXT NOT NULL,
port INTEGER NOT NULL DEFAULT 8001,
api_key TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]'
)
""")
# Migration : ajoute la colonne tags si elle n'existe pas encore
try:
conn.execute("ALTER TABLE vps ADD COLUMN tags TEXT NOT NULL DEFAULT '[]'")
except Exception:
pass # colonne déjà présente
conn.execute("""
CREATE TABLE IF NOT EXISTS vps_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
vps_id TEXT NOT NULL,
ts INTEGER NOT NULL,
cpu REAL NOT NULL,
ram_percent REAL NOT NULL,
ram_used INTEGER NOT NULL,
ram_total INTEGER NOT NULL,
net_sent_per_sec REAL NOT NULL DEFAULT 0,
net_recv_per_sec REAL NOT NULL DEFAULT 0,
net_bytes_sent INTEGER NOT NULL DEFAULT 0,
net_bytes_recv INTEGER NOT NULL DEFAULT 0
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_vps_stats
ON vps_stats(vps_id, ts DESC)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS login_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
username TEXT NOT NULL,
ip TEXT NOT NULL,
success INTEGER NOT NULL,
reason TEXT NOT NULL DEFAULT ''
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_login_logs_ts
ON login_logs(ts DESC)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
""")
conn.execute("""
INSERT OR IGNORE INTO settings (key, value) VALUES ('registration_open', 'false')
""")
conn.execute("""
INSERT OR IGNORE INTO settings (key, value) VALUES ('passkey_enabled', 'true')
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS passkeys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
credential_id TEXT NOT NULL UNIQUE,
public_key TEXT NOT NULL,
sign_count INTEGER NOT NULL DEFAULT 0,
name TEXT NOT NULL DEFAULT '',
created_at INTEGER NOT NULL
)
""")
init_db()
# ─── Persistance ──────────────────────────────────────────────────────────────
def load_users() -> list[dict]:
with get_db() as conn:
return [dict(r) for r in conn.execute("SELECT * FROM users").fetchall()]
def add_user(user: dict) -> None:
with get_db() as conn:
conn.execute(
"INSERT INTO users (username, password, role) VALUES (?, ?, ?)",
(user["username"], user["password"], user["role"]),
)
def load_vps() -> list[dict]:
with get_db() as conn:
rows = [dict(r) for r in conn.execute("SELECT * FROM vps").fetchall()]
for row in rows:
try:
row["tags"] = json.loads(row.get("tags") or "[]")
except Exception:
row["tags"] = []
return rows
def insert_vps(vps: dict) -> None:
with get_db() as conn:
conn.execute(
"INSERT INTO vps (id, name, host, port, api_key, description, tags) VALUES (?, ?, ?, ?, ?, ?, ?)",
(vps["id"], vps["name"], vps["host"], vps["port"], vps["api_key"],
vps.get("description", ""), json.dumps(vps.get("tags", []))),
)
def remove_vps(vps_id: str) -> bool:
with get_db() as conn:
cur = conn.execute("DELETE FROM vps WHERE id = ?", (vps_id,))
return cur.rowcount > 0
def update_vps(vps_id: str, data: dict) -> bool:
with get_db() as conn:
cur = conn.execute(
"UPDATE vps SET name=?, host=?, port=?, api_key=?, description=?, tags=? WHERE id=?",
(data["name"], data["host"], data["port"], data["api_key"],
data["description"], json.dumps(data.get("tags", [])), vps_id),
)
return cur.rowcount > 0
# ─── Settings helpers ────────────────────────────────────────────────────────
def _get_setting(key: str) -> str:
with get_db() as conn:
row = conn.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone()
return row["value"] if row else ""
# ─── WebAuthn / Passkey helpers ───────────────────────────────────────────────
def _b64url_encode(b: bytes) -> str:
return base64.urlsafe_b64encode(b).rstrip(b"=").decode()
def _b64url_decode(s: str) -> bytes:
padding = 4 - len(s) % 4
if padding != 4:
s += "=" * padding
return base64.urlsafe_b64decode(s)
def _store_challenge(challenge: bytes, username: str | None = None) -> str:
"""Stocke un challenge WebAuthn en mémoire et retourne un session_id."""
session_id = secrets.token_hex(16)
_webauthn_challenges[session_id] = {
"challenge": challenge,
"username": username,
"expires_at": time.time() + 300,
}
# Nettoyage des challenges expirés
now = time.time()
expired = [k for k, v in list(_webauthn_challenges.items()) if v["expires_at"] < now]
for k in expired:
_webauthn_challenges.pop(k, None)
return session_id
def _pop_challenge(session_id: str) -> dict | None:
entry = _webauthn_challenges.pop(session_id, None)
if entry is None:
return None
if time.time() > entry["expires_at"]:
return None
return entry
def _get_passkeys_for_user(username: str) -> list[dict]:
with get_db() as conn:
rows = conn.execute(
"SELECT * FROM passkeys WHERE username = ? ORDER BY created_at DESC",
(username,),
).fetchall()
return [dict(r) for r in rows]
def _get_passkey_by_credential_id(credential_id: str) -> dict | None:
with get_db() as conn:
row = conn.execute(
"SELECT * FROM passkeys WHERE credential_id = ?", (credential_id,)
).fetchone()
return dict(row) if row else None
def _update_passkey_sign_count(credential_id: str, sign_count: int) -> None:
with get_db() as conn:
conn.execute(
"UPDATE passkeys SET sign_count = ? WHERE credential_id = ?",
(sign_count, credential_id),
)
def _parse_registration_credential(data: dict) -> WebAuthnRegistrationCredential:
resp = data["response"]
return WebAuthnRegistrationCredential(
id=data["id"],
raw_id=_b64url_decode(data.get("rawId", data["id"])),
response=AuthenticatorAttestationResponse(
client_data_json=_b64url_decode(resp["clientDataJSON"]),
attestation_object=_b64url_decode(resp["attestationObject"]),
),
type=data["type"],
)
def _parse_authentication_credential(data: dict) -> WebAuthnAuthenticationCredential:
resp = data["response"]
user_handle = resp.get("userHandle")
return WebAuthnAuthenticationCredential(
id=data["id"],
raw_id=_b64url_decode(data.get("rawId", data["id"])),
response=AuthenticatorAssertionResponse(
client_data_json=_b64url_decode(resp["clientDataJSON"]),
authenticator_data=_b64url_decode(resp["authenticatorData"]),
signature=_b64url_decode(resp["signature"]),
user_handle=_b64url_decode(user_handle) if user_handle else None,
),
type=data["type"],
)
# ─── Auth helpers ─────────────────────────────────────────────────────────────
def create_token(username: str, role: str) -> str:
payload = {
"sub": username,
"role": role,
"exp": datetime.now(timezone.utc) + timedelta(minutes=JWT_EXPIRE_MIN),
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(bearer_scheme)]
) -> dict:
try:
payload = jwt.decode(credentials.credentials, JWT_SECRET, algorithms=[JWT_ALGORITHM])
username = payload.get("sub")
if not username:
raise HTTPException(status_code=401, detail="Token invalide")
except JWTError:
raise HTTPException(status_code=401, detail="Token invalide ou expiré")
user = next((u for u in load_users() if u["username"] == username), None)
if not user:
raise HTTPException(status_code=401, detail="Utilisateur introuvable")
return user
def require_admin(current_user: Annotated[dict, Depends(get_current_user)]) -> dict:
if current_user.get("role") != "admin":
raise HTTPException(status_code=403, detail="Accès réservé aux administrateurs")
return current_user
def _get_client_ip(request: Request) -> str:
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
return forwarded.split(",")[0].strip()
return request.client.host if request.client else "unknown"
def _log_login(username: str, ip: str, success: bool, reason: str) -> None:
with get_db() as conn:
conn.execute(
"INSERT INTO login_logs (ts, username, ip, success, reason) VALUES (?, ?, ?, ?, ?)",
(int(time.time()), username, ip, 1 if success else 0, reason),
)
# ─── App ──────────────────────────────────────────────────────────────────────
app = FastAPI(title="VPS Monitor Backend", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=os.getenv("CORS_ORIGINS", "*").split(","),
allow_methods=["*"],
allow_headers=["*"],
)
# ─── Helpers HTTP ─────────────────────────────────────────────────────────────
async def agent_get(vps: dict, path: str):
url = f"http://{vps['host']}:{vps['port']}{path}"
headers = {"X-API-Key": vps["api_key"]}
timeout = aiohttp.ClientTimeout(total=AGENT_TIMEOUT)
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, timeout=timeout) as r:
r.raise_for_status()
return await r.json()
async def agent_post(vps: dict, path: str, payload: dict | None = None):
url = f"http://{vps['host']}:{vps['port']}{path}"
headers = {"X-API-Key": vps["api_key"]}
timeout = aiohttp.ClientTimeout(total=AGENT_TIMEOUT)
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload, timeout=timeout) as r:
r.raise_for_status()
return await r.json()
async def fetch_vps_status(vps: dict) -> dict:
"""Interroge un agent et retourne son état complet.
Les données système (CPU, RAM, réseau) proviennent en priorité du ring buffer
alimenté par le collecteur toutes les 5 s — même source que le modal de stats.
Si le buffer est encore vide (premier démarrage), un appel direct est fait.
"""
try:
containers_res = await agent_get(vps, "/containers")
# Services systemd (non-Docker) — optionnel, agent >= 1.2.0
try:
services_res = await agent_get(vps, "/services")
except Exception:
services_res = []
# Source unique : ring buffer → carte et modal affichent exactement la même valeur
history = _stats_history.get(vps["id"])
if history:
last = history[-1]
system = {
"cpu_percent": last["cpu"],
"ram_used": last["ram_used"],
"ram_total": last["ram_total"],
"ram_percent": last["ram_percent"],
"net_sent_per_sec": last["net_sent_per_sec"],
"net_recv_per_sec": last["net_recv_per_sec"],
}
else:
# Buffer vide (premier démarrage) — appel direct en fallback
try:
system = await agent_get(vps, "/system")
except Exception:
system = None
# Version de l'agent (endpoint sans auth)
try:
version_res = await agent_get(vps, "/version")
agent_version = version_res.get("version", "unknown")
except Exception:
agent_version = "unknown"
return {
"id": vps["id"],
"name": vps["name"],
"host": vps["host"],
"description": vps.get("description", ""),
"online": True,
"containers": containers_res,
"services": services_res,
"system": system,
"tags": vps.get("tags", []),
"agent_version": agent_version,
"expected_agent_version": _get_expected_version(),
"agent_up_to_date": agent_version == _get_expected_version() and _get_expected_version() != "unknown",
}
except Exception as e:
return {
"id": vps["id"],
"name": vps["name"],
"host": vps["host"],
"description": vps.get("description", ""),
"online": False,
"error": str(e),
"containers": [],
"services": [],
"system": None,
"tags": vps.get("tags", []),
"agent_version": None,
"expected_agent_version": _get_expected_version(),
"agent_up_to_date": False,
}
# ─── Collecteur de stats (tâche de fond) ─────────────────────────────────────
async def _fetch_system_stat(vps: dict) -> None:
"""Collecte un point de stats système pour un VPS et le persiste (ring buffer + SQLite)."""
try:
data = await agent_get(vps, "/system")
now = int(time.time())
# Ring buffer en mémoire (source pour la carte VPS — valeur courante)
buf = _stats_history.setdefault(vps["id"], deque(maxlen=_STATS_MAX_POINTS))
buf.append({
"ts": datetime.fromtimestamp(now, tz=timezone.utc).isoformat(),
"cpu": data["cpu_percent"],
"ram_percent": data["ram_percent"],
"ram_used": data["ram_used"],
"ram_total": data["ram_total"],
"net_sent_per_sec": data["net_sent_per_sec"],
"net_recv_per_sec": data["net_recv_per_sec"],
"net_bytes_sent": data.get("net_bytes_sent", 0),
"net_bytes_recv": data.get("net_bytes_recv", 0),
})
# Persistance SQLite (historique long terme)
with get_db() as conn:
conn.execute("""
INSERT INTO vps_stats
(vps_id, ts, cpu, ram_percent, ram_used, ram_total,
net_sent_per_sec, net_recv_per_sec, net_bytes_sent, net_bytes_recv)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
vps["id"], now,
data["cpu_percent"], data["ram_percent"],
data["ram_used"], data["ram_total"],
data["net_sent_per_sec"], data["net_recv_per_sec"],
data.get("net_bytes_sent", 0), data.get("net_bytes_recv", 0),
))
except Exception:
pass # VPS hors ligne ou agent non mis à jour — ignoré silencieusement
async def _stats_collector() -> None:
"""Tâche de fond : collecte les stats de tous les VPS toutes les 5 secondes."""
await asyncio.sleep(3) # laisse l'app finir de démarrer
while True:
vps_list = load_vps()
await asyncio.gather(
*[_fetch_system_stat(v) for v in vps_list],
return_exceptions=True,
)
await asyncio.sleep(5)
async def _refresh_latest_agent_version() -> None:
"""Récupère AGENT_VERSION depuis le dépôt Gitea toutes les heures.
Si EXPECTED_AGENT_VERSION est défini en env var, cette tâche n'écrase pas la valeur forcée.
"""
global _latest_agent_version
if _FORCED_AGENT_VERSION:
return # version forcée par env var — pas besoin de fetcher
while True:
try:
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession() as session:
async with session.get(REPO_AGENT_URL, timeout=timeout) as r:
if r.status == 200:
text = await r.text()
m = re.search(
r'^AGENT_VERSION\s*=\s*["\']([^"\']+)["\']',
text,
re.MULTILINE,
)
if m:
_latest_agent_version = m.group(1)
except Exception:
pass # garder la dernière valeur connue en cas d'erreur réseau
await asyncio.sleep(3600) # rafraîchit toutes les heures
@app.on_event("startup")
async def startup_event() -> None:
asyncio.create_task(_stats_collector())
asyncio.create_task(_cleanup_old_stats())
asyncio.create_task(_refresh_latest_agent_version())
async def _cleanup_old_stats() -> None:
"""Supprime les statistiques de plus de 31 jours (s'exécute toutes les heures)."""
while True:
await asyncio.sleep(3600)
cutoff = int(time.time()) - 31 * 24 * 3600
with get_db() as conn:
conn.execute("DELETE FROM vps_stats WHERE ts < ?", (cutoff,))
# ─── Routes Auth ──────────────────────────────────────────────────────────────
@app.get("/api/auth/status")
def auth_status():
"""Indique si des utilisateurs existent déjà (pour le frontend)."""
return {
"has_users": len(load_users()) > 0,
"passkey_enabled": _get_setting("passkey_enabled") == "true",
}
@app.post("/api/auth/register", status_code=201)
def register(body: RegisterRequest):
"""Enregistre un nouvel utilisateur. Ouvert uniquement si aucun utilisateur n'existe
ou si l'admin a activé les inscriptions."""
users = load_users()
if len(users) > 0:
if _get_setting("registration_open") != "true":
raise HTTPException(
status_code=403,
detail="L'enregistrement public est désactivé. Seul l'admin peut créer des comptes."
)
if not body.username.strip() or len(body.password) < 6:
raise HTTPException(status_code=422, detail="Mot de passe trop court (6 caractères min.)")
if any(u["username"] == body.username.strip() for u in users):
raise HTTPException(status_code=409, detail="Ce nom d'utilisateur est déjà pris")
role = "admin" if len(users) == 0 else "user"
user = {
"username": body.username.strip(),
"password": _bcrypt.hashpw(body.password.encode(), _bcrypt.gensalt()).decode(),
"role": role,
}
add_user(user)
token = create_token(user["username"], user["role"])
return {"access_token": token, "token_type": "bearer", "role": user["role"]}
@app.post("/api/auth/login")
def login(body: LoginRequest, request: Request):
"""Authentifie un utilisateur et retourne un JWT."""
ip = _get_client_ip(request)
users = load_users()
user = next((u for u in users if u["username"] == body.username), None)
if not user or not _bcrypt.checkpw(body.password.encode(), user["password"].encode()):
_log_login(body.username, ip, False, "Identifiants incorrects")
raise HTTPException(status_code=401, detail="Identifiants incorrects")
_log_login(user["username"], ip, True, "")
token = create_token(user["username"], user["role"])
return {"access_token": token, "token_type": "bearer", "role": user["role"]}
@app.get("/api/auth/me")
def me(current_user: Annotated[dict, Depends(get_current_user)]):
return {"username": current_user["username"], "role": current_user["role"]}
@app.post("/api/auth/change-password")
def change_password(
body: ChangePasswordRequest,
current_user: Annotated[dict, Depends(get_current_user)],
):
"""Permet à l'utilisateur connecté de changer son mot de passe."""
if not _bcrypt.checkpw(body.old_password.encode(), current_user["password"].encode()):
raise HTTPException(status_code=400, detail="Ancien mot de passe incorrect")
if len(body.new_password) < 6:
raise HTTPException(status_code=422, detail="Nouveau mot de passe trop court (6 caractères min.)")
new_hash = _bcrypt.hashpw(body.new_password.encode(), _bcrypt.gensalt()).decode()
with get_db() as conn:
conn.execute(
"UPDATE users SET password = ? WHERE username = ?",
(new_hash, current_user["username"]),
)
return {"status": "ok"}
# ─── Routes Admin ─────────────────────────────────────────────────────────────
@app.get("/api/admin/settings")
def admin_get_settings(_: Annotated[dict, Depends(require_admin)]):
"""Retourne les paramètres d'administration."""
with get_db() as conn:
rows = conn.execute("SELECT key, value FROM settings").fetchall()
return {row["key"]: row["value"] for row in rows}
@app.put("/api/admin/settings/{key}")
def admin_update_setting(
key: str,
body: SettingUpdateRequest,
_: Annotated[dict, Depends(require_admin)],
):
"""Met à jour un paramètre d'administration."""
allowed_keys = {"registration_open", "passkey_enabled"}
if key not in allowed_keys:
raise HTTPException(status_code=400, detail="Clé de paramètre inconnue")
with get_db() as conn:
conn.execute(
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
(key, body.value),
)
return {"status": "ok"}
@app.get("/api/admin/login-logs")
def admin_login_logs(
limit: int = 100,
offset: int = 0,
_: Annotated[dict, Depends(require_admin)] = None,
):
"""Retourne les tentatives de connexion enregistrées."""
limit = max(1, min(limit, 500))
offset = max(0, offset)
with get_db() as conn:
rows = conn.execute(
"SELECT * FROM login_logs ORDER BY ts DESC LIMIT ? OFFSET ?",
(limit, offset),
).fetchall()
total = conn.execute("SELECT COUNT(*) FROM login_logs").fetchone()[0]
return {
"total": total,
"logs": [
{
"id": row["id"],
"ts": datetime.fromtimestamp(row["ts"], tz=timezone.utc).isoformat(),
"username": row["username"],
"ip": row["ip"],
"success": bool(row["success"]),
"reason": row["reason"],
}
for row in rows
],
}
@app.get("/api/admin/users")
def admin_list_users(_: Annotated[dict, Depends(require_admin)]):
"""Liste les utilisateurs (sans mots de passe)."""
return [
{"username": u["username"], "role": u["role"]}
for u in load_users()
]
@app.get("/api/admin/db/info")
def admin_db_info(_: Annotated[dict, Depends(require_admin)]):
"""Retourne des informations sur le contenu de la base de données."""
with get_db() as conn:
def _table_info(table: str) -> dict:
row = conn.execute(
f"SELECT COUNT(*) AS cnt, MIN(ts) AS oldest, MAX(ts) AS newest FROM {table}"
).fetchone()
return {
"count": row["cnt"],
"oldest_ts": row["oldest"],
"newest_ts": row["newest"],
}
return {
"vps_stats": _table_info("vps_stats"),
"login_logs": _table_info("login_logs"),
}
_ALLOWED_TABLES = frozenset({"vps_stats", "login_logs"})
_ALLOWED_PERIODS = frozenset({"last_24h", "last_7d", "last_30d", "all", "custom"})
@app.delete("/api/admin/db/purge")
def admin_db_purge(body: PurgeRequest, _: Annotated[dict, Depends(require_admin)]):
"""Supprime des entrées de la base de données selon la table et la période choisies."""
if body.table not in _ALLOWED_TABLES and body.table != "all":
raise HTTPException(status_code=400, detail="Table inconnue")
if body.period not in _ALLOWED_PERIODS:
raise HTTPException(status_code=400, detail="Période inconnue")
tables = list(_ALLOWED_TABLES) if body.table == "all" else [body.table]
now = int(time.time())
period_cutoff = {
"last_24h": now - 24 * 3600,
"last_7d": now - 7 * 24 * 3600,
"last_30d": now - 30 * 24 * 3600,
}
deleted: dict[str, int] = {}
with get_db() as conn:
for tbl in tables:
if body.period == "all":
cur = conn.execute(f"DELETE FROM {tbl}") # nosec tbl validated above
elif body.period == "custom":
if body.from_ts is None or body.to_ts is None:
raise HTTPException(
status_code=400,
detail="Période personnalisée : from_ts et to_ts sont requis",
)
if body.from_ts > body.to_ts:
raise HTTPException(status_code=400, detail="from_ts doit être ≤ to_ts")
cur = conn.execute(
f"DELETE FROM {tbl} WHERE ts >= ? AND ts <= ?", # nosec
(body.from_ts, body.to_ts),
)
else:
cutoff = period_cutoff[body.period]
cur = conn.execute(
f"DELETE FROM {tbl} WHERE ts >= ?", # nosec
(cutoff,),
)
deleted[tbl] = cur.rowcount
return {"deleted": deleted, "status": "ok"}
# ─── Routes Passkeys ──────────────────────────────────────────────────────────
@app.post("/api/auth/passkey/register/begin")
def passkey_register_begin(
current_user: Annotated[dict, Depends(get_current_user)],
):
"""Démarre l'enregistrement d'une passkey pour l'utilisateur connecté."""
if _get_setting("passkey_enabled") != "true":
raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
username = current_user["username"]
existing = _get_passkeys_for_user(username)
exclude_creds = [
PublicKeyCredentialDescriptor(id=_b64url_decode(pk["credential_id"]))
for pk in existing
]
opts = generate_registration_options(
rp_id=WEBAUTHN_RP_ID,
rp_name=WEBAUTHN_RP_NAME,
user_id=username.encode(),
user_name=username,
user_display_name=username,
authenticator_selection=AuthenticatorSelectionCriteria(
authenticator_attachment=AuthenticatorAttachment.PLATFORM,
resident_key=ResidentKeyRequirement.REQUIRED,
user_verification=UserVerificationRequirement.REQUIRED,
),
exclude_credentials=exclude_creds,
)
session_id = _store_challenge(opts.challenge, username=username)
return {"session_id": session_id, "options": json.loads(options_to_json(opts))}
@app.post("/api/auth/passkey/register/finish")
def passkey_register_finish(
body: PasskeyRegisterFinishRequest,
current_user: Annotated[dict, Depends(get_current_user)],
):
"""Finalise l'enregistrement d'une passkey."""
if _get_setting("passkey_enabled") != "true":
raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
challenge_data = _pop_challenge(body.session_id)
if not challenge_data:
raise HTTPException(status_code=400, detail="Session expirée ou invalide")
if challenge_data["username"] != current_user["username"]:
raise HTTPException(status_code=403, detail="Session invalide")
try:
cred = _parse_registration_credential(body.credential)
verification = verify_registration_response(
credential=cred,
expected_challenge=challenge_data["challenge"],
expected_rp_id=WEBAUTHN_RP_ID,
expected_origin=WEBAUTHN_ORIGIN,
)
except Exception as exc:
raise HTTPException(status_code=400, detail=f"Vérification échouée : {exc}")
cred_id = _b64url_encode(verification.credential_id)
pub_key = _b64url_encode(verification.credential_public_key)
if _get_passkey_by_credential_id(cred_id):
raise HTTPException(status_code=409, detail="Cette passkey est déjà enregistrée")
with get_db() as conn:
conn.execute(
"""INSERT INTO passkeys (username, credential_id, public_key, sign_count, name, created_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(current_user["username"], cred_id, pub_key,
verification.sign_count, body.name, int(time.time())),
)
return {"status": "ok"}
@app.post("/api/auth/passkey/login/begin")
def passkey_login_begin(body: PasskeyLoginBeginRequest):
"""Démarre l'authentification par passkey."""
if _get_setting("passkey_enabled") != "true":
raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
allow_creds: list[PublicKeyCredentialDescriptor] = []
if body.username:
passkeys = _get_passkeys_for_user(body.username)
allow_creds = [
PublicKeyCredentialDescriptor(id=_b64url_decode(pk["credential_id"]))
for pk in passkeys
]
opts = generate_authentication_options(
rp_id=WEBAUTHN_RP_ID,
allow_credentials=allow_creds,
user_verification=UserVerificationRequirement.REQUIRED,
)
session_id = _store_challenge(opts.challenge, username=body.username)
return {"session_id": session_id, "options": json.loads(options_to_json(opts))}
@app.post("/api/auth/passkey/login/finish")
def passkey_login_finish(body: PasskeyLoginFinishRequest, request: Request):
"""Vérifie la passkey et retourne un JWT."""
if _get_setting("passkey_enabled") != "true":
raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
challenge_data = _pop_challenge(body.session_id)
if not challenge_data:
raise HTTPException(status_code=400, detail="Session expirée ou invalide")
cred_id = body.credential.get("id", "")
passkey = _get_passkey_by_credential_id(cred_id)
if not passkey:
ip = _get_client_ip(request)
_log_login("unknown", ip, False, "Passkey introuvable")
raise HTTPException(status_code=401, detail="Passkey non reconnue")
try:
cred = _parse_authentication_credential(body.credential)
verification = verify_authentication_response(
credential=cred,
expected_challenge=challenge_data["challenge"],
expected_rp_id=WEBAUTHN_RP_ID,
expected_origin=WEBAUTHN_ORIGIN,
credential_public_key=_b64url_decode(passkey["public_key"]),
credential_current_sign_count=passkey["sign_count"],
)
except Exception as exc:
ip = _get_client_ip(request)
_log_login(passkey["username"], ip, False, f"Passkey invalide : {exc}")
raise HTTPException(status_code=401, detail="Authentification échouée")
_update_passkey_sign_count(cred_id, verification.new_sign_count)
user = next((u for u in load_users() if u["username"] == passkey["username"]), None)
if not user:
raise HTTPException(status_code=401, detail="Utilisateur introuvable")
ip = _get_client_ip(request)
_log_login(user["username"], ip, True, "passkey")
token = create_token(user["username"], user["role"])
return {"access_token": token, "token_type": "bearer", "role": user["role"]}
@app.get("/api/auth/passkeys")
def list_my_passkeys(current_user: Annotated[dict, Depends(get_current_user)]):
"""Liste les passkeys de l'utilisateur connecté."""
passkeys = _get_passkeys_for_user(current_user["username"])
return [
{
"credential_id": pk["credential_id"],
"name": pk["name"],
"created_at": datetime.fromtimestamp(pk["created_at"], tz=timezone.utc).isoformat(),
}
for pk in passkeys
]
@app.delete("/api/auth/passkeys/{credential_id}")
def delete_my_passkey(
credential_id: str,
current_user: Annotated[dict, Depends(get_current_user)],
):
"""Supprime une passkey de l'utilisateur connecté."""
with get_db() as conn:
cur = conn.execute(
"DELETE FROM passkeys WHERE credential_id = ? AND username = ?",
(credential_id, current_user["username"]),
)
if cur.rowcount == 0:
raise HTTPException(status_code=404, detail="Passkey introuvable")
return {"status": "ok"}
@app.get("/api/admin/passkeys")
def admin_list_passkeys(_: Annotated[dict, Depends(require_admin)]):
"""Admin : liste toutes les passkeys enregistrées."""
with get_db() as conn:
rows = conn.execute(
"SELECT * FROM passkeys ORDER BY created_at DESC"
).fetchall()
return [
{
"credential_id": row["credential_id"],
"username": row["username"],
"name": row["name"],
"created_at": datetime.fromtimestamp(row["created_at"], tz=timezone.utc).isoformat(),
}
for row in rows
]
@app.delete("/api/admin/passkeys/{credential_id}")
def admin_delete_passkey(
credential_id: str,
_: Annotated[dict, Depends(require_admin)],
):
"""Admin : révoque n'importe quelle passkey."""
with get_db() as conn:
cur = conn.execute(
"DELETE FROM passkeys WHERE credential_id = ?", (credential_id,)
)
if cur.rowcount == 0:
raise HTTPException(status_code=404, detail="Passkey introuvable")
return {"status": "ok"}
# ─── Routes VPS ───────────────────────────────────────────────────────────────
@app.get("/api/vps")
def list_vps(_: Annotated[dict, Depends(get_current_user)]):
"""Liste les VPS configurés (sans les clés API)."""
return [
{"id": v["id"], "name": v["name"], "host": v["host"],
"description": v.get("description", ""), "tags": v.get("tags", [])}
for v in load_vps()
]
@app.post("/api/vps", status_code=201)
def add_vps(vps: VpsConfig, _: Annotated[dict, Depends(get_current_user)]):
"""Ajoute un nouveau VPS."""
if any(v["id"] == vps.id for v in load_vps()):
raise HTTPException(status_code=409, detail="Un VPS avec cet ID existe déjà")
insert_vps(vps.model_dump())
return {"status": "ok", "id": vps.id}
@app.delete("/api/vps/{vps_id}")
def delete_vps(vps_id: str, _: Annotated[dict, Depends(get_current_user)]):
"""Supprime un VPS de la configuration."""
if not remove_vps(vps_id):
raise HTTPException(status_code=404, detail="VPS introuvable")
return {"status": "ok"}
@app.put("/api/vps/{vps_id}")
def edit_vps(vps_id: str, body: VpsUpdateRequest, _: Annotated[dict, Depends(get_current_user)]):
"""Met à jour les paramètres d'un VPS (name, host, port, api_key, description)."""
existing = next((v for v in load_vps() if v["id"] == vps_id), None)
if not existing:
raise HTTPException(status_code=404, detail="VPS introuvable")
data = {
"name": body.name,
"host": body.host,
"port": body.port,
"api_key": body.api_key.strip() or existing["api_key"],
"description": body.description,
"tags": body.tags,
}
update_vps(vps_id, data)
return {"status": "ok"}
@app.get("/api/vps/{vps_id}/export")
def export_vps(vps_id: str, _: Annotated[dict, Depends(get_current_user)]):
"""Exporte la configuration complète d'un VPS (y compris la clé API) pour import ailleurs."""
vps = next((v for v in load_vps() if v["id"] == vps_id), None)
if not vps:
raise HTTPException(status_code=404, detail="VPS introuvable")
return {
"id": vps["id"],
"name": vps["name"],
"host": vps["host"],
"port": vps["port"],
"api_key": vps["api_key"],
"description": vps.get("description", ""),
"tags": vps.get("tags", []),
}
@app.get("/api/vps/{vps_id}/stats")
def get_vps_stats(
vps_id: str,
duration: int = 600, # secondes, défaut 10 min
_: Annotated[dict, Depends(get_current_user)] = None,
):
"""Retourne l'historique des stats système d'un VPS avec downsampling automatique.
Le paramètre `duration` (en secondes) détermine la fenêtre temporelle.
La réponse contient toujours ~300 points maximum (agrégation par bucket SQL).
"""
if not any(v["id"] == vps_id for v in load_vps()):
raise HTTPException(status_code=404, detail="VPS introuvable")
duration = max(60, min(duration, 31 * 24 * 3600)) # clamp 1 min 31 j
since = int(time.time()) - duration
bucket = max(5, duration // 300) # ≤ 300 points retournés
with get_db() as conn:
rows = conn.execute("""
SELECT
(ts / :b) * :b AS ts,
AVG(cpu) AS cpu,
AVG(ram_percent) AS ram_percent,
CAST(AVG(ram_used) AS INTEGER) AS ram_used,
MAX(ram_total) AS ram_total,
AVG(net_sent_per_sec) AS net_sent_per_sec,
AVG(net_recv_per_sec) AS net_recv_per_sec,
MAX(net_bytes_sent) AS net_bytes_sent,
MAX(net_bytes_recv) AS net_bytes_recv
FROM vps_stats
WHERE vps_id = :vps_id AND ts >= :since
GROUP BY (ts / :b)
ORDER BY ts ASC
""", {"b": bucket, "vps_id": vps_id, "since": since}).fetchall()
return [
{
"ts": datetime.fromtimestamp(int(row["ts"]), tz=timezone.utc).isoformat(),
"cpu": row["cpu"],
"ram_percent": row["ram_percent"],
"ram_used": row["ram_used"],
"ram_total": row["ram_total"],
"net_sent_per_sec": row["net_sent_per_sec"],
"net_recv_per_sec": row["net_recv_per_sec"],
"net_bytes_sent": row["net_bytes_sent"],
"net_bytes_recv": row["net_bytes_recv"],
}
for row in rows
]
@app.get("/api/status")
async def all_status(_: Annotated[dict, Depends(get_current_user)]):
"""Retourne l'état de tous les VPS en parallèle."""
vps_list = load_vps()
results = await asyncio.gather(*[fetch_vps_status(v) for v in vps_list])
return list(results)
@app.get("/api/vps/{vps_id}/status")
async def vps_status(vps_id: str, _: Annotated[dict, Depends(get_current_user)]):
"""Retourne l'état d'un VPS spécifique."""
vps = next((v for v in load_vps() if v["id"] == vps_id), None)
if not vps:
raise HTTPException(status_code=404, detail="VPS introuvable")
return await fetch_vps_status(vps)
@app.get("/api/vps/{vps_id}/containers/{container_id}/logs")
async def container_logs(
vps_id: str, container_id: str, lines: int = 100,
_: Annotated[dict, Depends(get_current_user)] = None
):
"""Récupère les logs d'un conteneur via l'agent."""
vps = next((v for v in load_vps() if v["id"] == vps_id), None)
if not vps:
raise HTTPException(status_code=404, detail="VPS introuvable")
try:
return await agent_get(vps, f"/containers/{container_id}/logs?lines={lines}")
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
@app.post("/api/vps/{vps_id}/containers/{container_id}/action")
async def container_action(
vps_id: str, container_id: str, body: ActionRequest,
_: Annotated[dict, Depends(get_current_user)] = None
):
"""Effectue une action sur un conteneur via l'agent."""
vps = next((v for v in load_vps() if v["id"] == vps_id), None)
if not vps:
raise HTTPException(status_code=404, detail="VPS introuvable")
try:
return await agent_post(vps, f"/containers/{container_id}/action?action={body.action}")
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
@app.post("/api/vps/{vps_id}/compose/update")
async def compose_update(
vps_id: str, body: ComposeUpdateRequest,
_: Annotated[dict, Depends(get_current_user)] = None
):
"""Lance docker compose pull + up sur un projet via l'agent."""
vps = next((v for v in load_vps() if v["id"] == vps_id), None)
if not vps:
raise HTTPException(status_code=404, detail="VPS introuvable")
try:
url = f"http://{vps['host']}:{vps['port']}/compose/update?project={body.project}"
headers = {"X-API-Key": vps["api_key"]}
timeout = aiohttp.ClientTimeout(total=600)
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, timeout=timeout) as r:
r.raise_for_status()
return await r.json()
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
@app.post("/api/vps/{vps_id}/agent/update")
async def agent_self_update(
vps_id: str,
_: Annotated[dict, Depends(get_current_user)] = None,
):
"""Déclenche la mise à jour de l'agent sur le VPS."""
vps = next((v for v in load_vps() if v["id"] == vps_id), None)
if not vps:
raise HTTPException(status_code=404, detail="VPS introuvable")
try:
return await agent_post(vps, "/self-update")
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))