"""ZeroTier network governance API endpoints."""
from flask import g, request
from marshmallow import Schema, fields, validate, ValidationError
from sqlalchemy.exc import IntegrityError

from gatehouse_app.api.v1 import api_v1_bp
from gatehouse_app.extensions import db
from gatehouse_app.utils.response import api_response
from gatehouse_app.utils.decorators import login_required, require_admin, full_access_required
from gatehouse_app.services import portal_network_service
from gatehouse_app.services import device_service
from gatehouse_app.services import network_access_service
from gatehouse_app.services import zerotier_api_service as zt
from gatehouse_app.services import zerotier_reconciliation_service
from gatehouse_app.models import (
    PortalNetwork,
    Device,
    DeviceNetworkMembership,
    UserNetworkApproval,
    ActivationSession,
)
from gatehouse_app.models.organization import Organization
from gatehouse_app.models.organization.organization_member import OrganizationMember
from gatehouse_app.utils.constants import OrganizationRole
from gatehouse_app.exceptions import (
    ValidationError as AppValidationError,
    ZeroTierAPIError,
    NetworkNotFoundError,
    DeviceNotFoundError,
    DeviceAlreadyExistsError,
    ApprovalNotFoundError,
    MembershipNotFoundError,
)


def _org_check(org_id):
    """Resolve an organization and confirm the caller is a member of it.

    Returns:
        ``(org, None)`` on success, otherwise ``(None, response)`` where
        ``response`` is a ready-to-return API response: 404 when no
        non-deleted organization has this id, 403 when the current user is
        not a member.
    """
    org = Organization.query.filter_by(id=org_id, deleted_at=None).first()
    if not org:
        return None, api_response(
            success=False,
            message="Organization not found",
            status=404,
            error_type="NOT_FOUND",
        )
    if not org.is_member(g.current_user.id):
        return None, api_response(
            success=False,
            message="Not a member of this organization",
            status=403,
            error_type="AUTHORIZATION_ERROR",
        )
    return org, None


def _is_org_admin(org_id: str, user_id: str) -> bool:
    """Return True if the user is an admin or owner of the org."""
    return OrganizationMember.query.filter(
        OrganizationMember.organization_id == org_id,
        OrganizationMember.user_id == user_id,
        OrganizationMember.role.in_([OrganizationRole.ADMIN, OrganizationRole.OWNER]),
        OrganizationMember.deleted_at.is_(None),
    ).first() is not None


# ── Schemas ───────────────────────────────────────────────────────────────────

class CreateNetworkSchema(Schema):
    """Request body for creating a portal network."""

    name = fields.Str(required=True, validate=validate.Length(min=1, max=255))
    description = fields.Str(allow_none=True)
    zerotier_network_id = fields.Str(required=True, validate=validate.Length(equal=16))
    environment = fields.Str(load_default="development")
    request_mode = fields.Str(load_default="approval_required")
    default_activation_lifetime_minutes = fields.Int(load_default=480)
    max_activation_lifetime_minutes = fields.Int(allow_none=True, load_default=None)


class UpdateNetworkSchema(Schema):
    """Request body for a partial network update; every field is optional."""

    name = fields.Str(validate=validate.Length(min=1, max=255))
    description = fields.Str(allow_none=True)
    environment = fields.Str()
    request_mode = fields.Str()
    default_activation_lifetime_minutes = fields.Int()
    max_activation_lifetime_minutes = fields.Int(allow_none=True)
    is_active = fields.Bool()


class RegisterDeviceSchema(Schema):
    """Request body for registering a device by its 10-character node id."""

    node_id = fields.Str(required=True, validate=validate.Length(equal=10))
    nickname = fields.Str(allow_none=True, validate=validate.Length(max=255))
    hostname = fields.Str(allow_none=True, validate=validate.Length(max=255))
    asset_tag = fields.Str(allow_none=True, validate=validate.Length(max=255))
    serial_number = fields.Str(allow_none=True, validate=validate.Length(max=255))


class UpdateDeviceSchema(Schema):
    """Request body for a partial device update; every field is optional."""

    nickname = fields.Str(allow_none=True, validate=validate.Length(max=255))
    hostname = fields.Str(allow_none=True, validate=validate.Length(max=255))
    asset_tag = fields.Str(allow_none=True, validate=validate.Length(max=255))
    serial_number = fields.Str(allow_none=True, validate=validate.Length(max=255))


class RequestAccessSchema(Schema):
    """Request body for a user asking for access to a network with a device."""

    portal_network_id = fields.Str(required=True)
    device_id = fields.Str(required=True)
    justification = fields.Str(allow_none=True, validate=validate.Length(max=1000))


class AssignAccessSchema(Schema):
    """Request body for an admin granting a user access to a network."""

    target_user_id = fields.Str(required=True)
    portal_network_id = fields.Str(required=True)
    justification = fields.Str(allow_none=True, validate=validate.Length(max=1000))


class ActivateSchema(Schema):
    """Optional request body for membership activation."""

    lifetime_minutes = fields.Int(allow_none=True)


class KillSwitchSchema(Schema):
    """Request body for triggering the kill switch on a user."""

    target_user_id = fields.Str(required=True)
    scope = fields.Str(load_default="organization")
    reason = fields.Str(allow_none=True, validate=validate.Length(max=500))
    network_ids = fields.List(fields.Str(), allow_none=True)


