# bmc_hub/app/timetracking/backend/vtiger_sync.py
"""
vTiger Sync Service for Time Tracking Module
=============================================
🚨 KRITISK: Denne service KUN læse data fra vTiger.
Ingen opdateringer, ingen statusændringer, ingen skrivninger.
Formål:
- Hent ModComments (tidsregistreringer) fra vTiger
- Hent HelpDesk/ProjectTask (cases) fra vTiger
- Hent Accounts (kunder) fra vTiger
- Gem alt i tmodule_* tabeller (isoleret)
Safety Flags:
- TIMETRACKING_VTIGER_READ_ONLY = True (default)
- TIMETRACKING_VTIGER_DRY_RUN = True (default)
"""
import logging
import hashlib
import json
import asyncio
from datetime import datetime
from typing import List, Dict, Optional, Any
from decimal import Decimal
import aiohttp
from fastapi import HTTPException
from app.core.config import settings
from app.core.database import execute_query, execute_insert, execute_update, execute_query_single
from app.timetracking.backend.models import TModuleSyncStats
from app.timetracking.backend.audit import audit
logger = logging.getLogger(__name__)
class TimeTrackingVTigerService:
"""
vTiger integration for Time Tracking Module.
🔒 READ-ONLY service - ingen skrivninger til vTiger tilladt.
"""
def __init__(self):
    """Read vTiger credentials and safety flags from application settings.

    Logs a warning for each enabled safety mode and an error when the
    module-isolation guarantee (READ-ONLY) has been switched off.
    """
    self.base_url = settings.VTIGER_URL
    self.username = settings.VTIGER_USERNAME
    self.api_key = settings.VTIGER_API_KEY
    # REST endpoint root for the default vTiger tenant.
    self.rest_endpoint = f"{self.base_url}/restapi/v1/vtiger/default"
    # Safety flags
    self.read_only = settings.TIMETRACKING_VTIGER_READ_ONLY
    self.dry_run = settings.TIMETRACKING_VTIGER_DRY_RUN
    # Log safety status
    if self.read_only:
        logger.warning("🔒 TIMETRACKING vTiger READ-ONLY mode: Enabled")
    if self.dry_run:
        logger.warning("🏃 TIMETRACKING vTiger DRY-RUN mode: Enabled")
    if not self.read_only:
        logger.error("⚠️ WARNING: TIMETRACKING vTiger READ-ONLY disabled! This violates module isolation!")
def _get_auth(self) -> aiohttp.BasicAuth:
    """Build HTTP Basic Auth credentials for vTiger REST calls.

    Prefers the API key; falls back to a password taken from settings.

    Fix: the original fell back to ``self.password``, which is never
    assigned in ``__init__`` and therefore raised AttributeError whenever
    the API key was empty.
    """
    auth_value = self.api_key or getattr(settings, "VTIGER_PASSWORD", None) or ""
    return aiohttp.BasicAuth(self.username, auth_value)
