# bmc_hub/app/services/email_processor_service.py
# (web-viewer metadata removed: 732 lines, 32 KiB, Python, "Raw Permalink Normal View History")

"""
Email Processor Service
Main orchestrator for email workflow: Fetch Store Classify Match Rules Link Entities
Based on OmniSync architecture adapted for BMC Hub
"""
import logging
import re
from typing import List, Dict, Optional
from datetime import datetime
from app.services.email_service import EmailService
from app.services.email_analysis_service import EmailAnalysisService
from app.services.transcription_service import TranscriptionService
from app.services.simple_classifier import simple_classifier
from app.services.email_workflow_service import email_workflow_service
from app.services.email_activity_logger import email_activity_logger
from app.core.config import settings
from app.core.database import execute_query, execute_update, execute_insert
logger = logging.getLogger(__name__)
class EmailProcessorService:
    """Main orchestrator for email processing pipeline"""

    def __init__(self):
        # Collaborating services: IMAP fetch/persist, AI classification, audio transcription.
        self.email_service = EmailService()
        self.analysis_service = EmailAnalysisService()
        self.transcription_service = TranscriptionService()
        # Feature flags snapshotted from settings at construction time.
        self.enabled = settings.EMAIL_TO_TICKET_ENABLED        # master on/off switch for the pipeline
        self.rules_enabled = settings.EMAIL_RULES_ENABLED      # legacy rule matching (Step 5)
        self.auto_process = settings.EMAIL_RULES_AUTO_PROCESS  # execute matched rule actions automatically
        self.ai_enabled = settings.EMAIL_AI_ENABLED            # allow escalation to AI classifier
    async def process_inbox(self) -> Dict:
        """
        Main entry point: Process all new emails from inbox
        Returns: Processing statistics

        Stats keys: fetched / saved / classified / awaiting_user_action /
        rules_matched / errors. Returns {'status': 'disabled'} when the
        pipeline is switched off.
        """
        # Master kill-switch: do nothing when email-to-ticket is disabled.
        if not self.enabled:
            logger.info("⏭️ Email processing disabled (EMAIL_TO_TICKET_ENABLED=false)")
            return {'status': 'disabled'}
        logger.info("🔄 Starting email processing cycle...")
        stats = {
            'fetched': 0,
            'saved': 0,
            'classified': 0,
            'awaiting_user_action': 0,
            'rules_matched': 0,
            'errors': 0
        }
        try:
            # Step 1: Fetch new emails (bounded per run by config)
            limit = settings.EMAIL_MAX_FETCH_PER_RUN
            new_emails = await self.email_service.fetch_new_emails(limit=limit)
            stats['fetched'] = len(new_emails)
            if not new_emails:
                logger.info("✅ No new emails to process")
                return stats
            logger.info(f"📥 Fetched {len(new_emails)} new emails")
            # Step 2: Save emails to database
            for email_data in new_emails:
                try:
                    email_id = await self.email_service.save_email(email_data)
                    # NOTE(review): a falsy email_id skips the email entirely —
                    # presumably a duplicate; confirm against EmailService.save_email.
                    if email_id:
                        email_data['id'] = email_id
                        stats['saved'] += 1
                        # Log: Email fetched and saved
                        await email_activity_logger.log_fetched(
                            email_id=email_id,
                            source='email_server',
                            message_id=email_data.get('message_id', 'unknown')
                        )
                        # Step 3-5: Process the single email
                        result = await self.process_single_email(email_data)
                        if result.get('classified'):
                            stats['classified'] += 1
                        if result.get('awaiting_user_action'):
                            stats['awaiting_user_action'] += 1
                        if result.get('rules_matched'):
                            stats['rules_matched'] += 1
                except Exception as e:
                    # Per-email failures are counted but never abort the batch.
                    logger.error(f"❌ Error processing email: {e}")
                    stats['errors'] += 1
            logger.info(f"✅ Email processing complete: {stats}")
            return stats
        except Exception as e:
            # Batch-level failure (e.g. the fetch itself): return partial stats.
            logger.error(f"❌ Email processing failed: {e}")
            stats['errors'] += 1
            return stats
