bmc_hub/app/jobs/backfill_vtiger_archived_contacts.py

224 lines
8.6 KiB
Python
Raw Normal View History

"""
Backfill organization_name/contact_name on archived vTiger tickets.
Usage:
python -m app.jobs.backfill_vtiger_archived_contacts --limit 200 --sleep 0.35 --retries 8
"""
import argparse
import asyncio
import logging
from typing import Any, Optional
from app.core.database import init_db, execute_query, execute_update
from app.services.vtiger_service import get_vtiger_service
# Configure the root logger when this module is loaded so that INFO-level
# messages from the job are not filtered out.
logging.basicConfig(level=logging.INFO)
# Named logger used for the candidate count, per-row failures and final stats.
logger = logging.getLogger("vtiger_backfill")
def _first_value(data: dict, keys: list[str]) -> Optional[str]:
    """Return the first usable value found under *keys*, as stripped text.

    Keys are tried in the order given. A key is skipped when it is absent,
    maps to ``None``, or maps to a value whose string form is empty or
    whitespace-only.

    Args:
        data: Mapping to read from.
        keys: Candidate keys, highest priority first.

    Returns:
        The stripped string form of the first usable value, or ``None`` when
        no key yields one.
    """
    stripped_texts = (
        str(raw).strip()
        for raw in map(data.get, keys)
        if raw is not None
    )
    # The pipeline is lazy, so scanning stops at the first non-empty text.
    return next(filter(None, stripped_texts), None)
def _looks_like_external_id(value: Optional[str]) -> bool:
    """Return True when *value* has the shape of a vTiger webservice record id.

    vTiger webservice ids are written ``<module id>x<record id>`` with both
    parts numeric, e.g. ``11x1234``. Callers interpolate accepted values
    straight into vTiger queries, so a plain substring test is not enough:
    a display name such as ``"Max Mustermann"`` also contains an ``x``.

    Args:
        value: Candidate id; ``None`` and empty values are rejected.

    Returns:
        ``True`` only for ASCII ``digits + "x" + digits`` that is at least
        four characters long (the minimum length of the earlier heuristic is
        kept so nothing new is accepted).
    """
    if not value:
        return False
    text = str(value)
    module_part, separator, record_part = text.partition("x")
    return (
        len(text) >= 4
        and bool(separator)
        # isascii() guards against Unicode digits, which isdigit() accepts.
        and module_part.isascii()
        and module_part.isdigit()
        and record_part.isascii()
        and record_part.isdigit()
    )
async def _query_with_retry(vtiger: Any, query_string: str, retries: int, base_delay: float) -> list[dict]:
    """Run a vTiger query, backing off exponentially while it is rate limited.

    After each call the service object's ``last_query_status`` and
    ``last_query_error`` attributes are inspected. HTTP status 429 or an
    error code of ``"TOO_MANY_REQUESTS"`` counts as rate limiting; any other
    outcome is returned to the caller immediately.

    Args:
        vtiger: Service object exposing an awaitable ``query(str)``.
        query_string: vTiger query to execute.
        retries: Extra attempts allowed after the first one. Negative values
            are treated as 0 so the query is always sent at least once.
        base_delay: Seconds to wait before the first retry; doubled for each
            later retry.

    Returns:
        The rows returned by the service (``[]`` if it returned nothing), or
        ``[]`` when every attempt was rate limited.
    """
    max_attempts = max(retries, 0) + 1
    for attempt in range(max_attempts):
        result = await vtiger.query(query_string)
        status_code = getattr(vtiger, "last_query_status", None)
        error = getattr(vtiger, "last_query_error", None)
        error_code = error.get("code") if isinstance(error, dict) else None
        rate_limited = status_code == 429 or error_code == "TOO_MANY_REQUESTS"
        if not rate_limited:
            # Normalise a None/empty result so callers always get a list.
            return result or []
        if attempt < max_attempts - 1:
            await asyncio.sleep(base_delay * (2 ** attempt))
    # Callers cannot tell this apart from "no rows", so leave a trace in the log.
    logging.getLogger("vtiger_backfill").warning(
        "vTiger still rate limited after %s attempts; giving up on query: %s",
        max_attempts,
        query_string,
    )
    return []
