Files
gatehouse-api/tests/integration/test_ssh_workflows.py
T
nexgen_mirrors 015c622016 test: add comprehensive integration test suite for IAM platform
Add 162 integration tests covering authentication flows, TOTP MFA,
SSH key/certificate management, organization workflows, multi-org
access, self-service features, admin operations, authorization,
security edge cases, department/principal management, CA management,
policy compliance, WebAuthn passkeys, and ZeroTier network access.

Includes:
- Reusable API client library with session management
- Test fixtures for users, organizations, memberships, and CAs
- Helper functions for SSH key generation and verification
- Documentation for running and writing tests

Also update test configuration to disable conflicting maas plugins
and configure WebAuthn/session settings for localhost testing.
2026-04-23 15:41:37 +09:30

936 lines
41 KiB
Python

"""SSH workflow integration tests.
Covers SSH key management, verification, certificate listing,
and CA public key retrieval.
"""
import pytest
import uuid
import tempfile
import subprocess
import os
import base64
from tests.integration.client.base import ApiError
from tests.integration.fixtures.ssh_keys import (
generate_unique_public_key,
TEST_PUBLIC_KEY,
INVALID_PUBLIC_KEY,
)
from gatehouse_app.utils.constants import OrganizationRole
def generate_real_public_key() -> str:
    """Generate a new Ed25519 key pair and return the public key as a string.

    Keys produced by ``generate_unique_public_key()`` are only structurally
    valid and are rejected by the signing service's stricter validation.
    This helper builds a genuine key pair with ``sshkey_tools`` (the same
    library the backend uses) for tests that go through certificate signing.

    Returns:
        The public key rendered by ``public_key.to_string()``.
    """
    from sshkey_tools.keys import Ed25519PrivateKey

    return Ed25519PrivateKey.generate().public_key.to_string()
def assert_success(response: dict, message_contains: str = "") -> dict:
    """Assert that an api_response-wrapped payload succeeded.

    Args:
        response: Decoded response body. Expected to carry ``success``,
            ``message`` and ``data`` keys; any of them may be absent or null.
        message_contains: Optional substring that must appear in the
            response message (compared case-insensitively). An empty string
            disables the message check.

    Returns:
        The ``data`` member of the response, or an empty dict when ``data``
        is missing or null, so callers can safely use ``in`` / ``.get()``.

    Raises:
        AssertionError: If ``success`` is explicitly ``False`` or the message
            does not contain ``message_contains``.
    """
    # A missing "success" key is treated as success; only an explicit False fails.
    assert response.get("success") is not False, (
        f"Expected success but got error: {response.get('message')}"
    )
    if message_contains:
        # "message" may be present but null; normalise so the check fails with
        # a readable assertion instead of an AttributeError on None.lower().
        message = response.get("message") or ""
        assert message_contains.lower() in str(message).lower(), (
            f"Expected message to contain '{message_contains}' but got: {response.get('message')}"
        )
    data = response.get("data")
    # A null "data" would otherwise leak None to callers that index into it.
    return {} if data is None else data
def assert_error(exc: ApiError, expected_status: int, expected_error_type: str | None = None) -> None:
    """Assert that an ApiError carries the expected status (and optionally error_type).

    Args:
        exc: The exception captured from the API client, typically
            ``exc_info.value`` from a ``pytest.raises(ApiError)`` block.
        expected_status: HTTP status code the error must carry.
        expected_error_type: When truthy, the exact ``error_type`` the error
            must carry; when ``None`` or empty the error type is not checked.

    Raises:
        AssertionError: If ``exc`` is not an ``ApiError`` or any expectation
            is not met.
    """
    assert isinstance(exc, ApiError), (
        # Separate the type name from the message so the failure is readable.
        f"Expected ApiError but got: {type(exc).__name__}: {exc}"
    )
    assert exc.status_code == expected_status, (
        f"Expected status {expected_status} but got {exc.status_code}\n"
        f"URL: {exc.method} {exc.url}\n"
        f"Response: {exc.response_data}"
    )
    if expected_error_type:
        assert exc.error_type == expected_error_type, (
            f"Expected error_type '{expected_error_type}' but got '{exc.error_type}'"
        )
