"""
Database Module
PostgreSQL connection and helpers using psycopg2
"""
import logging
from typing import Optional

import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import SimpleConnectionPool

from app.core.config import settings

logger = logging.getLogger(__name__)

# Connection pool
connection_pool: Optional[SimpleConnectionPool] = None

# Statement prefixes that execute_query() treats as writes and commits.
_WRITE_PREFIXES = ('INSERT', 'UPDATE', 'DELETE')


def init_db():
    """Initialize database connection pool.

    Builds a ``SimpleConnectionPool`` (1 to 10 connections) from
    ``settings.DATABASE_URL`` and stores it in the module-level
    ``connection_pool``.

    Raises:
        Exception: whatever pool creation raised; it is logged and re-raised.
    """
    global connection_pool
    try:
        connection_pool = SimpleConnectionPool(
            minconn=1,
            maxconn=10,
            dsn=settings.DATABASE_URL
        )
        logger.info("✅ Database connection pool initialized")
    except Exception as e:
        logger.error(f"❌ Failed to initialize database: {e}")
        raise


def get_db_connection():
    """Get a connection from the pool.

    Raises:
        RuntimeError: if ``init_db()`` has not created the pool yet.
    """
    if connection_pool is None:
        raise RuntimeError("Database pool not initialized")
    return connection_pool.getconn()


def release_db_connection(conn):
    """Return a connection to the pool (no-op when no pool exists)."""
    if connection_pool is not None:
        connection_pool.putconn(conn)


def get_db():
    """Yield a pooled connection and hand it back when the generator finishes.

    NOTE(review): this is a plain generator, not a ``contextlib`` context
    manager, so ``with get_db() as conn`` does not work as written. It is
    presumably consumed by something that drives generators itself (for
    example a framework dependency hook) -- confirm against callers before
    decorating it.
    """
    conn = get_db_connection()
    try:
        yield conn
    finally:
        release_db_connection(conn)


def _rollback_quietly(conn):
    """Roll back ``conn`` without letting a rollback failure mask the real error."""
    try:
        conn.rollback()
    except psycopg2.Error as rollback_error:
        # Typically the connection is already dead; the caller re-raises the
        # original exception, which is the one worth surfacing.
        logger.warning(f"Rollback failed: {rollback_error}")


def execute_query(query: str, params: Optional[tuple] = None, fetch: bool = True):
    """Execute a SQL query and return results.

    Args:
        query: SQL text, optionally with psycopg2 placeholders.
        params: values bound to the placeholders, or None.
        fetch: when True return the rows, otherwise return the row count.

    Returns:
        With ``fetch=True``: a list of dict-like rows (``RealDictCursor``),
        or an empty list when the statement produced no result set.
        With ``fetch=False``: ``cursor.rowcount``.

    Raises:
        Exception: any error from execution; the transaction is rolled back
        and the error is logged before being re-raised.

    NOTE: only statements whose text starts with INSERT, UPDATE or DELETE
    are committed by this function. Writes in any other form (for example a
    ``WITH ... INSERT`` CTE or DDL) are not committed here.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)

            rows = None
            if fetch:
                # description is None when the statement returned no result
                # set (e.g. an UPDATE without RETURNING); fetchall() would
                # raise there. Rows are read before the commit so a failure
                # can never be reported for an already committed write.
                rows = cursor.fetchall() if cursor.description is not None else []

            # Auto-detect write operations and commit
            if query.strip().upper().startswith(_WRITE_PREFIXES):
                conn.commit()

            return rows if fetch else cursor.rowcount
    except Exception as e:
        logger.error(f"Query error: {e}")
        _rollback_quietly(conn)
        raise
    finally:
        release_db_connection(conn)


def execute_insert(query: str, params: Optional[tuple] = None):
    """Execute INSERT query and return new ID.

    Args:
        query: INSERT statement; include ``RETURNING id`` to get the new id.
        params: values bound to the placeholders, or None.

    Returns:
        The ``id`` column of the returned row, or None when the statement
        returned no row or the row has no ``id`` column.

    Raises:
        Exception: any error from execution; the transaction is rolled back
        and the error is logged before being re-raised.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)
            # Read the RETURNING row before committing, and only when there
            # is a result set: fetchone() raises on a statement without
            # RETURNING, which previously surfaced as an error *after* the
            # insert had already been committed.
            row = cursor.fetchone() if cursor.description is not None else None
            conn.commit()
            return row['id'] if row and 'id' in row else None
    except Exception as e:
        logger.error(f"Insert error: {e}")
        _rollback_quietly(conn)
        raise
    finally:
        release_db_connection(conn)


def execute_update(query: str, params: Optional[tuple] = None):
    """Execute UPDATE/DELETE query and return affected rows.

    Args:
        query: SQL statement to run.
        params: values bound to the placeholders, or None.

    Returns:
        ``cursor.rowcount`` after the statement has been committed.

    Raises:
        Exception: any error from execution; the transaction is rolled back
        and the error is logged before being re-raised.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)
            conn.commit()
            return cursor.rowcount
    except Exception as e:
        logger.error(f"Update error: {e}")
        _rollback_quietly(conn)
        raise
    finally:
        release_db_connection(conn)


def execute_query_single(query: str, params: Optional[tuple] = None):
    """Execute query and return single row (backwards compatibility for fetchone=True).

    Returns:
        The first row from ``execute_query``, or None when there are no rows.
    """
    result = execute_query(query, params)
    return result[0] if result else None