async def _lookup_account_name(
    vtiger: Any,
    account_id: str,
    account_cache: dict[str, Optional[str]],
    retries: int,
    base_delay: float,
) -> Optional[str]:
    """Return the name of vTiger account *account_id*, querying each id at most once."""
    if account_id not in account_cache:
        account_rows = await _query_with_retry(
            vtiger,
            f"SELECT * FROM Accounts WHERE id='{account_id}' LIMIT 1;",
            retries=retries,
            base_delay=base_delay,
        )
        # An empty lookup is cached as None so the same id is not queried again.
        account_cache[account_id] = _first_value(
            account_rows[0] if account_rows else {},
            ["accountname", "account_name", "name"],
        )
    return account_cache[account_id]


async def _lookup_contact(
    vtiger: Any,
    contact_id: str,
    contact_cache: dict[str, tuple[Optional[str], Optional[str]]],
    retries: int,
    base_delay: float,
) -> tuple[Optional[str], Optional[str]]:
    """Return ``(display name, related account id)`` for vTiger contact *contact_id*."""
    if contact_id not in contact_cache:
        contact_rows = await _query_with_retry(
            vtiger,
            f"SELECT * FROM Contacts WHERE id='{contact_id}' LIMIT 1;",
            retries=retries,
            base_delay=base_delay,
        )
        contact_data = contact_rows[0] if contact_rows else {}
        first_name = _first_value(contact_data, ["firstname", "first_name", "first"])
        last_name = _first_value(contact_data, ["lastname", "last_name", "last"])
        full_name = " ".join(part for part in (first_name, last_name) if part)
        # Prefer "first last"; fall back to a single pre-combined name field.
        display_name = full_name or _first_value(
            contact_data,
            ["contactname", "contact_name", "name"],
        )
        related_account_id = _first_value(
            contact_data,
            ["account_id", "accountid", "account", "parent_id"],
        )
        if not _looks_like_external_id(related_account_id):
            related_account_id = None
        contact_cache[contact_id] = (display_name, related_account_id)
    return contact_cache[contact_id]