# =============================================================================
# Tier 1 — A. SSH Key Management
# =============================================================================
class TestSSHKeyManagement:
    """Exercise SSH key create/read/update/delete through the ``/ssh/keys`` API."""

    @staticmethod
    def _login_new_user(client, make_user, password="MyPassword123!"):
        """Create a user with ``password`` and log ``client`` in as that user."""
        account = make_user(password=password)
        client.auth.login(email=account["email"], password=password)
        return account

    @staticmethod
    def _add_unique_key(client, description):
        """Upload a freshly generated public key and return the new key id."""
        public_key = generate_unique_public_key()
        return client.ssh.add_key(public_key, description)["data"]["id"]

    @staticmethod
    def _key_of_user_a_as_user_b(client, make_user, description):
        """Have user A upload a key, then leave ``client`` logged in as user B.

        Returns the id of user A's key.
        """
        owner = make_user(password="PassA123!")
        other = make_user(password="PassB123!")
        public_key = generate_unique_public_key()
        client.auth.login(email=owner["email"], password="PassA123!")
        key_id = client.ssh.add_key(public_key, description)["data"]["id"]
        client.auth.logout()
        client.auth.login(email=other["email"], password="PassB123!")
        return key_id

    def test_add_key_positive(self, integration_client, create_test_user):
        """TEST: SSH-KEY-01 — Add a new SSH public key.

        WHAT: A logged-in user uploads a valid public key with a description.
        WHY: Registering a key is the first step towards certificate
             signing and server access.
        EXPECTED: Success response mentioning "added" whose data has an id;
                  the key list then reports at least one key.
        """
        self._login_new_user(integration_client, create_test_user)
        public_key = generate_unique_public_key()
        created = assert_success(
            integration_client.ssh.add_key(public_key, "My Test Key"), "added"
        )
        assert "id" in created, "Response missing key id"
        # The new key must show up when listing.
        listing = assert_success(integration_client.ssh.list_keys())
        assert listing.get("count", 0) >= 1, "Key not found in list"

    def test_add_key_invalid_format_negative(self, integration_client, create_test_user):
        """TEST: SSH-KEY-02 — Reject invalid public key format.

        WHAT: Upload a malformed public key string.
        WHY: Garbage keys must be refused before they are stored or reach
             the signing path.
        EXPECTED: ApiError with status 400.
        """
        self._login_new_user(integration_client, create_test_user)
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.add_key(INVALID_PUBLIC_KEY, "Bad Key")
        assert_error(caught.value, 400)

    def test_add_duplicate_key_negative(self, integration_client, create_test_user):
        """TEST: SSH-KEY-03 — Reject duplicate SSH key.

        WHAT: Upload TEST_PUBLIC_KEY twice as the same user.
        WHY: A key must map to exactly one registration.
        EXPECTED: ApiError with status 409 and error_type SSH_KEY_ALREADY_EXISTS.
        """
        self._login_new_user(integration_client, create_test_user)
        integration_client.ssh.add_key(TEST_PUBLIC_KEY, "First")
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.add_key(TEST_PUBLIC_KEY, "Duplicate")
        assert_error(caught.value, 409, "SSH_KEY_ALREADY_EXISTS")

    def test_add_key_without_auth_negative(self, integration_client):
        """TEST: SSH-KEY-04 — Reject key upload without authentication.

        WHAT: Drop the client token, then try to upload a key.
        WHY: Key registration is for authenticated users only.
        EXPECTED: ApiError with status 401.
        """
        integration_client.clear_token()
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.add_key(TEST_PUBLIC_KEY, "No Auth")
        assert_error(caught.value, 401)

    def test_get_own_key_positive(self, integration_client, create_test_user):
        """TEST: SSH-KEY-05 — Retrieve own SSH key by ID.

        WHAT: Upload a key and fetch it back by id.
        WHY: The detail view is how users inspect a registered key.
        EXPECTED: Success response mentioning "retrieved" with the same id.
        """
        self._login_new_user(integration_client, create_test_user)
        key_id = self._add_unique_key(integration_client, "Detail Test")
        fetched = assert_success(integration_client.ssh.get_key(key_id), "retrieved")
        assert fetched["id"] == key_id

    def test_get_another_users_key_negative(self, integration_client, create_test_user):
        """TEST: SSH-KEY-06 — Reject retrieving another user's key.

        WHAT: User A uploads a key; user B requests it by id.
        WHY: Keys are private to their owner.
        EXPECTED: ApiError with status 403 and error_type FORBIDDEN.
        """
        key_id = self._key_of_user_a_as_user_b(
            integration_client, create_test_user, "User A Key"
        )
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.get_key(key_id)
        assert_error(caught.value, 403, "FORBIDDEN")

    def test_get_nonexistent_key_negative(self, integration_client, create_test_user):
        """TEST: SSH-KEY-07 — Reject retrieving a non-existent key.

        WHAT: Request a key using a random UUID.
        WHY: Unknown ids must produce a clean not-found error.
        EXPECTED: ApiError with status 404 and error_type NOT_FOUND.
        """
        self._login_new_user(integration_client, create_test_user)
        missing_id = str(uuid.uuid4())
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.get_key(missing_id)
        assert_error(caught.value, 404, "NOT_FOUND")

    def test_update_description_positive(self, integration_client, create_test_user):
        """TEST: SSH-KEY-08 — Update key description.

        WHAT: Upload a key, then change its description.
        WHY: Users rename keys as their usage changes.
        EXPECTED: Success response mentioning "updated".
        """
        self._login_new_user(integration_client, create_test_user)
        key_id = self._add_unique_key(integration_client, "Old Name")
        assert_success(
            integration_client.ssh.update_description(key_id, "New Name"), "updated"
        )

    def test_update_description_other_users_key_negative(self, integration_client, create_test_user):
        """TEST: SSH-KEY-09 — Reject updating another user's key description.

        WHAT: User A uploads a key; user B tries to rename it.
        WHY: Users must not edit each other's key metadata.
        EXPECTED: ApiError with status 403 and error_type FORBIDDEN.
        """
        key_id = self._key_of_user_a_as_user_b(
            integration_client, create_test_user, "User A"
        )
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.update_description(key_id, "Hacked")
        assert_error(caught.value, 403, "FORBIDDEN")

    def test_update_description_missing_field_negative(self, integration_client, create_test_user):
        """TEST: SSH-KEY-10 — Reject update without description field.

        WHAT: PATCH the update-description endpoint with an empty body.
        WHY: A description value is mandatory for this endpoint.
        EXPECTED: ApiError with status 400 and error_type BAD_REQUEST.
        """
        self._login_new_user(integration_client, create_test_user)
        key_id = self._add_unique_key(integration_client, "Test")
        with pytest.raises(ApiError) as caught:
            integration_client.patch(f"/ssh/keys/{key_id}/update-description", data={})
        assert_error(caught.value, 400, "BAD_REQUEST")

    def test_delete_key_positive(self, integration_client, create_test_user):
        """TEST: SSH-KEY-11 — Delete own SSH key.

        WHAT: Upload a key, delete it, then list keys.
        WHY: Retired or rotated keys must be removable.
        EXPECTED: Delete succeeds and the list reports count == 0.
        """
        self._login_new_user(integration_client, create_test_user)
        key_id = self._add_unique_key(integration_client, "To Delete")
        assert_success(integration_client.ssh.delete_key(key_id))
        listing = assert_success(integration_client.ssh.list_keys())
        assert listing.get("count", -1) == 0, "Key was not deleted"

    def test_delete_other_users_key_negative(self, integration_client, create_test_user):
        """TEST: SSH-KEY-12 — Reject deleting another user's key.

        WHAT: User A uploads a key; user B tries to delete it.
        WHY: Cross-user deletion must be blocked.
        EXPECTED: ApiError with status 403 and error_type FORBIDDEN.
        """
        key_id = self._key_of_user_a_as_user_b(
            integration_client, create_test_user, "User A Key"
        )
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.delete_key(key_id)
        assert_error(caught.value, 403, "FORBIDDEN")
