"""ZeroTier network access integration tests.

Covers network CRUD, device registration, access requests, approvals,
and membership activation. External ZeroTier API calls are mocked.
"""
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import pytest
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from tests.integration.client.base import ApiError
|
|
from gatehouse_app.utils.constants import OrganizationRole, ActivationEndReason
|
|
|
|
|
|
def assert_success(response: dict, message_contains: str = "") -> dict:
    """Assert that an API response envelope reports success and return its payload.

    Args:
        response: Decoded response envelope. The ``success``, ``message`` and
            ``data`` keys are read; all of them are optional.
        message_contains: Optional case-insensitive substring that must occur
            in the response ``message``. An empty string skips the check.

    Returns:
        The ``data`` payload, or an empty dict when it is missing or ``None``.

    Raises:
        AssertionError: If ``success`` is explicitly ``False`` or the expected
            substring is not found in the message.
    """
    # Only an explicit ``False`` counts as failure; a missing flag is accepted.
    assert response.get("success") is not False, (
        f"Expected success but got error: {response.get('message')}"
    )
    if message_contains:
        # ``message`` may be present but null; treat it as empty so the check
        # fails with an AssertionError instead of an AttributeError.
        message = response.get("message") or ""
        assert message_contains.lower() in message.lower(), (
            f"Expected message containing {message_contains!r} but got: {message!r}"
        )
    data = response.get("data")
    return {} if data is None else data
