diff --git a/gatehouse_app/api/v1/ssh/keys.py b/gatehouse_app/api/v1/ssh/keys.py index e074586..e028130 100644 --- a/gatehouse_app/api/v1/ssh/keys.py +++ b/gatehouse_app/api/v1/ssh/keys.py @@ -32,11 +32,12 @@ def add_ssh_key(): return api_response(success=False, message='public_key is required', status=400, error_type='BAD_REQUEST') try: - ssh_key = ssh_key_service.add_ssh_key(user_id=user_id, public_key=public_key, description=description) - AuditLog.log(action=AuditAction.SSH_KEY_ADDED, user_id=user_id, resource_type='SSHKey', resource_id=ssh_key.id, ip_address=request.remote_addr) - return api_response(success=True, message='SSH key added', data=ssh_key.to_dict(), status=201) - except SSHKeyAlreadyExistsError as e: - return api_response(success=False, message=e.message, status=409, error_type='SSH_KEY_ALREADY_EXISTS') + ssh_key, is_new = ssh_key_service.add_ssh_key(user_id=user_id, public_key=public_key, description=description) + if is_new: + AuditLog.log(action=AuditAction.SSH_KEY_ADDED, user_id=user_id, resource_type='SSHKey', resource_id=ssh_key.id, ip_address=request.remote_addr) + return api_response(success=True, message='SSH key added', data=ssh_key.to_dict(), status=201) + else: + return api_response(success=True, message='SSH key already exists', data=ssh_key.to_dict(), status=200) except IntegrityError: return api_response(success=False, message='SSH key already exists', status=409, error_type='SSH_KEY_ALREADY_EXISTS') except SSHKeyError as e: diff --git a/gatehouse_app/models/ssh_ca/ssh_key.py b/gatehouse_app/models/ssh_ca/ssh_key.py index 218fd99..99cb8dd 100644 --- a/gatehouse_app/models/ssh_ca/ssh_key.py +++ b/gatehouse_app/models/ssh_ca/ssh_key.py @@ -21,10 +21,10 @@ class SSHKey(BaseModel): ) # SSH key payload in OpenSSH format (e.g., "ssh-ed25519 AAAAB3Nz...") - payload = db.Column(db.Text, nullable=False, unique=True) + payload = db.Column(db.Text, nullable=False) # SHA256 fingerprint for quick comparison and deduplication - fingerprint = db.Column(db.String(255), nullable=False, unique=True, index=True) + fingerprint = db.Column(db.String(255), nullable=False, index=True) # Optional human-readable description (e.g., "My laptop key") description = db.Column(db.String(255), nullable=True) @@ -53,6 +53,7 @@ class SSHKey(BaseModel): __table_args__ = ( db.Index("idx_ssh_key_user_verified", "user_id", "verified"), + db.UniqueConstraint('user_id', 'fingerprint', name='uix_user_fingerprint'), ) def __repr__(self): diff --git a/gatehouse_app/services/ssh_key_service.py b/gatehouse_app/services/ssh_key_service.py index 4da133b..2c66792 100644 --- a/gatehouse_app/services/ssh_key_service.py +++ b/gatehouse_app/services/ssh_key_service.py @@ -41,46 +41,47 @@ class SSHKeyService: user_id: str, public_key: str, description: Optional[str] = None, - ) -> SSHKey: + ) -> tuple[SSHKey, bool]: """Add an SSH public key for a user. - + Args: user_id: ID of the user public_key: SSH public key in OpenSSH format description: Optional description of the key - + Returns: - Created SSHKey instance - + Tuple of (SSHKey instance, is_new) where is_new is True for + newly created keys, False for existing keys (idempotent). + Raises: UserNotFoundError: If user doesn't exist SSHKeyError: If key format is invalid - SSHKeyAlreadyExistsError: If key already exists """ # Verify user exists user = User.query.get(user_id) if not user: raise UserNotFoundError(f"User {user_id} not found") - + # Validate key format if not verify_ssh_key_format(public_key): raise SSHKeyError("Invalid SSH public key format") - + # Compute fingerprint try: fingerprint = compute_ssh_fingerprint(public_key) except Exception as e: logger.error(f"Failed to compute fingerprint: {str(e)}") raise SSHKeyError(f"Failed to compute key fingerprint: {str(e)}") - - # Check for duplicate (including soft-deleted records — fingerprint is unique in DB) - existing = SSHKey.query.filter_by(fingerprint=fingerprint).first() + + # Check for duplicate per user (including soft-deleted records) + existing = SSHKey.query.filter_by( + user_id=user_id, fingerprint=fingerprint + ).first() if existing: if existing.deleted_at is not None: # Restore the soft-deleted key: clear deleted_at and update fields existing.deleted_at = None - existing.user_id = user_id - existing.description = description or existing.description + existing.description = description if description is not None else existing.description existing.verified = False existing.verified_at = None existing.verify_text = None @@ -90,15 +91,18 @@ class SSHKeyService: f"Restored soft-deleted SSH key for user {user_id}: " f"fingerprint={fingerprint}" ) - return existing - raise SSHKeyAlreadyExistsError( - f"SSH key with fingerprint {fingerprint} already exists" + return existing, False + # Idempotent: return existing key without error + logger.info( + f"SSH key already exists for user {user_id}: " + f"fingerprint={fingerprint}" ) - + return existing, False + # Extract metadata key_type = extract_ssh_key_type(public_key) key_comment = extract_ssh_key_comment(public_key) - + # Create SSH key record ssh_key = SSHKey( user_id=user_id, @@ -109,15 +113,15 @@ class SSHKeyService: key_comment=key_comment, verified=False, ) - + ssh_key.save() - + logger.info( f"SSH key added for user {user_id}: " f"fingerprint={fingerprint}, type={key_type}" ) - - return ssh_key + + return ssh_key, True def get_ssh_key(self, key_id: str) -> SSHKey: """Get an SSH key by ID. diff --git a/migrations/versions/a1b2c3d4e5f6_per_user_ssh_key_fingerprint.py b/migrations/versions/a1b2c3d4e5f6_per_user_ssh_key_fingerprint.py new file mode 100644 index 0000000..5caa0f0 --- /dev/null +++ b/migrations/versions/a1b2c3d4e5f6_per_user_ssh_key_fingerprint.py @@ -0,0 +1,44 @@ +"""Per-user SSH key fingerprint uniqueness. + +Revision ID: a1b2c3d4e5f6 +Revises: 8f2d9e4a7c1b +Create Date: 2026-04-24 10:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'a1b2c3d4e5f6' +down_revision = '8f2d9e4a7c1b' +branch_labels = None +depends_on = None + + +def upgrade(): + # Drop the global unique constraint on payload + op.drop_constraint('ssh_keys_payload_key', 'ssh_keys', type_='unique') + + # Drop the global unique index on fingerprint + op.drop_index('ix_ssh_keys_fingerprint', table_name='ssh_keys') + + # Create a non-unique index on fingerprint for query performance + op.create_index(op.f('ix_ssh_keys_fingerprint'), 'ssh_keys', ['fingerprint'], unique=False) + + # Add composite unique constraint for per-user fingerprint uniqueness + op.create_unique_constraint('uix_user_fingerprint', 'ssh_keys', ['user_id', 'fingerprint']) + + +def downgrade(): + # Drop the composite unique constraint + op.drop_constraint('uix_user_fingerprint', 'ssh_keys', type_='unique') + + # Drop the non-unique index + op.drop_index(op.f('ix_ssh_keys_fingerprint'), table_name='ssh_keys') + + # Recreate the global unique index + op.create_index('ix_ssh_keys_fingerprint', 'ssh_keys', ['fingerprint'], unique=True) + + # Recreate the global unique constraint on payload + op.create_unique_constraint('ssh_keys_payload_key', 'ssh_keys', ['payload']) \ No newline at end of file diff --git a/tests/integration/test_ssh_workflows.py b/tests/integration/test_ssh_workflows.py index b709048..acddb98 100644 --- a/tests/integration/test_ssh_workflows.py +++ b/tests/integration/test_ssh_workflows.py @@ -106,23 +106,67 @@ class TestSSHKeyManagement: assert_error(exc_info.value, 400) - def test_add_duplicate_key_negative(self, integration_client, create_test_user): - """TEST: SSH-KEY-03 — Reject duplicate SSH key. + def test_add_duplicate_key_idempotent_positive(self, integration_client, create_test_user): + """TEST: SSH-KEY-03 — Add duplicate SSH key is idempotent for same user. WHAT: User adds TEST_PUBLIC_KEY, then tries to add it again. - WHY: Fingerprints must be unique per database to avoid - ambiguity in key-to-user mappings. - EXPECTED: 409 Conflict with error_type SSH_KEY_ALREADY_EXISTS. + WHY: Fingerprints are unique per user, not globally. Adding the + same key twice by the same user should succeed both times. + EXPECTED: Both calls succeed (201 then 200). Both return same key id. """ user = create_test_user(password="MyPassword123!") integration_client.auth.login(email=user["email"], password="MyPassword123!") - integration_client.ssh.add_key(TEST_PUBLIC_KEY, "First") + # First add should succeed with 201 + result1 = integration_client.ssh.add_key(TEST_PUBLIC_KEY, "First") + data1 = assert_success(result1, "added") + assert result1.get("code") == 201, f"Expected status 201 but got {result1.get('code')}" + + # Second add should succeed with 200 (idempotent) + result2 = integration_client.ssh.add_key(TEST_PUBLIC_KEY, "Duplicate") + data2 = assert_success(result2, "exists") + assert result2.get("code") == 200, f"Expected status 200 but got {result2.get('code')}" + + # Both calls should return the same key id + assert data1.get("id") == data2.get("id"), "Key IDs should match for idempotent operation" - with pytest.raises(ApiError) as exc_info: - integration_client.ssh.add_key(TEST_PUBLIC_KEY, "Duplicate") + def test_add_same_key_different_user_positive(self, integration_client, create_test_user): + """TEST: SSH-KEY-03b — Same key can be added by different users. - assert_error(exc_info.value, 409, "SSH_KEY_ALREADY_EXISTS") + WHAT: User A adds TEST_PUBLIC_KEY. User B adds the SAME key. + WHY: Fingerprint uniqueness is per-user, not global. + EXPECTED: Both calls succeed (201). Each user sees the key in their list. + """ + # Create and login as user A + user_a = create_test_user(password="PassA123!") + integration_client.auth.login(email=user_a["email"], password="PassA123!") + + # User A adds the key + result_a = integration_client.ssh.add_key(TEST_PUBLIC_KEY, "User A Key") + assert_success(result_a, "added") + assert result_a.get("code") == 201, f"Expected status 201 but got {result_a.get('code')}" + + # Create and login as user B + user_b = create_test_user(password="PassB123!") + integration_client.auth.logout() + integration_client.auth.login(email=user_b["email"], password="PassB123!") + + # User B adds the same key + result_b = integration_client.ssh.add_key(TEST_PUBLIC_KEY, "User B Key") + assert_success(result_b, "added") + assert result_b.get("code") == 201, f"Expected status 201 but got {result_b.get('code')}" + + # Verify user B sees exactly one key in their list + list_result_b = integration_client.ssh.list_keys() + list_data_b = assert_success(list_result_b) + assert list_data_b.get("count") == 1, f"User B should see 1 key but saw {list_data_b.get('count')}" + + # Log back in as user A and verify they still see exactly one key + integration_client.auth.logout() + integration_client.auth.login(email=user_a["email"], password="PassA123!") + list_result_a = integration_client.ssh.list_keys() + list_data_a = assert_success(list_result_a) + assert list_data_a.get("count") == 1, f"User A should see 1 key but saw {list_data_a.get('count')}" def test_add_key_without_auth_negative(self, integration_client): """TEST: SSH-KEY-04 — Reject key upload without authentication.