# =============================================================================
# Tier 1 — B. SSH Key Verification
# =============================================================================
class TestSSHKeyVerification:
    """Test SSH key ownership verification using real ssh-keygen signatures."""

    def test_verify_key_positive(self, integration_client, create_test_user):
        """TEST: SSH-VERIFY-01 — Verify ownership with valid signature.

        WHAT: Generate a real Ed25519 key pair with ssh-keygen, upload the
              public key, request a challenge, sign the challenge text and
              submit the base64-encoded signature.
        WHY: Proving key ownership is required before certificates
             can be issued.
        EXPECTED: Success response mentioning "verification complete" with
                  verified=True. The test is skipped when ssh-keygen is
                  missing or cannot generate/sign.
        """
        user = create_test_user(password="MyPassword123!")
        integration_client.auth.login(email=user["email"], password="MyPassword123!")
        with tempfile.TemporaryDirectory() as tmpdir:
            key_path = os.path.join(tmpdir, "test_key")
            try:
                gen_proc = subprocess.run(
                    ["ssh-keygen", "-t", "ed25519", "-f", key_path, "-N", "", "-C", "test@example.com"],
                    capture_output=True,
                )
            except FileNotFoundError:
                # A missing binary raises instead of returning a non-zero
                # exit status, so it needs its own skip path.
                pytest.skip("ssh-keygen not available: executable not found")
            if gen_proc.returncode != 0:
                pytest.skip(f"ssh-keygen not available: {gen_proc.stderr.decode()}")
            with open(key_path + ".pub", "r") as pub_f:
                public_key = pub_f.read().strip()
            add_result = integration_client.ssh.add_key(public_key, "Verify Test")
            key_id = add_result["data"]["id"]
            # Get challenge
            challenge_result = integration_client.ssh.get_challenge(key_id)
            challenge_text = challenge_result["data"]["challenge_text"]
            # Sign the challenge supplied on stdin. No file argument is given:
            # a trailing path would be treated as the file to sign, and the
            # signature for stdin input is written to stdout.
            sign_proc = subprocess.run(
                ["ssh-keygen", "-Y", "sign", "-f", key_path, "-n", "file"],
                input=challenge_text.encode(),
                capture_output=True,
            )
            if sign_proc.returncode != 0:
                pytest.skip(f"ssh-keygen sign failed: {sign_proc.stderr.decode()}")
            signature_b64 = base64.b64encode(sign_proc.stdout).decode()
        result = integration_client.ssh.verify_key(key_id, signature_b64)
        data = assert_success(result, "verification complete")
        assert data.get("verified") is True

    def test_verify_key_invalid_signature_negative(self, integration_client, create_test_user):
        """TEST: SSH-VERIFY-02 — Reject verification with invalid signature.

        WHAT: Add a key and submit a bogus base64 signature.
        WHY: Forged signatures must fail verification.
        EXPECTED: ApiError with status 400 and error_type VERIFICATION_FAILED.
        """
        user = create_test_user(password="MyPassword123!")
        integration_client.auth.login(email=user["email"], password="MyPassword123!")
        key = generate_unique_public_key()
        add_result = integration_client.ssh.add_key(key, "Invalid Sig")
        key_id = add_result["data"]["id"]
        with pytest.raises(ApiError) as exc_info:
            integration_client.ssh.verify_key(key_id, "bm90LWEtdmFsaWQtc2lnbmF0dXJl")
        assert_error(exc_info.value, 400, "VERIFICATION_FAILED")

    def test_verify_key_without_signature_negative(self, integration_client, create_test_user):
        """TEST: SSH-VERIFY-03 — Reject verification without signature field.

        WHAT: POST /ssh/keys/<id>/verify with an action but no signature.
        WHY: The endpoint requires a signature to verify.
        EXPECTED: ApiError with status 400 and error_type BAD_REQUEST.
        """
        user = create_test_user(password="MyPassword123!")
        integration_client.auth.login(email=user["email"], password="MyPassword123!")
        key = generate_unique_public_key()
        add_result = integration_client.ssh.add_key(key, "No Sig")
        key_id = add_result["data"]["id"]
        with pytest.raises(ApiError) as exc_info:
            integration_client.post(
                f"/ssh/keys/{key_id}/verify",
                data={"action": "verify_signature"},
            )
        assert_error(exc_info.value, 400, "BAD_REQUEST")

    def test_verify_key_other_users_key_negative(self, integration_client, create_test_user):
        """TEST: SSH-VERIFY-04 — Reject verifying another user's key.

        WHAT: User A adds a key. User B tries to verify it.
        WHY: Users must not verify keys they do not own.
        EXPECTED: ApiError with status 403 and error_type FORBIDDEN.
        """
        user_a = create_test_user(password="PassA123!")
        user_b = create_test_user(password="PassB123!")
        key = generate_unique_public_key()
        integration_client.auth.login(email=user_a["email"], password="PassA123!")
        add_result = integration_client.ssh.add_key(key, "User A Key")
        key_id = add_result["data"]["id"]
        integration_client.auth.logout()
        integration_client.auth.login(email=user_b["email"], password="PassB123!")
        with pytest.raises(ApiError) as exc_info:
            integration_client.ssh.verify_key(key_id, "fake-sig")
        assert_error(exc_info.value, 403, "FORBIDDEN")

    def test_verify_key_nonexistent_key_negative(self, integration_client, create_test_user):
        """TEST: SSH-VERIFY-05 — Reject verifying a non-existent key.

        WHAT: Attempt verify_key on a random UUID.
        WHY: Unknown ids must produce a clean not-found error.
        EXPECTED: ApiError with status 404 and error_type NOT_FOUND.
        """
        user = create_test_user(password="MyPassword123!")
        integration_client.auth.login(email=user["email"], password="MyPassword123!")
        with pytest.raises(ApiError) as exc_info:
            integration_client.ssh.verify_key(str(uuid.uuid4()), "fake-sig")
        assert_error(exc_info.value, 404, "NOT_FOUND")

    def test_list_keys_empty_positive(self, integration_client, create_test_user):
        """TEST: SSH-VERIFY-06 — List keys returns empty for new user.

        WHAT: Create a fresh user and call list_keys.
        WHY: UI expects a consistent empty state before any keys are added.
        EXPECTED: Success response with count == 0 and keys == [].
        """
        user = create_test_user(password="MyPassword123!")
        integration_client.auth.login(email=user["email"], password="MyPassword123!")
        result = integration_client.ssh.list_keys()
        data = assert_success(result)
        assert data.get("count", -1) == 0
        assert data.get("keys", None) == []
