feat(ssh): add multi-organization support for certificate signing

Add support for users who belong to multiple organizations to select
which organization's CA should sign their SSH certificates.

Changes:
- CLI: Add --org-id and --list-orgs options for organization selection
- API: Return MULTIPLE_ORGS_AMBIGUOUS error when org selection needed
- API: Add /users/me/organizations/simple endpoint for CLI org listing
- DB: Add organization_id to certificate_audit_logs for better tracking
- Include organization_name in certificate response for clarity
This commit is contained in:
2026-04-24 22:27:24 +09:30
parent 015c622016
commit cec04f3cb2
8 changed files with 314 additions and 46 deletions
+5 -2
View File
@@ -11,11 +11,14 @@ ssh_ca_service = SSHCASigningService()
_logger = logging.getLogger(__name__)
def _get_org_ca_for_user(user, ca_type: str = "user"):
def _get_org_ca_for_user(user, ca_type: str = "user", organization_id=None):
try:
from gatehouse_app.models.ssh_ca.ca import CA, CaType
org_ids = [m.organization_id for m in user.get_active_memberships()]
if organization_id:
org_ids = [organization_id]
else:
org_ids = [m.organization_id for m in user.get_active_memberships()]
if not org_ids:
return None
+73 -22
View File
@@ -14,6 +14,12 @@ from gatehouse_app.utils.decorators import login_required
from gatehouse_app.utils.response import api_response
def _validate_uuid(uuid_str: str) -> bool:
"""Validate UUID format."""
import re
return bool(re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', uuid_str, re.I))
@ssh_bp.route('/dept-cert-policy', methods=['GET'])
@login_required
def get_my_dept_cert_policy():
@@ -60,6 +66,7 @@ def sign_certificate():
cert_type = data.get('cert_type', 'user')
key_id = data.get('key_id') or data.get('cert_id')
expiry_hours = data.get('expiry_hours')
requested_org_id = data.get('organization_id')
AuditLog.log(
action=AuditAction.SSH_CERT_REQUESTED,
@@ -67,22 +74,63 @@ def sign_certificate():
description=(f'{user.email} requested a certificate' + (f' for principals: {", ".join(requested_principals)}' if requested_principals else '')),
)
allowed_principal_names = set()
# Validate organization_id if provided
if requested_org_id and not _validate_uuid(requested_org_id):
return api_response(success=False, message="Invalid organization_id format. Must be a valid UUID.", status=400, error_type="INVALID_ORG_ID")
# Get user's active organization memberships
memberships = OrganizationMember.query.filter_by(user_id=user_id, deleted_at=None).all()
for om in memberships:
org = om.organization
if not org or org.deleted_at is not None:
continue
role = om.role
active_memberships = [om for om in memberships if om.organization and om.organization.deleted_at is None]
if not active_memberships:
return api_response(success=False, message="You are not a member of any active organizations.", status=400, error_type="NO_ORG_MEMBERSHIPS")
# Select target organization
target_org = None
if requested_org_id:
# Check if user is member of the requested organization
target_membership = next((om for om in active_memberships if str(om.organization_id).lower() == requested_org_id.lower()), None)
if not target_membership:
return api_response(success=False, message="You are not a member of the specified organization.", status=403, error_type="NOT_ORG_MEMBER")
target_org = target_membership.organization
if not target_org or target_org.deleted_at is not None:
return api_response(success=False, message="The specified organization was not found or has been deleted.", status=404, error_type="ORG_NOT_FOUND")
else:
# No organization specified - use default logic for backward compatibility
if len(active_memberships) > 1:
org_names = [om.organization.name for om in active_memberships]
orgs_data = [
{
"id": m.organization_id,
"name": m.organization.name,
"role": m.role.value if hasattr(m.role, "value") else str(m.role)
}
for m in active_memberships
]
return api_response(
success=False,
message="You are a member of multiple organizations. Please specify organization_id.",
status=400,
error_type="MULTIPLE_ORGS_AMBIGUOUS",
error_details={"organizations": orgs_data}
)
target_org = active_memberships[0].organization
# Get allowed principals for the selected organization
allowed_principal_names = set()
target_membership = next((om for om in active_memberships if str(om.organization_id).lower() == str(target_org.id).lower()), None)
if target_membership:
role = target_membership.role
if role in (OrganizationRole.ADMIN, OrganizationRole.OWNER):
for p in Principal.query.filter_by(organization_id=org.id, deleted_at=None).all():
for p in Principal.query.filter_by(organization_id=target_org.id, deleted_at=None).all():
allowed_principal_names.add(p.name)
else:
for pm in PrincipalMembership.query.filter_by(user_id=user_id, deleted_at=None).all():
if pm.principal and pm.principal.organization_id == org.id and pm.principal.deleted_at is None:
if pm.principal and pm.principal.organization_id == target_org.id and pm.principal.deleted_at is None:
allowed_principal_names.add(pm.principal.name)
for dm in DepartmentMembership.query.filter_by(user_id=user_id, deleted_at=None).all():
if dm.department and dm.department.organization_id == org.id and dm.department.deleted_at is None:
if dm.department and dm.department.organization_id == target_org.id and dm.department.deleted_at is None:
for dp in DepartmentPrincipal.query.filter_by(department_id=dm.department_id, deleted_at=None).all():
if dp.principal and dp.principal.deleted_at is None:
allowed_principal_names.add(dp.principal.name)
@@ -114,7 +162,8 @@ def sign_certificate():
if not ssh_key.verified:
return api_response(success=False, message="SSH key is not verified. Verify it before requesting a certificate.", status=400, error_type="KEY_NOT_VERIFIED")
db_ca = _get_org_ca_for_user(user, ca_type=cert_type)
# Use the selected organization's ID for CA selection
db_ca = _get_org_ca_for_user(user, ca_type=cert_type, organization_id=target_org.id)
if db_ca is None:
return api_response(
success=False,
@@ -122,11 +171,7 @@ def sign_certificate():
status=503, error_type="CA_NOT_CONFIGURED",
)
is_org_admin = any(
om.role in (OrganizationRole.ADMIN, OrganizationRole.OWNER)
for om in memberships
if om.organization and om.organization.deleted_at is None
)
is_org_admin = target_membership.role in (OrganizationRole.ADMIN, OrganizationRole.OWNER) if target_membership else False
dept_policy = _get_merged_dept_cert_policy(user_id)
if dept_policy:
@@ -146,11 +191,7 @@ def sign_certificate():
else:
policy_extensions = None
org_slugs = sorted({
om.organization.slug for om in memberships
if om.organization and om.organization.deleted_at is None and getattr(om.organization, 'slug', None)
})
org_slug = org_slugs[0] if org_slugs else "unknown"
org_slug = getattr(target_org, 'slug', 'unknown')
full_name = getattr(user, 'full_name', None) or getattr(user, 'name', None) or "unknown"
cert_identity = f"{user.email} ({full_name}) [org:{org_slug}]"
@@ -185,12 +226,13 @@ def sign_certificate():
resource_type='SSHCertificate', resource_id=cert_record.id if cert_record else key_id,
ip_address=request.remote_addr,
description=f'Certificate serial={response.serial} issued for {user.email}; principals: {", ".join(principals)}',
extra_data={'serial': response.serial, 'key_id': cert_identity, 'principals': principals, 'ca_id': str(db_ca.id), 'ssh_key_id': str(key_id)},
extra_data={'serial': response.serial, 'key_id': cert_identity, 'principals': principals, 'ca_id': str(db_ca.id), 'ssh_key_id': str(key_id), 'organization_id': str(target_org.id), 'organization_name': target_org.name},
)
if cert_record:
CertificateAuditLog.log(
certificate_id=cert_record.id, action='issued', user_id=user_id,
organization_id=str(target_org.id),
ip_address=request.remote_addr, user_agent=request.headers.get('User-Agent'),
message=f'Certificate serial={response.serial} issued for {user.email}; principals: {", ".join(principals)}',
extra_data={
@@ -198,6 +240,7 @@ def sign_certificate():
'ca_id': str(db_ca.id), 'ssh_key_id': str(key_id),
'valid_after': response.valid_after.isoformat() if response.valid_after else None,
'valid_before': response.valid_before.isoformat() if response.valid_before else None,
'organization_id': str(target_org.id),
},
success=True,
)
@@ -207,6 +250,8 @@ def sign_certificate():
'principals': response.principals,
'valid_after': response.valid_after.isoformat() if response.valid_after else None,
'valid_before': response.valid_before.isoformat() if response.valid_before else None,
'organization_id': str(target_org.id),
'organization_name': target_org.name,
}
if cert_record:
result['cert_id'] = str(cert_record.id)
@@ -371,7 +416,13 @@ def revoke_certificate(cert_id):
cert.revoke(reason=reason)
AuditLog.log(action=AuditAction.SSH_CERT_REVOKED, user_id=user_id, resource_type='SSHCertificate', resource_id=cert_id, ip_address=request.remote_addr, description=f'Revoked: {reason}')
CertificateAuditLog.log(certificate_id=cert_id, action='revoked', user_id=user_id, ip_address=request.remote_addr, user_agent=request.headers.get('User-Agent'), message=f'Certificate revoked: {reason}', success=True)
# Get organization from certificate's CA for audit logging
from gatehouse_app.models.ssh_ca.ca import CA
ca = CA.query.get(cert.ca_id)
org_id = ca.organization_id if ca else None
CertificateAuditLog.log(certificate_id=cert_id, action='revoked', user_id=user_id, organization_id=org_id, ip_address=request.remote_addr, user_agent=request.headers.get('User-Agent'), message=f'Certificate revoked: {reason}', success=True)
return api_response(success=True, message='Certificate revoked successfully', data={'status': 'revoked', 'cert_id': cert_id, 'reason': reason}, status=200)
except Exception as e: