docs: Create vTiger & Simply-CRM integration setup guide with credential requirements feat: Implement ticket system enhancements including relations, calendar events, templates, and AI suggestions refactor: Update ticket system migration to include audit logging and enhanced email metadata
403 lines
14 KiB
Python
403 lines
14 KiB
Python
"""
|
|
E-conomic Export Service for Ticket System
|
|
Exports billable worklog entries to e-conomic as invoice lines
|
|
|
|
🚨 SAFETY MODES (inherited from EconomicService):
|
|
- TICKET_ECONOMIC_READ_ONLY: Blocks ALL write operations when True
|
|
- TICKET_ECONOMIC_DRY_RUN: Logs operations but doesn't send to e-conomic when True
|
|
- TICKET_ECONOMIC_AUTO_EXPORT: Enable automatic export on worklog approval
|
|
|
|
Integration Pattern:
|
|
1. Worklog entry marked "billable" → Export to e-conomic
|
|
2. Create invoice draft with worklog line items
|
|
3. Store economic_invoice_number in tticket_worklog
|
|
4. Mark worklog as "billed" after successful export
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Optional
|
|
from datetime import date, datetime
|
|
from decimal import Decimal
|
|
|
|
from app.core.database import execute_query, execute_update, execute_query_single
|
|
from app.core.config import settings
|
|
from app.services.economic_service import EconomicService
|
|
from psycopg2.extras import Json
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TicketEconomicExportService:
    """
    Export billable ticket worklog entries to e-conomic as draft invoices.

    Safety flags are read from ``settings`` when the service is constructed:

    - ``TICKET_ECONOMIC_READ_ONLY`` (defaults to True when unset): blocks exports.
    - ``TICKET_ECONOMIC_DRY_RUN`` (defaults to True when unset): blocks exports
      and makes ``_create_economic_invoice`` return a mock response.
    - ``TICKET_ECONOMIC_AUTO_EXPORT`` (defaults to False when unset): stored on
      the instance; not consulted by any method of this class.
    """

    # Rate multiplied with ``hours`` in SQL to produce each row's ``amount``.
    DEFAULT_HOURLY_RATE = 850
    # Product number used for every invoice line.
    DEFAULT_PRODUCT_NUMBER = 'SUPPORT'
    # Line description used when a worklog entry has no description.
    DEFAULT_WORK_DESCRIPTION = 'Support arbejde'
    DEFAULT_VAT_ZONE_NUMBER = 1  # Denmark
    DEFAULT_PAYMENT_TERMS_NUMBER = 1
    DEFAULT_LAYOUT_NUMBER = 19  # Default layout
    HOURS_UNIT_NUMBER = 1  # Hours
    # Placeholder invoice number returned by the dry-run mock response.
    DRY_RUN_INVOICE_NUMBER = 999999
    MAX_PRODUCT_NUMBER_LENGTH = 25
    MAX_LINE_DESCRIPTION_LENGTH = 250

    def __init__(self):
        """Create the e-conomic client, read the safety flags and log the active mode."""
        self.economic = EconomicService()
        # Missing settings fall back to the safe values (read-only / dry-run on).
        self.read_only = getattr(settings, 'TICKET_ECONOMIC_READ_ONLY', True)
        self.dry_run = getattr(settings, 'TICKET_ECONOMIC_DRY_RUN', True)
        self.auto_export = getattr(settings, 'TICKET_ECONOMIC_AUTO_EXPORT', False)

        # Log safety status
        if self.read_only:
            logger.warning("🔒 TICKET E-CONOMIC READ-ONLY MODE ENABLED")
        elif self.dry_run:
            logger.warning("🏃 TICKET E-CONOMIC DRY-RUN MODE ENABLED")
        else:
            logger.warning("⚠️ TICKET E-CONOMIC WRITE MODE ACTIVE")

    def _check_export_permission(self, operation: str) -> bool:
        """
        Check if export operations are allowed.

        Args:
            operation: Human-readable description used in the log messages.

        Returns:
            False when read-only or dry-run mode is active, True otherwise.
        """
        if self.read_only:
            logger.error(f"🚫 BLOCKED: {operation} - TICKET_ECONOMIC_READ_ONLY=true")
            logger.error("To enable: Set TICKET_ECONOMIC_READ_ONLY=false in .env")
            return False

        if self.dry_run:
            logger.warning(f"🏃 DRY-RUN: {operation} - TICKET_ECONOMIC_DRY_RUN=true")
            logger.warning("To execute: Set TICKET_ECONOMIC_DRY_RUN=false in .env")
            return False

        logger.warning(f"⚠️ EXECUTING: {operation} - Will create in e-conomic")
        return True

    async def export_billable_worklog_batch(
        self,
        customer_id: int,
        worklog_ids: Optional[List[int]] = None,
        date_from: Optional[date] = None,
        date_to: Optional[date] = None
    ) -> Dict:
        """
        Export billable worklog entries to e-conomic as draft invoice.

        Args:
            customer_id: Customer ID for the invoice
            worklog_ids: Specific worklog entry IDs to export (optional)
            date_from: Start date filter (optional)
            date_to: End date filter (optional)

        Returns:
            Dict whose ``status`` is ``'no_entries'`` (nothing to export),
            ``'blocked'`` (a safety mode is active) or ``'exported'``
            (with invoice number, totals and per-entry summary).

        Raises:
            ValueError: The customer does not exist or has no e-conomic mapping.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            logger.info(f"📦 Starting worklog export for customer {customer_id}")

            # Get customer e-conomic info
            customer = await self._get_customer_economic_info(customer_id)
            if not customer:
                raise ValueError(f"Customer {customer_id} not found or missing e-conomic mapping")

            # Get billable worklog entries
            worklogs = await self._get_billable_worklog(
                customer_id, worklog_ids, date_from, date_to
            )

            if not worklogs:
                logger.info("✅ No billable worklog entries found")
                return {
                    'status': 'no_entries',
                    'exported_count': 0
                }

            logger.info(f"📋 Found {len(worklogs)} billable entries")

            # Safety check
            if not self._check_export_permission(f"Export {len(worklogs)} worklog entries"):
                return {
                    'status': 'blocked',
                    'reason': 'Safety mode enabled',
                    'read_only': self.read_only,
                    'dry_run': self.dry_run,
                    'entries': len(worklogs)
                }

            # Create invoice in e-conomic
            invoice_result = await self._create_economic_invoice(customer, worklogs)
            invoice_number = invoice_result.get('draftInvoiceNumber')

            # Update worklog entries with invoice number. A mock dry-run result
            # must never stamp real rows with the placeholder invoice number.
            if not invoice_result.get('dryRun'):
                await self._mark_worklogs_as_billed(
                    worklog_ids=[w['id'] for w in worklogs],
                    economic_invoice_number=invoice_number
                )

            logger.info(f"✅ Successfully exported {len(worklogs)} entries to e-conomic")

            return {
                'status': 'exported',
                'exported_count': len(worklogs),
                'invoice_number': invoice_number,
                'total_hours': sum(float(w['hours']) for w in worklogs),
                'total_amount': sum(float(w['amount']) for w in worklogs),
                'entries': [
                    {
                        'id': w['id'],
                        'ticket_number': w['ticket_number'],
                        'hours': float(w['hours'])
                    }
                    for w in worklogs
                ]
            }

        except Exception as e:
            # logger.exception keeps the traceback that logger.error would drop.
            logger.exception(f"❌ Failed to export worklog: {e}")
            raise

    async def _get_customer_economic_info(self, customer_id: int) -> Optional[Dict]:
        """
        Get customer e-conomic mapping information.

        Returns:
            The customer row (id, name, email, economic_customer_number,
            payment_terms_number, address, postal_code, city), or None when the
            customer does not exist or has no ``economic_customer_number``.
        """
        query = """
            SELECT
                id,
                name,
                email,
                economic_customer_number,
                payment_terms_number,
                address,
                postal_code,
                city
            FROM customers
            WHERE id = %s
        """

        customer = execute_query_single(query, (customer_id,))

        if not customer:
            logger.error(f"❌ Customer {customer_id} not found")
            return None

        if not customer.get('economic_customer_number'):
            logger.error(f"❌ Customer {customer_id} missing economic_customer_number")
            return None

        return customer

    async def _get_billable_worklog(
        self,
        customer_id: int,
        worklog_ids: Optional[List[int]] = None,
        date_from: Optional[date] = None,
        date_to: Optional[date] = None
    ) -> List[Dict]:
        """
        Get billable worklog entries ready for export.

        Selects rows with status 'billable', billing method 'invoice' and no
        ``billed_at`` for the given customer, ordered by work date and
        creation time. ``amount`` is computed as hours * DEFAULT_HOURLY_RATE.

        Args:
            customer_id: Customer whose tickets are searched.
            worklog_ids: Restrict to these IDs. None or an empty list applies
                no ID filter.
            date_from: Inclusive lower bound on ``work_date`` (optional).
            date_to: Inclusive upper bound on ``work_date`` (optional).

        Returns:
            List of worklog dicts with ticket and customer info
        """
        query = """
            SELECT
                w.id,
                w.ticket_id,
                w.work_date,
                w.hours,
                w.work_type,
                w.description,
                w.billing_method,
                t.ticket_number,
                t.subject AS ticket_subject,
                t.customer_id,
                c.economic_customer_number,
                (w.hours * %s) AS amount
            FROM tticket_worklog w
            INNER JOIN tticket_tickets t ON t.id = w.ticket_id
            INNER JOIN customers c ON c.id = t.customer_id
            WHERE w.status = 'billable'
            AND w.billing_method = 'invoice'
            AND w.billed_at IS NULL
            AND t.customer_id = %s
        """

        # Order matters: the rate placeholder precedes the customer placeholder.
        params = [self.DEFAULT_HOURLY_RATE, customer_id]

        if worklog_ids:
            query += " AND w.id = ANY(%s)"
            params.append(worklog_ids)

        if date_from:
            query += " AND w.work_date >= %s"
            params.append(date_from)

        if date_to:
            query += " AND w.work_date <= %s"
            params.append(date_to)

        query += " ORDER BY w.work_date ASC, w.created_at ASC"

        return execute_query(query, tuple(params))

    def _build_invoice_line(self, worklog: Dict) -> Dict:
        """Build one e-conomic invoice line dict from a worklog row."""
        ticket_ref = f"[{worklog['ticket_number']}] {worklog['ticket_subject']}"
        # `or` also covers a NULL description, which arrives as None.
        work_desc = worklog.get('description') or self.DEFAULT_WORK_DESCRIPTION
        line_text = f"{ticket_ref}\n{work_desc}\n{worklog['work_date']}"

        hours = float(worklog['hours'])
        amount = float(worklog['amount'])
        # A zero-hour entry cannot derive its price by division; use the rate
        # the amount was computed from instead of raising ZeroDivisionError.
        unit_price = amount / hours if hours else float(self.DEFAULT_HOURLY_RATE)

        return {
            "product": {
                "productNumber": self.DEFAULT_PRODUCT_NUMBER[:self.MAX_PRODUCT_NUMBER_LENGTH]
            },
            "description": line_text[:self.MAX_LINE_DESCRIPTION_LENGTH],
            "quantity": hours,
            "unitNetPrice": unit_price,
            "unit": {
                "unitNumber": self.HOURS_UNIT_NUMBER
            }
        }

    def _build_invoice_payload(self, customer: Dict, worklogs: List[Dict]) -> Dict:
        """Build the draft-invoice payload dict for a customer and its worklog rows."""
        return {
            "date": date.today().isoformat(),
            "customer": {
                "customerNumber": customer['economic_customer_number']
            },
            "recipient": {
                "name": customer['name'],
                # NULL columns arrive as None; send empty strings instead.
                "address": customer.get('address') or '',
                "zip": customer.get('postal_code') or '',
                "city": customer.get('city') or '',
                "vatZone": {
                    "vatZoneNumber": self.DEFAULT_VAT_ZONE_NUMBER
                }
            },
            "paymentTerms": {
                "paymentTermsNumber": (
                    customer.get('payment_terms_number')
                    or self.DEFAULT_PAYMENT_TERMS_NUMBER
                )
            },
            "layout": {
                "layoutNumber": self.DEFAULT_LAYOUT_NUMBER
            },
            "lines": [self._build_invoice_line(worklog) for worklog in worklogs]
        }

    async def _create_economic_invoice(self, customer: Dict, worklogs: List[Dict]) -> Dict:
        """
        Create draft invoice in e-conomic with worklog line items.

        Args:
            customer: Customer dict with e-conomic mapping
            worklogs: List of billable worklog entries

        Returns:
            e-conomic invoice response dict, or a mock response flagged with
            ``'dryRun': True`` when dry-run mode is active.

        Raises:
            Exception: Failures from the e-conomic client are logged and re-raised.
        """
        invoice_data = self._build_invoice_payload(customer, worklogs)

        logger.info(f"📄 Creating e-conomic invoice with {len(invoice_data['lines'])} lines")
        logger.info(f"📊 Invoice payload: {invoice_data}")

        # DRY-RUN: Just return mock response
        if self.dry_run:
            logger.warning("🏃 DRY-RUN: Would create invoice in e-conomic")
            mock_total = sum(float(w['amount']) for w in worklogs)
            return {
                'draftInvoiceNumber': self.DRY_RUN_INVOICE_NUMBER,
                'grossAmount': mock_total,
                'netAmount': mock_total,
                'vatAmount': 0,
                'dryRun': True
            }

        # REAL EXECUTION: Create in e-conomic
        try:
            result = await self.economic.create_draft_invoice(invoice_data)
            logger.info(f"✅ Created e-conomic draft invoice: {result.get('draftInvoiceNumber')}")
            return result
        except Exception as e:
            logger.exception(f"❌ Failed to create e-conomic invoice: {e}")
            raise

    async def _mark_worklogs_as_billed(
        self,
        worklog_ids: List[int],
        economic_invoice_number: Optional[int]
    ) -> None:
        """
        Mark worklog entries as billed with e-conomic reference.

        Sets ``billed_at`` and ``updated_at`` to the current timestamp and
        stores the invoice number. Does nothing for an empty ID list.

        Args:
            worklog_ids: List of worklog entry IDs
            economic_invoice_number: e-conomic invoice number
        """
        if not worklog_ids:
            return

        query = """
            UPDATE tticket_worklog
            SET billed_at = CURRENT_TIMESTAMP,
                economic_invoice_number = %s,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ANY(%s)
        """

        execute_update(query, (economic_invoice_number, worklog_ids))

        logger.info(f"✅ Marked {len(worklog_ids)} worklog entries as billed")

    async def get_export_preview(
        self,
        customer_id: int,
        worklog_ids: Optional[List[int]] = None,
        date_from: Optional[date] = None,
        date_to: Optional[date] = None
    ) -> Dict:
        """
        Preview what would be exported without actually exporting.

        Args:
            Same as export_billable_worklog_batch

        Returns:
            Dict with ``status`` ``'preview'`` plus customer info, totals and
            entries, or ``status`` ``'error'`` with an ``error`` message. This
            method does not raise; failures are logged and returned as errors.
        """
        try:
            customer = await self._get_customer_economic_info(customer_id)
            if not customer:
                return {
                    'status': 'error',
                    'error': 'Customer not found or missing e-conomic mapping'
                }

            worklogs = await self._get_billable_worklog(
                customer_id, worklog_ids, date_from, date_to
            )

            total_hours = sum(float(w['hours']) for w in worklogs)
            total_amount = sum(float(w['amount']) for w in worklogs)

            return {
                'status': 'preview',
                'customer_id': customer_id,
                'customer_name': customer['name'],
                'economic_customer_number': customer['economic_customer_number'],
                'entry_count': len(worklogs),
                'total_hours': float(total_hours),
                'total_amount': float(total_amount),
                'entries': [
                    {
                        'id': w['id'],
                        'ticket_number': w['ticket_number'],
                        'work_date': w['work_date'].strftime('%Y-%m-%d'),
                        'hours': float(w['hours']),
                        'amount': float(w['amount']),
                        'description': w['description']
                    }
                    for w in worklogs
                ]
            }

        except Exception as e:
            logger.exception(f"❌ Failed to generate export preview: {e}")
            return {
                'status': 'error',
                'error': str(e)
            }
|
|
|
|
|
|
# Singleton instance
|
|
# Module-level shared instance, created when this module is imported.
# Constructing it runs __init__, which instantiates EconomicService and logs
# the active safety mode (read-only / dry-run / write).
ticket_economic_service = TicketEconomicExportService()
|