bmc_hub/app/core/database.py
Christian 56d6d45aa2 feat(sag): Add Varekøb & Salg module with database migration and frontend template
- Created a new SQL migration for the sag_salgsvarer table to manage sales and purchase items.
- Implemented a new HTML template for the Varekøb & Salg module, including summary cards and tables for sales and purchases.
- Added JavaScript functions for loading and rendering order data dynamically.
- Introduced a new backend search module for customers, contacts, hardware, and locations with autocomplete functionality.
- Developed an email templates API for managing system and customer-specific email templates.
- Created multiple migrations for Nextcloud instances, cache, audit logs, email templates, sag comments, hardware locations, and billing methods.
- Enhanced the sag module with solutions, order lines, work types, and 2FA support for user authentication.
2026-02-02 20:23:56 +01:00

131 lines
3.9 KiB
Python

"""
Database Module
PostgreSQL connection pool and query helpers built on psycopg2
"""
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import SimpleConnectionPool
from typing import Optional
import logging
from app.core.config import settings
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Shared connection pool; stays None until init_db() assigns it.
connection_pool: Optional[SimpleConnectionPool] = None
def init_db():
    """Create the module-wide psycopg2 connection pool.

    Builds a ``SimpleConnectionPool`` holding between 1 and 10 connections
    from ``settings.DATABASE_URL`` and stores it in the ``connection_pool``
    global.

    Raises:
        Exception: Any error raised while creating the pool is logged and
            then re-raised unchanged.
    """
    global connection_pool
    try:
        pool_options = {
            "minconn": 1,
            "maxconn": 10,
            "dsn": settings.DATABASE_URL,
        }
        connection_pool = SimpleConnectionPool(**pool_options)
        logger.info("✅ Database connection pool initialized")
    except Exception as exc:
        logger.error(f"❌ Failed to initialize database: {exc}")
        raise
def get_db_connection():
    """Borrow a connection from the pool with its client encoding set to UTF-8.

    Returns:
        A psycopg2 connection taken from ``connection_pool``. The caller is
        responsible for handing it back with ``release_db_connection``.

    Raises:
        RuntimeError: If the pool has not been initialized yet.
    """
    if not connection_pool:
        raise RuntimeError("Database pool not initialized")

    conn = connection_pool.getconn()
    try:
        conn.set_client_encoding("UTF8")
    except Exception as exc:
        # Setting the encoding is best effort: the connection is still
        # returned, but the failure is logged instead of silently dropped.
        logger.warning("Could not set client encoding to UTF8: %s", exc)
    return conn
def release_db_connection(conn):
    """Hand *conn* back to the pool; do nothing when no pool is available."""
    if not connection_pool:
        return
    connection_pool.putconn(conn)
def get_db():
    """Yield a pooled connection and release it when the generator finishes.

    This is a plain generator function: it yields exactly one connection
    and returns it to the pool in a ``finally`` clause, so the release also
    happens when the consumer raises or closes the generator.

    NOTE(review): there is no ``contextlib.contextmanager`` decorator here,
    so this cannot be used directly in a ``with`` statement; presumably it
    is consumed by a framework dependency mechanism -- confirm with callers.
    """
    connection = get_db_connection()
    try:
        yield connection
    finally:
        release_db_connection(connection)
# Statement prefixes whose effects must be committed.
_WRITE_PREFIXES = (
    'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP', 'TRUNCATE',
    'COMMENT', 'MERGE',
)
# Data-modifying keywords that can appear inside a WITH (CTE) statement.
_CTE_WRITE_WORDS = frozenset(('INSERT', 'UPDATE', 'DELETE', 'MERGE'))
# Maps SQL punctuation to spaces so keywords split into separate words.
_SQL_PUNCTUATION = str.maketrans("(),;", "    ")


def _strip_leading_comments(query: str) -> str:
    """Return *query* without leading whitespace, ``--`` and ``/* */`` comments."""
    sql = query.lstrip()
    while True:
        if sql.startswith("--"):
            # Line comment: drop everything up to the end of the line.
            sql = sql.partition("\n")[2]
        elif sql.startswith("/*"):
            # Block comment: drop everything up to the closing marker.
            sql = sql.partition("*/")[2]
        else:
            return sql
        sql = sql.lstrip()


