"""Shared state and FastAPI dependency functions for bincio-auth.""" from __future__ import annotations import os import re import time from pathlib import Path import jwt as _jwt from fastapi import Cookie, HTTPException, Request, Response from bincio.auth.db import User, get_user, open_db from bincio.auth.tokens import create_id_token, create_token, decode_token # ── Module-level state (set by CLI before uvicorn starts) ───────────────────── data_dir: Path | None = None jwt_secret: str = "" oidc_private_key_pem: str = "" # RS256 private key PEM (loaded from file at startup) oidc_issuer: str = "" # e.g. "https://bincio.org" _db = None # ── Constants ───────────────────────────────────────────────────────────────── _VALID_HANDLE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,29}$") _SESSION_COOKIE = "bincio_session" _COOKIE_MAX_AGE = 30 * 86400 _COOKIE_DOMAIN = os.environ.get("SESSION_DOMAIN") or None _JWT_TTL = 30 * 86400 _login_attempts: dict[str, list[float]] = {} _register_attempts: dict[str, list[float]] = {} _RATE_WINDOW = 900 _LOGIN_RATE_LIMIT = 10 _REGISTER_RATE_LIMIT = 5 # ── Core helpers ────────────────────────────────────────────────────────────── def _get_data_dir() -> Path: if data_dir is None: raise HTTPException(500, "Server not configured") return data_dir def _get_db(): global _db if _db is None: _db = open_db(_get_data_dir()) return _db def _issue_jwt(user: User) -> str: return create_token( { "sub": user.handle, "display_name": user.display_name, "is_admin": user.is_admin, "wiki_access": user.wiki_access, "activity_access": user.activity_access, }, jwt_secret, _JWT_TTL, ) _ID_TOKEN_TTL = 3600 # 1 hour — short-lived; clients use refresh or re-auth def _issue_id_token( user: User, client_id: str, nonce: str | None = None, ttl: int | None = None, ) -> str: """Issue an RS256 OIDC id_token for the given user and client.""" claims: dict = { "iss": oidc_issuer, "sub": user.handle, "aud": client_id, "name": user.display_name, "preferred_username": user.handle, # bincio-specific claims (used by bincio-activity for authz) "is_admin": user.is_admin, "wiki_access": user.wiki_access, "activity_access": user.activity_access, } if nonce: claims["nonce"] = nonce return create_id_token(claims, oidc_private_key_pem, ttl if ttl is not None else _ID_TOKEN_TTL) # ── Rate limiting ───────────────────────────────────────────────────────────── def _check_rate_limit( ip: str, store: dict[str, list[float]], limit: int, msg: str = "Too many attempts. Try again later.", ) -> None: now = time.time() attempts = [t for t in store.get(ip, []) if now - t < _RATE_WINDOW] store[ip] = attempts if len(attempts) >= limit: raise HTTPException(429, msg) attempts.append(now) store[ip] = attempts # ── Auth dependency functions ───────────────────────────────────────────────── def _decode_session(token: str) -> User | None: """Decode JWT and return the live User, or None if invalid/suspended.""" try: payload = decode_token(token, jwt_secret) except _jwt.PyJWTError: return None handle = payload.get("sub") if not handle: return None user = get_user(_get_db(), handle) if not user or user.suspended: return None return user def _current_user(bincio_session: str | None = Cookie(default=None)) -> User | None: if not bincio_session: return None return _decode_session(bincio_session) def _require_user(bincio_session: str | None = Cookie(default=None)) -> User: user = _current_user(bincio_session) if not user: raise HTTPException(401, "Not authenticated") return user def _require_admin(bincio_session: str | None = Cookie(default=None)) -> User: user = _require_user(bincio_session) if not user.is_admin: raise HTTPException(403, "Admin required") return user def _require_auth( request: Request, bincio_session: str | None = Cookie(default=None), ) -> User: """Accept JWT cookie (web) OR Authorization: Bearer token (mobile).""" token = bincio_session if not token: auth_header = request.headers.get("Authorization", "") if auth_header.startswith("Bearer "): token = auth_header[7:] if not token: raise HTTPException(401, "Not authenticated") user = _decode_session(token) if not user: raise HTTPException(401, "Invalid or expired token") return user def _set_session_cookie(response: Response, token: str) -> None: kwargs: dict = dict( key=_SESSION_COOKIE, value=token, max_age=_COOKIE_MAX_AGE, httponly=True, samesite="lax", secure=False, ) if _COOKIE_DOMAIN: kwargs["domain"] = _COOKIE_DOMAIN response.set_cookie(**kwargs) def _clear_session_cookie(response: Response) -> None: kwargs: dict = dict(key=_SESSION_COOKIE) if _COOKIE_DOMAIN: kwargs["domain"] = _COOKIE_DOMAIN response.delete_cookie(**kwargs)