Files
bincio-auth/bincio/auth/deps.py
T

202 lines
6.2 KiB
Python

"""Shared state and FastAPI dependency functions for bincio-auth."""
from __future__ import annotations
import os
import re
import time
from pathlib import Path
import jwt as _jwt
from fastapi import Cookie, HTTPException, Request, Response
from bincio.auth.db import User, get_user, open_db
from bincio.auth.tokens import create_id_token, create_token, decode_token
# ── Module-level state (set by CLI before uvicorn starts) ─────────────────────
# Root data directory; stays None until configured. _get_data_dir() answers
# with HTTP 500 while this is still None.
data_dir: Path | None = None
# Secret handed to create_token() / decode_token() for session JWTs.
jwt_secret: str = ""
oidc_private_key_pem: str = ""  # RS256 private key PEM (loaded from file at startup)
oidc_issuer: str = ""  # e.g. "https://bincio.org"; emitted as the "iss" claim of id_tokens
# Database handle cached by _get_db(); opened on first use.
_db = None
# ── Constants ─────────────────────────────────────────────────────────────────
# Handle syntax: 1–30 characters, first one a lowercase letter or digit, the
# rest lowercase letters, digits, "_" or "-".
_VALID_HANDLE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,29}$")
# Cookie name; the `bincio_session` Cookie() parameters below rely on this name.
_SESSION_COOKIE = "bincio_session"
_COOKIE_MAX_AGE = 30 * 86400  # 30 days in seconds (passed as the cookie's max_age)
# Optional cookie Domain attribute from $SESSION_DOMAIN; unset or empty → None.
_COOKIE_DOMAIN = os.environ.get("SESSION_DOMAIN") or None
# TTL handed to create_token(); presumably seconds (30 days, same as the
# cookie max-age) — confirm against bincio.auth.tokens.
_JWT_TTL = 30 * 86400
# Attempt timestamps (time.time() values) per key, in the shape expected by
# _check_rate_limit()'s `store` argument. Presumably keyed by client IP —
# the callers are not visible in this module.
_login_attempts: dict[str, list[float]] = {}
_register_attempts: dict[str, list[float]] = {}
_RATE_WINDOW = 900  # seconds (15 min) an attempt stays counted by _check_rate_limit()
# Presumably the `limit` values passed to _check_rate_limit() for the login
# and registration endpoints — verify against the route handlers.
_LOGIN_RATE_LIMIT = 10
_REGISTER_RATE_LIMIT = 5
# ── Core helpers ──────────────────────────────────────────────────────────────
def _get_data_dir() -> Path:
    """Return the configured data directory.

    Raises:
        HTTPException: 500 ("Server not configured") while the module-level
            ``data_dir`` is still ``None``.
    """
    configured = data_dir
    if configured is not None:
        return configured
    raise HTTPException(500, "Server not configured")
def _get_db():
    """Return the shared database handle, opening it on first use.

    The handle is created with ``open_db(_get_data_dir())`` and cached in the
    module-level ``_db`` so later calls reuse it.
    """
    global _db
    if _db is not None:
        return _db
    _db = open_db(_get_data_dir())
    return _db
def _issue_jwt(user: User) -> str:
    """Build a session token for ``user``.

    The claims carry the user's handle (as ``sub``), display name and access
    flags; they are handed to ``create_token`` together with ``jwt_secret``
    and ``_JWT_TTL``.

    Args:
        user: Account the token is issued for.

    Returns:
        The encoded token returned by ``create_token``.
    """
    session_claims = {
        "sub": user.handle,
        "display_name": user.display_name,
        "is_admin": user.is_admin,
        "wiki_access": user.wiki_access,
        "activity_access": user.activity_access,
    }
    return create_token(session_claims, jwt_secret, _JWT_TTL)
_ID_TOKEN_TTL = 3600  # 1 hour — short-lived; clients use refresh or re-auth


