"""
Email Processor Service

Main orchestrator for the email workflow:
    Fetch → Store → Classify → Execute Workflows → Match Rules → Link Entities

Based on the OmniSync architecture, adapted for BMC Hub.
"""

import logging
import re
from datetime import datetime
from typing import Dict, List, Optional

from app.services.email_service import EmailService
from app.services.email_analysis_service import EmailAnalysisService
from app.services.simple_classifier import simple_classifier
from app.services.email_workflow_service import email_workflow_service
from app.services.email_activity_logger import email_activity_logger
from app.core.config import settings
from app.core.database import execute_query, execute_update

logger = logging.getLogger(__name__)

# Case numbers look like "CC0042". The lookarounds stop "ACC0042" from
# matching and stop "CC00421" from being truncated to "CC0042".
_CASE_NUMBER_RE = re.compile(r'(?<![A-Za-z0-9])CC(\d{4})(?!\d)', re.IGNORECASE)


def _sender_domain(email_data: Dict) -> Optional[str]:
    """Return the lower-cased domain of ``email_data['sender_email']``.

    Returns None when there is no sender address, no '@', or nothing after it.
    """
    sender_email = email_data.get('sender_email') or ''
    if '@' not in sender_email:
        return None
    # The domain is everything after the *last* '@'.
    domain = sender_email.rsplit('@', 1)[1].strip().lower()
    return domain or None


def _domain_like_pattern(domain: str) -> str:
    """Build a LIKE pattern matching addresses at exactly ``domain``.

    '%' and '_' in the (untrusted) domain are escaped so they are matched
    literally, and the pattern requires '@' right before the domain. Without
    that, a sender at 'e.com' would match stored addresses such as
    'x@apple.com'. Assumes a LIKE dialect whose default escape character is
    backslash (PostgreSQL's default) — TODO confirm for the configured backend.
    """
    escaped = domain.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
    return f'%@{escaped}%'


