"""Tests for the OIDC / OAuth2 authorization code flow.""" from __future__ import annotations import base64 import hashlib import secrets from urllib.parse import parse_qs, urlparse import jwt import pytest from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa from bincio.auth import deps from bincio.auth.db import upsert_oauth2_client from .conftest import auth_cookies, login # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture(autouse=True) def oidc_config(tmp_path): """Enable OIDC mode for all tests in this module.""" private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ).decode() deps.oidc_private_key_pem = pem deps.oidc_issuer = "https://auth.example.test" @pytest.fixture() def confidential_client(db): upsert_oauth2_client( db, client_id="test-app", client_secret="test-secret", name="Test App", redirect_uris=["https://app.example.test/callback"], ) return {"client_id": "test-app", "client_secret": "test-secret", "redirect_uri": "https://app.example.test/callback"} @pytest.fixture() def public_client(db): upsert_oauth2_client( db, client_id="mobile-app", client_secret=None, name="Mobile App", redirect_uris=["myapp://callback"], ) return {"client_id": "mobile-app", "redirect_uri": "myapp://callback"} # ── Discovery ───────────────────────────────────────────────────────────────── def test_openid_configuration(client): r = client.get("/.well-known/openid-configuration") assert r.status_code == 200 data = r.json() assert data["issuer"] == "https://auth.example.test" assert "authorization_endpoint" in data assert "token_endpoint" in data assert "jwks_uri" in data def test_jwks(client): r = client.get("/.well-known/jwks.json") assert r.status_code == 200 keys = r.json()["keys"] assert len(keys) == 1 assert keys[0]["kty"] == "RSA" assert keys[0]["alg"] == "RS256" def test_discovery_disabled_without_oidc(client): deps.oidc_issuer = "" r = client.get("/.well-known/openid-configuration") assert r.status_code == 503 # ── Authorization endpoint ──────────────────────────────────────────────────── def test_authorize_redirects_to_login_when_unauthenticated(client, confidential_client): r = client.get( "/oauth2/authorize", params={ "client_id": confidential_client["client_id"], "redirect_uri": confidential_client["redirect_uri"], "response_type": "code", "scope": "openid profile", }, follow_redirects=False, ) assert r.status_code == 302 assert "/login/" in r.headers["location"] def test_authorize_issues_code_when_authenticated(client, admin, confidential_client): cookies = auth_cookies("admin", "adminpass1", client) r = client.get( "/oauth2/authorize", params={ "client_id": confidential_client["client_id"], "redirect_uri": confidential_client["redirect_uri"], "response_type": "code", "scope": "openid profile", "state": "xyz", }, cookies=cookies, follow_redirects=False, ) assert r.status_code == 302 location = r.headers["location"] qs = parse_qs(urlparse(location).query) assert "code" in qs assert qs["state"][0] == "xyz" def test_authorize_unknown_client(client, admin): cookies = auth_cookies("admin", "adminpass1", client) r = client.get( "/oauth2/authorize", params={ "client_id": "unknown", "redirect_uri": "https://somewhere.test/cb", "response_type": "code", }, cookies=cookies, follow_redirects=False, ) assert r.status_code == 400 def test_authorize_unregistered_redirect_uri(client, admin, confidential_client): cookies = auth_cookies("admin", "adminpass1", client) r = client.get( "/oauth2/authorize", params={ "client_id": confidential_client["client_id"], "redirect_uri": "https://evil.example.test/steal", "response_type": "code", }, cookies=cookies, follow_redirects=False, ) assert r.status_code == 400 # ── Full authorization code flow (confidential client) ──────────────────────── def _get_auth_code(client, cookies, confidential_client, **extra_params) -> str: r = client.get( "/oauth2/authorize", params={ "client_id": confidential_client["client_id"], "redirect_uri": confidential_client["redirect_uri"], "response_type": "code", "scope": "openid profile", **extra_params, }, cookies=cookies, follow_redirects=False, ) assert r.status_code == 302 qs = parse_qs(urlparse(r.headers["location"]).query) return qs["code"][0] def test_token_exchange_confidential_client(client, admin, confidential_client): cookies = auth_cookies("admin", "adminpass1", client) code = _get_auth_code(client, cookies, confidential_client) r = client.post("/oauth2/token", data={ "grant_type": "authorization_code", "code": code, "redirect_uri": confidential_client["redirect_uri"], "client_id": confidential_client["client_id"], "client_secret": confidential_client["client_secret"], }) assert r.status_code == 200 data = r.json() assert "id_token" in data assert data["token_type"] == "Bearer" def test_token_exchange_wrong_secret(client, admin, confidential_client): cookies = auth_cookies("admin", "adminpass1", client) code = _get_auth_code(client, cookies, confidential_client) r = client.post("/oauth2/token", data={ "grant_type": "authorization_code", "code": code, "redirect_uri": confidential_client["redirect_uri"], "client_id": confidential_client["client_id"], "client_secret": "wrong-secret", }) assert r.status_code == 401 def test_token_exchange_code_reuse(client, admin, confidential_client): cookies = auth_cookies("admin", "adminpass1", client) code = _get_auth_code(client, cookies, confidential_client) payload = { "grant_type": "authorization_code", "code": code, "redirect_uri": confidential_client["redirect_uri"], "client_id": confidential_client["client_id"], "client_secret": confidential_client["client_secret"], } client.post("/oauth2/token", data=payload) r = client.post("/oauth2/token", data=payload) assert r.status_code == 400 def test_id_token_claims(client, admin, confidential_client): cookies = auth_cookies("admin", "adminpass1", client) code = _get_auth_code(client, cookies, confidential_client) r = client.post("/oauth2/token", data={ "grant_type": "authorization_code", "code": code, "redirect_uri": confidential_client["redirect_uri"], "client_id": confidential_client["client_id"], "client_secret": confidential_client["client_secret"], }) id_token = r.json()["id_token"] # Decode without verifying signature to inspect claims payload = jwt.decode(id_token, options={"verify_signature": False}) assert payload["sub"] == "admin" assert payload["iss"] == "https://auth.example.test" assert payload["aud"] == "test-app" assert payload["is_admin"] is True # ── PKCE flow (public client) ───────────────────────────────────────────────── def _pkce_pair() -> tuple[str, str]: verifier = secrets.token_urlsafe(48) challenge = base64.urlsafe_b64encode( hashlib.sha256(verifier.encode()).digest() ).rstrip(b"=").decode() return verifier, challenge def test_pkce_flow(client, admin, public_client): cookies = auth_cookies("admin", "adminpass1", client) verifier, challenge = _pkce_pair() # Authorize with PKCE challenge r = client.get( "/oauth2/authorize", params={ "client_id": public_client["client_id"], "redirect_uri": public_client["redirect_uri"], "response_type": "code", "scope": "openid profile", "code_challenge": challenge, "code_challenge_method": "S256", }, cookies=cookies, follow_redirects=False, ) assert r.status_code == 302 code = parse_qs(urlparse(r.headers["location"]).query)["code"][0] # Exchange with verifier r2 = client.post("/oauth2/token", data={ "grant_type": "authorization_code", "code": code, "redirect_uri": public_client["redirect_uri"], "client_id": public_client["client_id"], "code_verifier": verifier, }) assert r2.status_code == 200 assert "id_token" in r2.json() def test_pkce_wrong_verifier(client, admin, public_client): cookies = auth_cookies("admin", "adminpass1", client) verifier, challenge = _pkce_pair() r = client.get( "/oauth2/authorize", params={ "client_id": public_client["client_id"], "redirect_uri": public_client["redirect_uri"], "response_type": "code", "scope": "openid profile", "code_challenge": challenge, "code_challenge_method": "S256", }, cookies=cookies, follow_redirects=False, ) code = parse_qs(urlparse(r.headers["location"]).query)["code"][0] r2 = client.post("/oauth2/token", data={ "grant_type": "authorization_code", "code": code, "redirect_uri": public_client["redirect_uri"], "client_id": public_client["client_id"], "code_verifier": "wrong-verifier", }) assert r2.status_code == 400 # ── Userinfo endpoint ───────────────────────────────────────────────────────── def test_userinfo(client, admin, confidential_client): cookies = auth_cookies("admin", "adminpass1", client) code = _get_auth_code(client, cookies, confidential_client) token_resp = client.post("/oauth2/token", data={ "grant_type": "authorization_code", "code": code, "redirect_uri": confidential_client["redirect_uri"], "client_id": confidential_client["client_id"], "client_secret": confidential_client["client_secret"], }) access_token = token_resp.json()["access_token"] r = client.get("/oauth2/userinfo", headers={"Authorization": f"Bearer {access_token}"}) assert r.status_code == 200 data = r.json() assert data["sub"] == "admin" assert data["preferred_username"] == "admin" def test_userinfo_invalid_token(client): r = client.get("/oauth2/userinfo", headers={"Authorization": "Bearer bogus"}) assert r.status_code == 401 def test_userinfo_no_token(client): r = client.get("/oauth2/userinfo") assert r.status_code == 401