562 lines
26 KiB
Python
562 lines
26 KiB
Python
"""Admin operations integration tests.
|
|
|
|
Covers user suspension, MFA removal, password reset, and hard deletion.
|
|
All endpoints require admin/superadmin privileges.
|
|
"""
|
|
import pytest
|
|
|
|
from tests.integration.client.base import ApiError
|
|
from gatehouse_app.utils.constants import OrganizationRole
|
|
|
|
|
|
def assert_success(response: dict, message_contains: str = "") -> dict:
|
|
data = response.get("data", {})
|
|
assert response.get("success") is not False, (
|
|
f"Expected success but got error: {response.get('message')}"
|
|
)
|
|
if message_contains:
|
|
assert message_contains.lower() in response.get("message", "").lower()
|
|
return data
|
|
|
|
|
|
def assert_error(exc: ApiError, expected_status: int, expected_error_type: str | None = None):
|
|
assert exc.status_code == expected_status, (
|
|
f"Expected status {expected_status} but got {exc.status_code}"
|
|
)
|
|
if expected_error_type:
|
|
assert exc.error_type == expected_error_type
|
|
|
|
|
|
class TestAdminUserManagement:
|
|
"""Test admin-only user management endpoints."""
|
|
|
|
def test_list_users_positive(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-01 — List all users as admin.
|
|
|
|
WHAT: Create an admin user, login, then GET /admin/users.
|
|
WHY: The user management page needs a paginated user list.
|
|
EXPECTED: 200 OK with users array.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.list_users()
|
|
data = assert_success(result)
|
|
assert "users" in data or "count" in data
|
|
|
|
def test_list_users_non_admin_negative(self, integration_client, create_test_user):
|
|
"""TEST: ADMIN-02 — Reject listing users as non-admin.
|
|
|
|
WHAT: Regular user attempts GET /admin/users.
|
|
WHY: User lists contain sensitive data; must be admin-only.
|
|
EXPECTED: 403 Forbidden.
|
|
"""
|
|
user = create_test_user(password="MyPassword123!")
|
|
integration_client.auth.login(email=user["email"], password="MyPassword123!")
|
|
|
|
with pytest.raises(ApiError) as exc_info:
|
|
integration_client.admin.list_users()
|
|
|
|
assert exc_info.value.status_code == 403
|
|
|
|
def test_suspend_user_positive(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-03 — Suspend user account.
|
|
|
|
WHAT: Admin suspends a user, then verify the user cannot login.
|
|
WHY: Suspension is a critical security tool for compromised
|
|
accounts.
|
|
EXPECTED: 200 OK on suspend. Login returns 403.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.suspend_user(victim["id"])
|
|
assert_success(result)
|
|
|
|
# Verify victim cannot login
|
|
integration_client.auth.logout()
|
|
with pytest.raises(ApiError) as exc_info:
|
|
integration_client.auth.login(email=victim["email"], password="VictimPass123!")
|
|
assert exc_info.value.status_code == 403
|
|
|
|
def test_unsuspend_user_positive(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-05 — Unsuspend user account.
|
|
|
|
WHAT: Admin suspends then unsuspends a user, verify they can
|
|
login again.
|
|
WHY: False positives happen; admins must be able to restore
|
|
access.
|
|
EXPECTED: 200 OK on unsuspend. Login succeeds afterwards.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
integration_client.admin.suspend_user(victim["id"])
|
|
result = integration_client.admin.unsuspend_user(victim["id"])
|
|
assert_success(result)
|
|
|
|
integration_client.auth.logout()
|
|
login_result = integration_client.auth.login(email=victim["email"], password="VictimPass123!")
|
|
assert_success(login_result, "login successful")
|
|
|
|
def test_admin_verify_email_positive(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-07 — Admin verifies user email.
|
|
|
|
WHAT: Create an unverified user, admin calls verify endpoint.
|
|
WHY: Admins may need to bypass verification for support
|
|
reasons.
|
|
EXPECTED: 200 OK, user.email_verified becomes True.
|
|
"""
|
|
from gatehouse_app.models.user.user import User
|
|
from gatehouse_app.extensions import db
|
|
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!", email_verified=False)
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.verify_user_email(victim["id"])
|
|
assert_success(result)
|
|
|
|
with integration_app.app_context():
|
|
user = User.query.get(victim["id"])
|
|
assert user.email_verified is True
|
|
|
|
def test_admin_set_password_positive(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-08 — Admin sets user password.
|
|
|
|
WHAT: Admin overrides a user's password, then verify the user
|
|
can login with the new password.
|
|
WHY: Account recovery when user has lost access to email/MFA.
|
|
EXPECTED: 200 OK. Login with new password succeeds.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.set_user_password(victim["id"], "NewAdminSet456!")
|
|
assert_success(result)
|
|
|
|
integration_client.auth.logout()
|
|
login_result = integration_client.auth.login(email=victim["email"], password="NewAdminSet456!")
|
|
assert_success(login_result, "login successful")
|
|
|
|
def test_admin_remove_totp_positive(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-10 — Admin removes user TOTP.
|
|
|
|
WHAT: User enrolls TOTP, admin removes it.
|
|
WHY: Account recovery when user lost their authenticator.
|
|
EXPECTED: 200 OK. TOTP status returns disabled.
|
|
"""
|
|
import pyotp
|
|
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
|
|
# Victim enrolls TOTP
|
|
integration_client.auth.login(email=victim["email"], password="VictimPass123!")
|
|
enroll = integration_client.mfa.enroll_totp()
|
|
secret = enroll["data"]["secret"]
|
|
integration_client.mfa.verify_enrollment(pyotp.TOTP(secret).now())
|
|
integration_client.auth.logout()
|
|
|
|
# Admin removes TOTP
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.remove_user_mfa(victim["id"], "totp")
|
|
assert_success(result)
|
|
|
|
# Verify victim's TOTP is disabled
|
|
integration_client.auth.logout()
|
|
integration_client.auth.login(email=victim["email"], password="VictimPass123!")
|
|
status = integration_client.mfa.get_totp_status()
|
|
assert status["data"].get("totp_enabled") is False
|
|
|
|
def test_admin_hard_delete_user_positive(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-11 — Admin hard-deletes user.
|
|
|
|
WHAT: Admin hard-deletes a user, verify they cannot login.
|
|
WHY: GDPR compliance and removing malicious actors.
|
|
EXPECTED: 200 OK. Login fails (user no longer exists).
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.hard_delete_user(victim["id"], confirm=True)
|
|
assert_success(result)
|
|
|
|
# Verify victim cannot login
|
|
integration_client.auth.logout()
|
|
with pytest.raises(ApiError) as exc_info:
|
|
integration_client.auth.login(email=victim["email"], password="VictimPass123!")
|
|
assert exc_info.value.status_code in (400, 401)
|
|
|
|
def test_admin_soft_delete_cascades_org_membership(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-12 — Admin soft-delete cascades to org memberships.
|
|
|
|
WHAT: Admin soft-deletes a user in an org. The membership row
|
|
and the user row both get soft-deleted.
|
|
WHY: Ghost memberships (membership active but user deleted) would
|
|
make Organization.is_member() return True and break lookups.
|
|
EXPECTED: 200 OK. OrganizationMember.deleted_at and User.deleted_at
|
|
are set. Organization.is_member() returns False.
|
|
"""
|
|
from gatehouse_app.models.organization.organization_member import OrganizationMember
|
|
from gatehouse_app.models.organization.organization import Organization
|
|
from gatehouse_app.models.user.user import User
|
|
from gatehouse_app.extensions import db
|
|
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.hard_delete_user(victim["id"], confirm=True)
|
|
assert_success(result)
|
|
|
|
with integration_app.app_context():
|
|
# Membership row still exists but is soft-deleted
|
|
membership = OrganizationMember.query.filter_by(
|
|
user_id=victim["id"], organization_id=org["id"]
|
|
).first()
|
|
assert membership is not None
|
|
assert membership.deleted_at is not None
|
|
|
|
# User row is soft-deleted
|
|
user = User.query.get(victim["id"])
|
|
assert user is not None
|
|
assert user.deleted_at is not None
|
|
|
|
# Organization.is_member() returns False (defense in depth)
|
|
org_obj = Organization.query.get(org["id"])
|
|
assert org_obj.is_member(victim["id"]) is False
|
|
|
|
|
|
class TestAdminSSHCertificates:
|
|
"""Test admin SSH certificate listing endpoints."""
|
|
|
|
def _create_test_cert(
|
|
self, integration_app, user_id: str, ca_id: str, *, ssh_key_id=None,
|
|
status="issued", revoked=False, valid_after=None, valid_before=None,
|
|
cert_type="user", principals=None,
|
|
):
|
|
"""Create a test SSH certificate record."""
|
|
from datetime import datetime, timezone, timedelta
|
|
from gatehouse_app.models.ssh_ca.ssh_certificate import SSHCertificate, CertificateStatus
|
|
from gatehouse_app.models.ssh_ca.ca import CertType
|
|
|
|
now = datetime.now(timezone.utc)
|
|
valid_after = valid_after or (now - timedelta(hours=1))
|
|
valid_before = valid_before or (now + timedelta(hours=23))
|
|
principals = principals or ["prod-servers"]
|
|
|
|
with integration_app.app_context():
|
|
cert = SSHCertificate(
|
|
ca_id=ca_id,
|
|
user_id=user_id,
|
|
ssh_key_id=ssh_key_id,
|
|
certificate=f"ssh-ed25519-cert-v01@openssh.com AAAA...test_serial_{uuid.uuid4().hex[:8]}",
|
|
serial=str(uuid.uuid4().int)[:20],
|
|
key_id=f"test@example.com-{uuid.uuid4().hex[:8]}",
|
|
cert_type=CertType(cert_type),
|
|
principals=principals,
|
|
valid_after=valid_after,
|
|
valid_before=valid_before,
|
|
revoked=revoked,
|
|
status=CertificateStatus(status),
|
|
request_ip="192.168.1.100",
|
|
request_user_agent="OpenSSH_9.0",
|
|
)
|
|
if revoked:
|
|
cert.revoked_at = now
|
|
cert.revoke_reason = "test revocation"
|
|
db.session.add(cert)
|
|
db.session.commit()
|
|
return str(cert.id)
|
|
|
|
def _create_test_ssh_key(self, integration_app, user_id: str, fingerprint: str = None):
|
|
"""Create a test SSH key record."""
|
|
from gatehouse_app.models.ssh_ca.ssh_key import SSHKey
|
|
|
|
fingerprint = fingerprint or f"SHA256:{uuid.uuid4().hex[:43]}"
|
|
with integration_app.app_context():
|
|
key = SSHKey(
|
|
user_id=user_id,
|
|
payload=f"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAI...test",
|
|
fingerprint=fingerprint,
|
|
description="Test laptop key",
|
|
verified=True,
|
|
key_type="ssh-ed25519",
|
|
key_bits=256,
|
|
key_comment="test@laptop",
|
|
)
|
|
db.session.add(key)
|
|
db.session.commit()
|
|
return str(key.id)
|
|
|
|
def test_list_user_ssh_certs_positive(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership, create_test_ca):
|
|
"""TEST: ADMIN-SSH-01 — List all SSH certificates for a user as admin.
|
|
|
|
WHAT: Create a user with two certs (one active, one expired),
|
|
admin lists all certs via the new endpoint.
|
|
WHY: Admin needs full visibility of user SSH certificate history.
|
|
EXPECTED: 200 OK with certificates array containing both certs.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
ca = create_test_ca(org_id=org["id"])
|
|
|
|
from datetime import datetime, timezone, timedelta
|
|
now = datetime.now(timezone.utc)
|
|
|
|
# Create an active cert
|
|
self._create_test_cert(
|
|
integration_app, victim["id"], ca["id"],
|
|
status="issued", valid_after=now - timedelta(hours=1),
|
|
valid_before=now + timedelta(hours=23),
|
|
)
|
|
# Create an expired cert
|
|
self._create_test_cert(
|
|
integration_app, victim["id"], ca["id"],
|
|
status="expired", valid_after=now - timedelta(days=7),
|
|
valid_before=now - timedelta(days=1),
|
|
)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.get_user_ssh_certificates(victim["id"])
|
|
data = assert_success(result)
|
|
assert "certificates" in data
|
|
assert data["count"] == 2
|
|
assert len(data["certificates"]) == 2
|
|
|
|
def test_list_user_ssh_certs_with_key_metadata(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership, create_test_ca):
|
|
"""TEST: ADMIN-SSH-02 — Certificate includes SSH key metadata.
|
|
|
|
WHAT: Create a cert linked to an SSH key, verify key details
|
|
appear in the response.
|
|
WHY: Admin needs to see which key was used to request the cert.
|
|
EXPECTED: ssh_key object with fingerprint, key_type, key_bits.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
ca = create_test_ca(org_id=org["id"])
|
|
|
|
key_id = self._create_test_ssh_key(integration_app, victim["id"])
|
|
self._create_test_cert(integration_app, victim["id"], ca["id"], ssh_key_id=key_id)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.get_user_ssh_certificates(victim["id"])
|
|
data = assert_success(result)
|
|
|
|
cert = data["certificates"][0]
|
|
assert cert["ssh_key"] is not None
|
|
assert cert["ssh_key"]["key_type"] == "ssh-ed25519"
|
|
assert cert["ssh_key"]["fingerprint"] is not None
|
|
assert cert["ssh_key"]["description"] == "Test laptop key"
|
|
|
|
def test_list_user_ssh_certs_non_admin_negative(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership, create_test_ca):
|
|
"""TEST: ADMIN-SSH-03 — Non-admin cannot list another user's certs.
|
|
|
|
WHAT: Regular member tries to list admin's certs.
|
|
WHY: Certificate data is sensitive and admin-only.
|
|
EXPECTED: 403 Forbidden.
|
|
"""
|
|
member = create_test_user(password="MemberPass123!")
|
|
admin_user = create_test_user(password="AdminPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(member["id"], org["id"], OrganizationRole.MEMBER)
|
|
create_test_membership(admin_user["id"], org["id"], OrganizationRole.OWNER)
|
|
|
|
integration_client.auth.login(email=member["email"], password="MemberPass123!")
|
|
|
|
with pytest.raises(ApiError) as exc_info:
|
|
integration_client.admin.get_user_ssh_certificates(admin_user["id"])
|
|
|
|
assert exc_info.value.status_code == 403
|
|
|
|
def test_list_user_ssh_certs_filter_by_status(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership, create_test_ca):
|
|
"""TEST: ADMIN-SSH-04 — Filter certificates by status.
|
|
|
|
WHAT: Create certs with different statuses, filter by status=revoked.
|
|
WHY: Admin may want to see only revoked certs to audit access.
|
|
EXPECTED: Only revoked certs returned.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
ca = create_test_ca(org_id=org["id"])
|
|
|
|
from datetime import datetime, timezone, timedelta
|
|
now = datetime.now(timezone.utc)
|
|
|
|
self._create_test_cert(integration_app, victim["id"], ca["id"], status="issued")
|
|
self._create_test_cert(integration_app, victim["id"], ca["id"], status="revoked", revoked=True)
|
|
self._create_test_cert(integration_app, victim["id"], ca["id"], status="expired")
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.get_user_ssh_certificates(victim["id"], status="revoked")
|
|
data = assert_success(result)
|
|
|
|
assert data["count"] == 1
|
|
assert data["certificates"][0]["status"] == "revoked"
|
|
assert data["certificates"][0]["revoked"] is True
|
|
|
|
def test_list_user_ssh_certs_filter_active_only(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership, create_test_ca):
|
|
"""TEST: ADMIN-SSH-05 — Filter for only currently valid certificates.
|
|
|
|
WHAT: Create active and expired certs, filter by active=true.
|
|
WHY: Admin needs quick view of currently active certs.
|
|
EXPECTED: Only valid (non-revoked, non-expired) certs returned.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
ca = create_test_ca(org_id=org["id"])
|
|
|
|
from datetime import datetime, timezone, timedelta
|
|
now = datetime.now(timezone.utc)
|
|
|
|
self._create_test_cert(
|
|
integration_app, victim["id"], ca["id"], status="issued",
|
|
valid_after=now - timedelta(hours=1), valid_before=now + timedelta(hours=23),
|
|
)
|
|
self._create_test_cert(
|
|
integration_app, victim["id"], ca["id"], status="expired",
|
|
valid_after=now - timedelta(days=7), valid_before=now - timedelta(days=1),
|
|
)
|
|
self._create_test_cert(
|
|
integration_app, victim["id"], ca["id"], status="revoked", revoked=True,
|
|
valid_after=now - timedelta(hours=1), valid_before=now + timedelta(hours=23),
|
|
)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.get_user_ssh_certificates(victim["id"], active="true")
|
|
data = assert_success(result)
|
|
|
|
assert data["count"] == 1
|
|
cert = data["certificates"][0]
|
|
assert cert["is_valid"] is True
|
|
assert cert["revoked"] is False
|
|
|
|
def test_list_user_ssh_certs_user_not_found(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-SSH-06 — Return 404 for non-existent user.
|
|
|
|
WHAT: Admin requests certs for a user ID that doesn't exist.
|
|
WHY: Clear error for missing resources.
|
|
EXPECTED: 404 NOT_FOUND.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
|
|
with pytest.raises(ApiError) as exc_info:
|
|
integration_client.admin.get_user_ssh_certificates("non-existent-user-id")
|
|
|
|
assert exc_info.value.status_code == 404
|
|
assert exc_info.value.error_type == "NOT_FOUND"
|
|
|
|
def test_list_user_ssh_certs_empty_result(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-SSH-07 — Empty result when user has no certs.
|
|
|
|
WHAT: Admin lists certs for a user who has never requested one.
|
|
WHY: Endpoint should handle gracefully, not error.
|
|
EXPECTED: 200 OK with empty certificates array and count=0.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.get_user_ssh_certificates(victim["id"])
|
|
data = assert_success(result)
|
|
|
|
assert data["certificates"] == []
|
|
assert data["count"] == 0
|
|
|
|
def test_list_user_ssh_certs_revoked_cert_details(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership, create_test_ca):
|
|
"""TEST: ADMIN-SSH-08 — Revoked certificate shows revocation details.
|
|
|
|
WHAT: Create a revoked cert, verify revoke metadata is present.
|
|
WHY: Admin needs to know when and why a cert was revoked.
|
|
EXPECTED: revoked=True, revoked_at populated, revoke_reason present.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
ca = create_test_ca(org_id=org["id"])
|
|
|
|
self._create_test_cert(
|
|
integration_app, victim["id"], ca["id"],
|
|
status="revoked", revoked=True,
|
|
)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
result = integration_client.admin.get_user_ssh_certificates(victim["id"])
|
|
data = assert_success(result)
|
|
|
|
cert = data["certificates"][0]
|
|
assert cert["revoked"] is True
|
|
assert cert["revoked_at"] is not None
|
|
assert cert["revoke_reason"] == "test revocation"
|
|
assert cert["status"] == "revoked"
|
|
|
|
def test_list_user_ssh_certs_invalid_status_filter(self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership):
|
|
"""TEST: ADMIN-SSH-09 — Invalid status filter returns 400.
|
|
|
|
WHAT: Admin passes an invalid status value.
|
|
WHY: Input validation prevents confusing queries.
|
|
EXPECTED: 400 VALIDATION_ERROR.
|
|
"""
|
|
admin = create_test_user(password="AdminPass123!")
|
|
victim = create_test_user(password="VictimPass123!")
|
|
org = create_test_org()
|
|
create_test_membership(admin["id"], org["id"], OrganizationRole.OWNER)
|
|
create_test_membership(victim["id"], org["id"], OrganizationRole.MEMBER)
|
|
|
|
integration_client.auth.login(email=admin["email"], password="AdminPass123!")
|
|
|
|
with pytest.raises(ApiError) as exc_info:
|
|
integration_client.admin.get_user_ssh_certificates(victim["id"], status="bogus")
|
|
|
|
assert exc_info.value.status_code == 400
|
|
assert exc_info.value.error_type == "VALIDATION_ERROR"
|