bmc_hub/app/settings/backend/views.py
Christian 4a52bdb5d6 feat: Implement quick-rent functionality for hardware assets
- Added QuickRentCreateInput model to handle quick-rent requests.
- Introduced quick_rent_preview endpoint to check existing subscriptions.
- Created quick_rent_hardware endpoint to manage rental subscriptions, asset bindings, and startup order drafts.
- Updated SQL queries to ensure proper data retrieval and handling.
- Added default rental price columns to hardware_assets table via migration.
- Enhanced UI in sag templates for better user experience and accessibility.
- Refactored existing code for improved readability and maintainability.
2026-04-21 01:34:40 +02:00

347 lines
11 KiB
Python

"""
Settings Frontend Views
"""
from datetime import datetime
from pathlib import Path
import re
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from app.core.config import settings
from app.core.database import get_db_connection, release_db_connection, execute_query_single
# Router on which the settings and migration endpoints in this module are registered.
router = APIRouter()
# Template loader rooted at the "app" directory; the template names passed to
# TemplateResponse in this module (e.g. "settings/frontend/settings.html") are
# given relative to it.
templates = Jinja2Templates(directory="app")
# Matches ``CREATE TABLE [IF NOT EXISTS] <name> (`` and captures the table
# name. The match ends right after the opening parenthesis.
CREATE_TABLE_RE = re.compile(
    r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s*\(",
    flags=re.IGNORECASE,
)

# Matches ``ALTER TABLE [IF EXISTS] <table> ADD COLUMN [IF NOT EXISTS] <column>``
# and captures (table, column).
ADD_COLUMN_RE = re.compile(
    r"ALTER\s+TABLE\s+(?:IF\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s+ADD\s+COLUMN\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)",
    flags=re.IGNORECASE,
)

# Matches ``CREATE [UNIQUE] INDEX [IF NOT EXISTS] <index> ON <table>`` and
# captures (index, table).
CREATE_INDEX_RE = re.compile(
    r"CREATE\s+(?:UNIQUE\s+)?INDEX\s+(?:IF\s+NOT\s+EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)\s+ON\s+([A-Za-z_][A-Za-z0-9_]*)",
    flags=re.IGNORECASE,
)

# Lines inside a CREATE TABLE body that start with one of these keywords are
# constraints or CASE fragments rather than column definitions.
SKIP_COLUMN_LINE_RE = re.compile(
    r"^(?:CONSTRAINT|PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE|CHECK|CASE|WHEN|ELSE|END)\b",
    flags=re.IGNORECASE,
)
def _strip_sql_comments(sql: str) -> str:
sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL)
sql = re.sub(r"--[^\n]*", "", sql)
return sql
def _extract_create_table_block(sql: str, start_pos: int) -> str:
open_paren = sql.find("(", start_pos)
if open_paren == -1:
return ""
depth = 0
for idx in range(open_paren, len(sql)):
def _migration_numeric_prefix(file_name: str) -> int | None:
match = re.match(r"^(\d+)_", file_name)
if not match:
return None
try:
return int(match.group(1))
except Exception:
return None
def _migration_sort_key(migration_path: Path) -> tuple[int, int, str]:
number = _migration_numeric_prefix(migration_path.name)
if number is None:
# Non-numbered files go last, alphabetically.
return (1, 0, migration_path.name.lower())
return (0, number, migration_path.name.lower())
def _find_duplicate_migration_numbers(file_names: list[str]) -> list[dict]:
grouped: dict[int, list[str]] = {}
for name in file_names:
number = _migration_numeric_prefix(name)
if number is None:
continue
grouped.setdefault(number, []).append(name)
duplicates = []
for number in sorted(grouped.keys()):
files = sorted(grouped[number], key=lambda n: n.lower())
if len(files) > 1:
duplicates.append({"number": number, "files": files})
return duplicates
ch = sql[idx]
if ch == "(":
depth += 1
elif ch == ")":
depth -= 1
if depth == 0:
return sql[open_paren + 1:idx]
return ""
def _parse_columns_from_create_block(block: str) -> set[str]:
    """Heuristically collect column names from the body of a ``CREATE TABLE``.

    The block is examined line by line. A line contributes a column only when
    it does not start with a constraint/CASE keyword (``SKIP_COLUMN_LINE_RE``),
    has at least two tokens, and its second token either is one of the
    recognised type names or contains no letters at all. Only the first
    identifier on a line is considered.

    Unquoted names are folded to lower case, because PostgreSQL stores
    unquoted identifiers lower-cased; without this, ``Name TEXT`` would be
    looked up as ``Name`` and reported missing. Names written in double quotes
    keep their exact spelling.

    Args:
        block: Text between the parentheses of a ``CREATE TABLE`` statement.

    Returns:
        The set of column names found.
    """
    columns: set[str] = set()
    known_types = {
        "serial", "bigserial", "smallint", "integer", "bigint", "numeric", "decimal", "real", "double",
        "varchar", "character", "text", "boolean", "bool", "date", "timestamp", "time", "json", "jsonb", "uuid",
    }

    for raw_line in block.splitlines():
        line = raw_line.strip().rstrip(",")
        if not line:
            continue
        if SKIP_COLUMN_LINE_RE.match(line):
            continue

        # Split "varchar(255)" into "varchar" + "255)" so the type stands alone.
        tokens = line.replace("(", " ").split()
        if len(tokens) < 2:
            continue

        second_base = re.sub(r"[^a-z]", "", tokens[1].lower())
        if second_base and second_base not in known_types:
            continue

        match = re.match(r"^(\")?([A-Za-z_][A-Za-z0-9_]*)\"?\s+", line)
        if match:
            is_quoted = match.group(1) is not None
            name = match.group(2)
            columns.add(name if is_quoted else name.lower())

    return columns
