Files
Davide Scaini 42bc476882 feat: OIDC Identity Provider — Phase 1 endpoints
Add OIDC/OAuth2 endpoints to bincio-auth so it acts as a full IdP:
  GET  /.well-known/openid-configuration
  GET  /.well-known/jwks.json
  GET  /oauth2/authorize  (auth-code flow, redirects to /login/ if no session)
  POST /oauth2/token      (exchanges code for RS256 id_token; PKCE supported)
  GET  /oauth2/userinfo   (Bearer token → profile claims)

Infrastructure:
  - oauth2_clients + oauth2_codes tables in db.py with CRUD helpers
  - RS256 sign/verify helpers in tokens.py (create_id_token, get_jwks)
  - oidc_private_key_pem / oidc_issuer state + _issue_id_token in deps.py
  - serve_cmd reads BINCIO_OIDC_PRIVATE_KEY_FILE / BINCIO_OIDC_ISSUER env vars
  - `bincio-auth client add/list` commands for managing OAuth2 clients
2026-06-03 15:11:43 +02:00

258 lines
9.5 KiB
Python

"""OIDC / OAuth2 endpoints for bincio-auth acting as an Identity Provider.
Implements the authorization code flow (+ PKCE for public clients).
Endpoints are at standard paths:
GET /.well-known/openid-configuration
GET /.well-known/jwks.json
GET /oauth2/authorize
POST /oauth2/token
GET /oauth2/userinfo
"""
from __future__ import annotations
import base64
import hashlib
import time
from urllib.parse import urlencode
from fastapi import APIRouter, Cookie, HTTPException, Request
from fastapi.responses import JSONResponse, RedirectResponse
from bincio.auth import deps
from bincio.auth.db import (
create_oauth2_code,
get_oauth2_client,
get_oauth2_code,
get_user,
use_oauth2_code,
)
from bincio.auth.tokens import decode_id_token, get_jwks
router = APIRouter()
# ── Discovery ─────────────────────────────────────────────────────────────────
@router.get("/.well-known/openid-configuration")
async def oidc_discovery() -> JSONResponse:
    """Return the OpenID Connect discovery document for this issuer.

    All endpoint URLs are built from ``deps.oidc_issuer`` with any trailing
    slash stripped, so the advertised URLs never contain a double slash.

    Raises:
        HTTPException: 503 when ``deps.oidc_issuer`` is not set.
    """
    configured_issuer = deps.oidc_issuer
    if not configured_issuer:
        raise HTTPException(503, "OIDC not configured")

    base = configured_issuer.rstrip("/")
    document: dict[str, object] = {"issuer": base}

    # Endpoint URLs, all rooted at the normalised issuer.
    for key, path in (
        ("authorization_endpoint", "/oauth2/authorize"),
        ("token_endpoint", "/oauth2/token"),
        ("userinfo_endpoint", "/oauth2/userinfo"),
        ("jwks_uri", "/.well-known/jwks.json"),
    ):
        document[key] = f"{base}{path}"

    # Static capability advertisement.
    document.update(
        {
            "response_types_supported": ["code"],
            "subject_types_supported": ["public"],
            "id_token_signing_alg_values_supported": ["RS256"],
            "scopes_supported": ["openid", "profile"],
            "token_endpoint_auth_methods_supported": ["client_secret_post", "none"],
            "code_challenge_methods_supported": ["S256"],
            "claims_supported": [
                "iss", "sub", "aud", "exp", "iat", "nonce",
                "name", "preferred_username",
            ],
        }
    )
    return JSONResponse(document)
@router.get("/.well-known/jwks.json")
async def jwks() -> JSONResponse:
    """Return the JSON Web Key Set produced by ``get_jwks`` for the signing key.

    Raises:
        HTTPException: 503 when ``deps.oidc_private_key_pem`` is not set.
    """
    private_key_pem = deps.oidc_private_key_pem
    if private_key_pem:
        return JSONResponse(get_jwks(private_key_pem))
    raise HTTPException(503, "OIDC not configured")
