""" Database Module PostgreSQL connection and helpers using psycopg2 """ import psycopg2 from psycopg2.extras import RealDictCursor from psycopg2.pool import SimpleConnectionPool from typing import Optional import logging from app.core.config import settings logger = logging.getLogger(__name__) # Connection pool connection_pool: Optional[SimpleConnectionPool] = None def init_db(): """Initialize database connection pool""" global connection_pool try: connection_pool = SimpleConnectionPool( minconn=1, maxconn=10, dsn=settings.DATABASE_URL ) logger.info("✅ Database connection pool initialized") except Exception as e: logger.error(f"❌ Failed to initialize database: {e}") raise def get_db_connection(): """Get a connection from the pool""" if connection_pool: return connection_pool.getconn() raise Exception("Database pool not initialized") def release_db_connection(conn): """Return a connection to the pool""" if connection_pool: connection_pool.putconn(conn) def get_db(): """Context manager for database connections""" conn = get_db_connection() try: yield conn finally: release_db_connection(conn) def execute_query(query: str, params: tuple = None, fetch: bool = True): """Execute a SQL query and return results""" conn = get_db_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute(query, params) # Auto-detect write operations and commit query_upper = query.strip().upper() is_write = query_upper.startswith(('INSERT', 'UPDATE', 'DELETE')) if is_write: conn.commit() # Only fetch if there are results to fetch # (SELECT queries or INSERT/UPDATE/DELETE with RETURNING clause) if fetch and (not is_write or 'RETURNING' in query_upper): return cursor.fetchall() elif is_write: return cursor.rowcount return [] except Exception as e: conn.rollback() logger.error(f"Query error: {e}") raise finally: release_db_connection(conn) def execute_insert(query: str, params: tuple = None): """Execute INSERT query and return new ID (requires RETURNING id clause)""" conn = get_db_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute(query, params) conn.commit() result = cursor.fetchone() return result['id'] if result and 'id' in result else None except Exception as e: conn.rollback() logger.error(f"Insert error: {e}") raise finally: release_db_connection(conn) def execute_update(query: str, params: tuple = None): """Execute UPDATE/DELETE query and return affected rows""" conn = get_db_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute(query, params) conn.commit() return cursor.rowcount except Exception as e: conn.rollback() logger.error(f"Update error: {e}") raise finally: release_db_connection(conn) def execute_query_single(query: str, params: tuple = None): """Execute query and return single row (backwards compatibility for fetchone=True)""" result = execute_query(query, params) return result[0] if result and len(result) > 0 else None