def _parse_migration_expectations(sql: str) -> tuple[set[str], set[tuple[str, str]], set[str]]:
    """Derive the schema objects a migration file is expected to create.

    Comments are stripped first; then ``CREATE TABLE``, ``ALTER TABLE ... ADD
    COLUMN`` and ``CREATE INDEX`` statements are located with the module-level
    regular expressions.

    Table, ``ADD COLUMN`` and index names are folded to lower case: those
    regular expressions only match unquoted identifiers, and PostgreSQL stores
    unquoted identifiers lower-cased, so a migration that writes
    ``CREATE TABLE Customers`` must be compared against ``customers``.
    Column names coming from a ``CREATE TABLE`` body are used as returned by
    ``_parse_columns_from_create_block``.

    Args:
        sql: Full text of a migration file.

    Returns:
        ``(tables, columns, indexes)`` where ``columns`` holds
        ``(table_name, column_name)`` pairs.
    """
    expected_tables: set[str] = set()
    expected_columns: set[tuple[str, str]] = set()
    expected_indexes: set[str] = set()

    clean_sql = _strip_sql_comments(sql)

    for match in CREATE_TABLE_RE.finditer(clean_sql):
        table_name = match.group(1).lower()
        expected_tables.add(table_name)
        # The pattern ends on the opening parenthesis, so step back onto it.
        block = _extract_create_table_block(clean_sql, match.end() - 1)
        expected_columns.update(
            (table_name, column_name)
            for column_name in _parse_columns_from_create_block(block)
        )

    for match in ADD_COLUMN_RE.finditer(clean_sql):
        expected_columns.add((match.group(1).lower(), match.group(2).lower()))

    for match in CREATE_INDEX_RE.finditer(clean_sql):
        expected_indexes.add(match.group(1).lower())

    return expected_tables, expected_columns, expected_indexes
def _get_actual_schema_snapshot(conn) -> tuple[set[str], set[tuple[str, str]], set[str]]:
    """Read the table, column and index names of the ``public`` schema.

    Args:
        conn: Database connection whose ``cursor()`` is usable as a context
            manager and yields rows that can be indexed by position.

    Returns:
        ``(tables, columns, indexes)``: base-table names,
        ``(table_name, column_name)`` pairs, and index names.
    """
    with conn.cursor() as cur:
        # Base tables only (views are excluded by the table_type filter).
        cur.execute(
            """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_type = 'BASE TABLE'
            """
        )
        table_names = {record[0] for record in cur.fetchall()}

        cur.execute(
            """
            SELECT table_name, column_name
            FROM information_schema.columns
            WHERE table_schema = 'public'
            """
        )
        column_pairs = {(record[0], record[1]) for record in cur.fetchall()}

        cur.execute(
            """
            SELECT indexname
            FROM pg_indexes
            WHERE schemaname = 'public'
            """
        )
        index_names = {record[0] for record in cur.fetchall()}

    return table_names, column_pairs, index_names