def _calculate_hash(self, data: Dict[str, Any]) -> str:
"""Calculate SHA256 hash of data for change detection"""
# Sort keys for consistent hashing
json_str = json.dumps(data, sort_keys=True)
return hashlib.sha256(json_str.encode()).hexdigest()
async def _retrieve(self, record_id: str) -> Dict:
    """
    Retrieve full record details via vTiger REST API.
    This gets ALL fields including relationships that query doesn't return.
    🔍 READ-ONLY operation

    NOTE(review): a second ``_retrieve`` is defined later in this class and
    silently shadows this one; the duplicates should be consolidated.

    Returns the record dict, or {} on any HTTP, JSON or transport error.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.rest_endpoint}/retrieve",
                params={"id": record_id},
                auth=self._get_auth(),
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                text = await response.text()
                if response.status != 200:
                    logger.error(f"❌ vTiger retrieve failed: HTTP {response.status} for {record_id}")
                    return {}
                try:
                    data = json.loads(text)
                except json.JSONDecodeError:
                    logger.error(f"❌ Invalid JSON in retrieve response for {record_id}")
                    return {}
                # vTiger wraps responses in {success: bool, result: ..., error: ...}
                if not data.get('success'):
                    logger.error(f"❌ vTiger retrieve failed for {record_id}: {data.get('error', {})}")
                    return {}
                return data.get('result', {})
    except Exception as e:
        # Best-effort: callers treat {} as "not found / unavailable".
        logger.error(f"❌ Error retrieving {record_id}: {e}")
        return {}
async def _query(self, query_string: str) -> List[Dict]:
    """
    Execute SQL-like query against vTiger REST API.
    🔍 READ-ONLY operation

    Args:
        query_string: vTiger QL statement (e.g. "SELECT * FROM Accounts ... LIMIT 200;")
    Returns:
        List of result rows (dicts); may be empty.
    Raises:
        HTTPException: on non-200 status, invalid JSON (500), a vTiger-level
            error flag (400), or connection failure (503).
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.rest_endpoint}/query",
                params={"query": query_string},
                auth=self._get_auth(),
                timeout=aiohttp.ClientTimeout(total=60)
            ) as response:
                text = await response.text()
                if response.status != 200:
                    logger.error(f"❌ vTiger query failed: HTTP {response.status}")
                    logger.error(f"Query: {query_string}")
                    logger.error(f"Response body: {text[:1000]}")
                    logger.error(f"Response headers: {dict(response.headers)}")
                    raise HTTPException(
                        status_code=response.status,
                        detail=f"vTiger API error: {text[:200]}"
                    )
                # vTiger returns text/json, not application/json
                try:
                    data = json.loads(text)
                except json.JSONDecodeError as e:
                    logger.error(f"❌ Invalid JSON in query response: {text[:200]}")
                    raise HTTPException(status_code=500, detail="Invalid JSON from vTiger")
                # Check vTiger success flag
                if not data.get('success'):
                    error_msg = data.get('error', {}).get('message', 'Unknown error')
                    logger.error(f"❌ vTiger query failed: {error_msg}")
                    logger.error(f"Query: {query_string}")
                    raise HTTPException(status_code=400, detail=f"vTiger error: {error_msg}")
                result = data.get('result', [])
                logger.info(f"✅ Query returned {len(result)} records")
                return result
    except aiohttp.ClientError as e:
        # Transport-level failures become a 503 so callers can distinguish
        # "vTiger unreachable" from vTiger-reported errors.
        logger.error(f"❌ vTiger connection error: {e}")
        raise HTTPException(status_code=503, detail=f"Cannot connect to vTiger: {str(e)}")
