"""
Email Analysis Service

AI-powered email classification using Ollama LLM.
Adapted from OmniSync for BMC Hub timetracking use cases.
"""
import asyncio  # required: _call_ollama catches asyncio.TimeoutError
import logging
import json
from typing import Dict, Optional, List
from datetime import datetime

import aiohttp

from app.core.config import settings
from app.core.database import execute_query, execute_insert

logger = logging.getLogger(__name__)


class EmailAnalysisService:
    """AI-powered email analysis and classification using Ollama."""

    def __init__(self):
        # Ollama connection settings and the minimum confidence accepted
        # by downstream consumers (threshold itself is not applied here).
        self.ollama_endpoint = settings.OLLAMA_ENDPOINT
        self.ollama_model = settings.OLLAMA_MODEL
        self.confidence_threshold = settings.EMAIL_AI_CONFIDENCE_THRESHOLD

    async def classify_email(self, email_data: Dict) -> Dict:
        """
        Classify an email into one of the predefined categories using the LLM.

        Args:
            email_data: Dict with at least 'id', and optionally 'subject',
                'sender_email', 'body_text'/'body_html'.

        Returns:
            {'classification': str, 'confidence': float, 'reasoning': str}
            plus 'processing_time_ms' on a fresh call or 'cached': True on a
            cache hit. Falls back to 'unknown' with confidence 0.0 on failure.
        """
        # Check cache first to avoid a redundant LLM round-trip.
        cached_result = self._get_cached_classification(email_data['id'])
        if cached_result:
            logger.info(f"✅ Using cached classification for email {email_data['id']}")
            return cached_result

        # Build system prompt (Danish for accuracy)
        system_prompt = self._build_classification_prompt()

        # Build user message with email content
        user_message = self._build_email_context(email_data)

        # Call Ollama
        result = await self._call_ollama(system_prompt, user_message)

        if result:
            # Save to cache
            await self._cache_classification(email_data['id'], result)
            return result

        # Fallback to unknown when the LLM call or parsing failed.
        return {
            'classification': 'unknown',
            'confidence': 0.0,
            'reasoning': 'AI classification failed'
        }

    def _build_classification_prompt(self) -> str:
        """Return the system prompt instructing the model to emit JSON only."""
        return """Classify this Danish business email into ONE category. Return ONLY valid JSON with no explanation.

Categories: invoice, freight_note, order_confirmation, time_confirmation, case_notification, customer_email, bankruptcy, general, spam, unknown

Rules:
- invoice: Contains invoice number, amount, or payment info
- time_confirmation: Time/hours confirmation, often with case references
- case_notification: Notifications about specific cases (CC0001, Case #123)
- bankruptcy: Explicit bankruptcy/insolvency notice
- Be conservative: Use general or unknown if uncertain

Response format (JSON only, no other text):
{"classification": "invoice", "confidence": 0.95, "reasoning": "Subject contains 'Faktura' and invoice number"}

IMPORTANT: Return ONLY the JSON object. Do not include any explanation, thinking, or additional text."""

    def _build_email_context(self, email_data: Dict) -> str:
        """
        Build the user-message context for AI analysis.

        Prefers plain-text body over HTML; truncates the body to 2000 chars
        to stay within the model's token budget.
        """
        subject = email_data.get('subject', '')
        sender = email_data.get('sender_email', '')
        body = email_data.get('body_text', '') or email_data.get('body_html', '')

        # Truncate body to avoid token limits (keep first 2000 chars)
        if len(body) > 2000:
            body = body[:2000] + "...\n[truncated]"

        context = f"""**Email Information:**
From: {sender}
Subject: {subject}

**Email Body:**
{body}

Klassificer denne email."""
        return context

    async def _call_ollama(self, system_prompt: str, user_message: str) -> Optional[Dict]:
        """
        Call the Ollama /api/chat endpoint and parse the classification result.

        Returns the parsed result dict (with 'processing_time_ms' added) or
        None on HTTP error, timeout, empty response, or unparseable output.
        """
        url = f"{self.ollama_endpoint}/api/chat"

        payload = {
            "model": self.ollama_model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            "stream": False,
            "options": {
                "temperature": 0.1,  # Low temperature for consistent classification
                "num_predict": 500   # Enough for complete JSON response
            }
        }

        try:
            start_time = datetime.now()
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"❌ Ollama API error: {response.status} - {error_text}")
                        return None

                    data = await response.json()
                    message_data = data.get('message', {})
                    # qwen3 model returns 'thinking' field instead of 'content'
                    # for reasoning output — try both fields.
                    content = message_data.get('content', '') or message_data.get('thinking', '')
                    processing_time = (datetime.now() - start_time).total_seconds() * 1000

                    if not content:
                        logger.error(f"❌ Ollama returned empty response. Message keys: {message_data.keys()}")
                        return None

                    # Parse JSON response
                    result = self._parse_ollama_response(content)
                    if result:
                        result['processing_time_ms'] = int(processing_time)
                        logger.info(f"✅ AI classification: {result['classification']} (confidence: {result['confidence']}, {processing_time:.0f}ms)")
                        return result

                    logger.error(f"❌ Failed to parse Ollama response. Content length: {len(content)}, First 300 chars: {content[:300]}")
                    return None

        except asyncio.TimeoutError:
            logger.error("❌ Ollama API timeout (30s)")
            return None
        except Exception as e:
            logger.error(f"❌ Error calling Ollama API: {e}")
            return None

    @staticmethod
    def _extract_json_str(content: str) -> str:
        """
        Strip optional markdown code fences and return the raw JSON payload.

        Handles ```json ... ```, bare ``` ... ```, and unfenced content.
        A missing closing fence takes the rest of the string instead of
        silently dropping the final character (content[start:-1] bug).
        """
        if '```json' in content:
            start = content.find('```json') + 7
        elif '```' in content:
            start = content.find('```') + 3
        else:
            return content.strip()
        end = content.find('```', start)
        if end == -1:
            end = len(content)
        return content[start:end].strip()

    def _parse_ollama_response(self, content: str) -> Optional[Dict]:
        """
        Parse and validate the model's classification JSON.

        Returns {'classification', 'confidence', 'reasoning'} or None when
        the JSON is malformed or the 'classification' field is missing.
        Unknown categories are coerced to 'unknown'.
        """
        try:
            # Extract JSON from response (handle markdown code blocks)
            json_str = self._extract_json_str(content)

            # Parse JSON
            data = json.loads(json_str)

            # Validate required fields
            if 'classification' not in data:
                logger.error("❌ Missing 'classification' field in AI response")
                return None

            # Normalize and validate
            classification = data['classification'].lower()
            confidence = float(data.get('confidence', 0.0))
            reasoning = data.get('reasoning', '')

            # Validate classification category
            valid_categories = [
                'invoice', 'freight_note', 'order_confirmation',
                'time_confirmation', 'case_notification', 'customer_email',
                'bankruptcy', 'general', 'spam', 'unknown'
            ]
            if classification not in valid_categories:
                logger.warning(f"⚠️ Unknown classification '{classification}', defaulting to 'unknown'")
                classification = 'unknown'

            return {
                'classification': classification,
                'confidence': confidence,
                'reasoning': reasoning
            }

        except json.JSONDecodeError as e:
            logger.error(f"❌ JSON parse error: {e} - Content: {content[:200]}")
            return None
        except Exception as e:
            logger.error(f"❌ Error parsing Ollama response: {e}")
            return None

    def _get_cached_classification(self, email_id: int) -> Optional[Dict]:
        """
        Look up a previously stored classification in email_analysis.

        Returns the cached result dict (with 'cached': True) or None.
        NOTE(review): assumes result_json comes back already deserialized
        (e.g. a psycopg JSONB dict) — confirm against execute_query.
        """
        query = """
            SELECT result_json, confidence_score, processing_time_ms
            FROM email_analysis
            WHERE email_id = %s AND analysis_type = 'classification'
        """
        result = execute_query(query, (email_id,))

        if result:
            row = result[0]
            return {
                'classification': row['result_json'].get('classification'),
                'confidence': float(row['confidence_score']),
                'reasoning': row['result_json'].get('reasoning', ''),
                'processing_time_ms': row['processing_time_ms'],
                'cached': True
            }
        return None

    async def _cache_classification(self, email_id: int, result: Dict):
        """
        Upsert a classification result into the email_analysis cache.

        Best-effort: failures are logged, never raised, so a cache problem
        cannot break classification itself.
        NOTE(review): INSERT goes through execute_query although
        execute_insert is imported — verify which helper is intended.
        """
        try:
            query = """
                INSERT INTO email_analysis
                    (email_id, analysis_type, result_json, confidence_score, model_used, processing_time_ms)
                VALUES (%s, 'classification', %s, %s, %s, %s)
                ON CONFLICT (email_id, analysis_type)
                DO UPDATE SET
                    result_json = EXCLUDED.result_json,
                    confidence_score = EXCLUDED.confidence_score,
                    processing_time_ms = EXCLUDED.processing_time_ms,
                    created_at = CURRENT_TIMESTAMP
            """
            result_json = json.dumps({
                'classification': result['classification'],
                'reasoning': result.get('reasoning', '')
            })
            execute_query(query, (
                email_id,
                result_json,
                result['confidence'],
                self.ollama_model,
                result.get('processing_time_ms', 0)
            ))
            logger.info(f"✅ Cached classification for email {email_id}")
        except Exception as e:
            logger.error(f"❌ Error caching classification: {e}")

    async def extract_invoice_details(self, email_data: Dict) -> Optional[Dict]:
        """
        Extract structured data from invoice-classified emails.

        Returns a dict like {invoice_number, amount, due_date, vendor_name,
        order_number, cvr_number} or None when the email is not an invoice
        or extraction failed.
        """
        # Only run for invoice-classified emails
        if email_data.get('classification') != 'invoice':
            return None

        # Check cache
        cached_result = self._get_cached_extraction(email_data['id'])
        if cached_result:
            logger.info(f"✅ Using cached extraction for email {email_data['id']}")
            return cached_result

        # Build extraction prompt
        system_prompt = self._build_extraction_prompt()
        user_message = self._build_email_context(email_data)

        # Call Ollama
        result = await self._call_ollama_extraction(system_prompt, user_message)

        if result:
            # Save to cache
            await self._cache_extraction(email_data['id'], result)
            return result

        return None

    def _build_extraction_prompt(self) -> str:
        """Return the Danish system prompt for invoice data extraction."""
        return """Du er en ekspert i at udtrække struktureret data fra danske fakturaer.

Din opgave er at finde og udtrække følgende information fra emailen:

**Felter at udtrække:**
- `invoice_number` (string) - Fakturanummer
- `amount` (decimal) - Fakturabeløb i DKK (uden valutasymbol)
- `due_date` (string YYYY-MM-DD) - Forfaldsdato
- `vendor_name` (string) - Leverandørens navn
- `order_number` (string) - Ordrenummer (hvis angivet)
- `cvr_number` (string) - CVR-nummer (hvis angivet)

**Vigtige regler:**
- Hvis et felt ikke findes, brug `null`
- Beløb skal være numerisk (uden "kr", "DKK" osv.)
- Datoer skal være i formatet YYYY-MM-DD
- Vær præcis - returner kun data du er sikker på

**Output format (JSON):**
```json
{
  "invoice_number": "INV-2024-001",
  "amount": 5250.00,
  "due_date": "2025-01-15",
  "vendor_name": "Acme Leverandør A/S",
  "order_number": "ORD-123",
  "cvr_number": "12345678"
}
```

Returner KUN JSON - ingen anden tekst.
"""

    async def _call_ollama_extraction(self, system_prompt: str, user_message: str) -> Optional[Dict]:
        """
        Call Ollama for invoice data extraction.

        Uses temperature 0.0 for deterministic output. Returns the parsed
        extraction dict or None on any error.
        """
        url = f"{self.ollama_endpoint}/api/chat"

        payload = {
            "model": self.ollama_model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            "stream": False,
            "options": {
                "temperature": 0.0,  # Zero temperature for deterministic extraction
                "num_predict": 300
            }
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status != 200:
                        return None

                    data = await response.json()
                    content = data.get('message', {}).get('content', '')

                    # Parse JSON response
                    return self._parse_extraction_response(content)

        except Exception as e:
            logger.error(f"❌ Error calling Ollama for extraction: {e}")
            return None

    def _parse_extraction_response(self, content: str) -> Optional[Dict]:
        """Parse the extraction JSON (fenced or bare); None on failure."""
        try:
            # Extract JSON (handle markdown code blocks)
            json_str = self._extract_json_str(content)
            return json.loads(json_str)
        except Exception as e:
            logger.error(f"❌ Error parsing extraction response: {e}")
            return None

    def _get_cached_extraction(self, email_id: int) -> Optional[Dict]:
        """Return a cached extraction result for the email, or None."""
        query = """
            SELECT result_json
            FROM email_analysis
            WHERE email_id = %s AND analysis_type = 'extraction'
        """
        result = execute_query(query, (email_id,))
        if result:
            return result[0]['result_json']
        return None

    async def _cache_extraction(self, email_id: int, result: Dict):
        """
        Upsert an extraction result into the cache (best-effort; errors are
        logged, never raised).
        """
        try:
            query = """
                INSERT INTO email_analysis
                    (email_id, analysis_type, result_json, model_used)
                VALUES (%s, 'extraction', %s, %s)
                ON CONFLICT (email_id, analysis_type)
                DO UPDATE SET result_json = EXCLUDED.result_json
            """
            result_json = json.dumps(result)
            execute_query(query, (email_id, result_json, self.ollama_model))
            logger.info(f"✅ Cached extraction for email {email_id}")
        except Exception as e:
            logger.error(f"❌ Error caching extraction: {e}")