def _status_for_migration_file(
    migration_sql: str,
    actual_tables: set[str],
    actual_columns: set[tuple[str, str]],
    actual_indexes: set[str],
) -> dict:
    """Classify one migration file against the current database schema.

    Args:
        migration_sql: Text of the migration file.
        actual_tables: Table names present in the database.
        actual_columns: ``(table, column)`` pairs present in the database.
        actual_indexes: Index names present in the database.

    Returns:
        A dict with ``status`` (``"gray"`` when the file yields nothing to
        check, ``"green"`` when everything expected exists, ``"red"``
        otherwise), a display ``label``, a ``summary`` text and the sorted
        ``missing_tables`` / ``missing_columns`` / ``missing_indexes`` lists.
    """
    wanted_tables, wanted_columns, wanted_indexes = _parse_migration_expectations(migration_sql)

    # Nothing recognisable in the file: neither good nor bad.
    if len(wanted_tables) + len(wanted_columns) + len(wanted_indexes) == 0:
        return {
            "status": "gray",
            "label": "Grå",
            "summary": "Ingen direkte schema-checks fundet i filen",
            "missing_tables": [],
            "missing_columns": [],
            "missing_indexes": [],
        }

    missing_tables = sorted(name for name in wanted_tables if name not in actual_tables)
    missing_columns = sorted(
        f"{table}.{column}"
        for (table, column) in wanted_columns
        if (table, column) not in actual_columns
    )
    missing_indexes = sorted(name for name in wanted_indexes if name not in actual_indexes)

    if missing_tables or missing_columns or missing_indexes:
        # One "<kind>: <count>" fragment per category that has gaps.
        fragments = [
            f"{kind}: {len(items)}"
            for kind, items in (
                ("tabeller", missing_tables),
                ("kolonner", missing_columns),
                ("indexes", missing_indexes),
            )
            if items
        ]
        return {
            "status": "red",
            "label": "Rød",
            "summary": "Mangler " + ", ".join(fragments),
            "missing_tables": missing_tables,
            "missing_columns": missing_columns,
            "missing_indexes": missing_indexes,
        }

    return {
        "status": "green",
        "label": "Grøn",
        "summary": "Alle schema-elementer fra filen findes i databasen",
        "missing_tables": [],
        "missing_columns": [],
        "missing_indexes": [],
    }
@router.get("/settings", response_class=HTMLResponse, tags=["Frontend"])
async def settings_page(request: Request):
    """Render the settings page.

    When ``request.state`` carries a ``user_id``, the user's stored
    ``default_dashboard_path`` is looked up and passed to the template;
    otherwise, or when the lookup fails, an empty string is passed.
    """
    dashboard_path = ""
    current_user_id = getattr(request.state, "user_id", None)

    if current_user_id:
        try:
            preference = execute_query_single(
                """
                SELECT default_dashboard_path
                FROM user_dashboard_preferences
                WHERE user_id = %s
                """,
                (int(current_user_id),),
            )
            stored_path = (preference or {}).get("default_dashboard_path")
            dashboard_path = stored_path or ""
        except Exception:
            # Best-effort lookup: the page still renders without a preference.
            dashboard_path = ""

    context = {
        "request": request,
        "title": "Indstillinger",
        "default_dashboard_path": dashboard_path,
    }
    return templates.TemplateResponse("settings/frontend/settings.html", context)
