bmc_hub/app/core/database.py

124 lines
3.5 KiB
Python

"""
Database Module
PostgreSQL connection and helpers using psycopg2
"""
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import SimpleConnectionPool
from typing import Optional
import logging
from app.core.config import settings
logger = logging.getLogger(__name__)
# Connection pool
connection_pool: Optional[SimpleConnectionPool] = None
def init_db():
"""Initialize database connection pool"""
global connection_pool
try:
connection_pool = SimpleConnectionPool(
minconn=1,
maxconn=10,
dsn=settings.DATABASE_URL
)
logger.info("✅ Database connection pool initialized")
except Exception as e:
logger.error(f"❌ Failed to initialize database: {e}")
raise
def get_db_connection():
"""Get a connection from the pool"""
if connection_pool:
return connection_pool.getconn()
raise Exception("Database pool not initialized")
def release_db_connection(conn):
"""Return a connection to the pool"""
if connection_pool:
connection_pool.putconn(conn)
def get_db():
"""Context manager for database connections"""
conn = get_db_connection()
try:
yield conn
finally:
release_db_connection(conn)
def execute_query(query: str, params: tuple = None, fetch: bool = True):
"""Execute a SQL query and return results"""
conn = get_db_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
# Auto-detect write operations and commit
query_upper = query.strip().upper()
is_write = query_upper.startswith(('INSERT', 'UPDATE', 'DELETE'))
if is_write:
conn.commit()
# Only fetch if there are results to fetch
# (SELECT queries or INSERT/UPDATE/DELETE with RETURNING clause)
if fetch and (not is_write or 'RETURNING' in query_upper):
return cursor.fetchall()
elif is_write:
return cursor.rowcount
return []
except Exception as e:
conn.rollback()
logger.error(f"Query error: {e}")
raise
finally:
release_db_connection(conn)
def execute_insert(query: str, params: tuple = None):
"""Execute INSERT query and return new ID"""
conn = get_db_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
conn.commit()
result = cursor.fetchone()
return result['id'] if result and 'id' in result else None
except Exception as e:
conn.rollback()
logger.error(f"Insert error: {e}")
raise
finally:
release_db_connection(conn)
def execute_update(query: str, params: tuple = None):
"""Execute UPDATE/DELETE query and return affected rows"""
conn = get_db_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
conn.commit()
return cursor.rowcount
except Exception as e:
conn.rollback()
logger.error(f"Update error: {e}")
raise
finally:
release_db_connection(conn)
def execute_query_single(query: str, params: tuple = None):
"""Execute query and return single row (backwards compatibility for fetchone=True)"""
result = execute_query(query, params)
return result[0] if result and len(result) > 0 else None