# File-viewer header captured with this source: 168 lines, 6.1 KiB, Python.

"""Session model."""
from datetime import datetime, timedelta, timezone
from flask import current_app
from gatehouse_app.extensions import db
from gatehouse_app.models.base import BaseModel
from gatehouse_app.utils.constants import SessionStatus, SessionType
class Session(BaseModel):
    """Session model for tracking user and superadmin sessions.

    A row is owned by either a user or a superadmin; ``owner_type`` says
    which, and ``owner_id`` holds that owner's id.  Liveness is decided by
    :meth:`is_active`, which combines the row's status, its soft-delete
    marker and the configured idle / absolute timeouts.
    """

    __tablename__ = "sessions"

    # Owner discriminator — determines which table the owner_id references
    owner_type = db.Column(
        db.String(20), nullable=False, default=SessionType.USER, index=True
    )
    owner_id = db.Column(db.String(36), nullable=False, index=True)

    # Legacy column kept for backward compatibility during migration;
    # new code should use owner_id / owner_type.
    user_id = db.Column(
        db.String(36), db.ForeignKey("users.id"), nullable=True, index=True
    )

    token = db.Column(db.String(255), unique=True, nullable=False, index=True)
    status = db.Column(
        db.Enum(SessionStatus), default=SessionStatus.ACTIVE, nullable=False
    )

    # Session metadata
    ip_address = db.Column(db.String(45), nullable=True)
    user_agent = db.Column(db.Text, nullable=True)
    device_info = db.Column(db.JSON, nullable=True)

    # Timing
    # NOTE(review): these columns are declared without ``timezone=True``, so
    # values read back may be naive; the methods below label naive values as
    # UTC before comparing them — confirm every writer stores UTC.
    expires_at = db.Column(db.DateTime, nullable=False)
    last_activity_at = db.Column(
        db.DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)
    )
    revoked_at = db.Column(db.DateTime, nullable=True)
    revoked_reason = db.Column(db.String(255), nullable=True)

    # Compliance session flag
    is_compliance_only = db.Column(db.Boolean, nullable=False, default=False)

    # Relationships
    user = db.relationship("User", back_populates="sessions")

    # Composite index for owner-scoped queries
    # NOTE(review): owner_type also carries its own single-column index above,
    # which this composite index may already cover — verify before dropping.
    __table_args__ = (
        db.Index("ix_sessions_owner_type_owner_id", "owner_type", "owner_id"),
    )

    # ---- Convenience properties ------------------------------------------------

    @property
    def is_user(self):
        """True when ``owner_type`` equals ``SessionType.USER``."""
        return self.owner_type == SessionType.USER

    @property
    def is_superadmin(self):
        """True when ``owner_type`` equals ``SessionType.SUPERADMIN``."""
        return self.owner_type == SessionType.SUPERADMIN

    # ---- Internal helpers ------------------------------------------------------

    @staticmethod
    def _as_utc(moment):
        """Return *moment* as an aware datetime, labelling naive values as UTC.

        Aware values are returned unchanged; naive values get
        ``tzinfo=timezone.utc`` attached without shifting the clock time.
        """
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    # ---- Core methods ----------------------------------------------------------

    def __repr__(self):
        return f"<Session owner_type={self.owner_type} owner_id={self.owner_id} status={self.status}>"

    def is_active(self):
        """Check if session is currently active.

        User sessions are evaluated against two independent timeouts:

        - Idle timeout: expires if ``last_activity_at`` is older than
          SESSION_IDLE_TIMEOUT seconds (default 900, i.e. 15 min).
        - Absolute timeout: expires if SESSION_ABSOLUTE_TIMEOUT seconds
          have elapsed since ``created_at`` (default 28800, i.e. 8 h).

        Superadmin sessions use absolute timeout only (no idle timeout).

        In every case the status must be ``SessionStatus.ACTIVE`` and the row
        must not be soft-deleted (``deleted_at`` is None).  A session must
        satisfy *all* applicable constraints to remain active.

        NOTE(review): the stored ``expires_at`` column is not consulted here;
        liveness is derived from ``created_at`` / ``last_activity_at`` and the
        current Flask config.

        Returns:
            bool: True if the session is still usable, False otherwise.
        """
        # Cheap checks first: a revoked or soft-deleted session is inactive
        # regardless of timing, so skip the config lookups entirely.
        if self.status != SessionStatus.ACTIVE or self.deleted_at is not None:
            return False

        now = datetime.now(timezone.utc)

        absolute_timeout = current_app.config.get("SESSION_ABSOLUTE_TIMEOUT", 28800)
        absolute_expires_at = self._as_utc(self.created_at) + timedelta(
            seconds=absolute_timeout
        )
        if now >= absolute_expires_at:
            return False

        if self.is_superadmin:
            # Superadmin: absolute timeout only
            return True

        # User: idle + absolute timeout
        idle_timeout = current_app.config.get("SESSION_IDLE_TIMEOUT", 900)
        idle_expires_at = self._as_utc(self.last_activity_at) + timedelta(
            seconds=idle_timeout
        )
        return now < idle_expires_at

    def is_expired(self):
        """Check if session has expired (either idle or absolute).

        Defined as "not active and not revoked", so any other reason for
        :meth:`is_active` returning False also counts as expired here.
        """
        return not self.is_active() and self.status != SessionStatus.REVOKED

    def refresh(self, duration_seconds: int = None):
        """Extend the session expiration using a sliding window.

        The new ``expires_at`` is set to *now + idle timeout*, but is
        capped so that the session never exceeds the absolute lifetime
        (``created_at + absolute timeout``).  ``last_activity_at`` is set to
        now and the change is committed via ``db.session.commit()``.

        Superadmin sessions only update last_activity_at (no sliding window).

        NOTE(review): ``duration_seconds`` only influences the stored
        ``expires_at``; :meth:`is_active` always reads SESSION_IDLE_TIMEOUT
        from config — confirm that is intended.

        Args:
            duration_seconds: Override for the idle timeout. When *None*
                (the common case), the value is read from
                ``SESSION_IDLE_TIMEOUT`` in the Flask config.
        """
        now = datetime.now(timezone.utc)

        if self.is_superadmin:
            # Superadmin: just bump last_activity_at, no sliding window
            self.last_activity_at = now
            db.session.commit()
            return

        if duration_seconds is None:
            duration_seconds = current_app.config.get("SESSION_IDLE_TIMEOUT", 900)
        absolute_timeout = current_app.config.get("SESSION_ABSOLUTE_TIMEOUT", 28800)

        idle_expires_at = now + timedelta(seconds=duration_seconds)
        absolute_expires_at = self._as_utc(self.created_at) + timedelta(
            seconds=absolute_timeout
        )

        # Whichever deadline comes first wins, so sliding never outlives the
        # absolute lifetime.
        self.expires_at = min(idle_expires_at, absolute_expires_at)
        self.last_activity_at = now
        db.session.commit()

    def revoke(self, reason: str = None):
        """Revoke the session.

        Sets the status to ``SessionStatus.REVOKED``, stamps ``revoked_at``
        with the current UTC time and commits.

        Args:
            reason: Optional reason for revocation. An empty or None value
                leaves ``revoked_reason`` untouched.
        """
        self.status = SessionStatus.REVOKED
        self.revoked_at = datetime.now(timezone.utc)
        if reason:
            self.revoked_reason = reason
        db.session.commit()

    def to_dict(self, exclude=None):
        """Convert to dictionary, excluding sensitive fields.

        ``"token"`` is always added to the exclusion list passed on to the
        parent implementation.

        Args:
            exclude: Optional iterable of additional field names to leave
                out. It is copied, never modified.

        Returns:
            Whatever ``super().to_dict`` returns for the combined exclusions.
        """
        # Work on a private copy: appending to the caller's list would leak
        # "token" into it and pile up duplicates across repeated calls.
        hidden_fields = list(exclude or [])
        if "token" not in hidden_fields:
            hidden_fields.append("token")
        return super().to_dict(exclude=hidden_fields)