async def process_single_email(self, email_data: Dict) -> Dict:
"""
Process a single email: Classify -> Workflow -> Rules
Can be used by process_inbox (new emails) or bulk_reprocess (existing emails)
"""
email_id = email_data.get('id')
stats = {
'classified': False,
'awaiting_user_action': False,
'workflows_executed': 0,
'rules_matched': False
}
try:
# Step 2.5: Detect and transcribe audio attachments
# This is done BEFORE classification so the AI can "read" the voice note
if settings.WHISPER_ENABLED:
await self._process_attachments_for_transcription(email_data)
# Step 3: Classify email (AI or Keyword)
if settings.EMAIL_AUTO_CLASSIFY:
await self._classify_and_update(email_data)
stats['classified'] = True
# Step 3.5: Gate automation by manual-approval policy and confidence
# Phase-1 policy: suggestions are generated automatically, actions are user-approved.
classification = (email_data.get('classification') or '').strip().lower()
confidence = float(email_data.get('confidence_score') or 0.0)
require_manual_approval = getattr(settings, 'EMAIL_REQUIRE_MANUAL_APPROVAL', True)
if require_manual_approval:
await self._set_awaiting_user_action(email_id, reason='manual_approval_required')
stats['awaiting_user_action'] = True
return stats
if not classification or confidence < settings.EMAIL_AI_CONFIDENCE_THRESHOLD:
await self._set_awaiting_user_action(email_id, reason='low_confidence')
stats['awaiting_user_action'] = True
return stats
# Step 4: Execute workflows based on classification
workflow_processed = False
if hasattr(settings, 'EMAIL_WORKFLOWS_ENABLED') and settings.EMAIL_WORKFLOWS_ENABLED:
workflow_result = await email_workflow_service.execute_workflows(email_data)
executed_count = workflow_result.get('workflows_executed', 0)
stats['workflows_executed'] = executed_count
if executed_count > 0:
logger.info(f"✅ Executed {executed_count} workflow(s) for email {email_id}")
# Mark as workflow-processed to avoid duplicate rule execution
if workflow_result.get('workflows_succeeded', 0) > 0:
workflow_processed = True
email_data['_workflow_processed'] = True
# Definition: A processed email is one that is classified and workflow run
# Mark as 'processed' and move to 'Processed' folder
logger.info(f"✅ Auto-marking email {email_id} as processed (Workflow executed)")
execute_update(
"""UPDATE email_messages
SET status = 'processed',
folder = 'Processed',
processed_at = CURRENT_TIMESTAMP,
auto_processed = true
WHERE id = %s""",
(email_id,)
)
# Step 5: Match against rules (legacy support) - skip if workflow already processed
if self.rules_enabled and not workflow_processed:
# Check if workflow already processed this email (double check DB)
existing_execution = execute_query(
"SELECT id FROM email_workflow_executions WHERE email_id = %s AND status = 'completed' LIMIT 1",
(email_data['id'],))
if existing_execution:
logger.info(f"⏭️ Email {email_id} already processed by workflow, skipping rules")
else:
matched = await self._match_rules(email_data)
if matched:
stats['rules_matched'] = True
elif workflow_processed:
logger.info(f"⏭️ Email {email_id} processed by workflow, skipping rules (coordination)")
return stats
except Exception as e:
logger.error(f"❌ Error in process_single_email for {email_id}: {e}")
raise e
async def _set_awaiting_user_action(self, email_id: Optional[int], reason: str):
"""Park an email for manual review before any automatic routing/action."""
if not email_id:
return
execute_update(
"""
UPDATE email_messages
SET status = 'awaiting_user_action',
folder = COALESCE(folder, 'INBOX'),
auto_processed = false,
processed_at = NULL,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
""",
(email_id,)
)
logger.info("🛑 Email %s moved to awaiting_user_action (%s)", email_id, reason)
async def _classify_and_update(self, email_data: Dict):
"""Classify email and update database"""
try:
logger.info(f"🔍 _classify_and_update: ai_enabled={self.ai_enabled}, EMAIL_AI_ENABLED={settings.EMAIL_AI_ENABLED}")
# 1. Always start with Simple Keyword Classification (fast, free, deterministic)
logger.info(f"🔍 Running keyword classification for email {email_data['id']}")
result = simple_classifier.classify(email_data)
classification = result.get('classification', 'unknown')
confidence = result.get('confidence', 0.0)
# 2. Use AI if keyword analysis is weak or inconclusive
# Trigger if: 'general' OR 'unknown' OR confidence is low (< 0.70)
should_use_ai = (classification in ['general', 'unknown'] or confidence < 0.70)
if should_use_ai and self.ai_enabled:
logger.info(f"🤖 Escalating to AI analysis (Reason: '{classification}' with confidence {confidence})")
ai_result = await self.analysis_service.classify_email(email_data)
# Update result if AI returns valid data
if ai_result:
result = ai_result
logger.info(f"✅ AI re-classified as '{result.get('classification')}'")
classification = result.get('classification', 'unknown')
confidence = result.get('confidence', 0.0)
# Update email record
query = """
UPDATE email_messages
SET classification = %s,
confidence_score = %s,
classification_date = CURRENT_TIMESTAMP
WHERE id = %s
"""
execute_update(query, (classification, confidence, email_data['id']))
# Update email_data dict with classification for workflow execution
email_data['classification'] = classification
email_data['confidence_score'] = confidence
logger.info(f"✅ Classified email {email_data['id']} as '{classification}' (confidence: {confidence:.2f})")
# Log: Email classified
await email_activity_logger.log_classified(
email_id=email_data['id'],
classification=classification,
confidence=confidence,
method='ai' if self.ai_enabled else 'keyword'
)
# Update email_data for rule matching
email_data['classification'] = classification
email_data['confidence_score'] = confidence
# Extract invoice details if classified as invoice
if classification == 'invoice' and confidence >= settings.EMAIL_AI_CONFIDENCE_THRESHOLD:
extraction = await self.analysis_service.extract_invoice_details(email_data)
if extraction:
await self._update_extracted_fields(email_data['id'], extraction)
except Exception as e:
logger.error(f"❌ Classification failed for email {email_data['id']}: {e}")
async def _update_extracted_fields(self, email_id: int, extraction: Dict):
"""Update email with extracted invoice fields (from PDF attachment analysis)"""
try:
# Normalize amount field (new extraction uses total_amount, old used amount)
amount = extraction.get('total_amount') or extraction.get('amount')
query = """
UPDATE email_messages
SET extracted_invoice_number = %s,
extracted_amount = %s,
extracted_due_date = %s,
extracted_vendor_name = %s,
extracted_vendor_cvr = %s,
extracted_invoice_date = %s
WHERE id = %s
"""
execute_query(query, (
extraction.get('invoice_number'),
amount,
extraction.get('due_date'),
extraction.get('vendor_name'),
extraction.get('vendor_cvr'),
extraction.get('invoice_date'),
email_id
))
logger.info(
f"✅ Updated extracted fields for email {email_id}: "
f"invoice={extraction.get('invoice_number')}, "
f"vendor={extraction.get('vendor_name')}, "
f"cvr={extraction.get('vendor_cvr')}, "
f"amount={amount}"
)
except Exception as e:
logger.error(f"❌ Error updating extracted fields: {e}")
async def _match_rules(self, email_data: Dict) -> bool:
"""
Match email against active rules and execute actions
Returns True if rule was matched
"""
try:
# Get active rules ordered by priority
rules = self._get_active_rules()
if not rules:
return False
for rule in rules:
if self._rule_matches(email_data, rule):
logger.info(f"✅ Email {email_data['id']} matched rule: {rule['name']}")
# Update email with rule_id
query = "UPDATE email_messages SET rule_id = %s WHERE id = %s"
execute_query(query, (rule['id'], email_data['id']))
# Update rule statistics
self._update_rule_stats(rule['id'])
# Execute rule action (if auto-process enabled)
if self.auto_process:
await self._execute_rule_action(email_data, rule)
else:
logger.info(f"⏭️ Auto-process disabled - rule action not executed")
return True # First matching rule wins
return False
except Exception as e:
logger.error(f"❌ Error matching rules: {e}")
return False
def _get_active_rules(self) -> List[Dict]:
"""Get all enabled rules ordered by priority"""
query = """
SELECT * FROM email_rules
WHERE enabled = true
ORDER BY priority ASC
"""
return execute_query(query)
def _rule_matches(self, email_data: Dict, rule: Dict) -> bool:
"""
Check if email matches rule conditions
Rule conditions format: {"sender_email": "x@y.com", "classification": "invoice", ...}
"""
try:
conditions = rule['conditions']
# Check sender_email
if 'sender_email' in conditions:
if email_data.get('sender_email') != conditions['sender_email']:
return False
# Check sender_domain
if 'sender_domain' in conditions:
sender_email = email_data.get('sender_email', '')
if '@' in sender_email:
domain = sender_email.split('@')[1]
if domain not in conditions['sender_domain']:
return False
else:
return False
# Check classification
if 'classification' in conditions:
if email_data.get('classification') != conditions['classification']:
return False
# Check subject_contains
if 'subject_contains' in conditions:
subject = email_data.get('subject', '').lower()
keywords = conditions['subject_contains']
if isinstance(keywords, list):
if not any(kw.lower() in subject for kw in keywords):
return False
elif isinstance(keywords, str):
if keywords.lower() not in subject:
return False
# Check subject_regex (advanced pattern matching)
if 'subject_regex' in conditions:
import re
subject = email_data.get('subject', '')
pattern = conditions['subject_regex']
if not re.search(pattern, subject, re.IGNORECASE):
return False
# All conditions matched
return True
except Exception as e:
logger.error(f"❌ Error checking rule conditions: {e}")
return False
async def _execute_rule_action(self, email_data: Dict, rule: Dict):
"""Execute rule action (link_supplier, create_purchase, link_case, etc.)"""
try:
action_type = rule['action_type']
action_params = rule.get('action_params', {})
logger.info(f"🚀 Executing rule action: {action_type} for email {email_data['id']}")
if action_type == 'mark_spam':
await self._mark_as_spam(email_data['id'])
elif action_type == 'link_supplier':
await self._link_to_supplier(email_data, action_params)
elif action_type == 'link_customer':
await self._link_to_customer(email_data, action_params)
elif action_type == 'link_case':
await self._link_to_case(email_data, action_params)
elif action_type == 'create_purchase':
logger.info(f"⏭️ Purchase creation not implemented yet")
else:
logger.warning(f"⚠️ Unknown action type: {action_type}")
# Mark email as auto-processed
query = "UPDATE email_messages SET auto_processed = true WHERE id = %s"
execute_query(query, (email_data['id'],))
except Exception as e:
logger.error(f"❌ Error executing rule action: {e}")
async def _mark_as_spam(self, email_id: int):
"""Mark email as spam"""
query = "UPDATE email_messages SET classification = 'spam', status = 'processed' WHERE id = %s"
execute_query(query, (email_id,))
logger.info(f"✅ Marked email {email_id} as spam")
async def _link_to_supplier(self, email_data: Dict, params: Dict):
"""Link email to supplier/vendor"""
try:
# Auto-match supplier by email domain
if params.get('auto_match_domain'):
sender_email = email_data.get('sender_email', '')
if '@' in sender_email:
domain = sender_email.split('@')[1]
# Find vendor by domain
query = """
SELECT id, name FROM vendors
WHERE email LIKE %s OR contact_email LIKE %s
LIMIT 1
"""
result = execute_query(query, (f'%{domain}%', f'%{domain}%'))
if result:
vendor_id = result[0]['id']
vendor_name = result[0]['name']
# Link email to vendor
query = "UPDATE email_messages SET supplier_id = %s WHERE id = %s"
execute_query(query, (vendor_id, email_data['id']))
logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_name}")
return
# Manual supplier_id from params
elif 'supplier_id' in params:
vendor_id = params['supplier_id']
query = "UPDATE email_messages SET supplier_id = %s WHERE id = %s"
execute_query(query, (vendor_id, email_data['id']))
logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_id}")
except Exception as e:
logger.error(f"❌ Error linking to supplier: {e}")
async def _link_to_customer(self, email_data: Dict, params: Dict):
"""Link email to customer"""
try:
# Auto-match customer by email domain
if params.get('auto_match_domain'):
sender_email = email_data.get('sender_email', '')
if '@' in sender_email:
domain = sender_email.split('@')[1]
# Find customer by domain
query = """
SELECT id, customer_name FROM tmodule_customers
WHERE email LIKE %s
LIMIT 1
"""
result = execute_query(query, (f'%{domain}%',))
if result:
customer_id = result[0]['id']
customer_name = result[0]['customer_name']
# Link email to customer
query = "UPDATE email_messages SET customer_id = %s WHERE id = %s"
execute_query(query, (customer_id, email_data['id']))
logger.info(f"✅ Linked email {email_data['id']} to customer {customer_name}")
except Exception as e:
logger.error(f"❌ Error linking to customer: {e}")
async def _link_to_case(self, email_data: Dict, params: Dict):
"""Link email to timetracking case"""
try:
# Extract case number from subject (e.g., "CC0042", "Case #123")
if params.get('extract_case_from_subject'):
import re
subject = email_data.get('subject', '')
# Match patterns like CC0001, CC0042, etc.
match = re.search(r'CC(\d{4})', subject, re.IGNORECASE)
if match:
case_number = f"CC{match.group(1)}"
# Find case by case_number
query = """
SELECT id, title FROM tmodule_cases
WHERE case_number = %s
LIMIT 1
"""
result = execute_query(query, (case_number,))
if result:
case_id = result[0]['id']
case_title = result[0]['title']
# Link email to case
query = "UPDATE email_messages SET linked_case_id = %s WHERE id = %s"
execute_query(query, (case_id, email_data['id']))
logger.info(f"✅ Linked email {email_data['id']} to case {case_number}: {case_title}")
return
logger.info(f"⚠️ No case number found in subject: {subject}")
except Exception as e:
logger.error(f"❌ Error linking to case: {e}")
def _update_rule_stats(self, rule_id: int):
"""Update rule match statistics"""
query = """
UPDATE email_rules
SET match_count = match_count + 1,
last_matched_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
execute_query(query, (rule_id,))
    async def _process_attachments_for_transcription(self, email_data: Dict) -> None:
        """
        Scan attachments for audio files, transcribe them, and enrich email body.
        Also creates 'conversations' record.

        NOTE(review): several f-strings below render the literal text
        "(unknown)" — they look like they should interpolate the attachment
        filename; confirm against version control before relying on them.
        """
        attachments = email_data.get('attachments', [])
        if not attachments:
            return
        # Local imports keep these dependencies off the module import path.
        import hashlib
        from pathlib import Path
        transcripts = []
        for att in attachments:
            filename = att.get('filename', '')
            content = att.get('content')
            # Simple check, TranscriptionService does the real check
            ext = Path(filename).suffix.lower()
            if ext in settings.WHISPER_SUPPORTED_FORMATS:
                transcript = await self.transcription_service.transcribe_audio(filename, content)
                if transcript:
                    transcripts.append(f"--- TRANSKRIBERET LYDFIL ((unknown)) ---\n{transcript}\n----------------------------------")
                # Create conversation record (ALWAYS for supported audio, even if transcription fails)
                try:
                    # Reconstruct path - mirroring EmailService._save_attachments logic
                    md5_hash = hashlib.md5(content).hexdigest()
                    # Default path in EmailService is "uploads/email_attachments"
                    file_path = f"uploads/email_attachments/{md5_hash}_(unknown)"
                    # Determine Title from Subject if possible
                    title = f"Email Attachment: (unknown)"
                    subject = email_data.get('subject', '')
                    # Pattern: "Optagelse af samtale(n) mellem 204 og 209"
                    # Handles both "samtale" and "samtalen", case insensitive
                    match = re.search(r'Optagelse af samtalen?\s+mellem\s+(\S+)\s+og\s+(\S+)', subject, re.IGNORECASE)
                    if match:
                        num1 = match.group(1)
                        num2 = match.group(2)
                        # NOTE(review): no separator between the two extension
                        # numbers — possibly lost in a copy/paste; confirm.
                        title = f"Samtale: {num1}{num2}"
                    # Generate Summary
                    summary = None
                    try:
                        from app.services.ollama_service import ollama_service
                        if transcript:
                            logger.info("🧠 Generating conversation summary...")
                            summary = await ollama_service.generate_summary(transcript)
                    except Exception as e:
                        # Summary is best-effort; the conversation row is created regardless.
                        logger.error(f"⚠️ Failed to generate summary: {e}")
                    # Determine user_id (optional, maybe from sender if internal?)
                    # For now, create as system/unassigned
                    query = """
