"""Settings frontend views.

Serves the settings page and the database-migration pages, including a
heuristic per-file status check that compares the schema objects a migration
file appears to create against the ``public`` schema of the connected
database.
"""
import logging
import re
from datetime import datetime
from pathlib import Path

from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel

from app.core.config import settings
from app.core.database import get_db_connection, release_db_connection, execute_query_single

logger = logging.getLogger(__name__)

router = APIRouter()
templates = Jinja2Templates(directory="app")

# Captures the table name of a ``CREATE TABLE [IF NOT EXISTS] name (`` statement.
CREATE_TABLE_RE = re.compile(
    r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s*\(",
    re.IGNORECASE,
)
# Captures (table, column) of an ``ALTER TABLE ... ADD COLUMN ...`` statement.
ADD_COLUMN_RE = re.compile(
    r"ALTER\s+TABLE\s+(?:IF\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s+ADD\s+COLUMN\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)",
    re.IGNORECASE,
)
# Captures (index, table) of a ``CREATE [UNIQUE] INDEX ... ON ...`` statement.
CREATE_INDEX_RE = re.compile(
    r"CREATE\s+(?:UNIQUE\s+)?INDEX\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s+ON\s+([A-Za-z_][A-Za-z0-9_]*)",
    re.IGNORECASE,
)
# Lines inside a CREATE TABLE body that start with one of these keywords are
# constraints or expression fragments, not column definitions.
SKIP_COLUMN_LINE_RE = re.compile(
    r"^(?:CONSTRAINT|PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE|CHECK|CASE|WHEN|ELSE|END)\b",
    re.IGNORECASE,
)

# Type keywords (letters only, lower case) that mark a line as a column definition.
_KNOWN_COLUMN_TYPES = frozenset({
    "serial", "bigserial", "smallint", "integer", "bigint", "numeric",
    "decimal", "real", "double", "varchar", "character", "text",
    "boolean", "bool", "date", "timestamp", "time", "json", "jsonb", "uuid",
})


def _strip_sql_comments(sql: str) -> str:
    """Return *sql* with ``/* ... */`` block comments and ``--`` line comments removed."""
    sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL)
    sql = re.sub(r"--[^\n]*", "", sql)
    return sql


def _extract_create_table_block(sql: str, start_pos: int) -> str:
    """Return the text between the first ``(`` at or after *start_pos* and its matching ``)``.

    Parentheses are balanced by simple depth counting. An empty string is
    returned when there is no opening parenthesis or it is never closed.
    """
    open_paren = sql.find("(", start_pos)
    if open_paren == -1:
        return ""

    depth = 0
    for idx, ch in enumerate(sql[open_paren:], start=open_paren):
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
            if depth == 0:
                return sql[open_paren + 1:idx]
    return ""


def _parse_columns_from_create_block(block: str) -> set[str]:
    """Heuristically collect column names from the body of a CREATE TABLE statement.

    The block is processed line by line. A line counts as a column definition
    when it does not start with a constraint/expression keyword, has at least
    two tokens, and its second token (reduced to lower-case letters) is either
    empty or one of the known type keywords.
    """
    columns: set[str] = set()
    for raw_line in block.splitlines():
        line = raw_line.strip().rstrip(",")
        if not line or SKIP_COLUMN_LINE_RE.match(line):
            continue

        # Split "VARCHAR(255)" into "VARCHAR" and "255)" so the type stands alone.
        tokens = line.replace("(", " ").split()
        if len(tokens) < 2:
            continue

        type_keyword = re.sub(r"[^a-z]", "", tokens[1].strip().lower())
        if type_keyword and type_keyword not in _KNOWN_COLUMN_TYPES:
            continue

        match = re.match(r"^\"?([A-Za-z_][A-Za-z0-9_]*)\"?\s+", line)
        if match:
            columns.add(match.group(1))
    return columns


