"""
Email Processor Service

Main orchestrator for the email workflow:
Fetch → Store → Transcribe audio → Classify → Workflows → Match Rules → Link Entities

Based on OmniSync architecture adapted for BMC Hub
"""

import hashlib
import json
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from app.services.email_service import EmailService
from app.services.email_analysis_service import EmailAnalysisService
from app.services.transcription_service import TranscriptionService
from app.services.simple_classifier import simple_classifier
from app.services.email_workflow_service import email_workflow_service
from app.services.email_activity_logger import email_activity_logger
from app.core.config import settings
from app.core.database import execute_query, execute_update, execute_insert

logger = logging.getLogger(__name__)

# Keyword-classifier results in these categories are treated as inconclusive
# and escalated to AI analysis (when AI is enabled).
_INCONCLUSIVE_CLASSIFICATIONS = ('general', 'unknown')
# Keyword-classifier confidence below this value is escalated to AI analysis.
_AI_ESCALATION_CONFIDENCE = 0.70

# Case numbers such as "CC0042" in an email subject.
_CASE_NUMBER_PATTERN = re.compile(r'CC(\d{4})', re.IGNORECASE)

# Subjects like "Optagelse af samtale(n) mellem 204 og 209"
# (handles both "samtale" and "samtalen", case insensitive).
_CONVERSATION_SUBJECT_PATTERN = re.compile(
    r'Optagelse af samtalen?\s+mellem\s+(\S+)\s+og\s+(\S+)', re.IGNORECASE
)

# Default attachment directory used by EmailService (mirrors EmailService._save_attachments).
_ATTACHMENT_DIR = "uploads/email_attachments"


