Files
bincio-auth/bincio/auth/deps.py
T
Davide Scaini ddd15cae0f auth: add FastAPI service — models, deps, server, routers, CLI
Steps 3–7 of the migration plan:
- models.py: Pydantic request/response types
- deps.py: shared state, JWT-based auth helpers, rate limiting
- server.py: FastAPI app with CORS + gzip
- routers/auth.py: login, logout, /api/me, reset-password, register
- routers/invites.py: GET/POST /api/invites
- routers/admin.py: user listing, suspend/unsuspend, delete, access flags, reset-password-code
- cli.py: `bincio-auth init` (creates DB + admin + JWT secret) and `bincio-auth serve`

Cookie carries a signed JWT (HS256); consumers validate locally with shared secret.
2026-06-02 14:38:56 +02:00

157 lines
4.7 KiB
Python

"""Shared state and FastAPI dependency functions for bincio-auth."""
from __future__ import annotations
import os
import re
import time
from pathlib import Path
import jwt as _jwt
from fastapi import Cookie, HTTPException, Request, Response
from bincio.auth.db import User, get_user, open_db
from bincio.auth.tokens import create_token, decode_token
# ── Module-level state (set by CLI before uvicorn starts) ─────────────────────
# Directory handed to open_db() by _get_db(). While it is still None,
# _get_data_dir() answers with HTTP 500 "Server not configured".
data_dir: Path | None = None
# Secret passed to create_token() / decode_token() for session JWTs.
# Defaults to the empty string until something assigns a real value.
jwt_secret: str = ""
# Database handle cached by _get_db() after its first successful open_db() call.
_db = None
# ── Constants ─────────────────────────────────────────────────────────────────
# Handle pattern: one lowercase letter or digit followed by up to 29 characters
# from [a-z0-9_-], i.e. 1–30 characters in total.
# Anchored with \Z rather than $: in Python, `$` also matches just before a
# trailing newline, so "alice\n" would pass a `.match()` check.
_VALID_HANDLE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,29}\Z")
# Name of the session cookie written by _set_session_cookie().
_SESSION_COOKIE = "bincio_session"
# Cookie max-age in seconds (30 days).
_COOKIE_MAX_AGE = 30 * 86400
# Optional cookie Domain attribute; None when SESSION_DOMAIN is unset or empty.
_COOKIE_DOMAIN = os.environ.get("SESSION_DOMAIN") or None
# TTL handed to create_token() by _issue_jwt(); same 30-day figure as the
# cookie (presumably seconds — confirm against bincio.auth.tokens).
_JWT_TTL = 30 * 86400
# Attempt timestamps (time.time() values) per key, in the shape expected by
# _check_rate_limit(); presumably keyed by client IP — confirm at call sites.
_login_attempts: dict[str, list[float]] = {}
_register_attempts: dict[str, list[float]] = {}
# Length of the rate-limit window in seconds (15 minutes).
_RATE_WINDOW = 900
# Attempts allowed per window; not referenced in this module's own functions,
# presumably passed as `limit` to _check_rate_limit() by the routers.
_LOGIN_RATE_LIMIT = 10
_REGISTER_RATE_LIMIT = 5
# ── Core helpers ──────────────────────────────────────────────────────────────
def _get_data_dir() -> Path:
    """Return the configured data directory.

    Raises:
        HTTPException: 500 "Server not configured" while the module-level
            ``data_dir`` is still ``None``.
    """
    configured = data_dir
    if configured is not None:
        return configured
    raise HTTPException(500, "Server not configured")
def _get_db():
    """Return the shared database handle, opening it on first use.

    The handle comes from ``open_db(_get_data_dir())`` and is cached in the
    module-level ``_db`` so later calls reuse it.
    """
    global _db
    if _db is not None:
        return _db
    _db = open_db(_get_data_dir())
    return _db
def _issue_jwt(user: User) -> str:
    """Build a session JWT for *user*.

    The claims carry the user's handle (``sub``), display name and the
    admin / wiki / activity access flags as they are at call time. The token
    is produced by ``create_token`` with the module-level ``jwt_secret`` and
    ``_JWT_TTL``.

    Raises:
        HTTPException: 500 "Server not configured" when ``jwt_secret`` is
            empty, mirroring the guard on ``data_dir``.
    """
    # Fail closed: a token signed with an empty secret could be forged by
    # anyone, so never issue one while the secret is still unset.
    if not jwt_secret:
        raise HTTPException(500, "Server not configured")
    claims = {
        "sub": user.handle,
        "display_name": user.display_name,
        "is_admin": user.is_admin,
        "wiki_access": user.wiki_access,
        "activity_access": user.activity_access,
    }
    return create_token(claims, jwt_secret, _JWT_TTL)
# ── Rate limiting ─────────────────────────────────────────────────────────────
def _check_rate_limit(
    ip: str,
    store: dict[str, list[float]],
    limit: int,
    msg: str = "Too many attempts. Try again later.",
) -> None:
    """Record one attempt for *ip* in *store*, or reject it with HTTP 429.

    Only timestamps younger than ``_RATE_WINDOW`` seconds count towards
    *limit*. A rejected attempt is not recorded, so being throttled does not
    extend the window.

    Args:
        ip: Key identifying the client.
        store: Mutable mapping of key -> attempt timestamps; updated in place.
        limit: Maximum number of attempts allowed inside the window.
        msg: Detail message for the 429 response.

    Raises:
        HTTPException: 429 with *msg* when *ip* already has *limit* or more
            attempts inside the window.
    """
    now = time.time()
    recent = [t for t in store.get(ip, ()) if now - t < _RATE_WINDOW]
    store[ip] = recent

    # Without eviction the store keeps one entry per key ever seen. Once it is
    # large, drop every other key whose attempts have all expired; this never
    # changes a rate-limit decision, it only frees memory.
    sweep_threshold = 1024
    if len(store) > sweep_threshold:
        # Snapshot the items first so the dict is not resized mid-iteration.
        for key, stamps in list(store.items()):
            if key != ip and all(now - t >= _RATE_WINDOW for t in stamps):
                store.pop(key, None)

    if len(recent) >= limit:
        raise HTTPException(429, msg)
    # `recent` is the list already stored under `ip`, so appending records it.
    recent.append(now)
