diff --git a/gatehouse_app/api/v1/zerotier.py b/gatehouse_app/api/v1/zerotier.py index ebd8776..17c2aeb 100644 --- a/gatehouse_app/api/v1/zerotier.py +++ b/gatehouse_app/api/v1/zerotier.py @@ -1,8 +1,11 @@ """ZeroTier network governance API endpoints.""" +from datetime import datetime, timezone + from flask import g, request from marshmallow import Schema, fields, validate, ValidationError from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import joinedload from gatehouse_app.api.v1 import api_v1_bp from gatehouse_app.extensions import db @@ -808,6 +811,62 @@ def delete_membership(org_id, membership_id): return api_response(success=False, message=str(e), status=404, error_type=e.error_type) +# ── Session helpers ────────────────────────────────────────────────────────── + + +def _session_to_dict(session, include_user=False): + """Build a rich session dict with device, network, and timing details.""" + now = datetime.now(timezone.utc) + exp = session.expires_at + if exp.tzinfo is None: + exp = exp.replace(tzinfo=timezone.utc) + remaining = (exp - now).total_seconds() if exp > now else 0 + duration = (session.expires_at - session.authenticated_at).total_seconds() + + auth_at = session.authenticated_at + if auth_at.tzinfo is None: + auth_at = auth_at.replace(tzinfo=timezone.utc) + exp_at = session.expires_at + if exp_at.tzinfo is None: + exp_at = exp_at.replace(tzinfo=timezone.utc) + + d = { + "id": session.id, + "authenticated_at": auth_at.isoformat(), + "expires_at": exp_at.isoformat(), + "duration_seconds": int(duration), + "remaining_seconds": max(0, int(remaining)), + "is_active": session.is_active, + "is_expired": session.is_expired, + "ended_at": session.ended_at.isoformat() if session.ended_at else None, + "end_reason": session.end_reason.value if session.end_reason else None, + } + + if session.access_request: + if session.access_request.device: + dev = session.access_request.device + d["device"] = { + "id": dev.id, + "node_id": dev.node_id, + "name": dev.display_name, + } + if session.access_request.portal_network: + net = session.access_request.portal_network + d["network"] = { + "id": net.id, + "name": net.name, + } + + if include_user: + d["user"] = { + "id": session.user.id, + "full_name": session.user.full_name, + "email": session.user.email, + } + + return d + + # ── Sessions ───────────────────────────────────────────────────────────────── @@ -820,15 +879,27 @@ def list_sessions(org_id): if err: return err - sessions = ActivationSession.query.filter( - ActivationSession.user_id == g.current_user.id, - ActivationSession.organization_id == org_id, - ActivationSession.ended_at.is_(None), - ActivationSession.deleted_at.is_(None), - ).all() + sessions = ( + ActivationSession.query.options( + joinedload(ActivationSession.access_request) + .joinedload(NetworkAccessRequest.device), + joinedload(ActivationSession.access_request) + .joinedload(NetworkAccessRequest.portal_network), + ) + .filter( + ActivationSession.user_id == g.current_user.id, + ActivationSession.organization_id == org_id, + ActivationSession.ended_at.is_(None), + ActivationSession.deleted_at.is_(None), + ) + .all() + ) return api_response( - data={"sessions": [s.to_dict() for s in sessions], "count": len(sessions)}, + data={ + "sessions": [_session_to_dict(s) for s in sessions], + "count": len(sessions), + }, message="Sessions retrieved successfully", ) @@ -856,7 +927,6 @@ def end_session(org_id, session_id): from gatehouse_app.services.network_access_service import _end_session from gatehouse_app.utils.constants import ActivationEndReason - from datetime import datetime, timezone _end_session(session, ActivationEndReason.LOGOUT) @@ -866,6 +936,41 @@ def end_session(org_id, session_id): return api_response(message="Session ended successfully") +@api_v1_bp.route("/organizations//admin/sessions", methods=["GET"]) +@login_required +@require_admin +@full_access_required +def admin_list_sessions(org_id): + """List all active activation sessions across all users (admin only).""" + org, err = _org_check(org_id) + if err: + return err + + sessions = ( + ActivationSession.query.options( + joinedload(ActivationSession.user), + joinedload(ActivationSession.access_request) + .joinedload(NetworkAccessRequest.device), + joinedload(ActivationSession.access_request) + .joinedload(NetworkAccessRequest.portal_network), + ) + .filter( + ActivationSession.organization_id == org_id, + ActivationSession.ended_at.is_(None), + ActivationSession.deleted_at.is_(None), + ) + .all() + ) + + return api_response( + data={ + "sessions": [_session_to_dict(s, include_user=True) for s in sessions], + "count": len(sessions), + }, + message="Admin sessions retrieved successfully", + ) + + # ── Kill Switch ─────────────────────────────────────────────────────────────── diff --git a/tests/integration/test_zerotier.py b/tests/integration/test_zerotier.py index 0d780b3..9fe5cba 100644 --- a/tests/integration/test_zerotier.py +++ b/tests/integration/test_zerotier.py @@ -3,11 +3,13 @@ Covers network CRUD, device registration, access requests, approvals, and membership activation. External ZeroTier API calls are mocked. """ +from datetime import datetime, timedelta, timezone + import pytest from unittest.mock import patch, MagicMock from tests.integration.client.base import ApiError -from gatehouse_app.utils.constants import OrganizationRole +from gatehouse_app.utils.constants import OrganizationRole, ActivationEndReason def assert_success(response: dict, message_contains: str = "") -> dict: @@ -1080,3 +1082,219 @@ class TestAdminForceDeleteMembership: f"/organizations/{org['id']}/admin/memberships/{uuid.uuid4()}", ) assert exc_info.value.status_code == 403 + + +class TestSessions: + """Test user and admin session listing endpoints.""" + + def test_list_sessions_positive( + self, integration_client, create_test_user, create_test_org, + create_test_membership, integration_app, + ): + """ZT-SESS-01: User lists their own active sessions. + + WHAT: GET /organizations//sessions for a user with an active session. + EXPECTED: 200 OK with session containing device, network, and timing fields. + """ + from gatehouse_app.models.zerotier.device import Device + from gatehouse_app.models.zerotier.portal_network import PortalNetwork + from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest + from gatehouse_app.models.zerotier.activation_session import ActivationSession + from gatehouse_app.extensions import db as _db + from gatehouse_app.utils.constants import ApprovalState, ApprovalGrantType, NetworkRequestMode + + user = create_test_user(password="Pass1234!") + org = create_test_org() + create_test_membership(user["id"], org["id"], OrganizationRole.MEMBER) + + now = datetime.now(timezone.utc) + + with integration_app.app_context(): + network = PortalNetwork( + organization_id=org["id"], name="Test Net", + owner_user_id=user["id"], + zerotier_network_id="aabbccddee11", + request_mode=NetworkRequestMode.OPEN, + ) + _db.session.add(network) + _db.session.flush() + + device = Device( + user_id=user["id"], organization_id=org["id"], + node_id="deadbeef01", device_nickname="My Laptop", + hostname="my-laptop", + ) + _db.session.add(device) + _db.session.flush() + + req = NetworkAccessRequest( + organization_id=org["id"], user_id=user["id"], + device_id=device.id, portal_network_id=network.id, + status=ApprovalState.APPROVED, active=True, + grant_type=ApprovalGrantType.REQUESTED, + ) + _db.session.add(req) + _db.session.flush() + + session = ActivationSession( + organization_id=org["id"], user_id=user["id"], + network_access_request_id=req.id, + authenticated_at=now - timedelta(hours=1), + expires_at=now + timedelta(hours=7), + created_by=user["id"], + ) + _db.session.add(session) + _db.session.commit() + + saved_session_id = session.id + + integration_client.auth.login(email=user["email"], password="Pass1234!") + result = integration_client.get(f"/organizations/{org['id']}/sessions") + data = result.get("data", {}) + + assert data["count"] == 1 + s = data["sessions"][0] + assert s["id"] == saved_session_id + assert s["is_active"] is True + assert s["is_expired"] is False + assert s["duration_seconds"] == 28800 + assert s["remaining_seconds"] > 0 + + assert s["device"]["node_id"] == "deadbeef01" + assert s["device"]["name"] == "My Laptop" + + assert s["network"]["name"] == "Test Net" + + assert "user" not in s + + def test_list_sessions_empty( + self, integration_client, create_test_user, create_test_org, + create_test_membership, + ): + """ZT-SESS-02: User with no sessions gets empty array.""" + user = create_test_user(password="Pass1234!") + org = create_test_org() + create_test_membership(user["id"], org["id"], OrganizationRole.MEMBER) + + integration_client.auth.login(email=user["email"], password="Pass1234!") + result = integration_client.get(f"/organizations/{org['id']}/sessions") + data = result.get("data", {}) + assert data["count"] == 0 + assert data["sessions"] == [] + + def test_list_sessions_unauth_negative( + self, integration_client, create_test_org, + ): + """ZT-SESS-03: Unauthenticated user gets 401.""" + org = create_test_org() + with pytest.raises(ApiError) as exc_info: + integration_client.get(f"/organizations/{org['id']}/sessions") + assert exc_info.value.status_code == 401 + + def test_admin_list_sessions_positive( + self, integration_client, create_test_user, create_test_org, + create_test_membership, integration_app, + ): + """ZT-SESS-04: Admin lists all sessions across users. + + WHAT: GET /organizations//admin/sessions. + EXPECTED: 200 OK with all users' sessions including user details. + """ + from gatehouse_app.models.zerotier.device import Device + from gatehouse_app.models.zerotier.portal_network import PortalNetwork + from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest + from gatehouse_app.models.zerotier.activation_session import ActivationSession + from gatehouse_app.extensions import db as _db + from gatehouse_app.utils.constants import ApprovalState, ApprovalGrantType, NetworkRequestMode + + admin = create_test_user(password="AdminPass123!") + member1 = create_test_user(password="Mem1Pass123!", full_name="Alice") + member2 = create_test_user(password="Mem2Pass123!", full_name="Bob") + org = create_test_org() + + create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN) + create_test_membership(member1["id"], org["id"], OrganizationRole.MEMBER) + create_test_membership(member2["id"], org["id"], OrganizationRole.MEMBER) + + now = datetime.now(timezone.utc) + + with integration_app.app_context(): + network = PortalNetwork( + organization_id=org["id"], name="Shared Net", + owner_user_id=admin["id"], + zerotier_network_id="ffgg00112233", + request_mode=NetworkRequestMode.OPEN, + ) + _db.session.add(network) + _db.session.flush() + + for member, node_id, nick, full_name in [ + (member1, "aaaa01", "Alice Mac", None), + (member2, "bbbb02", None, "bob-pc"), + ]: + device = Device( + user_id=member["id"], organization_id=org["id"], + node_id=node_id, device_nickname=nick, + hostname="host", + ) + _db.session.add(device) + _db.session.flush() + + req = NetworkAccessRequest( + organization_id=org["id"], user_id=member["id"], + device_id=device.id, portal_network_id=network.id, + status=ApprovalState.APPROVED, active=True, + grant_type=ApprovalGrantType.REQUESTED, + ) + _db.session.add(req) + _db.session.flush() + + session = ActivationSession( + organization_id=org["id"], user_id=member["id"], + network_access_request_id=req.id, + authenticated_at=now - timedelta(hours=2), + expires_at=now + timedelta(hours=6), + created_by=member["id"], + ) + _db.session.add(session) + _db.session.commit() + + integration_client.auth.login(email=admin["email"], password="AdminPass123!") + result = integration_client.get(f"/organizations/{org['id']}/admin/sessions") + data = result.get("data", {}) + + assert data["count"] == 2 + + user_names = {s["user"]["full_name"] for s in data["sessions"]} + assert "Alice" in user_names + assert "Bob" in user_names + + for s in data["sessions"]: + assert s["is_active"] is True + assert "device" in s + assert "node_id" in s["device"] + assert "network" in s + assert s["network"]["name"] == "Shared Net" + + def test_admin_list_sessions_non_admin_negative( + self, integration_client, create_test_user, create_test_org, + create_test_membership, + ): + """ZT-SESS-05: Non-admin gets 403 on admin sessions endpoint.""" + member = create_test_user(password="Pass1234!") + org = create_test_org() + create_test_membership(member["id"], org["id"], OrganizationRole.MEMBER) + + integration_client.auth.login(email=member["email"], password="Pass1234!") + with pytest.raises(ApiError) as exc_info: + integration_client.get(f"/organizations/{org['id']}/admin/sessions") + assert exc_info.value.status_code == 403 + + def test_admin_list_sessions_unauth_negative( + self, integration_client, create_test_org, + ): + """ZT-SESS-06: Unauthenticated user gets 401 on admin endpoint.""" + org = create_test_org() + with pytest.raises(ApiError) as exc_info: + integration_client.get(f"/organizations/{org['id']}/admin/sessions") + assert exc_info.value.status_code == 401