"""Authentication and registration endpoints.""" from __future__ import annotations from fastapi import APIRouter, Cookie, HTTPException, Request from fastapi.responses import JSONResponse from bincio.serve import deps, tasks from bincio.serve.models import ( CreateInviteRequest, GenericResponse, LoginRequest, LoginResponse, RegisterRequest, RegisterResponse, ResetPasswordRequest, ) from bincio.serve.db import ( authenticate, count_activity_users, count_wiki_users, create_invite, create_session, create_user, delete_session, get_invite, get_setting, get_user, list_invites, use_invite, ) router = APIRouter() @router.post("/api/auth/login", response_model=LoginResponse) async def login( login_req: LoginRequest, request: Request, ) -> JSONResponse: ip = request.client.host if request.client else "unknown" deps._check_rate_limit(ip, deps._login_attempts, deps._LOGIN_RATE_LIMIT, "Too many login attempts. Try again later.") handle = login_req.handle.strip().lower() password = login_req.password user = authenticate(deps._get_db(), handle, password) if not user: raise HTTPException(401, "Invalid credentials") token = create_session(deps._get_db(), handle) resp = JSONResponse({ "ok": True, "handle": user.handle, "display_name": user.display_name, "wiki_access": user.wiki_access, "activity_access": user.activity_access, }) deps._set_session_cookie(resp, token) return resp @router.post("/api/auth/logout", response_model=GenericResponse) async def logout(bincio_session: str | None = Cookie(default=None)) -> JSONResponse: if bincio_session: delete_session(deps._get_db(), bincio_session) resp = JSONResponse({"ok": True}) kwargs: dict = dict(key=deps._SESSION_COOKIE) if deps._SESSION_DOMAIN: kwargs["domain"] = deps._SESSION_DOMAIN resp.delete_cookie(**kwargs) return resp @router.post("/api/auth/token") async def get_token(login_req: LoginRequest, request: Request) -> JSONResponse: """Mobile auth: same as /api/auth/login but returns the token in the body instead of a cookie.""" ip = request.client.host if request.client else "unknown" deps._check_rate_limit(ip, deps._login_attempts, deps._LOGIN_RATE_LIMIT, "Too many login attempts. Try again later.") handle = login_req.handle.strip().lower() user = authenticate(deps._get_db(), handle, login_req.password) if not user: raise HTTPException(401, "Invalid credentials") token = create_session(deps._get_db(), handle) return JSONResponse({ "ok": True, "token": token, "handle": user.handle, "display_name": user.display_name, }) @router.post("/api/auth/reset-password", response_model=GenericResponse) async def reset_password(reset_req: ResetPasswordRequest) -> JSONResponse: """Validate a reset code and set a new password. Public endpoint.""" from bincio.serve.db import use_reset_code, change_password handle = reset_req.handle.strip().lower() code = reset_req.code.strip().upper() new_pw = reset_req.password if len(new_pw) < 8: raise HTTPException(400, "Password must be at least 8 characters") db = deps._get_db() if not use_reset_code(db, code, handle): raise HTTPException(400, "Invalid or expired reset code") change_password(db, handle, new_pw) return JSONResponse({"ok": True}) # ── Registration ────────────────────────────────────────────────────────────── @router.post("/api/register", response_model=RegisterResponse) async def register( register_req: RegisterRequest, request: Request, ) -> JSONResponse: ip = request.client.host if request.client else "unknown" deps._check_rate_limit(ip, deps._register_attempts, deps._REGISTER_RATE_LIMIT, "Too many registration attempts. Try again later.") code = register_req.code.strip().upper() handle = register_req.handle.strip().lower() password = register_req.password display = register_req.display_name.strip() or handle if not deps._VALID_HANDLE.match(handle): raise HTTPException(400, "Invalid handle (lowercase letters, numbers, _ - only; max 30 chars)") if len(password) < 8: raise HTTPException(400, "Password must be at least 8 characters") invite = get_invite(deps._get_db(), code) if not invite or invite.used: raise HTTPException(400, "Invalid or already-used invite code") if get_user(deps._get_db(), handle): raise HTTPException(409, "Handle already taken") db = deps._get_db() max_wiki_val = get_setting(db, "max_wiki_users") or get_setting(db, "max_users") if max_wiki_val is not None: limit = int(max_wiki_val) if limit > 0 and count_wiki_users(db) >= limit: raise HTTPException(403, f"This instance has reached its wiki user limit ({limit})") if invite.grants_activity: max_act_val = get_setting(db, "max_activity_users") if max_act_val is not None: limit = int(max_act_val) if limit > 0 and count_activity_users(db) >= limit: raise HTTPException(403, f"This instance has reached its activity user limit ({limit})") create_user(deps._get_db(), handle, display, password, is_admin=False, wiki_access=True, activity_access=invite.grants_activity) use_invite(deps._get_db(), code, handle) # Create per-user directories dd = deps._get_data_dir() user_dir = dd / handle (user_dir / "activities").mkdir(parents=True, exist_ok=True) (user_dir / "edits").mkdir(parents=True, exist_ok=True) # Write an empty index.json so the shard URL resolves immediately, # even before the user uploads any activities. from bincio.extract.writer import write_index index_path = user_dir / "index.json" if not index_path.exists(): write_index([], user_dir, {"handle": handle, "display_name": display or handle}) # Update root manifest so the new user's shard is discoverable immediately from bincio.render.cli import _write_root_manifest _write_root_manifest(dd) # Rebuild site so the new user's profile pages exist immediately tasks._trigger_rebuild(handle) token = create_session(deps._get_db(), handle) resp = JSONResponse({"ok": True, "handle": handle}) deps._set_session_cookie(resp, token) return resp # ── Invites ─────────────────────────────────────────────────────────────────── @router.get("/api/invites") async def get_invites(bincio_session: str | None = Cookie(default=None)) -> JSONResponse: user = deps._require_user(bincio_session) invites = list_invites(deps._get_db(), user.handle) return JSONResponse([{ "code": i.code, "used": i.used, "used_by": i.used_by, "created_at": i.created_at, "used_at": i.used_at, "grants_activity": i.grants_activity, } for i in invites]) @router.post("/api/invites") async def post_invite( body: CreateInviteRequest = CreateInviteRequest(), bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: user = deps._require_user(bincio_session) try: code = create_invite(deps._get_db(), user.handle, grants_activity=body.grants_activity) except ValueError as e: raise HTTPException(400, str(e)) return JSONResponse({"ok": True, "code": code, "grants_activity": body.grants_activity})