From 8791e34f4e32e4cff0001c2beb750e3a1b9bdc1f Mon Sep 17 00:00:00 2001 From: Christian Date: Thu, 11 Dec 2025 02:31:29 +0100 Subject: [PATCH] feat: Implement email processing system with scheduler, fetching, classification, and rule matching - Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules. - Introduced EmailScheduler for background processing of emails every 5 minutes. - Developed EmailService to handle email fetching from IMAP and Microsoft Graph API. - Created database migration for email system, including tables for email messages, rules, attachments, and analysis. - Implemented AI classification and extraction for invoices and time confirmations. - Added logging for better traceability and error handling throughout the email processing pipeline. --- app/core/config.py | 33 ++ app/emails/backend/router.py | 385 +++++++++++++++++ app/services/email_analysis_service.py | 428 +++++++++++++++++++ app/services/email_processor_service.py | 434 +++++++++++++++++++ app/services/email_scheduler.py | 77 ++++ app/services/email_service.py | 449 ++++++++++++++++++++ app/timetracking/backend/economic_export.py | 74 +++- app/timetracking/backend/models.py | 10 +- app/timetracking/backend/order_service.py | 35 +- app/timetracking/backend/wizard.py | 2 + app/timetracking/frontend/orders.html | 47 +- app/timetracking/frontend/wizard.html | 16 +- main.py | 7 + migrations/013_email_system.sql | 244 +++++++++++ requirements.txt | 4 + 15 files changed, 2202 insertions(+), 43 deletions(-) create mode 100644 app/emails/backend/router.py create mode 100644 app/services/email_analysis_service.py create mode 100644 app/services/email_processor_service.py create mode 100644 app/services/email_scheduler.py create mode 100644 app/services/email_service.py create mode 100644 migrations/013_email_system.sql diff --git a/app/core/config.py b/app/core/config.py index 1a7ca8c..0559223 100644 --- a/app/core/config.py +++ 
b/app/core/config.py @@ -59,6 +59,39 @@ class Settings(BaseSettings): OLLAMA_ENDPOINT: str = "http://ai_direct.cs.blaahund.dk" OLLAMA_MODEL: str = "qwen2.5-coder:7b" # qwen2.5-coder fungerer bedre til JSON udtrækning + # Email System Configuration + EMAIL_TO_TICKET_ENABLED: bool = False # 🚨 SAFETY: Disable auto-processing until configured + + # Email Fetching (IMAP) + USE_GRAPH_API: bool = False # Use Microsoft Graph API instead of IMAP (preferred) + IMAP_SERVER: str = "outlook.office365.com" + IMAP_PORT: int = 993 + IMAP_USE_SSL: bool = True + IMAP_USERNAME: str = "" + IMAP_PASSWORD: str = "" + IMAP_FOLDER: str = "INBOX" + IMAP_READ_ONLY: bool = True # 🚨 SAFETY: Never mark emails as read or modify mailbox + + # Microsoft Graph API (OAuth2) + GRAPH_TENANT_ID: str = "" + GRAPH_CLIENT_ID: str = "" + GRAPH_CLIENT_SECRET: str = "" + GRAPH_USER_EMAIL: str = "" # Email account to monitor + + # Email Processing + EMAIL_PROCESS_INTERVAL_MINUTES: int = 5 # Background job frequency + EMAIL_MAX_FETCH_PER_RUN: int = 50 # Limit emails per processing cycle + EMAIL_RETENTION_DAYS: int = 90 # Days to keep emails before soft delete + + # Email Classification (AI) + EMAIL_AI_ENABLED: bool = True + EMAIL_AI_CONFIDENCE_THRESHOLD: float = 0.7 # Minimum confidence for auto-processing + EMAIL_AUTO_CLASSIFY: bool = True # Run AI classification on new emails + + # Email Rules Engine + EMAIL_RULES_ENABLED: bool = True + EMAIL_RULES_AUTO_PROCESS: bool = False # 🚨 SAFETY: Require manual approval initially + # Company Info OWN_CVR: str = "29522790" # BMC Denmark ApS - ignore when detecting vendors diff --git a/app/emails/backend/router.py b/app/emails/backend/router.py new file mode 100644 index 0000000..11d4f62 --- /dev/null +++ b/app/emails/backend/router.py @@ -0,0 +1,385 @@ +""" +Email Management Router +API endpoints for email viewing, classification, and rule management +""" + +import logging +from fastapi import APIRouter, HTTPException, Query +from typing import List, Optional +from 
"""
Email Management Router
API endpoints for email viewing, classification, and rule management
"""

import json
import logging
from datetime import datetime, date
from typing import List, Optional

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel

from app.core.database import execute_query, execute_insert
from app.services.email_processor_service import EmailProcessorService

logger = logging.getLogger(__name__)

router = APIRouter()

# Single source of truth for the classification taxonomy (kept in sync with
# the AI classifier's categories in EmailAnalysisService).
VALID_CLASSIFICATIONS = [
    'invoice', 'freight_note', 'order_confirmation', 'time_confirmation',
    'case_notification', 'customer_email', 'bankruptcy', 'general', 'spam', 'unknown'
]


# Pydantic Models
class EmailListItem(BaseModel):
    """Row shape for the email list view (joined with rule/vendor names)."""
    id: int
    message_id: str
    subject: str
    sender_email: str
    sender_name: Optional[str]
    received_date: datetime
    classification: Optional[str]
    confidence_score: Optional[float]
    status: str
    is_read: bool
    has_attachments: bool
    attachment_count: int
    rule_name: Optional[str] = None
    supplier_name: Optional[str] = None
    customer_name: Optional[str] = None


class EmailDetail(BaseModel):
    """Full email record as stored in email_messages."""
    id: int
    message_id: str
    subject: str
    sender_email: str
    sender_name: Optional[str]
    recipient_email: Optional[str]
    cc: Optional[str]
    body_text: Optional[str]
    body_html: Optional[str]
    received_date: datetime
    folder: str
    classification: Optional[str]
    confidence_score: Optional[float]
    status: str
    is_read: bool
    has_attachments: bool
    attachment_count: int
    rule_id: Optional[int]
    supplier_id: Optional[int]
    customer_id: Optional[int]
    linked_case_id: Optional[int]
    extracted_invoice_number: Optional[str]
    extracted_amount: Optional[float]
    extracted_due_date: Optional[date]
    auto_processed: bool
    created_at: datetime
    updated_at: datetime


class EmailRule(BaseModel):
    """Email rule: JSON conditions matched against incoming mail, plus an action."""
    id: Optional[int] = None
    name: str
    description: Optional[str]
    conditions: dict
    action_type: str
    action_params: Optional[dict] = {}
    priority: int = 100
    enabled: bool = True
    match_count: int = 0
    # Default None so clients creating/updating a rule need not send this
    # server-maintained field.
    last_matched_at: Optional[datetime] = None


class ProcessingStats(BaseModel):
    """Summary counters returned by a processing run."""
    status: str
    fetched: int = 0
    saved: int = 0
    classified: int = 0
    rules_matched: int = 0
    errors: int = 0


# Email Endpoints
@router.get("/emails", response_model=List[EmailListItem])
async def list_emails(
    status: Optional[str] = Query(None),
    classification: Optional[str] = Query(None),
    limit: int = Query(50, le=500),
    offset: int = Query(0, ge=0)
):
    """Get list of emails with filtering"""
    try:
        # Build WHERE dynamically but only with parameterized values —
        # user input never reaches the SQL string itself.
        where_clauses = ["em.deleted_at IS NULL"]
        params = []

        if status:
            where_clauses.append("em.status = %s")
            params.append(status)

        if classification:
            where_clauses.append("em.classification = %s")
            params.append(classification)

        where_sql = " AND ".join(where_clauses)

        query = f"""
            SELECT
                em.id, em.message_id, em.subject, em.sender_email, em.sender_name,
                em.received_date, em.classification, em.confidence_score, em.status,
                em.is_read, em.has_attachments, em.attachment_count,
                er.name as rule_name,
                v.name as supplier_name,
                NULL as customer_name
            FROM email_messages em
            LEFT JOIN email_rules er ON em.rule_id = er.id
            LEFT JOIN vendors v ON em.supplier_id = v.id
            WHERE {where_sql}
            ORDER BY em.received_date DESC
            LIMIT %s OFFSET %s
        """

        params.extend([limit, offset])
        result = execute_query(query, tuple(params))

        return result

    except Exception as e:
        logger.error(f"❌ Error listing emails: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/emails/{email_id}", response_model=EmailDetail)
async def get_email(email_id: int):
    """Get email detail by ID (side effect: marks the email as read)."""
    try:
        query = """
            SELECT * FROM email_messages
            WHERE id = %s AND deleted_at IS NULL
        """
        result = execute_query(query, (email_id,))

        if not result:
            raise HTTPException(status_code=404, detail="Email not found")

        # Mark as read
        update_query = "UPDATE email_messages SET is_read = true WHERE id = %s"
        execute_query(update_query, (email_id,))

        # Fix: the row was selected before the UPDATE, so reflect the new
        # read state in the response instead of returning stale is_read.
        row = dict(result[0])
        row['is_read'] = True
        return row

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error getting email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/emails/process")
async def process_emails():
    """Manually trigger email processing"""
    try:
        processor = EmailProcessorService()
        stats = await processor.process_inbox()

        return {
            "success": True,
            "message": "Email processing completed",
            "stats": stats
        }

    except Exception as e:
        logger.error(f"❌ Email processing failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/emails/{email_id}/reprocess")
