feat: add migration validation script and enhance migration status UI

This commit is contained in:
Christian 2026-03-25 22:49:33 +01:00
parent 205c0dab07
commit 9f563941e6
6 changed files with 760 additions and 19 deletions

View File

@ -107,6 +107,23 @@ if settings.ECONOMIC_READ_ONLY:
logger.warning("Read-only mode") logger.warning("Read-only mode")
``` ```
### Migration Validation
```bash
# Validate root migrations against current PostgreSQL schema
python scripts/validate_migrations.py
# Include module-specific migration directory in validation
python scripts/validate_migrations.py --module app/modules/sag/migrations
# Machine-readable report and strict index validation
python scripts/validate_migrations.py --json --strict-indexes
```
Exit codes:
- `0`: Schema is aligned, or only index differences were found without strict mode.
- `1`: Schema mismatches were found (missing tables/columns, or missing indexes with strict mode).
- `2`: Runtime error (for example connection/configuration issues).
## 🐳 Docker Commands ## 🐳 Docker Commands
```bash ```bash

View File

@ -282,7 +282,7 @@ class MissionService:
WHERE s.deleted_at IS NULL WHERE s.deleted_at IS NULL
AND LOWER(COALESCE(s.status, '')) <> 'afsluttet' AND LOWER(COALESCE(s.status, '')) <> 'afsluttet'
ORDER BY ORDER BY
CASE LOWER(COALESCE(s.priority, '')) CASE LOWER(COALESCE(s.priority::text, ''))
WHEN 'kritisk' THEN 5 WHEN 'kritisk' THEN 5
WHEN 'critical' THEN 5 WHEN 'critical' THEN 5
WHEN 'høj' THEN 4 WHEN 'høj' THEN 4

View File

