Feat: Implemented SUDO Department & API Key, CA Serial

This commit is contained in:
2026-03-08 18:10:26 +05:45
parent ff976ee1cc
commit f334000da3
16 changed files with 911 additions and 5 deletions
+2 -1
View File
@@ -5,6 +5,7 @@ from flask import Blueprint
api_v1_bp = Blueprint("api_v1", __name__)
# Import route modules to register them
from gatehouse_app.api.v1 import auth, users, organizations, policies, external_auth, departments, principals, ssh, zerotier
from gatehouse_app.api.v1 import auth, users, organizations, policies, external_auth, departments, principals, ssh, zerotier, sudo
api_v1_bp.register_blueprint(ssh.ssh_bp)
+4
View File
@@ -16,12 +16,15 @@ class DepartmentCreateSchema(Schema):
"""Schema for creating a department."""
name = fields.Str(required=True, validate=validate.Length(min=1, max=255))
description = fields.Str(allow_none=True, validate=validate.Length(max=2000))
can_sudo = fields.Bool(allow_none=True, load_default=False)
class DepartmentUpdateSchema(Schema):
"""Schema for updating a department."""
name = fields.Str(validate=validate.Length(min=1, max=255))
description = fields.Str(allow_none=True, validate=validate.Length(max=2000))
can_sudo = fields.Bool(allow_none=True)
class AddDepartmentMemberSchema(Schema):
@@ -119,6 +122,7 @@ def create_department(org_id):
organization_id=org_id,
name=data["name"],
description=data.get("description"),
can_sudo=data.get("can_sudo", False),
)
db.session.add(dept)
db.session.commit()
@@ -1,4 +1,4 @@
"""Organization routes package."""
from gatehouse_app.api.v1.organizations import core, members, invites, clients, cas, audit, roles
from gatehouse_app.api.v1.organizations import core, members, invites, clients, cas, audit, roles, api_keys
__all__ = ["core", "members", "invites", "clients", "cas", "audit", "roles"]
__all__ = ["core", "members", "invites", "clients", "cas", "audit", "roles", "api_keys"]
@@ -0,0 +1,299 @@
"""Organization API Key management endpoints."""
from flask import g, request
from marshmallow import Schema, fields, validate, ValidationError
from gatehouse_app.api.v1 import api_v1_bp
from gatehouse_app.utils.response import api_response
from gatehouse_app.utils.decorators import login_required, require_admin, full_access_required
from gatehouse_app.models.organization import OrganizationApiKey
from gatehouse_app.services.organization_service import OrganizationService
from gatehouse_app.extensions import db
class ApiKeyCreateSchema(Schema):
"""Schema for creating an API key."""
name = fields.Str(required=True, validate=validate.Length(min=1, max=255))
description = fields.Str(allow_none=True, validate=validate.Length(max=2000))
class ApiKeyUpdateSchema(Schema):
"""Schema for updating an API key."""
name = fields.Str(validate=validate.Length(min=1, max=255))
description = fields.Str(allow_none=True, validate=validate.Length(max=2000))
@api_v1_bp.route("/organizations/<org_id>/api-keys", methods=["GET"])
@login_required
@require_admin
@full_access_required
def list_api_keys(org_id):
"""
List all API keys for an organization.
Only accessible by organization admins.
Args:
org_id: Organization ID
Returns:
200: List of API keys (without key values)
401: Not authenticated
403: Not an admin
404: Organization not found
"""
org = OrganizationService.get_organization_by_id(org_id)
# Check if user is an admin
from gatehouse_app.models.organization.organization_member import OrganizationMember
from gatehouse_app.utils.constants import OrganizationRole
membership = OrganizationMember.query.filter_by(
user_id=g.current_user.id,
organization_id=org_id,
deleted_at=None
).first()
if not membership or membership.role not in [OrganizationRole.OWNER, OrganizationRole.ADMIN]:
return api_response(
success=False,
message="You do not have permission to manage API keys",
status=403,
error_type="AUTHORIZATION_ERROR",
)
api_keys = OrganizationApiKey.query.filter_by(
organization_id=org_id,
deleted_at=None
).all()
return api_response(
data={
"api_keys": [k.to_dict() for k in api_keys],
"count": len(api_keys),
},
message="API keys retrieved successfully",
)
@api_v1_bp.route("/organizations/<org_id>/api-keys", methods=["POST"])
@login_required
@require_admin
@full_access_required
def create_api_key(org_id):
"""
Create a new API key for an organization.
Only accessible by organization admins.
The plain text key is returned only on creation and should be stored securely.
Args:
org_id: Organization ID
Request body:
name: API key name (required)
description: Optional description
Returns:
201: API key created successfully
400: Validation error
401: Not authenticated
403: Not an admin
404: Organization not found
"""
try:
org = OrganizationService.get_organization_by_id(org_id)
# Check if user is an admin
from gatehouse_app.models.organization.organization_member import OrganizationMember
from gatehouse_app.utils.constants import OrganizationRole
membership = OrganizationMember.query.filter_by(
user_id=g.current_user.id,
organization_id=org_id,
deleted_at=None
).first()
if not membership or membership.role not in [OrganizationRole.OWNER, OrganizationRole.ADMIN]:
return api_response(
success=False,
message="You do not have permission to create API keys",
status=403,
error_type="AUTHORIZATION_ERROR",
)
schema = ApiKeyCreateSchema()
data = schema.load(request.json or {})
# Create the API key
api_key, plain_key = OrganizationApiKey.create_key(
organization_id=org_id,
name=data["name"],
description=data.get("description"),
)
# Return the key data with the plain text key (only on creation)
key_dict = api_key.to_dict()
key_dict["key"] = plain_key # Include plain text only on creation
return api_response(
data={"api_key": key_dict},
message="API key created successfully. Store the key value securely - it cannot be retrieved later.",
status=201,
)
except ValidationError as e:
return api_response(
success=False,
message="Validation failed",
status=400,
error_type="VALIDATION_ERROR",
error_details=e.messages,
)
@api_v1_bp.route("/organizations/<org_id>/api-keys/<key_id>", methods=["PATCH"])
@login_required
@require_admin
@full_access_required
def update_api_key(org_id, key_id):
"""
Update an API key.
Only accessible by organization admins.
Args:
org_id: Organization ID
key_id: API Key ID
Request body:
name: New name (optional)
description: New description (optional)
Returns:
200: API key updated successfully
400: Validation error
401: Not authenticated
403: Not an admin
404: Organization or API key not found
"""
try:
org = OrganizationService.get_organization_by_id(org_id)
# Check if user is an admin
from gatehouse_app.models.organization.organization_member import OrganizationMember
from gatehouse_app.utils.constants import OrganizationRole
membership = OrganizationMember.query.filter_by(
user_id=g.current_user.id,
organization_id=org_id,
deleted_at=None
).first()
if not membership or membership.role not in [OrganizationRole.OWNER, OrganizationRole.ADMIN]:
return api_response(
success=False,
message="You do not have permission to update API keys",
status=403,
error_type="AUTHORIZATION_ERROR",
)
api_key = OrganizationApiKey.query.filter_by(
id=key_id,
organization_id=org_id,
deleted_at=None
).first()
if not api_key:
return api_response(
success=False,
message="API key not found",
status=404,
error_type="NOT_FOUND",
)
schema = ApiKeyUpdateSchema()
data = schema.load(request.json or {})
# Update fields
if "name" in data:
api_key.name = data["name"]
if "description" in data:
api_key.description = data["description"]
api_key.save()
return api_response(
data={"api_key": api_key.to_dict()},
message="API key updated successfully",
)
except ValidationError as e:
return api_response(
success=False,
message="Validation failed",
status=400,
error_type="VALIDATION_ERROR",
error_details=e.messages,
)
@api_v1_bp.route("/organizations/<org_id>/api-keys/<key_id>", methods=["DELETE"])
@login_required
@require_admin
@full_access_required
def delete_api_key(org_id, key_id):
"""
Delete/revoke an API key.
Only accessible by organization admins.
Args:
org_id: Organization ID
key_id: API Key ID
Returns:
200: API key deleted successfully
401: Not authenticated
403: Not an admin
404: Organization or API key not found
"""
org = OrganizationService.get_organization_by_id(org_id)
# Check if user is an admin
from gatehouse_app.models.organization.organization_member import OrganizationMember
from gatehouse_app.utils.constants import OrganizationRole
membership = OrganizationMember.query.filter_by(
user_id=g.current_user.id,
organization_id=org_id,
deleted_at=None
).first()
if not membership or membership.role not in [OrganizationRole.OWNER, OrganizationRole.ADMIN]:
return api_response(
success=False,
message="You do not have permission to delete API keys",
status=403,
error_type="AUTHORIZATION_ERROR",
)
api_key = OrganizationApiKey.query.filter_by(
id=key_id,
organization_id=org_id,
deleted_at=None
).first()
if not api_key:
return api_response(
success=False,
message="API key not found",
status=404,
error_type="NOT_FOUND",
)
# Soft delete the API key
api_key.delete(soft=True)
return api_response(
message="API key deleted successfully",
)
@@ -173,3 +173,101 @@ def get_my_audit_logs():
},
message="Activity retrieved",
)
@api_v1_bp.route("/organizations/<org_id>/certificates/audit", methods=["GET"])
@login_required
@require_admin
@full_access_required
def get_certificate_audit_logs(org_id):
"""
Get certificate issuance audit logs for an organization.
Only accessible by organization admins.
Returns certificate serial IDs, user IDs, and issuance timestamps for compliance.
Args:
org_id: Organization ID
Returns:
200: List of certificate audit logs
401: Not authenticated
403: Not an admin
404: Organization not found
"""
from gatehouse_app.models.ssh_ca.certificate_audit_log import CertificateAuditLog
from gatehouse_app.models.ssh_ca.ssh_certificate import SSHCertificate
from gatehouse_app.models.user import User
org = OrganizationService.get_organization_by_id(org_id)
page = max(1, int(request.args.get("page", 1)))
per_page = min(int(request.args.get("per_page", 50)), 200)
action_filter = request.args.get("action", "signed") # Default to signed certificates
# Get all CAs for this organization
from gatehouse_app.models.ssh_ca import CA
org_cas = CA.query.filter_by(organization_id=org_id, deleted_at=None).all()
org_ca_ids = [ca.id for ca in org_cas]
if not org_ca_ids:
return api_response(
data={
"audit_logs": [],
"count": 0,
"page": page,
"per_page": per_page,
"pages": 0,
},
message="No certificate audit logs found",
)
# Query certificate audit logs for certificates issued by org's CAs
query = CertificateAuditLog.query.join(
SSHCertificate,
CertificateAuditLog.certificate_id == SSHCertificate.id
).filter(
SSHCertificate.ca_id.in_(org_ca_ids),
CertificateAuditLog.deleted_at.is_(None)
)
if action_filter:
query = query.filter(CertificateAuditLog.action == action_filter)
query = query.order_by(CertificateAuditLog.created_at.desc())
total = query.count()
logs = query.offset((page - 1) * per_page).limit(per_page).all()
# Build response data with certificate details
audit_data = []
for log in logs:
cert = log.certificate
user = log.user
audit_data.append({
"id": log.id,
"action": log.action,
"certificate_serial": cert.serial,
"key_id": cert.key_id,
"principals": cert.principals,
"user_id": user.id if user else cert.user_id,
"user_email": user.email if user else None,
"issued_at": cert.created_at.isoformat() if cert.created_at else None,
"valid_after": cert.valid_after.isoformat() if cert.valid_after else None,
"valid_before": cert.valid_before.isoformat() if cert.valid_before else None,
"ip_address": log.ip_address,
"user_agent": log.user_agent,
"message": log.message,
"success": log.success,
"created_at": log.created_at.isoformat() if log.created_at else None,
})
return api_response(
data={
"audit_logs": audit_data,
"count": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page,
},
message="Certificate audit logs retrieved successfully",
)
+137
View File
@@ -0,0 +1,137 @@
"""Sudoer check and sudo-related endpoints."""
from flask import request
from gatehouse_app.api.v1 import api_v1_bp
from gatehouse_app.utils.response import api_response
from gatehouse_app.models.organization import OrganizationApiKey
from gatehouse_app.models.ssh_ca.ssh_certificate import SSHCertificate
from gatehouse_app.models.organization import Department, DepartmentMembership
@api_v1_bp.route("/sudo/check", methods=["POST"])
def check_sudoer():
"""
Check if a user with a given certificate can sudo.
This endpoint validates an API key for an organization, retrieves the certificate
by serial ID, finds the user and their departments, and checks if any of their
departments have sudo capability.
Request body:
api_key: Organization API key (required)
certificate_serial: Certificate serial ID (required)
Returns:
200: Sudoer status returned
400: Invalid request body
401: Invalid API key
403: Certificate not found or user not found
404: Organization or certificate not found
"""
try:
data = request.get_json()
if not data:
return api_response(
success=False,
message="Request body is required",
status=400,
error_type="INVALID_REQUEST",
)
api_key = data.get("api_key")
certificate_serial = data.get("certificate_serial")
if not api_key or certificate_serial is None:
return api_response(
success=False,
message="api_key and certificate_serial are required",
status=400,
error_type="MISSING_REQUIRED_FIELDS",
)
# Find the certificate by serial
certificate = SSHCertificate.query.filter_by(
serial=certificate_serial,
deleted_at=None
).first()
if not certificate:
return api_response(
success=False,
message="Certificate not found",
status=404,
error_type="NOT_FOUND",
)
# Get the CA and organization
ca = certificate.ca
if not ca:
return api_response(
success=False,
message="Certificate CA not found",
status=404,
error_type="NOT_FOUND",
)
org_id = ca.organization_id
# Verify the API key for this organization
org_api_key = OrganizationApiKey.verify_key(org_id, api_key)
if not org_api_key:
return api_response(
success=False,
message="Invalid API key for organization",
status=401,
error_type="UNAUTHORIZED",
)
# Get the user from the certificate
user = certificate.user
if not user:
return api_response(
success=False,
message="Certificate user not found",
status=404,
error_type="NOT_FOUND",
)
# Get all departments the user belongs to
user_departments = DepartmentMembership.query.filter_by(
user_id=user.id,
deleted_at=None
).all()
# Check if any of the user's departments have sudo capability
can_sudo = False
sudoer_departments = []
for dept_membership in user_departments:
dept = dept_membership.department
if dept and dept.can_sudo and dept.deleted_at is None:
can_sudo = True
sudoer_departments.append({
"id": dept.id,
"name": dept.name,
})
return api_response(
data={
"can_sudo": can_sudo,
"user_id": user.id,
"user_email": user.email,
"certificate_serial": certificate.serial,
"sudoer_departments": sudoer_departments,
"all_departments_count": len(user_departments),
},
message="Sudoer status retrieved successfully",
status=200,
)
except Exception as e:
return api_response(
success=False,
message=f"An error occurred: {str(e)}",
status=500,
error_type="INTERNAL_ERROR",
)
@@ -12,6 +12,7 @@ from gatehouse_app.models.organization.department_cert_policy import (
)
from gatehouse_app.models.organization.principal import Principal, PrincipalMembership
from gatehouse_app.models.organization.org_invite_token import OrgInviteToken
from gatehouse_app.models.organization.organization_api_key import OrganizationApiKey
__all__ = [
"Organization",
@@ -24,4 +25,5 @@ __all__ = [
"Principal",
"PrincipalMembership",
"OrgInviteToken",
"OrganizationApiKey",
]
@@ -27,6 +27,7 @@ class Department(BaseModel):
)
name = db.Column(db.String(255), nullable=False, index=True)
description = db.Column(db.Text, nullable=True)
can_sudo = db.Column(db.Boolean, default=False, nullable=False)
# Relationships
organization = db.relationship("Organization", back_populates="departments")
@@ -4,12 +4,13 @@ from gatehouse_app.extensions import db
from gatehouse_app.models.base import BaseModel
# Standard SSH certificate extensions
# Standard SSH certificate extensions — must be in strict lexical order
# (OpenSSH RFC 4251 §5 / golang.org/x/crypto/ssh requires lexical ordering)
STANDARD_EXTENSIONS = [
"permit-X11-forwarding",
"permit-agent-forwarding",
"permit-pty",
"permit-port-forwarding",
"permit-pty",
"permit-user-rc",
]
@@ -43,6 +43,9 @@ class Organization(BaseModel):
cas = db.relationship(
"CA", back_populates="organization", cascade="all, delete-orphan"
)
api_keys = db.relationship(
"OrganizationApiKey", back_populates="organization", cascade="all, delete-orphan"
)
def __repr__(self):
"""String representation of Organization."""
@@ -0,0 +1,158 @@
"""Organization API Key model — API keys for organizations for external integrations."""
import secrets
from datetime import datetime, timezone
from gatehouse_app.extensions import db
from gatehouse_app.models.base import BaseModel
class OrganizationApiKey(BaseModel):
"""API Key model representing an API key for an organization.
API keys are used to authenticate external integrations or services
that need programmatic access to the organization's resources.
Each key is tied to an organization and can be revoked/deleted as needed.
"""
__tablename__ = "organization_api_keys"
organization_id = db.Column(
db.String(36),
db.ForeignKey("organizations.id"),
nullable=False,
index=True,
)
# Human-readable name for the API key
name = db.Column(db.String(255), nullable=False)
# Hashed key value (never store plain text)
key_hash = db.Column(db.String(255), nullable=False, unique=True, index=True)
# Last used timestamp for tracking activity
last_used_at = db.Column(db.DateTime, nullable=True)
# Revocation status
is_revoked = db.Column(db.Boolean, default=False, nullable=False, index=True)
revoked_at = db.Column(db.DateTime, nullable=True)
revoke_reason = db.Column(db.String(255), nullable=True)
# Description/purpose of the key
description = db.Column(db.Text, nullable=True)
# Relationships
organization = db.relationship("Organization", back_populates="api_keys")
__table_args__ = (
db.Index("idx_org_api_key_org_active", "organization_id", "is_revoked"),
db.Index("idx_api_key_last_used", "last_used_at"),
)
def __repr__(self):
"""String representation of OrganizationApiKey."""
return f"<OrganizationApiKey name={self.name} org_id={self.organization_id}>"
@staticmethod
def generate_key() -> str:
"""Generate a random API key.
Returns:
A random 32-byte hex string suitable for use as an API key
"""
return secrets.token_hex(32)
@classmethod
def create_key(
cls,
organization_id: str,
name: str,
description: str = None,
) -> tuple:
"""Create and store a new API key for an organization.
Args:
organization_id: ID of the organization
name: Human-readable name for the key
description: Optional description/purpose of the key
Returns:
Tuple of (OrganizationApiKey instance, plain_text_key_string)
The plain text key is only returned on creation and should be
stored securely by the user. It cannot be retrieved later.
"""
# Generate a plain text key
plain_key = cls.generate_key()
# Hash it using the key_hash method
key_hash = cls.hash_key(plain_key)
# Create the database record
api_key = cls(
organization_id=organization_id,
name=name,
key_hash=key_hash,
description=description,
)
api_key.save()
return api_key, plain_key
@staticmethod
def hash_key(plain_key: str) -> str:
"""Hash an API key for storage.
Args:
plain_key: The plain text API key
Returns:
Hashed version of the key
"""
import hashlib
return hashlib.sha256(plain_key.encode()).hexdigest()
@classmethod
def verify_key(cls, organization_id: str, plain_key: str) -> "OrganizationApiKey":
"""Verify an API key for an organization.
Args:
organization_id: ID of the organization
plain_key: The plain text API key to verify
Returns:
OrganizationApiKey instance if valid and active, None otherwise
"""
key_hash = cls.hash_key(plain_key)
api_key = cls.query.filter_by(
organization_id=organization_id,
key_hash=key_hash,
is_revoked=False,
deleted_at=None,
).first()
if api_key:
# Update last used timestamp
api_key.last_used_at = datetime.now(timezone.utc)
api_key.save()
return api_key
def revoke(self, reason: str = None) -> None:
"""Revoke this API key.
Args:
reason: Optional reason for revocation
"""
self.is_revoked = True
self.revoked_at = datetime.now(timezone.utc)
self.revoke_reason = reason
self.save()
def to_dict(self, exclude=None):
"""Convert API key to dictionary.
The key_hash is excluded by default for security.
"""
exclude = exclude or []
if "key_hash" not in exclude:
exclude.append("key_hash")
return super().to_dict(exclude=exclude)
@@ -288,6 +288,12 @@ class SSHCASigningService:
else:
extensions = [] # host certs: no extensions
# OpenSSH (RFC 4251 §5) and golang.org/x/crypto/ssh require
# certificate extensions to be in strict lexical (alphabetical) order.
# Sort unconditionally so any caller-supplied or policy-derived list
# is guaranteed to be compliant.
extensions = sorted(extensions)
certificate.fields.extensions = extensions
certificate.fields.critical_options = signing_request.critical_options or {}
@@ -0,0 +1,76 @@
"""Seed CA serial counters with a timestamp-based starting value.
Revision ID: 020_ca_serial_timestamp_start
Revises: 019_audit_varchar, d34bfb72844e
Create Date: 2026-03-06
WHY
---
``next_serial_number`` was originally seeded at ``1`` for every CA
(``server_default="1"`` in migration 017). Because the
``ix_ssh_certificates_serial`` index enforces a globally-unique constraint on
the serial column, any two CAs issuing their first certificate would both try
to insert serial ``1``, causing a UniqueViolation.
FIX new CAs
-------------
The CA model's Python-side ``default`` is now ``_serial_start()``, which
returns ``int(time.time() * 1000)`` (Unix milliseconds) at row-creation time.
CAs created after this migration will start their serial counter at the
millisecond they were first inserted, so serials are globally unique across
CAs and still monotonically increasing within each CA.
FIX existing CAs
-------------------
This migration performs a data migration: any CA whose ``next_serial_number``
is still ``<= 2`` (i.e. has issued at most one certificate since the original
``1``-based default) is given a new timestamp-based starting value.
CAs that have already issued many certificates keep their current counter
unchanged their serials are already beyond the low collision-prone range.
NOTE: the ``server_default`` on the column is intentionally NOT changed here
because SQLAlchemy uses the Python-side ``default=_serial_start`` callable for
new rows; the ``server_default`` is only a database-level fallback that is
never hit when rows are inserted via the ORM.
"""
import time
from alembic import op
import sqlalchemy as sa
revision = "020_ca_serial_timestamp_start"
down_revision = ("3de11c5dc2d5", "d34bfb72844e")
branch_labels = None
depends_on = None
def _now_ms() -> int:
return int(time.time() * 1000)
def upgrade():
conn = op.get_bind()
# Update ALL CAs to a timestamp-based starting serial — not just those
# stuck at 1. Any CA with a serial below the current ms timestamp is in
# the low collision-prone range (serials 1N where N is tiny). Resetting
# every CA to a fresh ms timestamp is safe: the counter only moves forward
# from here, and no existing certificate serial is changed.
rows = conn.execute(
sa.text("SELECT id FROM cas")
).fetchall()
for (ca_id,) in rows:
new_start = _now_ms()
conn.execute(
sa.text(
"UPDATE cas SET next_serial_number = :val WHERE id = :id"
),
{"val": new_start, "id": ca_id},
)
def downgrade():
# There is no safe downgrade for a data migration that assigns new serial
# starting points — resetting to 1 would recreate the collision risk.
pass
@@ -0,0 +1,30 @@
"""add_cert_token_to_ssh_certificates
Revision ID: 3de11c5dc2d5
Revises: 019_audit_varchar
Create Date: 2026-03-06 16:04:33.561099
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3de11c5dc2d5'
down_revision = '019_audit_varchar'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('ssh_certificates', sa.Column('cert_token', sa.String(length=64), nullable=True))
op.create_index(op.f('ix_ssh_certificates_cert_token'), 'ssh_certificates', ['cert_token'], unique=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_ssh_certificates_cert_token'), table_name='ssh_certificates')
op.drop_column('ssh_certificates', 'cert_token')
# ### end Alembic commands ###
@@ -0,0 +1,34 @@
"""Add can_sudo column to departments table.
Revision ID: 002_add_can_sudo_to_departments
Revises: 001_add_org_api_keys
Create Date: 2026-03-07 23:40:30.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '002_add_can_sudo_to_departments'
down_revision = '001_add_org_api_keys'
branch_labels = None
depends_on = None
def upgrade():
# Add can_sudo column to departments table
op.add_column('departments',
sa.Column('can_sudo', sa.Boolean(), nullable=False, server_default='false'))
# Create index for performance
op.create_index('idx_dept_can_sudo', 'departments',
['organization_id', 'can_sudo'])
def downgrade():
# Drop index
op.drop_index('idx_dept_can_sudo', table_name='departments')
# Drop column
op.drop_column('departments', 'can_sudo')
@@ -0,0 +1,56 @@
"""Add organization_api_keys table for API key management.
Revision ID: 001_add_org_api_keys
Revises: 3de11c5dc2d5
Create Date: 2026-03-07 23:40:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '001_add_org_api_keys'
down_revision = '3de11c5dc2d5'
branch_labels = None
depends_on = None
def upgrade():
# Create organization_api_keys table
op.create_table(
'organization_api_keys',
sa.Column('id', sa.String(36), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('deleted_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('organization_id', sa.String(36), nullable=False),
sa.Column('name', sa.String(255), nullable=False),
sa.Column('key_hash', sa.String(255), nullable=False),
sa.Column('last_used_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('is_revoked', sa.Boolean(), nullable=False, server_default='false'),
sa.Column('revoked_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('revoke_reason', sa.String(255), nullable=True),
sa.Column('description', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['organization_id'], ['organizations.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('key_hash'),
)
# Create indexes for performance
op.create_index('idx_org_api_key_org_active', 'organization_api_keys',
['organization_id', 'is_revoked'])
op.create_index('idx_api_key_last_used', 'organization_api_keys',
['last_used_at'])
op.create_index('idx_org_api_key_org_id', 'organization_api_keys',
['organization_id'])
def downgrade():
# Drop indexes
op.drop_index('idx_org_api_key_org_id', table_name='organization_api_keys')
op.drop_index('idx_api_key_last_used', table_name='organization_api_keys')
op.drop_index('idx_org_api_key_org_active', table_name='organization_api_keys')
# Drop table
op.drop_table('organization_api_keys')