# bmc_hub/app/services/email_analysis_service.py

"""
Email Analysis Service
AI-powered email classification using Ollama LLM
Adapted from OmniSync for BMC Hub timetracking use cases
"""
import logging
import json
import asyncio
from typing import Dict, Optional, List
from datetime import datetime
import aiohttp
from app.core.config import settings
from app.core.database import execute_query, execute_insert
logger = logging.getLogger(__name__)
class EmailAnalysisService:
    """AI-powered email analysis and classification using Ollama."""

    def __init__(self):
        # Endpoint/model/threshold come from central settings so they can be
        # tuned per environment without code changes.
        self.ollama_endpoint = settings.OLLAMA_ENDPOINT
        self.ollama_model = settings.OLLAMA_MODEL
        self.confidence_threshold = settings.EMAIL_AI_CONFIDENCE_THRESHOLD

    async def classify_email(self, email_data: Dict) -> Dict:
        """
        Classify an email into one of the predefined categories using the LLM.

        Args:
            email_data: dict with at least 'id'; optionally 'subject',
                'sender_email', 'body_text'/'body_html', 'attachments'.

        Returns:
            {classification: str, confidence: float, reasoning: str}
            (plus 'processing_time_ms' / 'cached' when available).
        """
        # Check cache first — classification is deterministic enough to reuse.
        cached_result = self._get_cached_classification(email_data['id'])
        if cached_result:
            logger.info(f"✅ Using cached classification for email {email_data['id']}")
            return cached_result

        # Build system prompt (Danish for accuracy)
        system_prompt = self._build_classification_prompt()
        # Build user message with email content
        user_message = self._build_email_context(email_data)

        # Call Ollama
        result = await self._call_ollama(system_prompt, user_message)
        if result:
            # Save to cache
            await self._cache_classification(email_data['id'], result)
            return result
        else:
            # Fallback to unknown — callers always get a well-formed dict.
            return {
                'classification': 'unknown',
                'confidence': 0.0,
                'reasoning': 'AI classification failed'
            }

    def _build_classification_prompt(self) -> str:
        """Build the (Danish-business) system prompt for email classification."""
        return """Classify this Danish business email into ONE category. Return ONLY valid JSON with no explanation.
Categories: invoice, freight_note, order_confirmation, time_confirmation, case_notification, customer_email, bankruptcy, newsletter, general, spam, unknown
Rules:
- invoice: Contains invoice number, amount, or payment info
- time_confirmation: Time/hours confirmation, often with case references
- case_notification: Notifications about specific cases (CC0001, Case #123)
- bankruptcy: Explicit bankruptcy/insolvency notice
- newsletter: Info mails, marketing, campaigns, webinars, or non-critical updates (not spam)
- Be conservative: Use general or unknown if uncertain
Response format (JSON only, no other text):
{"classification": "invoice", "confidence": 0.95, "reasoning": "Subject contains 'Faktura' and invoice number"}
IMPORTANT: Return ONLY the JSON object. Do not include any explanation, thinking, or additional text."""

    def _build_email_context(self, email_data: Dict) -> str:
        """Build email context for AI classification (email body only - fast)."""
        subject = email_data.get('subject', '')
        sender = email_data.get('sender_email', '')
        # The stored values may be None (not just missing), so guard the whole
        # chain with a final '' fallback to avoid len(None) below.
        body = email_data.get('body_text', '') or email_data.get('body_html', '') or ''

        # Truncate body to avoid token limits (keep first 2000 chars)
        if len(body) > 2000:
            body = body[:2000] + "... [truncated]"

        # Also note if PDF attachments exist (helps classification even
        # without reading them).
        attachments = email_data.get('attachments', [])
        pdf_filenames = [a.get('filename', '') for a in attachments
                         if a.get('filename', '').lower().endswith('.pdf')]
        attachment_note = ''
        if pdf_filenames:
            attachment_note = f"\n\nVedhæftede filer: {', '.join(pdf_filenames)}"

        context = f"""**Email Information:**
From: {sender}
Subject: {subject}{attachment_note}
**Email Body:**
{body}
Klassificer denne email."""
        return context

    @staticmethod
    def _extract_pdf_page_texts(content: bytes) -> List[str]:
        """
        Extract per-page text from PDF bytes via pdfplumber.

        Shared by the in-memory and DB attachment paths. Propagates any
        pdfplumber exception so each caller can log its own context.
        """
        import pdfplumber
        import io
        pages: List[str] = []
        with pdfplumber.open(io.BytesIO(content)) as pdf:
            for page in pdf.pages:
                # layout=True keeps column alignment, which helps the LLM
                # read invoice line items.
                text = page.extract_text(layout=True, x_tolerance=2, y_tolerance=2)
                if text:
                    pages.append(text)
        return pages

    def _extract_pdf_texts_from_attachments(self, email_data: Dict) -> List[str]:
        """Extract text from PDF attachments in email_data (in-memory bytes)."""
        pdf_texts = []
        attachments = email_data.get('attachments', [])
        for att in attachments:
            filename = att.get('filename', '')
            if not filename.lower().endswith('.pdf'):
                continue
            content = att.get('content', b'')
            if not content:
                continue
            try:
                pages = self._extract_pdf_page_texts(content)
                if pages:
                    pdf_texts.append(f"=== PDF: {filename} ===\n" + "\n".join(pages))
                    logger.info(f"📄 Extracted PDF text from attachment {filename} ({len(pages)} pages)")
            except Exception as e:
                # Best-effort: a broken PDF should not kill classification.
                logger.warning(f"⚠️ Could not extract PDF text from {filename}: {e}")
        return pdf_texts

    def _get_attachment_texts_from_db(self, email_id: int) -> List[str]:
        """Fetch PDF attachment text from DB (content_data column) for already-saved emails."""
        from pathlib import Path
        pdf_texts = []
        try:
            # NOTE: '%%' escapes the literal '%' because the driver applies
            # %s-style parameter interpolation to this query string.
            attachments = execute_query(
                """SELECT filename, content_data, file_path
                   FROM email_attachments
                   WHERE email_id = %s AND filename ILIKE '%%.pdf'""",
                (email_id,)
            )
            for att in (attachments or []):
                filename = att.get('filename', 'unknown.pdf')
                content = None
                # Prefer content_data (bytes in DB)
                if att.get('content_data'):
                    content = bytes(att['content_data'])
                # Fallback: read from disk
                elif att.get('file_path'):
                    fp = Path(att['file_path'])
                    if fp.exists():
                        content = fp.read_bytes()
                if not content:
                    continue
                try:
                    pages = self._extract_pdf_page_texts(content)
                    if pages:
                        pdf_texts.append(f"=== PDF: {filename} ===\n" + "\n".join(pages))
                        logger.info(f"📄 Extracted PDF text from DB for {filename} ({len(pages)} pages)")
                except Exception as e:
                    logger.warning(f"⚠️ Could not extract PDF text for {filename} from DB: {e}")
        except Exception as e:
            logger.error(f"❌ Error fetching attachment texts from DB for email {email_id}: {e}")
        return pdf_texts

    def _build_invoice_extraction_context(self, email_data: Dict) -> str:
        """Build extraction context with PDF as PRIMARY data source.

        Email body/sender are ignored for invoice data — only the attached PDF
        counts. Sender can be a forwarder or external bookkeeper, not the
        actual vendor.
        """
        subject = email_data.get('subject', '')
        body = email_data.get('body_text', '') or ''
        # Keep body brief — it's secondary context at best
        if len(body) > 300:
            body = body[:300] + "..."

        # 1. Try in-memory attachment bytes first (during initial fetch)
        pdf_texts = self._extract_pdf_texts_from_attachments(email_data)
        # 2. Fallback: load from DB for already-processed emails
        if not pdf_texts and email_data.get('id'):
            pdf_texts = self._get_attachment_texts_from_db(email_data['id'])

        if pdf_texts:
            pdf_section = "\n\n".join(pdf_texts)
            return f"""VEDHÆFTET FAKTURA (primær datakilde - analyser grundigt):
{pdf_section}
---
Email emne: {subject}
Email tekst (sekundær): {body}
VIGTIGT: Udtrækket SKAL baseres PDF-indholdet ovenfor.
Afsenderens email-adresse er IKKE leverandøren leverandøren fremgår af fakturaen."""
        else:
            # No PDF found — fall back to email body
            logger.warning(f"⚠️ No PDF attachment found for email {email_data.get('id')} — using email body only")
            body_full = email_data.get('body_text', '') or email_data.get('body_html', '') or ''
            if len(body_full) > 3000:
                body_full = body_full[:3000] + "..."
            return f"""Email emne: {subject}
Email tekst:
{body_full}
Ingen PDF vedhæftet udtræk fakturadata fra email-teksten."""

    async def _call_ollama(self, system_prompt: str, user_message: str) -> Optional[Dict]:
        """Call the Ollama chat API for classification; None on any failure."""
        url = f"{self.ollama_endpoint}/api/chat"
        payload = {
            "model": self.ollama_model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            "stream": False,
            "options": {
                "temperature": 0.1,  # Low temperature for consistent classification
                "num_predict": 500   # Enough for complete JSON response
            }
        }
        try:
            start_time = datetime.now()
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"❌ Ollama API error: {response.status} - {error_text}")
                        return None
                    data = await response.json()
                    message_data = data.get('message', {})
                    # qwen3 model returns 'thinking' field instead of 'content'
                    # for reasoning — try both fields.
                    content = message_data.get('content', '') or message_data.get('thinking', '')
                    processing_time = (datetime.now() - start_time).total_seconds() * 1000
                    if not content:
                        logger.error(f"❌ Ollama returned empty response. Message keys: {message_data.keys()}")
                        return None
                    # Parse JSON response
                    result = self._parse_ollama_response(content)
                    if result:
                        result['processing_time_ms'] = int(processing_time)
                        logger.info(f"✅ AI classification: {result['classification']} (confidence: {result['confidence']}, {processing_time:.0f}ms)")
                        return result
                    else:
                        logger.error(f"❌ Failed to parse Ollama response. Content length: {len(content)}, First 300 chars: {content[:300]}")
                        return None
        except asyncio.TimeoutError:
            logger.error("❌ Ollama API timeout (30s)")
            return None
        except Exception as e:
            logger.error(f"❌ Error calling Ollama API: {e}")
            return None

    def _parse_ollama_response(self, content: str) -> Optional[Dict]:
        """Parse the model's classification JSON; None if unparseable/invalid."""
        try:
            # Extract JSON from response (handle markdown code blocks)
            if '```json' in content:
                start = content.find('```json') + 7
                end = content.find('```', start)
                json_str = content[start:end].strip()
            elif '```' in content:
                start = content.find('```') + 3
                end = content.find('```', start)
                json_str = content[start:end].strip()
            else:
                json_str = content.strip()

            # Parse JSON
            data = json.loads(json_str)

            # Validate required fields
            if 'classification' not in data:
                logger.error("❌ Missing 'classification' field in AI response")
                return None

            # Normalize and validate
            classification = data['classification'].lower()
            confidence = float(data.get('confidence', 0.0))
            reasoning = data.get('reasoning', '')

            # Validate classification category. Must match every category the
            # prompt offers — 'newsletter' was previously missing here, which
            # silently downgraded newsletters to 'unknown'.
            valid_categories = [
                'invoice', 'freight_note', 'order_confirmation', 'time_confirmation',
                'case_notification', 'customer_email', 'bankruptcy', 'newsletter',
                'general', 'spam', 'unknown'
            ]
            if classification not in valid_categories:
                logger.warning(f"⚠️ Unknown classification '{classification}', defaulting to 'unknown'")
                classification = 'unknown'

            return {
                'classification': classification,
                'confidence': confidence,
                'reasoning': reasoning
            }
        except json.JSONDecodeError as e:
            logger.error(f"❌ JSON parse error: {e} - Content: {content[:200]}")
            return None
        except Exception as e:
            logger.error(f"❌ Error parsing Ollama response: {e}")
            return None

    def _get_cached_classification(self, email_id: int) -> Optional[Dict]:
        """Get cached classification from database; None on cache miss."""
        query = """
            SELECT result_json, confidence_score, processing_time_ms
            FROM email_analysis
            WHERE email_id = %s AND analysis_type = 'classification'
        """
        result = execute_query(query, (email_id,))
        if result:
            row = result[0]
            return {
                'classification': row['result_json'].get('classification'),
                'confidence': float(row['confidence_score']),
                'reasoning': row['result_json'].get('reasoning', ''),
                'processing_time_ms': row['processing_time_ms'],
                'cached': True
            }
        return None

    async def _cache_classification(self, email_id: int, result: Dict):
        """Save classification result to the cache table (upsert); best-effort."""
        try:
            query = """
                INSERT INTO email_analysis
                (email_id, analysis_type, result_json, confidence_score, model_used, processing_time_ms)
                VALUES (%s, 'classification', %s, %s, %s, %s)
                ON CONFLICT (email_id, analysis_type)
                DO UPDATE SET
                    result_json = EXCLUDED.result_json,
                    confidence_score = EXCLUDED.confidence_score,
                    processing_time_ms = EXCLUDED.processing_time_ms,
                    created_at = CURRENT_TIMESTAMP
            """
            # confidence lives in its own column; result_json only carries the
            # classification and reasoning.
            result_json = json.dumps({
                'classification': result['classification'],
                'reasoning': result.get('reasoning', '')
            })
            execute_query(query, (
                email_id,
                result_json,
                result['confidence'],
                self.ollama_model,
                result.get('processing_time_ms', 0)
            ))
            logger.info(f"✅ Cached classification for email {email_id}")
        except Exception as e:
            # Cache failures must not break the classification flow.
            logger.error(f"❌ Error caching classification: {e}")

    async def extract_invoice_details(self, email_data: Dict) -> Optional[Dict]:
        """
        Extract structured data from invoice emails.

        Returns:
            {invoice_number, amount, due_date, vendor_name, ...} or None when
            the email is not classified as an invoice / extraction fails.
        """
        # Only run for invoice-classified emails
        if email_data.get('classification') != 'invoice':
            return None

        # Check cache
        cached_result = self._get_cached_extraction(email_data['id'])
        if cached_result:
            logger.info(f"✅ Using cached extraction for email {email_data['id']}")
            return cached_result

        # Build extraction prompt — use PDF-first context, not email sender
        system_prompt = self._build_extraction_prompt()
        user_message = self._build_invoice_extraction_context(email_data)

        # Call Ollama
        result = await self._call_ollama_extraction(system_prompt, user_message)
        if result:
            # Save to cache
            await self._cache_extraction(email_data['id'], result)
            return result
        return None

    def _build_extraction_prompt(self) -> str:
        """Build comprehensive Danish system prompt for deep invoice data extraction."""
        from app.core.config import settings as cfg
        # Our own CVR is embedded so the model skips the buyer's number.
        own_cvr = getattr(cfg, 'OWN_CVR', '')
        return f"""Du er en ekspert i at læse og udtrække strukturerede data fra danske fakturaer og kreditnotaer.
DU SKAL ANALYSERE SELVE FAKTURAEN (PDF-indholdet) - IKKE email-afsenderen.
Afsender kan være os selv der videresender, eller en ekstern bogholder - IGNORER afsender.
Leverandørens navn og CVR fremgår ALTID af selve fakturadokumentet.
VIGTIGE REGLER:
1. Returner KUN gyldig JSON - ingen forklaring eller ekstra tekst
2. Hvis et felt ikke findes, sæt det til null
3. Datoer skal være i format YYYY-MM-DD
4. DANSKE PRISFORMATER:
- Tusind-separator: . (punkt) eller mellemrum: "5.965,18" eller "5 965,18"
- Decimal-separator: , (komma): "1.234,56 kr"
- I JSON: brug . (punkt) som decimal: 1234.56
- Eksempel: "5.965,18 kr" 5965.18
5. CVR-nummer: 8 cifre uden mellemrum
- IGNORER CVR {own_cvr} det er VORES eget CVR (køber), ikke leverandørens!
- Find LEVERANDØRENS CVR i toppen/headeren af fakturaen
6. DOKUMENTTYPE:
- "invoice" = Almindelig faktura
- "credit_note" = Kreditnota (Kreditnota, Refusion, Tilbagebetaling, Credit Note)
7. Varelinjer: udtræk ALLE linjer med beskrivelse, antal, enhedspris, total
JSON STRUKTUR:
{{
"document_type": "invoice" eller "credit_note",
"invoice_number": "fakturanummer",
"vendor_name": "leverandørens firmanavn",
"vendor_cvr": "12345678",
"invoice_date": "YYYY-MM-DD",
"due_date": "YYYY-MM-DD",
"currency": "DKK",
"total_amount": 1234.56,
"vat_amount": 246.91,
"net_amount": 987.65,
"order_number": "ordrenummer eller null",
"original_invoice_reference": "ref til original faktura (kun kreditnotaer) eller null",
"lines": [
{{
"line_number": 1,
"description": "varebeskrivelse",
"quantity": 2.0,
"unit_price": 500.00,
"line_total": 1000.00,
"vat_rate": 25.00,
"vat_amount": 250.00
}}
],
"confidence": 0.95
}}
Returner KUN JSON - ingen anden tekst."""

    async def _call_ollama_extraction(self, system_prompt: str, user_message: str) -> Optional[Dict]:
        """Call Ollama for data extraction; None on any failure."""
        url = f"{self.ollama_endpoint}/api/chat"
        payload = {
            "model": self.ollama_model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            "stream": False,
            "format": "json",
            "options": {
                "temperature": 0.0,  # Deterministic extraction
                "num_predict": 3000  # Enough for full invoice with many lines
            }
        }
        try:
            async with aiohttp.ClientSession() as session:
                # Extraction reads full PDFs, so allow a much longer timeout
                # than classification (120s vs 30s).
                async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=120)) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"❌ Ollama extraction API error: {response.status} - {error_text}")
                        return None
                    data = await response.json()
                    msg = data.get('message', {})
                    # qwen3 sometimes returns content in 'thinking' field
                    content = msg.get('content', '') or msg.get('thinking', '')
                    # Parse JSON response
                    result = self._parse_extraction_response(content)
                    return result
        except Exception as e:
            logger.error(f"❌ Error calling Ollama for extraction: {e}")
            return None

    def _parse_extraction_response(self, content: str) -> Optional[Dict]:
        """Parse Ollama extraction JSON response; None if unparseable."""
        try:
            # Extract JSON (handle markdown code fences, same as classification)
            if '```json' in content:
                start = content.find('```json') + 7
                end = content.find('```', start)
                json_str = content[start:end].strip()
            elif '```' in content:
                start = content.find('```') + 3
                end = content.find('```', start)
                json_str = content[start:end].strip()
            else:
                json_str = content.strip()
            data = json.loads(json_str)
            return data
        except Exception as e:
            logger.error(f"❌ Error parsing extraction response: {e}")
            return None

    def _get_cached_extraction(self, email_id: int) -> Optional[Dict]:
        """Get cached extraction result from database; None on cache miss."""
        query = """
            SELECT result_json
            FROM email_analysis
            WHERE email_id = %s AND analysis_type = 'extraction'
        """
        result = execute_query(query, (email_id,))
        if result:
            return result[0]['result_json']
        return None

    async def _cache_extraction(self, email_id: int, result: Dict):
        """Save extraction result to the cache table (upsert); best-effort."""
        try:
            query = """
                INSERT INTO email_analysis
                (email_id, analysis_type, result_json, model_used)
                VALUES (%s, 'extraction', %s, %s)
                ON CONFLICT (email_id, analysis_type)
                DO UPDATE SET result_json = EXCLUDED.result_json
            """
            result_json = json.dumps(result)
            execute_query(query, (email_id, result_json, self.ollama_model))
            logger.info(f"✅ Cached extraction for email {email_id}")
        except Exception as e:
            logger.error(f"❌ Error caching extraction: {e}")