docs: Create vTiger & Simply-CRM integration setup guide with credential requirements feat: Implement ticket system enhancements including relations, calendar events, templates, and AI suggestions refactor: Update ticket system migration to include audit logging and enhanced email metadata
485 lines
20 KiB
Python
485 lines
20 KiB
Python
"""
|
|
Email Processor Service
|
|
Main orchestrator for email workflow: Fetch → Store → Classify → Match Rules → Link Entities
|
|
Based on OmniSync architecture adapted for BMC Hub
|
|
"""
|
|
|
|
import logging
|
|
from typing import List, Dict, Optional
|
|
from datetime import datetime
|
|
|
|
from app.services.email_service import EmailService
|
|
from app.services.email_analysis_service import EmailAnalysisService
|
|
from app.services.simple_classifier import simple_classifier
|
|
from app.services.email_workflow_service import email_workflow_service
|
|
from app.services.email_activity_logger import email_activity_logger
|
|
from app.core.config import settings
|
|
from app.core.database import execute_query, execute_update
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EmailProcessorService:
    """Main orchestrator for email processing pipeline"""

    def __init__(self):
        # Collaborating services used by the pipeline steps
        self.email_service = EmailService()
        self.analysis_service = EmailAnalysisService()

        # Feature flags, snapshotted from application settings at construction
        self.enabled = settings.EMAIL_TO_TICKET_ENABLED        # master on/off switch
        self.rules_enabled = settings.EMAIL_RULES_ENABLED      # legacy rule matching
        self.auto_process = settings.EMAIL_RULES_AUTO_PROCESS  # execute matched rule actions
        self.ai_enabled = settings.EMAIL_AI_ENABLED            # AI vs keyword classifier
