c1c1e7ae4e
Login endpoint switches from HS256 JWT to RS256 id_token (aud="bincio", 30-day TTL) when oidc_private_key_pem is set. Existing HS256 sessions remain valid on bincio-activity until they naturally expire.
159 lines
5.7 KiB
Python
159 lines
5.7 KiB
Python
"""Authentication and registration endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from fastapi import APIRouter, Cookie, HTTPException, Request
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from bincio.auth import deps
|
|
from bincio.auth.db import (
|
|
authenticate,
|
|
change_password,
|
|
count_activity_users,
|
|
count_wiki_users,
|
|
create_user,
|
|
get_invite,
|
|
get_setting,
|
|
get_user,
|
|
use_invite,
|
|
use_reset_code,
|
|
)
|
|
from bincio.auth.models import (
|
|
GenericResponse,
|
|
LoginRequest,
|
|
LoginResponse,
|
|
RegisterRequest,
|
|
RegisterResponse,
|
|
ResetPasswordRequest,
|
|
)
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.post("/api/auth/login", response_model=LoginResponse)
async def login(body: LoginRequest, request: Request) -> JSONResponse:
    """Check credentials and open a cookie-backed session.

    The caller's address (or "unknown" when the request carries no client
    info) is handed to ``deps._check_rate_limit`` before any credential check.
    The handle is trimmed and lower-cased before being passed to
    ``authenticate``.

    Token selection: when ``deps.oidc_private_key_pem`` is truthy the session
    token comes from ``deps._issue_id_token`` (client_id "bincio", TTL
    ``deps._JWT_TTL``); otherwise it comes from ``deps._issue_jwt``.

    Returns:
        A JSON response describing the user, with the session cookie set via
        ``deps._set_session_cookie``.

    Raises:
        HTTPException: 401 when ``authenticate`` yields no user.
    """
    client = request.client
    ip = client.host if client else "unknown"
    deps._check_rate_limit(
        ip,
        deps._login_attempts,
        deps._LOGIN_RATE_LIMIT,
        "Too many login attempts. Try again later.",
    )

    db = deps._get_db()
    normalized_handle = body.handle.strip().lower()
    user = authenticate(db, normalized_handle, body.password)
    if not user:
        raise HTTPException(401, "Invalid credentials")

    # Prefer the id_token flavour whenever an OIDC signing key is configured.
    token = (
        deps._issue_id_token(user, client_id="bincio", ttl=deps._JWT_TTL)
        if deps.oidc_private_key_pem
        else deps._issue_jwt(user)
    )

    payload = {
        "ok": True,
        "handle": user.handle,
        "display_name": user.display_name,
        "is_admin": user.is_admin,
        "wiki_access": user.wiki_access,
        "activity_access": user.activity_access,
    }
    resp = JSONResponse(payload)
    deps._set_session_cookie(resp, token)
    return resp
|
|
|
|
|
|
@router.post("/api/auth/token")
async def get_token(body: LoginRequest, request: Request) -> JSONResponse:
    """Mobile auth: validate credentials and return the token in the JSON body.

    Shares the login rate-limit bucket (``deps._login_attempts``) and the same
    handle normalisation (trim + lower-case) as the login endpoint, but sets
    no cookie. The token is always produced by ``deps._issue_jwt``; this
    endpoint does not consult ``deps.oidc_private_key_pem``.

    Raises:
        HTTPException: 401 when ``authenticate`` yields no user.
    """
    client = request.client
    ip = client.host if client else "unknown"
    deps._check_rate_limit(
        ip,
        deps._login_attempts,
        deps._LOGIN_RATE_LIMIT,
        "Too many login attempts. Try again later.",
    )

    db = deps._get_db()
    normalized_handle = body.handle.strip().lower()
    user = authenticate(db, normalized_handle, body.password)
    if not user:
        raise HTTPException(401, "Invalid credentials")

    payload = {
        "ok": True,
        "token": deps._issue_jwt(user),
        "handle": user.handle,
        "display_name": user.display_name,
        "is_admin": user.is_admin,
        "wiki_access": user.wiki_access,
        "activity_access": user.activity_access,
    }
    return JSONResponse(payload)
|
|
|
|
|
|
@router.post("/api/auth/logout", response_model=GenericResponse)
async def logout(bincio_session: str | None = Cookie(default=None)) -> JSONResponse:
    """Answer ``{"ok": true}`` and clear the session cookie on the response.

    The incoming ``bincio_session`` cookie value is accepted but not inspected;
    the response is the same whether or not a session cookie was sent.
    """
    response = JSONResponse({"ok": True})
    deps._clear_session_cookie(response)
    return response
|
|
|
|
|
|
@router.get("/api/me")
async def me(bincio_session: str | None = Cookie(default=None)) -> JSONResponse:
    """Return the profile of the user identified by the session cookie.

    The cookie value is resolved through ``deps._require_user``.
    NOTE(review): presumably that helper rejects missing/invalid sessions
    with an HTTP error — confirm in ``bincio.auth.deps``.
    """
    current = deps._require_user(bincio_session)
    profile = {
        "handle": current.handle,
        "display_name": current.display_name,
        "is_admin": current.is_admin,
        "wiki_access": current.wiki_access,
        "activity_access": current.activity_access,
    }
    return JSONResponse(profile)
|
|
|
|
|
|
@router.post("/api/auth/reset-password", response_model=GenericResponse)
async def reset_password(body: ResetPasswordRequest) -> JSONResponse:
    """Set a new password for a user holding a valid reset code.

    The handle is trimmed and lower-cased, the code trimmed and upper-cased.
    The new password must be at least 8 characters long. The code is checked
    with ``use_reset_code`` before ``change_password`` is called.

    NOTE(review): unlike login and registration, this endpoint performs no
    rate-limit check — confirm whether that is intentional.

    Raises:
        HTTPException: 400 when the password is too short, or when
            ``use_reset_code`` returns a falsy result.
    """
    target_handle = body.handle.strip().lower()
    reset_code = body.code.strip().upper()

    new_password = body.password
    if len(new_password) < 8:
        raise HTTPException(400, "Password must be at least 8 characters")

    db = deps._get_db()
    code_accepted = use_reset_code(db, reset_code, target_handle)
    if not code_accepted:
        raise HTTPException(400, "Invalid or expired reset code")

    change_password(db, target_handle, new_password)
    return JSONResponse({"ok": True})
|
|
|
|
|
|
# ── Registration ──────────────────────────────────────────────────────────────
|
|
|
|
def _enforce_user_cap(raw_limit, count_users, label):
    """Raise 403 when a configured user cap has already been reached.

    Args:
        raw_limit: Setting value as returned by ``get_setting``; ``None``
            means no cap is configured and nothing is checked.
        count_users: Zero-argument callable returning the current user count.
            It is only invoked when a positive cap applies.
        label: Wording inserted into the error message ("user",
            "activity user").

    Raises:
        HTTPException: 403 when the cap is positive and the current count is
            at or above it.
    """
    if raw_limit is None:
        return
    limit = int(raw_limit)
    # A cap of zero or less means "unlimited".
    if limit > 0 and count_users() >= limit:
        raise HTTPException(403, f"This instance has reached its {label} limit ({limit})")


@router.post("/api/register", response_model=RegisterResponse)
async def register(body: RegisterRequest, request: Request) -> JSONResponse:
    """Create an account from an invite code and start a session.

    Steps, in order:
      1. Rate-limit by client address (bucket ``deps._register_attempts``).
      2. Normalise input: handle trimmed + lower-cased, invite code trimmed +
         upper-cased, display name trimmed (falls back to the handle when
         empty).
      3. Validate the handle against ``deps._VALID_HANDLE`` and require a
         password of at least 8 characters.
      4. Require an existing, unused invite and a free handle.
      5. Enforce the wiki user cap ("max_wiki_users", falling back to
         "max_users") and, when the invite grants activity access, the
         "max_activity_users" cap.
      6. Create the user (wiki access always granted; activity access as
         granted by the invite), mark the invite used, and set the session
         cookie.

    NOTE(review): the session token here always comes from
    ``deps._issue_jwt``, even when ``deps.oidc_private_key_pem`` is set —
    confirm that this difference from the login endpoint is intended.
    NOTE(review): the invite is checked and later consumed in separate calls;
    whether concurrent requests can both pass the check depends on
    ``use_invite`` — verify in ``bincio.auth.db``.

    Raises:
        HTTPException: 400 for an invalid handle, short password, or
            invalid/used invite; 409 when the handle is taken; 403 when a
            user cap has been reached.
    """
    ip = request.client.host if request.client else "unknown"
    deps._check_rate_limit(
        ip,
        deps._register_attempts,
        deps._REGISTER_RATE_LIMIT,
        "Too many registration attempts. Try again later.",
    )

    handle = body.handle.strip().lower()
    code = body.code.strip().upper()
    display = body.display_name.strip() or handle

    if not deps._VALID_HANDLE.match(handle):
        raise HTTPException(400, "Invalid handle (lowercase letters, numbers, _ - only; max 30 chars)")
    if len(body.password) < 8:
        raise HTTPException(400, "Password must be at least 8 characters")

    db = deps._get_db()
    invite = get_invite(db, code)
    if not invite or invite.used:
        raise HTTPException(400, "Invalid or already-used invite code")
    if get_user(db, handle):
        raise HTTPException(409, "Handle already taken")

    # Every new account gets wiki access, so the wiki cap always applies.
    wiki_cap = get_setting(db, "max_wiki_users") or get_setting(db, "max_users")
    _enforce_user_cap(wiki_cap, lambda: count_wiki_users(db), "user")

    # The activity cap only matters when this invite grants activity access.
    if invite.grants_activity:
        activity_cap = get_setting(db, "max_activity_users")
        _enforce_user_cap(activity_cap, lambda: count_activity_users(db), "activity user")

    user = create_user(
        db,
        handle,
        display,
        body.password,
        wiki_access=True,
        activity_access=invite.grants_activity,
    )
    use_invite(db, code, handle)

    token = deps._issue_jwt(user)
    resp = JSONResponse({"ok": True, "handle": handle})
    deps._set_session_cookie(resp, token)
    return resp
|