Files
bincio-activity/bincio/serve/deps.py
T
Davide Scaini 0e5044eb06 fix: close all bincio-auth migration holes
Pages (register, reset-password, invites) now redirect to bincio.org
like login already did. Admin user-state ops (reset-password-code,
suspend, unsuspend, delete account) are proxied to bincio-auth via
httpx so they write to the correct DB. Adds BINCIO_AUTH_API env var.
2026-06-03 09:36:20 +02:00

191 lines
6.4 KiB
Python

"""Shared state and FastAPI dependency functions for bincio.serve.
All module-level globals live here so routers can import them without
creating circular dependencies through server.py.
The CLI sets these before uvicorn starts.
"""
from __future__ import annotations
import json
import os
import re
import threading
import time
from pathlib import Path
import jwt as _jwt
from fastapi import Cookie, HTTPException, Request, Response
from bincio.edit.ops import VALID_ACTIVITY_ID as _VALID_ACTIVITY_ID
from bincio.serve.db import (
User,
get_session,
open_db,
)
from bincio.shared.images import ALLOWED_IMAGE_TYPES as _ALLOWED_IMAGE_TYPES # noqa: F401
from bincio.shared.images import MAX_IMAGE_BYTES as _MAX_IMAGE_BYTES # noqa: F401
from bincio.shared.images import unique_image_name as _unique_image_name # noqa: F401
# ── Module-level state (set by CLI before uvicorn starts) ─────────────────────
# Filesystem locations.  They stay None until configured; _get_data_dir()
# converts an unset data_dir into an HTTP 500 rather than leaking None.
data_dir: Path | None = None
site_dir: Path | None = None
webroot: Path | None = None
# Instance-level Strava app credentials.  _strava_creds() returns these when a
# user has no complete per-user credentials file.
strava_client_id: str = ""
strava_client_secret: str = ""
# NOTE(review): the three settings below are not read anywhere in this module;
# their exact semantics live with the routers that import them.
public_url: str = ""
dem_url: str = "https://api.open-elevation.com"
sync_secret: str = ""
jwt_secret: str = ""  # when set, validates JWTs from bincio-auth instead of DB session lookup
auth_api: str = ""  # when set, proxies user-state admin ops to bincio-auth (e.g. http://127.0.0.1:4040)
# Database handle, opened lazily by _get_db() on first use.
_db = None
# Per-provider "sync in progress" flags, each declared next to a lock.
# NOTE(review): presumably each lock guards its flag; the code that uses them
# is outside this module — confirm at the call sites.
_strava_sync_running = False
_strava_sync_lock = threading.Lock()
_garmin_sync_running = False
_garmin_sync_lock = threading.Lock()
# ── Constants ─────────────────────────────────────────────────────────────────
# Handle syntax: 1-30 characters, the first a lowercase letter or digit, the
# rest lowercase letters, digits, "_" or "-".  (With re semantics, "$" also
# tolerates a single trailing newline.)
_VALID_HANDLE = re.compile(r'^[a-z0-9][a-z0-9_-]{0,29}$')
# Cookie name used by _set_session_cookie().
_SESSION_COOKIE = "bincio_session"
_COOKIE_MAX_AGE = 30 * 86400  # 30 days, expressed in seconds
# Optional cookie Domain attribute; an unset or empty SESSION_DOMAIN
# environment variable collapses to None (no Domain attribute is sent).
_SESSION_DOMAIN = os.environ.get("SESSION_DOMAIN") or None
# File name of the per-user Strava credentials JSON, read by _strava_creds()
# from <data_dir>/<handle>/.
_STRAVA_CREDS_FILE = "strava_credentials.json"
# In-memory attempt timestamps per key, in the shape _check_rate_limit()
# expects for its `store` argument.  Being process-local, they reset on restart.
_login_attempts: dict[str, list[float]] = {}
_register_attempts: dict[str, list[float]] = {}
_RATE_WINDOW = 900  # 15 minutes, in seconds (compared against time.time() deltas)
# NOTE(review): presumably passed as `limit` to _check_rate_limit() by the
# login/register routes — the call sites are not in this module.
_LOGIN_RATE_LIMIT = 10
_REGISTER_RATE_LIMIT = 5
# ── Core helpers ──────────────────────────────────────────────────────────────
def _get_data_dir() -> Path:
    """Return the configured data directory.

    Raises:
        HTTPException: 500 ("Server not configured") while ``data_dir`` is
            still unset.
    """
    configured = data_dir
    if configured is not None:
        return configured
    raise HTTPException(500, "Server not configured")
