bmc_hub/app/core/database.py

124 lines
3.5 KiB
Python
Raw Normal View History

2025-12-05 14:22:39 +01:00
"""
Database Module
PostgreSQL connection and helpers using psycopg2
"""
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import SimpleConnectionPool
from typing import Optional
import logging
from app.core.config import settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Shared psycopg2 connection pool. Stays None until init_db() assigns it;
# get_db_connection() raises while it is unset and release_db_connection()
# silently does nothing.
connection_pool: Optional[SimpleConnectionPool] = None
def init_db():
    """Create the shared PostgreSQL connection pool.

    Builds a ``SimpleConnectionPool`` holding between 1 and 10 connections
    from ``settings.DATABASE_URL`` and stores it in the module-level
    ``connection_pool``.

    Raises:
        Exception: whatever pool construction raised; the error is logged
            and re-raised unchanged.
    """
    global connection_pool

    try:
        pool_options = {
            "minconn": 1,
            "maxconn": 10,
            "dsn": settings.DATABASE_URL,
        }
        connection_pool = SimpleConnectionPool(**pool_options)
        logger.info("✅ Database connection pool initialized")
    except Exception as exc:
        logger.error(f"❌ Failed to initialize database: {exc}")
        raise
def get_db_connection():
    """Check one connection out of the shared pool.

    Returns:
        The connection handed out by ``connection_pool.getconn()``.

    Raises:
        Exception: with the message "Database pool not initialized" when
            the module-level pool is unset.
    """
    # Guard first so the normal path reads straight through.
    if not connection_pool:
        raise Exception("Database pool not initialized")
    return connection_pool.getconn()
def release_db_connection(conn):
    """Hand ``conn`` back to the shared pool.

    Does nothing when the module-level pool is unset, so it is safe to
    call from ``finally`` blocks.
    """
    if not connection_pool:
        return
    connection_pool.putconn(conn)
def get_db():
    """Yield a pooled connection and release it afterwards.

    This is a plain generator function: it yields exactly one connection
    and returns it to the pool when the generator is resumed, closed, or
    an exception is thrown into it. It is not wrapped with
    ``contextlib.contextmanager``, so it cannot be used directly in a
    ``with`` statement.

    NOTE(review): presumably consumed by a framework that drives generator
    dependencies (e.g. FastAPI ``Depends``) -- confirm against callers.
    """
    pooled_conn = get_db_connection()
    try:
        yield pooled_conn
    finally:
        # Runs on normal completion, generator close, and thrown errors.
        release_db_connection(pooled_conn)
def execute_query(query: str, params: Optional[tuple] = None, fetch: bool = True):
    """Execute one SQL statement on a pooled connection and return its result.

    The transaction is committed after every successful execution. The
    previous implementation committed only when the SQL text started with
    INSERT/UPDATE/DELETE, so other writing statements (``WITH ... INSERT``,
    DDL, ``CALL``, a SELECT that invokes a writing function) were left
    uncommitted and lost once the connection went back to the pool.

    Whether rows are fetched is decided from ``cursor.description`` rather
    than by searching the SQL text for ``RETURNING``, so a statement that
    merely contains that word no longer triggers a failing fetch.

    Args:
        query: SQL text, optionally with psycopg2 placeholders.
        params: Values bound to the placeholders, or None.
        fetch: When True, rows are returned if the statement produced any
            result set.

    Returns:
        * a list of dict-like rows when ``fetch`` is True and the statement
          produced a result set (SELECT, or a write with RETURNING);
        * the affected row count (int) when no rows are returned and the
          statement text starts with INSERT, UPDATE or DELETE;
        * an empty list otherwise.

    Raises:
        Exception: any error raised while executing, fetching or
            committing; the transaction is rolled back, the error is
            logged, and the original exception is re-raised.
    """
    conn = get_db_connection()
    try:
        # Only used to pick the return shape (row count vs. empty list);
        # it no longer decides whether the transaction is committed.
        starts_as_write = query.strip().upper().startswith(
            ('INSERT', 'UPDATE', 'DELETE')
        )

        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)

            # description is None when the statement produced no result
            # set, in which case fetchall() would raise.
            has_result_set = cursor.description is not None
            rows = cursor.fetchall() if fetch and has_result_set else None
            affected = cursor.rowcount

        # Commit unconditionally: harmless for pure reads, required for
        # every kind of write.
        conn.commit()
    except Exception as e:
        conn.rollback()
        logger.error(f"Query error: {e}")
        raise
    finally:
        release_db_connection(conn)

    if rows is not None:
        return rows
    if starts_as_write:
        return affected
    return []
def execute_insert(query: str, params: Optional[tuple] = None):
    """Execute an INSERT and return the new row's ``id`` when available.

    The returned row is read before the commit, and only when the
    statement actually produced a result set. Previously an INSERT without
    a RETURNING clause was committed and then failed in ``fetchone()``, so
    callers saw an exception for a write that had in fact been persisted.

    Args:
        query: INSERT statement; include ``RETURNING id`` to get the id.
        params: Values bound to the placeholders, or None.

    Returns:
        The value of the ``id`` column of the first returned row, or None
        when the statement returned no row or the row has no ``id`` key.

    Raises:
        Exception: any error raised while executing, fetching or
            committing; the transaction is rolled back, the error is
            logged, and the original exception is re-raised.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)
            # description is None when there is no RETURNING result set;
            # calling fetchone() in that state would raise.
            if cursor.description is not None:
                result = cursor.fetchone()
            else:
                result = None
        conn.commit()
    except Exception as e:
        conn.rollback()
        logger.error(f"Insert error: {e}")
        raise
    finally:
        release_db_connection(conn)

    return result['id'] if result and 'id' in result else None
def execute_update(query: str, params: tuple = None):
    """Run an UPDATE/DELETE statement and report how many rows it touched.

    The statement is executed on a pooled connection and committed. On any
    error the transaction is rolled back, the error is logged, and the
    exception is re-raised. The connection is returned to the pool in all
    cases.

    Returns:
        ``cursor.rowcount`` as reported after the statement ran.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        with cursor:
            cursor.execute(query, params)
            conn.commit()
            affected_rows = cursor.rowcount
        return affected_rows
    except Exception as exc:
        conn.rollback()
        logger.error(f"Update error: {exc}")
        raise
    finally:
        release_db_connection(conn)
def execute_query_single(query: str, params: Optional[tuple] = None):
    """Execute a query and return only its first row.

    Kept for backwards compatibility with callers that used a
    ``fetchone=True`` style API.

    Args:
        query: SQL text, optionally with psycopg2 placeholders.
        params: Values bound to the placeholders, or None.

    Returns:
        The first row returned by ``execute_query``, or None when there is
        no row to return.
    """
    result = execute_query(query, params)
    # execute_query returns an int row count for writes that yield no
    # rows; indexing or len() on that would raise TypeError, so treat it
    # the same as "no row".
    if not result or isinstance(result, int):
        return None
    return result[0]