# ── Auth dependency functions ─────────────────────────────────────────────────
def _decode_session(token: str) -> User | None:
    """Decode JWT and return the live User, or None if invalid/suspended.

    The token is verified with ``decode_token`` and the module-level
    ``jwt_secret``. The user is then re-read from the database by the ``sub``
    claim, so suspension is checked against the stored record, not the claims.

    Returns:
        The ``User`` for the token's ``sub`` claim, or ``None`` when the token
        fails verification, has no usable ``sub``, or the user is missing or
        suspended.

    Raises:
        HTTPException: 500 "Server not configured" when ``jwt_secret`` is
            empty (or, via ``_get_db``, when no data directory is set).
    """
    # Fail closed: verifying against an empty secret would accept tokens that
    # anyone can mint.
    if not jwt_secret:
        raise HTTPException(500, "Server not configured")
    try:
        payload = decode_token(token, jwt_secret)
    except _jwt.PyJWTError:
        return None
    handle = payload.get("sub")
    # Only a non-empty string can name a user; reject anything else before it
    # reaches the database lookup.
    if not isinstance(handle, str) or not handle:
        return None
    user = get_user(_get_db(), handle)
    if not user or user.suspended:
        return None
    return user
def _current_user(bincio_session: str | None = Cookie(default=None)) -> User | None:
    """Resolve the session cookie to a user, or None when absent or invalid."""
    return _decode_session(bincio_session) if bincio_session else None
def _require_user(bincio_session: str | None = Cookie(default=None)) -> User:
    """Return the user behind the session cookie.

    Raises:
        HTTPException: 401 "Not authenticated" when ``_current_user`` yields
            no user for the cookie.
    """
    resolved = _current_user(bincio_session)
    if resolved:
        return resolved
    raise HTTPException(401, "Not authenticated")
def _require_admin(bincio_session: str | None = Cookie(default=None)) -> User:
    """Return the authenticated user, insisting on the admin flag.

    Raises:
        HTTPException: 401 from ``_require_user`` when not authenticated;
            403 "Admin required" when the user is not an admin.
    """
    account = _require_user(bincio_session)
    if account.is_admin:
        return account
    raise HTTPException(403, "Admin required")
def _require_auth(
    request: Request,
    bincio_session: str | None = Cookie(default=None),
) -> User:
    """Accept JWT cookie (web) OR Authorization: Bearer token (mobile).

    The cookie wins when present; the ``Authorization`` header is consulted
    only when there is no cookie value.

    Raises:
        HTTPException: 401 "Not authenticated" when neither source supplies a
            token; 401 "Invalid or expired token" when ``_decode_session``
            rejects it.
    """
    token = bincio_session
    if not token:
        auth_header = request.headers.get("Authorization", "")
        # HTTP auth scheme names are case-insensitive, so "bearer" and
        # "BEARER" must be accepted alongside "Bearer". Extra whitespace
        # around the credentials is tolerated as well.
        scheme, _, credentials = auth_header.partition(" ")
        if scheme.lower() == "bearer":
            token = credentials.strip()
    if not token:
        raise HTTPException(401, "Not authenticated")
    user = _decode_session(token)
    if not user:
        raise HTTPException(401, "Invalid or expired token")
    return user
def _set_session_cookie(
    response: Response,
    token: str,
    *,
    secure: bool = False,
) -> None:
    """Attach the session cookie carrying *token* to *response*.

    The cookie is HttpOnly, SameSite=Lax and lives for ``_COOKIE_MAX_AGE``
    seconds. A Domain attribute is added only when ``_COOKIE_DOMAIN`` is set.

    Args:
        response: Response object the cookie is written to.
        token: Cookie value (the session JWT).
        secure: Whether to mark the cookie ``Secure`` so browsers send it over
            HTTPS only. Defaults to ``False``, the previously hard-coded
            value, so existing callers are unaffected.
    """
    # NOTE(review): HTTPS deployments should pass secure=True; without it the
    # session token is also sent over plain HTTP.
    kwargs: dict = dict(
        key=_SESSION_COOKIE,
        value=token,
        max_age=_COOKIE_MAX_AGE,
        httponly=True,
        samesite="lax",
        secure=secure,
    )
    if _COOKIE_DOMAIN:
        kwargs["domain"] = _COOKIE_DOMAIN
    response.set_cookie(**kwargs)
def _clear_session_cookie(response: Response) -> None:
    """Ask the client to drop the session cookie.

    The Domain attribute is passed only when ``_COOKIE_DOMAIN`` is set, as
    when the cookie is written.
    """
    if _COOKIE_DOMAIN:
        response.delete_cookie(key=_SESSION_COOKIE, domain=_COOKIE_DOMAIN)
    else:
        response.delete_cookie(key=_SESSION_COOKIE)