Feat : add Webauth
All checks were successful
Build and Push Docker Images / docker (push) Successful in 1m22s

This commit is contained in:
jeanotx32
2026-06-02 19:46:58 -04:00
parent 6616e6525d
commit 57132f92ee
8 changed files with 888 additions and 58 deletions

View File

@@ -5,6 +5,7 @@ Agrège les données de tous les agents et expose une API REST pour le frontend.
"""
import asyncio
import base64
import json
import os
import secrets
@@ -24,6 +25,25 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from webauthn import (
generate_registration_options,
verify_registration_response,
generate_authentication_options,
verify_authentication_response,
options_to_json,
)
from webauthn.helpers.structs import (
AuthenticatorAttachment,
AuthenticatorSelectionCriteria,
UserVerificationRequirement,
ResidentKeyRequirement,
PublicKeyCredentialDescriptor,
RegistrationCredential as WebAuthnRegistrationCredential,
AuthenticatorAttestationResponse,
AuthenticationCredential as WebAuthnAuthenticationCredential,
AuthenticatorAssertionResponse,
)
# ─── Config ───────────────────────────────────────────────────────────────────
DB_FILE = Path(os.getenv("DB_FILE", "data/monitor.db"))
@@ -53,6 +73,14 @@ def _load_jwt_secret() -> str:
JWT_SECRET = _load_jwt_secret()
# ─── WebAuthn / Passkeys config ───────────────────────────────────────────────
WEBAUTHN_RP_ID = os.getenv("WEBAUTHN_RP_ID", "localhost")
WEBAUTHN_RP_NAME = os.getenv("WEBAUTHN_RP_NAME", "VPS Monitor")
WEBAUTHN_ORIGIN = os.getenv("WEBAUTHN_ORIGIN", "http://localhost:3020")
# In-memory challenge store: session_id → {challenge, username, expires_at}
_webauthn_challenges: dict[str, dict] = {}
bearer_scheme = HTTPBearer()
# ─── Modèles ──────────────────────────────────────────────────────────────────
@@ -110,6 +138,21 @@ class PurgeRequest(BaseModel):
to_ts: int | None = None # epoch seconds, utilisé si period == 'custom'
class PasskeyRegisterFinishRequest(BaseModel):
session_id: str
name: str = "Ma passkey"
credential: dict
class PasskeyLoginBeginRequest(BaseModel):
username: str | None = None
class PasskeyLoginFinishRequest(BaseModel):
session_id: str
credential: dict
# ─── SQLite ───────────────────────────────────────────────────────────────────
@contextmanager
@@ -194,6 +237,20 @@ def init_db() -> None:
conn.execute("""
INSERT OR IGNORE INTO settings (key, value) VALUES ('registration_open', 'false')
""")
conn.execute("""
INSERT OR IGNORE INTO settings (key, value) VALUES ('passkey_enabled', 'true')
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS passkeys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
credential_id TEXT NOT NULL UNIQUE,
public_key TEXT NOT NULL,
sign_count INTEGER NOT NULL DEFAULT 0,
name TEXT NOT NULL DEFAULT '',
created_at INTEGER NOT NULL
)
""")
init_db()
@@ -258,6 +315,98 @@ def _get_setting(key: str) -> str:
return row["value"] if row else ""
# ─── WebAuthn / Passkey helpers ───────────────────────────────────────────────
def _b64url_encode(b: bytes) -> str:
return base64.urlsafe_b64encode(b).rstrip(b"=").decode()
def _b64url_decode(s: str) -> bytes:
padding = 4 - len(s) % 4
if padding != 4:
s += "=" * padding
return base64.urlsafe_b64decode(s)
def _store_challenge(challenge: bytes, username: str | None = None) -> str:
"""Stocke un challenge WebAuthn en mémoire et retourne un session_id."""
session_id = secrets.token_hex(16)
_webauthn_challenges[session_id] = {
"challenge": challenge,
"username": username,
"expires_at": time.time() + 300,
}
# Nettoyage des challenges expirés
now = time.time()
expired = [k for k, v in list(_webauthn_challenges.items()) if v["expires_at"] < now]
for k in expired:
_webauthn_challenges.pop(k, None)
return session_id
def _pop_challenge(session_id: str) -> dict | None:
entry = _webauthn_challenges.pop(session_id, None)
if entry is None:
return None
if time.time() > entry["expires_at"]:
return None
return entry
def _get_passkeys_for_user(username: str) -> list[dict]:
with get_db() as conn:
rows = conn.execute(
"SELECT * FROM passkeys WHERE username = ? ORDER BY created_at DESC",
(username,),
).fetchall()
return [dict(r) for r in rows]
def _get_passkey_by_credential_id(credential_id: str) -> dict | None:
with get_db() as conn:
row = conn.execute(
"SELECT * FROM passkeys WHERE credential_id = ?", (credential_id,)
).fetchone()
return dict(row) if row else None
def _update_passkey_sign_count(credential_id: str, sign_count: int) -> None:
with get_db() as conn:
conn.execute(
"UPDATE passkeys SET sign_count = ? WHERE credential_id = ?",
(sign_count, credential_id),
)
def _parse_registration_credential(data: dict) -> WebAuthnRegistrationCredential:
resp = data["response"]
return WebAuthnRegistrationCredential(
id=data["id"],
raw_id=_b64url_decode(data.get("rawId", data["id"])),
response=AuthenticatorAttestationResponse(
client_data_json=_b64url_decode(resp["clientDataJSON"]),
attestation_object=_b64url_decode(resp["attestationObject"]),
),
type=data["type"],
)
def _parse_authentication_credential(data: dict) -> WebAuthnAuthenticationCredential:
resp = data["response"]
user_handle = resp.get("userHandle")
return WebAuthnAuthenticationCredential(
id=data["id"],
raw_id=_b64url_decode(data.get("rawId", data["id"])),
response=AuthenticatorAssertionResponse(
client_data_json=_b64url_decode(resp["clientDataJSON"]),
authenticator_data=_b64url_decode(resp["authenticatorData"]),
signature=_b64url_decode(resp["signature"]),
user_handle=_b64url_decode(user_handle) if user_handle else None,
),
type=data["type"],
)
# ─── Auth helpers ─────────────────────────────────────────────────────────────
def create_token(username: str, role: str) -> str:
@@ -477,7 +626,10 @@ async def _cleanup_old_stats() -> None:
@app.get("/api/auth/status")
def auth_status():
"""Indique si des utilisateurs existent déjà (pour le frontend)."""
return {"has_users": len(load_users()) > 0}
return {
"has_users": len(load_users()) > 0,
"passkey_enabled": _get_setting("passkey_enabled") == "true",
}
@app.post("/api/auth/register", status_code=201)
@@ -561,7 +713,7 @@ def admin_update_setting(
_: Annotated[dict, Depends(require_admin)],
):
"""Met à jour un paramètre d'administration."""
allowed_keys = {"registration_open"}
allowed_keys = {"registration_open", "passkey_enabled"}
if key not in allowed_keys:
raise HTTPException(status_code=400, detail="Clé de paramètre inconnue")
with get_db() as conn:
@@ -680,6 +832,214 @@ def admin_db_purge(body: PurgeRequest, _: Annotated[dict, Depends(require_admin)
return {"deleted": deleted, "status": "ok"}
# ─── Routes Passkeys ──────────────────────────────────────────────────────────
@app.post("/api/auth/passkey/register/begin")
def passkey_register_begin(
current_user: Annotated[dict, Depends(get_current_user)],
):
"""Démarre l'enregistrement d'une passkey pour l'utilisateur connecté."""
if _get_setting("passkey_enabled") != "true":
raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
username = current_user["username"]
existing = _get_passkeys_for_user(username)
exclude_creds = [
PublicKeyCredentialDescriptor(id=_b64url_decode(pk["credential_id"]))
for pk in existing
]
opts = generate_registration_options(
rp_id=WEBAUTHN_RP_ID,
rp_name=WEBAUTHN_RP_NAME,
user_id=username.encode(),
user_name=username,
user_display_name=username,
authenticator_selection=AuthenticatorSelectionCriteria(
authenticator_attachment=AuthenticatorAttachment.PLATFORM,
resident_key=ResidentKeyRequirement.REQUIRED,
user_verification=UserVerificationRequirement.REQUIRED,
),
exclude_credentials=exclude_creds,
)
session_id = _store_challenge(opts.challenge, username=username)
return {"session_id": session_id, "options": json.loads(options_to_json(opts))}
@app.post("/api/auth/passkey/register/finish")
def passkey_register_finish(
body: PasskeyRegisterFinishRequest,
current_user: Annotated[dict, Depends(get_current_user)],
):
"""Finalise l'enregistrement d'une passkey."""
if _get_setting("passkey_enabled") != "true":
raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
challenge_data = _pop_challenge(body.session_id)
if not challenge_data:
raise HTTPException(status_code=400, detail="Session expirée ou invalide")
if challenge_data["username"] != current_user["username"]:
raise HTTPException(status_code=403, detail="Session invalide")
try:
cred = _parse_registration_credential(body.credential)
verification = verify_registration_response(
credential=cred,
expected_challenge=challenge_data["challenge"],
expected_rp_id=WEBAUTHN_RP_ID,
expected_origin=WEBAUTHN_ORIGIN,
)
except Exception as exc:
raise HTTPException(status_code=400, detail=f"Vérification échouée : {exc}")
cred_id = _b64url_encode(verification.credential_id)
pub_key = _b64url_encode(verification.credential_public_key)
if _get_passkey_by_credential_id(cred_id):
raise HTTPException(status_code=409, detail="Cette passkey est déjà enregistrée")
with get_db() as conn:
conn.execute(
"""INSERT INTO passkeys (username, credential_id, public_key, sign_count, name, created_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(current_user["username"], cred_id, pub_key,
verification.sign_count, body.name, int(time.time())),
)
return {"status": "ok"}
@app.post("/api/auth/passkey/login/begin")
def passkey_login_begin(body: PasskeyLoginBeginRequest):
"""Démarre l'authentification par passkey."""
if _get_setting("passkey_enabled") != "true":
raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
allow_creds: list[PublicKeyCredentialDescriptor] = []
if body.username:
passkeys = _get_passkeys_for_user(body.username)
allow_creds = [
PublicKeyCredentialDescriptor(id=_b64url_decode(pk["credential_id"]))
for pk in passkeys
]
opts = generate_authentication_options(
rp_id=WEBAUTHN_RP_ID,
allow_credentials=allow_creds,
user_verification=UserVerificationRequirement.REQUIRED,
)
session_id = _store_challenge(opts.challenge, username=body.username)
return {"session_id": session_id, "options": json.loads(options_to_json(opts))}
@app.post("/api/auth/passkey/login/finish")
def passkey_login_finish(body: PasskeyLoginFinishRequest, request: Request):
"""Vérifie la passkey et retourne un JWT."""
if _get_setting("passkey_enabled") != "true":
raise HTTPException(status_code=403, detail="Les passkeys sont désactivées")
challenge_data = _pop_challenge(body.session_id)
if not challenge_data:
raise HTTPException(status_code=400, detail="Session expirée ou invalide")
cred_id = body.credential.get("id", "")
passkey = _get_passkey_by_credential_id(cred_id)
if not passkey:
ip = _get_client_ip(request)
_log_login("unknown", ip, False, "Passkey introuvable")
raise HTTPException(status_code=401, detail="Passkey non reconnue")
try:
cred = _parse_authentication_credential(body.credential)
verification = verify_authentication_response(
credential=cred,
expected_challenge=challenge_data["challenge"],
expected_rp_id=WEBAUTHN_RP_ID,
expected_origin=WEBAUTHN_ORIGIN,
credential_public_key=_b64url_decode(passkey["public_key"]),
credential_current_sign_count=passkey["sign_count"],
)
except Exception as exc:
ip = _get_client_ip(request)
_log_login(passkey["username"], ip, False, f"Passkey invalide : {exc}")
raise HTTPException(status_code=401, detail="Authentification échouée")
_update_passkey_sign_count(cred_id, verification.new_sign_count)
user = next((u for u in load_users() if u["username"] == passkey["username"]), None)
if not user:
raise HTTPException(status_code=401, detail="Utilisateur introuvable")
ip = _get_client_ip(request)
_log_login(user["username"], ip, True, "passkey")
token = create_token(user["username"], user["role"])
return {"access_token": token, "token_type": "bearer", "role": user["role"]}
@app.get("/api/auth/passkeys")
def list_my_passkeys(current_user: Annotated[dict, Depends(get_current_user)]):
"""Liste les passkeys de l'utilisateur connecté."""
passkeys = _get_passkeys_for_user(current_user["username"])
return [
{
"credential_id": pk["credential_id"],
"name": pk["name"],
"created_at": datetime.fromtimestamp(pk["created_at"], tz=timezone.utc).isoformat(),
}
for pk in passkeys
]
@app.delete("/api/auth/passkeys/{credential_id}")
def delete_my_passkey(
credential_id: str,
current_user: Annotated[dict, Depends(get_current_user)],
):
"""Supprime une passkey de l'utilisateur connecté."""
with get_db() as conn:
cur = conn.execute(
"DELETE FROM passkeys WHERE credential_id = ? AND username = ?",
(credential_id, current_user["username"]),
)
if cur.rowcount == 0:
raise HTTPException(status_code=404, detail="Passkey introuvable")
return {"status": "ok"}
@app.get("/api/admin/passkeys")
def admin_list_passkeys(_: Annotated[dict, Depends(require_admin)]):
"""Admin : liste toutes les passkeys enregistrées."""
with get_db() as conn:
rows = conn.execute(
"SELECT * FROM passkeys ORDER BY created_at DESC"
).fetchall()
return [
{
"credential_id": row["credential_id"],
"username": row["username"],
"name": row["name"],
"created_at": datetime.fromtimestamp(row["created_at"], tz=timezone.utc).isoformat(),
}
for row in rows
]
@app.delete("/api/admin/passkeys/{credential_id}")
def admin_delete_passkey(
credential_id: str,
_: Annotated[dict, Depends(require_admin)],
):
"""Admin : révoque n'importe quelle passkey."""
with get_db() as conn:
cur = conn.execute(
"DELETE FROM passkeys WHERE credential_id = ?", (credential_id,)
)
if cur.rowcount == 0:
raise HTTPException(status_code=404, detail="Passkey introuvable")
return {"status": "ok"}
# ─── Routes VPS ───────────────────────────────────────────────────────────────
@app.get("/api/vps")

View File

@@ -4,3 +4,4 @@ aiohttp>=3.9.0
pydantic>=2.0.0
python-jose[cryptography]>=3.3.0
bcrypt>=4.0.0
webauthn>=2.0.0