"""Superadmin user management endpoints.""" import logging from flask import request, g from gatehouse_app.api.v1.superadmin import superadmin_bp from gatehouse_app.utils.response import api_response from gatehouse_app.decorators.superadmin import superadmin_required, superadmin_audit_log from gatehouse_app.models.user.user import User from gatehouse_app.models.user.session import Session from gatehouse_app.models.organization.organization import Organization from gatehouse_app.models.organization.organization_member import OrganizationMember from gatehouse_app.extensions import db logger = logging.getLogger(__name__) @superadmin_bp.route("/users", methods=["GET"]) @superadmin_required def list_users(): """Get paginated list of users with optional filters. Query params: page: Page number (default 1) per_page: Items per page (default 20, max 100) organization_id: Filter by organization status: Filter by status (active/suspended) search: Search by email or name """ try: page = max(1, int(request.args.get("page", 1))) per_page = min(100, max(1, int(request.args.get("per_page", 20)))) org_id = request.args.get("organization_id") status = request.args.get("status") search = request.args.get("search", "").strip() # Base query query = User.query.filter(User.deleted_at.is_(None)) # Filter by organization if org_id: member_user_ids = db.session.query(OrganizationMember.user_id).filter( OrganizationMember.organization_id == org_id, OrganizationMember.deleted_at.is_(None), ).all() user_ids = [m.user_id for m in member_user_ids] query = query.filter(User.id.in_(user_ids)) # Filter by status if status == "suspended": query = query.filter(User.status == "GLOBAL_SUSPENDED") elif status == "active": query = query.filter(User.status != "GLOBAL_SUSPENDED") # Search by email or name if search: search_filter = f"%{search}%" query = query.filter( db.or_( User.email.ilike(search_filter), User.full_name.ilike(search_filter), ) ) # Order by created_at desc query = query.order_by(User.created_at.desc()) # Paginate total = query.count() users = query.offset((page - 1) * per_page).limit(per_page).all() # Get org memberships for each user items = [] for user in users: # Get organization memberships memberships = db.session.query(OrganizationMember).filter( OrganizationMember.user_id == user.id, OrganizationMember.deleted_at.is_(None), ).all() orgs = [] for m in memberships: org = Organization.query.get(m.organization_id) if org: orgs.append({ "org_id": org.id, "org_name": org.name, "role": m.role, "joined_at": m.created_at.isoformat() + "Z" if m.created_at else None, }) # Get active session count active_sessions = Session.query.filter( Session.user_id == user.id, Session.deleted_at.is_(None), Session.status == "active", ).count() items.append({ "id": user.id, "email": user.email, "full_name": user.full_name, "status": user.status, "mfa_enabled": user.mfa_enabled if hasattr(user, 'mfa_enabled') else False, "org_count": len(orgs), "orgs": orgs, "active_sessions": active_sessions, "last_login_at": user.last_login_at.isoformat() + "Z" if user.last_login_at else None, "created_at": user.created_at.isoformat() + "Z" if user.created_at else None, }) return api_response(data={ "items": items, "total": total, "page": page, "per_page": per_page, "pages": (total + per_page - 1) // per_page if per_page > 0 else 0, }, message="Users retrieved successfully") except Exception as e: logger.error(f"[SuperadminUsers] List users error: {e}") return api_response( success=False, message="An error occurred", status=500, error_type="INTERNAL_ERROR", ) @superadmin_bp.route("/users/", methods=["GET"]) @superadmin_required def get_user(user_id): """Get detailed user information. Returns user + all org memberships + active sessions + security methods. """ try: user = User.query.get(user_id) if not user or user.deleted_at is not None: return api_response( success=False, message="User not found", status=404, error_type="NOT_FOUND", ) # Get organization memberships memberships = db.session.query(OrganizationMember).filter( OrganizationMember.user_id == user_id, OrganizationMember.deleted_at.is_(None), ).all() orgs = [] for m in memberships: org = Organization.query.get(m.organization_id) if org: orgs.append({ "org_id": org.id, "org_name": org.name, "org_slug": org.slug, "role": m.role, "joined_at": m.created_at.isoformat() + "Z" if m.created_at else None, }) # Get active sessions sessions = Session.query.filter( Session.user_id == user_id, Session.deleted_at.is_(None), Session.status == "active", ).all() active_sessions = [{ "id": s.id, "ip_address": s.ip_address, "user_agent": s.user_agent, "created_at": s.created_at.isoformat() + "Z" if s.created_at else None, "last_active_at": s.last_active_at.isoformat() + "Z" if hasattr(s, 'last_active_at') and s.last_active_at else None, } for s in sessions] # Get security methods (simplified - would need UserSecurityMethod model) security_methods = [] if hasattr(user, 'totp_enabled') and user.totp_enabled: security_methods.append({"type": "totp", "enabled": True}) if hasattr(user, 'webauthn_enabled') and user.webauthn_enabled: security_methods.append({"type": "webauthn", "enabled": True}) return api_response(data={ "user": { "id": user.id, "email": user.email, "full_name": user.full_name, "status": user.status, "mfa_enabled": user.mfa_enabled if hasattr(user, 'mfa_enabled') else False, "last_login_at": user.last_login_at.isoformat() + "Z" if user.last_login_at else None, "created_at": user.created_at.isoformat() + "Z" if user.created_at else None, }, "organizations": orgs, "active_sessions": active_sessions, "security_methods": security_methods, }, message="User retrieved successfully") except Exception as e: logger.error(f"[SuperadminUsers] Get user error: {e}") return api_response( success=False, message="An error occurred", status=500, error_type="INTERNAL_ERROR", ) @superadmin_bp.route("/users//suspend", methods=["POST"]) @superadmin_required @superadmin_audit_log(action="user.suspend", resource_type="user") def suspend_user(user_id): """Globally suspend a user (sets status=GLOBAL_SUSPENDED).""" try: user = User.query.get(user_id) if not user or user.deleted_at is not None: return api_response( success=False, message="User not found", status=404, error_type="NOT_FOUND", ) if user.status == "GLOBAL_SUSPENDED": return api_response( success=False, message="User is already suspended", status=400, error_type="VALIDATION_ERROR", ) user.status = "GLOBAL_SUSPENDED" db.session.commit() # Revoke all sessions revoked_count = Session.query.filter( Session.user_id == user_id, Session.deleted_at.is_(None), ).update({"status": "revoked", "deleted_at": db.func.now()}) db.session.commit() logger.warning(f"[SuperadminUsers] User {user_id} globally suspended by {getattr(g, 'current_superadmin', {}).get('id', 'unknown')}") return api_response(data={ "user": { "id": user.id, "email": user.email, "status": user.status, }, "sessions_revoked": revoked_count, }, message="User suspended successfully") except Exception as e: db.session.rollback() logger.error(f"[SuperadminUsers] Suspend user error: {e}") return api_response( success=False, message="An error occurred", status=500, error_type="INTERNAL_ERROR", ) @superadmin_bp.route("/users//unsuspend", methods=["POST"]) @superadmin_required @superadmin_audit_log(action="user.unsuspend", resource_type="user") def unsuspend_user(user_id): """Remove global suspension from a user.""" try: user = User.query.get(user_id) if not user or user.deleted_at is not None: return api_response( success=False, message="User not found", status=404, error_type="NOT_FOUND", ) if user.status != "GLOBAL_SUSPENDED": return api_response( success=False, message="User is not suspended", status=400, error_type="VALIDATION_ERROR", ) user.status = "active" db.session.commit() return api_response(data={ "user": { "id": user.id, "email": user.email, "status": user.status, }, }, message="User unsuspended successfully") except Exception as e: db.session.rollback() logger.error(f"[SuperadminUsers] Unsuspend user error: {e}") return api_response( success=False, message="An error occurred", status=500, error_type="INTERNAL_ERROR", ) @superadmin_bp.route("/users//reset-password", methods=["POST"]) @superadmin_required @superadmin_audit_log(action="user.reset_password", resource_type="user") def reset_user_password(user_id): """Trigger password reset email flow for user.""" try: user = User.query.get(user_id) if not user or user.deleted_at is not None: return api_response( success=False, message="User not found", status=404, error_type="NOT_FOUND", ) # In production, this would call AuthService.send_password_reset_email(user.email) # For now, just log and return success logger.info(f"[SuperadminUsers] Password reset requested for {user.email} by superadmin") return api_response(data={ "email": user.email, }, message="Password reset email sent successfully") except Exception as e: logger.error(f"[SuperadminUsers] Reset password error: {e}") return api_response( success=False, message="An error occurred", status=500, error_type="INTERNAL_ERROR", ) @superadmin_bp.route("/users//sessions", methods=["DELETE"]) @superadmin_required @superadmin_audit_log(action="user.revoke_sessions", resource_type="user") def revoke_user_sessions(user_id): """Revoke all sessions for a user.""" try: user = User.query.get(user_id) if not user or user.deleted_at is not None: return api_response( success=False, message="User not found", status=404, error_type="NOT_FOUND", ) # Revoke all sessions result = Session.query.filter( Session.user_id == user_id, Session.deleted_at.is_(None), ).update({"status": "revoked", "deleted_at": db.func.now()}) db.session.commit() return api_response(data={ "user_id": user_id, "count": result, }, message=f"All sessions revoked ({result} sessions)") except Exception as e: db.session.rollback() logger.error(f"[SuperadminUsers] Revoke sessions error: {e}") return api_response( success=False, message="An error occurred", status=500, error_type="INTERNAL_ERROR", ) @superadmin_bp.route("/users//add-to-org/", methods=["POST"]) @superadmin_required @superadmin_audit_log(action="user.add_to_org", resource_type="user") def add_user_to_org(user_id, org_id): """Add a user to an organization with specified role.""" try: data = request.json or {} role = data.get("role", "member") valid_roles = ["member", "admin", "owner"] if role not in valid_roles: return api_response( success=False, message=f"Invalid role. Must be one of: {', '.join(valid_roles)}", status=400, error_type="VALIDATION_ERROR", ) user = User.query.get(user_id) if not user or user.deleted_at is not None: return api_response( success=False, message="User not found", status=404, error_type="NOT_FOUND", ) org = Organization.query.get(org_id) if not org or org.deleted_at is not None: return api_response( success=False, message="Organization not found", status=404, error_type="NOT_FOUND", ) # Check if already a member existing = OrganizationMember.query.filter( OrganizationMember.user_id == user_id, OrganizationMember.organization_id == org_id, OrganizationMember.deleted_at.is_(None), ).first() if existing: return api_response( success=False, message="User is already a member of this organization", status=400, error_type="VALIDATION_ERROR", ) # Create membership membership = OrganizationMember( user_id=user_id, organization_id=org_id, role=role, ) db.session.add(membership) db.session.commit() logger.info(f"[SuperadminUsers] User {user_id} added to org {org_id} as {role} by superadmin") return api_response(data={ "user_id": user_id, "organization_id": org_id, "role": role, "joined_at": membership.created_at.isoformat() + "Z" if membership.created_at else None, }, message="User added to organization successfully") except Exception as e: db.session.rollback() logger.error(f"[SuperadminUsers] Add to org error: {e}") return api_response( success=False, message="An error occurred", status=500, error_type="INTERNAL_ERROR", ) @superadmin_bp.route("/users//orgs/", methods=["DELETE"]) @superadmin_required @superadmin_audit_log(action="user.remove_from_org", resource_type="user") def remove_user_from_org(user_id, org_id): """Remove a user from an organization.""" try: membership = OrganizationMember.query.filter( OrganizationMember.user_id == user_id, OrganizationMember.organization_id == org_id, OrganizationMember.deleted_at.is_(None), ).first() if not membership: return api_response( success=False, message="User is not a member of this organization", status=404, error_type="NOT_FOUND", ) # Check if user is the only owner if membership.role == "owner": owner_count = OrganizationMember.query.filter( OrganizationMember.organization_id == org_id, OrganizationMember.role == "owner", OrganizationMember.deleted_at.is_(None), ).count() if owner_count <= 1: return api_response( success=False, message="Cannot remove the only owner from an organization. Transfer ownership first.", status=400, error_type="VALIDATION_ERROR", ) # Soft delete membership membership.deleted_at = db.func.now() db.session.commit() logger.info(f"[SuperadminUsers] User {user_id} removed from org {org_id} by superadmin") return api_response(data={ "user_id": user_id, "organization_id": org_id, }, message="User removed from organization successfully") except Exception as e: db.session.rollback() logger.error(f"[SuperadminUsers] Remove from org error: {e}") return api_response( success=False, message="An error occurred", status=500, error_type="INTERNAL_ERROR", ) # ============ User Audit Log Endpoints ============ @superadmin_bp.route("/users//audit-logs", methods=["GET"]) @superadmin_required def get_user_audit_logs(user_id): """Get audit logs for a specific user (superadmin only). Query params: page: Page number (default 1) per_page: Items per page (default 50, max 200) action: Filter by action type success: Filter by success (true/false) start_date: Filter by start date (ISO 8601) end_date: Filter by end date (ISO 8601) """ try: from gatehouse_app.models.auth.audit_log import AuditLog from gatehouse_app.models.user.user import User user = User.query.get(user_id) if not user or user.deleted_at is not None: return api_response( success=False, message="User not found", status=404, error_type="NOT_FOUND", ) page = max(1, int(request.args.get("page", 1))) per_page = min(200, max(1, int(request.args.get("per_page", 50)))) query = AuditLog.query.filter_by(user_id=user_id) # Filters action_filter = request.args.get("action") if action_filter: query = query.filter(AuditLog.action == action_filter) success_filter = request.args.get("success") if success_filter is not None: query = query.filter(AuditLog.success == (success_filter.lower() == "true")) start_date = request.args.get("start_date") if start_date: from datetime import datetime, timezone try: start_dt = datetime.fromisoformat(start_date.replace("Z", "+00:00")) query = query.filter(AuditLog.created_at >= start_dt) except ValueError: return api_response( success=False, message="Invalid start_date format. Use ISO 8601.", status=400, error_type="VALIDATION_ERROR", ) end_date = request.args.get("end_date") if end_date: from datetime import datetime, timezone try: end_dt = datetime.fromisoformat(end_date.replace("Z", "+00:00")) query = query.filter(AuditLog.created_at <= end_dt) except ValueError: return api_response( success=False, message="Invalid end_date format. Use ISO 8601.", status=400, error_type="VALIDATION_ERROR", ) query = query.order_by(AuditLog.created_at.desc()) total = query.count() logs = query.offset((page - 1) * per_page).limit(per_page).all() def log_to_dict(log): action = log.action return { "id": log.id, "action": action.value if hasattr(action, "value") else action, "user_id": log.user_id, "user": ( {"id": log.user.id, "email": log.user.email, "full_name": log.user.full_name} if log.user else None ), "organization_id": log.organization_id, "resource_type": log.resource_type, "resource_id": log.resource_id, "ip_address": log.ip_address, "user_agent": log.user_agent, "request_id": log.request_id, "description": log.description, "success": log.success, "error_message": log.error_message, "metadata": log.extra_data, "created_at": log.created_at.isoformat() if log.created_at else None, } return api_response( data={ "user": { "id": user.id, "email": user.email, "full_name": user.full_name, }, "audit_logs": [log_to_dict(log) for log in logs], "count": total, "page": page, "per_page": per_page, "pages": (total + per_page - 1) // per_page, }, message="User audit logs retrieved successfully", ) except Exception as e: logger.error(f"[SuperadminUsers] Get user audit logs error: {e}") return api_response( success=False, message="An error occurred", status=500, error_type="INTERNAL_ERROR", ) @superadmin_bp.route("/users//audit-logs/export", methods=["GET"]) @superadmin_required def export_user_audit_logs(user_id): """Export audit logs for a specific user as CSV (superadmin only). Query params: action: Filter by action type success: Filter by success (true/false) start_date: Filter by start date (ISO 8601) end_date: Filter by end date (ISO 8601) """ try: from gatehouse_app.models.auth.audit_log import AuditLog from gatehouse_app.models.user.user import User import csv from flask import make_response from io import StringIO user = User.query.get(user_id) if not user or user.deleted_at is not None: return api_response( success=False, message="User not found", status=404, error_type="NOT_FOUND", ) query = AuditLog.query.filter_by(user_id=user_id) # Apply same filters as get_user_audit_logs action_filter = request.args.get("action") if action_filter: query = query.filter(AuditLog.action == action_filter) success_filter = request.args.get("success") if success_filter is not None: query = query.filter(AuditLog.success == (success_filter.lower() == "true")) start_date = request.args.get("start_date") if start_date: from datetime import datetime, timezone try: start_dt = datetime.fromisoformat(start_date.replace("Z", "+00:00")) query = query.filter(AuditLog.created_at >= start_dt) except ValueError: return api_response( success=False, message="Invalid start_date format. Use ISO 8601.", status=400, error_type="VALIDATION_ERROR", ) end_date = request.args.get("end_date") if end_date: from datetime import datetime, timezone try: end_dt = datetime.fromisoformat(end_date.replace("Z", "+00:00")) query = query.filter(AuditLog.created_at <= end_dt) except ValueError: return api_response( success=False, message="Invalid end_date format. Use ISO 8601.", status=400, error_type="VALIDATION_ERROR", ) query = query.order_by(AuditLog.created_at.desc()) logs = query.all() # Generate CSV output = StringIO() writer = csv.writer(output) writer.writerow([ "id", "timestamp", "action", "user_email", "organization_id", "resource_type", "resource_id", "ip_address", "success", "description", "error_message" ]) for log in logs: action = log.action writer.writerow([ log.id, log.created_at.isoformat() if log.created_at else "", action.value if hasattr(action, "value") else action, log.user.email if log.user else "", log.organization_id or "", log.resource_type or "", log.resource_id or "", log.ip_address or "", log.success, log.description or "", log.error_message or "", ]) response = make_response(output.getvalue()) response.headers["Content-Type"] = "text/csv" response.headers["Content-Disposition"] = f"attachment; filename=user_{user_id}_audit_logs.csv" return response except Exception as e: logger.error(f"[SuperadminUsers] Export user audit logs error: {e}") return api_response( success=False, message="An error occurred", status=500, error_type="INTERNAL_ERROR", )