"""Authentication and registration endpoints.""" from __future__ import annotations import time from fastapi import APIRouter, Cookie, HTTPException, Request from fastapi.responses import JSONResponse from bincio.auth import deps from bincio.auth import smtp as _smtp from bincio.auth.db import ( authenticate, change_password, count_activity_users, count_wiki_users, create_email_reset_token, create_user, get_email_reset_token, get_invite, get_setting, get_user, get_user_by_email, get_user_email, set_user_email, use_email_reset_token, use_invite, use_reset_code, ) from bincio.auth.models import ( GenericResponse, LoginRequest, LoginResponse, RegisterRequest, RegisterResponse, RequestResetRequest, ResetPasswordRequest, ResetPasswordTokenRequest, SetEmailRequest, ) router = APIRouter() @router.post("/api/auth/login", response_model=LoginResponse) async def login(body: LoginRequest, request: Request) -> JSONResponse: ip = request.client.host if request.client else "unknown" deps._check_rate_limit(ip, deps._login_attempts, deps._LOGIN_RATE_LIMIT, "Too many login attempts. Try again later.") user = authenticate(deps._get_db(), body.handle.strip().lower(), body.password) if not user: raise HTTPException(401, "Invalid credentials") if deps.oidc_private_key_pem: token = deps._issue_id_token(user, client_id="bincio", ttl=deps._JWT_TTL) else: token = deps._issue_jwt(user) resp = JSONResponse({ "ok": True, "handle": user.handle, "display_name": user.display_name, "is_admin": user.is_admin, "wiki_access": user.wiki_access, "activity_access": user.activity_access, }) deps._set_session_cookie(resp, token) return resp @router.post("/api/auth/token") async def get_token(body: LoginRequest, request: Request) -> JSONResponse: """Mobile auth: same as login but returns the JWT in the body.""" ip = request.client.host if request.client else "unknown" deps._check_rate_limit(ip, deps._login_attempts, deps._LOGIN_RATE_LIMIT, "Too many login attempts. Try again later.") user = authenticate(deps._get_db(), body.handle.strip().lower(), body.password) if not user: raise HTTPException(401, "Invalid credentials") token = deps._issue_jwt(user) return JSONResponse({ "ok": True, "token": token, "handle": user.handle, "display_name": user.display_name, "is_admin": user.is_admin, "wiki_access": user.wiki_access, "activity_access": user.activity_access, }) @router.post("/api/auth/logout", response_model=GenericResponse) async def logout(bincio_session: str | None = Cookie(default=None)) -> JSONResponse: resp = JSONResponse({"ok": True}) deps._clear_session_cookie(resp) return resp @router.get("/api/me") async def me(bincio_session: str | None = Cookie(default=None)) -> JSONResponse: user = deps._require_user(bincio_session) return JSONResponse({ "handle": user.handle, "display_name": user.display_name, "is_admin": user.is_admin, "wiki_access": user.wiki_access, "activity_access": user.activity_access, }) @router.post("/api/auth/reset-password", response_model=GenericResponse) async def reset_password(body: ResetPasswordRequest) -> JSONResponse: handle = body.handle.strip().lower() code = body.code.strip().upper() if len(body.password) < 8: raise HTTPException(400, "Password must be at least 8 characters") db = deps._get_db() if not use_reset_code(db, code, handle): raise HTTPException(400, "Invalid or expired reset code") change_password(db, handle, body.password) return JSONResponse({"ok": True}) # ── Self-service password reset (email) ────────────────────────────────────── @router.post("/api/auth/request-reset", response_model=GenericResponse) async def request_reset(body: RequestResetRequest, request: Request) -> JSONResponse: ip = request.client.host if request.client else "unknown" deps._check_rate_limit(ip, deps._login_attempts, deps._LOGIN_RATE_LIMIT, "Too many attempts. Try again later.") email = body.email.strip().lower() db = deps._get_db() user = get_user_by_email(db, email) if user and not user.suspended and _smtp.configured(): token = create_email_reset_token(db, user.handle) issuer = deps.oidc_issuer.rstrip("/") if deps.oidc_issuer else "" reset_url = f"{issuer}/reset-password/?token={token}" try: _smtp.send_reset_email(email, user.handle, reset_url) except Exception: pass # never reveal SMTP errors to the caller # Always return 200 so the response doesn't leak whether an email is registered return JSONResponse({"ok": True}) @router.post("/api/auth/reset-password-token", response_model=GenericResponse) async def reset_password_via_token(body: ResetPasswordTokenRequest) -> JSONResponse: if len(body.password) < 8: raise HTTPException(400, "Password must be at least 8 characters") db = deps._get_db() rec = get_email_reset_token(db, body.token) if not rec or rec["used_at"] is not None or rec["expires_at"] < int(time.time()): raise HTTPException(400, "Invalid or expired reset link") use_email_reset_token(db, body.token) change_password(db, rec["handle"], body.password) return JSONResponse({"ok": True}) @router.post("/api/me/email", response_model=GenericResponse) async def set_email( body: SetEmailRequest, bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: user = deps._require_user(bincio_session) email = body.email.strip().lower() or None set_user_email(deps._get_db(), user.handle, email) return JSONResponse({"ok": True}) @router.get("/api/me/email") async def get_email(bincio_session: str | None = Cookie(default=None)) -> JSONResponse: user = deps._require_user(bincio_session) email = get_user_email(deps._get_db(), user.handle) return JSONResponse({"email": email}) # ── Registration ────────────────────────────────────────────────────────────── @router.post("/api/register", response_model=RegisterResponse) async def register(body: RegisterRequest, request: Request) -> JSONResponse: ip = request.client.host if request.client else "unknown" deps._check_rate_limit(ip, deps._register_attempts, deps._REGISTER_RATE_LIMIT, "Too many registration attempts. Try again later.") handle = body.handle.strip().lower() code = body.code.strip().upper() display = body.display_name.strip() or handle if not deps._VALID_HANDLE.match(handle): raise HTTPException(400, "Invalid handle (lowercase letters, numbers, _ - only; max 30 chars)") if len(body.password) < 8: raise HTTPException(400, "Password must be at least 8 characters") db = deps._get_db() invite = get_invite(db, code) if not invite or invite.used: raise HTTPException(400, "Invalid or already-used invite code") if get_user(db, handle): raise HTTPException(409, "Handle already taken") max_wiki_val = get_setting(db, "max_wiki_users") or get_setting(db, "max_users") if max_wiki_val is not None: limit = int(max_wiki_val) if limit > 0 and count_wiki_users(db) >= limit: raise HTTPException(403, f"This instance has reached its user limit ({limit})") if invite.grants_activity: max_act_val = get_setting(db, "max_activity_users") if max_act_val is not None: limit = int(max_act_val) if limit > 0 and count_activity_users(db) >= limit: raise HTTPException(403, f"This instance has reached its activity user limit ({limit})") user = create_user(db, handle, display, body.password, wiki_access=True, activity_access=invite.grants_activity) use_invite(db, code, handle) token = deps._issue_jwt(user) resp = JSONResponse({"ok": True, "handle": handle}) deps._set_session_cookie(resp, token) return resp