Fix: DB Migration

This commit is contained in:
2026-03-23 17:51:55 +05:45
parent a7915c9328
commit 05eb092228
10 changed files with 1151 additions and 305 deletions
@@ -0,0 +1,101 @@
"""Convert ZeroTier table timestamp columns from TIMESTAMPTZ to TIMESTAMP.
Revision ID: 025_fix_zt_timestamps
Revises: 024_fix_zerotier_schema
Create Date: 2026-03-22
Migration 020_zerotier (and 023's fallback create_table) defined ZeroTier tables
with sa.DateTime(timezone=True), producing TIMESTAMP WITH TIME ZONE columns.
The rest of the codebase uses plain DateTime (timezone-naive TIMESTAMP WITHOUT
TIME ZONE). This migration aligns all ZeroTier table timestamp columns with the
existing codebase convention.
GUARDED: Each ALTER is only executed if the column is currently
TIMESTAMP WITH TIME ZONE. On a DB that has already been converted (e.g. dev),
the migration is a harmless no-op.
"""
from alembic import op
import sqlalchemy as sa
revision = "025_fix_zt_timestamps"
down_revision = "024_fix_zerotier_schema"
branch_labels = None
depends_on = None
# All ZeroTier tables that inherit BaseModel's created_at/updated_at/deleted_at
_ZT_BASE_TABLES = [
"portal_networks",
"devices",
"device_network_memberships",
"user_network_approvals",
"kill_switch_events",
"activation_sessions",
"zerotier_memberships",
]
# Additional datetime columns specific to individual models
_EXTRA_COLS = {
"activation_sessions": ["authenticated_at", "expires_at", "ended_at"],
"zerotier_memberships": ["join_seen_at", "last_synced_at"],
}
def _col_is_timestamptz(conn, table: str, column: str) -> bool:
"""Return True if the column is TIMESTAMP WITH TIME ZONE."""
row = conn.execute(sa.text(
"SELECT data_type FROM information_schema.columns "
"WHERE table_name = :t AND column_name = :c"
), {"t": table, "c": column}).first()
return row is not None and row[0] == "timestamp with time zone"
def _col_is_timestamp(conn, table: str, column: str) -> bool:
"""Return True if the column is TIMESTAMP WITHOUT TIME ZONE."""
row = conn.execute(sa.text(
"SELECT data_type FROM information_schema.columns "
"WHERE table_name = :t AND column_name = :c"
), {"t": table, "c": column}).first()
return row is not None and row[0] == "timestamp without time zone"
def upgrade():
conn = op.get_bind()
for tbl in _ZT_BASE_TABLES:
for col in ("created_at", "updated_at", "deleted_at"):
if _col_is_timestamptz(conn, tbl, col):
conn.execute(sa.text(
f'ALTER TABLE "{tbl}" ALTER COLUMN "{col}" '
f'TYPE TIMESTAMP WITHOUT TIME ZONE '
f'USING "{col}" AT TIME ZONE \'UTC\''
))
for col in _EXTRA_COLS.get(tbl, []):
if _col_is_timestamptz(conn, tbl, col):
conn.execute(sa.text(
f'ALTER TABLE "{tbl}" ALTER COLUMN "{col}" '
f'TYPE TIMESTAMP WITHOUT TIME ZONE '
f'USING CASE WHEN "{col}" IS NULL THEN NULL '
f'ELSE "{col}" AT TIME ZONE \'UTC\' END'
))
def downgrade():
conn = op.get_bind()
for tbl in _ZT_BASE_TABLES:
for col in ("created_at", "updated_at", "deleted_at"):
if _col_is_timestamp(conn, tbl, col):
conn.execute(sa.text(
f'ALTER TABLE "{tbl}" ALTER COLUMN "{col}" '
f'TYPE TIMESTAMP WITH TIME ZONE '
f'USING "{col}" AT TIME ZONE \'UTC\''
))
for col in _EXTRA_COLS.get(tbl, []):
if _col_is_timestamp(conn, tbl, col):
conn.execute(sa.text(
f'ALTER TABLE "{tbl}" ALTER COLUMN "{col}" '
f'TYPE TIMESTAMP WITH TIME ZONE '
f'USING CASE WHEN "{col}" IS NULL THEN NULL '
f'ELSE "{col}" AT TIME ZONE \'UTC\' END'
))