# ── Networks ──────────────────────────────────────────────────────────────────

@api_v1_bp.route("/organizations/<org_id>/networks", methods=["GET"])
@login_required
@full_access_required
def list_networks(org_id):
    """List portal networks for an organization.

    Query args:
        include_inactive: "true" (case-insensitive) to include inactive networks.
    """
    _, err = _org_check(org_id)
    if err:
        return err

    include_inactive = request.args.get("include_inactive", "false").lower() == "true"
    networks = portal_network_service.list_networks(org_id, include_inactive=include_inactive)
    return api_response(
        data={"networks": [n.to_dict() for n in networks], "count": len(networks)},
        message="Networks retrieved successfully",
    )


@api_v1_bp.route("/organizations/<org_id>/networks", methods=["POST"])
@login_required
@require_admin
@full_access_required
def create_network(org_id):
    """Create a new portal network (admin only).

    Responds 400 on validation failure, 502 when the ZeroTier API fails and
    409 when the database rejects the row as a duplicate.
    """
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        data = CreateNetworkSchema().load(request.json or {})
    except ValidationError as e:
        return api_response(
            success=False,
            message="Validation failed",
            status=400,
            error_type="VALIDATION_ERROR",
            error_details=e.messages,
        )

    try:
        network = portal_network_service.create_network(
            organization_id=org_id,
            name=data["name"],
            owner_user_id=g.current_user.id,
            zerotier_network_id=data["zerotier_network_id"],
            description=data.get("description"),
            environment=data.get("environment"),
            request_mode=data.get("request_mode"),
            default_activation_lifetime_minutes=data.get("default_activation_lifetime_minutes", 480),
            max_activation_lifetime_minutes=data.get("max_activation_lifetime_minutes"),
        )
        return api_response(
            data={"network": network.to_dict()},
            message="Network created successfully",
            status=201,
        )
    except AppValidationError as e:
        return api_response(success=False, message=str(e.message), status=400, error_type=e.error_type)
    except ZeroTierAPIError as e:
        return api_response(success=False, message=str(e), status=502, error_type=e.error_type)
    except IntegrityError:
        # The failed flush leaves the session unusable until rolled back.
        db.session.rollback()
        return api_response(
            success=False,
            message="A portal network with this ZeroTier ID already exists in this organization.",
            status=409,
            error_type="DUPLICATE_NETWORK",
        )


@api_v1_bp.route("/organizations/<org_id>/zerotier/available-networks", methods=["GET"])
@login_required
@require_admin
@full_access_required
def list_zerotier_available_networks(org_id):
    """List all ZeroTier networks from the org's ZT controller/account.

    Cross-references against managed portal networks so the UI can show which
    ones are already imported and which can be imported.
    """
    _, err = _org_check(org_id)
    if err:
        return err

    # Non-deleted portal networks for this org, keyed by ZT network ID.
    managed = {
        pn.zerotier_network_id: pn
        for pn in PortalNetwork.query.filter(
            PortalNetwork.organization_id == org_id,
            PortalNetwork.deleted_at.is_(None),
        ).all()
    }

    try:
        zt_networks = zt.list_networks(organization_id=org_id)
    except ZeroTierAPIError as e:
        # Return an empty list with a flag so the UI can show a helpful message
        # rather than an error page (e.g. "ZeroTier not configured yet").
        return api_response(
            data={"networks": [], "count": 0, "zt_error": str(e)},
            message="ZeroTier unavailable — no networks returned",
        )

    result = []
    for zt_net in zt_networks:
        portal = managed.get(zt_net.id)
        result.append({
            **zt_net.to_dict(),
            "already_managed": portal is not None,
            "portal_network_id": portal.id if portal else None,
            "portal_network_name": portal.name if portal else None,
        })

    return api_response(
        data={"networks": result, "count": len(result)},
        message="Available ZeroTier networks retrieved",
    )


@api_v1_bp.route("/organizations/<org_id>/networks/<network_id>", methods=["GET"])
@login_required
@full_access_required
def get_network(org_id, network_id):
    """Get a portal network by ID, scoped to the organization in the URL."""
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        network = portal_network_service.get_network(network_id, organization_id=org_id)
        return api_response(data={"network": network.to_dict()}, message="Network retrieved successfully")
    except NetworkNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/networks/<network_id>", methods=["PATCH"])
