feat: add user profile and admin management features
Some checks failed
Build and Push Docker Images / docker (push) Failing after 9s

This commit is contained in:
jeanotx32
2026-05-19 01:25:21 -04:00
parent 43dd3c614d
commit daf68d98fa
8 changed files with 646 additions and 23 deletions

View File

@@ -18,7 +18,7 @@ from typing import Annotated
import bcrypt as _bcrypt
import aiohttp
from fastapi import Depends, FastAPI, HTTPException
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
@@ -93,6 +93,15 @@ class LoginRequest(BaseModel):
password: str
class ChangePasswordRequest(BaseModel):
old_password: str
new_password: str
class SettingUpdateRequest(BaseModel):
value: str
# ─── SQLite ───────────────────────────────────────────────────────────────────
@contextmanager
@@ -154,6 +163,29 @@ def init_db() -> None:
CREATE INDEX IF NOT EXISTS idx_vps_stats
ON vps_stats(vps_id, ts DESC)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS login_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
username TEXT NOT NULL,
ip TEXT NOT NULL,
success INTEGER NOT NULL,
reason TEXT NOT NULL DEFAULT ''
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_login_logs_ts
ON login_logs(ts DESC)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
""")
conn.execute("""
INSERT OR IGNORE INTO settings (key, value) VALUES ('registration_open', 'false')
""")
init_db()
@@ -210,6 +242,14 @@ def update_vps(vps_id: str, data: dict) -> bool:
return cur.rowcount > 0
# ─── Settings helpers ────────────────────────────────────────────────────────
def _get_setting(key: str) -> str:
with get_db() as conn:
row = conn.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone()
return row["value"] if row else ""
# ─── Auth helpers ─────────────────────────────────────────────────────────────
def create_token(username: str, role: str) -> str:
@@ -237,6 +277,27 @@ def get_current_user(
return user
def require_admin(current_user: Annotated[dict, Depends(get_current_user)]) -> dict:
if current_user.get("role") != "admin":
raise HTTPException(status_code=403, detail="Accès réservé aux administrateurs")
return current_user
def _get_client_ip(request: Request) -> str:
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
return forwarded.split(",")[0].strip()
return request.client.host if request.client else "unknown"
def _log_login(username: str, ip: str, success: bool, reason: str) -> None:
with get_db() as conn:
conn.execute(
"INSERT INTO login_logs (ts, username, ip, success, reason) VALUES (?, ?, ?, ?, ?)",
(int(time.time()), username, ip, 1 if success else 0, reason),
)
# ─── App ──────────────────────────────────────────────────────────────────────
app = FastAPI(title="VPS Monitor Backend", version="1.0.0")
@@ -400,18 +461,24 @@ def auth_status():
@app.post("/api/auth/register", status_code=201)
def register(body: RegisterRequest):
"""Enregistre le premier utilisateur (admin). Fermé ensuite."""
if len(load_users()) > 0:
raise HTTPException(
status_code=403,
detail="L'enregistrement public est désactivé. Seul l'admin peut créer des comptes."
)
"""Enregistre un nouvel utilisateur. Ouvert uniquement si aucun utilisateur n'existe
ou si l'admin a activé les inscriptions."""
users = load_users()
if len(users) > 0:
if _get_setting("registration_open") != "true":
raise HTTPException(
status_code=403,
detail="L'enregistrement public est désactivé. Seul l'admin peut créer des comptes."
)
if not body.username.strip() or len(body.password) < 6:
raise HTTPException(status_code=422, detail="Mot de passe trop court (6 caractères min.)")
if any(u["username"] == body.username.strip() for u in users):
raise HTTPException(status_code=409, detail="Ce nom d'utilisateur est déjà pris")
role = "admin" if len(users) == 0 else "user"
user = {
"username": body.username.strip(),
"password": _bcrypt.hashpw(body.password.encode(), _bcrypt.gensalt()).decode(),
"role": "admin",
"role": role,
}
add_user(user)
token = create_token(user["username"], user["role"])
@@ -419,12 +486,15 @@ def register(body: RegisterRequest):
@app.post("/api/auth/login")
def login(body: LoginRequest):
def login(body: LoginRequest, request: Request):
"""Authentifie un utilisateur et retourne un JWT."""
ip = _get_client_ip(request)
users = load_users()
user = next((u for u in users if u["username"] == body.username), None)
if not user or not _bcrypt.checkpw(body.password.encode(), user["password"].encode()):
_log_login(body.username, ip, False, "Identifiants incorrects")
raise HTTPException(status_code=401, detail="Identifiants incorrects")
_log_login(user["username"], ip, True, "")
token = create_token(user["username"], user["role"])
return {"access_token": token, "token_type": "bearer", "role": user["role"]}
@@ -434,6 +504,93 @@ def me(current_user: Annotated[dict, Depends(get_current_user)]):
return {"username": current_user["username"], "role": current_user["role"]}
@app.post("/api/auth/change-password")
def change_password(
body: ChangePasswordRequest,
current_user: Annotated[dict, Depends(get_current_user)],
):
"""Permet à l'utilisateur connecté de changer son mot de passe."""
if not _bcrypt.checkpw(body.old_password.encode(), current_user["password"].encode()):
raise HTTPException(status_code=400, detail="Ancien mot de passe incorrect")
if len(body.new_password) < 6:
raise HTTPException(status_code=422, detail="Nouveau mot de passe trop court (6 caractères min.)")
new_hash = _bcrypt.hashpw(body.new_password.encode(), _bcrypt.gensalt()).decode()
with get_db() as conn:
conn.execute(
"UPDATE users SET password = ? WHERE username = ?",
(new_hash, current_user["username"]),
)
return {"status": "ok"}
# ─── Routes Admin ─────────────────────────────────────────────────────────────
@app.get("/api/admin/settings")
def admin_get_settings(_: Annotated[dict, Depends(require_admin)]):
"""Retourne les paramètres d'administration."""
with get_db() as conn:
rows = conn.execute("SELECT key, value FROM settings").fetchall()
return {row["key"]: row["value"] for row in rows}
@app.put("/api/admin/settings/{key}")
def admin_update_setting(
key: str,
body: SettingUpdateRequest,
_: Annotated[dict, Depends(require_admin)],
):
"""Met à jour un paramètre d'administration."""
allowed_keys = {"registration_open"}
if key not in allowed_keys:
raise HTTPException(status_code=400, detail="Clé de paramètre inconnue")
with get_db() as conn:
conn.execute(
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
(key, body.value),
)
return {"status": "ok"}
@app.get("/api/admin/login-logs")
def admin_login_logs(
limit: int = 100,
offset: int = 0,
_: Annotated[dict, Depends(require_admin)] = None,
):
"""Retourne les tentatives de connexion enregistrées."""
limit = max(1, min(limit, 500))
offset = max(0, offset)
with get_db() as conn:
rows = conn.execute(
"SELECT * FROM login_logs ORDER BY ts DESC LIMIT ? OFFSET ?",
(limit, offset),
).fetchall()
total = conn.execute("SELECT COUNT(*) FROM login_logs").fetchone()[0]
return {
"total": total,
"logs": [
{
"id": row["id"],
"ts": datetime.fromtimestamp(row["ts"], tz=timezone.utc).isoformat(),
"username": row["username"],
"ip": row["ip"],
"success": bool(row["success"]),
"reason": row["reason"],
}
for row in rows
],
}
@app.get("/api/admin/users")
def admin_list_users(_: Annotated[dict, Depends(require_admin)]):
"""Liste les utilisateurs (sans mots de passe)."""
return [
{"username": u["username"], "role": u["role"]}
for u in load_users()
]
# ─── Routes VPS ───────────────────────────────────────────────────────────────
@app.get("/api/vps")