bmc_hub/app/core/database.py

149 lines
4.0 KiB
Python

"""
Database Module
PostgreSQL connection and helpers using psycopg2
"""
import logging
import re
from typing import Optional

import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import SimpleConnectionPool

from app.core.config import settings
# Module-level logger, named after this module's import path
logger = logging.getLogger(__name__)
# Connection pool shared by every helper in this module.
# It stays None until init_db() assigns a SimpleConnectionPool to it.
connection_pool: Optional[SimpleConnectionPool] = None
def init_db():
    """
    Create the module-wide connection pool.

    Builds a SimpleConnectionPool (1 to 10 connections) from
    ``settings.DATABASE_URL`` and stores it in the ``connection_pool`` global.

    Raises:
        Exception: Whatever went wrong while building the pool; the error is
            logged first and then re-raised unchanged.
    """
    global connection_pool
    try:
        # Pool sizing and DSN are gathered first, then handed over in one call
        pool_kwargs = dict(minconn=1, maxconn=10, dsn=settings.DATABASE_URL)
        connection_pool = SimpleConnectionPool(**pool_kwargs)
        logger.info("✅ Database connection pool initialized")
    except Exception as e:
        logger.error(f"❌ Failed to initialize database: {e}")
        raise
def get_db_connection():
    """
    Borrow a connection from the module-wide pool.

    Returns:
        The connection handed out by ``connection_pool.getconn()``.

    Raises:
        Exception: If the pool has not been set up yet.
    """
    # Guard first: without a pool there is nothing to borrow from
    if not connection_pool:
        raise Exception("Database pool not initialized")
    return connection_pool.getconn()
def release_db_connection(conn):
    """
    Hand a borrowed connection back to the module-wide pool.

    Does nothing when no pool is set.

    Args:
        conn: Connection to give back via ``connection_pool.putconn()``.
    """
    if not connection_pool:
        return
    connection_pool.putconn(conn)
def get_db():
    """
    Generator that lends out one pooled connection.

    Yields a single connection obtained from get_db_connection(). The
    connection goes back to the pool once the generator is resumed or closed,
    including when the consumer raised while holding it.

    NOTE(review): this is a plain generator function (no ``@contextmanager``),
    so it cannot be used directly in a ``with`` statement; presumably it is
    consumed by a framework that drives yield-style dependencies - confirm
    against the callers.

    Yields:
        The borrowed database connection.
    """
    borrowed = get_db_connection()
    try:
        yield borrowed
    finally:
        # Always give the connection back, whatever happened in the consumer
        release_db_connection(borrowed)
def execute_query(query: str, params: tuple = None, fetchone: bool = False):
    """
    Run a SQL statement and hand back its rows as plain dicts.

    The statement runs on a pooled connection using a RealDictCursor. This
    helper never calls commit; on any error the connection is rolled back,
    the error is logged and re-raised. The connection is always returned to
    the pool.

    Args:
        query: SQL query string
        params: Query parameters tuple; an empty tuple is used when omitted
        fetchone: If True, fetch only the first row

    Returns:
        With fetchone=True: a dict for the first row, or None when there is
        no row. Otherwise: a list of dicts, one per row.

    Raises:
        Exception: Any error raised while running the query (re-raised as-is).
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, params or ())
            if not fetchone:
                return [dict(record) for record in cur.fetchall()]
            record = cur.fetchone()
            if record:
                return dict(record)
            return None
    except Exception as e:
        conn.rollback()
        logger.error(f"Query error: {e}")
        raise
    finally:
        release_db_connection(conn)
def execute_insert(query: str, params: tuple = ()) -> Optional[int]:
    """
    Execute an INSERT query and return the id of the inserted row.

    When the statement has no RETURNING clause of its own, ``RETURNING id``
    is appended so the new row's id can be read back. The statement is
    committed on success; on any error the connection is rolled back, the
    error is logged and re-raised. The connection is always returned to the
    pool.

    Args:
        query: SQL INSERT query (RETURNING id is added if no RETURNING
            keyword is present)
        params: Query parameters tuple

    Returns:
        The first column of the returned row (the id when the default
        RETURNING id clause is used), or None when no row came back.

    Raises:
        Exception: Any error raised while preparing or running the statement
            (re-raised as-is).
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            # Look for RETURNING as a whole word only: a plain substring test
            # is fooled by identifiers that merely contain it (for example a
            # "returning_customers" table), which would leave the statement
            # without a clause and make fetchone() fail.
            # Known limit: the word inside a string literal or a quoted
            # identifier still counts as a match.
            if re.search(r"\bRETURNING\b", query, re.IGNORECASE) is None:
                # Remove every trailing semicolon *and* any whitespace around
                # it; stripping ";" alone misses "...;\n" and would produce
                # "...;\n RETURNING id", which is not valid SQL.
                query = re.sub(r"[;\s]+\Z", "", query) + " RETURNING id"
            cursor.execute(query, params)
            result = cursor.fetchone()
            conn.commit()
            if not result:
                return None
            # RealDictCursor rows are dict-like: take the first column value
            if isinstance(result, dict):
                return next(iter(result.values()))
            # Fallback for sequence-style rows
            return result[0]
    except Exception as e:
        conn.rollback()
        logger.error(f"Insert error: {e}")
        raise
    finally:
        release_db_connection(conn)
def execute_update(query: str, params: tuple = ()) -> int:
    """
    Run an UPDATE/DELETE statement and report how many rows it touched.

    The statement is committed on success. On any error the connection is
    rolled back, the error is logged and re-raised. The connection is always
    returned to the pool.

    Args:
        query: SQL UPDATE/DELETE query
        params: Query parameters tuple

    Returns:
        The cursor's rowcount after executing the statement.

    Raises:
        Exception: Any error raised while running the statement (re-raised
            as-is).
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, params)
            # Capture the count before committing, then make the change durable
            affected = cur.rowcount
            conn.commit()
            return affected
    except Exception as e:
        conn.rollback()
        logger.error(f"Update error: {e}")
        raise
    finally:
        release_db_connection(conn)