# bmc_hub/app/emails/backend/router.py
"""
Email Management Router
API endpoints for email viewing, classification, and rule management
"""
import logging
from fastapi import APIRouter, HTTPException, Query, UploadFile, File
from typing import List, Optional, Dict
from pydantic import BaseModel
from datetime import datetime, date
from app.core.database import execute_query, execute_insert, execute_update
from app.services.email_processor_service import EmailProcessorService
from app.services.email_workflow_service import email_workflow_service
from app.services.ollama_service import ollama_service
logger = logging.getLogger(__name__)
router = APIRouter()


# Pydantic Models
class EmailListItem(BaseModel):
    id: int
    message_id: str
    subject: str
    sender_email: str
    sender_name: Optional[str]
    received_date: datetime
    classification: Optional[str]
    confidence_score: Optional[float]
    status: str
    is_read: bool
    has_attachments: bool
    attachment_count: int
    rule_name: Optional[str] = None
    supplier_name: Optional[str] = None
    customer_name: Optional[str] = None


class EmailAttachment(BaseModel):
    id: int
    email_id: int
    filename: str
    content_type: Optional[str]
    size_bytes: Optional[int]
    file_path: Optional[str]
    created_at: datetime


class EmailDetail(BaseModel):
    id: int
    message_id: str
    subject: str
    sender_email: str
    sender_name: Optional[str]
    recipient_email: Optional[str]
    cc: Optional[str]
    body_text: Optional[str]
    body_html: Optional[str]
    received_date: datetime
    folder: str
    classification: Optional[str]
    confidence_score: Optional[float]
    status: str
    is_read: bool
    has_attachments: bool
    attachment_count: int
    rule_id: Optional[int]
    supplier_id: Optional[int]
    customer_id: Optional[int]
    linked_case_id: Optional[int]
    extracted_invoice_number: Optional[str]
    extracted_amount: Optional[float]
    extracted_due_date: Optional[date]
    auto_processed: bool
    created_at: datetime
    updated_at: datetime
    attachments: List[EmailAttachment] = []
    customer_name: Optional[str] = None
    supplier_name: Optional[str] = None


class EmailRule(BaseModel):
    id: Optional[int] = None
    name: str
    description: Optional[str] = None
    conditions: dict
    action_type: str
    action_params: Optional[dict] = {}
    priority: int = 100
    enabled: bool = True
    match_count: int = 0
    last_matched_at: Optional[datetime] = None
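
# Illustrative rule payload for POST /email-rules. The exact condition keys and
# action codes the rule engine accepts are assumptions, not confirmed by this file:
#   {
#     "name": "Acme invoices",
#     "description": "Route invoices from acme.dk",
#     "conditions": {"sender_contains": "acme.dk", "subject_contains": "faktura"},
#     "action_type": "set_classification",
#     "action_params": {"classification": "invoice"},
#     "priority": 50,
#     "enabled": true
#   }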


class EmailWorkflow(BaseModel):
    id: Optional[int] = None
    name: str
    description: Optional[str] = None
    classification_trigger: str
    sender_pattern: Optional[str] = None
    subject_pattern: Optional[str] = None
    confidence_threshold: float = 0.70
    workflow_steps: List[dict]
    priority: int = 100
    enabled: bool = True
    stop_on_match: bool = True
    execution_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    last_executed_at: Optional[datetime] = None
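
# Illustrative workflow payload. The step shape ("action"/"params") is an
# assumption; the available action codes are listed by GET /workflow-actions:
#   {
#     "name": "Invoice intake",
#     "classification_trigger": "invoice",
#     "confidence_threshold": 0.8,
#     "workflow_steps": [
#       {"action": "extract_invoice_data", "params": {}},
#       {"action": "link_supplier", "params": {}}
#     ],
#     "priority": 10
#   }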


class WorkflowExecution(BaseModel):
    id: int
    workflow_id: int
    email_id: int
    status: str
    steps_completed: int
    steps_total: Optional[int]
    result_json: Optional[List[dict]] = None  # Can be a list of step results
    error_message: Optional[str]
    started_at: datetime
    completed_at: Optional[datetime]
    execution_time_ms: Optional[int]


class WorkflowAction(BaseModel):
    id: int
    action_code: str
    name: str
    description: Optional[str]
    category: Optional[str]
    parameter_schema: Optional[dict]
    example_config: Optional[dict]
    enabled: bool


class ProcessingStats(BaseModel):
    status: str
    fetched: int = 0
    saved: int = 0
    classified: int = 0
    rules_matched: int = 0
    errors: int = 0


# Email Endpoints
@router.get("/emails", response_model=List[EmailListItem])
async def list_emails(
    status: Optional[str] = Query(None),
    classification: Optional[str] = Query(None),
    q: Optional[str] = Query(None),
    limit: int = Query(50, le=500),
    offset: int = Query(0, ge=0)
):
    """Get a list of emails with optional filtering"""
    try:
        where_clauses = ["em.deleted_at IS NULL"]
        params = []
        if status:
            where_clauses.append("em.status = %s")
            params.append(status)
        if classification:
            where_clauses.append("em.classification = %s")
            params.append(classification)
        if q:
            where_clauses.append(
                "(em.subject ILIKE %s OR em.sender_email ILIKE %s OR em.sender_name ILIKE %s)"
            )
            search_term = f"%{q}%"
            params.extend([search_term, search_term, search_term])
        where_sql = " AND ".join(where_clauses)
        query = f"""
            SELECT
                em.id, em.message_id, em.subject, em.sender_email, em.sender_name,
                em.received_date, em.classification, em.confidence_score, em.status,
                em.is_read, em.has_attachments, em.attachment_count,
                em.body_text, em.body_html,
                er.name AS rule_name,
                v.name AS supplier_name,
                NULL AS customer_name
            FROM email_messages em
            LEFT JOIN email_rules er ON em.rule_id = er.id
            LEFT JOIN vendors v ON em.supplier_id = v.id
            WHERE {where_sql}
            ORDER BY em.received_date DESC
            LIMIT %s OFFSET %s
        """
        params.extend([limit, offset])
        result = execute_query(query, tuple(params))
        return result
    except Exception as e:
        logger.error(f"❌ Error listing emails: {e}")
        raise HTTPException(status_code=500, detail=str(e))
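
# Example request (illustrative; assumes this router is mounted under /api):
#   curl "http://localhost:8000/api/emails?status=new&q=faktura&limit=20&offset=0"
# Returns a JSON array of EmailListItem objects, newest first.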
@router.get("/emails/{email_id}", response_model=EmailDetail)
async def get_email(email_id: int):
"""Get email detail by ID"""
try:
query = """
SELECT em.*,
c.name AS customer_name,
v.name AS supplier_name
FROM email_messages em
LEFT JOIN customers c ON em.customer_id = c.id
LEFT JOIN vendors v ON em.supplier_id = v.id
WHERE em.id = %s AND em.deleted_at IS NULL
"""
result = execute_query(query, (email_id,))
logger.info(f"🔍 Query result type: {type(result)}, length: {len(result) if result else 0}")
if not result:
raise HTTPException(status_code=404, detail="Email not found")
# Store email before update
email_data = result[0]
# Get attachments
att_query = "SELECT * FROM email_attachments WHERE email_id = %s ORDER BY id"
attachments = execute_query(att_query, (email_id,))
email_data['attachments'] = attachments or []
# Mark as read
update_query = "UPDATE email_messages SET is_read = true WHERE id = %s"
execute_update(update_query, (email_id,))
return email_data
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error getting email {email_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/{email_id}/mark-processed")
async def mark_email_processed(email_id: int):
"""Mark email as processed and move to 'Processed' folder"""
try:
# Update email status and folder
update_query = """
UPDATE email_messages
SET status = 'processed',
folder = 'Processed',
processed_at = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s AND deleted_at IS NULL
RETURNING id, folder, status
"""
result = execute_query(update_query, (email_id,))
if not result:
raise HTTPException(status_code=404, detail="Email not found")
logger.info(f"✅ Email {email_id} marked as processed and moved to Processed folder")
return {
"success": True,
"email_id": result.get('id') if result else email_id,
"folder": result.get('folder') if result else 'Processed',
"status": result.get('status') if result else 'processed'
}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error marking email {email_id} as processed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/emails/{email_id}/attachments/{attachment_id}")
async def download_attachment(email_id: int, attachment_id: int):
"""Download email attachment"""
from fastapi.responses import FileResponse
import os
try:
query = """
SELECT a.* FROM email_attachments a
JOIN email_messages e ON e.id = a.email_id
WHERE a.id = %s AND a.email_id = %s AND e.deleted_at IS NULL
"""
result = execute_query(query, (attachment_id, email_id))
if not result:
raise HTTPException(status_code=404, detail="Attachment not found")
attachment = result[0]
file_path = attachment['file_path']
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="File not found on disk")
return FileResponse(
path=file_path,
filename=attachment['filename'],
media_type=attachment.get('content_type', 'application/octet-stream')
)
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error downloading attachment {attachment_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
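
# Example download (illustrative; assumes the /api prefix):
#   curl -OJ "http://localhost:8000/api/emails/42/attachments/7"
# -OJ saves the file under the name from the Content-Disposition header.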
@router.put("/emails/{email_id}")
async def update_email(email_id: int, status: Optional[str] = None):
"""Update email (archive, mark as read, etc)"""
try:
# Build update fields dynamically
updates = []
params = []
if status:
updates.append("status = %s")
params.append(status)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
params.append(email_id)
query = f"UPDATE email_messages SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
execute_update(query, tuple(params))
logger.info(f"✅ Updated email {email_id}: status={status}")
return {"success": True, "message": "Email updated"}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error updating email {email_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.patch("/emails/{email_id}/link")
async def link_email(email_id: int, payload: Dict):
"""Link email to a customer and/or vendor/supplier"""
try:
updates = []
params = []
if 'customer_id' in payload:
updates.append("customer_id = %s")
params.append(payload['customer_id'])
if 'supplier_id' in payload:
updates.append("supplier_id = %s")
params.append(payload['supplier_id'])
if not updates:
raise HTTPException(status_code=400, detail="Ingen felter at opdatere")
params.append(email_id)
query = f"UPDATE email_messages SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
execute_update(query, tuple(params))
logger.info(f"✅ Linked email {email_id}: {payload}")
return {"success": True, "message": "Email linket"}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error linking email {email_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/{email_id}/extract-vendor-suggestion")
async def extract_vendor_suggestion(email_id: int):
"""
Udtrækker leverandørinfo fra email body og vedhæftede PDF-fakturaer.
Bruger stærke regex-mønstre + AI for CVR, adresse, telefon, domæne.
"""
import re
import os
# ── Hjælpefunktioner ────────────────────────────────────────────────────
def clean_phone(raw: str) -> str:
"""Normaliser telefonnummer til +45 XXXX XXXX eller 8 cifre"""
digits = re.sub(r'[^\d+]', '', raw)
if digits.startswith('+45') and len(digits) == 11:
return digits
if digits.startswith('45') and len(digits) == 10:
return '+' + digits
bare = re.sub(r'\D', '', raw)
if len(bare) == 8:
return bare
return raw.strip()[:20]

    def extract_cvr(text: str, own_cvr: str = '') -> Optional[str]:
        patterns = [
            # With a label
            r'(?:CVR|Cvr\.?-?nr\.?|cvr|Moms(?:nr\.?|registrerings?nr\.?)|VAT\s*(?:no\.?|nr\.?|number))[:\s.\-]*(?:DK)?[\s\-]?(\d{8})',
            # DK prefix
            r'\bDK[\s\-]?(\d{8})\b',
            # Standalone 8 digits (least specific, so last)
            r'\b(\d{8})\b',
        ]
        for pat in patterns:
            for m in re.finditer(pat, text, re.IGNORECASE):
                val = m.group(1)
                if val != own_cvr and val.isdigit():
                    return val
        return None

    def extract_phones(text: str) -> Optional[str]:
        patterns = [
            # With a label
            r'(?:Tlf\.?|Tel\.?|Telefon|Phone|Mobil|Fax)[:\s.\-]*(\+?[\d][\d\s\-().]{6,18})',
            # +45 XXXXXXXX
            r'(\+45[\s\-]?\d{2}[\s\-]?\d{2}[\s\-]?\d{2}[\s\-]?\d{2})',
            # 8 digits in groups: 12 34 56 78 / 1234 5678
            r'\b(\d{2}[\s\-]\d{2}[\s\-]\d{2}[\s\-]\d{2})\b',
            r'\b(\d{4}[\s\-]\d{4})\b',
        ]
        for pat in patterns:
            m = re.search(pat, text, re.IGNORECASE)
            if m:
                return clean_phone(m.group(1))
        return None

    def extract_address(text: str) -> Optional[str]:
        # Danish postal code: 4 digits + town
        m = re.search(
            r'([A-ZÆØÅ][a-zæøåA-ZÆØÅ\-\.]+(?:\s+\d+[A-Za-z]?(?:,?\s*(?:st|tv|th|\d+\.?\s*(?:sal|etage)?))?)?,?\s*\d{4}\s+[A-ZÆØÅ][a-zæøåA-ZÆØÅ\s\-]+)',
            text
        )
        if m:
            return m.group(0).strip()
        # Fallback: street name + house number + postal code
        m = re.search(
            r'([A-ZÆØÅ][a-zæøåA-ZÆØÅ]+(?:vej|gade|alle|vænge|torv|plads|stræde|boulevard|have|bakke|skov|park|strand|mark|eng)\s*\d+[A-Za-z]?\s*,?\s*\d{4})',
            text, re.IGNORECASE
        )
        if m:
            return m.group(0).strip()
        return None

    def extract_domain(text: str, sender_email: str = '') -> Optional[str]:
        # Explicit www/https URL
        m = re.search(r'(?:www\.|https?://)([\w\-]+\.[\w\-]+(?:\.[\w]{2,6})?)', text, re.IGNORECASE)
        if m:
            return m.group(1).lower()
        # Email addresses in the text (not @bmcnetworks)
        for em in re.finditer(r'[\w.\-+]+@([\w\-]+\.[\w\-]+(?:\.[\w]{2,6})?)', text):
            dom = em.group(1).lower()
            if 'bmc' not in dom and 'gmail' not in dom and 'outlook' not in dom and 'hotmail' not in dom:
                return dom
        # Sender email
        if sender_email and '@' in sender_email:
            dom = sender_email.split('@')[1].lower()
            if 'gmail' not in dom and 'outlook' not in dom and 'hotmail' not in dom:
                return dom
        return None

    def extract_company_name(text: str, sender_name: str = '') -> Optional[str]:
        """Try to find the company name near the CVR or via typical Danish company suffixes"""
        m = re.search(
            r'\b([\w\s\-&\'\.]+(?:A/S|ApS|IVS|I/S|K/S|P/S|GmbH|Ltd\.?|LLC|AB|AS))\b',
            text
        )
        if m:
            return m.group(1).strip()
        return sender_name or None

    # ── Main logic ──────────────────────────────────────────────────────────
    try:
        email_result = execute_query(
            "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL",
            (email_id,)
        )
        if not email_result:
            raise HTTPException(status_code=404, detail="Email not found")
        email = email_result[0]
        from app.core.config import settings
        own_cvr = getattr(settings, 'OWN_CVR', '')
        # Collect text from body + PDF attachments
        text_parts = []
        if email.get('body_text'):
            text_parts.append(("body", email['body_text']))
        attachments = execute_query(
            "SELECT * FROM email_attachments WHERE email_id = %s ORDER BY id",
            (email_id,)
        )
        for att in (attachments or []):
            file_path = att.get('file_path')
            if file_path and os.path.exists(file_path):
                ct = att.get('content_type') or ''
                if 'pdf' in ct or file_path.lower().endswith('.pdf'):
                    try:
                        pdf_text = await ollama_service._extract_text_from_file(file_path)
                        if pdf_text:
                            text_parts.append(("pdf", pdf_text))
                    except Exception as e:
                        logger.warning(f"⚠️ Could not read PDF {file_path}: {e}")
        # Prioritize PDF text for supplier info (header + footer carry company info)
        # Take the first 800 chars (header) + last 800 chars (footer) of each document
        focused_parts = []
        for src, txt in text_parts:
            if len(txt) > 1200:
                focused_parts.append(f"[{src} header]\n{txt[:800]}")
                focused_parts.append(f"[{src} footer]\n{txt[-800:]}")
            else:
                focused_parts.append(f"[{src}]\n{txt}")
        focused_text = "\n\n".join(focused_parts)
        combined_text = "\n\n".join(t for _, t in text_parts)
        sender_name = email.get('sender_name') or ''
        sender_email = email.get('sender_email') or ''
        # ── Regex extraction ────────────────────────────────────────────────
        suggestion = {
            "name": extract_company_name(focused_text, sender_name) or sender_name,
            "email": sender_email,
            "cvr_number": extract_cvr(focused_text, own_cvr),
            "phone": extract_phones(focused_text),
            "address": extract_address(focused_text),
            "domain": extract_domain(focused_text, sender_email),
            "source": "regex"
        }
        logger.info(f"🔍 Regex extraction for email {email_id}: {suggestion}")
        # ── AI extraction (refines the regex result) ────────────────────────
        if focused_text.strip():
            try:
                # Send only the focused text (max 4000 chars) to the AI
                ai_text = focused_text[:4000]
                prompt = f"""You are an expert at extracting company details from Danish invoices and emails.
TASK: Find the SUPPLIER's company details in the text below.
The supplier is the SENDER - NOT BMC Networks and NOT the buyer.
RETURN ONLY THIS JSON - no explanation, no markdown:
{{
  "name": "Firmanavn ApS",
  "cvr_number": "12345678",
  "address": "Vejnavn 1, 2000 By",
  "phone": "12345678",
  "email": "kontakt@firma.dk",
  "domain": "firma.dk"
}}
RULES:
- name: company name with A/S, ApS, IVS etc. - NOT BMC Networks
- cvr_number: exactly 8 digits after "CVR", "CVR-nr", "Moms" or "DK" - IGNORE {own_cvr}
- address: full address with postal code and town (Danish format: "Vejnavn 1, 1234 By")
- phone: phone number - preferred format: "+45 XXXX XXXX" or "XXXX XXXX"
- email: the company's contact email (NOT the sender email if it is personal)
- domain: website domain, e.g. "firma.dk" or "www.firma.dk"
- Use null for fields that CANNOT be found with confidence
KNOWN REGEX RESULTS (use as hints; correct them if they are wrong):
- cvr: {suggestion.get('cvr_number') or 'not found'}
- phone: {suggestion.get('phone') or 'not found'}
- address: {suggestion.get('address') or 'not found'}
- domain: {suggestion.get('domain') or 'not found'}
TEXT:
{ai_text}
JSON:"""
                ai_result = await ollama_service.extract_from_text(prompt)
                if ai_result and isinstance(ai_result, dict):
                    improved = False
                    for field in ('name', 'cvr_number', 'address', 'phone', 'email', 'domain'):
                        val = ai_result.get(field)
                        if val and str(val).strip() not in ('null', '', 'N/A', 'None', own_cvr):
                            new_val = str(val).strip()
                            if new_val != str(suggestion.get(field) or ''):
                                suggestion[field] = new_val
                                improved = True
                    if improved:
                        suggestion['source'] = 'ai'
                        logger.info(f"✅ AI vendor suggestion for email {email_id}: {suggestion}")
            except Exception as e:
                logger.warning(f"⚠️ AI extraction failed, falling back to the regex result: {e}")
        # Cleanup: drop domains belonging to well-known mail providers
        spam_domains = {'gmail.com', 'hotmail.com', 'outlook.com', 'yahoo.com', 'live.com', 'icloud.com'}
        if suggestion.get('domain') in spam_domains:
            suggestion['domain'] = None
        # Drop own_cvr if it slipped through
        if suggestion.get('cvr_number') == own_cvr:
            suggestion['cvr_number'] = None
        return suggestion
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ extract-vendor-suggestion failed for email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
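
# Illustrative response from POST /emails/{id}/extract-vendor-suggestion
# (values are made up; "source" is "ai" when the model improved on the regex pass):
#   {
#     "name": "Eksempel ApS",
#     "email": "info@eksempel.dk",
#     "cvr_number": "12345678",
#     "phone": "+4512345678",
#     "address": "Vejnavn 1, 1234 By",
#     "domain": "eksempel.dk",
#     "source": "ai"
#   }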
@router.delete("/emails/{email_id}")
async def delete_email(email_id: int):
"""Soft delete email"""
try:
query = """
UPDATE email_messages
SET deleted_at = CURRENT_TIMESTAMP
WHERE id = %s AND deleted_at IS NULL
"""
execute_update(query, (email_id,))
logger.info(f"🗑️ Deleted email {email_id}")
return {"success": True, "message": "Email deleted"}
except Exception as e:
logger.error(f"❌ Error deleting email {email_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/{email_id}/reprocess")
async def reprocess_email(email_id: int):
"""Reprocess email (re-classify, run workflows, and apply rules)"""
try:
# Get email
query = "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL"
result = execute_query(query, (email_id,))
if not result:
raise HTTPException(status_code=404, detail="Email not found")
email = result[0]
# Re-classify and run full processing pipeline
processor = EmailProcessorService()
processing_result = await processor.process_single_email(email)
# Re-fetch updated email
result = execute_query(query, (email_id,))
email = result[0]
logger.info(f"🔄 Reprocessed email {email_id}: {email['classification']} ({email.get('confidence_score', 0):.2f})")
return {
"success": True,
"message": "Email reprocessed with workflows",
"classification": email['classification'],
"confidence": email.get('confidence_score', 0),
"workflows_executed": processing_result.get('workflows_executed', 0)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error reprocessing email {email_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/process")
async def process_emails():
"""Manually trigger email processing"""
try:
processor = EmailProcessorService()
stats = await processor.process_inbox()
return {
"success": True,
"message": "Email processing completed",
"stats": stats
}
except Exception as e:
logger.error(f"❌ Email processing failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/upload")
async def upload_emails(files: List[UploadFile] = File(...)):
"""
Upload email files (.eml or .msg) via drag-and-drop
Supports multiple files at once
"""
from app.services.email_service import EmailService
from app.services.email_processor_service import EmailProcessorService
from app.services.email_workflow_service import email_workflow_service
from app.services.email_activity_logger import EmailActivityLogger
from app.core.config import settings
email_service = EmailService()
processor = EmailProcessorService()
activity_logger = EmailActivityLogger()
results = []
max_size = settings.EMAIL_MAX_UPLOAD_SIZE_MB * 1024 * 1024 # Convert MB to bytes
logger.info(f"📤 Upload started: {len(files)} file(s)")
for file in files:
try:
logger.info(f"📄 Processing file: {file.filename}")
# Validate file type
if not file.filename.lower().endswith(('.eml', '.msg')):
logger.warning(f"⚠️ Skipped non-email file: {file.filename}")
results.append({
"filename": file.filename,
"status": "skipped",
"message": "Only .eml and .msg files are supported"
})
continue
# Read file content
content = await file.read()
logger.info(f"📊 File size: {len(content)} bytes")
# Check file size
if len(content) > max_size:
logger.warning(f"⚠️ File too large: {file.filename}")
results.append({
"filename": file.filename,
"status": "error",
"message": f"File too large (max {settings.EMAIL_MAX_UPLOAD_SIZE_MB}MB)"
})
continue
# Parse email based on file type
if file.filename.lower().endswith('.eml'):
logger.info(f"📧 Parsing .eml file: {file.filename}")
email_data = email_service.parse_eml_file(content)
else: # .msg
logger.info(f"📧 Parsing .msg file: {file.filename}")
email_data = email_service.parse_msg_file(content)
if not email_data:
logger.error(f"❌ Failed to parse: {file.filename}")
results.append({
"filename": file.filename,
"status": "error",
"message": "Failed to parse email file"
})
continue
logger.info(f"✅ Parsed: {email_data.get('subject', 'No Subject')[:50]}")
# Save to database
email_id = await email_service.save_uploaded_email(email_data)
if email_id is None:
logger.info(f"⏭️ Duplicate email: {file.filename}")
results.append({
"filename": file.filename,
"status": "duplicate",
"message": "Email already exists in system"
})
continue
logger.info(f"💾 Saved to database with ID: {email_id}")
# Log activity
activity_logger.log_fetched(
email_id=email_id,
source="manual_upload",
metadata={"filename": file.filename}
)
# Auto-classify
classification = None
confidence = None
try:
logger.info(f"🤖 Classifying email {email_id}...")
classification, confidence = await processor.classify_email(
email_data['subject'],
email_data['body_text'] or email_data['body_html']
)
logger.info(f"✅ Classified as: {classification} ({confidence:.2f})")
# Update classification
update_query = """
UPDATE email_messages
SET classification = %s, confidence_score = %s,
classification_date = CURRENT_TIMESTAMP
WHERE id = %s
"""
execute_update(update_query, (classification, confidence, email_id))
activity_logger.log_classified(
email_id=email_id,
classification=classification,
confidence=confidence,
metadata={"method": "auto", "source": "manual_upload"}
)
except Exception as e:
logger.warning(f"⚠️ Classification failed for uploaded email: {e}")
# Execute workflows
try:
logger.info(f"⚙️ Executing workflows for email {email_id}...")
await email_workflow_service.execute_workflows_for_email(email_id)
except Exception as e:
logger.warning(f"⚠️ Workflow execution failed for uploaded email: {e}")
results.append({
"filename": file.filename,
"status": "success",
"message": "Email imported successfully",
"email_id": email_id,
"subject": email_data['subject'],
"classification": classification,
"confidence": confidence,
"attachments": len(email_data.get('attachments', []))
})
logger.info(f"✅ Successfully processed: {file.filename} -> Email ID {email_id}")
except Exception as e:
logger.error(f"❌ Failed to process {file.filename}: {e}", exc_info=True)
results.append({
"filename": file.filename,
"status": "error",
"message": str(e)
})
# Summary
success_count = len([r for r in results if r["status"] == "success"])
duplicate_count = len([r for r in results if r["status"] == "duplicate"])
error_count = len([r for r in results if r["status"] == "error"])
skipped_count = len([r for r in results if r["status"] == "skipped"])
logger.info(f"📊 Upload summary: {success_count} success, {duplicate_count} duplicates, {error_count} errors, {skipped_count} skipped")
return {
"uploaded": success_count,
"duplicates": duplicate_count,
"failed": error_count,
"skipped": skipped_count,
"results": results
}
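
# Example upload (illustrative; assumes the /api prefix):
#   curl -F "files=@invoice.eml" -F "files=@order.msg" \
#        "http://localhost:8000/api/emails/upload"
# The response summarizes per-file status: success, duplicate, error, or skipped.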
@router.get("/emails/processing/stats")
async def get_processing_stats():
"""Get email processing statistics"""
try:
query = """
SELECT
COUNT(*) as total_emails,
COUNT(*) FILTER (WHERE status = 'new') as new_emails,
COUNT(*) FILTER (WHERE status = 'processed') as processed_emails,
COUNT(*) FILTER (WHERE status = 'error') as error_emails,
COUNT(*) FILTER (WHERE has_attachments = true) as with_attachments,
COUNT(*) FILTER (WHERE import_method = 'manual_upload') as manually_uploaded,
COUNT(*) FILTER (WHERE import_method = 'imap') as from_imap,
COUNT(*) FILTER (WHERE import_method = 'graph_api') as from_graph_api,
MAX(received_date) as last_email_received
FROM email_messages
WHERE deleted_at IS NULL
AND received_date >= NOW() - INTERVAL '30 days'
"""
result = execute_query(query)
if result:
return result[0]
else:
return {
"total_emails": 0,
"new_emails": 0,
"processed_emails": 0,
"error_emails": 0,
"with_attachments": 0,
"manually_uploaded": 0,
"from_imap": 0,
"from_graph_api": 0,
"last_email_received": None
}
except Exception as e:
logger.error(f"❌ Error getting processing stats: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/bulk/archive")
async def bulk_archive(email_ids: List[int]):
"""Bulk archive emails"""
try:
if not email_ids:
raise HTTPException(status_code=400, detail="No email IDs provided")
placeholders = ','.join(['%s'] * len(email_ids))
query = f"""
UPDATE email_messages
SET status = 'archived', updated_at = CURRENT_TIMESTAMP
WHERE id IN ({placeholders}) AND deleted_at IS NULL
"""
execute_update(query, tuple(email_ids))
logger.info(f"📦 Bulk archived {len(email_ids)} emails")
return {"success": True, "message": f"{len(email_ids)} emails archived"}
except Exception as e:
logger.error(f"❌ Error bulk archiving: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/bulk/reprocess")
async def bulk_reprocess(email_ids: List[int]):
"""Bulk reprocess emails"""
try:
if not email_ids:
raise HTTPException(status_code=400, detail="No email IDs provided")
processor = EmailProcessorService()
success_count = 0
for email_id in email_ids:
try:
# Get email
query = "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL"
result = execute_query(query, (email_id,))
if result:
email_data = result[0]
# Use central processing logic
await processor.process_single_email(email_data)
success_count += 1
except Exception as e:
logger.error(f"Error reprocessing email {email_id}: {e}")
return {"success": True, "count": success_count}
except Exception as e:
logger.error(f"❌ Error bulk reprocessing: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/bulk/delete")
async def bulk_delete(email_ids: List[int]):
"""Bulk soft delete emails"""
try:
if not email_ids:
raise HTTPException(status_code=400, detail="No email IDs provided")
placeholders = ','.join(['%s'] * len(email_ids))
query = f"""
UPDATE email_messages
SET deleted_at = CURRENT_TIMESTAMP
WHERE id IN ({placeholders}) AND deleted_at IS NULL
"""
execute_update(query, tuple(email_ids))
logger.info(f"🗑️ Bulk deleted {len(email_ids)} emails")
return {"success": True, "message": f"{len(email_ids)} emails deleted"}
except Exception as e:
logger.error(f"❌ Error bulk deleting: {e}")
raise HTTPException(status_code=500, detail=str(e))
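
# The three bulk endpoints all take a bare JSON array of email IDs, e.g.
# (illustrative; assumes the /api prefix):
#   curl -X POST -H "Content-Type: application/json" -d "[1, 2, 3]" \
#        "http://localhost:8000/api/emails/bulk/archive"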


class ClassificationUpdate(BaseModel):
    classification: str
    confidence: Optional[float] = None


@router.put("/emails/{email_id}/classify")
async def update_classification(email_id: int, data: ClassificationUpdate):
    """Manually update email classification"""
    try:
        valid_classifications = [
            'invoice', 'freight_note', 'order_confirmation', 'time_confirmation',
            'case_notification', 'customer_email', 'bankruptcy', 'general', 'spam', 'unknown'
        ]
        if data.classification not in valid_classifications:
            raise HTTPException(status_code=400, detail=f"Invalid classification. Must be one of: {valid_classifications}")
        confidence = data.confidence if data.confidence is not None else 1.0
        query = """
            UPDATE email_messages
            SET classification = %s,
                confidence_score = %s,
                classification_date = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
        """
        execute_update(query, (data.classification, confidence, email_id))
        logger.info(f"✏️ Manual classification: Email {email_id} -> {data.classification}")
        return {
            "success": True,
            "message": f"Email {email_id} classified as '{data.classification}'"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating classification: {e}")
        raise HTTPException(status_code=500, detail=str(e))
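
# Example manual classification (illustrative; assumes the /api prefix):
#   curl -X PUT -H "Content-Type: application/json" \
#        -d '{"classification": "invoice", "confidence": 1.0}' \
#        "http://localhost:8000/api/emails/42/classify"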
@router.delete("/emails/{email_id}")
async def delete_email(email_id: int):
"""Soft delete email"""
try:
query = """
UPDATE email_messages
SET deleted_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
execute_query(query, (email_id,))
return {
"success": True,
"message": f"Email {email_id} deleted"
}
except Exception as e:
logger.error(f"❌ Error deleting email: {e}")
raise HTTPException(status_code=500, detail=str(e))


# Email Rules Endpoints
@router.get("/email-rules", response_model=List[EmailRule])
async def list_rules():
    """Get all email rules"""
    try:
        query = """
            SELECT * FROM email_rules
            ORDER BY priority ASC, name ASC
        """
        result = execute_query(query)
        return result
    except Exception as e:
        logger.error(f"❌ Error listing rules: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/email-rules", response_model=EmailRule)
async def create_rule(rule: EmailRule):
"""Create new email rule"""
try:
query = """
INSERT INTO email_rules
(name, description, conditions, action_type, action_params, priority, enabled, created_by_user_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, 1)
RETURNING *
"""
import json
result = execute_query(query, (
rule.name,
rule.description,
json.dumps(rule.conditions),
rule.action_type,
json.dumps(rule.action_params or {}),
rule.priority,
rule.enabled
))
if result:
logger.info(f"✅ Created email rule: {rule.name}")
return result[0]
else:
raise HTTPException(status_code=500, detail="Failed to create rule")
except Exception as e:
logger.error(f"❌ Error creating rule: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.put("/email-rules/{rule_id}", response_model=EmailRule)
async def update_rule(rule_id: int, rule: EmailRule):
"""Update existing email rule"""
try:
import json
query = """
UPDATE email_rules
SET name = %s,
description = %s,
conditions = %s,
action_type = %s,
action_params = %s,
priority = %s,
enabled = %s
WHERE id = %s
RETURNING *
"""
result = execute_query(query, (
rule.name,
rule.description,
json.dumps(rule.conditions),
rule.action_type,
json.dumps(rule.action_params or {}),
rule.priority,
rule.enabled,
rule_id
))
if result:
logger.info(f"✅ Updated email rule {rule_id}")
return result[0]
else:
raise HTTPException(status_code=404, detail="Rule not found")
except Exception as e:
logger.error(f"❌ Error updating rule: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/email-rules/{rule_id}")
async def delete_rule(rule_id: int):
"""Delete email rule"""
try:
query = "DELETE FROM email_rules WHERE id = %s"
execute_query(query, (rule_id,))
return {
"success": True,
"message": f"Rule {rule_id} deleted"
}
except Exception as e:
logger.error(f"❌ Error deleting rule: {e}")
raise HTTPException(status_code=500, detail=str(e))


# Statistics Endpoint
@router.get("/emails/stats/summary")
async def get_email_stats():
    """Get email processing statistics"""
    try:
        query = """
            SELECT
                COUNT(*) as total_emails,
                COUNT(CASE WHEN status = 'new' THEN 1 END) as new_emails,
                COUNT(CASE WHEN status = 'processed' THEN 1 END) as processed_emails,
                COUNT(CASE WHEN classification = 'invoice' THEN 1 END) as invoices,
                COUNT(CASE WHEN classification = 'time_confirmation' THEN 1 END) as time_confirmations,
                COUNT(CASE WHEN classification = 'newsletter' THEN 1 END) as newsletters,
                COUNT(CASE WHEN classification = 'spam' THEN 1 END) as spam_emails,
                COUNT(CASE WHEN auto_processed THEN 1 END) as auto_processed,
                AVG(confidence_score) as avg_confidence
            FROM email_messages
            WHERE deleted_at IS NULL
        """
        result = execute_query(query)
        return result[0] if result else {}
    except Exception as e:
        logger.error(f"❌ Error getting stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))


# ========== Workflow Endpoints ==========
@router.get("/workflows", response_model=List[EmailWorkflow])
async def list_workflows():
    """Get all email workflows"""
    try:
        query = """
            SELECT * FROM email_workflows
            ORDER BY priority ASC, name ASC
        """
        result = execute_query(query)
        return result
    except Exception as e:
        logger.error(f"❌ Error listing workflows: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/workflows/{workflow_id}", response_model=EmailWorkflow)
async def get_workflow(workflow_id: int):
"""Get specific workflow by ID"""
try:
query = "SELECT * FROM email_workflows WHERE id = %s"
result = execute_query(query, (workflow_id,))
if not result:
raise HTTPException(status_code=404, detail="Workflow not found")
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error getting workflow: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/workflows", response_model=EmailWorkflow)
async def create_workflow(workflow: EmailWorkflow):
"""Create new email workflow"""
try:
import json
query = """
INSERT INTO email_workflows
(name, description, classification_trigger, sender_pattern, subject_pattern,
confidence_threshold, workflow_steps, priority, enabled, stop_on_match, created_by_user_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 1)
RETURNING *
"""
result = execute_query(query, (
workflow.name,
workflow.description,
workflow.classification_trigger,
workflow.sender_pattern,
workflow.subject_pattern,
workflow.confidence_threshold,
json.dumps(workflow.workflow_steps),
workflow.priority,
workflow.enabled,
workflow.stop_on_match
))
if result:
logger.info(f"✅ Created workflow: {workflow.name}")
return result
else:
raise HTTPException(status_code=500, detail="Failed to create workflow")
except Exception as e:
logger.error(f"❌ Error creating workflow: {e}")
raise HTTPException(status_code=500, detail=str(e))
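
# Example workflow creation (illustrative; assumes the /api prefix and that the
# step action codes exist in email_workflow_actions):
#   curl -X POST -H "Content-Type: application/json" \
#        -d '{"name": "Invoice intake", "classification_trigger": "invoice",
#             "workflow_steps": [{"action": "extract_invoice_data"}]}' \
#        "http://localhost:8000/api/workflows"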
@router.put("/workflows/{workflow_id}", response_model=EmailWorkflow)
async def update_workflow(workflow_id: int, workflow: EmailWorkflow):
"""Update existing email workflow"""
try:
import json
query = """
UPDATE email_workflows
SET name = %s,
description = %s,
classification_trigger = %s,
sender_pattern = %s,
subject_pattern = %s,
confidence_threshold = %s,
workflow_steps = %s,
priority = %s,
enabled = %s,
stop_on_match = %s
WHERE id = %s
RETURNING *
"""
result = execute_query(query, (
workflow.name,
workflow.description,
workflow.classification_trigger,
workflow.sender_pattern,
workflow.subject_pattern,
workflow.confidence_threshold,
json.dumps(workflow.workflow_steps),
workflow.priority,
workflow.enabled,
workflow.stop_on_match,
workflow_id
))
if result:
logger.info(f"✅ Updated workflow {workflow_id}")
return result
else:
raise HTTPException(status_code=404, detail="Workflow not found")
except Exception as e:
logger.error(f"❌ Error updating workflow: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/workflows/{workflow_id}")
async def delete_workflow(workflow_id: int):
"""Delete email workflow"""
try:
query = "DELETE FROM email_workflows WHERE id = %s"
execute_update(query, (workflow_id,))
logger.info(f"🗑️ Deleted workflow {workflow_id}")
return {"success": True, "message": f"Workflow {workflow_id} deleted"}
except Exception as e:
logger.error(f"❌ Error deleting workflow: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/workflows/{workflow_id}/toggle")
async def toggle_workflow(workflow_id: int):
"""Toggle workflow enabled status"""
try:
query = """
UPDATE email_workflows
SET enabled = NOT enabled
WHERE id = %s
RETURNING enabled
"""
result = execute_query(query, (workflow_id,))
if not result:
raise HTTPException(status_code=404, detail="Workflow not found")
status = "enabled" if result['enabled'] else "disabled"
logger.info(f"🔄 Workflow {workflow_id} {status}")
return {
"success": True,
"workflow_id": workflow_id,
"enabled": result['enabled']
}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error toggling workflow: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/{email_id}/execute-workflows")
async def execute_workflows_for_email(email_id: int):
"""Manually trigger workflow execution for an email"""
try:
# Get email data
query = """
SELECT id, message_id, subject, sender_email, sender_name, body_text,
classification, confidence_score, status
FROM email_messages
WHERE id = %s AND deleted_at IS NULL
"""
email_result = execute_query(query, (email_id,))
if not email_result:
raise HTTPException(status_code=404, detail="Email not found")
email_data = email_result[0] # Get first row as dict
# Execute workflows
result = await email_workflow_service.execute_workflows(email_data)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error executing workflows: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/workflow-executions", response_model=List[WorkflowExecution])
async def list_workflow_executions(
workflow_id: Optional[int] = Query(None),
email_id: Optional[int] = Query(None),
status: Optional[str] = Query(None),
limit: int = Query(50, le=500)
):
"""Get workflow execution history"""
try:
where_clauses = []
params = []
if workflow_id:
where_clauses.append("workflow_id = %s")
params.append(workflow_id)
if email_id:
where_clauses.append("email_id = %s")
params.append(email_id)
if status:
where_clauses.append("status = %s")
params.append(status)
where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
query = f"""
SELECT * FROM email_workflow_executions
WHERE {where_sql}
ORDER BY started_at DESC
LIMIT %s
"""
params.append(limit)
result = execute_query(query, tuple(params))
return result
except Exception as e:
logger.error(f"❌ Error listing workflow executions: {e}")
raise HTTPException(status_code=500, detail=str(e))
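
# Example execution-history query (illustrative; assumes the /api prefix):
#   curl "http://localhost:8000/api/workflow-executions?workflow_id=3&status=failed&limit=20"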
@router.get("/workflow-actions", response_model=List[WorkflowAction])
async def list_workflow_actions():
"""Get all available workflow actions"""
try:
query = """
SELECT * FROM email_workflow_actions
WHERE enabled = true
ORDER BY category, name
"""
result = execute_query(query)
return result
except Exception as e:
logger.error(f"❌ Error listing workflow actions: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/workflows/stats/summary")
async def get_workflow_stats():
"""Get workflow execution statistics"""
try:
query = """
SELECT * FROM v_workflow_stats
"""
result = execute_query(query)
return result
except Exception as e:
logger.error(f"❌ Error getting workflow stats: {e}")
raise HTTPException(status_code=500, detail=str(e))


# ========== Email Activity Log Endpoints ==========
class EmailActivityLog(BaseModel):
    id: int
    email_id: int
    event_type: str
    event_category: str
    description: str
    metadata: Optional[dict]
    user_id: Optional[int]
    user_name: Optional[str]
    created_at: datetime
    created_by: str
@router.get("/emails/{email_id}/activity", response_model=List[EmailActivityLog])
async def get_email_activity_log(email_id: int, limit: int = Query(default=100, le=500)):
"""Get complete activity log for an email"""
try:
query = """
SELECT
eal.id,
eal.email_id,
eal.event_type,
eal.event_category,
eal.description,
eal.metadata,
eal.user_id,
u.username as user_name,
eal.created_at,
eal.created_by
FROM email_activity_log eal
LEFT JOIN users u ON eal.user_id = u.user_id
WHERE eal.email_id = %s
ORDER BY eal.created_at DESC
LIMIT %s
"""
result = execute_query(query, (email_id, limit))
return result
except Exception as e:
logger.error(f"❌ Error getting email activity log: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/emails/activity/recent", response_model=List[EmailActivityLog])
async def get_recent_activity(
limit: int = Query(default=50, le=200),
event_type: Optional[str] = None,
event_category: Optional[str] = None
):
"""Get recent email activity across all emails"""
try:
conditions = []
params = []
if event_type:
conditions.append("eal.event_type = %s")
params.append(event_type)
if event_category:
conditions.append("eal.event_category = %s")
params.append(event_category)
where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
params.append(limit)
query = f"""
SELECT
eal.id,
eal.email_id,
eal.event_type,
eal.event_category,
eal.description,
eal.metadata,
eal.user_id,
u.username as user_name,
eal.created_at,
eal.created_by
FROM email_activity_log eal
LEFT JOIN users u ON eal.user_id = u.user_id
{where_clause}
ORDER BY eal.created_at DESC
LIMIT %s
"""
result = execute_query(query, tuple(params))
return result
except Exception as e:
logger.error(f"❌ Error getting recent activity: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/emails/activity/stats")
async def get_activity_stats():
"""Get activity statistics"""
try:
query = """
SELECT
event_type,
event_category,
COUNT(*) as count,
MAX(created_at) as last_occurrence
FROM email_activity_log
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY event_type, event_category
ORDER BY count DESC
"""
result = execute_query(query)
return result
except Exception as e:
logger.error(f"❌ Error getting activity stats: {e}")
raise HTTPException(status_code=500, detail=str(e))