# =============================================================================
# Tier 1 — C. SSH Certificate Listing & CA Public Key
# =============================================================================
class TestCertificateListing:
    """Cover the certificate list endpoint and CA public key lookup."""

    @staticmethod
    def _member_with_org(make_user, make_org, make_membership):
        """Create a user and an org, and enrol the user as a plain member.

        Returns the ``(user, org)`` pair.
        """
        account = make_user(password="MyPassword123!")
        organization = make_org()
        make_membership(account["id"], organization["id"], OrganizationRole.MEMBER)
        return account, organization

    def test_list_certificates_empty_positive(self, integration_client, create_test_user):
        """TEST: SSH-CERT-10 — List certificates returns empty for new user.

        WHAT: A brand-new user lists their certificates.
        WHY: The UI relies on a well-formed empty state.
        EXPECTED: Success response with count == 0.
        """
        account = create_test_user(password="MyPassword123!")
        integration_client.auth.login(email=account["email"], password="MyPassword123!")
        payload = assert_success(integration_client.ssh.list_certificates())
        assert payload.get("count", -1) == 0

    def test_get_ca_public_key_positive(
        self, integration_client, create_test_user, create_test_org, create_test_membership, create_test_ca
    ):
        """TEST: SSH-CERT-11 — Retrieve CA public key when CA exists.

        WHAT: A member of an org that has a user CA asks for the CA public key.
        WHY: Clients need the CA public key to configure
             TrustedUserCAKeys on servers.
        EXPECTED: Success response mentioning "retrieved" whose data contains
                  public_key and fingerprint.
        """
        account, organization = self._member_with_org(
            create_test_user, create_test_org, create_test_membership
        )
        create_test_ca(org_id=organization["id"], name="Test CA", ca_type="user", key_type="ed25519")
        integration_client.auth.login(email=account["email"], password="MyPassword123!")
        payload = assert_success(integration_client.ssh.get_ca_public_key(), "retrieved")
        for field in ("public_key", "fingerprint"):
            assert field in payload, f"Response missing {field}"

    def test_get_ca_public_key_no_ca_negative(
        self, integration_client, create_test_user, create_test_org, create_test_membership
    ):
        """TEST: SSH-CERT-12 — Reject CA public key retrieval when no CA exists.

        WHAT: A member of an org without any CA asks for the CA public key.
        WHY: Missing infrastructure must surface as a clear error.
        EXPECTED: ApiError with status 404 and error_type CA_NOT_CONFIGURED.
        """
        account, _organization = self._member_with_org(
            create_test_user, create_test_org, create_test_membership
        )
        integration_client.auth.login(email=account["email"], password="MyPassword123!")
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.get_ca_public_key()
        assert_error(caught.value, 404, "CA_NOT_CONFIGURED")
