bmc_hub/app/services/email_processor_service.py
Christian 8791e34f4e feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00

435 lines
17 KiB
Python

"""
Email Processor Service
Main orchestrator for email workflow: Fetch → Store → Classify → Match Rules → Link Entities
Based on OmniSync architecture adapted for BMC Hub
"""
import logging
import re
from datetime import datetime
from typing import Dict, List, Optional

from app.core.config import settings
from app.core.database import execute_query
from app.services.email_analysis_service import EmailAnalysisService
from app.services.email_service import EmailService
logger = logging.getLogger(__name__)
class EmailProcessorService:
    """Main orchestrator for the email processing pipeline.

    Per cycle: fetch new emails, persist them, optionally classify them with
    AI, then match them against active rules and (when auto-processing is
    enabled) execute the matched rule's action.
    """

    def __init__(self):
        # Collaborating services for fetching and AI analysis
        self.email_service = EmailService()
        self.analysis_service = EmailAnalysisService()
        # Feature flags sourced from application settings
        self.enabled = settings.EMAIL_TO_TICKET_ENABLED
        self.rules_enabled = settings.EMAIL_RULES_ENABLED
        self.auto_process = settings.EMAIL_RULES_AUTO_PROCESS

    async def process_inbox(self) -> Dict:
        """
        Main entry point: Process all new emails from inbox
        Returns: Processing statistics
        """
        if not self.enabled:
            logger.info("⏭️ Email processing disabled (EMAIL_TO_TICKET_ENABLED=false)")
            return {'status': 'disabled'}
        logger.info("🔄 Starting email processing cycle...")
        stats = {
            'fetched': 0,
            'saved': 0,
            'classified': 0,
            'rules_matched': 0,
            'errors': 0
        }
        try:
            # Step 1: Fetch new emails (bounded per run)
            limit = settings.EMAIL_MAX_FETCH_PER_RUN
            new_emails = await self.email_service.fetch_new_emails(limit=limit)
            stats['fetched'] = len(new_emails)
            if not new_emails:
                logger.info("✅ No new emails to process")
                return stats
            logger.info(f"📥 Fetched {len(new_emails)} new emails")
            # Step 2: Save emails to database; one bad email must not
            # abort the whole batch, hence the per-email try/except.
            for email_data in new_emails:
                try:
                    email_id = await self.email_service.save_email(email_data)
                    if email_id:
                        email_data['id'] = email_id
                        stats['saved'] += 1
                        # Step 3: Classify email with AI.
                        # BUGFIX: only count classifications that actually
                        # succeeded — the helper swallows its own errors and
                        # now reports success via its return value.
                        if settings.EMAIL_AI_ENABLED and settings.EMAIL_AUTO_CLASSIFY:
                            if await self._classify_and_update(email_data):
                                stats['classified'] += 1
                        # Step 4: Match against rules
                        if self.rules_enabled:
                            matched = await self._match_rules(email_data)
                            if matched:
                                stats['rules_matched'] += 1
                except Exception as e:
                    logger.error(f"❌ Error processing email: {e}")
                    stats['errors'] += 1
            logger.info(f"✅ Email processing complete: {stats}")
            return stats
        except Exception as e:
            logger.error(f"❌ Email processing failed: {e}")
            stats['errors'] += 1
            return stats

    async def _classify_and_update(self, email_data: Dict) -> bool:
        """Classify an email with AI, persist the result, and extract
        invoice details for confident invoice classifications.

        Mutates email_data in place ('classification', 'confidence_score')
        so subsequent rule matching sees the result.

        Returns:
            True when classification succeeded, False on any error.
        """
        try:
            # Run AI classification
            result = await self.analysis_service.classify_email(email_data)
            classification = result.get('classification', 'unknown')
            confidence = result.get('confidence', 0.0)
            # Update email record
            query = """
                UPDATE email_messages
                SET classification = %s,
                    confidence_score = %s,
                    classification_date = CURRENT_TIMESTAMP
                WHERE id = %s
            """
            execute_query(query, (classification, confidence, email_data['id']))
            logger.info(f"✅ Classified email {email_data['id']} as '{classification}' (confidence: {confidence:.2f})")
            # Update email_data for rule matching
            email_data['classification'] = classification
            email_data['confidence_score'] = confidence
            # Extract invoice details if classified as invoice with enough confidence
            if classification == 'invoice' and confidence >= settings.EMAIL_AI_CONFIDENCE_THRESHOLD:
                extraction = await self.analysis_service.extract_invoice_details(email_data)
                if extraction:
                    await self._update_extracted_fields(email_data['id'], extraction)
            return True
        except Exception as e:
            # .get() so a missing 'id' cannot raise inside the error handler
            logger.error(f"❌ Classification failed for email {email_data.get('id')}: {e}")
            return False

    async def _update_extracted_fields(self, email_id: int, extraction: Dict):
        """Persist AI-extracted invoice fields onto the email record."""
        try:
            query = """
                UPDATE email_messages
                SET extracted_invoice_number = %s,
                    extracted_amount = %s,
                    extracted_due_date = %s
                WHERE id = %s
            """
            execute_query(query, (
                extraction.get('invoice_number'),
                extraction.get('amount'),
                extraction.get('due_date'),
                email_id
            ))
            logger.info(f"✅ Updated extracted fields for email {email_id}")
        except Exception as e:
            logger.error(f"❌ Error updating extracted fields: {e}")

    async def _match_rules(self, email_data: Dict) -> bool:
        """
        Match email against active rules and execute actions
        Returns True if rule was matched
        """
        try:
            # Get active rules ordered by priority
            rules = self._get_active_rules()
            if not rules:
                return False
            for rule in rules:
                if self._rule_matches(email_data, rule):
                    logger.info(f"✅ Email {email_data['id']} matched rule: {rule['name']}")
                    # Update email with rule_id
                    query = "UPDATE email_messages SET rule_id = %s WHERE id = %s"
                    execute_query(query, (rule['id'], email_data['id']))
                    # Update rule statistics
                    self._update_rule_stats(rule['id'])
                    # Execute rule action (if auto-process enabled)
                    if self.auto_process:
                        await self._execute_rule_action(email_data, rule)
                    else:
                        logger.info(f"⏭️ Auto-process disabled - rule action not executed")
                    return True  # First matching rule wins
            return False
        except Exception as e:
            logger.error(f"❌ Error matching rules: {e}")
            return False

    def _get_active_rules(self) -> List[Dict]:
        """Get all enabled rules ordered by priority (lowest value first)."""
        query = """
            SELECT * FROM email_rules
            WHERE enabled = true
            ORDER BY priority ASC
        """
        return execute_query(query)

    def _rule_matches(self, email_data: Dict, rule: Dict) -> bool:
        """
        Check if email matches rule conditions
        Rule conditions format: {"sender_email": "x@y.com", "classification": "invoice", ...}

        All present conditions must match (logical AND). Any exception while
        evaluating conditions is treated as "no match".
        """
        try:
            conditions = rule['conditions']
            # Check sender_email (exact match)
            if 'sender_email' in conditions:
                if email_data.get('sender_email') != conditions['sender_email']:
                    return False
            # Check sender_domain (membership test: works for both a list of
            # domains and a substring check against a single string)
            if 'sender_domain' in conditions:
                sender_email = email_data.get('sender_email', '')
                if '@' in sender_email:
                    domain = sender_email.split('@')[1]
                    if domain not in conditions['sender_domain']:
                        return False
                else:
                    return False
            # Check classification
            if 'classification' in conditions:
                if email_data.get('classification') != conditions['classification']:
                    return False
            # Check subject_contains (case-insensitive; list means "any keyword")
            if 'subject_contains' in conditions:
                subject = email_data.get('subject', '').lower()
                keywords = conditions['subject_contains']
                if isinstance(keywords, list):
                    if not any(kw.lower() in subject for kw in keywords):
                        return False
                elif isinstance(keywords, str):
                    if keywords.lower() not in subject:
                        return False
            # Check subject_regex (advanced pattern matching)
            if 'subject_regex' in conditions:
                subject = email_data.get('subject', '')
                pattern = conditions['subject_regex']
                if not re.search(pattern, subject, re.IGNORECASE):
                    return False
            # All conditions matched
            return True
        except Exception as e:
            logger.error(f"❌ Error checking rule conditions: {e}")
            return False

    async def _execute_rule_action(self, email_data: Dict, rule: Dict):
        """Execute rule action (link_supplier, create_purchase, link_case, etc.)

        The email is marked auto_processed even for unknown action types,
        so it is not retried on the next cycle.
        """
        try:
            action_type = rule['action_type']
            action_params = rule.get('action_params', {})
            logger.info(f"🚀 Executing rule action: {action_type} for email {email_data['id']}")
            if action_type == 'mark_spam':
                await self._mark_as_spam(email_data['id'])
            elif action_type == 'link_supplier':
                await self._link_to_supplier(email_data, action_params)
            elif action_type == 'link_customer':
                await self._link_to_customer(email_data, action_params)
            elif action_type == 'link_case':
                await self._link_to_case(email_data, action_params)
            elif action_type == 'create_purchase':
                logger.info(f"⏭️ Purchase creation not implemented yet")
            else:
                logger.warning(f"⚠️ Unknown action type: {action_type}")
            # Mark email as auto-processed
            query = "UPDATE email_messages SET auto_processed = true WHERE id = %s"
            execute_query(query, (email_data['id'],))
        except Exception as e:
            logger.error(f"❌ Error executing rule action: {e}")

    async def _mark_as_spam(self, email_id: int):
        """Mark email as spam and close it out as processed."""
        query = "UPDATE email_messages SET classification = 'spam', status = 'processed' WHERE id = %s"
        execute_query(query, (email_id,))
        logger.info(f"✅ Marked email {email_id} as spam")

    async def _link_to_supplier(self, email_data: Dict, params: Dict):
        """Link email to supplier/vendor.

        Either auto-matches a vendor by the sender's email domain
        (params['auto_match_domain']) or uses an explicit
        params['supplier_id']. Errors are logged, never raised.
        """
        try:
            # Auto-match supplier by email domain
            if params.get('auto_match_domain'):
                sender_email = email_data.get('sender_email', '')
                if '@' in sender_email:
                    domain = sender_email.split('@')[1]
                    # Find vendor by domain
                    query = """
                        SELECT id, name FROM vendors
                        WHERE email LIKE %s OR contact_email LIKE %s
                        LIMIT 1
                    """
                    result = execute_query(query, (f'%{domain}%', f'%{domain}%'))
                    if result:
                        vendor_id = result[0]['id']
                        vendor_name = result[0]['name']
                        # Link email to vendor
                        query = "UPDATE email_messages SET supplier_id = %s WHERE id = %s"
                        execute_query(query, (vendor_id, email_data['id']))
                        logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_name}")
                        return
            # Manual supplier_id from params
            elif 'supplier_id' in params:
                vendor_id = params['supplier_id']
                query = "UPDATE email_messages SET supplier_id = %s WHERE id = %s"
                execute_query(query, (vendor_id, email_data['id']))
                logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_id}")
        except Exception as e:
            logger.error(f"❌ Error linking to supplier: {e}")

    async def _link_to_customer(self, email_data: Dict, params: Dict):
        """Link email to customer by sender domain (auto_match_domain only)."""
        try:
            # Auto-match customer by email domain
            if params.get('auto_match_domain'):
                sender_email = email_data.get('sender_email', '')
                if '@' in sender_email:
                    domain = sender_email.split('@')[1]
                    # Find customer by domain
                    query = """
                        SELECT id, customer_name FROM tmodule_customers
                        WHERE email LIKE %s
                        LIMIT 1
                    """
                    result = execute_query(query, (f'%{domain}%',))
                    if result:
                        customer_id = result[0]['id']
                        customer_name = result[0]['customer_name']
                        # Link email to customer
                        query = "UPDATE email_messages SET customer_id = %s WHERE id = %s"
                        execute_query(query, (customer_id, email_data['id']))
                        logger.info(f"✅ Linked email {email_data['id']} to customer {customer_name}")
        except Exception as e:
            logger.error(f"❌ Error linking to customer: {e}")

    async def _link_to_case(self, email_data: Dict, params: Dict):
        """Link email to timetracking case"""
        try:
            # Extract case number from subject (e.g., "CC0042", "Case #123")
            if params.get('extract_case_from_subject'):
                subject = email_data.get('subject', '')
                # Match patterns like CC0001, CC0042, etc.
                match = re.search(r'CC(\d{4})', subject, re.IGNORECASE)
                if match:
                    case_number = f"CC{match.group(1)}"
                    # Find case by case_number
                    query = """
                        SELECT id, title FROM tmodule_cases
                        WHERE case_number = %s
                        LIMIT 1
                    """
                    result = execute_query(query, (case_number,))
                    if result:
                        case_id = result[0]['id']
                        case_title = result[0]['title']
                        # Link email to case
                        query = "UPDATE email_messages SET linked_case_id = %s WHERE id = %s"
                        execute_query(query, (case_id, email_data['id']))
                        logger.info(f"✅ Linked email {email_data['id']} to case {case_number}: {case_title}")
                        return
                # NOTE(review): also reached when a CC number matched but no
                # DB row was found — confirm the intended placement.
                logger.info(f"⚠️ No case number found in subject: {subject}")
        except Exception as e:
            logger.error(f"❌ Error linking to case: {e}")

    def _update_rule_stats(self, rule_id: int):
        """Increment the rule's match counter and stamp last_matched_at."""
        query = """
            UPDATE email_rules
            SET match_count = match_count + 1,
                last_matched_at = CURRENT_TIMESTAMP
            WHERE id = %s
        """
        execute_query(query, (rule_id,))

    async def reprocess_email(self, email_id: int):
        """Manually reprocess a single email (reclassify and rematch rules)."""
        try:
            # Get email from database
            query = "SELECT * FROM email_messages WHERE id = %s"
            result = execute_query(query, (email_id,))
            if not result:
                logger.error(f"❌ Email {email_id} not found")
                return
            email_data = result[0]
            # Reclassify
            if settings.EMAIL_AI_ENABLED:
                await self._classify_and_update(email_data)
            # Rematch rules
            if self.rules_enabled:
                await self._match_rules(email_data)
            logger.info(f"✅ Reprocessed email {email_id}")
        except Exception as e:
            logger.error(f"❌ Error reprocessing email {email_id}: {e}")