# bmc_hub/app/services/vtiger_service.py
# (Repository viewer metadata: 1139 lines, 45 KiB, Python)

"""
vTiger Cloud CRM Integration Service
Handles subscription and sales order data retrieval
"""
import logging
import json
import re
import html as html_lib
import aiohttp
from decimal import Decimal
from typing import List, Dict, Optional, Any
from app.core.config import settings
logger = logging.getLogger(__name__)
class VTigerService:
    """Service for integrating with vTiger Cloud CRM via REST API.

    Configuration is read from ``settings`` (``VTIGER_URL``,
    ``VTIGER_USERNAME``, ``VTIGER_API_KEY``).  Most read helpers are
    best-effort: they log failures and return an empty result (``[]`` or
    ``None``) instead of raising.  ``create_subscription``,
    ``update_subscription`` and ``get_service_contract_report_data`` raise
    so that callers can report the problem.

    Every record ID that is interpolated into a vTiger query string is first
    passed through ``_sanitize_vtiger_id``.
    """

    def __init__(self):
        """Read vTiger settings and derive the REST endpoint URL."""
        self.base_url = getattr(settings, 'VTIGER_URL', None)
        self.username = getattr(settings, 'VTIGER_USERNAME', None)
        self.api_key = getattr(settings, 'VTIGER_API_KEY', None)

        # REST API endpoint. A trailing slash on the configured URL is
        # stripped so the endpoint never contains "//restapi".
        if self.base_url:
            base = str(self.base_url).rstrip("/")
            self.rest_endpoint = f"{base}/restapi/v1/vtiger/default"
        else:
            self.rest_endpoint = None

        if not all([self.base_url, self.username, self.api_key]):
            logger.warning("⚠️ vTiger credentials not fully configured")

        # Diagnostics of the most recent query() call.
        self.last_query_status: Optional[int] = None
        self.last_query_error: Optional[Dict] = None

    def _get_auth(self):
        """Get HTTP Basic Auth credentials.

        Raises:
            ValueError: if the API key or the username is not configured.
        """
        if not self.api_key:
            raise ValueError("VTIGER_API_KEY not configured")
        if not self.username:
            raise ValueError("VTIGER_USERNAME not configured")
        return aiohttp.BasicAuth(self.username, self.api_key)

    @staticmethod
    def _sanitize_vtiger_id(raw_value: Optional[str]) -> Optional[str]:
        """Allow-list known vTiger id characters before embedding in query strings.

        Returns the stripped value when it consists solely of ASCII letters,
        digits, ``_`` and ``-``; otherwise ``None``.
        """
        if not raw_value:
            return None
        candidate = str(raw_value).strip()
        if re.fullmatch(r"[A-Za-z0-9_\-]+", candidate):
            return candidate
        return None

    @staticmethod
    def _extract_timelog_hours(timelog_data: Dict[str, Any]) -> Decimal:
        """Extract hours from common vTiger timelog fields and normalize to decimal hours.

        The first populated field among ``time_spent``, ``duration``,
        ``total_hours`` and ``hours`` is used.  Supported formats:

        * ``duration`` holding a plain number is treated as seconds;
        * ``H:MM`` or ``H:MM:SS``;
        * unit strings such as ``"1h 30m"`` or ``"1,5 h"``;
        * a plain decimal number of hours (``,`` or ``.`` as separator).

        Returns ``Decimal("0")`` when nothing usable is found; parsed values
        are rounded to two decimals.
        """
        raw_value = None
        field_used = None
        for key in ("time_spent", "duration", "total_hours", "hours"):
            value = timelog_data.get(key)
            if value not in (None, ""):
                raw_value = value
                field_used = key
                break

        if raw_value is None:
            return Decimal("0")

        cents = Decimal("0.01")

        if field_used == "duration":
            try:
                seconds = Decimal(str(raw_value))
                return (seconds / Decimal(3600)).quantize(cents)
            except (ArithmeticError, ValueError):
                # Not a plain number of seconds; fall through to the
                # textual formats below.
                pass

        raw_str = str(raw_value).strip().lower()

        time_match = re.match(r"^(\d+):(\d+)(?::(\d+))?$", raw_str)
        if time_match:
            hours = Decimal(int(time_match.group(1)))
            minutes = Decimal(int(time_match.group(2))) / Decimal(60)
            seconds = Decimal(int(time_match.group(3) or 0)) / Decimal(3600)
            return (hours + minutes + seconds).quantize(cents)

        if "h" in raw_str or "m" in raw_str or "s" in raw_str:
            total_hours = Decimal("0")
            hours_match = re.search(r"(\d+(?:[\.,]\d+)?)\s*h", raw_str)
            mins_match = re.search(r"(\d+(?:[\.,]\d+)?)\s*m", raw_str)
            secs_match = re.search(r"(\d+(?:[\.,]\d+)?)\s*s", raw_str)
            if hours_match:
                total_hours += Decimal(hours_match.group(1).replace(",", "."))
            if mins_match:
                total_hours += Decimal(mins_match.group(1).replace(",", ".")) / Decimal(60)
            if secs_match:
                total_hours += Decimal(secs_match.group(1).replace(",", ".")) / Decimal(3600)
            return total_hours.quantize(cents)

        try:
            return Decimal(raw_str.replace(",", ".")).quantize(cents)
        except (ArithmeticError, ValueError):
            return Decimal("0")

    async def query(self, query_string: str) -> List[Dict]:
        """
        Execute a query on vTiger REST API

        The HTTP status and any error payload of the call are stored in
        ``last_query_status`` / ``last_query_error`` for later inspection.

        Args:
            query_string: SQL-like query (e.g., "SELECT * FROM Accounts;")

        Returns:
            List of records; an empty list on any failure.

        Raises:
            ValueError: if VTIGER_URL is not configured.
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            auth = self._get_auth()
            self.last_query_status = None
            self.last_query_error = None

            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.rest_endpoint}/query",
                    params={"query": query_string},
                    auth=auth
                ) as response:
                    text = await response.text()
                    self.last_query_status = response.status

                    if response.status == 200:
                        # vTiger returns text/json instead of application/json,
                        # so the body is decoded manually.  The module-level
                        # ``json`` import is used on purpose: a function-local
                        # import would make ``json`` unbound on the error path.
                        try:
                            data = json.loads(text)
                        except json.JSONDecodeError:
                            logger.error(f"❌ Invalid JSON in query response: {text[:200]}")
                            return []

                        if data.get('success'):
                            result = data.get('result') or []
                            logger.info(f"✅ Query returned {len(result)} records")
                            self.last_query_error = None
                            return result

                        self.last_query_error = data.get('error')
                        logger.error(f"❌ vTiger query failed: {self.last_query_error}")
                        return []

                    # Non-200: keep the structured error if the body has one.
                    try:
                        parsed = json.loads(text) if text else {}
                    except ValueError:
                        parsed = {}
                    self.last_query_error = (
                        parsed.get('error') if isinstance(parsed, dict) else None
                    )

                    if response.status == 429:
                        logger.warning(f"⚠️ vTiger query rate-limited (HTTP {response.status})")
                    else:
                        logger.error(f"❌ vTiger query HTTP error {response.status}")
                        logger.error(f"Query: {query_string}")
                        logger.error(f"Response: {text[:500]}")
                    return []
        except Exception as e:
            self.last_query_status = None
            self.last_query_error = {"message": str(e)}
            logger.error(f"❌ vTiger query error: {e}")
            return []

    async def get_account_by_id(self, account_id: str) -> Optional[Dict]:
        """
        Fetch a single account by ID from vTiger

        Args:
            account_id: vTiger account ID (e.g., "3x760")

        Returns:
            Account record or None
        """
        if not account_id:
            logger.warning("⚠️ No account ID provided")
            return None

        safe_account_id = self._sanitize_vtiger_id(account_id)
        if not safe_account_id:
            logger.warning("⚠️ Rejected unsafe account_id for account lookup")
            return None

        try:
            query = f"SELECT * FROM Accounts WHERE id='{safe_account_id}' LIMIT 1;"
            results = await self.query(query)

            if results:
                logger.info(f"✅ Found account {account_id}")
                return results[0]

            logger.warning(f"⚠️ No account found with ID {account_id}")
            return None
        except Exception as e:
            logger.error(f"❌ Error fetching account {account_id}: {e}")
            return None

    async def update_account(self, account_id: str, update_data: Dict) -> bool:
        """
        Update an account in vTiger

        Args:
            account_id: vTiger account ID (e.g., "3x760")
            update_data: Dictionary of fields to update

        Returns:
            True if successful, False otherwise

        Raises:
            ValueError: if VTIGER_URL is not configured.
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            # Fetch current account first - vTiger requires modifiedtime for updates
            current_account = await self.get_account_by_id(account_id)
            if not current_account:
                logger.error(f"❌ Account {account_id} not found for update")
                return False

            auth = self._get_auth()

            # Build payload with current data + updates
            # Include essential fields for vTiger update validation
            payload = {
                'id': account_id,
                'accountname': current_account.get('accountname'),
                'assigned_user_id': current_account.get('assigned_user_id'),
                'modifiedtime': current_account.get('modifiedtime'),
                **update_data
            }
            logger.info(f"🔄 vTiger update payload: {payload}")

            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.rest_endpoint}/update",
                    json=payload,
                    auth=auth
                ) as response:
                    text = await response.text()

                    if response.status != 200:
                        logger.error(f"❌ vTiger update HTTP error {response.status}")
                        logger.error(f"Full response: {text[:2000]}")
                        return False

                    try:
                        data = json.loads(text)
                    except json.JSONDecodeError:
                        logger.error(f"❌ Invalid JSON in update response: {text[:200]}")
                        return False

                    if data.get('success'):
                        logger.info(f"✅ Updated vTiger account {account_id}")
                        return True

                    logger.error(f"❌ vTiger update failed: {data.get('error')}")
                    logger.error(f"Full response: {text}")
                    return False
        except Exception as e:
            logger.error(f"❌ Error updating vTiger account {account_id}: {e}")
            return False

    async def get_customer_sales_orders(self, vtiger_account_id: str) -> List[Dict]:
        """
        Fetch sales orders for a customer from vTiger

        Args:
            vtiger_account_id: vTiger account ID (e.g., "3x760")

        Returns:
            List of sales order records
        """
        if not vtiger_account_id:
            logger.warning("⚠️ No vTiger account ID provided")
            return []

        safe_account_id = self._sanitize_vtiger_id(vtiger_account_id)
        if not safe_account_id:
            logger.warning("⚠️ Rejected unsafe account_id for sales order query")
            return []

        try:
            # Query for sales orders linked to this account
            query = f"SELECT * FROM SalesOrder WHERE account_id='{safe_account_id}';"
            logger.info(f"🔍 Fetching sales orders for vTiger account {vtiger_account_id}")
            orders = await self.query(query)
            logger.info(f"✅ Found {len(orders)} sales orders")
            return orders
        except Exception as e:
            logger.error(f"❌ Error fetching sales orders: {e}")
            return []

    async def get_customer_subscriptions(self, vtiger_account_id: str) -> List[Dict]:
        """
        Fetch subscriptions for a customer from vTiger

        Args:
            vtiger_account_id: vTiger account ID (e.g., "3x760")

        Returns:
            List of subscription records
        """
        if not vtiger_account_id:
            logger.warning("⚠️ No vTiger account ID provided")
            return []

        safe_account_id = self._sanitize_vtiger_id(vtiger_account_id)
        if not safe_account_id:
            logger.warning("⚠️ Rejected unsafe account_id for subscription query")
            return []

        try:
            # Query for subscriptions linked to this account (note: module name is singular "Subscription")
            query = f"SELECT * FROM Subscription WHERE account_id='{safe_account_id}';"
            logger.info(f"🔍 Fetching subscriptions for vTiger account {vtiger_account_id}")
            subscriptions = await self.query(query)
            logger.info(f"✅ Found {len(subscriptions)} subscriptions")
            return subscriptions
        except Exception as e:
            logger.error(f"❌ Error fetching subscriptions: {e}")
            return []

    async def get_subscription(self, subscription_id: str) -> Optional[Dict]:
        """
        Fetch full subscription details with LineItems from vTiger

        Args:
            subscription_id: vTiger subscription ID (e.g., "72x123")

        Returns:
            Subscription record with LineItems array or None

        Raises:
            ValueError: if VTIGER_URL is not configured.
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            auth = self._get_auth()

            async with aiohttp.ClientSession() as session:
                # Pass the id via ``params`` so it is URL-encoded instead of
                # being concatenated into the URL verbatim.
                async with session.get(
                    f"{self.rest_endpoint}/retrieve",
                    params={"id": str(subscription_id)},
                    auth=auth
                ) as response:
                    if response.status != 200:
                        logger.error(f"❌ HTTP {response.status} retrieving subscription")
                        return None

                    text = await response.text()
                    data = json.loads(text)
                    if data.get('success'):
                        logger.info(f"✅ Retrieved subscription {subscription_id}")
                        return data.get('result')

                    logger.error(f"❌ vTiger API error: {data.get('error')}")
                    return None
        except Exception as e:
            logger.error(f"❌ Error retrieving subscription: {e}")
            return None

    async def get_account(self, vtiger_account_id: str) -> Optional[Dict]:
        """
        Fetch account details from vTiger

        Args:
            vtiger_account_id: vTiger account ID (e.g., "3x760")

        Returns:
            Account record with BMC Låst field (cf_accounts_bmclst) or None
        """
        if not vtiger_account_id:
            logger.warning("⚠️ No vTiger account ID provided")
            return None

        safe_account_id = self._sanitize_vtiger_id(vtiger_account_id)
        if not safe_account_id:
            logger.warning("⚠️ Rejected unsafe account_id for account query")
            return None

        try:
            # Query for account by ID
            query = f"SELECT * FROM Accounts WHERE id='{safe_account_id}';"
            logger.info(f"🔍 Fetching account details for {vtiger_account_id}")
            accounts = await self.query(query)

            if accounts:
                logger.info(f"✅ Found account: {accounts[0].get('accountname', 'Unknown')}")
                return accounts[0]

            logger.warning(f"⚠️ No account found with ID {vtiger_account_id}")
            return None
        except Exception as e:
            logger.error(f"❌ Error fetching account: {e}")
            return None

    async def test_connection(self) -> bool:
        """
        Test vTiger connection using /me endpoint

        Returns:
            True if connection successful

        Raises:
            ValueError: if VTIGER_URL is not configured.
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured in .env")

        try:
            auth = self._get_auth()
            logger.info("🔑 Testing vTiger connection...")

            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.rest_endpoint}/me",
                    auth=auth
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"❌ vTiger connection failed: HTTP {response.status}: {error_text}")
                        return False

                    # vTiger returns text/json instead of application/json
                    text = await response.text()
                    data = json.loads(text)

                    if data.get('success'):
                        user_name = (data.get('result') or {}).get('user_name')
                        logger.info(f"✅ vTiger connection successful (user: {user_name})")
                        return True

                    logger.error(f"❌ vTiger API returned success=false: {data}")
                    return False
        except Exception as e:
            logger.error(f"❌ vTiger connection error: {e}")
            return False

    async def create_subscription(
        self,
        account_id: str,
        subject: str,
        startdate: str,
        generateinvoiceevery: str,
        subscriptionstatus: str = "Active",
        enddate: Optional[str] = None,
        products: Optional[List[Dict]] = None
    ) -> Dict:
        """
        Create a new subscription in vTiger

        Args:
            account_id: vTiger account ID (e.g., "3x123")
            subject: Subscription subject/name
            startdate: Start date (YYYY-MM-DD)
            generateinvoiceevery: Frequency ("Monthly", "Quarterly", "Yearly")
            subscriptionstatus: Status ("Active", "Cancelled", "Stopped")
            enddate: End date (YYYY-MM-DD), optional
            products: List of products [{"productid": "id", "quantity": 1, "listprice": 100}]

        Returns:
            The ``result`` object of the vTiger response.

        Raises:
            ValueError: if VTIGER_URL is not configured.
            Exception: on HTTP errors or when vTiger reports a failure
                (logged, then re-raised).
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            auth = self._get_auth()

            # Build subscription data
            subscription_data = {
                "account_id": account_id,
                "subject": subject,
                "startdate": startdate,
                "generateinvoiceevery": generateinvoiceevery,
                "subscriptionstatus": subscriptionstatus,
            }
            if enddate:
                subscription_data["enddate"] = enddate
            # Add products if provided
            if products:
                subscription_data["products"] = products

            async with aiohttp.ClientSession() as session:
                create_url = f"{self.rest_endpoint}/create"
                payload = {
                    "elementType": "Subscription",
                    "element": subscription_data
                }
                logger.info(f"📤 Creating subscription: {subject}")

                async with session.post(create_url, json=payload, auth=auth) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        raise Exception(f"HTTP {response.status}: {error_text}")

                    # vTiger returns text/json instead of application/json,
                    # which response.json() rejects; decode the text manually
                    # like the other methods of this class do.
                    data = json.loads(await response.text())
                    if data.get("success"):
                        result = data.get("result", {})
                        logger.info(f"✅ Created subscription: {result.get('id')}")
                        return result

                    error_msg = (data.get("error") or {}).get("message", "Unknown error")
                    raise Exception(f"vTiger API error: {error_msg}")
        except Exception as e:
            logger.error(f"❌ Error creating subscription: {e}")
            raise

    async def update_subscription(
        self,
        subscription_id: str,
        updates: Dict,
        line_items: Optional[List[Dict]] = None
    ) -> Dict:
        """
        Update a subscription in vTiger with optional line item price updates

        Args:
            subscription_id: vTiger subscription ID (e.g., "72x123")
            updates: Dictionary of fields to update (subject, startdate, etc.)
            line_items: Optional list of line items with updated prices
                Each item: {"productid": "6x123", "quantity": "3", "listprice": "299.00"}

        Returns:
            The ``result`` object of the vTiger response.

        Raises:
            ValueError: if VTIGER_URL is not configured.
            Exception: on HTTP errors or when vTiger reports a failure
                (logged, then re-raised).
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            auth = self._get_auth()

            async with aiohttp.ClientSession() as session:
                element_data = {"id": subscription_id, **updates}

                # For custom fields (cf_*), we need to retrieve first for context
                if any(k.startswith('cf_') for k in updates):
                    logger.info(f"📥 Retrieving subscription {subscription_id} for custom field update")
                    current_sub = await self.get_subscription(subscription_id)
                    if current_sub:
                        # Only include essential fields + custom field update
                        essential_fields = ['id', 'account_id', 'subject', 'startdate',
                                            'generateinvoiceevery', 'subscriptionstatus']
                        element_data = {
                            k: current_sub[k]
                            for k in essential_fields
                            if k in current_sub
                        }
                        element_data.update(updates)
                        element_data['id'] = subscription_id

                update_url = f"{self.rest_endpoint}/update"

                # Add LineItems if provided
                if line_items:
                    element_data["LineItems"] = line_items
                    logger.info(f"📝 Updating {len(line_items)} line items")

                payload = {
                    "elementType": "Subscription",
                    "element": element_data
                }
                logger.info(f"📤 Updating subscription {subscription_id}: {list(updates.keys())}")
                logger.debug(f"Payload: {json.dumps(payload, indent=2)}")

                async with session.post(update_url, json=payload, auth=auth) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"❌ vTiger HTTP error: {error_text[:500]}")
                        raise Exception(f"HTTP {response.status}: {error_text}")

                    text = await response.text()
                    data = json.loads(text)
                    if data.get("success"):
                        result = data.get("result", {})
                        logger.info(f"✅ Updated subscription: {subscription_id}")
                        return result

                    error_msg = (data.get("error") or {}).get("message", "Unknown error")
                    logger.error(f"❌ vTiger error response: {data.get('error')}")
                    raise Exception(f"vTiger API error: {error_msg}")
        except Exception as e:
            logger.error(f"❌ Error updating subscription: {e}")
            raise

    async def get_service_contracts(self, account_id: Optional[str] = None, active_only: bool = True) -> List[Dict]:
        """
        Fetch service contracts from vTiger

        Args:
            account_id: Optional - filter by account ID (e.g., "3x760")
            active_only: When True, only contracts with contract_status
                'Active' are requested.

        Returns:
            List of service contract records with account_id included
        """
        try:
            query_filters = []
            safe_account_id = self._sanitize_vtiger_id(account_id)
            if account_id and not safe_account_id:
                logger.warning("⚠️ Rejected unsafe account_id for service contract query")
                return []

            if safe_account_id:
                query_filters.append(f"sc_related_to='{safe_account_id}'")
            if active_only:
                query_filters.append("contract_status='Active'")

            where_clause = f" WHERE {' AND '.join(query_filters)}" if query_filters else ""
            query = f"SELECT * FROM ServiceContracts{where_clause};"

            if safe_account_id:
                logger.info(f"🔍 Fetching service contracts for account {safe_account_id} (active_only={active_only})")
            else:
                logger.info(f"🔍 Fetching all service contracts (active_only={active_only})")

            contracts = await self.query(query)
            logger.info(f"✅ Found {len(contracts)} service contracts")

            # Normalize fields used by the wizard
            for contract in contracts:
                if 'account_id' not in contract:
                    contract['account_id'] = (
                        contract.get('accountid')
                        or contract.get('cf_service_contracts_account')
                        or contract.get('sc_related_to')
                        or ""
                    )
                if 'contract_number' not in contract:
                    contract['contract_number'] = contract.get('contract_no', '')
                if not contract['account_id']:
                    logger.warning(
                        f"⚠️ Contract {contract.get('id')} has no account_id, filling null"
                    )
            return contracts
        except Exception as e:
            logger.error(f"❌ Error fetching service contracts: {e}")
            return []

    async def get_service_contract_cases(self, contract_id: str) -> List[Dict]:
        """
        Fetch cases linked to a service contract

        Args:
            contract_id: vTiger service contract ID (e.g., "75x123")

        Returns:
            List of case records linked to contract
        """
        safe_contract_id = self._sanitize_vtiger_id(contract_id)
        if not safe_contract_id:
            logger.warning("⚠️ Rejected unsafe contract_id for case query")
            return []

        try:
            # Query cases linked via service contract reference
            query = (
                "SELECT * FROM Cases WHERE "
                f"servicecontract_id='{safe_contract_id}' OR "
                f"parent_id='{safe_contract_id}';"
            )
            logger.info(f"🔍 Fetching cases for service contract {contract_id}")
            cases = await self.query(query)
            logger.info(f"✅ Found {len(cases)} cases for contract")
            return cases
        except Exception as e:
            logger.error(f"❌ Error fetching contract cases: {e}")
            return []

    async def get_service_contract_timelogs(self, contract_id: str) -> List[Dict]:
        """
        Fetch time logs linked to a service contract

        Timelogs related directly to the contract are preferred.  When there
        are none, timelogs related to the contract's cases are fetched
        instead (in chunks of 20 case IDs) and de-duplicated by id.

        Args:
            contract_id: vTiger service contract ID (e.g., "75x123")

        Returns:
            List of timelog records linked to contract
        """
        safe_contract_id = self._sanitize_vtiger_id(contract_id)
        if not safe_contract_id:
            logger.warning("⚠️ Rejected unsafe contract_id for timelog query")
            return []

        try:
            # Query timelogs linked via service contract reference
            query = (
                "SELECT * FROM Timelog WHERE "
                f"relatedto='{safe_contract_id}';"
            )
            logger.info(f"🔍 Fetching timelogs for service contract {contract_id}")
            timelogs = await self.query(query)

            # Fallback: timelogs may be linked to cases instead of the contract
            if not timelogs:
                cases = await self.get_service_contract_cases(safe_contract_id)
                # Case IDs come from the API response, but they are still
                # allow-listed before being embedded in the next query.
                case_ids = [
                    safe_id
                    for safe_id in (self._sanitize_vtiger_id(case.get('id')) for case in cases)
                    if safe_id
                ]
                if case_ids:
                    logger.info(
                        f" No direct timelogs found; querying by {len(case_ids)} case IDs"
                    )
                    timelog_map = {}
                    chunk_size = 20
                    for index in range(0, len(case_ids), chunk_size):
                        chunk = case_ids[index:index + chunk_size]
                        ids = "', '".join(chunk)
                        chunk_query = (
                            "SELECT * FROM Timelog WHERE "
                            f"relatedto IN ('{ids}');"
                        )
                        chunk_rows = await self.query(chunk_query)
                        for row in chunk_rows:
                            if row.get('id'):
                                timelog_map[row['id']] = row
                    timelogs = list(timelog_map.values())

            logger.info(f"✅ Found {len(timelogs)} timelogs for contract")
            return timelogs
        except Exception as e:
            logger.error(f"❌ Error fetching contract timelogs: {e}")
            return []

    async def get_service_contract_customers(self) -> List[Dict[str, str]]:
        """Fetch account list that has at least one service contract.

        Returns:
            Dicts with ``account_id`` and ``account_name``, sorted by name
            (case-insensitive).  Accounts whose name cannot be looked up use
            their id as name.
        """
        contracts = await self.get_service_contracts(active_only=False)
        account_ids = sorted({
            str(c.get("account_id"))
            for c in contracts
            if c.get("account_id")
        })
        if not account_ids:
            return []

        # Only allow-listed ids are embedded in the Accounts query.
        safe_ids = sorted({
            safe_id
            for safe_id in (self._sanitize_vtiger_id(aid) for aid in account_ids)
            if safe_id
        })

        customers: Dict[str, Dict[str, str]] = {}
        chunk_size = 20
        for i in range(0, len(safe_ids), chunk_size):
            chunk = safe_ids[i:i + chunk_size]
            ids = "', '".join(chunk)
            query = (
                "SELECT id, accountname FROM Accounts "
                f"WHERE id IN ('{ids}');"
            )
            rows = await self.query(query)
            for row in rows:
                account_id = row.get("id")
                if account_id:
                    customers[account_id] = {
                        "account_id": account_id,
                        "account_name": row.get("accountname") or account_id,
                    }

        # Fallback names for accounts not returned by Accounts query
        for account_id in account_ids:
            if account_id not in customers:
                customers[account_id] = {
                    "account_id": account_id,
                    "account_name": account_id,
                }

        return sorted(customers.values(), key=lambda x: x.get("account_name", "").lower())

    @staticmethod
    def _is_closed_case(status: Optional[str]) -> bool:
        """Return True when the (case-insensitive) status denotes a closed case."""
        if not status:
            return False
        normalized = status.strip().lower()
        return normalized in {"closed", "resolved", "done", "completed", "lukket"}

    @staticmethod
    def _first_non_empty(record: Dict[str, Any], keys: tuple[str, ...]) -> str:
        """Return the first non-blank value among ``keys`` as stripped text, or ""."""
        for key in keys:
            value = record.get(key)
            if value not in (None, ""):
                text = str(value).strip()
                if text:
                    return text
        return ""

    @staticmethod
    def _html_to_text(value: Optional[str]) -> str:
        """Convert HTML snippets from CRM fields into readable plain text."""
        if value in (None, ""):
            return ""
        text = str(value)
        # <br> becomes a newline; script/style blocks and all remaining tags
        # are replaced by a space.
        text = re.sub(r"(?i)<br\s*/?>", "\n", text)
        text = re.sub(r"(?is)<(script|style).*?>.*?</\1>", " ", text)
        text = re.sub(r"(?is)<[^>]+>", " ", text)
        text = html_lib.unescape(text)
        text = text.replace("\xa0", " ")
        # Collapse runs of spaces/tabs and limit blank lines to one.
        text = re.sub(r"[ \t]+", " ", text)
        text = re.sub(r"\n{3,}", "\n\n", text)
        return text.strip()

    @staticmethod
    def _extract_vtiger_record_id(raw_id: Optional[str]) -> Optional[str]:
        """Return the numeric record part of a vTiger id.

        ``"17x4321"`` yields ``"4321"``; an all-digit value is returned as is;
        anything else yields ``None``.  Only ASCII digits are accepted because
        the result is placed in a URL.
        """
        if not raw_id:
            return None
        text = str(raw_id).strip()
        if not text:
            return None
        if re.fullmatch(r"[0-9]+x[0-9]+", text):
            return text.split("x", 1)[1]
        if re.fullmatch(r"[0-9]+", text):
            return text
        return None

    def _build_vtiger_record_url(self, module: str, raw_id: Optional[str]) -> str:
        """Build a vTiger web URL for a record, or "" when it cannot be built.

        Cases and Timelog use the ``view/detail`` URL form; every other
        module uses the ``index.php`` detail URL.
        """
        if not self.base_url:
            return ""
        record_id = self._extract_vtiger_record_id(raw_id)
        if not record_id:
            return ""

        base = str(self.base_url).rstrip("/")
        if module == "Cases":
            return f"{base}/view/detail?module=Cases&id={record_id}&viewtype=summary"
        if module == "Timelog":
            return f"{base}/view/detail?module=Timelog&id={record_id}&viewtype=summary"
        return f"{base}/index.php?module={module}&view=Detail&record={record_id}"

    @staticmethod
    def _to_initials(value: Optional[str]) -> str:
        """Derive two-character upper-case initials from a name, or "--".

        Two or more words give the first letters of the first two words; a
        single word gives its first two characters.
        """
        if not value:
            return "--"
        text = str(value).strip()
        if not text:
            return "--"

        # Raw vTiger entity IDs are not human initials.
        if re.fullmatch(r"\d+x\d+", text):
            return "--"

        # Split on anything that is not a (Unicode) letter or digit so that
        # names such as "Søren Østergaard" are not broken at non-ASCII letters.
        tokens = [token for token in re.split(r"[\W_]+", text) if token]
        if not tokens:
            return "--"
        if len(tokens) == 1:
            return tokens[0][:2].upper()
        return f"{tokens[0][0]}{tokens[1][0]}".upper()

    async def _resolve_user_initials(self, user_refs: List[str]) -> Dict[str, str]:
        """Map user/group ids to initials by looking them up in vTiger.

        Each chunk of ids is looked up in ``Users`` first and then in
        ``Groups``; a group never overrides a user with the same id.  Ids
        that are not found are simply absent from the result.
        """
        initials_by_ref: Dict[str, str] = {}

        safe_ids = []
        for raw_ref in user_refs:
            safe_ref = self._sanitize_vtiger_id(raw_ref)
            if safe_ref:
                safe_ids.append(safe_ref)
        if not safe_ids:
            return initials_by_ref

        unique_ids = sorted(set(safe_ids))
        chunk_size = 20
        for index in range(0, len(unique_ids), chunk_size):
            chunk = unique_ids[index:index + chunk_size]
            id_list = "', '".join(chunk)

            rows = await self.query(
                "SELECT id, first_name, last_name, user_name FROM Users "
                f"WHERE id IN ('{id_list}');"
            )
            for row in rows:
                user_id = row.get("id")
                if not user_id:
                    continue
                full_name = " ".join(
                    part for part in [row.get("first_name"), row.get("last_name")] if part
                ).strip()
                source = full_name or row.get("user_name") or user_id
                initials_by_ref[user_id] = self._to_initials(source)

            # Some timelog owners are groups, not user records.
            group_rows = await self.query(
                "SELECT id, groupname FROM Groups "
                f"WHERE id IN ('{id_list}');"
            )
            for row in group_rows:
                group_id = row.get("id")
                if not group_id or group_id in initials_by_ref:
                    continue
                initials_by_ref[group_id] = self._to_initials(row.get("groupname") or group_id)

        return initials_by_ref

    async def _resolve_contact_names(self, contact_refs: List[str]) -> Dict[str, str]:
        """Map contact references to display names.

        References shaped like a vTiger id (``<digits>x<digits>``) are looked
        up in ``Contacts``; any other non-empty reference is assumed to
        already be a name and maps to itself.
        """
        names_by_ref: Dict[str, str] = {}

        safe_ids = []
        for raw_ref in contact_refs:
            if not raw_ref:
                continue
            text = str(raw_ref).strip()
            safe_ref = self._sanitize_vtiger_id(text)
            if safe_ref and re.match(r"^\d+x\d+$", safe_ref):
                safe_ids.append(safe_ref)
            elif text:
                names_by_ref[text] = text
        if not safe_ids:
            return names_by_ref

        unique_ids = sorted(set(safe_ids))
        chunk_size = 20
        for index in range(0, len(unique_ids), chunk_size):
            chunk = unique_ids[index:index + chunk_size]
            id_list = "', '".join(chunk)
            rows = await self.query(
                "SELECT id, firstname, lastname, salutationtype FROM Contacts "
                f"WHERE id IN ('{id_list}');"
            )
            for row in rows:
                contact_id = row.get("id")
                if not contact_id:
                    continue
                parts = [
                    str(row.get("salutationtype") or "").strip(),
                    str(row.get("firstname") or "").strip(),
                    str(row.get("lastname") or "").strip(),
                ]
                full_name = " ".join(part for part in parts if part)
                names_by_ref[contact_id] = full_name or contact_id

        return names_by_ref

    @staticmethod
    def _case_contact_ref(case: Dict[str, Any]) -> str:
        """Return the first populated contact reference of a case as stripped text, or ""."""
        return str(
            case.get("contact_id")
            or case.get("contactid")
            or case.get("parent_contact_id")
            or case.get("contactname")
            or case.get("cf_contact_person")
            or ""
        ).strip()

    @staticmethod
    def _timelog_user_ref(log: Dict[str, Any]) -> Any:
        """Return the first populated user reference of a timelog, or ""."""
        return (
            log.get("assigned_user_id")
            or log.get("modifiedby")
            or log.get("created_user_id")
            or ""
        )

    async def get_service_contract_report_data(self, account_id: str, contract_id: str) -> Dict[str, Any]:
        """Build report payload for selected customer + service contract.

        Returns:
            Dict with ``customer``, ``contract``, ``cases`` (each with its
            timelogs and hour totals) and ``summary``.  Timelogs whose related
            record is not one of the contract's cases are collected under a
            synthetic case with id ``"UNMAPPED"``.

        Raises:
            ValueError: if an id is invalid, the contract is not found, or the
                contract belongs to a different account.
        """
        safe_account_id = self._sanitize_vtiger_id(account_id)
        safe_contract_id = self._sanitize_vtiger_id(contract_id)
        if not safe_account_id or not safe_contract_id:
            raise ValueError("Invalid account_id or contract_id")

        contract_rows = await self.query(
            f"SELECT * FROM ServiceContracts WHERE id='{safe_contract_id}' LIMIT 1;"
        )
        if not contract_rows:
            raise ValueError("Service contract was not found")
        contract = contract_rows[0]

        contract_account_id = (
            contract.get("account_id")
            or contract.get("accountid")
            or contract.get("cf_service_contracts_account")
            or contract.get("sc_related_to")
            or ""
        )
        # If contract is linked to a different customer, return clear validation error.
        if contract_account_id and contract_account_id != safe_account_id:
            raise ValueError("Service contract does not belong to selected customer")

        account = await self.get_account_by_id(safe_account_id)
        account_name = (account or {}).get("accountname") or safe_account_id

        cases = await self.get_service_contract_cases(safe_contract_id)
        timelogs = await self.get_service_contract_timelogs(safe_contract_id)

        contact_ref_candidates = [
            ref for ref in (self._case_contact_ref(case) for case in cases) if ref
        ]
        contact_name_map = await self._resolve_contact_names(contact_ref_candidates)

        user_ref_candidates = [
            str(ref) for ref in (self._timelog_user_ref(log) for log in timelogs) if ref
        ]
        user_initials_map = await self._resolve_user_initials(user_ref_candidates)

        case_map: Dict[str, Dict[str, Any]] = {}
        report_cases: List[Dict[str, Any]] = []

        for case in cases:
            case_id = case.get("id")
            if not case_id:
                continue

            case_payload = {
                "id": case_id,
                "cc_number": self._first_non_empty(
                    case,
                    ("ticket_no", "case_no", "caseno", "ticketid", "cf_case_number")
                ) or case_id,
                "title": self._first_non_empty(
                    case,
                    ("ticket_title", "tickettitle", "subject", "title")
                ) or f"Case {case_id}",
                "description": self._html_to_text(
                    self._first_non_empty(
                        case,
                        ("description", "ticketdescription", "solution", "comments", "comment")
                    )
                ) or "Ingen beskrivelse",
                # Resolved contact name when available, otherwise the raw
                # reference text, otherwise "-".
                "contact_person": contact_name_map.get(
                    self._case_contact_ref(case),
                    self._first_non_empty(
                        case,
                        ("contactname", "contact_id", "contactid", "cf_contact_person")
                    ) or "-"
                ),
                "vtiger_url": self._build_vtiger_record_url("Cases", case_id),
                "status": case.get("ticketstatus") or case.get("status"),
                "priority": case.get("ticketpriorities") or case.get("priority"),
                "total_hours": Decimal("0"),
                "timelog_count": 0,
                "timelogs": [],
            }
            case_map[case_id] = case_payload
            report_cases.append(case_payload)

        unmatched_case_id = "UNMAPPED"

        for log in timelogs:
            related_case_id = (
                log.get("relatedto")
                or log.get("case_id")
                or log.get("ticket_id")
                or log.get("parent_id")
            )
            raw_user = self._timelog_user_ref(log)
            user_key = str(raw_user).strip()
            hours = self._extract_timelog_hours(log)

            timelog_payload = {
                "id": log.get("id") or "",
                "related_case_id": related_case_id,
                "worked_date": log.get("date_start") or log.get("createdtime") or log.get("modifiedtime"),
                "user_name": raw_user,
                "employee_initials": user_initials_map.get(user_key, self._to_initials(user_key)),
                "description": self._html_to_text(
                    self._first_non_empty(
                        log,
                        (
                            "description",
                            "subject",
                            "commentcontent",
                            "comments",
                            "details",
                            "note",
                            "notes",
                            "cf_timelog_description",
                        ),
                    )
                ) or "Ingen beskrivelse",
                "status": log.get("status"),
                "billable": log.get("billable"),
                "hours": hours,
                "vtiger_url": self._build_vtiger_record_url("Timelog", log.get("id")),
            }

            if related_case_id in case_map:
                target_case = case_map[related_case_id]
            else:
                # Lazily create the bucket for timelogs without a known case.
                if unmatched_case_id not in case_map:
                    unmatched_case = {
                        "id": unmatched_case_id,
                        "cc_number": "-",
                        "title": "Timelogs uden relateret case",
                        "description": "Ingen relateret sag fra servicekontrakten.",
                        "contact_person": "-",
                        "vtiger_url": "",
                        "status": "Unknown",
                        "priority": None,
                        "total_hours": Decimal("0"),
                        "timelog_count": 0,
                        "timelogs": [],
                    }
                    case_map[unmatched_case_id] = unmatched_case
                    report_cases.append(unmatched_case)
                target_case = case_map[unmatched_case_id]

            target_case["timelogs"].append(timelog_payload)
            target_case["timelog_count"] += 1
            target_case["total_hours"] = (target_case["total_hours"] + hours).quantize(Decimal("0.01"))

        total_timelogs = sum(int(case_item["timelog_count"]) for case_item in report_cases)
        total_hours = sum(
            (case_item["total_hours"] for case_item in report_cases),
            Decimal("0")
        ).quantize(Decimal("0.01"))
        # NOTE(review): the synthetic "UNMAPPED" bucket is part of
        # report_cases and is therefore counted in total_cases/open_cases -
        # confirm this is what the report consumer expects.
        closed_cases = sum(
            1 for case_item in report_cases
            if self._is_closed_case(case_item.get("status"))
        )
        open_cases = max(0, len(report_cases) - closed_cases)

        return {
            "customer": {
                "account_id": safe_account_id,
                "account_name": account_name,
            },
            "contract": {
                "id": safe_contract_id,
                "contract_number": contract.get("contract_number") or contract.get("contract_no") or "",
                "subject": contract.get("subject") or "",
                "contract_status": contract.get("contract_status"),
                "vtiger_url": self._build_vtiger_record_url("ServiceContracts", safe_contract_id),
            },
            "cases": report_cases,
            "summary": {
                "total_cases": len(report_cases),
                "open_cases": open_cases,
                "closed_cases": closed_cases,
                "total_timelogs": total_timelogs,
                "total_hours": total_hours,
            },
        }
# Module-wide VTigerService instance, created lazily on first request.
_vtiger_service = None


def get_vtiger_service() -> VTigerService:
    """Return the shared VTigerService, constructing it on first use."""
    global _vtiger_service
    if _vtiger_service is not None:
        return _vtiger_service
    _vtiger_service = VTigerService()
    return _vtiger_service