"""Superadmin authentication and audit decorators.""" import logging from functools import wraps from datetime import datetime, timezone from flask import request, g from gatehouse_app.utils.response import api_response logger = logging.getLogger(__name__) def superadmin_required(f): """Decorator to require superadmin Bearer token authentication. Extracts token from Authorization: Bearer {token} header, validates the session against SuperadminSession table, and sets g.current_superadmin and g.superadmin_session. Returns 401 if no valid session, 403 if not a superadmin. """ @wraps(f) def decorated_function(*args, **kwargs): # Extract token from Authorization header auth_header = request.headers.get('Authorization') if not auth_header: return api_response( success=False, message="Authorization header is required", status=401, error_type="AUTH_REQUIRED" ) # Expect format: "Bearer {token}" parts = auth_header.split() if len(parts) != 2 or parts[0].lower() != 'bearer': return api_response( success=False, message="Invalid authorization format. Use: Bearer {token}", status=401, error_type="INVALID_AUTH_FORMAT" ) token = parts[1] # Import here to avoid circular imports from gatehouse_app.models.superadmin import SuperadminSession, Superadmin # Get active session by token session = SuperadminSession.query.filter_by(token=token).first() if not session: return api_response( success=False, message="Invalid or expired session", status=401, error_type="INVALID_TOKEN" ) # Check if session is active if not session.is_active(): return api_response( success=False, message="Session is no longer active", status=401, error_type="SESSION_INACTIVE" ) # Get the superadmin superadmin = session.superadmin if not superadmin: return api_response( success=False, message="Superadmin not found", status=401, error_type="INVALID_TOKEN" ) # Check if superadmin is active if not superadmin.is_active: return api_response( success=False, message="Superadmin account is disabled", status=403, error_type="ACCOUNT_DISABLED" ) # Update last_activity_at timestamp session.last_activity_at = datetime.now(timezone.utc) from gatehouse_app import db db.session.commit() # Set context variables g.current_superadmin = superadmin g.superadmin_session = session return f(*args, **kwargs) return decorated_function def superadmin_audit_log(action, resource_type): """Decorator to log superadmin actions to SuperadminAuditLog. Must be used AFTER @superadmin_required to have access to g.current_superadmin. Args: action: The action being performed (e.g., 'update', 'delete', 'create') resource_type: The type of resource being acted on (e.g., 'organization', 'user') Captures: superadmin_id, action, resource_type, resource_id, org_id, user_id, ip_address, user_agent, request_id, extra_data """ def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): # Get superadmin from context (set by @superadmin_required) superadmin = getattr(g, 'current_superadmin', None) session = getattr(g, 'superadmin_session', None) if not superadmin: logger.warning(f"superadmin_audit_log used without @superadmin_required on {f.__name__}") return f(*args, **kwargs) # Extract resource_id from kwargs if present resource_id = kwargs.get('resource_id') or kwargs.get(f'{resource_type}_id') or None # Extract org_id and user_id from kwargs if present org_id = kwargs.get('org_id') or None user_id = kwargs.get('user_id') or None # Get IP address and user agent ip_address = request.remote_addr or None user_agent = request.headers.get('User-Agent') or None request_id = request.headers.get('X-Request-ID') or None # Get extra data from request body (for POST/PATCH requests) extra_data = None if request.is_json: try: # Exclude sensitive fields body = request.get_json(silent=True) or {} sensitive_fields = {'password', 'password_hash', 'token', 'secret', 'key'} extra_data = {k: v for k, v in body.items() if k not in sensitive_fields} except Exception: pass # Get success status (default to True unless an error is raised) success = True error_message = None try: result = f(*args, **kwargs) # Check if the response indicates failure if hasattr(result, 'get_json'): result_data = result.get_json() if result_data and result_data.get('success') is False: success = False error_message = result_data.get('message') return result except Exception as e: success = False error_message = str(e) raise finally: # Log the action try: from gatehouse_app.models.superadmin_audit_log import SuperadminAuditLog from gatehouse_app import db audit_entry = SuperadminAuditLog( superadmin_id=superadmin.id, action=action, resource_type=resource_type, resource_id=resource_id, org_id=org_id, user_id=user_id, ip_address=ip_address, user_agent=user_agent, request_id=request_id, extra_data=extra_data, success=success, error_message=error_message ) db.session.add(audit_entry) db.session.commit() logger.info( f"Superadmin audit: superadmin={superadmin.email} action={action} " f"resource_type={resource_type} resource_id={resource_id} success={success}" ) except Exception as e: # Never let audit logging failures break the main operation logger.error(f"Failed to write superadmin audit log: {e}") return decorated_function return decorator