"""
Customer Data Consistency Service

Compares customer data across BMC Hub, vTiger Cloud, and e-conomic
"""
import logging
import asyncio
from typing import Dict, List, Optional, Tuple, Any

from app.core.database import execute_query_single, execute_update
from app.services.vtiger_service import VTigerService
from app.services.economic_service import EconomicService
from app.core.config import settings

logger = logging.getLogger(__name__)


class CustomerConsistencyService:
    """Service for checking and syncing customer data across systems"""

    # Field mapping: hub_field -> (vtiger_field, economic_field)
    # NOTE(review): 'email' and 'invoice_email' both map to the e-conomic
    # 'email' field, so both are compared against (and synced into) the same
    # e-conomic value - confirm this is intended.
    FIELD_MAP = {
        'name': ('accountname', 'name'),
        'cvr_number': ('cf_856', 'corporateIdentificationNumber'),
        'address': ('bill_street', 'address'),
        'city': ('bill_city', 'city'),
        'postal_code': ('bill_code', 'zip'),
        'country': ('bill_country', 'country'),
        'phone': ('phone', 'telephoneAndFaxNumber'),
        'mobile_phone': ('mobile', 'mobilePhone'),
        'email': ('email1', 'email'),
        'website': ('website', 'website'),
        'invoice_email': ('email2', 'email'),
    }

    def __init__(self):
        self.vtiger = VTigerService()
        self.economic = EconomicService()

    @staticmethod
    def normalize_value(value: Any) -> Optional[str]:
        """
        Normalize value for comparison

        - Convert to string
        - Strip whitespace
        - Lowercase
        - Convert empty strings to None

        Args:
            value: Raw field value of any type (or None)

        Returns:
            The stripped, lowercased string form of the value, or None when
            the value is None or blank after stripping
        """
        if value is None:
            return None

        # Convert to string
        str_value = str(value).strip()

        # Empty string to None
        if not str_value:
            return None

        # Lowercase for case-insensitive comparison
        return str_value.lower()

    async def fetch_all_data(self, customer_id: int) -> Dict[str, Optional[Dict[str, Any]]]:
        """
        Fetch customer data from all three systems in parallel

        The Hub row is read first because it carries the IDs used to look the
        customer up in vTiger and e-conomic. A failure in either external
        lookup is logged and reported as None instead of being raised.

        Args:
            customer_id: Hub customer ID

        Returns:
            Dict with keys 'hub', 'vtiger', 'economic' containing raw data (or None)

        Raises:
            ValueError: If the customer does not exist in the Hub
        """
        logger.info(f"🔍 Fetching customer data from all systems for customer {customer_id}")

        # Fetch Hub data first to get mapping IDs
        hub_query = """
            SELECT * FROM customers
            WHERE id = %s
        """
        hub_data = await asyncio.to_thread(execute_query_single, hub_query, (customer_id,))

        if not hub_data:
            raise ValueError(f"Customer {customer_id} not found in Hub")

        # Collect one awaitable per external system that is both linked to
        # this customer and configured
        tasks = {}

        # Fetch vTiger data if we have an ID and vTiger is configured
        if hub_data.get('vtiger_id') and settings.VTIGER_URL:
            tasks['vtiger'] = self.vtiger.get_account_by_id(hub_data['vtiger_id'])

        # Fetch e-conomic data if we have a customer number and e-conomic is configured
        if hub_data.get('economic_customer_number') and settings.ECONOMIC_APP_SECRET_TOKEN:
            tasks['economic'] = self.economic.get_customer(hub_data['economic_customer_number'])

        # Parallel fetch with error handling
        results = {}
        if tasks:
            task_results = await asyncio.gather(
                *tasks.values(),
                return_exceptions=True
            )

            # Map results back (dicts preserve insertion order, so keys and
            # gathered results line up)
            for key, result in zip(tasks, task_results):
                # BaseException rather than Exception: with
                # return_exceptions=True a cancelled child is returned as a
                # CancelledError, which is not an Exception subclass and must
                # not be passed on as if it were customer data
                if isinstance(result, BaseException):
                    logger.error(f"❌ Error fetching {key} data: {result}")
                    results[key] = None
                else:
                    results[key] = result

        return {
            'hub': hub_data,
            'vtiger': results.get('vtiger'),
            'economic': results.get('economic')
        }

    @classmethod
    def compare_data(cls, all_data: Dict[str, Optional[Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]:
        """
        Compare data across systems and identify discrepancies

        Values are compared after normalize_value(), i.e. case-insensitively
        with surrounding whitespace ignored and blank treated as missing.
        Only systems whose data is present (non-empty) take part in the
        comparison; a value missing in one available system while present in
        another counts as a discrepancy.

        Args:
            all_data: Dict with 'hub', 'vtiger', 'economic' data (values may be None)

        Returns:
            Dict of discrepancies:
            {
                field_name: {
                    'hub': value,
                    'vtiger': value,
                    'economic': value,
                    'discrepancy': True/False
                }
            }
        """
        discrepancies = {}

        # `or {}` (not a .get default) so that a key that is present but
        # holds None is treated as "system unavailable" instead of crashing
        hub_data = all_data.get('hub') or {}
        vtiger_data = all_data.get('vtiger') or {}
        economic_data = all_data.get('economic') or {}

        for hub_field, (vtiger_field, economic_field) in cls.FIELD_MAP.items():
            # Get raw values
            hub_value = hub_data.get(hub_field)
            vtiger_value = vtiger_data.get(vtiger_field)
            economic_value = economic_data.get(economic_field)

            # Only compare systems that are available
            available_values = []
            if hub_data:
                available_values.append(cls.normalize_value(hub_value))
            if vtiger_data:
                available_values.append(cls.normalize_value(vtiger_value))
            if economic_data:
                available_values.append(cls.normalize_value(economic_value))

            # Has discrepancy if the available systems do not all agree
            # (None, i.e. missing/blank, is a value in its own right here)
            has_discrepancy = len(set(available_values)) > 1

            discrepancies[hub_field] = {
                'hub': hub_value,
                'vtiger': vtiger_value,
                'economic': economic_value,
                'discrepancy': has_discrepancy
            }

        return discrepancies

    async def sync_field(
        self,
        customer_id: int,
        field_name: str,
        source_system: str,
        source_value: Any
    ) -> Dict[str, bool]:
        """
        Sync a field value to all enabled systems

        Each target system is updated independently; a failure in one is
        logged and reported as False without aborting the others. A system
        that is the source, is disabled, or is not linked to the customer is
        reported as True (nothing to do). An e-conomic update blocked by the
        READ_ONLY / DRY_RUN safety flags is reported as False.

        Args:
            customer_id: Hub customer ID
            field_name: Hub field name (from FIELD_MAP keys)
            source_system: 'hub', 'vtiger', or 'economic'
            source_value: The correct value to sync

        Returns:
            Dict with sync status: {'hub': True/False, 'vtiger': True/False, 'economic': True/False}

        Raises:
            ValueError: If field_name is not in FIELD_MAP or the customer
                does not exist in the Hub
        """
        logger.info(f"🔄 Syncing {field_name} from {source_system} with value: {source_value}")

        if field_name not in self.FIELD_MAP:
            raise ValueError(f"Unknown field: {field_name}")

        vtiger_field, economic_field = self.FIELD_MAP[field_name]

        # Fetch Hub data to get mapping IDs
        hub_query = "SELECT * FROM customers WHERE id = %s"
        hub_data = await asyncio.to_thread(execute_query_single, hub_query, (customer_id,))

        if not hub_data:
            raise ValueError(f"Customer {customer_id} not found")

        results = {}

        # Update Hub if not the source
        if source_system != 'hub':
            try:
                # field_name is interpolated into the SQL text; this is safe
                # only because it was checked against FIELD_MAP above
                update_query = f"UPDATE customers SET {field_name} = %s WHERE id = %s"
                await asyncio.to_thread(execute_update, update_query, (source_value, customer_id))
                results['hub'] = True
                logger.info(f"✅ Hub {field_name} updated")
            except Exception as e:
                logger.error(f"❌ Failed to update Hub: {e}")
                results['hub'] = False
        else:
            results['hub'] = True  # Already correct

        # Update vTiger if enabled and not the source
        if settings.VTIGER_SYNC_ENABLED and source_system != 'vtiger' and hub_data.get('vtiger_id'):
            try:
                update_data = {vtiger_field: source_value}
                success = await self.vtiger.update_account(hub_data['vtiger_id'], update_data)
                if success:
                    results['vtiger'] = True
                    logger.info(f"✅ vTiger {vtiger_field} updated")
                else:
                    results['vtiger'] = False
                    logger.error("❌ vTiger update failed - API returned False")
            except Exception as e:
                logger.error(f"❌ Failed to update vTiger: {e}")
                results['vtiger'] = False
        else:
            results['vtiger'] = True  # Not applicable or already correct

        # Update e-conomic if enabled and not the source
        if settings.ECONOMIC_SYNC_ENABLED and source_system != 'economic' and hub_data.get('economic_customer_number'):
            try:
                # e-conomic update requires different handling based on field
                update_data = {economic_field: source_value}

                # Check safety flags
                if settings.ECONOMIC_READ_ONLY or settings.ECONOMIC_DRY_RUN:
                    logger.warning(f"⚠️ e-conomic update blocked by safety flags (READ_ONLY={settings.ECONOMIC_READ_ONLY}, DRY_RUN={settings.ECONOMIC_DRY_RUN})")
                    results['economic'] = False
                else:
                    await self.economic.update_customer(hub_data['economic_customer_number'], update_data)
                    results['economic'] = True
                    logger.info(f"✅ e-conomic {economic_field} updated")
            except Exception as e:
                logger.error(f"❌ Failed to update e-conomic: {e}")
                results['economic'] = False
        else:
            results['economic'] = True  # Not applicable or already correct

        return results