async def run(limit: int, sleep_seconds: float, retries: int, base_delay: float, only_missing_both: bool) -> None:
    """Fill blank organization/contact names on archived vTiger tickets.

    Selects up to *limit* archived tickets with ``source_system = 'vtiger'``
    that are missing a name, fetches the matching vTiger case (and, when
    needed, the linked account and contact) and writes back whichever names
    could be resolved. Columns that already hold a value are never
    overwritten, so remote lookups are only made for the blank ones.

    Args:
        limit: Maximum number of archived tickets to examine.
        sleep_seconds: Pause after each ticket that was fully processed.
            Tickets with no matching case, and tickets that raise, skip it.
        retries: Extra attempts per vTiger query when rate limited.
        base_delay: Initial backoff in seconds for rate-limited queries.
        only_missing_both: When true, select only tickets where both names
            are blank; otherwise tickets where either one is blank.

    Per-row failures are logged and counted rather than raised; the final
    counters are logged at INFO level.
    """
    init_db()
    vtiger = get_vtiger_service()

    org_blank = "COALESCE(NULLIF(BTRIM(organization_name), ''), NULL) IS NULL"
    contact_blank = "COALESCE(NULLIF(BTRIM(contact_name), ''), NULL) IS NULL"
    joiner = "AND" if only_missing_both else "OR"
    # Built only from the constants above, never from user input.
    missing_clause = f"{org_blank} {joiner} {contact_blank}"

    rows = execute_query(
        f"""
        SELECT id, external_id, organization_name, contact_name
        FROM tticket_archived_tickets
        WHERE source_system = 'vtiger'
        AND ({missing_clause})
        ORDER BY id ASC
        LIMIT %s
        """,
        (limit,),
    ) or []
    logger.info("Candidates: %s", len(rows))

    # Per-run caches so each account/contact is fetched from vTiger only once.
    account_cache: dict[str, Optional[str]] = {}
    contact_cache: dict[str, tuple[Optional[str], Optional[str]]] = {}
    stats = {
        "candidates": len(rows),
        "updated": 0,
        "unchanged": 0,
        "case_missing": 0,
        "errors": 0,
    }

    for row in rows:
        archived_id = row["id"]
        external_id = row["external_id"]
        # Only blank columns can be written, so only those are worth resolving.
        need_org = not (row.get("organization_name") or "").strip()
        need_contact = not (row.get("contact_name") or "").strip()
        try:
            case_rows = await _query_with_retry(
                vtiger,
                f"SELECT * FROM Cases WHERE id='{external_id}' LIMIT 1;",
                retries=retries,
                base_delay=base_delay,
            )
            if not case_rows:
                stats["case_missing"] += 1
                continue
            case_data = case_rows[0]

            organization_name: Optional[str] = None
            if need_org:
                organization_name = _first_value(
                    case_data,
                    ["accountname", "account_name", "organization", "company"],
                )
                if not organization_name:
                    # The case may only reference the account by id.
                    account_id = _first_value(
                        case_data,
                        ["parent_id", "account_id", "accountid", "account"],
                    )
                    if _looks_like_external_id(account_id):
                        organization_name = await _lookup_account_name(
                            vtiger, account_id, account_cache, retries, base_delay
                        )

            contact_name: Optional[str] = None
            if need_contact:
                contact_name = _first_value(
                    case_data,
                    ["contactname", "contact_name", "contact", "firstname", "lastname"],
                )

            # Fall back to the linked contact only if something is still unresolved.
            if (need_org and not organization_name) or (need_contact and not contact_name):
                contact_id = _first_value(case_data, ["contact_id", "contactid"])
                if _looks_like_external_id(contact_id):
                    display_name, related_account_id = await _lookup_contact(
                        vtiger, contact_id, contact_cache, retries, base_delay
                    )
                    if need_contact and not contact_name:
                        contact_name = display_name
                    if need_org and not organization_name and related_account_id:
                        organization_name = await _lookup_account_name(
                            vtiger, related_account_id, account_cache, retries, base_delay
                        )

            next_org = organization_name if (need_org and organization_name) else None
            next_contact = contact_name if (need_contact and contact_name) else None

            if not next_org and not next_contact:
                stats["unchanged"] += 1
            else:
                # The CASE guards re-check blankness in SQL, so a value written
                # since the SELECT is not overwritten.
                execute_update(
                    """
                    UPDATE tticket_archived_tickets
                    SET organization_name = CASE
                    WHEN COALESCE(NULLIF(BTRIM(organization_name), ''), NULL) IS NULL THEN COALESCE(%s, organization_name)
                    ELSE organization_name
                    END,
                    contact_name = CASE
                    WHEN COALESCE(NULLIF(BTRIM(contact_name), ''), NULL) IS NULL THEN COALESCE(%s, contact_name)
                    ELSE contact_name
                    END,
                    last_synced_at = CURRENT_TIMESTAMP
                    WHERE id = %s
                    """,
                    (next_org, next_contact, archived_id),
                )
                stats["updated"] += 1

            # Throttle between tickets to stay under the vTiger rate limit.
            await asyncio.sleep(sleep_seconds)
        except Exception as exc:
            # Best-effort job: one bad row must not abort the whole run.
            stats["errors"] += 1
            logger.warning("Row %s (%s) failed: %s", archived_id, external_id, exc)

    logger.info("Backfill result: %s", stats)
def _non_negative_int(raw: str) -> int:
    """argparse ``type=`` callable: parse *raw* as an integer that is >= 0."""
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected an integer, got {raw!r}") from None
    if value < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {value}")
    return value


def main() -> None:
    """Parse command-line options and run the backfill once.

    ``--limit`` and ``--retries`` must be non-negative: a negative limit is
    not a valid SQL ``LIMIT`` and a negative retry count is meaningless, so
    both are rejected at parse time (exit status 2) instead of failing later.
    """
    parser = argparse.ArgumentParser(
        description="Backfill organization_name/contact_name on archived vTiger tickets.",
    )
    parser.add_argument(
        "--limit",
        type=_non_negative_int,
        default=200,
        help="maximum number of archived tickets to examine (default: %(default)s)",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.35,
        help="seconds to pause after each processed ticket (default: %(default)s)",
    )
    parser.add_argument(
        "--retries",
        type=_non_negative_int,
        default=8,
        help="extra attempts for a rate-limited vTiger query (default: %(default)s)",
    )
    parser.add_argument(
        "--base-delay",
        type=float,
        default=1.2,
        help="initial backoff in seconds, doubled on each retry (default: %(default)s)",
    )
    parser.add_argument(
        "--only-missing-both",
        action="store_true",
        help="only process tickets where both names are blank (default: either one)",
    )
    args = parser.parse_args()
    asyncio.run(
        run(
            limit=args.limit,
            sleep_seconds=args.sleep,
            retries=args.retries,
            base_delay=args.base_delay,
            only_missing_both=args.only_missing_both,
        )
    )


if __name__ == "__main__":
    main()