124 lines
3.5 KiB
Python
124 lines
3.5 KiB
Python
"""
|
|
Database Module
|
|
PostgreSQL connection and helpers using psycopg2
|
|
"""
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
from psycopg2.pool import SimpleConnectionPool
|
|
from typing import Optional
|
|
import logging
|
|
|
|
from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Connection pool
|
|
connection_pool: Optional[SimpleConnectionPool] = None
|
|
|
|
|
|
def init_db():
|
|
"""Initialize database connection pool"""
|
|
global connection_pool
|
|
|
|
try:
|
|
connection_pool = SimpleConnectionPool(
|
|
minconn=1,
|
|
maxconn=10,
|
|
dsn=settings.DATABASE_URL
|
|
)
|
|
logger.info("✅ Database connection pool initialized")
|
|
except Exception as e:
|
|
logger.error(f"❌ Failed to initialize database: {e}")
|
|
raise
|
|
|
|
|
|
def get_db_connection():
|
|
"""Get a connection from the pool"""
|
|
if connection_pool:
|
|
return connection_pool.getconn()
|
|
raise Exception("Database pool not initialized")
|
|
|
|
|
|
def release_db_connection(conn):
|
|
"""Return a connection to the pool"""
|
|
if connection_pool:
|
|
connection_pool.putconn(conn)
|
|
|
|
|
|
def get_db():
|
|
"""Context manager for database connections"""
|
|
conn = get_db_connection()
|
|
try:
|
|
yield conn
|
|
finally:
|
|
release_db_connection(conn)
|
|
|
|
|
|
def execute_query(query: str, params: tuple = None, fetch: bool = True):
|
|
"""Execute a SQL query and return results"""
|
|
conn = get_db_connection()
|
|
try:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
|
|
cursor.execute(query, params)
|
|
|
|
# Auto-detect write operations and commit
|
|
query_upper = query.strip().upper()
|
|
is_write = query_upper.startswith(('INSERT', 'UPDATE', 'DELETE'))
|
|
|
|
if is_write:
|
|
conn.commit()
|
|
|
|
# Only fetch if there are results to fetch
|
|
# (SELECT queries or INSERT/UPDATE/DELETE with RETURNING clause)
|
|
if fetch and (not is_write or 'RETURNING' in query_upper):
|
|
return cursor.fetchall()
|
|
elif is_write:
|
|
return cursor.rowcount
|
|
return []
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.error(f"Query error: {e}")
|
|
raise
|
|
finally:
|
|
release_db_connection(conn)
|
|
|
|
|
|
def execute_insert(query: str, params: tuple = None):
|
|
"""Execute INSERT query and return new ID"""
|
|
conn = get_db_connection()
|
|
try:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
|
|
cursor.execute(query, params)
|
|
conn.commit()
|
|
result = cursor.fetchone()
|
|
return result['id'] if result and 'id' in result else None
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.error(f"Insert error: {e}")
|
|
raise
|
|
finally:
|
|
release_db_connection(conn)
|
|
|
|
|
|
def execute_update(query: str, params: tuple = None):
|
|
"""Execute UPDATE/DELETE query and return affected rows"""
|
|
conn = get_db_connection()
|
|
try:
|
|
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
|
|
cursor.execute(query, params)
|
|
conn.commit()
|
|
return cursor.rowcount
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.error(f"Update error: {e}")
|
|
raise
|
|
finally:
|
|
release_db_connection(conn)
|
|
|
|
|
|
def execute_query_single(query: str, params: tuple = None):
|
|
"""Execute query and return single row (backwards compatibility for fetchone=True)"""
|
|
result = execute_query(query, params)
|
|
return result[0] if result and len(result) > 0 else None
|