refactor: standardize audit logging for ISO27001 compliance

This commit is contained in:
Ubuntu
2026-05-14 05:59:49 +00:00
parent 417d462fb9
commit 815084132f
18 changed files with 184 additions and 100 deletions
+52 -19
View File
@@ -8,7 +8,7 @@ from gatehouse_app.api.v1.ssh._helpers import (
from gatehouse_app.services.ssh_ca_signing_service import SSHCertificateSigningRequest
from gatehouse_app.exceptions import SSHKeyNotFoundError, SSHCertificateError
from gatehouse_app.utils.constants import AuditAction, OrganizationRole
from gatehouse_app.models import AuditLog
from gatehouse_app.services.audit_service import AuditService
from gatehouse_app.models.ssh_ca.certificate_audit_log import CertificateAuditLog
from gatehouse_app.utils.decorators import login_required
from gatehouse_app.utils.response import api_response
@@ -68,10 +68,11 @@ def sign_certificate():
expiry_hours = data.get('expiry_hours')
requested_org_id = data.get('organization_id')
AuditLog.log(
AuditService.log_action(
action=AuditAction.SSH_CERT_REQUESTED,
user_id=user_id, resource_type='SSHCertificate', ip_address=request.remote_addr,
description=(f'{user.email} requested a certificate' + (f' for principals: {", ".join(requested_principals)}' if requested_principals else '')),
user_id=user_id,
resource_type="SSHCertificate",
description=f"{user.email} requested a certificate" + (f" for principals: {', '.join(requested_principals)}" if requested_principals else ""),
)
# Validate organization_id if provided
@@ -209,10 +210,24 @@ def sign_certificate():
ca_private_key_pem = decrypt_ca_key(db_ca.private_key)
response = ssh_ca_service.sign_certificate(signing_request, ca_private_key=ca_private_key_pem, ca_obj=db_ca)
except SSHCertificateError as e:
AuditLog.log(action=AuditAction.SSH_CERT_FAILED, user_id=user_id, resource_type='SSHCertificate', ip_address=request.remote_addr, success=False, error_message=str(e))
AuditService.log_action(
action=AuditAction.SSH_CERT_FAILED,
user_id=user_id,
resource_type="SSHCertificate",
description=f"Certificate signing failed",
success=False,
error_message=str(e),
)
return api_response(success=False, message=str(e), status=400, error_type="SIGNING_FAILED")
except Exception as e:
AuditLog.log(action=AuditAction.SSH_CERT_FAILED, user_id=user_id, resource_type='SSHCertificate', ip_address=request.remote_addr, success=False, error_message=str(e))
AuditService.log_action(
action=AuditAction.SSH_CERT_FAILED,
user_id=user_id,
resource_type="SSHCertificate",
description=f"Certificate signing failed",
success=False,
error_message=str(e),
)
return api_response(success=False, message="Certificate signing failed", status=500, error_type="SERVER_ERROR")
cert_record = _persist_certificate(
@@ -221,12 +236,14 @@ def sign_certificate():
cert_type_str=cert_type, cert_identity=cert_identity,
)
AuditLog.log(
action=AuditAction.SSH_CERT_ISSUED, user_id=user_id,
resource_type='SSHCertificate', resource_id=cert_record.id if cert_record else key_id,
ip_address=request.remote_addr,
description=f'Certificate serial={response.serial} issued for {user.email}; principals: {", ".join(principals)}',
extra_data={'serial': response.serial, 'key_id': cert_identity, 'principals': principals, 'ca_id': str(db_ca.id), 'ssh_key_id': str(key_id), 'organization_id': str(target_org.id), 'organization_name': target_org.name},
AuditService.log_action(
action=AuditAction.SSH_CERT_ISSUED,
user_id=user_id,
organization_id=str(target_org.id),
resource_type="SSHCertificate",
resource_id=cert_record.id if cert_record else key_id,
metadata={"serial": response.serial, "key_id": cert_identity, "principals": principals, "ca_id": str(db_ca.id), "ssh_key_id": str(key_id)},
description=f"Certificate serial={response.serial} issued for {user.email}; principals: {', '.join(principals)}",
)
if cert_record:
@@ -340,7 +357,15 @@ def sign_host_certificate():
ca_private_key_pem = decrypt_ca_key(host_ca.private_key)
response = ssh_ca_service.sign_certificate(signing_request, ca_private_key=ca_private_key_pem, ca_obj=host_ca)
except Exception as exc:
AuditLog.log(action=AuditAction.SSH_CERT_FAILED, user_id=user_id, resource_type="SSHCertificate", ip_address=request.remote_addr, success=False, error_message=str(exc))
AuditService.log_action(
action=AuditAction.SSH_CERT_FAILED,
user_id=user_id,
organization_id=host_ca.organization_id,
resource_type="SSHCertificate",
description=f"Host certificate signing failed",
success=False,
error_message=str(exc),
)
return api_response(success=False, message=f"Host certificate signing failed: {exc}", status=500, error_type="SIGNING_FAILED")
cert_record = _persist_certificate(
@@ -349,12 +374,14 @@ def sign_host_certificate():
cert_type_str="host", cert_identity=cert_identity,
)
AuditLog.log(
action=AuditAction.SSH_CERT_ISSUED, user_id=user_id,
resource_type="SSHCertificate", resource_id=cert_record.id if cert_record else None,
ip_address=request.remote_addr,
AuditService.log_action(
action=AuditAction.SSH_CERT_ISSUED,
user_id=user_id,
organization_id=host_ca.organization_id,
resource_type="SSHCertificate",
resource_id=cert_record.id if cert_record else None,
metadata={"serial": response.serial, "principals": principals, "ca_id": str(host_ca.id), "cert_type": "host"},
description=f"Host certificate serial={response.serial} issued for {primary_principal} by {user.email}",
extra_data={"serial": response.serial, "principals": principals, "ca_id": str(host_ca.id), "cert_type": "host"},
)
result = {
@@ -415,7 +442,13 @@ def revoke_certificate(cert_id):
return api_response(success=False, message='Certificate is already revoked', status=409, error_type='ALREADY_REVOKED')
cert.revoke(reason=reason)
AuditLog.log(action=AuditAction.SSH_CERT_REVOKED, user_id=user_id, resource_type='SSHCertificate', resource_id=cert_id, ip_address=request.remote_addr, description=f'Revoked: {reason}')
AuditService.log_action(
action=AuditAction.SSH_CERT_REVOKED,
user_id=user_id,
resource_type="SSHCertificate",
resource_id=cert_id,
description=f"Certificate revoked: {reason}",
)
# Get organization from certificate's CA for audit logging
from gatehouse_app.models.ssh_ca.ca import CA