INSERT INTO conversations
(title, transcript, summary, audio_file_path, source, email_message_id, created_at)
VALUES (%s, %s, %s, %s, 'email', %s, CURRENT_TIMESTAMP)
RETURNING id
"""
                    conversation_id = execute_insert(query, (
                        title,
                        transcript,
                        summary,
                        file_path,
                        email_data.get('id')
                    ))
                    # Design note (original author): this runs BEFORE classification and
                    # customer/case linking, so the conversation row starts out orphaned
                    # (customer_id/ticket_id unset). A later improvement would update the
                    # conversation after Step 5 once entity links exist; meanwhile the
                    # transcribed text also travels with the email body into any ticket.
                    logger.info(f"✅ Created conversation record {conversation_id} for (unknown)")
                except Exception as e:
                    # Conversation creation is best-effort; never blocks transcription.
                    logger.error(f"❌ Failed to create conversation record: {e}")
        if transcripts:
            # Append to body
            full_transcript_text = "\n\n" + "\n\n".join(transcripts)
            if 'body' in email_data:
                email_data['body'] += full_transcript_text
            # Also update body_text if it exists (often used for analysis)
            if 'body_text' in email_data and email_data['body_text']:
                email_data['body_text'] += full_transcript_text
            logger.info(f"✅ Enriched email {email_data.get('id')} with {len(transcripts)} transcription(s)")
    async def reprocess_email(self, email_id: int):
        """Manually reprocess a single email

        Reloads the stored email row, re-reads attachment bytes from disk so
        audio can be transcribed again, then reruns classification and rule
        matching. All failures are logged, never raised.
        """
        try:
            # Get email from database
            query = "SELECT * FROM email_messages WHERE id = %s"
            result = execute_query(query, (email_id,))
            if not result:
                logger.error(f"❌ Email {email_id} not found")
                return
            email_data = result[0]
            # Fetch attachments from DB to allow transcription on reprocess
            query_att = "SELECT * FROM email_attachments WHERE email_id = %s"
            atts = execute_query(query_att, (email_id,))
            loaded_atts = []
            if atts:
                from pathlib import Path
                for att in atts:
                    # 'file_path' is in DB
                    fpath = att.get('file_path')
                    if fpath:
                        try:
                            # If path is relative to cwd
                            path_obj = Path(fpath)
                            if path_obj.exists():
                                # Only attachments whose file still exists on disk
                                # are handed to the transcription step.
                                att['content'] = path_obj.read_bytes()
                                loaded_atts.append(att)
                                logger.info(f"📎 Loaded attachment content for reprocess: {att['filename']}")
                        except Exception as e:
                            logger.error(f"❌ Could not verify/load attachment {fpath}: {e}")
            email_data['attachments'] = loaded_atts
            # Run Transcription (Step 2.5 equivalent)
            if settings.WHISPER_ENABLED and loaded_atts:
                await self._process_attachments_for_transcription(email_data)
            # Reclassify (either AI or keyword-based)
            if settings.EMAIL_AUTO_CLASSIFY:
                await self._classify_and_update(email_data)
            # Rematch rules
            if self.rules_enabled:
                await self._match_rules(email_data)
            logger.info(f"✅ Reprocessed email {email_id}")
        except Exception as e:
            logger.error(f"❌ Error reprocessing email {email_id}: {e}")