# =============================================================================
# Helpers for certificate tests
# =============================================================================
def _mark_key_verified(integration_app, key_id: str) -> None:
    """Bypass the signature verification step by marking the key verified in DB.

    Tests that need a verified key (a prerequisite for certificate signing)
    set the flag directly instead of signing a challenge with ssh-keygen.
    This keeps the certificate signing tests independent of external crypto
    tooling while still exercising the real API endpoints.

    Args:
        integration_app: Application object providing ``app_context()``.
        key_id: Primary key of the ``SSHKey`` row to update.

    Raises:
        AssertionError: If no ``SSHKey`` with ``key_id`` exists. Failing here
            avoids a misleading KEY_NOT_VERIFIED error later in the test.
    """
    from gatehouse_app.models.ssh_ca.ssh_key import SSHKey
    from gatehouse_app.extensions import db

    with integration_app.app_context():
        ssh_key = db.session.get(SSHKey, key_id)
        assert ssh_key is not None, (
            f"Cannot mark SSH key {key_id!r} as verified: key not found"
        )
        ssh_key.verified = True
        db.session.commit()
# =============================================================================
# Tier 1 — D. SSH Certificate Signing
# =============================================================================
class TestCertificateSigning:
    """Exercise the SSH certificate signing endpoint (POST /ssh/sign)."""

    @staticmethod
    def _login_new_user(client, make_user, password):
        """Create a user with ``password`` and log ``client`` in as that user."""
        account = make_user(password=password)
        client.auth.login(email=account["email"], password=password)
        return account

    @staticmethod
    def _create_org(client, title, slug):
        """Create an org with random suffixes on name and slug; return its id."""
        created = client.orgs.create(
            f"{title} {uuid.uuid4().hex[:6]}", f"{slug}-{uuid.uuid4().hex[:6]}"
        )
        return created["data"]["organization"]["id"]

    @staticmethod
    def _create_user_ca(client, org_id, ca_name):
        """Create an ed25519 user CA named ``ca_name`` in the org."""
        client.orgs.create_ca(org_id, ca_name, ca_type="user", key_type="ed25519")

    @staticmethod
    def _add_real_key(client, description):
        """Upload a real Ed25519 public key and return the new key id."""
        public_key = generate_real_public_key()
        return client.ssh.add_key(public_key, description)["data"]["id"]

    @staticmethod
    def _relogin(client, account, password):
        """Log the current user out and log in as ``account``."""
        client.auth.logout()
        client.auth.login(email=account["email"], password=password)

    def test_sign_certificate_default_principals_positive(
        self, integration_app, integration_client, create_test_user
    ):
        """TEST: SSH-CERT-01 — Sign certificate with default principals.

        WHAT: An org creator with a principal, a CA and a verified key
              requests a certificate without naming principals.
        WHY: Principals should default to the ones available to the user.
        EXPECTED: Success response mentioning "signed successfully" whose
                  data has certificate, serial and the created principal.
        """
        self._login_new_user(integration_client, create_test_user, "MyPassword123!")
        # The caller becomes owner of the org it creates.
        org_id = self._create_org(integration_client, "Cert Org", "cert-org")
        principal = integration_client.orgs.create_principal(
            org_id, "deploy", "Deploy principal"
        )
        principal_name = principal["data"]["principal"]["name"]
        self._create_user_ca(integration_client, org_id, "Test CA")
        key_id = self._add_real_key(integration_client, "Cert Key")
        _mark_key_verified(integration_app, key_id)
        # No principals argument: the backend is expected to fill in defaults.
        signed = assert_success(
            integration_client.ssh.sign_certificate(key_id=key_id), "signed successfully"
        )
        assert "certificate" in signed, "Response missing certificate"
        assert "serial" in signed, "Response missing serial"
        assert principal_name in signed.get("principals", []), "Expected principal not in response"

    def test_sign_certificate_custom_principals_positive(
        self, integration_app, integration_client, create_test_user
    ):
        """TEST: SSH-CERT-02 — Sign certificate with custom principals.

        WHAT: The org has two principals; the certificate request names
              only one of them.
        WHY: Users may ask for a subset of the principals they hold.
        EXPECTED: Success response whose principals equal ["deploy"].
        """
        self._login_new_user(integration_client, create_test_user, "MyPassword123!")
        org_id = self._create_org(integration_client, "Cert Org 2", "cert-org-2")
        integration_client.orgs.create_principal(org_id, "deploy", "Deploy")
        integration_client.orgs.create_principal(org_id, "prod", "Production")
        self._create_user_ca(integration_client, org_id, "Test CA 2")
        key_id = self._add_real_key(integration_client, "Cert Key 2")
        _mark_key_verified(integration_app, key_id)
        signed = assert_success(
            integration_client.ssh.sign_certificate(key_id=key_id, principals=["deploy"]),
            "signed successfully",
        )
        assert signed.get("principals") == ["deploy"], f"Unexpected principals: {signed.get('principals')}"

    def test_sign_certificate_unverified_key_negative(
        self, integration_app, integration_client, create_test_user
    ):
        """TEST: SSH-CERT-03 — Reject signing with unverified key.

        WHAT: Org, principal and CA exist, but the key was never verified.
        WHY: Certificates must only be issued for keys with proven ownership.
        EXPECTED: ApiError with status 400 and error_type KEY_NOT_VERIFIED.
        """
        self._login_new_user(integration_client, create_test_user, "MyPassword123!")
        org_id = self._create_org(integration_client, "Cert Org 3", "cert-org-3")
        integration_client.orgs.create_principal(org_id, "deploy", "Deploy")
        self._create_user_ca(integration_client, org_id, "Test CA 3")
        # The key is intentionally left unverified.
        key_id = self._add_real_key(integration_client, "Unverified Key")
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.sign_certificate(key_id=key_id)
        assert_error(caught.value, 400, "KEY_NOT_VERIFIED")

    def test_sign_certificate_no_principals_negative(
        self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership
    ):
        """TEST: SSH-CERT-04 — Reject signing when user has no principals.

        WHAT: A plain member with a verified key requests a certificate in
              an org that has a CA but no principals.
        WHY: A certificate without principals would grant no defined access.
        EXPECTED: ApiError with status 400 and error_type NO_PRINCIPALS.
        """
        # The owner sets up the org and its CA.
        self._login_new_user(integration_client, create_test_user, "OwnerPass123!")
        org_id = self._create_org(integration_client, "No Princ Org", "no-princ-org")
        self._create_user_ca(integration_client, org_id, "Test CA 4")
        # The member joins the org without being given any principal.
        member = create_test_user(password="MemberPass123!")
        create_test_membership(member["id"], org_id, OrganizationRole.MEMBER)
        self._relogin(integration_client, member, "MemberPass123!")
        key_id = self._add_real_key(integration_client, "No Princ Key")
        _mark_key_verified(integration_app, key_id)
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.sign_certificate(key_id=key_id)
        assert_error(caught.value, 400, "NO_PRINCIPALS")

    def test_sign_certificate_unauthorized_principals_negative(
        self, integration_app, integration_client, create_test_user, create_test_org, create_test_membership
    ):
        """TEST: SSH-CERT-05 — Reject signing with unauthorized principals.

        WHAT: A member assigned only to "deploy" requests a certificate
              for ["deploy", "prod"].
        WHY: Users must not obtain principals they were not granted.
        EXPECTED: ApiError with status 403 and error_type UNAUTHORIZED_PRINCIPALS.
        """
        self._login_new_user(integration_client, create_test_user, "OwnerPass123!")
        org_id = self._create_org(integration_client, "Authz Org", "authz-org")
        self._create_user_ca(integration_client, org_id, "Test CA 5")
        deploy = integration_client.orgs.create_principal(org_id, "deploy", "Deploy")
        deploy_id = deploy["data"]["principal"]["id"]
        integration_client.orgs.create_principal(org_id, "prod", "Production")
        member = create_test_user(password="MemberPass123!")
        create_test_membership(member["id"], org_id, OrganizationRole.MEMBER)
        integration_client.orgs.add_principal_member(org_id, deploy_id, member["email"])
        self._relogin(integration_client, member, "MemberPass123!")
        key_id = self._add_real_key(integration_client, "Authz Key")
        _mark_key_verified(integration_app, key_id)
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.sign_certificate(key_id=key_id, principals=["deploy", "prod"])
        assert_error(caught.value, 403, "UNAUTHORIZED_PRINCIPALS")

    def test_sign_certificate_suspended_account_negative(
        self, integration_app, integration_client, create_test_user
    ):
        """TEST: SSH-CERT-06 — Reject signing with suspended account.

        WHAT: Everything needed for signing is in place, then the user's
              status is set to SUSPENDED directly in the database.
        WHY: Suspended accounts must not obtain new credentials.
        EXPECTED: ApiError with status 403 and error_type ACCOUNT_SUSPENDED.
        """
        account = self._login_new_user(integration_client, create_test_user, "MyPassword123!")
        org_id = self._create_org(integration_client, "Susp Org", "susp-org")
        integration_client.orgs.create_principal(org_id, "deploy", "Deploy")
        self._create_user_ca(integration_client, org_id, "Test CA 6")
        key_id = self._add_real_key(integration_client, "Susp Key")
        _mark_key_verified(integration_app, key_id)
        # Flip the status in the DB so no admin account has to be set up.
        from gatehouse_app.models.user.user import User
        from gatehouse_app.utils.constants import UserStatus
        from gatehouse_app.extensions import db

        with integration_app.app_context():
            record = db.session.get(User, account["id"])
            record.status = UserStatus.SUSPENDED
            db.session.commit()
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.sign_certificate(key_id=key_id)
        assert_error(caught.value, 403, "ACCOUNT_SUSPENDED")

    def test_sign_certificate_no_ca_negative(
        self, integration_app, integration_client, create_test_user
    ):
        """TEST: SSH-CERT-07 — Reject signing when no CA is configured.

        WHAT: The org has a principal and the key is verified, but no CA
              was ever created.
        WHY: Nothing can be signed without a CA.
        EXPECTED: ApiError with status 503 and error_type CA_NOT_CONFIGURED.
        """
        self._login_new_user(integration_client, create_test_user, "MyPassword123!")
        org_id = self._create_org(integration_client, "No CA Org", "no-ca-org")
        integration_client.orgs.create_principal(org_id, "deploy", "Deploy")
        # No CA is created on purpose.
        key_id = self._add_real_key(integration_client, "No CA Key")
        _mark_key_verified(integration_app, key_id)
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.sign_certificate(key_id=key_id)
        assert_error(caught.value, 503, "CA_NOT_CONFIGURED")

    def test_sign_certificate_cross_user_key_negative(
        self, integration_app, integration_client, create_test_user
    ):
        """TEST: SSH-CERT-08 — Reject signing with another user's key.

        WHAT: User A owns a verified key. User B sets up an org, principal
              and CA, then asks to sign user A's key.
        WHY: A user must not obtain certificates for someone else's key.
        EXPECTED: ApiError with status 403 and error_type FORBIDDEN.
        """
        self._login_new_user(integration_client, create_test_user, "PassA123!")
        foreign_key_id = self._add_real_key(integration_client, "User A Key")
        _mark_key_verified(integration_app, foreign_key_id)
        second_user = create_test_user(password="PassB123!")
        self._relogin(integration_client, second_user, "PassB123!")
        org_id = self._create_org(integration_client, "Cross Org", "cross-org")
        integration_client.orgs.create_principal(org_id, "deploy", "Deploy")
        self._create_user_ca(integration_client, org_id, "Test CA 7")
        with pytest.raises(ApiError) as caught:
            integration_client.ssh.sign_certificate(key_id=foreign_key_id)
        assert_error(caught.value, 403, "FORBIDDEN")
