diff --git a/gatehouse_app/api/v1/organizations.py b/gatehouse_app/api/v1/organizations.py index 69739bc..9f9db4c 100644 --- a/gatehouse_app/api/v1/organizations.py +++ b/gatehouse_app/api/v1/organizations.py @@ -14,6 +14,73 @@ from gatehouse_app.services.organization_service import OrganizationService from gatehouse_app.services.user_service import UserService from gatehouse_app.utils.constants import OrganizationRole from gatehouse_app.extensions import db + + + +def _get_system_ca_dict(): + """Return a synthetic read-only CA dict for the config-file CA, or None. + + This is injected into the org CA list when no DB CA exists for a given + ca_type so that the admin UI correctly shows "configured" rather than + "Not configured" when a system-level CA key is present. + + The returned dict has ``is_system=True`` so the frontend can render it + as read-only (no delete / edit / generate buttons). + """ + import os + try: + from gatehouse_app.config.ssh_ca_config import get_ssh_ca_config + from gatehouse_app.utils.crypto import compute_ssh_fingerprint + + # Check env var first (takes priority over file path) + priv_key = os.environ.get("SSH_CA_PRIVATE_KEY", "").strip() + pub_key = "" + + if not priv_key: + cfg = get_ssh_ca_config() + key_path = cfg.get_str("ca_key_path", "").strip() + if not key_path: + return None + pub_path = key_path + ".pub" + if not os.path.exists(pub_path): + return None + with open(pub_path) as f: + pub_key = f.read().strip() + else: + # Derive the public key from the private key + from sshkey_tools.keys import PrivateKey + pk = PrivateKey.from_string(priv_key) + pub_key = pk.public_key.to_string() + + fingerprint = compute_ssh_fingerprint(pub_key) + return { + "id": f"system-ca-{fingerprint[:16]}", + "organization_id": None, + "name": "System CA (config file)", + "description": ( + "Read-only — this CA is loaded from the server's SSH_CA_PRIVATE_KEY " + "environment variable or etc/ssh_ca.conf. Manage it on the server." + ), + # ca_type is set by the caller + "ca_type": "user", + "key_type": "unknown", + "public_key": pub_key, + "fingerprint": fingerprint, + "is_active": True, + "is_system": True, + "default_cert_validity_hours": 0, + "max_cert_validity_hours": 0, + "total_certs": 0, + "active_certs": 0, + "revoked_certs": 0, + "created_at": None, + "updated_at": None, + } + except Exception: + return None + + + @api_v1_bp.route("/organizations", methods=["POST"]) @login_required @full_access_required @@ -1167,12 +1234,17 @@ def remove_role_from_member(org_id, role_name, user_id): def list_org_cas(org_id): """List all Certificate Authorities for an organization. + If the system config-file CA is configured (via SSH_CA_PRIVATE_KEY env var + or ca_key_path in etc/ssh_ca.conf) and no DB CA exists for a given ca_type, + a synthetic read-only entry is injected so the UI correctly shows the + system CA as configured rather than "Not configured". + Returns: 200: List of CAs (private_key excluded) 403: Not admin/owner 404: Org not found """ - from gatehouse_app.models.ssh_ca.ca import CA + from gatehouse_app.models.ssh_ca.ca import CA, CaType from gatehouse_app.models.organization.organization import Organization org = Organization.query.filter_by(id=org_id, deleted_at=None).first() @@ -1180,8 +1252,22 @@ def list_org_cas(org_id): return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND") cas = CA.query.filter_by(organization_id=org_id, deleted_at=None).all() + ca_list = [ca.to_dict() for ca in cas] + + # Determine which ca_types are already covered by a DB CA + covered_types = {ca.ca_type for ca in cas} + + # Check whether a system config-file CA is available + system_ca_dict = _get_system_ca_dict() + if system_ca_dict: + # Inject a synthetic entry for each ca_type NOT covered by a real DB CA. + # The system CA only signs user certs (cert_type="user"), so we only + # inject it for the user slot. Host signing always needs a DB CA. + if CaType.USER not in covered_types: + ca_list.append({**system_ca_dict, "ca_type": "user"}) + return api_response( - data={"cas": [ca.to_dict() for ca in cas], "count": len(cas)}, + data={"cas": ca_list, "count": len(ca_list)}, message="CAs retrieved", ) @@ -1403,3 +1489,163 @@ def create_org_ca(org_id): ) +@api_v1_bp.route("/organizations//cas/", methods=["DELETE"]) +@login_required +@require_admin +def delete_org_ca(org_id, ca_id): + """Soft-delete a Certificate Authority. + + Deactivates the CA so no new certificates can be signed with it. + Existing certificates remain valid until they expire. + + Returns: + 200: CA deleted successfully + 403: Not admin/owner + 404: Org or CA not found + """ + from gatehouse_app.models.ssh_ca.ca import CA + from gatehouse_app.models.organization.organization import Organization + from gatehouse_app.utils.constants import AuditAction + from gatehouse_app.models import AuditLog + + org = Organization.query.filter_by(id=org_id, deleted_at=None).first() + if not org: + return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND") + + ca = CA.query.filter_by(id=ca_id, organization_id=org_id, deleted_at=None).first() + if not ca: + return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND") + + try: + ca_name = ca.name + ca_type = ca.ca_type.value if hasattr(ca.ca_type, "value") else str(ca.ca_type) + ca.is_active = False + ca.delete(soft=True) + + AuditLog.log( + action=AuditAction.CA_DELETED, + user_id=g.current_user.id, + resource_type="CA", + resource_id=ca_id, + organization_id=org_id, + ip_address=request.remote_addr, + description=f"CA '{ca_name}' ({ca_type}) deleted", + ) + + return api_response( + data={"ca_id": ca_id}, + message="CA deleted successfully", + ) + except Exception as e: + db.session.rollback() + current_app.logger.exception("Failed to delete CA") + return api_response( + success=False, + message="Failed to delete CA", + status=500, + error_type="SERVER_ERROR", + ) + + +@api_v1_bp.route("/organizations//cas//rotate", methods=["POST"]) +@login_required +@require_admin +def rotate_org_ca(org_id, ca_id): + """Rotate (replace) a CA's key pair. + + Generates a new key pair of the same or different type. The old public key + fingerprint is returned so admins can update TrustedUserCAKeys / known_hosts + on their servers. All previously-issued certificates remain valid until they + expire but no new certificates will be signed with the old key. + + Request body (all optional): + key_type: "ed25519" (default keeps current), "rsa", or "ecdsa" + reason: Human-readable reason for the rotation + + Returns: + 200: CA rotated — { ca, old_fingerprint } + 403: Not admin/owner + 404: Org or CA not found + """ + from gatehouse_app.models.ssh_ca.ca import CA, KeyType + from gatehouse_app.models.organization.organization import Organization + from gatehouse_app.utils.crypto import compute_ssh_fingerprint + from gatehouse_app.utils.constants import AuditAction + from gatehouse_app.models import AuditLog + from sshkey_tools.keys import Ed25519PrivateKey, RsaPrivateKey, EcdsaPrivateKey + + org = Organization.query.filter_by(id=org_id, deleted_at=None).first() + if not org: + return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND") + + ca = CA.query.filter_by(id=ca_id, organization_id=org_id, deleted_at=None).first() + if not ca: + return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND") + + data = request.get_json() or {} + new_key_type = data.get("key_type") or (ca.key_type.value if hasattr(ca.key_type, "value") else str(ca.key_type)) + reason = data.get("reason", "Admin-initiated key rotation") + + if new_key_type not in ("ed25519", "rsa", "ecdsa"): + return api_response( + success=False, + message="Invalid key_type. Must be one of: ed25519, rsa, ecdsa", + status=400, + error_type="VALIDATION_ERROR", + ) + + try: + old_fingerprint = ca.fingerprint + + # Generate new key pair + if new_key_type == "ed25519": + private_key_obj = Ed25519PrivateKey.generate() + elif new_key_type == "rsa": + private_key_obj = RsaPrivateKey.generate(4096) + else: # ecdsa + private_key_obj = EcdsaPrivateKey.generate() + + new_private_key = private_key_obj.to_string() + new_public_key = private_key_obj.public_key.to_string() + new_fingerprint = compute_ssh_fingerprint(new_public_key) + + ca.rotate_key( + new_private_key=new_private_key, + new_public_key=new_public_key, + new_fingerprint=new_fingerprint, + reason=reason, + ) + ca.key_type = KeyType(new_key_type) + db.session.commit() + + AuditLog.log( + action=AuditAction.CA_KEY_ROTATED, + user_id=g.current_user.id, + resource_type="CA", + resource_id=ca_id, + organization_id=org_id, + ip_address=request.remote_addr, + description=( + f"CA '{ca.name}' key rotated. " + f"Old fingerprint: {old_fingerprint}, New fingerprint: {new_fingerprint}. " + f"Reason: {reason}" + ), + ) + + return api_response( + data={ + "ca": ca.to_dict(), + "old_fingerprint": old_fingerprint, + }, + message="CA key rotated successfully. Update TrustedUserCAKeys / known_hosts on your servers.", + ) + except Exception as e: + db.session.rollback() + current_app.logger.exception("Failed to rotate CA key") + return api_response( + success=False, + message="Failed to rotate CA key", + status=500, + error_type="SERVER_ERROR", + ) + diff --git a/gatehouse_app/api/v1/ssh.py b/gatehouse_app/api/v1/ssh.py index 3f89fef..7491eb1 100644 --- a/gatehouse_app/api/v1/ssh.py +++ b/gatehouse_app/api/v1/ssh.py @@ -27,15 +27,22 @@ ssh_ca_service = SSHCASigningService() # Helpers # --------------------------------------------------------------------------- -def _get_org_ca_for_user(user): - """Return the active DB CA for the user's first org, or None.""" +def _get_org_ca_for_user(user, ca_type: str = "user"): + """Return the active DB CA of the given type for the user's first org, or None. + + Args: + user: The current user object. + ca_type: ``"user"`` (default) or ``"host"`` — selects the CA that signs + the corresponding certificate type. + """ try: - from gatehouse_app.models.ssh_ca.ca import CA + from gatehouse_app.models.ssh_ca.ca import CA, CaType org_ids = [m.organization_id for m in user.organization_memberships] if not org_ids: return None return CA.query.filter( CA.organization_id.in_(org_ids), + CA.ca_type == CaType(ca_type), CA.is_active == True, # noqa: E712 ).first() except Exception: @@ -556,8 +563,18 @@ def sign_certificate(): error_type="KEY_NOT_VERIFIED", ) - db_ca = _get_org_ca_for_user(user) - ca_private_key = db_ca.private_key if db_ca else None + db_ca = _get_org_ca_for_user(user, ca_type=cert_type) + if db_ca is None: + return api_response( + success=False, + message=( + "No active Certificate Authority is configured for your organization. " + "An admin must generate a CA on the Certificate Authorities page before " + "certificates can be issued." + ), + status=503, + error_type="CA_NOT_CONFIGURED", + ) # Determine if the caller is an org admin/owner (admins can always choose expiry) is_org_admin = any( @@ -603,7 +620,7 @@ def sign_certificate(): ) try: - response = ssh_ca_service.sign_certificate(signing_request, ca_private_key=ca_private_key) + response = ssh_ca_service.sign_certificate(signing_request, ca_private_key=db_ca.private_key) except SSHCertificateError as e: AuditLog.log( action=AuditAction.SSH_CERT_FAILED, @@ -625,11 +642,10 @@ def sign_certificate(): ) return api_response(success=False, message="Certificate signing failed", status=500, error_type="SERVER_ERROR") - ca_for_db = db_ca or _get_or_create_system_ca() cert_record = _persist_certificate( user_id=user_id, ssh_key_id=key_id, - ca=ca_for_db, + ca=db_ca, signing_response=response, request_ip=request.remote_addr, cert_type_str=cert_type, @@ -757,7 +773,8 @@ def get_ca_public_key(): directive so that certificates issued by gatehouse are trusted. Query parameters: - format: 'openssh' (default) or 'text' — affects Content-Type only + ca_type: 'user' (default) or 'host' — which CA's public key to return + format: 'openssh' (default) or 'text' — affects Content-Type only Returns: { "public_key": "ssh-ed25519 AAAA...", @@ -765,52 +782,246 @@ def get_ca_public_key(): "ca_name": "..." } """ user = g.current_user + ca_type = request.args.get("ca_type", "user") + if ca_type not in ("user", "host"): + return api_response( + success=False, + message="ca_type must be 'user' or 'host'", + status=400, + error_type="BAD_REQUEST", + ) - # Try org CA first - db_ca = _get_org_ca_for_user(user) + db_ca = _get_org_ca_for_user(user, ca_type=ca_type) if db_ca: return api_response( data={ 'public_key': db_ca.public_key, 'fingerprint': db_ca.fingerprint, 'ca_name': db_ca.name, + 'ca_type': ca_type, 'source': 'db', }, message="CA public key retrieved successfully" ) - # Fall back to config-file CA - try: - from gatehouse_app.config.ssh_ca_config import get_ssh_ca_config - import os - cfg = get_ssh_ca_config() - key_path = cfg.get_str('ca_key_path', '').strip() + '.pub' - if os.path.exists(key_path): - with open(key_path) as f: - pub_key = f.read().strip() - from gatehouse_app.utils.crypto import compute_ssh_fingerprint - return api_response( - data={ - 'public_key': pub_key, - 'fingerprint': compute_ssh_fingerprint(pub_key), - 'ca_name': 'system-config-ca', - 'source': 'config', - }, - message="CA public key retrieved successfully" - ) - except Exception as e: - return api_response( - success=False, - message=f'Could not load CA public key: {e}', - status=500, - error_type='INTERNAL_ERROR' - ) - return api_response( success=False, - message='No CA configured for this organization', + message=( + f"No {ca_type} CA is configured for your organization. " + "An admin must generate one on the Certificate Authorities page." + ), status=404, - error_type='NOT_FOUND' + error_type="CA_NOT_CONFIGURED", ) +# --------------------------------------------------------------------------- +# CA Permissions +# --------------------------------------------------------------------------- + +@ssh_bp.route('/ca//permissions', methods=['GET']) +@login_required +def list_ca_permissions(ca_id): + """List permissions for a Certificate Authority. + + Returns: + 200: { ca_id, permissions: [...], open_to_all: bool } + 403: Not admin/owner + 404: CA not found + """ + from gatehouse_app.models.ssh_ca.ca import CA, CAPermission + from gatehouse_app.models.organization.organization_member import OrganizationMember + from gatehouse_app.utils.constants import OrganizationRole + + user = g.current_user + + ca = CA.query.filter_by(id=ca_id, deleted_at=None).first() + if not ca: + return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND") + + # Verify user is admin/owner of the CA's org + if ca.organization_id: + membership = OrganizationMember.query.filter_by( + organization_id=ca.organization_id, + user_id=user.id, + deleted_at=None, + ).first() + if not membership or membership.role not in (OrganizationRole.ADMIN, OrganizationRole.OWNER): + return api_response(success=False, message="Admin access required", status=403, error_type="FORBIDDEN") + + perms = CAPermission.query.filter_by(ca_id=ca_id, deleted_at=None).all() + perm_list = [] + for p in perms: + d = p.to_dict() + d["user_email"] = p.user.email if p.user else None + perm_list.append(d) + + return api_response( + data={ + "ca_id": ca_id, + "permissions": perm_list, + "open_to_all": len(perms) == 0, + }, + message="CA permissions retrieved", + ) + + +@ssh_bp.route('/ca//permissions', methods=['POST']) +@login_required +def add_ca_permission(ca_id): + """Grant a user permission on a Certificate Authority. + + Request body: + user_id: UUID of the user to grant access + permission: "sign" or "admin" (default: "sign") + + Returns: + 201: Permission granted + 400: Validation error + 403: Not admin/owner + 404: CA or user not found + 409: Permission already exists + """ + from gatehouse_app.models.ssh_ca.ca import CA, CAPermission + from gatehouse_app.models.organization.organization_member import OrganizationMember + from gatehouse_app.models.user import User + from gatehouse_app.utils.constants import OrganizationRole, AuditAction + from gatehouse_app.models import AuditLog + from gatehouse_app.extensions import db + + user = g.current_user + + ca = CA.query.filter_by(id=ca_id, deleted_at=None).first() + if not ca: + return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND") + + # Verify user is admin/owner of the CA's org + if ca.organization_id: + membership = OrganizationMember.query.filter_by( + organization_id=ca.organization_id, + user_id=user.id, + deleted_at=None, + ).first() + if not membership or membership.role not in (OrganizationRole.ADMIN, OrganizationRole.OWNER): + return api_response(success=False, message="Admin access required", status=403, error_type="FORBIDDEN") + + data = request.get_json() or {} + target_user_id = (data.get("user_id") or "").strip() + permission = data.get("permission", "sign") + + if not target_user_id: + return api_response(success=False, message="user_id is required", status=400, error_type="VALIDATION_ERROR") + if permission not in ("sign", "admin"): + return api_response( + success=False, + message="permission must be 'sign' or 'admin'", + status=400, + error_type="VALIDATION_ERROR", + ) + + target_user = User.query.filter_by(id=target_user_id, deleted_at=None).first() + if not target_user: + return api_response(success=False, message="User not found", status=404, error_type="NOT_FOUND") + + # Check for duplicate + existing = CAPermission.query.filter_by( + ca_id=ca_id, user_id=target_user_id, deleted_at=None + ).first() + if existing: + # Update permission level if different + if existing.permission != permission: + existing.permission = permission + db.session.commit() + d = existing.to_dict() + d["user_email"] = target_user.email + return api_response( + data={"message": "Permission updated", "permission": d}, + message="Permission updated", + ) + return api_response( + success=False, + message="User already has this permission on the CA", + status=409, + error_type="DUPLICATE", + ) + + perm = CAPermission( + ca_id=ca_id, + user_id=target_user_id, + permission=permission, + ) + db.session.add(perm) + db.session.commit() + + AuditLog.log( + action=AuditAction.CA_UPDATED, + user_id=user.id, + resource_type="CAPermission", + resource_id=perm.id, + ip_address=request.remote_addr, + description=f"Granted '{permission}' on CA '{ca.name}' to user {target_user.email}", + ) + + d = perm.to_dict() + d["user_email"] = target_user.email + return api_response( + data={"message": "Permission granted", "permission": d}, + message="Permission granted", + status=201, + ) + + +@ssh_bp.route('/ca//permissions/', methods=['DELETE']) +@login_required +def remove_ca_permission(ca_id, target_user_id): + """Revoke a user's permission on a Certificate Authority. + + Returns: + 200: Permission revoked + 403: Not admin/owner + 404: CA or permission not found + """ + from gatehouse_app.models.ssh_ca.ca import CA, CAPermission + from gatehouse_app.models.organization.organization_member import OrganizationMember + from gatehouse_app.utils.constants import OrganizationRole, AuditAction + from gatehouse_app.models import AuditLog + from gatehouse_app.extensions import db + + user = g.current_user + + ca = CA.query.filter_by(id=ca_id, deleted_at=None).first() + if not ca: + return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND") + + # Verify user is admin/owner of the CA's org + if ca.organization_id: + membership = OrganizationMember.query.filter_by( + organization_id=ca.organization_id, + user_id=user.id, + deleted_at=None, + ).first() + if not membership or membership.role not in (OrganizationRole.ADMIN, OrganizationRole.OWNER): + return api_response(success=False, message="Admin access required", status=403, error_type="FORBIDDEN") + + perm = CAPermission.query.filter_by( + ca_id=ca_id, user_id=target_user_id, deleted_at=None + ).first() + if not perm: + return api_response(success=False, message="Permission not found", status=404, error_type="NOT_FOUND") + + perm.delete(soft=True) + + AuditLog.log( + action=AuditAction.CA_UPDATED, + user_id=user.id, + resource_type="CAPermission", + resource_id=perm.id, + ip_address=request.remote_addr, + description=f"Revoked permission on CA '{ca.name}' from user {target_user_id}", + ) + + return api_response( + data={}, + message="Permission revoked", + ) +