262 lines
9.7 KiB
Python
262 lines
9.7 KiB
Python
"""OIDC / OAuth2 endpoints for bincio-auth acting as an Identity Provider.
|
|
|
|
Implements the authorization code flow (+ PKCE for public clients).
|
|
Endpoints are at standard paths:
|
|
GET /.well-known/openid-configuration
|
|
GET /.well-known/jwks.json
|
|
GET /oauth2/authorize
|
|
POST /oauth2/token
|
|
GET /oauth2/userinfo
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import time
|
|
from urllib.parse import urlencode
|
|
|
|
from fastapi import APIRouter, Cookie, HTTPException, Request
|
|
from fastapi.responses import JSONResponse, RedirectResponse
|
|
|
|
from bincio.auth import deps
|
|
from bincio.auth.db import (
|
|
create_oauth2_code,
|
|
get_oauth2_client,
|
|
get_oauth2_code,
|
|
get_user,
|
|
use_oauth2_code,
|
|
)
|
|
from bincio.auth.tokens import decode_id_token, get_jwks
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# ── Discovery ─────────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/.well-known/openid-configuration")
|
|
async def oidc_discovery() -> JSONResponse:
|
|
if not deps.oidc_issuer:
|
|
raise HTTPException(503, "OIDC not configured")
|
|
iss = deps.oidc_issuer.rstrip("/")
|
|
return JSONResponse({
|
|
"issuer": iss,
|
|
"authorization_endpoint": f"{iss}/oauth2/authorize",
|
|
"token_endpoint": f"{iss}/oauth2/token",
|
|
"userinfo_endpoint": f"{iss}/oauth2/userinfo",
|
|
"jwks_uri": f"{iss}/.well-known/jwks.json",
|
|
"response_types_supported": ["code"],
|
|
"subject_types_supported": ["public"],
|
|
"id_token_signing_alg_values_supported": ["RS256"],
|
|
"scopes_supported": ["openid", "profile"],
|
|
"token_endpoint_auth_methods_supported": ["client_secret_post", "none"],
|
|
"code_challenge_methods_supported": ["S256"],
|
|
"claims_supported": [
|
|
"iss", "sub", "aud", "exp", "iat", "nonce",
|
|
"name", "preferred_username",
|
|
],
|
|
})
|
|
|
|
|
|
@router.get("/.well-known/jwks.json")
|
|
async def jwks() -> JSONResponse:
|
|
if not deps.oidc_private_key_pem:
|
|
raise HTTPException(503, "OIDC not configured")
|
|
return JSONResponse(get_jwks(deps.oidc_private_key_pem))
|
|
|
|
|
|
# ── Authorization endpoint ────────────────────────────────────────────────────
|
|
|
|
@router.get("/oauth2/authorize")
|
|
async def authorize(
|
|
request: Request,
|
|
bincio_session: str | None = Cookie(default=None),
|
|
) -> RedirectResponse:
|
|
if not deps.oidc_issuer:
|
|
raise HTTPException(503, "OIDC not configured")
|
|
|
|
p = request.query_params
|
|
client_id = p.get("client_id", "")
|
|
redirect_uri = p.get("redirect_uri", "")
|
|
response_type = p.get("response_type", "")
|
|
scope = p.get("scope", "openid")
|
|
state = p.get("state", "")
|
|
nonce = p.get("nonce") or None
|
|
code_challenge = p.get("code_challenge") or None
|
|
code_challenge_method = p.get("code_challenge_method") or None
|
|
|
|
def _err(error: str, description: str) -> RedirectResponse:
|
|
qs = urlencode({"error": error, "error_description": description, "state": state})
|
|
return RedirectResponse(f"{redirect_uri}?{qs}", status_code=302)
|
|
|
|
# Validate client and redirect_uri before anything else
|
|
if not client_id or not redirect_uri:
|
|
raise HTTPException(400, "client_id and redirect_uri are required")
|
|
|
|
client = get_oauth2_client(deps._get_db(), client_id)
|
|
if not client:
|
|
raise HTTPException(400, "Unknown client_id")
|
|
if redirect_uri not in client["redirect_uris"]:
|
|
raise HTTPException(400, "redirect_uri not registered for this client")
|
|
|
|
if response_type != "code":
|
|
return _err("unsupported_response_type", "Only response_type=code is supported")
|
|
|
|
# Check session — redirect to login if not authenticated
|
|
user = deps._current_user(bincio_session)
|
|
if not user:
|
|
login_url = f"{deps.oidc_issuer.rstrip('/')}/login/"
|
|
next_url = str(request.url)
|
|
return RedirectResponse(
|
|
f"{login_url}?next={_pct_encode(next_url)}",
|
|
status_code=302,
|
|
)
|
|
|
|
# Per-client access control
|
|
if client_id == "gitea" and not user.activity_access:
|
|
return _err("access_denied", "Gitea access is restricted to activity users")
|
|
|
|
# Issue authorization code
|
|
code = create_oauth2_code(
|
|
deps._get_db(),
|
|
client_id=client_id,
|
|
handle=user.handle,
|
|
redirect_uri=redirect_uri,
|
|
scope=scope,
|
|
nonce=nonce,
|
|
code_challenge=code_challenge,
|
|
code_challenge_method=code_challenge_method,
|
|
)
|
|
|
|
qs_parts: dict[str, str] = {"code": code}
|
|
if state:
|
|
qs_parts["state"] = state
|
|
return RedirectResponse(f"{redirect_uri}?{urlencode(qs_parts)}", status_code=302)
|
|
|
|
|
|
# ── Token endpoint ────────────────────────────────────────────────────────────
|
|
|
|
@router.post("/oauth2/token")
|
|
async def token(request: Request) -> JSONResponse:
|
|
if not deps.oidc_issuer:
|
|
raise HTTPException(503, "OIDC not configured")
|
|
|
|
form = await request.form()
|
|
grant_type = str(form.get("grant_type", ""))
|
|
code = str(form.get("code", ""))
|
|
redirect_uri = str(form.get("redirect_uri", ""))
|
|
client_id = str(form.get("client_id", ""))
|
|
client_secret = str(form.get("client_secret", ""))
|
|
code_verifier = str(form.get("code_verifier", ""))
|
|
|
|
if grant_type != "authorization_code":
|
|
raise HTTPException(400, "unsupported_grant_type")
|
|
if not code or not redirect_uri or not client_id:
|
|
raise HTTPException(400, "code, redirect_uri, client_id are required")
|
|
|
|
client = get_oauth2_client(deps._get_db(), client_id)
|
|
if not client:
|
|
raise HTTPException(401, "invalid_client")
|
|
|
|
# Authenticate client: secret (confidential) or PKCE verifier (public)
|
|
if client["client_secret"] is not None:
|
|
if client_secret != client["client_secret"]:
|
|
raise HTTPException(401, "invalid_client")
|
|
# Public clients (no secret) rely on PKCE — verified below against the code record
|
|
|
|
rec = get_oauth2_code(deps._get_db(), code)
|
|
if not rec:
|
|
raise HTTPException(400, "invalid_grant")
|
|
if rec["used_at"] is not None:
|
|
raise HTTPException(400, "invalid_grant")
|
|
if rec["expires_at"] < int(time.time()):
|
|
raise HTTPException(400, "invalid_grant")
|
|
if rec["client_id"] != client_id:
|
|
raise HTTPException(400, "invalid_grant")
|
|
if rec["redirect_uri"] != redirect_uri:
|
|
raise HTTPException(400, "invalid_grant")
|
|
|
|
# PKCE verification for public clients
|
|
if rec["code_challenge"]:
|
|
if not code_verifier:
|
|
raise HTTPException(400, "code_verifier required")
|
|
if not _verify_pkce(code_verifier, rec["code_challenge"], rec["code_challenge_method"] or "S256"):
|
|
raise HTTPException(400, "invalid_grant")
|
|
|
|
use_oauth2_code(deps._get_db(), code)
|
|
|
|
user = get_user(deps._get_db(), rec["handle"])
|
|
if not user or user.suspended:
|
|
raise HTTPException(400, "invalid_grant")
|
|
|
|
id_token = deps._issue_id_token(user, client_id, nonce=rec["nonce"])
|
|
|
|
return JSONResponse({
|
|
"access_token": id_token, # access_token = id_token (validated by userinfo)
|
|
"token_type": "Bearer",
|
|
"expires_in": deps._ID_TOKEN_TTL,
|
|
"id_token": id_token,
|
|
})
|
|
|
|
|
|
# ── Userinfo endpoint ─────────────────────────────────────────────────────────
|
|
|
|
@router.get("/oauth2/userinfo")
|
|
async def userinfo(request: Request) -> JSONResponse:
|
|
if not deps.oidc_issuer:
|
|
raise HTTPException(503, "OIDC not configured")
|
|
|
|
auth = request.headers.get("Authorization", "")
|
|
if not auth.startswith("Bearer "):
|
|
raise HTTPException(401, "Bearer token required")
|
|
token_str = auth[7:]
|
|
|
|
# We need the client_id (audience) to validate — try all registered clients
|
|
# by decoding without aud check first, then validate aud matches a real client
|
|
try:
|
|
import jwt as _jwt
|
|
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
|
priv = load_pem_private_key(deps.oidc_private_key_pem.encode(), password=None)
|
|
payload = _jwt.decode(
|
|
token_str,
|
|
priv.public_key(),
|
|
algorithms=["RS256"],
|
|
options={"verify_aud": False},
|
|
)
|
|
except Exception:
|
|
raise HTTPException(401, "Invalid token")
|
|
|
|
handle = payload.get("sub")
|
|
if not handle:
|
|
raise HTTPException(401, "Invalid token")
|
|
|
|
user = get_user(deps._get_db(), handle)
|
|
if not user or user.suspended:
|
|
raise HTTPException(401, "User not found or suspended")
|
|
|
|
return JSONResponse({
|
|
"sub": user.handle,
|
|
"name": user.display_name,
|
|
"preferred_username": user.handle,
|
|
"is_admin": user.is_admin,
|
|
"wiki_access": user.wiki_access,
|
|
"activity_access": user.activity_access,
|
|
})
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _pct_encode(url: str) -> str:
|
|
from urllib.parse import quote
|
|
return quote(url, safe="")
|
|
|
|
|
|
def _verify_pkce(verifier: str, challenge: str, method: str) -> bool:
|
|
if method == "S256":
|
|
digest = hashlib.sha256(verifier.encode()).digest()
|
|
computed = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()
|
|
return computed == challenge
|
|
if method == "plain":
|
|
return verifier == challenge
|
|
return False
|