diff --git a/vps-monitor/backend/__pycache__/main.cpython-313.pyc b/vps-monitor/backend/__pycache__/main.cpython-313.pyc
index 9ac257c..af6dd08 100644
Binary files a/vps-monitor/backend/__pycache__/main.cpython-313.pyc and b/vps-monitor/backend/__pycache__/main.cpython-313.pyc differ
diff --git a/vps-monitor/backend/main.py b/vps-monitor/backend/main.py
index f6da13d..e6118dc 100644
--- a/vps-monitor/backend/main.py
+++ b/vps-monitor/backend/main.py
@@ -5,6 +5,7 @@ Agrège les données de tous les agents et expose une API REST pour le frontend.
"""
import asyncio
+import base64
import json
import os
import secrets
@@ -24,6 +25,25 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from pydantic import BaseModel
+from webauthn import (
+ generate_registration_options,
+ verify_registration_response,
+ generate_authentication_options,
+ verify_authentication_response,
+ options_to_json,
+)
+from webauthn.helpers.structs import (
+ AuthenticatorAttachment,
+ AuthenticatorSelectionCriteria,
+ UserVerificationRequirement,
+ ResidentKeyRequirement,
+ PublicKeyCredentialDescriptor,
+ RegistrationCredential as WebAuthnRegistrationCredential,
+ AuthenticatorAttestationResponse,
+ AuthenticationCredential as WebAuthnAuthenticationCredential,
+ AuthenticatorAssertionResponse,
+)
+
# ─── Config ───────────────────────────────────────────────────────────────────
DB_FILE = Path(os.getenv("DB_FILE", "data/monitor.db"))
@@ -53,6 +73,14 @@ def _load_jwt_secret() -> str:
JWT_SECRET = _load_jwt_secret()
+# ─── WebAuthn / Passkeys config ───────────────────────────────────────────────
+WEBAUTHN_RP_ID = os.getenv("WEBAUTHN_RP_ID", "localhost")
+WEBAUTHN_RP_NAME = os.getenv("WEBAUTHN_RP_NAME", "VPS Monitor")
+WEBAUTHN_ORIGIN = os.getenv("WEBAUTHN_ORIGIN", "http://localhost:3020")
+
+# In-memory challenge store: session_id → {challenge, username, expires_at}
+_webauthn_challenges: dict[str, dict] = {}
+
bearer_scheme = HTTPBearer()
# ─── Modèles ──────────────────────────────────────────────────────────────────
@@ -110,6 +138,21 @@ class PurgeRequest(BaseModel):
to_ts: int | None = None # epoch seconds, utilisé si period == 'custom'
+class PasskeyRegisterFinishRequest(BaseModel):
+ session_id: str
+ name: str = "Ma passkey"
+ credential: dict
+
+
+class PasskeyLoginBeginRequest(BaseModel):
+ username: str | None = None
+
+
+class PasskeyLoginFinishRequest(BaseModel):
+ session_id: str
+ credential: dict
+
+
# ─── SQLite ───────────────────────────────────────────────────────────────────
@contextmanager
@@ -194,6 +237,20 @@ def init_db() -> None:
conn.execute("""
INSERT OR IGNORE INTO settings (key, value) VALUES ('registration_open', 'false')
""")
+ conn.execute("""
+ INSERT OR IGNORE INTO settings (key, value) VALUES ('passkey_enabled', 'true')
+ """)
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS passkeys (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ username TEXT NOT NULL,
+ credential_id TEXT NOT NULL UNIQUE,
+ public_key TEXT NOT NULL,
+ sign_count INTEGER NOT NULL DEFAULT 0,
+ name TEXT NOT NULL DEFAULT '',
+ created_at INTEGER NOT NULL
+ )
+ """)
init_db()
@@ -258,6 +315,98 @@ def _get_setting(key: str) -> str:
return row["value"] if row else ""
+# ─── WebAuthn / Passkey helpers ───────────────────────────────────────────────
+
+def _b64url_encode(b: bytes) -> str:
+ return base64.urlsafe_b64encode(b).rstrip(b"=").decode()
+
+
+def _b64url_decode(s: str) -> bytes:
+ padding = 4 - len(s) % 4
+ if padding != 4:
+ s += "=" * padding
+ return base64.urlsafe_b64decode(s)
+
+
+def _store_challenge(challenge: bytes, username: str | None = None) -> str:
+ """Stocke un challenge WebAuthn en mémoire et retourne un session_id."""
+ session_id = secrets.token_hex(16)
+ _webauthn_challenges[session_id] = {
+ "challenge": challenge,
+ "username": username,
+ "expires_at": time.time() + 300,
+ }
+ # Nettoyage des challenges expirés
+ now = time.time()
+ expired = [k for k, v in list(_webauthn_challenges.items()) if v["expires_at"] < now]
+ for k in expired:
+ _webauthn_challenges.pop(k, None)
+ return session_id
+
+
+def _pop_challenge(session_id: str) -> dict | None:
+ entry = _webauthn_challenges.pop(session_id, None)
+ if entry is None:
+ return None
+ if time.time() > entry["expires_at"]:
+ return None
+ return entry
+
+
+def _get_passkeys_for_user(username: str) -> list[dict]:
+ with get_db() as conn:
+ rows = conn.execute(
+ "SELECT * FROM passkeys WHERE username = ? ORDER BY created_at DESC",
+ (username,),
+ ).fetchall()
+ return [dict(r) for r in rows]
+
+
+def _get_passkey_by_credential_id(credential_id: str) -> dict | None:
+ with get_db() as conn:
+ row = conn.execute(
+ "SELECT * FROM passkeys WHERE credential_id = ?", (credential_id,)
+ ).fetchone()
+ return dict(row) if row else None
+
+
+def _update_passkey_sign_count(credential_id: str, sign_count: int) -> None:
+ with get_db() as conn:
+ conn.execute(
+ "UPDATE passkeys SET sign_count = ? WHERE credential_id = ?",
+ (sign_count, credential_id),
+ )
+
+
+def _parse_registration_credential(data: dict) -> WebAuthnRegistrationCredential:
+ resp = data["response"]
+ return WebAuthnRegistrationCredential(
+ id=data["id"],
+ raw_id=_b64url_decode(data.get("rawId", data["id"])),
+ response=AuthenticatorAttestationResponse(
+ client_data_json=_b64url_decode(resp["clientDataJSON"]),
+ attestation_object=_b64url_decode(resp["attestationObject"]),
+ ),
+ type=data["type"],
+ )
+
+
+def _parse_authentication_credential(data: dict) -> WebAuthnAuthenticationCredential:
+ resp = data["response"]
+ user_handle = resp.get("userHandle")
+ return WebAuthnAuthenticationCredential(
+ id=data["id"],
+ raw_id=_b64url_decode(data.get("rawId", data["id"])),
+ response=AuthenticatorAssertionResponse(
+ client_data_json=_b64url_decode(resp["clientDataJSON"]),
+ authenticator_data=_b64url_decode(resp["authenticatorData"]),
+ signature=_b64url_decode(resp["signature"]),
+ user_handle=_b64url_decode(user_handle) if user_handle else None,
+ ),
+ type=data["type"],
+ )
+
+
# ─── Auth helpers ─────────────────────────────────────────────────────────────
def create_token(username: str, role: str) -> str:
@@ -477,7 +626,10 @@ async def _cleanup_old_stats() -> None:
@app.get("/api/auth/status")
def auth_status():
"""Indique si des utilisateurs existent déjà (pour le frontend)."""
- return {"has_users": len(load_users()) > 0}
+ return {
+ "has_users": len(load_users()) > 0,
+ "passkey_enabled": _get_setting("passkey_enabled") == "true",
+ }
@app.post("/api/auth/register", status_code=201)
@@ -561,7 +713,7 @@ def admin_update_setting(
_: Annotated[dict, Depends(require_admin)],
):
"""Met à jour un paramètre d'administration."""
- allowed_keys = {"registration_open"}
+ allowed_keys = {"registration_open", "passkey_enabled"}
if key not in allowed_keys:
raise HTTPException(status_code=400, detail="Clé de paramètre inconnue")
with get_db() as conn:
@@ -680,6 +832,214 @@ def admin_db_purge(body: PurgeRequest, _: Annotated[dict, Depends(require_admin)
return {"deleted": deleted, "status": "ok"}
+# ─── Routes Passkeys ──────────────────────────────────────────────────────────
+
+@app.post("/api/auth/passkey/register/begin")
+def passkey_register_begin(
+ current_user: Annotated[dict, Depends(get_current_user)],
+):
+ """Démarre l'enregistrement d'une passkey pour l'utilisateur connecté."""
+ if _get_setting("passkey_enabled") != "true":
+ raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
+
+ username = current_user["username"]
+ existing = _get_passkeys_for_user(username)
+ exclude_creds = [
+ PublicKeyCredentialDescriptor(id=_b64url_decode(pk["credential_id"]))
+ for pk in existing
+ ]
+
+ opts = generate_registration_options(
+ rp_id=WEBAUTHN_RP_ID,
+ rp_name=WEBAUTHN_RP_NAME,
+ user_id=username.encode(),
+ user_name=username,
+ user_display_name=username,
+ authenticator_selection=AuthenticatorSelectionCriteria(
+ authenticator_attachment=AuthenticatorAttachment.PLATFORM,
+ resident_key=ResidentKeyRequirement.REQUIRED,
+ user_verification=UserVerificationRequirement.REQUIRED,
+ ),
+ exclude_credentials=exclude_creds,
+ )
+
+ session_id = _store_challenge(opts.challenge, username=username)
+ return {"session_id": session_id, "options": json.loads(options_to_json(opts))}
+
+
+@app.post("/api/auth/passkey/register/finish")
+def passkey_register_finish(
+ body: PasskeyRegisterFinishRequest,
+ current_user: Annotated[dict, Depends(get_current_user)],
+):
+ """Finalise l'enregistrement d'une passkey."""
+ if _get_setting("passkey_enabled") != "true":
+ raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
+
+ challenge_data = _pop_challenge(body.session_id)
+ if not challenge_data:
+ raise HTTPException(status_code=400, detail="Session expirée ou invalide")
+ if challenge_data["username"] != current_user["username"]:
+ raise HTTPException(status_code=403, detail="Session invalide")
+
+ try:
+ cred = _parse_registration_credential(body.credential)
+ verification = verify_registration_response(
+ credential=cred,
+ expected_challenge=challenge_data["challenge"],
+ expected_rp_id=WEBAUTHN_RP_ID,
+ expected_origin=WEBAUTHN_ORIGIN,
+ )
+ except Exception as exc:
+ raise HTTPException(status_code=400, detail=f"Vérification échouée : {exc}")
+
+ cred_id = _b64url_encode(verification.credential_id)
+ pub_key = _b64url_encode(verification.credential_public_key)
+
+ if _get_passkey_by_credential_id(cred_id):
+ raise HTTPException(status_code=409, detail="Cette passkey est déjà enregistrée")
+
+ with get_db() as conn:
+ conn.execute(
+ """INSERT INTO passkeys (username, credential_id, public_key, sign_count, name, created_at)
+ VALUES (?, ?, ?, ?, ?, ?)""",
+ (current_user["username"], cred_id, pub_key,
+ verification.sign_count, body.name, int(time.time())),
+ )
+ return {"status": "ok"}
+
+
+@app.post("/api/auth/passkey/login/begin")
+def passkey_login_begin(body: PasskeyLoginBeginRequest):
+ """Démarre l'authentification par passkey."""
+ if _get_setting("passkey_enabled") != "true":
+ raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
+
+ allow_creds: list[PublicKeyCredentialDescriptor] = []
+ if body.username:
+ passkeys = _get_passkeys_for_user(body.username)
+ allow_creds = [
+ PublicKeyCredentialDescriptor(id=_b64url_decode(pk["credential_id"]))
+ for pk in passkeys
+ ]
+
+ opts = generate_authentication_options(
+ rp_id=WEBAUTHN_RP_ID,
+ allow_credentials=allow_creds,
+ user_verification=UserVerificationRequirement.REQUIRED,
+ )
+
+ session_id = _store_challenge(opts.challenge, username=body.username)
+ return {"session_id": session_id, "options": json.loads(options_to_json(opts))}
+
+
+@app.post("/api/auth/passkey/login/finish")
+def passkey_login_finish(body: PasskeyLoginFinishRequest, request: Request):
+ """Vérifie la passkey et retourne un JWT."""
+ if _get_setting("passkey_enabled") != "true":
+ raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
+
+ challenge_data = _pop_challenge(body.session_id)
+ if not challenge_data:
+ raise HTTPException(status_code=400, detail="Session expirée ou invalide")
+
+ cred_id = body.credential.get("id", "")
+ passkey = _get_passkey_by_credential_id(cred_id)
+ if not passkey:
+ ip = _get_client_ip(request)
+ _log_login("unknown", ip, False, "Passkey introuvable")
+ raise HTTPException(status_code=401, detail="Passkey non reconnue")
+
+ try:
+ cred = _parse_authentication_credential(body.credential)
+ verification = verify_authentication_response(
+ credential=cred,
+ expected_challenge=challenge_data["challenge"],
+ expected_rp_id=WEBAUTHN_RP_ID,
+ expected_origin=WEBAUTHN_ORIGIN,
+ credential_public_key=_b64url_decode(passkey["public_key"]),
+ credential_current_sign_count=passkey["sign_count"],
+ )
+ except Exception as exc:
+ ip = _get_client_ip(request)
+ _log_login(passkey["username"], ip, False, f"Passkey invalide : {exc}")
+ raise HTTPException(status_code=401, detail="Authentification échouée")
+
+ _update_passkey_sign_count(cred_id, verification.new_sign_count)
+
+ user = next((u for u in load_users() if u["username"] == passkey["username"]), None)
+ if not user:
+ raise HTTPException(status_code=401, detail="Utilisateur introuvable")
+
+ ip = _get_client_ip(request)
+ _log_login(user["username"], ip, True, "passkey")
+ token = create_token(user["username"], user["role"])
+ return {"access_token": token, "token_type": "bearer", "role": user["role"]}
+
+
+@app.get("/api/auth/passkeys")
+def list_my_passkeys(current_user: Annotated[dict, Depends(get_current_user)]):
+ """Liste les passkeys de l'utilisateur connecté."""
+ passkeys = _get_passkeys_for_user(current_user["username"])
+ return [
+ {
+ "credential_id": pk["credential_id"],
+ "name": pk["name"],
+ "created_at": datetime.fromtimestamp(pk["created_at"], tz=timezone.utc).isoformat(),
+ }
+ for pk in passkeys
+ ]
+
+
+@app.delete("/api/auth/passkeys/{credential_id}")
+def delete_my_passkey(
+ credential_id: str,
+ current_user: Annotated[dict, Depends(get_current_user)],
+):
+ """Supprime une passkey de l'utilisateur connecté."""
+ with get_db() as conn:
+ cur = conn.execute(
+ "DELETE FROM passkeys WHERE credential_id = ? AND username = ?",
+ (credential_id, current_user["username"]),
+ )
+ if cur.rowcount == 0:
+ raise HTTPException(status_code=404, detail="Passkey introuvable")
+ return {"status": "ok"}
+
+
+@app.get("/api/admin/passkeys")
+def admin_list_passkeys(_: Annotated[dict, Depends(require_admin)]):
+ """Admin : liste toutes les passkeys enregistrées."""
+ with get_db() as conn:
+ rows = conn.execute(
+ "SELECT * FROM passkeys ORDER BY created_at DESC"
+ ).fetchall()
+ return [
+ {
+ "credential_id": row["credential_id"],
+ "username": row["username"],
+ "name": row["name"],
+ "created_at": datetime.fromtimestamp(row["created_at"], tz=timezone.utc).isoformat(),
+ }
+ for row in rows
+ ]
+
+
+@app.delete("/api/admin/passkeys/{credential_id}")
+def admin_delete_passkey(
+ credential_id: str,
+ _: Annotated[dict, Depends(require_admin)],
+):
+ """Admin : révoque n'importe quelle passkey."""
+ with get_db() as conn:
+ cur = conn.execute(
+ "DELETE FROM passkeys WHERE credential_id = ?", (credential_id,)
+ )
+ if cur.rowcount == 0:
+ raise HTTPException(status_code=404, detail="Passkey introuvable")
+ return {"status": "ok"}
+
+
# ─── Routes VPS ───────────────────────────────────────────────────────────────
@app.get("/api/vps")
diff --git a/vps-monitor/backend/requirements.txt b/vps-monitor/backend/requirements.txt
index a6c3ec3..869a466 100644
--- a/vps-monitor/backend/requirements.txt
+++ b/vps-monitor/backend/requirements.txt
@@ -4,3 +4,4 @@ aiohttp>=3.9.0
pydantic>=2.0.0
python-jose[cryptography]>=3.3.0
bcrypt>=4.0.0
+webauthn>=2.0.0
diff --git a/vps-monitor/frontend/src/App.jsx b/vps-monitor/frontend/src/App.jsx
index 2851778..cde6a89 100644
--- a/vps-monitor/frontend/src/App.jsx
+++ b/vps-monitor/frontend/src/App.jsx
@@ -25,6 +25,7 @@ export default function App() {
const [role, setRole] = useState(null)
const [page, setPage] = useState('main') // 'main' | 'profile' | 'admin'
const [isFirstUser, setIsFirstUser] = useState(false)
+ const [passkeyEnabled, setPasskeyEnabled] = useState(false)
const [authChecked, setAuthChecked] = useState(false)
const [vpsList, setVpsList] = useState([])
@@ -56,7 +57,10 @@ export default function App() {
// Vérifie si des utilisateurs existent (pour afficher login ou register)
useEffect(() => {
authStatus()
- .then(({ has_users }) => setIsFirstUser(!has_users))
+ .then(({ has_users, passkey_enabled }) => {
+ setIsFirstUser(!has_users)
+ setPasskeyEnabled(!!passkey_enabled)
+ })
.catch(() => setIsFirstUser(false))
.finally(() => setAuthChecked(true))
}, [])
@@ -211,6 +215,7 @@ export default function App() {
return (
+ Gérez les passkeys (TouchID / FaceID) de tous les utilisateurs. +
+Chargement…
+ : adminPasskeys.length === 0 + ? ( +Aucune passkey enregistrée.
+| Utilisateur | +Appareil | +Enregistrée le | ++ |
|---|---|---|---|
| {pk.username} | +
+ |
+ + {new Date(pk.created_at).toLocaleString('fr-FR')} + | ++ + | +
+ Connectez-vous sans mot de passe grâce à la biométrie de votre appareil. +
+ + {passkeysError && ( +Chargement…
+ : passkeys.length === 0 + ?Aucune passkey enregistrée.
+ : ( +{pk.name}
++ {new Date(pk.created_at).toLocaleDateString('fr-FR')} +
+