# Source: bincio-auth/tests/test_oidc.py (345 lines, 12 KiB, Python)

"""Tests for the OIDC / OAuth2 authorization code flow."""
from __future__ import annotations
import base64
import hashlib
import secrets
from urllib.parse import parse_qs, urlparse
import jwt
import pytest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from bincio.auth import deps
from bincio.auth.db import upsert_oauth2_client
from .conftest import auth_cookies, login
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def oidc_config(tmp_path):
    """Enable OIDC mode for every test in this module.

    Generates a fresh 2048-bit RSA key, installs its unencrypted PEM encoding
    and a fixed issuer URL on ``deps``, and puts the previous values back once
    the test has finished so the settings do not leak into other test modules.

    ``tmp_path`` is requested but not used by the fixture body.
    """
    # Sentinel distinguishes "attribute was absent" from "attribute was None".
    unset = object()
    previous = {
        name: getattr(deps, name, unset)
        for name in ("oidc_private_key_pem", "oidc_issuer")
    }

    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode()
    deps.oidc_private_key_pem = pem
    deps.oidc_issuer = "https://auth.example.test"

    yield

    # Teardown: restore whatever was configured before this test ran.
    for name, value in previous.items():
        if value is unset:
            if hasattr(deps, name):
                delattr(deps, name)
        else:
            setattr(deps, name, value)
@pytest.fixture()
def confidential_client(db):
    """Upsert an OAuth2 client that has a secret and return its credentials.

    Returns a dict with the ``client_id``, ``client_secret`` and
    ``redirect_uri`` that were stored through ``upsert_oauth2_client``.
    """
    client_id = "test-app"
    client_secret = "test-secret"
    redirect_uri = "https://app.example.test/callback"

    upsert_oauth2_client(
        db,
        client_id=client_id,
        client_secret=client_secret,
        name="Test App",
        redirect_uris=[redirect_uri],
    )
    return {
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uri": redirect_uri,
    }
@pytest.fixture()
def public_client(db):
    """Upsert an OAuth2 client with no secret and return its identifiers.

    Returns a dict with the ``client_id`` and ``redirect_uri`` that were
    stored through ``upsert_oauth2_client``.
    """
    client_id = "mobile-app"
    redirect_uri = "myapp://callback"

    upsert_oauth2_client(
        db,
        client_id=client_id,
        client_secret=None,
        name="Mobile App",
        redirect_uris=[redirect_uri],
    )
    return {"client_id": client_id, "redirect_uri": redirect_uri}
# ── Discovery ─────────────────────────────────────────────────────────────────
def test_openid_configuration(client):
    """The discovery document is served and lists the expected fields."""
    response = client.get("/.well-known/openid-configuration")
    assert response.status_code == 200

    document = response.json()
    assert document["issuer"] == "https://auth.example.test"
    for required_key in ("authorization_endpoint", "token_endpoint", "jwks_uri"):
        assert required_key in document
def test_jwks(client):
    """The JWKS endpoint publishes exactly one RSA key marked RS256."""
    response = client.get("/.well-known/jwks.json")
    assert response.status_code == 200

    key_set = response.json()["keys"]
    assert len(key_set) == 1

    published_key = key_set[0]
    assert published_key["kty"] == "RSA"
    assert published_key["alg"] == "RS256"
def test_discovery_disabled_without_oidc(client, monkeypatch):
    """Discovery answers 503 when the configured issuer is an empty string."""
    # monkeypatch reverts the override at teardown, so the blank issuer cannot
    # leak into whichever test runs next.
    monkeypatch.setattr(deps, "oidc_issuer", "")

    r = client.get("/.well-known/openid-configuration")
    assert r.status_code == 503
# ── Authorization endpoint ────────────────────────────────────────────────────
def test_authorize_redirects_to_login_when_unauthenticated(client, confidential_client):
    """Without session cookies, /oauth2/authorize redirects to a login URL."""
    query = {
        "client_id": confidential_client["client_id"],
        "redirect_uri": confidential_client["redirect_uri"],
        "response_type": "code",
        "scope": "openid profile",
    }

    response = client.get("/oauth2/authorize", params=query, follow_redirects=False)

    assert response.status_code == 302
    assert "/login/" in response.headers["location"]
def test_authorize_issues_code_when_authenticated(client, admin, confidential_client):
    """With admin cookies, the 302 Location carries a code and echoes state."""
    session_cookies = auth_cookies("admin", "adminpass1", client)
    query = {
        "client_id": confidential_client["client_id"],
        "redirect_uri": confidential_client["redirect_uri"],
        "response_type": "code",
        "scope": "openid profile",
        "state": "xyz",
    }

    response = client.get(
        "/oauth2/authorize",
        params=query,
        cookies=session_cookies,
        follow_redirects=False,
    )
    assert response.status_code == 302

    redirect_target = response.headers["location"]
    redirect_query = parse_qs(urlparse(redirect_target).query)
    assert "code" in redirect_query
    assert redirect_query["state"][0] == "xyz"