@login_required
@require_admin
@full_access_required
def update_network(org_id, network_id):
    """Update network metadata (admin only).

    Responds 404 when the network does not exist in this organization and 400
    on validation failure.
    """
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        data = UpdateNetworkSchema().load(request.json or {})
    except ValidationError as e:
        return api_response(
            success=False,
            message="Validation failed",
            status=400,
            error_type="VALIDATION_ERROR",
            error_details=e.messages,
        )

    try:
        # update_network() is addressed by network id only, so resolve the
        # network through the org-scoped getter first: a network that belongs
        # to a different organization must look like "not found" here.
        portal_network_service.get_network(network_id, organization_id=org_id)
        network = portal_network_service.update_network(network_id, g.current_user.id, **data)
        return api_response(data={"network": network.to_dict()}, message="Network updated successfully")
    except NetworkNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)
    except AppValidationError as e:
        return api_response(success=False, message=str(e.message), status=400, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/networks/<network_id>", methods=["DELETE"])
@login_required
@require_admin
@full_access_required
def delete_network(org_id, network_id):
    """Delete a portal network (admin only).

    Responds 404 when the network does not exist in this organization.
    """
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        # delete_network() is addressed by network id only; confirm the
        # network belongs to the organization in the URL before deleting it.
        portal_network_service.get_network(network_id, organization_id=org_id)
        portal_network_service.delete_network(network_id, g.current_user.id)
        return api_response(message="Network deleted successfully")
    except NetworkNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/networks/<network_id>/members", methods=["GET"])
@login_required
@full_access_required
def get_network_members(org_id, network_id):
    """List all device memberships for a network."""
    _, err = _org_check(org_id)
    if err:
        return err

    # Existence / org-ownership check only; the returned network is not used.
    try:
        portal_network_service.get_network(network_id, organization_id=org_id)
    except NetworkNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)

    memberships = portal_network_service.get_network_members(network_id)
    return api_response(
        data={"memberships": [m.to_dict() for m in memberships], "count": len(memberships)},
        message="Network members retrieved successfully",
    )


@api_v1_bp.route("/organizations/<org_id>/networks/<network_id>/requests", methods=["GET"])
@login_required
@full_access_required
def get_network_pending_requests(org_id, network_id):
    """List pending access requests for a network (manager view)."""
    _, err = _org_check(org_id)
    if err:
        return err

    # Existence / org-ownership check only; the returned network is not used.
    try:
        portal_network_service.get_network(network_id, organization_id=org_id)
    except NetworkNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)

    pending = portal_network_service.get_network_pending_requests(network_id)
    return api_response(
        data={"requests": [r.to_dict() for r in pending], "count": len(pending)},
        message="Pending requests retrieved successfully",
    )


# ── Devices ───────────────────────────────────────────────────────────────────

@api_v1_bp.route("/organizations/<org_id>/devices", methods=["GET"])
@login_required
@full_access_required
def list_devices(org_id):
    """List the current user's registered devices."""
    _, err = _org_check(org_id)
    if err:
        return err

    devices = device_service.list_user_devices(g.current_user.id, org_id)
    return api_response(
        data={"devices": [d.to_dict() for d in devices], "count": len(devices)},
        message="Devices retrieved successfully",
    )


@api_v1_bp.route("/organizations/<org_id>/devices", methods=["POST"])
@login_required
@full_access_required
def register_device(org_id):
    """Register a new device for the current user.

    After registration, memberships are materialized for the device and the
    number created is reported as ``memberships_created``.
    """
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        data = RegisterDeviceSchema().load(request.json or {})
    except ValidationError as e:
        return api_response(
            success=False,
            message="Validation failed",
            status=400,
            error_type="VALIDATION_ERROR",
            error_details=e.messages,
        )

    try:
        device = device_service.register_device(
            user_id=g.current_user.id,
            organization_id=org_id,
            node_id=data["node_id"],
            nickname=data.get("nickname"),
            hostname=data.get("hostname"),
            asset_tag=data.get("asset_tag"),
            serial_number=data.get("serial_number"),
        )
        # network_access_service is already imported at module level, so no
        # function-local import is needed.
        memberships = network_access_service.materialize_device_memberships(device.id)
        return api_response(
            data={"device": device.to_dict(), "memberships_created": len(memberships)},
            message="Device registered successfully",
            status=201,
        )
    except AppValidationError as e:
        return api_response(success=False, message=str(e.message), status=400, error_type=e.error_type)
    except DeviceAlreadyExistsError as e:
        return api_response(success=False, message=str(e), status=409, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/devices/<device_id>", methods=["GET"])
@login_required
@full_access_required
def get_device(org_id, device_id):
    """Get a device by ID, scoped to the organization in the URL."""
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        device = device_service.get_device(device_id, organization_id=org_id)
        return api_response(data={"device": device.to_dict()}, message="Device retrieved successfully")
    except DeviceNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/devices/<device_id>", methods=["PATCH"])
@login_required
@full_access_required
def update_device(org_id, device_id):
    """Update device fields (owner only).

    Responds 404 when the device does not exist in this organization.
    """
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        data = UpdateDeviceSchema().load(request.json or {})
    except ValidationError as e:
        return api_response(
            success=False,
            message="Validation failed",
            status=400,
            error_type="VALIDATION_ERROR",
            error_details=e.messages,
        )

    # The API exposes "nickname"; the service keyword is "device_nickname".
    if "nickname" in data:
        data["device_nickname"] = data.pop("nickname")

    try:
        # update_device() is addressed by device id + user only; confirm the
        # device belongs to the organization in the URL first.
        device_service.get_device(device_id, organization_id=org_id)
        device = device_service.update_device(device_id, g.current_user.id, **data)
        return api_response(data={"device": device.to_dict()}, message="Device updated successfully")
    except DeviceNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/devices/<device_id>", methods=["DELETE"])
@login_required
@full_access_required
def remove_device(org_id, device_id):
    """Remove a registered device (owner only).

    Responds 404 when the device does not exist in this organization.
    """
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        # remove_device() is addressed by device id + user only; confirm the
        # device belongs to the organization in the URL first.
        device_service.get_device(device_id, organization_id=org_id)
        device_service.remove_device(device_id, g.current_user.id)
        return api_response(message="Device removed successfully")
    except DeviceNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)