# =============================================================================
# Tier 1 — E. SSH Certificate Management
# =============================================================================
class TestCertificateManagement:
"""Test SSH certificate get and revoke operations."""
def _sign_cert_for_user(
    self, integration_app, integration_client, create_test_user
) -> tuple[dict, str]:
    """Helper: provision a fresh user who owns one signed certificate.

    Creates a user and logs in as them, creates an org with a "deploy"
    principal and an ed25519 user CA, adds a key, marks it verified via
    the test helper, and signs a certificate for that key. The client
    stays logged in as the created user when this returns.

    Returns:
        ``(user, cert_id)`` — the created user and the ``cert_id`` read
        from the sign response payload.
    """
    password = "MyPassword123!"
    user = create_test_user(password=password)
    integration_client.auth.login(email=user["email"], password=password)

    # Org scaffolding: organization, principal, and a user CA.
    org_name = f"Mgmt Org {uuid.uuid4().hex[:6]}"
    org_slug = f"mgmt-org-{uuid.uuid4().hex[:6]}"
    created_org = integration_client.orgs.create(org_name, org_slug)
    org_id = created_org["data"]["organization"]["id"]
    integration_client.orgs.create_principal(org_id, "deploy", "Deploy")
    integration_client.orgs.create_ca(org_id, "Mgmt CA", ca_type="user", key_type="ed25519")

    # Key registration, followed by marking the key verified.
    registered = integration_client.ssh.add_key(generate_real_public_key(), "Mgmt Key")
    key_id = registered["data"]["id"]
    _mark_key_verified(integration_app, key_id)

    signed = integration_client.ssh.sign_certificate(key_id=key_id)
    payload = assert_success(signed, "signed successfully")
    return user, payload["cert_id"]
