Improvments to logging\auditing

This commit is contained in:
Ubuntu
2026-05-19 10:38:26 +00:00
31 changed files with 2101 additions and 131 deletions
@@ -0,0 +1,186 @@
"""Superadmin session timeout integration tests.
Validates the absolute-only timeout policy for superadmin sessions.
Superadmin sessions do NOT have idle timeout — only absolute timeout.
"""
import pytest
import uuid
from datetime import datetime, timedelta, timezone
from tests.integration.client.base import ApiError
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def assert_success(response: dict, message_contains: str = "") -> dict:
"""Assert that an api_response-wrapped payload succeeded."""
data = response.get("data", {})
assert response.get("success") is not False, (
f"Expected success but got error: {response.get('message')}"
)
if message_contains:
assert message_contains.lower() in response.get("message", "").lower(), (
f"Expected message to contain '{message_contains}' but got: {response.get('message')}"
)
return data
def _get_session_row(integration_app, token: str):
"""Look up the Session model row for a given bearer token."""
from gatehouse_app.models.user.session import Session
with integration_app.app_context():
return Session.query.filter_by(token=token).first()
def _touch_session(integration_app, session_id: str, **updates):
"""Directly update columns on a Session row.
Only use this to simulate the passage of time — never to assert
internal state.
"""
from gatehouse_app.models.user.session import Session
with integration_app.app_context():
sess = Session.query.get(session_id)
for attr, value in updates.items():
setattr(sess, attr, value)
from gatehouse_app import db
db.session.commit()
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def superadmin_credentials(integration_app):
"""Create a superadmin and return login credentials."""
from gatehouse_app.services.superadmin_auth_service import SuperadminAuthService
email = f"admin_{uuid.uuid4().hex[:8]}@gatehouse.local"
password = "SuperAdmin123!"
with integration_app.app_context():
sa = SuperadminAuthService.create_superadmin(
email=email,
credential=password,
full_name="Test Superadmin",
)
return {"id": str(sa.id), "email": email, "password": password}
@pytest.fixture
def logged_in_superadmin(integration_client, superadmin_credentials, integration_app):
"""Log in as superadmin and return session metadata.
Returns dict with ``superadmin``, ``token``, ``session_id``, ``session_row``.
"""
creds = superadmin_credentials
resp = integration_client.post(
"/api/v1/superadmin/auth/login",
data={"email": creds["email"], "password": creds["password"]},
)
data = assert_success(resp)
token = data["token"]
session_row = _get_session_row(integration_app, token)
assert session_row is not None, "Session row should exist after superadmin login"
return {
"superadmin": creds,
"token": token,
"session_id": session_row.id,
"session_row": session_row,
}
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestSuperadminSessionTimeouts:
"""Absolute-only timeout behavior for superadmin sessions."""
def test_superadmin_session_valid_before_timeout(
self, integration_client, logged_in_superadmin,
):
"""SA-SESS-01 — Fresh superadmin session is accepted."""
integration_client.set_token(logged_in_superadmin["token"])
result = integration_client.get("/api/v1/superadmin/auth/me")
data = assert_success(result)
assert "superadmin" in data
def test_absolute_timeout_rejects_superadmin(
self, integration_client, logged_in_superadmin, integration_app,
):
"""SA-SESS-02 — Superadmin session rejected after absolute timeout.
Push ``created_at`` far into the past. The session must be
rejected even though ``last_activity_at`` is fresh.
"""
_touch_session(
integration_app,
logged_in_superadmin["session_id"],
created_at=datetime.now(timezone.utc) - timedelta(days=1),
last_activity_at=datetime.now(timezone.utc),
)
integration_client.set_token(logged_in_superadmin["token"])
with pytest.raises(ApiError) as exc_info:
integration_client.get("/api/v1/superadmin/auth/me")
assert exc_info.value.status_code == 401
def test_idle_timeout_does_NOT_reject_superadmin(
self, integration_client, logged_in_superadmin, integration_app,
):
"""SA-SESS-03 — Superadmin sessions have NO idle timeout.
Push ``last_activity_at`` far into the past but keep
``created_at`` recent. The session should still be valid
because superadmin sessions only use absolute timeout.
"""
_touch_session(
integration_app,
logged_in_superadmin["session_id"],
last_activity_at=datetime.now(timezone.utc) - timedelta(hours=1),
)
integration_client.set_token(logged_in_superadmin["token"])
result = integration_client.get("/api/v1/superadmin/auth/me")
data = assert_success(result)
assert "superadmin" in data
def test_revoked_superadmin_session_rejected(
self, integration_client, logged_in_superadmin,
):
"""SA-SESS-04 — Revoked superadmin session is rejected."""
integration_client.set_token(logged_in_superadmin["token"])
# Logout revokes the session
integration_client.post("/api/v1/superadmin/auth/logout")
integration_client.clear_token()
# Try using the old token
integration_client.set_token(logged_in_superadmin["token"])
with pytest.raises(ApiError) as exc_info:
integration_client.get("/api/v1/superadmin/auth/me")
assert exc_info.value.status_code == 401
def test_superadmin_session_has_owner_type(
self, integration_app, logged_in_superadmin,
):
"""SA-SESS-05 — Superadmin session row has owner_type='superadmin'."""
from gatehouse_app.models.user.session import Session
from gatehouse_app.utils.constants import SessionType
with integration_app.app_context():
sess = Session.query.get(logged_in_superadmin["session_id"])
assert sess is not None
assert sess.owner_type == SessionType.SUPERADMIN
assert sess.owner_id == logged_in_superadmin["superadmin"]["id"]
+503
View File
@@ -0,0 +1,503 @@
"""Unit tests for per-client CORS feature.
WHAT: Tests for per-client CORS origin resolution, including OIDCClient
model methods, request client_id extraction, effective origin
resolution, and integration with the CORS middleware.
WHY: Per-client CORS prevents one OIDC client from making cross-origin
requests meant for another; misconfiguration breaks browser flows.
EXPECTED: Correct origin derivation, proper client_id extraction, and
correct CORS headers on OIDC endpoints.
"""
import base64
import json
from unittest.mock import patch
import pytest
from flask import Flask, request as flask_request
import gatehouse_app.middleware.cors as cors_module
from gatehouse_app.middleware.cors import (
_get_oidc_client_id_from_request,
_get_effective_cors_origins,
setup_cors,
)
# ---------------------------------------------------------------------------
# Helper: build a lightweight stub that quacks like OIDCClient
# ---------------------------------------------------------------------------
class StubClient:
"""Minimal stand-in for OIDCClient -- no SQLAlchemy, no DB needed."""
def __init__(self, *, allowed_cors_origins=None, redirect_uris=None):
self.allowed_cors_origins = allowed_cors_origins
self.redirect_uris = redirect_uris or []
def get_effective_origins(self):
from urllib.parse import urlparse
if self.allowed_cors_origins is None:
return None
if "+" in self.allowed_cors_origins:
origins = set()
for uri in self.redirect_uris:
parsed = urlparse(uri)
if parsed.scheme and parsed.hostname:
port = f":{parsed.port}" if parsed.port else ""
origins.add(f"{parsed.scheme}://{parsed.hostname}{port}")
return sorted(origins)
return list(self.allowed_cors_origins)
def is_origin_allowed(self, origin):
effective = self.get_effective_origins()
if effective is None:
return None
if "*" in effective:
return True
return origin in effective
def _basic_auth_header(client_id, secret="secret"):
"""Return a 'Basic <b64>' Authorization header value."""
return "Basic " + base64.b64encode(f"{client_id}:{secret}".encode()).decode()
# ---------------------------------------------------------------------------
# OIDCClient.get_effective_origins
# ---------------------------------------------------------------------------
class TestGetEffectiveOrigins:
def test_returns_none_when_allowed_cors_origins_is_none(self):
"""TEST: PCORS-GE-01 -- None config signals 'use global'."""
client = StubClient(allowed_cors_origins=None)
assert client.get_effective_origins() is None
def test_derives_from_redirect_uris_when_plus_sign(self):
"""TEST: PCORS-GE-02 -- '+' in list derives origins from redirect_uris."""
client = StubClient(
allowed_cors_origins=["+"],
redirect_uris=[
"https://app.example.com/callback",
"http://localhost:3000/callback",
],
)
assert client.get_effective_origins() == [
"http://localhost:3000",
"https://app.example.com",
]
def test_derives_with_port(self):
"""TEST: PCORS-GE-03 -- Non-standard ports are preserved."""
client = StubClient(
allowed_cors_origins=["+"],
redirect_uris=["https://app.example.com:8443/cb"],
)
assert client.get_effective_origins() == ["https://app.example.com:8443"]
def test_deduplicates_derived_origins(self):
"""TEST: PCORS-GE-04 -- Duplicate redirect URIs produce unique origins."""
client = StubClient(
allowed_cors_origins=["+"],
redirect_uris=[
"https://app.example.com/cb1",
"https://app.example.com/cb2",
],
)
assert client.get_effective_origins() == ["https://app.example.com"]
def test_returns_list_as_is_when_normal_list(self):
"""TEST: PCORS-GE-05 -- Normal list is returned unchanged."""
origins = ["https://a.com", "https://b.com"]
client = StubClient(allowed_cors_origins=origins)
assert client.get_effective_origins() == origins
def test_returns_wildcard_list_as_is(self):
"""TEST: PCORS-GE-06 -- ['*'] is returned (handled downstream)."""
client = StubClient(allowed_cors_origins=["*"])
assert client.get_effective_origins() == ["*"]
def test_empty_redirect_uris_with_plus_returns_empty(self):
"""TEST: PCORS-GE-07 -- '+' with empty redirect_uris yields empty list."""
client = StubClient(
allowed_cors_origins=["+"],
redirect_uris=[],
)
assert client.get_effective_origins() == []
def test_skips_malformed_redirect_uris(self):
"""TEST: PCORS-GE-08 -- URIs without scheme/host are skipped."""
client = StubClient(
allowed_cors_origins=["+"],
redirect_uris=["not-a-uri", "https://good.com/cb"],
)
assert client.get_effective_origins() == ["https://good.com"]
# ---------------------------------------------------------------------------
# OIDCClient.is_origin_allowed
# ---------------------------------------------------------------------------
class TestIsOriginAllowed:
def test_returns_none_when_no_per_client_config(self):
"""TEST: PCORS-IO-01 -- None config defers to global CORS."""
client = StubClient(allowed_cors_origins=None)
assert client.is_origin_allowed("https://anything.com") is None
def test_returns_true_when_wildcard(self):
"""TEST: PCORS-IO-02 -- '*' in effective origins allows any origin."""
client = StubClient(allowed_cors_origins=["*"])
assert client.is_origin_allowed("https://evil.com") is True
def test_returns_true_for_matching_origin(self):
"""TEST: PCORS-IO-03 -- Matching origin is allowed."""
client = StubClient(
allowed_cors_origins=["https://app.example.com", "https://other.com"],
)
assert client.is_origin_allowed("https://app.example.com") is True
def test_returns_false_for_non_matching_origin(self):
"""TEST: PCORS-IO-04 -- Non-matching origin is rejected."""
client = StubClient(allowed_cors_origins=["https://app.example.com"])
assert client.is_origin_allowed("https://evil.com") is False
def test_returns_false_for_empty_list(self):
"""TEST: PCORS-IO-05 -- Empty list rejects everything."""
client = StubClient(allowed_cors_origins=[])
assert client.is_origin_allowed("https://anything.com") is False
# ---------------------------------------------------------------------------
# _get_oidc_client_id_from_request
# ---------------------------------------------------------------------------
class TestGetOidcClientIdFromRequest:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
return app
def test_extracts_from_basic_auth(self, app):
"""TEST: PCORS-CI-01 -- Basic Auth header yields client_id."""
with app.test_request_context(
"/oidc/token",
method="POST",
headers={"Authorization": _basic_auth_header("my-client")},
):
assert _get_oidc_client_id_from_request() == "my-client"
def test_extracts_from_form_body(self, app):
"""TEST: PCORS-CI-02 -- Form-encoded body yields client_id."""
with app.test_request_context(
"/oidc/token",
method="POST",
data={"client_id": "form-client", "grant_type": "client_credentials"},
):
assert _get_oidc_client_id_from_request() == "form-client"
def test_extracts_from_json_body(self, app):
"""TEST: PCORS-CI-03 -- JSON body yields client_id."""
with app.test_request_context(
"/oidc/token",
method="POST",
data=json.dumps({"client_id": "json-client", "grant_type": "client_credentials"}),
content_type="application/json",
):
assert _get_oidc_client_id_from_request() == "json-client"
def test_extracts_from_bearer_jwt(self, app):
"""TEST: PCORS-CI-04 -- Bearer JWT payload yields client_id."""
payload = base64.urlsafe_b64encode(
json.dumps({"client_id": "jwt-client"}).encode()
).rstrip(b"=").decode()
token = f"eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.{payload}.sig"
with app.test_request_context(
"/oidc/userinfo",
method="GET",
headers={"Authorization": f"Bearer {token}"},
):
assert _get_oidc_client_id_from_request() == "jwt-client"
def test_returns_none_for_non_oidc_endpoint(self, app):
"""TEST: PCORS-CI-05 -- Non-OIDC path returns None."""
with app.test_request_context(
"/api/v1/users",
method="GET",
headers={"Authorization": _basic_auth_header("x")},
):
assert _get_oidc_client_id_from_request() is None
def test_returns_none_when_no_client_id_found(self, app):
"""TEST: PCORS-CI-06 -- OIDC token endpoint with no credentials returns None."""
with app.test_request_context(
"/oidc/token",
method="POST",
data={"grant_type": "client_credentials"},
):
assert _get_oidc_client_id_from_request() is None
def test_extracts_from_revoke_endpoint(self, app):
"""TEST: PCORS-CI-07 -- /oidc/revoke also accepts Basic Auth."""
with app.test_request_context(
"/oidc/revoke",
method="POST",
headers={"Authorization": _basic_auth_header("rev-client")},
):
assert _get_oidc_client_id_from_request() == "rev-client"
def test_extracts_from_introspect_endpoint(self, app):
"""TEST: PCORS-CI-08 -- /oidc/introspect also accepts Basic Auth."""
with app.test_request_context(
"/oidc/introspect",
method="POST",
headers={"Authorization": _basic_auth_header("int-client")},
):
assert _get_oidc_client_id_from_request() == "int-client"
def test_returns_none_for_options_preflight(self, app):
"""TEST: PCORS-CI-09 -- OPTIONS preflight cannot carry client credentials."""
with app.test_request_context(
"/oidc/token",
method="OPTIONS",
headers={
"Origin": "https://app.com",
"Access-Control-Request-Method": "POST",
},
):
assert _get_oidc_client_id_from_request() is None
# ---------------------------------------------------------------------------
# _get_effective_cors_origins
# ---------------------------------------------------------------------------
class TestGetEffectiveCorsOrigins:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
app.config["CORS_ORIGINS"] = ["https://global.com"]
return app
def test_global_config_for_non_oidc_endpoint(self, app):
"""TEST: PCORS-EO-01 -- Non-OIDC path always uses global config."""
with app.test_request_context("/api/v1/users", method="GET"):
result = _get_effective_cors_origins(app, flask_request)
assert result == ["https://global.com"]
def test_per_client_origins_for_oidc_endpoint(self, app):
"""TEST: PCORS-EO-02 -- OIDC endpoint with configured client uses per-client origins."""
fake_client = StubClient(allowed_cors_origins=["https://client.com"])
with app.test_request_context(
"/oidc/token",
method="POST",
headers={"Authorization": _basic_auth_header("test-client")},
):
with patch.object(cors_module, "OIDCClient") as MockModel:
MockModel.query.filter_by.return_value.first.return_value = fake_client
result = _get_effective_cors_origins(app, flask_request)
assert result == ["https://client.com"]
def test_fallback_when_client_not_found(self, app):
"""TEST: PCORS-EO-03 -- Unknown client_id falls back to global config."""
with app.test_request_context(
"/oidc/token",
method="POST",
headers={"Authorization": _basic_auth_header("unknown")},
):
with patch.object(cors_module, "OIDCClient") as MockModel:
MockModel.query.filter_by.return_value.first.return_value = None
result = _get_effective_cors_origins(app, flask_request)
assert result == ["https://global.com"]
def test_fallback_when_allowed_cors_origins_is_none(self, app):
"""TEST: PCORS-EO-04 -- Client with None origins falls back to global."""
fake_client = StubClient(allowed_cors_origins=None)
with app.test_request_context(
"/oidc/token",
method="POST",
headers={"Authorization": _basic_auth_header("test-client")},
):
with patch.object(cors_module, "OIDCClient") as MockModel:
MockModel.query.filter_by.return_value.first.return_value = fake_client
result = _get_effective_cors_origins(app, flask_request)
assert result == ["https://global.com"]
def test_fallback_on_db_error(self, app):
"""TEST: PCORS-EO-05 -- Database exception falls back to global config."""
with app.test_request_context(
"/oidc/token",
method="POST",
headers={"Authorization": _basic_auth_header("test-client")},
):
with patch.object(cors_module, "OIDCClient") as MockModel:
MockModel.query.filter_by.side_effect = Exception("DB down")
result = _get_effective_cors_origins(app, flask_request)
assert result == ["https://global.com"]
def test_fallback_when_no_client_id_extracted(self, app):
"""TEST: PCORS-EO-06 -- OIDC path with no credentials falls back to global."""
with app.test_request_context(
"/oidc/token",
method="POST",
data={"grant_type": "client_credentials"},
):
result = _get_effective_cors_origins(app, flask_request)
assert result == ["https://global.com"]
# ---------------------------------------------------------------------------
# Integration: OIDC endpoint CORS headers
# ---------------------------------------------------------------------------
class TestOidcEndpointCorsIntegration:
@pytest.fixture
def app_with_global_and_client(self):
"""Flask app with global CORS and route stubs for integration tests."""
app = Flask(__name__)
app.config["TESTING"] = True
app.config["CORS_ORIGINS"] = ["https://global.com"]
app.config["CORS_SUPPORTS_CREDENTIALS"] = True
@app.route("/oidc/token", methods=["POST", "OPTIONS"])
def oidc_token():
return {"status": "ok"}, 200
@app.route("/api/v1/users", methods=["GET", "OPTIONS"])
def api_users():
return {"users": []}, 200
setup_cors(app)
return app
def test_post_oidc_with_per_client_origin_includes_cors_headers(
self, app_with_global_and_client
):
"""TEST: PCORS-INT-01 -- POST to /oidc/token with per-client origin and
Basic Auth includes CORS headers. Per-client CORS applies to the actual
request (which carries credentials), not the preflight."""
fake_client = StubClient(allowed_cors_origins=["https://client-app.com"])
with patch.object(cors_module, "OIDCClient") as MockModel:
MockModel.query.filter_by.return_value.first.return_value = fake_client
with app_with_global_and_client.test_client() as client:
resp = client.post(
"/oidc/token",
headers={
"Origin": "https://client-app.com",
"Authorization": _basic_auth_header("test-client"),
},
)
assert resp.status_code == 200
assert (
resp.headers.get("Access-Control-Allow-Origin")
== "https://client-app.com"
)
assert resp.headers.get("Access-Control-Allow-Credentials") == "true"
def test_post_oidc_with_non_matching_per_client_origin_no_cors_headers(
self, app_with_global_and_client
):
"""TEST: PCORS-INT-02 -- POST with origin not in per-client list has no
CORS headers."""
fake_client = StubClient(allowed_cors_origins=["https://allowed.com"])
with patch.object(cors_module, "OIDCClient") as MockModel:
MockModel.query.filter_by.return_value.first.return_value = fake_client
with app_with_global_and_client.test_client() as client:
resp = client.post(
"/oidc/token",
headers={
"Origin": "https://evil.com",
"Authorization": _basic_auth_header("test-client"),
},
)
assert resp.status_code == 200
assert resp.headers.get("Access-Control-Allow-Origin") is None
def test_post_oidc_wildcard_client_echoes_origin(self, app_with_global_and_client):
"""TEST: PCORS-INT-03 -- Client with '*' echoes the request origin."""
fake_client = StubClient(allowed_cors_origins=["*"])
with patch.object(cors_module, "OIDCClient") as MockModel:
MockModel.query.filter_by.return_value.first.return_value = fake_client
with app_with_global_and_client.test_client() as client:
resp = client.post(
"/oidc/token",
headers={
"Origin": "https://any-origin.com",
"Authorization": _basic_auth_header("test-client"),
},
)
assert resp.status_code == 200
assert (
resp.headers.get("Access-Control-Allow-Origin")
== "https://any-origin.com"
)
def test_preflight_oidc_falls_back_to_global(self, app_with_global_and_client):
"""TEST: PCORS-INT-04 -- OPTIONS preflight cannot carry client credentials,
so it uses global CORS config. A preflight from a per-client-only origin
that is not in the global list will not receive CORS headers."""
fake_client = StubClient(allowed_cors_origins=["https://client-app.com"])
with patch.object(cors_module, "OIDCClient") as MockModel:
MockModel.query.filter_by.return_value.first.return_value = fake_client
with app_with_global_and_client.test_client() as client:
resp = client.options(
"/oidc/token",
headers={
"Origin": "https://client-app.com",
"Access-Control-Request-Method": "POST",
},
)
# Origin not in global list; no CORS headers on preflight
assert resp.headers.get("Access-Control-Allow-Origin") is None
def test_preflight_oidc_with_global_origin_succeeds(self, app_with_global_and_client):
"""TEST: PCORS-INT-05 -- OPTIONS preflight from a globally-allowed origin
returns 204 with CORS headers even for OIDC endpoints."""
with app_with_global_and_client.test_client() as client:
resp = client.options(
"/oidc/token",
headers={
"Origin": "https://global.com",
"Access-Control-Request-Method": "POST",
},
)
assert resp.status_code == 204
assert resp.headers.get("Access-Control-Allow-Origin") == "https://global.com"
assert resp.headers.get("Access-Control-Allow-Credentials") == "true"
def test_non_oidc_endpoint_uses_global_cors(self, app_with_global_and_client):
"""TEST: PCORS-INT-06 -- Non-OIDC endpoint uses global CORS config."""
with app_with_global_and_client.test_client() as client:
resp = client.options(
"/api/v1/users",
headers={
"Origin": "https://global.com",
"Access-Control-Request-Method": "GET",
},
)
assert resp.status_code == 204
assert resp.headers.get("Access-Control-Allow-Origin") == "https://global.com"
def test_post_oidc_no_auth_uses_global_cors(self, app_with_global_and_client):
"""TEST: PCORS-INT-07 -- POST to OIDC endpoint without credentials uses
global CORS (cannot identify client)."""
with app_with_global_and_client.test_client() as client:
resp = client.post(
"/oidc/token",
headers={"Origin": "https://global.com"},
)
assert resp.status_code == 200
assert (
resp.headers.get("Access-Control-Allow-Origin") == "https://global.com"
)