- Added migration 025 for the Ticket System, creating tables for tickets, comments, attachments, worklogs, prepaid cards, and audit logs. - Introduced migration 026 to add ticket-related permissions to the auth system and assign them to user groups. - Developed a test suite for the Ticket Module, validating database schema, ticket number generation, prepaid card constraints, service logic, worklog creation, audit logging, and views.
684 lines
27 KiB
Python
684 lines
27 KiB
Python
"""
|
|
Email Workflow Service
|
|
Executes automated workflows based on email classification
|
|
Inspired by OmniSync architecture adapted for BMC Hub
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Optional, Any
|
|
from datetime import datetime, date
|
|
import re
|
|
import json
|
|
import hashlib
|
|
import shutil
|
|
from pathlib import Path
|
|
from decimal import Decimal
|
|
|
|
from app.core.database import execute_query, execute_insert, execute_update
|
|
from app.core.config import settings
|
|
from app.services.email_activity_logger import email_activity_logger
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EmailWorkflowService:
|
|
"""Orchestrates workflow execution for classified emails"""
|
|
|
|
def __init__(self):
|
|
self.enabled = settings.EMAIL_WORKFLOWS_ENABLED if hasattr(settings, 'EMAIL_WORKFLOWS_ENABLED') else True
|
|
|
|
async def execute_workflows(self, email_data: Dict) -> Dict:
|
|
"""
|
|
Execute all matching workflows for an email
|
|
|
|
Args:
|
|
email_data: Email dict with classification, confidence_score, id, etc.
|
|
|
|
Returns:
|
|
Dict with execution results
|
|
"""
|
|
if not self.enabled:
|
|
logger.info("⏭️ Workflows disabled")
|
|
return {'status': 'disabled', 'workflows_executed': 0}
|
|
|
|
email_id = email_data.get('id')
|
|
classification = email_data.get('classification')
|
|
confidence = email_data.get('confidence_score', 0.0)
|
|
|
|
if not email_id or not classification:
|
|
logger.warning(f"⚠️ Cannot execute workflows: missing email_id or classification")
|
|
return {'status': 'skipped', 'reason': 'missing_data'}
|
|
|
|
logger.info(f"🔄 Finding workflows for classification: {classification} (confidence: {confidence})")
|
|
|
|
# Find matching workflows
|
|
workflows = await self._find_matching_workflows(email_data)
|
|
|
|
if not workflows:
|
|
logger.info(f"✅ No workflows match classification: {classification}")
|
|
return {'status': 'no_match', 'workflows_executed': 0}
|
|
|
|
logger.info(f"📋 Found {len(workflows)} matching workflow(s)")
|
|
|
|
results = {
|
|
'status': 'executed',
|
|
'workflows_executed': 0,
|
|
'workflows_succeeded': 0,
|
|
'workflows_failed': 0,
|
|
'details': []
|
|
}
|
|
|
|
# Execute workflows in priority order
|
|
for workflow in workflows:
|
|
result = await self._execute_workflow(workflow, email_data)
|
|
results['details'].append(result)
|
|
results['workflows_executed'] += 1
|
|
|
|
if result['status'] == 'completed':
|
|
results['workflows_succeeded'] += 1
|
|
else:
|
|
results['workflows_failed'] += 1
|
|
|
|
# Stop if workflow has stop_on_match=true
|
|
if workflow.get('stop_on_match') and result['status'] == 'completed':
|
|
logger.info(f"🛑 Stopping workflow chain (stop_on_match=true)")
|
|
break
|
|
|
|
logger.info(f"✅ Workflow execution complete: {results['workflows_succeeded']}/{results['workflows_executed']} succeeded")
|
|
return results
|
|
|
|
async def _find_matching_workflows(self, email_data: Dict) -> List[Dict]:
|
|
"""Find all workflows that match this email"""
|
|
classification = email_data.get('classification')
|
|
confidence = email_data.get('confidence_score', 0.0)
|
|
sender = email_data.get('sender_email', '')
|
|
subject = email_data.get('subject', '')
|
|
|
|
query = """
|
|
SELECT id, name, classification_trigger, sender_pattern, subject_pattern,
|
|
confidence_threshold, workflow_steps, priority, stop_on_match
|
|
FROM email_workflows
|
|
WHERE enabled = true
|
|
AND classification_trigger = %s
|
|
AND confidence_threshold <= %s
|
|
ORDER BY priority ASC
|
|
"""
|
|
|
|
workflows = execute_query(query, (classification, confidence))
|
|
|
|
# Filter by additional patterns
|
|
matching = []
|
|
for wf in workflows:
|
|
# Check sender pattern
|
|
if wf.get('sender_pattern'):
|
|
pattern = wf['sender_pattern']
|
|
if not re.search(pattern, sender, re.IGNORECASE):
|
|
logger.debug(f"⏭️ Workflow '{wf['name']}' skipped: sender doesn't match pattern")
|
|
continue
|
|
|
|
# Check subject pattern
|
|
if wf.get('subject_pattern'):
|
|
pattern = wf['subject_pattern']
|
|
if not re.search(pattern, subject, re.IGNORECASE):
|
|
logger.debug(f"⏭️ Workflow '{wf['name']}' skipped: subject doesn't match pattern")
|
|
continue
|
|
|
|
matching.append(wf)
|
|
|
|
return matching
|
|
|
|
    async def _execute_workflow(self, workflow: Dict, email_data: Dict) -> Dict:
        """Execute a single workflow against one email.

        Runs every step in workflow['workflow_steps'] in order, records the
        run in email_workflow_executions, and bumps the aggregate counters on
        email_workflows. A failed step does NOT abort the run (see inline
        note); only an exception escaping the loop marks the whole execution
        as failed.

        Args:
            workflow: Row from email_workflows; must contain 'id', 'name' and
                'workflow_steps' (a list of {'action', 'params'} dicts).
            email_data: Classified email dict; must contain 'id'.

        Returns:
            Summary dict with 'status' ('completed' or 'failed'),
            'steps_completed', 'steps_total', and either
            'step_results'/'execution_time_ms' or 'error'.
        """
        workflow_id = workflow['id']
        workflow_name = workflow['name']
        email_id = email_data['id']

        logger.info(f"🚀 Executing workflow: {workflow_name} (ID: {workflow_id})")

        # Create execution record up front so the run is visible as 'running'
        execution_id = execute_insert(
            """INSERT INTO email_workflow_executions
               (workflow_id, email_id, status, steps_total, result_json)
               VALUES (%s, %s, 'running', %s, %s) RETURNING id""",
            (workflow_id, email_id, len(workflow['workflow_steps']), json.dumps({}))
        )

        started_at = datetime.now()
        steps = workflow['workflow_steps']
        steps_completed = 0
        step_results = []

        try:
            # Execute each step
            for idx, step in enumerate(steps):
                action = step.get('action')
                params = step.get('params', {})

                logger.info(f"  ➡️ Step {idx + 1}/{len(steps)}: {action}")

                step_result = await self._execute_action(action, params, email_data)
                step_results.append({
                    'step': idx + 1,
                    'action': action,
                    'status': step_result['status'],
                    'result': step_result.get('result'),
                    'error': step_result.get('error')
                })

                if step_result['status'] == 'failed':
                    logger.error(f"  ❌ Step failed: {step_result.get('error')}")
                    # Continue to next step even on failure (configurable later)
                else:
                    logger.info(f"  ✅ Step completed successfully")

                # NOTE: incremented for every step, including failed ones
                steps_completed += 1

            # Mark execution as completed
            completed_at = datetime.now()
            execution_time_ms = int((completed_at - started_at).total_seconds() * 1000)

            execute_update(
                """UPDATE email_workflow_executions
                   SET status = 'completed', steps_completed = %s,
                       result_json = %s, completed_at = CURRENT_TIMESTAMP,
                       execution_time_ms = %s
                   WHERE id = %s""",
                (steps_completed, json.dumps(step_results), execution_time_ms, execution_id)
            )

            # Update workflow statistics (success path)
            execute_update(
                """UPDATE email_workflows
                   SET execution_count = execution_count + 1,
                       success_count = success_count + 1,
                       last_executed_at = CURRENT_TIMESTAMP
                   WHERE id = %s""",
                (workflow_id,)
            )

            logger.info(f"✅ Workflow '{workflow_name}' completed ({execution_time_ms}ms)")

            # Log: Workflow execution completed (activity audit trail)
            await email_activity_logger.log_workflow_executed(
                email_id=email_id,
                workflow_id=workflow_id,
                workflow_name=workflow_name,
                status='completed',
                steps_completed=steps_completed,
                execution_time_ms=execution_time_ms
            )

            return {
                'workflow_id': workflow_id,
                'workflow_name': workflow_name,
                'execution_id': execution_id,
                'status': 'completed',
                'steps_completed': steps_completed,
                'steps_total': len(steps),
                'execution_time_ms': execution_time_ms,
                'step_results': step_results
            }

        except Exception as e:
            logger.error(f"❌ Workflow execution failed: {e}")

            # Mark execution as failed (keeps the partial steps_completed count)
            execute_update(
                """UPDATE email_workflow_executions
                   SET status = 'failed', steps_completed = %s,
                       error_message = %s, completed_at = CURRENT_TIMESTAMP
                   WHERE id = %s""",
                (steps_completed, str(e), execution_id)
            )

            # Update workflow statistics (failure path)
            execute_update(
                """UPDATE email_workflows
                   SET execution_count = execution_count + 1,
                       failure_count = failure_count + 1,
                       last_executed_at = CURRENT_TIMESTAMP
                   WHERE id = %s""",
                (workflow_id,)
            )

            # Log: Workflow execution failed
            # NOTE(review): execution_time_ms is reported as 0 here even though
            # elapsed time could be computed from started_at — confirm intent.
            await email_activity_logger.log_workflow_executed(
                email_id=email_id,
                workflow_id=workflow_id,
                workflow_name=workflow_name,
                status='failed',
                steps_completed=steps_completed,
                execution_time_ms=0
            )

            return {
                'workflow_id': workflow_id,
                'workflow_name': workflow_name,
                'execution_id': execution_id,
                'status': 'failed',
                'steps_completed': steps_completed,
                'steps_total': len(steps),
                'error': str(e)
            }
|
|
|
|
async def _execute_action(self, action: str, params: Dict, email_data: Dict) -> Dict:
|
|
"""Execute a single workflow action"""
|
|
try:
|
|
# Dispatch to specific action handler
|
|
handler_map = {
|
|
'create_ticket': self._action_create_ticket_system,
|
|
'link_email_to_ticket': self._action_link_email_to_ticket,
|
|
'create_time_entry': self._action_create_time_entry,
|
|
'link_to_vendor': self._action_link_to_vendor,
|
|
'link_to_customer': self._action_link_to_customer,
|
|
'extract_invoice_data': self._action_extract_invoice_data,
|
|
'extract_tracking_number': self._action_extract_tracking_number,
|
|
'send_slack_notification': self._action_send_slack_notification,
|
|
'send_email_notification': self._action_send_email_notification,
|
|
'mark_as_processed': self._action_mark_as_processed,
|
|
'flag_for_review': self._action_flag_for_review,
|
|
}
|
|
|
|
handler = handler_map.get(action)
|
|
|
|
if not handler:
|
|
logger.warning(f"⚠️ Unknown action: {action}")
|
|
return {
|
|
'status': 'skipped',
|
|
'error': f'Unknown action: {action}'
|
|
}
|
|
|
|
result = await handler(params, email_data)
|
|
return {
|
|
'status': 'success',
|
|
'result': result
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Action '{action}' failed: {e}")
|
|
return {
|
|
'status': 'failed',
|
|
'error': str(e)
|
|
}
|
|
|
|
# Action Handlers
|
|
|
|
async def _action_create_ticket_system(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Create a ticket from email using new ticket system"""
|
|
from app.ticket.backend.email_integration import EmailTicketIntegration
|
|
|
|
# Build email_data dict for ticket integration
|
|
ticket_email_data = {
|
|
'message_id': email_data.get('message_id'),
|
|
'subject': email_data.get('subject'),
|
|
'from_address': email_data.get('sender_email'),
|
|
'body': email_data.get('body_text', ''),
|
|
'html_body': email_data.get('body_html'),
|
|
'received_at': email_data.get('received_date').isoformat() if email_data.get('received_date') else None,
|
|
'in_reply_to': None, # TODO: Extract from email headers
|
|
'references': None # TODO: Extract from email headers
|
|
}
|
|
|
|
# Get params from workflow
|
|
customer_id = params.get('customer_id') or email_data.get('customer_id')
|
|
assigned_to_user_id = params.get('assigned_to_user_id')
|
|
|
|
logger.info(f"🎫 Creating ticket from email: {email_data.get('message_id')}")
|
|
|
|
result = await EmailTicketIntegration.process_email_for_ticket(
|
|
email_data=ticket_email_data,
|
|
customer_id=customer_id,
|
|
assigned_to_user_id=assigned_to_user_id
|
|
)
|
|
|
|
logger.info(f"✅ Created ticket {result.get('ticket_number')} from email")
|
|
|
|
return {
|
|
'action': 'create_ticket',
|
|
'ticket_id': result.get('ticket_id'),
|
|
'ticket_number': result.get('ticket_number'),
|
|
'created': result.get('created', False),
|
|
'linked': result.get('linked', False)
|
|
}
|
|
|
|
async def _action_link_email_to_ticket(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Link email to existing ticket"""
|
|
from app.ticket.backend.email_integration import EmailTicketIntegration
|
|
|
|
ticket_number = params.get('ticket_number')
|
|
|
|
if not ticket_number:
|
|
logger.warning("⚠️ No ticket_number provided for link_email_to_ticket action")
|
|
return {
|
|
'action': 'link_email_to_ticket',
|
|
'status': 'failed',
|
|
'error': 'ticket_number required'
|
|
}
|
|
|
|
ticket_email_data = {
|
|
'message_id': email_data.get('message_id'),
|
|
'subject': email_data.get('subject'),
|
|
'from_address': email_data.get('sender_email'),
|
|
'body': email_data.get('body_text', ''),
|
|
'html_body': email_data.get('body_html'),
|
|
'received_at': email_data.get('received_date').isoformat() if email_data.get('received_date') else None,
|
|
}
|
|
|
|
logger.info(f"🔗 Linking email to ticket {ticket_number}")
|
|
|
|
result = await EmailTicketIntegration.link_email_to_ticket(
|
|
ticket_number=ticket_number,
|
|
email_data=ticket_email_data
|
|
)
|
|
|
|
logger.info(f"✅ Linked email to ticket {ticket_number}")
|
|
|
|
return {
|
|
'action': 'link_email_to_ticket',
|
|
'ticket_id': result.get('ticket_id'),
|
|
'ticket_number': result.get('ticket_number'),
|
|
'linked': True
|
|
}
|
|
|
|
async def _action_create_time_entry(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Create time entry from email"""
|
|
logger.info(f"⏱️ Would create time entry")
|
|
|
|
# TODO: Integrate with time tracking system
|
|
return {
|
|
'action': 'create_time_entry',
|
|
'note': 'Time entry creation not yet implemented'
|
|
}
|
|
|
|
async def _action_link_to_vendor(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Link email to vendor"""
|
|
match_by = params.get('match_by', 'email')
|
|
sender_email = email_data.get('sender_email')
|
|
|
|
if not sender_email:
|
|
return {'action': 'link_to_vendor', 'matched': False, 'reason': 'No sender email'}
|
|
|
|
# Find vendor by email
|
|
query = "SELECT id, name FROM vendors WHERE email = %s LIMIT 1"
|
|
result = execute_query(query, (sender_email,), fetchone=True)
|
|
|
|
if result:
|
|
vendor_id = result['id']
|
|
|
|
# Check if already linked to avoid duplicate updates
|
|
current_vendor = execute_query(
|
|
"SELECT supplier_id FROM email_messages WHERE id = %s",
|
|
(email_data['id'],), fetchone=True
|
|
)
|
|
|
|
if current_vendor and current_vendor.get('supplier_id') == vendor_id:
|
|
logger.info(f"⏭️ Email already linked to vendor {vendor_id}, skipping duplicate update")
|
|
return {
|
|
'action': 'link_to_vendor',
|
|
'matched': True,
|
|
'vendor_id': vendor_id,
|
|
'vendor_name': result['name'],
|
|
'note': 'Already linked (skipped duplicate)'
|
|
}
|
|
|
|
# Update email with vendor link
|
|
execute_update(
|
|
"UPDATE email_messages SET supplier_id = %s WHERE id = %s",
|
|
(vendor_id, email_data['id'])
|
|
)
|
|
|
|
logger.info(f"🔗 Linked email to vendor: {result['name']} (ID: {vendor_id})")
|
|
|
|
return {
|
|
'action': 'link_to_vendor',
|
|
'matched': True,
|
|
'vendor_id': vendor_id,
|
|
'vendor_name': result['name']
|
|
}
|
|
else:
|
|
logger.info(f"⚠️ No vendor found for email: {sender_email}")
|
|
return {'action': 'link_to_vendor', 'matched': False, 'reason': 'Vendor not found'}
|
|
|
|
async def _action_link_to_customer(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Link email to customer"""
|
|
logger.info(f"🔗 Would link to customer")
|
|
|
|
# TODO: Implement customer matching logic
|
|
return {
|
|
'action': 'link_to_customer',
|
|
'note': 'Customer linking not yet implemented'
|
|
}
|
|
|
|
async def _action_extract_invoice_data(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Save email PDF attachment to incoming_files for processing"""
|
|
logger.info(f"📄 Saving invoice PDF from email to incoming files")
|
|
|
|
email_id = email_data.get('id')
|
|
sender_email = email_data.get('sender_email', '')
|
|
vendor_id = email_data.get('supplier_id')
|
|
|
|
# Get PDF attachments from email
|
|
attachments = execute_query(
|
|
"""SELECT filename, file_path, size_bytes, content_type
|
|
FROM email_attachments
|
|
WHERE email_id = %s AND content_type = 'application/pdf'""",
|
|
(email_id,)
|
|
)
|
|
|
|
if not attachments:
|
|
attachments = []
|
|
elif not isinstance(attachments, list):
|
|
attachments = [attachments]
|
|
|
|
if not attachments:
|
|
logger.warning(f"⚠️ No PDF attachments found for email {email_id}")
|
|
return {
|
|
'action': 'extract_invoice_data',
|
|
'success': False,
|
|
'note': 'No PDF attachment found in email'
|
|
}
|
|
|
|
uploaded_files = []
|
|
|
|
for attachment in attachments:
|
|
try:
|
|
attachment_path = attachment['file_path']
|
|
if not attachment_path:
|
|
logger.warning(f"⚠️ No file path for attachment {attachment['filename']}")
|
|
continue
|
|
|
|
# Handle both absolute and relative paths
|
|
file_path = Path(attachment_path)
|
|
if not file_path.is_absolute():
|
|
# Try common base directories
|
|
for base in [Path.cwd(), Path('/app'), Path('.')]:
|
|
test_path = base / attachment_path
|
|
if test_path.exists():
|
|
file_path = test_path
|
|
break
|
|
|
|
if not file_path.exists():
|
|
error_msg = f"Attachment file not found: {attachment_path}"
|
|
logger.error(f"❌ {error_msg}")
|
|
raise FileNotFoundError(error_msg)
|
|
|
|
# Old code continues here but never reached if file missing
|
|
if False and not file_path.exists():
|
|
logger.warning(f"⚠️ Attachment file not found: {attachment_path}")
|
|
continue
|
|
|
|
# Calculate checksum
|
|
with open(file_path, 'rb') as f:
|
|
file_content = f.read()
|
|
checksum = hashlib.sha256(file_content).hexdigest()
|
|
|
|
# Check if file already exists
|
|
existing = execute_query(
|
|
"SELECT file_id FROM incoming_files WHERE checksum = %s",
|
|
(checksum,),
|
|
fetchone=True
|
|
)
|
|
|
|
if existing:
|
|
logger.info(f"⚠️ File already exists: {attachment['filename']}")
|
|
continue
|
|
|
|
# Create uploads directory if it doesn't exist
|
|
upload_dir = Path("uploads")
|
|
upload_dir.mkdir(exist_ok=True)
|
|
|
|
# Copy file to uploads directory
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
safe_filename = f"{timestamp}_{attachment['filename']}"
|
|
destination = upload_dir / safe_filename
|
|
|
|
shutil.copy2(file_path, destination)
|
|
|
|
# Insert into incoming_files
|
|
file_id = execute_insert(
|
|
"""INSERT INTO incoming_files
|
|
(filename, original_filename, file_path, file_size, mime_type, checksum,
|
|
status, detected_vendor_id, uploaded_at)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
|
|
RETURNING file_id""",
|
|
(
|
|
safe_filename,
|
|
attachment['filename'],
|
|
str(destination),
|
|
attachment['size_bytes'],
|
|
'application/pdf',
|
|
checksum,
|
|
'pending', # Will appear in "Mangler Behandling"
|
|
vendor_id
|
|
)
|
|
)
|
|
|
|
uploaded_files.append({
|
|
'file_id': file_id,
|
|
'filename': attachment['filename']
|
|
})
|
|
|
|
logger.info(f"✅ Saved PDF to incoming_files: {attachment['filename']} (file_id: {file_id})")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Failed to save attachment: {e}")
|
|
continue
|
|
|
|
if uploaded_files:
|
|
return {
|
|
'action': 'extract_invoice_data',
|
|
'success': True,
|
|
'files_uploaded': len(uploaded_files),
|
|
'file_ids': [f['file_id'] for f in uploaded_files],
|
|
'note': f"{len(uploaded_files)} PDF(er) gemt i 'Mangler Behandling'"
|
|
}
|
|
else:
|
|
return {
|
|
'action': 'extract_invoice_data',
|
|
'success': False,
|
|
'note': 'No files could be uploaded'
|
|
}
|
|
|
|
async def _action_extract_tracking_number(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Extract tracking number from email"""
|
|
body = email_data.get('body_text', '')
|
|
subject = email_data.get('subject', '')
|
|
text = f"{subject} {body}"
|
|
|
|
# Simple regex patterns for common carriers
|
|
patterns = {
|
|
'postnord': r'\b[0-9]{18}\b',
|
|
'gls': r'\b[0-9]{11}\b',
|
|
'dao': r'\b[0-9]{14}\b'
|
|
}
|
|
|
|
tracking_numbers = []
|
|
|
|
for carrier, pattern in patterns.items():
|
|
matches = re.findall(pattern, text)
|
|
if matches:
|
|
tracking_numbers.extend([{'carrier': carrier, 'number': m} for m in matches])
|
|
|
|
if tracking_numbers:
|
|
logger.info(f"📦 Extracted {len(tracking_numbers)} tracking number(s)")
|
|
|
|
# Update email with tracking number
|
|
if tracking_numbers:
|
|
first_number = tracking_numbers[0]['number']
|
|
execute_update(
|
|
"UPDATE email_messages SET extracted_tracking_number = %s WHERE id = %s",
|
|
(first_number, email_data['id'])
|
|
)
|
|
|
|
return {
|
|
'action': 'extract_tracking_number',
|
|
'tracking_numbers': tracking_numbers
|
|
}
|
|
|
|
async def _action_send_slack_notification(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Send Slack notification"""
|
|
channel = params.get('channel', '#general')
|
|
template = params.get('template', 'New email: {{subject}}')
|
|
|
|
logger.info(f"💬 Would send Slack notification to {channel}")
|
|
|
|
# TODO: Integrate with Slack API
|
|
return {
|
|
'action': 'send_slack_notification',
|
|
'channel': channel,
|
|
'note': 'Slack integration not yet implemented'
|
|
}
|
|
|
|
async def _action_send_email_notification(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Send email notification"""
|
|
recipients = params.get('recipients', [])
|
|
|
|
logger.info(f"📧 Would send email notification to {len(recipients)} recipient(s)")
|
|
|
|
# TODO: Integrate with email sending service
|
|
return {
|
|
'action': 'send_email_notification',
|
|
'recipients': recipients,
|
|
'note': 'Email notification not yet implemented'
|
|
}
|
|
|
|
async def _action_mark_as_processed(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Mark email as processed"""
|
|
status = params.get('status', 'processed')
|
|
|
|
execute_update(
|
|
"""UPDATE email_messages
|
|
SET status = %s, processed_at = CURRENT_TIMESTAMP, auto_processed = true
|
|
WHERE id = %s""",
|
|
(status, email_data['id'])
|
|
)
|
|
|
|
logger.info(f"✅ Marked email as: {status}")
|
|
|
|
return {
|
|
'action': 'mark_as_processed',
|
|
'status': status
|
|
}
|
|
|
|
async def _action_flag_for_review(self, params: Dict, email_data: Dict) -> Dict:
|
|
"""Flag email for manual review"""
|
|
reason = params.get('reason', 'Flagged by workflow')
|
|
|
|
execute_update(
|
|
"""UPDATE email_messages
|
|
SET status = 'flagged', approval_status = 'pending_review'
|
|
WHERE id = %s""",
|
|
(email_data['id'],)
|
|
)
|
|
|
|
logger.info(f"🚩 Flagged email for review: {reason}")
|
|
|
|
return {
|
|
'action': 'flag_for_review',
|
|
'reason': reason
|
|
}
|
|
|
|
|
|
# Module-level singleton; importers share this one service instance.
email_workflow_service = EmailWorkflowService()
|