Files
gatehouse-api/gatehouse_app/services/superadmin_auth_service.py
T

240 lines
8.3 KiB
Python

"""Superadmin authentication service."""
import logging
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
from flask import request, current_app
from gatehouse_app.extensions import db, bcrypt
from gatehouse_app.models.superadmin import Superadmin, SuperadminSession
from gatehouse_app.exceptions.auth_exceptions import InvalidCredentialsError
logger = logging.getLogger(__name__)
class SuperadminAuthService:
"""Service for superadmin authentication operations."""
@staticmethod
def authenticate(email, credentials):
"""Authenticate superadmin with email/password credentials.
Args:
email: Superadmin email
credentials: Plain text credential
Returns:
Superadmin instance if authentication succeeds
Raises:
InvalidCredentialsError: If credentials are invalid or account is disabled
"""
# Find superadmin by email
superadmin = Superadmin.query.filter_by(email=email.lower()).first()
if not superadmin:
logger.warning(f"[SuperadminAuth] Login attempt for non-existent email: {email}")
raise InvalidCredentialsError()
# Check if account is active
if not superadmin.is_active:
logger.warning(f"[SuperadminAuth] Login attempt for disabled account: {email}")
raise InvalidCredentialsError("Account is disabled")
# Check credential
if not superadmin.password_hash:
logger.warning(f"[SuperadminAuth] Login attempt for account with no credential set: {email}")
raise InvalidCredentialsError()
# Verify credential
password_valid = bcrypt.check_password_hash(superadmin.password_hash, credentials)
if not password_valid:
logger.warning(f"[SuperadminAuth] Invalid password for: {email}")
raise InvalidCredentialsError()
# Update last login
superadmin.last_login_at = datetime.now(timezone.utc)
db.session.commit()
logger.info(f"[SuperadminAuth] Successful login for: {email}")
return superadmin
@staticmethod
def create_session(superadmin_id, duration_seconds=28800):
"""Create a new session for superadmin.
Args:
superadmin_id: Superadmin ID
duration_seconds: Session duration in seconds (default 8 hours)
Returns:
SuperadminSession instance
"""
# Generate secure token
token = secrets.token_urlsafe(32)
# Create session
session = SuperadminSession(
superadmin_id=superadmin_id,
token=token,
expires_at=datetime.now(timezone.utc) + timedelta(seconds=duration_seconds),
last_activity_at=datetime.now(timezone.utc),
ip_address=request.remote_addr,
user_agent=request.headers.get("User-Agent"),
)
session.save()
logger.info(f"[SuperadminAuth] Session created for superadmin_id={superadmin_id}")
return session
@staticmethod
def revoke_session(session_id, reason=None):
"""Revoke a superadmin session.
Args:
session_id: Session ID to revoke
reason: Optional revocation reason
"""
session = SuperadminSession.query.get(session_id)
if session:
session.revoke(reason=reason)
logger.info(f"[SuperadminAuth] Session {session_id} revoked: {reason or 'No reason'}")
@staticmethod
def revoke_all_sessions(superadmin_id, except_token=None, reason=None):
"""Revoke all sessions for a superadmin.
Args:
superadmin_id: Superadmin ID
except_token: Optional token to keep (current session)
reason: Optional revocation reason
"""
query = SuperadminSession.query.filter_by(superadmin_id=superadmin_id)
if except_token:
query = query.filter(SuperadminSession.token != except_token)
sessions = query.all()
for session in sessions:
session.revoke(reason=reason)
logger.info(f"[SuperadminAuth] Revoked {len(sessions)} sessions for superadmin_id={superadmin_id}")
return len(sessions)
@staticmethod
def create_emergency_access(superadmin_id, target_user_id, reason, duration_minutes=15):
"""Create emergency access to a user's account.
This creates a special emergency session that grants temporary elevated access.
Args:
superadmin_id: Superadmin ID initiating emergency access
target_user_id: User ID to access
reason: Reason for emergency access
duration_minutes: Duration of emergency access in minutes
Returns:
Dictionary with emergency session info
"""
from gatehouse_app.models.user.user import User
from gatehouse_app.services.session_service import SessionService
from gatehouse_app.services.audit_service import AuditService
# Verify target user exists
target_user = User.query.get(target_user_id)
if not target_user:
raise ValueError(f"Target user not found: {target_user_id}")
# Create emergency session for the target user
emergency_session = SessionService.create_session(
user=target_user,
duration_seconds=duration_minutes * 60,
is_compliance_only=False
)
# Log the emergency access
logger.warning(
f"[SuperadminAuth] EMERGENCY ACCESS: superadmin_id={superadmin_id} "
f"accessed user_id={target_user_id} reason={reason}"
)
return {
"session": emergency_session,
"expires_at": emergency_session.expires_at,
"reason": reason,
"target_user_id": target_user_id,
}
@staticmethod
def hash_password(plain_credential):
"""Hash a credential for storage.
Args:
plain_credential: Plain text credential
Returns:
Hashed credential string
"""
return bcrypt.generate_password_hash(plain_credential).decode("utf-8")
@staticmethod
def create_superadmin(email, credential, full_name=None):
"""Create a new superadmin.
Args:
email: Superadmin email
credential: Plain text credential
full_name: Optional full name
Returns:
Superadmin instance
"""
# Check if email already exists
existing = Superadmin.query.filter_by(email=email.lower()).first()
if existing:
raise ValueError(f"Superadmin with email {email} already exists")
# Hash credential
password_hash = bcrypt.generate_password_hash(credential).decode("utf-8")
# Create superadmin
superadmin = Superadmin(
email=email.lower(),
password_hash=password_hash,
full_name=full_name,
is_active=True,
)
superadmin.save()
logger.info(f"[SuperadminAuth] Created new superadmin: {email}")
return superadmin
@staticmethod
def update_superadmin(superadmin_id, **kwargs):
"""Update superadmin details.
Args:
superadmin_id: Superadmin ID
**kwargs: Fields to update (email, full_name, is_active, credential)
Returns:
Updated Superadmin instance
"""
superadmin = Superadmin.query.get(superadmin_id)
if not superadmin:
raise ValueError(f"Superadmin not found: {superadmin_id}")
# Handle credential update
if 'password' in kwargs:
kwargs['password_hash'] = bcrypt.generate_password_hash(kwargs.pop('password')).decode("utf-8")
# Update fields
for key, value in kwargs.items():
if hasattr(superadmin, key):
setattr(superadmin, key, value)
superadmin.save()
logger.info(f"[SuperadminAuth] Updated superadmin_id={superadmin_id}")
return superadmin