Når ordre eksporteres til e-conomic opdateres vTiger Timelog med: - billed_via_thehub_id: Hub ordre ID (f.eks. 5) - cf_timelog_invoiced: '1' (markér som faktureret) Dette sikrer at timelogs i vTiger bliver markeret som fakturerede og kan filtreres/rapporteres korrekt i vTiger.
938 lines
41 KiB
Python
938 lines
41 KiB
Python
"""
|
||
vTiger Sync Service for Time Tracking Module
|
||
=============================================
|
||
|
||
🚨 KRITISK: Denne service må KUN læse data fra vTiger.
|
||
Ingen opdateringer, ingen statusændringer, ingen skrivninger.
|
||
|
||
Formål:
|
||
- Hent ModComments (tidsregistreringer) fra vTiger
|
||
- Hent HelpDesk/ProjectTask (cases) fra vTiger
|
||
- Hent Accounts (kunder) fra vTiger
|
||
- Gem alt i tmodule_* tabeller (isoleret)
|
||
|
||
Safety Flags:
|
||
- TIMETRACKING_VTIGER_READ_ONLY = True (default)
|
||
- TIMETRACKING_VTIGER_DRY_RUN = True (default)
|
||
"""
|
||
|
||
import logging
|
||
import hashlib
|
||
import json
|
||
import asyncio
|
||
from datetime import datetime
|
||
from typing import List, Dict, Optional, Any
|
||
from decimal import Decimal
|
||
|
||
import aiohttp
|
||
from fastapi import HTTPException
|
||
|
||
from app.core.config import settings
|
||
from app.core.database import execute_query, execute_insert, execute_update, execute_query_single
|
||
from app.timetracking.backend.models import TModuleSyncStats
|
||
from app.timetracking.backend.audit import audit
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class TimeTrackingVTigerService:
|
||
"""
|
||
vTiger integration for Time Tracking Module.
|
||
|
||
🔒 READ-ONLY service - ingen skrivninger til vTiger tilladt.
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.base_url = settings.VTIGER_URL
|
||
self.username = settings.VTIGER_USERNAME
|
||
self.api_key = settings.VTIGER_API_KEY
|
||
self.rest_endpoint = f"{self.base_url}/restapi/v1/vtiger/default"
|
||
|
||
# Safety flags
|
||
self.read_only = settings.TIMETRACKING_VTIGER_READ_ONLY
|
||
self.dry_run = settings.TIMETRACKING_VTIGER_DRY_RUN
|
||
|
||
# Log safety status
|
||
if self.read_only:
|
||
logger.warning("🔒 TIMETRACKING vTiger READ-ONLY mode: Enabled")
|
||
if self.dry_run:
|
||
logger.warning("🏃 TIMETRACKING vTiger DRY-RUN mode: Enabled")
|
||
|
||
if not self.read_only:
|
||
logger.error("⚠️ WARNING: TIMETRACKING vTiger READ-ONLY disabled! This violates module isolation!")
|
||
|
||
def _get_auth(self) -> aiohttp.BasicAuth:
|
||
"""Get HTTP Basic Auth"""
|
||
# Prefer API key over password
|
||
auth_value = self.api_key if self.api_key else self.password
|
||
return aiohttp.BasicAuth(self.username, auth_value)
|
||
|
||
def _calculate_hash(self, data: Dict[str, Any]) -> str:
|
||
"""Calculate SHA256 hash of data for change detection"""
|
||
# Sort keys for consistent hashing
|
||
json_str = json.dumps(data, sort_keys=True)
|
||
return hashlib.sha256(json_str.encode()).hexdigest()
|
||
|
||
async def _retrieve(self, record_id: str) -> Dict:
|
||
"""
|
||
Retrieve full record details via vTiger REST API.
|
||
This gets ALL fields including relationships that query doesn't return.
|
||
|
||
🔍 READ-ONLY operation
|
||
"""
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(
|
||
f"{self.rest_endpoint}/retrieve",
|
||
params={"id": record_id},
|
||
auth=self._get_auth(),
|
||
timeout=aiohttp.ClientTimeout(total=30)
|
||
) as response:
|
||
text = await response.text()
|
||
|
||
if response.status != 200:
|
||
logger.error(f"❌ vTiger retrieve failed: HTTP {response.status} for {record_id}")
|
||
return {}
|
||
|
||
try:
|
||
data = json.loads(text)
|
||
except json.JSONDecodeError:
|
||
logger.error(f"❌ Invalid JSON in retrieve response for {record_id}")
|
||
return {}
|
||
|
||
if not data.get('success'):
|
||
logger.error(f"❌ vTiger retrieve failed for {record_id}: {data.get('error', {})}")
|
||
return {}
|
||
|
||
return data.get('result', {})
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Error retrieving {record_id}: {e}")
|
||
return {}
|
||
|
||
async def _query(self, query_string: str) -> List[Dict]:
|
||
"""
|
||
Execute SQL-like query against vTiger REST API.
|
||
|
||
🔍 READ-ONLY operation
|
||
"""
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(
|
||
f"{self.rest_endpoint}/query",
|
||
params={"query": query_string},
|
||
auth=self._get_auth(),
|
||
timeout=aiohttp.ClientTimeout(total=60)
|
||
) as response:
|
||
text = await response.text()
|
||
|
||
if response.status != 200:
|
||
logger.error(f"❌ vTiger query failed: HTTP {response.status}")
|
||
logger.error(f"Query: {query_string}")
|
||
logger.error(f"Response body: {text[:1000]}")
|
||
logger.error(f"Response headers: {dict(response.headers)}")
|
||
raise HTTPException(
|
||
status_code=response.status,
|
||
detail=f"vTiger API error: {text[:200]}"
|
||
)
|
||
|
||
# vTiger returns text/json, not application/json
|
||
try:
|
||
data = json.loads(text)
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"❌ Invalid JSON in query response: {text[:200]}")
|
||
raise HTTPException(status_code=500, detail="Invalid JSON from vTiger")
|
||
|
||
# Check vTiger success flag
|
||
if not data.get('success'):
|
||
error_msg = data.get('error', {}).get('message', 'Unknown error')
|
||
logger.error(f"❌ vTiger query failed: {error_msg}")
|
||
logger.error(f"Query: {query_string}")
|
||
raise HTTPException(status_code=400, detail=f"vTiger error: {error_msg}")
|
||
|
||
result = data.get('result', [])
|
||
logger.info(f"✅ Query returned {len(result)} records")
|
||
return result
|
||
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"❌ vTiger connection error: {e}")
|
||
raise HTTPException(status_code=503, detail=f"Cannot connect to vTiger: {str(e)}")
|
||
|
||
async def _retrieve(self, record_id: str) -> Dict:
|
||
"""
|
||
Retrieve single record by ID.
|
||
|
||
🔍 READ-ONLY operation
|
||
"""
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(
|
||
f"{self.rest_endpoint}/retrieve",
|
||
params={"id": record_id},
|
||
auth=self._get_auth(),
|
||
timeout=aiohttp.ClientTimeout(total=30)
|
||
) as response:
|
||
if response.status != 200:
|
||
error_text = await response.text()
|
||
logger.error(f"❌ vTiger retrieve failed: {response.status} - {error_text}")
|
||
return {}
|
||
|
||
data = json.loads(await response.text())
|
||
|
||
if not data.get('success', False):
|
||
return {}
|
||
|
||
return data.get('result', {})
|
||
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"❌ vTiger retrieve error: {e}")
|
||
return {}
|
||
|
||
async def test_connection(self) -> bool:
|
||
"""Test vTiger connection"""
|
||
try:
|
||
logger.info("🔍 Testing vTiger connection...")
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(
|
||
f"{self.rest_endpoint}/me",
|
||
auth=self._get_auth(),
|
||
timeout=aiohttp.ClientTimeout(total=10)
|
||
) as response:
|
||
if response.status == 200:
|
||
logger.info("✅ vTiger connection successful")
|
||
return True
|
||
else:
|
||
logger.error(f"❌ vTiger connection failed: {response.status}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ vTiger connection error: {e}")
|
||
return False
|
||
|
||
async def update_timelog_billed(self, vtiger_ids: List[str], hub_order_id: int) -> Dict[str, int]:
|
||
"""
|
||
Update Timelog records in vTiger with Hub ordre ID (markér som faktureret).
|
||
|
||
🚨 WRITE operation - respekterer safety flags
|
||
|
||
Args:
|
||
vtiger_ids: Liste af vTiger Timelog IDs (format: "43x1234")
|
||
hub_order_id: Hub ordre ID til at sætte i billed_via_thehub_id felt
|
||
|
||
Returns:
|
||
Dict with success/error counts
|
||
"""
|
||
if self.read_only:
|
||
logger.warning(f"🔒 READ-ONLY mode: Would update {len(vtiger_ids)} timelogs with Hub order {hub_order_id}")
|
||
return {"updated": 0, "errors": 0, "message": "READ-ONLY mode"}
|
||
|
||
if self.dry_run:
|
||
logger.warning(f"🏃 DRY-RUN mode: Would update {len(vtiger_ids)} timelogs with Hub order {hub_order_id}")
|
||
return {"updated": 0, "errors": 0, "message": "DRY-RUN mode"}
|
||
|
||
stats = {"updated": 0, "errors": 0}
|
||
|
||
logger.info(f"📝 Updating {len(vtiger_ids)} timelogs in vTiger with Hub order {hub_order_id}...")
|
||
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
for vtiger_id in vtiger_ids:
|
||
try:
|
||
# Build update payload
|
||
update_url = f"{self.rest_endpoint}/update"
|
||
payload = {
|
||
"elementType": "Timelog",
|
||
"element": {
|
||
"id": vtiger_id,
|
||
"billed_via_thehub_id": str(hub_order_id),
|
||
"cf_timelog_invoiced": "1" # Marker som faktureret
|
||
}
|
||
}
|
||
|
||
async with session.post(
|
||
update_url,
|
||
json=payload,
|
||
auth=self._get_auth(),
|
||
timeout=aiohttp.ClientTimeout(total=30)
|
||
) as response:
|
||
text = await response.text()
|
||
|
||
if response.status == 200:
|
||
data = json.loads(text)
|
||
if data.get('success'):
|
||
logger.debug(f"✅ Updated timelog {vtiger_id}")
|
||
stats["updated"] += 1
|
||
else:
|
||
error_msg = data.get('error', {}).get('message', 'Unknown error')
|
||
logger.error(f"❌ vTiger error for {vtiger_id}: {error_msg}")
|
||
stats["errors"] += 1
|
||
else:
|
||
logger.error(f"❌ HTTP {response.status} for {vtiger_id}: {text[:200]}")
|
||
stats["errors"] += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Error updating timelog {vtiger_id}: {e}")
|
||
stats["errors"] += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Batch update error: {e}")
|
||
stats["errors"] += len(vtiger_ids)
|
||
|
||
logger.info(f"✅ vTiger update complete: {stats['updated']} updated, {stats['errors']} errors")
|
||
return stats
|
||
|
||
async def _fetch_user_name(self, user_id: str) -> str:
|
||
"""
|
||
Fetch user name from vTiger using retrieve API.
|
||
|
||
Args:
|
||
user_id: vTiger user ID (e.g., "19x1")
|
||
|
||
Returns:
|
||
User's full name or user_id if not found
|
||
"""
|
||
try:
|
||
user_data = await self._retrieve(user_id)
|
||
|
||
if not user_data:
|
||
return user_id
|
||
|
||
# Build full name from first + last, fallback to username
|
||
first_name = user_data.get('first_name', '').strip()
|
||
last_name = user_data.get('last_name', '').strip()
|
||
user_name = user_data.get('user_name', '').strip()
|
||
|
||
if first_name and last_name:
|
||
return f"{first_name} {last_name}"
|
||
elif first_name:
|
||
return first_name
|
||
elif last_name:
|
||
return last_name
|
||
elif user_name:
|
||
return user_name
|
||
else:
|
||
return user_id
|
||
|
||
except Exception as e:
|
||
logger.debug(f"Could not fetch user {user_id}: {e}")
|
||
return user_id
|
||
return False
|
||
|
||
async def sync_customers(self, limit: int = 1000) -> Dict[str, int]:
|
||
"""
|
||
Sync Accounts (customers) from vTiger to tmodule_customers.
|
||
|
||
Uses ID-based pagination to fetch all accounts.
|
||
|
||
Returns: {imported: X, updated: Y, skipped: Z}
|
||
"""
|
||
logger.info("🔍 Syncing customers from vTiger...")
|
||
|
||
stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0}
|
||
|
||
try:
|
||
# Fetch ALL accounts using pagination (vTiger has 200 record limit)
|
||
all_accounts = []
|
||
last_id = None
|
||
page = 1
|
||
|
||
while True:
|
||
if last_id:
|
||
query = f"SELECT * FROM Accounts WHERE id > '{last_id}' ORDER BY id LIMIT 200;"
|
||
else:
|
||
query = "SELECT * FROM Accounts ORDER BY id LIMIT 200;"
|
||
|
||
accounts = await self._query(query)
|
||
|
||
if not accounts:
|
||
break
|
||
|
||
all_accounts.extend(accounts)
|
||
last_id = accounts[-1]['id']
|
||
|
||
logger.info(f"📥 Fetched page {page}: {len(accounts)} accounts (last_id: {last_id})")
|
||
|
||
# Safety: if we got less than 200, we're done
|
||
if len(accounts) < 200:
|
||
break
|
||
|
||
page += 1
|
||
|
||
logger.info(f"📥 Total fetched: {len(all_accounts)} accounts from vTiger")
|
||
|
||
for account in all_accounts:
|
||
try:
|
||
vtiger_id = account.get('id', '')
|
||
if not vtiger_id:
|
||
logger.warning("⚠️ Skipping account without ID")
|
||
stats["skipped"] += 1
|
||
continue
|
||
|
||
# Calculate hash for change detection
|
||
data_hash = self._calculate_hash(account)
|
||
|
||
# Check if exists
|
||
existing = execute_query_single(
|
||
"SELECT id, sync_hash FROM tmodule_customers WHERE vtiger_id = %s",
|
||
(vtiger_id,))
|
||
|
||
if existing:
|
||
# Check if data changed
|
||
if existing['sync_hash'] == data_hash:
|
||
logger.debug(f"⏭️ No changes for customer {vtiger_id}")
|
||
stats["skipped"] += 1
|
||
continue
|
||
if existing:
|
||
# Update existing
|
||
execute_update(
|
||
"""UPDATE tmodule_customers
|
||
SET name = %s, email = %s, economic_customer_number = %s,
|
||
vtiger_data = %s::jsonb, sync_hash = %s,
|
||
last_synced_at = CURRENT_TIMESTAMP
|
||
WHERE vtiger_id = %s""",
|
||
(
|
||
account.get('accountname', 'Unknown'),
|
||
account.get('email1', None),
|
||
int(account.get('cf_854')) if account.get('cf_854') else None,
|
||
json.dumps(account),
|
||
data_hash,
|
||
vtiger_id
|
||
)
|
||
)
|
||
logger.debug(f"✏️ Updated customer {vtiger_id}")
|
||
stats["updated"] += 1
|
||
else:
|
||
# Insert new
|
||
execute_insert(
|
||
"""INSERT INTO tmodule_customers
|
||
(vtiger_id, name, email, economic_customer_number,
|
||
vtiger_data, sync_hash, last_synced_at)
|
||
VALUES (%s, %s, %s, %s, %s::jsonb, %s, CURRENT_TIMESTAMP)""",
|
||
(
|
||
vtiger_id,
|
||
account.get('accountname', 'Unknown'),
|
||
account.get('email1', None),
|
||
int(account.get('cf_854')) if account.get('cf_854') else None,
|
||
json.dumps(account),
|
||
data_hash
|
||
)
|
||
)
|
||
logger.debug(f"➕ Imported customer {vtiger_id}")
|
||
stats["imported"] += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Error processing account {account.get('id', 'unknown')}: {e}")
|
||
stats["errors"] += 1
|
||
|
||
logger.info(f"✅ Customer sync complete: {stats}")
|
||
return stats
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Customer sync failed: {e}")
|
||
raise
|
||
|
||
async def sync_cases(self, limit: int = 5000, fetch_comments: bool = False) -> Dict[str, int]:
|
||
"""
|
||
Sync HelpDesk tickets (cases) from vTiger to tmodule_cases.
|
||
|
||
Args:
|
||
limit: Maximum number of cases to sync
|
||
fetch_comments: Whether to fetch ModComments for each case (slow - rate limited)
|
||
|
||
Returns: {imported: X, updated: Y, skipped: Z}
|
||
"""
|
||
if fetch_comments:
|
||
logger.info(f"🔍 Syncing up to {limit} cases from vTiger WITH comments (slow)...")
|
||
else:
|
||
logger.info(f"🔍 Syncing up to {limit} cases from vTiger WITHOUT comments (fast)...")
|
||
|
||
stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0}
|
||
|
||
try:
|
||
# vTiger API doesn't support OFFSET - use id-based pagination instead
|
||
all_tickets = []
|
||
last_id = "0x0" # Start from beginning
|
||
batch_size = 100 # Conservative batch size to avoid timeouts
|
||
max_batches = limit // batch_size + 1
|
||
|
||
for batch_num in range(max_batches):
|
||
# Use id > last_id for pagination (vTiger format: 39x1234)
|
||
query = f"SELECT * FROM Cases WHERE id > '{last_id}' ORDER BY id LIMIT {batch_size};"
|
||
batch = await self._query(query)
|
||
|
||
if not batch: # No more records
|
||
break
|
||
|
||
all_tickets.extend(batch)
|
||
last_id = batch[-1].get('id', last_id) # Get last ID for next iteration
|
||
|
||
logger.info(f"📥 Fetched {len(batch)} cases (total: {len(all_tickets)}, last_id: {last_id})")
|
||
|
||
if len(batch) < batch_size: # Last batch
|
||
break
|
||
|
||
if len(all_tickets) >= limit: # Reached limit
|
||
break
|
||
|
||
tickets = all_tickets[:limit] # Trim to requested limit
|
||
logger.info(f"✅ Total fetched: {len(tickets)} HelpDesk tickets from vTiger")
|
||
|
||
for ticket in tickets:
|
||
try:
|
||
vtiger_id = ticket.get('id', '')
|
||
if not vtiger_id:
|
||
stats["skipped"] += 1
|
||
continue
|
||
|
||
# Get related account (customer)
|
||
account_id = ticket.get('parent_id', '')
|
||
if not account_id:
|
||
logger.warning(f"⚠️ HelpDesk {vtiger_id} has no parent account")
|
||
stats["skipped"] += 1
|
||
continue
|
||
|
||
# Find customer in our DB
|
||
customer = execute_query_single(
|
||
"SELECT id FROM tmodule_customers WHERE vtiger_id = %s",
|
||
(account_id,))
|
||
|
||
if not customer:
|
||
logger.warning(f"⚠️ Customer {account_id} not found - sync customers first")
|
||
stats["skipped"] += 1
|
||
continue
|
||
|
||
customer_id = customer['id']
|
||
|
||
# Fetch internal comments for this case (with rate limiting) - ONLY if enabled
|
||
internal_comments = []
|
||
if fetch_comments:
|
||
internal_comments = await self._get_case_comments(vtiger_id)
|
||
# Small delay to avoid rate limiting (vTiger allows ~2-3 requests/sec)
|
||
await asyncio.sleep(0.4) # 400ms between comment fetches
|
||
|
||
# Merge comments into ticket data before storing
|
||
ticket_with_comments = ticket.copy()
|
||
if internal_comments:
|
||
ticket_with_comments['internal_comments'] = internal_comments
|
||
|
||
# Calculate hash AFTER adding comments (so changes to comments trigger update)
|
||
data_hash = self._calculate_hash(ticket_with_comments)
|
||
|
||
# Check if exists
|
||
existing = execute_query_single(
|
||
"SELECT id, sync_hash FROM tmodule_cases WHERE vtiger_id = %s",
|
||
(vtiger_id,))
|
||
|
||
if existing:
|
||
if existing['sync_hash'] == data_hash:
|
||
stats["skipped"] += 1
|
||
continue
|
||
|
||
# Update
|
||
execute_update(
|
||
"""UPDATE tmodule_cases
|
||
SET customer_id = %s, title = %s, description = %s,
|
||
status = %s, priority = %s, module_type = %s,
|
||
vtiger_data = %s::jsonb, sync_hash = %s,
|
||
last_synced_at = CURRENT_TIMESTAMP
|
||
WHERE vtiger_id = %s""",
|
||
(
|
||
customer_id,
|
||
ticket.get('ticket_title', 'No Title'),
|
||
ticket.get('description', None),
|
||
ticket.get('ticketstatus', None),
|
||
ticket.get('ticketpriorities', None),
|
||
'HelpDesk',
|
||
json.dumps(ticket_with_comments),
|
||
data_hash,
|
||
vtiger_id
|
||
)
|
||
)
|
||
stats["updated"] += 1
|
||
else:
|
||
# Insert
|
||
case_id = execute_insert(
|
||
"""INSERT INTO tmodule_cases
|
||
(vtiger_id, customer_id, title, description, status,
|
||
priority, module_type, vtiger_data, sync_hash, last_synced_at)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, CURRENT_TIMESTAMP)""",
|
||
(
|
||
vtiger_id,
|
||
customer_id,
|
||
ticket.get('ticket_title', 'No Title'),
|
||
ticket.get('description', None),
|
||
ticket.get('ticketstatus', None),
|
||
ticket.get('ticketpriorities', None),
|
||
'HelpDesk',
|
||
json.dumps(ticket_with_comments),
|
||
data_hash
|
||
)
|
||
)
|
||
stats["imported"] += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Error processing case {ticket.get('id', 'unknown')}: {e}")
|
||
stats["errors"] += 1
|
||
|
||
logger.info(f"✅ Case sync complete: {stats}")
|
||
return stats
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Case sync failed: {e}")
|
||
raise
|
||
|
||
async def _get_case_comments(self, case_id: str) -> List[Dict]:
|
||
"""
|
||
Fetch all ModComments (internal comments) for a specific case from vTiger.
|
||
|
||
Args:
|
||
case_id: vTiger case ID (format: "32x1234")
|
||
|
||
Returns:
|
||
List of comment dicts with structure: {text, author, date, created_at}
|
||
Sorted by creation date (newest first)
|
||
"""
|
||
try:
|
||
# Query ModComments where related_to = case_id
|
||
query = f"SELECT * FROM ModComments WHERE related_to = '{case_id}' ORDER BY createdtime DESC;"
|
||
comments = await self._query(query)
|
||
|
||
if not comments:
|
||
return []
|
||
|
||
# Transform vTiger format to internal format
|
||
formatted_comments = []
|
||
for comment in comments:
|
||
formatted_comments.append({
|
||
"text": comment.get("commentcontent", ""),
|
||
"author": comment.get("assigned_user_id", "Unknown"), # Will be user ID - could enhance with name lookup
|
||
"date": comment.get("createdtime", "")[:10], # Format: YYYY-MM-DD from YYYY-MM-DD HH:MM:SS
|
||
"created_at": comment.get("createdtime", "")
|
||
})
|
||
|
||
logger.info(f"📝 Fetched {len(formatted_comments)} comments for case {case_id}")
|
||
return formatted_comments
|
||
|
||
except HTTPException as e:
|
||
# Rate limit or API error - log but don't fail sync
|
||
if "429" in str(e.detail) or "TOO_MANY_REQUESTS" in str(e.detail):
|
||
logger.warning(f"⚠️ Rate limited fetching comments for case {case_id} - skipping")
|
||
else:
|
||
logger.error(f"❌ API error fetching comments for case {case_id}: {e.detail}")
|
||
return []
|
||
except Exception as e:
|
||
logger.error(f"❌ Failed to fetch comments for case {case_id}: {e}")
|
||
return [] # Return empty list on error - don't fail entire sync
|
||
|
||
async def sync_case_comments(self, case_vtiger_id: str) -> Dict[str, Any]:
|
||
"""
|
||
Sync comments for a specific case (for on-demand updates).
|
||
|
||
Args:
|
||
case_vtiger_id: vTiger case ID (format: "39x1234")
|
||
|
||
Returns:
|
||
Dict with success status and comment count
|
||
"""
|
||
try:
|
||
# Fetch comments
|
||
comments = await self._get_case_comments(case_vtiger_id)
|
||
|
||
if not comments:
|
||
return {"success": True, "comments": 0, "message": "No comments found"}
|
||
|
||
# Update case in database
|
||
execute_update(
|
||
"""UPDATE tmodule_cases
|
||
SET vtiger_data = jsonb_set(
|
||
COALESCE(vtiger_data, '{}'::jsonb),
|
||
'{internal_comments}',
|
||
%s::jsonb
|
||
),
|
||
last_synced_at = CURRENT_TIMESTAMP
|
||
WHERE vtiger_id = %s""",
|
||
(json.dumps(comments), case_vtiger_id)
|
||
)
|
||
|
||
logger.info(f"✅ Synced {len(comments)} comments for case {case_vtiger_id}")
|
||
return {"success": True, "comments": len(comments), "message": f"Synced {len(comments)} comments"}
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Failed to sync comments for case {case_vtiger_id}: {e}")
|
||
return {"success": False, "comments": 0, "error": str(e)}
|
||
|
||
async def sync_time_entries(self, limit: int = 3000) -> Dict[str, int]:
|
||
"""
|
||
Sync time entries from vTiger Timelog module to tmodule_times.
|
||
|
||
vTiger's Timelog module contains detailed time entries with:
|
||
- timelognumber: Unique ID (TL1234)
|
||
- duration: Time in seconds
|
||
- relatedto: Reference to Case/Account
|
||
- is_billable: '1' = yes, '0' = no
|
||
- cf_timelog_invoiced: '1' = has been invoiced
|
||
|
||
We only sync entries where:
|
||
- relatedto is not empty (linked to a Case or Account)
|
||
- Has valid duration > 0
|
||
|
||
NOTE: is_billable and cf_timelog_invoiced fields are not reliably populated in vTiger,
|
||
so we sync all timelogs and let the approval workflow decide what to bill.
|
||
"""
|
||
logger.info(f"🔍 Syncing all timelogs from vTiger with valid relatedto...")
|
||
|
||
stats = {"imported": 0, "updated": 0, "skipped": 0, "errors": 0}
|
||
|
||
try:
|
||
# Cache for user names (avoid fetching same user multiple times)
|
||
user_name_cache = {}
|
||
|
||
# vTiger API doesn't support OFFSET - use id-based pagination instead
|
||
all_timelogs = []
|
||
last_id = "0x0" # Start from beginning
|
||
batch_size = 100 # Conservative batch size
|
||
max_batches = limit // batch_size + 1
|
||
|
||
for batch_num in range(max_batches):
|
||
# Use id > last_id for pagination (vTiger format: 43x1234)
|
||
# NOTE: vTiger query API ignores WHERE on custom fields, so we fetch all and filter later
|
||
query = f"SELECT * FROM Timelog WHERE id > '{last_id}' ORDER BY id LIMIT {batch_size};"
|
||
batch = await self._query(query)
|
||
|
||
if not batch: # No more records
|
||
break
|
||
|
||
all_timelogs.extend(batch)
|
||
last_id = batch[-1].get('id', last_id) # Get last ID for next iteration
|
||
|
||
logger.info(f"📥 Fetched {len(batch)} timelogs (total: {len(all_timelogs)}, last_id: {last_id})")
|
||
|
||
if len(batch) < batch_size: # Last batch
|
||
break
|
||
|
||
if len(all_timelogs) >= limit: # Reached limit
|
||
break
|
||
|
||
logger.info(f"✅ Total fetched: {len(all_timelogs)} Timelog entries from vTiger")
|
||
|
||
# We don't filter here - the existing code already filters by:
|
||
# 1. duration > 0
|
||
# 2. relatedto not empty
|
||
# These filters happen in the processing loop below
|
||
|
||
timelogs = all_timelogs[:limit] # Trim to requested limit
|
||
logger.info(f"📊 Processing {len(timelogs)} timelogs...")
|
||
|
||
# NOTE: retrieve API is too slow for batch operations (1500+ individual calls)
|
||
# We'll work with query data and accept that relatedto might be empty for some
|
||
|
||
for timelog in timelogs:
|
||
try:
|
||
vtiger_id = timelog.get('id', '')
|
||
if not vtiger_id:
|
||
stats["skipped"] += 1
|
||
continue
|
||
|
||
# Get duration in hours (stored as seconds in vTiger)
|
||
duration_seconds = float(timelog.get('duration', 0) or 0)
|
||
if duration_seconds <= 0:
|
||
logger.debug(f"⏭️ Skipping timelog {vtiger_id} - no duration")
|
||
stats["skipped"] += 1
|
||
continue
|
||
|
||
hours = Decimal(str(duration_seconds / 3600.0)) # Convert seconds to hours
|
||
|
||
# Get related entity (Case or Account)
|
||
related_to = timelog.get('relatedto', '')
|
||
case_id = None
|
||
customer_id = None
|
||
|
||
if related_to:
|
||
# Try to find case first, then account
|
||
case = execute_query_single(
|
||
"SELECT id, customer_id FROM tmodule_cases WHERE vtiger_id = %s",
|
||
(related_to,))
|
||
|
||
if case:
|
||
case_id = case['id']
|
||
customer_id = case['customer_id']
|
||
else:
|
||
# Try to find customer directly
|
||
customer = execute_query_single(
|
||
"SELECT id FROM tmodule_customers WHERE vtiger_id = %s",
|
||
(related_to,))
|
||
|
||
if customer:
|
||
customer_id = customer['id']
|
||
case_id = None # No specific case, just customer
|
||
else:
|
||
logger.debug(f"⏭️ Related entity {related_to} not found in our database - will skip")
|
||
stats["skipped"] += 1
|
||
continue
|
||
|
||
# If no customer found at all, skip this timelog
|
||
if not customer_id:
|
||
logger.warning(f"⚠️ Timelog {vtiger_id} has no valid customer reference - skipping")
|
||
stats["skipped"] += 1
|
||
continue
|
||
|
||
# Get user name with caching
|
||
assigned_user_id = timelog.get('assigned_user_id', '')
|
||
if assigned_user_id and assigned_user_id not in user_name_cache:
|
||
user_name_cache[assigned_user_id] = await self._fetch_user_name(assigned_user_id)
|
||
user_name = user_name_cache.get(assigned_user_id, assigned_user_id)
|
||
|
||
data_hash = self._calculate_hash(timelog)
|
||
|
||
# Check if exists
|
||
existing = execute_query_single(
|
||
"SELECT id, sync_hash FROM tmodule_times WHERE vtiger_id = %s",
|
||
(vtiger_id,))
|
||
|
||
if existing:
|
||
if existing['sync_hash'] == data_hash:
|
||
stats["skipped"] += 1
|
||
continue
|
||
|
||
# Update only if NOT yet approved
|
||
result = execute_update(
|
||
"""UPDATE tmodule_times
|
||
SET description = %s, original_hours = %s, worked_date = %s,
|
||
user_name = %s, billable = %s, vtiger_data = %s::jsonb,
|
||
sync_hash = %s, last_synced_at = CURRENT_TIMESTAMP
|
||
WHERE vtiger_id = %s AND status = 'pending'""",
|
||
(
|
||
timelog.get('name', ''),
|
||
hours,
|
||
timelog.get('startedon', None),
|
||
user_name,
|
||
timelog.get('isbillable', '0') == '1',
|
||
json.dumps(timelog),
|
||
data_hash,
|
||
vtiger_id
|
||
)
|
||
)
|
||
|
||
if result > 0:
|
||
stats["updated"] += 1
|
||
else:
|
||
logger.debug(f"⏭️ Time entry {vtiger_id} already approved")
|
||
stats["skipped"] += 1
|
||
else:
|
||
# Insert new
|
||
execute_insert(
|
||
"""INSERT INTO tmodule_times
|
||
(vtiger_id, case_id, customer_id, description, original_hours,
|
||
worked_date, user_name, billable, vtiger_data, sync_hash,
|
||
status, last_synced_at)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, 'pending', CURRENT_TIMESTAMP)""",
|
||
(
|
||
vtiger_id,
|
||
case_id,
|
||
customer_id,
|
||
timelog.get('name', ''),
|
||
hours,
|
||
timelog.get('startedon', None),
|
||
user_name,
|
||
timelog.get('isbillable', '0') == '1',
|
||
json.dumps(timelog),
|
||
data_hash
|
||
)
|
||
)
|
||
stats["imported"] += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Error processing timelog {timelog.get('id', 'unknown')}: {e}")
|
||
stats["errors"] += 1
|
||
|
||
logger.info(f"✅ Time entry sync complete: {stats}")
|
||
return stats
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Time entry sync failed: {e}")
|
||
raise
|
||
|
||
async def full_sync(
|
||
self,
|
||
user_id: Optional[int] = None,
|
||
fetch_comments: bool = False
|
||
) -> TModuleSyncStats:
|
||
"""
|
||
Perform full sync of all data from vTiger.
|
||
|
||
Order: Customers -> Cases -> Time Entries (dependencies)
|
||
|
||
Args:
|
||
user_id: User performing the sync
|
||
fetch_comments: Whether to fetch ModComments (slow - adds ~0.4s per case)
|
||
"""
|
||
if fetch_comments:
|
||
logger.info("🚀 Starting FULL vTiger sync WITH comments (this will be slow)...")
|
||
else:
|
||
logger.info("🚀 Starting FULL vTiger sync WITHOUT comments (fast mode)...")
|
||
|
||
start_time = datetime.now()
|
||
|
||
# Log sync started
|
||
audit.log_sync_started(user_id=user_id)
|
||
|
||
try:
|
||
# Test connection first
|
||
if not await self.test_connection():
|
||
raise HTTPException(
|
||
status_code=503,
|
||
detail="Cannot connect to vTiger - check credentials"
|
||
)
|
||
|
||
# Sync in order of dependencies
|
||
customer_stats = await self.sync_customers()
|
||
case_stats = await self.sync_cases(fetch_comments=fetch_comments)
|
||
time_stats = await self.sync_time_entries()
|
||
|
||
end_time = datetime.now()
|
||
duration = (end_time - start_time).total_seconds()
|
||
|
||
# Build result
|
||
result = TModuleSyncStats(
|
||
customers_imported=customer_stats["imported"],
|
||
customers_updated=customer_stats["updated"],
|
||
customers_skipped=customer_stats["skipped"],
|
||
cases_imported=case_stats["imported"],
|
||
cases_updated=case_stats["updated"],
|
||
cases_skipped=case_stats["skipped"],
|
||
times_imported=time_stats["imported"],
|
||
times_updated=time_stats["updated"],
|
||
times_skipped=time_stats["skipped"],
|
||
errors=(
|
||
customer_stats["errors"] +
|
||
case_stats["errors"] +
|
||
time_stats["errors"]
|
||
),
|
||
duration_seconds=duration,
|
||
started_at=start_time,
|
||
completed_at=end_time
|
||
)
|
||
|
||
# Log completion
|
||
audit.log_sync_completed(
|
||
stats=result.model_dump(),
|
||
user_id=user_id
|
||
)
|
||
|
||
logger.info(f"✅ Full sync completed in {duration:.2f}s")
|
||
logger.info(f"📊 Customers: {customer_stats['imported']} new, {customer_stats['updated']} updated")
|
||
logger.info(f"📊 Cases: {case_stats['imported']} new, {case_stats['updated']} updated")
|
||
logger.info(f"📊 Times: {time_stats['imported']} new, {time_stats['updated']} updated")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Full sync failed: {e}")
|
||
audit.log_sync_failed(error=str(e), user_id=user_id)
|
||
raise
|
||
|
||
|
||
# Singleton instance
|
||
vtiger_service = TimeTrackingVTigerService()
|