def _get_db():
    """Return the shared database handle, opening it on first use.

    The handle is cached in the module-level ``_db`` so later calls reuse it.
    Propagates the HTTP 500 from ``_get_data_dir()`` when the server is not
    configured yet.
    """
    global _db
    if _db is not None:
        return _db
    _db = open_db(_get_data_dir())
    return _db
def _strava_creds(handle: str) -> tuple[str, str]:
    """Return (client_id, client_secret) for a user.

    Per-user credentials (``<data_dir>/<handle>/strava_credentials.json``)
    take precedence over the instance-level globals, but only when the file
    holds a JSON object with both values non-empty.  A missing, unreadable,
    malformed or incomplete file silently falls back to the instance-level
    pair, which is ("", "") when nothing is configured.

    Args:
        handle: The user's handle, used as the per-user directory name.

    Returns:
        A ``(client_id, client_secret)`` tuple of stripped strings.

    Raises:
        HTTPException: 500 from ``_get_data_dir()`` if the server is not
            configured.
    """
    fallback = (strava_client_id, strava_client_secret)
    creds_path = _get_data_dir() / handle / _STRAVA_CREDS_FILE

    # EAFP: just try to read.  OSError covers a missing/unreadable file;
    # ValueError covers both invalid JSON and invalid UTF-8.
    try:
        raw = json.loads(creds_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return fallback

    # Valid JSON is not necessarily an object (could be a list, string, ...);
    # calling .get() on those would raise instead of falling back.
    if not isinstance(raw, dict):
        return fallback

    def _clean(value: object) -> str:
        # JSON null must count as "not set"; str(None) would yield the
        # truthy string "None" and be mistaken for a real credential.
        return "" if value is None else str(value).strip()

    cid = _clean(raw.get("client_id"))
    csec = _clean(raw.get("client_secret"))
    if cid and csec:
        return cid, csec
    return fallback
def _check_id(activity_id: str) -> str:
    """Validate an activity ID and hand it back unchanged.

    Raises:
        HTTPException: 400 ("Invalid activity ID") when the value does not
            match ``_VALID_ACTIVITY_ID``.
    """
    if _VALID_ACTIVITY_ID.match(activity_id):
        return activity_id
    raise HTTPException(400, "Invalid activity ID")
# ── Rate limiting ─────────────────────────────────────────────────────────────
def _check_rate_limit(
    ip: str,
    store: dict[str, list[float]],
    limit: int,
    msg: str = "Too many attempts. Try again later.",
) -> None:
    """Record one attempt for ``ip`` or reject it when the window is full.

    Timestamps older than ``_RATE_WINDOW`` seconds are dropped from
    ``store[ip]`` first.  If ``limit`` attempts remain inside the window the
    call is rejected and NOT recorded; otherwise the current time is appended.
    Only the entry for ``ip`` is touched; other keys in ``store`` are left
    as they are.

    Args:
        ip: Key identifying the client.
        store: Mutable mapping of key -> attempt timestamps; updated in place.
        limit: Maximum number of attempts allowed inside the window.
        msg: Detail text for the 429 response.

    Raises:
        HTTPException: 429 with ``msg`` when the limit has been reached.
    """
    stamp = time.time()
    recent = list(filter(lambda t: stamp - t < _RATE_WINDOW, store.get(ip, ())))
    # Store the pruned list first so the rejection path also persists pruning.
    store[ip] = recent
    if len(recent) < limit:
        recent.append(stamp)  # same list object that now lives in the store
        return
    raise HTTPException(429, msg)
# ── Auth dependency functions ─────────────────────────────────────────────────
def _decode_jwt(token: str) -> User | None:
    """Decode a bincio-auth JWT and return a User. Returns None on any failure.

    The token is verified with the module-level ``jwt_secret`` using HS256
    only.  The handle comes from the ``sub`` claim; the remaining fields are
    read from same-named claims with these defaults: ``display_name=""``,
    ``is_admin=False``, ``wiki_access=True``, ``activity_access=False``.
    ``suspended`` is always False and ``created_at`` always 0, because no
    claim is read for them.

    Args:
        token: The encoded JWT (cookie value or bearer token).

    Returns:
        A ``User`` built from the claims, or None when the token fails
        verification or carries no usable ``sub`` claim.
    """
    try:
        payload = _jwt.decode(token, jwt_secret, algorithms=["HS256"])
    except _jwt.PyJWTError:
        return None
    handle = payload.get("sub")
    # The handle is used downstream as a directory name, so a missing, empty
    # or non-string subject (e.g. an integer) is treated as a decode failure
    # instead of being passed on as a bogus handle.
    if not isinstance(handle, str) or not handle:
        return None
    return User(
        handle=handle,
        display_name=payload.get("display_name", ""),
        is_admin=bool(payload.get("is_admin", False)),
        wiki_access=bool(payload.get("wiki_access", True)),
        activity_access=bool(payload.get("activity_access", False)),
        suspended=False,
        created_at=0,
    )
def _current_user(bincio_session: str | None = Cookie(default=None)) -> User | None:
    """Resolve the session cookie to a user, or None when it does not resolve.

    With ``jwt_secret`` set the cookie is decoded as a bincio-auth JWT;
    otherwise it is looked up as a session token in the local database.
    A missing or empty cookie yields None without touching either backend.
    """
    if not bincio_session:
        return None
    return (
        _decode_jwt(bincio_session)
        if jwt_secret
        else get_session(_get_db(), bincio_session)
    )
def _require_user(bincio_session: str | None = Cookie(default=None)) -> User:
    """Return the user behind the session cookie.

    Raises:
        HTTPException: 401 ("Not authenticated") when the cookie is absent
            or does not resolve to a user.
    """
    resolved = _current_user(bincio_session)
    if resolved:
        return resolved
    raise HTTPException(401, "Not authenticated")
def _require_admin(bincio_session: str | None = Cookie(default=None)) -> User:
    """Return the authenticated user, insisting on admin rights.

    Raises:
        HTTPException: 401 from ``_require_user`` when not authenticated,
            403 ("Admin required") when the user is not an admin.
    """
    account = _require_user(bincio_session)
    if account.is_admin:
        return account
    raise HTTPException(403, "Admin required")
def _require_auth(
    request: Request,
    bincio_session: str | None = Cookie(default=None),
) -> User:
    """Accept session cookie (web) OR Authorization: Bearer token (mobile).

    The cookie wins when both are present.  The ``Authorization`` header is
    only consulted when no cookie value was sent; its scheme is matched
    case-insensitively and whitespace around the token is ignored.

    Args:
        request: Incoming request, used to read the ``Authorization`` header.
        bincio_session: Value of the session cookie, if any.

    Returns:
        The authenticated ``User``.

    Raises:
        HTTPException: 401 ("Not authenticated") when neither credential is
            supplied, 401 ("Invalid or expired session") when the supplied
            token does not resolve to a user.
    """
    token = bincio_session
    if not token:
        header = request.headers.get("Authorization", "")
        scheme, _, credentials = header.partition(" ")
        # Auth scheme names are case-insensitive, so "bearer"/"BEARER" are
        # accepted too; stripping tolerates clients that pad the token.
        if scheme.lower() == "bearer":
            token = credentials.strip()
    if not token:
        raise HTTPException(401, "Not authenticated")
    # Same JWT-vs-DB resolution the cookie-only dependencies use.
    user = _current_user(token)
    if not user:
        raise HTTPException(401, "Invalid or expired session")
    return user
def _set_session_cookie(response: Response, token: str) -> None:
    """Attach the session cookie carrying ``token`` to ``response``.

    The cookie is HttpOnly, SameSite=Lax, not marked Secure, and lives for
    ``_COOKIE_MAX_AGE`` seconds.  A Domain attribute is added only when
    ``_SESSION_DOMAIN`` is set.
    """
    cookie_args: dict = {
        "key": _SESSION_COOKIE,
        "value": token,
        "max_age": _COOKIE_MAX_AGE,
        "httponly": True,
        "samesite": "lax",
        "secure": False,
    }
    if _SESSION_DOMAIN:
        cookie_args["domain"] = _SESSION_DOMAIN
    response.set_cookie(**cookie_args)