# ── Authorization endpoint ────────────────────────────────────────────────────
@router.get("/oauth2/authorize")
async def authorize(
    request: Request,
    bincio_session: str | None = Cookie(default=None),
) -> RedirectResponse:
    """Handle the authorization-code request of the OAuth2/OIDC flow.

    Flow:
      1. Validate ``client_id`` and ``redirect_uri``. Failures here are
         reported as HTTP 400 and never redirected, because the redirect
         target itself is not yet trusted.
      2. Validate the rest of the request; these failures are redirected back
         to the client as ``error`` / ``error_description`` parameters.
      3. If there is no authenticated session, redirect to ``/login/`` with the
         current URL in ``next``.
      4. Otherwise create an authorization code and redirect back with it.

    Args:
        request: Incoming request; all OAuth2 parameters are read from its
            query string.
        bincio_session: Session cookie value, if any.

    Returns:
        A 302 redirect to the login page or to the client's ``redirect_uri``.

    Raises:
        HTTPException: 503 when OIDC is not configured; 400 when the client or
            redirect URI is missing, unknown or unregistered.
    """
    if not deps.oidc_issuer:
        raise HTTPException(503, "OIDC not configured")

    p = request.query_params
    client_id = p.get("client_id", "")
    redirect_uri = p.get("redirect_uri", "")
    response_type = p.get("response_type", "")
    scope = p.get("scope", "openid")
    state = p.get("state", "")
    nonce = p.get("nonce") or None
    code_challenge = p.get("code_challenge") or None
    code_challenge_method = p.get("code_challenge_method") or None

    def _redirect_back(params: dict[str, str]) -> RedirectResponse:
        """Redirect to ``redirect_uri`` with *params* (plus ``state`` if given)."""
        if state:
            # Echo state only when the client actually sent one.
            params = {**params, "state": state}
        # A registered redirect URI may already carry a query string; extend it
        # rather than appending a second "?".
        if "?" not in redirect_uri:
            separator = "?"
        elif redirect_uri.endswith(("?", "&")):
            separator = ""
        else:
            separator = "&"
        return RedirectResponse(
            f"{redirect_uri}{separator}{urlencode(params)}",
            status_code=302,
        )

    def _err(error: str, description: str) -> RedirectResponse:
        """Report an OAuth2 error to the client via its redirect URI."""
        return _redirect_back({"error": error, "error_description": description})

    # Validate client and redirect_uri before anything else
    if not client_id or not redirect_uri:
        raise HTTPException(400, "client_id and redirect_uri are required")
    client = get_oauth2_client(deps._get_db(), client_id)
    if not client:
        raise HTTPException(400, "Unknown client_id")
    # NOTE(review): assumes client["redirect_uris"] is a list/collection of
    # exact URIs; if it is a plain string this becomes a substring match —
    # confirm against db.py.
    if redirect_uri not in client["redirect_uris"]:
        raise HTTPException(400, "redirect_uri not registered for this client")

    if response_type != "code":
        return _err("unsupported_response_type", "Only response_type=code is supported")

    # Public clients have no secret, so PKCE is their only protection at the
    # token endpoint. Refuse to issue a code that anyone could redeem.
    if client["client_secret"] is None and not code_challenge:
        return _err("invalid_request", "code_challenge is required for public clients")

    # Fail early on a method the token endpoint cannot verify, instead of
    # issuing a code that can never be exchanged.
    if code_challenge and code_challenge_method not in (None, "S256", "plain"):
        return _err("invalid_request", "Unsupported code_challenge_method")

    # Check session — redirect to login if not authenticated
    user = deps._current_user(bincio_session)
    if not user:
        login_url = f"{deps.oidc_issuer.rstrip('/')}/login/"
        next_url = str(request.url)
        return RedirectResponse(
            f"{login_url}?next={_pct_encode(next_url)}",
            status_code=302,
        )

    # Issue authorization code
    code = create_oauth2_code(
        deps._get_db(),
        client_id=client_id,
        handle=user.handle,
        redirect_uri=redirect_uri,
        scope=scope,
        nonce=nonce,
        code_challenge=code_challenge,
        code_challenge_method=code_challenge_method,
    )
    return _redirect_back({"code": code})
