From 803bf4f4f24d5aee7b6e25db2cb6f42177d7d43f Mon Sep 17 00:00:00 2001 From: Cory Hawklvelt Date: Tue, 28 Apr 2026 20:54:15 +0930 Subject: [PATCH] refactor: consolidate user and superadmin sessions into unified model --- gatehouse_app/decorators/superadmin.py | 16 +- gatehouse_app/models/__init__.py | 2 - gatehouse_app/models/superadmin/__init__.py | 4 +- gatehouse_app/models/superadmin/superadmin.py | 12 +- .../models/superadmin/superadmin_session.py | 80 -------- gatehouse_app/models/user/session.py | 66 +++++-- gatehouse_app/services/auth_service.py | 4 +- gatehouse_app/services/session_service.py | 75 +++++-- .../services/superadmin_auth_service.py | 24 ++- gatehouse_app/utils/constants.py | 7 + .../c8d2e4f6a1b3_consolidate_sessions.py | 122 ++++++++++++ .../test_superadmin_session_timeouts.py | 186 ++++++++++++++++++ 12 files changed, 472 insertions(+), 126 deletions(-) delete mode 100644 gatehouse_app/models/superadmin/superadmin_session.py create mode 100644 migrations/versions/c8d2e4f6a1b3_consolidate_sessions.py create mode 100644 tests/integration/test_superadmin_session_timeouts.py diff --git a/gatehouse_app/decorators/superadmin.py b/gatehouse_app/decorators/superadmin.py index c318a68..d7401ac 100644 --- a/gatehouse_app/decorators/superadmin.py +++ b/gatehouse_app/decorators/superadmin.py @@ -15,7 +15,7 @@ def superadmin_required(f): """Decorator to require superadmin Bearer token authentication. Extracts token from Authorization: Bearer {token} header, - validates the session against SuperadminSession table, + validates the session against the unified sessions table, and sets g.current_superadmin and g.superadmin_session. Returns 401 if no valid session, 403 if not a superadmin. @@ -46,10 +46,14 @@ def superadmin_required(f): token = parts[1] # Import here to avoid circular imports - from gatehouse_app.models.superadmin import SuperadminSession, Superadmin + from gatehouse_app.models.user.session import Session + from gatehouse_app.models.superadmin import Superadmin + from gatehouse_app.utils.constants import SessionType - # Get active session by token - session = SuperadminSession.query.filter_by(token=token).first() + # Get active session by token, scoped to superadmin + session = Session.query.filter_by( + token=token, owner_type=SessionType.SUPERADMIN + ).first() if not session: return api_response( @@ -68,8 +72,8 @@ def superadmin_required(f): error_type="SESSION_INACTIVE" ) - # Get the superadmin - superadmin = session.superadmin + # Get the superadmin by owner_id + superadmin = Superadmin.query.get(session.owner_id) if not superadmin: return api_response( success=False, diff --git a/gatehouse_app/models/__init__.py b/gatehouse_app/models/__init__.py index fe62307..3378ecb 100644 --- a/gatehouse_app/models/__init__.py +++ b/gatehouse_app/models/__init__.py @@ -118,7 +118,6 @@ from gatehouse_app.models.zerotier import ( # noqa: F401 from gatehouse_app.models.superadmin import ( # noqa: F401 Superadmin, SuperadminSession, - SuperadminSessionStatus, ) from gatehouse_app.models.superadmin_audit_log import SuperadminAuditLog # noqa: F401 from gatehouse_app.models.security.user_security_policy import ( # noqa: F401 @@ -186,6 +185,5 @@ __all__ = [ # Superadmin "Superadmin", "SuperadminSession", - "SuperadminSessionStatus", "SuperadminAuditLog", ] diff --git a/gatehouse_app/models/superadmin/__init__.py b/gatehouse_app/models/superadmin/__init__.py index ff9fddc..529038f 100644 --- a/gatehouse_app/models/superadmin/__init__.py +++ b/gatehouse_app/models/superadmin/__init__.py @@ -1,5 +1,5 @@ """Superadmin models.""" from gatehouse_app.models.superadmin.superadmin import Superadmin -from gatehouse_app.models.superadmin.superadmin_session import SuperadminSession, SuperadminSessionStatus +from gatehouse_app.models.user.session import Session as SuperadminSession -__all__ = ["Superadmin", "SuperadminSession", "SuperadminSessionStatus"] +__all__ = ["Superadmin", "SuperadminSession"] diff --git a/gatehouse_app/models/superadmin/superadmin.py b/gatehouse_app/models/superadmin/superadmin.py index 525b410..bdbd495 100644 --- a/gatehouse_app/models/superadmin/superadmin.py +++ b/gatehouse_app/models/superadmin/superadmin.py @@ -23,11 +23,15 @@ class Superadmin(BaseModel): is_active = db.Column(db.Boolean, default=True, nullable=False) last_login_at = db.Column(db.DateTime, nullable=True) - # Relationship to sessions + # Relationship to sessions (unified model, scoped to superadmin owner_type) sessions = db.relationship( - "SuperadminSession", - back_populates="superadmin", - cascade="all, delete-orphan" + "Session", + primaryjoin=( + "and_(Superadmin.id == foreign(Session.owner_id), " + "Session.owner_type == 'superadmin')" + ), + cascade="all, delete-orphan", + lazy="dynamic", ) # Relationship to audit logs diff --git a/gatehouse_app/models/superadmin/superadmin_session.py b/gatehouse_app/models/superadmin/superadmin_session.py deleted file mode 100644 index 8eef79b..0000000 --- a/gatehouse_app/models/superadmin/superadmin_session.py +++ /dev/null @@ -1,80 +0,0 @@ -"""Superadmin session model.""" -import logging -from datetime import datetime, timezone, timedelta - -from gatehouse_app.extensions import db -from gatehouse_app.models.base import BaseModel - - -logger = logging.getLogger(__name__) - - -class SuperadminSessionStatus: - """Session status constants.""" - ACTIVE = "active" - REVOKED = "revoked" - EXPIRED = "expired" - - -class SuperadminSession(BaseModel): - """Session model for superadmin authentication.""" - - __tablename__ = "superadmin_sessions" - - superadmin_id = db.Column( - db.String(36), - db.ForeignKey("superadmins.id"), - nullable=False, - index=True - ) - token = db.Column(db.String(255), unique=True, nullable=False, index=True) - expires_at = db.Column(db.DateTime, nullable=False) - last_activity_at = db.Column( - db.DateTime, - nullable=False, - default=lambda: datetime.now(timezone.utc) - ) - ip_address = db.Column(db.String(45), nullable=True) - user_agent = db.Column(db.Text, nullable=True) - revoked_at = db.Column(db.DateTime, nullable=True) - revoked_reason = db.Column(db.String(255), nullable=True) - - # Relationship - superadmin = db.relationship("Superadmin", back_populates="sessions") - - def __repr__(self): - return f"" - - def is_active(self): - """Check if session is currently active.""" - now = datetime.now(timezone.utc) - expires_at = self.expires_at - if expires_at.tzinfo is None: - expires_at = expires_at.replace(tzinfo=timezone.utc) - return ( - self.deleted_at is None - and self.revoked_at is None - and expires_at > now - ) - - def is_expired(self): - """Check if session has expired.""" - now = datetime.now(timezone.utc) - expires_at = self.expires_at - if expires_at.tzinfo is None: - expires_at = expires_at.replace(tzinfo=timezone.utc) - return now > expires_at - - def revoke(self, reason: str = None): - """Revoke the session.""" - self.revoked_at = datetime.now(timezone.utc) - if reason: - self.revoked_reason = reason - from gatehouse_app import db - db.session.commit() - - def to_dict(self, exclude=None): - """Convert to dictionary, excluding sensitive fields.""" - exclude = exclude or [] - exclude.append("token") - return super().to_dict(exclude=exclude) diff --git a/gatehouse_app/models/user/session.py b/gatehouse_app/models/user/session.py index e6300dd..cf8ee32 100644 --- a/gatehouse_app/models/user/session.py +++ b/gatehouse_app/models/user/session.py @@ -3,15 +3,24 @@ from datetime import datetime, timedelta, timezone from flask import current_app from gatehouse_app.extensions import db from gatehouse_app.models.base import BaseModel -from gatehouse_app.utils.constants import SessionStatus +from gatehouse_app.utils.constants import SessionStatus, SessionType class Session(BaseModel): - """Session model for tracking user sessions.""" + """Session model for tracking user and superadmin sessions.""" __tablename__ = "sessions" - user_id = db.Column(db.String(36), db.ForeignKey("users.id"), nullable=False, index=True) + # Owner discriminator — determines which table the owner_id references + owner_type = db.Column( + db.String(20), nullable=False, default=SessionType.USER, index=True + ) + owner_id = db.Column(db.String(36), nullable=False, index=True) + + # Legacy column kept for backward compatibility during migration; + # new code should use owner_id / owner_type. + user_id = db.Column(db.String(36), db.ForeignKey("users.id"), nullable=True, index=True) + token = db.Column(db.String(255), unique=True, nullable=False, index=True) status = db.Column(db.Enum(SessionStatus), default=SessionStatus.ACTIVE, nullable=False) @@ -34,21 +43,37 @@ class Session(BaseModel): # Relationships user = db.relationship("User", back_populates="sessions") + # Composite index for owner-scoped queries + __table_args__ = ( + db.Index("ix_sessions_owner_type_owner_id", "owner_type", "owner_id"), + ) + + # ---- Convenience properties ------------------------------------------------ + + @property + def is_user(self): + return self.owner_type == SessionType.USER + + @property + def is_superadmin(self): + return self.owner_type == SessionType.SUPERADMIN + + # ---- Core methods ---------------------------------------------------------- + def __repr__(self): - """String representation of Session.""" - return f"" + return f"" def is_active(self): """Check if session is currently active. - Sessions are evaluated against two independent timeouts: + User sessions are evaluated against two independent timeouts: - Idle timeout: expires if no request has been made within SESSION_IDLE_TIMEOUT seconds (default 15 min). - Absolute timeout: expires if SESSION_ABSOLUTE_TIMEOUT seconds - have elapsed since the session was created (default 8 h), - regardless of activity. + have elapsed since the session was created (default 8 h). - A session must satisfy *both* constraints to remain active. + Superadmin sessions use absolute timeout only (no idle timeout). + A session must satisfy *all* applicable constraints to remain active. """ now = datetime.now(timezone.utc) created_at = self.created_at @@ -59,12 +84,21 @@ class Session(BaseModel): if last_activity_at.tzinfo is None: last_activity_at = last_activity_at.replace(tzinfo=timezone.utc) - idle_timeout = current_app.config.get("SESSION_IDLE_TIMEOUT", 900) absolute_timeout = current_app.config.get("SESSION_ABSOLUTE_TIMEOUT", 28800) - - idle_expires_at = last_activity_at + timedelta(seconds=idle_timeout) absolute_expires_at = created_at + timedelta(seconds=absolute_timeout) + if self.is_superadmin: + # Superadmin: absolute timeout only + return ( + self.status == SessionStatus.ACTIVE + and now < absolute_expires_at + and self.deleted_at is None + ) + + # User: idle + absolute timeout + idle_timeout = current_app.config.get("SESSION_IDLE_TIMEOUT", 900) + idle_expires_at = last_activity_at + timedelta(seconds=idle_timeout) + return ( self.status == SessionStatus.ACTIVE and now < idle_expires_at @@ -83,6 +117,8 @@ class Session(BaseModel): capped so that the session never exceeds the absolute lifetime (``created_at + absolute timeout``). + Superadmin sessions only update last_activity_at (no sliding window). + Args: duration_seconds: Override for the idle timeout. When *None* (the common case), the value is read from @@ -90,6 +126,12 @@ class Session(BaseModel): """ now = datetime.now(timezone.utc) + if self.is_superadmin: + # Superadmin: just bump last_activity_at, no sliding window + self.last_activity_at = now + db.session.commit() + return + if duration_seconds is None: duration_seconds = current_app.config.get("SESSION_IDLE_TIMEOUT", 900) diff --git a/gatehouse_app/services/auth_service.py b/gatehouse_app/services/auth_service.py index c662ba8..af438e0 100644 --- a/gatehouse_app/services/auth_service.py +++ b/gatehouse_app/services/auth_service.py @@ -8,7 +8,7 @@ from gatehouse_app.extensions import db, bcrypt from gatehouse_app.models.user.user import User from gatehouse_app.models.auth.authentication_method import AuthenticationMethod from gatehouse_app.models.user.session import Session -from gatehouse_app.utils.constants import AuthMethodType, SessionStatus, UserStatus, AuditAction +from gatehouse_app.utils.constants import AuthMethodType, SessionStatus, SessionType, UserStatus, AuditAction from gatehouse_app.exceptions.auth_exceptions import InvalidCredentialsError, AccountSuspendedError, AccountInactiveError from gatehouse_app.exceptions.validation_exceptions import EmailAlreadyExistsError from gatehouse_app.services.audit_service import AuditService @@ -165,6 +165,8 @@ class AuthService: # Create session session = Session( + owner_type=SessionType.USER, + owner_id=user.id, user_id=user.id, token=token, status=SessionStatus.ACTIVE, diff --git a/gatehouse_app/services/session_service.py b/gatehouse_app/services/session_service.py index e86cd6f..6f3e595 100644 --- a/gatehouse_app/services/session_service.py +++ b/gatehouse_app/services/session_service.py @@ -1,7 +1,7 @@ """Session service.""" from datetime import datetime, timezone from gatehouse_app.models.user.session import Session -from gatehouse_app.utils.constants import SessionStatus +from gatehouse_app.utils.constants import SessionStatus, SessionType class SessionService: @@ -28,18 +28,22 @@ class SessionService: ).first() @staticmethod - def get_user_sessions(user_id, active_only=True): - """ - Get all sessions for a user. + def get_owner_sessions(owner_type, owner_id, active_only=True): + """Get all sessions for an owner (user or superadmin). Args: - user_id: User ID + owner_type: SessionType.USER or SessionType.SUPERADMIN + owner_id: Owner ID active_only: If True, only return active sessions Returns: List of Session instances """ - query = Session.query.filter_by(user_id=user_id, deleted_at=None) + query = Session.query.filter_by( + owner_type=owner_type, + owner_id=owner_id, + deleted_at=None, + ) if active_only: query = query.filter_by(status=SessionStatus.ACTIVE).filter( @@ -49,18 +53,67 @@ class SessionService: return query.all() @staticmethod - def revoke_user_sessions(user_id, reason="User logged out from all devices"): + def get_user_sessions(user_id, active_only=True): + """Get all sessions for a user. + + Args: + user_id: User ID + active_only: If True, only return active sessions + + Returns: + List of Session instances """ - Revoke all active sessions for a user. + return SessionService.get_owner_sessions( + SessionType.USER, user_id, active_only=active_only + ) + + @staticmethod + def get_superadmin_sessions(superadmin_id, active_only=True): + """Get all sessions for a superadmin. + + Args: + superadmin_id: Superadmin ID + active_only: If True, only return active sessions + + Returns: + List of Session instances + """ + return SessionService.get_owner_sessions( + SessionType.SUPERADMIN, superadmin_id, active_only=active_only + ) + + @staticmethod + def revoke_owner_sessions(owner_type, owner_id, reason="Logged out from all devices"): + """Revoke all active sessions for an owner. + + Args: + owner_type: SessionType.USER or SessionType.SUPERADMIN + owner_id: Owner ID + reason: Reason for revocation + """ + sessions = SessionService.get_owner_sessions(owner_type, owner_id, active_only=True) + for session in sessions: + session.revoke(reason=reason) + + @staticmethod + def revoke_user_sessions(user_id, reason="User logged out from all devices"): + """Revoke all active sessions for a user. Args: user_id: User ID reason: Reason for revocation """ - sessions = SessionService.get_user_sessions(user_id, active_only=True) + SessionService.revoke_owner_sessions(SessionType.USER, user_id, reason=reason) - for session in sessions: - session.revoke(reason=reason) + @staticmethod + def revoke_superadmin_sessions(superadmin_id, reason="Superadmin logged out"): + """Revoke all active sessions for a superadmin. + + Args: + superadmin_id: Superadmin ID + reason: Reason for revocation + """ + SessionService.revoke_owner_sessions(SessionType.SUPERADMIN, superadmin_id, reason=reason) @staticmethod def cleanup_expired_sessions(): diff --git a/gatehouse_app/services/superadmin_auth_service.py b/gatehouse_app/services/superadmin_auth_service.py index 31e798b..5d91ebb 100644 --- a/gatehouse_app/services/superadmin_auth_service.py +++ b/gatehouse_app/services/superadmin_auth_service.py @@ -6,7 +6,9 @@ from typing import Optional from flask import request, current_app from gatehouse_app.extensions import db, bcrypt -from gatehouse_app.models.superadmin import Superadmin, SuperadminSession +from gatehouse_app.models.superadmin import Superadmin +from gatehouse_app.models.user.session import Session +from gatehouse_app.utils.constants import SessionType from gatehouse_app.exceptions.auth_exceptions import InvalidCredentialsError @@ -70,15 +72,17 @@ class SuperadminAuthService: duration_seconds: Session duration in seconds (default 8 hours) Returns: - SuperadminSession instance + Session instance """ # Generate secure token token = secrets.token_urlsafe(32) - # Create session - session = SuperadminSession( - superadmin_id=superadmin_id, + # Create session using unified model + session = Session( + owner_type=SessionType.SUPERADMIN, + owner_id=superadmin_id, token=token, + status="active", expires_at=datetime.now(timezone.utc) + timedelta(seconds=duration_seconds), last_activity_at=datetime.now(timezone.utc), ip_address=request.remote_addr, @@ -97,7 +101,9 @@ class SuperadminAuthService: session_id: Session ID to revoke reason: Optional revocation reason """ - session = SuperadminSession.query.get(session_id) + session = Session.query.filter_by( + id=session_id, owner_type=SessionType.SUPERADMIN + ).first() if session: session.revoke(reason=reason) logger.info(f"[SuperadminAuth] Session {session_id} revoked: {reason or 'No reason'}") @@ -111,9 +117,11 @@ class SuperadminAuthService: except_token: Optional token to keep (current session) reason: Optional revocation reason """ - query = SuperadminSession.query.filter_by(superadmin_id=superadmin_id) + query = Session.query.filter_by( + owner_type=SessionType.SUPERADMIN, owner_id=superadmin_id + ) if except_token: - query = query.filter(SuperadminSession.token != except_token) + query = query.filter(Session.token != except_token) sessions = query.all() for session in sessions: diff --git a/gatehouse_app/utils/constants.py b/gatehouse_app/utils/constants.py index e31bcab..c489b1d 100644 --- a/gatehouse_app/utils/constants.py +++ b/gatehouse_app/utils/constants.py @@ -52,6 +52,13 @@ class SessionStatus(str, Enum): REVOKED = "revoked" +class SessionType(str, Enum): + """Session owner type discriminator.""" + + USER = "user" + SUPERADMIN = "superadmin" + + class AuditAction(str, Enum): """Audit log action types.""" diff --git a/migrations/versions/c8d2e4f6a1b3_consolidate_sessions.py b/migrations/versions/c8d2e4f6a1b3_consolidate_sessions.py new file mode 100644 index 0000000..4a0af90 --- /dev/null +++ b/migrations/versions/c8d2e4f6a1b3_consolidate_sessions.py @@ -0,0 +1,122 @@ +"""Consolidate user and superadmin sessions into unified sessions table. + +Revision ID: c8d2e4f6a1b3 +Revises: b7e3f1a92c4d +Create Date: 2026-04-28 00:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = 'c8d2e4f6a1b3' +down_revision = 'b7e3f1a92c4d' +branch_labels = None +depends_on = None + + +def upgrade(): + # 1. Add new columns (nullable initially for data migration) + op.add_column('sessions', sa.Column('owner_type', sa.String(20), nullable=True)) + op.add_column('sessions', sa.Column('owner_id', sa.String(36), nullable=True)) + + # 2. Backfill existing user sessions: owner_type = 'user', owner_id = user_id + op.execute(""" + UPDATE sessions + SET owner_type = 'user', + owner_id = user_id + WHERE owner_type IS NULL + """) + + # 3. Migrate superadmin sessions into the sessions table + op.execute(""" + INSERT INTO sessions ( + id, owner_type, owner_id, token, status, + ip_address, user_agent, device_info, + expires_at, last_activity_at, revoked_at, revoked_reason, + is_compliance_only, created_at, updated_at, deleted_at + ) + SELECT + id, 'superadmin', superadmin_id, token, 'active', + ip_address, user_agent, NULL, + expires_at, last_activity_at, revoked_at, revoked_reason, + FALSE, created_at, updated_at, deleted_at + FROM superadmin_sessions + """) + + # 4. Make owner_type and owner_id NOT NULL + op.alter_column('sessions', 'owner_type', nullable=False) + op.alter_column('sessions', 'owner_id', nullable=False) + + # 5. Make user_id nullable (no longer the sole owner reference) + op.alter_column('sessions', 'user_id', nullable=True) + + # 6. Create indexes for efficient owner-scoped queries + op.create_index( + 'ix_sessions_owner_type_owner_id', + 'sessions', + ['owner_type', 'owner_id'] + ) + op.create_index( + 'ix_sessions_owner_type', + 'sessions', + ['owner_type'] + ) + op.create_index( + 'ix_sessions_owner_id', + 'sessions', + ['owner_id'] + ) + + # 7. Drop the now-redundant superadmin_sessions table + op.drop_table('superadmin_sessions') + + +def downgrade(): + # 1. Recreate superadmin_sessions table + op.create_table( + 'superadmin_sessions', + sa.Column('id', sa.String(36), primary_key=True), + sa.Column('superadmin_id', sa.String(36), sa.ForeignKey('superadmins.id'), nullable=False, index=True), + sa.Column('token', sa.String(255), unique=True, nullable=False, index=True), + sa.Column('expires_at', sa.DateTime, nullable=False), + sa.Column('last_activity_at', sa.DateTime, nullable=False), + sa.Column('ip_address', sa.String(45), nullable=True), + sa.Column('user_agent', sa.Text, nullable=True), + sa.Column('revoked_at', sa.DateTime, nullable=True), + sa.Column('revoked_reason', sa.String(255), nullable=True), + sa.Column('created_at', sa.DateTime, nullable=False), + sa.Column('updated_at', sa.DateTime, nullable=False), + sa.Column('deleted_at', sa.DateTime, nullable=True), + ) + + # 2. Move superadmin sessions back to superadmin_sessions + op.execute(""" + INSERT INTO superadmin_sessions ( + id, superadmin_id, token, expires_at, last_activity_at, + ip_address, user_agent, revoked_at, revoked_reason, + created_at, updated_at, deleted_at + ) + SELECT + id, owner_id, token, expires_at, last_activity_at, + ip_address, user_agent, revoked_at, revoked_reason, + created_at, updated_at, deleted_at + FROM sessions + WHERE owner_type = 'superadmin' + """) + + # 3. Remove superadmin sessions from sessions table + op.execute("DELETE FROM sessions WHERE owner_type = 'superadmin'") + + # 4. Drop indexes + op.drop_index('ix_sessions_owner_id', table_name='sessions') + op.drop_index('ix_sessions_owner_type', table_name='sessions') + op.drop_index('ix_sessions_owner_type_owner_id', table_name='sessions') + + # 5. Remove new columns + op.drop_column('sessions', 'owner_id') + op.drop_column('sessions', 'owner_type') + + # 6. Make user_id NOT NULL again + op.alter_column('sessions', 'user_id', nullable=False) diff --git a/tests/integration/test_superadmin_session_timeouts.py b/tests/integration/test_superadmin_session_timeouts.py new file mode 100644 index 0000000..21410ae --- /dev/null +++ b/tests/integration/test_superadmin_session_timeouts.py @@ -0,0 +1,186 @@ +"""Superadmin session timeout integration tests. + +Validates the absolute-only timeout policy for superadmin sessions. +Superadmin sessions do NOT have idle timeout — only absolute timeout. +""" +import pytest +import uuid +from datetime import datetime, timedelta, timezone + +from tests.integration.client.base import ApiError + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def assert_success(response: dict, message_contains: str = "") -> dict: + """Assert that an api_response-wrapped payload succeeded.""" + data = response.get("data", {}) + assert response.get("success") is not False, ( + f"Expected success but got error: {response.get('message')}" + ) + if message_contains: + assert message_contains.lower() in response.get("message", "").lower(), ( + f"Expected message to contain '{message_contains}' but got: {response.get('message')}" + ) + return data + + +def _get_session_row(integration_app, token: str): + """Look up the Session model row for a given bearer token.""" + from gatehouse_app.models.user.session import Session + with integration_app.app_context(): + return Session.query.filter_by(token=token).first() + + +def _touch_session(integration_app, session_id: str, **updates): + """Directly update columns on a Session row. + + Only use this to simulate the passage of time — never to assert + internal state. + """ + from gatehouse_app.models.user.session import Session + with integration_app.app_context(): + sess = Session.query.get(session_id) + for attr, value in updates.items(): + setattr(sess, attr, value) + from gatehouse_app import db + db.session.commit() + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def superadmin_credentials(integration_app): + """Create a superadmin and return login credentials.""" + from gatehouse_app.services.superadmin_auth_service import SuperadminAuthService + + email = f"admin_{uuid.uuid4().hex[:8]}@gatehouse.local" + password = "SuperAdmin123!" + + with integration_app.app_context(): + sa = SuperadminAuthService.create_superadmin( + email=email, + credential=password, + full_name="Test Superadmin", + ) + return {"id": str(sa.id), "email": email, "password": password} + + +@pytest.fixture +def logged_in_superadmin(integration_client, superadmin_credentials, integration_app): + """Log in as superadmin and return session metadata. + + Returns dict with ``superadmin``, ``token``, ``session_id``, ``session_row``. + """ + creds = superadmin_credentials + resp = integration_client.post( + "/api/v1/superadmin/auth/login", + data={"email": creds["email"], "password": creds["password"]}, + ) + data = assert_success(resp) + token = data["token"] + + session_row = _get_session_row(integration_app, token) + assert session_row is not None, "Session row should exist after superadmin login" + + return { + "superadmin": creds, + "token": token, + "session_id": session_row.id, + "session_row": session_row, + } + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestSuperadminSessionTimeouts: + """Absolute-only timeout behavior for superadmin sessions.""" + + def test_superadmin_session_valid_before_timeout( + self, integration_client, logged_in_superadmin, + ): + """SA-SESS-01 — Fresh superadmin session is accepted.""" + integration_client.set_token(logged_in_superadmin["token"]) + result = integration_client.get("/api/v1/superadmin/auth/me") + data = assert_success(result) + assert "superadmin" in data + + def test_absolute_timeout_rejects_superadmin( + self, integration_client, logged_in_superadmin, integration_app, + ): + """SA-SESS-02 — Superadmin session rejected after absolute timeout. + + Push ``created_at`` far into the past. The session must be + rejected even though ``last_activity_at`` is fresh. + """ + _touch_session( + integration_app, + logged_in_superadmin["session_id"], + created_at=datetime.now(timezone.utc) - timedelta(days=1), + last_activity_at=datetime.now(timezone.utc), + ) + + integration_client.set_token(logged_in_superadmin["token"]) + with pytest.raises(ApiError) as exc_info: + integration_client.get("/api/v1/superadmin/auth/me") + + assert exc_info.value.status_code == 401 + + def test_idle_timeout_does_NOT_reject_superadmin( + self, integration_client, logged_in_superadmin, integration_app, + ): + """SA-SESS-03 — Superadmin sessions have NO idle timeout. + + Push ``last_activity_at`` far into the past but keep + ``created_at`` recent. The session should still be valid + because superadmin sessions only use absolute timeout. + """ + _touch_session( + integration_app, + logged_in_superadmin["session_id"], + last_activity_at=datetime.now(timezone.utc) - timedelta(hours=1), + ) + + integration_client.set_token(logged_in_superadmin["token"]) + result = integration_client.get("/api/v1/superadmin/auth/me") + data = assert_success(result) + assert "superadmin" in data + + def test_revoked_superadmin_session_rejected( + self, integration_client, logged_in_superadmin, + ): + """SA-SESS-04 — Revoked superadmin session is rejected.""" + integration_client.set_token(logged_in_superadmin["token"]) + + # Logout revokes the session + integration_client.post("/api/v1/superadmin/auth/logout") + integration_client.clear_token() + + # Try using the old token + integration_client.set_token(logged_in_superadmin["token"]) + with pytest.raises(ApiError) as exc_info: + integration_client.get("/api/v1/superadmin/auth/me") + + assert exc_info.value.status_code == 401 + + def test_superadmin_session_has_owner_type( + self, integration_app, logged_in_superadmin, + ): + """SA-SESS-05 — Superadmin session row has owner_type='superadmin'.""" + from gatehouse_app.models.user.session import Session + from gatehouse_app.utils.constants import SessionType + + with integration_app.app_context(): + sess = Session.query.get(logged_in_superadmin["session_id"]) + assert sess is not None + assert sess.owner_type == SessionType.SUPERADMIN + assert sess.owner_id == logged_in_superadmin["superadmin"]["id"]