# ── Approvals ─────────────────────────────────────────────────────────────────

@api_v1_bp.route("/organizations/<org_id>/approvals", methods=["POST"])
@login_required
@full_access_required
def request_access(org_id):
    """Request access to a network for a specific registered device."""
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        data = RequestAccessSchema().load(request.json or {})
    except ValidationError as e:
        return api_response(
            success=False,
            message="Validation failed",
            status=400,
            error_type="VALIDATION_ERROR",
            error_details=e.messages,
        )

    try:
        approval = network_access_service.request_access(
            user_id=g.current_user.id,
            organization_id=org_id,
            portal_network_id=data["portal_network_id"],
            device_id=data["device_id"],
            justification=data.get("justification"),
        )
        return api_response(
            data={"approval": approval.to_dict()},
            message="Access requested successfully",
            status=201,
        )
    except AppValidationError as e:
        return api_response(success=False, message=str(e.message), status=400, error_type=e.error_type)
    except DeviceNotFoundError as e:
        # The device id comes from the request body, so an unknown device is
        # reported as a bad request (400) rather than a missing resource.
        return api_response(success=False, message=str(e), status=400, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/approvals", methods=["GET"])
@login_required
@full_access_required
def list_my_approvals(org_id):
    """List the current user's approval records."""
    _, err = _org_check(org_id)
    if err:
        return err

    approvals = network_access_service.list_user_approvals(g.current_user.id, org_id)
    return api_response(
        data={"approvals": [a.to_dict() for a in approvals], "count": len(approvals)},
        message="Approvals retrieved successfully",
    )


@api_v1_bp.route("/organizations/<org_id>/approvals/pending", methods=["GET"])
@login_required
@require_admin
@full_access_required
def list_pending_approvals(org_id):
    """List all pending approval requests (admin only).

    Query args:
        network_id: optional portal network id to filter by.
    """
    _, err = _org_check(org_id)
    if err:
        return err

    network_id = request.args.get("network_id")
    approvals = network_access_service.list_pending_approvals(org_id, network_id=network_id)
    return api_response(
        data={"approvals": [a.to_dict() for a in approvals], "count": len(approvals)},
        message="Pending approvals retrieved successfully",
    )


# NOTE(review): approve/reject/revoke below pass only the approval id to the
# service. Nothing visible in this module ties that approval to ``org_id`` —
# confirm the service layer enforces organization scoping.

@api_v1_bp.route("/organizations/<org_id>/approvals/<approval_id>/approve", methods=["POST"])
@login_required
@require_admin
@full_access_required
def approve_request(org_id, approval_id):
    """Approve a pending access request (admin only)."""
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        approval = network_access_service.approve_request(approval_id, g.current_user.id)
        return api_response(data={"approval": approval.to_dict()}, message="Request approved successfully")
    except ApprovalNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)
    except AppValidationError as e:
        return api_response(success=False, message=str(e.message), status=400, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/approvals/<approval_id>/reject", methods=["POST"])
@login_required
@require_admin
@full_access_required
def reject_request(org_id, approval_id):
    """Reject a pending access request (admin only)."""
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        approval = network_access_service.reject_request(approval_id, g.current_user.id)
        return api_response(data={"approval": approval.to_dict()}, message="Request rejected successfully")
    except ApprovalNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)
    except AppValidationError as e:
        return api_response(success=False, message=str(e.message), status=400, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/approvals/<approval_id>/revoke", methods=["POST"])
@login_required
@require_admin
@full_access_required
def revoke_approval(org_id, approval_id):
    """Revoke an approved access record (admin only)."""
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        approval = network_access_service.revoke_approval(approval_id, g.current_user.id)
        return api_response(data={"approval": approval.to_dict()}, message="Approval revoked successfully")
    except ApprovalNotFoundError as e:
        return api_response(success=False, message=str(e), status=404, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/approvals/assign", methods=["POST"])
@login_required
@require_admin
@full_access_required
def assign_access(org_id):
    """Directly assign network access to a user (admin only)."""
    _, err = _org_check(org_id)
    if err:
        return err

    try:
        data = AssignAccessSchema().load(request.json or {})
    except ValidationError as e:
        return api_response(
            success=False,
            message="Validation failed",
            status=400,
            error_type="VALIDATION_ERROR",
            error_details=e.messages,
        )

    try:
        approval = network_access_service.assign_access(
            portal_network_id=data["portal_network_id"],
            target_user_id=data["target_user_id"],
            granted_by_user_id=g.current_user.id,
            organization_id=org_id,
            justification=data.get("justification"),
        )
        return api_response(
            data={"approval": approval.to_dict()},
            message="Access assigned successfully",
            status=201,
        )
    except AppValidationError as e:
        return api_response(success=False, message=str(e.message), status=400, error_type=e.error_type)


@api_v1_bp.route("/organizations/<org_id>/admin/approvals", methods=["GET"])
@login_required
@require_admin
@full_access_required
def admin_list_all_approvals(org_id):
    """List ALL approval records across all users in the org (admin only).

    Query args:
        network_id: optional portal network id to filter by.
        state: optional approval state to filter by.
    """
    _, err = _org_check(org_id)
    if err:
        return err

    network_id = request.args.get("network_id")
    state = request.args.get("state")
    approvals = network_access_service.list_all_org_approvals(org_id, network_id=network_id, state=state)
    return api_response(
        data={"approvals": [a.to_dict() for a in approvals], "count": len(approvals)},
        message="Approvals retrieved successfully",
    )


# ── Memberships ───────────────────────────────────────────────────────────────

@api_v1_bp.route("/organizations/<org_id>/memberships", methods=["GET"])
@login_required
@full_access_required
def list_memberships(org_id):
    """List the current user's device network memberships."""
    _, err = _org_check(org_id)
    if err:
        return err

    memberships = DeviceNetworkMembership.query.filter(
        DeviceNetworkMembership.user_id == g.current_user.id,
        DeviceNetworkMembership.organization_id == org_id,
        DeviceNetworkMembership.deleted_at.is_(None),
    ).all()
    return api_response(
        data={"memberships": [m.to_dict() for m in memberships], "count": len(memberships)},
        message="Memberships retrieved successfully",
    )


@api_v1_bp.route("/organizations//memberships//activate", methods=["POST"]) @login_required @full_access_required def activate_membership(org_id, membership_id): """Activate an approved device membership. Admins can activate any membership; regular members can only activate their own.""" org, err = _org_check(org_id) if err: return err try: schema = ActivateSchema() data = schema.load(request.json or {}) except ValidationError as e: return api_response(success=False, message="Validation failed", status=400, error_type="VALIDATION_ERROR", error_details=e.messages) is_admin = _is_org_admin(org_id, g.current_user.id) try: session = network_access_service.activate_device_membership( membership_id=membership_id, user_id=g.current_user.id, lifetime_minutes=data.get("lifetime_minutes"), admin_override=is_admin, ) membership = DeviceNetworkMembership.query.get(membership_id) return api_response(data={"session": session.to_dict(), "membership": membership.to_dict()}, message="Membership activated successfully") except MembershipNotFoundError as e: return api_response(success=False, message=str(e), status=404, error_type=e.error_type) except AppValidationError as e: return api_response(success=False, message=str(e.message), status=400, error_type=e.error_type) @api_v1_bp.route("/organizations//memberships//deactivate", methods=["POST"]) @login_required @full_access_required def deactivate_membership(org_id, membership_id): """Deactivate an active device membership. 
Admins can deactivate any; regular members can only deactivate their own.""" org, err = _org_check(org_id) if err: return err # Verify ownership for non-admins if not _is_org_admin(org_id, g.current_user.id): membership_check = DeviceNetworkMembership.query.filter( DeviceNetworkMembership.id == membership_id, DeviceNetworkMembership.user_id == g.current_user.id, DeviceNetworkMembership.deleted_at.is_(None), ).first() if not membership_check: return api_response(success=False, message="Membership not found", status=404, error_type="NOT_FOUND") try: membership = network_access_service.deactivate_membership( membership_id=membership_id, reason="manual_revoke", deactivated_by_user_id=g.current_user.id, ) return api_response(data={"membership": membership.to_dict()}, message="Membership deactivated successfully") except MembershipNotFoundError as e: return api_response(success=False, message=str(e), status=404, error_type=e.error_type) @api_v1_bp.route("/organizations//memberships/activate-all", methods=["POST"]) @login_required @full_access_required def activate_all_memberships(org_id): """Bulk-activate all of the caller's approved inactive memberships in this org.""" org, err = _org_check(org_id) if err: return err try: schema = ActivateSchema() data = schema.load(request.json or {}) except ValidationError as e: return api_response(success=False, message="Validation failed", status=400, error_type="VALIDATION_ERROR", error_details=e.messages) sessions = network_access_service.activate_all_approved( user_id=g.current_user.id, organization_id=org_id, lifetime_minutes=data.get("lifetime_minutes"), ) return api_response( data={"sessions": [s.to_dict() for s in sessions], "count": len(sessions)}, message=f"{len(sessions)} memberships activated", ) @api_v1_bp.route("/organizations//devices//join-network/", methods=["POST"]) @login_required @full_access_required def join_network(org_id, device_id, portal_network_id): """Join an open network directly with a registered 
device.""" org, err = _org_check(org_id) if err: return err try: membership = network_access_service.join_network_for_device( user_id=g.current_user.id, organization_id=org_id, device_id=device_id, portal_network_id=portal_network_id, ) return api_response(data={"membership": membership.to_dict()}, message="Joined network successfully", status=201) except AppValidationError as e: return api_response(success=False, message=str(e.message), status=400, error_type=e.error_type) except DeviceNotFoundError as e: return api_response(success=False, message=str(e), status=404, error_type=e.error_type) @api_v1_bp.route("/organizations//memberships/", methods=["DELETE"]) @login_required @full_access_required def delete_membership(org_id, membership_id): """Soft-delete the current user's own membership.""" org, err = _org_check(org_id) if err: return err try: network_access_service.revoke_membership_soft( membership_id=membership_id, revoked_by_user_id=g.current_user.id, ) return api_response(message="Membership removed successfully") except MembershipNotFoundError as e: return api_response(success=False, message=str(e), status=404, error_type=e.error_type) # ── Sessions ───────────────────────────────────────────────────────────────── @api_v1_bp.route("/organizations//sessions", methods=["GET"]) @login_required @full_access_required def list_sessions(org_id): """List the current user's active activation sessions.""" org, err = _org_check(org_id) if err: return err sessions = ActivationSession.query.filter( ActivationSession.user_id == g.current_user.id, ActivationSession.organization_id == org_id, ActivationSession.ended_at.is_(None), ActivationSession.deleted_at.is_(None), ).all() return api_response( data={"sessions": [s.to_dict() for s in sessions], "count": len(sessions)}, message="Sessions retrieved successfully", ) @api_v1_bp.route("/organizations//sessions/", methods=["DELETE"]) @login_required @full_access_required def end_session(org_id, session_id): """End an active 
activation session.""" org, err = _org_check(org_id) if err: return err session = ActivationSession.query.filter( ActivationSession.id == session_id, ActivationSession.user_id == g.current_user.id, ActivationSession.deleted_at.is_(None), ).first() if not session: return api_response(success=False, message="Session not found", status=404, error_type="NOT_FOUND") if session.ended_at: return api_response(success=False, message="Session already ended", status=400, error_type="VALIDATION_ERROR") from gatehouse_app.services.network_access_service import _end_session from gatehouse_app.utils.constants import ActivationEndReason from datetime import datetime, timezone _end_session(session, ActivationEndReason.LOGOUT) membership = DeviceNetworkMembership.query.get(session.device_network_membership_id) if membership: from gatehouse_app.services.network_access_service import deactivate_membership deactivate_membership(membership.id, reason="logout") return api_response(message="Session ended successfully") # ── Kill Switch ─────────────────────────────────────────────────────────────── @api_v1_bp.route("/organizations//kill-switch", methods=["POST"]) @login_required @require_admin @full_access_required def trigger_kill_switch(org_id): """Trigger a kill switch on a user (admin only).""" org, err = _org_check(org_id) if err: return err try: schema = KillSwitchSchema() data = schema.load(request.json or {}) except ValidationError as e: return api_response(success=False, message="Validation failed", status=400, error_type="VALIDATION_ERROR", error_details=e.messages) try: event = network_access_service.kill_switch( target_user_id=data["target_user_id"], triggered_by_user_id=g.current_user.id, organization_id=org_id, scope=data.get("scope", "organization"), reason=data.get("reason"), network_ids=data.get("network_ids"), ) return api_response(data={"event": event.to_dict()}, message="Kill switch triggered successfully") except AppValidationError as e: return 
api_response(success=False, message=str(e.message), status=400, error_type=e.error_type)


# ── Admin / ZeroTier Controller ───────────────────────────────────────────────

@api_v1_bp.route("/organizations/<org_id>/admin/memberships", methods=["GET"])
@login_required
@require_admin
@full_access_required
def admin_list_memberships(org_id):
    """List all memberships across all users and networks (admin only).

    Args:
        org_id: Organization identifier taken from the URL.

    Returns:
        An ``api_response`` carrying ``memberships`` and ``count``, or the
        error response produced by ``_org_check``.
    """
    _org, err = _org_check(org_id)
    if err:
        return err

    memberships = network_access_service.get_all_memberships_with_details(org_id)
    return api_response(
        data={"memberships": memberships, "count": len(memberships)},
        message="All memberships retrieved successfully",
    )


@api_v1_bp.route(
    "/organizations/<org_id>/admin/memberships/<membership_id>",
    methods=["DELETE"],
)
@login_required
@require_admin
@full_access_required
def admin_delete_membership(org_id, membership_id):
    """Hard-delete a membership and remove it from ZeroTier (admin only).

    Args:
        org_id: Organization identifier taken from the URL.
        membership_id: Identifier of the membership to delete.

    Returns:
        A success ``api_response``, a 404 response when the service raises
        ``MembershipNotFoundError``, or the error from ``_org_check``.
    """
    _org, err = _org_check(org_id)
    if err:
        return err

    # NOTE(review): only membership_id is passed to the service; confirm that
    # hard_delete_membership (or an upstream check) verifies the membership
    # actually belongs to org_id.
    try:
        network_access_service.hard_delete_membership(membership_id)
    except MembershipNotFoundError as e:
        return api_response(
            success=False, message=str(e), status=404, error_type=e.error_type
        )
    return api_response(message="Membership permanently deleted")


# ── ZeroTier Controller ───────────────────────────────────────────────────────

def _org_admin_from_query():
    """Read ``?org_id=`` and verify the caller is an admin/owner of that org.

    Returns:
        ``(org_id, None)`` when the parameter is present and the current user
        passes ``_is_org_admin``; otherwise ``(None, error_response)`` with a
        400 (missing parameter) or 403 (insufficient role) response.
    """
    org_id = request.args.get("org_id")
    if not org_id:
        return None, api_response(
            success=False,
            message="org_id query parameter is required",
            status=400,
            error_type="VALIDATION_ERROR",
        )
    if not _is_org_admin(org_id, g.current_user.id):
        return None, api_response(
            success=False,
            message="Admin or owner role required for this organization",
            status=403,
            error_type="AUTHORIZATION_ERROR",
        )
    return org_id, None


@api_v1_bp.route("/admin/zerotier/status", methods=["GET"])
@login_required
@full_access_required
def zerotier_status():
    """Check ZeroTier controller connectivity and status.

    Requires ?org_id=<org-id> — credentials are looked up from that org.
    Caller must be an admin/owner of that specific org.

    Returns:
        An ``api_response`` with ``status`` on success, or a 400/403/502
        error response (502 when ``ZeroTierAPIError`` is raised).
    """
    org_id, err = _org_admin_from_query()
    if err:
        return err

    try:
        status = zt.get_status(organization_id=org_id)
    except ZeroTierAPIError as e:
        return api_response(
            success=False, message=str(e), status=502, error_type=e.error_type
        )
    return api_response(
        data={"status": status}, message="ZeroTier controller is reachable"
    )


@api_v1_bp.route("/admin/zerotier/networks", methods=["GET"])
@login_required
@full_access_required
def zerotier_list_networks():
    """List networks from the ZeroTier controller.

    Requires ?org_id=<org-id> — credentials are looked up from that org.
    Caller must be an admin/owner of that specific org.

    Returns:
        An ``api_response`` with ``networks`` and ``count`` on success, or a
        400/403/502 error response.
    """
    org_id, err = _org_admin_from_query()
    if err:
        return err

    try:
        networks = zt.list_networks(organization_id=org_id)
    except ZeroTierAPIError as e:
        return api_response(
            success=False, message=str(e), status=502, error_type=e.error_type
        )

    # Entries without a to_dict() method are reduced to a bare {"id": ...}.
    serialized = [
        n.to_dict() if hasattr(n, 'to_dict') else {"id": getattr(n, "id", str(n))}
        for n in networks
    ]
    return api_response(
        data={"networks": serialized, "count": len(networks)},
        message="Networks retrieved successfully",
    )


@api_v1_bp.route("/admin/zerotier/networks/<network_id>", methods=["GET"])
@login_required
@full_access_required
def zerotier_get_network(network_id):
    """Get a ZeroTier network from the controller.

    Requires ?org_id=<org-id> — credentials are looked up from that org.
    Caller must be an admin/owner of that specific org.

    Args:
        network_id: ZeroTier network identifier taken from the URL.

    Returns:
        An ``api_response`` with ``network`` on success, or a 400/403/502
        error response.
    """
    org_id, err = _org_admin_from_query()
    if err:
        return err

    try:
        network = zt.get_network(network_id, organization_id=org_id)
    except ZeroTierAPIError as e:
        return api_response(
            success=False, message=str(e), status=502, error_type=e.error_type
        )
    return api_response(
        data={"network": network.to_dict()},
        message="Network retrieved successfully",
    )


@api_v1_bp.route("/admin/zerotier/networks/<network_id>/members", methods=["GET"])
@login_required
@full_access_required
def zerotier_list_members(network_id):
    """List members on a ZeroTier network from the controller.

    Requires ?org_id=<org-id> — credentials are looked up from that org.
    Caller must be an admin/owner of that specific org.

    Args:
        network_id: ZeroTier network identifier taken from the URL.

    Returns:
        An ``api_response`` with ``members`` and ``count`` on success, or a
        400/403/502 error response.
    """
    org_id, err = _org_admin_from_query()
    if err:
        return err

    try:
        members = zt.list_members(network_id, organization_id=org_id)
    except ZeroTierAPIError as e:
        return api_response(
            success=False, message=str(e), status=502, error_type=e.error_type
        )
    return api_response(
        data={"members": [m.to_dict() for m in members], "count": len(members)},
        message="Members retrieved successfully",
    )


@api_v1_bp.route("/admin/zerotier/reconcile", methods=["POST"])
@login_required
@full_access_required
def trigger_reconciliation():
    """Trigger full reconciliation across all networks.

    Requires the caller to hold an admin or owner role in at least one
    organization.

    Returns:
        An ``api_response`` carrying the result of ``reconcile_all()``, or a
        403 response when the caller has no admin/owner membership.
    """
    admin_membership = OrganizationMember.query.filter(
        OrganizationMember.user_id == g.current_user.id,
        OrganizationMember.role.in_([OrganizationRole.ADMIN, OrganizationRole.OWNER]),
        OrganizationMember.deleted_at.is_(None),
    ).first()
    if admin_membership is None:
        return api_response(
            success=False,
            message="Admin or owner role required",
            status=403,
            error_type="AUTHORIZATION_ERROR",
        )

    result = zerotier_reconciliation_service.reconcile_all()
    return api_response(data=result, message="Reconciliation complete")


# ── Per-org ZeroTier configuration ───────────────────────────────────────────

class ZeroTierConfigSchema(Schema):
    """Request body for setting an organization's ZeroTier credentials."""

    zt_api_token = fields.Str(required=True, validate=validate.Length(min=1, max=512))
    zt_api_url = fields.Str(required=True, validate=validate.Length(min=1, max=512))
    zt_api_mode = fields.Str(
        required=True,
        validate=validate.OneOf(["central", "controller"]),
    )


@api_v1_bp.route("/organizations/<org_id>/zerotier-config", methods=["GET"])
@login_required
@require_admin
@full_access_required
def get_zerotier_config(org_id):
    """Return the current ZeroTier configuration for an organization (admin only).

    The token is masked — only its presence is indicated, not the value.

    Args:
        org_id: Organization identifier taken from the URL.
    """
    org, err = _org_check(org_id)
    if err:
        return err

    return api_response(
        data={
            "zerotier_config": {
                "zt_api_token_set": bool(org.zt_api_token),
                "zt_api_url": org.zt_api_url,
                "zt_api_mode": org.zt_api_mode,
            }
        },
        message="ZeroTier configuration retrieved successfully",
    )


@api_v1_bp.route("/organizations/<org_id>/zerotier-config", methods=["PUT"])
@login_required
@require_admin
@full_access_required
def set_zerotier_config(org_id):
    """Set (or replace) the ZeroTier credentials for an organization (admin only).

    All three fields are required — there are no server-level defaults.

    Body:
        zt_api_token  (required) – API token for ZeroTier Central or authtoken.secret
        zt_api_url    (required) – full base URL, e.g. http://host:9993 or
                                   https://api.zerotier.com/api/v1
        zt_api_mode   (required) – "central" | "controller"

    Args:
        org_id: Organization identifier taken from the URL.

    Returns:
        A success ``api_response`` with the saved (masked) config, a 400
        ``VALIDATION_ERROR`` response for an invalid body, or a 400
        ``ZEROTIER_CONNECTIVITY_FAILED`` response when the connectivity test
        against the new credentials fails (nothing is persisted in that case).
    """
    org, err = _org_check(org_id)
    if err:
        return err

    try:
        data = ZeroTierConfigSchema().load(request.json or {})
    except ValidationError as e:
        return api_response(
            success=False,
            message="Validation failed",
            status=400,
            error_type="VALIDATION_ERROR",
            error_details=e.messages,
        )

    # Test connectivity BEFORE saving — reject bad credentials early.
    # Stage the credentials in the open transaction so the client lookup done
    # by zt.get_status() can see them without anything being committed yet.
    org.zt_api_token = data["zt_api_token"]
    org.zt_api_url = data["zt_api_url"]
    org.zt_api_mode = data["zt_api_mode"]
    db.session.flush()

    connectivity_ok = False
    connectivity_error = None
    try:
        zt.get_status(organization_id=org_id)
        connectivity_ok = True
    except Exception as exc:  # any failure (ZeroTierAPIError included) rejects the config
        connectivity_error = str(exc)

    if not connectivity_ok:
        # Discard the staged credentials. A rollback (rather than restoring the
        # old values and committing) guarantees nothing is persisted and also
        # recovers the session if the failure left it in an errored state.
        db.session.rollback()
        return api_response(
            success=False,
            message="Controller Connectivity test failed",
            status=400,
            error_type="ZEROTIER_CONNECTIVITY_FAILED",
            error_details={
                "connectivity_test": {
                    "ok": False,
                    "error": connectivity_error,
                },
            },
        )

    # Connectivity verified — commit the new credentials
    org.save()

    from gatehouse_app.services.audit_service import AuditService
    AuditService.log_action(
        action="org.zerotier_config.updated",
        user_id=g.current_user.id,
        organization_id=org_id,
        resource_type="organization",
        resource_id=org_id,
        metadata={
            "zt_api_url": org.zt_api_url,
            "zt_api_mode": org.zt_api_mode,
            "connectivity_ok": connectivity_ok,
        },
        description="Organization ZeroTier config updated",
        success=True,
    )

    return api_response(
        data={
            "zerotier_config": {
                "zt_api_token_set": True,
                "zt_api_url": org.zt_api_url,
                "zt_api_mode": org.zt_api_mode,
            },
            "connectivity_test": {
                "ok": True,
                "error": None,
            },
        },
        message="ZeroTier configuration saved successfully",
    )


@api_v1_bp.route("/organizations/<org_id>/zerotier-config", methods=["DELETE"])
@login_required
@require_admin
@full_access_required
def delete_zerotier_config(org_id):
    """Remove the org-level ZeroTier credentials (admin only).

    After removal, all ZeroTier operations for this organization will fail
    until new credentials are configured via the ZeroTier Config page.

    Args:
        org_id: Organization identifier taken from the URL.
    """
    org, err = _org_check(org_id)
    if err:
        return err

    org.zt_api_token = None
    org.zt_api_url = None
    org.zt_api_mode = None
    org.save()

    from gatehouse_app.services.audit_service import AuditService
    AuditService.log_action(
        action="org.zerotier_config.deleted",
        user_id=g.current_user.id,
        organization_id=org_id,
        resource_type="organization",
        resource_id=org_id,
        metadata={},
        description="Organization ZeroTier config removed — ZeroTier operations disabled until reconfigured",
        success=True,
    )

    return api_response(message="ZeroTier configuration removed. Configure new credentials to re-enable ZeroTier features.")