# ── Token endpoint ────────────────────────────────────────────────────────────
@router.post("/oauth2/token")
async def token(request: Request) -> JSONResponse:
    """Exchange an authorization code for an ID token.

    Reads ``grant_type``, ``code``, ``redirect_uri``, ``client_id``,
    ``client_secret`` and ``code_verifier`` from the form body. Only
    ``grant_type=authorization_code`` is accepted.

    Client authentication:
      * Confidential clients (a stored secret) must present that secret.
      * Public clients (no stored secret) must redeem a code that was issued
        with a PKCE challenge and supply the matching ``code_verifier``.

    Returns:
        JSON with ``access_token``, ``token_type``, ``expires_in`` and
        ``id_token``. The access token is the ID token itself.

    Raises:
        HTTPException: 503 when OIDC is not configured; 400 for an unsupported
            grant type, missing parameters or an invalid grant; 401 when the
            client is unknown or its secret does not match.

    NOTE(review): errors are raised as ``HTTPException``, so the body uses
    FastAPI's ``{"detail": ...}`` shape rather than the ``{"error": ...}``
    object defined by RFC 6749 section 5.2 — confirm whether clients depend on
    the current shape before changing it.
    """
    # Local import, matching the file's existing function-scope import style.
    import hmac

    if not deps.oidc_issuer:
        raise HTTPException(503, "OIDC not configured")

    form = await request.form()
    grant_type = str(form.get("grant_type", ""))
    code = str(form.get("code", ""))
    redirect_uri = str(form.get("redirect_uri", ""))
    client_id = str(form.get("client_id", ""))
    client_secret = str(form.get("client_secret", ""))
    code_verifier = str(form.get("code_verifier", ""))

    if grant_type != "authorization_code":
        raise HTTPException(400, "unsupported_grant_type")
    if not code or not redirect_uri or not client_id:
        raise HTTPException(400, "code, redirect_uri, client_id are required")

    client = get_oauth2_client(deps._get_db(), client_id)
    if not client:
        raise HTTPException(401, "invalid_client")

    # Authenticate client: secret (confidential) or PKCE verifier (public)
    stored_secret = client["client_secret"]
    is_public_client = stored_secret is None
    if not is_public_client:
        # Constant-time comparison so response timing does not leak how much
        # of the secret matched. Compare bytes: compare_digest rejects
        # non-ASCII str arguments.
        secrets_match = hmac.compare_digest(
            client_secret.encode("utf-8"),
            str(stored_secret).encode("utf-8"),
        )
        if not secrets_match:
            raise HTTPException(401, "invalid_client")

    rec = get_oauth2_code(deps._get_db(), code)
    if not rec:
        raise HTTPException(400, "invalid_grant")
    if rec["used_at"] is not None:
        raise HTTPException(400, "invalid_grant")
    if rec["expires_at"] < int(time.time()):
        raise HTTPException(400, "invalid_grant")
    if rec["client_id"] != client_id:
        raise HTTPException(400, "invalid_grant")
    if rec["redirect_uri"] != redirect_uri:
        raise HTTPException(400, "invalid_grant")

    # PKCE verification
    if rec["code_challenge"]:
        if not code_verifier:
            raise HTTPException(400, "code_verifier required")
        if not _verify_pkce(code_verifier, rec["code_challenge"], rec["code_challenge_method"] or "S256"):
            raise HTTPException(400, "invalid_grant")
    elif is_public_client:
        # No secret and no PKCE challenge: nothing proves the caller is the
        # party that started the flow, so the code must not be redeemable.
        raise HTTPException(400, "invalid_grant")

    # NOTE(review): the used_at check above and this write are separate calls;
    # whether use_oauth2_code itself guards against concurrent redemption is
    # not visible here — confirm in db.py.
    use_oauth2_code(deps._get_db(), code)

    user = get_user(deps._get_db(), rec["handle"])
    if not user or user.suspended:
        raise HTTPException(400, "invalid_grant")

    id_token = deps._issue_id_token(user, client_id, nonce=rec["nonce"])
    return JSONResponse(
        {
            "access_token": id_token,  # access_token = id_token (validated by userinfo)
            "token_type": "Bearer",
            "expires_in": deps._ID_TOKEN_TTL,
            "id_token": id_token,
        },
        # Token responses must never be cached (RFC 6749 section 5.1).
        headers={"Cache-Control": "no-store", "Pragma": "no-cache"},
    )
