- Created a new SQL migration for the sag_salgsvarer table to manage sales and purchase items. - Implemented a new HTML template for the Varekøb & Salg module, including summary cards and tables for sales and purchases. - Added JavaScript functions for loading and rendering order data dynamically. - Introduced a new backend search module for customers, contacts, hardware, and locations with autocomplete functionality. - Developed an email templates API for managing system and customer-specific email templates. - Created multiple migrations for Nextcloud instances, cache, audit logs, email templates, sag comments, hardware locations, and billing methods. - Enhanced the sag module with solutions, order lines, work types, and 2FA support for user authentication.
131 lines
3.9 KiB
Python
131 lines
3.9 KiB
Python
"""
|
|
Database Module
|
|
PostgreSQL connection and helpers using psycopg2
|
|
"""
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
from psycopg2.pool import SimpleConnectionPool
|
|
from typing import Optional
|
|
import logging
|
|
|
|
from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Connection pool
|
|
connection_pool: Optional[SimpleConnectionPool] = None
|
|
|
|
|
|
def init_db():
|
|
"""Initialize database connection pool"""
|
|
global connection_pool
|
|
|
|
try:
|
|
connection_pool = SimpleConnectionPool(
|
|
minconn=1,
|
|
maxconn=10,
|
|
dsn=settings.DATABASE_URL
|
|
)
|
|
logger.info("✅ Database connection pool initialized")
|
|
except Exception as e:
|
|
logger.error(f"❌ Failed to initialize database: {e}")
|
|
raise
|
|
|
|
|
|
def get_db_connection():
|
|
"""Get a connection from the pool"""
|
|
if connection_pool:
|
|
conn = connection_pool.getconn()
|
|
try:
|
|
conn.set_client_encoding("UTF8")
|
|
except Exception:
|
|
pass
|
|
return conn
|
|
raise Exception("Database pool not initialized")
|
|
|
|
|
|
def release_db_connection(conn):
|
|
"""Return a connection to the pool"""
|
|
if connection_pool:
|
|
connection_pool.putconn(conn)
|
|
|
|
|
|
def get_db():
|
|
"""Context manager for database connections"""
|
|
conn = get_db_connection()
|
|
try:
|
|
yield conn
|
|
finally:
|
|
release_db_connection(conn)
|
|
|
|
|
|
def execute_query(query: str, params: tuple = None, fetch: bool = True):
|
|
"""Execute a SQL query and return results"""
|
|
conn = get_db_connection()
|
|
try:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
|
|
cursor.execute(query, params)
|
|
|
|
# Auto-detect write operations and commit
|
|
# Robust detection handling comments and whitespace
|
|
clean_query = "\n".join([line for line in query.split("\n") if not line.strip().startswith("--")]).strip().upper()
|
|
is_write = clean_query.startswith(('INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP', 'TRUNCATE', 'COMMENT'))
|
|
|
|
if is_write:
|
|
conn.commit()
|
|
|
|
# Only fetch if there are results to fetch (cursor.description is not None)
|
|
if cursor.description:
|
|
return cursor.fetchall()
|
|
|
|
return cursor.rowcount
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.error(f"Query error: {e}")
|
|
raise
|
|
finally:
|
|
release_db_connection(conn)
|
|
|
|
|
|
def execute_insert(query: str, params: tuple = None):
|
|
"""Execute INSERT query and return new ID (requires RETURNING clause)"""
|
|
conn = get_db_connection()
|
|
try:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
|
|
cursor.execute(query, params)
|
|
conn.commit()
|
|
result = cursor.fetchone()
|
|
if result:
|
|
# Return first column value regardless of name (id, extraction_id, file_id, etc.)
|
|
return result[list(result.keys())[0]] if result else None
|
|
return None
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.error(f"Insert error: {e}")
|
|
raise
|
|
finally:
|
|
release_db_connection(conn)
|
|
|
|
|
|
def execute_update(query: str, params: tuple = None):
|
|
"""Execute UPDATE/DELETE query and return affected rows"""
|
|
conn = get_db_connection()
|
|
try:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
|
|
cursor.execute(query, params)
|
|
conn.commit()
|
|
return cursor.rowcount
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.error(f"Update error: {e}")
|
|
raise
|
|
finally:
|
|
release_db_connection(conn)
|
|
|
|
|
|
def execute_query_single(query: str, params: tuple = None):
|
|
"""Execute query and return single row (backwards compatibility for fetchone=True)"""
|
|
result = execute_query(query, params)
|
|
return result[0] if result and len(result) > 0 else None
|