async def _retrieve(self, record_id: str) -> Dict:
    """
    Retrieve single record by ID.
    🔍 READ-ONLY operation

    NOTE(review): this definition shadows an earlier ``_retrieve`` in the
    same class; the earlier one is dead code and the two should be merged.

    Returns:
        The record dict from vTiger, or {} on any error.

    Fix: a malformed JSON body previously raised ``json.JSONDecodeError``
    out of this method (only ``aiohttp.ClientError`` was caught) instead of
    honoring the "{} on error" contract; it is now caught and logged.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.rest_endpoint}/retrieve",
                params={"id": record_id},
                auth=self._get_auth(),
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                # Read the body once; reused for both error logging and parsing.
                text = await response.text()
                if response.status != 200:
                    logger.error(f"❌ vTiger retrieve failed: {response.status} - {text}")
                    return {}
                try:
                    data = json.loads(text)
                except json.JSONDecodeError:
                    logger.error(f"❌ Invalid JSON in retrieve response for {record_id}")
                    return {}
                if not data.get('success', False):
                    return {}
                return data.get('result', {})
    except aiohttp.ClientError as e:
        logger.error(f"❌ vTiger retrieve error: {e}")
        return {}
async def test_connection(self) -> bool:
    """Check connectivity to vTiger.

    Probes the lightweight ``/me`` endpoint; success is judged purely by
    an HTTP 200 response. Never raises — any failure returns False.
    """
    try:
        logger.info("🔍 Testing vTiger connection...")
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.rest_endpoint}/me",
                auth=self._get_auth(),
                timeout=timeout,
            ) as response:
                # Guard clause: anything other than 200 is a failure.
                if response.status != 200:
                    logger.error(f"❌ vTiger connection failed: {response.status}")
                    return False
                logger.info("✅ vTiger connection successful")
                return True
    except Exception as e:
        logger.error(f"❌ vTiger connection error: {e}")
        return False
async def update_timelog_billed(self, vtiger_ids: List[str], hub_order_id: int) -> Dict[str, int]:
    """
    Update Timelog records in vTiger with a Hub order ID (mark as invoiced).
    🚨 WRITE operation - respects the safety flags: both READ-ONLY and
    DRY-RUN return early without sending any request.
    Args:
        vtiger_ids: List of vTiger Timelog IDs (format: "43x1234")
        hub_order_id: Hub order ID written to the billed_via_thehub_id field
    Returns:
        Dict with success/error counts
    """
    if self.read_only:
        logger.warning(f"🔒 READ-ONLY mode: Would update {len(vtiger_ids)} timelogs with Hub order {hub_order_id}")
        return {"updated": 0, "errors": 0, "message": "READ-ONLY mode"}
    if self.dry_run:
        logger.warning(f"🏃 DRY-RUN mode: Would update {len(vtiger_ids)} timelogs with Hub order {hub_order_id}")
        return {"updated": 0, "errors": 0, "message": "DRY-RUN mode"}
    stats = {"updated": 0, "errors": 0}
    logger.info(f"📝 Updating {len(vtiger_ids)} timelogs in vTiger with Hub order {hub_order_id}...")
    try:
        async with aiohttp.ClientSession() as session:
            # One POST per timelog; a failure for one ID does not abort the batch.
            for vtiger_id in vtiger_ids:
                try:
                    # Build update payload
                    update_url = f"{self.rest_endpoint}/update"
                    payload = {
                        "elementType": "Timelog",
                        "element": {
                            "id": vtiger_id,
                            "billed_via_thehub_id": str(hub_order_id),
                            "cf_timelog_invoiced": "1"  # Mark as invoiced
                        }
                    }
                    async with session.post(
                        update_url,
                        json=payload,
                        auth=self._get_auth(),
                        timeout=aiohttp.ClientTimeout(total=30)
                    ) as response:
                        text = await response.text()
                        if response.status == 200:
                            # HTTP 200 may still carry a vTiger-level failure flag.
                            data = json.loads(text)
                            if data.get('success'):
                                logger.debug(f"✅ Updated timelog {vtiger_id}")
                                stats["updated"] += 1
                            else:
                                error_msg = data.get('error', {}).get('message', 'Unknown error')
                                logger.error(f"❌ vTiger error for {vtiger_id}: {error_msg}")
                                stats["errors"] += 1
                        else:
                            logger.error(f"❌ HTTP {response.status} for {vtiger_id}: {text[:200]}")
                            stats["errors"] += 1
                except Exception as e:
                    logger.error(f"❌ Error updating timelog {vtiger_id}: {e}")
                    stats["errors"] += 1
    except Exception as e:
        # Session-level failure: count every requested ID as an error.
        logger.error(f"❌ Batch update error: {e}")
        stats["errors"] += len(vtiger_ids)
    logger.info(f"✅ vTiger update complete: {stats}")
    return stats
async def _fetch_user_name(self, user_id: str) -> str:
"""
Fetch user name from vTiger using retrieve API.
Args:
user_id: vTiger user ID (e.g., "19x1")
Returns:
User's full name or user_id if not found
"""
try:
user_data = await self._retrieve(user_id)
if not user_data:
return user_id
# Build full name from first + last, fallback to username
first_name = user_data.get('first_name', '').strip()
last_name = user_data.get('last_name', '').strip()
user_name = user_data.get('user_name', '').strip()
if first_name and last_name:
return f"{first_name} {last_name}"
elif first_name:
return first_name
elif last_name:
return last_name
elif user_name:
return user_name
else:
return user_id
except Exception as e:
logger.debug(f"Could not fetch user {user_id}: {e}")
return user_id
return False
async def sync_customers(self, limit: int = 1000) -> Dict[str, int]:
    """
    Sync Accounts (customers) from vTiger to tmodule_customers.

    Uses ID-based pagination because the vTiger query API caps results at
    200 rows per request.

    Args:
        limit: kept for interface compatibility; the pagination loop
            currently fetches all accounts regardless of this value.
            NOTE(review): confirm whether limiting was intended.
    Returns:
        {"imported": X, "updated": Y, "skipped": Z, "errors": E}
    Raises:
        Re-raises any failure of the pagination queries; per-account
        processing errors are counted, not raised.
    """
    logger.info("🔍 Syncing customers from vTiger...")
    stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0}
    try:
        # Fetch ALL accounts using pagination (vTiger has 200 record limit)
        all_accounts = []
        last_id = None
        page = 1
        while True:
            if last_id:
                query = f"SELECT * FROM Accounts WHERE id > '{last_id}' ORDER BY id LIMIT 200;"
            else:
                query = "SELECT * FROM Accounts ORDER BY id LIMIT 200;"
            accounts = await self._query(query)
            if not accounts:
                break
            all_accounts.extend(accounts)
            last_id = accounts[-1]['id']
            logger.info(f"📥 Fetched page {page}: {len(accounts)} accounts (last_id: {last_id})")
            # Safety: if we got less than 200, we're done
            if len(accounts) < 200:
                break
            page += 1
        logger.info(f"📥 Total fetched: {len(all_accounts)} accounts from vTiger")
        for account in all_accounts:
            try:
                vtiger_id = account.get('id', '')
                if not vtiger_id:
                    logger.warning("⚠️ Skipping account without ID")
                    stats["skipped"] += 1
                    continue
                # Calculate hash for change detection
                data_hash = self._calculate_hash(account)
                existing = execute_query_single(
                    "SELECT id, sync_hash FROM tmodule_customers WHERE vtiger_id = %s",
                    (vtiger_id,))
                # Unchanged record -> nothing to do.
                if existing and existing['sync_hash'] == data_hash:
                    logger.debug(f"⏭️ No changes for customer {vtiger_id}")
                    stats["skipped"] += 1
                    continue
                # cf_854 holds the e-conomic customer number; a malformed
                # value raises ValueError and is counted under "errors".
                economic_number = int(account.get('cf_854')) if account.get('cf_854') else None
                if existing:
                    # Fix: collapsed the original's redundant second
                    # `if existing:` check into a single update/insert branch.
                    execute_update(
                        """UPDATE tmodule_customers
                        SET name = %s, email = %s, economic_customer_number = %s,
                        vtiger_data = %s::jsonb, sync_hash = %s,
                        last_synced_at = CURRENT_TIMESTAMP
                        WHERE vtiger_id = %s""",
                        (
                            account.get('accountname', 'Unknown'),
                            account.get('email1', None),
                            economic_number,
                            json.dumps(account),
                            data_hash,
                            vtiger_id
                        )
                    )
                    logger.debug(f"✏️ Updated customer {vtiger_id}")
                    stats["updated"] += 1
                else:
                    # Insert new
                    execute_insert(
                        """INSERT INTO tmodule_customers
                        (vtiger_id, name, email, economic_customer_number,
                        vtiger_data, sync_hash, last_synced_at)
                        VALUES (%s, %s, %s, %s, %s::jsonb, %s, CURRENT_TIMESTAMP)""",
                        (
                            vtiger_id,
                            account.get('accountname', 'Unknown'),
                            account.get('email1', None),
                            economic_number,
                            json.dumps(account),
                            data_hash
                        )
                    )
                    logger.debug(f" Imported customer {vtiger_id}")
                    stats["imported"] += 1
            except Exception as e:
                logger.error(f"❌ Error processing account {account.get('id', 'unknown')}: {e}")
                stats["errors"] += 1
        logger.info(f"✅ Customer sync complete: {stats}")
        return stats
    except Exception as e:
        logger.error(f"❌ Customer sync failed: {e}")
        raise
async def sync_cases(self, limit: int = 5000, fetch_comments: bool = False) -> Dict[str, int]:
    """
    Sync HelpDesk tickets (cases) from vTiger to tmodule_cases.
    Args:
        limit: Maximum number of cases to sync
        fetch_comments: Whether to fetch ModComments for each case (slow - rate limited)
    Returns: {imported: X, updated: Y, skipped: Z}
    Notes:
        Requires customers to be synced first: a ticket whose parent
        account is missing from tmodule_customers is skipped.
    """
    if fetch_comments:
        logger.info(f"🔍 Syncing up to {limit} cases from vTiger WITH comments (slow)...")
    else:
        logger.info(f"🔍 Syncing up to {limit} cases from vTiger WITHOUT comments (fast)...")
    stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0}
    try:
        # vTiger API doesn't support OFFSET - use id-based pagination instead
        all_tickets = []
        last_id = "0x0"  # Start from beginning
        batch_size = 100  # Conservative batch size to avoid timeouts
        max_batches = limit // batch_size + 1
        for batch_num in range(max_batches):
            # Use id > last_id for pagination (vTiger format: 39x1234)
            query = f"SELECT * FROM Cases WHERE id > '{last_id}' ORDER BY id LIMIT {batch_size};"
            batch = await self._query(query)
            if not batch:  # No more records
                break
            all_tickets.extend(batch)
            last_id = batch[-1].get('id', last_id)  # Get last ID for next iteration
            logger.info(f"📥 Fetched {len(batch)} cases (total: {len(all_tickets)}, last_id: {last_id})")
            if len(batch) < batch_size:  # Last batch
                break
            if len(all_tickets) >= limit:  # Reached limit
                break
        tickets = all_tickets[:limit]  # Trim to requested limit
        logger.info(f"✅ Total fetched: {len(tickets)} HelpDesk tickets from vTiger")
        for ticket in tickets:
            try:
                vtiger_id = ticket.get('id', '')
                if not vtiger_id:
                    stats["skipped"] += 1
                    continue
                # Get related account (customer)
                account_id = ticket.get('parent_id', '')
                if not account_id:
                    logger.warning(f"⚠️ HelpDesk {vtiger_id} has no parent account")
                    stats["skipped"] += 1
                    continue
                # Find customer in our DB
                customer = execute_query_single(
                    "SELECT id FROM tmodule_customers WHERE vtiger_id = %s",
                    (account_id,))
                if not customer:
                    logger.warning(f"⚠️ Customer {account_id} not found - sync customers first")
                    stats["skipped"] += 1
                    continue
                customer_id = customer['id']
                # Fetch internal comments for this case (with rate limiting) - ONLY if enabled
                internal_comments = []
                if fetch_comments:
                    internal_comments = await self._get_case_comments(vtiger_id)
                    # Small delay to avoid rate limiting (vTiger allows ~2-3 requests/sec)
                    await asyncio.sleep(0.4)  # 400ms between comment fetches
                # Merge comments into ticket data before storing
                ticket_with_comments = ticket.copy()
                if internal_comments:
                    ticket_with_comments['internal_comments'] = internal_comments
                # Calculate hash AFTER adding comments (so changes to comments trigger update)
                data_hash = self._calculate_hash(ticket_with_comments)
                # Check if exists
                existing = execute_query_single(
                    "SELECT id, sync_hash FROM tmodule_cases WHERE vtiger_id = %s",
                    (vtiger_id,))
                if existing:
                    if existing['sync_hash'] == data_hash:
                        stats["skipped"] += 1
                        continue
                    # Update
                    execute_update(
                        """UPDATE tmodule_cases
                        SET customer_id = %s, title = %s, description = %s,
                        status = %s, priority = %s, module_type = %s,
                        vtiger_data = %s::jsonb, sync_hash = %s,
                        last_synced_at = CURRENT_TIMESTAMP
                        WHERE vtiger_id = %s""",
                        (
                            customer_id,
                            ticket.get('title') or ticket.get('ticket_title', 'No Title'),
                            ticket.get('description', None),
                            ticket.get('ticketstatus', None),
                            ticket.get('ticketpriorities', None),
                            'HelpDesk',
                            json.dumps(ticket_with_comments),
                            data_hash,
                            vtiger_id
                        )
                    )
                    stats["updated"] += 1
                else:
                    # Insert
                    case_id = execute_insert(
                        """INSERT INTO tmodule_cases
                        (vtiger_id, customer_id, title, description, status,
                        priority, module_type, vtiger_data, sync_hash, last_synced_at)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, CURRENT_TIMESTAMP)""",
                        (
                            vtiger_id,
                            customer_id,
                            ticket.get('title') or ticket.get('ticket_title', 'No Title'),
                            ticket.get('description', None),
                            ticket.get('ticketstatus', None),
                            ticket.get('ticketpriorities', None),
                            'HelpDesk',
                            json.dumps(ticket_with_comments),
                            data_hash
                        )
                    )
                    stats["imported"] += 1
            except Exception as e:
                logger.error(f"❌ Error processing case {ticket.get('id', 'unknown')}: {e}")
                stats["errors"] += 1
        logger.info(f"✅ Case sync complete: {stats}")
        return stats
    except Exception as e:
        logger.error(f"❌ Case sync failed: {e}")
        raise
async def _get_case_comments(self, case_id: str) -> List[Dict]:
    """
    Fetch all ModComments (internal comments) for one case from vTiger.

    Args:
        case_id: vTiger case ID (format: "32x1234")
    Returns:
        Comment dicts {text, author, date, created_at}, newest first.
        Empty list on any error so a failed lookup never aborts a sync.
    """
    query = f"SELECT * FROM ModComments WHERE related_to = '{case_id}' ORDER BY createdtime DESC;"
    try:
        raw_comments = await self._query(query)
        if not raw_comments:
            return []
        # Map vTiger field names onto the internal comment structure;
        # "date" is the YYYY-MM-DD prefix of createdtime.
        formatted_comments = [
            {
                "text": c.get("commentcontent", ""),
                "author": c.get("assigned_user_id", "Unknown"),
                "date": c.get("createdtime", "")[:10],
                "created_at": c.get("createdtime", ""),
            }
            for c in raw_comments
        ]
        logger.info(f"📝 Fetched {len(formatted_comments)} comments for case {case_id}")
        return formatted_comments
    except HTTPException as e:
        detail_text = str(e.detail)
        if "429" in detail_text or "TOO_MANY_REQUESTS" in detail_text:
            logger.warning(f"⚠️ Rate limited fetching comments for case {case_id} - skipping")
        else:
            logger.error(f"❌ API error fetching comments for case {case_id}: {e.detail}")
        return []
    except Exception as e:
        logger.error(f"❌ Failed to fetch comments for case {case_id}: {e}")
        return []
async def sync_case_comments(self, case_vtiger_id: str) -> Dict[str, Any]:
    """
    Re-sync comments for one case on demand.

    Args:
        case_vtiger_id: vTiger case ID (format: "39x1234")
    Returns:
        {"success": bool, "comments": int} plus either a "message" or,
        on failure, an "error" string. Never raises.
    """
    try:
        comments = await self._get_case_comments(case_vtiger_id)
        if not comments:
            return {"success": True, "comments": 0, "message": "No comments found"}
        # Patch only the internal_comments key inside the stored JSON blob,
        # leaving the rest of vtiger_data untouched.
        execute_update(
            """UPDATE tmodule_cases
            SET vtiger_data = jsonb_set(
            COALESCE(vtiger_data, '{}'::jsonb),
            '{internal_comments}',
            %s::jsonb
            ),
            last_synced_at = CURRENT_TIMESTAMP
            WHERE vtiger_id = %s""",
            (json.dumps(comments), case_vtiger_id)
        )
        count = len(comments)
        logger.info(f"✅ Synced {count} comments for case {case_vtiger_id}")
        return {"success": True, "comments": count, "message": f"Synced {count} comments"}
    except Exception as e:
        logger.error(f"❌ Failed to sync comments for case {case_vtiger_id}: {e}")
        return {"success": False, "comments": 0, "error": str(e)}
async def sync_time_entries(self, limit: int = 3000) -> Dict[str, int]:
    """
    Sync time entries from vTiger Timelog module to tmodule_times.
    vTiger's Timelog module contains detailed time entries with:
    - timelognumber: Unique ID (TL1234)
    - duration: Time in seconds
    - relatedto: Reference to Case/Account
    - is_billable: '1' = yes, '0' = no
    - cf_timelog_invoiced: '1' = has been invoiced
    We only sync entries where:
    - relatedto is not empty (linked to a Case or Account)
    - Has valid duration > 0
    NOTE: is_billable and cf_timelog_invoiced fields are not reliably populated in vTiger,
    so we sync all timelogs and let the approval workflow decide what to bill.
    Returns:
        {"imported": X, "updated": Y, "skipped": Z, "errors": E}
    """
    logger.info(f"🔍 Syncing all timelogs from vTiger with valid relatedto...")
    stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0}
    try:
        # Cache for user names (avoid fetching same user multiple times)
        user_name_cache = {}
        # vTiger API doesn't support OFFSET - use id-based pagination instead
        all_timelogs = []
        last_id = "0x0"  # Start from beginning
        batch_size = 100  # Conservative batch size
        max_batches = limit // batch_size + 1
        for batch_num in range(max_batches):
            # Use id > last_id for pagination (vTiger format: 43x1234)
            # NOTE: vTiger query API ignores WHERE on custom fields, so we fetch all and filter later
            query = f"SELECT * FROM Timelog WHERE id > '{last_id}' ORDER BY id LIMIT {batch_size};"
            batch = await self._query(query)
            if not batch:  # No more records
                break
            all_timelogs.extend(batch)
            last_id = batch[-1].get('id', last_id)  # Get last ID for next iteration
            logger.info(f"📥 Fetched {len(batch)} timelogs (total: {len(all_timelogs)}, last_id: {last_id})")
            if len(batch) < batch_size:  # Last batch
                break
            if len(all_timelogs) >= limit:  # Reached limit
                break
        logger.info(f"✅ Total fetched: {len(all_timelogs)} Timelog entries from vTiger")
        # We don't filter here - the existing code already filters by:
        # 1. duration > 0
        # 2. relatedto not empty
        # These filters happen in the processing loop below
        timelogs = all_timelogs[:limit]  # Trim to requested limit
        logger.info(f"📊 Processing {len(timelogs)} timelogs...")
        # NOTE: retrieve API is too slow for batch operations (1500+ individual calls)
        # We'll work with query data and accept that relatedto might be empty for some
        for timelog in timelogs:
            try:
                vtiger_id = timelog.get('id', '')
                if not vtiger_id:
                    stats["skipped"] += 1
                    continue
                # Get duration in hours (stored as seconds in vTiger)
                duration_seconds = float(timelog.get('duration', 0) or 0)
                if duration_seconds <= 0:
                    logger.debug(f"⏭️ Skipping timelog {vtiger_id} - no duration")
                    stats["skipped"] += 1
                    continue
                hours = Decimal(str(duration_seconds / 3600.0))  # Convert seconds to hours
                # Get related entity (Case or Account)
                related_to = timelog.get('relatedto', '')
                case_id = None
                customer_id = None
                if related_to:
                    # Try to find case first, then account
                    case = execute_query_single(
                        "SELECT id, customer_id FROM tmodule_cases WHERE vtiger_id = %s",
                        (related_to,))
                    if case:
                        case_id = case['id']
                        customer_id = case['customer_id']
                    else:
                        # Try to find customer directly
                        customer = execute_query_single(
                            "SELECT id FROM tmodule_customers WHERE vtiger_id = %s",
                            (related_to,))
                        if customer:
                            customer_id = customer['id']
                            case_id = None  # No specific case, just customer
                        else:
                            logger.debug(f"⏭️ Related entity {related_to} not found in our database - will skip")
                            stats["skipped"] += 1
                            continue
                # If no customer found at all, skip this timelog
                if not customer_id:
                    logger.warning(f"⚠️ Timelog {vtiger_id} has no valid customer reference - skipping")
                    stats["skipped"] += 1
                    continue
                # Get user name with caching
                assigned_user_id = timelog.get('assigned_user_id', '')
                if assigned_user_id and assigned_user_id not in user_name_cache:
                    user_name_cache[assigned_user_id] = await self._fetch_user_name(assigned_user_id)
                user_name = user_name_cache.get(assigned_user_id, assigned_user_id)
                data_hash = self._calculate_hash(timelog)
                # Check if exists
                existing = execute_query_single(
                    "SELECT id, sync_hash FROM tmodule_times WHERE vtiger_id = %s",
                    (vtiger_id,))
                if existing:
                    if existing['sync_hash'] == data_hash:
                        stats["skipped"] += 1
                        continue
                    # Update only if NOT yet approved AND NOT yet billed
                    result = execute_update(
                        """UPDATE tmodule_times
                        SET description = %s, original_hours = %s, worked_date = %s,
                        user_name = %s, billable = %s, vtiger_data = %s::jsonb,
                        sync_hash = %s, last_synced_at = CURRENT_TIMESTAMP
                        WHERE vtiger_id = %s
                        AND status = 'pending'
                        AND billed_via_thehub_id IS NULL""",
                        (
                            timelog.get('name', ''),
                            hours,
                            timelog.get('startedon', None),
                            user_name,
                            timelog.get('isbillable', '0') == '1',
                            json.dumps(timelog),
                            data_hash,
                            vtiger_id
                        )
                    )
                    # Zero affected rows means the entry is locked by workflow state.
                    if result > 0:
                        stats["updated"] += 1
                    else:
                        logger.debug(f"⏭️ Time entry {vtiger_id} already approved or billed")
                        stats["skipped"] += 1
                else:
                    # Insert new
                    execute_insert(
                        """INSERT INTO tmodule_times
                        (vtiger_id, case_id, customer_id, description, original_hours,
                        worked_date, user_name, billable, vtiger_data, sync_hash,
                        status, last_synced_at)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, 'pending', CURRENT_TIMESTAMP)""",
                        (
                            vtiger_id,
                            case_id,
                            customer_id,
                            timelog.get('name', ''),
                            hours,
                            timelog.get('startedon', None),
                            user_name,
                            timelog.get('isbillable', '0') == '1',
                            json.dumps(timelog),
                            data_hash
                        )
                    )
                    stats["imported"] += 1
            except Exception as e:
                logger.error(f"❌ Error processing timelog {timelog.get('id', 'unknown')}: {e}")
                stats["errors"] += 1
        logger.info(f"✅ Time entry sync complete: {stats}")
        return stats
    except Exception as e:
        logger.error(f"❌ Time entry sync failed: {e}")
        raise
async def full_sync(
    self,
    user_id: Optional[int] = None,
    fetch_comments: bool = False
) -> TModuleSyncStats:
    """
    Perform full sync of all data from vTiger.
    Order: Customers -> Cases -> Time Entries (dependencies)
    Args:
        user_id: User performing the sync
        fetch_comments: Whether to fetch ModComments (slow - adds ~0.4s per case)
    Returns:
        TModuleSyncStats aggregating per-entity counts, errors and timing.
    Raises:
        HTTPException: 503 when the vTiger connection test fails.
        Exception: re-raised from any sub-sync after audit logging.
    """
    if fetch_comments:
        logger.info("🚀 Starting FULL vTiger sync WITH comments (this will be slow)...")
    else:
        logger.info("🚀 Starting FULL vTiger sync WITHOUT comments (fast mode)...")
    start_time = datetime.now()
    # Log sync started
    audit.log_sync_started(user_id=user_id)
    try:
        # Test connection first
        if not await self.test_connection():
            raise HTTPException(
                status_code=503,
                detail="Cannot connect to vTiger - check credentials"
            )
        # Sync in order of dependencies
        customer_stats = await self.sync_customers()
        case_stats = await self.sync_cases(fetch_comments=fetch_comments)
        time_stats = await self.sync_time_entries()
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        # Build result
        result = TModuleSyncStats(
            customers_imported=customer_stats["imported"],
            customers_updated=customer_stats["updated"],
            customers_skipped=customer_stats["skipped"],
            cases_imported=case_stats["imported"],
            cases_updated=case_stats["updated"],
            cases_skipped=case_stats["skipped"],
            times_imported=time_stats["imported"],
            times_updated=time_stats["updated"],
            times_skipped=time_stats["skipped"],
            errors=(
                customer_stats["errors"] +
                case_stats["errors"] +
                time_stats["errors"]
            ),
            duration_seconds=duration,
            started_at=start_time,
            completed_at=end_time
        )
        # Log completion
        audit.log_sync_completed(
            stats=result.model_dump(),
            user_id=user_id
        )
        logger.info(f"✅ Full sync completed in {duration:.2f}s")
        logger.info(f"📊 Customers: {customer_stats['imported']} new, {customer_stats['updated']} updated")
        logger.info(f"📊 Cases: {case_stats['imported']} new, {case_stats['updated']} updated")
        logger.info(f"📊 Times: {time_stats['imported']} new, {time_stats['updated']} updated")
        return result
    except Exception as e:
        logger.error(f"❌ Full sync failed: {e}")
        audit.log_sync_failed(error=str(e), user_id=user_id)
        raise
# Singleton instance shared by the rest of the timetracking module.
# Constructed at import time; __init__ only reads settings and logs flags.
vtiger_service = TimeTrackingVTigerService()