bmc_hub/app/settings/backend/views.py

307 lines
10 KiB
Python

"""
Settings Frontend Views
"""
from datetime import datetime
from pathlib import Path
import re
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from app.core.config import settings
from app.core.database import get_db_connection, release_db_connection, execute_query_single
router = APIRouter()
templates = Jinja2Templates(directory="app")
CREATE_TABLE_RE = re.compile(
r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s*\(",
re.IGNORECASE,
)
ADD_COLUMN_RE = re.compile(
r"ALTER\s+TABLE\s+(?:IF\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s+ADD\s+COLUMN\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)",
re.IGNORECASE,
)
CREATE_INDEX_RE = re.compile(
r"CREATE\s+(?:UNIQUE\s+)?INDEX\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s+ON\s+([A-Za-z_][A-Za-z0-9_]*)",
re.IGNORECASE,
)
SKIP_COLUMN_LINE_RE = re.compile(
r"^(?:CONSTRAINT|PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE|CHECK|CASE|WHEN|ELSE|END)\b",
re.IGNORECASE,
)
def _strip_sql_comments(sql: str) -> str:
sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL)
sql = re.sub(r"--[^\n]*", "", sql)
return sql
def _extract_create_table_block(sql: str, start_pos: int) -> str:
open_paren = sql.find("(", start_pos)
if open_paren == -1:
return ""
depth = 0
for idx in range(open_paren, len(sql)):
ch = sql[idx]
if ch == "(":
depth += 1
elif ch == ")":
depth -= 1
if depth == 0:
return sql[open_paren + 1:idx]
return ""
def _parse_columns_from_create_block(block: str) -> set[str]:
columns: set[str] = set()
known_types = {
"serial", "bigserial", "smallint", "integer", "bigint", "numeric", "decimal", "real", "double",
"varchar", "character", "text", "boolean", "bool", "date", "timestamp", "time", "json", "jsonb", "uuid"
}
for raw_line in block.splitlines():
line = raw_line.strip().rstrip(",")
if not line:
continue
if SKIP_COLUMN_LINE_RE.match(line):
continue
tokens = line.replace("(", " ").split()
if len(tokens) < 2:
continue
second = tokens[1].strip().lower()
second_base = re.sub(r"[^a-z]", "", second)
if second_base and second_base not in known_types:
continue
match = re.match(r"^\"?([A-Za-z_][A-Za-z0-9_]*)\"?\s+", line)
if match:
columns.add(match.group(1))
return columns
def _parse_migration_expectations(sql: str) -> tuple[set[str], set[tuple[str, str]], set[str]]:
expected_tables: set[str] = set()
expected_columns: set[tuple[str, str]] = set()
expected_indexes: set[str] = set()
clean_sql = _strip_sql_comments(sql)
for match in CREATE_TABLE_RE.finditer(clean_sql):
table_name = match.group(1)
expected_tables.add(table_name)
block = _extract_create_table_block(clean_sql, match.end() - 1)
for column_name in _parse_columns_from_create_block(block):
expected_columns.add((table_name, column_name))
for match in ADD_COLUMN_RE.finditer(clean_sql):
expected_columns.add((match.group(1), match.group(2)))
for match in CREATE_INDEX_RE.finditer(clean_sql):
expected_indexes.add(match.group(1))
return expected_tables, expected_columns, expected_indexes
def _get_actual_schema_snapshot(conn) -> tuple[set[str], set[tuple[str, str]], set[str]]:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
"""
)
tables = {row[0] for row in cursor.fetchall()}
cursor.execute(
"""
SELECT table_name, column_name
FROM information_schema.columns
WHERE table_schema = 'public'
"""
)
columns = {(row[0], row[1]) for row in cursor.fetchall()}
cursor.execute(
"""
SELECT indexname
FROM pg_indexes
WHERE schemaname = 'public'
"""
)
indexes = {row[0] for row in cursor.fetchall()}
return tables, columns, indexes
def _status_for_migration_file(
migration_sql: str,
actual_tables: set[str],
actual_columns: set[tuple[str, str]],
actual_indexes: set[str],
) -> dict:
expected_tables, expected_columns, expected_indexes = _parse_migration_expectations(migration_sql)
total_checks = len(expected_tables) + len(expected_columns) + len(expected_indexes)
if total_checks == 0:
return {
"status": "gray",
"label": "Grå",
"summary": "Ingen direkte schema-checks fundet i filen",
"missing_tables": [],
"missing_columns": [],
"missing_indexes": [],
}
missing_tables = sorted([tbl for tbl in expected_tables if tbl not in actual_tables])
missing_columns = sorted([f"{tbl}.{col}" for (tbl, col) in expected_columns if (tbl, col) not in actual_columns])
missing_indexes = sorted([idx for idx in expected_indexes if idx not in actual_indexes])
if not missing_tables and not missing_columns and not missing_indexes:
return {
"status": "green",
"label": "Grøn",
"summary": "Alle schema-elementer fra filen findes i databasen",
"missing_tables": [],
"missing_columns": [],
"missing_indexes": [],
}
parts = []
if missing_tables:
parts.append(f"tabeller: {len(missing_tables)}")
if missing_columns:
parts.append(f"kolonner: {len(missing_columns)}")
if missing_indexes:
parts.append(f"indexes: {len(missing_indexes)}")
return {
"status": "red",
"label": "Rød",
"summary": "Mangler " + ", ".join(parts),
"missing_tables": missing_tables,
"missing_columns": missing_columns,
"missing_indexes": missing_indexes,
}
@router.get("/settings", response_class=HTMLResponse, tags=["Frontend"])
async def settings_page(request: Request):
"""Render settings page"""
default_dashboard_path = ""
user_id = getattr(request.state, "user_id", None)
if user_id:
try:
row = execute_query_single(
"""
SELECT default_dashboard_path
FROM user_dashboard_preferences
WHERE user_id = %s
""",
(int(user_id),)
)
default_dashboard_path = (row or {}).get("default_dashboard_path") or ""
except Exception:
default_dashboard_path = ""
return templates.TemplateResponse("settings/frontend/settings.html", {
"request": request,
"title": "Indstillinger",
"default_dashboard_path": default_dashboard_path
})
@router.get("/settings/migrations", response_class=HTMLResponse, tags=["Frontend"])
async def migrations_page(request: Request):
"""Render database migrations page"""
migrations_dir = Path(__file__).resolve().parents[3] / "migrations"
migrations = []
if migrations_dir.exists():
for migration_file in sorted(migrations_dir.glob("*.sql")):
stat = migration_file.stat()
migrations.append({
"name": migration_file.name,
"size_kb": round(stat.st_size / 1024, 1),
"modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M")
})
return templates.TemplateResponse("settings/frontend/migrations.html", {
"request": request,
"title": "Database Migrationer",
"migrations": migrations,
"db_user": settings.POSTGRES_USER,
"db_name": settings.POSTGRES_DB,
"db_container": "bmc-hub-postgres",
"is_production": request.url.hostname not in ['localhost', '127.0.0.1', '0.0.0.0']
})
class MigrationExecution(BaseModel):
file_name: str
@router.get("/settings/migrations/status", tags=["Frontend"])
def migration_statuses():
"""Check migration files against current schema and return per-file color status."""
migrations_dir = Path(__file__).resolve().parents[3] / "migrations"
files = sorted(migrations_dir.glob("*.sql")) if migrations_dir.exists() else []
conn = get_db_connection()
try:
actual_tables, actual_columns, actual_indexes = _get_actual_schema_snapshot(conn)
statuses = []
for migration_file in files:
migration_sql = migration_file.read_text(encoding="utf-8")
status_info = _status_for_migration_file(
migration_sql,
actual_tables,
actual_columns,
actual_indexes,
)
statuses.append({
"name": migration_file.name,
**status_info,
})
return {"statuses": statuses}
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Status check failed: {exc}")
finally:
release_db_connection(conn)
@router.post("/settings/migrations/execute", tags=["Frontend"])
def execute_migration(payload: MigrationExecution):
"""Execute a migration SQL file"""
migrations_dir = Path(__file__).resolve().parents[3] / "migrations"
migration_file = migrations_dir / payload.file_name
if not migration_file.exists():
raise HTTPException(status_code=404, detail="Migration file not found")
# Determine the container runtime (Podman or Docker)
migration_sql = migration_file.read_text()
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute(migration_sql)
conn.commit()
except Exception as exc:
conn.rollback()
raise HTTPException(status_code=500, detail=f"Migration failed: {exc}")
finally:
release_db_connection(conn)
return {"message": "Migration executed successfully"}