# bmc_hub/app/services/customer_consistency.py
# (file-viewer metadata: 263 lines, 10 KiB, Python)

"""
Customer Data Consistency Service
Compares customer data across BMC Hub, vTiger Cloud, and e-conomic
"""
import logging
import asyncio
from typing import Dict, List, Optional, Tuple, Any
from app.core.database import execute_query_single, execute_update
from app.services.vtiger_service import VTigerService
from app.services.economic_service import EconomicService
from app.core.config import settings
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class CustomerConsistencyService:
    """Check and sync customer data across BMC Hub, vTiger and e-conomic.

    The Hub ``customers`` row is the anchor: it carries the ``vtiger_id`` and
    ``economic_customer_number`` used to look the customer up in the other
    two systems.
    """

    # Field mapping: hub_field -> (vtiger_field, economic_field)
    # NOTE(review): 'email' and 'invoice_email' both map to the e-conomic
    # 'email' field, so syncing either Hub field writes the same e-conomic
    # field and a Hub row whose two addresses differ will always report a
    # discrepancy on at least one of them - confirm this is intended.
    FIELD_MAP = {
        'name': ('accountname', 'name'),
        'cvr_number': ('cf_856', 'corporateIdentificationNumber'),
        'address': ('bill_street', 'address'),
        'city': ('bill_city', 'city'),
        'postal_code': ('bill_code', 'zip'),
        'country': ('bill_country', 'country'),
        'phone': ('phone', 'telephoneAndFaxNumber'),
        'mobile_phone': ('mobile', 'mobilePhone'),
        'email': ('email1', 'email'),
        'website': ('website', 'website'),
        'invoice_email': ('email2', 'email'),
    }

    def __init__(self):
        """Create the vTiger and e-conomic service clients used for lookups and updates."""
        self.vtiger = VTigerService()
        self.economic = EconomicService()

    @staticmethod
    def normalize_value(value: Any) -> Optional[str]:
        """Normalize a raw field value for case-insensitive comparison.

        Args:
            value: Any raw value; non-strings are converted with ``str()``.

        Returns:
            The stripped, lower-cased string form of ``value``, or ``None``
            when ``value`` is ``None`` or blank after stripping.
        """
        if value is None:
            return None

        str_value = str(value).strip()
        # Blank and missing are treated as the same thing.
        if not str_value:
            return None

        return str_value.lower()

    async def fetch_all_data(self, customer_id: int) -> Dict[str, Optional[Dict[str, Any]]]:
        """Fetch customer data from Hub, then vTiger and e-conomic concurrently.

        The Hub row is read first because it holds the IDs needed to query
        the other two systems. A remote system is only queried when the Hub
        row has the corresponding ID and the matching setting is truthy.

        Args:
            customer_id: Hub customer ID.

        Returns:
            Dict with keys 'hub', 'vtiger', 'economic' containing raw data.
            'vtiger' / 'economic' are ``None`` when the system was skipped or
            its fetch failed (failures are logged, not raised).

        Raises:
            ValueError: If the customer does not exist in Hub.
        """
        logger.info(f"🔍 Fetching customer data from all systems for customer {customer_id}")

        # Fetch Hub data first to get mapping IDs
        hub_query = """
            SELECT * FROM customers WHERE id = %s
        """
        hub_data = await asyncio.to_thread(execute_query_single, hub_query, (customer_id,))
        if not hub_data:
            raise ValueError(f"Customer {customer_id} not found in Hub")

        # Collect the remote lookups that apply, keyed by system name.
        pending = {}
        if hub_data.get('vtiger_id') and settings.VTIGER_URL:
            pending['vtiger'] = self.vtiger.get_account_by_id(hub_data['vtiger_id'])
        if hub_data.get('economic_customer_number') and settings.ECONOMIC_APP_SECRET_TOKEN:
            pending['economic'] = self.economic.get_customer(hub_data['economic_customer_number'])

        results = {}
        if pending:
            # return_exceptions=True keeps one failing system from hiding the other's data.
            outcomes = await asyncio.gather(*pending.values(), return_exceptions=True)
            for key, outcome in zip(pending, outcomes):
                # BaseException (not Exception) so that a cancelled child, which
                # gather reports as a CancelledError instance, is not mistaken
                # for customer data.
                if isinstance(outcome, BaseException):
                    logger.error(f"❌ Error fetching {key} data: {outcome}")
                    results[key] = None
                else:
                    results[key] = outcome

        return {
            'hub': hub_data,
            'vtiger': results.get('vtiger'),
            'economic': results.get('economic'),
        }

    @classmethod
    def compare_data(cls, all_data: Dict[str, Optional[Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]:
        """Compare data across systems and identify discrepancies.

        Only systems whose data is present (non-empty) take part in the
        comparison. Values are compared after ``normalize_value``; a value
        that is missing in one available system and present in another
        counts as a discrepancy.

        Args:
            all_data: Dict with 'hub', 'vtiger', 'economic' data. Any of the
                values may be ``None`` or the key may be absent.

        Returns:
            One entry per ``FIELD_MAP`` key::

                {
                    field_name: {
                        'hub': raw value,
                        'vtiger': raw value,
                        'economic': raw value,
                        'discrepancy': True/False
                    }
                }
        """
        # `or {}` rather than a .get() default: the key is normally present
        # with a None value, which a default would not replace.
        hub_data = all_data.get('hub') or {}
        vtiger_data = all_data.get('vtiger') or {}
        economic_data = all_data.get('economic') or {}

        discrepancies = {}
        for hub_field, (vtiger_field, economic_field) in cls.FIELD_MAP.items():
            hub_value = hub_data.get(hub_field)
            vtiger_value = vtiger_data.get(vtiger_field)
            economic_value = economic_data.get(economic_field)

            # Only systems that actually returned data take part.
            available_values = []
            if hub_data:
                available_values.append(cls.normalize_value(hub_value))
            if vtiger_data:
                available_values.append(cls.normalize_value(vtiger_value))
            if economic_data:
                available_values.append(cls.normalize_value(economic_value))

            # More than one distinct normalized value (None included) means
            # the available systems disagree.
            has_discrepancy = len(set(available_values)) > 1

            discrepancies[hub_field] = {
                'hub': hub_value,
                'vtiger': vtiger_value,
                'economic': economic_value,
                'discrepancy': has_discrepancy,
            }

        return discrepancies

    async def sync_field(
        self,
        customer_id: int,
        field_name: str,
        source_system: str,
        source_value: Any
    ) -> Dict[str, bool]:
        """Sync a field value to all enabled systems except the source.

        Each target system is updated independently; a failure in one is
        logged and reported in the result without aborting the others.

        Args:
            customer_id: Hub customer ID.
            field_name: Hub field name (must be a ``FIELD_MAP`` key).
            source_system: 'hub', 'vtiger', or 'economic'. The value is not
                validated; any other string causes every eligible system to
                be updated.
            source_value: The correct value to sync.

        Returns:
            Dict with sync status:
            {'hub': True/False, 'vtiger': True/False, 'economic': True/False}.
            ``True`` is also reported for a system that was skipped (it is
            the source, sync is disabled, or the Hub row has no ID for it).
            e-conomic reports ``False`` when blocked by the READ_ONLY /
            DRY_RUN safety flags.

        Raises:
            ValueError: If ``field_name`` is unknown or the customer does not
                exist in Hub.
        """
        logger.info(f"🔄 Syncing {field_name} from {source_system} with value: {source_value}")

        if field_name not in self.FIELD_MAP:
            raise ValueError(f"Unknown field: {field_name}")
        vtiger_field, economic_field = self.FIELD_MAP[field_name]

        # Fetch Hub data to get mapping IDs
        hub_query = "SELECT * FROM customers WHERE id = %s"
        hub_data = await asyncio.to_thread(execute_query_single, hub_query, (customer_id,))
        if not hub_data:
            raise ValueError(f"Customer {customer_id} not found")

        results = {}

        # Update Hub if not the source
        if source_system != 'hub':
            try:
                # Interpolating the column name is safe only because
                # field_name was checked against FIELD_MAP above; the value
                # itself is passed as a bound parameter.
                update_query = f"UPDATE customers SET {field_name} = %s WHERE id = %s"
                await asyncio.to_thread(execute_update, update_query, (source_value, customer_id))
                results['hub'] = True
                logger.info(f"✅ Hub {field_name} updated")
            except Exception as e:
                logger.error(f"❌ Failed to update Hub: {e}")
                results['hub'] = False
        else:
            results['hub'] = True  # Already correct

        # Update vTiger if enabled and not the source
        if settings.VTIGER_SYNC_ENABLED and source_system != 'vtiger' and hub_data.get('vtiger_id'):
            try:
                update_data = {vtiger_field: source_value}
                success = await self.vtiger.update_account(hub_data['vtiger_id'], update_data)
                if success:
                    results['vtiger'] = True
                    logger.info(f"✅ vTiger {vtiger_field} updated")
                else:
                    results['vtiger'] = False
                    logger.error("❌ vTiger update failed - API returned False")
            except Exception as e:
                logger.error(f"❌ Failed to update vTiger: {e}")
                results['vtiger'] = False
        else:
            results['vtiger'] = True  # Not applicable or already correct

        # Update e-conomic if enabled and not the source
        if settings.ECONOMIC_SYNC_ENABLED and source_system != 'economic' and hub_data.get('economic_customer_number'):
            try:
                update_data = {economic_field: source_value}
                # Safety flags take precedence over the sync-enabled setting.
                if settings.ECONOMIC_READ_ONLY or settings.ECONOMIC_DRY_RUN:
                    logger.warning(f"⚠️ e-conomic update blocked by safety flags (READ_ONLY={settings.ECONOMIC_READ_ONLY}, DRY_RUN={settings.ECONOMIC_DRY_RUN})")
                    results['economic'] = False
                else:
                    await self.economic.update_customer(hub_data['economic_customer_number'], update_data)
                    results['economic'] = True
                    logger.info(f"✅ e-conomic {economic_field} updated")
            except Exception as e:
                logger.error(f"❌ Failed to update e-conomic: {e}")
                results['economic'] = False
        else:
            results['economic'] = True  # Not applicable or already correct

        return results