feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
"""
|
|
|
|
|
Email Analysis Service
|
|
|
|
|
AI-powered email classification using Ollama LLM
|
|
|
|
|
Adapted from OmniSync for BMC Hub timetracking use cases
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional

import aiohttp

from app.core.config import settings
from app.core.database import execute_query, execute_insert
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EmailAnalysisService:
|
|
|
|
|
"""AI-powered email analysis and classification using Ollama"""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.ollama_endpoint = settings.OLLAMA_ENDPOINT
|
|
|
|
|
self.ollama_model = settings.OLLAMA_MODEL
|
|
|
|
|
self.confidence_threshold = settings.EMAIL_AI_CONFIDENCE_THRESHOLD
|
|
|
|
|
|
|
|
|
|
async def classify_email(self, email_data: Dict) -> Dict:
|
|
|
|
|
"""
|
|
|
|
|
Classify email using AI into predefined categories
|
|
|
|
|
Returns: {classification: str, confidence: float, reasoning: str}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# Check cache first
|
|
|
|
|
cached_result = self._get_cached_classification(email_data['id'])
|
|
|
|
|
if cached_result:
|
|
|
|
|
logger.info(f"✅ Using cached classification for email {email_data['id']}")
|
|
|
|
|
return cached_result
|
|
|
|
|
|
|
|
|
|
# Build system prompt (Danish for accuracy)
|
|
|
|
|
system_prompt = self._build_classification_prompt()
|
|
|
|
|
|
|
|
|
|
# Build user message with email content
|
|
|
|
|
user_message = self._build_email_context(email_data)
|
|
|
|
|
|
|
|
|
|
# Call Ollama
|
|
|
|
|
result = await self._call_ollama(system_prompt, user_message)
|
|
|
|
|
|
|
|
|
|
if result:
|
|
|
|
|
# Save to cache
|
|
|
|
|
await self._cache_classification(email_data['id'], result)
|
|
|
|
|
return result
|
|
|
|
|
else:
|
|
|
|
|
# Fallback to unknown
|
|
|
|
|
return {
|
|
|
|
|
'classification': 'unknown',
|
|
|
|
|
'confidence': 0.0,
|
|
|
|
|
'reasoning': 'AI classification failed'
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def _build_classification_prompt(self) -> str:
|
|
|
|
|
"""Build Danish system prompt for email classification"""
|
2025-12-11 12:45:29 +01:00
|
|
|
return """Classify this Danish business email into ONE category. Return ONLY valid JSON with no explanation.
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
|
2025-12-11 12:45:29 +01:00
|
|
|
Categories: invoice, freight_note, order_confirmation, time_confirmation, case_notification, customer_email, bankruptcy, general, spam, unknown
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
|
2025-12-11 12:45:29 +01:00
|
|
|
Rules:
|
|
|
|
|
- invoice: Contains invoice number, amount, or payment info
|
|
|
|
|
- time_confirmation: Time/hours confirmation, often with case references
|
|
|
|
|
- case_notification: Notifications about specific cases (CC0001, Case #123)
|
|
|
|
|
- bankruptcy: Explicit bankruptcy/insolvency notice
|
|
|
|
|
- Be conservative: Use general or unknown if uncertain
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
|
2025-12-11 12:45:29 +01:00
|
|
|
Response format (JSON only, no other text):
|
|
|
|
|
{"classification": "invoice", "confidence": 0.95, "reasoning": "Subject contains 'Faktura' and invoice number"}
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
|
2025-12-11 12:45:29 +01:00
|
|
|
IMPORTANT: Return ONLY the JSON object. Do not include any explanation, thinking, or additional text."""
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
|
|
|
|
|
def _build_email_context(self, email_data: Dict) -> str:
|
|
|
|
|
"""Build email context for AI analysis"""
|
|
|
|
|
|
|
|
|
|
subject = email_data.get('subject', '')
|
|
|
|
|
sender = email_data.get('sender_email', '')
|
|
|
|
|
body = email_data.get('body_text', '') or email_data.get('body_html', '')
|
|
|
|
|
|
|
|
|
|
# Truncate body to avoid token limits (keep first 2000 chars)
|
|
|
|
|
if len(body) > 2000:
|
|
|
|
|
body = body[:2000] + "... [truncated]"
|
|
|
|
|
|
|
|
|
|
context = f"""**Email Information:**
|
|
|
|
|
From: {sender}
|
|
|
|
|
Subject: {subject}
|
|
|
|
|
|
|
|
|
|
**Email Body:**
|
|
|
|
|
{body}
|
|
|
|
|
|
|
|
|
|
Klassificer denne email."""
|
|
|
|
|
|
|
|
|
|
return context
|
|
|
|
|
|
|
|
|
|
async def _call_ollama(self, system_prompt: str, user_message: str) -> Optional[Dict]:
|
|
|
|
|
"""Call Ollama API for classification"""
|
|
|
|
|
|
|
|
|
|
url = f"{self.ollama_endpoint}/api/chat"
|
|
|
|
|
|
|
|
|
|
payload = {
|
|
|
|
|
"model": self.ollama_model,
|
|
|
|
|
"messages": [
|
|
|
|
|
{"role": "system", "content": system_prompt},
|
|
|
|
|
{"role": "user", "content": user_message}
|
|
|
|
|
],
|
|
|
|
|
"stream": False,
|
|
|
|
|
"options": {
|
|
|
|
|
"temperature": 0.1, # Low temperature for consistent classification
|
2025-12-11 12:45:29 +01:00
|
|
|
"num_predict": 500 # Enough for complete JSON response
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
start_time = datetime.now()
|
|
|
|
|
|
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
|
|
|
async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as response:
|
|
|
|
|
if response.status != 200:
|
|
|
|
|
error_text = await response.text()
|
|
|
|
|
logger.error(f"❌ Ollama API error: {response.status} - {error_text}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
data = await response.json()
|
2025-12-11 12:45:29 +01:00
|
|
|
|
|
|
|
|
message_data = data.get('message', {})
|
|
|
|
|
|
|
|
|
|
# qwen3 model returns 'thinking' field instead of 'content' for reasoning
|
|
|
|
|
# Try both fields
|
|
|
|
|
content = message_data.get('content', '') or message_data.get('thinking', '')
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
|
|
|
|
|
processing_time = (datetime.now() - start_time).total_seconds() * 1000
|
|
|
|
|
|
2025-12-11 12:45:29 +01:00
|
|
|
if not content:
|
|
|
|
|
logger.error(f"❌ Ollama returned empty response. Message keys: {message_data.keys()}")
|
|
|
|
|
return None
|
|
|
|
|
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
# Parse JSON response
|
|
|
|
|
result = self._parse_ollama_response(content)
|
|
|
|
|
|
|
|
|
|
if result:
|
|
|
|
|
result['processing_time_ms'] = int(processing_time)
|
|
|
|
|
logger.info(f"✅ AI classification: {result['classification']} (confidence: {result['confidence']}, {processing_time:.0f}ms)")
|
|
|
|
|
return result
|
|
|
|
|
else:
|
2025-12-11 12:45:29 +01:00
|
|
|
logger.error(f"❌ Failed to parse Ollama response. Content length: {len(content)}, First 300 chars: {content[:300]}")
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
logger.error("❌ Ollama API timeout (30s)")
|
|
|
|
|
return None
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"❌ Error calling Ollama API: {e}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _parse_ollama_response(self, content: str) -> Optional[Dict]:
|
|
|
|
|
"""Parse Ollama JSON response"""
|
|
|
|
|
try:
|
|
|
|
|
# Extract JSON from response (handle markdown code blocks)
|
|
|
|
|
if '```json' in content:
|
|
|
|
|
start = content.find('```json') + 7
|
|
|
|
|
end = content.find('```', start)
|
|
|
|
|
json_str = content[start:end].strip()
|
|
|
|
|
elif '```' in content:
|
|
|
|
|
start = content.find('```') + 3
|
|
|
|
|
end = content.find('```', start)
|
|
|
|
|
json_str = content[start:end].strip()
|
|
|
|
|
else:
|
|
|
|
|
json_str = content.strip()
|
|
|
|
|
|
|
|
|
|
# Parse JSON
|
|
|
|
|
data = json.loads(json_str)
|
|
|
|
|
|
|
|
|
|
# Validate required fields
|
|
|
|
|
if 'classification' not in data:
|
|
|
|
|
logger.error("❌ Missing 'classification' field in AI response")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# Normalize and validate
|
|
|
|
|
classification = data['classification'].lower()
|
|
|
|
|
confidence = float(data.get('confidence', 0.0))
|
|
|
|
|
reasoning = data.get('reasoning', '')
|
|
|
|
|
|
|
|
|
|
# Validate classification category
|
|
|
|
|
valid_categories = [
|
|
|
|
|
'invoice', 'freight_note', 'order_confirmation', 'time_confirmation',
|
|
|
|
|
'case_notification', 'customer_email', 'bankruptcy', 'general', 'spam', 'unknown'
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
if classification not in valid_categories:
|
|
|
|
|
logger.warning(f"⚠️ Unknown classification '{classification}', defaulting to 'unknown'")
|
|
|
|
|
classification = 'unknown'
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
'classification': classification,
|
|
|
|
|
'confidence': confidence,
|
|
|
|
|
'reasoning': reasoning
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
except json.JSONDecodeError as e:
|
|
|
|
|
logger.error(f"❌ JSON parse error: {e} - Content: {content[:200]}")
|
|
|
|
|
return None
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"❌ Error parsing Ollama response: {e}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _get_cached_classification(self, email_id: int) -> Optional[Dict]:
|
|
|
|
|
"""Get cached classification from database"""
|
|
|
|
|
query = """
|
|
|
|
|
SELECT result_json, confidence_score, processing_time_ms
|
|
|
|
|
FROM email_analysis
|
|
|
|
|
WHERE email_id = %s AND analysis_type = 'classification'
|
|
|
|
|
"""
|
|
|
|
|
result = execute_query(query, (email_id,))
|
|
|
|
|
|
|
|
|
|
if result:
|
|
|
|
|
row = result[0]
|
|
|
|
|
return {
|
|
|
|
|
'classification': row['result_json'].get('classification'),
|
|
|
|
|
'confidence': float(row['confidence_score']),
|
|
|
|
|
'reasoning': row['result_json'].get('reasoning', ''),
|
|
|
|
|
'processing_time_ms': row['processing_time_ms'],
|
|
|
|
|
'cached': True
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
async def _cache_classification(self, email_id: int, result: Dict):
|
|
|
|
|
"""Save classification result to cache"""
|
|
|
|
|
try:
|
|
|
|
|
query = """
|
|
|
|
|
INSERT INTO email_analysis
|
|
|
|
|
(email_id, analysis_type, result_json, confidence_score, model_used, processing_time_ms)
|
|
|
|
|
VALUES (%s, 'classification', %s, %s, %s, %s)
|
|
|
|
|
ON CONFLICT (email_id, analysis_type)
|
|
|
|
|
DO UPDATE SET
|
|
|
|
|
result_json = EXCLUDED.result_json,
|
|
|
|
|
confidence_score = EXCLUDED.confidence_score,
|
|
|
|
|
processing_time_ms = EXCLUDED.processing_time_ms,
|
|
|
|
|
created_at = CURRENT_TIMESTAMP
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
result_json = json.dumps({
|
|
|
|
|
'classification': result['classification'],
|
|
|
|
|
'reasoning': result.get('reasoning', '')
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
execute_query(query, (
|
|
|
|
|
email_id,
|
|
|
|
|
result_json,
|
|
|
|
|
result['confidence'],
|
|
|
|
|
self.ollama_model,
|
|
|
|
|
result.get('processing_time_ms', 0)
|
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
logger.info(f"✅ Cached classification for email {email_id}")
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"❌ Error caching classification: {e}")
|
|
|
|
|
|
|
|
|
|
async def extract_invoice_details(self, email_data: Dict) -> Optional[Dict]:
|
|
|
|
|
"""
|
|
|
|
|
Extract structured data from invoice emails
|
|
|
|
|
Returns: {invoice_number, amount, due_date, vendor_name, ...}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# Only run for invoice-classified emails
|
|
|
|
|
if email_data.get('classification') != 'invoice':
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# Check cache
|
|
|
|
|
cached_result = self._get_cached_extraction(email_data['id'])
|
|
|
|
|
if cached_result:
|
|
|
|
|
logger.info(f"✅ Using cached extraction for email {email_data['id']}")
|
|
|
|
|
return cached_result
|
|
|
|
|
|
|
|
|
|
# Build extraction prompt
|
|
|
|
|
system_prompt = self._build_extraction_prompt()
|
|
|
|
|
user_message = self._build_email_context(email_data)
|
|
|
|
|
|
|
|
|
|
# Call Ollama
|
|
|
|
|
result = await self._call_ollama_extraction(system_prompt, user_message)
|
|
|
|
|
|
|
|
|
|
if result:
|
|
|
|
|
# Save to cache
|
|
|
|
|
await self._cache_extraction(email_data['id'], result)
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _build_extraction_prompt(self) -> str:
|
|
|
|
|
"""Build Danish system prompt for invoice data extraction"""
|
|
|
|
|
return """Du er en ekspert i at udtrække struktureret data fra danske fakturaer.
|
|
|
|
|
|
|
|
|
|
Din opgave er at finde og udtrække følgende information fra emailen:
|
|
|
|
|
|
|
|
|
|
**Felter at udtrække:**
|
|
|
|
|
- `invoice_number` (string) - Fakturanummer
|
|
|
|
|
- `amount` (decimal) - Fakturabeløb i DKK (uden valutasymbol)
|
|
|
|
|
- `due_date` (string YYYY-MM-DD) - Forfaldsdato
|
|
|
|
|
- `vendor_name` (string) - Leverandørens navn
|
|
|
|
|
- `order_number` (string) - Ordrenummer (hvis angivet)
|
|
|
|
|
- `cvr_number` (string) - CVR-nummer (hvis angivet)
|
|
|
|
|
|
|
|
|
|
**Vigtige regler:**
|
|
|
|
|
- Hvis et felt ikke findes, brug `null`
|
|
|
|
|
- Beløb skal være numerisk (uden "kr", "DKK" osv.)
|
|
|
|
|
- Datoer skal være i formatet YYYY-MM-DD
|
|
|
|
|
- Vær præcis - returner kun data du er sikker på
|
|
|
|
|
|
|
|
|
|
**Output format (JSON):**
|
|
|
|
|
```json
|
|
|
|
|
{
|
|
|
|
|
"invoice_number": "INV-2024-001",
|
|
|
|
|
"amount": 5250.00,
|
|
|
|
|
"due_date": "2025-01-15",
|
|
|
|
|
"vendor_name": "Acme Leverandør A/S",
|
|
|
|
|
"order_number": "ORD-123",
|
|
|
|
|
"cvr_number": "12345678"
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
Returner KUN JSON - ingen anden tekst.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
async def _call_ollama_extraction(self, system_prompt: str, user_message: str) -> Optional[Dict]:
|
|
|
|
|
"""Call Ollama for data extraction"""
|
|
|
|
|
|
|
|
|
|
url = f"{self.ollama_endpoint}/api/chat"
|
|
|
|
|
|
|
|
|
|
payload = {
|
|
|
|
|
"model": self.ollama_model,
|
|
|
|
|
"messages": [
|
|
|
|
|
{"role": "system", "content": system_prompt},
|
|
|
|
|
{"role": "user", "content": user_message}
|
|
|
|
|
],
|
|
|
|
|
"stream": False,
|
|
|
|
|
"options": {
|
|
|
|
|
"temperature": 0.0, # Zero temperature for deterministic extraction
|
|
|
|
|
"num_predict": 300
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
|
|
|
async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as response:
|
|
|
|
|
if response.status != 200:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
data = await response.json()
|
|
|
|
|
content = data.get('message', {}).get('content', '')
|
|
|
|
|
|
|
|
|
|
# Parse JSON response
|
|
|
|
|
result = self._parse_extraction_response(content)
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"❌ Error calling Ollama for extraction: {e}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _parse_extraction_response(self, content: str) -> Optional[Dict]:
|
|
|
|
|
"""Parse Ollama extraction JSON response"""
|
|
|
|
|
try:
|
|
|
|
|
# Extract JSON
|
|
|
|
|
if '```json' in content:
|
|
|
|
|
start = content.find('```json') + 7
|
|
|
|
|
end = content.find('```', start)
|
|
|
|
|
json_str = content[start:end].strip()
|
|
|
|
|
elif '```' in content:
|
|
|
|
|
start = content.find('```') + 3
|
|
|
|
|
end = content.find('```', start)
|
|
|
|
|
json_str = content[start:end].strip()
|
|
|
|
|
else:
|
|
|
|
|
json_str = content.strip()
|
|
|
|
|
|
|
|
|
|
data = json.loads(json_str)
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"❌ Error parsing extraction response: {e}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _get_cached_extraction(self, email_id: int) -> Optional[Dict]:
|
|
|
|
|
"""Get cached extraction from database"""
|
|
|
|
|
query = """
|
|
|
|
|
SELECT result_json
|
|
|
|
|
FROM email_analysis
|
|
|
|
|
WHERE email_id = %s AND analysis_type = 'extraction'
|
|
|
|
|
"""
|
|
|
|
|
result = execute_query(query, (email_id,))
|
|
|
|
|
|
|
|
|
|
if result:
|
|
|
|
|
return result[0]['result_json']
|
|
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
async def _cache_extraction(self, email_id: int, result: Dict):
|
|
|
|
|
"""Save extraction result to cache"""
|
|
|
|
|
try:
|
|
|
|
|
query = """
|
|
|
|
|
INSERT INTO email_analysis
|
|
|
|
|
(email_id, analysis_type, result_json, model_used)
|
|
|
|
|
VALUES (%s, 'extraction', %s, %s)
|
|
|
|
|
ON CONFLICT (email_id, analysis_type)
|
|
|
|
|
DO UPDATE SET result_json = EXCLUDED.result_json
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
result_json = json.dumps(result)
|
|
|
|
|
execute_query(query, (email_id, result_json, self.ollama_model))
|
|
|
|
|
|
|
|
|
|
logger.info(f"✅ Cached extraction for email {email_id}")
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"❌ Error caching extraction: {e}")
|