From 81a221bd2b5429261286b8bd4f44505963d29346 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 8 May 2026 06:26:32 +0000 Subject: [PATCH] refactor: consolidate login audit logging and add superadmin user audit endpoints --- gatehouse_app/api/v1/auth/core.py | 15 ++ gatehouse_app/api/v1/auth/totp.py | 27 +++ gatehouse_app/api/v1/auth/webauthn.py | 26 +++ gatehouse_app/api/v1/superadmin/users.py | 232 +++++++++++++++++++++++ gatehouse_app/services/audit_service.py | 2 +- gatehouse_app/services/auth_service.py | 13 +- 6 files changed, 303 insertions(+), 12 deletions(-) diff --git a/gatehouse_app/api/v1/auth/core.py b/gatehouse_app/api/v1/auth/core.py index 4fb0071..e1d50c9 100644 --- a/gatehouse_app/api/v1/auth/core.py +++ b/gatehouse_app/api/v1/auth/core.py @@ -121,6 +121,21 @@ def login(): user_session = AuthService.create_session(user, duration_seconds=duration, is_compliance_only=is_compliance_only) + # Log successful login (after MFA complete, if applicable) + login_org_id = None + if policy_result.compliance_summary and policy_result.compliance_summary.orgs: + login_org_id = policy_result.compliance_summary.orgs[0].organization_id + + AuditService.log_action( + action=AuditAction.USER_LOGIN, + user_id=user.id, + organization_id=login_org_id, + ip_address=request.remote_addr, + user_agent=request.headers.get("User-Agent"), + description="User logged in (password)", + success=True, + ) + response_data = { "user": user.to_dict(), "token": user_session.token, diff --git a/gatehouse_app/api/v1/auth/totp.py b/gatehouse_app/api/v1/auth/totp.py index 0bb8505..8507173 100644 --- a/gatehouse_app/api/v1/auth/totp.py +++ b/gatehouse_app/api/v1/auth/totp.py @@ -1,4 +1,5 @@ """TOTP authentication endpoints.""" +import logging from flask import request, session, g, current_app from marshmallow import ValidationError from gatehouse_app.api.v1 import api_v1_bp @@ -12,6 +13,7 @@ from gatehouse_app.schemas.auth_schema import ( ) from gatehouse_app.services.auth_service import AuthService from gatehouse_app.services.mfa_policy_service import MfaPolicyService +from gatehouse_app.utils.constants import AuditAction from gatehouse_app.utils.decorators import login_required from gatehouse_app.exceptions.auth_exceptions import InvalidCredentialsError from gatehouse_app.exceptions.validation_exceptions import ConflictError @@ -78,6 +80,21 @@ def verify_totp(): is_compliance_only = policy_result.create_compliance_only_session user_session = AuthService.create_session(user, is_compliance_only=is_compliance_only) + # Log successful login (after MFA complete) + login_org_id = None + if policy_result.compliance_summary and policy_result.compliance_summary.orgs: + login_org_id = policy_result.compliance_summary.orgs[0].organization_id + + AuditService.log_action( + action=AuditAction.USER_LOGIN, + user_id=user.id, + organization_id=login_org_id, + ip_address=request.remote_addr, + user_agent=request.headers.get("User-Agent"), + description="User logged in (TOTP)", + success=True, + ) + session.pop("totp_pending_user_id", None) session.pop("webauthn_pending_user_id", None) @@ -112,6 +129,16 @@ def verify_totp(): except ValidationError as e: return api_response(success=False, message="Validation failed", status=400, error_type="VALIDATION_ERROR", error_details=e.messages) except InvalidCredentialsError as e: + # Log failed TOTP verification + AuditService.log_action( + action=AuditAction.TOTP_VERIFY_FAILED, + user_id=user.id, + ip_address=request.remote_addr, + user_agent=request.headers.get("User-Agent"), + description="TOTP verification failed", + success=False, + error_message=e.message, + ) return api_response(success=False, message=e.message, status=e.status_code, error_type=e.error_type) diff --git a/gatehouse_app/api/v1/auth/webauthn.py b/gatehouse_app/api/v1/auth/webauthn.py index dc5cf8b..ef07693 100644 --- a/gatehouse_app/api/v1/auth/webauthn.py +++ b/gatehouse_app/api/v1/auth/webauthn.py @@ -16,6 +16,7 @@ from gatehouse_app.schemas.webauthn_schema import ( from gatehouse_app.services.auth_service import AuthService from gatehouse_app.services.webauthn_service import WebAuthnService from gatehouse_app.services.mfa_policy_service import MfaPolicyService +from gatehouse_app.utils.constants import AuditAction from gatehouse_app.utils.decorators import login_required from gatehouse_app.exceptions.auth_exceptions import InvalidCredentialsError @@ -128,6 +129,21 @@ def complete_webauthn_login(): user_session = AuthService.create_session(user, is_compliance_only=is_compliance_only) session.pop("webauthn_pending_user_id", None) + # Log successful login (after MFA complete) + login_org_id = None + if policy_result.compliance_summary and policy_result.compliance_summary.orgs: + login_org_id = policy_result.compliance_summary.orgs[0].organization_id + + AuditService.log_action( + action=AuditAction.USER_LOGIN, + user_id=user.id, + organization_id=login_org_id, + ip_address=request.remote_addr, + user_agent=request.headers.get("User-Agent"), + description="User logged in (WebAuthn)", + success=True, + ) + logger.info(f"WebAuthn login completed successfully for user: {user.email}") response_data = { @@ -161,6 +177,16 @@ def complete_webauthn_login(): except ValidationError as e: return api_response(success=False, message="Validation failed", status=400, error_type="VALIDATION_ERROR", error_details=e.messages) except InvalidCredentialsError as e: + # Log failed WebAuthn verification + AuditService.log_action( + action=AuditAction.WEBAUTHN_LOGIN_FAILED, + user_id=user.id, + ip_address=request.remote_addr, + user_agent=request.headers.get("User-Agent"), + description="WebAuthn login failed", + success=False, + error_message=e.message, + ) return api_response(success=False, message=e.message, status=e.status_code, error_type=e.error_type) except Exception as e: logger.exception(f"WebAuthn login complete unexpected error: {e}") diff --git a/gatehouse_app/api/v1/superadmin/users.py b/gatehouse_app/api/v1/superadmin/users.py index fcac4aa..7f263fd 100644 --- a/gatehouse_app/api/v1/superadmin/users.py +++ b/gatehouse_app/api/v1/superadmin/users.py @@ -514,3 +514,235 @@ def remove_user_from_org(user_id, org_id): status=500, error_type="INTERNAL_ERROR", ) + + +# ============ User Audit Log Endpoints ============ + +@superadmin_bp.route("/users//audit-logs", methods=["GET"]) +@superadmin_required +def get_user_audit_logs(user_id): + """Get audit logs for a specific user (superadmin only). + + Query params: + page: Page number (default 1) + per_page: Items per page (default 50, max 200) + action: Filter by action type + success: Filter by success (true/false) + start_date: Filter by start date (ISO 8601) + end_date: Filter by end date (ISO 8601) + """ + try: + from gatehouse_app.models.auth.audit_log import AuditLog + from gatehouse_app.models.user.user import User + + user = User.query.get(user_id) + if not user or user.deleted_at is not None: + return api_response( + success=False, + message="User not found", + status=404, + error_type="NOT_FOUND", + ) + + page = max(1, int(request.args.get("page", 1))) + per_page = min(200, max(1, int(request.args.get("per_page", 50)))) + + query = AuditLog.query.filter_by(user_id=user_id) + + # Filters + action_filter = request.args.get("action") + if action_filter: + query = query.filter(AuditLog.action == action_filter) + + success_filter = request.args.get("success") + if success_filter is not None: + query = query.filter(AuditLog.success == (success_filter.lower() == "true")) + + start_date = request.args.get("start_date") + if start_date: + from datetime import datetime, timezone + try: + start_dt = datetime.fromisoformat(start_date.replace("Z", "+00:00")) + query = query.filter(AuditLog.created_at >= start_dt) + except ValueError: + return api_response( + success=False, + message="Invalid start_date format. Use ISO 8601.", + status=400, + error_type="VALIDATION_ERROR", + ) + + end_date = request.args.get("end_date") + if end_date: + from datetime import datetime, timezone + try: + end_dt = datetime.fromisoformat(end_date.replace("Z", "+00:00")) + query = query.filter(AuditLog.created_at <= end_dt) + except ValueError: + return api_response( + success=False, + message="Invalid end_date format. Use ISO 8601.", + status=400, + error_type="VALIDATION_ERROR", + ) + + query = query.order_by(AuditLog.created_at.desc()) + total = query.count() + logs = query.offset((page - 1) * per_page).limit(per_page).all() + + def log_to_dict(log): + action = log.action + return { + "id": log.id, + "action": action.value if hasattr(action, "value") else action, + "user_id": log.user_id, + "user": ( + {"id": log.user.id, "email": log.user.email, "full_name": log.user.full_name} + if log.user else None + ), + "organization_id": log.organization_id, + "resource_type": log.resource_type, + "resource_id": log.resource_id, + "ip_address": log.ip_address, + "user_agent": log.user_agent, + "request_id": log.request_id, + "description": log.description, + "success": log.success, + "error_message": log.error_message, + "metadata": log.extra_data, + "created_at": log.created_at.isoformat() if log.created_at else None, + } + + return api_response( + data={ + "user": { + "id": user.id, + "email": user.email, + "full_name": user.full_name, + }, + "audit_logs": [log_to_dict(log) for log in logs], + "count": total, + "page": page, + "per_page": per_page, + "pages": (total + per_page - 1) // per_page, + }, + message="User audit logs retrieved successfully", + ) + + except Exception as e: + logger.error(f"[SuperadminUsers] Get user audit logs error: {e}") + return api_response( + success=False, + message="An error occurred", + status=500, + error_type="INTERNAL_ERROR", + ) + + +@superadmin_bp.route("/users//audit-logs/export", methods=["GET"]) +@superadmin_required +def export_user_audit_logs(user_id): + """Export audit logs for a specific user as CSV (superadmin only). + + Query params: + action: Filter by action type + success: Filter by success (true/false) + start_date: Filter by start date (ISO 8601) + end_date: Filter by end date (ISO 8601) + """ + try: + from gatehouse_app.models.auth.audit_log import AuditLog + from gatehouse_app.models.user.user import User + import csv + from flask import make_response + from io import StringIO + + user = User.query.get(user_id) + if not user or user.deleted_at is not None: + return api_response( + success=False, + message="User not found", + status=404, + error_type="NOT_FOUND", + ) + + query = AuditLog.query.filter_by(user_id=user_id) + + # Apply same filters as get_user_audit_logs + action_filter = request.args.get("action") + if action_filter: + query = query.filter(AuditLog.action == action_filter) + + success_filter = request.args.get("success") + if success_filter is not None: + query = query.filter(AuditLog.success == (success_filter.lower() == "true")) + + start_date = request.args.get("start_date") + if start_date: + from datetime import datetime, timezone + try: + start_dt = datetime.fromisoformat(start_date.replace("Z", "+00:00")) + query = query.filter(AuditLog.created_at >= start_dt) + except ValueError: + return api_response( + success=False, + message="Invalid start_date format. Use ISO 8601.", + status=400, + error_type="VALIDATION_ERROR", + ) + + end_date = request.args.get("end_date") + if end_date: + from datetime import datetime, timezone + try: + end_dt = datetime.fromisoformat(end_date.replace("Z", "+00:00")) + query = query.filter(AuditLog.created_at <= end_dt) + except ValueError: + return api_response( + success=False, + message="Invalid end_date format. Use ISO 8601.", + status=400, + error_type="VALIDATION_ERROR", + ) + + query = query.order_by(AuditLog.created_at.desc()) + logs = query.all() + + # Generate CSV + output = StringIO() + writer = csv.writer(output) + writer.writerow([ + "id", "timestamp", "action", "user_email", "organization_id", + "resource_type", "resource_id", "ip_address", "success", + "description", "error_message" + ]) + + for log in logs: + action = log.action + writer.writerow([ + log.id, + log.created_at.isoformat() if log.created_at else "", + action.value if hasattr(action, "value") else action, + log.user.email if log.user else "", + log.organization_id or "", + log.resource_type or "", + log.resource_id or "", + log.ip_address or "", + log.success, + log.description or "", + log.error_message or "", + ]) + + response = make_response(output.getvalue()) + response.headers["Content-Type"] = "text/csv" + response.headers["Content-Disposition"] = f"attachment; filename=user_{user_id}_audit_logs.csv" + return response + + except Exception as e: + logger.error(f"[SuperadminUsers] Export user audit logs error: {e}") + return api_response( + success=False, + message="An error occurred", + status=500, + error_type="INTERNAL_ERROR", + ) diff --git a/gatehouse_app/services/audit_service.py b/gatehouse_app/services/audit_service.py index 2baac77..8cee67c 100644 --- a/gatehouse_app/services/audit_service.py +++ b/gatehouse_app/services/audit_service.py @@ -214,7 +214,7 @@ class AuditService: ): """Log external auth login event.""" return AuditService.log_action( - action=AuditAction.EXTERNAL_AUTH_LOGIN, + action=AuditAction.USER_LOGIN, user_id=user_id, organization_id=organization_id, resource_type="session", diff --git a/gatehouse_app/services/auth_service.py b/gatehouse_app/services/auth_service.py index c662ba8..9211116 100644 --- a/gatehouse_app/services/auth_service.py +++ b/gatehouse_app/services/auth_service.py @@ -176,15 +176,6 @@ class AuthService: ) session.save() - # Log session creation - AuditService.log_action( - action=AuditAction.SESSION_CREATE, - user_id=user.id, - resource_type="session", - resource_id=session.id, - description="User session created", - ) - return session @staticmethod @@ -254,9 +245,9 @@ class AuthService: if session: session.revoke(reason=reason) - # Log session revocation + # Log session revocation (user logout) AuditService.log_action( - action=AuditAction.SESSION_REVOKE, + action=AuditAction.USER_LOGOUT, user_id=session.user_id, resource_type="session", resource_id=session.id,