def test_authorize_unknown_client(client, admin):
    """A client_id that was never registered is answered with 400."""
    session_cookies = auth_cookies("admin", "adminpass1", client)
    query = {
        "client_id": "unknown",
        "redirect_uri": "https://somewhere.test/cb",
        "response_type": "code",
    }

    response = client.get(
        "/oauth2/authorize",
        params=query,
        cookies=session_cookies,
        follow_redirects=False,
    )

    assert response.status_code == 400
def test_authorize_unregistered_redirect_uri(client, admin, confidential_client):
    """A redirect_uri other than the one the fixture stored gets a 400."""
    session_cookies = auth_cookies("admin", "adminpass1", client)
    query = {
        "client_id": confidential_client["client_id"],
        "redirect_uri": "https://evil.example.test/steal",
        "response_type": "code",
    }

    response = client.get(
        "/oauth2/authorize",
        params=query,
        cookies=session_cookies,
        follow_redirects=False,
    )

    assert response.status_code == 400
# ── Full authorization code flow (confidential client) ────────────────────────
def _get_auth_code(client, cookies, confidential_client, **extra_params) -> str:
    """Run the authorize step and return the code from the redirect.

    Args:
        client: HTTP test client used to issue the request.
        cookies: Cookies sent with the authorize request.
        confidential_client: Mapping providing ``client_id`` and
            ``redirect_uri``.
        **extra_params: Additional query parameters; they are merged last, so
            they override the defaults on key collision.

    Returns:
        The first ``code`` value found in the Location header's query string.
    """
    query = {
        "client_id": confidential_client["client_id"],
        "redirect_uri": confidential_client["redirect_uri"],
        "response_type": "code",
        "scope": "openid profile",
    }
    query.update(extra_params)

    response = client.get(
        "/oauth2/authorize",
        params=query,
        cookies=cookies,
        follow_redirects=False,
    )
    assert response.status_code == 302

    redirect_query = parse_qs(urlparse(response.headers["location"]).query)
    return redirect_query["code"][0]
def test_token_exchange_confidential_client(client, admin, confidential_client):
    """Exchanging a fresh code with the right secret yields a Bearer id_token."""
    session_cookies = auth_cookies("admin", "adminpass1", client)
    auth_code = _get_auth_code(client, session_cookies, confidential_client)

    form = {
        "grant_type": "authorization_code",
        "code": auth_code,
        "redirect_uri": confidential_client["redirect_uri"],
        "client_id": confidential_client["client_id"],
        "client_secret": confidential_client["client_secret"],
    }
    response = client.post("/oauth2/token", data=form)
    assert response.status_code == 200

    body = response.json()
    assert "id_token" in body
    assert body["token_type"] == "Bearer"
def test_token_exchange_wrong_secret(client, admin, confidential_client):
    """Presenting a client_secret that does not match is answered with 401."""
    session_cookies = auth_cookies("admin", "adminpass1", client)
    auth_code = _get_auth_code(client, session_cookies, confidential_client)

    form = {
        "grant_type": "authorization_code",
        "code": auth_code,
        "redirect_uri": confidential_client["redirect_uri"],
        "client_id": confidential_client["client_id"],
        "client_secret": "wrong-secret",
    }
    response = client.post("/oauth2/token", data=form)

    assert response.status_code == 401
def test_token_exchange_code_reuse(client, admin, confidential_client):
    """An authorization code can be redeemed once; a replay gets a 400."""
    cookies = auth_cookies("admin", "adminpass1", client)
    code = _get_auth_code(client, cookies, confidential_client)
    payload = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": confidential_client["redirect_uri"],
        "client_id": confidential_client["client_id"],
        "client_secret": confidential_client["client_secret"],
    }

    # The first redemption has to succeed; otherwise the 400 below would not
    # demonstrate reuse rejection, only that the request fails for some reason.
    first = client.post("/oauth2/token", data=payload)
    assert first.status_code == 200

    replay = client.post("/oauth2/token", data=payload)
    assert replay.status_code == 400
