"""
vTiger Cloud CRM Integration Service
Handles subscription and sales order data retrieval
"""
import json
import logging
import re
from typing import List, Dict, Optional

import aiohttp

from app.core.config import settings

logger = logging.getLogger(__name__)

# Shape of a vTiger record ID as used by this service (e.g. "3x760").
# Used as a whitelist before an ID is interpolated into a query string.
_VTIGER_ID_PATTERN = re.compile(r"[0-9]+x[0-9]+")

# Total time budget (seconds) for a single HTTP request to vTiger.
_REQUEST_TIMEOUT_SECONDS = 30


class VTigerService:
    """Service for integrating with vTiger Cloud CRM via REST API"""

    def __init__(self):
        """Read vTiger settings and derive the REST endpoint.

        Missing settings do not raise here; a warning is logged and the
        individual methods raise ``ValueError`` when they are called.
        """
        self.base_url = getattr(settings, 'VTIGER_URL', None)
        self.username = getattr(settings, 'VTIGER_USERNAME', None)
        self.api_key = getattr(settings, 'VTIGER_API_KEY', None)

        # REST API endpoint (strip a trailing slash so the path never
        # contains "//restapi")
        if self.base_url:
            self.rest_endpoint = (
                f"{self.base_url.rstrip('/')}/restapi/v1/vtiger/default"
            )
        else:
            self.rest_endpoint = None

        if not all([self.base_url, self.username, self.api_key]):
            logger.warning("⚠️ vTiger credentials not fully configured")

    def _get_auth(self):
        """Get HTTP Basic Auth credentials

        Returns:
            aiohttp.BasicAuth built from the configured username and API key

        Raises:
            ValueError: if the API key or the username is not configured
        """
        if not self.api_key:
            raise ValueError("VTIGER_API_KEY not configured")
        if not self.username:
            raise ValueError("VTIGER_USERNAME not configured")
        return aiohttp.BasicAuth(self.username, self.api_key)

    @staticmethod
    def _timeout():
        """Build the per-request timeout used for every vTiger call"""
        return aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT_SECONDS)

    async def query(self, query_string: str) -> List[Dict]:
        """
        Execute a query on vTiger REST API

        The query string is sent verbatim; callers must not build it from
        unvalidated input.

        Args:
            query_string: SQL-like query (e.g., "SELECT * FROM Accounts;")

        Returns:
            List of records; an empty list on any transport, HTTP, parse or
            API-level failure (the failure is logged)

        Raises:
            ValueError: if VTIGER_URL is not configured
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            auth = self._get_auth()

            async with aiohttp.ClientSession(timeout=self._timeout()) as session:
                async with session.get(
                    f"{self.rest_endpoint}/query",
                    params={"query": query_string},
                    auth=auth,
                ) as response:
                    status = response.status
                    text = await response.text()

            if status != 200:
                logger.error(f"❌ vTiger query HTTP error {status}")
                logger.error(f"Query: {query_string}")
                logger.error(f"Response: {text[:500]}")
                return []

            # vTiger returns text/json instead of application/json, so the
            # body is parsed manually rather than via response.json()
            try:
                data = json.loads(text)
            except json.JSONDecodeError:
                logger.error(f"❌ Invalid JSON in query response: {text[:200]}")
                return []

            if not isinstance(data, dict):
                logger.error(f"❌ Invalid JSON in query response: {text[:200]}")
                return []

            if not data.get('success'):
                logger.error(f"❌ vTiger query failed: {data.get('error')}")
                return []

            # "result" may be absent or null on an empty answer
            result = data.get('result') or []
            logger.info(f"✅ Query returned {len(result)} records")
            return result

        except Exception as e:
            # Boundary handler: this method reports failure as [] by design.
            # logger.exception keeps the traceback (str(e) is empty for
            # timeouts, which made the old message useless).
            logger.exception(f"❌ vTiger query error: {e}")
            return []

    async def _fetch_account_records(
        self, module: str, label: str, vtiger_account_id: str
    ) -> List[Dict]:
        """Fetch all records of *module* linked to one vTiger account.

        Args:
            module: vTiger module name used in the FROM clause
            label: human-readable plural used in log messages
            vtiger_account_id: vTiger account ID (e.g., "3x760")

        Returns:
            List of records; empty when the ID is missing or malformed, or
            when the query fails
        """
        if not vtiger_account_id:
            logger.warning("⚠️ No vTiger account ID provided")
            return []

        # The ID is interpolated into the query text, so only accept the
        # "<digits>x<digits>" shape to rule out query injection.
        account_id = str(vtiger_account_id)
        if not _VTIGER_ID_PATTERN.fullmatch(account_id):
            logger.warning(f"⚠️ Invalid vTiger account ID: {account_id!r}")
            return []

        try:
            query = f"SELECT * FROM {module} WHERE account_id='{account_id}';"

            logger.info(f"🔍 Fetching {label} for vTiger account {account_id}")
            # NOTE(review): the server may cap the number of rows returned
            # per query; no paging is done here - confirm against vTiger docs.
            records = await self.query(query)
            logger.info(f"✅ Found {len(records)} {label}")
            return records

        except Exception as e:
            logger.exception(f"❌ Error fetching {label}: {e}")
            return []

    async def get_customer_sales_orders(self, vtiger_account_id: str) -> List[Dict]:
        """
        Fetch sales orders for a customer from vTiger

        Args:
            vtiger_account_id: vTiger account ID (e.g., "3x760")

        Returns:
            List of sales order records (empty on invalid ID or failure)
        """
        return await self._fetch_account_records(
            "SalesOrder", "sales orders", vtiger_account_id
        )

    async def get_customer_subscriptions(self, vtiger_account_id: str) -> List[Dict]:
        """
        Fetch subscriptions for a customer from vTiger

        Args:
            vtiger_account_id: vTiger account ID (e.g., "3x760")

        Returns:
            List of subscription records (empty on invalid ID or failure)
        """
        # Note: module name is singular "Subscription"
        return await self._fetch_account_records(
            "Subscription", "subscriptions", vtiger_account_id
        )

    async def test_connection(self) -> bool:
        """
        Test vTiger connection using /me endpoint

        Returns:
            True if connection successful, False on any HTTP, parse,
            API-level or transport failure (the failure is logged)

        Raises:
            ValueError: if VTIGER_URL is not configured
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured in .env")

        try:
            auth = self._get_auth()
            logger.info("🔑 Testing vTiger connection...")

            async with aiohttp.ClientSession(timeout=self._timeout()) as session:
                async with session.get(
                    f"{self.rest_endpoint}/me",
                    auth=auth,
                ) as response:
                    status = response.status
                    text = await response.text()

            if status != 200:
                logger.error(f"❌ vTiger connection failed: HTTP {status}: {text}")
                return False

            # vTiger returns text/json instead of application/json
            data = json.loads(text)

            if not (isinstance(data, dict) and data.get('success')):
                logger.error(f"❌ vTiger API returned success=false: {data}")
                return False

            # A missing/odd "result" should not turn a successful
            # authentication into a reported failure.
            result = data.get('result')
            user_name = result.get('user_name') if isinstance(result, dict) else None
            logger.info(f"✅ vTiger connection successful (user: {user_name})")
            return True

        except Exception as e:
            logger.exception(f"❌ vTiger connection error: {e}")
            return False


# Singleton instance
_vtiger_service: Optional[VTigerService] = None


def get_vtiger_service() -> VTigerService:
    """Get or create vTiger service singleton"""
    global _vtiger_service
    if _vtiger_service is None:
        _vtiger_service = VTigerService()
    return _vtiger_service