"""
Email Workflow Service

Executes automated workflows based on email classification.
Inspired by OmniSync architecture, adapted for BMC Hub.
"""

import logging
from typing import Dict, List, Optional, Any
from datetime import datetime, date
import re
import json
import hashlib
import shutil
import io
import html
from pathlib import Path
from decimal import Decimal
from uuid import uuid4

from app.core.database import execute_query, execute_insert, execute_update, table_has_column
from app.core.config import settings
from app.services.email_activity_logger import email_activity_logger

logger = logging.getLogger(__name__)


class EmailWorkflowService:
    """Orchestrates workflow execution for classified emails"""

    def __init__(self):
        self.enabled = getattr(settings, 'EMAIL_WORKFLOWS_ENABLED', True)

    HELPDESK_SKIP_CLASSIFICATIONS = {
        'invoice',
        'order_confirmation',
        'freight_note',
        'time_confirmation',
        'newsletter',
        'spam',
        'bankruptcy',
        'recording'
    }

    _SCAN_TOKEN_PATTERN = re.compile(r'\bBMCSCAN-[A-Z0-9-]{10,100}\b', re.IGNORECASE)
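    # Illustrative match (hypothetical token value): "Scan BMCSCAN-7F3A9C41-0042 done"
    # contains a hit, since the token body is 10-100 chars of A-Z/0-9/'-';
    # matching is case-insensitive.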

    async def execute_workflows(self, email_data: Dict) -> Dict:
        """
        Execute all matching workflows for an email

        Args:
            email_data: Email dict with classification, confidence_score, id, etc.

        Returns:
            Dict with status, workflow counters and per-workflow details
        """
        if not self.enabled:
            logger.info("⏭️ Workflows disabled")
            return {'status': 'disabled', 'workflows_executed': 0}

        email_id = email_data.get('id')
        classification = (email_data.get('classification') or '').strip().lower()
        confidence = email_data.get('confidence_score', 0.0)
        has_hint = self.has_helpdesk_routing_hint(email_data)

        if not email_id:
            logger.warning("⚠️ Cannot execute workflows: missing email_id")
            return {'status': 'skipped', 'reason': 'missing_data'}

        if not classification:
            if has_hint:
                classification = 'general'
                email_data['classification'] = classification
            else:
                logger.warning("⚠️ Cannot execute workflows: missing classification")
                return {'status': 'skipped', 'reason': 'missing_data'}

        logger.info(f"🔄 Finding workflows for classification: {classification} (confidence: {confidence})")

        results = {
            'status': 'executed',
            'workflows_executed': 0,
            'workflows_succeeded': 0,
            'workflows_failed': 0,
            'details': []
        }
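        # The counters above are cumulative across the special system workflows
        # and the DB-defined workflows; 'details' collects one result dict per
        # executed workflow.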

        # Special System Workflow: Bankruptcy Analysis
        # Parses Statstidende emails for CVR numbers to link to customers
        if classification == 'bankruptcy':
            sys_result = await self._handle_bankruptcy_analysis(email_data)
            results['details'].append(sys_result)

            if sys_result['status'] == 'completed':
                results['workflows_executed'] += 1
                results['workflows_succeeded'] += 1
                logger.info("✅ Bankruptcy system workflow executed successfully")

        # Special System Workflow: Helpdesk SAG routing
        # - If a SAG/thread hint is present => try routing to the existing case
        # - Newsletters/spam skip routing ENTIRELY (even with thread hints)
        # - Without hints: apply classification gating as before
        HARD_SKIP = {'newsletter', 'spam'}
        should_try_helpdesk = (
            classification not in HARD_SKIP
            and (
                classification not in self.HELPDESK_SKIP_CLASSIFICATIONS
                or has_hint
            )
        )

        if should_try_helpdesk:
            helpdesk_result = await self._handle_helpdesk_sag_routing(email_data)
            if helpdesk_result:
                results['details'].append(helpdesk_result)
                if helpdesk_result.get('status') == 'completed':
                    results['workflows_executed'] += 1
                    results['workflows_succeeded'] += 1
                    logger.info("✅ Helpdesk SAG routing workflow executed")

        # Find matching workflows
        workflows = await self._find_matching_workflows(email_data)

        if not workflows and results['workflows_executed'] == 0:
            logger.info(f"✅ No workflows match classification: {classification}")
            return {'status': 'no_match', 'workflows_executed': 0}

        logger.info(f"📋 Found {len(workflows)} matching workflow(s)")

        # Execute workflows in priority order
        for workflow in workflows:
            result = await self._execute_workflow(workflow, email_data)
            results['details'].append(result)
            results['workflows_executed'] += 1

            if result['status'] == 'completed':
                results['workflows_succeeded'] += 1
            else:
                results['workflows_failed'] += 1

            # Stop if workflow has stop_on_match=true
            if workflow.get('stop_on_match') and result['status'] == 'completed':
                logger.info("🛑 Stopping workflow chain (stop_on_match=true)")
                break

        logger.info(f"✅ Workflow execution complete: {results['workflows_succeeded']}/{results['workflows_executed']} succeeded")
        return results

    async def _handle_bankruptcy_analysis(self, email_data: Dict) -> Dict:
        """
        System workflow for bankruptcy emails (Statstidende).
        Parses body for CVR numbers and links to customer if match found.
        Returns: Execution result dict
        """
        logger.info("🕵️ Running Bankruptcy Analysis on email")

        # Combine subject, body and html for search
        text_content = (
            f"{email_data.get('subject', '')} "
            f"{email_data.get('body_text', '')} "
            f"{email_data.get('body_html', '')}"
        )

        # Regex for CVR numbers (8 digits, possibly preceded by 'CVR-nr.:')
        # We look for explicit 'CVR-nr.: XXXXXXXX' pattern first as it's more reliable
        cvr_matches = re.findall(r'CVR-nr\.?:?\s*(\d{8})', text_content, re.IGNORECASE)
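        # Illustrative matches (hypothetical numbers): "CVR-nr.: 12345678",
        # "CVR-nr: 12345678" and "cvr-nr 12345678" all capture '12345678';
        # bare 8-digit runs without a CVR prefix are deliberately ignored.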

        if not cvr_matches:
            logger.info("✅ No CVR numbers found in bankruptcy email")
            return {'status': 'skipped', 'reason': 'no_cvr_found'}

        unique_cvrs = list(set(cvr_matches))
        logger.info(f"📋 Found CVRs in email: {unique_cvrs}")

        # Check if any CVRs belong to our customers
        # Safe parameterized query for variable list length
        format_strings = ','.join(['%s'] * len(unique_cvrs))
        query = f"""
            SELECT id, name, cvr_number
            FROM customers
            WHERE cvr_number IN ({format_strings})
        """

        matching_customers = execute_query(query, tuple(unique_cvrs))

        if not matching_customers:
            logger.info("✅ No matching customers found for bankruptcy CVRs - Marking as processed")
            execute_update(
                """UPDATE email_messages
                   SET status = 'processed', folder = 'Processed',
                       processed_at = CURRENT_TIMESTAMP, auto_processed = true
                   WHERE id = %s""",
                (email_data['id'],)
            )
            return {'status': 'completed', 'action': 'marked_processed_no_match'}

        logger.warning(f"⚠️ FOUND BANKRUPTCY MATCHES: {[c['name'] for c in matching_customers]}")

        # Link to the first customer found (limitation of 1:1 schema)
        first_match = matching_customers[0]

        execute_update(
            """UPDATE email_messages
               SET customer_id = %s, status = 'processed', folder = 'Processed',
                   processed_at = CURRENT_TIMESTAMP, auto_processed = true
               WHERE id = %s""",
            (first_match['id'], email_data['id'])
        )

        logger.info(f"🔗 Linked bankruptcy email {email_data['id']} to customer {first_match['name']} ({first_match['id']}) and marked as processed")

        if len(matching_customers) > 1:
            logger.warning("❗ Email contained multiple customer matches! Only linked to first one.")

        return {
            'status': 'completed',
            'action': 'linked_customer',
            'customer_name': first_match['name']
        }

    def _extract_sender_domain(self, email_data: Dict) -> Optional[str]:
        sender_email = (email_data.get('sender_email') or '').strip().lower()
        if '@' not in sender_email:
            return None
        domain = sender_email.split('@', 1)[1].strip()
        if domain.startswith('www.'):
            domain = domain[4:]
        return domain or None

    def has_helpdesk_routing_hint(self, email_data: Dict) -> bool:
        """Return True when the email has explicit routing hints (SAG tag, BMCid, or reply headers).

        NOTE: A bare thread_key (Graph conversationId) is NOT a routing hint
        because every Graph email has one, including newsletters and spam.
        Only actual reply indicators (In-Reply-To, References), explicit
        SAG tags, or BMCid markers count as hints."""
        if self._extract_bmc_id(email_data):
            return True

        if self._extract_sag_id(email_data):
            return True

        if self._normalize_message_id(email_data.get('in_reply_to')):
            return True

        if self._extract_reference_message_ids(email_data.get('email_references')):
            return True

        return False

    def _extract_bmc_id(self, email_data: Dict) -> Optional[Dict[str, Any]]:
        """Extract structured BMCid from email body/subject.

        Returns dict with 'sag_id' (int) and 'thread_suffix' (str, e.g. '472193')
        or None if no BMCid is found.
        """
        candidates = [
            email_data.get('body_html') or '',
            email_data.get('body_text') or '',
            email_data.get('subject') or '',
        ]
        pattern = r'\bBMCid\s*:\s*s(\d+)t(\d+)\b'
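        # Illustrative parse (hypothetical marker): "BMCid: s53t472193" matches
        # with group(1) = '53' (sag_id) and group(2) = '472193' (thread_suffix).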
        for value in candidates:
            match = re.search(pattern, value, re.IGNORECASE)
            if match:
                return {
                    'sag_id': int(match.group(1)),
                    'thread_suffix': match.group(2),
                }
        return None

    def _extract_sag_id(self, email_data: Dict) -> Optional[int]:
        # First try structured BMCid (most reliable)
        bmc_id = self._extract_bmc_id(email_data)
        if bmc_id:
            return bmc_id['sag_id']

        candidates = [
            email_data.get('subject') or '',
            email_data.get('in_reply_to') or '',
            email_data.get('email_references') or '',
            email_data.get('body_text') or '',
            email_data.get('body_html') or '',
        ]

        # Accept both strict and human variants used in real subjects, e.g.:
        # - [SAG-53] (hidden/subject prefix)
        # - SAG-53
        # - SAG #53
        # - Sag 53
        sag_patterns = [
            r'\[SAG-(\d+)\]',
            r'\bSAG-(\d+)\b',
            r'\bSAG\s*#\s*(\d+)\b',
            r'\bSAG\s+(\d+)\b',
        ]

        for value in candidates:
            for pattern in sag_patterns:
                match = re.search(pattern, value, re.IGNORECASE)
                if match:
                    return int(match.group(1))
        return None

    def _normalize_message_id(self, value: Optional[str]) -> Optional[str]:
        if not value:
            return None
        normalized = re.sub(r'[<>\s]', '', str(value)).lower().strip()
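        # e.g. (illustrative) '<ABC.123@Example.COM >' -> 'abc.123@example.com'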
        return normalized or None

    def _extract_thread_message_ids(self, email_data: Dict) -> List[str]:
        tokens: List[str] = []

        in_reply_to = self._normalize_message_id(email_data.get('in_reply_to'))
        if in_reply_to:
            tokens.append(in_reply_to)

        tokens.extend(self._extract_reference_message_ids(email_data.get('email_references')))

        # De-duplicate while preserving order
        return list(dict.fromkeys(tokens))

    def _extract_reference_message_ids(self, raw_references: Optional[str]) -> List[str]:
        tokens: List[str] = []
        if raw_references:
            for ref in re.split(r'[\s,]+', str(raw_references).strip()):
                normalized_ref = self._normalize_message_id(ref)
                if normalized_ref:
                    tokens.append(normalized_ref)
        return list(dict.fromkeys(tokens))

    def _derive_thread_key(self, email_data: Dict) -> Optional[str]:
        """Derive stable conversation key: root References -> In-Reply-To -> explicit -> Message-ID."""

        ref_ids = self._extract_reference_message_ids(email_data.get('email_references'))
        if ref_ids:
            return ref_ids[0]

        in_reply_to = self._normalize_message_id(email_data.get('in_reply_to'))
        if in_reply_to:
            return in_reply_to

        explicit = self._normalize_message_id(email_data.get('thread_key'))
        if explicit:
            return explicit

        return self._normalize_message_id(email_data.get('message_id'))

    def _find_sag_id_from_thread_key(self, thread_key: Optional[str]) -> Optional[int]:
        if not thread_key:
            return None

        # Backward compatibility when DB migration is not yet applied.
        try:
            rows = execute_query(
                """
                SELECT se.sag_id
                FROM sag_emails se
                JOIN email_messages em ON em.id = se.email_id
                WHERE em.deleted_at IS NULL
                  AND (
                      LOWER(REGEXP_REPLACE(COALESCE(em.thread_key, ''), '[<>\\s]', '', 'g')) = %s
                      OR LOWER(REGEXP_REPLACE(COALESCE(em.message_id, ''), '[<>\\s]', '', 'g')) = %s
                  )
                ORDER BY se.created_at DESC
                LIMIT 1
                """,
                (thread_key, thread_key)
            )
            return rows[0]['sag_id'] if rows else None
        except Exception:
            return None

    def _find_sag_id_from_thread_headers(self, email_data: Dict) -> Optional[int]:
        thread_message_ids = self._extract_thread_message_ids(email_data)
        if not thread_message_ids:
            return None

        placeholders = ','.join(['%s'] * len(thread_message_ids))
        rows = execute_query(
            f"""
            SELECT se.sag_id
            FROM sag_emails se
            JOIN email_messages em ON em.id = se.email_id
            WHERE em.deleted_at IS NULL
              AND LOWER(REGEXP_REPLACE(COALESCE(em.message_id, ''), '[<>\\s]', '', 'g')) IN ({placeholders})
            ORDER BY se.created_at DESC
            LIMIT 1
            """,
            tuple(thread_message_ids)
        )
        return rows[0]['sag_id'] if rows else None

    # Sender domains that should never trigger customer-domain SAG creation.
    # Includes own sending domain and common automated senders.
    _IGNORED_SENDER_DOMAINS = {
        'bmcnetworks.dk',
        'bmchub.local',
    }

    def _find_customer_by_domain(self, domain: str) -> Optional[Dict[str, Any]]:
        if not domain:
            return None

        domain = domain.lower().strip()

        # Never match the system's own sending domain as a customer
        if domain in self._IGNORED_SENDER_DOMAINS:
            return None

        domain_alt = domain[4:] if domain.startswith('www.') else f"www.{domain}"
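        # e.g. (illustrative) 'example.dk' also tries 'www.example.dk' and
        # 'www.example.dk' also tries 'example.dk', so either stored form matches.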

        query = """
            SELECT id, name
            FROM customers
            WHERE is_active = true
              AND (
                  LOWER(TRIM(email_domain)) = %s
                  OR LOWER(TRIM(email_domain)) = %s
              )
            ORDER BY id ASC
            LIMIT 1
        """
        rows = execute_query(query, (domain, domain_alt))
        return rows[0] if rows else None

    def _find_thread_key_by_bmc_suffix(self, sag_id: int, thread_suffix: str) -> Optional[str]:
        """Find the thread_key of an outgoing email whose BMCid matches s{sag_id}t{thread_suffix}."""
        try:
            # Legacy compatibility: older outbound emails used t001 when the
            # provisional thread key was unknown. In that case, pick the most
            # recent outbound thread key in the same case as best effort.
            if str(thread_suffix) == '001':
                fallback = execute_query(
                    """
                    SELECT em.thread_key
                    FROM sag_emails se
                    JOIN email_messages em ON em.id = se.email_id
                    WHERE se.sag_id = %s
                      AND em.deleted_at IS NULL
                      AND em.thread_key IS NOT NULL
                      AND TRIM(em.thread_key) != ''
                      AND LOWER(COALESCE(em.sender_email, '')) = %s
                    ORDER BY em.received_date DESC
                    LIMIT 1
                    """,
                    (sag_id, 'noreply@bmcnetworks.dk'),
                )
                if fallback and fallback[0].get('thread_key'):
                    return fallback[0]['thread_key']

            rows = execute_query(
                """
                SELECT em.thread_key
                FROM sag_emails se
                JOIN email_messages em ON em.id = se.email_id
                WHERE se.sag_id = %s
                  AND em.deleted_at IS NULL
                  AND em.thread_key IS NOT NULL
                  AND TRIM(em.thread_key) != ''
                ORDER BY em.received_date DESC
                """,
                (sag_id,),
            )
            if not rows:
                return None

            # Rebuild the BMCid suffix for each candidate thread_key
            # and return the one that matches our target suffix.
            for row in rows:
                tk = row['thread_key']
                normalized = re.sub(r"[^a-z0-9]+", "", str(tk).lower())
                if not normalized:
                    continue
                digest = hashlib.sha1(normalized.encode("utf-8")).hexdigest()
                candidate_suffix = str((int(digest[:8], 16) % 900000) + 100000)
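                # The suffix is the first 8 hex digits of the SHA-1 digest mapped
                # into the 6-digit range 100000-999999, so it can be regenerated
                # from any stored thread_key. E.g. (hypothetical) a digest starting
                # '0001e240' gives (123456 % 900000) + 100000 = 223456.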
                if candidate_suffix == thread_suffix:
                    return tk
            return None
        except Exception as e:
            logger.warning("⚠️ Failed BMCid thread_key lookup: %s", e)
            return None

    def _update_email_thread_key(self, email_id: int, thread_key: str) -> None:
        """Set the thread_key on an email so it groups correctly."""
        execute_update(
            "UPDATE email_messages SET thread_key = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s",
            (thread_key, email_id),
        )

    async def _finalize_sag_routing(
        self, email_id: int, email_data: Dict, sag_id: int, routing_source: str
    ) -> Dict[str, Any]:
        """Link an email to an existing SAG and mark as processed."""
        case_rows = execute_query(
            "SELECT id, customer_id, titel FROM sag_sager WHERE id = %s AND deleted_at IS NULL",
            (sag_id,),
        )
        if not case_rows:
            logger.warning("⚠️ Email %s referenced SAG-%s but case was not found", email_id, sag_id)
            return {'status': 'skipped', 'action': 'sag_id_not_found', 'sag_id': sag_id}

        case = case_rows[0]
        self._add_helpdesk_comment(sag_id, email_data)
        self._link_email_to_sag(sag_id, email_id)

        execute_update(
            """
            UPDATE email_messages
            SET linked_case_id = %s,
                customer_id = COALESCE(customer_id, %s),
                status = 'processed',
                folder = 'Processed',
                processed_at = CURRENT_TIMESTAMP,
                auto_processed = true
            WHERE id = %s
            """,
            (sag_id, case.get('customer_id'), email_id),
        )

        token_for_attach = None
        token_route = self._resolve_scan_token_route(email_id, email_data)
        if token_route:
            token_for_attach = token_route.get('token')
        self._auto_attach_scanner_email(email_id, sag_id, token_for_attach)

        return {
            'status': 'completed',
            'action': 'updated_existing_sag',
            'sag_id': sag_id,
            'customer_id': case.get('customer_id'),
            'routing_source': routing_source,
        }

    def _link_email_to_sag(self, sag_id: int, email_id: int) -> None:
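        # Idempotent link: the WHERE NOT EXISTS guard makes re-running the
        # workflow for the same email a no-op rather than a duplicate row.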
        execute_update(
            """
            INSERT INTO sag_emails (sag_id, email_id)
            SELECT %s, %s
            WHERE NOT EXISTS (
                SELECT 1 FROM sag_emails WHERE sag_id = %s AND email_id = %s
            )
            """,
            (sag_id, email_id, sag_id, email_id)
        )

    def _extract_scan_tokens(self, *values: Optional[str]) -> List[str]:
        tokens: List[str] = []
        for value in values:
            if not value:
                continue
            found = self._SCAN_TOKEN_PATTERN.findall(str(value))
            if found:
                tokens.extend(token.upper() for token in found)
        return list(dict.fromkeys(tokens))

    def _resolve_scan_token_route(self, email_id: int, email_data: Dict) -> Optional[Dict[str, Any]]:
        text_tokens = self._extract_scan_tokens(
            email_data.get('subject'),
            email_data.get('body_text'),
            email_data.get('body_html'),
            email_data.get('in_reply_to'),
            email_data.get('email_references'),
        )

        filename_tokens: List[str] = []
        attachment_content_tokens: List[str] = []
        try:
            attachment_rows = execute_query(
                """
                SELECT filename, content_type, content_data, file_path
                FROM email_attachments
                WHERE email_id = %s
                ORDER BY id ASC
                """,
                (email_id,),
            ) or []
            for row in attachment_rows:
                filename_tokens.extend(self._extract_scan_tokens(row.get('filename')))
                attachment_content_tokens.extend(
                    self._extract_scan_tokens_from_attachment(
                        filename=row.get('filename'),
                        content_type=row.get('content_type'),
                        content_data=row.get('content_data'),
                        file_path=row.get('file_path'),
                    )
                )
        except Exception as exc:
            logger.warning("⚠️ Failed to inspect attachment filenames for scan token: %s", exc)

        all_tokens = list(dict.fromkeys(text_tokens + filename_tokens + attachment_content_tokens))
        if not all_tokens:
            return self._resolve_scan_route_from_scanner_headers(email_data)

        placeholders = ','.join(['%s'] * len(all_tokens))
        try:
            rows = execute_query(
                f"""
                SELECT token, sag_id, token_type
                FROM sag_document_tokens
                WHERE token IN ({placeholders})
                  AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
                ORDER BY consumed_at IS NULL DESC, created_at DESC
                LIMIT 1
                """,
                tuple(all_tokens),
            )
            if rows:
                return rows[0]

            # Fallback for scanner workflows where the token exists only in the
            # barcode image and therefore not in plain-text metadata.
            return self._resolve_scan_route_from_scanner_headers(email_data)
        except Exception as exc:
            logger.warning("⚠️ Scan token lookup failed: %s", exc)
            return self._resolve_scan_route_from_scanner_headers(email_data)

    def _extract_scan_tokens_from_attachment(
        self,
        filename: Optional[str],
        content_type: Optional[str],
        content_data: Optional[Any],
        file_path: Optional[str],
    ) -> List[str]:
        tokens: List[str] = []

        payload: Optional[bytes] = None
        if content_data is not None:
            try:
                payload = bytes(content_data)
            except Exception:
                payload = None

        if payload is None and file_path:
            try:
                payload = Path(file_path).read_bytes()
            except Exception:
                payload = None

        if not payload:
            return tokens

        # 1) Cheap text extraction directly from bytes catches tokens in OCR-layer PDFs,
        #    plain text files, or metadata-rich attachments.
        try:
            sample = payload[:1_500_000]
            tokens.extend(self._extract_scan_tokens(sample.decode('utf-8', errors='ignore')))
            tokens.extend(self._extract_scan_tokens(sample.decode('latin-1', errors='ignore')))
        except Exception:
            pass

        ext = (Path(str(filename or '')).suffix or '').lower().strip('.')
        ctype = (content_type or '').lower()

        # 2) PDF text-layer extraction (when available) for scanned documents with OCR.
        if ext == 'pdf' or 'pdf' in ctype:
            try:
                from pypdf import PdfReader  # type: ignore

                reader = PdfReader(io.BytesIO(payload))
                text_chunks: List[str] = []
                for page in reader.pages[:5]:
                    extracted = page.extract_text() or ''
                    if extracted:
                        text_chunks.append(extracted)
                if text_chunks:
                    tokens.extend(self._extract_scan_tokens("\n".join(text_chunks)))
            except Exception:
                pass

        # 3) Decode barcode directly from scanned attachments.
        #    This catches cases where BMCSCAN exists only as a barcode image.
        try:
            if ext == 'pdf' or 'pdf' in ctype:
                tokens.extend(self._extract_scan_tokens_from_pdf_barcode(payload))
            else:
                tokens.extend(self._extract_scan_tokens_from_image_barcode(payload))
        except Exception:
            pass

        return list(dict.fromkeys(token.upper() for token in tokens if token))

    def _extract_scan_tokens_from_image_barcode(self, payload: bytes) -> List[str]:
        try:
            from PIL import Image  # type: ignore
            from pyzbar.pyzbar import decode as zbar_decode  # type: ignore
        except Exception:
            return []

        try:
            image = Image.open(io.BytesIO(payload))
        except Exception:
            return []

        decoded_tokens: List[str] = []
        variants = [image]
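        # Try several renderings: the original image, a grayscale copy, and a
        # hard-thresholded black/white copy (>140 -> white); binarisation often
        # helps pyzbar on noisy or low-contrast scans.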
        try:
            variants.append(image.convert('L'))
            variants.append(image.convert('L').point(lambda p: 255 if p > 140 else 0))
        except Exception:
            pass

        for variant in variants:
            try:
                for item in zbar_decode(variant):
                    raw = item.data.decode('utf-8', errors='ignore')
                    decoded_tokens.extend(self._extract_scan_tokens(raw))
            except Exception:
                continue

        return list(dict.fromkeys(decoded_tokens))

    def _extract_scan_tokens_from_pdf_barcode(self, payload: bytes) -> List[str]:
        try:
            import pypdfium2 as pdfium  # type: ignore
            from pyzbar.pyzbar import decode as zbar_decode  # type: ignore
        except Exception:
            return []

        decoded_tokens: List[str] = []

        try:
            doc = pdfium.PdfDocument(io.BytesIO(payload))
        except Exception:
            return []

        page_count = min(len(doc), 3)
        for page_index in range(page_count):
            page = None
            try:
                page = doc.get_page(page_index)
                bitmap = page.render(scale=2.2)
                pil_image = bitmap.to_pil()

                for variant in (pil_image, pil_image.convert('L')):
                    for item in zbar_decode(variant):
                        raw = item.data.decode('utf-8', errors='ignore')
                        decoded_tokens.extend(self._extract_scan_tokens(raw))
            except Exception:
                continue
            finally:
                try:
                    if page is not None:
                        page.close()
                except Exception:
                    pass

        return list(dict.fromkeys(decoded_tokens))

    def _resolve_scan_route_from_scanner_headers(self, email_data: Dict) -> Optional[Dict[str, Any]]:
        """Infer case route from scanner-generated message-id timestamps.

        Some scanner/MFP flows only include the barcode token inside the attached image/PDF,
        while headers contain a timestamped local message-id such as
        `<1.20260401075731@172.16.31.35>`. We map that timestamp to the nearest recent,
        unconsumed document token.
        """

        header_values = [
            email_data.get('in_reply_to'),
            email_data.get('email_references'),
            email_data.get('message_id'),
            email_data.get('thread_key'),
        ]

        candidates: List[datetime] = []
        ts_pattern = re.compile(r'(20\d{12})')
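        # e.g. the docstring's message-id <1.20260401075731@172.16.31.35>
        # contains '20260401075731' -> datetime(2026, 4, 1, 7, 57, 31).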

        for raw in header_values:
            if not raw:
                continue
            for match in ts_pattern.findall(str(raw)):
                try:
                    candidates.append(datetime.strptime(match, "%Y%m%d%H%M%S"))
                except ValueError:
                    continue

        if not candidates:
            return None

        for ts in candidates:
            try:
                rows = execute_query(
                    """
                    SELECT token, sag_id, token_type, created_at
                    FROM sag_document_tokens
                    WHERE consumed_at IS NULL
                      AND created_at BETWEEN %s::timestamp - INTERVAL '90 minutes'
                                         AND %s::timestamp + INTERVAL '20 minutes'
                    ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - %s::timestamp))) ASC,
                             CASE WHEN token_type = 'work_order' THEN 0 ELSE 1 END,
                             id DESC
                    LIMIT 1
                    """,
                    (ts, ts, ts),
                ) or []
                if rows:
                    row = rows[0]
                    logger.info(
                        "🔎 Inferred scanner route via header timestamp %s -> SAG-%s (%s)",
                        ts.isoformat(),
                        row.get('sag_id'),
                        row.get('token'),
                    )
                    return {
                        'token': row.get('token'),
                        'sag_id': row.get('sag_id'),
                        'token_type': row.get('token_type'),
                    }
            except Exception as exc:
                logger.warning("⚠️ Scanner header timestamp route lookup failed: %s", exc)

        return None

    def _copy_email_attachments_to_case(self, email_id: int, sag_id: int, source_token: Optional[str]) -> int:
        attachments = execute_query(
            """
            SELECT filename, content_type, size_bytes, file_path, content_data
            FROM email_attachments
            WHERE email_id = %s
            ORDER BY id ASC
            """,
            (email_id,),
        ) or []
        if not attachments:
            return 0

        upload_base = Path(settings.UPLOAD_DIR).resolve()
        (upload_base / "sag_files").mkdir(parents=True, exist_ok=True)

        has_source_email = table_has_column("sag_files", "source_email_id")
        has_source_type = table_has_column("sag_files", "source_type")
        has_source_token = table_has_column("sag_files", "source_token")
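        # The optional source_* columns presumably arrive with a later migration;
        # probing via table_has_column keeps the copy working on databases where
        # that migration has not been applied yet.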

        copied = 0
        for attachment in attachments:
            filename = Path(attachment.get('filename') or 'scanned-document.bin').name

            if has_source_email:
                existing = execute_query(
                    """
                    SELECT 1
                    FROM sag_files
                    WHERE sag_id = %s
                      AND source_email_id = %s
                      AND filename = %s
                    LIMIT 1
                    """,
                    (sag_id, email_id, filename),
                ) or []
                if existing:
                    continue

            payload = attachment.get('content_data')
            if payload is None and attachment.get('file_path'):
                try:
                    payload = Path(attachment['file_path']).read_bytes()
                except Exception as exc:
                    logger.warning("⚠️ Could not read attachment file (%s): %s", filename, exc)
                    continue

            if payload is None:
                continue

            raw_payload = bytes(payload)
            stored_name = f"sag_files/{uuid4().hex}_{filename}"
            target_path = upload_base / stored_name

            try:
                target_path.write_bytes(raw_payload)
            except Exception as exc:
                logger.warning("⚠️ Could not write case file from attachment (%s): %s", filename, exc)
                continue

            columns = ["sag_id", "filename", "content_type", "size_bytes", "stored_name"]
            values: List[Any] = [
                sag_id,
                filename,
                attachment.get('content_type') or 'application/octet-stream',
                attachment.get('size_bytes') or len(raw_payload),
                stored_name,
            ]
            if has_source_email:
                columns.append("source_email_id")
                values.append(email_id)
            if has_source_type:
                columns.append("source_type")
                values.append("scanner_email")
            if has_source_token:
                columns.append("source_token")
                values.append(source_token)

            execute_query(
                f"INSERT INTO sag_files ({', '.join(columns)}) VALUES ({', '.join(['%s'] * len(values))})",
                tuple(values),
            )
            copied += 1

        return copied

    def _auto_attach_scanner_email(self, email_id: int, sag_id: int, token: Optional[str]) -> None:
        try:
            copied = self._copy_email_attachments_to_case(email_id, sag_id, token)
            if copied > 0:
                logger.info("📎 Auto-attached %s attachment(s) from email %s to SAG-%s", copied, email_id, sag_id)

            if token:
                execute_update(
                    """
                    UPDATE sag_document_tokens
                    SET consumed_at = COALESCE(consumed_at, CURRENT_TIMESTAMP),
                        consumed_email_id = COALESCE(consumed_email_id, %s)
                    WHERE token = %s
                    """,
                    (email_id, token),
                )
        except Exception as exc:
            logger.warning("⚠️ Scanner auto-attach failed for email %s: %s", email_id, exc)

    def _strip_quoted_email_text(self, body_text: str) -> str:
        """Return only the newest reply content (remove quoted history/signatures)."""
        if not body_text:
            return ""

        text = str(body_text).replace("\r\n", "\n").replace("\r", "\n")
        lines = text.split("\n")
        cleaned_lines: List[str] = []

        header_marker_re = re.compile(r'^(fra|from|sent|date|dato|to|til|emne|subject|cc):\s*', re.IGNORECASE)
        original_message_re = re.compile(r'^(original message|oprindelig besked|videresendt besked)', re.IGNORECASE)
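        # Illustrative cut points: a line starting with '>', a bare "---"/"___"
        # rule followed by "Fra:"/"From:"-style headers, or a blank line followed
        # by such a header all terminate the newest-reply section.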

        for idx, line in enumerate(lines):
            stripped = line.strip()

            if stripped.startswith('>'):
                break

            if original_message_re.match(stripped):
                break

            # Typical separator before quoted headers (e.g. "---" / "_____" lines)
            if re.match(r'^[-_]{3,}$', stripped):
                lookahead = lines[idx + 1: idx + 4]
                if any(header_marker_re.match(candidate.strip()) for candidate in lookahead):
                    break

            if idx > 0 and header_marker_re.match(stripped):
                if lines[idx - 1].strip() == "":
                    break

            cleaned_lines.append(line)

        while cleaned_lines and cleaned_lines[-1].strip() == "":
            cleaned_lines.pop()

        return "\n".join(cleaned_lines).strip()

    def _html_to_text(self, body_html: str) -> str:
        """Convert HTML email bodies to readable plain text fallback."""
        if not body_html:
            return ""

        text = str(body_html)
        text = re.sub(r'<(style|script)[^>]*>.*?</\1>', '', text, flags=re.IGNORECASE | re.DOTALL)
        text = re.sub(r'<\s*br\s*/?>', '\n', text, flags=re.IGNORECASE)
        text = re.sub(r'</\s*(p|div|li|tr|h[1-6])\s*>', '\n', text, flags=re.IGNORECASE)
        text = re.sub(r'<[^>]+>', ' ', text)
        text = html.unescape(text)
        text = text.replace('\r\n', '\n').replace('\r', '\n')
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'[ \t]{2,}', ' ', text)
        return text.strip()

    def _extract_primary_email_body(self, email_data: Dict) -> str:
        """Return best-effort email body text for case/comment persistence."""
        raw_text = (email_data.get('body_text') or '').strip()
        if raw_text:
            cleaned = self._strip_quoted_email_text(raw_text)
            if cleaned:
                return cleaned

        html_fallback = self._html_to_text(email_data.get('body_html') or '')
        if html_fallback:
            cleaned = self._strip_quoted_email_text(html_fallback)
            if cleaned:
                return cleaned

        return ""

    def _add_helpdesk_comment(self, sag_id: int, email_data: Dict) -> None:
        email_id = email_data.get('id')
        sender = email_data.get('sender_email') or 'ukendt'
        subject = email_data.get('subject') or '(ingen emne)'
        received = email_data.get('received_date')
        received_str = received.isoformat() if hasattr(received, 'isoformat') else str(received or '')
        body_text = self._extract_primary_email_body(email_data)

        email_meta_line = f"Email-ID: {email_id}\n" if email_id else ""

        comment = (
            f"📧 Indgående email\n"
            f"{email_meta_line}"
            f"Fra: {sender}\n"
            f"Emne: {subject}\n"
            f"Modtaget: {received_str}\n\n"
            f"{body_text}"
        )

        execute_update(
            """
            INSERT INTO sag_kommentarer (sag_id, forfatter, indhold, er_system_besked)
            VALUES (%s, %s, %s, %s)
            """,
            (sag_id, 'Email Bot', comment, True)
        )

    def _create_sag_from_email(self, email_data: Dict, customer_id: int) -> Dict[str, Any]:
        sender = email_data.get('sender_email') or 'ukendt'
        subject = (email_data.get('subject') or '').strip() or f"Email fra {sender}"
        body_text = self._extract_primary_email_body(email_data)

        description = (
            f"Auto-oprettet fra email\n"
            f"Fra: {sender}\n"
            f"Message-ID: {email_data.get('message_id') or ''}\n\n"
            f"{body_text}"
        )

        rows = execute_query(
            """
            INSERT INTO sag_sager (
                titel, beskrivelse, template_key, status, customer_id, created_by_user_id
            )
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id, titel, customer_id
            """,
            (subject, description, 'ticket', 'åben', customer_id, 1)
        )

        if not rows:
            raise ValueError('Failed to create SAG from email')
        return rows[0]

    async def _handle_helpdesk_sag_routing(self, email_data: Dict) -> Optional[Dict[str, Any]]:
        email_id = email_data.get('id')
        if not email_id:
            return None

        derived_thread_key = self._derive_thread_key(email_data)
        sag_id_from_thread_key = self._find_sag_id_from_thread_key(derived_thread_key)
        sag_id_from_thread = self._find_sag_id_from_thread_headers(email_data)
        sag_id_from_tag = self._extract_sag_id(email_data)
        scan_token_route = self._resolve_scan_token_route(email_id, email_data)
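        # Routing precedence (highest first): scan token, BMCid marker, derived
        # thread key, thread headers, provider conversationId, [SAG-n] tag, and
        # finally customer-domain auto-create (behind a settings flag).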

        if scan_token_route and scan_token_route.get('sag_id'):
            matched_sag_id = int(scan_token_route['sag_id'])
            logger.info("🔎 Scan token matched email %s to SAG-%s", email_id, matched_sag_id)
            return await self._finalize_sag_routing(email_id, email_data, matched_sag_id, 'scan_token')

        # Priority 0: BMCid is the most reliable signal; it's our own hidden
        # marker embedded in every outgoing case email. When present, it
        # provides the sag_id directly and the thread_suffix lets us adopt
        # the correct thread_key for multi-thread SAGs.
        bmc_id = self._extract_bmc_id(email_data)
        if bmc_id:
            bmc_sag_id = bmc_id['sag_id']
            bmc_thread_suffix = bmc_id['thread_suffix']
            # Look up the thread_key of the outgoing email whose BMCid matches
            bmc_thread_key = self._find_thread_key_by_bmc_suffix(bmc_sag_id, bmc_thread_suffix)
            if bmc_thread_key:
                # Adopt the outgoing email's thread_key so the reply groups correctly
                self._update_email_thread_key(email_id, bmc_thread_key)
                logger.info(
                    "🔖 BMCid s%st%s matched → SAG-%s (thread_key=%s)",
                    bmc_sag_id, bmc_thread_suffix, bmc_sag_id, bmc_thread_key,
                )
            sag_id = bmc_sag_id
            routing_source = 'bmc_id'
            # Skip the remaining priority chain; BMCid is authoritative
            return await self._finalize_sag_routing(email_id, email_data, sag_id, routing_source)

        # Fallback: try the explicit provider thread key (e.g. Graph conversationId)
        # separately when the derived key (References[0]) differs from it.
        provider_thread_key = self._normalize_message_id(email_data.get('thread_key'))
        sag_id_from_provider = None
        if provider_thread_key and provider_thread_key != derived_thread_key:
            sag_id_from_provider = self._find_sag_id_from_thread_key(provider_thread_key)

        routing_source = None
        sag_id = None

        if sag_id_from_thread_key:
            sag_id = sag_id_from_thread_key
            routing_source = 'thread_key'
            logger.info("🧵 Matched email %s to SAG-%s via thread key", email_id, sag_id)

        if sag_id_from_thread:
            if sag_id and sag_id != sag_id_from_thread:
                logger.warning(
                    "⚠️ Email %s has conflicting thread matches (thread_key: SAG-%s, headers: SAG-%s). Using thread_key.",
                    email_id,
                    sag_id,
                    sag_id_from_thread,
                )
            elif not sag_id:
                sag_id = sag_id_from_thread
                routing_source = 'thread_headers'
                logger.info("🔗 Matched email %s to SAG-%s via thread headers", email_id, sag_id)

        if sag_id_from_provider and not sag_id:
            sag_id = sag_id_from_provider
            routing_source = 'provider_thread_key'
            logger.info("🧵 Matched email %s to SAG-%s via provider thread key (conversationId)", email_id, sag_id)

        if sag_id_from_tag:
            if sag_id and sag_id != sag_id_from_tag:
                logger.warning(
                    "⚠️ Email %s contains conflicting case hints (thread: SAG-%s, tag: SAG-%s). Using thread match.",
                    email_id,
                    sag_id,
                    sag_id_from_tag
                )
            elif not sag_id:
                sag_id = sag_id_from_tag
                routing_source = 'sag_tag'
                logger.info("🏷️ Matched email %s to SAG-%s via SAG tag", email_id, sag_id)

        # 1) Existing SAG via subject/headers
        if sag_id:
            return await self._finalize_sag_routing(email_id, email_data, sag_id, routing_source)

        if not getattr(settings, 'EMAIL_AUTO_CREATE_CASES_FROM_EMAIL', False):
            logger.info(
                "⏭️ Email %s did not match existing SAG and auto-create is disabled",
                email_id,
            )
            return {
                'status': 'skipped',
                'action': 'auto_create_disabled',
            }

        # 2) No SAG id -> create only if sender domain belongs to known customer
        sender_domain = self._extract_sender_domain(email_data)
        customer = self._find_customer_by_domain(sender_domain) if sender_domain else None

        if not customer:
            logger.info("⏭️ Email %s has no known customer domain (%s) - kept in /emails", email_id, sender_domain)
            return {'status': 'skipped', 'action': 'unknown_customer_domain', 'domain': sender_domain}

        case = self._create_sag_from_email(email_data, customer['id'])
        self._add_helpdesk_comment(case['id'], email_data)
        self._link_email_to_sag(case['id'], email_id)

        execute_update(
            """
            UPDATE email_messages
            SET linked_case_id = %s,
                customer_id = %s,
                status = 'processed',
                folder = 'Processed',
                processed_at = CURRENT_TIMESTAMP,
                auto_processed = true
            WHERE id = %s
            """,
            (case['id'], customer['id'], email_id)
        )

        self._auto_attach_scanner_email(email_id, case['id'], None)
        logger.info("✅ Created SAG-%s from email %s for customer %s", case['id'], email_id, customer['id'])
        return {
            'status': 'completed',
            'action': 'created_new_sag',
            'sag_id': case['id'],
            'customer_id': customer['id'],
            'domain': sender_domain,
            'routing_source': 'customer_domain'
        }

    async def _find_matching_workflows(self, email_data: Dict) -> List[Dict]:
        """Find all workflows that match this email"""
        classification = email_data.get('classification')
        confidence = email_data.get('confidence_score', 0.0)
        sender = email_data.get('sender_email', '')
        subject = email_data.get('subject', '')

        query = """
            SELECT id, name, classification_trigger, sender_pattern, subject_pattern,
                   confidence_threshold, workflow_steps, priority, stop_on_match
            FROM email_workflows
            WHERE enabled = true
              AND classification_trigger = %s
              AND confidence_threshold <= %s
            ORDER BY priority ASC
        """

        workflows = execute_query(query, (classification, confidence))

        # Filter by additional patterns
        matching = []
        for wf in workflows:
            # Check sender pattern
            if wf.get('sender_pattern'):
                pattern = wf['sender_pattern']
                if not re.search(pattern, sender, re.IGNORECASE):
                    logger.debug(f"⏭️ Workflow '{wf['name']}' skipped: sender doesn't match pattern")
                    continue

            # Check subject pattern
            if wf.get('subject_pattern'):
                pattern = wf['subject_pattern']
                if not re.search(pattern, subject, re.IGNORECASE):
                    logger.debug(f"⏭️ Workflow '{wf['name']}' skipped: subject doesn't match pattern")
                    continue

            matching.append(wf)

        return matching

    async def _execute_workflow(self, workflow: Dict, email_data: Dict) -> Dict:
        """Execute a single workflow"""
        workflow_id = workflow['id']
        workflow_name = workflow['name']
        email_id = email_data['id']

        logger.info(f"🚀 Executing workflow: {workflow_name} (ID: {workflow_id})")

        # Create execution record
        execution_id = execute_insert(
            """INSERT INTO email_workflow_executions
               (workflow_id, email_id, status, steps_total, result_json)
               VALUES (%s, %s, 'running', %s, %s) RETURNING id""",
            (workflow_id, email_id, len(workflow['workflow_steps']), json.dumps({}))
        )

        started_at = datetime.now()
        steps = workflow['workflow_steps']
        steps_completed = 0
        step_results = []

        try:
            # Execute each step
            for idx, step in enumerate(steps):
                action = step.get('action')
                params = step.get('params', {})

                logger.info(f"  ➡️ Step {idx + 1}/{len(steps)}: {action}")

                step_result = await self._execute_action(action, params, email_data)
                step_results.append({
                    'step': idx + 1,
                    'action': action,
                    'status': step_result['status'],
                    'result': step_result.get('result'),
                    'error': step_result.get('error')
                })

                if step_result['status'] == 'failed':
                    logger.error(f"  ❌ Step failed: {step_result.get('error')}")
                    # Continue to next step even on failure (configurable later)
                else:
                    logger.info("  ✅ Step completed successfully")

                steps_completed += 1

            # Mark execution as completed
            completed_at = datetime.now()
            execution_time_ms = int((completed_at - started_at).total_seconds() * 1000)

            execute_update(
                """UPDATE email_workflow_executions
                   SET status = 'completed', steps_completed = %s,
                       result_json = %s, completed_at = CURRENT_TIMESTAMP,
                       execution_time_ms = %s
                   WHERE id = %s""",
                (steps_completed, json.dumps(step_results), execution_time_ms, execution_id)
            )

            # Update workflow statistics
            execute_update(
                """UPDATE email_workflows
                   SET execution_count = execution_count + 1,
                       success_count = success_count + 1,
                       last_executed_at = CURRENT_TIMESTAMP
                   WHERE id = %s""",
                (workflow_id,)
            )

            logger.info(f"✅ Workflow '{workflow_name}' completed ({execution_time_ms}ms)")

            # Log: Workflow execution completed
            await email_activity_logger.log_workflow_executed(
                email_id=email_id,
                workflow_id=workflow_id,
                workflow_name=workflow_name,
                status='completed',
                steps_completed=steps_completed,
                execution_time_ms=execution_time_ms
            )

            return {
                'workflow_id': workflow_id,
                'workflow_name': workflow_name,
                'execution_id': execution_id,
                'status': 'completed',
                'steps_completed': steps_completed,
                'steps_total': len(steps),
                'execution_time_ms': execution_time_ms,
                'step_results': step_results
            }

        except Exception as e:
            logger.error(f"❌ Workflow execution failed: {e}")

            # Mark execution as failed
            execute_update(
                """UPDATE email_workflow_executions
                   SET status = 'failed', steps_completed = %s,
                       error_message = %s, completed_at = CURRENT_TIMESTAMP
                   WHERE id = %s""",
                (steps_completed, str(e), execution_id)
            )

            # Update workflow statistics
            execute_update(
                """UPDATE email_workflows
                   SET execution_count = execution_count + 1,
                       failure_count = failure_count + 1,
                       last_executed_at = CURRENT_TIMESTAMP
                   WHERE id = %s""",
                (workflow_id,)
            )

            # Log: Workflow execution failed
            await email_activity_logger.log_workflow_executed(
                email_id=email_id,
                workflow_id=workflow_id,
                workflow_name=workflow_name,
                status='failed',
                steps_completed=steps_completed,
                execution_time_ms=0
            )

            return {
                'workflow_id': workflow_id,
                'workflow_name': workflow_name,
                'execution_id': execution_id,
                'status': 'failed',
                'steps_completed': steps_completed,
                'steps_total': len(steps),
                'error': str(e)
            }

async def _execute_action(self, action: str, params: Dict, email_data: Dict) -> Dict:
|
|
|
|
|
"""Execute a single workflow action"""
|
|
|
|
|
try:
|
|
|
|
|
# Dispatch to specific action handler
|
|
|
|
|
handler_map = {
|
feat(ticket-module): Implement ticket system with comprehensive database schema, permissions, and testing suite
- Added migration 025 for the Ticket System, creating tables for tickets, comments, attachments, worklogs, prepaid cards, and audit logs.
- Introduced migration 026 to add ticket-related permissions to the auth system and assign them to user groups.
- Developed a test suite for the Ticket Module, validating database schema, ticket number generation, prepaid card constraints, service logic, worklog creation, audit logging, and views.
2025-12-15 23:40:23 +01:00
|
|
|
'create_ticket': self._action_create_ticket_system,
|
|
|
|
|
'link_email_to_ticket': self._action_link_email_to_ticket,
|
2026-03-02 23:58:56 +01:00
|
|
|
'route_helpdesk_sag': self._handle_helpdesk_sag_routing,
|
2025-12-15 12:28:12 +01:00
|
|
|
'create_time_entry': self._action_create_time_entry,
|
|
|
|
|
'link_to_vendor': self._action_link_to_vendor,
|
|
|
|
|
'link_to_customer': self._action_link_to_customer,
|
|
|
|
|
'extract_invoice_data': self._action_extract_invoice_data,
|
|
|
|
|
'extract_tracking_number': self._action_extract_tracking_number,
|
2026-01-11 19:23:21 +01:00
|
|
|
'regex_extract_and_link': self._action_regex_extract_and_link,
|
2025-12-15 12:28:12 +01:00
|
|
|
'send_slack_notification': self._action_send_slack_notification,
|
|
|
|
|
'send_email_notification': self._action_send_email_notification,
|
|
|
|
|
'mark_as_processed': self._action_mark_as_processed,
|
|
|
|
|
'flag_for_review': self._action_flag_for_review,
|
|
|
|
|
}
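
            # Illustrative only: a stored workflow step is expected to pair an
            # action key from the map above with a params dict for its handler,
            # e.g. {"action": "link_to_vendor", "params": {"match_by": "email"}}.
            # The exact step schema lives in the workflow tables, not here.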

            handler = handler_map.get(action)

            if not handler:
                logger.warning(f"⚠️ Unknown action: {action}")
                return {
                    'status': 'skipped',
                    'error': f'Unknown action: {action}'
                }

            result = await handler(params, email_data)
            return {
                'status': 'success',
                'result': result
            }

        except Exception as e:
            logger.error(f"❌ Action '{action}' failed: {e}")
            return {
                'status': 'failed',
                'error': str(e)
            }

    # Action Handlers

    async def _action_regex_extract_and_link(self, params: Dict, email_data: Dict) -> Dict:
        r"""
        Generic action to extract data via regex and link/update a record.

        Params:
        - regex_pattern: Pattern with one capture group (e.g. "CVR: (\d{8})")
        - target_table: Table to search (e.g. "customers")
        - target_column: Column to match the value against (e.g. "cvr_number")
        - link_column: Column in email_messages to update (e.g. "customer_id")
        - value_column: Column in the target table to retrieve (e.g. "id")
        - on_match: "update_email" (default) or "none"
        """
        regex_pattern = params.get('regex_pattern')
        target_table = params.get('target_table')
        target_column = params.get('target_column')
        link_column = params.get('link_column', 'customer_id')
        value_column = params.get('value_column', 'id')

        if not all([regex_pattern, target_table, target_column]):
            return {'status': 'failed', 'error': 'Missing required params: regex_pattern, target_table, target_column'}

        # Combine text for search
        text_content = (
            f"{email_data.get('subject', '')} "
            f"{email_data.get('body_text', '')} "
            f"{email_data.get('body_html', '')}"
        )

        # 1. Run Regex
        matches = re.findall(regex_pattern, text_content, re.IGNORECASE)
        unique_matches = list(set(matches))

        if not unique_matches:
            return {'status': 'skipped', 'reason': 'no_regex_match', 'pattern': regex_pattern}

        logger.info(f"🔍 Regex '{regex_pattern}' found matches: {unique_matches}")

        # 2. Look up in Target Table
        # Safety check: table names are whitelisted and column names must be plain
        # identifiers, since both are interpolated into SQL (params are expected
        # to come from trusted workflow configuration).
        valid_tables = ['customers', 'vendors', 'users']
        if target_table not in valid_tables:
            return {'status': 'failed', 'error': f'Invalid target table: {target_table}'}

        for column in (target_column, link_column, value_column):
            if not re.fullmatch(r'\w+', column):
                return {'status': 'failed', 'error': f'Invalid column name: {column}'}

        placeholders = ','.join(['%s'] * len(unique_matches))
        query = f"SELECT {value_column}, {target_column} FROM {target_table} WHERE {target_column} IN ({placeholders})"

        db_matches = execute_query(query, tuple(unique_matches))

        if not db_matches:
            return {'status': 'completed', 'action': 'no_db_match', 'found_values': unique_matches}

        # 3. Link (Update Email)
        match = db_matches[0]  # Take first match
        match_value = match[value_column]

        if params.get('on_match', 'update_email') == 'update_email':
            update_query = f"UPDATE email_messages SET {link_column} = %s WHERE id = %s"
            execute_update(update_query, (match_value, email_data['id']))

            logger.info(f"🔗 Linked email {email_data['id']} to {target_table}.{value_column}={match_value}")
            return {'status': 'completed', 'action': 'linked', 'match_id': match_value}

        return {'status': 'completed', 'action': 'found_only', 'match_id': match_value}

    async def _action_create_ticket_system(self, params: Dict, email_data: Dict) -> Dict:
        """Create a ticket from email using new ticket system"""
        from app.ticket.backend.email_integration import EmailTicketIntegration

        # Build email_data dict for ticket integration
        ticket_email_data = {
            'message_id': email_data.get('message_id'),
            'subject': email_data.get('subject'),
            'from_address': email_data.get('sender_email'),
            'body': email_data.get('body_text', ''),
            'html_body': email_data.get('body_html'),
            'received_at': email_data.get('received_date').isoformat() if email_data.get('received_date') else None,
            'in_reply_to': email_data.get('in_reply_to'),
            'references': email_data.get('email_references')
        }

        # Get params from workflow
        customer_id = params.get('customer_id') or email_data.get('customer_id')
        assigned_to_user_id = params.get('assigned_to_user_id')

        logger.info(f"🎫 Creating ticket from email: {email_data.get('message_id')}")

        result = await EmailTicketIntegration.process_email_for_ticket(
            email_data=ticket_email_data,
            customer_id=customer_id,
            assigned_to_user_id=assigned_to_user_id
        )

        logger.info(f"✅ Created ticket {result.get('ticket_number')} from email")

        return {
            'action': 'create_ticket',
            'ticket_id': result.get('ticket_id'),
            'ticket_number': result.get('ticket_number'),
            'created': result.get('created', False),
            'linked': result.get('linked', False)
        }

    async def _action_link_email_to_ticket(self, params: Dict, email_data: Dict) -> Dict:
        """Link email to existing ticket"""
        from app.ticket.backend.email_integration import EmailTicketIntegration

        ticket_number = params.get('ticket_number')

        if not ticket_number:
            logger.warning("⚠️ No ticket_number provided for link_email_to_ticket action")
            return {
                'action': 'link_email_to_ticket',
                'status': 'failed',
                'error': 'ticket_number required'
            }

        ticket_email_data = {
            'message_id': email_data.get('message_id'),
            'subject': email_data.get('subject'),
            'from_address': email_data.get('sender_email'),
            'body': email_data.get('body_text', ''),
            'html_body': email_data.get('body_html'),
            'received_at': email_data.get('received_date').isoformat() if email_data.get('received_date') else None,
            'in_reply_to': email_data.get('in_reply_to'),
            'references': email_data.get('email_references')
        }

        logger.info(f"🔗 Linking email to ticket {ticket_number}")

        result = await EmailTicketIntegration.link_email_to_ticket(
            ticket_number=ticket_number,
            email_data=ticket_email_data
        )

        logger.info(f"✅ Linked email to ticket {ticket_number}")

        return {
            'action': 'link_email_to_ticket',
            'ticket_id': result.get('ticket_id'),
            'ticket_number': result.get('ticket_number'),
            'linked': True
        }

    async def _action_create_time_entry(self, params: Dict, email_data: Dict) -> Dict:
        """Create time entry from email"""
        logger.info("⏱️ Would create time entry")

        # TODO: Integrate with time tracking system
        return {
            'action': 'create_time_entry',
            'note': 'Time entry creation not yet implemented'
        }
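
    # A minimal sketch of what the time-entry TODO above could look like,
    # assuming a hypothetical time_entries(email_id, minutes, entry_date)
    # table that is not part of the current schema; the helper is not
    # called anywhere yet.
    async def _create_time_entry_sketch(self, email_data: Dict, minutes: int) -> Optional[int]:
        """Hedged sketch only: insert a time entry linked to an email."""
        return execute_insert(
            """INSERT INTO time_entries (email_id, minutes, entry_date)
               VALUES (%s, %s, %s)
               RETURNING id""",
            (email_data['id'], minutes, date.today())
        )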

    async def _action_link_to_vendor(self, params: Dict, email_data: Dict) -> Dict:
        """Link email to vendor"""
        match_by = params.get('match_by', 'email')
        sender_email = email_data.get('sender_email')

        if not sender_email:
            return {'action': 'link_to_vendor', 'matched': False, 'reason': 'No sender email'}

        # Find vendor by email
        query = "SELECT id, name FROM vendors WHERE email = %s LIMIT 1"
        result = execute_query(query, (sender_email,))

        if result:
            vendor = result[0]
            vendor_id = vendor['id']

            # Check if already linked to avoid duplicate updates
            result_vendor = execute_query(
                "SELECT supplier_id FROM email_messages WHERE id = %s",
                (email_data['id'],))
            current_vendor = result_vendor[0] if result_vendor else None

            if current_vendor and current_vendor.get('supplier_id') == vendor_id:
                logger.info(f"⏭️ Email already linked to vendor {vendor_id}, skipping duplicate update")
                return {
                    'action': 'link_to_vendor',
                    'matched': True,
                    'vendor_id': vendor_id,
                    'vendor_name': vendor['name'],
                    'note': 'Already linked (skipped duplicate)'
                }

            # Update email with vendor link
            execute_update(
                "UPDATE email_messages SET supplier_id = %s WHERE id = %s",
                (vendor_id, email_data['id'])
            )

            logger.info(f"🔗 Linked email to vendor: {vendor['name']} (ID: {vendor_id})")

            return {
                'action': 'link_to_vendor',
                'matched': True,
                'vendor_id': vendor_id,
                'vendor_name': vendor['name']
            }
        else:
            logger.info(f"⚠️ No vendor found for email: {sender_email}")
            return {'action': 'link_to_vendor', 'matched': False, 'reason': 'Vendor not found'}

    async def _action_link_to_customer(self, params: Dict, email_data: Dict) -> Dict:
        """Link email to customer by sender domain and persist on email_messages"""
        sender_domain = self._extract_sender_domain(email_data)
        if not sender_domain:
            return {'action': 'link_to_customer', 'matched': False, 'reason': 'No sender domain'}

        customer = self._find_customer_by_domain(sender_domain)
        if not customer:
            return {
                'action': 'link_to_customer',
                'matched': False,
                'reason': 'Customer not found for domain',
                'domain': sender_domain
            }

        execute_update(
            "UPDATE email_messages SET customer_id = %s WHERE id = %s",
            (customer['id'], email_data['id'])
        )

        return {
            'action': 'link_to_customer',
            'matched': True,
            'customer_id': customer['id'],
            'customer_name': customer['name'],
            'domain': sender_domain
        }

    async def _action_extract_invoice_data(self, params: Dict, email_data: Dict) -> Dict:
        """Save email PDF attachments to incoming_files for processing"""
        logger.info("📄 Saving invoice PDF from email to incoming files")

        email_id = email_data.get('id')
        sender_email = email_data.get('sender_email', '')
        vendor_id = email_data.get('supplier_id')

        # Get PDF attachments from email
        attachments = execute_query(
            """SELECT filename, file_path, size_bytes, content_type
               FROM email_attachments
               WHERE email_id = %s AND content_type = 'application/pdf'""",
            (email_id,)
        )

        # Normalize to a list (execute_query may return a single row or nothing)
        if attachments and not isinstance(attachments, list):
            attachments = [attachments]

        if not attachments:
            logger.warning(f"⚠️ No PDF attachments found for email {email_id}")
            return {
                'action': 'extract_invoice_data',
                'success': False,
                'note': 'No PDF attachment found in email'
            }

        uploaded_files = []

        for attachment in attachments:
            try:
                attachment_path = attachment['file_path']
                if not attachment_path:
                    logger.warning(f"⚠️ No file path for attachment {attachment['filename']}")
                    continue

                # Handle both absolute and relative paths
                file_path = Path(attachment_path)
                if not file_path.is_absolute():
                    # Try common base directories
                    for base in [Path.cwd(), Path('/app'), Path('.')]:
                        test_path = base / attachment_path
                        if test_path.exists():
                            file_path = test_path
                            break

                if not file_path.exists():
                    # Raised here so the per-attachment except below logs it and moves on
                    error_msg = f"Attachment file not found: {attachment_path}"
                    logger.error(f"❌ {error_msg}")
                    raise FileNotFoundError(error_msg)

                # Calculate checksum
                with open(file_path, 'rb') as f:
                    file_content = f.read()
                checksum = hashlib.sha256(file_content).hexdigest()

                # Check if file already exists
                existing = execute_query(
                    "SELECT file_id FROM incoming_files WHERE checksum = %s",
                    (checksum,))

                if existing:
                    logger.info(f"⚠️ File already exists: {attachment['filename']}")
                    continue

                # Create uploads directory if it doesn't exist
                upload_dir = Path("uploads")
                upload_dir.mkdir(exist_ok=True)

                # Copy file to uploads directory
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                safe_filename = f"{timestamp}_{attachment['filename']}"
                destination = upload_dir / safe_filename

                shutil.copy2(file_path, destination)

                # Insert into incoming_files
                file_id = execute_insert(
                    """INSERT INTO incoming_files
                       (filename, original_filename, file_path, file_size, mime_type, checksum,
                        status, detected_vendor_id, uploaded_at)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
                       RETURNING file_id""",
                    (
                        safe_filename,
                        attachment['filename'],
                        str(destination),
                        attachment['size_bytes'],
                        'application/pdf',
                        checksum,
                        'pending',  # Will appear in "Mangler Behandling"
                        vendor_id
                    )
                )

                uploaded_files.append({
                    'file_id': file_id,
                    'filename': attachment['filename']
                })

                logger.info(f"✅ Saved PDF to incoming_files: {attachment['filename']} (file_id: {file_id})")

            except Exception as e:
                logger.error(f"❌ Failed to save attachment: {e}")
                continue

        if uploaded_files:
            return {
                'action': 'extract_invoice_data',
                'success': True,
                'files_uploaded': len(uploaded_files),
                'file_ids': [f['file_id'] for f in uploaded_files],
                'note': f"{len(uploaded_files)} PDF(s) saved to 'Mangler Behandling'"
            }
        else:
            return {
                'action': 'extract_invoice_data',
                'success': False,
                'note': 'No files could be uploaded'
            }

    async def _action_extract_tracking_number(self, params: Dict, email_data: Dict) -> Dict:
        """Extract tracking number from email"""
        body = email_data.get('body_text', '')
        subject = email_data.get('subject', '')
        text = f"{subject} {body}"

        # Simple regex patterns for common carriers
        patterns = {
            'postnord': r'\b[0-9]{18}\b',
            'gls': r'\b[0-9]{11}\b',
            'dao': r'\b[0-9]{14}\b'
        }
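
        # Note: these are pure digit-length heuristics (18 digits for PostNord,
        # 11 for GLS, 14 for DAO). The \b boundaries keep a shorter pattern from
        # matching inside a longer digit run, but any unrelated digit run of a
        # matching length (e.g. some account numbers) will still be picked up.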

        tracking_numbers = []

        for carrier, pattern in patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                tracking_numbers.extend([{'carrier': carrier, 'number': m} for m in matches])

        if tracking_numbers:
            logger.info(f"📦 Extracted {len(tracking_numbers)} tracking number(s)")

            # Update email with the first tracking number found
            first_number = tracking_numbers[0]['number']
            execute_update(
                "UPDATE email_messages SET extracted_tracking_number = %s WHERE id = %s",
                (first_number, email_data['id'])
            )

        return {
            'action': 'extract_tracking_number',
            'tracking_numbers': tracking_numbers
        }

    async def _action_send_slack_notification(self, params: Dict, email_data: Dict) -> Dict:
        """Send Slack notification"""
        channel = params.get('channel', '#general')
        template = params.get('template', 'New email: {{subject}}')

        logger.info(f"💬 Would send Slack notification to {channel}")

        # TODO: Integrate with Slack API
        return {
            'action': 'send_slack_notification',
            'channel': channel,
            'note': 'Slack integration not yet implemented'
        }
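
    # A minimal sketch of how the Slack TODO above could be filled in via an
    # incoming webhook. SLACK_WEBHOOK_URL is a hypothetical setting and httpx
    # an assumed dependency; this helper is not called anywhere yet.
    async def _post_slack_webhook_sketch(self, channel: str, text: str) -> bool:
        """Hedged sketch only: POST a message payload to a Slack webhook."""
        import httpx  # local import so the sketch adds no module-load dependency

        webhook_url = getattr(settings, 'SLACK_WEBHOOK_URL', None)
        if not webhook_url:
            return False
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(webhook_url, json={'channel': channel, 'text': text})
            return response.status_code == 200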

    async def _action_send_email_notification(self, params: Dict, email_data: Dict) -> Dict:
        """Send email notification"""
        recipients = params.get('recipients', [])

        logger.info(f"📧 Would send email notification to {len(recipients)} recipient(s)")

        # TODO: Integrate with email sending service
        return {
            'action': 'send_email_notification',
            'recipients': recipients,
            'note': 'Email notification not yet implemented'
        }
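
    # A minimal sketch for the email-notification TODO above, using only the
    # standard library. SMTP_HOST and SMTP_FROM are hypothetical settings; the
    # helper is synchronous and not called anywhere yet.
    def _send_email_notification_sketch(self, recipients: List[str], subject: str, body: str) -> None:
        """Hedged sketch only: send a plain-text notification via smtplib."""
        import smtplib
        from email.message import EmailMessage

        msg = EmailMessage()
        msg['Subject'] = subject
        msg['From'] = getattr(settings, 'SMTP_FROM', 'noreply@localhost')
        msg['To'] = ', '.join(recipients)
        msg.set_content(body)

        with smtplib.SMTP(getattr(settings, 'SMTP_HOST', 'localhost')) as smtp:
            smtp.send_message(msg)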

    async def _action_mark_as_processed(self, params: Dict, email_data: Dict) -> Dict:
        """Mark email as processed"""
        status = params.get('status', 'processed')

        execute_update(
            """UPDATE email_messages
               SET status = %s, processed_at = CURRENT_TIMESTAMP, auto_processed = true
               WHERE id = %s""",
            (status, email_data['id'])
        )

        logger.info(f"✅ Marked email as: {status}")

        return {
            'action': 'mark_as_processed',
            'status': status
        }

    async def _action_flag_for_review(self, params: Dict, email_data: Dict) -> Dict:
        """Flag email for manual review"""
        reason = params.get('reason', 'Flagged by workflow')

        execute_update(
            """UPDATE email_messages
               SET status = 'flagged', approval_status = 'pending_review'
               WHERE id = %s""",
            (email_data['id'],)
        )

        logger.info(f"🚩 Flagged email for review: {reason}")

        return {
            'action': 'flag_for_review',
            'reason': reason
        }


# Global instance
email_workflow_service = EmailWorkflowService()