test: add API-level coverage for internal helpers, schemas, and service validation

This commit is contained in:
2026-04-25 22:17:41 +09:30
parent 1de10323af
commit bb977aedf9
11 changed files with 1079 additions and 0 deletions
@@ -0,0 +1,92 @@
import pytest
from gatehouse_app.api.v1.ssh._helpers import _classify_ssh_key_material
class TestClassifySSHKeyMaterial:
def test_classifies_certificate(self):
result = _classify_ssh_key_material("ssh-ed25519-cert-v01@openssh.com AAAA comment")
assert result == "certificate"
def test_classifies_ed25519_public_key(self):
result = _classify_ssh_key_material("ssh-ed25519 AAAAB3NzaC1lZDI1NTE5AAAAI... comment")
assert result == "public_key"
def test_classifies_rsa_public_key(self):
result = _classify_ssh_key_material("ssh-rsa AAAAB3NzaC1yc2E... comment")
assert result == "public_key"
def test_classifies_dss_public_key(self):
result = _classify_ssh_key_material("ssh-dss AAAAB3NzaC1kc3M... comment")
assert result == "public_key"
def test_classifies_ecdsa_nistp256_public_key(self):
result = _classify_ssh_key_material("ecdsa-sha2-nistp256 AAAAE2Vj... comment")
assert result == "public_key"
def test_classifies_ecdsa_nistp384_public_key(self):
result = _classify_ssh_key_material("ecdsa-sha2-nistp384 AAAAE2Vj... comment")
assert result == "public_key"
def test_classifies_ecdsa_nistp521_public_key(self):
result = _classify_ssh_key_material("ecdsa-sha2-nistp521 AAAAE2Vj... comment")
assert result == "public_key"
def test_classifies_sk_ed25519_public_key(self):
result = _classify_ssh_key_material(
"sk-ssh-ed25519@openssh.com AAAAGnNrLXNzaC1lZDI1NTE5... comment"
)
assert result == "public_key"
def test_classifies_openssh_private_key(self):
result = _classify_ssh_key_material(
"-----BEGIN OPENSSH PRIVATE KEY-----\n"
"base64data==\n"
"-----END OPENSSH PRIVATE KEY-----"
)
assert result == "private_key"
def test_classifies_rsa_private_key(self):
result = _classify_ssh_key_material(
"-----BEGIN RSA PRIVATE KEY-----\n"
"base64data==\n"
"-----END RSA PRIVATE KEY-----"
)
assert result == "private_key"
def test_unknown_for_empty_string(self):
result = _classify_ssh_key_material("")
assert result == "unknown"
def test_unknown_for_whitespace_string(self):
result = _classify_ssh_key_material(" \n ")
assert result == "unknown"
def test_unknown_for_gibberish(self):
result = _classify_ssh_key_material("not a valid ssh key")
assert result == "unknown"
def test_unknown_for_unsupported_key_type(self):
result = _classify_ssh_key_material("ssh-nonsense AAAABogus...")
assert result == "unknown"
@pytest.mark.parametrize("raw,expected", [
("ssh-rsa AAAAB3Nza... user@host", "public_key"),
("ssh-ed25519 AAAAC3... john@laptop", "public_key"),
("ecdsa-sha2-nistp256 AAAAE2Vj... me@box", "public_key"),
("sk-ssh-ed25519@openssh.com AAAAGn...", "public_key"),
("ssh-ed25519-cert-v01@openssh.com AAAAB3Nza cert for user", "certificate"),
(
"-----BEGIN OPENSSH PRIVATE KEY-----\n"
"abcdefghijklmnopqrstuvwxyz\n"
"-----END OPENSSH PRIVATE KEY-----",
"private_key",
),
("", "unknown"),
("totally random garbage here", "unknown"),
])
def test_parametrized_variants(self, raw, expected):
assert _classify_ssh_key_material(raw) == expected
def test_certificate_with_leading_whitespace(self):
raw = " ssh-ed25519-cert-v01@openssh.com AAAAB3Nza extra words"
assert _classify_ssh_key_material(raw) == "certificate"
+275
View File
@@ -0,0 +1,275 @@
import pytest
from datetime import datetime, timezone
from gatehouse_app.extensions import db
from gatehouse_app.models.organization.department import (
Department,
DepartmentMembership,
)
from gatehouse_app.models.organization.department_cert_policy import DepartmentCertPolicy
from gatehouse_app.api.v1.ssh._helpers import _get_merged_dept_cert_policy
class TestDeptCertPolicy:
def test_no_departments_returns_none(self, app, test_user):
with app.app_context():
result = _get_merged_dept_cert_policy(test_user)
assert result is None
def test_department_without_policy_returns_none(self, app, test_user, test_org):
with app.app_context():
dept = Department(
organization_id=test_org,
name="No Policy Dept",
)
db.session.add(dept)
db.session.commit()
membership = DepartmentMembership(
user_id=test_user,
department_id=dept.id,
)
db.session.add(membership)
db.session.commit()
result = _get_merged_dept_cert_policy(test_user)
assert result is None
def test_single_department_policy(self, app, test_user, test_org):
with app.app_context():
dept = Department(
organization_id=test_org,
name="Engineering",
)
db.session.add(dept)
db.session.commit()
membership = DepartmentMembership(
user_id=test_user,
department_id=dept.id,
)
db.session.add(membership)
db.session.commit()
policy = DepartmentCertPolicy(
department_id=dept.id,
allow_user_expiry=True,
default_expiry_hours=4,
max_expiry_hours=48,
allowed_extensions=["permit-pty", "permit-agent-forwarding"],
)
db.session.add(policy)
db.session.commit()
result = _get_merged_dept_cert_policy(test_user)
assert result is not None
assert result["allow_user_expiry"] is True
assert result["default_expiry_hours"] == 4
assert result["max_expiry_hours"] == 48
assert set(result["extensions"]) == {"permit-pty", "permit-agent-forwarding"}
def test_both_departments_same_policies(self, app, test_user, test_org):
with app.app_context():
dept1 = Department(
organization_id=test_org,
name="Engineering",
)
dept2 = Department(
organization_id=test_org,
name="SRE",
)
db.session.add_all([dept1, dept2])
db.session.commit()
member1 = DepartmentMembership(user_id=test_user, department_id=dept1.id)
member2 = DepartmentMembership(user_id=test_user, department_id=dept2.id)
db.session.add_all([member1, member2])
db.session.commit()
policy1 = DepartmentCertPolicy(
department_id=dept1.id,
allow_user_expiry=True,
default_expiry_hours=4,
max_expiry_hours=48,
allowed_extensions=["permit-pty", "permit-agent-forwarding"],
)
policy2 = DepartmentCertPolicy(
department_id=dept2.id,
allow_user_expiry=True,
default_expiry_hours=4,
max_expiry_hours=48,
allowed_extensions=["permit-pty", "permit-agent-forwarding"],
)
db.session.add_all([policy1, policy2])
db.session.commit()
result = _get_merged_dept_cert_policy(test_user)
assert result["allow_user_expiry"] is True
assert result["default_expiry_hours"] == 4
assert result["max_expiry_hours"] == 48
def test_merges_min_expiry_across_departments(self, app, test_user, test_org):
with app.app_context():
dept1 = Department(
organization_id=test_org,
name="Engineering",
)
dept2 = Department(
organization_id=test_org,
name="SRE",
)
db.session.add_all([dept1, dept2])
db.session.commit()
member1 = DepartmentMembership(user_id=test_user, department_id=dept1.id)
member2 = DepartmentMembership(user_id=test_user, department_id=dept2.id)
db.session.add_all([member1, member2])
db.session.commit()
policy1 = DepartmentCertPolicy(
department_id=dept1.id,
allow_user_expiry=True,
default_expiry_hours=24,
max_expiry_hours=720,
)
policy2 = DepartmentCertPolicy(
department_id=dept2.id,
allow_user_expiry=True,
default_expiry_hours=1,
max_expiry_hours=72,
)
db.session.add_all([policy1, policy2])
db.session.commit()
result = _get_merged_dept_cert_policy(test_user)
assert result["default_expiry_hours"] == 1
assert result["max_expiry_hours"] == 72
def test_extends_intersection_across_departments(self, app, test_user, test_org):
with app.app_context():
dept1 = Department(
organization_id=test_org,
name="Engineering",
)
dept2 = Department(
organization_id=test_org,
name="SRE",
)
db.session.add_all([dept1, dept2])
db.session.commit()
member1 = DepartmentMembership(user_id=test_user, department_id=dept1.id)
member2 = DepartmentMembership(user_id=test_user, department_id=dept2.id)
db.session.add_all([member1, member2])
db.session.commit()
policy1 = DepartmentCertPolicy(
department_id=dept1.id,
allowed_extensions=["permit-pty", "permit-agent-forwarding"],
)
policy2 = DepartmentCertPolicy(
department_id=dept2.id,
allowed_extensions=["permit-pty", "permit-port-forwarding"],
)
db.session.add_all([policy1, policy2])
db.session.commit()
result = _get_merged_dept_cert_policy(test_user)
assert set(result["extensions"]) == {"permit-pty"}
def test_any_false_user_expiry_means_overall_false(
self, app, test_user, test_org
):
with app.app_context():
dept1 = Department(
organization_id=test_org,
name="Engineering",
)
dept2 = Department(
organization_id=test_org,
name="SRE",
)
db.session.add_all([dept1, dept2])
db.session.commit()
member1 = DepartmentMembership(user_id=test_user, department_id=dept1.id)
member2 = DepartmentMembership(user_id=test_user, department_id=dept2.id)
db.session.add_all([member1, member2])
db.session.commit()
policy1 = DepartmentCertPolicy(
department_id=dept1.id,
allow_user_expiry=True,
)
policy2 = DepartmentCertPolicy(
department_id=dept2.id,
allow_user_expiry=False,
)
db.session.add_all([policy1, policy2])
db.session.commit()
result = _get_merged_dept_cert_policy(test_user)
assert result["allow_user_expiry"] is False
def test_deleted_department_filtered(self, app, test_user, test_org):
with app.app_context():
active_dept = Department(
organization_id=test_org,
name="Active Dept",
)
deleted_dept = Department(
organization_id=test_org,
name="Deleted Dept",
deleted_at=datetime.now(timezone.utc),
)
db.session.add_all([active_dept, deleted_dept])
db.session.commit()
active_member = DepartmentMembership(
user_id=test_user, department_id=active_dept.id
)
deleted_member = DepartmentMembership(
user_id=test_user,
department_id=deleted_dept.id,
deleted_at=datetime.now(timezone.utc),
)
db.session.add_all([active_member, deleted_member])
db.session.commit()
policy = DepartmentCertPolicy(
department_id=active_dept.id,
allow_user_expiry=True,
default_expiry_hours=12,
max_expiry_hours=96,
)
db.session.add(policy)
db.session.commit()
result = _get_merged_dept_cert_policy(test_user)
assert result is not None
assert result["default_expiry_hours"] == 12
def test_single_department_no_extensions(self, app, test_user, test_org):
with app.app_context():
dept = Department(
organization_id=test_org,
name="Minimal Dept",
)
db.session.add(dept)
db.session.commit()
membership = DepartmentMembership(
user_id=test_user, department_id=dept.id
)
db.session.add(membership)
db.session.commit()
policy = DepartmentCertPolicy(
department_id=dept.id,
allowed_extensions=[],
)
db.session.add(policy)
db.session.commit()
result = _get_merged_dept_cert_policy(test_user)
assert result is not None
assert result["extensions"] == []
+118
View File
@@ -0,0 +1,118 @@
import pytest
from uuid import uuid4
from datetime import datetime, timezone
from gatehouse_app.extensions import db
from gatehouse_app.models.user.user import User
from gatehouse_app.models.organization.organization import Organization
from gatehouse_app.models.organization.organization_member import OrganizationMember
from gatehouse_app.models.ssh_ca.ca import CA, CaType, KeyType
from gatehouse_app.api.v1.ssh._helpers import _get_org_ca_for_user
from gatehouse_app.utils.constants import OrganizationRole
class TestOrgCAForUser:
def test_organization_id_param_overrides_membership(self, app, test_user, test_org, test_ca, test_membership):
with app.app_context():
org2 = Organization(name="Org 2", slug="org-2")
db.session.add(org2)
db.session.commit()
ca2 = CA(
organization_id=org2.id,
name="Org 2 CA",
ca_type=CaType.USER,
key_type=KeyType.ED25519,
private_key="key2",
public_key="pubkey2",
fingerprint="sha256:org2...",
is_active=True,
)
db.session.add(ca2)
db.session.commit()
user = db.session.get(User, test_user)
result = _get_org_ca_for_user(user, ca_type="user", organization_id=test_org)
assert result is not None
assert result.organization_id == test_org
def test_multiple_orgs_returns_ca(self, app, test_user, test_org, test_ca, test_membership):
with app.app_context():
org2 = Organization(name="Org 2", slug="org-2")
db.session.add(org2)
db.session.commit()
user = db.session.get(User, test_user)
member2 = OrganizationMember(
user_id=test_user, organization_id=org2.id, role=OrganizationRole.MEMBER
)
db.session.add(member2)
db.session.commit()
result = _get_org_ca_for_user(user, ca_type="user")
assert result is not None
def test_user_with_no_memberships_returns_none(self, app):
with app.app_context():
user = User(email="lonely@test.com", full_name="Lonely User")
db.session.add(user)
db.session.commit()
result = _get_org_ca_for_user(user, ca_type="user")
assert result is None
def test_inactive_ca_not_returned(self, app, test_user, test_org, test_membership):
with app.app_context():
ca = CA(
organization_id=test_org,
name="Inactive CA",
ca_type=CaType.USER,
key_type=KeyType.ED25519,
private_key="key",
public_key="pubkey",
fingerprint="sha256:inactive123...",
is_active=False,
)
db.session.add(ca)
db.session.commit()
user = db.session.get(User, test_user)
result = _get_org_ca_for_user(user, ca_type="user")
assert result is None
def test_host_ca_not_returned_when_user_requested(self, app, test_user, test_org, test_membership):
with app.app_context():
ca = CA(
organization_id=test_org,
name="Host CA",
ca_type=CaType.HOST,
key_type=KeyType.ED25519,
private_key="key",
public_key="pubkey",
fingerprint="sha256:host123...",
is_active=True,
)
db.session.add(ca)
db.session.commit()
user = db.session.get(User, test_user)
result = _get_org_ca_for_user(user, ca_type="user")
assert result is None
def test_user_ca_not_returned_when_host_requested(self, app, test_user, test_org, test_membership):
with app.app_context():
ca = CA(
organization_id=test_org,
name="User CA",
ca_type=CaType.USER,
key_type=KeyType.ED25519,
private_key="key",
public_key="pubkey",
fingerprint="sha256:useronly...",
is_active=True,
)
db.session.add(ca)
db.session.commit()
user = db.session.get(User, test_user)
result = _get_org_ca_for_user(user, ca_type="host")
assert result is None
@@ -0,0 +1,180 @@
import pytest
from uuid import uuid4
from datetime import datetime, timezone, timedelta
from gatehouse_app.extensions import db
from gatehouse_app.models.user.user import User
from gatehouse_app.models.ssh_ca.ca import CA, CaType, KeyType, CertType
from gatehouse_app.models.ssh_ca.ssh_key import SSHKey
from gatehouse_app.models.ssh_ca.ssh_certificate import SSHCertificate, CertificateStatus
from gatehouse_app.services.ssh_ca_signing_service import SSHCertificateSigningResponse
from gatehouse_app.api.v1.ssh._helpers import _persist_certificate
class TestPersistCertificate:
def test_persists_valid_certificate(self, app, test_user, test_org):
with app.app_context():
ca = CA(
organization_id=test_org,
name="Signing CA",
ca_type=CaType.USER,
key_type=KeyType.ED25519,
private_key="enc_priv",
public_key="ssh-ed25519 AAAAB3Nza...",
fingerprint="sha256:abc123...",
is_active=True,
)
db.session.add(ca)
db.session.commit()
ssh_key = SSHKey(
user_id=test_user,
payload="ssh-ed25519 AAAAB3NzaC1lZDI1NTE5AAAAIKeyData comment",
fingerprint="sha256:keyfp123...",
)
db.session.add(ssh_key)
db.session.commit()
now = datetime.now(timezone.utc)
later = now + timedelta(hours=24)
response = SSHCertificateSigningResponse(
certificate="ssh-ed25519-cert-v01@openssh.com AAAACertData...",
serial="123456",
valid_after=now,
valid_before=later,
principals=["eng-prod"],
)
result = _persist_certificate(
user_id=test_user,
ssh_key_id=ssh_key.id,
ca=ca,
signing_response=response,
request_ip="10.0.0.1",
cert_type_str="user",
cert_identity="user@example.com",
)
assert result is not None
assert result.ca_id == ca.id
assert result.user_id == test_user
assert result.ssh_key_id == ssh_key.id
assert result.cert_type == CertType.USER
assert result.certificate == response.certificate
assert result.serial == response.serial
assert result.valid_after.replace(tzinfo=None) == now.replace(tzinfo=None)
assert result.valid_before.replace(tzinfo=None) == later.replace(tzinfo=None)
assert result.request_ip == "10.0.0.1"
assert result.key_id == "user@example.com"
assert sorted(result.principals) == ["eng-prod"]
assert result.revoked is False
assert result.status == CertificateStatus.ISSUED
def test_none_ca_returns_none(self, app, test_user):
with app.app_context():
now = datetime.now(timezone.utc)
response = SSHCertificateSigningResponse(
certificate="cert-data",
serial="1",
valid_after=now,
valid_before=now + timedelta(hours=1),
)
result = _persist_certificate(test_user, "keyid", None, response)
assert result is None
def test_invalid_cert_type_str_falls_back_to_user(self, app, test_user, test_org):
with app.app_context():
ca = CA(
organization_id=test_org,
name="Signing CA",
ca_type=CaType.USER,
key_type=KeyType.ED25519,
private_key="enc_priv",
public_key="ssh-ed25519 AAAAB3Nza...",
fingerprint="sha256:fallback123...",
is_active=True,
)
db.session.add(ca)
db.session.commit()
now = datetime.now(timezone.utc)
response = SSHCertificateSigningResponse(
certificate="cert-data",
serial="1",
valid_after=now,
valid_before=now + timedelta(hours=1),
)
result = _persist_certificate(
user_id=test_user,
ssh_key_id=None,
ca=ca,
signing_response=response,
cert_type_str="invalid_type",
)
assert result is not None
assert result.cert_type == CertType.USER
def test_none_ssh_key_id_defaults_to_host_cert_key_id(self, app, test_user, test_org):
with app.app_context():
ca = CA(
organization_id=test_org,
name="Host CA",
ca_type=CaType.HOST,
key_type=KeyType.ED25519,
private_key="enc_priv",
public_key="ssh-ed25519 AAAAB3Nza...",
fingerprint="sha256:hostca123...",
is_active=True,
)
db.session.add(ca)
db.session.commit()
now = datetime.now(timezone.utc)
response = SSHCertificateSigningResponse(
certificate="cert-data",
serial="1",
valid_after=now,
valid_before=now + timedelta(hours=1),
)
result = _persist_certificate(
user_id=test_user,
ssh_key_id=None,
ca=ca,
signing_response=response,
)
assert result is not None
assert result.key_id == "host-cert"
def test_request_ip_stored(self, app, test_user, test_org):
with app.app_context():
ca = CA(
organization_id=test_org,
name="Signing CA",
ca_type=CaType.USER,
key_type=KeyType.ED25519,
private_key="enc_priv",
public_key="ssh-ed25519 AAAAB3Nza...",
fingerprint="sha256:ip...",
is_active=True,
)
db.session.add(ca)
db.session.commit()
now = datetime.now(timezone.utc)
response = SSHCertificateSigningResponse(
certificate="cert-data",
serial="1",
valid_after=now,
valid_before=now + timedelta(hours=1),
)
result = _persist_certificate(
user_id=test_user,
ssh_key_id=None,
ca=ca,
signing_response=response,
request_ip="192.168.1.100",
)
assert result is not None
assert result.request_ip == "192.168.1.100"
+78
View File
@@ -0,0 +1,78 @@
import pytest
from datetime import datetime, timezone
from gatehouse_app.extensions import db
from gatehouse_app.models.user.user import User
from gatehouse_app.models.ssh_ca.ssh_key import SSHKey
from gatehouse_app.services.ssh_key_service import SSHKeyService
from gatehouse_app.exceptions import UserNotFoundError, SSHKeyError
VALID_PUBLIC_KEY = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIKL7YKAg+CgUvk/oNmU1fO24fLf5O5LayGH/EgORbz06"
class TestSSHKeyServiceAdd:
def test_add_new_key_returns_true(self, app, test_user):
with app.app_context():
service = SSHKeyService()
key, is_new = service.add_ssh_key(test_user, VALID_PUBLIC_KEY, "My laptop")
assert is_new is True
assert key.user_id == test_user
assert key.payload == VALID_PUBLIC_KEY
assert key.description == "My laptop"
assert key.verified is False
assert key.fingerprint is not None
assert key.key_type is not None
def test_add_duplicate_returns_existing(self, app, test_user):
with app.app_context():
service = SSHKeyService()
key1, _ = service.add_ssh_key(test_user, VALID_PUBLIC_KEY)
key2, is_new = service.add_ssh_key(test_user, VALID_PUBLIC_KEY)
assert is_new is False
assert key2.id == key1.id
def test_add_restores_soft_deleted_key(self, app, test_user):
with app.app_context():
service = SSHKeyService()
key1, _ = service.add_ssh_key(test_user, VALID_PUBLIC_KEY, "Original")
# Soft-delete the key
key1.deleted_at = datetime.now(timezone.utc)
db.session.commit()
# Re-add same key
key2, is_new = service.add_ssh_key(test_user, VALID_PUBLIC_KEY, "Restored")
assert is_new is False
assert key2.id == key1.id
assert key2.deleted_at is None
assert key2.description == "Restored"
assert key2.verified is False
assert key2.verified_at is None
def test_add_with_description(self, app, test_user):
with app.app_context():
service = SSHKeyService()
key, is_new = service.add_ssh_key(test_user, VALID_PUBLIC_KEY, "Work laptop")
assert is_new is True
assert key.description == "Work laptop"
def test_user_not_found_raises(self, app):
with app.app_context():
service = SSHKeyService()
with pytest.raises(UserNotFoundError):
service.add_ssh_key("nonexistent-user-id", VALID_PUBLIC_KEY)
def test_invalid_key_format_raises(self, app, test_user):
with app.app_context():
service = SSHKeyService()
with pytest.raises(SSHKeyError):
service.add_ssh_key(test_user, "not-a-valid-key")
def test_idempotent_second_call_no_error(self, app, test_user):
with app.app_context():
service = SSHKeyService()
service.add_ssh_key(test_user, VALID_PUBLIC_KEY)
key2, is_new = service.add_ssh_key(test_user, VALID_PUBLIC_KEY)
assert is_new is False
assert key2 is not None