- Introduced Technician Dashboard V1 (tech_v1_overview.html) with KPI cards and new cases overview. - Implemented Technician Dashboard V2 (tech_v2_workboard.html) featuring a workboard layout for daily tasks and opportunities. - Developed Technician Dashboard V3 (tech_v3_table_focus.html) with a power table for detailed case management. - Created a dashboard selector page (technician_dashboard_selector.html) for easy navigation between dashboard versions. - Added user dashboard preferences migration (130_user_dashboard_preferences.sql) to store default dashboard paths. - Enhanced sag_sager table with assigned group ID (131_sag_assignment_group.sql) for better case management. - Updated sag_subscriptions table to include cancellation rules and billing dates (132_subscription_cancellation.sql, 134_subscription_billing_dates.sql). - Implemented subscription staging for CRM integration (136_simply_subscription_staging.sql). - Added a script to move time tracking section in detail view (move_time_section.py). - Created a test script for subscription processing (test_subscription_processing.py).
647 lines
26 KiB
Python
647 lines
26 KiB
Python
"""
vTiger Cloud CRM Integration Service
Handles subscription and sales order data retrieval
"""
|
||
import logging
|
||
import json
|
||
import aiohttp
|
||
from typing import List, Dict, Optional
|
||
from app.core.config import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class VTigerService:
    """Service for integrating with vTiger Cloud CRM via REST API.

    Connection settings are read from ``settings`` (``VTIGER_URL``,
    ``VTIGER_USERNAME``, ``VTIGER_API_KEY``). Every request uses HTTP Basic
    Auth and opens its own ``aiohttp.ClientSession``.

    Error-handling convention used by the methods below:
      * lookups (``query`` and the ``get_*`` helpers built on it) log the
        failure and return an empty list / ``None``;
      * ``create_subscription`` and ``update_subscription`` log and re-raise.
    """

    def __init__(self):
        """Read the vTiger settings and derive the REST endpoint from them."""
        self.base_url = getattr(settings, 'VTIGER_URL', None)
        self.username = getattr(settings, 'VTIGER_USERNAME', None)
        self.api_key = getattr(settings, 'VTIGER_API_KEY', None)

        # REST API endpoint. A trailing slash on the configured URL is dropped
        # so the resulting path never contains "//".
        if self.base_url:
            root = str(self.base_url).rstrip('/')
            self.rest_endpoint = f"{root}/restapi/v1/vtiger/default"
        else:
            self.rest_endpoint = None

        if not all([self.base_url, self.username, self.api_key]):
            logger.warning("⚠️ vTiger credentials not fully configured")

        # HTTP status of the most recent query() call; None when no response
        # was received (or before the first call).
        self.last_query_status: Optional[int] = None
        # Error payload of the most recent failed query() call, if any.
        self.last_query_error: Optional[Dict] = None

    def _get_auth(self):
        """Return the HTTP Basic Auth credentials for vTiger requests.

        Raises:
            ValueError: If ``VTIGER_API_KEY`` is not configured.
        """
        if not self.api_key:
            raise ValueError("VTIGER_API_KEY not configured")
        return aiohttp.BasicAuth(self.username, self.api_key)

    @staticmethod
    def _extract_error(text: str) -> Optional[Dict]:
        """Return the ``error`` member of a JSON response body, or None.

        Empty bodies, bodies that are not valid JSON and JSON documents that
        are not objects all yield ``None``.
        """
        if not text:
            return None
        try:
            parsed = json.loads(text)
        except ValueError:
            return None
        if isinstance(parsed, dict):
            return parsed.get('error')
        return None

    @staticmethod
    def _error_message(data: Dict) -> str:
        """Return a printable message for the ``error`` member of *data*.

        Falls back to "Unknown error" when the member is missing, empty, or a
        dict without a ``message`` key; a non-dict error is stringified.
        """
        error = data.get("error")
        if isinstance(error, dict):
            return error.get("message", "Unknown error")
        return str(error) if error else "Unknown error"

    async def query(self, query_string: str) -> List[Dict]:
        """
        Execute a query on vTiger REST API

        Updates ``last_query_status`` and ``last_query_error`` as a side
        effect so callers can inspect why an empty list came back.

        Args:
            query_string: SQL-like query (e.g., "SELECT * FROM Accounts;")

        Returns:
            List of records; empty on any failure (HTTP error, invalid JSON,
            ``success`` false, or an exception while sending the request).

        Raises:
            ValueError: If ``VTIGER_URL`` is not configured.
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            auth = self._get_auth()
            self.last_query_status = None
            self.last_query_error = None
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.rest_endpoint}/query",
                    params={"query": query_string},
                    auth=auth,
                ) as response:
                    text = await response.text()
                    self.last_query_status = response.status

                    if response.status != 200:
                        # Keep whatever error detail the body carries. This
                        # relies on the module-level json import: a local
                        # "import json" elsewhere in this function would turn
                        # the name into an unbound local on this path.
                        self.last_query_error = self._extract_error(text)
                        if response.status == 429:
                            logger.warning(f"⚠️ vTiger query rate-limited (HTTP {response.status})")
                        else:
                            logger.error(f"❌ vTiger query HTTP error {response.status}")
                            logger.error(f"Query: {query_string}")
                            logger.error(f"Response: {text[:500]}")
                        return []

                    # vTiger returns text/json instead of application/json,
                    # so the body is decoded by hand.
                    try:
                        data = json.loads(text)
                    except json.JSONDecodeError:
                        logger.error(f"❌ Invalid JSON in query response: {text[:200]}")
                        return []

                    if data.get('success'):
                        result = data.get('result', [])
                        logger.info(f"✅ Query returned {len(result)} records")
                        return result

                    self.last_query_error = data.get('error')
                    logger.error(f"❌ vTiger query failed: {self.last_query_error}")
                    return []
        except Exception as e:
            self.last_query_status = None
            self.last_query_error = {"message": str(e)}
            logger.error(f"❌ vTiger query error: {e}")
            return []

    async def get_account_by_id(self, account_id: str) -> Optional[Dict]:
        """
        Fetch a single account by ID from vTiger

        Args:
            account_id: vTiger account ID (e.g., "3x760")

        Returns:
            Account record or None (no ID given, no match, or an error)
        """
        if not account_id:
            logger.warning("⚠️ No account ID provided")
            return None

        try:
            query = f"SELECT * FROM Accounts WHERE id='{account_id}' LIMIT 1;"
            results = await self.query(query)

            if results:
                logger.info(f"✅ Found account {account_id}")
                return results[0]

            logger.warning(f"⚠️ No account found with ID {account_id}")
            return None

        except Exception as e:
            logger.error(f"❌ Error fetching account {account_id}: {e}")
            return None

    async def update_account(self, account_id: str, update_data: Dict) -> bool:
        """
        Update an account in vTiger

        The current record is fetched first and a few of its fields are sent
        along with the update.

        Args:
            account_id: vTiger account ID (e.g., "3x760")
            update_data: Dictionary of fields to update; its keys override the
                fields copied from the current record

        Returns:
            True if successful, False otherwise

        Raises:
            ValueError: If ``VTIGER_URL`` is not configured.
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            # Fetch current account first - vTiger requires modifiedtime for updates
            current_account = await self.get_account_by_id(account_id)
            if not current_account:
                logger.error(f"❌ Account {account_id} not found for update")
                return False

            auth = self._get_auth()

            # Build payload with current data + updates
            # Include essential fields for vTiger update validation
            payload = {
                'id': account_id,
                'accountname': current_account.get('accountname'),
                'assigned_user_id': current_account.get('assigned_user_id'),
                'modifiedtime': current_account.get('modifiedtime'),
                **update_data,
            }

            logger.info(f"🔄 vTiger update payload: {payload}")

            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.rest_endpoint}/update",
                    json=payload,
                    auth=auth,
                ) as response:
                    text = await response.text()

                    if response.status != 200:
                        logger.error(f"❌ vTiger update HTTP error {response.status}")
                        logger.error(f"Full response: {text[:2000]}")
                        return False

                    try:
                        data = json.loads(text)
                    except json.JSONDecodeError:
                        logger.error(f"❌ Invalid JSON in update response: {text[:200]}")
                        return False

                    if data.get('success'):
                        logger.info(f"✅ Updated vTiger account {account_id}")
                        return True

                    logger.error(f"❌ vTiger update failed: {data.get('error')}")
                    logger.error(f"Full response: {text}")
                    return False

        except Exception as e:
            logger.error(f"❌ Error updating vTiger account {account_id}: {e}")
            return False

    async def get_customer_sales_orders(self, vtiger_account_id: str) -> List[Dict]:
        """
        Fetch sales orders for a customer from vTiger

        Args:
            vtiger_account_id: vTiger account ID (e.g., "3x760")

        Returns:
            List of sales order records (empty if no ID given or on error)
        """
        if not vtiger_account_id:
            logger.warning("⚠️ No vTiger account ID provided")
            return []

        try:
            # Query for sales orders linked to this account
            query = f"SELECT * FROM SalesOrder WHERE account_id='{vtiger_account_id}';"

            logger.info(f"🔍 Fetching sales orders for vTiger account {vtiger_account_id}")
            orders = await self.query(query)

            logger.info(f"✅ Found {len(orders)} sales orders")
            return orders

        except Exception as e:
            logger.error(f"❌ Error fetching sales orders: {e}")
            return []

    async def get_customer_subscriptions(self, vtiger_account_id: str) -> List[Dict]:
        """
        Fetch subscriptions for a customer from vTiger

        Args:
            vtiger_account_id: vTiger account ID (e.g., "3x760")

        Returns:
            List of subscription records (empty if no ID given or on error)
        """
        if not vtiger_account_id:
            logger.warning("⚠️ No vTiger account ID provided")
            return []

        try:
            # Query for subscriptions linked to this account (note: module name is singular "Subscription")
            query = f"SELECT * FROM Subscription WHERE account_id='{vtiger_account_id}';"

            logger.info(f"🔍 Fetching subscriptions for vTiger account {vtiger_account_id}")
            subscriptions = await self.query(query)

            logger.info(f"✅ Found {len(subscriptions)} subscriptions")
            return subscriptions

        except Exception as e:
            logger.error(f"❌ Error fetching subscriptions: {e}")
            return []

    async def get_subscription(self, subscription_id: str) -> Optional[Dict]:
        """
        Fetch full subscription details with LineItems from vTiger

        Args:
            subscription_id: vTiger subscription ID (e.g., "72x123")

        Returns:
            Subscription record with LineItems array or None

        Raises:
            ValueError: If ``VTIGER_URL`` is not configured.
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            auth = self._get_auth()

            async with aiohttp.ClientSession() as session:
                # Pass the id as a query parameter so it is URL-encoded
                # rather than spliced into the URL verbatim.
                async with session.get(
                    f"{self.rest_endpoint}/retrieve",
                    params={"id": subscription_id},
                    auth=auth,
                ) as response:
                    if response.status != 200:
                        logger.error(f"❌ HTTP {response.status} retrieving subscription")
                        return None

                    data = json.loads(await response.text())
                    if data.get('success'):
                        logger.info(f"✅ Retrieved subscription {subscription_id}")
                        return data.get('result')

                    logger.error(f"❌ vTiger API error: {data.get('error')}")
                    return None

        except Exception as e:
            logger.error(f"❌ Error retrieving subscription: {e}")
            return None

    async def get_account(self, vtiger_account_id: str) -> Optional[Dict]:
        """
        Fetch account details from vTiger

        Args:
            vtiger_account_id: vTiger account ID (e.g., "3x760")

        Returns:
            Account record with BMC Låst field (cf_accounts_bmclst) or None
        """
        if not vtiger_account_id:
            logger.warning("⚠️ No vTiger account ID provided")
            return None

        try:
            # Query for account by ID
            query = f"SELECT * FROM Accounts WHERE id='{vtiger_account_id}';"

            logger.info(f"🔍 Fetching account details for {vtiger_account_id}")
            accounts = await self.query(query)

            if accounts:
                logger.info(f"✅ Found account: {accounts[0].get('accountname', 'Unknown')}")
                return accounts[0]

            logger.warning(f"⚠️ No account found with ID {vtiger_account_id}")
            return None

        except Exception as e:
            logger.error(f"❌ Error fetching account: {e}")
            return None

    async def test_connection(self) -> bool:
        """
        Test vTiger connection using /me endpoint

        Returns:
            True if connection successful

        Raises:
            ValueError: If ``VTIGER_URL`` is not configured.
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured in .env")

        try:
            auth = self._get_auth()
            logger.info("🔑 Testing vTiger connection...")

            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.rest_endpoint}/me",
                    auth=auth,
                ) as response:
                    text = await response.text()

                    if response.status != 200:
                        logger.error(f"❌ vTiger connection failed: HTTP {response.status}: {text}")
                        return False

                    # vTiger returns text/json instead of application/json
                    data = json.loads(text)

                    if data.get('success'):
                        user_name = data['result'].get('user_name')
                        logger.info(f"✅ vTiger connection successful (user: {user_name})")
                        return True

                    logger.error(f"❌ vTiger API returned success=false: {data}")
                    return False

        except Exception as e:
            logger.error(f"❌ vTiger connection error: {e}")
            return False

    async def create_subscription(
        self,
        account_id: str,
        subject: str,
        startdate: str,
        generateinvoiceevery: str,
        subscriptionstatus: str = "Active",
        enddate: Optional[str] = None,
        products: Optional[List[Dict]] = None
    ) -> Dict:
        """
        Create a new subscription in vTiger

        Args:
            account_id: vTiger account ID (e.g., "3x123")
            subject: Subscription subject/name
            startdate: Start date (YYYY-MM-DD)
            generateinvoiceevery: Frequency ("Monthly", "Quarterly", "Yearly")
            subscriptionstatus: Status ("Active", "Cancelled", "Stopped")
            enddate: End date (YYYY-MM-DD), optional
            products: List of products [{"productid": "id", "quantity": 1, "listprice": 100}]

        Returns:
            The ``result`` member of the vTiger response.

        Raises:
            ValueError: If ``VTIGER_URL`` is not configured.
            Exception: On HTTP errors or when vTiger reports failure; the
                error is logged and re-raised.
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            auth = self._get_auth()

            # Build subscription data
            subscription_data = {
                "account_id": account_id,
                "subject": subject,
                "startdate": startdate,
                "generateinvoiceevery": generateinvoiceevery,
                "subscriptionstatus": subscriptionstatus,
            }

            if enddate:
                subscription_data["enddate"] = enddate

            # Add products if provided
            if products:
                subscription_data["products"] = products

            payload = {
                "elementType": "Subscription",
                "element": subscription_data,
            }

            async with aiohttp.ClientSession() as session:
                create_url = f"{self.rest_endpoint}/create"

                logger.info(f"📤 Creating subscription: {subject}")

                async with session.post(create_url, json=payload, auth=auth) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        raise Exception(f"HTTP {response.status}: {error_text}")

                    # vTiger returns text/json instead of application/json,
                    # which response.json() rejects by default; decode the
                    # text body like the other methods do.
                    data = json.loads(await response.text())
                    if data.get("success"):
                        result = data.get("result", {})
                        logger.info(f"✅ Created subscription: {result.get('id')}")
                        return result

                    raise Exception(f"vTiger API error: {self._error_message(data)}")

        except Exception as e:
            logger.error(f"❌ Error creating subscription: {e}")
            raise

    async def update_subscription(
        self,
        subscription_id: str,
        updates: Dict,
        line_items: Optional[List[Dict]] = None
    ) -> Dict:
        """
        Update a subscription in vTiger with optional line item price updates

        Args:
            subscription_id: vTiger subscription ID (e.g., "72x123")
            updates: Dictionary of fields to update (subject, startdate, etc.)
            line_items: Optional list of line items with updated prices
                Each item: {"productid": "6x123", "quantity": "3", "listprice": "299.00"}

        Returns:
            The ``result`` member of the vTiger response.

        Raises:
            ValueError: If ``VTIGER_URL`` is not configured.
            Exception: On HTTP errors or when vTiger reports failure; the
                error is logged and re-raised.
        """
        if not self.rest_endpoint:
            raise ValueError("VTIGER_URL not configured")

        try:
            auth = self._get_auth()

            # Default element: just the id plus the requested changes.
            element_data = {"id": subscription_id, **updates}

            # For custom fields (cf_*), we need to retrieve first for context
            if any(k.startswith('cf_') for k in updates):
                logger.info(f"📥 Retrieving subscription {subscription_id} for custom field update")
                current_sub = await self.get_subscription(subscription_id)
                if current_sub:
                    # Only include essential fields + custom field update
                    essential_fields = ['id', 'account_id', 'subject', 'startdate',
                                        'generateinvoiceevery', 'subscriptionstatus']
                    element_data = {
                        k: current_sub[k]
                        for k in essential_fields
                        if k in current_sub
                    }
                    element_data.update(updates)
                    element_data['id'] = subscription_id

            # Add LineItems if provided
            if line_items:
                element_data["LineItems"] = line_items
                logger.info(f"📝 Updating {len(line_items)} line items")

            payload = {
                "elementType": "Subscription",
                "element": element_data,
            }

            logger.info(f"📤 Updating subscription {subscription_id}: {list(updates.keys())}")
            logger.debug(f"Payload: {json.dumps(payload, indent=2)}")

            async with aiohttp.ClientSession() as session:
                update_url = f"{self.rest_endpoint}/update"

                async with session.post(update_url, json=payload, auth=auth) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"❌ vTiger HTTP error: {error_text[:500]}")
                        raise Exception(f"HTTP {response.status}: {error_text}")

                    data = json.loads(await response.text())
                    if data.get("success"):
                        result = data.get("result", {})
                        logger.info(f"✅ Updated subscription: {subscription_id}")
                        return result

                    logger.error(f"❌ vTiger error response: {data.get('error')}")
                    raise Exception(f"vTiger API error: {self._error_message(data)}")

        except Exception as e:
            logger.error(f"❌ Error updating subscription: {e}")
            raise

    async def get_service_contracts(self, account_id: Optional[str] = None) -> List[Dict]:
        """
        Fetch service contracts from vTiger

        Only contracts whose ``contract_status`` is 'Active' are requested.

        Args:
            account_id: Optional - filter by account ID (e.g., "3x760")

        Returns:
            List of service contract records with account_id included
            (empty on error)
        """
        try:
            if account_id:
                query = (
                    "SELECT * FROM ServiceContracts "
                    f"WHERE sc_related_to='{account_id}' AND contract_status='Active';"
                )
                logger.info(f"🔍 Fetching active service contracts for account {account_id}")
            else:
                query = "SELECT * FROM ServiceContracts WHERE contract_status='Active';"
                logger.info("🔍 Fetching all active service contracts")

            contracts = await self.query(query)
            logger.info(f"✅ Found {len(contracts)} service contracts")

            # Normalize fields used by the wizard
            for contract in contracts:
                if 'account_id' not in contract:
                    # First non-empty candidate wins; "" when none is set.
                    contract['account_id'] = (
                        contract.get('accountid')
                        or contract.get('cf_service_contracts_account')
                        or contract.get('sc_related_to')
                        or ""
                    )

                if 'contract_number' not in contract:
                    contract['contract_number'] = contract.get('contract_no', '')

                if not contract['account_id']:
                    logger.warning(
                        f"⚠️ Contract {contract.get('id')} has no account_id, filling null"
                    )

            return contracts

        except Exception as e:
            logger.error(f"❌ Error fetching service contracts: {e}")
            return []

    async def get_service_contract_cases(self, contract_id: str) -> List[Dict]:
        """
        Fetch cases linked to a service contract

        Args:
            contract_id: vTiger service contract ID (e.g., "75x123")

        Returns:
            List of case records linked to contract (empty on error)
        """
        try:
            # Query cases linked via service contract reference
            query = (
                "SELECT * FROM Cases WHERE "
                f"servicecontract_id='{contract_id}' OR "
                f"parent_id='{contract_id}';"
            )
            logger.info(f"🔍 Fetching cases for service contract {contract_id}")

            cases = await self.query(query)
            logger.info(f"✅ Found {len(cases)} cases for contract")
            return cases

        except Exception as e:
            logger.error(f"❌ Error fetching contract cases: {e}")
            return []

    async def get_service_contract_timelogs(self, contract_id: str) -> List[Dict]:
        """
        Fetch time logs linked to a service contract

        If no time log references the contract directly, the time logs of the
        contract's cases are returned instead.

        Args:
            contract_id: vTiger service contract ID (e.g., "75x123")

        Returns:
            List of timelog records linked to contract (empty on error)
        """
        try:
            # Query timelogs linked via service contract reference
            query = (
                "SELECT * FROM Timelog WHERE "
                f"relatedto='{contract_id}';"
            )
            logger.info(f"🔍 Fetching timelogs for service contract {contract_id}")

            timelogs = await self.query(query)

            # Fallback: timelogs may be linked to cases instead of the contract
            if not timelogs:
                cases = await self.get_service_contract_cases(contract_id)
                case_ids = [case.get('id') for case in cases if case.get('id')]

                if case_ids:
                    logger.info(
                        f"ℹ️ No direct timelogs found; querying by {len(case_ids)} case IDs"
                    )
                    # Keyed by timelog id so a row returned by more than one
                    # chunk is kept once.
                    timelog_map = {}
                    chunk_size = 20

                    for index in range(0, len(case_ids), chunk_size):
                        chunk = case_ids[index:index + chunk_size]
                        ids = "', '".join(chunk)
                        chunk_query = (
                            "SELECT * FROM Timelog WHERE "
                            f"relatedto IN ('{ids}');"
                        )
                        chunk_rows = await self.query(chunk_query)
                        for row in chunk_rows:
                            if row.get('id'):
                                timelog_map[row['id']] = row

                    timelogs = list(timelog_map.values())

            logger.info(f"✅ Found {len(timelogs)} timelogs for contract")
            return timelogs

        except Exception as e:
            logger.error(f"❌ Error fetching contract timelogs: {e}")
            return []
|
||
|
||
|
||
# Module-wide shared instance; remains None until the accessor is first called
_vtiger_service = None


def get_vtiger_service() -> VTigerService:
    """Return the shared VTigerService, instantiating it on first use."""
    global _vtiger_service
    if _vtiger_service is not None:
        return _vtiger_service
    _vtiger_service = VTigerService()
    return _vtiger_service
|