def test_id_token_claims(client, admin, confidential_client):
    """The id_token carries the expected sub, iss, aud and is_admin claims."""
    cookies = auth_cookies("admin", "adminpass1", client)
    code = _get_auth_code(client, cookies, confidential_client)
    r = client.post("/oauth2/token", data={
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": confidential_client["redirect_uri"],
        "client_id": confidential_client["client_id"],
        "client_secret": confidential_client["client_secret"],
    })
    # Fail on the HTTP status rather than on a KeyError when the exchange
    # itself did not succeed.
    assert r.status_code == 200
    id_token = r.json()["id_token"]

    # Decode without verifying signature to inspect claims
    payload = jwt.decode(id_token, options={"verify_signature": False})
    assert payload["sub"] == "admin"
    assert payload["iss"] == "https://auth.example.test"
    assert payload["aud"] == "test-app"
    assert payload["is_admin"] is True
# ── PKCE flow (public client) ─────────────────────────────────────────────────
def _pkce_pair() -> tuple[str, str]:
verifier = secrets.token_urlsafe(48)
challenge = base64.urlsafe_b64encode(
hashlib.sha256(verifier.encode()).digest()
).rstrip(b"=").decode()
return verifier, challenge
def test_pkce_flow(client, admin, public_client):
    """A secretless client gets an id_token when it presents the verifier."""
    session_cookies = auth_cookies("admin", "adminpass1", client)
    code_verifier, code_challenge = _pkce_pair()

    # Step 1: authorize, binding the code to the S256 challenge.
    authorize_query = {
        "client_id": public_client["client_id"],
        "redirect_uri": public_client["redirect_uri"],
        "response_type": "code",
        "scope": "openid profile",
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }
    authorize_response = client.get(
        "/oauth2/authorize",
        params=authorize_query,
        cookies=session_cookies,
        follow_redirects=False,
    )
    assert authorize_response.status_code == 302
    redirect_query = parse_qs(urlparse(authorize_response.headers["location"]).query)
    auth_code = redirect_query["code"][0]

    # Step 2: redeem the code, proving possession of the verifier.
    token_form = {
        "grant_type": "authorization_code",
        "code": auth_code,
        "redirect_uri": public_client["redirect_uri"],
        "client_id": public_client["client_id"],
        "code_verifier": code_verifier,
    }
    token_response = client.post("/oauth2/token", data=token_form)
    assert token_response.status_code == 200
    assert "id_token" in token_response.json()
def test_pkce_wrong_verifier(client, admin, public_client):
    """Redeeming a PKCE-bound code with a mismatched verifier gets a 400."""
    cookies = auth_cookies("admin", "adminpass1", client)
    # Only the challenge is needed; the matching verifier is deliberately
    # never sent.
    _, challenge = _pkce_pair()

    r = client.get(
        "/oauth2/authorize",
        params={
            "client_id": public_client["client_id"],
            "redirect_uri": public_client["redirect_uri"],
            "response_type": "code",
            "scope": "openid profile",
            "code_challenge": challenge,
            "code_challenge_method": "S256",
        },
        cookies=cookies,
        follow_redirects=False,
    )
    # Same precondition as the happy-path PKCE test: make sure a code was
    # actually issued before parsing the Location header.
    assert r.status_code == 302
    code = parse_qs(urlparse(r.headers["location"]).query)["code"][0]

    r2 = client.post("/oauth2/token", data={
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": public_client["redirect_uri"],
        "client_id": public_client["client_id"],
        "code_verifier": "wrong-verifier",
    })
    assert r2.status_code == 400
# ── Userinfo endpoint ─────────────────────────────────────────────────────────
def test_userinfo(client, admin, confidential_client):
    """A valid access token lets /oauth2/userinfo return the admin identity."""
    cookies = auth_cookies("admin", "adminpass1", client)
    code = _get_auth_code(client, cookies, confidential_client)
    token_resp = client.post("/oauth2/token", data={
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": confidential_client["redirect_uri"],
        "client_id": confidential_client["client_id"],
        "client_secret": confidential_client["client_secret"],
    })
    # Fail on the HTTP status rather than on a KeyError when the exchange
    # itself did not succeed.
    assert token_resp.status_code == 200
    access_token = token_resp.json()["access_token"]

    r = client.get("/oauth2/userinfo", headers={"Authorization": f"Bearer {access_token}"})
    assert r.status_code == 200
    data = r.json()
    assert data["sub"] == "admin"
    assert data["preferred_username"] == "admin"
def test_userinfo_invalid_token(client):
    """A bearer token the server does not recognise is answered with 401."""
    bogus_auth = {"Authorization": "Bearer bogus"}
    response = client.get("/oauth2/userinfo", headers=bogus_auth)
    assert response.status_code == 401
def test_userinfo_no_token(client):
    """Calling /oauth2/userinfo with no Authorization header yields 401."""
    response = client.get("/oauth2/userinfo")
    assert response.status_code == 401