""" Email Analysis Service AI-powered email classification using Ollama LLM Adapted from OmniSync for BMC Hub timetracking use cases """ import logging import json from typing import Dict, Optional, List from datetime import datetime import aiohttp from app.core.config import settings from app.core.database import execute_query, execute_insert logger = logging.getLogger(__name__) class EmailAnalysisService: """AI-powered email analysis and classification using Ollama""" def __init__(self): self.ollama_endpoint = settings.OLLAMA_ENDPOINT self.ollama_model = settings.OLLAMA_MODEL self.confidence_threshold = settings.EMAIL_AI_CONFIDENCE_THRESHOLD async def classify_email(self, email_data: Dict) -> Dict: """ Classify email using AI into predefined categories Returns: {classification: str, confidence: float, reasoning: str} """ # Check cache first cached_result = self._get_cached_classification(email_data['id']) if cached_result: logger.info(f"✅ Using cached classification for email {email_data['id']}") return cached_result # Build system prompt (Danish for accuracy) system_prompt = self._build_classification_prompt() # Build user message with email content user_message = self._build_email_context(email_data) # Call Ollama result = await self._call_ollama(system_prompt, user_message) if result: # Save to cache await self._cache_classification(email_data['id'], result) return result else: # Fallback to unknown return { 'classification': 'unknown', 'confidence': 0.0, 'reasoning': 'AI classification failed' } def _build_classification_prompt(self) -> str: """Build Danish system prompt for email classification""" return """Classify this Danish business email into ONE category. Return ONLY valid JSON with no explanation. Categories: invoice, freight_note, order_confirmation, time_confirmation, case_notification, customer_email, bankruptcy, newsletter, general, spam, unknown Rules: - invoice: Contains invoice number, amount, or payment info - time_confirmation: Time/hours confirmation, often with case references - case_notification: Notifications about specific cases (CC0001, Case #123) - bankruptcy: Explicit bankruptcy/insolvency notice - newsletter: Info mails, marketing, campaigns, webinars, or non-critical updates (not spam) - Be conservative: Use general or unknown if uncertain Response format (JSON only, no other text): {"classification": "invoice", "confidence": 0.95, "reasoning": "Subject contains 'Faktura' and invoice number"} IMPORTANT: Return ONLY the JSON object. Do not include any explanation, thinking, or additional text.""" def _build_email_context(self, email_data: Dict) -> str: """Build email context for AI classification (email body only - fast)""" subject = email_data.get('subject', '') sender = email_data.get('sender_email', '') body = email_data.get('body_text', '') or email_data.get('body_html', '') # Truncate body to avoid token limits (keep first 2000 chars) if len(body) > 2000: body = body[:2000] + "... [truncated]" # Also note if PDF attachments exist (helps classification even without reading them) attachments = email_data.get('attachments', []) pdf_filenames = [a.get('filename', '') for a in attachments if a.get('filename', '').lower().endswith('.pdf')] attachment_note = '' if pdf_filenames: attachment_note = f"\n\nVedhæftede filer: {', '.join(pdf_filenames)}" context = f"""**Email Information:** From: {sender} Subject: {subject}{attachment_note} **Email Body:** {body} Klassificer denne email.""" return context def _extract_pdf_texts_from_attachments(self, email_data: Dict) -> List[str]: """Extract text from PDF attachments in email_data (in-memory bytes)""" pdf_texts = [] attachments = email_data.get('attachments', []) for att in attachments: filename = att.get('filename', '') if not filename.lower().endswith('.pdf'): continue content = att.get('content', b'') if not content: continue try: import pdfplumber import io with pdfplumber.open(io.BytesIO(content)) as pdf: pages = [] for page in pdf.pages: text = page.extract_text(layout=True, x_tolerance=2, y_tolerance=2) if text: pages.append(text) if pages: pdf_texts.append(f"=== PDF: {filename} ===\n" + "\n".join(pages)) logger.info(f"📄 Extracted PDF text from attachment {filename} ({len(pages)} pages)") except Exception as e: logger.warning(f"⚠️ Could not extract PDF text from {filename}: {e}") return pdf_texts def _get_attachment_texts_from_db(self, email_id: int) -> List[str]: """Fetch PDF attachment text from DB (content_data column) for already-saved emails""" from pathlib import Path pdf_texts = [] try: attachments = execute_query( """SELECT filename, content_data, file_path FROM email_attachments WHERE email_id = %s AND filename ILIKE '%.pdf'""", (email_id,) ) for att in (attachments or []): filename = att.get('filename', 'unknown.pdf') content = None # Prefer content_data (bytes in DB) if att.get('content_data'): content = bytes(att['content_data']) # Fallback: read from disk elif att.get('file_path'): fp = Path(att['file_path']) if fp.exists(): content = fp.read_bytes() if not content: continue try: import pdfplumber import io with pdfplumber.open(io.BytesIO(content)) as pdf: pages = [] for page in pdf.pages: text = page.extract_text(layout=True, x_tolerance=2, y_tolerance=2) if text: pages.append(text) if pages: pdf_texts.append(f"=== PDF: {filename} ===\n" + "\n".join(pages)) logger.info(f"📄 Extracted PDF text from DB for {filename} ({len(pages)} pages)") except Exception as e: logger.warning(f"⚠️ Could not extract PDF text for {filename} from DB: {e}") except Exception as e: logger.error(f"❌ Error fetching attachment texts from DB for email {email_id}: {e}") return pdf_texts def _build_invoice_extraction_context(self, email_data: Dict) -> str: """Build extraction context with PDF as PRIMARY data source. Email body/sender are ignored for invoice data — only the attached PDF counts. Sender can be a forwarder or external bookkeeper, not the actual vendor. """ subject = email_data.get('subject', '') body = email_data.get('body_text', '') or '' # Keep body brief — it's secondary context at best if len(body) > 300: body = body[:300] + "..." # 1. Try in-memory attachment bytes first (during initial fetch) pdf_texts = self._extract_pdf_texts_from_attachments(email_data) # 2. Fallback: load from DB for already-processed emails if not pdf_texts and email_data.get('id'): pdf_texts = self._get_attachment_texts_from_db(email_data['id']) if pdf_texts: pdf_section = "\n\n".join(pdf_texts) return f"""VEDHÆFTET FAKTURA (primær datakilde - analyser grundigt): {pdf_section} --- Email emne: {subject} Email tekst (sekundær): {body} VIGTIGT: Udtrækket SKAL baseres på PDF-indholdet ovenfor. Afsenderens email-adresse er IKKE leverandøren — leverandøren fremgår af fakturaen.""" else: # No PDF found — fall back to email body logger.warning(f"⚠️ No PDF attachment found for email {email_data.get('id')} — using email body only") body_full = email_data.get('body_text', '') or email_data.get('body_html', '') or '' if len(body_full) > 3000: body_full = body_full[:3000] + "..." return f"""Email emne: {subject} Email tekst: {body_full} Ingen PDF vedhæftet — udtræk fakturadata fra email-teksten.""" async def _call_ollama(self, system_prompt: str, user_message: str) -> Optional[Dict]: """Call Ollama API for classification""" url = f"{self.ollama_endpoint}/api/chat" payload = { "model": self.ollama_model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_message} ], "stream": False, "options": { "temperature": 0.1, # Low temperature for consistent classification "num_predict": 500 # Enough for complete JSON response } } try: start_time = datetime.now() async with aiohttp.ClientSession() as session: async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as response: if response.status != 200: error_text = await response.text() logger.error(f"❌ Ollama API error: {response.status} - {error_text}") return None data = await response.json() message_data = data.get('message', {}) # qwen3 model returns 'thinking' field instead of 'content' for reasoning # Try both fields content = message_data.get('content', '') or message_data.get('thinking', '') processing_time = (datetime.now() - start_time).total_seconds() * 1000 if not content: logger.error(f"❌ Ollama returned empty response. Message keys: {message_data.keys()}") return None # Parse JSON response result = self._parse_ollama_response(content) if result: result['processing_time_ms'] = int(processing_time) logger.info(f"✅ AI classification: {result['classification']} (confidence: {result['confidence']}, {processing_time:.0f}ms)") return result else: logger.error(f"❌ Failed to parse Ollama response. Content length: {len(content)}, First 300 chars: {content[:300]}") return None except asyncio.TimeoutError: logger.error("❌ Ollama API timeout (30s)") return None except Exception as e: logger.error(f"❌ Error calling Ollama API: {e}") return None def _parse_ollama_response(self, content: str) -> Optional[Dict]: """Parse Ollama JSON response""" try: # Extract JSON from response (handle markdown code blocks) if '```json' in content: start = content.find('```json') + 7 end = content.find('```', start) json_str = content[start:end].strip() elif '```' in content: start = content.find('```') + 3 end = content.find('```', start) json_str = content[start:end].strip() else: json_str = content.strip() # Parse JSON data = json.loads(json_str) # Validate required fields if 'classification' not in data: logger.error("❌ Missing 'classification' field in AI response") return None # Normalize and validate classification = data['classification'].lower() confidence = float(data.get('confidence', 0.0)) reasoning = data.get('reasoning', '') # Validate classification category valid_categories = [ 'invoice', 'freight_note', 'order_confirmation', 'time_confirmation', 'case_notification', 'customer_email', 'bankruptcy', 'general', 'spam', 'unknown' ] if classification not in valid_categories: logger.warning(f"⚠️ Unknown classification '{classification}', defaulting to 'unknown'") classification = 'unknown' return { 'classification': classification, 'confidence': confidence, 'reasoning': reasoning } except json.JSONDecodeError as e: logger.error(f"❌ JSON parse error: {e} - Content: {content[:200]}") return None except Exception as e: logger.error(f"❌ Error parsing Ollama response: {e}") return None def _get_cached_classification(self, email_id: int) -> Optional[Dict]: """Get cached classification from database""" query = """ SELECT result_json, confidence_score, processing_time_ms FROM email_analysis WHERE email_id = %s AND analysis_type = 'classification' """ result = execute_query(query, (email_id,)) if result: row = result[0] return { 'classification': row['result_json'].get('classification'), 'confidence': float(row['confidence_score']), 'reasoning': row['result_json'].get('reasoning', ''), 'processing_time_ms': row['processing_time_ms'], 'cached': True } return None async def _cache_classification(self, email_id: int, result: Dict): """Save classification result to cache""" try: query = """ INSERT INTO email_analysis (email_id, analysis_type, result_json, confidence_score, model_used, processing_time_ms) VALUES (%s, 'classification', %s, %s, %s, %s) ON CONFLICT (email_id, analysis_type) DO UPDATE SET result_json = EXCLUDED.result_json, confidence_score = EXCLUDED.confidence_score, processing_time_ms = EXCLUDED.processing_time_ms, created_at = CURRENT_TIMESTAMP """ result_json = json.dumps({ 'classification': result['classification'], 'reasoning': result.get('reasoning', '') }) execute_query(query, ( email_id, result_json, result['confidence'], self.ollama_model, result.get('processing_time_ms', 0) )) logger.info(f"✅ Cached classification for email {email_id}") except Exception as e: logger.error(f"❌ Error caching classification: {e}") async def extract_invoice_details(self, email_data: Dict) -> Optional[Dict]: """ Extract structured data from invoice emails Returns: {invoice_number, amount, due_date, vendor_name, ...} """ # Only run for invoice-classified emails if email_data.get('classification') != 'invoice': return None # Check cache cached_result = self._get_cached_extraction(email_data['id']) if cached_result: logger.info(f"✅ Using cached extraction for email {email_data['id']}") return cached_result # Build extraction prompt — use PDF-first context, not email sender system_prompt = self._build_extraction_prompt() user_message = self._build_invoice_extraction_context(email_data) # Call Ollama result = await self._call_ollama_extraction(system_prompt, user_message) if result: # Save to cache await self._cache_extraction(email_data['id'], result) return result return None def _build_extraction_prompt(self) -> str: """Build comprehensive Danish system prompt for deep invoice data extraction.""" from app.core.config import settings as cfg own_cvr = getattr(cfg, 'OWN_CVR', '') return f"""Du er en ekspert i at læse og udtrække strukturerede data fra danske fakturaer og kreditnotaer. DU SKAL ANALYSERE SELVE FAKTURAEN (PDF-indholdet) - IKKE email-afsenderen. Afsender kan være os selv der videresender, eller en ekstern bogholder - IGNORER afsender. Leverandørens navn og CVR fremgår ALTID af selve fakturadokumentet. VIGTIGE REGLER: 1. Returner KUN gyldig JSON - ingen forklaring eller ekstra tekst 2. Hvis et felt ikke findes, sæt det til null 3. Datoer skal være i format YYYY-MM-DD 4. DANSKE PRISFORMATER: - Tusind-separator: . (punkt) eller mellemrum: "5.965,18" eller "5 965,18" - Decimal-separator: , (komma): "1.234,56 kr" - I JSON: brug . (punkt) som decimal: 1234.56 - Eksempel: "5.965,18 kr" → 5965.18 5. CVR-nummer: 8 cifre uden mellemrum - IGNORER CVR {own_cvr} — det er VORES eget CVR (køber), ikke leverandørens! - Find LEVERANDØRENS CVR i toppen/headeren af fakturaen 6. DOKUMENTTYPE: - "invoice" = Almindelig faktura - "credit_note" = Kreditnota (Kreditnota, Refusion, Tilbagebetaling, Credit Note) 7. Varelinjer: udtræk ALLE linjer med beskrivelse, antal, enhedspris, total JSON STRUKTUR: {{ "document_type": "invoice" eller "credit_note", "invoice_number": "fakturanummer", "vendor_name": "leverandørens firmanavn", "vendor_cvr": "12345678", "invoice_date": "YYYY-MM-DD", "due_date": "YYYY-MM-DD", "currency": "DKK", "total_amount": 1234.56, "vat_amount": 246.91, "net_amount": 987.65, "order_number": "ordrenummer eller null", "original_invoice_reference": "ref til original faktura (kun kreditnotaer) eller null", "lines": [ {{ "line_number": 1, "description": "varebeskrivelse", "quantity": 2.0, "unit_price": 500.00, "line_total": 1000.00, "vat_rate": 25.00, "vat_amount": 250.00 }} ], "confidence": 0.95 }} Returner KUN JSON - ingen anden tekst.""" async def _call_ollama_extraction(self, system_prompt: str, user_message: str) -> Optional[Dict]: """Call Ollama for data extraction""" url = f"{self.ollama_endpoint}/api/chat" payload = { "model": self.ollama_model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_message} ], "stream": False, "format": "json", "options": { "temperature": 0.0, # Deterministic extraction "num_predict": 3000 # Enough for full invoice with many lines } } try: async with aiohttp.ClientSession() as session: async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=120)) as response: if response.status != 200: return None data = await response.json() msg = data.get('message', {}) # qwen3 sometimes returns content in 'thinking' field content = msg.get('content', '') or msg.get('thinking', '') # Parse JSON response result = self._parse_extraction_response(content) return result except Exception as e: logger.error(f"❌ Error calling Ollama for extraction: {e}") return None def _parse_extraction_response(self, content: str) -> Optional[Dict]: """Parse Ollama extraction JSON response""" try: # Extract JSON if '```json' in content: start = content.find('```json') + 7 end = content.find('```', start) json_str = content[start:end].strip() elif '```' in content: start = content.find('```') + 3 end = content.find('```', start) json_str = content[start:end].strip() else: json_str = content.strip() data = json.loads(json_str) return data except Exception as e: logger.error(f"❌ Error parsing extraction response: {e}") return None def _get_cached_extraction(self, email_id: int) -> Optional[Dict]: """Get cached extraction from database""" query = """ SELECT result_json FROM email_analysis WHERE email_id = %s AND analysis_type = 'extraction' """ result = execute_query(query, (email_id,)) if result: return result[0]['result_json'] return None async def _cache_extraction(self, email_id: int, result: Dict): """Save extraction result to cache""" try: query = """ INSERT INTO email_analysis (email_id, analysis_type, result_json, model_used) VALUES (%s, 'extraction', %s, %s) ON CONFLICT (email_id, analysis_type) DO UPDATE SET result_json = EXCLUDED.result_json """ result_json = json.dumps(result) execute_query(query, (email_id, result_json, self.ollama_model)) logger.info(f"✅ Cached extraction for email {email_id}") except Exception as e: logger.error(f"❌ Error caching extraction: {e}")