bmc_hub/app/core/database.py
Christian 29acdf3e01 Add tests for new SAG module endpoints and module deactivation
- Implement test script for new SAG module endpoints BE-003 (Tag State Management) and BE-004 (Bulk Operations).
- Create test cases for creating, updating, and bulk operations on cases and tags.
- Add a test for module deactivation to ensure data integrity is maintained.
- Include setup and teardown for tests to clear database state before and after each test.
2026-01-31 23:16:24 +01:00

132 lines
3.8 KiB
Python

"""
Database Module
PostgreSQL connection and helpers using psycopg2
"""
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import SimpleConnectionPool
from typing import Optional
import logging
from app.core.config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Shared psycopg2 connection pool. It stays None until init_db() assigns a
# SimpleConnectionPool to it; the helpers below read it as a module global.
connection_pool: Optional[SimpleConnectionPool] = None
def init_db():
    """Create the shared connection pool and store it in ``connection_pool``.

    The pool is built from ``settings.DATABASE_URL`` and holds between 1 and
    10 connections.

    Raises:
        Exception: Any error raised while building the pool is logged and
            then re-raised unchanged.
    """
    global connection_pool
    try:
        pool = SimpleConnectionPool(
            minconn=1,
            maxconn=10,
            dsn=settings.DATABASE_URL,
        )
        connection_pool = pool
        logger.info("✅ Database connection pool initialized")
    except Exception as exc:
        logger.error(f"❌ Failed to initialize database: {exc}")
        raise
def get_db_connection():
    """Check a connection out of the shared pool.

    The client encoding is switched to UTF8 on a best-effort basis: a failure
    to do so is logged and the connection is still returned.

    Returns:
        A psycopg2 connection taken from ``connection_pool``.

    Raises:
        RuntimeError: If the pool has not been created yet.
    """
    if not connection_pool:
        raise RuntimeError("Database pool not initialized")

    conn = connection_pool.getconn()
    try:
        conn.set_client_encoding("UTF8")
    except Exception as exc:
        # Deliberately non-fatal, but leave a trace instead of hiding it.
        logger.warning("Could not set client encoding to UTF8: %s", exc)
    return conn
def release_db_connection(conn):
    """Hand ``conn`` back to the shared pool.

    Does nothing when no pool is available.
    """
    if not connection_pool:
        return
    connection_pool.putconn(conn)
def get_db():
    """Yield a pooled connection and release it when the generator finishes.

    The connection is returned to the pool in ``finally``, so it is released
    even if the consumer raises while holding it.

    NOTE(review): no ``@contextmanager`` decorator is applied here, so this is
    a plain generator function; presumably it is consumed by a framework
    dependency mechanism rather than a ``with`` statement -- confirm.
    """
    connection = get_db_connection()
    try:
        yield connection
    finally:
        release_db_connection(connection)
def execute_query(query: str, params: Optional[tuple] = None, fetch: bool = True):
    """Execute one SQL statement on a pooled connection and return its result.

    Every successful statement is committed. The connection is handed back to
    the pool in ``finally``, so this function is the only place where the work
    can be committed; this also covers writes that do not start with
    INSERT/UPDATE/DELETE (CTE-wrapped writes, DDL, comment-prefixed SQL).

    Args:
        query: SQL text, optionally containing psycopg2 placeholders.
        params: Values bound to the placeholders, or None.
        fetch: When True, rows produced by the statement are returned.

    Returns:
        * A list of dict-like rows when ``fetch`` is true and the statement
          produced a result set (SELECT, or a write with a RETURNING clause).
        * The affected row count (int) for statements starting with
          INSERT/UPDATE/DELETE whose rows are not returned.
        * An empty list in every other case.

    Raises:
        Exception: Any driver error; the transaction is rolled back and the
            error is logged before it is re-raised.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)

            # Only used to pick the return shape (row count vs. rows).
            is_write = query.strip().upper().startswith(
                ('INSERT', 'UPDATE', 'DELETE')
            )
            # The driver sets description to None when the statement returned
            # no rows. This is reliable, unlike searching the SQL text for
            # the word "RETURNING", which also matches identifiers/literals.
            has_result_set = cursor.description is not None

            # Fetch before committing so that a failure here is still
            # covered by the rollback below.
            rows = cursor.fetchall() if fetch and has_result_set else None
            conn.commit()

            if rows is not None:
                return rows
            if is_write:
                return cursor.rowcount
            return []
    except Exception as e:
        conn.rollback()
        logger.error(f"Query error: {e}")
        raise
    finally:
        release_db_connection(conn)
def execute_insert(query: str, params: Optional[tuple] = None):
    """Execute an INSERT and return the first column of its RETURNING row.

    The returned row is read before the commit, so any error while reading it
    is still undone by the rollback. A statement without a RETURNING clause
    is committed as well, but yields None (a warning is logged) instead of
    failing after the data has already been persisted.

    Args:
        query: INSERT statement; it needs a RETURNING clause for a value to
            be returned.
        params: Values bound to the placeholders, or None.

    Returns:
        The value of the first returned column regardless of its name
        (id, extraction_id, file_id, ...), or None when no row was returned.

    Raises:
        Exception: Any driver error; the transaction is rolled back and the
            error is logged before it is re-raised.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)

            # description is None when the statement produced no result set,
            # i.e. there is nothing to fetch.
            if cursor.description is not None:
                row = cursor.fetchone()
            else:
                row = None
                logger.warning(
                    "execute_insert: statement returned no result set "
                    "(missing RETURNING clause?)"
                )
            conn.commit()

        if not row:
            return None
        # First column value, whatever the column is called.
        return next(iter(row.values()))
    except Exception as e:
        conn.rollback()
        logger.error(f"Insert error: {e}")
        raise
    finally:
        release_db_connection(conn)
def execute_update(query: str, params: tuple = None):
    """Run an UPDATE/DELETE statement, commit it and report the row count.

    Args:
        query: SQL text, optionally containing psycopg2 placeholders.
        params: Values bound to the placeholders, or None.

    Returns:
        The cursor's ``rowcount`` after the statement was committed.

    Raises:
        Exception: Any driver error; the transaction is rolled back and the
            error is logged before it is re-raised.
    """
    connection = get_db_connection()
    try:
        with connection.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, params)
            connection.commit()
            affected_rows = cur.rowcount
        return affected_rows
    except Exception as exc:
        connection.rollback()
        logger.error(f"Update error: {exc}")
        raise
    finally:
        release_db_connection(connection)
def execute_query_single(query: str, params: Optional[tuple] = None):
    """Execute a query and return only its first row.

    Kept for backwards compatibility with callers that used ``fetchone=True``.

    Args:
        query: SQL text, optionally containing psycopg2 placeholders.
        params: Values bound to the placeholders, or None.

    Returns:
        The first row of the result, or None when the query produced no rows
        or no result set at all.
    """
    result = execute_query(query, params)
    # execute_query returns an int row count for writes that return no rows;
    # an int cannot be indexed, so treat it as "no row".
    if isinstance(result, int) or not result:
        return None
    return result[0]