|
|
|
|
async def process_inbox(self) -> Dict:
|
|
"""
|
|
Main entry point: Process all new emails from inbox
|
|
Returns: Processing statistics
|
|
"""
|
|
|
|
if not self.enabled:
|
|
logger.info("⏭️ Email processing disabled (EMAIL_TO_TICKET_ENABLED=false)")
|
|
return {'status': 'disabled'}
|
|
|
|
logger.info("🔄 Starting email processing cycle...")
|
|
|
|
stats = {
|
|
'fetched': 0,
|
|
'saved': 0,
|
|
'classified': 0,
|
|
'rules_matched': 0,
|
|
'errors': 0
|
|
}
|
|
|
|
try:
|
|
# Step 1: Fetch new emails
|
|
limit = settings.EMAIL_MAX_FETCH_PER_RUN
|
|
new_emails = await self.email_service.fetch_new_emails(limit=limit)
|
|
stats['fetched'] = len(new_emails)
|
|
|
|
if not new_emails:
|
|
logger.info("✅ No new emails to process")
|
|
return stats
|
|
|
|
logger.info(f"📥 Fetched {len(new_emails)} new emails")
|
|
|
|
# Step 2: Save emails to database
|
|
for email_data in new_emails:
|
|
try:
|
|
email_id = await self.email_service.save_email(email_data)
|
|
|
|
if email_id:
|
|
email_data['id'] = email_id
|
|
stats['saved'] += 1
|
|
|
|
# Log: Email fetched and saved
|
|
await email_activity_logger.log_fetched(
|
|
email_id=email_id,
|
|
source='email_server',
|
|
message_id=email_data.get('message_id', 'unknown')
|
|
)
|
|
|
|
# Step 3: Classify email with AI
|
|
if settings.EMAIL_AI_ENABLED and settings.EMAIL_AUTO_CLASSIFY:
|
|
await self._classify_and_update(email_data)
|
|
stats['classified'] += 1
|
|
|
|
# Step 4: Execute workflows based on classification
|
|
workflow_processed = False
|
|
if hasattr(settings, 'EMAIL_WORKFLOWS_ENABLED') and settings.EMAIL_WORKFLOWS_ENABLED:
|
|
workflow_result = await email_workflow_service.execute_workflows(email_data)
|
|
if workflow_result.get('workflows_executed', 0) > 0:
|
|
logger.info(f"✅ Executed {workflow_result['workflows_executed']} workflow(s) for email {email_id}")
|
|
# Mark as workflow-processed to avoid duplicate rule execution
|
|
if workflow_result.get('workflows_succeeded', 0) > 0:
|
|
workflow_processed = True
|
|
email_data['_workflow_processed'] = True
|
|
|
|
# Step 5: Match against rules (legacy support) - skip if workflow already processed
|
|
if self.rules_enabled and not workflow_processed:
|
|
# Check if workflow already processed this email
|
|
existing_execution = execute_query_single(
|
|
"SELECT id FROM email_workflow_executions WHERE email_id = %s AND status = 'completed' LIMIT 1",
|
|
(email_id,))
|
|
|
|
if existing_execution:
|
|
logger.info(f"⏭️ Email {email_id} already processed by workflow, skipping rules")
|
|
else:
|
|
matched = await self._match_rules(email_data)
|
|
if matched:
|
|
stats['rules_matched'] += 1
|
|
elif workflow_processed:
|
|
logger.info(f"⏭️ Email {email_id} processed by workflow, skipping rules (coordination)")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error processing email: {e}")
|
|
stats['errors'] += 1
|
|
|
|
logger.info(f"✅ Email processing complete: {stats}")
|
|
return stats
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Email processing failed: {e}")
|
|
stats['errors'] += 1
|
|
return stats
|
|
|
|
async def _classify_and_update(self, email_data: Dict):
|
|
"""Classify email and update database"""
|
|
try:
|
|
logger.info(f"🔍 _classify_and_update: ai_enabled={self.ai_enabled}, EMAIL_AI_ENABLED={settings.EMAIL_AI_ENABLED}")
|
|
|
|
# Run classification (AI or simple keyword-based)
|
|
if self.ai_enabled:
|
|
result = await self.analysis_service.classify_email(email_data)
|
|
else:
|
|
logger.info(f"🔍 Using simple keyword classifier for email {email_data['id']}")
|
|
result = simple_classifier.classify(email_data)
|
|
|
|
classification = result.get('classification', 'unknown')
|
|
confidence = result.get('confidence', 0.0)
|
|
|
|
# Update email record
|
|
query = """
|
|
UPDATE email_messages
|
|
SET classification = %s,
|
|
confidence_score = %s,
|
|
classification_date = CURRENT_TIMESTAMP
|
|
WHERE id = %s
|
|
"""
|
|
execute_update(query, (classification, confidence, email_data['id']))
|
|
|
|
# Update email_data dict with classification for workflow execution
|
|
email_data['classification'] = classification
|
|
email_data['confidence_score'] = confidence
|
|
|
|
logger.info(f"✅ Classified email {email_data['id']} as '{classification}' (confidence: {confidence:.2f})")
|
|
|
|
# Log: Email classified
|
|
await email_activity_logger.log_classified(
|
|
email_id=email_data['id'],
|
|
classification=classification,
|
|
confidence=confidence,
|
|
method='ai' if self.ai_enabled else 'keyword'
|
|
)
|
|
|
|
# Update email_data for rule matching
|
|
email_data['classification'] = classification
|
|
email_data['confidence_score'] = confidence
|
|
|
|
# Extract invoice details if classified as invoice
|
|
if classification == 'invoice' and confidence >= settings.EMAIL_AI_CONFIDENCE_THRESHOLD:
|
|
extraction = await self.analysis_service.extract_invoice_details(email_data)
|
|
|
|
if extraction:
|
|
await self._update_extracted_fields(email_data['id'], extraction)
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Classification failed for email {email_data['id']}: {e}")
|
|
|
|
async def _update_extracted_fields(self, email_id: int, extraction: Dict):
|
|
"""Update email with extracted invoice fields"""
|
|
try:
|
|
query = """
|
|
UPDATE email_messages
|
|
SET extracted_invoice_number = %s,
|
|
extracted_amount = %s,
|
|
extracted_due_date = %s
|
|
WHERE id = %s
|
|
"""
|
|
|
|
execute_query(query, (
|
|
extraction.get('invoice_number'),
|
|
extraction.get('amount'),
|
|
extraction.get('due_date'),
|
|
email_id
|
|
))
|
|
|
|
logger.info(f"✅ Updated extracted fields for email {email_id}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error updating extracted fields: {e}")
|
|
|
|
async def _match_rules(self, email_data: Dict) -> bool:
|
|
"""
|
|
Match email against active rules and execute actions
|
|
Returns True if rule was matched
|
|
"""
|
|
try:
|
|
# Get active rules ordered by priority
|
|
rules = self._get_active_rules()
|
|
|
|
if not rules:
|
|
return False
|
|
|
|
for rule in rules:
|
|
if self._rule_matches(email_data, rule):
|
|
logger.info(f"✅ Email {email_data['id']} matched rule: {rule['name']}")
|
|
|
|
# Update email with rule_id
|
|
query = "UPDATE email_messages SET rule_id = %s WHERE id = %s"
|
|
execute_query(query, (rule['id'], email_data['id']))
|
|
|
|
# Update rule statistics
|
|
self._update_rule_stats(rule['id'])
|
|
|
|
# Execute rule action (if auto-process enabled)
|
|
if self.auto_process:
|
|
await self._execute_rule_action(email_data, rule)
|
|
else:
|
|
logger.info(f"⏭️ Auto-process disabled - rule action not executed")
|
|
|
|
return True # First matching rule wins
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error matching rules: {e}")
|
|
return False
|
|
|
|
def _get_active_rules(self) -> List[Dict]:
|
|
"""Get all enabled rules ordered by priority"""
|
|
query = """
|
|
SELECT * FROM email_rules
|
|
WHERE enabled = true
|
|
ORDER BY priority ASC
|
|
"""
|
|
return execute_query(query)
|
|
|
|
def _rule_matches(self, email_data: Dict, rule: Dict) -> bool:
|
|
"""
|
|
Check if email matches rule conditions
|
|
Rule conditions format: {"sender_email": "x@y.com", "classification": "invoice", ...}
|
|
"""
|
|
try:
|
|
conditions = rule['conditions']
|
|
|
|
# Check sender_email
|
|
if 'sender_email' in conditions:
|
|
if email_data.get('sender_email') != conditions['sender_email']:
|
|
return False
|
|
|
|
# Check sender_domain
|
|
if 'sender_domain' in conditions:
|
|
sender_email = email_data.get('sender_email', '')
|
|
if '@' in sender_email:
|
|
domain = sender_email.split('@')[1]
|
|
if domain not in conditions['sender_domain']:
|
|
return False
|
|
else:
|
|
return False
|
|
|
|
# Check classification
|
|
if 'classification' in conditions:
|
|
if email_data.get('classification') != conditions['classification']:
|
|
return False
|
|
|
|
# Check subject_contains
|
|
if 'subject_contains' in conditions:
|
|
subject = email_data.get('subject', '').lower()
|
|
keywords = conditions['subject_contains']
|
|
|
|
if isinstance(keywords, list):
|
|
if not any(kw.lower() in subject for kw in keywords):
|
|
return False
|
|
elif isinstance(keywords, str):
|
|
if keywords.lower() not in subject:
|
|
return False
|
|
|
|
# Check subject_regex (advanced pattern matching)
|
|
if 'subject_regex' in conditions:
|
|
import re
|
|
subject = email_data.get('subject', '')
|
|
pattern = conditions['subject_regex']
|
|
|
|
if not re.search(pattern, subject, re.IGNORECASE):
|
|
return False
|
|
|
|
# All conditions matched
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error checking rule conditions: {e}")
|
|
return False
|
|
|
|
async def _execute_rule_action(self, email_data: Dict, rule: Dict):
|
|
"""Execute rule action (link_supplier, create_purchase, link_case, etc.)"""
|
|
try:
|
|
action_type = rule['action_type']
|
|
action_params = rule.get('action_params', {})
|
|
|
|
logger.info(f"🚀 Executing rule action: {action_type} for email {email_data['id']}")
|
|
|
|
if action_type == 'mark_spam':
|
|
await self._mark_as_spam(email_data['id'])
|
|
|
|
elif action_type == 'link_supplier':
|
|
await self._link_to_supplier(email_data, action_params)
|
|
|
|
elif action_type == 'link_customer':
|
|
await self._link_to_customer(email_data, action_params)
|
|
|
|
elif action_type == 'link_case':
|
|
await self._link_to_case(email_data, action_params)
|
|
|
|
elif action_type == 'create_purchase':
|
|
logger.info(f"⏭️ Purchase creation not implemented yet")
|
|
|
|
else:
|
|
logger.warning(f"⚠️ Unknown action type: {action_type}")
|
|
|
|
# Mark email as auto-processed
|
|
query = "UPDATE email_messages SET auto_processed = true WHERE id = %s"
|
|
execute_query(query, (email_data['id'],))
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error executing rule action: {e}")
|
|
|
|
async def _mark_as_spam(self, email_id: int):
|
|
"""Mark email as spam"""
|
|
query = "UPDATE email_messages SET classification = 'spam', status = 'processed' WHERE id = %s"
|
|
execute_query(query, (email_id,))
|
|
logger.info(f"✅ Marked email {email_id} as spam")
|
|
|
|
async def _link_to_supplier(self, email_data: Dict, params: Dict):
|
|
"""Link email to supplier/vendor"""
|
|
try:
|
|
# Auto-match supplier by email domain
|
|
if params.get('auto_match_domain'):
|
|
sender_email = email_data.get('sender_email', '')
|
|
|
|
if '@' in sender_email:
|
|
domain = sender_email.split('@')[1]
|
|
|
|
# Find vendor by domain
|
|
query = """
|
|
SELECT id, name FROM vendors
|
|
WHERE email LIKE %s OR contact_email LIKE %s
|
|
LIMIT 1
|
|
"""
|
|
result = execute_query(query, (f'%{domain}%', f'%{domain}%'))
|
|
|
|
if result:
|
|
vendor_id = result[0]['id']
|
|
vendor_name = result[0]['name']
|
|
|
|
# Link email to vendor
|
|
query = "UPDATE email_messages SET supplier_id = %s WHERE id = %s"
|
|
execute_query(query, (vendor_id, email_data['id']))
|
|
|
|
logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_name}")
|
|
return
|
|
|
|
# Manual supplier_id from params
|
|
elif 'supplier_id' in params:
|
|
vendor_id = params['supplier_id']
|
|
query = "UPDATE email_messages SET supplier_id = %s WHERE id = %s"
|
|
execute_query(query, (vendor_id, email_data['id']))
|
|
logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_id}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error linking to supplier: {e}")
|
|
|
|
async def _link_to_customer(self, email_data: Dict, params: Dict):
|
|
"""Link email to customer"""
|
|
try:
|
|
# Auto-match customer by email domain
|
|
if params.get('auto_match_domain'):
|
|
sender_email = email_data.get('sender_email', '')
|
|
|
|
if '@' in sender_email:
|
|
domain = sender_email.split('@')[1]
|
|
|
|
# Find customer by domain
|
|
query = """
|
|
SELECT id, customer_name FROM tmodule_customers
|
|
WHERE email LIKE %s
|
|
LIMIT 1
|
|
"""
|
|
result = execute_query(query, (f'%{domain}%',))
|
|
|
|
if result:
|
|
customer_id = result[0]['id']
|
|
customer_name = result[0]['customer_name']
|
|
|
|
# Link email to customer
|
|
query = "UPDATE email_messages SET customer_id = %s WHERE id = %s"
|
|
execute_query(query, (customer_id, email_data['id']))
|
|
|
|
logger.info(f"✅ Linked email {email_data['id']} to customer {customer_name}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error linking to customer: {e}")
|
|
|
|
async def _link_to_case(self, email_data: Dict, params: Dict):
|
|
"""Link email to timetracking case"""
|
|
try:
|
|
# Extract case number from subject (e.g., "CC0042", "Case #123")
|
|
if params.get('extract_case_from_subject'):
|
|
import re
|
|
subject = email_data.get('subject', '')
|
|
|
|
# Match patterns like CC0001, CC0042, etc.
|
|
match = re.search(r'CC(\d{4})', subject, re.IGNORECASE)
|
|
|
|
if match:
|
|
case_number = f"CC{match.group(1)}"
|
|
|
|
# Find case by case_number
|
|
query = """
|
|
SELECT id, title FROM tmodule_cases
|
|
WHERE case_number = %s
|
|
LIMIT 1
|
|
"""
|
|
result = execute_query(query, (case_number,))
|
|
|
|
if result:
|
|
case_id = result[0]['id']
|
|
case_title = result[0]['title']
|
|
|
|
# Link email to case
|
|
query = "UPDATE email_messages SET linked_case_id = %s WHERE id = %s"
|
|
execute_query(query, (case_id, email_data['id']))
|
|
|
|
logger.info(f"✅ Linked email {email_data['id']} to case {case_number}: {case_title}")
|
|
return
|
|
|
|
logger.info(f"⚠️ No case number found in subject: {subject}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error linking to case: {e}")
|
|
|
|
def _update_rule_stats(self, rule_id: int):
|
|
"""Update rule match statistics"""
|
|
query = """
|
|
UPDATE email_rules
|
|
SET match_count = match_count + 1,
|
|
last_matched_at = CURRENT_TIMESTAMP
|
|
WHERE id = %s
|
|
"""
|
|
execute_query(query, (rule_id,))
|
|
|
|
async def reprocess_email(self, email_id: int):
|
|
"""Manually reprocess a single email"""
|
|
try:
|
|
# Get email from database
|
|
query = "SELECT * FROM email_messages WHERE id = %s"
|
|
result = execute_query(query, (email_id,))
|
|
|
|
if not result:
|
|
logger.error(f"❌ Email {email_id} not found")
|
|
return
|
|
|
|
email_data = result[0]
|
|
|
|
# Reclassify (either AI or keyword-based)
|
|
if settings.EMAIL_AUTO_CLASSIFY:
|
|
await self._classify_and_update(email_data)
|
|
|
|
# Rematch rules
|
|
if self.rules_enabled:
|
|
await self._match_rules(email_data)
|
|
|
|
logger.info(f"✅ Reprocessed email {email_id}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error reprocessing email {email_id}: {e}")
|