From de6f39e7e3bab324f92a566f78eca57ef63f4441 Mon Sep 17 00:00:00 2001 From: Cory Hawkvelt Date: Sat, 25 Apr 2026 06:22:08 +0930 Subject: [PATCH] feat(ssh): change SSH key uniqueness to per-user scope Previously, SSH key fingerprints were globally unique across all users, preventing the same key from being registered by different users. This change makes fingerprint uniqueness scoped to individual users. - Remove global unique constraints on payload and fingerprint columns - Add composite unique constraint on (user_id, fingerprint) - Make add_ssh_key operation idempotent for same user - Return tuple (SSHKey, is_new) from service to indicate creation status - Update API to return 200 for existing keys, 201 for new keys BREAKING CHANGE: API behavior changed - duplicate key addition now returns 200 OK instead of 409 Conflict. Service method signature changed from returning SSHKey to tuple[SSHKey, bool]. --- gatehouse_app/api/v1/ssh/keys.py | 11 ++-- gatehouse_app/models/ssh_ca/ssh_key.py | 5 +- gatehouse_app/services/ssh_key_service.py | 48 +++++++------- ...b2c3d4e5f6_per_user_ssh_key_fingerprint.py | 44 +++++++++++++ tests/integration/test_ssh_workflows.py | 62 ++++++++++++++++--- 5 files changed, 132 insertions(+), 38 deletions(-) create mode 100644 migrations/versions/a1b2c3d4e5f6_per_user_ssh_key_fingerprint.py diff --git a/gatehouse_app/api/v1/ssh/keys.py b/gatehouse_app/api/v1/ssh/keys.py index e074586..e028130 100644 --- a/gatehouse_app/api/v1/ssh/keys.py +++ b/gatehouse_app/api/v1/ssh/keys.py @@ -32,11 +32,12 @@ def add_ssh_key(): return api_response(success=False, message='public_key is required', status=400, error_type='BAD_REQUEST') try: - ssh_key = ssh_key_service.add_ssh_key(user_id=user_id, public_key=public_key, description=description) - AuditLog.log(action=AuditAction.SSH_KEY_ADDED, user_id=user_id, resource_type='SSHKey', resource_id=ssh_key.id, ip_address=request.remote_addr) - return api_response(success=True, message='SSH key added', data=ssh_key.to_dict(), status=201) - except SSHKeyAlreadyExistsError as e: - return api_response(success=False, message=e.message, status=409, error_type='SSH_KEY_ALREADY_EXISTS') + ssh_key, is_new = ssh_key_service.add_ssh_key(user_id=user_id, public_key=public_key, description=description) + if is_new: + AuditLog.log(action=AuditAction.SSH_KEY_ADDED, user_id=user_id, resource_type='SSHKey', resource_id=ssh_key.id, ip_address=request.remote_addr) + return api_response(success=True, message='SSH key added', data=ssh_key.to_dict(), status=201) + else: + return api_response(success=True, message='SSH key already exists', data=ssh_key.to_dict(), status=200) except IntegrityError: return api_response(success=False, message='SSH key already exists', status=409, error_type='SSH_KEY_ALREADY_EXISTS') except SSHKeyError as e: diff --git a/gatehouse_app/models/ssh_ca/ssh_key.py b/gatehouse_app/models/ssh_ca/ssh_key.py index 218fd99..99cb8dd 100644 --- a/gatehouse_app/models/ssh_ca/ssh_key.py +++ b/gatehouse_app/models/ssh_ca/ssh_key.py @@ -21,10 +21,10 @@ class SSHKey(BaseModel): ) # SSH key payload in OpenSSH format (e.g., "ssh-ed25519 AAAAB3Nz...") - payload = db.Column(db.Text, nullable=False, unique=True) + payload = db.Column(db.Text, nullable=False) # SHA256 fingerprint for quick comparison and deduplication - fingerprint = db.Column(db.String(255), nullable=False, unique=True, index=True) + fingerprint = db.Column(db.String(255), nullable=False, index=True) # Optional human-readable description (e.g., "My laptop key") description = db.Column(db.String(255), nullable=True) @@ -53,6 +53,7 @@ class SSHKey(BaseModel): __table_args__ = ( db.Index("idx_ssh_key_user_verified", "user_id", "verified"), + db.UniqueConstraint('user_id', 'fingerprint', name='uix_user_fingerprint'), ) def __repr__(self): diff --git a/gatehouse_app/services/ssh_key_service.py b/gatehouse_app/services/ssh_key_service.py index 4da133b..2c66792 100644 --- a/gatehouse_app/services/ssh_key_service.py +++ b/gatehouse_app/services/ssh_key_service.py @@ -41,46 +41,47 @@ class SSHKeyService: user_id: str, public_key: str, description: Optional[str] = None, - ) -> SSHKey: + ) -> tuple[SSHKey, bool]: """Add an SSH public key for a user. - + Args: user_id: ID of the user public_key: SSH public key in OpenSSH format description: Optional description of the key - + Returns: - Created SSHKey instance - + Tuple of (SSHKey instance, is_new) where is_new is True for + newly created keys, False for existing keys (idempotent). + Raises: UserNotFoundError: If user doesn't exist SSHKeyError: If key format is invalid - SSHKeyAlreadyExistsError: If key already exists """ # Verify user exists user = User.query.get(user_id) if not user: raise UserNotFoundError(f"User {user_id} not found") - + # Validate key format if not verify_ssh_key_format(public_key): raise SSHKeyError("Invalid SSH public key format") - + # Compute fingerprint try: fingerprint = compute_ssh_fingerprint(public_key) except Exception as e: logger.error(f"Failed to compute fingerprint: {str(e)}") raise SSHKeyError(f"Failed to compute key fingerprint: {str(e)}") - - # Check for duplicate (including soft-deleted records — fingerprint is unique in DB) - existing = SSHKey.query.filter_by(fingerprint=fingerprint).first() + + # Check for duplicate per user (including soft-deleted records) + existing = SSHKey.query.filter_by( + user_id=user_id, fingerprint=fingerprint + ).first() if existing: if existing.deleted_at is not None: # Restore the soft-deleted key: clear deleted_at and update fields existing.deleted_at = None - existing.user_id = user_id - existing.description = description or existing.description + existing.description = description if description is not None else existing.description existing.verified = False existing.verified_at = None existing.verify_text = None @@ -90,15 +91,18 @@ class SSHKeyService: f"Restored soft-deleted SSH key for user {user_id}: " f"fingerprint={fingerprint}" ) - return existing - raise SSHKeyAlreadyExistsError( - f"SSH key with fingerprint {fingerprint} already exists" + return existing, False + # Idempotent: return existing key without error + logger.info( + f"SSH key already exists for user {user_id}: " + f"fingerprint={fingerprint}" ) - + return existing, False + # Extract metadata key_type = extract_ssh_key_type(public_key) key_comment = extract_ssh_key_comment(public_key) - + # Create SSH key record ssh_key = SSHKey( user_id=user_id, @@ -109,15 +113,15 @@ class SSHKeyService: key_comment=key_comment, verified=False, ) - + ssh_key.save() - + logger.info( f"SSH key added for user {user_id}: " f"fingerprint={fingerprint}, type={key_type}" ) - - return ssh_key + + return ssh_key, True def get_ssh_key(self, key_id: str) -> SSHKey: """Get an SSH key by ID. diff --git a/migrations/versions/a1b2c3d4e5f6_per_user_ssh_key_fingerprint.py b/migrations/versions/a1b2c3d4e5f6_per_user_ssh_key_fingerprint.py new file mode 100644 index 0000000..5caa0f0 --- /dev/null +++ b/migrations/versions/a1b2c3d4e5f6_per_user_ssh_key_fingerprint.py @@ -0,0 +1,44 @@ +"""Per-user SSH key fingerprint uniqueness. + +Revision ID: a1b2c3d4e5f6 +Revises: 8f2d9e4a7c1b +Create Date: 2026-04-24 10:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'a1b2c3d4e5f6' +down_revision = '8f2d9e4a7c1b' +branch_labels = None +depends_on = None + + +def upgrade(): + # Drop the global unique constraint on payload + op.drop_constraint('ssh_keys_payload_key', 'ssh_keys', type_='unique') + + # Drop the global unique index on fingerprint + op.drop_index('ix_ssh_keys_fingerprint', table_name='ssh_keys') + + # Create a non-unique index on fingerprint for query performance + op.create_index(op.f('ix_ssh_keys_fingerprint'), 'ssh_keys', ['fingerprint'], unique=False) + + # Add composite unique constraint for per-user fingerprint uniqueness + op.create_unique_constraint('uix_user_fingerprint', 'ssh_keys', ['user_id', 'fingerprint']) + + +def downgrade(): + # Drop the composite unique constraint + op.drop_constraint('uix_user_fingerprint', 'ssh_keys', type_='unique') + + # Drop the non-unique index + op.drop_index(op.f('ix_ssh_keys_fingerprint'), table_name='ssh_keys') + + # Recreate the global unique index + op.create_index('ix_ssh_keys_fingerprint', 'ssh_keys', ['fingerprint'], unique=True) + + # Recreate the global unique constraint on payload + op.create_unique_constraint('ssh_keys_payload_key', 'ssh_keys', ['payload']) \ No newline at end of file diff --git a/tests/integration/test_ssh_workflows.py b/tests/integration/test_ssh_workflows.py index b709048..acddb98 100644 --- a/tests/integration/test_ssh_workflows.py +++ b/tests/integration/test_ssh_workflows.py @@ -106,23 +106,67 @@ class TestSSHKeyManagement: assert_error(exc_info.value, 400) - def test_add_duplicate_key_negative(self, integration_client, create_test_user): - """TEST: SSH-KEY-03 — Reject duplicate SSH key. + def test_add_duplicate_key_idempotent_positive(self, integration_client, create_test_user): + """TEST: SSH-KEY-03 — Add duplicate SSH key is idempotent for same user. WHAT: User adds TEST_PUBLIC_KEY, then tries to add it again. - WHY: Fingerprints must be unique per database to avoid - ambiguity in key-to-user mappings. - EXPECTED: 409 Conflict with error_type SSH_KEY_ALREADY_EXISTS. + WHY: Fingerprints are unique per user, not globally. Adding the + same key twice by the same user should succeed both times. + EXPECTED: Both calls succeed (201 then 200). Both return same key id. """ user = create_test_user(password="MyPassword123!") integration_client.auth.login(email=user["email"], password="MyPassword123!") - integration_client.ssh.add_key(TEST_PUBLIC_KEY, "First") + # First add should succeed with 201 + result1 = integration_client.ssh.add_key(TEST_PUBLIC_KEY, "First") + data1 = assert_success(result1, "added") + assert result1.get("code") == 201, f"Expected status 201 but got {result1.get('code')}" + + # Second add should succeed with 200 (idempotent) + result2 = integration_client.ssh.add_key(TEST_PUBLIC_KEY, "Duplicate") + data2 = assert_success(result2, "exists") + assert result2.get("code") == 200, f"Expected status 200 but got {result2.get('code')}" + + # Both calls should return the same key id + assert data1.get("id") == data2.get("id"), "Key IDs should match for idempotent operation" - with pytest.raises(ApiError) as exc_info: - integration_client.ssh.add_key(TEST_PUBLIC_KEY, "Duplicate") + def test_add_same_key_different_user_positive(self, integration_client, create_test_user): + """TEST: SSH-KEY-03b — Same key can be added by different users. - assert_error(exc_info.value, 409, "SSH_KEY_ALREADY_EXISTS") + WHAT: User A adds TEST_PUBLIC_KEY. User B adds the SAME key. + WHY: Fingerprint uniqueness is per-user, not global. + EXPECTED: Both calls succeed (201). Each user sees the key in their list. + """ + # Create and login as user A + user_a = create_test_user(password="PassA123!") + integration_client.auth.login(email=user_a["email"], password="PassA123!") + + # User A adds the key + result_a = integration_client.ssh.add_key(TEST_PUBLIC_KEY, "User A Key") + assert_success(result_a, "added") + assert result_a.get("code") == 201, f"Expected status 201 but got {result_a.get('code')}" + + # Create and login as user B + user_b = create_test_user(password="PassB123!") + integration_client.auth.logout() + integration_client.auth.login(email=user_b["email"], password="PassB123!") + + # User B adds the same key + result_b = integration_client.ssh.add_key(TEST_PUBLIC_KEY, "User B Key") + assert_success(result_b, "added") + assert result_b.get("code") == 201, f"Expected status 201 but got {result_b.get('code')}" + + # Verify user B sees exactly one key in their list + list_result_b = integration_client.ssh.list_keys() + list_data_b = assert_success(list_result_b) + assert list_data_b.get("count") == 1, f"User B should see 1 key but saw {list_data_b.get('count')}" + + # Log back in as user A and verify they still see exactly one key + integration_client.auth.logout() + integration_client.auth.login(email=user_a["email"], password="PassA123!") + list_result_a = integration_client.ssh.list_keys() + list_data_a = assert_success(list_result_a) + assert list_data_a.get("count") == 1, f"User A should see 1 key but saw {list_data_a.get('count')}" def test_add_key_without_auth_negative(self, integration_client): """TEST: SSH-KEY-04 — Reject key upload without authentication.