class EmailProcessorService:
    """Main orchestrator for the email processing pipeline.

    Feature switches are read from ``settings`` once, when the service is created.
    """

    def __init__(self):
        self.email_service = EmailService()
        self.analysis_service = EmailAnalysisService()
        self.transcription_service = TranscriptionService()
        # Feature flags snapshotted from settings at construction time.
        self.enabled = settings.EMAIL_TO_TICKET_ENABLED
        self.rules_enabled = settings.EMAIL_RULES_ENABLED
        self.auto_process = settings.EMAIL_RULES_AUTO_PROCESS
        self.ai_enabled = settings.EMAIL_AI_ENABLED

    # ------------------------------------------------------------------
    # Pipeline entry points
    # ------------------------------------------------------------------

    async def process_inbox(self) -> Dict:
        """Fetch, store and process all new emails from the inbox.

        Returns:
            ``{'status': 'disabled'}`` when EMAIL_TO_TICKET_ENABLED is off,
            otherwise a stats dict with the counters ``fetched``, ``saved``,
            ``classified``, ``rules_matched`` and ``errors``.
        """
        if not self.enabled:
            logger.info("⏭️ Email processing disabled (EMAIL_TO_TICKET_ENABLED=false)")
            return {'status': 'disabled'}

        logger.info("🔄 Starting email processing cycle...")
        stats = {
            'fetched': 0,
            'saved': 0,
            'classified': 0,
            'rules_matched': 0,
            'errors': 0,
        }

        try:
            # Step 1: Fetch new emails
            limit = settings.EMAIL_MAX_FETCH_PER_RUN
            new_emails = await self.email_service.fetch_new_emails(limit=limit) or []
            stats['fetched'] = len(new_emails)

            if not new_emails:
                logger.info("✅ No new emails to process")
                return stats

            logger.info(f"📥 Fetched {len(new_emails)} new emails")

            # Step 2: Save each email, then run the per-email pipeline.
            # Errors are counted per email so one bad message does not abort the batch.
            for email_data in new_emails:
                try:
                    email_id = await self.email_service.save_email(email_data)
                    if not email_id:
                        # save_email returned no id: nothing to process further.
                        continue

                    email_data['id'] = email_id
                    stats['saved'] += 1

                    # Log: Email fetched and saved
                    await email_activity_logger.log_fetched(
                        email_id=email_id,
                        source='email_server',
                        message_id=email_data.get('message_id', 'unknown'),
                    )

                    # Steps 3-5: classify, run workflows, match rules
                    result = await self.process_single_email(email_data)
                    if result.get('classified'):
                        stats['classified'] += 1
                    if result.get('rules_matched'):
                        stats['rules_matched'] += 1

                except Exception as e:
                    logger.exception(f"❌ Error processing email: {e}")
                    stats['errors'] += 1

            logger.info(f"✅ Email processing complete: {stats}")
            return stats

        except Exception as e:
            logger.exception(f"❌ Email processing failed: {e}")
            stats['errors'] += 1
            return stats

    async def process_single_email(self, email_data: Dict) -> Dict:
        """Process one stored email: Transcribe -> Classify -> Workflows -> Rules.

        Used by process_inbox (new emails) and bulk_reprocess (existing emails).
        ``email_data`` must carry the database ``id``; it is enriched in place
        (transcripts, ``classification``, ``confidence_score``,
        ``_workflow_processed``).

        Returns:
            Dict with ``classified`` (bool, True only if the classification was
            stored), ``workflows_executed`` (int) and ``rules_matched`` (bool).

        Raises:
            Exception: any unexpected error is logged and re-raised.
        """
        email_id = email_data.get('id')
        stats = {
            'classified': False,
            'workflows_executed': 0,
            'rules_matched': False,
        }

        try:
            # Step 2.5: Transcribe audio attachments BEFORE classification so the
            # classifier can "read" voice notes.
            if settings.WHISPER_ENABLED:
                await self._process_attachments_for_transcription(email_data)

            # Step 3: Classify email (keyword first, AI escalation if inconclusive)
            if settings.EMAIL_AUTO_CLASSIFY:
                stats['classified'] = await self._classify_and_update(email_data)

            # Step 4: Execute workflows based on classification
            workflow_processed = False
            if getattr(settings, 'EMAIL_WORKFLOWS_ENABLED', False):
                workflow_result = await email_workflow_service.execute_workflows(email_data) or {}
                executed_count = workflow_result.get('workflows_executed', 0)
                stats['workflows_executed'] = executed_count

                if executed_count > 0:
                    logger.info(f"✅ Executed {executed_count} workflow(s) for email {email_id}")

                    # Mark as workflow-processed to avoid duplicate rule execution
                    if workflow_result.get('workflows_succeeded', 0) > 0:
                        workflow_processed = True
                        email_data['_workflow_processed'] = True

                        # A processed email is one that was classified and had a workflow
                        # run: mark it 'processed' and move it to the 'Processed' folder.
                        logger.info(f"✅ Auto-marking email {email_id} as processed (Workflow executed)")
                        execute_update(
                            """UPDATE email_messages
                               SET status = 'processed',
                                   folder = 'Processed',
                                   processed_at = CURRENT_TIMESTAMP,
                                   auto_processed = true
                               WHERE id = %s""",
                            (email_id,),
                        )

            # Step 5: Legacy rule matching - skipped when a workflow already handled the email
            if self.rules_enabled and not workflow_processed:
                # Double-check the DB in case a workflow completed for this email earlier
                existing_execution = execute_query(
                    "SELECT id FROM email_workflow_executions WHERE email_id = %s AND status = 'completed' LIMIT 1",
                    (email_data['id'],),
                )
                if existing_execution:
                    logger.info(f"⏭️ Email {email_id} already processed by workflow, skipping rules")
                elif await self._match_rules(email_data):
                    stats['rules_matched'] = True
            elif workflow_processed:
                logger.info(f"⏭️ Email {email_id} processed by workflow, skipping rules (coordination)")

            return stats

        except Exception as e:
            logger.error(f"❌ Error in process_single_email for {email_id}: {e}")
            raise

    # ------------------------------------------------------------------
    # Classification
    # ------------------------------------------------------------------

    @staticmethod
    def _as_confidence(value: Any) -> float:
        """Coerce a classifier confidence to float; None or non-numeric becomes 0.0."""
        if value is None:
            return 0.0
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    async def _classify_and_update(self, email_data: Dict) -> bool:
        """Classify an email, persist the result and extract invoice details.

        Keyword classification always runs first. When AI is enabled and the
        keyword result is inconclusive ('general'/'unknown') or its confidence
        is below 0.70, the email is escalated to AI analysis; a non-empty AI
        result replaces the keyword result. The outcome is written to
        ``email_messages`` and to ``email_data`` (``classification``,
        ``confidence_score``). Invoices at or above
        EMAIL_AI_CONFIDENCE_THRESHOLD additionally get their details extracted.

        Returns:
            True if the classification was stored, False otherwise. Errors are
            logged, not raised.
        """
        email_id = email_data.get('id')
        stored = False
        try:
            logger.debug(f"🔍 _classify_and_update: ai_enabled={self.ai_enabled}, EMAIL_AI_ENABLED={settings.EMAIL_AI_ENABLED}")

            # 1. Always start with simple keyword classification (fast, free, deterministic)
            logger.info(f"🔍 Running keyword classification for email {email_id}")
            result = simple_classifier.classify(email_data) or {}
            classification = result.get('classification') or 'unknown'
            confidence = self._as_confidence(result.get('confidence'))
            method = 'keyword'

            # 2. Escalate to AI when the keyword result is weak or inconclusive
            should_use_ai = (
                classification in _INCONCLUSIVE_CLASSIFICATIONS
                or confidence < _AI_ESCALATION_CONFIDENCE
            )
            if should_use_ai and self.ai_enabled:
                logger.info(f"🤖 Escalating to AI analysis (Reason: '{classification}' with confidence {confidence})")
                ai_result = await self.analysis_service.classify_email(email_data)

                # Only replace the keyword result when AI returns data
                if ai_result:
                    logger.info(f"✅ AI re-classified as '{ai_result.get('classification')}'")
                    classification = ai_result.get('classification') or 'unknown'
                    confidence = self._as_confidence(ai_result.get('confidence'))
                    method = 'ai'

            # Update email record
            query = """
                UPDATE email_messages
                SET classification = %s,
                    confidence_score = %s,
                    classification_date = CURRENT_TIMESTAMP
                WHERE id = %s
            """
            execute_update(query, (classification, confidence, email_id))
            stored = True

            # Make the classification available to workflow execution and rule matching
            email_data['classification'] = classification
            email_data['confidence_score'] = confidence

            logger.info(f"✅ Classified email {email_id} as '{classification}' (confidence: {confidence:.2f})")

            # Log: Email classified (method reflects which classifier actually decided)
            await email_activity_logger.log_classified(
                email_id=email_id,
                classification=classification,
                confidence=confidence,
                method=method,
            )

            # Extract invoice details if confidently classified as invoice
            if classification == 'invoice' and confidence >= settings.EMAIL_AI_CONFIDENCE_THRESHOLD:
                extraction = await self.analysis_service.extract_invoice_details(email_data)
                if extraction:
                    await self._update_extracted_fields(email_id, extraction)

        except Exception as e:
            logger.error(f"❌ Classification failed for email {email_id}: {e}")

        return stored

    async def _update_extracted_fields(self, email_id: int, extraction: Dict):
        """Store extracted invoice fields (from PDF attachment analysis) on the email.

        Errors are logged, not raised.
        """
        try:
            # Normalize amount field (new extraction uses total_amount, old used amount)
            amount = extraction.get('total_amount') or extraction.get('amount')

            query = """
                UPDATE email_messages
                SET extracted_invoice_number = %s,
                    extracted_amount = %s,
                    extracted_due_date = %s,
                    extracted_vendor_name = %s,
                    extracted_vendor_cvr = %s,
                    extracted_invoice_date = %s
                WHERE id = %s
            """
            execute_update(query, (
                extraction.get('invoice_number'),
                amount,
                extraction.get('due_date'),
                extraction.get('vendor_name'),
                extraction.get('vendor_cvr'),
                extraction.get('invoice_date'),
                email_id,
            ))

            logger.info(
                f"✅ Updated extracted fields for email {email_id}: "
                f"invoice={extraction.get('invoice_number')}, "
                f"vendor={extraction.get('vendor_name')}, "
                f"cvr={extraction.get('vendor_cvr')}, "
                f"amount={amount}"
            )

        except Exception as e:
            logger.error(f"❌ Error updating extracted fields: {e}")

    # ------------------------------------------------------------------
    # Rules (legacy)
    # ------------------------------------------------------------------

    @staticmethod
    def _coerce_json_object(value: Any) -> Optional[Dict]:
        """Return *value* as a dict, decoding a JSON-object string if needed.

        Returns None when *value* is neither a dict nor a string holding a JSON object.
        """
        if isinstance(value, dict):
            return value
        if isinstance(value, str):
            try:
                decoded = json.loads(value)
            except ValueError:
                return None
            return decoded if isinstance(decoded, dict) else None
        return None

    @staticmethod
    def _sender_domain(email_data: Dict) -> Optional[str]:
        """Return the domain part of ``sender_email``, or None if missing or empty."""
        sender_email = email_data.get('sender_email') or ''
        _, at, domain = sender_email.rpartition('@')
        domain = domain.strip()
        return domain if at and domain else None

    @staticmethod
    def _normalize_domains(value: Any) -> set:
        """Return a lower-cased set of domains.

        *value* may be a list/iterable of domains or a single string, which may
        hold several domains separated by commas, semicolons or whitespace.
        """
        if value is None:
            return set()
        items = re.split(r'[,;\s]+', value) if isinstance(value, str) else value
        return {str(d).strip().lower().lstrip('@') for d in items if str(d).strip()}

    async def _match_rules(self, email_data: Dict) -> bool:
        """Match the email against active rules and execute the first match's action.

        Rules are evaluated in priority order; the first matching rule wins.

        Returns:
            True if a rule matched, False otherwise (errors are logged and return False).
        """
        try:
            rules = self._get_active_rules()
            if not rules:
                return False

            for rule in rules:
                if not self._rule_matches(email_data, rule):
                    continue

                logger.info(f"✅ Email {email_data['id']} matched rule: {rule['name']}")

                # Record which rule matched
                execute_update(
                    "UPDATE email_messages SET rule_id = %s WHERE id = %s",
                    (rule['id'], email_data['id']),
                )
                self._update_rule_stats(rule['id'])

                # Execute rule action (only if auto-process enabled)
                if self.auto_process:
                    await self._execute_rule_action(email_data, rule)
                else:
                    logger.info("⏭️ Auto-process disabled - rule action not executed")

                return True  # First matching rule wins

            return False

        except Exception as e:
            logger.error(f"❌ Error matching rules: {e}")
            return False

    def _get_active_rules(self) -> List[Dict]:
        """Return all enabled rules ordered by priority (lowest value first)."""
        query = """
            SELECT * FROM email_rules
            WHERE enabled = true
            ORDER BY priority ASC
        """
        return execute_query(query) or []

    def _rule_matches(self, email_data: Dict, rule: Dict) -> bool:
        """Check whether the email satisfies every condition of the rule.

        Conditions format (dict or JSON-object string), all keys optional:
            {"sender_email": "x@y.com", "sender_domain": "y.com" | ["y.com", ...],
             "classification": "invoice", "subject_contains": "kw" | ["kw", ...],
             "subject_regex": "pattern"}
        Sender email/domain comparisons are case-insensitive. A rule whose
        conditions are missing or not a JSON object never matches. Errors are
        logged and treated as "no match".
        """
        try:
            conditions = self._coerce_json_object(rule.get('conditions'))
            if conditions is None:
                logger.warning(f"⚠️ Rule {rule.get('id')} has no valid conditions - skipping")
                return False

            subject = email_data.get('subject') or ''

            # Check sender_email
            if 'sender_email' in conditions:
                actual = (email_data.get('sender_email') or '').strip().lower()
                expected = str(conditions['sender_email'] or '').strip().lower()
                if actual != expected:
                    return False

            # Check sender_domain (an empty or missing domain never matches)
            if 'sender_domain' in conditions:
                domain = self._sender_domain(email_data)
                if not domain or domain.lower() not in self._normalize_domains(conditions['sender_domain']):
                    return False

            # Check classification
            if 'classification' in conditions:
                if email_data.get('classification') != conditions['classification']:
                    return False

            # Check subject_contains (any keyword, case-insensitive)
            if 'subject_contains' in conditions:
                subject_lower = subject.lower()
                keywords = conditions['subject_contains']
                if isinstance(keywords, list):
                    if not any(str(kw).lower() in subject_lower for kw in keywords):
                        return False
                elif isinstance(keywords, str):
                    if keywords.lower() not in subject_lower:
                        return False

            # Check subject_regex (advanced pattern matching)
            if 'subject_regex' in conditions:
                if not re.search(conditions['subject_regex'], subject, re.IGNORECASE):
                    return False

            # All conditions matched
            return True

        except Exception as e:
            logger.error(f"❌ Error checking rule conditions: {e}")
            return False

    async def _execute_rule_action(self, email_data: Dict, rule: Dict):
        """Execute the rule's action (link_supplier, create_purchase, link_case, ...).

        Afterwards the email is flagged ``auto_processed``. Errors are logged, not raised.
        """
        try:
            action_type = rule['action_type']
            # action_params may be NULL or stored as JSON text
            action_params = self._coerce_json_object(rule.get('action_params')) or {}

            logger.info(f"🚀 Executing rule action: {action_type} for email {email_data['id']}")

            if action_type == 'mark_spam':
                await self._mark_as_spam(email_data['id'])
            elif action_type == 'link_supplier':
                await self._link_to_supplier(email_data, action_params)
            elif action_type == 'link_customer':
                await self._link_to_customer(email_data, action_params)
            elif action_type == 'link_case':
                await self._link_to_case(email_data, action_params)
            elif action_type == 'create_purchase':
                logger.info("⏭️ Purchase creation not implemented yet")
            else:
                logger.warning(f"⚠️ Unknown action type: {action_type}")

            # NOTE(review): this also runs for unknown / unimplemented actions - confirm intended.
            execute_update(
                "UPDATE email_messages SET auto_processed = true WHERE id = %s",
                (email_data['id'],),
            )

        except Exception as e:
            logger.error(f"❌ Error executing rule action: {e}")

    async def _mark_as_spam(self, email_id: int):
        """Classify the email as spam and set its status to 'processed'."""
        execute_update(
            "UPDATE email_messages SET classification = 'spam', status = 'processed' WHERE id = %s",
            (email_id,),
        )
        logger.info(f"✅ Marked email {email_id} as spam")

    async def _link_to_supplier(self, email_data: Dict, params: Dict):
        """Link the email to a vendor.

        With ``auto_match_domain`` the first vendor whose ``email`` or
        ``contact_email`` contains the sender's domain is used; otherwise an
        explicit ``supplier_id`` from params is used. Errors are logged, not raised.
        """
        try:
            if params.get('auto_match_domain'):
                domain = self._sender_domain(email_data)
                # Without a domain, LIKE '%%' would match an arbitrary vendor.
                if domain:
                    query = """
                        SELECT id, name
                        FROM vendors
                        WHERE email LIKE %s OR contact_email LIKE %s
                        LIMIT 1
                    """
                    result = execute_query(query, (f'%{domain}%', f'%{domain}%'))

                    if result:
                        vendor_id = result[0]['id']
                        vendor_name = result[0]['name']
                        execute_update(
                            "UPDATE email_messages SET supplier_id = %s WHERE id = %s",
                            (vendor_id, email_data['id']),
                        )
                        logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_name}")

            elif 'supplier_id' in params:
                vendor_id = params['supplier_id']
                execute_update(
                    "UPDATE email_messages SET supplier_id = %s WHERE id = %s",
                    (vendor_id, email_data['id']),
                )
                logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_id}")

        except Exception as e:
            logger.error(f"❌ Error linking to supplier: {e}")

    async def _link_to_customer(self, email_data: Dict, params: Dict):
        """Link the email to the first customer whose email contains the sender's domain.

        Only acts when ``auto_match_domain`` is set. Errors are logged, not raised.
        """
        try:
            if params.get('auto_match_domain'):
                domain = self._sender_domain(email_data)
                # Without a domain, LIKE '%%' would match an arbitrary customer.
                if domain:
                    query = """
                        SELECT id, customer_name
                        FROM tmodule_customers
                        WHERE email LIKE %s
                        LIMIT 1
                    """
                    result = execute_query(query, (f'%{domain}%',))

                    if result:
                        customer_id = result[0]['id']
                        customer_name = result[0]['customer_name']
                        execute_update(
                            "UPDATE email_messages SET customer_id = %s WHERE id = %s",
                            (customer_id, email_data['id']),
                        )
                        logger.info(f"✅ Linked email {email_data['id']} to customer {customer_name}")

        except Exception as e:
            logger.error(f"❌ Error linking to customer: {e}")

    async def _link_to_case(self, email_data: Dict, params: Dict):
        """Link the email to a timetracking case referenced in its subject.

        Only acts when ``extract_case_from_subject`` is set. Recognizes case
        numbers of the form "CC" + four digits (e.g. "CC0042",
        case-insensitive). Errors are logged, not raised.
        """
        try:
            if not params.get('extract_case_from_subject'):
                return

            subject = email_data.get('subject') or ''
            match = _CASE_NUMBER_PATTERN.search(subject)
            if not match:
                logger.info(f"⚠️ No case number found in subject: {subject}")
                return

            case_number = f"CC{match.group(1)}"
            query = """
                SELECT id, title
                FROM tmodule_cases
                WHERE case_number = %s
                LIMIT 1
            """
            result = execute_query(query, (case_number,))
            if not result:
                logger.info(f"⚠️ Case {case_number} from subject not found: {subject}")
                return

            case_id = result[0]['id']
            case_title = result[0]['title']
            execute_update(
                "UPDATE email_messages SET linked_case_id = %s WHERE id = %s",
                (case_id, email_data['id']),
            )
            logger.info(f"✅ Linked email {email_data['id']} to case {case_number}: {case_title}")

        except Exception as e:
            logger.error(f"❌ Error linking to case: {e}")

    def _update_rule_stats(self, rule_id: int):
        """Increment the rule's match counter and record the match time."""
        query = """
            UPDATE email_rules
            SET match_count = match_count + 1,
                last_matched_at = CURRENT_TIMESTAMP
            WHERE id = %s
        """
        execute_update(query, (rule_id,))

    # ------------------------------------------------------------------
    # Audio transcription
    # ------------------------------------------------------------------

    async def _process_attachments_for_transcription(self, email_data: Dict) -> None:
        """Transcribe audio attachments and append the transcripts to the email body.

        Every attachment whose extension is in WHISPER_SUPPORTED_FORMATS gets
        a ``conversations`` record, even when transcription fails or yields
        nothing. Successful transcripts are appended to ``email_data['body']``
        (and to ``body_text`` when it is non-empty). This happens in memory
        only; the enriched body is not written back to ``email_messages`` here.
        """
        attachments = email_data.get('attachments') or []
        if not attachments:
            return

        transcripts = []

        for att in attachments:
            filename = att.get('filename') or ''
            content = att.get('content')

            # Cheap extension pre-filter; TranscriptionService does the real check
            if Path(filename).suffix.lower() not in settings.WHISPER_SUPPORTED_FORMATS:
                continue

            if content is None:
                logger.warning(f"⚠️ Audio attachment {filename} has no content - skipping")
                continue

            # A failed transcription must not abort the rest of the pipeline
            transcript = None
            try:
                transcript = await self.transcription_service.transcribe_audio(filename, content)
            except Exception as e:
                logger.error(f"❌ Transcription failed for {filename}: {e}")

            if transcript:
                transcripts.append(f"--- TRANSKRIBERET LYDFIL ({filename}) ---\n{transcript}\n----------------------------------")

            # Create conversation record (ALWAYS for supported audio, even if transcription fails)
            await self._create_conversation_record(email_data, filename, content, transcript)

        if transcripts:
            full_transcript_text = "\n\n" + "\n\n".join(transcripts)
            if 'body' in email_data:
                email_data['body'] = (email_data['body'] or '') + full_transcript_text
            # body_text is often what the analysis reads; extend it only when present and non-empty
            if email_data.get('body_text'):
                email_data['body_text'] += full_transcript_text

            logger.info(f"✅ Enriched email {email_data.get('id')} with {len(transcripts)} transcription(s)")

    async def _create_conversation_record(
        self,
        email_data: Dict,
        filename: str,
        content: bytes,
        transcript: Optional[str],
    ) -> None:
        """Insert a ``conversations`` row for an audio attachment (best effort; errors are logged)."""
        try:
            # Reconstruct the stored path - mirrors EmailService._save_attachments
            md5_hash = hashlib.md5(content).hexdigest()
            file_path = f"{_ATTACHMENT_DIR}/{md5_hash}_{filename}"

            # Derive the title from the subject if it names the two parties
            title = f"Email Attachment: {filename}"
            match = _CONVERSATION_SUBJECT_PATTERN.search(email_data.get('subject') or '')
            if match:
                title = f"Samtale: {match.group(1)} ↔ {match.group(2)}"

            summary = None
            if transcript:
                try:
                    from app.services.ollama_service import ollama_service
                    logger.info("🧠 Generating conversation summary...")
                    summary = await ollama_service.generate_summary(transcript)
                except Exception as e:
                    logger.error(f"⚠️ Failed to generate summary: {e}")

            # The conversation is linked to the email via email_message_id. Customer/ticket
            # links (per the original author, conversations has customer_id and ticket_id)
            # are not set: this runs before classification and entity linking.
            # TODO: back-fill those links after Step 5.
            # NOTE(review): reprocess_email calls this again for the same attachments and
            # nothing here de-duplicates, so each reprocess inserts another row - confirm.
            query = """
                INSERT INTO conversations
                (title, transcript, summary, audio_file_path, source, email_message_id, created_at)
                VALUES (%s, %s, %s, %s, 'email', %s, CURRENT_TIMESTAMP)
                RETURNING id
            """
            conversation_id = execute_insert(query, (
                title,
                transcript,
                summary,
                file_path,
                email_data.get('id'),
            ))

            logger.info(f"✅ Created conversation record {conversation_id} for {filename}")

        except Exception as e:
            logger.error(f"❌ Failed to create conversation record: {e}")

    # ------------------------------------------------------------------
    # Manual reprocessing
    # ------------------------------------------------------------------

    def _load_attachments_from_disk(self, email_id: int) -> List[Dict]:
        """Return the email's attachment rows whose ``file_path`` exists, with bytes under ``content``."""
        atts = execute_query("SELECT * FROM email_attachments WHERE email_id = %s", (email_id,)) or []

        loaded_atts = []
        for att in atts:
            fpath = att.get('file_path')
            if not fpath:
                continue
            try:
                # Relative paths resolve against the current working directory
                path_obj = Path(fpath)
                if path_obj.exists():
                    att['content'] = path_obj.read_bytes()
                    loaded_atts.append(att)
                    logger.info(f"📎 Loaded attachment content for reprocess: {att.get('filename')}")
            except Exception as e:
                logger.error(f"❌ Could not verify/load attachment {fpath}: {e}")

        return loaded_atts

    async def reprocess_email(self, email_id: int):
        """Manually reprocess a single stored email.

        Reloads the email and its attachment files, then re-runs transcription
        (WHISPER_ENABLED), classification (EMAIL_AUTO_CLASSIFY) and rule
        matching (rules enabled). Workflows are not executed here. Errors are
        logged, not raised.
        """
        try:
            result = execute_query("SELECT * FROM email_messages WHERE id = %s", (email_id,))
            if not result:
                logger.error(f"❌ Email {email_id} not found")
                return

            email_data = result[0]

            # Attachment bytes are needed for transcription on reprocess
            loaded_atts = self._load_attachments_from_disk(email_id)
            email_data['attachments'] = loaded_atts

            # Run transcription (Step 2.5 equivalent)
            if settings.WHISPER_ENABLED and loaded_atts:
                await self._process_attachments_for_transcription(email_data)

            # Reclassify (either AI or keyword-based)
            if settings.EMAIL_AUTO_CLASSIFY:
                await self._classify_and_update(email_data)

            # Rematch rules
            if self.rules_enabled:
                await self._match_rules(email_data)

            logger.info(f"✅ Reprocessed email {email_id}")

        except Exception as e:
            logger.error(f"❌ Error reprocessing email {email_id}: {e}")