#!/usr/bin/env python3
"""
Backfill organization_name/contact_name on archived vTiger tickets.

Usage (inside api container):
    python scripts/backfill_vtiger_archived_contacts.py --limit 200 --sleep 0.35 --retries 8
"""

import argparse
import asyncio
import logging
import re
from typing import Any, Optional

from app.core.database import init_db, execute_query, execute_update
from app.services.vtiger_service import get_vtiger_service

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("vtiger_backfill")

# vTiger webservice record ids have the form "<module id>x<record id>" (e.g. "17x123").
# Ids are interpolated into VTQL query strings, so anything else is rejected outright.
_EXTERNAL_ID_RE = re.compile(r"\d+x\d+")

_ACCOUNT_NAME_KEYS = ["accountname", "account_name", "name"]
_FIRST_NAME_KEYS = ["firstname", "first_name", "first"]
_LAST_NAME_KEYS = ["lastname", "last_name", "last"]


class RateLimitExhausted(RuntimeError):
    """Raised when vTiger is still rate-limiting after every retry attempt."""


def _first_value(data: dict, keys: list[str]) -> Optional[str]:
    """Return the first non-blank value (stripped, as str) among ``keys`` in ``data``, else None."""
    for key in keys:
        value = data.get(key)
        if value is None:
            continue
        text = str(value).strip()
        if text:
            return text
    return None


def _combined_name(data: dict) -> Optional[str]:
    """Join the first- and last-name fields of ``data`` with a space; None if both are blank."""
    parts = [_first_value(data, _FIRST_NAME_KEYS), _first_value(data, _LAST_NAME_KEYS)]
    return " ".join(part for part in parts if part) or None


def _looks_like_external_id(value: Optional[str]) -> bool:
    """Return True only for a well-formed vTiger webservice id such as ``"17x123"``."""
    if not value:
        return False
    return _EXTERNAL_ID_RE.fullmatch(str(value)) is not None


async def _query_with_retry(vtiger: Any, query_string: str, retries: int, base_delay: float) -> list[dict]:
    """Run a VTQL query, backing off exponentially while vTiger reports rate limiting.

    Makes at most ``retries + 1`` calls, sleeping ``base_delay * 2**attempt`` seconds
    between them. Rate limiting is detected via the service's ``last_query_status``
    (HTTP 429) or ``last_query_error["code"] == "TOO_MANY_REQUESTS"``.

    Returns:
        The query result, or ``[]`` if the service returned nothing.

    Raises:
        RateLimitExhausted: if the final attempt is still rate-limited. (Returning an
            empty list here would be indistinguishable from "record not found" and
            would cache ``None`` for ids that actually exist.)
    """
    attempts = max(retries, 0) + 1
    for attempt in range(attempts):
        result = await vtiger.query(query_string)
        status_code = getattr(vtiger, "last_query_status", None)
        error = getattr(vtiger, "last_query_error", None) or {}
        error_code = error.get("code") if isinstance(error, dict) else None
        if status_code != 429 and error_code != "TOO_MANY_REQUESTS":
            return result or []
        if attempt < attempts - 1:
            await asyncio.sleep(base_delay * (2**attempt))
    raise RateLimitExhausted(f"vTiger still rate-limited after {attempts} attempt(s)")


class _VtigerLookup:
    """Single-record vTiger lookups with per-run caches for Accounts and Contacts."""

    def __init__(self, vtiger: Any, retries: int, base_delay: float) -> None:
        self._vtiger = vtiger
        self._retries = retries
        self._base_delay = base_delay
        # account id -> account name (None when the account has no usable name)
        self._account_names: dict[str, Optional[str]] = {}
        # contact id -> (display name, related account id or None)
        self._contacts: dict[str, tuple[Optional[str], Optional[str]]] = {}

    async def fetch_one(self, module: str, record_id: str) -> dict:
        """Return the first ``module`` record whose id is ``record_id``, or {} if none.

        ``record_id`` must already have passed ``_looks_like_external_id``.
        """
        rows = await _query_with_retry(
            self._vtiger,
            f"SELECT * FROM {module} WHERE id='{record_id}' LIMIT 1;",
            retries=self._retries,
            base_delay=self._base_delay,
        )
        return rows[0] if rows else {}

    async def account_name(self, account_id: str) -> Optional[str]:
        """Return the (cached) name of Account ``account_id``."""
        if account_id not in self._account_names:
            record = await self.fetch_one("Accounts", account_id)
            self._account_names[account_id] = _first_value(record, _ACCOUNT_NAME_KEYS)
        return self._account_names[account_id]

    async def contact(self, contact_id: str) -> tuple[Optional[str], Optional[str]]:
        """Return the (cached) display name and related account id of Contact ``contact_id``."""
        if contact_id not in self._contacts:
            record = await self.fetch_one("Contacts", contact_id)
            name = _combined_name(record) or _first_value(record, ["contactname", "contact_name", "name"])
            related = _first_value(record, ["account_id", "accountid", "account", "parent_id"])
            self._contacts[contact_id] = (name, related if _looks_like_external_id(related) else None)
        return self._contacts[contact_id]