def _issue_id_token(
    user: User,
    client_id: str,
    nonce: str | None = None,
    ttl: int | None = None,
) -> str:
    """Issue an RS256 OIDC id_token for the given user and client.

    Args:
        user: Account the token describes; its handle becomes ``sub``.
        client_id: Value placed in the ``aud`` claim.
        nonce: Echoed back as the ``nonce`` claim when non-empty.
        ttl: Lifetime passed to ``create_id_token``; ``None`` selects
            ``_ID_TOKEN_TTL``.

    Returns:
        The encoded token returned by ``create_id_token``.
    """
    effective_ttl = _ID_TOKEN_TTL if ttl is None else ttl
    id_claims: dict = {
        "iss": oidc_issuer,
        "sub": user.handle,
        "aud": client_id,
        "name": user.display_name,
        "preferred_username": user.handle,
        # bincio-specific claims (used by bincio-activity for authz)
        "is_admin": user.is_admin,
        "wiki_access": user.wiki_access,
        "activity_access": user.activity_access,
    }
    if nonce:
        id_claims["nonce"] = nonce
    return create_id_token(id_claims, oidc_private_key_pem, effective_ttl)
# ── Rate limiting ─────────────────────────────────────────────────────────────
def _check_rate_limit(
    ip: str,
    store: dict[str, list[float]],
    limit: int,
    msg: str = "Too many attempts. Try again later.",
) -> None:
    """Record one attempt for ``ip`` or reject it when the limit is reached.

    Timestamps older than ``_RATE_WINDOW`` seconds are dropped from
    ``store[ip]`` on every call, whether or not the attempt is accepted.

    Args:
        ip: Key identifying the caller in ``store``.
        store: Mapping of key to attempt timestamps; mutated in place.
        limit: Maximum attempts allowed inside the window.
        msg: Detail text for the 429 response.

    Raises:
        HTTPException: 429 when ``limit`` attempts are already inside the
            window; the rejected attempt itself is not recorded.
    """
    # NOTE(review): this function never removes keys from `store`; unless
    # entries are pruned elsewhere the mapping grows with every distinct key.
    now = time.time()
    recent: list[float] = []
    for stamp in store.get(ip, []):
        if now - stamp < _RATE_WINDOW:
            recent.append(stamp)
    store[ip] = recent
    if len(recent) < limit:
        # `recent` is the list already stored, so appending records the attempt.
        recent.append(now)
        return
    raise HTTPException(429, msg)
# ── Auth dependency functions ─────────────────────────────────────────────────
def _decode_session(token: str) -> User | None:
    """Decode JWT (RS256 or HS256) and return the live User, or None if invalid/suspended.

    Verification is tried with RS256 first, using the public half of
    ``oidc_private_key_pem`` when one is configured. If that yields nothing
    the token is checked with ``decode_token`` and ``jwt_secret``.

    Args:
        token: Encoded JWT taken from the session cookie or a Bearer header.

    Returns:
        The ``User`` looked up from the token's ``sub`` claim, or ``None`` when
        the token fails verification, carries no ``sub``, no shared secret is
        configured for the fallback, or the user is missing or suspended.
    """
    payload = None
    if oidc_private_key_pem:
        try:
            # Imported here rather than at module level; only this branch uses it.
            from cryptography.hazmat.primitives.serialization import load_pem_private_key

            priv = load_pem_private_key(oidc_private_key_pem.encode(), password=None)
            payload = _jwt.decode(
                token,
                priv.public_key(),
                algorithms=["RS256"],
                # NOTE(review): audience is not verified, so an RS256 token
                # issued for any client_id is accepted — confirm intended.
                options={"verify_aud": False},
            )
        except Exception:
            # Deliberate best-effort: any failure on the RS256 path (bad key,
            # wrong algorithm, bad signature, expiry) falls through to the
            # shared-secret check below instead of failing the request.
            payload = None
    if payload is None:
        if not jwt_secret:
            # Fail closed: `jwt_secret` defaults to "" and verifying against
            # an empty key would make HMAC-signed tokens trivially forgeable.
            return None
        try:
            payload = decode_token(token, jwt_secret)
        except _jwt.PyJWTError:
            return None
    handle = payload.get("sub")
    if not handle:
        return None
    # Re-read the user on every request so suspension takes effect immediately
    # rather than waiting for the token to expire.
    user = get_user(_get_db(), handle)
    if not user or user.suspended:
        return None
    return user
def _current_user(bincio_session: str | None = Cookie(default=None)) -> User | None:
    """Return the user for the session cookie, or ``None``.

    ``None`` is returned when the cookie is missing or empty, or when
    ``_decode_session`` rejects it.
    """
    return _decode_session(bincio_session) if bincio_session else None
def _require_user(bincio_session: str | None = Cookie(default=None)) -> User:
    """Return the user for the session cookie, or fail the request.

    Raises:
        HTTPException: 401 when ``_current_user`` finds no valid session.
    """
    user = _current_user(bincio_session)
    if user:
        return user
    raise HTTPException(401, "Not authenticated")
def _require_admin(bincio_session: str | None = Cookie(default=None)) -> User:
    """Return the authenticated user only if they are an admin.

    Raises:
        HTTPException: 401 (from ``_require_user``) without a valid session;
            403 when the user's ``is_admin`` flag is falsy.
    """
    user = _require_user(bincio_session)
    if user.is_admin:
        return user
    raise HTTPException(403, "Admin required")
def _require_auth(
    request: Request,
    bincio_session: str | None = Cookie(default=None),
) -> User:
    """Accept JWT cookie (web) OR Authorization: Bearer token (mobile).

    The session cookie wins when present; the ``Authorization`` header is
    consulted only when the cookie is missing or empty.

    Args:
        request: Incoming request, used to read the ``Authorization`` header.
        bincio_session: Session cookie value, if any.

    Returns:
        The user resolved by ``_decode_session``.

    Raises:
        HTTPException: 401 "Not authenticated" when no token is supplied, or
            401 "Invalid or expired token" when the token is rejected.
    """
    token = bincio_session
    if not token:
        auth_header = request.headers.get("Authorization", "")
        scheme, _, credentials = auth_header.partition(" ")
        # Auth-scheme names are case-insensitive (RFC 9110 §11.1), so accept
        # "bearer"/"BEARER" too, and ignore padding around the credentials.
        if scheme.lower() == "bearer":
            token = credentials.strip()
    if not token:
        raise HTTPException(401, "Not authenticated")
    user = _decode_session(token)
    if not user:
        raise HTTPException(401, "Invalid or expired token")
    return user
def _set_session_cookie(response: Response, token: str) -> None:
    """Attach the session cookie carrying ``token`` to ``response``.

    The cookie is HTTP-only, SameSite=lax and lives for ``_COOKIE_MAX_AGE``
    seconds. A Domain attribute is added only when ``_COOKIE_DOMAIN`` is set.
    """
    # NOTE(review): `secure` is hard-coded to False, so the cookie is not
    # restricted to HTTPS — confirm this is intended for production.
    cookie_args: dict = {
        "key": _SESSION_COOKIE,
        "value": token,
        "max_age": _COOKIE_MAX_AGE,
        "httponly": True,
        "samesite": "lax",
        "secure": False,
    }
    if _COOKIE_DOMAIN:
        cookie_args["domain"] = _COOKIE_DOMAIN
    response.set_cookie(**cookie_args)
def _clear_session_cookie(response: Response) -> None:
    """Ask the client to drop the session cookie.

    The same optional Domain attribute as in ``_set_session_cookie`` is passed
    so the deletion targets the cookie that was originally set.
    """
    cookie_args: dict = {"key": _SESSION_COOKIE}
    if _COOKIE_DOMAIN:
        cookie_args["domain"] = _COOKIE_DOMAIN
    response.delete_cookie(**cookie_args)