Add session auth: FastAPI login/logout/me endpoints, uv-managed deps, fix cookie on JSONResponse

This commit is contained in:
brutsalvadi
2026-04-23 15:29:56 +02:00
parent f5a38b2ca8
commit 3632ab3e9a
7 changed files with 910 additions and 17 deletions
+144 -6
View File
@@ -4,10 +4,15 @@ from __future__ import annotations
import os
import re
import secrets
import sqlite3
import subprocess
import time
from contextlib import contextmanager
from pathlib import Path
from fastapi import FastAPI, HTTPException
import bcrypt
from fastapi import Cookie, Depends, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
@@ -17,19 +22,150 @@ from pydantic import BaseModel
_ROOT = Path(__file__).parent.parent
pages_dir: Path = _ROOT / os.environ.get("WIKI_PAGES_DIR", "pages")
site_dir: Path = _ROOT / "site"
_DB_PATH = _ROOT / "data" / "wiki.db"
_SAFE_SLUG = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-/]*$")
_SESSION_TTL = 7 * 24 * 3600 # 7 days
def _hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def _verify_password(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode(), hashed.encode())
# ── Database ──────────────────────────────────────────────────────────────────
@contextmanager
def _db():
con = sqlite3.connect(_DB_PATH)
con.row_factory = sqlite3.Row
try:
yield con
finally:
con.close()
def _init_db() -> None:
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
with _db() as con:
con.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
""")
con.commit()
# Seed first admin user from env vars if the table is empty
count = con.execute("SELECT COUNT(*) FROM users").fetchone()[0]
if count == 0:
admin_user = os.environ.get("WIKI_ADMIN_USER")
admin_pass = os.environ.get("WIKI_ADMIN_PASSWORD")
if admin_user and admin_pass:
ph = _hash_password(admin_pass)
con.execute(
"INSERT INTO users (username, password_hash) VALUES (?, ?)",
(admin_user, ph),
)
con.commit()
_init_db()
# ── Auth helpers ──────────────────────────────────────────────────────────────
def _get_session_user(token: str | None) -> dict | None:
if not token:
return None
with _db() as con:
row = con.execute(
"""SELECT u.id, u.username
FROM sessions s JOIN users u ON u.id = s.user_id
WHERE s.token = ? AND s.expires_at > ?""",
(token, int(time.time())),
).fetchone()
return dict(row) if row else None
async def require_auth(bincio_session: str | None = Cookie(default=None)) -> dict:
user = _get_session_user(bincio_session)
if not user:
raise HTTPException(401, "Authentication required")
return user
# ── App setup ─────────────────────────────────────────────────────────────────
_extra_origin = os.environ.get("WIKI_ORIGIN", "")
_origins = [_extra_origin] if _extra_origin else []
app = FastAPI(title="BincioWiki Edit Sidecar", docs_url=None, redoc_url=None)
app.add_middleware(GZipMiddleware, minimum_size=1024)
app.add_middleware(
CORSMiddleware,
allow_origins=_origins,
allow_origin_regex=r"https?://localhost(:\d+)?",
allow_credentials=True,
allow_methods=["GET", "POST", "DELETE"],
allow_headers=["Content-Type"],
)
# ── Auth endpoints ────────────────────────────────────────────────────────────
class LoginBody(BaseModel):
username: str
password: str
@app.post("/api/auth/login")
async def login(body: LoginBody) -> JSONResponse:
with _db() as con:
row = con.execute(
"SELECT id, username, password_hash FROM users WHERE username = ?",
(body.username,),
).fetchone()
if not row or not _verify_password(body.password, row["password_hash"]):
raise HTTPException(401, "Credenziali non valide")
token = secrets.token_urlsafe(32)
expires = int(time.time()) + _SESSION_TTL
with _db() as con:
con.execute(
"INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)",
(token, row["id"], expires),
)
con.commit()
resp = JSONResponse({"username": row["username"]})
resp.set_cookie("bincio_session", token, httponly=True, samesite="lax", max_age=_SESSION_TTL)
return resp
@app.post("/api/auth/logout")
async def logout(bincio_session: str | None = Cookie(default=None)) -> JSONResponse:
if bincio_session:
with _db() as con:
con.execute("DELETE FROM sessions WHERE token = ?", (bincio_session,))
con.commit()
resp = JSONResponse({"ok": True})
resp.delete_cookie("bincio_session", httponly=True, samesite="lax")
return resp
@app.get("/api/me")
async def me(user: dict = Depends(require_auth)) -> JSONResponse:
return JSONResponse({"username": user["username"]})
# ── Page helpers ──────────────────────────────────────────────────────────────
def _slug_to_path(slug: str) -> Path:
if not _SAFE_SLUG.match(slug):
@@ -44,8 +180,10 @@ class PageBody(BaseModel):
content: str
# ── Page endpoints (all require auth) ────────────────────────────────────────
@app.get("/pages")
async def list_pages() -> JSONResponse:
async def list_pages(user: dict = Depends(require_auth)) -> JSONResponse:
"""Return all wiki page slugs."""
if not pages_dir.exists():
return JSONResponse({"pages": []})
@@ -57,7 +195,7 @@ async def list_pages() -> JSONResponse:
@app.get("/pages/{slug:path}")
async def get_page(slug: str) -> JSONResponse:
async def get_page(slug: str, user: dict = Depends(require_auth)) -> JSONResponse:
"""Return raw markdown content for a page."""
path = _slug_to_path(slug)
if not path.exists():
@@ -66,7 +204,7 @@ async def get_page(slug: str) -> JSONResponse:
@app.post("/pages/{slug:path}")
async def save_page(slug: str, body: PageBody) -> JSONResponse:
async def save_page(slug: str, body: PageBody, user: dict = Depends(require_auth)) -> JSONResponse:
"""Create or update a wiki page."""
path = _slug_to_path(slug)
path.parent.mkdir(parents=True, exist_ok=True)
@@ -75,7 +213,7 @@ async def save_page(slug: str, body: PageBody) -> JSONResponse:
@app.delete("/pages/{slug:path}")
async def delete_page(slug: str) -> JSONResponse:
async def delete_page(slug: str, user: dict = Depends(require_auth)) -> JSONResponse:
"""Delete a wiki page."""
path = _slug_to_path(slug)
if not path.exists():
@@ -85,7 +223,7 @@ async def delete_page(slug: str) -> JSONResponse:
@app.post("/rebuild")
async def rebuild() -> JSONResponse:
async def rebuild(user: dict = Depends(require_auth)) -> JSONResponse:
"""Trigger an astro build of the site."""
try:
result = subprocess.run(