async def _backfill_row(row: dict, lookup: _VtigerLookup) -> str:
    """Fill the missing name column(s) of one archived ticket from vTiger.

    Only columns that are blank on ``row`` are looked up, and the UPDATE never
    overwrites a value that became non-blank in the meantime.

    Returns:
        The ``stats`` key for the outcome: ``"updated"``, ``"unchanged"``,
        ``"case_missing"`` or ``"invalid_external_id"``.
    """
    archived_id = row["id"]
    external_id = str(row["external_id"] or "").strip()
    if not _looks_like_external_id(external_id):
        # Never interpolate an unvalidated value into a VTQL string.
        return "invalid_external_id"

    case_data = await lookup.fetch_one("Cases", external_id)
    if not case_data:
        return "case_missing"

    need_org = not (row.get("organization_name") or "").strip()
    need_contact = not (row.get("contact_name") or "").strip()

    organization_name: Optional[str] = None
    contact_name: Optional[str] = None
    if need_org:
        organization_name = _first_value(case_data, ["accountname", "account_name", "organization", "company"])
        account_id = _first_value(case_data, ["parent_id", "account_id", "accountid", "account"])
        if not organization_name and _looks_like_external_id(account_id):
            organization_name = await lookup.account_name(account_id)
    if need_contact:
        contact_name = _first_value(case_data, ["contactname", "contact_name", "contact"]) or _combined_name(
            case_data
        )

    # Fall back to the case's linked contact for whatever is still missing.
    still_need_org = need_org and not organization_name
    still_need_contact = need_contact and not contact_name
    contact_id = _first_value(case_data, ["contact_id", "contactid"])
    if (still_need_org or still_need_contact) and _looks_like_external_id(contact_id):
        contact_display_name, related_account_id = await lookup.contact(contact_id)
        if still_need_contact:
            contact_name = contact_display_name
        if still_need_org and related_account_id:
            organization_name = await lookup.account_name(related_account_id)

    if not organization_name and not contact_name:
        return "unchanged"

    execute_update(
        """
        UPDATE tticket_archived_tickets
        SET organization_name = CASE
                WHEN NULLIF(BTRIM(organization_name), '') IS NULL
                THEN COALESCE(%s, organization_name)
                ELSE organization_name
            END,
            contact_name = CASE
                WHEN NULLIF(BTRIM(contact_name), '') IS NULL
                THEN COALESCE(%s, contact_name)
                ELSE contact_name
            END,
            last_synced_at = CURRENT_TIMESTAMP
        WHERE id = %s
        """,
        (organization_name, contact_name, archived_id),
    )
    return "updated"


async def run(limit: int, sleep_seconds: float, retries: int, base_delay: float, only_missing_both: bool) -> None:
    """Backfill names on up to ``limit`` archived vTiger tickets that lack them.

    Args:
        limit: Maximum number of candidate rows to process (lowest ids first).
        sleep_seconds: Pause after every row to stay under vTiger's rate limit.
        retries: Retries per vTiger query while rate-limited.
        base_delay: Initial back-off delay in seconds; doubles on each retry.
        only_missing_both: Select only rows missing both names (default: either).
    """
    init_db()
    lookup = _VtigerLookup(get_vtiger_service(), retries=retries, base_delay=base_delay)

    org_missing = "NULLIF(BTRIM(organization_name), '') IS NULL"
    contact_missing = "NULLIF(BTRIM(contact_name), '') IS NULL"
    missing_clause = f"{org_missing} {'AND' if only_missing_both else 'OR'} {contact_missing}"

    rows = execute_query(
        f"""
        SELECT id, external_id, organization_name, contact_name
        FROM tticket_archived_tickets
        WHERE source_system = 'vtiger'
          AND ({missing_clause})
        ORDER BY id ASC
        LIMIT %s
        """,
        (limit,),
    ) or []
    logger.info("Candidates: %s", len(rows))

    stats = {
        "candidates": len(rows),
        "updated": 0,
        "unchanged": 0,
        "case_missing": 0,
        "invalid_external_id": 0,
        "errors": 0,
    }

    for row in rows:
        try:
            outcome = await _backfill_row(row, lookup)
        except Exception as exc:  # batch boundary: one bad row must not abort the run
            stats["errors"] += 1
            logger.warning("Row %s (%s) failed: %s", row.get("id"), row.get("external_id"), exc)
        else:
            stats[outcome] += 1
        # Throttle after every row, including missing cases and failures, since those
        # also issued API calls.
        await asyncio.sleep(sleep_seconds)

    logger.info("Backfill result: %s", stats)


def main() -> None:
    """Parse command-line flags and run the backfill."""
    parser = argparse.ArgumentParser(
        description="Backfill organization_name/contact_name on archived vTiger tickets."
    )
    parser.add_argument("--limit", type=int, default=200, help="maximum number of rows to process")
    parser.add_argument("--sleep", type=float, default=0.35, help="seconds to pause after each row")
    parser.add_argument("--retries", type=int, default=8, help="retries per query while rate-limited")
    parser.add_argument("--base-delay", type=float, default=1.2, help="initial back-off delay in seconds")
    parser.add_argument(
        "--only-missing-both",
        action="store_true",
        help="only process rows missing both organization_name and contact_name",
    )
    args = parser.parse_args()

    asyncio.run(
        run(
            limit=args.limit,
            sleep_seconds=args.sleep,
            retries=args.retries,
            base_delay=args.base_delay,
            only_missing_both=args.only_missing_both,
        )
    )


if __name__ == "__main__":
    main()