def _is_write_query(query: str) -> bool:
    """Return True when *query* looks like a statement that must be committed."""
    sql = _strip_leading_comments(query).upper()
    if sql.startswith(_WRITE_PREFIXES):
        return True
    words = sql.translate(_SQL_PUNCTUATION).split()
    if words and words[0] == "WITH":
        # A CTE may wrap or precede a data-modifying statement. A false
        # positive (e.g. "FOR UPDATE") only costs a harmless commit, whereas
        # a false negative would leave the write uncommitted.
        return not _CTE_WRITE_WORDS.isdisjoint(words)
    return False


def execute_query(query: str, params: tuple = None, fetch: bool = True):
    """Execute a SQL statement on a pooled connection and return its result.

    Statements recognised as writes (INSERT, UPDATE, DELETE, MERGE, DDL,
    COMMENT, or a WITH statement containing a data-modifying keyword) are
    committed automatically. Leading ``--`` and ``/* */`` comments are
    ignored when classifying the statement.

    Args:
        query: SQL text, optionally containing psycopg2 placeholders.
        params: Values bound to the placeholders in *query*.
        fetch: Unused; kept so existing callers that pass it keep working.

    Returns:
        A list of dict-like rows when the statement produced a result set,
        otherwise the cursor's ``rowcount``.

    Raises:
        Exception: Any error from executing the statement is logged and
            re-raised after the transaction has been rolled back.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)
            if _is_write_query(query):
                conn.commit()
            # cursor.description is None when there is no result set to fetch.
            if cursor.description:
                return cursor.fetchall()
            return cursor.rowcount
    except Exception as e:
        conn.rollback()
        logger.error(f"Query error: {e}")
        raise
    finally:
        release_db_connection(conn)
def execute_insert(query: str, params: tuple = None):
    """Execute an INSERT, commit it, and return the first returned column.

    Args:
        query: INSERT statement; add a ``RETURNING`` clause to get a value
            back (typically the new row's id).
        params: Values bound to the placeholders in *query*.

    Returns:
        The value of the first column of the first returned row, whatever
        its name (id, extraction_id, file_id, ...). ``None`` when the
        statement has no ``RETURNING`` clause or returned no row.

    Raises:
        Exception: Any error from executing the statement is logged and
            re-raised after the transaction has been rolled back.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)
            # Without RETURNING there is no result set (description is None);
            # calling fetchone() then would raise even though the insert
            # itself succeeded, so skip the fetch in that case.
            row = cursor.fetchone() if cursor.description else None
            conn.commit()
        if not row:
            return None
        first_column = next(iter(row))
        return row[first_column]
    except Exception as e:
        conn.rollback()
        logger.error(f"Insert error: {e}")
        raise
    finally:
        release_db_connection(conn)
def execute_update(query: str, params: tuple = None):
    """Run a data-changing statement, commit it, and report the row count.

    Args:
        query: UPDATE or DELETE statement to execute.
        params: Values bound to the placeholders in *query*.

    Returns:
        The cursor's ``rowcount`` after the statement has been committed.

    Raises:
        Exception: Any error is re-raised after the transaction has been
            rolled back and the error logged.
    """
    connection = get_db_connection()
    try:
        with connection.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, params)
            connection.commit()
            affected_rows = cur.rowcount
        return affected_rows
    except Exception as exc:
        connection.rollback()
        logger.error(f"Update error: {exc}")
        raise
    finally:
        release_db_connection(connection)
def execute_query_single(query: str, params: tuple = None):
    """Execute a query and return only its first row.

    Kept for backwards compatibility with callers that used
    ``fetchone=True``.

    Args:
        query: SQL text to execute via ``execute_query``.
        params: Values bound to the placeholders in *query*.

    Returns:
        The first row of the result set, or ``None`` when the query
        returned no rows or produced no result set at all.
    """
    result = execute_query(query, params)
    # execute_query returns an int row count for statements without a result
    # set; calling len() on that would raise TypeError, so treat it as
    # "no row".
    if not isinstance(result, list):
        return None
    return result[0] if result else None