class EmailProcessorService:
    """Main orchestrator for the email processing pipeline."""

    def __init__(self):
        self.email_service = EmailService()
        self.analysis_service = EmailAnalysisService()
        # Feature flags are read from settings once, when the service is built.
        self.enabled = settings.EMAIL_TO_TICKET_ENABLED
        self.rules_enabled = settings.EMAIL_RULES_ENABLED
        self.auto_process = settings.EMAIL_RULES_AUTO_PROCESS
        self.ai_enabled = settings.EMAIL_AI_ENABLED

    async def process_inbox(self) -> Dict:
        """
        Main entry point: process all new emails from the inbox.

        Each fetched email is saved, optionally classified, run through
        workflows and, unless a workflow already handled it, matched against
        legacy rules. A failure on one email is logged and counted in
        ``errors``; it does not stop the rest of the batch.

        Returns:
            Processing statistics with the keys fetched / saved / classified /
            rules_matched / errors, or ``{'status': 'disabled'}`` when
            EMAIL_TO_TICKET_ENABLED is off.
        """
        if not self.enabled:
            logger.info("⏭️ Email processing disabled (EMAIL_TO_TICKET_ENABLED=false)")
            return {'status': 'disabled'}

        logger.info("🔄 Starting email processing cycle...")

        stats = {
            'fetched': 0,
            'saved': 0,
            'classified': 0,
            'rules_matched': 0,
            'errors': 0
        }

        try:
            # Step 1: Fetch new emails
            limit = settings.EMAIL_MAX_FETCH_PER_RUN
            new_emails = await self.email_service.fetch_new_emails(limit=limit) or []
            stats['fetched'] = len(new_emails)

            if not new_emails:
                logger.info("✅ No new emails to process")
                return stats

            logger.info(f"📥 Fetched {len(new_emails)} new emails")

            # Steps 2-5, one email at a time, so a single failure stays isolated
            for email_data in new_emails:
                try:
                    await self._process_single_email(email_data, stats)
                except Exception as e:
                    email_ref = email_data.get('id') or email_data.get('message_id', 'unknown')
                    logger.error(f"❌ Error processing email {email_ref}: {e}", exc_info=True)
                    stats['errors'] += 1

            logger.info(f"✅ Email processing complete: {stats}")
            return stats

        except Exception as e:
            logger.error(f"❌ Email processing failed: {e}", exc_info=True)
            stats['errors'] += 1
            return stats

    async def _process_single_email(self, email_data: Dict, stats: Dict) -> None:
        """Save one fetched email, then classify it, run workflows and match rules.

        Updates ``stats`` in place. Emails for which ``save_email`` returns
        no id are skipped (presumably duplicates — TODO confirm in
        EmailService). Exceptions propagate to the caller, which counts them.
        """
        # Step 2: Save email to database
        email_id = await self.email_service.save_email(email_data)
        if not email_id:
            return

        email_data['id'] = email_id
        stats['saved'] += 1

        # Log: Email fetched and saved
        await email_activity_logger.log_fetched(
            email_id=email_id,
            source='email_server',
            message_id=email_data.get('message_id', 'unknown')
        )

        # Step 3: Classify email with AI
        # NOTE(review): this gate requires EMAIL_AI_ENABLED, so the keyword
        # classifier branch in _classify_and_update is only reachable through
        # reprocess_email. Confirm whether inbox processing should also fall
        # back to keyword classification when AI is disabled.
        if settings.EMAIL_AI_ENABLED and settings.EMAIL_AUTO_CLASSIFY:
            if await self._classify_and_update(email_data):
                stats['classified'] += 1

        # Step 4: Execute workflows based on classification
        workflow_processed = await self._run_workflows(email_data)

        # Step 5: Match against rules (legacy support) - skip if workflow already processed
        if workflow_processed:
            logger.info(f"⏭️ Email {email_id} processed by workflow, skipping rules (coordination)")
        elif self.rules_enabled:
            if self._has_completed_workflow_execution(email_id):
                logger.info(f"⏭️ Email {email_id} already processed by workflow, skipping rules")
            elif await self._match_rules(email_data):
                stats['rules_matched'] += 1

    async def _run_workflows(self, email_data: Dict) -> bool:
        """Execute configured workflows for an email.

        Returns True when at least one workflow succeeded. In that case the
        email is also flagged with ``_workflow_processed`` so rule execution
        is skipped.
        """
        if not getattr(settings, 'EMAIL_WORKFLOWS_ENABLED', False):
            return False

        workflow_result = await email_workflow_service.execute_workflows(email_data) or {}
        executed = workflow_result.get('workflows_executed', 0)
        if executed > 0:
            logger.info(f"✅ Executed {executed} workflow(s) for email {email_data['id']}")

            # Mark as workflow-processed to avoid duplicate rule execution
            if workflow_result.get('workflows_succeeded', 0) > 0:
                email_data['_workflow_processed'] = True
                return True

        return False

    def _has_completed_workflow_execution(self, email_id: int) -> bool:
        """Return True if email_workflow_executions has a 'completed' row for the email."""
        rows = execute_query(
            "SELECT id FROM email_workflow_executions "
            "WHERE email_id = %s AND status = 'completed' LIMIT 1",
            (email_id,)
        )
        return bool(rows)

    async def _classify_and_update(self, email_data: Dict) -> bool:
        """
        Classify an email and persist the result.

        Uses the AI analysis service when AI is enabled, otherwise the simple
        keyword classifier. Writes classification / confidence_score /
        classification_date to email_messages and mirrors the values into
        ``email_data`` for the later workflow and rule steps. It then logs
        the activity and, for invoices at or above
        EMAIL_AI_CONFIDENCE_THRESHOLD, stores the extracted invoice fields.

        Returns:
            True if the classification was written to the database, False
            otherwise. Errors are logged, never raised.
        """
        email_id = email_data.get('id')
        stored = False
        try:
            logger.debug(
                f"🔍 _classify_and_update: ai_enabled={self.ai_enabled}, "
                f"EMAIL_AI_ENABLED={settings.EMAIL_AI_ENABLED}"
            )

            # Run classification (AI or simple keyword-based)
            if self.ai_enabled:
                result = await self.analysis_service.classify_email(email_data)
            else:
                logger.info(f"🔍 Using simple keyword classifier for email {email_id}")
                result = simple_classifier.classify(email_data)

            if not result:
                logger.warning(f"⚠️ Classifier returned no result for email {email_id}")
                return False

            classification = result.get('classification') or 'unknown'
            confidence = float(result.get('confidence') or 0.0)

            # Update email record
            query = """
                UPDATE email_messages
                SET classification = %s,
                    confidence_score = %s,
                    classification_date = CURRENT_TIMESTAMP
                WHERE id = %s
            """
            execute_update(query, (classification, confidence, email_id))
            stored = True

            # Mirror the stored values so workflow execution and rule matching see them
            email_data['classification'] = classification
            email_data['confidence_score'] = confidence

            logger.info(f"✅ Classified email {email_id} as '{classification}' (confidence: {confidence:.2f})")

            # Log: Email classified
            await email_activity_logger.log_classified(
                email_id=email_id,
                classification=classification,
                confidence=confidence,
                method='ai' if self.ai_enabled else 'keyword'
            )

            # Extract invoice details if classified as invoice
            if classification == 'invoice' and confidence >= settings.EMAIL_AI_CONFIDENCE_THRESHOLD:
                extraction = await self.analysis_service.extract_invoice_details(email_data)
                if extraction:
                    await self._update_extracted_fields(email_id, extraction)

        except Exception as e:
            logger.error(f"❌ Classification failed for email {email_id}: {e}", exc_info=True)

        return stored

    async def _update_extracted_fields(self, email_id: int, extraction: Dict):
        """Store extracted invoice number, amount and due date on the email row."""
        try:
            query = """
                UPDATE email_messages
                SET extracted_invoice_number = %s,
                    extracted_amount = %s,
                    extracted_due_date = %s
                WHERE id = %s
            """
            execute_update(query, (
                extraction.get('invoice_number'),
                extraction.get('amount'),
                extraction.get('due_date'),
                email_id
            ))

            logger.info(f"✅ Updated extracted fields for email {email_id}")

        except Exception as e:
            logger.error(f"❌ Error updating extracted fields: {e}")

    async def _match_rules(self, email_data: Dict) -> bool:
        """
        Match an email against the enabled rules, in priority order.

        The first matching rule wins. Its id is stored on the email, its
        statistics are bumped, and its action runs when auto-processing is
        enabled.

        Returns:
            True if a rule matched, False otherwise (including on error).
        """
        try:
            # Get active rules ordered by priority
            rules = self._get_active_rules()

            if not rules:
                return False

            for rule in rules:
                if not self._rule_matches(email_data, rule):
                    continue

                logger.info(f"✅ Email {email_data['id']} matched rule: {rule.get('name')}")

                # Update email with rule_id
                execute_update(
                    "UPDATE email_messages SET rule_id = %s WHERE id = %s",
                    (rule['id'], email_data['id'])
                )

                # Update rule statistics
                self._update_rule_stats(rule['id'])

                # Execute rule action (if auto-process enabled)
                if self.auto_process:
                    await self._execute_rule_action(email_data, rule)
                else:
                    logger.info("⏭️ Auto-process disabled - rule action not executed")

                return True  # First matching rule wins

            return False

        except Exception as e:
            logger.error(f"❌ Error matching rules: {e}")
            return False

    def _get_active_rules(self) -> List[Dict]:
        """Return all enabled rules ordered by ascending priority."""
        query = """
            SELECT * FROM email_rules
            WHERE enabled = true
            ORDER BY priority ASC
        """
        return execute_query(query) or []

    def _rule_matches(self, email_data: Dict, rule: Dict) -> bool:
        """
        Check whether an email satisfies every condition of a rule.

        Supported condition keys (all optional, combined with AND):
            sender_email      exact sender address
            sender_domain     a domain or list of domains (case-insensitive, exact)
            classification    exact classification label
            subject_contains  a keyword or list of keywords (any, case-insensitive)
            subject_regex     regex searched in the subject (case-insensitive)

        An empty conditions dict matches every email. Missing, non-dict or
        otherwise invalid conditions never match.
        """
        try:
            conditions = rule.get('conditions')
            if not isinstance(conditions, dict):
                logger.warning(f"⚠️ Rule {rule.get('id')} has invalid conditions: {conditions!r}")
                return False

            # Check sender_email
            if 'sender_email' in conditions:
                if email_data.get('sender_email') != conditions['sender_email']:
                    return False

            # Check sender_domain: exact match, not substring, so 'ample.com'
            # does not satisfy a rule for 'example.com'
            if 'sender_domain' in conditions:
                domain = _sender_domain(email_data)
                if domain is None:
                    return False
                allowed = conditions['sender_domain']
                if isinstance(allowed, str):
                    allowed = [allowed]
                if domain not in {str(d).strip().lower() for d in allowed}:
                    return False

            # Check classification
            if 'classification' in conditions:
                if email_data.get('classification') != conditions['classification']:
                    return False

            # A NULL subject is treated as empty rather than failing the rule
            subject = email_data.get('subject') or ''

            # Check subject_contains
            if 'subject_contains' in conditions:
                keywords = conditions['subject_contains']
                if isinstance(keywords, str):
                    keywords = [keywords]
                if isinstance(keywords, list):
                    lowered = subject.lower()
                    if not any(kw.lower() in lowered for kw in keywords):
                        return False

            # Check subject_regex (advanced pattern matching)
            if 'subject_regex' in conditions:
                if not re.search(conditions['subject_regex'], subject, re.IGNORECASE):
                    return False

            # All conditions matched
            return True

        except Exception as e:
            logger.error(f"❌ Error checking rule conditions: {e}")
            return False

    async def _execute_rule_action(self, email_data: Dict, rule: Dict):
        """Run a rule's action (link_supplier, create_purchase, link_case, etc.).

        The email is marked ``auto_processed`` after any action type has
        been handled, including ones that are not implemented or unknown.
        """
        try:
            action_type = rule['action_type']
            # A NULL action_params column comes back as None; treat it as "no params"
            action_params = rule.get('action_params') or {}
            email_id = email_data['id']

            logger.info(f"🚀 Executing rule action: {action_type} for email {email_id}")

            if action_type == 'mark_spam':
                await self._mark_as_spam(email_id)
            elif action_type == 'link_supplier':
                await self._link_to_supplier(email_data, action_params)
            elif action_type == 'link_customer':
                await self._link_to_customer(email_data, action_params)
            elif action_type == 'link_case':
                await self._link_to_case(email_data, action_params)
            elif action_type == 'create_purchase':
                logger.info("⏭️ Purchase creation not implemented yet")
            else:
                logger.warning(f"⚠️ Unknown action type: {action_type}")

            # Mark email as auto-processed
            execute_update(
                "UPDATE email_messages SET auto_processed = true WHERE id = %s",
                (email_id,)
            )

        except Exception as e:
            logger.error(f"❌ Error executing rule action: {e}")

    async def _mark_as_spam(self, email_id: int):
        """Set the email's classification to 'spam' and its status to 'processed'."""
        execute_update(
            "UPDATE email_messages SET classification = 'spam', status = 'processed' WHERE id = %s",
            (email_id,)
        )
        logger.info(f"✅ Marked email {email_id} as spam")

    async def _link_to_supplier(self, email_data: Dict, params: Dict):
        """Link an email to a vendor.

        With ``params['auto_match_domain']`` the vendor is looked up by the
        sender's domain. Otherwise ``params['supplier_id']`` is used
        directly, if present.
        """
        try:
            # Auto-match supplier by email domain
            if params.get('auto_match_domain'):
                domain = _sender_domain(email_data)
                if domain:
                    pattern = _domain_like_pattern(domain)

                    # Find vendor by domain
                    query = """
                        SELECT id, name FROM vendors
                        WHERE LOWER(email) LIKE %s OR LOWER(contact_email) LIKE %s
                        LIMIT 1
                    """
                    result = execute_query(query, (pattern, pattern))

                    if result:
                        vendor_id = result[0]['id']
                        vendor_name = result[0]['name']

                        # Link email to vendor
                        execute_update(
                            "UPDATE email_messages SET supplier_id = %s WHERE id = %s",
                            (vendor_id, email_data['id'])
                        )

                        logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_name}")
                        return

            # Manual supplier_id from params
            elif 'supplier_id' in params:
                vendor_id = params['supplier_id']
                execute_update(
                    "UPDATE email_messages SET supplier_id = %s WHERE id = %s",
                    (vendor_id, email_data['id'])
                )
                logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_id}")

        except Exception as e:
            logger.error(f"❌ Error linking to supplier: {e}")

    async def _link_to_customer(self, email_data: Dict, params: Dict):
        """Link an email to a customer matched by the sender's domain.

        Only acts when ``params['auto_match_domain']`` is truthy.
        """
        try:
            # Auto-match customer by email domain
            if params.get('auto_match_domain'):
                domain = _sender_domain(email_data)
                if domain:
                    pattern = _domain_like_pattern(domain)

                    # Find customer by domain
                    query = """
                        SELECT id, customer_name FROM tmodule_customers
                        WHERE LOWER(email) LIKE %s
                        LIMIT 1
                    """
                    result = execute_query(query, (pattern,))

                    if result:
                        customer_id = result[0]['id']
                        customer_name = result[0]['customer_name']

                        # Link email to customer
                        execute_update(
                            "UPDATE email_messages SET customer_id = %s WHERE id = %s",
                            (customer_id, email_data['id'])
                        )

                        logger.info(f"✅ Linked email {email_data['id']} to customer {customer_name}")

        except Exception as e:
            logger.error(f"❌ Error linking to customer: {e}")

    async def _link_to_case(self, email_data: Dict, params: Dict):
        """Link an email to a timetracking case referenced in its subject.

        Only acts when ``params['extract_case_from_subject']`` is truthy. The
        subject is searched for a case number like "CC0042", which is then
        looked up in tmodule_cases.case_number.
        """
        try:
            if not params.get('extract_case_from_subject'):
                logger.debug(f"⏭️ Case extraction not enabled for email {email_data['id']}")
                return

            subject = email_data.get('subject') or ''

            # Match patterns like CC0001, CC0042, etc.
            match = _CASE_NUMBER_RE.search(subject)
            if not match:
                logger.info(f"⚠️ No case number found in subject: {subject}")
                return

            # Normalize the prefix to upper case ("cc0042" -> "CC0042")
            case_number = f"CC{match.group(1)}"

            # Find case by case_number
            query = """
                SELECT id, title FROM tmodule_cases
                WHERE case_number = %s
                LIMIT 1
            """
            result = execute_query(query, (case_number,))

            if not result:
                logger.info(f"⚠️ Case {case_number} referenced in subject not found: {subject}")
                return

            case_id = result[0]['id']
            case_title = result[0]['title']

            # Link email to case
            execute_update(
                "UPDATE email_messages SET linked_case_id = %s WHERE id = %s",
                (case_id, email_data['id'])
            )

            logger.info(f"✅ Linked email {email_data['id']} to case {case_number}: {case_title}")

        except Exception as e:
            logger.error(f"❌ Error linking to case: {e}")

    def _update_rule_stats(self, rule_id: int):
        """Increment a rule's match_count and stamp last_matched_at."""
        query = """
            UPDATE email_rules
            SET match_count = match_count + 1,
                last_matched_at = CURRENT_TIMESTAMP
            WHERE id = %s
        """
        execute_update(query, (rule_id,))

    async def reprocess_email(self, email_id: int):
        """Manually reprocess a single stored email.

        Reclassifies it (AI or keyword-based) when EMAIL_AUTO_CLASSIFY is on,
        and re-matches rules when rules are enabled. Errors are logged, not
        raised.
        """
        try:
            # Get email from database
            result = execute_query("SELECT * FROM email_messages WHERE id = %s", (email_id,))

            if not result:
                logger.error(f"❌ Email {email_id} not found")
                return

            email_data = result[0]

            # Reclassify (either AI or keyword-based)
            if settings.EMAIL_AUTO_CLASSIFY:
                await self._classify_and_update(email_data)

            # Rematch rules
            if self.rules_enabled:
                await self._match_rules(email_data)

            logger.info(f"✅ Reprocessed email {email_id}")

        except Exception as e:
            logger.error(f"❌ Error reprocessing email {email_id}: {e}")