bmc_hub/app/services/email_workflow_service.py
Christian 3fb43783a6 feat: Implement Email Workflow System with comprehensive documentation and migration scripts
- Added Email Workflow System with automated actions based on email classification.
- Created database schema with tables for workflows, executions, and actions.
- Developed API endpoints for CRUD operations on workflows and execution history.
- Included pre-configured workflows for invoice processing, time confirmation, and bankruptcy alerts.
- Introduced user guide and workflow system improvements for better usability.
- Implemented backup system for automated backup jobs and notifications.
- Established email activity log to track all actions and events related to emails.
2025-12-15 12:28:12 +01:00

621 lines
24 KiB
Python

"""
Email Workflow Service
Executes automated workflows based on email classification
Inspired by OmniSync architecture adapted for BMC Hub
"""
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime, date
import re
import json
import hashlib
import shutil
from pathlib import Path
from decimal import Decimal
from app.core.database import execute_query, execute_insert, execute_update
from app.core.config import settings
from app.services.email_activity_logger import email_activity_logger
logger = logging.getLogger(__name__)
class EmailWorkflowService:
    """Orchestrates workflow execution for classified emails.

    Workflows are rows of the ``email_workflows`` table. Each row carries a
    list of steps shaped like ``{'action': <name>, 'params': {...}}``; every
    step is dispatched to the matching ``_action_*`` handler of this class and
    each run is recorded in ``email_workflow_executions``.
    """

    # Carrier -> regex used by the ``extract_tracking_number`` action.
    # Each pattern matches a standalone run of digits of one exact length.
    _TRACKING_PATTERNS = {
        'postnord': r'\b[0-9]{18}\b',
        'gls': r'\b[0-9]{11}\b',
        'dao': r'\b[0-9]{14}\b',
    }

    def __init__(self):
        # Workflows are on unless the setting exists and says otherwise.
        self.enabled = getattr(settings, 'EMAIL_WORKFLOWS_ENABLED', True)

    # ------------------------------------------------------------------
    # Pure helpers (no database / logging access)
    # ------------------------------------------------------------------

    @staticmethod
    def _safe_filename(raw_name: Any, fallback: str = 'attachment.pdf') -> str:
        """Reduce an attachment name to a bare file name.

        Attachment names originate from incoming emails, so directory
        components (``../``, absolute paths, Windows separators) and NUL
        bytes are removed before the name is used to build a path.

        Args:
            raw_name: Name as stored for the attachment (may be ``None``).
            fallback: Name returned when nothing usable is left.

        Returns:
            A file name without any directory component.
        """
        text = str(raw_name or '').replace('\x00', '').replace('\\', '/')
        candidate = Path(text).name.strip()
        if candidate in ('', '.', '..'):
            return fallback
        return candidate

    @staticmethod
    def _regex_matches(pattern: str, text: str) -> Optional[bool]:
        """Case-insensitive ``re.search``.

        Returns:
            ``True``/``False`` for match/no match, ``None`` when *pattern* is
            not a valid regular expression.
        """
        try:
            return re.search(pattern, text, re.IGNORECASE) is not None
        except re.error:
            return None

    @staticmethod
    def _normalize_steps(raw_steps: Any) -> List[Dict]:
        """Return the workflow steps as a list.

        Accepts an already decoded list/tuple, a JSON document (``str`` or
        ``bytes``) or ``None``.

        Raises:
            ValueError: If the value is invalid JSON or does not decode to a
                list.
        """
        if isinstance(raw_steps, (str, bytes, bytearray)):
            raw_steps = json.loads(raw_steps)  # JSONDecodeError is a ValueError
        if raw_steps is None:
            return []
        if not isinstance(raw_steps, (list, tuple)):
            raise ValueError(
                f"workflow_steps must be a list, got {type(raw_steps).__name__}"
            )
        return list(raw_steps)

    @classmethod
    def _find_tracking_numbers(cls, text: str) -> List[Dict]:
        """Return ``{'carrier', 'number'}`` dicts for every pattern hit in *text*.

        Results are grouped by carrier in the order of ``_TRACKING_PATTERNS``.
        """
        found = []
        for carrier, pattern in cls._TRACKING_PATTERNS.items():
            found.extend(
                {'carrier': carrier, 'number': number}
                for number in re.findall(pattern, text)
            )
        return found

    @staticmethod
    def _resolve_attachment_path(raw_path: str) -> Optional[Path]:
        """Locate an attachment file on disk.

        Absolute paths are used as they are; relative paths are tried below
        the current working directory and ``/app``.

        Returns:
            The path of the existing file, or ``None`` if it cannot be found.
        """
        candidate = Path(raw_path)
        if candidate.is_absolute():
            return candidate if candidate.is_file() else None
        for base in (Path.cwd(), Path('/app')):
            resolved = base / candidate
            if resolved.is_file():
                return resolved
        return None

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    async def execute_workflows(self, email_data: Dict) -> Dict:
        """
        Execute all matching workflows for an email

        Args:
            email_data: Email dict with classification, confidence_score, id, etc.

        Returns:
            Dict with execution results. ``status`` is one of ``disabled``,
            ``skipped``, ``no_match`` or ``executed``; ``workflows_executed``
            is always present.
        """
        if not self.enabled:
            logger.info("⏭️ Workflows disabled")
            return {'status': 'disabled', 'workflows_executed': 0}

        email_id = email_data.get('id')
        classification = email_data.get('classification')
        confidence = email_data.get('confidence_score', 0.0)

        if not email_id or not classification:
            logger.warning("⚠️ Cannot execute workflows: missing email_id or classification")
            # 'workflows_executed' is included so every response has the same
            # counter key as the other early returns.
            return {'status': 'skipped', 'reason': 'missing_data', 'workflows_executed': 0}

        logger.info(f"🔄 Finding workflows for classification: {classification} (confidence: {confidence})")

        # Find matching workflows
        workflows = await self._find_matching_workflows(email_data)
        if not workflows:
            logger.info(f"✅ No workflows match classification: {classification}")
            return {'status': 'no_match', 'workflows_executed': 0}

        logger.info(f"📋 Found {len(workflows)} matching workflow(s)")

        results = {
            'status': 'executed',
            'workflows_executed': 0,
            'workflows_succeeded': 0,
            'workflows_failed': 0,
            'details': []
        }

        # Execute workflows in priority order (the query sorts by priority)
        for workflow in workflows:
            result = await self._execute_workflow(workflow, email_data)
            results['details'].append(result)
            results['workflows_executed'] += 1

            succeeded = result['status'] == 'completed'
            if succeeded:
                results['workflows_succeeded'] += 1
            else:
                results['workflows_failed'] += 1

            # Stop if workflow has stop_on_match=true (only after a success)
            if succeeded and workflow.get('stop_on_match'):
                logger.info("🛑 Stopping workflow chain (stop_on_match=true)")
                break

        logger.info(f"✅ Workflow execution complete: {results['workflows_succeeded']}/{results['workflows_executed']} succeeded")
        return results

    async def _find_matching_workflows(self, email_data: Dict) -> List[Dict]:
        """Find all workflows that match this email.

        A workflow matches when it is enabled, its ``classification_trigger``
        equals the email's classification, its ``confidence_threshold`` is not
        above the email's confidence and its optional sender/subject regex
        patterns are found (case-insensitively) in the sender address and
        subject.

        Returns:
            Matching workflow rows ordered by ascending priority.
        """
        classification = email_data.get('classification')
        # ``or`` (not a .get default) so explicit None values are covered too.
        confidence = email_data.get('confidence_score') or 0.0
        sender = email_data.get('sender_email') or ''
        subject = email_data.get('subject') or ''

        query = """
            SELECT id, name, classification_trigger, sender_pattern, subject_pattern,
                   confidence_threshold, workflow_steps, priority, stop_on_match
            FROM email_workflows
            WHERE enabled = true
              AND classification_trigger = %s
              AND confidence_threshold <= %s
            ORDER BY priority ASC
        """
        candidates = execute_query(query, (classification, confidence)) or []

        # Filter by additional patterns
        checks = (
            ('sender_pattern', 'sender', sender),
            ('subject_pattern', 'subject', subject),
        )
        matching = []
        for wf in candidates:
            for field, label, value in checks:
                pattern = wf.get(field)
                if not pattern:
                    continue
                verdict = self._regex_matches(pattern, value)
                if verdict is None:
                    # A broken stored pattern disables only this workflow
                    # instead of aborting the processing of the email.
                    logger.warning(f"⚠️ Workflow '{wf['name']}' skipped: invalid {label} pattern {pattern!r}")
                    break
                if not verdict:
                    logger.debug(f"⏭️ Workflow '{wf['name']}' skipped: {label} doesn't match pattern")
                    break
            else:
                # No check broke out of the loop: every configured pattern matched.
                matching.append(wf)
        return matching

    async def _execute_workflow(self, workflow: Dict, email_data: Dict) -> Dict:
        """Execute a single workflow and record the run.

        An ``email_workflow_executions`` row is inserted as ``running`` and
        finished as ``completed`` or ``failed``; the counters on
        ``email_workflows`` are updated accordingly.

        Individual step failures do not fail the workflow: ``_execute_action``
        turns handler exceptions into ``failed`` step results and the
        remaining steps still run. The workflow is ``failed`` only when an
        exception escapes the step loop or the bookkeeping that follows it
        (including an undecodable ``workflow_steps`` value).

        Returns:
            Dict with ``workflow_id``, ``workflow_name``, ``execution_id``,
            ``status``, ``steps_completed``, ``steps_total`` and either
            ``execution_time_ms``/``step_results`` or ``error``.
        """
        workflow_id = workflow['id']
        workflow_name = workflow['name']
        email_id = email_data['id']

        logger.info(f"🚀 Executing workflow: {workflow_name} (ID: {workflow_id})")

        # Decode the steps before the execution row is created, but postpone
        # reporting a decode error until inside the try block so that it is
        # recorded through the regular failure path.
        steps_error = None
        try:
            steps = self._normalize_steps(workflow.get('workflow_steps'))
        except ValueError as exc:
            steps = []
            steps_error = exc

        # Create execution record
        execution_id = execute_insert(
            """INSERT INTO email_workflow_executions
               (workflow_id, email_id, status, steps_total, result_json)
               VALUES (%s, %s, 'running', %s, %s) RETURNING id""",
            (workflow_id, email_id, len(steps), json.dumps({}))
        )

        started_at = datetime.now()
        steps_completed = 0
        step_results = []

        try:
            if steps_error is not None:
                raise ValueError(f"Invalid workflow_steps: {steps_error}")

            # Execute each step
            for idx, step in enumerate(steps):
                action = step.get('action')
                params = step.get('params', {})

                logger.info(f" ➡️ Step {idx + 1}/{len(steps)}: {action}")
                step_result = await self._execute_action(action, params, email_data)

                step_results.append({
                    'step': idx + 1,
                    'action': action,
                    'status': step_result['status'],
                    'result': step_result.get('result'),
                    'error': step_result.get('error')
                })

                if step_result['status'] == 'failed':
                    logger.error(f" ❌ Step failed: {step_result.get('error')}")
                    # Continue to next step even on failure (configurable later)
                else:
                    logger.info(" ✅ Step completed successfully")
                    steps_completed += 1

            # Mark execution as completed
            execution_time_ms = int((datetime.now() - started_at).total_seconds() * 1000)
            execute_update(
                """UPDATE email_workflow_executions
                   SET status = 'completed', steps_completed = %s,
                       result_json = %s, completed_at = CURRENT_TIMESTAMP,
                       execution_time_ms = %s
                   WHERE id = %s""",
                # default=str: handler results may contain values json cannot
                # encode natively; they are stored as text instead of failing
                # a workflow whose steps have already run.
                (steps_completed, json.dumps(step_results, default=str), execution_time_ms, execution_id)
            )

            # Update workflow statistics
            execute_update(
                """UPDATE email_workflows
                   SET execution_count = execution_count + 1,
                       success_count = success_count + 1,
                       last_executed_at = CURRENT_TIMESTAMP
                   WHERE id = %s""",
                (workflow_id,)
            )

            logger.info(f"✅ Workflow '{workflow_name}' completed ({execution_time_ms}ms)")

            # Log: Workflow execution completed
            await email_activity_logger.log_workflow_executed(
                email_id=email_id,
                workflow_id=workflow_id,
                workflow_name=workflow_name,
                status='completed',
                steps_completed=steps_completed,
                execution_time_ms=execution_time_ms
            )

            return {
                'workflow_id': workflow_id,
                'workflow_name': workflow_name,
                'execution_id': execution_id,
                'status': 'completed',
                'steps_completed': steps_completed,
                'steps_total': len(steps),
                'execution_time_ms': execution_time_ms,
                'step_results': step_results
            }

        except Exception as e:
            logger.error(f"❌ Workflow execution failed: {e}", exc_info=True)
            execution_time_ms = int((datetime.now() - started_at).total_seconds() * 1000)

            # Mark execution as failed
            execute_update(
                """UPDATE email_workflow_executions
                   SET status = 'failed', steps_completed = %s,
                       error_message = %s, completed_at = CURRENT_TIMESTAMP
                   WHERE id = %s""",
                (steps_completed, str(e), execution_id)
            )

            # Update workflow statistics
            execute_update(
                """UPDATE email_workflows
                   SET execution_count = execution_count + 1,
                       failure_count = failure_count + 1,
                       last_executed_at = CURRENT_TIMESTAMP
                   WHERE id = %s""",
                (workflow_id,)
            )

            # Log: Workflow execution failed
            await email_activity_logger.log_workflow_executed(
                email_id=email_id,
                workflow_id=workflow_id,
                workflow_name=workflow_name,
                status='failed',
                steps_completed=steps_completed,
                execution_time_ms=execution_time_ms
            )

            return {
                'workflow_id': workflow_id,
                'workflow_name': workflow_name,
                'execution_id': execution_id,
                'status': 'failed',
                'steps_completed': steps_completed,
                'steps_total': len(steps),
                'error': str(e)
            }

    async def _execute_action(self, action: str, params: Dict, email_data: Dict) -> Dict:
        """Execute a single workflow action.

        Returns:
            ``{'status': 'success', 'result': ...}`` when the handler returns,
            ``{'status': 'skipped', 'error': ...}`` for an unknown action name,
            ``{'status': 'failed', 'error': ...}`` when the handler raises.
            Exceptions never propagate to the caller.
        """
        try:
            # Dispatch to specific action handler
            handler_map = {
                'create_ticket': self._action_create_ticket,
                'create_time_entry': self._action_create_time_entry,
                'link_to_vendor': self._action_link_to_vendor,
                'link_to_customer': self._action_link_to_customer,
                'extract_invoice_data': self._action_extract_invoice_data,
                'extract_tracking_number': self._action_extract_tracking_number,
                'send_slack_notification': self._action_send_slack_notification,
                'send_email_notification': self._action_send_email_notification,
                'mark_as_processed': self._action_mark_as_processed,
                'flag_for_review': self._action_flag_for_review,
            }

            handler = handler_map.get(action)
            if not handler:
                logger.warning(f"⚠️ Unknown action: {action}")
                return {
                    'status': 'skipped',
                    'error': f'Unknown action: {action}'
                }

            result = await handler(params, email_data)
            return {
                'status': 'success',
                'result': result
            }

        except Exception as e:
            logger.error(f"❌ Action '{action}' failed: {e}")
            return {
                'status': 'failed',
                'error': str(e)
            }

    # ------------------------------------------------------------------
    # Action Handlers
    # ------------------------------------------------------------------

    async def _action_create_ticket(self, params: Dict, email_data: Dict) -> Dict:
        """Create a ticket/case from email (placeholder: only logs the intent).

        Reads ``module`` (default ``support_cases``) and ``priority`` (default
        ``normal``) from *params* and echoes them in the result.
        """
        module = params.get('module', 'support_cases')
        priority = params.get('priority', 'normal')

        # TODO: Integrate with actual case/ticket system
        logger.info(f"🎫 Would create ticket in module '{module}' with priority '{priority}'")

        return {
            'action': 'create_ticket',
            'module': module,
            'priority': priority,
            'note': 'Ticket creation not yet implemented'
        }

    async def _action_create_time_entry(self, params: Dict, email_data: Dict) -> Dict:
        """Create time entry from email (placeholder: only logs the intent)."""
        logger.info("⏱️ Would create time entry")

        # TODO: Integrate with time tracking system
        return {
            'action': 'create_time_entry',
            'note': 'Time entry creation not yet implemented'
        }

    async def _action_link_to_vendor(self, params: Dict, email_data: Dict) -> Dict:
        """Link email to the vendor whose ``email`` equals the sender address.

        On a match ``email_messages.supplier_id`` is updated (unless it
        already holds that vendor) and ``email_data['supplier_id']`` is set,
        so later steps of the same workflow that read it from *email_data*
        (``extract_invoice_data`` does) see the vendor found here.

        *params* is currently not consulted; matching is always by sender
        e-mail address.

        Returns:
            Dict with ``matched`` plus ``vendor_id``/``vendor_name`` on a
            match, or ``reason`` otherwise.
        """
        sender_email = email_data.get('sender_email')
        if not sender_email:
            return {'action': 'link_to_vendor', 'matched': False, 'reason': 'No sender email'}

        # Find vendor by email
        vendor = execute_query(
            "SELECT id, name FROM vendors WHERE email = %s LIMIT 1",
            (sender_email,), fetchone=True
        )
        if not vendor:
            logger.info(f"⚠️ No vendor found for email: {sender_email}")
            return {'action': 'link_to_vendor', 'matched': False, 'reason': 'Vendor not found'}

        vendor_id = vendor['id']

        # Check if already linked to avoid duplicate updates
        current_vendor = execute_query(
            "SELECT supplier_id FROM email_messages WHERE id = %s",
            (email_data['id'],), fetchone=True
        )
        already_linked = bool(current_vendor) and current_vendor.get('supplier_id') == vendor_id

        # Keep the in-memory email in sync with the database row.
        email_data['supplier_id'] = vendor_id

        if already_linked:
            logger.info(f"⏭️ Email already linked to vendor {vendor_id}, skipping duplicate update")
            return {
                'action': 'link_to_vendor',
                'matched': True,
                'vendor_id': vendor_id,
                'vendor_name': vendor['name'],
                'note': 'Already linked (skipped duplicate)'
            }

        # Update email with vendor link
        execute_update(
            "UPDATE email_messages SET supplier_id = %s WHERE id = %s",
            (vendor_id, email_data['id'])
        )
        logger.info(f"🔗 Linked email to vendor: {vendor['name']} (ID: {vendor_id})")

        return {
            'action': 'link_to_vendor',
            'matched': True,
            'vendor_id': vendor_id,
            'vendor_name': vendor['name']
        }

    async def _action_link_to_customer(self, params: Dict, email_data: Dict) -> Dict:
        """Link email to customer (placeholder: only logs the intent)."""
        logger.info("🔗 Would link to customer")

        # TODO: Implement customer matching logic
        return {
            'action': 'link_to_customer',
            'note': 'Customer linking not yet implemented'
        }

    async def _action_extract_invoice_data(self, params: Dict, email_data: Dict) -> Dict:
        """Save email PDF attachment to incoming_files for processing.

        Every ``application/pdf`` attachment of the email is copied into the
        ``uploads`` directory and registered in ``incoming_files`` with status
        ``pending``. Attachments whose SHA-256 checksum is already registered,
        or whose file cannot be found, are skipped. A failure on one
        attachment is logged and does not stop the remaining ones.

        Returns:
            Dict with ``success``; on success also ``files_uploaded`` and
            ``file_ids``.
        """
        logger.info("📄 Saving invoice PDF from email to incoming files")

        email_id = email_data.get('id')
        vendor_id = email_data.get('supplier_id')

        # Get PDF attachments from email
        attachments = execute_query(
            """SELECT filename, file_path, size_bytes, content_type
               FROM email_attachments
               WHERE email_id = %s AND content_type = 'application/pdf'""",
            (email_id,)
        )
        if not attachments:
            attachments = []
        elif not isinstance(attachments, list):
            attachments = [attachments]

        if not attachments:
            logger.warning(f"⚠️ No PDF attachments found for email {email_id}")
            return {
                'action': 'extract_invoice_data',
                'success': False,
                'note': 'No PDF attachment found in email'
            }

        upload_dir = Path("uploads")
        uploaded_files = []

        for attachment in attachments:
            try:
                attachment_path = attachment['file_path']
                if not attachment_path:
                    logger.warning(f"⚠️ No file path for attachment {attachment['filename']}")
                    continue

                # Handle both absolute and relative paths
                file_path = self._resolve_attachment_path(attachment_path)
                if file_path is None:
                    logger.error(f"Attachment file not found: {attachment_path}")
                    continue

                # Calculate checksum
                checksum = hashlib.sha256(file_path.read_bytes()).hexdigest()

                # Check if file already exists
                existing = execute_query(
                    "SELECT file_id FROM incoming_files WHERE checksum = %s",
                    (checksum,),
                    fetchone=True
                )
                if existing:
                    logger.info(f"⚠️ File already exists: {attachment['filename']}")
                    continue

                # Create uploads directory if it doesn't exist
                upload_dir.mkdir(parents=True, exist_ok=True)

                # Build the destination from a sanitised name: the attachment
                # name comes from the email and must not be able to steer the
                # copy outside the uploads directory.
                clean_name = self._safe_filename(attachment['filename'])
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                safe_filename = f"{timestamp}_{clean_name}"
                destination = upload_dir / safe_filename
                if destination.exists():
                    # Same name within the same second (e.g. two attachments
                    # both called "invoice.pdf"): disambiguate by content
                    # instead of overwriting the earlier copy.
                    safe_filename = f"{timestamp}_{checksum[:8]}_{clean_name}"
                    destination = upload_dir / safe_filename

                # Copy file to uploads directory
                shutil.copy2(file_path, destination)

                # Insert into incoming_files
                file_id = execute_insert(
                    """INSERT INTO incoming_files
                       (filename, original_filename, file_path, file_size, mime_type, checksum,
                        status, detected_vendor_id, uploaded_at)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
                       RETURNING file_id""",
                    (
                        safe_filename,
                        attachment['filename'],
                        str(destination),
                        attachment['size_bytes'],
                        'application/pdf',
                        checksum,
                        'pending',  # Will appear in the "Mangler Behandling" list
                        vendor_id
                    )
                )

                uploaded_files.append({
                    'file_id': file_id,
                    'filename': attachment['filename']
                })
                logger.info(f"✅ Saved PDF to incoming_files: {attachment['filename']} (file_id: {file_id})")

            except Exception as e:
                # Best effort per attachment: report and move on to the next.
                logger.error(f"❌ Failed to save attachment: {e}")
                continue

        if uploaded_files:
            return {
                'action': 'extract_invoice_data',
                'success': True,
                'files_uploaded': len(uploaded_files),
                'file_ids': [f['file_id'] for f in uploaded_files],
                'note': f"{len(uploaded_files)} PDF(er) gemt i 'Mangler Behandling'"
            }

        return {
            'action': 'extract_invoice_data',
            'success': False,
            'note': 'No files could be uploaded'
        }

    async def _action_extract_tracking_number(self, params: Dict, email_data: Dict) -> Dict:
        """Extract tracking number from email.

        Subject and body text are searched with ``_TRACKING_PATTERNS``. When
        at least one number is found, the first one is stored in
        ``email_messages.extracted_tracking_number``.

        Returns:
            Dict with ``tracking_numbers``: list of ``{'carrier', 'number'}``.
        """
        body = email_data.get('body_text') or ''
        subject = email_data.get('subject') or ''

        tracking_numbers = self._find_tracking_numbers(f"{subject} {body}")

        if tracking_numbers:
            logger.info(f"📦 Extracted {len(tracking_numbers)} tracking number(s)")

            # Update email with tracking number
            execute_update(
                "UPDATE email_messages SET extracted_tracking_number = %s WHERE id = %s",
                (tracking_numbers[0]['number'], email_data['id'])
            )

        return {
            'action': 'extract_tracking_number',
            'tracking_numbers': tracking_numbers
        }

    async def _action_send_slack_notification(self, params: Dict, email_data: Dict) -> Dict:
        """Send Slack notification (placeholder: only logs the intent).

        Reads ``channel`` (default ``#general``) from *params*.
        """
        channel = params.get('channel', '#general')

        logger.info(f"💬 Would send Slack notification to {channel}")

        # TODO: Integrate with Slack API
        return {
            'action': 'send_slack_notification',
            'channel': channel,
            'note': 'Slack integration not yet implemented'
        }

    async def _action_send_email_notification(self, params: Dict, email_data: Dict) -> Dict:
        """Send email notification (placeholder: only logs the intent).

        Reads ``recipients`` (default empty list) from *params*.
        """
        recipients = params.get('recipients', [])

        logger.info(f"📧 Would send email notification to {len(recipients)} recipient(s)")

        # TODO: Integrate with email sending service
        return {
            'action': 'send_email_notification',
            'recipients': recipients,
            'note': 'Email notification not yet implemented'
        }

    async def _action_mark_as_processed(self, params: Dict, email_data: Dict) -> Dict:
        """Mark email as processed.

        Sets ``status`` (from *params*, default ``processed``),
        ``processed_at`` and ``auto_processed`` on the ``email_messages`` row.
        """
        status = params.get('status', 'processed')

        execute_update(
            """UPDATE email_messages
               SET status = %s, processed_at = CURRENT_TIMESTAMP, auto_processed = true
               WHERE id = %s""",
            (status, email_data['id'])
        )
        logger.info(f"✅ Marked email as: {status}")

        return {
            'action': 'mark_as_processed',
            'status': status
        }

    async def _action_flag_for_review(self, params: Dict, email_data: Dict) -> Dict:
        """Flag email for manual review.

        Sets ``status = 'flagged'`` and ``approval_status = 'pending_review'``
        on the ``email_messages`` row. The ``reason`` from *params* is logged
        and returned but not written to the database.
        """
        reason = params.get('reason', 'Flagged by workflow')

        execute_update(
            """UPDATE email_messages
               SET status = 'flagged', approval_status = 'pending_review'
               WHERE id = %s""",
            (email_data['id'],)
        )
        logger.info(f"🚩 Flagged email for review: {reason}")

        return {
            'action': 'flag_for_review',
            'reason': reason
        }
# Global instance, created once when this module is imported; its `enabled`
# flag is read from settings.EMAIL_WORKFLOWS_ENABLED at that moment.
email_workflow_service = EmailWorkflowService()