""" vTiger Sync Service for Time Tracking Module ============================================= 🚨 KRITISK: Denne service mΓ₯ KUN lΓ¦se data fra vTiger. Ingen opdateringer, ingen statusΓ¦ndringer, ingen skrivninger. FormΓ₯l: - Hent ModComments (tidsregistreringer) fra vTiger - Hent HelpDesk/ProjectTask (cases) fra vTiger - Hent Accounts (kunder) fra vTiger - Gem alt i tmodule_* tabeller (isoleret) Safety Flags: - TIMETRACKING_VTIGER_READ_ONLY = True (default) - TIMETRACKING_VTIGER_DRY_RUN = True (default) """ import logging import hashlib import json from datetime import datetime from typing import List, Dict, Optional, Any from decimal import Decimal import aiohttp from fastapi import HTTPException from app.core.config import settings from app.core.database import execute_query, execute_insert, execute_update from app.timetracking.backend.models import TModuleSyncStats from app.timetracking.backend.audit import audit logger = logging.getLogger(__name__) class TimeTrackingVTigerService: """ vTiger integration for Time Tracking Module. πŸ”’ READ-ONLY service - ingen skrivninger til vTiger tilladt. """ def __init__(self): self.base_url = settings.VTIGER_URL self.username = settings.VTIGER_USERNAME self.api_key = settings.VTIGER_API_KEY self.password = settings.VTIGER_PASSWORD self.rest_endpoint = f"{self.base_url}/restapi/v1/vtiger/default" # Safety flags self.read_only = settings.TIMETRACKING_VTIGER_READ_ONLY self.dry_run = settings.TIMETRACKING_VTIGER_DRY_RUN # Log safety status if self.read_only: logger.warning("πŸ”’ TIMETRACKING vTiger READ-ONLY mode: Enabled") if self.dry_run: logger.warning("πŸƒ TIMETRACKING vTiger DRY-RUN mode: Enabled") if not self.read_only: logger.error("⚠️ WARNING: TIMETRACKING vTiger READ-ONLY disabled! This violates module isolation!") def _get_auth(self) -> aiohttp.BasicAuth: """Get HTTP Basic Auth""" # Prefer API key over password auth_value = self.api_key if self.api_key else self.password return aiohttp.BasicAuth(self.username, auth_value) def _calculate_hash(self, data: Dict[str, Any]) -> str: """Calculate SHA256 hash of data for change detection""" # Sort keys for consistent hashing json_str = json.dumps(data, sort_keys=True) return hashlib.sha256(json_str.encode()).hexdigest() async def _retrieve(self, record_id: str) -> Dict: """ Retrieve full record details via vTiger REST API. This gets ALL fields including relationships that query doesn't return. πŸ” READ-ONLY operation """ try: async with aiohttp.ClientSession() as session: async with session.get( f"{self.rest_endpoint}/retrieve", params={"id": record_id}, auth=self._get_auth(), timeout=aiohttp.ClientTimeout(total=30) ) as response: text = await response.text() if response.status != 200: logger.error(f"❌ vTiger retrieve failed: HTTP {response.status} for {record_id}") return {} try: data = json.loads(text) except json.JSONDecodeError: logger.error(f"❌ Invalid JSON in retrieve response for {record_id}") return {} if not data.get('success'): logger.error(f"❌ vTiger retrieve failed for {record_id}: {data.get('error', {})}") return {} return data.get('result', {}) except Exception as e: logger.error(f"❌ Error retrieving {record_id}: {e}") return {} async def _query(self, query_string: str) -> List[Dict]: """ Execute SQL-like query against vTiger REST API. πŸ” READ-ONLY operation """ try: async with aiohttp.ClientSession() as session: async with session.get( f"{self.rest_endpoint}/query", params={"query": query_string}, auth=self._get_auth(), timeout=aiohttp.ClientTimeout(total=60) ) as response: text = await response.text() if response.status != 200: logger.error(f"❌ vTiger query failed: HTTP {response.status}") logger.error(f"Query: {query_string}") logger.error(f"Response body: {text[:1000]}") logger.error(f"Response headers: {dict(response.headers)}") raise HTTPException( status_code=response.status, detail=f"vTiger API error: {text[:200]}" ) # vTiger returns text/json, not application/json try: data = json.loads(text) except json.JSONDecodeError as e: logger.error(f"❌ Invalid JSON in query response: {text[:200]}") raise HTTPException(status_code=500, detail="Invalid JSON from vTiger") # Check vTiger success flag if not data.get('success'): error_msg = data.get('error', {}).get('message', 'Unknown error') logger.error(f"❌ vTiger query failed: {error_msg}") logger.error(f"Query: {query_string}") raise HTTPException(status_code=400, detail=f"vTiger error: {error_msg}") result = data.get('result', []) logger.info(f"βœ… Query returned {len(result)} records") return result except aiohttp.ClientError as e: logger.error(f"❌ vTiger connection error: {e}") raise HTTPException(status_code=503, detail=f"Cannot connect to vTiger: {str(e)}") async def _retrieve(self, record_id: str) -> Dict: """ Retrieve single record by ID. πŸ” READ-ONLY operation """ try: async with aiohttp.ClientSession() as session: async with session.get( f"{self.rest_endpoint}/retrieve", params={"id": record_id}, auth=self._get_auth(), timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status != 200: error_text = await response.text() logger.error(f"❌ vTiger retrieve failed: {response.status} - {error_text}") return {} data = json.loads(await response.text()) if not data.get('success', False): return {} return data.get('result', {}) except aiohttp.ClientError as e: logger.error(f"❌ vTiger retrieve error: {e}") return {} async def test_connection(self) -> bool: """Test vTiger connection""" try: logger.info("πŸ” Testing vTiger connection...") async with aiohttp.ClientSession() as session: async with session.get( f"{self.rest_endpoint}/me", auth=self._get_auth(), timeout=aiohttp.ClientTimeout(total=10) ) as response: if response.status == 200: logger.info("βœ… vTiger connection successful") return True else: logger.error(f"❌ vTiger connection failed: {response.status}") return False except Exception as e: logger.error(f"❌ vTiger connection error: {e}") return False async def sync_customers(self, limit: int = 1000) -> Dict[str, int]: """ Sync Accounts (customers) from vTiger to tmodule_customers. Returns: {imported: X, updated: Y, skipped: Z} """ logger.info("πŸ” Syncing customers from vTiger...") stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0} try: # Query vTiger for active accounts # Start with simplest query to debug query = "SELECT * FROM Accounts;" accounts = await self._query(query) logger.info(f"πŸ“₯ Fetched {len(accounts)} accounts from vTiger") for account in accounts: try: vtiger_id = account.get('id', '') if not vtiger_id: logger.warning("⚠️ Skipping account without ID") stats["skipped"] += 1 continue # Calculate hash for change detection data_hash = self._calculate_hash(account) # Check if exists existing = execute_query( "SELECT id, sync_hash FROM tmodule_customers WHERE vtiger_id = %s", (vtiger_id,), fetchone=True ) if existing: # Check if data changed if existing['sync_hash'] == data_hash: logger.debug(f"⏭️ No changes for customer {vtiger_id}") stats["skipped"] += 1 continue # Update existing execute_update( """UPDATE tmodule_customers SET name = %s, email = %s, vtiger_data = %s::jsonb, sync_hash = %s, last_synced_at = CURRENT_TIMESTAMP WHERE vtiger_id = %s""", ( account.get('accountname', 'Unknown'), account.get('email1', None), json.dumps(account), data_hash, vtiger_id ) ) logger.debug(f"✏️ Updated customer {vtiger_id}") stats["updated"] += 1 else: # Insert new execute_insert( """INSERT INTO tmodule_customers (vtiger_id, name, email, vtiger_data, sync_hash, last_synced_at) VALUES (%s, %s, %s, %s::jsonb, %s, CURRENT_TIMESTAMP)""", ( vtiger_id, account.get('accountname', 'Unknown'), account.get('email1', None), json.dumps(account), data_hash ) ) logger.debug(f"βž• Imported customer {vtiger_id}") stats["imported"] += 1 except Exception as e: logger.error(f"❌ Error processing account {account.get('id', 'unknown')}: {e}") stats["errors"] += 1 logger.info(f"βœ… Customer sync complete: {stats}") return stats except Exception as e: logger.error(f"❌ Customer sync failed: {e}") raise async def sync_cases(self, limit: int = 5000) -> Dict[str, int]: """ Sync HelpDesk tickets (cases) from vTiger to tmodule_cases. Returns: {imported: X, updated: Y, skipped: Z} """ logger.info(f"πŸ” Syncing up to {limit} cases from vTiger...") stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0} try: # vTiger API doesn't support OFFSET - use id-based pagination instead all_tickets = [] last_id = "0x0" # Start from beginning batch_size = 100 # Conservative batch size to avoid timeouts max_batches = limit // batch_size + 1 for batch_num in range(max_batches): # Use id > last_id for pagination (vTiger format: 39x1234) query = f"SELECT * FROM Cases WHERE id > '{last_id}' ORDER BY id LIMIT {batch_size};" batch = await self._query(query) if not batch: # No more records break all_tickets.extend(batch) last_id = batch[-1].get('id', last_id) # Get last ID for next iteration logger.info(f"πŸ“₯ Fetched {len(batch)} cases (total: {len(all_tickets)}, last_id: {last_id})") if len(batch) < batch_size: # Last batch break if len(all_tickets) >= limit: # Reached limit break tickets = all_tickets[:limit] # Trim to requested limit logger.info(f"βœ… Total fetched: {len(tickets)} HelpDesk tickets from vTiger") for ticket in tickets: try: vtiger_id = ticket.get('id', '') if not vtiger_id: stats["skipped"] += 1 continue # Get related account (customer) account_id = ticket.get('parent_id', '') if not account_id: logger.warning(f"⚠️ HelpDesk {vtiger_id} has no parent account") stats["skipped"] += 1 continue # Find customer in our DB customer = execute_query( "SELECT id FROM tmodule_customers WHERE vtiger_id = %s", (account_id,), fetchone=True ) if not customer: logger.warning(f"⚠️ Customer {account_id} not found - sync customers first") stats["skipped"] += 1 continue customer_id = customer['id'] data_hash = self._calculate_hash(ticket) # Check if exists existing = execute_query( "SELECT id, sync_hash FROM tmodule_cases WHERE vtiger_id = %s", (vtiger_id,), fetchone=True ) if existing: if existing['sync_hash'] == data_hash: stats["skipped"] += 1 continue # Update execute_update( """UPDATE tmodule_cases SET customer_id = %s, title = %s, description = %s, status = %s, priority = %s, module_type = %s, vtiger_data = %s::jsonb, sync_hash = %s, last_synced_at = CURRENT_TIMESTAMP WHERE vtiger_id = %s""", ( customer_id, ticket.get('ticket_title', 'No Title'), ticket.get('description', None), ticket.get('ticketstatus', None), ticket.get('ticketpriorities', None), 'HelpDesk', json.dumps(ticket), data_hash, vtiger_id ) ) stats["updated"] += 1 else: # Insert case_id = execute_insert( """INSERT INTO tmodule_cases (vtiger_id, customer_id, title, description, status, priority, module_type, vtiger_data, sync_hash, last_synced_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, CURRENT_TIMESTAMP)""", ( vtiger_id, customer_id, ticket.get('ticket_title', 'No Title'), ticket.get('description', None), ticket.get('ticketstatus', None), ticket.get('ticketpriorities', None), 'HelpDesk', json.dumps(ticket), data_hash ) ) stats["imported"] += 1 except Exception as e: logger.error(f"❌ Error processing case {ticket.get('id', 'unknown')}: {e}") stats["errors"] += 1 logger.info(f"βœ… Case sync complete: {stats}") return stats except Exception as e: logger.error(f"❌ Case sync failed: {e}") raise async def sync_time_entries(self, limit: int = 3000) -> Dict[str, int]: """ Sync time entries from vTiger Timelog module to tmodule_times. vTiger's Timelog module contains detailed time entries with: - timelognumber: Unique ID (TL1234) - duration: Time in seconds - relatedto: Reference to Case/Account - isbillable: Billable flag """ logger.info(f"πŸ” Syncing up to {limit} time entries from vTiger Timelog...") stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0} try: # vTiger API doesn't support OFFSET - use id-based pagination instead all_timelogs = [] last_id = "0x0" # Start from beginning batch_size = 100 # Conservative batch size max_batches = limit // batch_size + 1 for batch_num in range(max_batches): # Use id > last_id for pagination (vTiger format: 43x1234) query = f"SELECT * FROM Timelog WHERE timelog_status = 'Completed' AND id > '{last_id}' ORDER BY id LIMIT {batch_size};" batch = await self._query(query) if not batch: # No more records break all_timelogs.extend(batch) last_id = batch[-1].get('id', last_id) # Get last ID for next iteration logger.info(f"πŸ“₯ Fetched {len(batch)} timelogs (total: {len(all_timelogs)}, last_id: {last_id})") if len(batch) < batch_size: # Last batch break if len(all_timelogs) >= limit: # Reached limit break timelogs = all_timelogs[:limit] # Trim to requested limit logger.info(f"βœ… Total fetched: {len(timelogs)} Timelog entries from vTiger") # NOTE: retrieve API is too slow for batch operations (1500+ individual calls) # We'll work with query data and accept that relatedto might be empty for some for timelog in timelogs: try: vtiger_id = timelog.get('id', '') if not vtiger_id: stats["skipped"] += 1 continue # Get duration in hours (stored as seconds in vTiger) duration_seconds = float(timelog.get('duration', 0) or 0) if duration_seconds <= 0: logger.debug(f"⏭️ Skipping timelog {vtiger_id} - no duration") stats["skipped"] += 1 continue hours = Decimal(str(duration_seconds / 3600.0)) # Convert seconds to hours # Get related entity (Case or Account) related_to = timelog.get('relatedto', '') if not related_to: logger.warning(f"⚠️ Timelog {vtiger_id} has no relatedto - RAW DATA: {timelog}") stats["skipped"] += 1 continue # Try to find case first, then account case = execute_query( "SELECT id, customer_id FROM tmodule_cases WHERE vtiger_id = %s", (related_to,), fetchone=True ) if case: case_id = case['id'] customer_id = case['customer_id'] else: # Try to find customer directly customer = execute_query( "SELECT id FROM tmodule_customers WHERE vtiger_id = %s", (related_to,), fetchone=True ) if not customer: logger.debug(f"⏭️ Related entity {related_to} not found") stats["skipped"] += 1 continue customer_id = customer['id'] case_id = None # No specific case, just customer data_hash = self._calculate_hash(timelog) # Check if exists existing = execute_query( "SELECT id, sync_hash FROM tmodule_times WHERE vtiger_id = %s", (vtiger_id,), fetchone=True ) if existing: if existing['sync_hash'] == data_hash: stats["skipped"] += 1 continue # Update only if NOT yet approved result = execute_update( """UPDATE tmodule_times SET description = %s, original_hours = %s, worked_date = %s, user_name = %s, billable = %s, vtiger_data = %s::jsonb, sync_hash = %s, last_synced_at = CURRENT_TIMESTAMP WHERE vtiger_id = %s AND status = 'pending'""", ( timelog.get('name', ''), hours, timelog.get('startedon', None), timelog.get('assigned_user_id', None), timelog.get('isbillable', '0') == '1', json.dumps(timelog), data_hash, vtiger_id ) ) if result > 0: stats["updated"] += 1 else: logger.debug(f"⏭️ Time entry {vtiger_id} already approved") stats["skipped"] += 1 else: # Insert new execute_insert( """INSERT INTO tmodule_times (vtiger_id, case_id, customer_id, description, original_hours, worked_date, user_name, billable, vtiger_data, sync_hash, status, last_synced_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, 'pending', CURRENT_TIMESTAMP)""", ( vtiger_id, case_id, customer_id, timelog.get('name', ''), hours, timelog.get('startedon', None), timelog.get('assigned_user_id', None), timelog.get('isbillable', '0') == '1', json.dumps(timelog), data_hash ) ) stats["imported"] += 1 except Exception as e: logger.error(f"❌ Error processing timelog {timelog.get('id', 'unknown')}: {e}") stats["errors"] += 1 logger.info(f"βœ… Time entry sync complete: {stats}") return stats except Exception as e: logger.error(f"❌ Time entry sync failed: {e}") raise async def full_sync( self, user_id: Optional[int] = None ) -> TModuleSyncStats: """ Perform full sync of all data from vTiger. Order: Customers -> Cases -> Time Entries (dependencies) """ logger.info("πŸš€ Starting FULL vTiger sync...") start_time = datetime.now() # Log sync started audit.log_sync_started(user_id=user_id) try: # Test connection first if not await self.test_connection(): raise HTTPException( status_code=503, detail="Cannot connect to vTiger - check credentials" ) # Sync in order of dependencies customer_stats = await self.sync_customers() case_stats = await self.sync_cases() time_stats = await self.sync_time_entries() end_time = datetime.now() duration = (end_time - start_time).total_seconds() # Build result result = TModuleSyncStats( customers_imported=customer_stats["imported"], customers_updated=customer_stats["updated"], customers_skipped=customer_stats["skipped"], cases_imported=case_stats["imported"], cases_updated=case_stats["updated"], cases_skipped=case_stats["skipped"], times_imported=time_stats["imported"], times_updated=time_stats["updated"], times_skipped=time_stats["skipped"], errors=( customer_stats["errors"] + case_stats["errors"] + time_stats["errors"] ), duration_seconds=duration, started_at=start_time, completed_at=end_time ) # Log completion audit.log_sync_completed( stats=result.model_dump(), user_id=user_id ) logger.info(f"βœ… Full sync completed in {duration:.2f}s") logger.info(f"πŸ“Š Customers: {customer_stats['imported']} new, {customer_stats['updated']} updated") logger.info(f"πŸ“Š Cases: {case_stats['imported']} new, {case_stats['updated']} updated") logger.info(f"πŸ“Š Times: {time_stats['imported']} new, {time_stats['updated']} updated") return result except Exception as e: logger.error(f"❌ Full sync failed: {e}") audit.log_sync_failed(error=str(e), user_id=user_id) raise # Singleton instance vtiger_service = TimeTrackingVTigerService()