def test_get_certificate_positive(self, integration_app, integration_client, create_test_user):
    """TEST: SSH-CERT-13 — A user can fetch their own certificate.

    WHAT: Sign a certificate, then GET /ssh/certificates/<id>.
    WHY: Users need to inspect certificate metadata (serial,
         principals, validity window).
    EXPECTED: 200 OK with certificate data.
    """
    # Only the certificate id is needed; the returned user is ignored.
    _, cert_id = self._sign_cert_for_user(integration_app, integration_client, create_test_user)

    fetched = integration_client.ssh.get_certificate(cert_id)
    cert = assert_success(fetched, "retrieved")

    assert cert.get("id") == cert_id
    assert "serial" in cert
def test_revoke_certificate_positive(self, integration_app, integration_client, create_test_user):
    """TEST: SSH-CERT-14 — A user can revoke their own certificate.

    WHAT: Sign a certificate, then POST /ssh/certificates/<id>/revoke.
    WHY: Users must be able to invalidate compromised or
         no-longer-needed certificates.
    EXPECTED: 200 OK with status revoked.
    """
    # Only the certificate id is needed; the returned user is ignored.
    _, cert_id = self._sign_cert_for_user(integration_app, integration_client, create_test_user)

    revocation = integration_client.ssh.revoke_certificate(cert_id, reason="Rotated")
    cert = assert_success(revocation, "revoked")

    assert cert.get("status") == "revoked"