@router.get("/settings/migrations", response_class=HTMLResponse, tags=["Frontend"])
async def migrations_page(request: Request):
    """Render the database migrations page.

    Lists every ``*.sql`` file in the ``migrations`` directory (three levels
    above this file's directory) in migration order, together with its size,
    modification time and numeric prefix, and flags numeric prefixes that are
    used by more than one file.
    """
    migrations_dir = Path(__file__).resolve().parents[3] / "migrations"

    if migrations_dir.exists():
        sql_files = sorted(migrations_dir.glob("*.sql"), key=_migration_sort_key)
    else:
        sql_files = []

    migrations = []
    for sql_file in sql_files:
        file_stat = sql_file.stat()
        modified_at = datetime.fromtimestamp(file_stat.st_mtime)
        migrations.append({
            "name": sql_file.name,
            "size_kb": round(file_stat.st_size / 1024, 1),
            "modified": modified_at.strftime("%Y-%m-%d %H:%M"),
            "number": _migration_numeric_prefix(sql_file.name),
        })

    duplicate_numbers = _find_duplicate_migration_numbers(
        [entry["name"] for entry in migrations]
    )

    # Any host other than a local address is treated as production.
    local_hosts = ('localhost', '127.0.0.1', '0.0.0.0')
    context = {
        "request": request,
        "title": "Database Migrationer",
        "migrations": migrations,
        "db_user": settings.POSTGRES_USER,
        "db_name": settings.POSTGRES_DB,
        "db_container": "bmc-hub-postgres",
        "is_production": request.url.hostname not in local_hosts,
        "duplicate_numbers": duplicate_numbers,
    }
    return templates.TemplateResponse("settings/frontend/migrations.html", context)
class MigrationExecution(BaseModel):
    """Request body for the migration execute endpoint."""

    # Name of the migration file; the execute endpoint joins it onto the
    # migrations directory.
    file_name: str
@router.get("/settings/migrations/status", tags=["Frontend"])
def migration_statuses():
    """Compare every migration file with the live schema.

    Returns:
        ``{"statuses": [...]}`` with one entry per ``*.sql`` file, in migration
        order, holding the file ``name`` plus the fields produced by
        ``_status_for_migration_file``.

    Raises:
        HTTPException: 500 when reading the schema or a migration file fails.
    """
    migrations_dir = Path(__file__).resolve().parents[3] / "migrations"
    if migrations_dir.exists():
        sql_files = sorted(migrations_dir.glob("*.sql"), key=_migration_sort_key)
    else:
        sql_files = []

    conn = get_db_connection()
    try:
        # One schema snapshot is shared by all files.
        tables, columns, indexes = _get_actual_schema_snapshot(conn)
        statuses = [
            {
                "name": sql_file.name,
                **_status_for_migration_file(
                    sql_file.read_text(encoding="utf-8"),
                    tables,
                    columns,
                    indexes,
                ),
            }
            for sql_file in sql_files
        ]
        return {"statuses": statuses}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Status check failed: {exc}")
    finally:
        release_db_connection(conn)
@router.post("/settings/migrations/execute", tags=["Frontend"])
def execute_migration(payload: MigrationExecution):
    """Execute one migration SQL file from the ``migrations`` directory.

    The whole file is sent to the database in a single ``execute`` call and
    committed on success; on failure the transaction is rolled back.

    Args:
        payload: Request body naming the migration file to run.

    Returns:
        ``{"message": "Migration executed successfully"}`` on success.

    Raises:
        HTTPException: 400 when ``file_name`` is not a plain ``*.sql`` file
            name, 404 when no such file exists in the migrations directory,
            500 when executing the SQL fails.
    """
    migrations_dir = Path(__file__).resolve().parents[3] / "migrations"
    requested_name = payload.file_name

    # The name comes straight from the request body. Joining a value that
    # contains directory parts ("../x.sql") or is absolute ("/etc/x.sql") would
    # escape the migrations directory and run an arbitrary file as SQL, so only
    # a bare "*.sql" file name is accepted.
    is_plain_name = Path(requested_name).name == requested_name
    if not is_plain_name or Path(requested_name).suffix.lower() != ".sql":
        raise HTTPException(status_code=400, detail="Invalid migration file name")

    migration_file = migrations_dir / requested_name
    if not migration_file.is_file():
        raise HTTPException(status_code=404, detail="Migration file not found")

    # Same encoding as the status check uses when reading migration files.
    migration_sql = migration_file.read_text(encoding="utf-8")

    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(migration_sql)
        conn.commit()
    except Exception as exc:
        conn.rollback()
        raise HTTPException(status_code=500, detail=f"Migration failed: {exc}") from exc
    finally:
        release_db_connection(conn)

    return {"message": "Migration executed successfully"}