bmc_hub/app/services/email_workflow_service.py

1834 lines
72 KiB
Python
Raw Normal View History

"""
Email Workflow Service
Executes automated workflows based on email classification
Inspired by OmniSync architecture adapted for BMC Hub
"""
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime, date
import re
import json
import hashlib
import shutil
import io
from pathlib import Path
from decimal import Decimal
from uuid import uuid4
from app.core.database import execute_query, execute_insert, execute_update, table_has_column
from app.core.config import settings
from app.services.email_activity_logger import email_activity_logger
logger = logging.getLogger(__name__)
class EmailWorkflowService:
"""Orchestrates workflow execution for classified emails"""
def __init__(self):
self.enabled = settings.EMAIL_WORKFLOWS_ENABLED if hasattr(settings, 'EMAIL_WORKFLOWS_ENABLED') else True
2026-03-02 23:58:56 +01:00
HELPDESK_SKIP_CLASSIFICATIONS = {
'invoice',
'order_confirmation',
'freight_note',
'time_confirmation',
'newsletter',
'spam',
'bankruptcy',
'recording'
}
_SCAN_TOKEN_PATTERN = re.compile(r'\bBMCSCAN-[A-Z0-9-]{10,100}\b', re.IGNORECASE)
async def execute_workflows(self, email_data: Dict) -> Dict:
"""
Execute all matching workflows for an email
Args:
email_data: Email dict with classification, confidence_score, id, etc.
Returns:
Dict with execution results
"""
if not self.enabled:
logger.info("⏭️ Workflows disabled")
return {'status': 'disabled', 'workflows_executed': 0}
email_id = email_data.get('id')
classification = (email_data.get('classification') or '').strip().lower()
confidence = email_data.get('confidence_score', 0.0)
has_hint = self.has_helpdesk_routing_hint(email_data)
if not email_id:
logger.warning("⚠️ Cannot execute workflows: missing email_id")
return {'status': 'skipped', 'reason': 'missing_data'}
if not classification:
if has_hint:
classification = 'general'
email_data['classification'] = classification
else:
logger.warning("⚠️ Cannot execute workflows: missing classification")
return {'status': 'skipped', 'reason': 'missing_data'}
logger.info(f"🔄 Finding workflows for classification: {classification} (confidence: {confidence})")
results = {
'status': 'executed',
'workflows_executed': 0,
'workflows_succeeded': 0,
'workflows_failed': 0,
'details': []
}
# Special System Workflow: Bankruptcy Analysis
# Parses Statstidende emails for CVR numbers to link to customers
if classification == 'bankruptcy':
sys_result = await self._handle_bankruptcy_analysis(email_data)
results['details'].append(sys_result)
if sys_result['status'] == 'completed':
results['workflows_executed'] += 1
results['workflows_succeeded'] += 1
logger.info("✅ Bankruptcy system workflow executed successfully")
2026-03-02 23:58:56 +01:00
# Special System Workflow: Helpdesk SAG routing
# - If SAG/tråd-hint findes => forsøg routing til eksisterende sag
# - Newsletters/spam skip routing ENTIRELY (even with thread hints)
# - Uden hints: brug klassifikationsgating som før
HARD_SKIP = {'newsletter', 'spam'}
should_try_helpdesk = (
classification not in HARD_SKIP
and (
classification not in self.HELPDESK_SKIP_CLASSIFICATIONS
or has_hint
)
)
if should_try_helpdesk:
2026-03-02 23:58:56 +01:00
helpdesk_result = await self._handle_helpdesk_sag_routing(email_data)
if helpdesk_result:
results['details'].append(helpdesk_result)
if helpdesk_result.get('status') == 'completed':
results['workflows_executed'] += 1
results['workflows_succeeded'] += 1
logger.info("✅ Helpdesk SAG routing workflow executed")
# Find matching workflows
workflows = await self._find_matching_workflows(email_data)
if not workflows and results['workflows_executed'] == 0:
logger.info(f"✅ No workflows match classification: {classification}")
return {'status': 'no_match', 'workflows_executed': 0}
logger.info(f"📋 Found {len(workflows)} matching workflow(s)")
# Initialize results if not already (moved up)
# results = { ... } (already initialized in my thought, but need to move init up)
# Execute workflows in priority order
for workflow in workflows:
result = await self._execute_workflow(workflow, email_data)
results['details'].append(result)
results['workflows_executed'] += 1
if result['status'] == 'completed':
results['workflows_succeeded'] += 1
else:
results['workflows_failed'] += 1
# Stop if workflow has stop_on_match=true
if workflow.get('stop_on_match') and result['status'] == 'completed':
logger.info(f"🛑 Stopping workflow chain (stop_on_match=true)")
break
logger.info(f"✅ Workflow execution complete: {results['workflows_succeeded']}/{results['workflows_executed']} succeeded")
return results
async def _handle_bankruptcy_analysis(self, email_data: Dict) -> Dict:
"""
System workflow for bankruptcy emails (Statstidende).
Parses body for CVR numbers and links to customer if match found.
Returns: Execution result dict
"""
logger.info("🕵️ Running Bankruptcy Analysis on email")
# Combine subject, body and html for search
text_content = (
f"{email_data.get('subject', '')} "
f"{email_data.get('body_text', '')} "
f"{email_data.get('body_html', '')}"
)
# Regex for CVR numbers (8 digits, possibly preceded by 'CVR-nr.:')
# We look for explicit 'CVR-nr.: XXXXXXXX' pattern first as it's more reliable
cvr_matches = re.findall(r'CVR-nr\.?:?\s*(\d{8})', text_content, re.IGNORECASE)
if not cvr_matches:
logger.info("✅ No CVR numbers found in bankruptcy email")
return {'status': 'skipped', 'reason': 'no_cvr_found'}
unique_cvrs = list(set(cvr_matches))
logger.info(f"📋 Found CVRs in email: {unique_cvrs}")
if not unique_cvrs:
return {'status': 'skipped', 'reason': 'no_unique_cvr'}
# Check if any CVRs belong to our customers
# Safe parameterized query for variable list length
format_strings = ','.join(['%s'] * len(unique_cvrs))
query = f"""
SELECT id, name, cvr_number
FROM customers
WHERE cvr_number IN ({format_strings})
"""
matching_customers = execute_query(query, tuple(unique_cvrs))
if not matching_customers:
logger.info("✅ No matching customers found for bankruptcy CVRs - Marking as processed")
execute_update(
"""UPDATE email_messages
SET status = 'processed', folder = 'Processed',
processed_at = CURRENT_TIMESTAMP, auto_processed = true
WHERE id = %s""",
(email_data['id'],)
)
return {'status': 'completed', 'action': 'marked_processed_no_match'}
logger.warning(f"⚠️ FOUND BANKRUPTCY MATCHES: {[c['name'] for c in matching_customers]}")
# Link to the first customer found (limitation of 1:1 schema)
first_match = matching_customers[0]
execute_update(
"""UPDATE email_messages
SET customer_id = %s, status = 'processed', folder = 'Processed',
processed_at = CURRENT_TIMESTAMP, auto_processed = true
WHERE id = %s""",
(first_match['id'], email_data['id'])
)
logger.info(f"🔗 Linked bankruptcy email {email_data['id']} to customer {first_match['name']} ({first_match['id']}) and marked as processed")
if len(matching_customers) > 1:
logger.warning(f"❗ Email contained multiple customer matches! Only linked to first one.")
return {
'status': 'completed',
'action': 'linked_customer',
'customer_name': first_match['name']
}
2026-03-02 23:58:56 +01:00
def _extract_sender_domain(self, email_data: Dict) -> Optional[str]:
sender_email = (email_data.get('sender_email') or '').strip().lower()
if '@' not in sender_email:
return None
domain = sender_email.split('@', 1)[1].strip()
if domain.startswith('www.'):
domain = domain[4:]
return domain or None
def has_helpdesk_routing_hint(self, email_data: Dict) -> bool:
"""Return True when email has explicit routing hints (SAG tag, BMCid, or reply headers).
NOTE: A bare thread_key (Graph conversationId) is NOT a routing hint
because every Graph email has one, including newsletters and spam.
Only actual reply indicators (In-Reply-To, References), explicit
SAG tags, or BMCid markers count as hints."""
if self._extract_bmc_id(email_data):
return True
if self._extract_sag_id(email_data):
return True
if self._normalize_message_id(email_data.get('in_reply_to')):
return True
if self._extract_reference_message_ids(email_data.get('email_references')):
return True
return False
def _extract_bmc_id(self, email_data: Dict) -> Optional[Dict[str, Any]]:
"""Extract structured BMCid from email body/subject.
Returns dict with 'sag_id' (int) and 'thread_suffix' (str, e.g. '472193')
or None if no BMCid is found.
"""
candidates = [
email_data.get('body_html') or '',
email_data.get('body_text') or '',
email_data.get('subject') or '',
]
pattern = r'\bBMCid\s*:\s*s(\d+)t(\d+)\b'
for value in candidates:
match = re.search(pattern, value, re.IGNORECASE)
if match:
return {
'sag_id': int(match.group(1)),
'thread_suffix': match.group(2),
}
return None
2026-03-02 23:58:56 +01:00
def _extract_sag_id(self, email_data: Dict) -> Optional[int]:
# First try structured BMCid (most reliable)
bmc_id = self._extract_bmc_id(email_data)
if bmc_id:
return bmc_id['sag_id']
2026-03-02 23:58:56 +01:00
candidates = [
email_data.get('subject') or '',
email_data.get('in_reply_to') or '',
email_data.get('email_references') or '',
email_data.get('body_text') or '',
email_data.get('body_html') or '',
]
# Accept both strict and human variants used in real subjects, e.g.:
# - [SAG-53] (hidden/subject prefix)
# - SAG-53
# - SAG #53
# - Sag 53
sag_patterns = [
r'\[SAG-(\d+)\]',
r'\bSAG-(\d+)\b',
r'\bSAG\s*#\s*(\d+)\b',
r'\bSAG\s+(\d+)\b',
2026-03-02 23:58:56 +01:00
]
for value in candidates:
for pattern in sag_patterns:
match = re.search(pattern, value, re.IGNORECASE)
if match:
return int(match.group(1))
2026-03-02 23:58:56 +01:00
return None
def _normalize_message_id(self, value: Optional[str]) -> Optional[str]:
if not value:
return None
normalized = re.sub(r'[<>\s]', '', str(value)).lower().strip()
return normalized or None
def _extract_thread_message_ids(self, email_data: Dict) -> List[str]:
tokens: List[str] = []
in_reply_to = self._normalize_message_id(email_data.get('in_reply_to'))
if in_reply_to:
tokens.append(in_reply_to)
raw_references = (email_data.get('email_references') or '').strip()
if raw_references:
for ref in re.split(r'[\s,]+', raw_references):
normalized_ref = self._normalize_message_id(ref)
if normalized_ref:
tokens.append(normalized_ref)
# De-duplicate while preserving order
return list(dict.fromkeys(tokens))
def _extract_reference_message_ids(self, raw_references: Optional[str]) -> List[str]:
tokens: List[str] = []
if raw_references:
for ref in re.split(r'[\s,]+', str(raw_references).strip()):
normalized_ref = self._normalize_message_id(ref)
if normalized_ref:
tokens.append(normalized_ref)
return list(dict.fromkeys(tokens))
def _derive_thread_key(self, email_data: Dict) -> Optional[str]:
"""Derive stable conversation key: root References -> In-Reply-To -> explicit -> Message-ID."""
ref_ids = self._extract_reference_message_ids(email_data.get('email_references'))
if ref_ids:
return ref_ids[0]
in_reply_to = self._normalize_message_id(email_data.get('in_reply_to'))
if in_reply_to:
return in_reply_to
explicit = self._normalize_message_id(email_data.get('thread_key'))
if explicit:
return explicit
return self._normalize_message_id(email_data.get('message_id'))
def _find_sag_id_from_thread_key(self, thread_key: Optional[str]) -> Optional[int]:
if not thread_key:
return None
# Backward compatibility when DB migration is not yet applied.
try:
rows = execute_query(
"""
SELECT se.sag_id
FROM sag_emails se
JOIN email_messages em ON em.id = se.email_id
WHERE em.deleted_at IS NULL
AND (
LOWER(REGEXP_REPLACE(COALESCE(em.thread_key, ''), '[<>\\s]', '', 'g')) = %s
OR LOWER(REGEXP_REPLACE(COALESCE(em.message_id, ''), '[<>\\s]', '', 'g')) = %s
)
ORDER BY se.created_at DESC
LIMIT 1
""",
(thread_key, thread_key)
)
return rows[0]['sag_id'] if rows else None
except Exception:
return None
def _find_sag_id_from_thread_headers(self, email_data: Dict) -> Optional[int]:
thread_message_ids = self._extract_thread_message_ids(email_data)
if not thread_message_ids:
return None
placeholders = ','.join(['%s'] * len(thread_message_ids))
rows = execute_query(
f"""
SELECT se.sag_id
FROM sag_emails se
JOIN email_messages em ON em.id = se.email_id
WHERE em.deleted_at IS NULL
AND LOWER(REGEXP_REPLACE(COALESCE(em.message_id, ''), '[<>\\s]', '', 'g')) IN ({placeholders})
ORDER BY se.created_at DESC
LIMIT 1
""",
tuple(thread_message_ids)
)
return rows[0]['sag_id'] if rows else None
# Sender domains that should never trigger customer-domain SAG creation.
# Includes own sending domain and common automated senders.
_IGNORED_SENDER_DOMAINS = {
'bmcnetworks.dk',
'bmchub.local',
}
2026-03-02 23:58:56 +01:00
def _find_customer_by_domain(self, domain: str) -> Optional[Dict[str, Any]]:
if not domain:
return None
domain = domain.lower().strip()
# Never match the system's own sending domain as a customer
if domain in self._IGNORED_SENDER_DOMAINS:
return None
2026-03-02 23:58:56 +01:00
domain_alt = domain[4:] if domain.startswith('www.') else f"www.{domain}"
query = """
SELECT id, name
FROM customers
WHERE is_active = true
AND (
LOWER(TRIM(email_domain)) = %s
OR LOWER(TRIM(email_domain)) = %s
)
ORDER BY id ASC
LIMIT 1
"""
rows = execute_query(query, (domain, domain_alt))
return rows[0] if rows else None
def _find_thread_key_by_bmc_suffix(self, sag_id: int, thread_suffix: str) -> Optional[str]:
"""Find the thread_key of an outgoing email whose BMCid matches s{sag_id}t{thread_suffix}."""
try:
# Legacy compatibility: older outbound emails used t001 when the
# provisional thread key was unknown. In that case, pick the most
# recent outbound thread key in the same case as best effort.
if str(thread_suffix) == '001':
fallback = execute_query(
"""
SELECT em.thread_key
FROM sag_emails se
JOIN email_messages em ON em.id = se.email_id
WHERE se.sag_id = %s
AND em.deleted_at IS NULL
AND em.thread_key IS NOT NULL
AND TRIM(em.thread_key) != ''
AND LOWER(COALESCE(em.sender_email, '')) = %s
ORDER BY em.received_date DESC
LIMIT 1
""",
(sag_id, 'noreply@bmcnetworks.dk'),
)
if fallback and fallback[0].get('thread_key'):
return fallback[0]['thread_key']
rows = execute_query(
"""
SELECT em.thread_key
FROM sag_emails se
JOIN email_messages em ON em.id = se.email_id
WHERE se.sag_id = %s
AND em.deleted_at IS NULL
AND em.thread_key IS NOT NULL
AND TRIM(em.thread_key) != ''
ORDER BY em.received_date DESC
""",
(sag_id,),
)
if not rows:
return None
# Rebuild the BMCid suffix for each candidate thread_key
# and return the one that matches our target suffix.
for row in rows:
tk = row['thread_key']
normalized = re.sub(r"[^a-z0-9]+", "", str(tk).lower())
if not normalized:
continue
digest = hashlib.sha1(normalized.encode("utf-8")).hexdigest()
candidate_suffix = str((int(digest[:8], 16) % 900000) + 100000)
if candidate_suffix == thread_suffix:
return tk
return None
except Exception as e:
logger.warning("⚠️ Failed BMCid thread_key lookup: %s", e)
return None
def _update_email_thread_key(self, email_id: int, thread_key: str) -> None:
"""Set the thread_key on an email so it groups correctly."""
execute_update(
"UPDATE email_messages SET thread_key = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s",
(thread_key, email_id),
)
async def _finalize_sag_routing(
self, email_id: int, email_data: Dict, sag_id: int, routing_source: str
) -> Dict[str, Any]:
"""Link an email to an existing SAG and mark as processed."""
case_rows = execute_query(
"SELECT id, customer_id, titel FROM sag_sager WHERE id = %s AND deleted_at IS NULL",
(sag_id,),
)
if not case_rows:
logger.warning("⚠️ Email %s referenced SAG-%s but case was not found", email_id, sag_id)
return {'status': 'skipped', 'action': 'sag_id_not_found', 'sag_id': sag_id}
case = case_rows[0]
self._add_helpdesk_comment(sag_id, email_data)
self._link_email_to_sag(sag_id, email_id)
execute_update(
"""
UPDATE email_messages
SET linked_case_id = %s,
customer_id = COALESCE(customer_id, %s),
status = 'processed',
folder = 'Processed',
processed_at = CURRENT_TIMESTAMP,
auto_processed = true
WHERE id = %s
""",
(sag_id, case.get('customer_id'), email_id),
)
token_for_attach = None
token_route = self._resolve_scan_token_route(email_id, email_data)
if token_route:
token_for_attach = token_route.get('token')
self._auto_attach_scanner_email(email_id, sag_id, token_for_attach)
return {
'status': 'completed',
'action': 'updated_existing_sag',
'sag_id': sag_id,
'customer_id': case.get('customer_id'),
'routing_source': routing_source,
}
2026-03-02 23:58:56 +01:00
def _link_email_to_sag(self, sag_id: int, email_id: int) -> None:
execute_update(
"""
INSERT INTO sag_emails (sag_id, email_id)
SELECT %s, %s
WHERE NOT EXISTS (
SELECT 1 FROM sag_emails WHERE sag_id = %s AND email_id = %s
)
""",
(sag_id, email_id, sag_id, email_id)
)
def _extract_scan_tokens(self, *values: Optional[str]) -> List[str]:
tokens: List[str] = []
for value in values:
if not value:
continue
found = self._SCAN_TOKEN_PATTERN.findall(str(value))
if found:
tokens.extend(token.upper() for token in found)
return list(dict.fromkeys(tokens))
def _resolve_scan_token_route(self, email_id: int, email_data: Dict) -> Optional[Dict[str, Any]]:
text_tokens = self._extract_scan_tokens(
email_data.get('subject'),
email_data.get('body_text'),
email_data.get('body_html'),
email_data.get('in_reply_to'),
email_data.get('email_references'),
)
filename_tokens: List[str] = []
attachment_content_tokens: List[str] = []
try:
attachment_rows = execute_query(
"""
SELECT filename, content_type, content_data, file_path
FROM email_attachments
WHERE email_id = %s
ORDER BY id ASC
""",
(email_id,),
) or []
for row in attachment_rows:
filename_tokens.extend(self._extract_scan_tokens(row.get('filename')))
attachment_content_tokens.extend(
self._extract_scan_tokens_from_attachment(
filename=row.get('filename'),
content_type=row.get('content_type'),
content_data=row.get('content_data'),
file_path=row.get('file_path'),
)
)
except Exception as exc:
logger.warning("⚠️ Failed to inspect attachment filenames for scan token: %s", exc)
all_tokens = list(dict.fromkeys(text_tokens + filename_tokens + attachment_content_tokens))
if not all_tokens:
return self._resolve_scan_route_from_scanner_headers(email_data)
placeholders = ','.join(['%s'] * len(all_tokens))
try:
rows = execute_query(
f"""
SELECT token, sag_id, token_type
FROM sag_document_tokens
WHERE token IN ({placeholders})
AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
ORDER BY consumed_at IS NULL DESC, created_at DESC
LIMIT 1
""",
tuple(all_tokens),
)
if rows:
return rows[0]
# Fallback for scanner workflows where token only exists in barcode image
# and therefore not in plain text metadata.
return self._resolve_scan_route_from_scanner_headers(email_data)
except Exception as exc:
logger.warning("⚠️ Scan token lookup failed: %s", exc)
return self._resolve_scan_route_from_scanner_headers(email_data)
def _extract_scan_tokens_from_attachment(
self,
filename: Optional[str],
content_type: Optional[str],
content_data: Optional[Any],
file_path: Optional[str],
) -> List[str]:
tokens: List[str] = []
payload: Optional[bytes] = None
if content_data is not None:
try:
payload = bytes(content_data)
except Exception:
payload = None
if payload is None and file_path:
try:
payload = Path(file_path).read_bytes()
except Exception:
payload = None
if not payload:
return tokens
# 1) Cheap text extraction directly from bytes catches tokens in OCR-layer PDFs,
# plain text files, or metadata-rich attachments.
try:
sample = payload[:1_500_000]
tokens.extend(self._extract_scan_tokens(sample.decode('utf-8', errors='ignore')))
tokens.extend(self._extract_scan_tokens(sample.decode('latin-1', errors='ignore')))
except Exception:
pass
ext = (Path(str(filename or '')).suffix or '').lower().strip('.')
ctype = (content_type or '').lower()
# 2) PDF text-layer extraction (when available) for scanned documents with OCR.
if ext == 'pdf' or 'pdf' in ctype:
try:
from pypdf import PdfReader # type: ignore
reader = PdfReader(io.BytesIO(payload))
text_chunks: List[str] = []
for page in reader.pages[:5]:
extracted = page.extract_text() or ''
if extracted:
text_chunks.append(extracted)
if text_chunks:
tokens.extend(self._extract_scan_tokens("\n".join(text_chunks)))
except Exception:
pass
# 3) Decode barcode directly from scanned attachments.
# This catches cases where BMCSCAN exists only as a barcode image.
try:
if ext == 'pdf' or 'pdf' in ctype:
tokens.extend(self._extract_scan_tokens_from_pdf_barcode(payload))
else:
tokens.extend(self._extract_scan_tokens_from_image_barcode(payload))
except Exception:
pass
return list(dict.fromkeys(token.upper() for token in tokens if token))
def _extract_scan_tokens_from_image_barcode(self, payload: bytes) -> List[str]:
try:
from PIL import Image # type: ignore
from pyzbar.pyzbar import decode as zbar_decode # type: ignore
except Exception:
return []
try:
image = Image.open(io.BytesIO(payload))
except Exception:
return []
decoded_tokens: List[str] = []
variants = [image]
try:
variants.append(image.convert('L'))
variants.append(image.convert('L').point(lambda p: 255 if p > 140 else 0))
except Exception:
pass
for variant in variants:
try:
for item in zbar_decode(variant):
raw = item.data.decode('utf-8', errors='ignore')
decoded_tokens.extend(self._extract_scan_tokens(raw))
except Exception:
continue
return list(dict.fromkeys(decoded_tokens))
def _extract_scan_tokens_from_pdf_barcode(self, payload: bytes) -> List[str]:
try:
import pypdfium2 as pdfium # type: ignore
from pyzbar.pyzbar import decode as zbar_decode # type: ignore
except Exception:
return []
decoded_tokens: List[str] = []
try:
doc = pdfium.PdfDocument(io.BytesIO(payload))
except Exception:
return []
page_count = min(len(doc), 3)
for page_index in range(page_count):
page = None
try:
page = doc.get_page(page_index)
bitmap = page.render(scale=2.2)
pil_image = bitmap.to_pil()
for variant in (pil_image, pil_image.convert('L')):
for item in zbar_decode(variant):
raw = item.data.decode('utf-8', errors='ignore')
decoded_tokens.extend(self._extract_scan_tokens(raw))
except Exception:
continue
finally:
try:
if page is not None:
page.close()
except Exception:
pass
return list(dict.fromkeys(decoded_tokens))
def _resolve_scan_route_from_scanner_headers(self, email_data: Dict) -> Optional[Dict[str, Any]]:
"""Infer case route from scanner-generated message-id timestamps.
Some scanner/MFP flows only include the barcode token inside the attached image/PDF,
while headers contain a timestamped local message-id such as
`<1.20260401075731@172.16.31.35>`. We map that timestamp to the nearest recent,
unconsumed document token.
"""
header_values = [
email_data.get('in_reply_to'),
email_data.get('email_references'),
email_data.get('message_id'),
email_data.get('thread_key'),
]
candidates: List[datetime] = []
ts_pattern = re.compile(r'(20\d{12})')
for raw in header_values:
if not raw:
continue
for match in ts_pattern.findall(str(raw)):
try:
candidates.append(datetime.strptime(match, "%Y%m%d%H%M%S"))
except ValueError:
continue
if not candidates:
return None
for ts in candidates:
try:
rows = execute_query(
"""
SELECT token, sag_id, token_type, created_at
FROM sag_document_tokens
WHERE consumed_at IS NULL
AND created_at BETWEEN %s::timestamp - INTERVAL '90 minutes'
AND %s::timestamp + INTERVAL '20 minutes'
ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - %s::timestamp))) ASC,
CASE WHEN token_type = 'work_order' THEN 0 ELSE 1 END,
id DESC
LIMIT 1
""",
(ts, ts, ts),
) or []
if rows:
row = rows[0]
logger.info(
"🔎 Inferred scanner route via header timestamp %s -> SAG-%s (%s)",
ts.isoformat(),
row.get('sag_id'),
row.get('token'),
)
return {
'token': row.get('token'),
'sag_id': row.get('sag_id'),
'token_type': row.get('token_type'),
}
except Exception as exc:
logger.warning("⚠️ Scanner header timestamp route lookup failed: %s", exc)
return None
def _copy_email_attachments_to_case(self, email_id: int, sag_id: int, source_token: Optional[str]) -> int:
attachments = execute_query(
"""
SELECT filename, content_type, size_bytes, file_path, content_data
FROM email_attachments
WHERE email_id = %s
ORDER BY id ASC
""",
(email_id,),
) or []
if not attachments:
return 0
upload_base = Path(settings.UPLOAD_DIR).resolve()
(upload_base / "sag_files").mkdir(parents=True, exist_ok=True)
has_source_email = table_has_column("sag_files", "source_email_id")
has_source_type = table_has_column("sag_files", "source_type")
has_source_token = table_has_column("sag_files", "source_token")
copied = 0
for attachment in attachments:
filename = Path(attachment.get('filename') or 'scanned-document.bin').name
if has_source_email:
existing = execute_query(
"""
SELECT 1
FROM sag_files
WHERE sag_id = %s
AND source_email_id = %s
AND filename = %s
LIMIT 1
""",
(sag_id, email_id, filename),
) or []
if existing:
continue
payload = attachment.get('content_data')
if payload is None and attachment.get('file_path'):
try:
payload = Path(attachment['file_path']).read_bytes()
except Exception as exc:
logger.warning("⚠️ Could not read attachment file (%s): %s", filename, exc)
continue
if payload is None:
continue
raw_payload = bytes(payload)
stored_name = f"sag_files/{uuid4().hex}_{filename}"
target_path = upload_base / stored_name
try:
target_path.write_bytes(raw_payload)
except Exception as exc:
logger.warning("⚠️ Could not write case file from attachment (%s): %s", filename, exc)
continue
columns = ["sag_id", "filename", "content_type", "size_bytes", "stored_name"]
values: List[Any] = [
sag_id,
filename,
attachment.get('content_type') or 'application/octet-stream',
attachment.get('size_bytes') or len(raw_payload),
stored_name,
]
if has_source_email:
columns.append("source_email_id")
values.append(email_id)
if has_source_type:
columns.append("source_type")
values.append("scanner_email")
if has_source_token:
columns.append("source_token")
values.append(source_token)
execute_query(
f"INSERT INTO sag_files ({', '.join(columns)}) VALUES ({', '.join(['%s'] * len(values))})",
tuple(values),
)
copied += 1
return copied
def _auto_attach_scanner_email(self, email_id: int, sag_id: int, token: Optional[str]) -> None:
try:
copied = self._copy_email_attachments_to_case(email_id, sag_id, token)
if copied > 0:
logger.info("📎 Auto-attached %s attachment(s) from email %s to SAG-%s", copied, email_id, sag_id)
if token:
execute_update(
"""
UPDATE sag_document_tokens
SET consumed_at = COALESCE(consumed_at, CURRENT_TIMESTAMP),
consumed_email_id = COALESCE(consumed_email_id, %s)
WHERE token = %s
""",
(email_id, token),
)
except Exception as exc:
logger.warning("⚠️ Scanner auto-attach failed for email %s: %s", email_id, exc)
def _strip_quoted_email_text(self, body_text: str) -> str:
"""Return only the newest reply content (remove quoted history/signatures)."""
if not body_text:
return ""
text = str(body_text).replace("\r\n", "\n").replace("\r", "\n")
lines = text.split("\n")
cleaned_lines: List[str] = []
header_marker_re = re.compile(r'^(fra|from|sent|date|dato|to|til|emne|subject|cc):\s*', re.IGNORECASE)
original_message_re = re.compile(r'^(original message|oprindelig besked|videresendt besked)', re.IGNORECASE)
for idx, line in enumerate(lines):
stripped = line.strip()
lowered = stripped.lower()
if stripped.startswith('>'):
break
if original_message_re.match(stripped):
break
# Typical separator before quoted headers (e.g. "---" / "_____" lines)
if re.match(r'^[-_]{3,}$', stripped):
lookahead = lines[idx + 1: idx + 4]
if any(header_marker_re.match(candidate.strip()) for candidate in lookahead):
break
if idx > 0 and header_marker_re.match(stripped):
if lines[idx - 1].strip() == "":
break
cleaned_lines.append(line)
while cleaned_lines and cleaned_lines[-1].strip() == "":
cleaned_lines.pop()
return "\n".join(cleaned_lines).strip()
2026-03-02 23:58:56 +01:00
def _add_helpdesk_comment(self, sag_id: int, email_data: Dict) -> None:
email_id = email_data.get('id')
2026-03-02 23:58:56 +01:00
sender = email_data.get('sender_email') or 'ukendt'
subject = email_data.get('subject') or '(ingen emne)'
received = email_data.get('received_date')
received_str = received.isoformat() if hasattr(received, 'isoformat') else str(received or '')
body_text = self._strip_quoted_email_text((email_data.get('body_text') or '').strip())
email_meta_line = f"Email-ID: {email_id}\n" if email_id else ""
2026-03-02 23:58:56 +01:00
comment = (
f"📧 Indgående email\n"
f"{email_meta_line}"
2026-03-02 23:58:56 +01:00
f"Fra: {sender}\n"
f"Emne: {subject}\n"
f"Modtaget: {received_str}\n\n"
f"{body_text}"
)
execute_update(
"""
INSERT INTO sag_kommentarer (sag_id, forfatter, indhold, er_system_besked)
VALUES (%s, %s, %s, %s)
""",
(sag_id, 'Email Bot', comment, True)
)
def _create_sag_from_email(self, email_data: Dict, customer_id: int) -> Dict[str, Any]:
sender = email_data.get('sender_email') or 'ukendt'
subject = (email_data.get('subject') or '').strip() or f"Email fra {sender}"
description = (
f"Auto-oprettet fra email\n"
f"Fra: {sender}\n"
f"Message-ID: {email_data.get('message_id') or ''}\n\n"
f"{(email_data.get('body_text') or '').strip()}"
)
rows = execute_query(
"""
INSERT INTO sag_sager (
titel, beskrivelse, template_key, status, customer_id, created_by_user_id
)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id, titel, customer_id
""",
(subject, description, 'ticket', 'åben', customer_id, 1)
)
if not rows:
raise ValueError('Failed to create SAG from email')
return rows[0]
async def _handle_helpdesk_sag_routing(self, email_data: Dict) -> Optional[Dict[str, Any]]:
email_id = email_data.get('id')
if not email_id:
return None
derived_thread_key = self._derive_thread_key(email_data)
sag_id_from_thread_key = self._find_sag_id_from_thread_key(derived_thread_key)
sag_id_from_thread = self._find_sag_id_from_thread_headers(email_data)
sag_id_from_tag = self._extract_sag_id(email_data)
scan_token_route = self._resolve_scan_token_route(email_id, email_data)
if scan_token_route and scan_token_route.get('sag_id'):
matched_sag_id = int(scan_token_route['sag_id'])
logger.info("🔎 Scan token matched email %s to SAG-%s", email_id, matched_sag_id)
return await self._finalize_sag_routing(email_id, email_data, matched_sag_id, 'scan_token')
# Priority 0: BMCid is the most reliable signal — it's our own hidden
# marker embedded in every outgoing case email. When present, it
# provides the sag_id directly and the thread_suffix lets us adopt
# the correct thread_key for multi-thread SAGs.
bmc_id = self._extract_bmc_id(email_data)
if bmc_id:
bmc_sag_id = bmc_id['sag_id']
bmc_thread_suffix = bmc_id['thread_suffix']
# Look up the thread_key of the outgoing email whose BMCid matches
bmc_thread_key = self._find_thread_key_by_bmc_suffix(bmc_sag_id, bmc_thread_suffix)
if bmc_thread_key:
# Adopt the outgoing email's thread_key so reply groups correctly
self._update_email_thread_key(email_id, bmc_thread_key)
logger.info(
"🔖 BMCid s%st%s matched → SAG-%s (thread_key=%s)",
bmc_sag_id, bmc_thread_suffix, bmc_sag_id, bmc_thread_key,
)
sag_id = bmc_sag_id
routing_source = 'bmc_id'
# Skip the remaining priority chain — BMCid is authoritative
return await self._finalize_sag_routing(email_id, email_data, sag_id, routing_source)
# Fallback: try the explicit provider thread key (e.g. Graph conversationId)
# separately when the derived key (References[0]) differs from it.
provider_thread_key = self._normalize_message_id(email_data.get('thread_key'))
sag_id_from_provider = None
if provider_thread_key and provider_thread_key != derived_thread_key:
sag_id_from_provider = self._find_sag_id_from_thread_key(provider_thread_key)
routing_source = None
sag_id = None
if sag_id_from_thread_key:
sag_id = sag_id_from_thread_key
routing_source = 'thread_key'
logger.info("🧵 Matched email %s to SAG-%s via thread key", email_id, sag_id)
if sag_id_from_thread:
if sag_id and sag_id != sag_id_from_thread:
logger.warning(
"⚠️ Email %s has conflicting thread matches (thread_key: SAG-%s, headers: SAG-%s). Using thread_key.",
email_id,
sag_id,
sag_id_from_thread,
)
elif not sag_id:
sag_id = sag_id_from_thread
routing_source = 'thread_headers'
logger.info("🔗 Matched email %s to SAG-%s via thread headers", email_id, sag_id)
2026-03-02 23:58:56 +01:00
if sag_id_from_provider and not sag_id:
sag_id = sag_id_from_provider
routing_source = 'provider_thread_key'
logger.info("🧵 Matched email %s to SAG-%s via provider thread key (conversationId)", email_id, sag_id)
if sag_id_from_tag:
if sag_id and sag_id != sag_id_from_tag:
logger.warning(
"⚠️ Email %s contains conflicting case hints (thread: SAG-%s, tag: SAG-%s). Using thread match.",
email_id,
sag_id,
sag_id_from_tag
)
elif not sag_id:
sag_id = sag_id_from_tag
routing_source = 'sag_tag'
logger.info("🏷️ Matched email %s to SAG-%s via SAG tag", email_id, sag_id)
2026-03-02 23:58:56 +01:00
# 1) Existing SAG via subject/headers
if sag_id:
return await self._finalize_sag_routing(email_id, email_data, sag_id, routing_source)
2026-03-02 23:58:56 +01:00
if not getattr(settings, 'EMAIL_AUTO_CREATE_CASES_FROM_EMAIL', False):
logger.info(
"⏭️ Email %s did not match existing SAG and auto-create is disabled",
email_id,
)
return {
'status': 'skipped',
'action': 'auto_create_disabled',
}
2026-03-02 23:58:56 +01:00
# 2) No SAG id -> create only if sender domain belongs to known customer
sender_domain = self._extract_sender_domain(email_data)
customer = self._find_customer_by_domain(sender_domain) if sender_domain else None
if not customer:
logger.info("⏭️ Email %s has no known customer domain (%s) - kept in /emails", email_id, sender_domain)
return {'status': 'skipped', 'action': 'unknown_customer_domain', 'domain': sender_domain}
case = self._create_sag_from_email(email_data, customer['id'])
self._add_helpdesk_comment(case['id'], email_data)
self._link_email_to_sag(case['id'], email_id)
execute_update(
"""
UPDATE email_messages
SET linked_case_id = %s,
customer_id = %s,
status = 'processed',
folder = 'Processed',
processed_at = CURRENT_TIMESTAMP,
auto_processed = true
WHERE id = %s
""",
(case['id'], customer['id'], email_id)
)
self._auto_attach_scanner_email(email_id, case['id'], None)
2026-03-02 23:58:56 +01:00
logger.info("✅ Created SAG-%s from email %s for customer %s", case['id'], email_id, customer['id'])
return {
'status': 'completed',
'action': 'created_new_sag',
'sag_id': case['id'],
'customer_id': customer['id'],
'domain': sender_domain,
'routing_source': 'customer_domain'
2026-03-02 23:58:56 +01:00
}
async def _find_matching_workflows(self, email_data: Dict) -> List[Dict]:
"""Find all workflows that match this email"""
classification = email_data.get('classification')
confidence = email_data.get('confidence_score', 0.0)
sender = email_data.get('sender_email', '')
subject = email_data.get('subject', '')
query = """
SELECT id, name, classification_trigger, sender_pattern, subject_pattern,
confidence_threshold, workflow_steps, priority, stop_on_match
FROM email_workflows
WHERE enabled = true
AND classification_trigger = %s
AND confidence_threshold <= %s
ORDER BY priority ASC
"""
workflows = execute_query(query, (classification, confidence))
# Filter by additional patterns
matching = []
for wf in workflows:
# Check sender pattern
if wf.get('sender_pattern'):
pattern = wf['sender_pattern']
if not re.search(pattern, sender, re.IGNORECASE):
logger.debug(f"⏭️ Workflow '{wf['name']}' skipped: sender doesn't match pattern")
continue
# Check subject pattern
if wf.get('subject_pattern'):
pattern = wf['subject_pattern']
if not re.search(pattern, subject, re.IGNORECASE):
logger.debug(f"⏭️ Workflow '{wf['name']}' skipped: subject doesn't match pattern")
continue
matching.append(wf)
return matching
async def _execute_workflow(self, workflow: Dict, email_data: Dict) -> Dict:
"""Execute a single workflow"""
workflow_id = workflow['id']
workflow_name = workflow['name']
email_id = email_data['id']
logger.info(f"🚀 Executing workflow: {workflow_name} (ID: {workflow_id})")
# Create execution record
execution_id = execute_insert(
"""INSERT INTO email_workflow_executions
(workflow_id, email_id, status, steps_total, result_json)
VALUES (%s, %s, 'running', %s, %s) RETURNING id""",
(workflow_id, email_id, len(workflow['workflow_steps']), json.dumps({}))
)
started_at = datetime.now()
steps = workflow['workflow_steps']
steps_completed = 0
step_results = []
try:
# Execute each step
for idx, step in enumerate(steps):
action = step.get('action')
params = step.get('params', {})
logger.info(f" ➡️ Step {idx + 1}/{len(steps)}: {action}")
step_result = await self._execute_action(action, params, email_data)
step_results.append({
'step': idx + 1,
'action': action,
'status': step_result['status'],
'result': step_result.get('result'),
'error': step_result.get('error')
})
if step_result['status'] == 'failed':
logger.error(f" ❌ Step failed: {step_result.get('error')}")
# Continue to next step even on failure (configurable later)
else:
logger.info(f" ✅ Step completed successfully")
steps_completed += 1
# Mark execution as completed
completed_at = datetime.now()
execution_time_ms = int((completed_at - started_at).total_seconds() * 1000)
execute_update(
"""UPDATE email_workflow_executions
SET status = 'completed', steps_completed = %s,
result_json = %s, completed_at = CURRENT_TIMESTAMP,
execution_time_ms = %s
WHERE id = %s""",
(steps_completed, json.dumps(step_results), execution_time_ms, execution_id)
)
# Update workflow statistics
execute_update(
"""UPDATE email_workflows
SET execution_count = execution_count + 1,
success_count = success_count + 1,
last_executed_at = CURRENT_TIMESTAMP
WHERE id = %s""",
(workflow_id,)
)
logger.info(f"✅ Workflow '{workflow_name}' completed ({execution_time_ms}ms)")
# Log: Workflow execution completed
await email_activity_logger.log_workflow_executed(
email_id=email_id,
workflow_id=workflow_id,
workflow_name=workflow_name,
status='completed',
steps_completed=steps_completed,
execution_time_ms=execution_time_ms
)
return {
'workflow_id': workflow_id,
'workflow_name': workflow_name,
'execution_id': execution_id,
'status': 'completed',
'steps_completed': steps_completed,
'steps_total': len(steps),
'execution_time_ms': execution_time_ms,
'step_results': step_results
}
except Exception as e:
logger.error(f"❌ Workflow execution failed: {e}")
# Mark execution as failed
execute_update(
"""UPDATE email_workflow_executions
SET status = 'failed', steps_completed = %s,
error_message = %s, completed_at = CURRENT_TIMESTAMP
WHERE id = %s""",
(steps_completed, str(e), execution_id)
)
# Update workflow statistics
execute_update(
"""UPDATE email_workflows
SET execution_count = execution_count + 1,
failure_count = failure_count + 1,
last_executed_at = CURRENT_TIMESTAMP
WHERE id = %s""",
(workflow_id,)
)
# Log: Workflow execution failed
await email_activity_logger.log_workflow_executed(
email_id=email_id,
workflow_id=workflow_id,
workflow_name=workflow_name,
status='failed',
steps_completed=steps_completed,
execution_time_ms=0
)
return {
'workflow_id': workflow_id,
'workflow_name': workflow_name,
'execution_id': execution_id,
'status': 'failed',
'steps_completed': steps_completed,
'steps_total': len(steps),
'error': str(e)
}
async def _execute_action(self, action: str, params: Dict, email_data: Dict) -> Dict:
"""Execute a single workflow action"""
try:
# Dispatch to specific action handler
handler_map = {
'create_ticket': self._action_create_ticket_system,
'link_email_to_ticket': self._action_link_email_to_ticket,
2026-03-02 23:58:56 +01:00
'route_helpdesk_sag': self._handle_helpdesk_sag_routing,
'create_time_entry': self._action_create_time_entry,
'link_to_vendor': self._action_link_to_vendor,
'link_to_customer': self._action_link_to_customer,
'extract_invoice_data': self._action_extract_invoice_data,
'extract_tracking_number': self._action_extract_tracking_number,
'regex_extract_and_link': self._action_regex_extract_and_link,
'send_slack_notification': self._action_send_slack_notification,
'send_email_notification': self._action_send_email_notification,
'mark_as_processed': self._action_mark_as_processed,
'flag_for_review': self._action_flag_for_review,
}
handler = handler_map.get(action)
if not handler:
logger.warning(f"⚠️ Unknown action: {action}")
return {
'status': 'skipped',
'error': f'Unknown action: {action}'
}
result = await handler(params, email_data)
return {
'status': 'success',
'result': result
}
except Exception as e:
logger.error(f"❌ Action '{action}' failed: {e}")
return {
'status': 'failed',
'error': str(e)
}
# Action Handlers
async def _action_regex_extract_and_link(self, params: Dict, email_data: Dict) -> Dict:
"""
Generic action to extract data via regex and link/update record
Params:
- regex_pattern: Pattern with one capture group (e.g. "CVR: (\d{8})")
- target_table: Table to search (e.g. "customers")
- target_column: Column to match value against (e.g. "cvr_number")
- link_column: Column in email_messages to update (e.g. "customer_id")
- value_column: Column in target table to retrieve (e.g. "id")
- on_match: "update_email" (default) or "none"
"""
regex_pattern = params.get('regex_pattern')
target_table = params.get('target_table')
target_column = params.get('target_column')
link_column = params.get('link_column', 'customer_id')
value_column = params.get('value_column', 'id')
if not all([regex_pattern, target_table, target_column]):
return {'status': 'failed', 'error': 'Missing required params: regex_pattern, target_table, target_column'}
# Combine text for search
text_content = (
f"{email_data.get('subject', '')} "
f"{email_data.get('body_text', '')} "
f"{email_data.get('body_html', '')}"
)
# 1. Run Regex
matches = re.findall(regex_pattern, text_content, re.IGNORECASE)
unique_matches = list(set(matches))
if not unique_matches:
return {'status': 'skipped', 'reason': 'no_regex_match', 'pattern': regex_pattern}
logger.info(f"🔍 Regex '{regex_pattern}' found matches: {unique_matches}")
# 2. Look up in Target Table
# Safety check: simplistic validation against SQL injection for table/column names is assumed
# (params should come from trustworthy configuration)
valid_tables = ['customers', 'vendors', 'users']
if target_table not in valid_tables:
return {'status': 'failed', 'error': f'Invalid target table: {target_table}'}
placeholders = ','.join(['%s'] * len(unique_matches))
query = f"SELECT {value_column}, {target_column} FROM {target_table} WHERE {target_column} IN ({placeholders})"
db_matches = execute_query(query, tuple(unique_matches))
if not db_matches:
return {'status': 'completed', 'action': 'no_db_match', 'found_values': unique_matches}
# 3. Link (Update Email)
match = db_matches[0] # Take first match
match_value = match[value_column]
if params.get('on_match', 'update_email') == 'update_email':
update_query = f"UPDATE email_messages SET {link_column} = %s WHERE id = %s"
execute_update(update_query, (match_value, email_data['id']))
logger.info(f"🔗 Linked email {email_data['id']} to {target_table}.{value_column}={match_value}")
return {'status': 'completed', 'action': 'linked', 'match_id': match_value}
return {'status': 'completed', 'action': 'found_only', 'match_id': match_value}
async def _action_create_ticket_system(self, params: Dict, email_data: Dict) -> Dict:
"""Create a ticket from email using new ticket system"""
from app.ticket.backend.email_integration import EmailTicketIntegration
# Build email_data dict for ticket integration
ticket_email_data = {
'message_id': email_data.get('message_id'),
'subject': email_data.get('subject'),
'from_address': email_data.get('sender_email'),
'body': email_data.get('body_text', ''),
'html_body': email_data.get('body_html'),
'received_at': email_data.get('received_date').isoformat() if email_data.get('received_date') else None,
2026-03-02 23:58:56 +01:00
'in_reply_to': email_data.get('in_reply_to'),
'references': email_data.get('email_references')
}
# Get params from workflow
customer_id = params.get('customer_id') or email_data.get('customer_id')
assigned_to_user_id = params.get('assigned_to_user_id')
logger.info(f"🎫 Creating ticket from email: {email_data.get('message_id')}")
result = await EmailTicketIntegration.process_email_for_ticket(
email_data=ticket_email_data,
customer_id=customer_id,
assigned_to_user_id=assigned_to_user_id
)
logger.info(f"✅ Created ticket {result.get('ticket_number')} from email")
return {
'action': 'create_ticket',
'ticket_id': result.get('ticket_id'),
'ticket_number': result.get('ticket_number'),
'created': result.get('created', False),
'linked': result.get('linked', False)
}
async def _action_link_email_to_ticket(self, params: Dict, email_data: Dict) -> Dict:
"""Link email to existing ticket"""
from app.ticket.backend.email_integration import EmailTicketIntegration
ticket_number = params.get('ticket_number')
if not ticket_number:
logger.warning("⚠️ No ticket_number provided for link_email_to_ticket action")
return {
'action': 'link_email_to_ticket',
'status': 'failed',
'error': 'ticket_number required'
}
ticket_email_data = {
'message_id': email_data.get('message_id'),
'subject': email_data.get('subject'),
'from_address': email_data.get('sender_email'),
'body': email_data.get('body_text', ''),
'html_body': email_data.get('body_html'),
'received_at': email_data.get('received_date').isoformat() if email_data.get('received_date') else None,
2026-03-02 23:58:56 +01:00
'in_reply_to': email_data.get('in_reply_to'),
'references': email_data.get('email_references')
}
logger.info(f"🔗 Linking email to ticket {ticket_number}")
result = await EmailTicketIntegration.link_email_to_ticket(
ticket_number=ticket_number,
email_data=ticket_email_data
)
logger.info(f"✅ Linked email to ticket {ticket_number}")
return {
'action': 'link_email_to_ticket',
'ticket_id': result.get('ticket_id'),
'ticket_number': result.get('ticket_number'),
'linked': True
}
async def _action_create_time_entry(self, params: Dict, email_data: Dict) -> Dict:
"""Create time entry from email"""
logger.info(f"⏱️ Would create time entry")
# TODO: Integrate with time tracking system
return {
'action': 'create_time_entry',
'note': 'Time entry creation not yet implemented'
}
async def _action_link_to_vendor(self, params: Dict, email_data: Dict) -> Dict:
"""Link email to vendor"""
match_by = params.get('match_by', 'email')
sender_email = email_data.get('sender_email')
if not sender_email:
return {'action': 'link_to_vendor', 'matched': False, 'reason': 'No sender email'}
# Find vendor by email
query = "SELECT id, name FROM vendors WHERE email = %s LIMIT 1"
result = execute_query(query, (sender_email,))
if result:
vendor_id = result['id']
# Check if already linked to avoid duplicate updates
result_vendor = execute_query(
"SELECT supplier_id FROM email_messages WHERE id = %s",
(email_data['id'],))
current_vendor = result_vendor[0] if result_vendor else None
if current_vendor and current_vendor.get('supplier_id') == vendor_id:
logger.info(f"⏭️ Email already linked to vendor {vendor_id}, skipping duplicate update")
return {
'action': 'link_to_vendor',
'matched': True,
'vendor_id': vendor_id,
'vendor_name': result['name'],
'note': 'Already linked (skipped duplicate)'
}
# Update email with vendor link
execute_update(
"UPDATE email_messages SET supplier_id = %s WHERE id = %s",
(vendor_id, email_data['id'])
)
logger.info(f"🔗 Linked email to vendor: {result['name']} (ID: {vendor_id})")
return {
'action': 'link_to_vendor',
'matched': True,
'vendor_id': vendor_id,
'vendor_name': result['name']
}
else:
logger.info(f"⚠️ No vendor found for email: {sender_email}")
return {'action': 'link_to_vendor', 'matched': False, 'reason': 'Vendor not found'}
async def _action_link_to_customer(self, params: Dict, email_data: Dict) -> Dict:
2026-03-02 23:58:56 +01:00
"""Link email to customer by sender domain and persist on email_messages"""
sender_domain = self._extract_sender_domain(email_data)
if not sender_domain:
return {'action': 'link_to_customer', 'matched': False, 'reason': 'No sender domain'}
customer = self._find_customer_by_domain(sender_domain)
if not customer:
return {
'action': 'link_to_customer',
'matched': False,
'reason': 'Customer not found for domain',
'domain': sender_domain
}
execute_update(
"UPDATE email_messages SET customer_id = %s WHERE id = %s",
(customer['id'], email_data['id'])
)
return {
'action': 'link_to_customer',
2026-03-02 23:58:56 +01:00
'matched': True,
'customer_id': customer['id'],
'customer_name': customer['name'],
'domain': sender_domain
}
async def _action_extract_invoice_data(self, params: Dict, email_data: Dict) -> Dict:
"""Save email PDF attachment to incoming_files for processing"""
logger.info(f"📄 Saving invoice PDF from email to incoming files")
email_id = email_data.get('id')
sender_email = email_data.get('sender_email', '')
vendor_id = email_data.get('supplier_id')
# Get PDF attachments from email
attachments = execute_query(
"""SELECT filename, file_path, size_bytes, content_type
FROM email_attachments
WHERE email_id = %s AND content_type = 'application/pdf'""",
(email_id,)
)
if not attachments:
attachments = []
elif not isinstance(attachments, list):
attachments = [attachments]
if not attachments:
logger.warning(f"⚠️ No PDF attachments found for email {email_id}")
return {
'action': 'extract_invoice_data',
'success': False,
'note': 'No PDF attachment found in email'
}
uploaded_files = []
for attachment in attachments:
try:
attachment_path = attachment['file_path']
if not attachment_path:
logger.warning(f"⚠️ No file path for attachment {attachment['filename']}")
continue
# Handle both absolute and relative paths
file_path = Path(attachment_path)
if not file_path.is_absolute():
# Try common base directories
for base in [Path.cwd(), Path('/app'), Path('.')]:
test_path = base / attachment_path
if test_path.exists():
file_path = test_path
break
if not file_path.exists():
error_msg = f"Attachment file not found: {attachment_path}"
logger.error(f"{error_msg}")
raise FileNotFoundError(error_msg)
# Old code continues here but never reached if file missing
if False and not file_path.exists():
logger.warning(f"⚠️ Attachment file not found: {attachment_path}")
continue
# Calculate checksum
with open(file_path, 'rb') as f:
file_content = f.read()
checksum = hashlib.sha256(file_content).hexdigest()
# Check if file already exists
existing = execute_query(
"SELECT file_id FROM incoming_files WHERE checksum = %s",
(checksum,))
if existing:
logger.info(f"⚠️ File already exists: {attachment['filename']}")
continue
# Create uploads directory if it doesn't exist
upload_dir = Path("uploads")
upload_dir.mkdir(exist_ok=True)
# Copy file to uploads directory
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
safe_filename = f"{timestamp}_{attachment['filename']}"
destination = upload_dir / safe_filename
shutil.copy2(file_path, destination)
# Insert into incoming_files
file_id = execute_insert(
"""INSERT INTO incoming_files
(filename, original_filename, file_path, file_size, mime_type, checksum,
status, detected_vendor_id, uploaded_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
RETURNING file_id""",
(
safe_filename,
attachment['filename'],
str(destination),
attachment['size_bytes'],
'application/pdf',
checksum,
'pending', # Will appear in "Mangler Behandling"
vendor_id
)
)
uploaded_files.append({
'file_id': file_id,
'filename': attachment['filename']
})
logger.info(f"✅ Saved PDF to incoming_files: {attachment['filename']} (file_id: {file_id})")
except Exception as e:
logger.error(f"❌ Failed to save attachment: {e}")
continue
if uploaded_files:
return {
'action': 'extract_invoice_data',
'success': True,
'files_uploaded': len(uploaded_files),
'file_ids': [f['file_id'] for f in uploaded_files],
'note': f"{len(uploaded_files)} PDF(er) gemt i 'Mangler Behandling'"
}
else:
return {
'action': 'extract_invoice_data',
'success': False,
'note': 'No files could be uploaded'
}
async def _action_extract_tracking_number(self, params: Dict, email_data: Dict) -> Dict:
"""Extract tracking number from email"""
body = email_data.get('body_text', '')
subject = email_data.get('subject', '')
text = f"{subject} {body}"
# Simple regex patterns for common carriers
patterns = {
'postnord': r'\b[0-9]{18}\b',
'gls': r'\b[0-9]{11}\b',
'dao': r'\b[0-9]{14}\b'
}
tracking_numbers = []
for carrier, pattern in patterns.items():
matches = re.findall(pattern, text)
if matches:
tracking_numbers.extend([{'carrier': carrier, 'number': m} for m in matches])
if tracking_numbers:
logger.info(f"📦 Extracted {len(tracking_numbers)} tracking number(s)")
# Update email with tracking number
if tracking_numbers:
first_number = tracking_numbers[0]['number']
execute_update(
"UPDATE email_messages SET extracted_tracking_number = %s WHERE id = %s",
(first_number, email_data['id'])
)
return {
'action': 'extract_tracking_number',
'tracking_numbers': tracking_numbers
}
async def _action_send_slack_notification(self, params: Dict, email_data: Dict) -> Dict:
"""Send Slack notification"""
channel = params.get('channel', '#general')
template = params.get('template', 'New email: {{subject}}')
logger.info(f"💬 Would send Slack notification to {channel}")
# TODO: Integrate with Slack API
return {
'action': 'send_slack_notification',
'channel': channel,
'note': 'Slack integration not yet implemented'
}
async def _action_send_email_notification(self, params: Dict, email_data: Dict) -> Dict:
"""Send email notification"""
recipients = params.get('recipients', [])
logger.info(f"📧 Would send email notification to {len(recipients)} recipient(s)")
# TODO: Integrate with email sending service
return {
'action': 'send_email_notification',
'recipients': recipients,
'note': 'Email notification not yet implemented'
}
async def _action_mark_as_processed(self, params: Dict, email_data: Dict) -> Dict:
"""Mark email as processed"""
status = params.get('status', 'processed')
execute_update(
"""UPDATE email_messages
SET status = %s, processed_at = CURRENT_TIMESTAMP, auto_processed = true
WHERE id = %s""",
(status, email_data['id'])
)
logger.info(f"✅ Marked email as: {status}")
return {
'action': 'mark_as_processed',
'status': status
}
async def _action_flag_for_review(self, params: Dict, email_data: Dict) -> Dict:
"""Flag email for manual review"""
reason = params.get('reason', 'Flagged by workflow')
execute_update(
"""UPDATE email_messages
SET status = 'flagged', approval_status = 'pending_review'
WHERE id = %s""",
(email_data['id'],)
)
logger.info(f"🚩 Flagged email for review: {reason}")
return {
'action': 'flag_for_review',
'reason': reason
}
# Global instance
email_workflow_service = EmailWorkflowService()