@ -934,17 +934,43 @@
if (!AudioCtx) return; if (!AudioCtx) return;
const context = new AudioCtx(); const context = new AudioCtx();
const oscillator = context.createOscillator(); const now = context.currentTime;
const isAlert = type === 'uptime_down';
const baseFreq = isAlert ? 392 : 784;
const overtoneFreq = isAlert ? 523.25 : 1046.5;
const totalDuration = isAlert ? 0.9 : 0.65;
const strikeDelay = isAlert ? 0.2 : 0.14;
function strike(startAt, ampScale) {
const fundamental = context.createOscillator();
const overtone = context.createOscillator();
const gainNode = context.createGain(); const gainNode = context.createGain();
oscillator.type = 'sine'; fundamental.type = 'sine';
oscillator.frequency.value = type === 'uptime_down' ? 260 : 620; overtone.type = 'triangle';
gainNode.gain.value = gainValue * 0.2; fundamental.frequency.setValueAtTime(baseFreq, startAt);
overtone.frequency.setValueAtTime(overtoneFreq, startAt);
oscillator.connect(gainNode); gainNode.gain.setValueAtTime(0.0001, startAt);
gainNode.gain.exponentialRampToValueAtTime(Math.max(0.0002, gainValue * ampScale), startAt + 0.01);
gainNode.gain.exponentialRampToValueAtTime(0.0001, startAt + totalDuration);
fundamental.connect(gainNode);
overtone.connect(gainNode);
gainNode.connect(context.destination); gainNode.connect(context.destination);
oscillator.start();
oscillator.stop(context.currentTime + (type === 'uptime_down' ? 0.35 : 0.15)); fundamental.start(startAt);
overtone.start(startAt);
fundamental.stop(startAt + totalDuration);
overtone.stop(startAt + totalDuration);
}
strike(now, isAlert ? 0.22 : 0.18);
strike(now + strikeDelay, isAlert ? 0.16 : 0.12);
window.setTimeout(() => {
context.close().catch(() => {});
}, Math.ceil((totalDuration + strikeDelay + 0.1) * 1000));
} }
function activateView(viewKey) { function activateView(viewKey) {

View File

@ -4,6 +4,7 @@ Settings Frontend Views
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
import re
from fastapi import APIRouter, Request, HTTPException from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
@ -15,6 +16,183 @@ from app.core.database import get_db_connection, release_db_connection, execute_
router = APIRouter() router = APIRouter()
templates = Jinja2Templates(directory="app") templates = Jinja2Templates(directory="app")
CREATE_TABLE_RE = re.compile(
r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s*\(",
re.IGNORECASE,
)
ADD_COLUMN_RE = re.compile(
r"ALTER\s+TABLE\s+(?:IF\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s+ADD\s+COLUMN\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)",
re.IGNORECASE,
)
CREATE_INDEX_RE = re.compile(
r"CREATE\s+(?:UNIQUE\s+)?INDEX\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s+ON\s+([A-Za-z_][A-Za-z0-9_]*)",
re.IGNORECASE,
)
SKIP_COLUMN_LINE_RE = re.compile(
r"^(?:CONSTRAINT|PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE|CHECK|CASE|WHEN|ELSE|END)\b",
re.IGNORECASE,
)
def _strip_sql_comments(sql: str) -> str:
sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL)
sql = re.sub(r"--[^\n]*", "", sql)
return sql
def _extract_create_table_block(sql: str, start_pos: int) -> str:
open_paren = sql.find("(", start_pos)
if open_paren == -1:
return ""
depth = 0
for idx in range(open_paren, len(sql)):
ch = sql[idx]
if ch == "(":
depth += 1
elif ch == ")":
depth -= 1
if depth == 0:
return sql[open_paren + 1:idx]
return ""
def _parse_columns_from_create_block(block: str) -> set[str]:
columns: set[str] = set()
known_types = {
"serial", "bigserial", "smallint", "integer", "bigint", "numeric", "decimal", "real", "double",
"varchar", "character", "text", "boolean", "bool", "date", "timestamp", "time", "json", "jsonb", "uuid"
}
for raw_line in block.splitlines():
line = raw_line.strip().rstrip(",")
if not line:
continue
if SKIP_COLUMN_LINE_RE.match(line):
continue
tokens = line.replace("(", " ").split()
if len(tokens) < 2:
continue
second = tokens[1].strip().lower()
second_base = re.sub(r"[^a-z]", "", second)
if second_base and second_base not in known_types:
continue
match = re.match(r"^\"?([A-Za-z_][A-Za-z0-9_]*)\"?\s+", line)
if match:
columns.add(match.group(1))
return columns
def _parse_migration_expectations(sql: str) -> tuple[set[str], set[tuple[str, str]], set[str]]:
expected_tables: set[str] = set()
expected_columns: set[tuple[str, str]] = set()
expected_indexes: set[str] = set()
clean_sql = _strip_sql_comments(sql)
for match in CREATE_TABLE_RE.finditer(clean_sql):
table_name = match.group(1)
expected_tables.add(table_name)
block = _extract_create_table_block(clean_sql, match.end() - 1)
for column_name in _parse_columns_from_create_block(block):
expected_columns.add((table_name, column_name))
for match in ADD_COLUMN_RE.finditer(clean_sql):
expected_columns.add((match.group(1), match.group(2)))
for match in CREATE_INDEX_RE.finditer(clean_sql):
expected_indexes.add(match.group(1))
return expected_tables, expected_columns, expected_indexes
def _get_actual_schema_snapshot(conn) -> tuple[set[str], set[tuple[str, str]], set[str]]:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
"""
)
tables = {row[0] for row in cursor.fetchall()}
cursor.execute(
"""
SELECT table_name, column_name
FROM information_schema.columns
WHERE table_schema = 'public'
"""
)
columns = {(row[0], row[1]) for row in cursor.fetchall()}
cursor.execute(
"""
SELECT indexname
FROM pg_indexes
WHERE schemaname = 'public'
"""
)
indexes = {row[0] for row in cursor.fetchall()}
return tables, columns, indexes
def _status_for_migration_file(
migration_sql: str,
actual_tables: set[str],
actual_columns: set[tuple[str, str]],
actual_indexes: set[str],
) -> dict:
expected_tables, expected_columns, expected_indexes = _parse_migration_expectations(migration_sql)
total_checks = len(expected_tables) + len(expected_columns) + len(expected_indexes)
if total_checks == 0:
return {
"status": "gray",
"label": "Grå",
"summary": "Ingen direkte schema-checks fundet i filen",
"missing_tables": [],
"missing_columns": [],
"missing_indexes": [],
}
missing_tables = sorted([tbl for tbl in expected_tables if tbl not in actual_tables])
missing_columns = sorted([f"{tbl}.{col}" for (tbl, col) in expected_columns if (tbl, col) not in actual_columns])
missing_indexes = sorted([idx for idx in expected_indexes if idx not in actual_indexes])
if not missing_tables and not missing_columns and not missing_indexes:
return {
"status": "green",
"label": "Grøn",
"summary": "Alle schema-elementer fra filen findes i databasen",
"missing_tables": [],
"missing_columns": [],
"missing_indexes": [],
}
parts = []
if missing_tables:
parts.append(f"tabeller: {len(missing_tables)}")
if missing_columns:
parts.append(f"kolonner: {len(missing_columns)}")
if missing_indexes:
parts.append(f"indexes: {len(missing_indexes)}")
return {
"status": "red",
"label": "Rød",
"summary": "Mangler " + ", ".join(parts),
"missing_tables": missing_tables,
"missing_columns": missing_columns,
"missing_indexes": missing_indexes,
}
@router.get("/settings", response_class=HTMLResponse, tags=["Frontend"]) @router.get("/settings", response_class=HTMLResponse, tags=["Frontend"])
async def settings_page(request: Request): async def settings_page(request: Request):
@ -73,6 +251,42 @@ class MigrationExecution(BaseModel):
file_name: str file_name: str
@router.get("/settings/migrations/status", tags=["Frontend"])
def migration_statuses():
"""Check migration files against current schema and return per-file color status."""
migrations_dir = Path(__file__).resolve().parents[3] / "migrations"
files = sorted(migrations_dir.glob("*.sql")) if migrations_dir.exists() else []
conn = get_db_connection()
try:
actual_tables, actual_columns, actual_indexes = _get_actual_schema_snapshot(conn)
statuses = []
for migration_file in files:
migration_sql = migration_file.read_text(encoding="utf-8")
status_info = _status_for_migration_file(
migration_sql,
actual_tables,
actual_columns,
actual_indexes,
)
statuses.append({
"name": migration_file.name,
**status_info,
})
return {"statuses": statuses}
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Status check failed: {exc}")
finally:
release_db_connection(conn)
@router.get("/api/v1/settings/migrations/status", tags=["Frontend"])
def migration_statuses_api_v1_alias():
"""API-prefixed alias for environments/routes expecting /api/v1 prefix."""
return migration_statuses()
@router.post("/settings/migrations/execute", tags=["Frontend"]) @router.post("/settings/migrations/execute", tags=["Frontend"])
def execute_migration(payload: MigrationExecution): def execute_migration(payload: MigrationExecution):
"""Execute a migration SQL file""" """Execute a migration SQL file"""

View File

@ -20,6 +20,11 @@
.command-actions .btn { .command-actions .btn {
min-width: 120px; min-width: 120px;
} }
.migration-status-badge {
min-width: 72px;
display: inline-block;
text-align: center;
}
</style> </style>
{% endblock %} {% endblock %}
@ -45,7 +50,12 @@
<div class="col-lg-8"> <div class="col-lg-8">
<div class="card shadow-sm border-0"> <div class="card shadow-sm border-0">
<div class="card-header bg-white"> <div class="card-header bg-white">
<div class="d-flex justify-content-between align-items-center">
<h6 class="mb-0 fw-bold"><i class="bi bi-database me-2"></i>Tilgængelige migrationer</h6> <h6 class="mb-0 fw-bold"><i class="bi bi-database me-2"></i>Tilgængelige migrationer</h6>
<button id="checkMigrationStatusBtn" class="btn btn-sm btn-outline-success" onclick="checkMigrationStatuses()">
<i class="bi bi-check2-circle me-1"></i>Tjek status
</button>
</div>
</div> </div>
<div class="card-body"> <div class="card-body">
{% if migrations and migrations|length > 0 %} {% if migrations and migrations|length > 0 %}
@ -54,6 +64,7 @@
<thead> <thead>
<tr> <tr>
<th>Fil</th> <th>Fil</th>
<th>Status</th>
<th>Størrelse</th> <th>Størrelse</th>
<th>Sidst ændret</th> <th>Sidst ændret</th>
<th class="text-end">Handling</th> <th class="text-end">Handling</th>
@ -65,6 +76,9 @@
<td> <td>
<strong>{{ migration.name }}</strong> <strong>{{ migration.name }}</strong>
</td> </td>
<td>
<span class="badge bg-secondary migration-status-badge" data-migration="{{ migration.name }}" title="Ikke tjekket endnu">Grå</span>
</td>
<td>{{ migration.size_kb }} KB</td> <td>{{ migration.size_kb }} KB</td>
<td>{{ migration.modified }}</td> <td>{{ migration.modified }}</td>
<td class="text-end d-flex gap-2 justify-content-end"> <td class="text-end d-flex gap-2 justify-content-end">
@ -159,22 +173,45 @@
async function runMigration(migrationName, button) { async function runMigration(migrationName, button) {
const feedback = document.getElementById('migrationFeedback'); const feedback = document.getElementById('migrationFeedback');
const url = '/settings/migrations/execute';
button.disabled = true; button.disabled = true;
feedback.className = 'alert alert-info mt-3'; feedback.className = 'alert alert-info mt-3';
feedback.textContent = 'Kører migration...'; feedback.textContent = 'Kører migration...';
feedback.classList.remove('d-none'); feedback.classList.remove('d-none');
try {
const urls = buildMigrationActionUrls('execute');
const attempts = [];
let data = null;
let lastError = null;
for (const url of urls) {
try { try {
const response = await fetch(url, { const response = await fetch(url, {
method: 'POST', method: 'POST',
credentials: 'include',
headers: { 'Content-Type': 'application/json' }, headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ file_name: migrationName }) body: JSON.stringify({ file_name: migrationName })
}); });
const data = await response.json(); const payload = await response.json().catch(() => ({}));
if (!response.ok) throw new Error(data.detail || data.message); attempts.push(`${url} -> ${response.status}`);
if (response.ok) {
data = payload;
break;
}
lastError = payload.detail || payload.message || `HTTP ${response.status}`;
} catch (err) {
attempts.push(`${url} -> ERR`);
lastError = err.message || 'Netvaerksfejl';
}
}
if (!data) {
throw new Error(`${lastError || 'Migration fejlede'} (forsøgt: ${attempts.join(' | ')})`);
}
feedback.className = 'alert alert-success mt-3'; feedback.className = 'alert alert-success mt-3';
feedback.innerHTML = `<strong>Migration kørt</strong><br><pre class="mb-0">${data.output}</pre>`; feedback.innerHTML = `<strong>Migration kørt</strong><br><pre class="mb-0">${data.output}</pre>`;
@ -185,5 +222,112 @@
button.disabled = false; button.disabled = false;
} }
} }
function getStatusBadge(migrationName) {
const badges = document.querySelectorAll('.migration-status-badge');
for (const badge of badges) {
if (badge.dataset.migration === migrationName) {
return badge;
}
}
return null;
}
function applyMigrationStatus(statusItem) {
const badge = getStatusBadge(statusItem.name);
if (!badge) return;
badge.classList.remove('bg-secondary', 'bg-success', 'bg-danger');
if (statusItem.status === 'green') {
badge.classList.add('bg-success');
badge.textContent = 'Grøn';
} else if (statusItem.status === 'red') {
badge.classList.add('bg-danger');
badge.textContent = 'Rød';
} else {
badge.classList.add('bg-secondary');
badge.textContent = 'Grå';
}
badge.title = statusItem.summary || 'Ingen detaljer';
}
function uniqueUrls(urls) {
const seen = new Set();
return urls.filter((url) => {
if (seen.has(url)) return false;
seen.add(url);
return true;
});
}
function buildMigrationActionUrls(action) {
const path = (window.location.pathname || '').replace(/\/+$/, '');
const dynamicBase = path.endsWith('/migrations') ? path : '/settings/migrations';
const candidates = [
`${dynamicBase}/${action}`,
`/settings/migrations/${action}`,
`/api/v1/settings/migrations/${action}`
];
if (dynamicBase.startsWith('/api/v1/')) {
candidates.unshift(`/api/v1/settings/migrations/${action}`);
}
return uniqueUrls(candidates);
}
async function checkMigrationStatuses() {
const button = document.getElementById('checkMigrationStatusBtn');
const feedback = document.getElementById('migrationFeedback');
button.disabled = true;
feedback.className = 'alert alert-info mt-3';
feedback.textContent = 'Tjekker migration status...';
feedback.classList.remove('d-none');
try {
const urls = buildMigrationActionUrls('status');
let data = null;
let lastError = null;
const attempts = [];
for (const url of urls) {
try {
const response = await fetch(url, { credentials: 'include' });
const payload = await response.json().catch(() => ({}));
attempts.push(`${url} -> ${response.status}`);
if (response.ok) {
data = payload;
break;
}
lastError = payload.detail || `HTTP ${response.status}`;
} catch (err) {
attempts.push(`${url} -> ERR`);
lastError = err.message || 'Netvaerksfejl';
}
}
if (!data) {
throw new Error(`${lastError || 'Status check fejlede'} (forsøgt: ${attempts.join(' | ')})`);
}
const statuses = data.statuses || [];
statuses.forEach(applyMigrationStatus);
const redCount = statuses.filter(item => item.status === 'red').length;
const greenCount = statuses.filter(item => item.status === 'green').length;
const grayCount = statuses.filter(item => item.status === 'gray').length;
feedback.className = redCount > 0 ? 'alert alert-warning mt-3' : 'alert alert-success mt-3';
feedback.textContent = `Status opdateret: ${greenCount} grøn, ${redCount} rød, ${grayCount} grå.`;
} catch (error) {
feedback.className = 'alert alert-danger mt-3';
feedback.textContent = `Fejl ved status check: ${error.message}`;
} finally {
button.disabled = false;
}
}
</script> </script>
{% endblock %} {% endblock %}

View File

@ -0,0 +1,340 @@
#!/usr/bin/env python3
import argparse
import json
import logging
import re
import sys
from pathlib import Path
import psycopg2
from psycopg2.extras import RealDictCursor
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from app.core.config import settings
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)
NUMBERED_SQL_RE = re.compile(r"^\d+.*\.sql$")
CREATE_TABLE_RE = re.compile(
r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s*\(",
re.IGNORECASE,
)
ADD_COLUMN_RE = re.compile(
r"ALTER\s+TABLE\s+(?:IF\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s+ADD\s+COLUMN\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)",
re.IGNORECASE,
)
CREATE_INDEX_RE = re.compile(
r"CREATE\s+(?:UNIQUE\s+)?INDEX\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s+ON\s+([A-Za-z_][A-Za-z0-9_]*)",
re.IGNORECASE,
)
SKIP_COLUMN_LINE_RE = re.compile(
r"^(?:CONSTRAINT|PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE|CHECK|CASE|WHEN|ELSE|END)\b",
re.IGNORECASE,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Validate BMC Hub SQL migrations against current PostgreSQL schema."
)
parser.add_argument(
"--json",
action="store_true",
help="Output report in JSON format.",
)
parser.add_argument(
"--strict-indexes",
action="store_true",
default=False,
help="Treat missing indexes as failure (default: False).",
)
parser.add_argument(
"--module",
action="append",
default=[],
help="Also parse numbered SQL files from a module migration directory, relative to repo root.",
)
return parser.parse_args()
def patch_database_url_for_local_dev() -> None:
if "@postgres" in settings.DATABASE_URL:
logger.info("Patching DATABASE_URL for local run")
settings.DATABASE_URL = settings.DATABASE_URL.replace("@postgres", "@localhost").replace(":5432", ":5433")
def collect_numbered_sql(directory: Path) -> list[Path]:
files = [p for p in directory.glob("*.sql") if NUMBERED_SQL_RE.match(p.name)]
def _sort_key(path: Path) -> tuple[int, str]:
match = re.match(r"^(\d+)", path.name)
prefix = int(match.group(1)) if match else 0
return (prefix, path.name)
files.sort(key=_sort_key)
return files
def build_file_list(args: argparse.Namespace) -> list[Path]:
files: list[Path] = []
root_migrations = ROOT / "migrations"
files.extend(collect_numbered_sql(root_migrations))
for module_dir in args.module:
path = (ROOT / module_dir).resolve()
if not path.exists() or not path.is_dir():
raise FileNotFoundError(f"Module migration directory not found: {module_dir}")
files.extend(collect_numbered_sql(path))
unique_files: list[Path] = []
seen: set[Path] = set()
for path in files:
if path not in seen:
unique_files.append(path)
seen.add(path)
return unique_files
def strip_sql_comments(sql: str) -> str:
sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL)
sql = re.sub(r"--[^\n]*", "", sql)
return sql
def extract_create_table_block(sql: str, table_name: str, start_pos: int) -> str:
"""Return create-table body between the first opening '(' and matching ')'."""
open_paren = sql.find("(", start_pos)
if open_paren == -1:
return ""
depth = 0
for i in range(open_paren, len(sql)):
ch = sql[i]
if ch == "(":
depth += 1
elif ch == ")":
depth -= 1
if depth == 0:
return sql[open_paren + 1:i]
return ""
def parse_columns_from_create_block(block: str) -> set[str]:
columns: set[str] = set()
known_types = {
"serial", "bigserial", "smallint", "integer", "bigint", "numeric", "decimal", "real", "double",
"varchar", "character", "text", "boolean", "bool", "date", "timestamp", "time", "json", "jsonb", "uuid"
}
for raw_line in block.splitlines():
line = raw_line.strip().rstrip(",")
if not line:
continue
if SKIP_COLUMN_LINE_RE.match(line):
continue
tokens = line.replace("(", " ").split()
if len(tokens) < 2:
continue
second = tokens[1].strip().lower()
second_base = re.sub(r"[^a-z]", "", second)
if second_base and second_base not in known_types:
continue
match = re.match(r"^\"?([A-Za-z_][A-Za-z0-9_]*)\"?\s+", line)
if match:
columns.add(match.group(1))
return columns
def parse_expected_schema(files: list[Path]) -> tuple[dict[str, set[str]], set[str]]:
expected_tables: dict[str, set[str]] = {}
expected_indexes: set[str] = set()
for path in files:
sql = strip_sql_comments(path.read_text(encoding="utf-8"))
for match in CREATE_TABLE_RE.finditer(sql):
table_name = match.group(1)
if table_name not in expected_tables:
expected_tables[table_name] = set()
block = extract_create_table_block(sql, table_name, match.end() - 1)
expected_tables[table_name].update(parse_columns_from_create_block(block))
for match in ADD_COLUMN_RE.finditer(sql):
table_name = match.group(1)
column_name = match.group(2)
if table_name not in expected_tables:
expected_tables[table_name] = set()
expected_tables[table_name].add(column_name)
for match in CREATE_INDEX_RE.finditer(sql):
index_name = match.group(1)
expected_indexes.add(index_name)
return expected_tables, expected_indexes
def query_actual_schema() -> tuple[set[str], dict[str, set[str]], set[str]]:
conn = psycopg2.connect(settings.DATABASE_URL)
conn.autocommit = True
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
ORDER BY table_name
"""
)
actual_tables = {row["table_name"] for row in cur.fetchall()}
cur.execute(
"""
SELECT table_name, column_name
FROM information_schema.columns
WHERE table_schema = 'public'
ORDER BY table_name, ordinal_position
"""
)
actual_columns: dict[str, set[str]] = {}
for row in cur.fetchall():
table_name = row["table_name"]
if table_name not in actual_columns:
actual_columns[table_name] = set()
actual_columns[table_name].add(row["column_name"])
cur.execute(
"""
SELECT indexname
FROM pg_indexes
WHERE schemaname = 'public'
ORDER BY indexname
"""
)
actual_indexes = {row["indexname"] for row in cur.fetchall()}
return actual_tables, actual_columns, actual_indexes
finally:
conn.close()
def compare_expected_vs_actual(
expected_tables: dict[str, set[str]],
expected_indexes: set[str],
actual_tables: set[str],
actual_columns: dict[str, set[str]],
actual_indexes: set[str],
) -> dict[str, list[str]]:
missing_tables: list[str] = []
missing_columns: list[str] = []
missing_indexes: list[str] = []
for table_name in sorted(expected_tables.keys()):
if table_name not in actual_tables:
missing_tables.append(table_name)
continue
expected_cols = expected_tables.get(table_name, set())
current_cols = actual_columns.get(table_name, set())
for col in sorted(expected_cols):
if col not in current_cols:
missing_columns.append(f"{table_name}.{col}")
for index_name in sorted(expected_indexes):
if index_name not in actual_indexes:
missing_indexes.append(index_name)
return {
"missing_tables": missing_tables,
"missing_columns": missing_columns,
"missing_indexes": missing_indexes,
}
def print_report(report: dict[str, list[str]]) -> None:
if not report["missing_tables"] and not report["missing_columns"] and not report["missing_indexes"]:
logger.info("Schema validation OK: no mismatches found.")
return
if report["missing_tables"]:
logger.error("Missing tables:")
for table_name in report["missing_tables"]:
logger.error("- %s", table_name)
if report["missing_columns"]:
logger.error("Missing columns:")
for column_name in report["missing_columns"]:
logger.error("- %s", column_name)
if report["missing_indexes"]:
logger.warning("Missing indexes:")
for index_name in report["missing_indexes"]:
logger.warning("- %s", index_name)
def determine_exit_code(report: dict[str, list[str]], strict_indexes: bool) -> int:
has_table_or_column_mismatch = bool(report["missing_tables"] or report["missing_columns"])
has_index_mismatch = bool(report["missing_indexes"])
if has_table_or_column_mismatch:
return 1
if strict_indexes and has_index_mismatch:
return 1
return 0
def main() -> int:
args = parse_args()
try:
patch_database_url_for_local_dev()
files = build_file_list(args)
expected_tables, expected_indexes = parse_expected_schema(files)
actual_tables, actual_columns, actual_indexes = query_actual_schema()
report = compare_expected_vs_actual(
expected_tables,
expected_indexes,
actual_tables,
actual_columns,
actual_indexes,
)
if args.json:
payload = {
"status": "ok" if determine_exit_code(report, args.strict_indexes) == 0 else "mismatch",
"strict_indexes": args.strict_indexes,
"parsed_files": [str(path.relative_to(ROOT)) for path in files],
"missing_tables": report["missing_tables"],
"missing_columns": report["missing_columns"],
"missing_indexes": report["missing_indexes"],
}
print(json.dumps(payload, ensure_ascii=True))
else:
logger.info("Parsed migration files: %s", len(files))
print_report(report)
return determine_exit_code(report, args.strict_indexes)
except Exception as exc:
if args.json:
print(json.dumps({"status": "error", "message": str(exc)}, ensure_ascii=True))
else:
logger.error("Validation failed: %s", exc)
return 2
if __name__ == "__main__":
raise SystemExit(main())