def _parse_migration_expectations(sql: str) -> tuple[set[str], set[tuple[str, str]], set[str]]:
    """Extract the tables, (table, column) pairs and index names *sql* appears to create.

    Comments are stripped first. Tables and their columns come from
    CREATE TABLE statements, additional columns from ALTER TABLE ... ADD
    COLUMN, and index names from CREATE INDEX.
    """
    expected_tables: set[str] = set()
    expected_columns: set[tuple[str, str]] = set()
    expected_indexes: set[str] = set()

    clean_sql = _strip_sql_comments(sql)

    for match in CREATE_TABLE_RE.finditer(clean_sql):
        table_name = match.group(1)
        expected_tables.add(table_name)
        # The pattern ends on the opening parenthesis, so step back onto it.
        block = _extract_create_table_block(clean_sql, match.end() - 1)
        expected_columns.update(
            (table_name, column_name)
            for column_name in _parse_columns_from_create_block(block)
        )

    expected_columns.update(
        (match.group(1), match.group(2)) for match in ADD_COLUMN_RE.finditer(clean_sql)
    )
    expected_indexes.update(match.group(1) for match in CREATE_INDEX_RE.finditer(clean_sql))

    return expected_tables, expected_columns, expected_indexes


def _get_actual_schema_snapshot(conn) -> tuple[set[str], set[tuple[str, str]], set[str]]:
    """Read the base tables, (table, column) pairs and index names of the ``public`` schema.

    *conn* must provide ``cursor()`` usable as a context manager whose cursor
    yields rows indexable by position.
    """
    with conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
            """
        )
        tables = {row[0] for row in cursor.fetchall()}

        cursor.execute(
            """
            SELECT table_name, column_name
            FROM information_schema.columns
            WHERE table_schema = 'public'
            """
        )
        columns = {(row[0], row[1]) for row in cursor.fetchall()}

        cursor.execute(
            """
            SELECT indexname
            FROM pg_indexes
            WHERE schemaname = 'public'
            """
        )
        indexes = {row[0] for row in cursor.fetchall()}

    return tables, columns, indexes


def _status_for_migration_file(
    migration_sql: str,
    actual_tables: set[str],
    actual_columns: set[tuple[str, str]],
    actual_indexes: set[str],
) -> dict:
    """Classify one migration file against the actual schema.

    Returns a dict with ``status`` ("gray", "green" or "red"), a ``label``,
    a ``summary`` and the sorted lists ``missing_tables``,
    ``missing_columns`` (as ``table.column``) and ``missing_indexes``.
    "gray" means the file contains nothing this parser can check.
    """
    expected_tables, expected_columns, expected_indexes = _parse_migration_expectations(migration_sql)

    if not (expected_tables or expected_columns or expected_indexes):
        return {
            "status": "gray",
            "label": "Grå",
            "summary": "Ingen direkte schema-checks fundet i filen",
            "missing_tables": [],
            "missing_columns": [],
            "missing_indexes": [],
        }

    missing_tables = sorted(expected_tables - set(actual_tables))
    missing_columns = sorted(
        f"{tbl}.{col}" for (tbl, col) in expected_columns - set(actual_columns)
    )
    missing_indexes = sorted(expected_indexes - set(actual_indexes))

    if not (missing_tables or missing_columns or missing_indexes):
        return {
            "status": "green",
            "label": "Grøn",
            "summary": "Alle schema-elementer fra filen findes i databasen",
            "missing_tables": [],
            "missing_columns": [],
            "missing_indexes": [],
        }

    parts = []
    if missing_tables:
        parts.append(f"tabeller: {len(missing_tables)}")
    if missing_columns:
        parts.append(f"kolonner: {len(missing_columns)}")
    if missing_indexes:
        parts.append(f"indexes: {len(missing_indexes)}")

    return {
        "status": "red",
        "label": "Rød",
        "summary": "Mangler " + ", ".join(parts),
        "missing_tables": missing_tables,
        "missing_columns": missing_columns,
        "missing_indexes": missing_indexes,
    }


@router.get("/settings", response_class=HTMLResponse, tags=["Frontend"])
async def settings_page(request: Request):
    """Render the settings page.

    When ``request.state.user_id`` is set, the user's stored default
    dashboard path is looked up and passed to the template. The lookup is
    best-effort: on any failure the page is rendered with an empty path.
    """
    default_dashboard_path = ""
    user_id = getattr(request.state, "user_id", None)

    if user_id:
        try:
            row = execute_query_single(
                """
                SELECT default_dashboard_path
                FROM user_dashboard_preferences
                WHERE user_id = %s
                """,
                (int(user_id),)
            )
            default_dashboard_path = (row or {}).get("default_dashboard_path") or ""
        except Exception:
            # Still render the page, but leave a trace instead of hiding the failure.
            logger.warning(
                "Could not load default dashboard path for user %s", user_id, exc_info=True
            )
            default_dashboard_path = ""

    return templates.TemplateResponse("settings/frontend/settings.html", {
        "request": request,
        "title": "Indstillinger",
        "default_dashboard_path": default_dashboard_path
    })


@router.get("/settings/migrations", response_class=HTMLResponse, tags=["Frontend"])
async def migrations_page(request: Request):
    """Render the database migrations page.

    Lists every ``*.sql`` file in the ``migrations`` directory (sorted by
    path) with its size in KB and last-modified time, together with database
    connection details for the template.
    """
    migrations_dir = Path(__file__).resolve().parents[3] / "migrations"
    migrations = []

    if migrations_dir.exists():
        for migration_file in sorted(migrations_dir.glob("*.sql")):
            stat = migration_file.stat()
            migrations.append({
                "name": migration_file.name,
                "size_kb": round(stat.st_size / 1024, 1),
                "modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M")
            })

    return templates.TemplateResponse("settings/frontend/migrations.html", {
        "request": request,
        "title": "Database Migrationer",
        "migrations": migrations,
        "db_user": settings.POSTGRES_USER,
        "db_name": settings.POSTGRES_DB,
        "db_container": "bmc-hub-postgres",
        # Any host other than a loopback/wildcard address is treated as production.
        "is_production": request.url.hostname not in ['localhost', '127.0.0.1', '0.0.0.0']
    })


class MigrationExecution(BaseModel):
    """Request body for executing a migration: the bare name of the SQL file."""

    file_name: str


@router.get("/settings/migrations/status", tags=["Frontend"])
def migration_statuses():
    """Check migration files against current schema and return per-file color status.

    Raises:
        HTTPException: 500 when reading the schema or a migration file fails.
    """
    migrations_dir = Path(__file__).resolve().parents[3] / "migrations"
    files = sorted(migrations_dir.glob("*.sql")) if migrations_dir.exists() else []

    conn = get_db_connection()
    try:
        actual_tables, actual_columns, actual_indexes = _get_actual_schema_snapshot(conn)
        statuses = []
        for migration_file in files:
            migration_sql = migration_file.read_text(encoding="utf-8")
            status_info = _status_for_migration_file(
                migration_sql,
                actual_tables,
                actual_columns,
                actual_indexes,
            )
            statuses.append({
                "name": migration_file.name,
                **status_info,
            })
        return {"statuses": statuses}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Status check failed: {exc}") from exc
    finally:
        release_db_connection(conn)


@router.post("/settings/migrations/execute", tags=["Frontend"])
def execute_migration(payload: MigrationExecution):
    """Execute a migration SQL file from the ``migrations`` directory.

    Only a bare ``*.sql`` file name is accepted; names containing directory
    components are rejected so a request cannot reach files outside the
    migrations directory.

    Raises:
        HTTPException: 400 for an invalid file name, 404 when the file does
            not exist, 500 when executing the SQL fails (the transaction is
            rolled back).
    """
    file_name = payload.file_name
    # Path(...).name differs from the input as soon as it contains a directory
    # component ("../x.sql", "/etc/x.sql", "sub/x.sql").
    if (
        not file_name
        or Path(file_name).name != file_name
        or not file_name.lower().endswith(".sql")
    ):
        raise HTTPException(status_code=400, detail="Invalid migration file name")

    migrations_dir = Path(__file__).resolve().parents[3] / "migrations"
    migration_file = migrations_dir / file_name

    if not migration_file.is_file():
        raise HTTPException(status_code=404, detail="Migration file not found")

    migration_sql = migration_file.read_text(encoding="utf-8")

    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(migration_sql)
        conn.commit()
    except Exception as exc:
        conn.rollback()
        raise HTTPException(status_code=500, detail=f"Migration failed: {exc}") from exc
    finally:
        release_db_connection(conn)

    return {"message": "Migration executed successfully"}