diff --git a/gatehouse_app/api/v1/departments.py b/gatehouse_app/api/v1/departments.py index 155e465..73ea83d 100644 --- a/gatehouse_app/api/v1/departments.py +++ b/gatehouse_app/api/v1/departments.py @@ -520,3 +520,43 @@ def remove_department_member(org_id, dept_id, user_id): return api_response( message="Member removed successfully", ) + + +@api_v1_bp.route("/organizations//departments//principals", methods=["GET"]) +@login_required +@full_access_required +def get_department_principals(org_id, dept_id): + """Get all principals linked to a department.""" + org = OrganizationService.get_organization_by_id(org_id) + + if not org.is_member(g.current_user.id): + return api_response( + success=False, + message="You are not a member of this organization", + status=403, + error_type="AUTHORIZATION_ERROR", + ) + + dept = Department.query.filter_by( + id=dept_id, + organization_id=org_id, + deleted_at=None + ).first() + + if not dept: + return api_response( + success=False, + message="Department not found", + status=404, + error_type="NOT_FOUND", + ) + + principals = dept.get_principals(active_only=True) + + return api_response( + data={ + "principals": [p.to_dict() for p in principals], + "count": len(principals), + }, + message="Principals retrieved successfully", + ) diff --git a/gatehouse_app/api/v1/organizations.py b/gatehouse_app/api/v1/organizations.py index 9826f06..8fdc56c 100644 --- a/gatehouse_app/api/v1/organizations.py +++ b/gatehouse_app/api/v1/organizations.py @@ -1,5 +1,5 @@ """Organization endpoints.""" -from flask import g, request +from flask import g, request, current_app from marshmallow import ValidationError from gatehouse_app.api.v1 import api_v1_bp from gatehouse_app.utils.response import api_response @@ -13,6 +13,7 @@ from gatehouse_app.schemas.organization_schema import ( from gatehouse_app.services.organization_service import OrganizationService from gatehouse_app.services.user_service import UserService from gatehouse_app.utils.constants import OrganizationRole +from gatehouse_app.extensions import db @api_v1_bp.route("/organizations", methods=["POST"]) @login_required @full_access_required @@ -930,3 +931,377 @@ def get_my_audit_logs(): }, message="Activity retrieved", ) + + + +@api_v1_bp.route("/organizations//roles", methods=["GET"]) +@login_required +def list_organization_roles(org_id): + """List the available roles for an organization. + + Returns the canonical set of OrganizationRole values together with every + current member assigned to each role. + + Returns: + 200: roles list with member counts + 401: Not authenticated + 404: Organization not found + """ + from gatehouse_app.models.organization import Organization + from gatehouse_app.models.organization_member import OrganizationMember + + org = Organization.query.filter_by(id=org_id, deleted_at=None).first() + if not org: + return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND") + + # Load all active members grouped by role + members = OrganizationMember.query.filter_by(organization_id=org_id, deleted_at=None).all() + by_role: dict = {r.value: [] for r in OrganizationRole} + for m in members: + role_key = m.role.value if hasattr(m.role, "value") else str(m.role) + if role_key in by_role: + by_role[role_key].append({ + "user_id": m.user_id, + "email": m.user.email if m.user else None, + "full_name": m.user.full_name if m.user else None, + "joined_at": m.created_at.isoformat() if m.created_at else None, + }) + + roles = [ + { + "role": r.value, + "member_count": len(by_role[r.value]), + "members": by_role[r.value], + } + for r in OrganizationRole + ] + return api_response(data={"roles": roles, "organization_id": org_id}, message="Roles retrieved") + + +@api_v1_bp.route("/organizations//roles//members", methods=["POST"]) +@login_required +@require_admin +def assign_role_to_member(org_id, role_name): + """Assign a role to a user in the organization (admin/owner only). + + This is a convenience endpoint equivalent to PATCH + /organizations//members//role but driven by role name. + + Request body: + user_id – UUID of the member to assign + + Returns: + 200: Role assigned + 400: Invalid role / missing user_id + 403: Not an admin/owner + 404: Org or member not found + """ + from gatehouse_app.models.organization_member import OrganizationMember + from gatehouse_app.extensions import db + + try: + new_role = OrganizationRole(role_name.lower()) + except ValueError: + valid = [r.value for r in OrganizationRole] + return api_response(success=False, message=f"Invalid role. Must be one of: {valid}", status=400, error_type="VALIDATION_ERROR") + + data = request.get_json() or {} + target_user_id = data.get("user_id") + if not target_user_id: + return api_response(success=False, message="user_id is required", status=400, error_type="VALIDATION_ERROR") + + membership = OrganizationMember.query.filter_by( + organization_id=org_id, user_id=target_user_id, deleted_at=None + ).first() + if not membership: + return api_response(success=False, message="Member not found in this organization", status=404, error_type="NOT_FOUND") + + membership.role = new_role + db.session.commit() + return api_response( + data={"user_id": target_user_id, "role": new_role.value}, + message=f"Role updated to {new_role.value}", + ) + + +@api_v1_bp.route("/organizations//roles//members/", methods=["DELETE"]) +@login_required +@require_admin +def remove_role_from_member(org_id, role_name, user_id): + """Demote a member to GUEST (effectively removing a named role). + + Removing a role downgrades the member to GUEST rather than removing them + from the organization entirely. Use the existing DELETE + /organizations//members/ endpoint to fully remove. + + Returns: + 200: Role removed (member demoted to GUEST) + 400: Invalid role name + 403: Not an admin/owner + 404: Org or member not found + """ + from gatehouse_app.models.organization_member import OrganizationMember + from gatehouse_app.extensions import db + + try: + OrganizationRole(role_name.lower()) # validate the name + except ValueError: + valid = [r.value for r in OrganizationRole] + return api_response(success=False, message=f"Invalid role. Must be one of: {valid}", status=400, error_type="VALIDATION_ERROR") + + membership = OrganizationMember.query.filter_by( + organization_id=org_id, user_id=user_id, deleted_at=None + ).first() + if not membership: + return api_response(success=False, message="Member not found in this organization", status=404, error_type="NOT_FOUND") + + membership.role = OrganizationRole.GUEST + db.session.commit() + return api_response( + data={"user_id": user_id, "role": OrganizationRole.GUEST.value}, + message="Role removed; member demoted to GUEST", + ) + + +@api_v1_bp.route("/organizations//cas", methods=["GET"]) +@login_required +@require_admin +def list_org_cas(org_id): + """List all Certificate Authorities for an organization. + + Returns: + 200: List of CAs (private_key excluded) + 403: Not admin/owner + 404: Org not found + """ + from gatehouse_app.models.ca import CA + from gatehouse_app.models.organization import Organization + + org = Organization.query.filter_by(id=org_id, deleted_at=None).first() + if not org: + return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND") + + cas = CA.query.filter_by(organization_id=org_id, deleted_at=None).all() + return api_response( + data={"cas": [ca.to_dict() for ca in cas], "count": len(cas)}, + message="CAs retrieved", + ) + + +@api_v1_bp.route("/organizations//cas/", methods=["PATCH"]) +@login_required +@require_admin +def update_org_ca(org_id, ca_id): + """Update CA configuration (validity hours). + + Request body: + default_cert_validity_hours: Default validity in hours (optional) + max_cert_validity_hours: Maximum validity in hours (optional) + + Returns: + 200: CA updated successfully + 400: Validation error + 403: Not admin/owner + 404: Org or CA not found + """ + from gatehouse_app.models.ca import CA + from gatehouse_app.models.organization import Organization + from marshmallow import Schema, fields, validate, ValidationError + + org = Organization.query.filter_by(id=org_id, deleted_at=None).first() + if not org: + return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND") + + ca = CA.query.filter_by(id=ca_id, organization_id=org_id, deleted_at=None).first() + if not ca: + return api_response(success=False, message="CA not found", status=404, error_type="NOT_FOUND") + + try: + class CAUpdateSchema(Schema): + default_cert_validity_hours = fields.Int( + validate=validate.Range(min=1), + required=False + ) + max_cert_validity_hours = fields.Int( + validate=validate.Range(min=1), + required=False + ) + + schema = CAUpdateSchema() + data = schema.load(request.json or {}) + + # Validate that max >= default if both are provided + default_hours = data.get('default_cert_validity_hours', ca.default_cert_validity_hours) + max_hours = data.get('max_cert_validity_hours', ca.max_cert_validity_hours) + + if default_hours > max_hours: + return api_response( + success=False, + message="Default validity must be less than or equal to maximum validity", + status=400, + error_type="VALIDATION_ERROR", + ) + + # Update fields + if 'default_cert_validity_hours' in data: + ca.default_cert_validity_hours = data['default_cert_validity_hours'] + if 'max_cert_validity_hours' in data: + ca.max_cert_validity_hours = data['max_cert_validity_hours'] + + db.session.commit() + + return api_response( + data={"ca": ca.to_dict()}, + message="CA updated successfully", + ) + + except ValidationError as e: + return api_response( + success=False, + message="Validation failed", + status=400, + error_type="VALIDATION_ERROR", + error_details=e.messages, + ) + except Exception as e: + db.session.rollback() + return api_response( + success=False, + message="Failed to update CA", + status=500, + error_type="SERVER_ERROR", + ) + + +@api_v1_bp.route("/organizations//cas", methods=["POST"]) +@login_required +@require_admin +def create_org_ca(org_id): + """Create a new Certificate Authority for an organization. + + Request body: + name: CA display name (required) + description: Optional description + key_type: "ed25519" (default), "rsa", or "ecdsa" + default_cert_validity_hours: Default cert validity in hours (optional) + max_cert_validity_hours: Max cert validity in hours (optional) + + Returns: + 201: CA created successfully + 400: Validation error or name already taken + 403: Not admin/owner + 404: Org not found + """ + from gatehouse_app.models.ca import CA, KeyType + from gatehouse_app.models.organization import Organization + from gatehouse_app.utils.crypto import compute_ssh_fingerprint + from marshmallow import Schema, fields as ma_fields, validate, ValidationError as MaValidationError + from sshkey_tools.keys import Ed25519PrivateKey, RsaPrivateKey, EcdsaPrivateKey + + org = Organization.query.filter_by(id=org_id, deleted_at=None).first() + if not org: + return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND") + + class CreateCASchema(Schema): + name = ma_fields.Str(required=True, validate=validate.Length(min=1, max=255)) + description = ma_fields.Str(load_default=None, allow_none=True) + ca_type = ma_fields.Str(load_default="user", validate=validate.OneOf(["user", "host"])) + key_type = ma_fields.Str(load_default="ed25519", validate=validate.OneOf(["ed25519", "rsa", "ecdsa"])) + default_cert_validity_hours = ma_fields.Int(load_default=8, validate=validate.Range(min=1)) + max_cert_validity_hours = ma_fields.Int(load_default=720, validate=validate.Range(min=1)) + + try: + schema = CreateCASchema() + data = schema.load(request.get_json() or {}) + + # Check name uniqueness within org + existing = CA.query.filter_by( + organization_id=org_id, name=data["name"], deleted_at=None + ).first() + if existing: + return api_response( + success=False, + message="A CA with that name already exists in this organization", + status=400, + error_type="DUPLICATE_NAME", + ) + + # Enforce one CA per type per org + from gatehouse_app.models.ca import CaType + ca_type_val = data["ca_type"] + existing_type = CA.query.filter_by( + organization_id=org_id, deleted_at=None + ).filter(CA.ca_type == CaType(ca_type_val)).first() + if existing_type: + type_label = "User" if ca_type_val == "user" else "Host" + return api_response( + success=False, + message=f"A {type_label} CA already exists for this organization. " + f"You can only have one {type_label} CA per organization.", + status=400, + error_type="DUPLICATE_CA_TYPE", + ) + + # Validate cross-field + if data["default_cert_validity_hours"] > data["max_cert_validity_hours"]: + return api_response( + success=False, + message="Default validity must be less than or equal to maximum validity", + status=400, + error_type="VALIDATION_ERROR", + ) + + # Generate key pair + key_type = data["key_type"] + if key_type == "ed25519": + private_key_obj = Ed25519PrivateKey.generate() + elif key_type == "rsa": + private_key_obj = RsaPrivateKey.generate(4096) + else: # ecdsa + private_key_obj = EcdsaPrivateKey.generate() + + private_key_pem = private_key_obj.to_string() + public_key_str = private_key_obj.public_key.to_string() + fingerprint = compute_ssh_fingerprint(public_key_str) + + ca = CA( + organization_id=org_id, + name=data["name"], + description=data["description"], + ca_type=CaType(ca_type_val), + key_type=KeyType(key_type), + private_key=private_key_pem, + public_key=public_key_str, + fingerprint=fingerprint, + default_cert_validity_hours=data["default_cert_validity_hours"], + max_cert_validity_hours=data["max_cert_validity_hours"], + is_active=True, + ) + db.session.add(ca) + db.session.commit() + + return api_response( + data={"ca": ca.to_dict()}, + message="CA created successfully", + status=201, + ) + + except MaValidationError as e: + return api_response( + success=False, + message="Validation failed", + status=400, + error_type="VALIDATION_ERROR", + error_details=e.messages, + ) + except Exception as e: + db.session.rollback() + current_app.logger.exception("Failed to create CA") + return api_response( + success=False, + message="Failed to create CA", + status=500, + error_type="SERVER_ERROR", + ) + + diff --git a/gatehouse_app/api/v1/principals.py b/gatehouse_app/api/v1/principals.py index 11f3332..587d81d 100644 --- a/gatehouse_app/api/v1/principals.py +++ b/gatehouse_app/api/v1/principals.py @@ -8,6 +8,7 @@ from gatehouse_app.utils.decorators import login_required, require_admin, full_a from gatehouse_app.models import Principal, PrincipalMembership, Department, DepartmentPrincipal from gatehouse_app.services.organization_service import OrganizationService from gatehouse_app.services.user_service import UserService +from gatehouse_app.exceptions import OrganizationNotFoundError from gatehouse_app.extensions import db @@ -614,7 +615,10 @@ def link_principal_to_department(org_id, principal_id, dept_id): 404: Organization, principal, or department not found 409: Already linked """ - org = OrganizationService.get_organization_by_id(org_id) + try: + org = OrganizationService.get_organization_by_id(org_id) + except OrganizationNotFoundError: + return api_response(success=False, message="Organization not found", status=404, error_type="NOT_FOUND") principal = Principal.query.filter_by( id=principal_id, @@ -644,7 +648,6 @@ def link_principal_to_department(org_id, principal_id, dept_id): error_type="NOT_FOUND", ) - # Check if already linked existing = DepartmentPrincipal.query.filter_by( department_id=dept_id, principal_id=principal_id, @@ -659,13 +662,35 @@ def link_principal_to_department(org_id, principal_id, dept_id): error_type="CONFLICT", ) - # Create link - link = DepartmentPrincipal( - department_id=dept_id, - principal_id=principal_id, - ) - db.session.add(link) - db.session.commit() + soft_deleted = DepartmentPrincipal.query.filter( + DepartmentPrincipal.department_id == dept_id, + DepartmentPrincipal.principal_id == principal_id, + DepartmentPrincipal.deleted_at != None, # noqa: E711 + ).first() + + try: + if soft_deleted: + soft_deleted.deleted_at = None + else: + link = DepartmentPrincipal( + department_id=dept_id, + principal_id=principal_id, + ) + db.session.add(link) + db.session.commit() + except Exception as e: + db.session.rollback() + from gatehouse_app.extensions import db as _db + try: + _db.session.rollback() + except Exception: + pass + return api_response( + success=False, + message="Failed to link principal to department", + status=500, + error_type="SERVER_ERROR", + ) return api_response( message="Principal linked to department successfully", diff --git a/gatehouse_app/api/v1/ssh.py b/gatehouse_app/api/v1/ssh.py index 3f4946f..c301dad 100644 --- a/gatehouse_app/api/v1/ssh.py +++ b/gatehouse_app/api/v1/ssh.py @@ -16,6 +16,7 @@ from gatehouse_app.exceptions import ( from gatehouse_app.utils.constants import AuditAction from gatehouse_app.models import AuditLog from gatehouse_app.utils.decorators import login_required +from gatehouse_app.utils.response import api_response ssh_bp = Blueprint('ssh', __name__, url_prefix='/ssh') ssh_key_service = SSHKeyService() @@ -112,7 +113,7 @@ def _get_or_create_system_ca(): return None -def _persist_certificate(user_id, ssh_key_id, ca, signing_response, request_ip=None): +def _persist_certificate(user_id, ssh_key_id, ca, signing_response, request_ip=None, cert_type_str='user'): """Save a signed certificate to the ssh_certificates table. Args: @@ -121,6 +122,7 @@ def _persist_certificate(user_id, ssh_key_id, ca, signing_response, request_ip=N ca: CA model instance (may be None — cert still returned but not persisted) signing_response: SSHCertificateSigningResponse request_ip: Client IP address + cert_type_str: 'user' or 'host' (from the sign request) Returns: SSHCertificate instance or None if persistence failed @@ -133,6 +135,11 @@ def _persist_certificate(user_id, ssh_key_id, ca, signing_response, request_ip=N from gatehouse_app.models.ssh_certificate import SSHCertificate, CertificateStatus from gatehouse_app.models.ca import CertType + try: + resolved_cert_type = CertType(cert_type_str) + except ValueError: + resolved_cert_type = CertType.USER + cert_record = SSHCertificate( ca_id=ca.id, user_id=user_id, @@ -140,7 +147,7 @@ def _persist_certificate(user_id, ssh_key_id, ca, signing_response, request_ip=N certificate=signing_response.certificate, serial=signing_response.serial, key_id=str(ssh_key_id), - cert_type=CertType.USER, + cert_type=resolved_cert_type, principals=signing_response.principals, valid_after=signing_response.valid_after, valid_before=signing_response.valid_before, @@ -172,10 +179,13 @@ def list_ssh_keys(): user_id = g.current_user.id keys = ssh_key_service.get_user_ssh_keys(user_id) - return jsonify({ - 'keys': [k.to_dict() for k in keys], - 'count': len(keys), - }), 200 + return api_response( + data={ + 'keys': [k.to_dict() for k in keys], + 'count': len(keys), + }, + message="SSH keys retrieved successfully" + ) @ssh_bp.route('/keys', methods=['POST']) @@ -183,25 +193,24 @@ def list_ssh_keys(): def add_ssh_key(): """Add a new SSH public key for current user.""" user_id = g.current_user.id - + data = request.get_json() if not data: - return jsonify({'error': 'No JSON data provided'}), 400 - + return api_response(success=False, message='No JSON data provided', status=400, error_type='BAD_REQUEST') + public_key = data.get('public_key') or data.get('key') description = data.get('description') - + if not public_key: - return jsonify({'error': 'public_key is required'}), 400 - + return api_response(success=False, message='public_key is required', status=400, error_type='BAD_REQUEST') + try: ssh_key = ssh_key_service.add_ssh_key( user_id=user_id, public_key=public_key, description=description, ) - - # Audit log + AuditLog.log( action=AuditAction.SSH_KEY_ADDED, user_id=user_id, @@ -209,17 +218,17 @@ def add_ssh_key(): resource_id=ssh_key.id, ip_address=request.remote_addr, ) - - return jsonify(ssh_key.to_dict()), 201 - + + return api_response(success=True, message='SSH key added', data=ssh_key.to_dict(), status=201) + except SSHKeyAlreadyExistsError as e: - return jsonify({'error': e.message, 'code': 'SSH_KEY_ALREADY_EXISTS'}), 409 + return api_response(success=False, message=e.message, status=409, error_type='SSH_KEY_ALREADY_EXISTS') except IntegrityError: - return jsonify({'error': 'SSH key already exists', 'code': 'SSH_KEY_ALREADY_EXISTS'}), 409 + return api_response(success=False, message='SSH key already exists', status=409, error_type='SSH_KEY_ALREADY_EXISTS') except SSHKeyError as e: - return jsonify({'error': str(e)}), 400 + return api_response(success=False, message=str(e), status=400, error_type='SSH_KEY_ERROR') except ValidationError as e: - return jsonify({'error': str(e)}), 400 + return api_response(success=False, message=str(e), status=400, error_type='VALIDATION_ERROR') @ssh_bp.route('/keys/', methods=['GET']) @@ -227,18 +236,17 @@ def add_ssh_key(): def get_ssh_key(key_id): """Get a specific SSH key.""" user_id = g.current_user.id - + try: ssh_key = ssh_key_service.get_ssh_key(key_id) - - # Check ownership + if ssh_key.user_id != user_id: - return jsonify({'error': 'Forbidden'}), 403 - - return jsonify(ssh_key.to_dict()), 200 - + return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN') + + return api_response(success=True, message='SSH key retrieved', data=ssh_key.to_dict(), status=200) + except SSHKeyNotFoundError: - return jsonify({'error': 'SSH key not found'}), 404 + return api_response(success=False, message='SSH key not found', status=404, error_type='NOT_FOUND') @ssh_bp.route('/keys/', methods=['DELETE']) @@ -246,17 +254,15 @@ def get_ssh_key(key_id): def delete_ssh_key(key_id): """Delete an SSH key.""" user_id = g.current_user.id - + try: ssh_key = ssh_key_service.get_ssh_key(key_id) - - # Check ownership + if ssh_key.user_id != user_id: - return jsonify({'error': 'Forbidden'}), 403 - + return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN') + ssh_key_service.delete_ssh_key(key_id) - - # Audit log + AuditLog.log( action=AuditAction.SSH_KEY_DELETED, user_id=user_id, @@ -264,11 +270,11 @@ def delete_ssh_key(key_id): resource_id=key_id, ip_address=request.remote_addr, ) - - return jsonify({'status': 'deleted'}), 200 - + + return api_response(success=True, message='SSH key deleted', data={'status': 'deleted'}, status=200) + except SSHKeyNotFoundError: - return jsonify({'error': 'SSH key not found'}), 404 + return api_response(success=False, message='SSH key not found', status=404, error_type='NOT_FOUND') @ssh_bp.route('/keys//verify', methods=['GET', 'POST']) @@ -276,37 +282,34 @@ def delete_ssh_key(key_id): def verify_ssh_key(key_id): """Generate or verify SSH key ownership challenge.""" user_id = g.current_user.id - + try: ssh_key = ssh_key_service.get_ssh_key(key_id) - - # Check ownership + if ssh_key.user_id != user_id: - return jsonify({'error': 'Forbidden'}), 403 - - # Handle GET request - return challenge + return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN') + + # GET — return a fresh challenge if request.method == 'GET': challenge = ssh_key_service.generate_verification_challenge(key_id) - return jsonify({ + return api_response(success=True, message='Challenge generated', data={ 'challenge_text': challenge, - 'validationText': challenge, # Backwards compatibility + 'validationText': challenge, 'key_id': key_id, - }), 200 - - # Handle POST request - verify signature + }, status=200) + + # POST — verify signature or generate challenge data = request.get_json() or {} action = data.get('action', 'verify_signature') - + if action == 'verify_signature': - # Verify signature signature = data.get('signature') if not signature: - return jsonify({'error': 'signature is required'}), 400 - + return api_response(success=False, message='signature is required', status=400, error_type='BAD_REQUEST') + try: verified = ssh_key_service.verify_ssh_key_ownership(key_id, signature) - - # Audit log + AuditLog.log( action=AuditAction.SSH_KEY_VERIFIED, user_id=user_id, @@ -315,9 +318,9 @@ def verify_ssh_key(key_id): ip_address=request.remote_addr, success=verified, ) - - return jsonify({'verified': verified}), 200 - + + return api_response(success=True, message='Verification complete', data={'verified': verified}, status=200) + except Exception as e: AuditLog.log( action=AuditAction.SSH_KEY_VALIDATION_FAILED, @@ -328,18 +331,17 @@ def verify_ssh_key(key_id): success=False, error_message=str(e), ) - return jsonify({'error': str(e)}), 400 - + return api_response(success=False, message=str(e), status=400, error_type='VERIFICATION_FAILED') + else: # generate_challenge - # Generate verification challenge challenge = ssh_key_service.generate_verification_challenge(key_id) - return jsonify({ + return api_response(success=True, message='Challenge generated', data={ 'challenge_text': challenge, - 'challenge': challenge, # Both for compatibility - }), 200 - + 'challenge': challenge, + }, status=200) + except SSHKeyNotFoundError: - return jsonify({'error': 'SSH key not found'}), 404 + return api_response(success=False, message='SSH key not found', status=404, error_type='NOT_FOUND') @ssh_bp.route('/keys//update-description', methods=['PATCH']) @@ -347,27 +349,23 @@ def verify_ssh_key(key_id): def update_ssh_key_description(key_id): """Update SSH key description.""" user_id = g.current_user.id - + data = request.get_json() if not data or 'description' not in data: - return jsonify({'error': 'description is required'}), 400 - + return api_response(success=False, message='description is required', status=400, error_type='BAD_REQUEST') + try: ssh_key = ssh_key_service.get_ssh_key(key_id) - - # Check ownership + if ssh_key.user_id != user_id: - return jsonify({'error': 'Forbidden'}), 403 - - updated_key = ssh_key_service.update_ssh_key_description( - key_id, - data['description'] - ) - - return jsonify(updated_key.to_dict()), 200 - + return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN') + + updated_key = ssh_key_service.update_ssh_key_description(key_id, data['description']) + + return api_response(success=True, message='Description updated', data=updated_key.to_dict(), status=200) + except SSHKeyNotFoundError: - return jsonify({'error': 'SSH key not found'}), 404 + return api_response(success=False, message='SSH key not found', status=404, error_type='NOT_FOUND') @ssh_bp.route('/sign', methods=['POST']) @@ -376,92 +374,119 @@ def sign_certificate(): """Sign an SSH certificate for the current user.""" user = g.current_user user_id = user.id - + data = request.get_json() if not data: - return jsonify({'error': 'No JSON data provided'}), 400 - - try: - principals = data.get('principals', []) - cert_type = data.get('cert_type', 'user') - # Accept both 'key_id' and 'cert_id' (from CLI) - key_id = data.get('key_id') or data.get('cert_id') - expiry_hours = data.get('expiry_hours') - + return api_response(success=False, message="No JSON data provided", status=400, error_type="BAD_REQUEST") + + requested_principals = data.get('principals') or [] + cert_type = data.get('cert_type', 'user') + key_id = data.get('key_id') or data.get('cert_id') + expiry_hours = data.get('expiry_hours') + + # ── Resolve which principals the user is allowed to use ────────────────── + from gatehouse_app.models.organization_member import OrganizationMember + from gatehouse_app.models.principal import Principal, PrincipalMembership + from gatehouse_app.models.department import DepartmentMembership, DepartmentPrincipal + from gatehouse_app.utils.constants import OrganizationRole + + allowed_principal_names = set() + + memberships = OrganizationMember.query.filter_by(user_id=user_id).all() + for om in memberships: + org = om.organization + if not org or org.deleted_at is not None: + continue + role = om.role + if role in (OrganizationRole.ADMIN, OrganizationRole.OWNER): + # Admin/owner can use any principal in the org + for p in Principal.query.filter_by(organization_id=org.id, deleted_at=None).all(): + allowed_principal_names.add(p.name) + else: + # Direct memberships + for pm in PrincipalMembership.query.filter_by(user_id=user_id, deleted_at=None).all(): + if pm.principal and pm.principal.organization_id == org.id and pm.principal.deleted_at is None: + allowed_principal_names.add(pm.principal.name) + # Via department + for dm in DepartmentMembership.query.filter_by(user_id=user_id, deleted_at=None).all(): + if dm.department and dm.department.organization_id == org.id and dm.department.deleted_at is None: + for dp in DepartmentPrincipal.query.filter_by(department_id=dm.department_id, deleted_at=None).all(): + if dp.principal and dp.principal.deleted_at is None: + allowed_principal_names.add(dp.principal.name) + + # ── Determine final principals list ───────────────────────────────────── + if not requested_principals: + # Auto-resolve: use all principals the user is assigned to + principals = list(allowed_principal_names) if not principals: - return jsonify({'error': 'principals is required'}), 400 - - # If key_id not specified, use first verified key - if not key_id: - verified_keys = ssh_key_service.get_user_verified_ssh_keys(user_id) - if not verified_keys: - return jsonify({'error': 'No verified SSH keys found'}), 400 - key_id = verified_keys[0].id - - # Get the SSH key + return api_response( + success=False, + message="You have no principals assigned. Ask an admin to add you to a principal.", + status=400, + error_type="NO_PRINCIPALS", + ) + else: + # Validate each requested principal is within the user's allowed set + invalid = [p for p in requested_principals if p not in allowed_principal_names] + if invalid: + return api_response( + success=False, + message=f"You are not authorised to request principals: {', '.join(invalid)}", + status=403, + error_type="UNAUTHORIZED_PRINCIPALS", + ) + principals = requested_principals + + # ── Key resolution ──────────────────────────────────────────────────────── + if not key_id: + verified_keys = ssh_key_service.get_user_verified_ssh_keys(user_id) + if not verified_keys: + return api_response( + success=False, + message="No verified SSH keys found. Verify a key before requesting a certificate.", + status=400, + error_type="NO_VERIFIED_KEYS", + ) + key_id = verified_keys[0].id + + try: ssh_key = ssh_key_service.get_ssh_key(key_id) - if ssh_key.user_id != user_id: - return jsonify({'error': 'Forbidden'}), 403 - - if not ssh_key.verified: - return jsonify({'error': 'SSH key is not verified'}), 400 - - # Resolve which CA to use: org DB CA > config-file CA - db_ca = _get_org_ca_for_user(user) - ca_private_key = db_ca.private_key if db_ca else None # None → signing service uses config - - # Create signing request - signing_request = SSHCertificateSigningRequest( - ssh_public_key=ssh_key.payload, - principals=principals, - cert_type=cert_type, - key_id=key_id, - expiry_hours=int(expiry_hours) if expiry_hours else None, - ) - - # Validate request - validation_errors = signing_request.validate() - if validation_errors: - return jsonify({'errors': validation_errors}), 400 - - # Sign the certificate (pass ca_private_key=None → service loads from config) - response = ssh_ca_service.sign_certificate(signing_request, ca_private_key=ca_private_key) - - # Persist certificate to DB - # If user's org has no DB CA, use the system-config-ca record - ca_for_db = db_ca or _get_or_create_system_ca() - cert_record = _persist_certificate( - user_id=user_id, - ssh_key_id=key_id, - ca=ca_for_db, - signing_response=response, - request_ip=request.remote_addr, - ) - - # Audit log - AuditLog.log( - action=AuditAction.SSH_CERT_ISSUED, - user_id=user_id, - resource_type='SSHCertificate', - resource_id=cert_record.id if cert_record else key_id, - ip_address=request.remote_addr, - description=f'Certificate issued for principals: {", ".join(principals)}', - ) - - result = { - 'certificate': response.certificate, - 'serial': response.serial, - 'principals': response.principals, - 'valid_after': response.valid_after.isoformat() if response.valid_after else None, - 'valid_before': response.valid_before.isoformat() if response.valid_before else None, - } - if cert_record: - result['cert_id'] = str(cert_record.id) - - return jsonify(result), 201 - except SSHKeyNotFoundError: - return jsonify({'error': 'SSH key not found'}), 404 + return api_response(success=False, message="SSH key not found", status=404, error_type="NOT_FOUND") + + if ssh_key.user_id != user_id: + return api_response(success=False, message="Forbidden", status=403, error_type="FORBIDDEN") + + if not ssh_key.verified: + return api_response( + success=False, + message="SSH key is not verified. Verify it before requesting a certificate.", + status=400, + error_type="KEY_NOT_VERIFIED", + ) + + db_ca = _get_org_ca_for_user(user) + ca_private_key = db_ca.private_key if db_ca else None + + signing_request = SSHCertificateSigningRequest( + ssh_public_key=ssh_key.payload, + principals=principals, + cert_type=cert_type, + key_id=key_id, + expiry_hours=int(expiry_hours) if expiry_hours else None, + ) + validation_errors = signing_request.validate() + if validation_errors: + return api_response( + success=False, + message="Invalid signing request", + status=400, + error_type="VALIDATION_ERROR", + error_details={"errors": validation_errors}, + ) + + try: + response = ssh_ca_service.sign_certificate(signing_request, ca_private_key=ca_private_key) except SSHCertificateError as e: AuditLog.log( action=AuditAction.SSH_CERT_FAILED, @@ -471,7 +496,7 @@ def sign_certificate(): success=False, error_message=str(e), ) - return jsonify({'error': str(e)}), 400 + return api_response(success=False, message=str(e), status=400, error_type="SIGNING_FAILED") except Exception as e: AuditLog.log( action=AuditAction.SSH_CERT_FAILED, @@ -481,7 +506,38 @@ def sign_certificate(): success=False, error_message=str(e), ) - return jsonify({'error': 'Certificate signing failed: ' + str(e)}), 500 + return api_response(success=False, message="Certificate signing failed", status=500, error_type="SERVER_ERROR") + + ca_for_db = db_ca or _get_or_create_system_ca() + cert_record = _persist_certificate( + user_id=user_id, + ssh_key_id=key_id, + ca=ca_for_db, + signing_response=response, + request_ip=request.remote_addr, + cert_type_str=cert_type, + ) + + AuditLog.log( + action=AuditAction.SSH_CERT_ISSUED, + user_id=user_id, + resource_type='SSHCertificate', + resource_id=cert_record.id if cert_record else key_id, + ip_address=request.remote_addr, + description=f'Certificate issued for principals: {", ".join(principals)}', + ) + + result = { + 'certificate': response.certificate, + 'serial': response.serial, + 'principals': response.principals, + 'valid_after': response.valid_after.isoformat() if response.valid_after else None, + 'valid_before': response.valid_before.isoformat() if response.valid_before else None, + } + if cert_record: + result['cert_id'] = str(cert_record.id) + + return api_response(data=result, message="Certificate signed successfully", status=201) @ssh_bp.route('/certificates', methods=['GET']) @@ -498,12 +554,20 @@ def list_certificates(): .order_by(SSHCertificate.created_at.desc()) .all() ) - return jsonify({ - 'certificates': [c.to_dict() for c in certs], - 'count': len(certs), - }), 200 + return api_response( + data={ + 'certificates': [c.to_dict() for c in certs], + 'count': len(certs), + }, + message="Certificates retrieved successfully" + ) except Exception as e: - return jsonify({'error': str(e)}), 500 + return api_response( + success=False, + message=str(e), + status=500, + error_type='INTERNAL_ERROR' + ) @ssh_bp.route('/certificates/', methods=['GET']) @@ -516,15 +580,14 @@ def get_certificate(cert_id): from gatehouse_app.models.ssh_certificate import SSHCertificate cert = SSHCertificate.query.filter_by(id=cert_id, deleted_at=None).first() if not cert: - return jsonify({'error': 'Certificate not found'}), 404 + return api_response(success=False, message='Certificate not found', status=404, error_type='NOT_FOUND') if cert.user_id != user_id: - return jsonify({'error': 'Forbidden'}), 403 - # Include full certificate text in single-fetch endpoint + return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN') data = cert.to_dict() data['certificate'] = cert.certificate - return jsonify(data), 200 + return api_response(success=True, message='Certificate retrieved', data=data, status=200) except Exception as e: - return jsonify({'error': str(e)}), 500 + return api_response(success=False, message=str(e), status=500, error_type='INTERNAL_ERROR') @ssh_bp.route('/certificates//revoke', methods=['POST']) @@ -540,11 +603,11 @@ def revoke_certificate(cert_id): from gatehouse_app.models.ssh_certificate import SSHCertificate cert = SSHCertificate.query.filter_by(id=cert_id, deleted_at=None).first() if not cert: - return jsonify({'error': 'Certificate not found'}), 404 + return api_response(success=False, message='Certificate not found', status=404, error_type='NOT_FOUND') if cert.user_id != user_id: - return jsonify({'error': 'Forbidden'}), 403 + return api_response(success=False, message='Forbidden', status=403, error_type='FORBIDDEN') if cert.revoked: - return jsonify({'error': 'Certificate is already revoked'}), 409 + return api_response(success=False, message='Certificate is already revoked', status=409, error_type='ALREADY_REVOKED') cert.revoke(reason=reason) @@ -557,9 +620,14 @@ def revoke_certificate(cert_id): description=f'Revoked: {reason}', ) - return jsonify({'status': 'revoked', 'cert_id': cert_id, 'reason': reason}), 200 + return api_response( + success=True, + message='Certificate revoked successfully', + data={'status': 'revoked', 'cert_id': cert_id, 'reason': reason}, + status=200, + ) except Exception as e: - return jsonify({'error': str(e)}), 500 + return api_response(success=False, message=str(e), status=500, error_type='INTERNAL_ERROR') @ssh_bp.route('/ca/public-key', methods=['GET']) @@ -584,12 +652,15 @@ def get_ca_public_key(): # Try org CA first db_ca = _get_org_ca_for_user(user) if db_ca: - return jsonify({ - 'public_key': db_ca.public_key, - 'fingerprint': db_ca.fingerprint, - 'ca_name': db_ca.name, - 'source': 'db', - }), 200 + return api_response( + data={ + 'public_key': db_ca.public_key, + 'fingerprint': db_ca.fingerprint, + 'ca_name': db_ca.name, + 'source': 'db', + }, + message="CA public key retrieved successfully" + ) # Fall back to config-file CA try: @@ -601,15 +672,28 @@ def get_ca_public_key(): with open(key_path) as f: pub_key = f.read().strip() from gatehouse_app.utils.crypto import compute_ssh_fingerprint - return jsonify({ - 'public_key': pub_key, - 'fingerprint': compute_ssh_fingerprint(pub_key), - 'ca_name': 'system-config-ca', - 'source': 'config', - }), 200 + return api_response( + data={ + 'public_key': pub_key, + 'fingerprint': compute_ssh_fingerprint(pub_key), + 'ca_name': 'system-config-ca', + 'source': 'config', + }, + message="CA public key retrieved successfully" + ) except Exception as e: - return jsonify({'error': f'Could not load CA public key: {e}'}), 500 + return api_response( + success=False, + message=f'Could not load CA public key: {e}', + status=500, + error_type='INTERNAL_ERROR' + ) - return jsonify({'error': 'No CA configured for this organization'}), 404 + return api_response( + success=False, + message='No CA configured for this organization', + status=404, + error_type='NOT_FOUND' + ) diff --git a/gatehouse_app/api/v1/users.py b/gatehouse_app/api/v1/users.py index 407e373..7d318f0 100644 --- a/gatehouse_app/api/v1/users.py +++ b/gatehouse_app/api/v1/users.py @@ -157,3 +157,234 @@ def get_my_organizations(): }, message="Organizations retrieved successfully", ) + + +@api_v1_bp.route("/users/me/principals", methods=["GET"]) +@login_required +@full_access_required +def get_my_principals(): + """Return all principals the current user can sign certificates for. + + For each organization the user belongs to, returns: + - Their effective principals (direct membership + via department) + - Their role in that org (so the frontend can offer admin-mode selection) + - All principals in the org (admin/owner only — so they can pick any) + + Returns: + 200: { + orgs: [{ + org_id, org_name, role, + my_principals: [{id, name, description}], + all_principals: [{id, name, description}] # populated for admin/owner only + }] + } + """ + from gatehouse_app.models.organization_member import OrganizationMember + from gatehouse_app.models.principal import Principal, PrincipalMembership + from gatehouse_app.models.department import DepartmentMembership, DepartmentPrincipal + from gatehouse_app.utils.constants import OrganizationRole + + user = g.current_user + user_id = user.id + + # Get all org memberships + memberships = OrganizationMember.query.filter_by( + user_id=user_id, + ).all() + + orgs_result = [] + for membership in memberships: + org = membership.organization + if not org or org.deleted_at is not None: + continue + + role = membership.role + is_admin = role in (OrganizationRole.ADMIN, OrganizationRole.OWNER) + + # Collect the user's effective principals for this org + effective_principal_ids = set() + + # Direct memberships + direct = PrincipalMembership.query.filter_by( + user_id=user_id, + deleted_at=None, + ).all() + for pm in direct: + if pm.principal and pm.principal.organization_id == org.id and pm.principal.deleted_at is None: + effective_principal_ids.add(pm.principal_id) + + # Via department + dept_memberships = DepartmentMembership.query.filter_by( + user_id=user_id, + deleted_at=None, + ).all() + for dm in dept_memberships: + if dm.department and dm.department.organization_id == org.id and dm.department.deleted_at is None: + dept_principals = DepartmentPrincipal.query.filter_by( + department_id=dm.department_id, + deleted_at=None, + ).all() + for dp in dept_principals: + if dp.principal and dp.principal.deleted_at is None: + effective_principal_ids.add(dp.principal_id) + + # Fetch principal objects + my_principals = [] + if effective_principal_ids: + my_p = Principal.query.filter( + Principal.id.in_(list(effective_principal_ids)), + Principal.deleted_at == None, + ).all() + my_principals = [{"id": p.id, "name": p.name, "description": p.description} for p in my_p] + + # For admins/owners: also return all principals in the org + all_principals = [] + if is_admin: + all_p = Principal.query.filter_by( + organization_id=org.id, + deleted_at=None, + ).all() + all_principals = [{"id": p.id, "name": p.name, "description": p.description} for p in all_p] + + orgs_result.append({ + "org_id": org.id, + "org_name": org.name, + "role": role.value if hasattr(role, "value") else role, + "is_admin": is_admin, + "my_principals": my_principals, + "all_principals": all_principals, + }) + + return api_response( + data={"orgs": orgs_result}, + message="Principals retrieved successfully", + ) + + +@api_v1_bp.route("/admin/users", methods=["GET"]) +@login_required +def admin_list_users(): + """List all users the caller has admin rights to see. + + The caller must be an OWNER or ADMIN of at least one organization. + Returns users that share an organization with the caller and where the + caller holds admin/owner role in that organization. + + Query params: + q – optional search string (matched against name/email) + page – page number (default 1) + per_page – page size (default 50, max 200) + """ + from gatehouse_app.models.organization_member import OrganizationMember + from gatehouse_app.models.user import User as _User + from gatehouse_app.extensions import db as _db + from sqlalchemy import or_ + + caller = g.current_user + + # Find orgs where caller is admin/owner + admin_memberships = OrganizationMember.query.filter( + OrganizationMember.user_id == caller.id, + OrganizationMember.role.in_(["OWNER", "ADMIN"]), + OrganizationMember.deleted_at == None, + ).all() + + if not admin_memberships: + return api_response( + success=False, + message="Admin or owner role required", + status=403, + error_type="AUTHORIZATION_ERROR", + ) + + admin_org_ids = [m.organization_id for m in admin_memberships] + + # Collect user IDs in those orgs + member_rows = OrganizationMember.query.filter( + OrganizationMember.organization_id.in_(admin_org_ids), + OrganizationMember.deleted_at == None, + ).all() + visible_user_ids = list({row.user_id for row in member_rows}) + + # Optional search + q = request.args.get("q", "").strip() + try: + page = max(1, int(request.args.get("page", 1))) + per_page = min(200, max(1, int(request.args.get("per_page", 50)))) + except ValueError: + page, per_page = 1, 50 + + query = _User.query.filter( + _User.id.in_(visible_user_ids), + _User.deleted_at == None, + ) + if q: + like = f"%{q}%" + query = query.filter(or_(_User.email.ilike(like), _User.full_name.ilike(like))) + + total = query.count() + users = query.order_by(_User.email).offset((page - 1) * per_page).limit(per_page).all() + + member_lookup: dict = {} + for row in member_rows: + if row.user_id not in member_lookup: + member_lookup[row.user_id] = { + "organization_id": row.organization_id, + "role": row.role.value if hasattr(row.role, "value") else row.role, + } + + users_data = [] + for u in users: + d = u.to_dict() + m = member_lookup.get(u.id, {}) + d["org_role"] = m.get("role", "member") + d["org_id"] = m.get("organization_id") + users_data.append(d) + + return api_response( + data={ + "users": users_data, + "count": total, + "page": page, + "per_page": per_page, + "pages": (total + per_page - 1) // per_page, + }, + message="Users retrieved successfully", + ) + + +@api_v1_bp.route("/admin/users/", methods=["GET"]) +@login_required +def admin_get_user(user_id): + """Get a single user's profile (admin view with SSH keys).""" + from gatehouse_app.models.organization_member import OrganizationMember + from gatehouse_app.models.user import User as _User + from gatehouse_app.models.ssh_key import SSHKey + + caller = g.current_user + + target = _User.query.filter_by(id=user_id, deleted_at=None).first() + if not target: + return api_response(success=False, message="User not found", status=404, error_type="NOT_FOUND") + + # Verify caller has admin access to a shared org + target_org_ids = {m.organization_id for m in target.organization_memberships if m.deleted_at is None} + has_access = OrganizationMember.query.filter( + OrganizationMember.user_id == caller.id, + OrganizationMember.organization_id.in_(target_org_ids), + OrganizationMember.role.in_(["OWNER", "ADMIN"]), + OrganizationMember.deleted_at == None, + ).first() is not None + + if not has_access: + return api_response(success=False, message="Access denied", status=403, error_type="AUTHORIZATION_ERROR") + + ssh_keys = SSHKey.query.filter_by(user_id=user_id, deleted_at=None).all() + + return api_response( + data={ + "user": target.to_dict(), + "ssh_keys": [k.to_dict() for k in ssh_keys], + }, + message="User retrieved", + ) diff --git a/gatehouse_app/models/__init__.py b/gatehouse_app/models/__init__.py index 252ee0e..2b01a60 100644 --- a/gatehouse_app/models/__init__.py +++ b/gatehouse_app/models/__init__.py @@ -30,7 +30,7 @@ from gatehouse_app.models.principal import ( PrincipalMembership, ) from gatehouse_app.models.ssh_key import SSHKey -from gatehouse_app.models.ca import CA, KeyType, CertType +from gatehouse_app.models.ca import CA, KeyType, CertType, CAPermission from gatehouse_app.models.ssh_certificate import SSHCertificate, CertificateStatus from gatehouse_app.models.certificate_audit_log import CertificateAuditLog from gatehouse_app.models.password_reset_token import PasswordResetToken @@ -66,6 +66,7 @@ __all__ = [ "CA", "KeyType", "CertType", + "CAPermission", "SSHCertificate", "CertificateStatus", "CertificateAuditLog", diff --git a/gatehouse_app/models/base.py b/gatehouse_app/models/base.py index 0a63735..d1ce1c4 100644 --- a/gatehouse_app/models/base.py +++ b/gatehouse_app/models/base.py @@ -82,7 +82,10 @@ class BaseModel(db.Model): if column.name not in exclude: value = getattr(self, column.name) if isinstance(value, datetime): - result[column.name] = value.isoformat() + if value.tzinfo is None: + result[column.name] = value.isoformat() + "Z" + else: + result[column.name] = value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") else: result[column.name] = value return result diff --git a/gatehouse_app/models/ca.py b/gatehouse_app/models/ca.py index 8a6271f..bfcef44 100644 --- a/gatehouse_app/models/ca.py +++ b/gatehouse_app/models/ca.py @@ -20,6 +20,13 @@ class CertType(str, Enum): HOST = "host" +class CaType(str, Enum): + """CA signing type — whether this CA signs user or host certificates.""" + + USER = "user" + HOST = "host" + + class CA(BaseModel): """Certificate Authority (CA) model for SSH certificate signing. @@ -40,7 +47,14 @@ class CA(BaseModel): # CA name and description name = db.Column(db.String(255), nullable=False) description = db.Column(db.Text, nullable=True) - + + # CA signing type: 'user' signs user certificates, 'host' signs host certificates + ca_type = db.Column( + db.Enum(CaType, values_callable=lambda x: [e.value for e in x]), + default=CaType.USER, + nullable=False, + ) + # Key type (ED25519, RSA, ECDSA) key_type = db.Column( db.Enum(KeyType, values_callable=lambda x: [e.value for e in x]), @@ -91,6 +105,11 @@ class CA(BaseModel): back_populates="ca", cascade="all, delete-orphan", ) + permissions = db.relationship( + "CAPermission", + back_populates="ca", + cascade="all, delete-orphan", + ) __table_args__ = ( db.UniqueConstraint( @@ -153,3 +172,49 @@ class CA(BaseModel): self.rotated_at = datetime.utcnow() self.rotation_reason = reason self.save() + + +class CAPermission(BaseModel): + """Per-user CA permission model. + + Controls which users are allowed to sign certificates against a specific CA. + When a CA has any permission rows the signing endpoint enforces the list; + CAs with no rows are open to all org members (backwards-compatible default). + + Permission values: + sign – user may request certificate signing + admin – user may sign AND manage the CA (rotate keys, delete, etc.) + """ + + __tablename__ = "ca_permissions" + + ca_id = db.Column( + db.String(36), + db.ForeignKey("cas.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + user_id = db.Column( + db.String(36), + db.ForeignKey("users.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + + permission = db.Column(db.String(50), nullable=False, default="sign") + + # Relationships + ca = db.relationship("CA", back_populates="permissions") + user = db.relationship("User", back_populates="ca_permissions") + + __table_args__ = ( + db.UniqueConstraint("ca_id", "user_id", name="uix_ca_permission"), + ) + + def __repr__(self): + return f"" + + def to_dict(self, exclude=None): + data = super().to_dict(exclude=exclude or []) + data["permission"] = self.permission + return data diff --git a/gatehouse_app/models/ssh_certificate.py b/gatehouse_app/models/ssh_certificate.py index b81e98b..58226e9 100644 --- a/gatehouse_app/models/ssh_certificate.py +++ b/gatehouse_app/models/ssh_certificate.py @@ -1,6 +1,6 @@ """SSH Certificate model.""" from enum import Enum -from datetime import datetime +from datetime import datetime, timezone from gatehouse_app.extensions import db from gatehouse_app.models.base import BaseModel from gatehouse_app.models.ca import CertType @@ -137,8 +137,10 @@ class SSHCertificate(BaseModel): if self.revoked or self.status == CertificateStatus.REVOKED: return False - now = datetime.utcnow() - return self.valid_after <= now <= self.valid_before + now = datetime.now(timezone.utc) + valid_after = self.valid_after.replace(tzinfo=timezone.utc) if self.valid_after.tzinfo is None else self.valid_after + valid_before = self.valid_before.replace(tzinfo=timezone.utc) if self.valid_before.tzinfo is None else self.valid_before + return valid_after <= now <= valid_before def is_expired(self): """Check if certificate has expired. @@ -146,7 +148,9 @@ class SSHCertificate(BaseModel): Returns: True if current time is past valid_before """ - return datetime.utcnow() > self.valid_before + now = datetime.now(timezone.utc) + valid_before = self.valid_before.replace(tzinfo=timezone.utc) if self.valid_before.tzinfo is None else self.valid_before + return now > valid_before def days_until_expiry(self): """Get number of days until certificate expires. @@ -154,7 +158,9 @@ class SSHCertificate(BaseModel): Returns: Number of days remaining (negative if already expired) """ - delta = self.valid_before - datetime.utcnow() + now = datetime.now(timezone.utc) + valid_before = self.valid_before.replace(tzinfo=timezone.utc) if self.valid_before.tzinfo is None else self.valid_before + delta = valid_before - now return delta.days + (1 if delta.seconds > 0 else 0) def revoke(self, reason=None): diff --git a/gatehouse_app/services/ssh_ca_signing_service.py b/gatehouse_app/services/ssh_ca_signing_service.py index 574c4bf..3f33b00 100644 --- a/gatehouse_app/services/ssh_ca_signing_service.py +++ b/gatehouse_app/services/ssh_ca_signing_service.py @@ -5,7 +5,7 @@ This service is a Gatehouse-integrated version of the secuird/ssh_ca.py logic. """ import logging import os -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import List, Dict, Any, Optional from sshkey_tools.cert import SSHCertificate, CertificateFields @@ -240,7 +240,7 @@ class SSHCASigningService: ) # Set validity period - now = datetime.utcnow() + now = datetime.now(timezone.utc) expiry_hours = signing_request.expiry_hours or self.config.get_int('cert_validity_hours') valid_before = now + timedelta(hours=expiry_hours) diff --git a/migrations/versions/013_add_ca_type.py b/migrations/versions/013_add_ca_type.py new file mode 100644 index 0000000..1df4bc2 --- /dev/null +++ b/migrations/versions/013_add_ca_type.py @@ -0,0 +1,42 @@ +"""Add ca_type column to cas table (user/host). + +Revision ID: 013 +Revises: d34bfb72844e +Create Date: 2026-02-28 23:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '013' +down_revision = 'd34bfb72844e' +branch_labels = None +depends_on = None + + +def upgrade(): + # Create the enum type first (PostgreSQL requires this) + ca_type_enum = sa.Enum('user', 'host', name='ca_type_enum') + ca_type_enum.create(op.get_bind(), checkfirst=True) + + # Add ca_type column with a default of 'user' so existing CAs stay valid + op.add_column( + 'cas', + sa.Column( + 'ca_type', + ca_type_enum, + nullable=False, + server_default='user', + ), + ) + + +def downgrade(): + op.drop_column('cas', 'ca_type') + # Drop the enum type (PostgreSQL only; SQLite ignores) + try: + op.execute("DROP TYPE IF EXISTS ca_type_enum") + except Exception: + pass diff --git a/migrations/versions/d34bfb72844e_add_activation_fields_and_ca_permissions.py b/migrations/versions/d34bfb72844e_add_activation_fields_and_ca_permissions.py new file mode 100644 index 0000000..83d9f72 --- /dev/null +++ b/migrations/versions/d34bfb72844e_add_activation_fields_and_ca_permissions.py @@ -0,0 +1,50 @@ +"""add_activation_fields_and_ca_permissions + +Revision ID: d34bfb72844e +Revises: 012_ca_nullable_org +Create Date: 2026-02-28 18:06:47.328552 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = 'd34bfb72844e' +down_revision = '012_ca_nullable_org' +branch_labels = None +depends_on = None + + +def upgrade(): + # Create ca_permissions table + op.create_table( + 'ca_permissions', + sa.Column('ca_id', sa.String(length=36), nullable=False), + sa.Column('user_id', sa.String(length=36), nullable=False), + sa.Column('permission', sa.String(length=50), nullable=False), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.Column('deleted_at', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['ca_id'], ['cas.id'], ondelete='CASCADE'), + sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('ca_id', 'user_id', name='uix_ca_permission'), + ) + op.create_index('ix_ca_permissions_ca_id', 'ca_permissions', ['ca_id'], unique=False) + op.create_index('ix_ca_permissions_user_id', 'ca_permissions', ['user_id'], unique=False) + + # Add activation columns to users + op.add_column('users', sa.Column('activated', sa.Boolean(), nullable=False, + server_default=sa.text('true'))) + op.add_column('users', sa.Column('activation_key', sa.String(length=128), nullable=True)) + op.create_index('ix_users_activation_key', 'users', ['activation_key'], unique=True) + + +def downgrade(): + op.drop_index('ix_users_activation_key', table_name='users') + op.drop_column('users', 'activation_key') + op.drop_column('users', 'activated') + op.drop_index('ix_ca_permissions_user_id', table_name='ca_permissions') + op.drop_index('ix_ca_permissions_ca_id', table_name='ca_permissions') + op.drop_table('ca_permissions')