2026-01-08 01:00:26 +10:30
|
|
|
"""Session model."""
|
2026-01-14 18:06:17 +10:30
|
|
|
from datetime import datetime, timedelta, timezone
|
2026-04-26 18:12:37 +09:30
|
|
|
from flask import current_app
|
2026-01-15 03:40:29 +10:30
|
|
|
from gatehouse_app.extensions import db
|
|
|
|
|
from gatehouse_app.models.base import BaseModel
|
2026-04-28 20:54:15 +09:30
|
|
|
from gatehouse_app.utils.constants import SessionStatus, SessionType
|
2026-01-08 01:00:26 +10:30
|
|
|
|
|
|
|
|
|
|
|
|
|
class Session(BaseModel):
    """Session model for tracking user and superadmin sessions.

    A row is owned by either a user or a superadmin, selected by
    ``owner_type`` / ``owner_id``. Liveness is decided by ``is_active()``
    from the row's status, its timestamps and the timeouts found in the
    Flask config (``SESSION_IDLE_TIMEOUT`` / ``SESSION_ABSOLUTE_TIMEOUT``).

    NOTE(review): ``created_at`` and ``deleted_at`` are read here but not
    declared in this class — presumably provided by ``BaseModel``; confirm.
    """

    __tablename__ = "sessions"

    # Fallbacks (in seconds) used when the Flask config does not define the
    # corresponding SESSION_* key: 15 minutes idle, 8 hours absolute.
    DEFAULT_IDLE_TIMEOUT_SECONDS = 900
    DEFAULT_ABSOLUTE_TIMEOUT_SECONDS = 28800

    # Owner discriminator — determines which table the owner_id references
    owner_type = db.Column(
        db.String(20), nullable=False, default=SessionType.USER, index=True
    )
    owner_id = db.Column(db.String(36), nullable=False, index=True)

    # Legacy column kept for backward compatibility during migration;
    # new code should use owner_id / owner_type.
    user_id = db.Column(db.String(36), db.ForeignKey("users.id"), nullable=True, index=True)

    token = db.Column(db.String(255), unique=True, nullable=False, index=True)
    status = db.Column(db.Enum(SessionStatus), default=SessionStatus.ACTIVE, nullable=False)

    # Session metadata
    ip_address = db.Column(db.String(45), nullable=True)
    user_agent = db.Column(db.Text, nullable=True)
    device_info = db.Column(db.JSON, nullable=True)

    # Timing
    # NOTE: expires_at is written by refresh(); is_active() does not read it
    # and instead derives expiry from created_at / last_activity_at.
    expires_at = db.Column(db.DateTime, nullable=False)
    last_activity_at = db.Column(
        db.DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)
    )
    revoked_at = db.Column(db.DateTime, nullable=True)
    revoked_reason = db.Column(db.String(255), nullable=True)

    # Compliance session flag
    is_compliance_only = db.Column(db.Boolean, nullable=False, default=False)

    # Relationships
    user = db.relationship("User", back_populates="sessions")

    # Composite index for owner-scoped queries
    __table_args__ = (
        db.Index("ix_sessions_owner_type_owner_id", "owner_type", "owner_id"),
    )

    # ---- Convenience properties ------------------------------------------------

    @property
    def is_user(self):
        """Whether this session is owned by a regular user."""
        return self.owner_type == SessionType.USER

    @property
    def is_superadmin(self):
        """Whether this session is owned by a superadmin."""
        return self.owner_type == SessionType.SUPERADMIN

    # ---- Internal helpers ------------------------------------------------------

    @staticmethod
    def _as_utc(value):
        """Return *value* as an aware datetime, tagging naive values as UTC.

        Values that already carry tzinfo are returned unchanged.
        """
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value

    def _absolute_expires_at(self):
        """Return the instant at which the absolute lifetime ends.

        Computed as ``created_at`` plus ``SESSION_ABSOLUTE_TIMEOUT`` seconds
        from the Flask config (falling back to the class default).
        """
        absolute_timeout = current_app.config.get(
            "SESSION_ABSOLUTE_TIMEOUT", self.DEFAULT_ABSOLUTE_TIMEOUT_SECONDS
        )
        return self._as_utc(self.created_at) + timedelta(seconds=absolute_timeout)

    # ---- Core methods ----------------------------------------------------------

    def __repr__(self):
        return f"<Session owner_type={self.owner_type} owner_id={self.owner_id} status={self.status}>"

    def is_active(self):
        """Check if session is currently active.

        User sessions are evaluated against two independent timeouts:
        - Idle timeout: expires if no request has been made within
          SESSION_IDLE_TIMEOUT seconds (default 15 min).
        - Absolute timeout: expires if SESSION_ABSOLUTE_TIMEOUT seconds
          have elapsed since the session was created (default 8 h).

        Superadmin sessions use absolute timeout only (no idle timeout).
        A session must satisfy *all* applicable constraints to remain active.

        Returns:
            True when the status is ACTIVE, the row is not soft-deleted
            (``deleted_at`` is None) and every applicable timeout still
            lies in the future; False otherwise.
        """
        now = datetime.now(timezone.utc)

        within_timeouts = now < self._absolute_expires_at()

        # The idle window only applies to user sessions, so
        # last_activity_at is not consulted for superadmins.
        if within_timeouts and not self.is_superadmin:
            idle_timeout = current_app.config.get(
                "SESSION_IDLE_TIMEOUT", self.DEFAULT_IDLE_TIMEOUT_SECONDS
            )
            idle_expires_at = self._as_utc(self.last_activity_at) + timedelta(
                seconds=idle_timeout
            )
            within_timeouts = now < idle_expires_at

        return (
            self.status == SessionStatus.ACTIVE
            and within_timeouts
            and self.deleted_at is None
        )

    def is_expired(self):
        """Check if session has expired (either idle or absolute).

        Defined as "not active, and not explicitly revoked".
        """
        return not self.is_active() and self.status != SessionStatus.REVOKED

    def refresh(self, duration_seconds: int = None):
        """Extend the session expiration using a sliding window.

        The new ``expires_at`` is set to *now + idle timeout*, but is
        capped so that the session never exceeds the absolute lifetime
        (``created_at + absolute timeout``).

        Superadmin sessions only update last_activity_at (no sliding window).

        The change is committed to the database before returning.

        Args:
            duration_seconds: Override for the idle timeout. When *None*
                (the common case), the value is read from
                ``SESSION_IDLE_TIMEOUT`` in the Flask config. Ignored for
                superadmin sessions.
        """
        now = datetime.now(timezone.utc)

        if self.is_superadmin:
            # Superadmin: just bump last_activity_at, no sliding window
            self.last_activity_at = now
            db.session.commit()
            return

        if duration_seconds is None:
            duration_seconds = current_app.config.get(
                "SESSION_IDLE_TIMEOUT", self.DEFAULT_IDLE_TIMEOUT_SECONDS
            )

        idle_expires_at = now + timedelta(seconds=duration_seconds)

        # Whichever limit comes first wins, so sliding the idle window can
        # never push the session past its absolute lifetime.
        self.expires_at = min(idle_expires_at, self._absolute_expires_at())
        self.last_activity_at = now
        db.session.commit()

    def revoke(self, reason: str = None):
        """Revoke the session.

        Sets the status to REVOKED, stamps ``revoked_at`` with the current
        UTC time and commits.

        Args:
            reason: Optional reason for revocation. Stored only when truthy,
                so an existing ``revoked_reason`` is kept otherwise.
        """
        self.status = SessionStatus.REVOKED
        self.revoked_at = datetime.now(timezone.utc)
        if reason:
            self.revoked_reason = reason
        db.session.commit()

    def to_dict(self, exclude=None):
        """Convert to dictionary, excluding sensitive fields.

        Args:
            exclude: Optional iterable of field names to leave out. It is
                copied, never modified, so callers may safely reuse it.

        Returns:
            Whatever the parent ``to_dict`` returns, with ``token`` always
            added to the exclusions.
        """
        # Build a fresh list: appending to the caller's own list would leak
        # "token" into it and accumulate duplicates across calls.
        excluded = list(exclude or [])
        if "token" not in excluded:
            excluded.append("token")
        return super().to_dict(exclude=excluded)
|