diff --git a/gatehouse_app/api/v1/ssh.py b/gatehouse_app/api/v1/ssh.py index 8a54ae5..35d2ac6 100644 --- a/gatehouse_app/api/v1/ssh.py +++ b/gatehouse_app/api/v1/ssh.py @@ -131,7 +131,10 @@ def _persist_certificate(user_id, ssh_key_id, ca, signing_response, request_ip=N Args: user_id: UUID of the user - ssh_key_id: UUID of the SSH key that was signed + ssh_key_id: UUID of the SSH key that was signed. May be None for host + certificates issued against a raw public key (no pre-registered + SSHKey DB record). When None the record is still persisted + but ``ssh_key_id`` is left NULL (requires nullable FK migration). ca: CA model instance (may be None — cert still returned but not persisted) signing_response: SSHCertificateSigningResponse request_ip: Client IP address @@ -158,10 +161,10 @@ def _persist_certificate(user_id, ssh_key_id, ca, signing_response, request_ip=N cert_record = SSHCertificate( ca_id=ca.id, user_id=user_id, - ssh_key_id=ssh_key_id, + ssh_key_id=ssh_key_id, # None is OK for host certs (nullable FK) certificate=signing_response.certificate, serial=signing_response.serial, - key_id=cert_identity or str(ssh_key_id), + key_id=cert_identity or (str(ssh_key_id) if ssh_key_id else "host-cert"), cert_type=resolved_cert_type, principals=signing_response.principals, valid_after=signing_response.valid_after, @@ -744,6 +747,312 @@ def sign_certificate(): return api_response(data=result, message="Certificate signed successfully", status=201) +# --------------------------------------------------------------------------- +# Host certificate issuance (admin-only) +# --------------------------------------------------------------------------- + +def _classify_ssh_key_material(raw: str) -> str: + """Classify a raw SSH key string. + + Returns one of: 'certificate', 'public_key', 'private_key', 'unknown'. + This mirrors the frontend ``classifySshKeyMaterial`` helper so that the + API produces the same guardrails even when called directly (e.g. via CLI). + """ + import re + line = raw.strip().split()[0] if raw.strip() else "" + if re.search(r"-cert-v01@openssh\.com$", line): + return "certificate" + if re.match( + r"^(ssh-ed25519|ssh-rsa|ssh-dss|ecdsa-sha2-nistp\d+|sk-ssh-ed25519@openssh\.com)$", + line, + ): + return "public_key" + if "BEGIN OPENSSH PRIVATE KEY" in raw or "BEGIN RSA PRIVATE KEY" in raw: + return "private_key" + return "unknown" + + +@ssh_bp.route('/sign/host', methods=['POST']) +@login_required +def sign_host_certificate(): + """Issue a host certificate for a server's host public key. + + This endpoint is admin-only. It accepts a raw OpenSSH host public key + (the kind found in ``/etc/ssh/ssh_host_ed25519_key.pub``), signs it with + the organisation's Host CA, and returns the signed host certificate. + + The certificate should be saved on the server as + ``/etc/ssh/ssh_host_ed25519_key-cert.pub`` and referenced in + ``sshd_config`` as ``HostCertificate``. + + Clients trust the host because they have the Host CA *public key* in their + ``known_hosts`` (via ``@cert-authority``). That key is different from — + and must never be confused with — the certificate returned here. + + Request body (JSON): + host_public_key (str, required): + Raw OpenSSH host public key, e.g. + "ssh-ed25519 AAAA... root@server". + Must NOT be a certificate (ssh-*-cert-v01@openssh.com) or a + private key. + principals (list[str], required): + Hostnames / FQDNs the server is known by, e.g. + ["prod.example.com", "web01.internal"]. + These must match what SSH clients use in their connection target. + validity_hours (int, optional, default=720): + Certificate validity in hours. Host certs are typically + 30 days (720 h) to 1 year (8760 h). + ca_id (str, required): + UUID of the Host CA to sign with. Must be a ``ca_type=host`` CA + belonging to the caller's organisation. + + Returns (201): + certificate, serial, principals, valid_after, valid_before + + Errors: + 400 BAD_REQUEST — pasted material is a cert / private key / unknown + 403 FORBIDDEN — caller is not an org admin/owner + 404 CA_NOT_FOUND — ca_id does not exist or is not a host CA + 422 VALIDATION_ERROR — invalid principals, validity, or public key + 503 CA_NOT_CONFIGURED + """ + from gatehouse_app.models.organization.organization_member import OrganizationMember + from gatehouse_app.models.ssh_ca.ca import CA, CaType + from gatehouse_app.utils.constants import OrganizationRole + from gatehouse_app.utils.ca_key_encryption import decrypt_ca_key + + user = g.current_user + user_id = user.id + + # ── Admin-only gate ─────────────────────────────────────────────────────── + is_admin = OrganizationMember.query.filter( + OrganizationMember.user_id == user_id, + OrganizationMember.role.in_([OrganizationRole.ADMIN, OrganizationRole.OWNER]), + OrganizationMember.deleted_at.is_(None), + ).first() is not None + + if not is_admin: + return api_response( + success=False, + message="Issuing host certificates requires org admin or owner role.", + status=403, + error_type="FORBIDDEN", + ) + + data = request.get_json() + if not data: + return api_response(success=False, message="No JSON data provided", status=400, error_type="BAD_REQUEST") + + host_public_key = (data.get("host_public_key") or "").strip() + principals = data.get("principals") or [] + validity_hours = data.get("validity_hours", 720) + ca_id = (data.get("ca_id") or "").strip() + + # ── Validate host public key material ───────────────────────────────────── + if not host_public_key: + return api_response( + success=False, + message="host_public_key is required.", + status=400, + error_type="BAD_REQUEST", + ) + + key_kind = _classify_ssh_key_material(host_public_key) + if key_kind == "certificate": + return api_response( + success=False, + message=( + "You submitted a certificate (ssh-…-cert-v01@openssh.com), not a host public key. " + "Retrieve the server's host public key with: " + "cat /etc/ssh/ssh_host_ed25519_key.pub" + ), + status=400, + error_type="WRONG_KEY_MATERIAL", + ) + if key_kind == "private_key": + return api_response( + success=False, + message="Private keys must never be submitted here. Use the .pub file.", + status=400, + error_type="WRONG_KEY_MATERIAL", + ) + if key_kind == "unknown": + return api_response( + success=False, + message=( + "Unrecognised key format. " + "Expected an OpenSSH public key starting with ssh-ed25519, ssh-rsa, or ecdsa-sha2-*." + ), + status=400, + error_type="WRONG_KEY_MATERIAL", + ) + + # ── Validate principals ─────────────────────────────────────────────────── + if not principals or not isinstance(principals, list): + return api_response( + success=False, + message="principals must be a non-empty list of hostnames.", + status=422, + error_type="VALIDATION_ERROR", + ) + principals = [str(p).strip() for p in principals if str(p).strip()] + if not principals: + return api_response( + success=False, + message="At least one principal (hostname/FQDN) is required.", + status=422, + error_type="VALIDATION_ERROR", + ) + + # ── Validate validity ───────────────────────────────────────────────────── + try: + validity_hours = int(validity_hours) + if validity_hours < 1: + raise ValueError + except (TypeError, ValueError): + return api_response( + success=False, + message="validity_hours must be a positive integer.", + status=422, + error_type="VALIDATION_ERROR", + ) + + # ── Resolve CA ──────────────────────────────────────────────────────────── + if not ca_id: + return api_response( + success=False, + message="ca_id is required.", + status=400, + error_type="BAD_REQUEST", + ) + + org_ids = [ + m.organization_id + for m in OrganizationMember.query.filter_by(user_id=user_id, deleted_at=None).all() + ] + + # First: find the CA by id (ignoring type) so we can give a specific error + # if it exists but is the wrong type. + any_ca = CA.query.filter( + CA.id == ca_id, + CA.is_active.is_(True), + CA.organization_id.in_(org_ids), + CA.deleted_at.is_(None), + ).first() + + if any_ca and any_ca.ca_type != CaType.HOST: + return api_response( + success=False, + message=( + f"The CA '{any_ca.name}' is a {any_ca.ca_type.value} CA. " + "Host certificates must be signed by a ca_type='host' CA." + ), + status=400, + error_type="WRONG_CA_TYPE", + ) + + host_ca = any_ca # already filtered for org + active + not-deleted above + + if not host_ca: + return api_response( + success=False, + message=( + "Host CA not found, inactive, or you do not have permission to use it. " + "Ensure the CA exists and ca_type is 'host'." + ), + status=404, + error_type="CA_NOT_FOUND", + ) + + # ── Build key_id for the OpenSSH cert Key ID field ──────────────────────── + # Format: "host: [signed-by:]" + primary_principal = principals[0] + cert_identity = f"host:{primary_principal} [signed-by:{user.email}]" + + signing_request = SSHCertificateSigningRequest( + ssh_public_key=host_public_key, + principals=principals, + cert_type="host", + key_id=cert_identity, + expiry_hours=validity_hours, + extensions=[], # Host certs carry no extensions (OpenSSH spec) + critical_options={}, + ) + + validation_errors = signing_request.validate() + if validation_errors: + return api_response( + success=False, + message="Invalid signing request: " + "; ".join(validation_errors), + status=422, + error_type="VALIDATION_ERROR", + ) + + try: + ca_private_key_pem = decrypt_ca_key(host_ca.private_key) + response = ssh_ca_service.sign_certificate( + signing_request, ca_private_key=ca_private_key_pem, ca_obj=host_ca + ) + except Exception as exc: + AuditLog.log( + action=AuditAction.SSH_CERT_FAILED, + user_id=user_id, + resource_type="SSHCertificate", + ip_address=request.remote_addr, + success=False, + error_message=str(exc), + ) + return api_response( + success=False, + message=f"Host certificate signing failed: {exc}", + status=500, + error_type="SIGNING_FAILED", + ) + + # Persist a cert record linked to the issuing admin (no ssh_key_id FK + # because this was a raw key, not a registered user key). + # We reuse _persist_certificate with ssh_key_id=ca_id as a stable sentinel. + cert_record = _persist_certificate( + user_id=user_id, + ssh_key_id=None, # host certs are not tied to a user SSH key record + ca=host_ca, + signing_response=response, + request_ip=request.remote_addr, + cert_type_str="host", + cert_identity=cert_identity, + ) + + AuditLog.log( + action=AuditAction.SSH_CERT_ISSUED, + user_id=user_id, + resource_type="SSHCertificate", + resource_id=cert_record.id if cert_record else None, + ip_address=request.remote_addr, + description=( + f"Host certificate serial={response.serial} issued for " + f"{primary_principal} by {user.email}" + ), + extra_data={ + "serial": response.serial, + "principals": principals, + "ca_id": str(host_ca.id), + "cert_type": "host", + }, + ) + + result = { + "certificate": response.certificate, + "serial": response.serial, + "principals": response.principals, + "valid_after": response.valid_after.isoformat() if response.valid_after else None, + "valid_before": response.valid_before.isoformat() if response.valid_before else None, + } + if cert_record: + result["cert_id"] = str(cert_record.id) + + return api_response(data=result, message="Host certificate issued successfully", status=201) + + @ssh_bp.route('/certificates', methods=['GET']) @login_required def list_certificates(): @@ -758,6 +1067,7 @@ def list_certificates(): .order_by(SSHCertificate.created_at.desc()) .all() ) + return api_response( data={ 'certificates': [c.to_dict() for c in certs], diff --git a/gatehouse_app/models/ssh_ca/ssh_certificate.py b/gatehouse_app/models/ssh_ca/ssh_certificate.py index f226a69..a76fbd4 100644 --- a/gatehouse_app/models/ssh_ca/ssh_certificate.py +++ b/gatehouse_app/models/ssh_ca/ssh_certificate.py @@ -42,7 +42,7 @@ class SSHCertificate(BaseModel): ssh_key_id = db.Column( db.String(36), db.ForeignKey("ssh_keys.id"), - nullable=False, + nullable=True, # Nullable: host certs may be issued against a raw public key index=True, ) diff --git a/gatehouse_app/services/ssh_ca_signing_service.py b/gatehouse_app/services/ssh_ca_signing_service.py index 1502ab8..2f760cc 100644 --- a/gatehouse_app/services/ssh_ca_signing_service.py +++ b/gatehouse_app/services/ssh_ca_signing_service.py @@ -53,7 +53,9 @@ class SSHCertificateSigningRequest: self.cert_type = cert_type self.expiry_hours = expiry_hours self.critical_options = critical_options or {} - self.extensions = extensions or [] + # Preserve None vs [] distinction: None means "use CA/policy default"; + # [] means "explicitly no extensions" (correct for host certificates). + self.extensions = extensions # type: Optional[List[str]] def validate(self) -> List[str]: """Validate the signing request. @@ -273,11 +275,18 @@ class SSHCASigningService: ) # ───────────────────────────────────────────────────────────────── - # Set extensions — prefer policy-provided list, fall back to standard set + # Set extensions — prefer policy-provided list, fall back to standard set. + # Host certificates (cert_type=2) must have NO extensions per the OpenSSH + # spec; passing an empty list is correct and intentional for host certs. + # Only fall back to STANDARD_EXTENSIONS for user certificates when the + # caller did not supply an explicit (possibly empty) extension list. extensions = signing_request.extensions - if not extensions: - from gatehouse_app.models.organization.department_cert_policy import STANDARD_EXTENSIONS - extensions = list(STANDARD_EXTENSIONS) + if extensions is None: + if signing_request.cert_type == "user": + from gatehouse_app.models.organization.department_cert_policy import STANDARD_EXTENSIONS + extensions = list(STANDARD_EXTENSIONS) + else: + extensions = [] # host certs: no extensions certificate.fields.extensions = extensions certificate.fields.critical_options = signing_request.critical_options or {} diff --git a/migrations/versions/db15faee1fb8_allow_null_ssh_key_id_for_host_certs.py b/migrations/versions/db15faee1fb8_allow_null_ssh_key_id_for_host_certs.py new file mode 100644 index 0000000..eb33895 --- /dev/null +++ b/migrations/versions/db15faee1fb8_allow_null_ssh_key_id_for_host_certs.py @@ -0,0 +1,42 @@ +"""allow_null_ssh_key_id_for_host_certs + +Make ssh_certificates.ssh_key_id nullable so that host certificates issued +against a raw server host public key (i.e. not a pre-registered SSHKey record) +can be persisted in the database. + +Revision ID: db15faee1fb8 +Revises: 018_audit_enum_values +Create Date: 2026-03-03 16:55:54.030674 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = 'db15faee1fb8' +down_revision = '018_audit_enum_values' +branch_labels = None +depends_on = None + + +def upgrade(): + op.alter_column( + 'ssh_certificates', + 'ssh_key_id', + existing_type=sa.VARCHAR(length=36), + nullable=True, + ) + + +def downgrade(): + # Null out any rows introduced by host-cert issuance before restoring NOT NULL + op.execute( + "UPDATE ssh_certificates SET ssh_key_id = '00000000-0000-0000-0000-000000000000' " + "WHERE ssh_key_id IS NULL" + ) + op.alter_column( + 'ssh_certificates', + 'ssh_key_id', + existing_type=sa.VARCHAR(length=36), + nullable=False, + ) diff --git a/pytest.ini b/pytest.ini index 14caba6..e9a4053 100644 --- a/pytest.ini +++ b/pytest.ini @@ -6,7 +6,7 @@ python_functions = test_* addopts = -v --strict-markers - --cov=app + --cov=gatehouse_app --cov-report=term-missing --cov-report=html --cov-branch