5250d18eb0
CA Encryption, Serials, Rate Limiter, Account suspension blocks login Transfer Ownership & Delete Account
1109 lines
42 KiB
Python
1109 lines
42 KiB
Python
"""SSH Key and Certificate API routes."""
|
|
from flask import Blueprint, request, jsonify, g
|
|
from sqlalchemy.exc import IntegrityError
|
|
from gatehouse_app.services.ssh_key_service import SSHKeyService
|
|
from gatehouse_app.services.ssh_ca_signing_service import (
|
|
SSHCASigningService,
|
|
SSHCertificateSigningRequest,
|
|
)
|
|
from gatehouse_app.exceptions import (
|
|
SSHKeyError,
|
|
SSHKeyNotFoundError,
|
|
SSHCertificateError,
|
|
ValidationError,
|
|
SSHKeyAlreadyExistsError,
|
|
)
|
|
from gatehouse_app.utils.constants import AuditAction
|
|
from gatehouse_app.models import AuditLog
|
|
from gatehouse_app.models.ssh_ca.certificate_audit_log import CertificateAuditLog
|
|
from gatehouse_app.utils.decorators import login_required
|
|
from gatehouse_app.utils.response import api_response
|
|
|
|
ssh_bp = Blueprint('ssh', __name__, url_prefix='/ssh')
|
|
ssh_key_service = SSHKeyService()
|
|
ssh_ca_service = SSHCASigningService()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_org_ca_for_user(user, ca_type: str = "user"):
|
|
"""Return the active DB CA of the given type for the user's first org, or None.
|
|
|
|
Args:
|
|
user: The current user object.
|
|
ca_type: ``"user"`` (default) or ``"host"`` — selects the CA that signs
|
|
the corresponding certificate type.
|
|
"""
|
|
try:
|
|
from gatehouse_app.models.ssh_ca.ca import CA, CaType
|
|
org_ids = [m.organization_id for m in user.organization_memberships]
|
|
if not org_ids:
|
|
return None
|
|
return CA.query.filter(
|
|
CA.organization_id.in_(org_ids),
|
|
CA.ca_type == CaType(ca_type),
|
|
CA.is_active == True, # noqa: E712
|
|
).first()
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _get_or_create_system_ca():
|
|
"""
|
|
Return a CA DB record representing the config-file CA.
|
|
|
|
This is used as the ``ca_id`` FK when persisting certificates that were
|
|
signed by the globally-configured CA key (not an org-specific DB CA).
|
|
The record is created on first use and has no ``organization_id``.
|
|
"""
|
|
from gatehouse_app.extensions import db
|
|
from gatehouse_app.models.ssh_ca.ca import CA, KeyType
|
|
from gatehouse_app.config.ssh_ca_config import get_ssh_ca_config
|
|
from gatehouse_app.utils.crypto import compute_ssh_fingerprint
|
|
import os
|
|
|
|
try:
|
|
existing = CA.query.filter_by(name="system-config-ca").first()
|
|
if existing:
|
|
return existing
|
|
|
|
cfg = get_ssh_ca_config()
|
|
key_path = cfg.get_str("ca_key_path", "").strip()
|
|
pub_key_path = key_path + ".pub"
|
|
|
|
if not os.path.exists(pub_key_path):
|
|
return None
|
|
|
|
with open(pub_key_path) as f:
|
|
pub_key = f.read().strip()
|
|
|
|
# Load private key for the record (encrypt before storing in DB)
|
|
priv_key = ""
|
|
if os.path.exists(key_path):
|
|
with open(key_path) as f:
|
|
raw_priv_key = f.read()
|
|
try:
|
|
from gatehouse_app.utils.ca_key_encryption import encrypt_ca_key
|
|
priv_key = encrypt_ca_key(raw_priv_key)
|
|
except Exception:
|
|
priv_key = raw_priv_key # fallback: store as-is if encryption unavailable
|
|
|
|
fingerprint = compute_ssh_fingerprint(pub_key)
|
|
|
|
# Check by fingerprint in case it was created under a different name
|
|
existing_by_fp = CA.query.filter_by(fingerprint=fingerprint).first()
|
|
if existing_by_fp:
|
|
return existing_by_fp
|
|
|
|
system_ca = CA(
|
|
name="system-config-ca",
|
|
description="Global CA loaded from etc/ssh_ca.conf (ca_key_path)",
|
|
key_type=KeyType.ED25519,
|
|
private_key=priv_key,
|
|
public_key=pub_key,
|
|
fingerprint=fingerprint,
|
|
is_active=True,
|
|
default_cert_validity_hours=24,
|
|
max_cert_validity_hours=720,
|
|
)
|
|
# organization_id is nullable=False in schema — we need a dummy org or
|
|
# need to allow NULL. Use None; the DB constraint will tell us quickly.
|
|
# If the migration enforces NOT NULL we'll catch the error gracefully.
|
|
db.session.add(system_ca)
|
|
db.session.commit()
|
|
return system_ca
|
|
except Exception as exc:
|
|
import logging
|
|
logging.getLogger(__name__).warning(
|
|
f"Could not upsert system-config-ca: {exc}"
|
|
)
|
|
try:
|
|
db.session.rollback()
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _persist_certificate(user_id, ssh_key_id, ca, signing_response, request_ip=None, cert_type_str='user', cert_identity=None):
|
|
"""Save a signed certificate to the ssh_certificates table.
|
|
|
|
Args:
|
|
user_id: UUID of the user
|
|
ssh_key_id: UUID of the SSH key that was signed
|
|
ca: CA model instance (may be None — cert still returned but not persisted)
|
|
signing_response: SSHCertificateSigningResponse
|
|
request_ip: Client IP address
|
|
cert_type_str: 'user' or 'host' (from the sign request)
|
|
cert_identity: Rich OpenSSH key_id string (e.g. "user@host (Name) [org:slug]").
|
|
Falls back to str(ssh_key_id) when not provided.
|
|
|
|
Returns:
|
|
SSHCertificate instance or None if persistence failed
|
|
"""
|
|
if ca is None:
|
|
return None
|
|
|
|
try:
|
|
from gatehouse_app.extensions import db
|
|
from gatehouse_app.models.ssh_ca.ssh_certificate import SSHCertificate, CertificateStatus
|
|
from gatehouse_app.models.ssh_ca.ca import CertType
|
|
|
|
try:
|
|
resolved_cert_type = CertType(cert_type_str)
|
|
except ValueError:
|
|
resolved_cert_type = CertType.USER
|
|
|
|
cert_record = SSHCertificate(
|
|
ca_id=ca.id,
|
|
user_id=user_id,
|
|
ssh_key_id=ssh_key_id,
|
|
certificate=signing_response.certificate,
|
|
serial=signing_response.serial,
|
|
key_id=cert_identity or str(ssh_key_id),
|
|
cert_type=resolved_cert_type,
|
|
principals=signing_response.principals,
|
|
valid_after=signing_response.valid_after,
|
|
valid_before=signing_response.valid_before,
|
|
revoked=False,
|
|
status=CertificateStatus.ISSUED,
|
|
request_ip=request_ip,
|
|
)
|
|
db.session.add(cert_record)
|
|
db.session.commit()
|
|
return cert_record
|
|
except Exception as exc:
|
|
import logging
|
|
logging.getLogger(__name__).warning(
|
|
f"Failed to persist certificate to DB: {exc}"
|
|
)
|
|
try:
|
|
from gatehouse_app.extensions import db as _db
|
|
_db.session.rollback()
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
|
|
def _get_merged_dept_cert_policy(user_id):
|
|
"""Return a merged cert policy view for the given user across all their departments.
|
|
|
|
Rules for merging when a user belongs to multiple departments:
|
|
- ``allow_user_expiry``: True only if ALL departments allow it.
|
|
- ``default_expiry_hours``: minimum across departments (most restrictive).
|
|
- ``max_expiry_hours``: minimum across departments (most restrictive).
|
|
- ``extensions``: intersection — only extensions allowed by ALL departments.
|
|
|
|
Returns a plain dict with keys:
|
|
allow_user_expiry, default_expiry_hours, max_expiry_hours, extensions
|
|
Or None if the user has no department memberships or no policies are configured.
|
|
"""
|
|
from gatehouse_app.models.organization.department import DepartmentMembership
|
|
from gatehouse_app.models.organization.department_cert_policy import DepartmentCertPolicy, STANDARD_EXTENSIONS
|
|
|
|
memberships = DepartmentMembership.query.filter_by(user_id=user_id, deleted_at=None).all()
|
|
dept_ids = [m.department_id for m in memberships if m.department and m.department.deleted_at is None]
|
|
if not dept_ids:
|
|
return None
|
|
|
|
policies = DepartmentCertPolicy.query.filter(
|
|
DepartmentCertPolicy.department_id.in_(dept_ids),
|
|
DepartmentCertPolicy.deleted_at.is_(None),
|
|
).all()
|
|
if not policies:
|
|
return None
|
|
|
|
allow_user_expiry = all(p.allow_user_expiry for p in policies)
|
|
default_expiry_hours = min(p.default_expiry_hours for p in policies)
|
|
max_expiry_hours = min(p.max_expiry_hours for p in policies)
|
|
|
|
# Intersection of all_extensions() across policies
|
|
ext_sets = [set(p.all_extensions()) for p in policies]
|
|
extensions = list(ext_sets[0].intersection(*ext_sets[1:]))
|
|
|
|
return {
|
|
"allow_user_expiry": allow_user_expiry,
|
|
"default_expiry_hours": default_expiry_hours,
|
|
"max_expiry_hours": max_expiry_hours,
|
|
"extensions": extensions,
|
|
}
|
|
|
|
|
|
@ssh_bp.route('/dept-cert-policy', methods=['GET'])
|
|
@login_required
|
|
def get_my_dept_cert_policy():
|
|
"""Return the merged department certificate policy for the current user.
|
|
|
|
Admins always get allow_user_expiry=True so the frontend shows the expiry
|
|
picker for them regardless of the member-facing toggle setting.
|
|
"""
|
|
from gatehouse_app.models.organization.organization_member import OrganizationMember
|
|
from gatehouse_app.models.organization.department_cert_policy import STANDARD_EXTENSIONS
|
|
from gatehouse_app.utils.constants import OrganizationRole
|
|
|
|
user = g.current_user
|
|
user_id = user.id
|
|
|
|
# Check if caller is an org admin/owner
|
|
is_org_admin = OrganizationMember.query.filter(
|
|
OrganizationMember.user_id == user_id,
|
|
OrganizationMember.role.in_(["OWNER", "ADMIN"]),
|
|
OrganizationMember.deleted_at == None,
|
|
).first() is not None
|
|
|
|
policy = _get_merged_dept_cert_policy(user_id)
|
|
if policy is None:
|
|
policy = {
|
|
"allow_user_expiry": is_org_admin, # admins default to True even without a dept policy
|
|
"default_expiry_hours": 1,
|
|
"max_expiry_hours": 24,
|
|
"extensions": list(STANDARD_EXTENSIONS),
|
|
}
|
|
elif is_org_admin:
|
|
# Override allow_user_expiry for admins — they can always pick
|
|
policy = {**policy, "allow_user_expiry": True}
|
|
|
|
return api_response(data={"policy": policy}, message="Certificate policy retrieved")
|
|
|
|
|
|
@ssh_bp.route('/keys', methods=['GET'])
|
|
@login_required
|
|
def list_ssh_keys():
|
|
"""Get all SSH keys for current user."""
|
|
user_id = g.current_user.id
|
|
|
|
keys = ssh_key_service.get_user_ssh_keys(user_id)
|
|
return api_response(
|
|
data={
|
|
'keys': [k.to_dict() for k in keys],
|
|
'count': len(keys),
|
|
},
|
|
message="SSH keys retrieved successfully"
|
|
)
|
|
|
|
|
|
@ssh_bp.route('/keys', methods=['POST'])
|
|
@login_required
|
|
def add_ssh_key():
|
|
"""Add a new SSH public key for current user."""
|
|
user_id = g.current_user.id
|
|
|
|
data = request.get_json()
|
|
if not data:
|
|
return api_response(success=False, message='No JSON data provided', status=400, error_type='BAD_REQUEST')
|
|
|
|
public_key = data.get('public_key') or data.get('key')
|
|
description = data.get('description')
|
|
|
|
if not public_key:
|
|
return api_response(success=False, message='public_key is required', status=400, error_type='BAD_REQUEST')
|
|
|
|
try:
|
|
ssh_key = ssh_key_service.add_ssh_key(
|
|
user_id=user_id,
|
|
public_key=public_key,
|
|
description=description,
|
|
)
|
|
|
|
AuditLog.log(
|
|
action=AuditAction.SSH_KEY_ADDED,
|
|
user_id=user_id,
|
|
resource_type='SSHKey',
|
|
resource_id=ssh_key.id,
|
|
ip_address=request.remote_addr,
|
|
)
|
|
|
|
return api_response(success=True, message='SSH key added', data=ssh_key.to_dict(), status=201)
|
|
|
|
except SSHKeyAlreadyExistsError as e:
|
|
return api_response(success=False, message=e.message, status=409, error_type='SSH_KEY_ALREADY_EXISTS')
|
|
except IntegrityError:
|
|
return api_response(success=False, message='SSH key already exists', status=409, error_type='SSH_KEY_ALREADY_EXISTS')
|
|
except SSHKeyError as e:
|
|
return api_response(success=False, message=str(e), status=400, error_type='SSH_KEY_ERROR')
|
|
except ValidationError as e:
|
|
return api_response(success=False, message=str(e), status=400, error_type='VALIDATION_ERROR')
|
|
|
|
|
|
@ssh_bp.route('/keys/<key_id>', methods=['GET'])
|
|
@login_required
|
|
def get_ssh_key(key_id):
|
|
"""Get a specific SSH key."""
|
|
user_id = g.current_user.id
|
|
|
|
try:
|
|
ssh_key = ssh_key_service.get_ssh_key(key_id)
|
|
|
|
if ssh_key.user_id != user_id:
|
|
return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN')
|
|
|
|
return api_response(success=True, message='SSH key retrieved', data=ssh_key.to_dict(), status=200)
|
|
|
|
except SSHKeyNotFoundError:
|
|
return api_response(success=False, message='SSH key not found', status=404, error_type='NOT_FOUND')
|
|
|
|
|
|
@ssh_bp.route('/keys/<key_id>', methods=['DELETE'])
|
|
@login_required
|
|
def delete_ssh_key(key_id):
|
|
"""Delete an SSH key."""
|
|
user_id = g.current_user.id
|
|
|
|
try:
|
|
ssh_key = ssh_key_service.get_ssh_key(key_id)
|
|
|
|
if ssh_key.user_id != user_id:
|
|
return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN')
|
|
|
|
ssh_key_service.delete_ssh_key(key_id)
|
|
|
|
AuditLog.log(
|
|
action=AuditAction.SSH_KEY_DELETED,
|
|
user_id=user_id,
|
|
resource_type='SSHKey',
|
|
resource_id=key_id,
|
|
ip_address=request.remote_addr,
|
|
)
|
|
|
|
return api_response(success=True, message='SSH key deleted', data={'status': 'deleted'}, status=200)
|
|
|
|
except SSHKeyNotFoundError:
|
|
return api_response(success=False, message='SSH key not found', status=404, error_type='NOT_FOUND')
|
|
|
|
|
|
@ssh_bp.route('/keys/<key_id>/verify', methods=['GET', 'POST'])
|
|
@login_required
|
|
def verify_ssh_key(key_id):
|
|
"""Generate or verify SSH key ownership challenge."""
|
|
user_id = g.current_user.id
|
|
|
|
try:
|
|
ssh_key = ssh_key_service.get_ssh_key(key_id)
|
|
|
|
if ssh_key.user_id != user_id:
|
|
return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN')
|
|
|
|
# GET — return a fresh challenge
|
|
if request.method == 'GET':
|
|
challenge = ssh_key_service.generate_verification_challenge(key_id)
|
|
return api_response(success=True, message='Challenge generated', data={
|
|
'challenge_text': challenge,
|
|
'validationText': challenge,
|
|
'key_id': key_id,
|
|
}, status=200)
|
|
|
|
# POST — verify signature or generate challenge
|
|
data = request.get_json() or {}
|
|
action = data.get('action', 'verify_signature')
|
|
|
|
if action == 'verify_signature':
|
|
signature = data.get('signature')
|
|
if not signature:
|
|
return api_response(success=False, message='signature is required', status=400, error_type='BAD_REQUEST')
|
|
|
|
try:
|
|
verified = ssh_key_service.verify_ssh_key_ownership(key_id, signature)
|
|
|
|
AuditLog.log(
|
|
action=AuditAction.SSH_KEY_VERIFIED,
|
|
user_id=user_id,
|
|
resource_type='SSHKey',
|
|
resource_id=key_id,
|
|
ip_address=request.remote_addr,
|
|
success=verified,
|
|
)
|
|
|
|
return api_response(success=True, message='Verification complete', data={'verified': verified}, status=200)
|
|
|
|
except Exception as e:
|
|
AuditLog.log(
|
|
action=AuditAction.SSH_KEY_VALIDATION_FAILED,
|
|
user_id=user_id,
|
|
resource_type='SSHKey',
|
|
resource_id=key_id,
|
|
ip_address=request.remote_addr,
|
|
success=False,
|
|
error_message=str(e),
|
|
)
|
|
return api_response(success=False, message=str(e), status=400, error_type='VERIFICATION_FAILED')
|
|
|
|
else: # generate_challenge
|
|
challenge = ssh_key_service.generate_verification_challenge(key_id)
|
|
return api_response(success=True, message='Challenge generated', data={
|
|
'challenge_text': challenge,
|
|
'challenge': challenge,
|
|
}, status=200)
|
|
|
|
except SSHKeyNotFoundError:
|
|
return api_response(success=False, message='SSH key not found', status=404, error_type='NOT_FOUND')
|
|
|
|
|
|
@ssh_bp.route('/keys/<key_id>/update-description', methods=['PATCH'])
|
|
@login_required
|
|
def update_ssh_key_description(key_id):
|
|
"""Update SSH key description."""
|
|
user_id = g.current_user.id
|
|
|
|
data = request.get_json()
|
|
if not data or 'description' not in data:
|
|
return api_response(success=False, message='description is required', status=400, error_type='BAD_REQUEST')
|
|
|
|
try:
|
|
ssh_key = ssh_key_service.get_ssh_key(key_id)
|
|
|
|
if ssh_key.user_id != user_id:
|
|
return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN')
|
|
|
|
updated_key = ssh_key_service.update_ssh_key_description(key_id, data['description'])
|
|
|
|
return api_response(success=True, message='Description updated', data=updated_key.to_dict(), status=200)
|
|
|
|
except SSHKeyNotFoundError:
|
|
return api_response(success=False, message='SSH key not found', status=404, error_type='NOT_FOUND')
|
|
|
|
|
|
@ssh_bp.route('/sign', methods=['POST'])
|
|
@login_required
|
|
def sign_certificate():
|
|
"""Sign an SSH certificate for the current user."""
|
|
user = g.current_user
|
|
user_id = user.id
|
|
|
|
# ── Check account suspension ──────────────────────────────────────────────
|
|
from gatehouse_app.utils.constants import UserStatus
|
|
if user.status in (UserStatus.SUSPENDED, UserStatus.COMPLIANCE_SUSPENDED):
|
|
return api_response(
|
|
success=False,
|
|
message="Your account is suspended. Contact an administrator.",
|
|
status=403,
|
|
error_type="ACCOUNT_SUSPENDED",
|
|
)
|
|
|
|
data = request.get_json()
|
|
if not data:
|
|
return api_response(success=False, message="No JSON data provided", status=400, error_type="BAD_REQUEST")
|
|
|
|
requested_principals = data.get('principals') or []
|
|
cert_type = data.get('cert_type', 'user')
|
|
key_id = data.get('key_id') or data.get('cert_id')
|
|
expiry_hours = data.get('expiry_hours')
|
|
|
|
# ── Log the request ───────────────────────────────────────────────────────
|
|
AuditLog.log(
|
|
action=AuditAction.SSH_CERT_REQUESTED,
|
|
user_id=user_id,
|
|
resource_type='SSHCertificate',
|
|
ip_address=request.remote_addr,
|
|
description=(
|
|
f'{user.email} requested a certificate'
|
|
+ (f' for principals: {", ".join(requested_principals)}' if requested_principals else '')
|
|
),
|
|
)
|
|
|
|
# ── Resolve which principals the user is allowed to use ──────────────────
|
|
from gatehouse_app.models.organization.organization_member import OrganizationMember
|
|
from gatehouse_app.models.organization.principal import Principal, PrincipalMembership
|
|
from gatehouse_app.models.organization.department import DepartmentMembership, DepartmentPrincipal
|
|
from gatehouse_app.utils.constants import OrganizationRole
|
|
|
|
allowed_principal_names = set()
|
|
|
|
memberships = OrganizationMember.query.filter_by(user_id=user_id).all()
|
|
for om in memberships:
|
|
org = om.organization
|
|
if not org or org.deleted_at is not None:
|
|
continue
|
|
role = om.role
|
|
if role in (OrganizationRole.ADMIN, OrganizationRole.OWNER):
|
|
# Admin/owner can use any principal in the org
|
|
for p in Principal.query.filter_by(organization_id=org.id, deleted_at=None).all():
|
|
allowed_principal_names.add(p.name)
|
|
else:
|
|
# Direct memberships
|
|
for pm in PrincipalMembership.query.filter_by(user_id=user_id, deleted_at=None).all():
|
|
if pm.principal and pm.principal.organization_id == org.id and pm.principal.deleted_at is None:
|
|
allowed_principal_names.add(pm.principal.name)
|
|
# Via department
|
|
for dm in DepartmentMembership.query.filter_by(user_id=user_id, deleted_at=None).all():
|
|
if dm.department and dm.department.organization_id == org.id and dm.department.deleted_at is None:
|
|
for dp in DepartmentPrincipal.query.filter_by(department_id=dm.department_id, deleted_at=None).all():
|
|
if dp.principal and dp.principal.deleted_at is None:
|
|
allowed_principal_names.add(dp.principal.name)
|
|
|
|
# ── Determine final principals list ─────────────────────────────────────
|
|
if not requested_principals:
|
|
# Auto-resolve: use all principals the user is assigned to
|
|
principals = list(allowed_principal_names)
|
|
if not principals:
|
|
return api_response(
|
|
success=False,
|
|
message="You have no principals assigned. Ask an admin to add you to a principal.",
|
|
status=400,
|
|
error_type="NO_PRINCIPALS",
|
|
)
|
|
else:
|
|
# Validate each requested principal is within the user's allowed set
|
|
invalid = [p for p in requested_principals if p not in allowed_principal_names]
|
|
if invalid:
|
|
return api_response(
|
|
success=False,
|
|
message=f"You are not authorised to request principals: {', '.join(invalid)}",
|
|
status=403,
|
|
error_type="UNAUTHORIZED_PRINCIPALS",
|
|
)
|
|
principals = requested_principals
|
|
|
|
# ── Key resolution ────────────────────────────────────────────────────────
|
|
if not key_id:
|
|
verified_keys = ssh_key_service.get_user_verified_ssh_keys(user_id)
|
|
if not verified_keys:
|
|
return api_response(
|
|
success=False,
|
|
message="No verified SSH keys found. Verify a key before requesting a certificate.",
|
|
status=400,
|
|
error_type="NO_VERIFIED_KEYS",
|
|
)
|
|
key_id = verified_keys[0].id
|
|
|
|
try:
|
|
ssh_key = ssh_key_service.get_ssh_key(key_id)
|
|
except SSHKeyNotFoundError:
|
|
return api_response(success=False, message="SSH key not found", status=404, error_type="NOT_FOUND")
|
|
|
|
if ssh_key.user_id != user_id:
|
|
return api_response(success=False, message="Forbidden", status=403, error_type="FORBIDDEN")
|
|
|
|
if not ssh_key.verified:
|
|
return api_response(
|
|
success=False,
|
|
message="SSH key is not verified. Verify it before requesting a certificate.",
|
|
status=400,
|
|
error_type="KEY_NOT_VERIFIED",
|
|
)
|
|
|
|
db_ca = _get_org_ca_for_user(user, ca_type=cert_type)
|
|
if db_ca is None:
|
|
return api_response(
|
|
success=False,
|
|
message=(
|
|
"No active Certificate Authority is configured for your organization. "
|
|
"An admin must generate a CA on the Certificate Authorities page before "
|
|
"certificates can be issued."
|
|
),
|
|
status=503,
|
|
error_type="CA_NOT_CONFIGURED",
|
|
)
|
|
|
|
# Determine if the caller is an org admin/owner (admins can always choose expiry)
|
|
is_org_admin = any(
|
|
om.role in (OrganizationRole.ADMIN, OrganizationRole.OWNER)
|
|
for om in memberships
|
|
if om.organization and om.organization.deleted_at is None
|
|
)
|
|
|
|
# ── Apply department certificate policy ───────────────────────────────────
|
|
dept_policy = _get_merged_dept_cert_policy(user_id)
|
|
if dept_policy:
|
|
if is_org_admin:
|
|
# Admins can always choose their own expiry, but still capped at dept max
|
|
if expiry_hours is not None:
|
|
expiry_hours = min(int(expiry_hours), dept_policy["max_expiry_hours"])
|
|
elif not dept_policy["allow_user_expiry"]:
|
|
# Regular members: ignore user-requested expiry; use dept default
|
|
expiry_hours = dept_policy["default_expiry_hours"]
|
|
else:
|
|
# Regular members allowed to pick, cap at dept maximum
|
|
if expiry_hours is not None:
|
|
expiry_hours = min(int(expiry_hours), dept_policy["max_expiry_hours"])
|
|
policy_extensions = dept_policy["extensions"]
|
|
else:
|
|
policy_extensions = None # let signing service use its own defaults
|
|
|
|
# ── Build rich key_id identity for the OpenSSH cert ─────────────────────
|
|
# This appears in `ssh-keygen -L -f cert.pub` as the Key ID field and
|
|
# is stored in the DB cert record so it's auditable.
|
|
org_slugs = sorted({
|
|
om.organization.slug
|
|
for om in memberships
|
|
if om.organization and om.organization.deleted_at is None
|
|
and getattr(om.organization, 'slug', None)
|
|
})
|
|
org_slug = org_slugs[0] if org_slugs else "unknown"
|
|
full_name = getattr(user, 'full_name', None) or getattr(user, 'name', None) or "unknown"
|
|
cert_identity = f"{user.email} ({full_name}) [org:{org_slug}]"
|
|
|
|
signing_request = SSHCertificateSigningRequest(
|
|
ssh_public_key=ssh_key.payload,
|
|
principals=principals,
|
|
cert_type=cert_type,
|
|
key_id=cert_identity,
|
|
expiry_hours=int(expiry_hours) if expiry_hours else None,
|
|
extensions=policy_extensions,
|
|
)
|
|
validation_errors = signing_request.validate()
|
|
if validation_errors:
|
|
return api_response(
|
|
success=False,
|
|
message="Invalid signing request",
|
|
status=400,
|
|
error_type="VALIDATION_ERROR",
|
|
error_details={"errors": validation_errors},
|
|
)
|
|
|
|
try:
|
|
from gatehouse_app.utils.ca_key_encryption import decrypt_ca_key
|
|
ca_private_key_pem = decrypt_ca_key(db_ca.private_key)
|
|
response = ssh_ca_service.sign_certificate(
|
|
signing_request, ca_private_key=ca_private_key_pem, ca_obj=db_ca
|
|
)
|
|
except SSHCertificateError as e:
|
|
AuditLog.log(
|
|
action=AuditAction.SSH_CERT_FAILED,
|
|
user_id=user_id,
|
|
resource_type='SSHCertificate',
|
|
ip_address=request.remote_addr,
|
|
success=False,
|
|
error_message=str(e),
|
|
)
|
|
return api_response(success=False, message=str(e), status=400, error_type="SIGNING_FAILED")
|
|
except Exception as e:
|
|
AuditLog.log(
|
|
action=AuditAction.SSH_CERT_FAILED,
|
|
user_id=user_id,
|
|
resource_type='SSHCertificate',
|
|
ip_address=request.remote_addr,
|
|
success=False,
|
|
error_message=str(e),
|
|
)
|
|
return api_response(success=False, message="Certificate signing failed", status=500, error_type="SERVER_ERROR")
|
|
|
|
cert_record = _persist_certificate(
|
|
user_id=user_id,
|
|
ssh_key_id=key_id,
|
|
ca=db_ca,
|
|
signing_response=response,
|
|
request_ip=request.remote_addr,
|
|
cert_type_str=cert_type,
|
|
cert_identity=cert_identity,
|
|
)
|
|
|
|
AuditLog.log(
|
|
action=AuditAction.SSH_CERT_ISSUED,
|
|
user_id=user_id,
|
|
resource_type='SSHCertificate',
|
|
resource_id=cert_record.id if cert_record else key_id,
|
|
ip_address=request.remote_addr,
|
|
description=(
|
|
f'Certificate serial={response.serial} issued for {user.email}; '
|
|
f'principals: {", ".join(principals)}'
|
|
),
|
|
extra_data={
|
|
'serial': response.serial,
|
|
'key_id': cert_identity,
|
|
'principals': principals,
|
|
'ca_id': str(db_ca.id),
|
|
'ssh_key_id': str(key_id),
|
|
},
|
|
)
|
|
|
|
if cert_record:
|
|
CertificateAuditLog.log(
|
|
certificate_id=cert_record.id,
|
|
action='issued',
|
|
user_id=user_id,
|
|
ip_address=request.remote_addr,
|
|
user_agent=request.headers.get('User-Agent'),
|
|
message=(
|
|
f'Certificate serial={response.serial} issued for {user.email}; '
|
|
f'principals: {", ".join(principals)}'
|
|
),
|
|
extra_data={
|
|
'serial': response.serial,
|
|
'key_id': cert_identity,
|
|
'principals': principals,
|
|
'ca_id': str(db_ca.id),
|
|
'ssh_key_id': str(key_id),
|
|
'valid_after': response.valid_after.isoformat() if response.valid_after else None,
|
|
'valid_before': response.valid_before.isoformat() if response.valid_before else None,
|
|
},
|
|
success=True,
|
|
)
|
|
|
|
result = {
|
|
'certificate': response.certificate,
|
|
'serial': response.serial,
|
|
'principals': response.principals,
|
|
'valid_after': response.valid_after.isoformat() if response.valid_after else None,
|
|
'valid_before': response.valid_before.isoformat() if response.valid_before else None,
|
|
}
|
|
if cert_record:
|
|
result['cert_id'] = str(cert_record.id)
|
|
|
|
return api_response(data=result, message="Certificate signed successfully", status=201)
|
|
|
|
|
|
@ssh_bp.route('/certificates', methods=['GET'])
|
|
@login_required
|
|
def list_certificates():
|
|
"""List all SSH certificates issued for the current user."""
|
|
user_id = g.current_user.id
|
|
|
|
try:
|
|
from gatehouse_app.models.ssh_ca.ssh_certificate import SSHCertificate
|
|
certs = (
|
|
SSHCertificate.query
|
|
.filter_by(user_id=user_id, deleted_at=None)
|
|
.order_by(SSHCertificate.created_at.desc())
|
|
.all()
|
|
)
|
|
return api_response(
|
|
data={
|
|
'certificates': [c.to_dict() for c in certs],
|
|
'count': len(certs),
|
|
},
|
|
message="Certificates retrieved successfully"
|
|
)
|
|
except Exception as e:
|
|
return api_response(
|
|
success=False,
|
|
message=str(e),
|
|
status=500,
|
|
error_type='INTERNAL_ERROR'
|
|
)
|
|
|
|
|
|
@ssh_bp.route('/certificates/<cert_id>', methods=['GET'])
|
|
@login_required
|
|
def get_certificate(cert_id):
|
|
"""Get a specific issued certificate (metadata only)."""
|
|
user_id = g.current_user.id
|
|
|
|
try:
|
|
from gatehouse_app.models.ssh_ca.ssh_certificate import SSHCertificate
|
|
cert = SSHCertificate.query.filter_by(id=cert_id, deleted_at=None).first()
|
|
if not cert:
|
|
return api_response(success=False, message='Certificate not found', status=404, error_type='NOT_FOUND')
|
|
if cert.user_id != user_id:
|
|
return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN')
|
|
data = cert.to_dict()
|
|
data['certificate'] = cert.certificate
|
|
return api_response(success=True, message='Certificate retrieved', data=data, status=200)
|
|
except Exception as e:
|
|
return api_response(success=False, message=str(e), status=500, error_type='INTERNAL_ERROR')
|
|
|
|
|
|
@ssh_bp.route('/certificates/<cert_id>/revoke', methods=['POST'])
|
|
@login_required
|
|
def revoke_certificate(cert_id):
|
|
"""Revoke an issued certificate."""
|
|
user_id = g.current_user.id
|
|
|
|
data = request.get_json() or {}
|
|
reason = data.get('reason', 'User requested revocation')
|
|
|
|
try:
|
|
from gatehouse_app.models.ssh_ca.ssh_certificate import SSHCertificate
|
|
cert = SSHCertificate.query.filter_by(id=cert_id, deleted_at=None).first()
|
|
if not cert:
|
|
return api_response(success=False, message='Certificate not found', status=404, error_type='NOT_FOUND')
|
|
if cert.user_id != user_id:
|
|
return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN')
|
|
if cert.revoked:
|
|
return api_response(success=False, message='Certificate is already revoked', status=409, error_type='ALREADY_REVOKED')
|
|
|
|
cert.revoke(reason=reason)
|
|
|
|
AuditLog.log(
|
|
action=AuditAction.SSH_CERT_REVOKED,
|
|
user_id=user_id,
|
|
resource_type='SSHCertificate',
|
|
resource_id=cert_id,
|
|
ip_address=request.remote_addr,
|
|
description=f'Revoked: {reason}',
|
|
)
|
|
|
|
CertificateAuditLog.log(
|
|
certificate_id=cert_id,
|
|
action='revoked',
|
|
user_id=user_id,
|
|
ip_address=request.remote_addr,
|
|
user_agent=request.headers.get('User-Agent'),
|
|
message=f'Certificate revoked: {reason}',
|
|
success=True,
|
|
)
|
|
|
|
return api_response(
|
|
success=True,
|
|
message='Certificate revoked successfully',
|
|
data={'status': 'revoked', 'cert_id': cert_id, 'reason': reason},
|
|
status=200,
|
|
)
|
|
except Exception as e:
|
|
return api_response(success=False, message=str(e), status=500, error_type='INTERNAL_ERROR')
|
|
|
|
|
|
@ssh_bp.route('/ca/public-key', methods=['GET'])
|
|
@login_required
|
|
def get_ca_public_key():
|
|
"""
|
|
Return the CA public key for this user's organization.
|
|
|
|
Server admins should add this key to their host's ``TrustedUserCAKeys``
|
|
directive so that certificates issued by gatehouse are trusted.
|
|
|
|
Query parameters:
|
|
ca_type: 'user' (default) or 'host' — which CA's public key to return
|
|
format: 'openssh' (default) or 'text' — affects Content-Type only
|
|
|
|
Returns:
|
|
{ "public_key": "ssh-ed25519 AAAA...",
|
|
"fingerprint": "SHA256:...",
|
|
"ca_name": "..." }
|
|
"""
|
|
user = g.current_user
|
|
ca_type = request.args.get("ca_type", "user")
|
|
if ca_type not in ("user", "host"):
|
|
return api_response(
|
|
success=False,
|
|
message="ca_type must be 'user' or 'host'",
|
|
status=400,
|
|
error_type="BAD_REQUEST",
|
|
)
|
|
|
|
db_ca = _get_org_ca_for_user(user, ca_type=ca_type)
|
|
if db_ca:
|
|
return api_response(
|
|
data={
|
|
'public_key': db_ca.public_key,
|
|
'fingerprint': db_ca.fingerprint,
|
|
'ca_name': db_ca.name,
|
|
'ca_type': ca_type,
|
|
'source': 'db',
|
|
},
|
|
message="CA public key retrieved successfully"
|
|
)
|
|
|
|
return api_response(
|
|
success=False,
|
|
message=(
|
|
f"No {ca_type} CA is configured for your organization. "
|
|
"An admin must generate one on the Certificate Authorities page."
|
|
),
|
|
status=404,
|
|
error_type="CA_NOT_CONFIGURED",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CA Permissions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@ssh_bp.route('/ca/<ca_id>/permissions', methods=['GET'])
|
|
@login_required
|
|
def list_ca_permissions(ca_id):
|
|
"""List permissions for a Certificate Authority.
|
|
|
|
Returns:
|
|
200: { ca_id, permissions: [...], open_to_all: bool }
|
|
403: Not admin/owner
|
|
404: CA not found
|
|
"""
|
|
from gatehouse_app.models.ssh_ca.ca import CA, CAPermission
|
|
from gatehouse_app.models.organization.organization_member import OrganizationMember
|
|
from gatehouse_app.utils.constants import OrganizationRole
|
|
|
|
user = g.current_user
|
|
|
|
ca = CA.query.filter_by(id=ca_id, deleted_at=None).first()
|
|
if not ca:
|
|
return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND")
|
|
|
|
# Verify user is admin/owner of the CA's org
|
|
if ca.organization_id:
|
|
membership = OrganizationMember.query.filter_by(
|
|
organization_id=ca.organization_id,
|
|
user_id=user.id,
|
|
deleted_at=None,
|
|
).first()
|
|
if not membership or membership.role not in (OrganizationRole.ADMIN, OrganizationRole.OWNER):
|
|
return api_response(success=False, message="Admin access required", status=403, error_type="FORBIDDEN")
|
|
|
|
perms = CAPermission.query.filter_by(ca_id=ca_id, deleted_at=None).all()
|
|
perm_list = []
|
|
for p in perms:
|
|
d = p.to_dict()
|
|
d["user_email"] = p.user.email if p.user else None
|
|
perm_list.append(d)
|
|
|
|
return api_response(
|
|
data={
|
|
"ca_id": ca_id,
|
|
"permissions": perm_list,
|
|
"open_to_all": len(perms) == 0,
|
|
},
|
|
message="CA permissions retrieved",
|
|
)
|
|
|
|
|
|
@ssh_bp.route('/ca/<ca_id>/permissions', methods=['POST'])
|
|
@login_required
|
|
def add_ca_permission(ca_id):
|
|
"""Grant a user permission on a Certificate Authority.
|
|
|
|
Request body:
|
|
user_id: UUID of the user to grant access
|
|
permission: "sign" or "admin" (default: "sign")
|
|
|
|
Returns:
|
|
201: Permission granted
|
|
400: Validation error
|
|
403: Not admin/owner
|
|
404: CA or user not found
|
|
409: Permission already exists
|
|
"""
|
|
from gatehouse_app.models.ssh_ca.ca import CA, CAPermission
|
|
from gatehouse_app.models.organization.organization_member import OrganizationMember
|
|
from gatehouse_app.models.user import User
|
|
from gatehouse_app.utils.constants import OrganizationRole, AuditAction
|
|
from gatehouse_app.models import AuditLog
|
|
from gatehouse_app.extensions import db
|
|
|
|
user = g.current_user
|
|
|
|
ca = CA.query.filter_by(id=ca_id, deleted_at=None).first()
|
|
if not ca:
|
|
return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND")
|
|
|
|
# Verify user is admin/owner of the CA's org
|
|
if ca.organization_id:
|
|
membership = OrganizationMember.query.filter_by(
|
|
organization_id=ca.organization_id,
|
|
user_id=user.id,
|
|
deleted_at=None,
|
|
).first()
|
|
if not membership or membership.role not in (OrganizationRole.ADMIN, OrganizationRole.OWNER):
|
|
return api_response(success=False, message="Admin access required", status=403, error_type="FORBIDDEN")
|
|
|
|
data = request.get_json() or {}
|
|
target_user_id = (data.get("user_id") or "").strip()
|
|
permission = data.get("permission", "sign")
|
|
|
|
if not target_user_id:
|
|
return api_response(success=False, message="user_id is required", status=400, error_type="VALIDATION_ERROR")
|
|
if permission not in ("sign", "admin"):
|
|
return api_response(
|
|
success=False,
|
|
message="permission must be 'sign' or 'admin'",
|
|
status=400,
|
|
error_type="VALIDATION_ERROR",
|
|
)
|
|
|
|
target_user = User.query.filter_by(id=target_user_id, deleted_at=None).first()
|
|
if not target_user:
|
|
return api_response(success=False, message="User not found", status=404, error_type="NOT_FOUND")
|
|
|
|
# Check for duplicate
|
|
existing = CAPermission.query.filter_by(
|
|
ca_id=ca_id, user_id=target_user_id, deleted_at=None
|
|
).first()
|
|
if existing:
|
|
# Update permission level if different
|
|
if existing.permission != permission:
|
|
existing.permission = permission
|
|
db.session.commit()
|
|
d = existing.to_dict()
|
|
d["user_email"] = target_user.email
|
|
return api_response(
|
|
data={"message": "Permission updated", "permission": d},
|
|
message="Permission updated",
|
|
)
|
|
return api_response(
|
|
success=False,
|
|
message="User already has this permission on the CA",
|
|
status=409,
|
|
error_type="DUPLICATE",
|
|
)
|
|
|
|
perm = CAPermission(
|
|
ca_id=ca_id,
|
|
user_id=target_user_id,
|
|
permission=permission,
|
|
)
|
|
db.session.add(perm)
|
|
db.session.commit()
|
|
|
|
AuditLog.log(
|
|
action=AuditAction.CA_UPDATED,
|
|
user_id=user.id,
|
|
resource_type="CAPermission",
|
|
resource_id=perm.id,
|
|
ip_address=request.remote_addr,
|
|
description=f"Granted '{permission}' on CA '{ca.name}' to user {target_user.email}",
|
|
)
|
|
|
|
d = perm.to_dict()
|
|
d["user_email"] = target_user.email
|
|
return api_response(
|
|
data={"message": "Permission granted", "permission": d},
|
|
message="Permission granted",
|
|
status=201,
|
|
)
|
|
|
|
|
|
@ssh_bp.route('/ca/<ca_id>/permissions/<target_user_id>', methods=['DELETE'])
|
|
@login_required
|
|
def remove_ca_permission(ca_id, target_user_id):
|
|
"""Revoke a user's permission on a Certificate Authority.
|
|
|
|
Returns:
|
|
200: Permission revoked
|
|
403: Not admin/owner
|
|
404: CA or permission not found
|
|
"""
|
|
from gatehouse_app.models.ssh_ca.ca import CA, CAPermission
|
|
from gatehouse_app.models.organization.organization_member import OrganizationMember
|
|
from gatehouse_app.utils.constants import OrganizationRole, AuditAction
|
|
from gatehouse_app.models import AuditLog
|
|
from gatehouse_app.extensions import db
|
|
|
|
user = g.current_user
|
|
|
|
ca = CA.query.filter_by(id=ca_id, deleted_at=None).first()
|
|
if not ca:
|
|
return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND")
|
|
|
|
# Verify user is admin/owner of the CA's org
|
|
if ca.organization_id:
|
|
membership = OrganizationMember.query.filter_by(
|
|
organization_id=ca.organization_id,
|
|
user_id=user.id,
|
|
deleted_at=None,
|
|
).first()
|
|
if not membership or membership.role not in (OrganizationRole.ADMIN, OrganizationRole.OWNER):
|
|
return api_response(success=False, message="Admin access required", status=403, error_type="FORBIDDEN")
|
|
|
|
perm = CAPermission.query.filter_by(
|
|
ca_id=ca_id, user_id=target_user_id, deleted_at=None
|
|
).first()
|
|
if not perm:
|
|
return api_response(success=False, message="Permission not found", status=404, error_type="NOT_FOUND")
|
|
|
|
perm.delete(soft=True)
|
|
|
|
AuditLog.log(
|
|
action=AuditAction.CA_UPDATED,
|
|
user_id=user.id,
|
|
resource_type="CAPermission",
|
|
resource_id=perm.id,
|
|
ip_address=request.remote_addr,
|
|
description=f"Revoked permission on CA '{ca.name}' from user {target_user_id}",
|
|
)
|
|
|
|
return api_response(
|
|
data={},
|
|
message="Permission revoked",
|
|
)
|
|
|