"""Authentication and registration endpoints.""" from __future__ import annotations from fastapi import APIRouter, Cookie, HTTPException, Request from fastapi.responses import JSONResponse from bincio.auth import deps from bincio.auth.db import ( authenticate, change_password, count_activity_users, count_wiki_users, create_user, get_invite, get_setting, get_user, use_invite, use_reset_code, ) from bincio.auth.models import ( GenericResponse, LoginRequest, LoginResponse, RegisterRequest, RegisterResponse, ResetPasswordRequest, ) router = APIRouter() @router.post("/api/auth/login", response_model=LoginResponse) async def login(body: LoginRequest, request: Request) -> JSONResponse: ip = request.client.host if request.client else "unknown" deps._check_rate_limit(ip, deps._login_attempts, deps._LOGIN_RATE_LIMIT, "Too many login attempts. Try again later.") user = authenticate(deps._get_db(), body.handle.strip().lower(), body.password) if not user: raise HTTPException(401, "Invalid credentials") token = deps._issue_jwt(user) resp = JSONResponse({ "ok": True, "handle": user.handle, "display_name": user.display_name, "is_admin": user.is_admin, "wiki_access": user.wiki_access, "activity_access": user.activity_access, }) deps._set_session_cookie(resp, token) return resp @router.post("/api/auth/token") async def get_token(body: LoginRequest, request: Request) -> JSONResponse: """Mobile auth: same as login but returns the JWT in the body.""" ip = request.client.host if request.client else "unknown" deps._check_rate_limit(ip, deps._login_attempts, deps._LOGIN_RATE_LIMIT, "Too many login attempts. Try again later.") user = authenticate(deps._get_db(), body.handle.strip().lower(), body.password) if not user: raise HTTPException(401, "Invalid credentials") token = deps._issue_jwt(user) return JSONResponse({ "ok": True, "token": token, "handle": user.handle, "display_name": user.display_name, "is_admin": user.is_admin, "wiki_access": user.wiki_access, "activity_access": user.activity_access, }) @router.post("/api/auth/logout", response_model=GenericResponse) async def logout(bincio_session: str | None = Cookie(default=None)) -> JSONResponse: resp = JSONResponse({"ok": True}) deps._clear_session_cookie(resp) return resp @router.get("/api/me") async def me(bincio_session: str | None = Cookie(default=None)) -> JSONResponse: user = deps._require_user(bincio_session) return JSONResponse({ "handle": user.handle, "display_name": user.display_name, "is_admin": user.is_admin, "wiki_access": user.wiki_access, "activity_access": user.activity_access, }) @router.post("/api/auth/reset-password", response_model=GenericResponse) async def reset_password(body: ResetPasswordRequest) -> JSONResponse: handle = body.handle.strip().lower() code = body.code.strip().upper() if len(body.password) < 8: raise HTTPException(400, "Password must be at least 8 characters") db = deps._get_db() if not use_reset_code(db, code, handle): raise HTTPException(400, "Invalid or expired reset code") change_password(db, handle, body.password) return JSONResponse({"ok": True}) # ── Registration ────────────────────────────────────────────────────────────── @router.post("/api/register", response_model=RegisterResponse) async def register(body: RegisterRequest, request: Request) -> JSONResponse: ip = request.client.host if request.client else "unknown" deps._check_rate_limit(ip, deps._register_attempts, deps._REGISTER_RATE_LIMIT, "Too many registration attempts. Try again later.") handle = body.handle.strip().lower() code = body.code.strip().upper() display = body.display_name.strip() or handle if not deps._VALID_HANDLE.match(handle): raise HTTPException(400, "Invalid handle (lowercase letters, numbers, _ - only; max 30 chars)") if len(body.password) < 8: raise HTTPException(400, "Password must be at least 8 characters") db = deps._get_db() invite = get_invite(db, code) if not invite or invite.used: raise HTTPException(400, "Invalid or already-used invite code") if get_user(db, handle): raise HTTPException(409, "Handle already taken") max_wiki_val = get_setting(db, "max_wiki_users") or get_setting(db, "max_users") if max_wiki_val is not None: limit = int(max_wiki_val) if limit > 0 and count_wiki_users(db) >= limit: raise HTTPException(403, f"This instance has reached its user limit ({limit})") if invite.grants_activity: max_act_val = get_setting(db, "max_activity_users") if max_act_val is not None: limit = int(max_act_val) if limit > 0 and count_activity_users(db) >= limit: raise HTTPException(403, f"This instance has reached its activity user limit ({limit})") user = create_user(db, handle, display, body.password, wiki_access=True, activity_access=invite.grants_activity) use_invite(db, code, handle) token = deps._issue_jwt(user) resp = JSONResponse({"ok": True, "handle": handle}) deps._set_session_cookie(resp, token) return resp