def test_revoke_already_revoked_certificate_negative(
    self, integration_app, integration_client, create_test_user
):
    """TEST: SSH-CERT-15 — A second revocation of the same certificate fails.

    WHAT: Sign, revoke, then attempt to revoke again.
    WHY: Repeated revocation attempts should return a clear error.
    EXPECTED: 409 Conflict with error_type ALREADY_REVOKED.
    """
    _, cert_id = self._sign_cert_for_user(integration_app, integration_client, create_test_user)

    # First revocation is expected to go through.
    integration_client.ssh.revoke_certificate(cert_id)

    # Second revocation of the same id must raise.
    with pytest.raises(ApiError) as raised:
        integration_client.ssh.revoke_certificate(cert_id)
    assert_error(raised.value, 409, "ALREADY_REVOKED")
def test_revoke_other_users_certificate_negative(
    self, integration_app, integration_client, create_test_user
):
    """TEST: SSH-CERT-16 — Revoking a certificate owned by someone else is refused.

    WHAT: User A signs a certificate. User B tries to revoke it.
    WHY: Cross-user revocation must be blocked.
    EXPECTED: 403 Forbidden.
    """
    # User A ends up logged in and owning cert_id.
    _, cert_id = self._sign_cert_for_user(integration_app, integration_client, create_test_user)

    # Replace User A's session with User B's.
    other_password = "PassB123!"
    other = create_test_user(password=other_password)
    integration_client.auth.logout()
    integration_client.auth.login(email=other["email"], password=other_password)

    with pytest.raises(ApiError) as raised:
        integration_client.ssh.revoke_certificate(cert_id)
    assert_error(raised.value, 403, "FORBIDDEN")
def test_get_nonexistent_certificate_negative(self, integration_client, create_test_user):
    """TEST: SSH-CERT-17 — Fetching an unknown certificate id fails cleanly.

    WHAT: GET /ssh/certificates/<random-uuid>.
    WHY: Clean 404 handling.
    EXPECTED: 404 Not Found.
    """
    password = "MyPassword123!"
    user = create_test_user(password=password)
    integration_client.auth.login(email=user["email"], password=password)

    # A freshly generated UUID that no certificate was created with.
    missing_cert_id = str(uuid.uuid4())
    with pytest.raises(ApiError) as raised:
        integration_client.ssh.get_certificate(missing_cert_id)
    assert_error(raised.value, 404, "NOT_FOUND")