# ── Userinfo endpoint ─────────────────────────────────────────────────────────
@router.get("/oauth2/userinfo")
async def userinfo(request: Request) -> JSONResponse:
    """Return profile claims for the user identified by a Bearer token.

    The token is expected to be an RS256 JWT signed with the key in
    ``deps.oidc_private_key_pem``. Its signature is verified against the
    matching public key, its ``aud`` claim must name a registered OAuth2
    client, and its ``sub`` claim must be the handle of an existing,
    non-suspended user.

    Returns:
        JSON with ``sub``, ``name``, ``preferred_username``, ``is_admin``,
        ``wiki_access`` and ``activity_access``.

    Raises:
        HTTPException: 503 when the issuer or signing key is not configured;
            401 when the header is missing, the token is invalid, the audience
            is not a registered client, or the user is missing or suspended.
    """
    # Without a signing key nothing can be verified; report that as a
    # configuration problem (as the JWKS endpoint does) rather than blaming
    # the caller's token.
    if not deps.oidc_issuer or not deps.oidc_private_key_pem:
        raise HTTPException(503, "OIDC not configured")

    auth = request.headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        raise HTTPException(401, "Bearer token required")
    token_str = auth[7:]

    # Kept outside the try block so a missing dependency or an unreadable key
    # surfaces as a server error instead of a misleading "Invalid token".
    import jwt as _jwt
    from cryptography.hazmat.primitives.serialization import load_pem_private_key

    priv = load_pem_private_key(deps.oidc_private_key_pem.encode(), password=None)

    # The audience is not known up front, so decode without the library's aud
    # check and validate the claim against the registered clients below.
    try:
        payload = _jwt.decode(
            token_str,
            priv.public_key(),
            algorithms=["RS256"],
            options={"verify_aud": False},
        )
    except _jwt.PyJWTError:
        raise HTTPException(401, "Invalid token") from None

    # "aud" may be a single string or a list of strings.
    aud = payload.get("aud")
    if isinstance(aud, str):
        audiences = [aud]
    elif isinstance(aud, (list, tuple)):
        audiences = [a for a in aud if isinstance(a, str)]
    else:
        audiences = []
    if not any(get_oauth2_client(deps._get_db(), a) for a in audiences):
        raise HTTPException(401, "Invalid token")

    handle = payload.get("sub")
    if not handle:
        raise HTTPException(401, "Invalid token")

    user = get_user(deps._get_db(), handle)
    if not user or user.suspended:
        raise HTTPException(401, "User not found or suspended")

    return JSONResponse({
        "sub": user.handle,
        "name": user.display_name,
        "preferred_username": user.handle,
        "is_admin": user.is_admin,
        "wiki_access": user.wiki_access,
        "activity_access": user.activity_access,
    })
# ── Helpers ───────────────────────────────────────────────────────────────────
def _pct_encode(url: str) -> str:
from urllib.parse import quote
return quote(url, safe="")
def _verify_pkce(verifier: str, challenge: str, method: str) -> bool:
if method == "S256":
digest = hashlib.sha256(verifier.encode()).digest()
computed = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()
return computed == challenge
if method == "plain":
return verifier == challenge
return False