"""Superadmin authentication service.""" import logging import secrets from datetime import datetime, timedelta, timezone from typing import Optional from flask import request, current_app from gatehouse_app.extensions import db, bcrypt from gatehouse_app.models.superadmin import Superadmin from gatehouse_app.models.user.session import Session from gatehouse_app.utils.constants import SessionType from gatehouse_app.exceptions.auth_exceptions import InvalidCredentialsError logger = logging.getLogger(__name__) class SuperadminAuthService: """Service for superadmin authentication operations.""" @staticmethod def authenticate(email, credentials): """Authenticate superadmin with email/password credentials. Args: email: Superadmin email credentials: Plain text credential Returns: Superadmin instance if authentication succeeds Raises: InvalidCredentialsError: If credentials are invalid or account is disabled """ # Find superadmin by email superadmin = Superadmin.query.filter_by(email=email.lower()).first() if not superadmin: logger.warning(f"[SuperadminAuth] Login attempt for non-existent email: {email}") raise InvalidCredentialsError() # Check if account is active if not superadmin.is_active: logger.warning(f"[SuperadminAuth] Login attempt for disabled account: {email}") raise InvalidCredentialsError("Account is disabled") # Check credential if not superadmin.password_hash: logger.warning(f"[SuperadminAuth] Login attempt for account with no credential set: {email}") raise InvalidCredentialsError() # Verify credential password_valid = bcrypt.check_password_hash(superadmin.password_hash, credentials) if not password_valid: logger.warning(f"[SuperadminAuth] Invalid password for: {email}") raise InvalidCredentialsError() # Update last login superadmin.last_login_at = datetime.now(timezone.utc) db.session.commit() logger.info(f"[SuperadminAuth] Successful login for: {email}") return superadmin @staticmethod def create_session(superadmin_id, duration_seconds=28800): """Create a new session for superadmin. Args: superadmin_id: Superadmin ID duration_seconds: Session duration in seconds (default 8 hours) Returns: Session instance """ # Generate secure token token = secrets.token_urlsafe(32) # Create session using unified model session = Session( owner_type=SessionType.SUPERADMIN, owner_id=superadmin_id, token=token, status="active", expires_at=datetime.now(timezone.utc) + timedelta(seconds=duration_seconds), last_activity_at=datetime.now(timezone.utc), ip_address=request.remote_addr, user_agent=request.headers.get("User-Agent"), ) session.save() logger.info(f"[SuperadminAuth] Session created for superadmin_id={superadmin_id}") return session @staticmethod def revoke_session(session_id, reason=None): """Revoke a superadmin session. Args: session_id: Session ID to revoke reason: Optional revocation reason """ session = Session.query.filter_by( id=session_id, owner_type=SessionType.SUPERADMIN ).first() if session: session.revoke(reason=reason) logger.info(f"[SuperadminAuth] Session {session_id} revoked: {reason or 'No reason'}") @staticmethod def revoke_all_sessions(superadmin_id, except_token=None, reason=None): """Revoke all sessions for a superadmin. Args: superadmin_id: Superadmin ID except_token: Optional token to keep (current session) reason: Optional revocation reason """ query = Session.query.filter_by( owner_type=SessionType.SUPERADMIN, owner_id=superadmin_id ) if except_token: query = query.filter(Session.token != except_token) sessions = query.all() for session in sessions: session.revoke(reason=reason) logger.info(f"[SuperadminAuth] Revoked {len(sessions)} sessions for superadmin_id={superadmin_id}") return len(sessions) @staticmethod def create_emergency_access(superadmin_id, target_user_id, reason, duration_minutes=15): """Create emergency access to a user's account. This creates a special emergency session that grants temporary elevated access. Args: superadmin_id: Superadmin ID initiating emergency access target_user_id: User ID to access reason: Reason for emergency access duration_minutes: Duration of emergency access in minutes Returns: Dictionary with emergency session info """ from gatehouse_app.models.user.user import User from gatehouse_app.services.auth_service import AuthService from gatehouse_app.services.audit_service import AuditService # Verify target user exists target_user = User.query.get(target_user_id) if not target_user: raise ValueError(f"Target user not found: {target_user_id}") # Create emergency session for the target user emergency_session = AuthService.create_session( user=target_user, duration_seconds=duration_minutes * 60, is_compliance_only=False ) # Log the emergency access logger.warning( f"[SuperadminAuth] EMERGENCY ACCESS: superadmin_id={superadmin_id} " f"accessed user_id={target_user_id} reason={reason}" ) return { "session": emergency_session, "expires_at": emergency_session.expires_at, "reason": reason, "target_user_id": target_user_id, } @staticmethod def hash_password(plain_credential): """Hash a credential for storage. Args: plain_credential: Plain text credential Returns: Hashed credential string """ return bcrypt.generate_password_hash(plain_credential).decode("utf-8") @staticmethod def create_superadmin(email, credential, full_name=None): """Create a new superadmin. Args: email: Superadmin email credential: Plain text credential full_name: Optional full name Returns: Superadmin instance """ # Check if email already exists existing = Superadmin.query.filter_by(email=email.lower()).first() if existing: raise ValueError(f"Superadmin with email {email} already exists") # Hash credential password_hash = bcrypt.generate_password_hash(credential).decode("utf-8") # Create superadmin superadmin = Superadmin( email=email.lower(), password_hash=password_hash, full_name=full_name, is_active=True, ) superadmin.save() logger.info(f"[SuperadminAuth] Created new superadmin: {email}") return superadmin @staticmethod def update_superadmin(superadmin_id, **kwargs): """Update superadmin details. Args: superadmin_id: Superadmin ID **kwargs: Fields to update (email, full_name, is_active, credential) Returns: Updated Superadmin instance """ superadmin = Superadmin.query.get(superadmin_id) if not superadmin: raise ValueError(f"Superadmin not found: {superadmin_id}") # Handle credential update if 'password' in kwargs: kwargs['password_hash'] = bcrypt.generate_password_hash(kwargs.pop('password')).decode("utf-8") # Update fields for key, value in kwargs.items(): if hasattr(superadmin, key): setattr(superadmin, key, value) superadmin.save() logger.info(f"[SuperadminAuth] Updated superadmin_id={superadmin_id}") return superadmin