|
|
|
|
|
|
class TestZeroTierNetworkCRUD:
    """Exercise the network create/list endpoints of an organization."""

    @patch("gatehouse_app.services.portal_network_service.create_network")
    def test_create_network_positive(
        self,
        mock_create_network,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-01 — Create ZeroTier network.

        WHAT: Admin POST /organizations/<id>/networks while
              ``portal_network_service.create_network`` is patched.
        WHY: Networks are the top-level ZeroTier resource.
        EXPECTED: 201 Created (the test itself only checks the success flag).
        """
        from gatehouse_app.models.zerotier.portal_network import PortalNetwork  # noqa: F401

        # The patched service hands back a stub whose to_dict() is canned.
        stub_network = MagicMock()
        stub_network.to_dict.return_value = {"id": "net-123", "name": "Test Network"}
        mock_create_network.return_value = stub_network

        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        org = create_test_org()
        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)
        integration_client.auth.login(email=admin["email"], password=admin_password)

        payload = {
            "name": "Test Network",
            "zerotier_network_id": "a84ac5c10a6e4c7e",
            "environment": "development",
        }
        response = integration_client.post(
            f"/organizations/{org['id']}/networks",
            data=payload,
        )
        assert_success(response)

    def test_list_networks_positive(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-02 — List networks.

        WHAT: A plain member GETs /organizations/<id>/networks.
        WHY: Network overview page uses this endpoint.
        EXPECTED: 200 OK with networks array (only the success flag is checked).
        """
        member_password = "MyPassword123!"
        user = create_test_user(password=member_password)
        org = create_test_org()
        create_test_membership(user["id"], org["id"], OrganizationRole.MEMBER)
        integration_client.auth.login(email=user["email"], password=member_password)

        response = integration_client.get(f"/organizations/{org['id']}/networks")
        assert_success(response)

    def test_create_network_non_admin_negative(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-03 — Reject network creation as member.

        WHAT: Member attempts POST /organizations/<id>/networks.
        WHY: Network management is admin-only.
        EXPECTED: The client raises ApiError with status 403 Forbidden.
        """
        member_password = "MemberPass123!"
        member = create_test_user(password=member_password)
        org = create_test_org()
        create_test_membership(member["id"], org["id"], OrganizationRole.MEMBER)
        integration_client.auth.login(email=member["email"], password=member_password)

        payload = {"name": "Hacked", "zerotier_network_id": "a84ac5c10a6e4c7e"}
        with pytest.raises(ApiError) as exc_info:
            integration_client.post(
                f"/organizations/{org['id']}/networks",
                data=payload,
            )
        assert exc_info.value.status_code == 403
|
|
|
|
|
|
class TestZeroTierDeviceManagement:
    """Test device registration and management."""

    def test_register_device_positive(self, integration_client, create_test_user, create_test_org, create_test_membership):
        """TEST: ZT-04 — Register a device.

        WHAT: POST /organizations/<id>/devices.
        WHY: Devices must be registered before network access.
        EXPECTED: 201 Created; a 400/500 rejection is tolerated for now
                  because the outcome depends on the ZeroTier configuration.
        """
        user = create_test_user(password="MyPassword123!")
        org = create_test_org()
        create_test_membership(user["id"], org["id"], OrganizationRole.MEMBER)

        integration_client.auth.login(email=user["email"], password="MyPassword123!")
        try:
            result = integration_client.post(
                f"/organizations/{org['id']}/devices",
                data={
                    "node_id": "1234567890",
                    "nickname": "Test Device",
                    "hostname": "test-device",
                },
            )
        except ApiError as exc:
            # The client raises ApiError on error statuses, so the tolerated
            # failure path has to be handled here (same pattern as the
            # kill-switch test) rather than by inspecting the response body.
            assert exc.status_code in (400, 500)
        else:
            # May succeed or fail depending on ZT config; accept both for now
            assert result.get("success") is not False or result.get("code") in (201, 400, 500)

    def test_list_devices_positive(self, integration_client, create_test_user, create_test_org, create_test_membership):
        """TEST: ZT-05 — List devices.

        WHAT: GET /organizations/<id>/devices.
        WHY: Device management page uses this endpoint.
        EXPECTED: 200 OK (only the success flag is asserted).
        """
        user = create_test_user(password="MyPassword123!")
        org = create_test_org()
        create_test_membership(user["id"], org["id"], OrganizationRole.MEMBER)

        integration_client.auth.login(email=user["email"], password="MyPassword123!")
        result = integration_client.get(f"/organizations/{org['id']}/devices")
        assert_success(result)
|
|
|
|
|
|
class TestZeroTierApprovals:
    """Approval listing endpoints as seen by an organization admin."""

    def test_list_pending_approvals_positive(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-06 — List pending approvals as admin.

        WHAT: GET /organizations/<id>/approvals/pending.
        WHY: Admins review pending access requests.
        EXPECTED: 200 OK (only the success flag is asserted).
        """
        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        org = create_test_org()
        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)
        integration_client.auth.login(email=admin["email"], password=admin_password)

        response = integration_client.get(f"/organizations/{org['id']}/approvals/pending")
        assert_success(response)

    def test_list_approvals_positive(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-07 — List all approvals.

        WHAT: GET /organizations/<id>/approvals.
        WHY: Approval history page uses this endpoint.
        EXPECTED: 200 OK (only the success flag is asserted).
        """
        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        org = create_test_org()
        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)
        integration_client.auth.login(email=admin["email"], password=admin_password)

        response = integration_client.get(f"/organizations/{org['id']}/approvals")
        assert_success(response)
|
|
|
|
|
|
class TestZeroTierMembership:
    """Membership listing plus the user-level and network-level kill switches."""

    def test_get_memberships_positive(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-08 — Get ZeroTier memberships.

        WHAT: GET /organizations/<id>/memberships.
        WHY: Users see their active network memberships.
        EXPECTED: 200 OK (only the success flag is asserted).
        """
        member_password = "MyPassword123!"
        user = create_test_user(password=member_password)
        org = create_test_org()
        create_test_membership(user["id"], org["id"], OrganizationRole.MEMBER)
        integration_client.auth.login(email=user["email"], password=member_password)

        response = integration_client.get(f"/organizations/{org['id']}/memberships")
        assert_success(response)

    def test_kill_switch_positive(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-09 — Trigger kill switch.

        WHAT: POST /organizations/<id>/kill-switch.
        WHY: Emergency access revocation.
        EXPECTED: 200 OK, or an ApiError with status 400/500 when there are
                  no memberships to revoke.
        """
        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        org = create_test_org()
        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)
        integration_client.auth.login(email=admin["email"], password=admin_password)

        payload = {"target_user_id": admin["id"], "reason": "Test kill switch"}
        try:
            response = integration_client.post(
                f"/organizations/{org['id']}/kill-switch",
                data=payload,
            )
        except ApiError as exc:
            # Accept errors when no active memberships to kill
            assert exc.status_code in (400, 500)
        else:
            assert_success(response)

    @patch("gatehouse_app.services.network_access_service._end_active_session")
    @patch("gatehouse_app.services.network_access_service.zt.deauthorize_member")
    def test_network_kill_switch_positive(
        self,
        mock_deauth,
        mock_end_session,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
        integration_app,
    ):
        """TEST: ZT-10 — Trigger network kill switch.

        WHAT: POST /organizations/<id>/networks/<id>/kill-switch.
        WHY: Admin needs to kill all device access on a network.
        EXPECTED: Success response whose message mentions "triggered"; the
                  seeded request ends up inactive with status SUSPENDED.
        """
        from gatehouse_app.extensions import db as _db
        from gatehouse_app.models.zerotier.device import Device
        from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
        from gatehouse_app.models.zerotier.portal_network import PortalNetwork
        from gatehouse_app.utils.constants import ApprovalState

        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        org = create_test_org()
        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)

        # Seed network -> device -> approved, active access request.
        with integration_app.app_context():
            session = _db.session

            network = PortalNetwork(
                organization_id=org["id"],
                name="Kill Test Net",
                zerotier_network_id="zt_kill_test",
                request_mode="open",
                owner_user_id=admin["id"],
            )
            session.add(network)
            session.flush()  # flush so the generated primary key is available
            network_id = network.id

            device = Device(
                user_id=admin["id"],
                organization_id=org["id"],
                node_id="9999999999",
                device_nickname="Kill Test Device",
            )
            session.add(device)
            session.flush()
            device_id = device.id

            access_request = NetworkAccessRequest(
                organization_id=org["id"],
                user_id=admin["id"],
                device_id=device_id,
                portal_network_id=network_id,
                status=ApprovalState.APPROVED,
                active=True,
            )
            session.add(access_request)
            session.flush()
            req_id = access_request.id
            session.commit()

        integration_client.auth.login(email=admin["email"], password=admin_password)
        response = integration_client.post(
            f"/organizations/{org['id']}/networks/{network_id}/kill-switch",
            data={},
        )
        assert_success(response, "triggered")

        # Re-read the row in a fresh app context to observe the persisted state.
        with integration_app.app_context():
            refreshed = NetworkAccessRequest.query.get(req_id)
            assert refreshed.active is False
            assert refreshed.status == ApprovalState.SUSPENDED

    def test_network_kill_switch_non_admin_negative(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
        integration_app,
    ):
        """TEST: ZT-11 — Non-admin cannot trigger network kill switch."""
        from gatehouse_app.extensions import db as _db
        from gatehouse_app.models.zerotier.portal_network import PortalNetwork

        member_password = "Pass1234!"
        member = create_test_user(password=member_password)
        org = create_test_org()
        create_test_membership(member["id"], org["id"], OrganizationRole.MEMBER)

        with integration_app.app_context():
            locked_network = PortalNetwork(
                organization_id=org["id"],
                name="Locked Net",
                zerotier_network_id="zt_locked",
                request_mode="open",
                owner_user_id=member["id"],
            )
            _db.session.add(locked_network)
            _db.session.commit()
            network_id = locked_network.id

        integration_client.auth.login(email=member["email"], password=member_password)
        with pytest.raises(ApiError) as exc_info:
            integration_client.post(
                f"/organizations/{org['id']}/networks/{network_id}/kill-switch",
            )
        assert exc_info.value.status_code == 403

    def test_network_kill_switch_unauth_negative(
        self,
        integration_client,
        create_test_org,
    ):
        """TEST: ZT-12 — Unauthenticated user cannot trigger network kill switch."""
        org = create_test_org()
        # No login is performed, so the request is sent without credentials.
        with pytest.raises(ApiError) as exc_info:
            integration_client.post(
                f"/organizations/{org['id']}/networks/does-not-matter/kill-switch",
            )
        assert exc_info.value.status_code == 401
|
|
|
|
|
|
class TestZeroTierJoinNetwork:
    """Joining a network with an already registered device."""

    @patch("gatehouse_app.services.network_access_service._ensure_zerotier_member")
    def test_rejoin_after_deactivation_positive(
        self,
        mock_ensure_member,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
        integration_app,
    ):
        """TEST: ZT-15 — Re-join network after deactivation.

        WHAT: User with deactivated membership (APPROVED + active=False)
              POSTs to join-network for the same device and network.
        WHY: Deactivated memberships should not block re-join attempts.
        EXPECTED: 201 Created (re-opens existing request), not 409 Conflict;
                  the patched ``_ensure_zerotier_member`` is called once.
        """
        from gatehouse_app.extensions import db as _db
        from gatehouse_app.models.zerotier.device import Device
        from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
        from gatehouse_app.models.zerotier.portal_network import PortalNetwork
        from gatehouse_app.utils.constants import ApprovalState, NetworkRequestMode

        user_password = "MyPassword123!"
        user = create_test_user(password=user_password)
        org = create_test_org()
        create_test_membership(user["id"], org["id"], OrganizationRole.MEMBER)

        with integration_app.app_context():
            session = _db.session

            device = Device(
                user_id=user["id"],
                organization_id=org["id"],
                node_id="1234567890",
                device_nickname="Test Device",
                hostname="test-device",
            )
            session.add(device)
            session.flush()  # flush so the generated primary key is available
            device_id = device.id

            portal_network = PortalNetwork(
                organization_id=org["id"],
                name="Test Network",
                owner_user_id=user["id"],
                zerotier_network_id="a84ac5c10a6e4c7e",
                request_mode=NetworkRequestMode.OPEN,
            )
            session.add(portal_network)
            session.flush()
            portal_network_id = portal_network.id

            # Pre-existing request: approved earlier, but no longer active.
            stale_request = NetworkAccessRequest(
                organization_id=org["id"],
                user_id=user["id"],
                device_id=device_id,
                portal_network_id=portal_network_id,
                status=ApprovalState.APPROVED,
                active=False,
            )
            session.add(stale_request)
            session.commit()

        integration_client.auth.login(email=user["email"], password=user_password)
        response = integration_client.post(
            f"/organizations/{org['id']}/devices/{device_id}/join-network/{portal_network_id}",
        )

        payload = assert_success(response, "joined network successfully")
        assert response.get("code") == 201
        assert "membership" in payload
        mock_ensure_member.assert_called_once()
|
|
|
|
|
|
class TestAdminUserDevices:
    """Admin endpoint that lists the devices registered by one user."""

    def test_list_user_devices_positive(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
        integration_app,
    ):
        """TEST: ZT-10 — Admin lists devices for a user with devices.

        WHAT: Admin GET /organizations/<id>/users/<user_id>/devices.
        WHY: Admins need to see what devices a user has registered.
        EXPECTED: 200 OK with devices array holding both seeded devices.
        """
        from gatehouse_app.models.zerotier.device import Device

        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        member = create_test_user(password="MemberPass123!")
        org = create_test_org()

        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)
        create_test_membership(member["id"], org["id"], OrganizationRole.MEMBER)

        # Create test devices for the member
        from gatehouse_app.extensions import db as _db

        with integration_app.app_context():
            laptop = Device(
                user_id=member["id"],
                organization_id=org["id"],
                node_id="1234567890",
                device_nickname="Member Laptop",
                hostname="member-laptop",
            )
            phone = Device(
                user_id=member["id"],
                organization_id=org["id"],
                node_id="0987654321",
                device_nickname="Member Phone",
                hostname="member-phone",
            )
            _db.session.add_all([laptop, phone])
            _db.session.commit()

        integration_client.auth.login(email=admin["email"], password=admin_password)
        response = integration_client.get(
            f"/organizations/{org['id']}/users/{member['id']}/devices"
        )
        payload = assert_success(response, "devices retrieved")

        assert "devices" in payload
        assert payload["count"] == 2
        assert payload["user_id"] == member["id"]
        assert payload["organization_id"] == org["id"]

        returned_node_ids = [entry["node_id"] for entry in payload["devices"]]
        for expected_node_id in ("1234567890", "0987654321"):
            assert expected_node_id in returned_node_ids

    def test_list_user_devices_no_devices(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-11 — Admin lists devices for a user with no devices.

        WHAT: Admin GET /organizations/<id>/users/<user_id>/devices for user with no devices.
        WHY: Endpoint should return empty list, not error.
        EXPECTED: 200 OK with empty devices array and a count of 0.
        """
        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        member = create_test_user(password="MemberPass123!")
        org = create_test_org()

        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)
        create_test_membership(member["id"], org["id"], OrganizationRole.MEMBER)

        integration_client.auth.login(email=admin["email"], password=admin_password)
        response = integration_client.get(
            f"/organizations/{org['id']}/users/{member['id']}/devices"
        )
        payload = assert_success(response)

        assert payload["count"] == 0
        assert payload["devices"] == []

    def test_list_user_devices_non_admin_negative(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-12 — Non-admin cannot list another user's devices.

        WHAT: Member attempts GET /organizations/<id>/users/<user_id>/devices.
        WHY: This endpoint is admin-only.
        EXPECTED: The client raises ApiError with status 403 Forbidden.
        """
        requester_password = "Member1Pass123!"
        member1 = create_test_user(password=requester_password)
        member2 = create_test_user(password="Member2Pass123!")
        org = create_test_org()

        create_test_membership(member1["id"], org["id"], OrganizationRole.MEMBER)
        create_test_membership(member2["id"], org["id"], OrganizationRole.MEMBER)

        integration_client.auth.login(email=member1["email"], password=requester_password)
        with pytest.raises(ApiError) as exc_info:
            integration_client.get(
                f"/organizations/{org['id']}/users/{member2['id']}/devices"
            )
        assert exc_info.value.status_code == 403

    def test_list_user_devices_user_not_in_org_negative(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-13 — Cannot list devices for user not in organization.

        WHAT: Admin GET /organizations/<id>/users/<user_id>/devices for user not in org.
        WHY: User must be a member of the organization.
        EXPECTED: The client raises ApiError with status 404 Not Found.
        """
        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        outside_user = create_test_user(password="OutsidePass123!")
        org = create_test_org()

        # Only the admin gets a membership; outside_user is NOT added to the org.
        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)

        integration_client.auth.login(email=admin["email"], password=admin_password)
        with pytest.raises(ApiError) as exc_info:
            integration_client.get(
                f"/organizations/{org['id']}/users/{outside_user['id']}/devices"
            )
        assert exc_info.value.status_code == 404

    def test_list_user_devices_user_not_found_negative(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-14 — Cannot list devices for non-existent user.

        WHAT: Admin GET /organizations/<id>/users/<non_existent_id>/devices.
        WHY: User must exist.
        EXPECTED: The client raises ApiError with status 404 Not Found.
        """
        import uuid

        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        org = create_test_org()

        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)

        # A freshly generated UUID that no fixture has created a user for.
        non_existent_id = str(uuid.uuid4())

        integration_client.auth.login(email=admin["email"], password=admin_password)
        with pytest.raises(ApiError) as exc_info:
            integration_client.get(
                f"/organizations/{org['id']}/users/{non_existent_id}/devices"
            )
        assert exc_info.value.status_code == 404
|
|
|
|
|
|
class TestAdminForceDeleteMembership:
    """Admin force-delete of network access requests."""

    @patch("gatehouse_app.services.zerotier_api_service.deauthorize_member")
    @patch("gatehouse_app.services.zerotier_api_service.delete_network_member")
    def test_force_delete_active_membership_positive(
        self,
        mock_delete_member,
        mock_deauthorize_member,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
        integration_app,
    ):
        """TEST: ZT-16 — Admin force-deletes an active membership.

        WHAT: Admin calls DELETE admin/memberships/<id> on an active,
              non-soft-deleted request. Should deactivate, remove from ZT,
              and hard-delete the DB record in one step.
        EXPECTED: 200 OK, request no longer exists in DB, and both patched
                  ZeroTier calls were made exactly once.
        """
        from gatehouse_app.extensions import db as _db
        from gatehouse_app.models.zerotier.device import Device
        from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
        from gatehouse_app.models.zerotier.portal_network import PortalNetwork
        from gatehouse_app.utils.constants import ApprovalState, NetworkRequestMode

        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        user = create_test_user(password="UserPass123!")
        org = create_test_org()
        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)
        create_test_membership(user["id"], org["id"], OrganizationRole.MEMBER)

        with integration_app.app_context():
            session = _db.session

            device = Device(
                user_id=user["id"],
                organization_id=org["id"],
                node_id="1234567890",
                device_nickname="Test Device",
                hostname="test-device",
            )
            session.add(device)
            session.flush()  # flush so the generated primary key is available
            device_id = device.id

            portal_network = PortalNetwork(
                organization_id=org["id"],
                name="Test Network",
                owner_user_id=admin["id"],
                zerotier_network_id="a84ac5c10a6e4c7e",
                request_mode=NetworkRequestMode.OPEN,
            )
            session.add(portal_network)
            session.flush()
            portal_network_id = portal_network.id

            active_request = NetworkAccessRequest(
                organization_id=org["id"],
                user_id=user["id"],
                device_id=device_id,
                portal_network_id=portal_network_id,
                status=ApprovalState.APPROVED,
                active=True,
            )
            session.add(active_request)
            session.commit()
            request_id = active_request.id

        integration_client.auth.login(email=admin["email"], password=admin_password)
        response = integration_client.delete(
            f"/organizations/{org['id']}/admin/memberships/{request_id}",
        )
        assert_success(response, "permanently deleted")

        # Verify request is gone from DB
        with integration_app.app_context():
            leftover = NetworkAccessRequest.query.get(request_id)
            assert leftover is None

        mock_deauthorize_member.assert_called_once()
        mock_delete_member.assert_called_once()

    @patch("gatehouse_app.services.zerotier_api_service.delete_network_member")
    def test_force_delete_soft_deleted_membership_positive(
        self,
        mock_delete_member,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
        integration_app,
    ):
        """TEST: ZT-17 — Admin force-deletes an already-soft-deleted membership.

        WHAT: Admin calls DELETE admin/memberships/<id> on a soft-deleted
              request. Should still work (remove from ZT, hard-delete).
        EXPECTED: 200 OK, request no longer exists in DB.
        """
        from datetime import datetime, timezone

        from gatehouse_app.extensions import db as _db
        from gatehouse_app.models.zerotier.device import Device
        from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
        from gatehouse_app.models.zerotier.portal_network import PortalNetwork
        from gatehouse_app.utils.constants import ApprovalState, NetworkRequestMode

        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        user = create_test_user(password="UserPass123!")
        org = create_test_org()
        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)
        create_test_membership(user["id"], org["id"], OrganizationRole.MEMBER)

        with integration_app.app_context():
            session = _db.session

            device = Device(
                user_id=user["id"],
                organization_id=org["id"],
                node_id="1234567890",
                device_nickname="Test Device",
                hostname="test-device",
            )
            session.add(device)
            session.flush()
            device_id = device.id

            portal_network = PortalNetwork(
                organization_id=org["id"],
                name="Test Network",
                owner_user_id=admin["id"],
                zerotier_network_id="a84ac5c10a6e4c7e",
                request_mode=NetworkRequestMode.OPEN,
            )
            session.add(portal_network)
            session.flush()
            portal_network_id = portal_network.id

            # Inactive and stamped with deleted_at, i.e. already soft-deleted.
            soft_deleted_request = NetworkAccessRequest(
                organization_id=org["id"],
                user_id=user["id"],
                device_id=device_id,
                portal_network_id=portal_network_id,
                status=ApprovalState.APPROVED,
                active=False,
                deleted_at=datetime.now(timezone.utc),
            )
            session.add(soft_deleted_request)
            session.commit()
            request_id = soft_deleted_request.id

        integration_client.auth.login(email=admin["email"], password=admin_password)
        response = integration_client.delete(
            f"/organizations/{org['id']}/admin/memberships/{request_id}",
        )
        assert_success(response, "permanently deleted")

        # Verify request is gone from DB
        with integration_app.app_context():
            leftover = NetworkAccessRequest.query.get(request_id)
            assert leftover is None

        mock_delete_member.assert_called_once()

    def test_force_delete_non_existent_membership_negative(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-18 — Admin force-deletes a non-existent membership.

        WHAT: Admin calls DELETE admin/memberships/<non_existent_id>.
        EXPECTED: The client raises ApiError with status 404 Not Found.
        """
        import uuid

        admin_password = "AdminPass123!"
        admin = create_test_user(password=admin_password)
        org = create_test_org()
        create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)

        integration_client.auth.login(email=admin["email"], password=admin_password)
        with pytest.raises(ApiError) as exc_info:
            integration_client.delete(
                f"/organizations/{org['id']}/admin/memberships/{uuid.uuid4()}",
            )
        assert exc_info.value.status_code == 404
|
|
|
|
class TestZeroTierNetworkMembers:
|
|
"""Test GET /organizations/<org_id>/networks/<network_id>/members.
|
|
|
|
Ensures the endpoint returns human-friendly names (user_email, user_name,
|
|
device_name, device_node_id) instead of bare UUIDs.
|
|
"""
|
|
|
|
def _setup_network_and_member(
    self, integration_app, org_id, user_id, device_kwargs=None
):
    """Helper: create a Device, a PortalNetwork and an approved NetworkAccessRequest.

    Args:
        integration_app: Application whose ``app_context()`` is entered for
            the database work.
        org_id: Organization that owns the device, network and request.
        user_id: User that owns the device and the network and is the
            requester.
        device_kwargs: Optional overrides merged over the default
            ``node_id`` / ``device_nickname`` / ``hostname`` values. Keys
            that are not attributes of ``Device`` are silently dropped.

    Returns:
        A 3-tuple ``(network_id, device_id, request_id)``.
    """
    from gatehouse_app.models.zerotier.device import Device
    from gatehouse_app.models.zerotier.portal_network import PortalNetwork
    from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
    from gatehouse_app.extensions import db as _db
    from gatehouse_app.utils.constants import ApprovalState, NetworkRequestMode

    kwargs = {
        "node_id": "a1b2c3d4e5",
        "device_nickname": "My Laptop",
        "hostname": "my-laptop",
    }
    if device_kwargs:
        kwargs.update(device_kwargs)

    with integration_app.app_context():
        device = Device(
            user_id=user_id,
            organization_id=org_id,
            # Pass through only the keys the Device model actually defines.
            **{k: v for k, v in kwargs.items() if hasattr(Device, k)},
        )
        _db.session.add(device)
        _db.session.flush()  # flush so the generated primary key is available
        device_id = device.id

        network = PortalNetwork(
            organization_id=org_id,
            name="Test Net",
            owner_user_id=user_id,
            zerotier_network_id="bbbbbbbbbb",
            request_mode=NetworkRequestMode.OPEN,
        )
        _db.session.add(network)
        _db.session.flush()
        network_id = network.id

        request = NetworkAccessRequest(
            organization_id=org_id,
            user_id=user_id,
            device_id=device_id,
            portal_network_id=network_id,
            status=ApprovalState.APPROVED,
            active=True,
            justification="Need access for work",
        )
        _db.session.add(request)
        _db.session.commit()
        return network_id, device_id, request.id
|
|
|
|
def test_empty_when_no_members(
    self,
    integration_client,
    create_test_user,
    create_test_org,
    create_test_membership,
    integration_app,
):
    """ZT-MEM-01: Network with no approved members returns empty list.

    The network is seeded without any NetworkAccessRequest rows, so the
    members endpoint must answer with an empty list and a count of 0.
    """
    from gatehouse_app.extensions import db as _db
    from gatehouse_app.models.zerotier.portal_network import PortalNetwork
    from gatehouse_app.utils.constants import NetworkRequestMode

    user_password = "Pass1234!"
    user = create_test_user(password=user_password)
    org = create_test_org()
    create_test_membership(user["id"], org["id"])

    with integration_app.app_context():
        empty_network = PortalNetwork(
            organization_id=org["id"],
            name="Empty Net",
            owner_user_id=user["id"],
            zerotier_network_id="cccccccccc",
            request_mode=NetworkRequestMode.OPEN,
        )
        _db.session.add(empty_network)
        _db.session.commit()
        net_id = empty_network.id

    integration_client.auth.login(email=user["email"], password=user_password)
    response = integration_client.get(
        f"/organizations/{org['id']}/networks/{net_id}/members"
    )
    payload = response.get("data", {})
    assert payload.get("memberships") == []
    assert payload.get("count") == 0
|
|
|
|
def test_returns_enriched_fields(
    self,
    integration_client,
    create_test_user,
    create_test_org,
    create_test_membership,
    integration_app,
):
    """ZT-MEM-02: Approved members include user_email, user_name, device_name, device_node_id.

    Seeds a user with both an email and a full name plus a device with a
    nickname, hostname and node id, then checks every enriched field of
    the single returned membership.
    """
    user_password = "Pass1234!"
    user = create_test_user(
        password=user_password,
        email="enriched_alice@test.com",
        full_name="Alice Smith",
    )
    org = create_test_org()
    create_test_membership(user["id"], org["id"])

    device_overrides = {
        "device_nickname": "Alice Laptop",
        "hostname": "alice-laptop",
        "node_id": "aaaaaaaaaa",
    }
    net_id, device_id, request_id = self._setup_network_and_member(
        integration_app, org["id"], user["id"], device_overrides,
    )

    integration_client.auth.login(email=user["email"], password=user_password)
    response = integration_client.get(
        f"/organizations/{org['id']}/networks/{net_id}/members"
    )
    payload = response.get("data", {})

    assert payload["count"] == 1
    entry = payload["memberships"][0]
    assert entry["user_email"] == "enriched_alice@test.com"
    assert entry["user_name"] == "Alice Smith"
    assert entry["device_name"] == "Alice Laptop"
    assert entry["device_node_id"] == "aaaaaaaaaa"
    assert entry["user_id"] == user["id"]
    assert entry["device_id"] == device_id
    assert "id" in entry
    assert entry["status"] == "approved"
    assert entry["justification"] == "Need access for work"
|
|
|
|
def test_device_name_fallback_hostname(
    self,
    integration_client,
    create_test_user,
    create_test_org,
    create_test_membership,
    integration_app,
):
    """ZT-MEM-03: device_name falls back to hostname when device_nickname is None."""
    user_password = "Pass1234!"
    user = create_test_user(password=user_password)
    org = create_test_org()
    create_test_membership(user["id"], org["id"])

    device_overrides = {
        "device_nickname": None,
        "hostname": "headless-server",
        "node_id": "bbbbbbbbbb",
    }
    net_id, _, _ = self._setup_network_and_member(
        integration_app, org["id"], user["id"], device_overrides,
    )

    integration_client.auth.login(email=user["email"], password=user_password)
    response = integration_client.get(
        f"/organizations/{org['id']}/networks/{net_id}/members"
    )
    payload = response.get("data", {})
    assert payload["count"] == 1
    assert payload["memberships"][0]["device_name"] == "headless-server"
|
|
|
|
def test_device_name_fallback_node_id(
    self, integration_client, create_test_user, create_test_org, create_test_membership, integration_app
):
    """ZT-MEM-04: device_name falls back to node_id when nickname and hostname are both None.

    With neither a nickname nor a hostname on the seeded device, the members
    listing is expected to report the raw node id as ``device_name``.
    """
    password = "Pass1234!"
    account = create_test_user(password=password)
    organization = create_test_org()
    create_test_membership(account["id"], organization["id"])

    # Only the node id is populated; both human-readable labels are missing.
    device_fields = {"device_nickname": None, "hostname": None, "node_id": "cccccccccc"}
    network_id, _device_id, _request_id = self._setup_network_and_member(
        integration_app, organization["id"], account["id"], device_fields,
    )

    integration_client.auth.login(email=account["email"], password=password)
    members_url = f"/organizations/{organization['id']}/networks/{network_id}/members"
    payload = integration_client.get(members_url).get("data", {})

    assert payload["count"] == 1
    only_member = payload["memberships"][0]
    assert only_member["device_name"] == "cccccccccc"
|
|
|
|
def test_user_name_is_none_when_missing(
    self, integration_client, create_test_user, create_test_org, create_test_membership, integration_app
):
    """ZT-MEM-05: user_name is None when the user record has no full_name.

    The user is created with ``full_name=None``; the listing must still
    carry the user's email while reporting ``user_name`` as ``None``.
    """
    password = "Pass1234!"
    nameless = create_test_user(password=password, full_name=None)
    organization = create_test_org()
    create_test_membership(nameless["id"], organization["id"])

    # No device dict is passed, so the helper's own defaults are used.
    network_id, _device_id, _request_id = self._setup_network_and_member(
        integration_app, organization["id"], nameless["id"],
    )

    integration_client.auth.login(email=nameless["email"], password=password)
    members_url = f"/organizations/{organization['id']}/networks/{network_id}/members"
    payload = integration_client.get(members_url).get("data", {})

    assert payload["count"] == 1
    only_member = payload["memberships"][0]
    assert only_member["user_name"] is None
    assert only_member["user_email"] == nameless["email"]
|
|
|
|
def test_multiple_members_returned(
    self, integration_client, create_test_user, create_test_org, create_test_membership, integration_app
):
    """ZT-MEM-06: Multiple approved members for the same network all appear.

    Two users, each owning one device, hold approved and active access
    requests on a single network. A third user (who owns the network) lists
    the members and must see both entries.
    """
    from gatehouse_app.models.zerotier.device import Device
    from gatehouse_app.models.zerotier.portal_network import PortalNetwork
    from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
    from gatehouse_app.extensions import db as _db
    from gatehouse_app.utils.constants import ApprovalState, NetworkRequestMode

    password = "Pass1234!"
    admin = create_test_user(password=password)
    alice = create_test_user(password=password, email="multi_alice@test.com", full_name="Alice")
    bob = create_test_user(password=password, email="multi_bob@test.com", full_name="Bob")
    org = create_test_org()
    for account in (admin, alice, bob):
        create_test_membership(account["id"], org["id"])

    # (owner, node_id, device_nickname) for each member device to seed.
    device_specs = [
        (alice, "device_a_01", "Alice Phone"),
        (bob, "device_b_01", "Bob Tablet"),
    ]

    with integration_app.app_context():
        shared_network = PortalNetwork(
            organization_id=org["id"],
            name="Shared Net",
            owner_user_id=admin["id"],
            zerotier_network_id="dddddddddd",
            request_mode=NetworkRequestMode.OPEN,
        )
        _db.session.add(shared_network)
        _db.session.flush()  # flush so the network id is available below
        net_id = shared_network.id

        # Devices first (flushed one by one so each has an id), requests after.
        owned_devices = []
        for owner, node_id, nickname in device_specs:
            device = Device(
                user_id=owner["id"],
                organization_id=org["id"],
                node_id=node_id,
                device_nickname=nickname,
            )
            _db.session.add(device)
            _db.session.flush()
            owned_devices.append((owner, device))

        approved_requests = [
            NetworkAccessRequest(
                organization_id=org["id"],
                user_id=owner["id"],
                device_id=device.id,
                portal_network_id=net_id,
                status=ApprovalState.APPROVED,
                active=True,
            )
            for owner, device in owned_devices
        ]
        _db.session.add_all(approved_requests)
        _db.session.commit()

    integration_client.auth.login(email=admin["email"], password=password)
    response = integration_client.get(f"/organizations/{org['id']}/networks/{net_id}/members")
    payload = response.get("data", {})
    assert payload["count"] == 2

    memberships = payload["memberships"]
    seen_emails = {entry["user_email"] for entry in memberships}
    seen_names = {entry["user_name"] for entry in memberships}
    seen_devices = {entry["device_name"] for entry in memberships}
    assert seen_emails == {"multi_alice@test.com", "multi_bob@test.com"}
    assert seen_names == {"Alice", "Bob"}
    assert seen_devices == {"Alice Phone", "Bob Tablet"}
|
|
|
|
def test_pending_memberships_excluded(
    self, integration_client, create_test_user, create_test_org, create_test_membership, integration_app
):
    """ZT-MEM-07: Pending (non-approved) memberships are NOT returned.

    A single access request is stored with PENDING status; the members
    listing, which should only contain APPROVED entries, must be empty.
    """
    from gatehouse_app.models.zerotier.device import Device
    from gatehouse_app.models.zerotier.portal_network import PortalNetwork
    from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
    from gatehouse_app.extensions import db as _db
    from gatehouse_app.utils.constants import ApprovalState, NetworkRequestMode

    password = "Pass1234!"
    requester = create_test_user(password=password)
    org = create_test_org()
    create_test_membership(requester["id"], org["id"])

    with integration_app.app_context():
        portal_net = PortalNetwork(
            organization_id=org["id"],
            name="Test",
            owner_user_id=requester["id"],
            zerotier_network_id="eeeeeeeeee",
            request_mode=NetworkRequestMode.OPEN,
        )
        _db.session.add(portal_net)
        _db.session.flush()
        net_id = portal_net.id

        pending_device = Device(
            user_id=requester["id"],
            organization_id=org["id"],
            node_id="ffffffffff",
            device_nickname="Pending Device",
        )
        _db.session.add(pending_device)
        _db.session.flush()

        # The only request on this network is still awaiting approval.
        pending_request = NetworkAccessRequest(
            organization_id=org["id"],
            user_id=requester["id"],
            device_id=pending_device.id,
            portal_network_id=net_id,
            status=ApprovalState.PENDING,
            active=False,
        )
        _db.session.add(pending_request)
        _db.session.commit()

    integration_client.auth.login(email=requester["email"], password=password)
    response = integration_client.get(f"/organizations/{org['id']}/networks/{net_id}/members")
    payload = response.get("data", {})

    assert payload["count"] == 0
    assert payload["memberships"] == []
|
|
|
|
def test_soft_deleted_memberships_excluded(
    self, integration_client, create_test_user, create_test_org, create_test_membership, integration_app
):
    """ZT-MEM-08: Soft-deleted (deleted_at set) memberships are not returned.

    An access request is stored as APPROVED and active but with
    ``deleted_at`` populated. The members listing must report no entries:
    both ``count`` and the ``memberships`` list are checked, mirroring the
    pending-membership test.
    """
    # ``datetime`` and ``timezone`` come from the module-level import; the
    # former function-local re-import only shadowed those names.
    from gatehouse_app.models.zerotier.device import Device
    from gatehouse_app.models.zerotier.portal_network import PortalNetwork
    from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
    from gatehouse_app.extensions import db as _db
    from gatehouse_app.utils.constants import ApprovalState, NetworkRequestMode

    user = create_test_user(password="Pass1234!")
    org = create_test_org()
    create_test_membership(user["id"], org["id"])

    with integration_app.app_context():
        network = PortalNetwork(
            organization_id=org["id"], name="Test", owner_user_id=user["id"],
            zerotier_network_id="gggggggggg", request_mode=NetworkRequestMode.OPEN,
        )
        _db.session.add(network)
        _db.session.flush()
        net_id = network.id

        device = Device(
            user_id=user["id"], organization_id=org["id"],
            node_id="hhhhhhhhhh", device_nickname="Deleted Device",
        )
        _db.session.add(device)
        _db.session.flush()

        # Approved and active, yet soft-deleted: must be hidden from listings.
        req = NetworkAccessRequest(
            organization_id=org["id"], user_id=user["id"], device_id=device.id,
            portal_network_id=net_id, status=ApprovalState.APPROVED, active=True,
            deleted_at=datetime.now(timezone.utc),
        )
        _db.session.add(req)
        _db.session.commit()

    integration_client.auth.login(email=user["email"], password="Pass1234!")
    result = integration_client.get(f"/organizations/{org['id']}/networks/{net_id}/members")
    data = result.get("data", {})

    assert data["count"] == 0
    assert data["memberships"] == []
|
|
|
|
def test_network_not_found_negative(
    self, integration_client, create_test_user, create_test_org, create_test_membership
):
    """ZT-MEM-09: 404 when network_id does not exist in the organization.

    The requester is a member of the organization, but the network id in
    the URL is a freshly generated UUID for which nothing was created.
    """
    import uuid

    password = "Pass1234!"
    account = create_test_user(password=password)
    organization = create_test_org()
    create_test_membership(account["id"], organization["id"])

    integration_client.auth.login(email=account["email"], password=password)

    missing_network_url = f"/organizations/{organization['id']}/networks/{uuid.uuid4()}/members"
    with pytest.raises(ApiError) as not_found:
        integration_client.get(missing_network_url)
    assert not_found.value.status_code == 404
|
|
|
|
def test_user_not_in_org_negative(
    self, integration_client, create_test_user, create_test_org, create_test_membership, integration_app
):
    """ZT-MEM-10: 403 when the requesting user is not a member of the org.

    A network exists in the organization, but the logged-in user was never
    given a membership there, so listing its members must be refused.
    """
    from gatehouse_app.models.zerotier.portal_network import PortalNetwork
    from gatehouse_app.extensions import db as _db
    from gatehouse_app.utils.constants import NetworkRequestMode

    outsider_password = "Intruder1234!"
    owner = create_test_user(password="Owner1234!")
    outsider = create_test_user(password=outsider_password)
    org = create_test_org()

    # Membership is granted to the owner only; the outsider gets none.
    create_test_membership(owner["id"], org["id"], OrganizationRole.ADMIN)

    with integration_app.app_context():
        secret_network = PortalNetwork(
            organization_id=org["id"],
            name="Secret Net",
            owner_user_id=owner["id"],
            zerotier_network_id="iiiiiiiiii",
            request_mode=NetworkRequestMode.OPEN,
        )
        _db.session.add(secret_network)
        _db.session.commit()
        # Read the id while the application context (and DB session) is open.
        net_id = secret_network.id

    integration_client.auth.login(email=outsider["email"], password=outsider_password)

    members_url = f"/organizations/{org['id']}/networks/{net_id}/members"
    with pytest.raises(ApiError) as forbidden:
        integration_client.get(members_url)
    assert forbidden.value.status_code == 403
|
|
|
|
def test_auth_required_negative(self, integration_client):
    """ZT-MEM-11: 401 when no auth token is provided.

    No login is performed; the organization and network ids are random
    UUIDs because the request is expected to be rejected before lookup.
    """
    import uuid

    anonymous_url = f"/organizations/{uuid.uuid4()}/networks/{uuid.uuid4()}/members"
    with pytest.raises(ApiError) as unauthorized:
        integration_client.get(anonymous_url)
    assert unauthorized.value.status_code == 401
|
|
|
|
def test_no_active_session(
    self, integration_client, create_test_user, create_test_org, create_test_membership, integration_app
):
    """ZT-MEM-12: Inactive (active=False) approved memberships are still returned.

    The endpoint returns ALL approved memberships regardless of the
    `active` flag, so an APPROVED request stored with active=False must be
    listed and must report ``active`` as False.
    """
    from gatehouse_app.models.zerotier.device import Device
    from gatehouse_app.models.zerotier.portal_network import PortalNetwork
    from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
    from gatehouse_app.extensions import db as _db
    from gatehouse_app.utils.constants import ApprovalState, NetworkRequestMode

    password = "Pass1234!"
    account = create_test_user(password=password)
    org = create_test_org()
    create_test_membership(account["id"], org["id"])

    with integration_app.app_context():
        portal_net = PortalNetwork(
            organization_id=org["id"],
            name="Test",
            owner_user_id=account["id"],
            zerotier_network_id="jjjjjjjjjj",
            request_mode=NetworkRequestMode.OPEN,
        )
        _db.session.add(portal_net)
        _db.session.flush()
        net_id = portal_net.id

        idle_device = Device(
            user_id=account["id"],
            organization_id=org["id"],
            node_id="kkkkkkkkkk",
            device_nickname="Inactive Dev",
        )
        _db.session.add(idle_device)
        _db.session.flush()

        # Approved, but not currently activated.
        idle_request = NetworkAccessRequest(
            organization_id=org["id"],
            user_id=account["id"],
            device_id=idle_device.id,
            portal_network_id=net_id,
            status=ApprovalState.APPROVED,
            active=False,
        )
        _db.session.add(idle_request)
        _db.session.commit()

    integration_client.auth.login(email=account["email"], password=password)
    response = integration_client.get(f"/organizations/{org['id']}/networks/{net_id}/members")
    payload = response.get("data", {})

    assert payload["count"] == 1
    listed = payload["memberships"][0]
    assert listed["active"] is False
|
|
|
|
|
|
class TestAdminForceDeleteMembership:
    """Test admin force-delete of network access requests."""

    def test_force_delete_non_admin_negative(
        self,
        integration_client,
        create_test_user,
        create_test_org,
        create_test_membership,
    ):
        """TEST: ZT-19 — Non-admin cannot force-delete a membership.

        WHAT: A user holding only the MEMBER role calls
              DELETE admin/memberships/<id> with a random membership id.
        EXPECTED: 403 Forbidden.
        """
        import uuid

        password = "MemberPass123!"
        plain_member = create_test_user(password=password)
        organization = create_test_org()
        create_test_membership(plain_member["id"], organization["id"], OrganizationRole.MEMBER)

        integration_client.auth.login(email=plain_member["email"], password=password)

        target_url = f"/organizations/{organization['id']}/admin/memberships/{uuid.uuid4()}"
        with pytest.raises(ApiError) as denied:
            integration_client.delete(target_url)
        assert denied.value.status_code == 403
|
|
|
|
|
|
class TestSessions:
|
|
"""Test user and admin session listing endpoints."""
|
|
|
|
def test_list_sessions_positive(
    self, integration_client, create_test_user, create_test_org,
    create_test_membership, integration_app,
):
    """ZT-SESS-01: User lists their own active sessions.

    WHAT: GET /organizations/<id>/sessions for a user with an active session.
    EXPECTED: 200 OK with session containing device, network, and timing fields.
    """
    from gatehouse_app.models.zerotier.device import Device
    from gatehouse_app.models.zerotier.portal_network import PortalNetwork
    from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
    from gatehouse_app.models.zerotier.activation_session import ActivationSession
    from gatehouse_app.extensions import db as _db
    from gatehouse_app.utils.constants import ApprovalState, ApprovalGrantType, NetworkRequestMode

    password = "Pass1234!"
    account = create_test_user(password=password)
    org = create_test_org()
    create_test_membership(account["id"], org["id"], OrganizationRole.MEMBER)

    # Session window: started one hour ago, ends in seven -> 8h (28800s) total.
    reference_time = datetime.now(timezone.utc)
    started_at = reference_time - timedelta(hours=1)
    ends_at = reference_time + timedelta(hours=7)

    with integration_app.app_context():
        portal_net = PortalNetwork(
            organization_id=org["id"],
            name="Test Net",
            owner_user_id=account["id"],
            zerotier_network_id="aabbccddee11",
            request_mode=NetworkRequestMode.OPEN,
        )
        _db.session.add(portal_net)
        _db.session.flush()

        laptop = Device(
            user_id=account["id"],
            organization_id=org["id"],
            node_id="deadbeef01",
            device_nickname="My Laptop",
            hostname="my-laptop",
        )
        _db.session.add(laptop)
        _db.session.flush()

        access = NetworkAccessRequest(
            organization_id=org["id"],
            user_id=account["id"],
            device_id=laptop.id,
            portal_network_id=portal_net.id,
            status=ApprovalState.APPROVED,
            active=True,
            grant_type=ApprovalGrantType.REQUESTED,
        )
        _db.session.add(access)
        _db.session.flush()

        activation = ActivationSession(
            organization_id=org["id"],
            user_id=account["id"],
            network_access_request_id=access.id,
            authenticated_at=started_at,
            expires_at=ends_at,
            created_by=account["id"],
        )
        _db.session.add(activation)
        _db.session.commit()

        # Capture the id while the application context is still open.
        saved_session_id = activation.id

    integration_client.auth.login(email=account["email"], password=password)
    response = integration_client.get(f"/organizations/{org['id']}/sessions")
    payload = response.get("data", {})

    assert payload["count"] == 1
    listed = payload["sessions"][0]

    # Identity and timing fields.
    assert listed["id"] == saved_session_id
    assert listed["is_active"] is True
    assert listed["is_expired"] is False
    assert listed["duration_seconds"] == 28800
    assert listed["remaining_seconds"] > 0

    # Nested device and network summaries.
    assert listed["device"]["node_id"] == "deadbeef01"
    assert listed["device"]["name"] == "My Laptop"
    assert listed["network"]["name"] == "Test Net"

    # The self-service listing carries no user details.
    assert "user" not in listed
|
|
|
|
def test_list_sessions_empty(
    self, integration_client, create_test_user, create_test_org,
    create_test_membership,
):
    """ZT-SESS-02: User with no sessions gets empty array.

    Nothing is seeded beyond the user's organization membership, so the
    sessions listing must report a zero count and an empty list.
    """
    password = "Pass1234!"
    account = create_test_user(password=password)
    organization = create_test_org()
    create_test_membership(account["id"], organization["id"], OrganizationRole.MEMBER)

    integration_client.auth.login(email=account["email"], password=password)
    sessions_url = f"/organizations/{organization['id']}/sessions"
    payload = integration_client.get(sessions_url).get("data", {})

    assert payload["count"] == 0
    assert payload["sessions"] == []
|
|
|
|
def test_list_sessions_unauth_negative(
    self, integration_client, create_test_org,
):
    """ZT-SESS-03: Unauthenticated user gets 401.

    The organization exists, but the client never logs in before calling
    the sessions listing.
    """
    organization = create_test_org()
    sessions_url = f"/organizations/{organization['id']}/sessions"

    with pytest.raises(ApiError) as unauthorized:
        integration_client.get(sessions_url)
    assert unauthorized.value.status_code == 401
|
|
|
|
def test_admin_list_sessions_positive(
    self, integration_client, create_test_user, create_test_org,
    create_test_membership, integration_app,
):
    """ZT-SESS-04: Admin lists all sessions across users.

    WHAT: GET /organizations/<id>/admin/sessions.
    EXPECTED: 200 OK with all users' sessions including user details.

    Two members each get a device, an approved access request and an
    activation session on one shared network; the admin listing must
    contain both sessions together with each owner's full name.
    """
    from gatehouse_app.models.zerotier.device import Device
    from gatehouse_app.models.zerotier.portal_network import PortalNetwork
    from gatehouse_app.models.zerotier.network_access_request import NetworkAccessRequest
    from gatehouse_app.models.zerotier.activation_session import ActivationSession
    from gatehouse_app.extensions import db as _db
    from gatehouse_app.utils.constants import ApprovalState, ApprovalGrantType, NetworkRequestMode

    admin = create_test_user(password="AdminPass123!")
    member1 = create_test_user(password="Mem1Pass123!", full_name="Alice")
    member2 = create_test_user(password="Mem2Pass123!", full_name="Bob")
    org = create_test_org()

    create_test_membership(admin["id"], org["id"], OrganizationRole.ADMIN)
    create_test_membership(member1["id"], org["id"], OrganizationRole.MEMBER)
    create_test_membership(member2["id"], org["id"], OrganizationRole.MEMBER)

    now = datetime.now(timezone.utc)

    # (member, node_id, device_nickname, hostname). The last element used to
    # be bound to an unused ``full_name`` variable while every device got the
    # hard-coded hostname "host"; it is now passed through to the device.
    device_specs = [
        (member1, "aaaa01", "Alice Mac", None),
        (member2, "bbbb02", None, "bob-pc"),
    ]

    with integration_app.app_context():
        network = PortalNetwork(
            organization_id=org["id"], name="Shared Net",
            owner_user_id=admin["id"],
            zerotier_network_id="ffgg00112233",
            request_mode=NetworkRequestMode.OPEN,
        )
        _db.session.add(network)
        _db.session.flush()

        for member, node_id, nickname, hostname in device_specs:
            device = Device(
                user_id=member["id"], organization_id=org["id"],
                node_id=node_id, device_nickname=nickname,
                hostname=hostname,
            )
            _db.session.add(device)
            _db.session.flush()  # device.id is needed by the request below

            req = NetworkAccessRequest(
                organization_id=org["id"], user_id=member["id"],
                device_id=device.id, portal_network_id=network.id,
                status=ApprovalState.APPROVED, active=True,
                grant_type=ApprovalGrantType.REQUESTED,
            )
            _db.session.add(req)
            _db.session.flush()  # req.id is needed by the session below

            session = ActivationSession(
                organization_id=org["id"], user_id=member["id"],
                network_access_request_id=req.id,
                authenticated_at=now - timedelta(hours=2),
                expires_at=now + timedelta(hours=6),
                created_by=member["id"],
            )
            _db.session.add(session)

        # One commit persists everything seeded for both members.
        _db.session.commit()

    integration_client.auth.login(email=admin["email"], password="AdminPass123!")
    result = integration_client.get(f"/organizations/{org['id']}/admin/sessions")
    data = result.get("data", {})

    assert data["count"] == 2

    user_names = {s["user"]["full_name"] for s in data["sessions"]}
    assert "Alice" in user_names
    assert "Bob" in user_names

    for s in data["sessions"]:
        assert s["is_active"] is True
        assert "device" in s
        assert "node_id" in s["device"]
        assert "network" in s
        assert s["network"]["name"] == "Shared Net"
|
|
|
|
def test_admin_list_sessions_non_admin_negative(
    self, integration_client, create_test_user, create_test_org,
    create_test_membership,
):
    """ZT-SESS-05: Non-admin gets 403 on admin sessions endpoint.

    The caller is logged in and belongs to the organization, but only with
    the MEMBER role.
    """
    password = "Pass1234!"
    plain_member = create_test_user(password=password)
    organization = create_test_org()
    create_test_membership(plain_member["id"], organization["id"], OrganizationRole.MEMBER)

    integration_client.auth.login(email=plain_member["email"], password=password)

    admin_sessions_url = f"/organizations/{organization['id']}/admin/sessions"
    with pytest.raises(ApiError) as forbidden:
        integration_client.get(admin_sessions_url)
    assert forbidden.value.status_code == 403
|
|
|
|
def test_admin_list_sessions_unauth_negative(
    self, integration_client, create_test_org,
):
    """ZT-SESS-06: Unauthenticated user gets 401 on admin endpoint.

    The organization exists, but the client never logs in before calling
    the admin sessions listing.
    """
    organization = create_test_org()
    admin_sessions_url = f"/organizations/{organization['id']}/admin/sessions"

    with pytest.raises(ApiError) as unauthorized:
        integration_client.get(admin_sessions_url)
    assert unauthorized.value.status_code == 401
|