Feat(Fix): CA manage Host/User Key

This commit is contained in:
2026-03-01 20:41:32 +05:45
parent 9875216861
commit be87fd90b1
2 changed files with 499 additions and 42 deletions
+248 -2
View File
@@ -14,6 +14,73 @@ from gatehouse_app.services.organization_service import OrganizationService
from gatehouse_app.services.user_service import UserService
from gatehouse_app.utils.constants import OrganizationRole
from gatehouse_app.extensions import db
def _get_system_ca_dict():
"""Return a synthetic read-only CA dict for the config-file CA, or None.
This is injected into the org CA list when no DB CA exists for a given
ca_type so that the admin UI correctly shows "configured" rather than
"Not configured" when a system-level CA key is present.
The returned dict has ``is_system=True`` so the frontend can render it
as read-only (no delete / edit / generate buttons).
"""
import os
try:
from gatehouse_app.config.ssh_ca_config import get_ssh_ca_config
from gatehouse_app.utils.crypto import compute_ssh_fingerprint
# Check env var first (takes priority over file path)
priv_key = os.environ.get("SSH_CA_PRIVATE_KEY", "").strip()
pub_key = ""
if not priv_key:
cfg = get_ssh_ca_config()
key_path = cfg.get_str("ca_key_path", "").strip()
if not key_path:
return None
pub_path = key_path + ".pub"
if not os.path.exists(pub_path):
return None
with open(pub_path) as f:
pub_key = f.read().strip()
else:
# Derive the public key from the private key
from sshkey_tools.keys import PrivateKey
pk = PrivateKey.from_string(priv_key)
pub_key = pk.public_key.to_string()
fingerprint = compute_ssh_fingerprint(pub_key)
return {
"id": f"system-ca-{fingerprint[:16]}",
"organization_id": None,
"name": "System CA (config file)",
"description": (
"Read-only — this CA is loaded from the server's SSH_CA_PRIVATE_KEY "
"environment variable or etc/ssh_ca.conf. Manage it on the server."
),
# ca_type is set by the caller
"ca_type": "user",
"key_type": "unknown",
"public_key": pub_key,
"fingerprint": fingerprint,
"is_active": True,
"is_system": True,
"default_cert_validity_hours": 0,
"max_cert_validity_hours": 0,
"total_certs": 0,
"active_certs": 0,
"revoked_certs": 0,
"created_at": None,
"updated_at": None,
}
except Exception:
return None
@api_v1_bp.route("/organizations", methods=["POST"])
@login_required
@full_access_required
@@ -1167,12 +1234,17 @@ def remove_role_from_member(org_id, role_name, user_id):
def list_org_cas(org_id):
"""List all Certificate Authorities for an organization.
If the system config-file CA is configured (via SSH_CA_PRIVATE_KEY env var
or ca_key_path in etc/ssh_ca.conf) and no DB CA exists for a given ca_type,
a synthetic read-only entry is injected so the UI correctly shows the
system CA as configured rather than "Not configured".
Returns:
200: List of CAs (private_key excluded)
403: Not admin/owner
404: Org not found
"""
from gatehouse_app.models.ssh_ca.ca import CA
from gatehouse_app.models.ssh_ca.ca import CA, CaType
from gatehouse_app.models.organization.organization import Organization
org = Organization.query.filter_by(id=org_id, deleted_at=None).first()
@@ -1180,8 +1252,22 @@ def list_org_cas(org_id):
return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND")
cas = CA.query.filter_by(organization_id=org_id, deleted_at=None).all()
ca_list = [ca.to_dict() for ca in cas]
# Determine which ca_types are already covered by a DB CA
covered_types = {ca.ca_type for ca in cas}
# Check whether a system config-file CA is available
system_ca_dict = _get_system_ca_dict()
if system_ca_dict:
# Inject a synthetic entry for each ca_type NOT covered by a real DB CA.
# The system CA only signs user certs (cert_type="user"), so we only
# inject it for the user slot. Host signing always needs a DB CA.
if CaType.USER not in covered_types:
ca_list.append({**system_ca_dict, "ca_type": "user"})
return api_response(
data={"cas": [ca.to_dict() for ca in cas], "count": len(cas)},
data={"cas": ca_list, "count": len(ca_list)},
message="CAs retrieved",
)
@@ -1403,3 +1489,163 @@ def create_org_ca(org_id):
)
@api_v1_bp.route("/organizations/<org_id>/cas/<ca_id>", methods=["DELETE"])
@login_required
@require_admin
def delete_org_ca(org_id, ca_id):
"""Soft-delete a Certificate Authority.
Deactivates the CA so no new certificates can be signed with it.
Existing certificates remain valid until they expire.
Returns:
200: CA deleted successfully
403: Not admin/owner
404: Org or CA not found
"""
from gatehouse_app.models.ssh_ca.ca import CA
from gatehouse_app.models.organization.organization import Organization
from gatehouse_app.utils.constants import AuditAction
from gatehouse_app.models import AuditLog
org = Organization.query.filter_by(id=org_id, deleted_at=None).first()
if not org:
return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND")
ca = CA.query.filter_by(id=ca_id, organization_id=org_id, deleted_at=None).first()
if not ca:
return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND")
try:
ca_name = ca.name
ca_type = ca.ca_type.value if hasattr(ca.ca_type, "value") else str(ca.ca_type)
ca.is_active = False
ca.delete(soft=True)
AuditLog.log(
action=AuditAction.CA_DELETED,
user_id=g.current_user.id,
resource_type="CA",
resource_id=ca_id,
organization_id=org_id,
ip_address=request.remote_addr,
description=f"CA '{ca_name}' ({ca_type}) deleted",
)
return api_response(
data={"ca_id": ca_id},
message="CA deleted successfully",
)
except Exception as e:
db.session.rollback()
current_app.logger.exception("Failed to delete CA")
return api_response(
success=False,
message="Failed to delete CA",
status=500,
error_type="SERVER_ERROR",
)
@api_v1_bp.route("/organizations/<org_id>/cas/<ca_id>/rotate", methods=["POST"])
@login_required
@require_admin
def rotate_org_ca(org_id, ca_id):
"""Rotate (replace) a CA's key pair.
Generates a new key pair of the same or different type. The old public key
fingerprint is returned so admins can update TrustedUserCAKeys / known_hosts
on their servers. All previously-issued certificates remain valid until they
expire but no new certificates will be signed with the old key.
Request body (all optional):
key_type: "ed25519" (default keeps current), "rsa", or "ecdsa"
reason: Human-readable reason for the rotation
Returns:
200: CA rotated — { ca, old_fingerprint }
403: Not admin/owner
404: Org or CA not found
"""
from gatehouse_app.models.ssh_ca.ca import CA, KeyType
from gatehouse_app.models.organization.organization import Organization
from gatehouse_app.utils.crypto import compute_ssh_fingerprint
from gatehouse_app.utils.constants import AuditAction
from gatehouse_app.models import AuditLog
from sshkey_tools.keys import Ed25519PrivateKey, RsaPrivateKey, EcdsaPrivateKey
org = Organization.query.filter_by(id=org_id, deleted_at=None).first()
if not org:
return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND")
ca = CA.query.filter_by(id=ca_id, organization_id=org_id, deleted_at=None).first()
if not ca:
return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND")
data = request.get_json() or {}
new_key_type = data.get("key_type") or (ca.key_type.value if hasattr(ca.key_type, "value") else str(ca.key_type))
reason = data.get("reason", "Admin-initiated key rotation")
if new_key_type not in ("ed25519", "rsa", "ecdsa"):
return api_response(
success=False,
message="Invalid key_type. Must be one of: ed25519, rsa, ecdsa",
status=400,
error_type="VALIDATION_ERROR",
)
try:
old_fingerprint = ca.fingerprint
# Generate new key pair
if new_key_type == "ed25519":
private_key_obj = Ed25519PrivateKey.generate()
elif new_key_type == "rsa":
private_key_obj = RsaPrivateKey.generate(4096)
else: # ecdsa
private_key_obj = EcdsaPrivateKey.generate()
new_private_key = private_key_obj.to_string()
new_public_key = private_key_obj.public_key.to_string()
new_fingerprint = compute_ssh_fingerprint(new_public_key)
ca.rotate_key(
new_private_key=new_private_key,
new_public_key=new_public_key,
new_fingerprint=new_fingerprint,
reason=reason,
)
ca.key_type = KeyType(new_key_type)
db.session.commit()
AuditLog.log(
action=AuditAction.CA_KEY_ROTATED,
user_id=g.current_user.id,
resource_type="CA",
resource_id=ca_id,
organization_id=org_id,
ip_address=request.remote_addr,
description=(
f"CA '{ca.name}' key rotated. "
f"Old fingerprint: {old_fingerprint}, New fingerprint: {new_fingerprint}. "
f"Reason: {reason}"
),
)
return api_response(
data={
"ca": ca.to_dict(),
"old_fingerprint": old_fingerprint,
},
message="CA key rotated successfully. Update TrustedUserCAKeys / known_hosts on your servers.",
)
except Exception as e:
db.session.rollback()
current_app.logger.exception("Failed to rotate CA key")
return api_response(
success=False,
message="Failed to rotate CA key",
status=500,
error_type="SERVER_ERROR",
)