async def reprocess_email(email_id: int):
    """Manually reprocess a single email (reclassify + rematch rules)"""
    try:
        processor = EmailProcessorService()
        await processor.reprocess_email(email_id)

        return {
            "success": True,
            "message": f"Email {email_id} reprocessed successfully"
        }

    except Exception as e:
        logger.error(f"❌ Error reprocessing email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.put("/emails/{email_id}/classify")
async def update_classification(email_id: int, classification: str):
    """Manually update email classification"""
    try:
        if classification not in VALID_CLASSIFICATIONS:
            raise HTTPException(status_code=400, detail=f"Invalid classification. Must be one of: {VALID_CLASSIFICATIONS}")

        query = """
            UPDATE email_messages
            SET classification = %s,
                classification_date = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
        """
        execute_query(query, (classification, email_id))

        return {
            "success": True,
            "message": f"Email {email_id} classified as '{classification}'"
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating classification: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.delete("/emails/{email_id}")
async def delete_email(email_id: int):
    """Soft delete email"""
    try:
        query = """
            UPDATE email_messages
            SET deleted_at = CURRENT_TIMESTAMP
            WHERE id = %s
        """
        execute_query(query, (email_id,))

        return {
            "success": True,
            "message": f"Email {email_id} deleted"
        }

    except Exception as e:
        logger.error(f"❌ Error deleting email: {e}")
        raise HTTPException(status_code=500, detail=str(e))


# Email Rules Endpoints
@router.get("/email-rules", response_model=List[EmailRule])
async def list_rules():
    """Get all email rules"""
    try:
        query = """
            SELECT * FROM email_rules
            ORDER BY priority ASC, name ASC
        """
        result = execute_query(query)
        return result

    except Exception as e:
        logger.error(f"❌ Error listing rules: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/email-rules", response_model=EmailRule)
async def create_rule(rule: EmailRule):
    """Create new email rule"""
    try:
        query = """
            INSERT INTO email_rules
            (name, description, conditions, action_type, action_params, priority, enabled, created_by_user_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, 1)
            RETURNING *
        """

        result = execute_query(query, (
            rule.name,
            rule.description,
            json.dumps(rule.conditions),
            rule.action_type,
            json.dumps(rule.action_params or {}),
            rule.priority,
            rule.enabled
        ))

        if result:
            logger.info(f"✅ Created email rule: {rule.name}")
            return result[0]
        else:
            raise HTTPException(status_code=500, detail="Failed to create rule")

    except HTTPException:
        # Fix: without this, the HTTPException raised above was swallowed by
        # the broad handler below and re-wrapped with a mangled detail.
        raise
    except Exception as e:
        logger.error(f"❌ Error creating rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.put("/email-rules/{rule_id}", response_model=EmailRule)
async def update_rule(rule_id: int, rule: EmailRule):
    """Update existing email rule"""
    try:
        query = """
            UPDATE email_rules
            SET name = %s,
                description = %s,
                conditions = %s,
                action_type = %s,
                action_params = %s,
                priority = %s,
                enabled = %s
            WHERE id = %s
            RETURNING *
        """

        result = execute_query(query, (
            rule.name,
            rule.description,
            json.dumps(rule.conditions),
            rule.action_type,
            json.dumps(rule.action_params or {}),
            rule.priority,
            rule.enabled,
            rule_id
        ))

        if result:
            logger.info(f"✅ Updated email rule {rule_id}")
            return result[0]
        else:
            raise HTTPException(status_code=404, detail="Rule not found")

    except HTTPException:
        # Fix: the 404 above was previously converted into a 500 by the
        # broad Exception handler.
        raise
    except Exception as e:
        logger.error(f"❌ Error updating rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.delete("/email-rules/{rule_id}")
async def delete_rule(rule_id: int):
    """Delete email rule"""
    try:
        query = "DELETE FROM email_rules WHERE id = %s"
        execute_query(query, (rule_id,))

        return {
            "success": True,
            "message": f"Rule {rule_id} deleted"
        }

    except Exception as e:
        logger.error(f"❌ Error deleting rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))


# Statistics Endpoint
@router.get("/emails/stats/summary")
async def get_email_stats():
    """Get email processing statistics"""
    try:
        query = """
            SELECT
                COUNT(*) as total_emails,
                COUNT(CASE WHEN status = 'new' THEN 1 END) as new_emails,
                COUNT(CASE WHEN status = 'processed' THEN 1 END) as processed_emails,
                COUNT(CASE WHEN classification = 'invoice' THEN 1 END) as invoices,
                COUNT(CASE WHEN classification = 'time_confirmation' THEN 1 END) as time_confirmations,
                COUNT(CASE WHEN classification = 'spam' THEN 1 END) as spam_emails,
                COUNT(CASE WHEN auto_processed THEN 1 END) as auto_processed,
                AVG(confidence_score) as avg_confidence
            FROM email_messages
            WHERE deleted_at IS NULL
        """

        result = execute_query(query)
        return result[0] if result else {}

    except Exception as e:
        logger.error(f"❌ Error getting stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
"""
Email Analysis Service
AI-powered email classification using Ollama LLM
Adapted from OmniSync for BMC Hub timetracking use cases
"""

import asyncio  # Fix: needed for asyncio.TimeoutError in _call_ollama (was a NameError before)
import logging
import json
from typing import Dict, Optional, List
from datetime import datetime
import aiohttp

from app.core.config import settings
from app.core.database import execute_query, execute_insert

logger = logging.getLogger(__name__)


class EmailAnalysisService:
    """AI-powered email analysis and classification using Ollama"""

    def __init__(self):
        self.ollama_endpoint = settings.OLLAMA_ENDPOINT
        self.ollama_model = settings.OLLAMA_MODEL
        self.confidence_threshold = settings.EMAIL_AI_CONFIDENCE_THRESHOLD

    async def classify_email(self, email_data: Dict) -> Dict:
        """
        Classify email using AI into predefined categories
        Returns: {classification: str, confidence: float, reasoning: str}
        """

        # Check cache first
        cached_result = self._get_cached_classification(email_data['id'])
        if cached_result:
            logger.info(f"✅ Using cached classification for email {email_data['id']}")
            return cached_result

        # Build system prompt (Danish for accuracy)
        system_prompt = self._build_classification_prompt()

        # Build user message with email content
        user_message = self._build_email_context(email_data)

        # Call Ollama
        result = await self._call_ollama(system_prompt, user_message)

        if result:
            # Save to cache
            await self._cache_classification(email_data['id'], result)
            return result
        else:
            # Fallback to unknown
            return {
                'classification': 'unknown',
                'confidence': 0.0,
                'reasoning': 'AI classification failed'
            }

    def _build_classification_prompt(self) -> str:
        """Build Danish system prompt for email classification"""
        return """Du er en ekspert i at klassificere danske forretningsemails.

Din opgave er at analysere emailens indhold og klassificere den i én af følgende kategorier:

**Kategorier:**
1. **invoice** - Fakturaer fra leverandører (inkl. kreditnotaer)
2. **freight_note** - Fragtbreve og forsendelsesbekræftelser
3. **order_confirmation** - Ordrebekræftelser fra leverandører
4. **time_confirmation** - Bekræftelser på tidsforbrug/timer (fra kunder eller interne)
5. **case_notification** - Notifikationer om sager, support tickets, opgaver
6. **customer_email** - Generelle kundehenvendelser (spørgsmål, feedback, klager)
7. **bankruptcy** - Konkursmeldinger, rekonstruktion, betalingsstandsning
8. **general** - Almindelig kommunikation (opdateringer, møder, newsletters)
9. **spam** - Spam, reklame, phishing
10. **unknown** - Kan ikke klassificeres med sikkerhed

**Vigtige regler:**
- `invoice` skal indeholde fakturanummer, beløb, eller betalingsinformation
- `time_confirmation` indeholder timer/tidsforbrug, ofte med case/sagsreferencer
- `case_notification` er notifikationer om specifikke sager (CC0001, Case #123 osv.)
- `bankruptcy` kun hvis der er EKSPLICIT konkursmelding
- Vær konservativ: Hvis du er i tvivl, brug `general` eller `unknown`

**Output format (JSON):**
```json
{
  "classification": "invoice",
  "confidence": 0.95,
  "reasoning": "Emailen indeholder fakturanummer, beløb og betalingsinstruktioner"
}
```

Returner KUN JSON - ingen anden tekst.
"""

    def _build_email_context(self, email_data: Dict) -> str:
        """Build email context for AI analysis"""

        subject = email_data.get('subject', '')
        sender = email_data.get('sender_email', '')
        body = email_data.get('body_text', '') or email_data.get('body_html', '')

        # Truncate body to avoid token limits (keep first 2000 chars)
        if len(body) > 2000:
            body = body[:2000] + "... [truncated]"

        context = f"""**Email Information:**
From: {sender}
Subject: {subject}

**Email Body:**
{body}

Klassificer denne email."""

        return context

    async def _call_ollama(self, system_prompt: str, user_message: str) -> Optional[Dict]:
        """Call Ollama API for classification"""

        url = f"{self.ollama_endpoint}/api/chat"

        payload = {
            "model": self.ollama_model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            "stream": False,
            "options": {
                "temperature": 0.1,  # Low temperature for consistent classification
                "num_predict": 200  # Short response expected
            }
        }

        try:
            start_time = datetime.now()

            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"❌ Ollama API error: {response.status} - {error_text}")
                        return None

                    data = await response.json()
                    content = data.get('message', {}).get('content', '')

                    processing_time = (datetime.now() - start_time).total_seconds() * 1000

                    # Parse JSON response
                    result = self._parse_ollama_response(content)

                    if result:
                        result['processing_time_ms'] = int(processing_time)
                        logger.info(f"✅ AI classification: {result['classification']} (confidence: {result['confidence']}, {processing_time:.0f}ms)")
                        return result
                    else:
                        logger.error(f"❌ Failed to parse Ollama response: {content[:100]}")
                        return None

        except asyncio.TimeoutError:
            # aiohttp's ClientTimeout raises asyncio.TimeoutError on expiry
            logger.error("❌ Ollama API timeout (30s)")
            return None
        except Exception as e:
            logger.error(f"❌ Error calling Ollama API: {e}")
            return None

    @staticmethod
    def _strip_code_fences(content: str) -> str:
        """Extract the JSON payload from an LLM reply, handling ```json fences."""
        if '```json' in content:
            start = content.find('```json') + 7
            end = content.find('```', start)
            return content[start:end].strip()
        if '```' in content:
            start = content.find('```') + 3
            end = content.find('```', start)
            return content[start:end].strip()
        return content.strip()

    def _parse_ollama_response(self, content: str) -> Optional[Dict]:
        """Parse Ollama JSON response"""
        try:
            # Extract JSON from response (handle markdown code blocks)
            json_str = self._strip_code_fences(content)

            # Parse JSON
            data = json.loads(json_str)

            # Validate required fields
            if 'classification' not in data:
                logger.error("❌ Missing 'classification' field in AI response")
                return None

            # Normalize and validate
            classification = data['classification'].lower()
            confidence = float(data.get('confidence', 0.0))
            reasoning = data.get('reasoning', '')

            # Validate classification category
            valid_categories = [
                'invoice', 'freight_note', 'order_confirmation', 'time_confirmation',
                'case_notification', 'customer_email', 'bankruptcy', 'general', 'spam', 'unknown'
            ]

            if classification not in valid_categories:
                logger.warning(f"⚠️ Unknown classification '{classification}', defaulting to 'unknown'")
                classification = 'unknown'

            return {
                'classification': classification,
                'confidence': confidence,
                'reasoning': reasoning
            }

        except json.JSONDecodeError as e:
            logger.error(f"❌ JSON parse error: {e} - Content: {content[:200]}")
            return None
        except Exception as e:
            logger.error(f"❌ Error parsing Ollama response: {e}")
            return None

    def _get_cached_classification(self, email_id: int) -> Optional[Dict]:
        """Get cached classification from database"""
        query = """
            SELECT result_json, confidence_score, processing_time_ms
            FROM email_analysis
            WHERE email_id = %s AND analysis_type = 'classification'
        """
        result = execute_query(query, (email_id,))

        if result:
            row = result[0]
            # NOTE(review): assumes the driver decodes result_json (jsonb) to a
            # dict — confirm; if the column is plain text this needs json.loads.
            return {
                'classification': row['result_json'].get('classification'),
                'confidence': float(row['confidence_score']),
                'reasoning': row['result_json'].get('reasoning', ''),
                'processing_time_ms': row['processing_time_ms'],
                'cached': True
            }

        return None

    async def _cache_classification(self, email_id: int, result: Dict):
        """Save classification result to cache"""
        try:
            query = """
                INSERT INTO email_analysis
                (email_id, analysis_type, result_json, confidence_score, model_used, processing_time_ms)
                VALUES (%s, 'classification', %s, %s, %s, %s)
                ON CONFLICT (email_id, analysis_type)
                DO UPDATE SET
                    result_json = EXCLUDED.result_json,
                    confidence_score = EXCLUDED.confidence_score,
                    processing_time_ms = EXCLUDED.processing_time_ms,
                    created_at = CURRENT_TIMESTAMP
            """

            result_json = json.dumps({
                'classification': result['classification'],
                'reasoning': result.get('reasoning', '')
            })

            execute_query(query, (
                email_id,
                result_json,
                result['confidence'],
                self.ollama_model,
                result.get('processing_time_ms', 0)
            ))

            logger.info(f"✅ Cached classification for email {email_id}")

        except Exception as e:
            logger.error(f"❌ Error caching classification: {e}")

    async def extract_invoice_details(self, email_data: Dict) -> Optional[Dict]:
        """
        Extract structured data from invoice emails
        Returns: {invoice_number, amount, due_date, vendor_name, ...}
        """

        # Only run for invoice-classified emails
        if email_data.get('classification') != 'invoice':
            return None

        # Check cache
        cached_result = self._get_cached_extraction(email_data['id'])
        if cached_result:
            logger.info(f"✅ Using cached extraction for email {email_data['id']}")
            return cached_result

        # Build extraction prompt
        system_prompt = self._build_extraction_prompt()
        user_message = self._build_email_context(email_data)

        # Call Ollama
        result = await self._call_ollama_extraction(system_prompt, user_message)

        if result:
            # Save to cache
            await self._cache_extraction(email_data['id'], result)
            return result

        return None

    def _build_extraction_prompt(self) -> str:
        """Build Danish system prompt for invoice data extraction"""
        return """Du er en ekspert i at udtrække struktureret data fra danske fakturaer.

Din opgave er at finde og udtrække følgende information fra emailen:

**Felter at udtrække:**
- `invoice_number` (string) - Fakturanummer
- `amount` (decimal) - Fakturabeløb i DKK (uden valutasymbol)
- `due_date` (string YYYY-MM-DD) - Forfaldsdato
- `vendor_name` (string) - Leverandørens navn
- `order_number` (string) - Ordrenummer (hvis angivet)
- `cvr_number` (string) - CVR-nummer (hvis angivet)

**Vigtige regler:**
- Hvis et felt ikke findes, brug `null`
- Beløb skal være numerisk (uden "kr", "DKK" osv.)
- Datoer skal være i formatet YYYY-MM-DD
- Vær præcis - returner kun data du er sikker på

**Output format (JSON):**
```json
{
  "invoice_number": "INV-2024-001",
  "amount": 5250.00,
  "due_date": "2025-01-15",
  "vendor_name": "Acme Leverandør A/S",
  "order_number": "ORD-123",
  "cvr_number": "12345678"
}
```

Returner KUN JSON - ingen anden tekst.
"""

    async def _call_ollama_extraction(self, system_prompt: str, user_message: str) -> Optional[Dict]:
        """Call Ollama for data extraction"""

        url = f"{self.ollama_endpoint}/api/chat"

        payload = {
            "model": self.ollama_model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            "stream": False,
            "options": {
                "temperature": 0.0,  # Zero temperature for deterministic extraction
                "num_predict": 300
            }
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status != 200:
                        return None

                    data = await response.json()
                    content = data.get('message', {}).get('content', '')

                    # Parse JSON response
                    result = self._parse_extraction_response(content)
                    return result

        except Exception as e:
            logger.error(f"❌ Error calling Ollama for extraction: {e}")
            return None

    def _parse_extraction_response(self, content: str) -> Optional[Dict]:
        """Parse Ollama extraction JSON response"""
        try:
            # Extract JSON (shared fence-stripping helper)
            json_str = self._strip_code_fences(content)

            data = json.loads(json_str)
            return data

        except Exception as e:
            logger.error(f"❌ Error parsing extraction response: {e}")
            return None

    def _get_cached_extraction(self, email_id: int) -> Optional[Dict]:
        """Get cached extraction from database"""
        query = """
            SELECT result_json
            FROM email_analysis
            WHERE email_id = %s AND analysis_type = 'extraction'
        """
        result = execute_query(query, (email_id,))

        if result:
            return result[0]['result_json']

        return None

    async def _cache_extraction(self, email_id: int, result: Dict):
        """Save extraction result to cache"""
        try:
            query = """
                INSERT INTO email_analysis
                (email_id, analysis_type, result_json, model_used)
                VALUES (%s, 'extraction', %s, %s)
                ON CONFLICT (email_id, analysis_type)
                DO UPDATE SET result_json = EXCLUDED.result_json
            """

            result_json = json.dumps(result)
            execute_query(query, (email_id, result_json, self.ollama_model))

            logger.info(f"✅ Cached extraction for email {email_id}")

        except Exception as e:
            logger.error(f"❌ Error caching extraction: {e}")
"""
Email Processor Service
Main orchestrator for email workflow: Fetch → Store → Classify → Match Rules → Link Entities
Based on OmniSync architecture adapted for BMC Hub
"""

import json
import logging
import re  # Fix: hoisted from inside _rule_matches' per-rule loop
from typing import List, Dict, Optional
from datetime import datetime

from app.services.email_service import EmailService
from app.services.email_analysis_service import EmailAnalysisService
from app.core.config import settings
from app.core.database import execute_query

logger = logging.getLogger(__name__)


class EmailProcessorService:
    """Main orchestrator for email processing pipeline"""

    def __init__(self):
        self.email_service = EmailService()
        self.analysis_service = EmailAnalysisService()
        self.enabled = settings.EMAIL_TO_TICKET_ENABLED
        self.rules_enabled = settings.EMAIL_RULES_ENABLED
        self.auto_process = settings.EMAIL_RULES_AUTO_PROCESS

    async def process_inbox(self) -> Dict:
        """
        Main entry point: Process all new emails from inbox
        Returns: Processing statistics
        """

        if not self.enabled:
            logger.info("⏭️ Email processing disabled (EMAIL_TO_TICKET_ENABLED=false)")
            return {'status': 'disabled'}

        logger.info("🔄 Starting email processing cycle...")

        stats = {
            'fetched': 0,
            'saved': 0,
            'classified': 0,
            'rules_matched': 0,
            'errors': 0
        }

        try:
            # Step 1: Fetch new emails
            limit = settings.EMAIL_MAX_FETCH_PER_RUN
            new_emails = await self.email_service.fetch_new_emails(limit=limit)
            stats['fetched'] = len(new_emails)

            if not new_emails:
                logger.info("✅ No new emails to process")
                return stats

            logger.info(f"📥 Fetched {len(new_emails)} new emails")

            # Step 2: Save emails to database
            for email_data in new_emails:
                try:
                    email_id = await self.email_service.save_email(email_data)

                    if email_id:
                        email_data['id'] = email_id
                        stats['saved'] += 1

                        # Step 3: Classify email with AI
                        if settings.EMAIL_AI_ENABLED and settings.EMAIL_AUTO_CLASSIFY:
                            # Fix: only count emails that actually got classified —
                            # _classify_and_update swallows its own failures.
                            if await self._classify_and_update(email_data):
                                stats['classified'] += 1

                        # Step 4: Match against rules
                        if self.rules_enabled:
                            matched = await self._match_rules(email_data)
                            if matched:
                                stats['rules_matched'] += 1

                except Exception as e:
                    logger.error(f"❌ Error processing email: {e}")
                    stats['errors'] += 1

            logger.info(f"✅ Email processing complete: {stats}")
            return stats

        except Exception as e:
            logger.error(f"❌ Email processing failed: {e}")
            stats['errors'] += 1
            return stats

    async def _classify_and_update(self, email_data: Dict) -> bool:
        """Classify email, persist the result, and return True on success."""
        try:
            # Run AI classification
            result = await self.analysis_service.classify_email(email_data)

            classification = result.get('classification', 'unknown')
            confidence = result.get('confidence', 0.0)

            # Update email record
            query = """
                UPDATE email_messages
                SET classification = %s,
                    confidence_score = %s,
                    classification_date = CURRENT_TIMESTAMP
                WHERE id = %s
            """
            execute_query(query, (classification, confidence, email_data['id']))

            logger.info(f"✅ Classified email {email_data['id']} as '{classification}' (confidence: {confidence:.2f})")

            # Update email_data for rule matching
            email_data['classification'] = classification
            email_data['confidence_score'] = confidence

            # Extract invoice details if classified as invoice
            if classification == 'invoice' and confidence >= settings.EMAIL_AI_CONFIDENCE_THRESHOLD:
                extraction = await self.analysis_service.extract_invoice_details(email_data)

                if extraction:
                    await self._update_extracted_fields(email_data['id'], extraction)

            return True

        except Exception as e:
            logger.error(f"❌ Classification failed for email {email_data['id']}: {e}")
            return False

    async def _update_extracted_fields(self, email_id: int, extraction: Dict):
        """Update email with extracted invoice fields"""
        try:
            query = """
                UPDATE email_messages
                SET extracted_invoice_number = %s,
                    extracted_amount = %s,
                    extracted_due_date = %s
                WHERE id = %s
            """

            execute_query(query, (
                extraction.get('invoice_number'),
                extraction.get('amount'),
                extraction.get('due_date'),
                email_id
            ))

            logger.info(f"✅ Updated extracted fields for email {email_id}")

        except Exception as e:
            logger.error(f"❌ Error updating extracted fields: {e}")

    async def _match_rules(self, email_data: Dict) -> bool:
        """
        Match email against active rules and execute actions
        Returns True if rule was matched
        """
        try:
            # Get active rules ordered by priority
            rules = self._get_active_rules()

            if not rules:
                return False

            for rule in rules:
                if self._rule_matches(email_data, rule):
                    logger.info(f"✅ Email {email_data['id']} matched rule: {rule['name']}")

                    # Update email with rule_id
                    query = "UPDATE email_messages SET rule_id = %s WHERE id = %s"
                    execute_query(query, (rule['id'], email_data['id']))

                    # Update rule statistics
                    self._update_rule_stats(rule['id'])

                    # Execute rule action (if auto-process enabled)
                    if self.auto_process:
                        await self._execute_rule_action(email_data, rule)
                    else:
                        logger.info(f"⏭️ Auto-process disabled - rule action not executed")

                    return True  # First matching rule wins

            return False

        except Exception as e:
            logger.error(f"❌ Error matching rules: {e}")
            return False

    def _get_active_rules(self) -> List[Dict]:
        """Get all enabled rules ordered by priority"""
        query = """
            SELECT * FROM email_rules
            WHERE enabled = true
            ORDER BY priority ASC
        """
        return execute_query(query)

    def _rule_matches(self, email_data: Dict, rule: Dict) -> bool:
        """
        Check if email matches rule conditions
        Rule conditions format: {"sender_email": "x@y.com", "classification": "invoice", ...}
        """
        try:
            conditions = rule['conditions']
            # Defensive: conditions may come back as a JSON string depending
            # on the column type / driver configuration.
            if isinstance(conditions, str):
                conditions = json.loads(conditions)

            # Check sender_email
            if 'sender_email' in conditions:
                if email_data.get('sender_email') != conditions['sender_email']:
                    return False

            # Check sender_domain
            if 'sender_domain' in conditions:
                sender_email = email_data.get('sender_email', '')
                if '@' in sender_email:
                    domain = sender_email.split('@')[1].lower()
                    allowed = conditions['sender_domain']
                    # Fix: a plain-string condition previously used substring
                    # containment, so "ample.com" matched a rule for
                    # "example.com". Normalize to exact membership.
                    if isinstance(allowed, str):
                        allowed = [allowed]
                    if domain not in [d.lower() for d in allowed]:
                        return False
                else:
                    return False

            # Check classification
            if 'classification' in conditions:
                if email_data.get('classification') != conditions['classification']:
                    return False

            # Check subject_contains
            if 'subject_contains' in conditions:
                subject = email_data.get('subject', '').lower()
                keywords = conditions['subject_contains']

                if isinstance(keywords, list):
                    if not any(kw.lower() in subject for kw in keywords):
                        return False
                elif isinstance(keywords, str):
                    if keywords.lower() not in subject:
                        return False

            # Check subject_regex (advanced pattern matching)
            if 'subject_regex' in conditions:
                subject = email_data.get('subject', '')
                pattern = conditions['subject_regex']

                if not re.search(pattern, subject, re.IGNORECASE):
                    return False

            # All conditions matched
            return True

        except Exception as e:
            logger.error(f"❌ Error checking rule conditions: {e}")
            return False

    async def _execute_rule_action(self, email_data: Dict, rule: Dict):
        """Execute rule action (link_supplier, create_purchase, link_case, etc.)"""
        try:
            action_type = rule['action_type']
            action_params = rule.get('action_params', {})

            logger.info(f"🚀 Executing rule action: {action_type} for email {email_data['id']}")

            if action_type == 'mark_spam':
                await self._mark_as_spam(email_data['id'])

            elif action_type == 'link_supplier':
                await self._link_to_supplier(email_data, action_params)

            elif action_type == 'link_customer':
                await self._link_to_customer(email_data, action_params)

            elif action_type == 'link_case':
                await self._link_to_case(email_data, action_params)

            elif action_type == 'create_purchase':
                logger.info(f"⏭️ Purchase creation not implemented yet")

            else:
                logger.warning(f"⚠️ Unknown action type: {action_type}")

            # Mark email as auto-processed
            query = "UPDATE email_messages SET auto_processed = true WHERE id = %s"
            execute_query(query, (email_data['id'],))

        except Exception as e:
            logger.error(f"❌ Error executing rule action: {e}")

    async def _mark_as_spam(self, email_id: int):
        """Mark email as spam"""
        query = "UPDATE email_messages SET classification = 'spam', status = 'processed' WHERE id = %s"
        execute_query(query, (email_id,))
        logger.info(f"✅ Marked email {email_id} as spam")

    async def _link_to_supplier(self, email_data: Dict, params: Dict):
        """Link email to supplier/vendor"""
        try:
            # Auto-match supplier by email domain
            if params.get('auto_match_domain'):
                sender_email = email_data.get('sender_email', '')

                if '@' in sender_email:
                    domain = sender_email.split('@')[1]

                    # Find vendor by domain
                    query = """
                        SELECT id, name FROM vendors
                        WHERE email LIKE %s OR contact_email LIKE %s
                        LIMIT 1
                    """
                    result = execute_query(query, (f'%{domain}%', f'%{domain}%'))

                    if result:
                        vendor_id = result[0]['id']
                        vendor_name = result[0]['name']

                        # Link email to vendor
                        query = "UPDATE email_messages SET supplier_id = %s WHERE id = %s"
                        execute_query(query, (vendor_id, email_data['id']))

                        logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_name}")
                        return

            # NOTE(review): the original continues with a manual-linking branch
            # ("# Manual ...") beyond the reviewed chunk — confirm against the
            # full file before relying on params-based supplier linking here.

        except Exception as e:
            logger.error(f"❌ Error linking supplier: {e}")
supplier_id from params + elif 'supplier_id' in params: + vendor_id = params['supplier_id'] + query = "UPDATE email_messages SET supplier_id = %s WHERE id = %s" + execute_query(query, (vendor_id, email_data['id'])) + logger.info(f"✅ Linked email {email_data['id']} to vendor {vendor_id}") + + except Exception as e: + logger.error(f"❌ Error linking to supplier: {e}") + + async def _link_to_customer(self, email_data: Dict, params: Dict): + """Link email to customer""" + try: + # Auto-match customer by email domain + if params.get('auto_match_domain'): + sender_email = email_data.get('sender_email', '') + + if '@' in sender_email: + domain = sender_email.split('@')[1] + + # Find customer by domain + query = """ + SELECT id, customer_name FROM tmodule_customers + WHERE email LIKE %s + LIMIT 1 + """ + result = execute_query(query, (f'%{domain}%',)) + + if result: + customer_id = result[0]['id'] + customer_name = result[0]['customer_name'] + + # Link email to customer + query = "UPDATE email_messages SET customer_id = %s WHERE id = %s" + execute_query(query, (customer_id, email_data['id'])) + + logger.info(f"✅ Linked email {email_data['id']} to customer {customer_name}") + + except Exception as e: + logger.error(f"❌ Error linking to customer: {e}") + + async def _link_to_case(self, email_data: Dict, params: Dict): + """Link email to timetracking case""" + try: + # Extract case number from subject (e.g., "CC0042", "Case #123") + if params.get('extract_case_from_subject'): + import re + subject = email_data.get('subject', '') + + # Match patterns like CC0001, CC0042, etc. 
+ match = re.search(r'CC(\d{4})', subject, re.IGNORECASE) + + if match: + case_number = f"CC{match.group(1)}" + + # Find case by case_number + query = """ + SELECT id, title FROM tmodule_cases + WHERE case_number = %s + LIMIT 1 + """ + result = execute_query(query, (case_number,)) + + if result: + case_id = result[0]['id'] + case_title = result[0]['title'] + + # Link email to case + query = "UPDATE email_messages SET linked_case_id = %s WHERE id = %s" + execute_query(query, (case_id, email_data['id'])) + + logger.info(f"✅ Linked email {email_data['id']} to case {case_number}: {case_title}") + return + + logger.info(f"⚠️ No case number found in subject: {subject}") + + except Exception as e: + logger.error(f"❌ Error linking to case: {e}") + + def _update_rule_stats(self, rule_id: int): + """Update rule match statistics""" + query = """ + UPDATE email_rules + SET match_count = match_count + 1, + last_matched_at = CURRENT_TIMESTAMP + WHERE id = %s + """ + execute_query(query, (rule_id,)) + + async def reprocess_email(self, email_id: int): + """Manually reprocess a single email""" + try: + # Get email from database + query = "SELECT * FROM email_messages WHERE id = %s" + result = execute_query(query, (email_id,)) + + if not result: + logger.error(f"❌ Email {email_id} not found") + return + + email_data = result[0] + + # Reclassify + if settings.EMAIL_AI_ENABLED: + await self._classify_and_update(email_data) + + # Rematch rules + if self.rules_enabled: + await self._match_rules(email_data) + + logger.info(f"✅ Reprocessed email {email_id}") + + except Exception as e: + logger.error(f"❌ Error reprocessing email {email_id}: {e}") diff --git a/app/services/email_scheduler.py b/app/services/email_scheduler.py new file mode 100644 index 0000000..da716b5 --- /dev/null +++ b/app/services/email_scheduler.py @@ -0,0 +1,77 @@ +""" +Email Scheduler +Background job that runs every 5 minutes to fetch and process emails +Based on OmniSync scheduler with APScheduler +""" + +import 
"""
Email Scheduler
Background job that runs on a configurable interval to fetch and process emails
Based on OmniSync scheduler with APScheduler
"""

import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from datetime import datetime

from app.core.config import settings
from app.services.email_processor_service import EmailProcessorService

logger = logging.getLogger(__name__)


class EmailScheduler:
    """Background scheduler for email processing.

    Wraps an APScheduler AsyncIOScheduler around EmailProcessorService.
    Does nothing unless EMAIL_TO_TICKET_ENABLED is set in configuration.
    """

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.processor = EmailProcessorService()
        self.enabled = settings.EMAIL_TO_TICKET_ENABLED
        self.interval_minutes = settings.EMAIL_PROCESS_INTERVAL_MINUTES

    def start(self):
        """Start the background scheduler (no-op when disabled)."""
        if not self.enabled:
            logger.info("⏭️ Email scheduler disabled (EMAIL_TO_TICKET_ENABLED=false)")
            return

        logger.info(f"🚀 Starting email scheduler (interval: {self.interval_minutes} minutes)")

        self.scheduler.add_job(
            func=self._process_emails_job,
            trigger=IntervalTrigger(minutes=self.interval_minutes),
            id='email_processor',
            name='Email Processing Job',
            max_instances=1,  # Prevent overlapping runs
            replace_existing=True
        )

        self.scheduler.start()
        logger.info("✅ Email scheduler started successfully")

    def stop(self):
        """Shut down the scheduler, waiting for a running job to finish."""
        if self.scheduler.running:
            self.scheduler.shutdown(wait=True)
            logger.info("👋 Email scheduler stopped")

    async def _process_emails_job(self):
        """Run one processing cycle and log the stats and duration."""
        try:
            logger.info("🔄 Email processing job started...")

            start_time = datetime.now()
            stats = await self.processor.process_inbox()
            duration = (datetime.now() - start_time).total_seconds()

            logger.info(f"✅ Email processing complete: {stats} (duration: {duration:.1f}s)")

        except Exception as e:
            logger.error(f"❌ Email processing job failed: {e}")

    def run_manual(self):
        """Manually trigger email processing (for testing).

        FIX: the original unconditionally called asyncio.create_task(),
        which raises RuntimeError when no event loop is running — the
        typical CLI/testing situation this method exists for.
        """
        logger.info("🚀 Manual email processing triggered")
        import asyncio
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: execute the job to completion synchronously.
            asyncio.run(self._process_emails_job())
        else:
            # Called from an async context: schedule without blocking.
            asyncio.create_task(self._process_emails_job())


# Global scheduler instance
email_scheduler = EmailScheduler()
(IMAP or Graph API)") + return [] + + async def _fetch_via_imap(self, limit: int) -> List[Dict]: + """Fetch emails using IMAP protocol (READ-ONLY mode)""" + emails = [] + + try: + # Connect to IMAP server + if self.imap_config['use_ssl']: + mail = imaplib.IMAP4_SSL(self.imap_config['server'], self.imap_config['port']) + else: + mail = imaplib.IMAP4(self.imap_config['server'], self.imap_config['port']) + + # Login + mail.login(self.imap_config['username'], self.imap_config['password']) + + # Select folder in READ-ONLY mode (critical for safety) + folder = self.imap_config['folder'] + readonly = self.imap_config['readonly'] + mail.select(folder, readonly=readonly) + + if readonly: + logger.info(f"🔒 Connected to {folder} in READ-ONLY mode (emails will NOT be marked as read)") + + # Search for all emails + status, messages = mail.search(None, 'ALL') + + if status != 'OK': + logger.error(f"❌ IMAP search failed: {status}") + return emails + + email_ids = messages[0].split() + total_emails = len(email_ids) + + logger.info(f"📊 Found {total_emails} emails in {folder}") + + # Get most recent emails (reverse order, limit) + email_ids_to_fetch = email_ids[-limit:] if len(email_ids) > limit else email_ids + email_ids_to_fetch.reverse() # Newest first + + for email_id in email_ids_to_fetch: + try: + # Fetch email using BODY.PEEK to avoid marking as read + status, msg_data = mail.fetch(email_id, '(BODY.PEEK[])') + + if status != 'OK': + logger.warning(f"⚠️ Failed to fetch email {email_id}: {status}") + continue + + # Parse email + raw_email = msg_data[0][1] + msg = email.message_from_bytes(raw_email) + + # Extract fields + parsed_email = self._parse_email(msg, email_id.decode()) + + # Check if already exists in database + if not self._email_exists(parsed_email['message_id']): + emails.append(parsed_email) + logger.info(f"✅ New email: {parsed_email['subject'][:50]}... 
from {parsed_email['sender_email']}") + else: + logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}") + + except Exception as e: + logger.error(f"❌ Error parsing email {email_id}: {e}") + continue + + # Logout + mail.logout() + + logger.info(f"📥 Fetched {len(emails)} new emails via IMAP") + return emails + + except imaplib.IMAP4.error as e: + logger.error(f"❌ IMAP error: {e}") + return [] + except Exception as e: + logger.error(f"❌ Unexpected error fetching via IMAP: {e}") + return [] + + async def _fetch_via_graph(self, limit: int) -> List[Dict]: + """Fetch emails using Microsoft Graph API (OAuth2)""" + emails = [] + + try: + # Get access token using MSAL + access_token = await self._get_graph_access_token() + + if not access_token: + logger.error("❌ Failed to get Graph API access token") + return [] + + # Build Graph API request + user_email = self.graph_config['user_email'] + folder = self.imap_config['folder'] # Use same folder name + + # Graph API endpoint for messages + url = f"https://graph.microsoft.com/v1.0/users/{user_email}/mailFolders/{folder}/messages" + params = { + '$top': limit, + '$orderby': 'receivedDateTime desc', + '$select': 'id,subject,from,toRecipients,ccRecipients,receivedDateTime,bodyPreview,body,hasAttachments,internetMessageId' + } + + headers = { + 'Authorization': f'Bearer {access_token}', + 'Content-Type': 'application/json' + } + + async with ClientSession() as session: + async with session.get(url, params=params, headers=headers) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"❌ Graph API error: {response.status} - {error_text}") + return [] + + data = await response.json() + messages = data.get('value', []) + + logger.info(f"📊 Found {len(messages)} emails via Graph API") + + for msg in messages: + try: + parsed_email = self._parse_graph_message(msg) + + # Check if already exists + if not self._email_exists(parsed_email['message_id']): + emails.append(parsed_email) + 
logger.info(f"✅ New email: {parsed_email['subject'][:50]}... from {parsed_email['sender_email']}") + else: + logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}") + + except Exception as e: + logger.error(f"❌ Error parsing Graph message: {e}") + continue + + logger.info(f"📥 Fetched {len(emails)} new emails via Graph API") + return emails + + except Exception as e: + logger.error(f"❌ Unexpected error fetching via Graph API: {e}") + return [] + + async def _get_graph_access_token(self) -> Optional[str]: + """Get OAuth2 access token for Microsoft Graph API using MSAL""" + try: + authority = f"https://login.microsoftonline.com/{self.graph_config['tenant_id']}" + + app = msal.ConfidentialClientApplication( + self.graph_config['client_id'], + authority=authority, + client_credential=self.graph_config['client_secret'] + ) + + # Request token with Mail.Read scope (Application permission) + scopes = ["https://graph.microsoft.com/.default"] + result = app.acquire_token_for_client(scopes=scopes) + + if "access_token" in result: + logger.info("✅ Successfully obtained Graph API access token") + return result["access_token"] + else: + error = result.get("error_description", result.get("error", "Unknown error")) + logger.error(f"❌ Failed to obtain access token: {error}") + return None + + except Exception as e: + logger.error(f"❌ Error getting Graph access token: {e}") + return None + + def _parse_email(self, msg: email.message.Message, email_id: str) -> Dict: + """Parse IMAP email message into dictionary""" + + # Decode subject + subject = self._decode_header(msg.get('Subject', '')) + + # Decode sender + from_header = self._decode_header(msg.get('From', '')) + sender_name, sender_email = self._parse_email_address(from_header) + + # Decode recipient + to_header = self._decode_header(msg.get('To', '')) + recipient_name, recipient_email = self._parse_email_address(to_header) + + # Decode CC + cc_header = self._decode_header(msg.get('Cc', '')) + + # Get message ID + 
message_id = msg.get('Message-ID', f"imap-{email_id}") + + # Get date + date_str = msg.get('Date', '') + received_date = self._parse_email_date(date_str) + + # Get body + body_text = "" + body_html = "" + + if msg.is_multipart(): + for part in msg.walk(): + content_type = part.get_content_type() + + if content_type == "text/plain": + try: + body_text = part.get_payload(decode=True).decode('utf-8', errors='ignore') + except Exception: + pass + + elif content_type == "text/html": + try: + body_html = part.get_payload(decode=True).decode('utf-8', errors='ignore') + except Exception: + pass + else: + try: + body_text = msg.get_payload(decode=True).decode('utf-8', errors='ignore') + except Exception: + body_text = str(msg.get_payload()) + + # Check for attachments + has_attachments = False + attachment_count = 0 + + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_maintype() == 'multipart': + continue + if part.get('Content-Disposition') is not None: + has_attachments = True + attachment_count += 1 + + return { + 'message_id': message_id, + 'subject': subject, + 'sender_name': sender_name, + 'sender_email': sender_email, + 'recipient_email': recipient_email, + 'cc': cc_header, + 'body_text': body_text, + 'body_html': body_html, + 'received_date': received_date, + 'folder': self.imap_config['folder'], + 'has_attachments': has_attachments, + 'attachment_count': attachment_count + } + + def _parse_graph_message(self, msg: Dict) -> Dict: + """Parse Microsoft Graph API message into dictionary""" + + # Extract sender + from_data = msg.get('from', {}).get('emailAddress', {}) + sender_name = from_data.get('name', '') + sender_email = from_data.get('address', '') + + # Extract recipient (first TO recipient) + to_recipients = msg.get('toRecipients', []) + recipient_email = to_recipients[0]['emailAddress']['address'] if to_recipients else '' + + # Extract CC recipients + cc_recipients = msg.get('ccRecipients', []) + cc = ', 
'.join([r['emailAddress']['address'] for r in cc_recipients]) + + # Get body + body_data = msg.get('body', {}) + body_content = body_data.get('content', '') + body_type = body_data.get('contentType', 'text') + + body_text = body_content if body_type == 'text' else '' + body_html = body_content if body_type == 'html' else '' + + # Parse date + received_date_str = msg.get('receivedDateTime', '') + received_date = datetime.fromisoformat(received_date_str.replace('Z', '+00:00')) if received_date_str else datetime.now() + + return { + 'message_id': msg.get('internetMessageId', msg.get('id', '')), + 'subject': msg.get('subject', ''), + 'sender_name': sender_name, + 'sender_email': sender_email, + 'recipient_email': recipient_email, + 'cc': cc, + 'body_text': body_text, + 'body_html': body_html, + 'received_date': received_date, + 'folder': self.imap_config['folder'], + 'has_attachments': msg.get('hasAttachments', False), + 'attachment_count': 0 # TODO: Fetch attachment count from Graph API if needed + } + + def _decode_header(self, header: str) -> str: + """Decode email header (handles MIME encoding)""" + if not header: + return "" + + decoded_parts = decode_header(header) + decoded_string = "" + + for content, encoding in decoded_parts: + if isinstance(content, bytes): + try: + decoded_string += content.decode(encoding or 'utf-8', errors='ignore') + except Exception: + decoded_string += content.decode('utf-8', errors='ignore') + else: + decoded_string += str(content) + + return decoded_string.strip() + + def _parse_email_address(self, header: str) -> Tuple[str, str]: + """Parse 'Name ' into (name, email)""" + if not header: + return ("", "") + + if '<' in header and '>' in header: + # Format: "Name " + parts = header.split('<') + name = parts[0].strip().strip('"') + email_addr = parts[1].strip('>').strip() + return (name, email_addr) + else: + # Just email address + return ("", header.strip()) + + def _parse_email_date(self, date_str: str) -> datetime: + """Parse email 
date header into datetime object""" + if not date_str: + return datetime.now() + + try: + # Use email.utils to parse RFC 2822 date + from email.utils import parsedate_to_datetime + return parsedate_to_datetime(date_str) + except Exception: + logger.warning(f"⚠️ Failed to parse date: {date_str}") + return datetime.now() + + def _email_exists(self, message_id: str) -> bool: + """Check if email already exists in database""" + query = "SELECT id FROM email_messages WHERE message_id = %s AND deleted_at IS NULL" + result = execute_query(query, (message_id,)) + return len(result) > 0 + + async def save_email(self, email_data: Dict) -> Optional[int]: + """Save email to database""" + try: + query = """ + INSERT INTO email_messages + (message_id, subject, sender_email, sender_name, recipient_email, cc, + body_text, body_html, received_date, folder, has_attachments, attachment_count, + status, is_read) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'new', false) + RETURNING id + """ + + email_id = execute_insert(query, ( + email_data['message_id'], + email_data['subject'], + email_data['sender_email'], + email_data['sender_name'], + email_data['recipient_email'], + email_data['cc'], + email_data['body_text'], + email_data['body_html'], + email_data['received_date'], + email_data['folder'], + email_data['has_attachments'], + email_data['attachment_count'] + )) + + logger.info(f"✅ Saved email {email_id}: {email_data['subject'][:50]}...") + return email_id + + except Exception as e: + logger.error(f"❌ Error saving email to database: {e}") + return None + + async def get_unprocessed_emails(self, limit: int = 100) -> List[Dict]: + """Get emails from database that haven't been processed yet""" + query = """ + SELECT * FROM email_messages + WHERE status = 'new' AND deleted_at IS NULL + ORDER BY received_date DESC + LIMIT %s + """ + result = execute_query(query, (limit,)) + return result + + async def update_email_status(self, email_id: int, status: str): + """Update email 
processing status""" + query = "UPDATE email_messages SET status = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s" + execute_query(query, (status, email_id)) + logger.info(f"✅ Updated email {email_id} status to: {status}") diff --git a/app/timetracking/backend/economic_export.py b/app/timetracking/backend/economic_export.py index dd0f39c..26dc23d 100644 --- a/app/timetracking/backend/economic_export.py +++ b/app/timetracking/backend/economic_export.py @@ -130,6 +130,13 @@ class EconomicExportService: if not order: raise HTTPException(status_code=404, detail="Order not found") + # Check if order is posted (locked) + if order['status'] == 'posted': + raise HTTPException( + status_code=403, + detail=f"Ordre er bogført til e-conomic og kan ikke ændres. e-conomic ordre nr.: {order.get('economic_order_number')}" + ) + # Check if already exported if order['economic_draft_id'] and not request.force: raise HTTPException( @@ -227,7 +234,7 @@ class EconomicExportService: "paymentTermsNumber": 1 # Default payment terms }, "layout": { - "layoutNumber": 19 # Default layout + "layoutNumber": 21 # DK. std. m. bankoplys. 1.8 }, "notes": { "heading": f"Tidsregistrering - {order['order_number']}" @@ -241,24 +248,45 @@ class EconomicExportService: # Build order lines for idx, line in enumerate(lines, start=1): + # Format: "CC0042. 
3 timer 1200,- 3600 / Fejlsøgning / 27.05.2025 - Kontaktnavn" + # Extract case number and title from existing description + desc_parts = line['description'].split(' - ', 1) + case_number = desc_parts[0] if desc_parts else "" + case_title = desc_parts[1] if len(desc_parts) > 1 else line['description'] + + # Build formatted description + hours = float(line['quantity']) + price = float(line['unit_price']) + total = hours * price + + # Format date (Danish format DD.MM.YYYY) + date_str = "" + if line.get('time_date'): + time_date = line['time_date'] + if isinstance(time_date, str): + from datetime import datetime + time_date = datetime.fromisoformat(time_date).date() + date_str = time_date.strftime("%d.%m.%Y") + + # Build description + contact_part = f" - {line['case_contact']}" if line.get('case_contact') else "" + travel_marker = " - (Udkørsel)" if line.get('is_travel') else "" + formatted_desc = f"{case_number}. {hours} timer {price:,.0f},- {total:,.0f} / {case_title} / {date_str}{contact_part}{travel_marker}" + economic_line = { "lineNumber": idx, "sortKey": idx, - "description": line['description'], - "quantity": float(line['quantity']), - "unitNetPrice": float(line['unit_price']), + "description": formatted_desc, + "quantity": hours, + "unitNetPrice": price, + "product": { + "productNumber": line.get('product_number') or 'TIME001' # Default til Konsulenttime + }, "unit": { - "unitNumber": 1 # Default unit (stk/pcs) + "unitNumber": 2 # timer (unit 2 in e-conomic) } } - # Add product if specified - if line.get('product_number'): - product_number = str(line['product_number'])[:25] # Max 25 chars - economic_line['product'] = { - "productNumber": product_number - } - # Add discount if present if line.get('discount_percentage'): economic_line['discountPercentage'] = float(line['discount_percentage']) @@ -316,21 +344,35 @@ class EconomicExportService: result_data = await response.json() logger.info(f"✅ e-conomic response: {json.dumps(result_data, indent=2, 
default=str)}") - economic_draft_id = result_data.get('draftOrderNumber') - economic_order_number = result_data.get('orderNumber', str(economic_draft_id)) + # e-conomic returnerer orderNumber direkte for draft orders + order_number = result_data.get('orderNumber') or result_data.get('draftOrderNumber') + economic_draft_id = int(order_number) if order_number else None + economic_order_number = str(order_number) if order_number else None - # Update order med e-conomic IDs + # Update order med e-conomic IDs og status = posted (bogført) execute_update( """UPDATE tmodule_orders SET economic_draft_id = %s, economic_order_number = %s, exported_at = CURRENT_TIMESTAMP, exported_by = %s, - status = 'exported' + status = 'posted' WHERE id = %s""", (economic_draft_id, economic_order_number, user_id, request.order_id) ) + # Marker time entries som billed + execute_update( + """UPDATE tmodule_times + SET status = 'billed' + WHERE id IN ( + SELECT UNNEST(time_entry_ids) + FROM tmodule_order_lines + WHERE order_id = %s + )""", + (request.order_id,) + ) + # Log successful export audit.log_export_completed( order_id=request.order_id, diff --git a/app/timetracking/backend/models.py b/app/timetracking/backend/models.py index b809023..a283b6a 100644 --- a/app/timetracking/backend/models.py +++ b/app/timetracking/backend/models.py @@ -100,6 +100,7 @@ class TModuleTimeUpdate(BaseModel): rounded_to: Optional[Decimal] = Field(None, ge=0.25, description="Afrundingsinterval") approval_note: Optional[str] = None billable: Optional[bool] = None + is_travel: Optional[bool] = None status: Optional[str] = Field(None, pattern="^(pending|approved|rejected|billed)$") @@ -110,6 +111,7 @@ class TModuleTimeApproval(BaseModel): rounded_to: Optional[Decimal] = Field(None, ge=0.25, description="Afrundingsinterval brugt") approval_note: Optional[str] = Field(None, description="Brugerens note") billable: bool = Field(True, description="Skal faktureres?") + is_travel: bool = Field(False, 
description="Indeholder kørsel?") @field_validator('approved_hours') @classmethod @@ -168,6 +170,9 @@ class TModuleOrderLineBase(BaseModel): case_id: Optional[int] = Field(None, gt=0) time_entry_ids: List[int] = Field(default_factory=list) product_number: Optional[str] = Field(None, max_length=50) + case_contact: Optional[str] = Field(None, max_length=255) + time_date: Optional[date] = None + is_travel: bool = False account_number: Optional[str] = Field(None, max_length=50) @@ -209,7 +214,7 @@ class TModuleOrderCreate(TModuleOrderBase): class TModuleOrderUpdate(BaseModel): """Model for updating order""" - status: Optional[str] = Field(None, pattern="^(draft|exported|sent|cancelled)$") + status: Optional[str] = Field(None, pattern="^(draft|exported|posted|sent|cancelled)$") notes: Optional[str] = None @@ -218,7 +223,7 @@ class TModuleOrder(TModuleOrderBase): id: int order_number: Optional[str] = None customer_name: Optional[str] = None # From JOIN med customers table - status: str = Field("draft", pattern="^(draft|exported|sent|cancelled)$") + status: str = Field("draft", pattern="^(draft|exported|posted|sent|cancelled)$") economic_draft_id: Optional[int] = None economic_order_number: Optional[str] = None exported_at: Optional[datetime] = None @@ -226,6 +231,7 @@ class TModuleOrder(TModuleOrderBase): created_at: datetime updated_at: Optional[datetime] = None created_by: Optional[int] = None + line_count: Optional[int] = Field(None, ge=0) class Config: from_attributes = True diff --git a/app/timetracking/backend/order_service.py b/app/timetracking/backend/order_service.py index cd9929b..a162e26 100644 --- a/app/timetracking/backend/order_service.py +++ b/app/timetracking/backend/order_service.py @@ -134,10 +134,19 @@ class OrderService: case_groups[case_id] = { 'case_vtiger_id': time_entry.get('case_vtiger_id'), 'contact_name': time_entry.get('contact_name'), + 'worked_date': time_entry.get('worked_date'), # Seneste dato + 'is_travel': False, # Marker hvis nogen 
entry er rejse 'entries': [], 'descriptions': [] # Samle alle beskrivelser } case_groups[case_id]['entries'].append(time_entry) + # Opdater til seneste dato + if time_entry.get('worked_date'): + if not case_groups[case_id]['worked_date'] or time_entry['worked_date'] > case_groups[case_id]['worked_date']: + case_groups[case_id]['worked_date'] = time_entry['worked_date'] + # Marker som rejse hvis nogen entry er rejse + if time_entry.get('is_travel'): + case_groups[case_id]['is_travel'] = True # Tilføj beskrivelse hvis den ikke er tom if time_entry.get('description') and time_entry['description'].strip(): case_groups[case_id]['descriptions'].append(time_entry['description'].strip()) @@ -195,7 +204,10 @@ class OrderService: unit_price=hourly_rate, line_total=line_total, case_id=case_id, - time_entry_ids=time_entry_ids + time_entry_ids=time_entry_ids, + case_contact=group.get('contact_name'), + time_date=group.get('worked_date'), + is_travel=group.get('is_travel', False) )) total_hours += case_hours @@ -237,8 +249,8 @@ class OrderService: line_id = execute_insert( """INSERT INTO tmodule_order_lines (order_id, case_id, line_number, description, quantity, unit_price, - line_total, time_entry_ids) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""", + line_total, time_entry_ids, case_contact, time_date, is_travel) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""", ( order_id, line.case_id, @@ -247,7 +259,10 @@ class OrderService: line.quantity, line.unit_price, line.line_total, - line.time_entry_ids + line.time_entry_ids, + line.case_contact, + line.time_date, + line.is_travel ) ) created_lines.append(line_id) @@ -298,7 +313,7 @@ class OrderService: order_query = """ SELECT o.*, c.name as customer_name FROM tmodule_orders o - LEFT JOIN customers c ON o.customer_id = c.id + LEFT JOIN tmodule_customers c ON o.customer_id = c.id WHERE o.id = %s """ order = execute_query(order_query, (order_id,), fetchone=True) @@ -356,9 +371,11 @@ class OrderService: where_clause = " WHERE " 
+ " AND ".join(conditions) if conditions else "" query = f""" - SELECT o.*, c.name as customer_name + SELECT o.*, + c.name as customer_name, + (SELECT COUNT(*) FROM tmodule_order_lines WHERE order_id = o.id) as line_count FROM tmodule_orders o - LEFT JOIN customers c ON o.customer_id = c.id + LEFT JOIN tmodule_customers c ON o.customer_id = c.id {where_clause} ORDER BY o.order_date DESC, o.id DESC LIMIT %s @@ -394,10 +411,10 @@ class OrderService: if order['status'] == 'cancelled': raise HTTPException(status_code=400, detail="Order already cancelled") - if order['status'] == 'exported': + if order['status'] in ('exported', 'posted'): raise HTTPException( status_code=400, - detail="Cannot cancel exported order" + detail="Kan ikke annullere bogført ordre. Ordren er overført til e-conomic." ) # Update status diff --git a/app/timetracking/backend/wizard.py b/app/timetracking/backend/wizard.py index bb3d05b..cb20540 100644 --- a/app/timetracking/backend/wizard.py +++ b/app/timetracking/backend/wizard.py @@ -180,6 +180,7 @@ class WizardService: rounded_to = %s, approval_note = %s, billable = %s, + is_travel = %s, approved_at = CURRENT_TIMESTAMP, approved_by = %s WHERE id = %s @@ -192,6 +193,7 @@ class WizardService: approval.rounded_to, approval.approval_note, approval.billable, + approval.is_travel, user_id, approval.time_id ) diff --git a/app/timetracking/frontend/orders.html b/app/timetracking/frontend/orders.html index 871ff8e..2f22faf 100644 --- a/app/timetracking/frontend/orders.html +++ b/app/timetracking/frontend/orders.html @@ -324,15 +324,16 @@ const tbody = document.getElementById('orders-tbody'); tbody.innerHTML = orders.map(order => { const statusBadge = getStatusBadge(order); - const exportedIcon = order.exported_to_economic - ? '' + const isPosted = order.status === 'posted'; + const economicInfo = order.economic_order_number + ? `
e-conomic #${order.economic_order_number}` : ''; return ` - + ${order.order_number} - ${exportedIcon} + ${economicInfo} ${order.customer_name} ${new Date(order.order_date).toLocaleDateString('da-DK')} @@ -344,12 +345,15 @@ onclick="event.stopPropagation(); viewOrder(${order.id})"> - ${!order.exported_to_economic ? ` + ${order.status === 'draft' ? ` ` : ''} + ${isPosted ? ` + Låst + ` : ''} `; @@ -370,13 +374,14 @@ // Get status badge function getStatusBadge(order) { - if (order.cancelled_at) { - return 'Annulleret'; - } - if (order.exported_to_economic) { - return 'Eksporteret'; - } - return 'Pending'; + const statusMap = { + 'cancelled': 'Annulleret', + 'posted': ' Bogført', + 'exported': 'Eksporteret', + 'draft': 'Kladde' + }; + + return statusMap[order.status] || 'Ukendt'; } // View order details @@ -450,8 +455,15 @@ `; }).join('')} - ${order.economic_draft_id ? ` + ${order.status === 'posted' ? `
+ + Bogført til e-conomic den ${new Date(order.exported_at).toLocaleDateString('da-DK')} +
e-conomic ordre nr.: ${order.economic_order_number} +
Ordren er låst og kan ikke ændres. +
+ ` : order.economic_draft_id ? ` +
Eksporteret til e-conomic den ${new Date(order.exported_at).toLocaleDateString('da-DK')}
Draft Order nr.: ${order.economic_draft_id} @@ -462,7 +474,12 @@ // Update export button const exportBtn = document.getElementById('export-order-btn'); - if (order.economic_draft_id) { + if (order.status === 'posted') { + exportBtn.disabled = true; + exportBtn.innerHTML = ' Bogført (Låst)'; + exportBtn.classList.remove('btn-primary'); + exportBtn.classList.add('btn-secondary'); + } else if (order.economic_draft_id) { exportBtn.disabled = false; exportBtn.innerHTML = ' Re-eksporter (force)'; exportBtn.onclick = () => { diff --git a/app/timetracking/frontend/wizard.html b/app/timetracking/frontend/wizard.html index d31f728..85c97ea 100644 --- a/app/timetracking/frontend/wizard.html +++ b/app/timetracking/frontend/wizard.html @@ -787,6 +787,15 @@ -
+ +
+
+ + +
+
@@ -1201,6 +1210,10 @@ // Get billable hours and hourly rate from calculation const billableHours = window.entryBillableHours?.[entryId] || entry.original_hours; const hourlyRate = window.entryHourlyRates?.[entryId] || entry.customer_hourly_rate || defaultHourlyRate; + + // Get travel checkbox state + const travelCheckbox = document.getElementById(`travel-${entryId}`); + const isTravel = travelCheckbox ? travelCheckbox.checked : false; try { const response = await fetch(`/api/v1/timetracking/wizard/approve/${entryId}`, { @@ -1210,7 +1223,8 @@ }, body: JSON.stringify({ billable_hours: billableHours, - hourly_rate: hourlyRate + hourly_rate: hourlyRate, + is_travel: isTravel }) }); diff --git a/main.py b/main.py index 7f5a19b..f3f26ea 100644 --- a/main.py +++ b/main.py @@ -12,6 +12,7 @@ from contextlib import asynccontextmanager from app.core.config import settings from app.core.database import init_db +from app.services.email_scheduler import email_scheduler # Import Feature Routers from app.auth.backend import router as auth_api @@ -34,6 +35,7 @@ from app.devportal.backend import router as devportal_api from app.devportal.backend import views as devportal_views from app.timetracking.backend import router as timetracking_api from app.timetracking.frontend import views as timetracking_views +from app.emails.backend import router as emails_api # Configure logging logging.basicConfig( @@ -56,10 +58,14 @@ async def lifespan(app: FastAPI): init_db() + # Start email scheduler (background job) + email_scheduler.start() + logger.info("✅ System initialized successfully") yield # Shutdown logger.info("👋 Shutting down...") + email_scheduler.stop() # Create FastAPI app app = FastAPI( @@ -106,6 +112,7 @@ app.include_router(system_api.router, prefix="/api/v1", tags=["System"]) app.include_router(dashboard_api.router, prefix="/api/v1/dashboard", tags=["Dashboard"]) app.include_router(devportal_api.router, prefix="/api/v1/devportal", tags=["DEV Portal"]) 
app.include_router(timetracking_api, prefix="/api/v1/timetracking", tags=["Time Tracking"]) +app.include_router(emails_api.router, prefix="/api/v1", tags=["Email System"]) # Frontend Routers app.include_router(auth_views.router, tags=["Frontend"]) diff --git a/migrations/013_email_system.sql b/migrations/013_email_system.sql new file mode 100644 index 0000000..1324da9 --- /dev/null +++ b/migrations/013_email_system.sql @@ -0,0 +1,244 @@ +-- Migration 013: Email System for Invoice and Time Confirmation Processing +-- Based on OmniSync email architecture adapted for BMC Hub + +-- Drop existing tables if any (clean slate) +DROP TABLE IF EXISTS email_analysis CASCADE; +DROP TABLE IF EXISTS email_attachments CASCADE; +DROP TABLE IF EXISTS email_messages CASCADE; +DROP TABLE IF EXISTS email_rules CASCADE; +DROP VIEW IF EXISTS v_unprocessed_emails; +DROP VIEW IF EXISTS v_email_activity; +DROP FUNCTION IF EXISTS update_email_messages_updated_at(); +DROP FUNCTION IF EXISTS update_email_rules_updated_at(); + +-- Email Rules Table (create first - referenced by email_messages) +CREATE TABLE email_rules ( + id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description TEXT, + + -- Rule Conditions (JSON for flexibility) + conditions JSONB NOT NULL, + + -- Rule Actions + action_type VARCHAR(50) NOT NULL, + action_params JSONB, + + -- Priority and Status + priority INTEGER DEFAULT 100, + enabled BOOLEAN DEFAULT true, + + -- Statistics + match_count INTEGER DEFAULT 0, + last_matched_at TIMESTAMP, + + -- Audit + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + created_by_user_id INTEGER, + + FOREIGN KEY (created_by_user_id) REFERENCES users(id) ON DELETE SET NULL +); + +-- Email Messages Table (main storage) +CREATE TABLE email_messages ( + id SERIAL PRIMARY KEY, + message_id VARCHAR(500) UNIQUE NOT NULL, + subject TEXT, + sender_email VARCHAR(255), + sender_name VARCHAR(255), + recipient_email VARCHAR(255), + cc TEXT, + 
body_text TEXT, + body_html TEXT, + received_date TIMESTAMP, + folder VARCHAR(100) DEFAULT 'INBOX', + + -- AI Classification + classification VARCHAR(50), + confidence_score DECIMAL(3,2), + classification_date TIMESTAMP, + + -- Rule Matching + rule_id INTEGER, + auto_processed BOOLEAN DEFAULT false, + + -- Linking + supplier_id INTEGER, + customer_id INTEGER, + linked_case_id INTEGER, + linked_time_entry_id INTEGER, + linked_purchase_id INTEGER, + + -- Metadata + has_attachments BOOLEAN DEFAULT false, + attachment_count INTEGER DEFAULT 0, + is_read BOOLEAN DEFAULT false, + status VARCHAR(50) DEFAULT 'new', + approval_status VARCHAR(50), + + -- Extraction Fields + extracted_invoice_number VARCHAR(100), + extracted_order_number VARCHAR(100), + extracted_tracking_number VARCHAR(100), + extracted_amount DECIMAL(15,2), + extracted_due_date DATE, + + -- Audit + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + processed_at TIMESTAMP, + processed_by_user_id INTEGER, + + -- Soft Delete + deleted_at TIMESTAMP, + deleted_by_user_id INTEGER +); + +-- Email Attachments Table +CREATE TABLE email_attachments ( + id SERIAL PRIMARY KEY, + email_id INTEGER NOT NULL, + filename VARCHAR(255) NOT NULL, + content_type VARCHAR(100), + size_bytes INTEGER, + file_path TEXT, -- Path in filesystem or object storage + + -- Extraction Status + extracted BOOLEAN DEFAULT false, + extraction_error TEXT, + + -- Metadata + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + + FOREIGN KEY (email_id) REFERENCES email_messages(id) ON DELETE CASCADE +); + +-- Email Analysis Cache (for AI classifications) +CREATE TABLE email_analysis ( + id SERIAL PRIMARY KEY, + email_id INTEGER NOT NULL, + analysis_type VARCHAR(50) NOT NULL, -- classification, extraction, summary + + -- AI Results + result_json JSONB, + confidence_score DECIMAL(3,2), + model_used VARCHAR(100), + + -- Performance + processing_time_ms INTEGER, + + -- Metadata + created_at TIMESTAMP DEFAULT 
CURRENT_TIMESTAMP, + + FOREIGN KEY (email_id) REFERENCES email_messages(id) ON DELETE CASCADE, + UNIQUE(email_id, analysis_type) +); + +-- Indexes for Performance +CREATE INDEX idx_email_messages_sender ON email_messages(sender_email) WHERE deleted_at IS NULL; +CREATE INDEX idx_email_messages_classification ON email_messages(classification) WHERE deleted_at IS NULL; +CREATE INDEX idx_email_messages_status ON email_messages(status) WHERE deleted_at IS NULL; +CREATE INDEX idx_email_messages_received_date ON email_messages(received_date DESC) WHERE deleted_at IS NULL; +CREATE INDEX idx_email_messages_message_id ON email_messages(message_id); +CREATE INDEX idx_email_messages_supplier ON email_messages(supplier_id) WHERE supplier_id IS NOT NULL; +CREATE INDEX idx_email_messages_customer ON email_messages(customer_id) WHERE customer_id IS NOT NULL; +CREATE INDEX idx_email_messages_linked_case ON email_messages(linked_case_id) WHERE linked_case_id IS NOT NULL; +CREATE INDEX idx_email_attachments_email_id ON email_attachments(email_id); +CREATE INDEX idx_email_analysis_email_id ON email_analysis(email_id); +CREATE INDEX idx_email_rules_priority ON email_rules(priority) WHERE enabled = true; + +-- Update Trigger for email_messages +CREATE OR REPLACE FUNCTION update_email_messages_updated_at() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = CURRENT_TIMESTAMP; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER trigger_email_messages_updated_at + BEFORE UPDATE ON email_messages + FOR EACH ROW + EXECUTE FUNCTION update_email_messages_updated_at(); + +-- Update Trigger for email_rules +CREATE OR REPLACE FUNCTION update_email_rules_updated_at() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = CURRENT_TIMESTAMP; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER trigger_email_rules_updated_at + BEFORE UPDATE ON email_rules + FOR EACH ROW + EXECUTE FUNCTION update_email_rules_updated_at(); + +-- View for unprocessed emails +CREATE OR REPLACE VIEW 
v_unprocessed_emails AS +SELECT + em.*, + COUNT(ea.id) as attachment_count_actual, + er.name as rule_name, + v.name as supplier_name, + tc.customer_name, + tcase.title as case_title +FROM email_messages em +LEFT JOIN email_attachments ea ON em.id = ea.email_id +LEFT JOIN email_rules er ON em.rule_id = er.id +LEFT JOIN vendors v ON em.supplier_id = v.id +LEFT JOIN tmodule_customers tc ON em.customer_id = tc.id +LEFT JOIN tmodule_cases tcase ON em.linked_case_id = tcase.id +WHERE em.deleted_at IS NULL + AND em.status IN ('new', 'error') +GROUP BY em.id, er.name, v.name, tc.customer_name, tcase.title +ORDER BY em.received_date DESC; + +-- View for recent email activity +CREATE OR REPLACE VIEW v_email_activity AS +SELECT + DATE(em.received_date) as activity_date, + em.classification, + COUNT(*) as email_count, + COUNT(CASE WHEN em.auto_processed THEN 1 END) as auto_processed_count, + AVG(em.confidence_score) as avg_confidence +FROM email_messages em +WHERE em.deleted_at IS NULL + AND em.received_date >= CURRENT_DATE - INTERVAL '30 days' +GROUP BY DATE(em.received_date), em.classification +ORDER BY activity_date DESC, email_count DESC; + +-- Sample email rules for common scenarios +INSERT INTO email_rules (name, description, conditions, action_type, action_params, priority, created_by_user_id) +VALUES + ('Mark Spam - Known Domains', + 'Automatically mark emails from known spam domains as spam', + '{"sender_domain": ["spamsite.com", "marketing-spam.net"], "action": "mark_spam"}', + 'mark_spam', + '{}', + 10, + 1), + + ('Link Supplier Invoices', + 'Automatically link invoices from known supplier email domains', + '{"classification": "invoice", "sender_domain_match": "supplier"}', + 'link_supplier', + '{"auto_match_domain": true}', + 50, + 1), + + ('Time Confirmation Auto-Link', + 'Link time confirmation emails to cases based on case number in subject', + '{"classification": "time_confirmation", "subject_regex": "CC[0-9]{4}"}', + 'link_case', + 
'{"extract_case_from_subject": true}', + 30, + 1) +ON CONFLICT DO NOTHING; + +COMMENT ON TABLE email_messages IS 'Main email storage with AI classification and linking'; +COMMENT ON TABLE email_attachments IS 'Email attachment metadata and file references'; +COMMENT ON TABLE email_analysis IS 'Cache for AI analysis results (classification, extraction)'; +COMMENT ON TABLE email_rules IS 'Automatic email processing rules with priority matching'; diff --git a/requirements.txt b/requirements.txt index 59aad77..e81d451 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,10 @@ jinja2==3.1.4 pyjwt==2.9.0 aiohttp==3.10.10 +# Email & Scheduling +APScheduler==3.10.4 +msal==1.31.1 + # AI & Document Processing httpx==0.27.2 PyPDF2==3.0.1