- Added FastAPI router for serving email management UI at /emails - Created Jinja2 template for the email frontend - Developed SimpleEmailClassifier for keyword-based email classification - Documented email UI implementation details, features, and API integration in EMAIL_UI_IMPLEMENTATION.md
616 lines
20 KiB
Python
616 lines
20 KiB
Python
"""
|
|
Email Management Router
|
|
API endpoints for email viewing, classification, and rule management
|
|
"""
|
|
|
|
import logging
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
from typing import List, Optional
|
|
from pydantic import BaseModel
|
|
from datetime import datetime, date
|
|
|
|
from app.core.database import execute_query, execute_insert, execute_update
|
|
from app.services.email_processor_service import EmailProcessorService
|
|
|
|
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)

# Router instance that every endpoint in this module registers on.
router = APIRouter()
|
|
|
|
|
|
# Pydantic Models
|
|
class EmailListItem(BaseModel):
    # One row of the email list response (used as response_model of
    # GET /emails).

    # --- identity / envelope ---
    id: int
    message_id: str
    subject: str
    sender_email: str
    sender_name: Optional[str]
    received_date: datetime

    # --- classification result ---
    classification: Optional[str]
    confidence_score: Optional[float]

    # --- mailbox state ---
    status: str
    is_read: bool
    has_attachments: bool
    attachment_count: int

    # --- display names; default to None when the row does not carry them ---
    rule_name: Optional[str] = None
    supplier_name: Optional[str] = None
    customer_name: Optional[str] = None
|
|
|
|
|
|
class EmailAttachment(BaseModel):
    # Metadata for a single attachment belonging to an email.

    id: int
    email_id: int  # id of the owning email

    # --- file description ---
    filename: str
    content_type: Optional[str]
    size_bytes: Optional[int]
    file_path: Optional[str]  # nullable: a record may exist without a path

    created_at: datetime
|
|
|
|
|
|
class EmailDetail(BaseModel):
    # Full email record (used as response_model of GET /emails/{email_id}),
    # including its attachments.

    # --- identity / envelope ---
    id: int
    message_id: str
    subject: str
    sender_email: str
    sender_name: Optional[str]
    recipient_email: Optional[str]
    cc: Optional[str]

    # --- content ---
    body_text: Optional[str]
    body_html: Optional[str]

    received_date: datetime
    folder: str

    # --- classification result ---
    classification: Optional[str]
    confidence_score: Optional[float]

    # --- mailbox state ---
    status: str
    is_read: bool
    has_attachments: bool
    attachment_count: int

    # --- links to other records (all nullable) ---
    rule_id: Optional[int]
    supplier_id: Optional[int]
    customer_id: Optional[int]
    linked_case_id: Optional[int]

    # --- extracted values (all nullable) ---
    extracted_invoice_number: Optional[str]
    extracted_amount: Optional[float]
    extracted_due_date: Optional[date]

    auto_processed: bool
    created_at: datetime
    updated_at: datetime

    # Defaults to an empty list when the source row has no attachments key.
    attachments: List[EmailAttachment] = []
|
|
|
|
|
|
class EmailRule(BaseModel):
    # Email rule, used both as the request body for creating/updating a rule
    # and as the response model of the rule endpoints.
    #
    # `description` and `last_matched_at` carry an explicit `None` default.
    # Without it, Pydantic v2 treats an `Optional[...]` annotation as
    # "required but nullable", which would force clients creating a rule to
    # send `last_matched_at` (a value they cannot meaningfully provide).
    # Under Pydantic v1 the explicit default is a no-op.

    id: Optional[int] = None  # absent on create
    name: str
    description: Optional[str] = None

    # Match conditions and the action to run; both dicts are serialised to
    # JSON by the endpoints before being written to the database.
    conditions: dict
    action_type: str
    action_params: Optional[dict] = {}

    priority: int = 100
    enabled: bool = True

    # Match bookkeeping; not written by the create/update endpoints.
    match_count: int = 0
    last_matched_at: Optional[datetime] = None
|
|
|
|
|
|
class ProcessingStats(BaseModel):
    # Counters describing the outcome of one processing run.
    # Every counter defaults to zero; only `status` is mandatory.

    status: str

    fetched: int = 0
    saved: int = 0
    classified: int = 0
    rules_matched: int = 0
    errors: int = 0
|
|
|
|
|
|
# Email Endpoints
|
|
@router.get("/emails", response_model=List[EmailListItem])
async def list_emails(
    status: Optional[str] = Query(None),
    classification: Optional[str] = Query(None),
    limit: int = Query(50, ge=0, le=500),
    offset: int = Query(0, ge=0)
):
    """Get list of emails with filtering.

    Returns non-deleted emails ordered by ``received_date`` descending.

    Args:
        status: When given, only emails with this exact status are returned.
        classification: When given, only emails with this exact
            classification are returned.
        limit: Page size, 0-500 (default 50). The lower bound is enforced
            here so a negative value is rejected with a 422 instead of
            reaching the database and surfacing as a 500.
        offset: Number of rows to skip (>= 0).

    Raises:
        HTTPException: 500 if the query fails.
    """
    try:
        # Filters are collected as parameterised fragments; only the fixed
        # fragments below are ever interpolated into the SQL text.
        where_clauses = ["em.deleted_at IS NULL"]
        params = []

        if status:
            where_clauses.append("em.status = %s")
            params.append(status)

        if classification:
            where_clauses.append("em.classification = %s")
            params.append(classification)

        where_sql = " AND ".join(where_clauses)

        # customer_name is selected as NULL: no customer table is joined here.
        query = f"""
            SELECT
                em.id, em.message_id, em.subject, em.sender_email, em.sender_name,
                em.received_date, em.classification, em.confidence_score, em.status,
                em.is_read, em.has_attachments, em.attachment_count,
                er.name as rule_name,
                v.name as supplier_name,
                NULL as customer_name
            FROM email_messages em
            LEFT JOIN email_rules er ON em.rule_id = er.id
            LEFT JOIN vendors v ON em.supplier_id = v.id
            WHERE {where_sql}
            ORDER BY em.received_date DESC
            LIMIT %s OFFSET %s
        """

        params.extend([limit, offset])
        result = execute_query(query, tuple(params))

        return result

    except Exception as e:
        logger.error(f"❌ Error listing emails: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/emails/{email_id}", response_model=EmailDetail)
async def get_email(email_id: int):
    """Get email detail by ID.

    Loads the non-deleted email row, attaches its attachment rows, then
    flags the email as read. The returned data is the row as it was read
    before the read flag was written.

    Raises:
        HTTPException: 404 when no matching email exists, 500 on any
            other failure.
    """
    try:
        rows = execute_query(
            """
            SELECT * FROM email_messages
            WHERE id = %s AND deleted_at IS NULL
            """,
            (email_id,),
        )
        logger.info(f"🔍 Query result type: {type(rows)}, length: {len(rows) if rows else 0}")

        if not rows:
            raise HTTPException(status_code=404, detail="Email not found")

        # Keep a handle on the row fetched before the read-flag update.
        email_row = rows[0]

        # Attachments, in insertion (id) order.
        attachment_rows = execute_query(
            "SELECT * FROM email_attachments WHERE email_id = %s ORDER BY id",
            (email_id,),
        )
        email_row['attachments'] = attachment_rows or []

        # Opening the detail view marks the email as read.
        execute_update(
            "UPDATE email_messages SET is_read = true WHERE id = %s",
            (email_id,),
        )

        return email_row

    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) through untouched.
        raise
    except Exception as e:
        logger.error(f"❌ Error getting email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/emails/{email_id}/attachments/{attachment_id}")
async def download_attachment(email_id: int, attachment_id: int):
    """Download email attachment.

    The attachment must belong to ``email_id`` and the email must not be
    soft-deleted.

    Raises:
        HTTPException: 404 when the attachment row does not exist, has no
            stored path, or the path is not a regular file on disk; 500 on
            any other failure.
    """
    from fastapi.responses import FileResponse
    import os

    try:
        query = """
            SELECT a.* FROM email_attachments a
            JOIN email_messages e ON e.id = a.email_id
            WHERE a.id = %s AND a.email_id = %s AND e.deleted_at IS NULL
        """
        result = execute_query(query, (attachment_id, email_id))

        if not result:
            raise HTTPException(status_code=404, detail="Attachment not found")

        attachment = result[0]
        file_path = attachment['file_path']

        # file_path is nullable; os.path.* would raise TypeError on None and
        # turn a missing file into a 500. isfile() also rejects directories,
        # which FileResponse cannot serve.
        if not file_path or not os.path.isfile(file_path):
            raise HTTPException(status_code=404, detail="File not found on disk")

        return FileResponse(
            path=file_path,
            filename=attachment['filename'],
            media_type=attachment.get('content_type', 'application/octet-stream')
        )

    except HTTPException:
        # Preserve the intended 404s instead of wrapping them as 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error downloading attachment {attachment_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.put("/emails/{email_id}")
async def update_email(email_id: int, status: Optional[str] = None):
    """Update email (archive, mark as read, etc).

    Currently only ``status`` can be changed. Soft-deleted emails are left
    untouched, matching the other mutating endpoints in this module.

    Args:
        email_id: ID of the email to update.
        status: New status value; required in practice, since it is the only
            updatable field.

    Raises:
        HTTPException: 400 when no updatable field was supplied, 500 on any
            other failure.
    """
    try:
        # Build update fields dynamically. Only fixed column fragments are
        # interpolated into the SQL; values go through parameters.
        updates = []
        params = []

        if status:
            updates.append("status = %s")
            params.append(status)

        if not updates:
            raise HTTPException(status_code=400, detail="No fields to update")

        params.append(email_id)
        query = (
            f"UPDATE email_messages SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP "
            "WHERE id = %s AND deleted_at IS NULL"
        )
        execute_update(query, tuple(params))

        logger.info(f"✅ Updated email {email_id}: status={status}")
        return {"success": True, "message": "Email updated"}

    except HTTPException:
        # Keep the 400 above from being rewritten as a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error updating email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.delete("/emails/{email_id}")
async def delete_email(email_id: int):
    """Soft delete email.

    Stamps ``deleted_at`` on the row; rows that are already soft-deleted
    are not matched by the statement.

    Raises:
        HTTPException: 500 if the update fails.
    """
    soft_delete_sql = """
        UPDATE email_messages
        SET deleted_at = CURRENT_TIMESTAMP
        WHERE id = %s AND deleted_at IS NULL
    """
    try:
        execute_update(soft_delete_sql, (email_id,))
    except Exception as e:
        logger.error(f"❌ Error deleting email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

    try:
        logger.info(f"🗑️ Deleted email {email_id}")
    except Exception as e:
        logger.error(f"❌ Error deleting email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

    return {"success": True, "message": "Email deleted"}
|
|
|
|
|
|
@router.post("/emails/{email_id}/reprocess")
async def reprocess_email(email_id: int):
    """Reprocess email (re-classify and apply rules).

    Loads the non-deleted email, runs it through
    ``EmailProcessorService.classify_email`` and stores the new
    classification and confidence on the row.

    Raises:
        HTTPException: 404 when the email does not exist, 500 on any other
            failure.
    """
    try:
        # Fetch the email to classify.
        rows = execute_query(
            "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL",
            (email_id,),
        )
        if not rows:
            raise HTTPException(status_code=404, detail="Email not found")

        email = rows[0]

        # Classify on the plain-text body, falling back to the HTML body.
        body = email['body_text'] or email['body_html']
        service = EmailProcessorService()
        classification, confidence = await service.classify_email(email['subject'], body)

        # Persist the fresh classification.
        execute_update(
            """
            UPDATE email_messages
            SET classification = %s,
                confidence_score = %s,
                classification_date = CURRENT_TIMESTAMP,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            """,
            (classification, confidence, email_id),
        )

        logger.info(f"🔄 Reprocessed email {email_id}: {classification} ({confidence:.2f})")

        response = {
            "success": True,
            "message": "Email reprocessed",
            "classification": classification,
            "confidence": confidence
        }
        return response

    except HTTPException:
        # Pass the 404 through unchanged.
        raise
    except Exception as e:
        logger.error(f"❌ Error reprocessing email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/emails/process")
async def process_emails():
    """Manually trigger email processing.

    Runs ``EmailProcessorService.process_inbox`` and returns whatever
    statistics it reports under the ``stats`` key.

    Raises:
        HTTPException: 500 if processing fails.
    """
    try:
        inbox_stats = await EmailProcessorService().process_inbox()
        payload = {
            "success": True,
            "message": "Email processing completed",
            "stats": inbox_stats
        }
        return payload
    except Exception as e:
        logger.error(f"❌ Email processing failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/emails/bulk/archive")
async def bulk_archive(email_ids: List[int]):
    """Bulk archive emails.

    Sets ``status = 'archived'`` on every listed email that is not
    soft-deleted.

    Args:
        email_ids: IDs of the emails to archive; must not be empty.

    Raises:
        HTTPException: 400 when ``email_ids`` is empty, 500 on any other
            failure.
    """
    try:
        if not email_ids:
            raise HTTPException(status_code=400, detail="No email IDs provided")

        # One %s per id; the ids themselves are passed as parameters.
        placeholders = ','.join(['%s'] * len(email_ids))
        query = f"""
            UPDATE email_messages
            SET status = 'archived', updated_at = CURRENT_TIMESTAMP
            WHERE id IN ({placeholders}) AND deleted_at IS NULL
        """
        execute_update(query, tuple(email_ids))

        logger.info(f"📦 Bulk archived {len(email_ids)} emails")
        return {"success": True, "message": f"{len(email_ids)} emails archived"}

    except HTTPException:
        # Without this, the 400 above is caught by the generic handler below
        # (HTTPException subclasses Exception) and returned as a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error bulk archiving: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/emails/bulk/reprocess")
async def bulk_reprocess(email_ids: List[int]):
    """Bulk reprocess emails.

    Re-classifies each listed, non-deleted email and stores the result.
    Processing is best-effort: a failure on one email is logged and the
    loop continues with the next; IDs that match no email are skipped.

    Args:
        email_ids: IDs of the emails to reprocess; must not be empty.

    Returns:
        A dict whose message reports how many emails were reprocessed.

    Raises:
        HTTPException: 400 when ``email_ids`` is empty, 500 on an
            unexpected failure outside the per-email loop.
    """
    try:
        if not email_ids:
            raise HTTPException(status_code=400, detail="No email IDs provided")

        processor = EmailProcessorService()
        success_count = 0

        for email_id in email_ids:
            try:
                # Get email
                query = "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL"
                result = execute_query(query, (email_id,))

                if result:
                    email = result[0]
                    classification, confidence = await processor.classify_email(
                        email['subject'],
                        email['body_text'] or email['body_html']
                    )

                    # updated_at is bumped as well, as the single-email
                    # reprocess endpoint does.
                    update_query = """
                        UPDATE email_messages
                        SET classification = %s, confidence_score = %s,
                            classification_date = CURRENT_TIMESTAMP,
                            updated_at = CURRENT_TIMESTAMP
                        WHERE id = %s
                    """
                    execute_update(update_query, (classification, confidence, email_id))
                    success_count += 1
            except Exception as e:
                # Deliberate best-effort: one bad email must not abort the batch.
                logger.error(f"Failed to reprocess email {email_id}: {e}")

        logger.info(f"🔄 Bulk reprocessed {success_count}/{len(email_ids)} emails")
        return {"success": True, "message": f"{success_count} emails reprocessed"}

    except HTTPException:
        # Without this, the 400 above is caught by the generic handler below
        # and returned as a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error bulk reprocessing: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/emails/bulk/delete")
async def bulk_delete(email_ids: List[int]):
    """Bulk soft delete emails.

    Stamps ``deleted_at`` on every listed email that is not already
    soft-deleted.

    Args:
        email_ids: IDs of the emails to delete; must not be empty.

    Raises:
        HTTPException: 400 when ``email_ids`` is empty, 500 on any other
            failure.
    """
    try:
        if not email_ids:
            raise HTTPException(status_code=400, detail="No email IDs provided")

        # One %s per id; the ids themselves are passed as parameters.
        placeholders = ','.join(['%s'] * len(email_ids))
        query = f"""
            UPDATE email_messages
            SET deleted_at = CURRENT_TIMESTAMP
            WHERE id IN ({placeholders}) AND deleted_at IS NULL
        """
        execute_update(query, tuple(email_ids))

        logger.info(f"🗑️ Bulk deleted {len(email_ids)} emails")
        return {"success": True, "message": f"{len(email_ids)} emails deleted"}

    except HTTPException:
        # Without this, the 400 above is caught by the generic handler below
        # (HTTPException subclasses Exception) and returned as a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error bulk deleting: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
class ClassificationUpdate(BaseModel):
    # Request body for manually setting an email's classification.

    classification: str

    # Optional; the classify endpoint substitutes 1.0 when this is omitted.
    confidence: Optional[float] = None
|
|
|
|
|
|
@router.put("/emails/{email_id}/classify")
async def update_classification(email_id: int, data: ClassificationUpdate):
    """Manually update email classification.

    Validates the requested label against the fixed list of known
    classifications and writes it, together with a confidence score, to
    the non-deleted email row. A missing confidence is stored as 1.0.

    Raises:
        HTTPException: 400 for an unknown classification, 500 on any other
            failure.
    """
    try:
        # Kept as a list: its repr is embedded in the 400 error message.
        allowed = [
            'invoice', 'freight_note', 'order_confirmation', 'time_confirmation',
            'case_notification', 'customer_email', 'bankruptcy', 'general', 'spam', 'unknown'
        ]
        label = data.classification

        if label not in allowed:
            raise HTTPException(status_code=400, detail=f"Invalid classification. Must be one of: {allowed}")

        # A manual classification without explicit confidence counts as certain.
        if data.confidence is None:
            score = 1.0
        else:
            score = data.confidence

        execute_update(
            """
            UPDATE email_messages
            SET classification = %s,
                confidence_score = %s,
                classification_date = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
            """,
            (label, score, email_id),
        )

        logger.info(f"✏️ Manual classification: Email {email_id} → {data.classification}")

        outcome = {
            "success": True,
            "message": f"Email {email_id} classified as '{data.classification}'"
        }
        return outcome

    except HTTPException:
        # Pass the 400 through unchanged.
        raise
    except Exception as e:
        logger.error(f"❌ Error updating classification: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.delete("/emails/{email_id}")
async def delete_email(email_id: int):
    """Soft delete email.

    NOTE(review): this is a second registration of
    ``DELETE /emails/{email_id}``; an earlier handler with the same path and
    function name exists in this module. Presumably the earlier registration
    is the one that serves requests; confirm and remove one of the two.

    The statement is kept in line with that earlier handler: it runs through
    ``execute_update`` (it is an UPDATE without RETURNING) and skips rows that
    are already soft-deleted, so the original deletion timestamp is never
    overwritten.

    Raises:
        HTTPException: 500 if the update fails.
    """
    try:
        query = """
            UPDATE email_messages
            SET deleted_at = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
        """
        execute_update(query, (email_id,))

        return {
            "success": True,
            "message": f"Email {email_id} deleted"
        }

    except Exception as e:
        logger.error(f"❌ Error deleting email: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# Email Rules Endpoints
|
|
@router.get("/email-rules", response_model=List[EmailRule])
async def list_rules():
    """Get all email rules.

    Rules are returned ordered by ascending priority, then by name.

    Raises:
        HTTPException: 500 if the query fails.
    """
    rules_sql = """
        SELECT * FROM email_rules
        ORDER BY priority ASC, name ASC
    """
    try:
        return execute_query(rules_sql)
    except Exception as e:
        logger.error(f"❌ Error listing rules: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/email-rules", response_model=EmailRule)
async def create_rule(rule: EmailRule):
    """Create new email rule.

    Inserts the rule and returns the stored row. ``conditions`` and
    ``action_params`` are serialised to JSON text before insertion.
    ``created_by_user_id`` is hard-coded to 1 in the statement.

    Raises:
        HTTPException: 500 when the insert returns no row or fails.
    """
    try:
        query = """
            INSERT INTO email_rules
            (name, description, conditions, action_type, action_params, priority, enabled, created_by_user_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, 1)
            RETURNING *
        """

        import json
        result = execute_query(query, (
            rule.name,
            rule.description,
            json.dumps(rule.conditions),
            rule.action_type,
            json.dumps(rule.action_params or {}),
            rule.priority,
            rule.enabled
        ))

        if result:
            logger.info(f"✅ Created email rule: {rule.name}")
            return result[0]
        else:
            raise HTTPException(status_code=500, detail="Failed to create rule")

    except HTTPException:
        # Keep the explicit error above intact; the generic handler below
        # would otherwise re-wrap it and mangle its detail text.
        raise
    except Exception as e:
        logger.error(f"❌ Error creating rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.put("/email-rules/{rule_id}", response_model=EmailRule)
async def update_rule(rule_id: int, rule: EmailRule):
    """Update existing email rule.

    Overwrites name, description, conditions, action and priority/enabled
    flags of the rule and returns the stored row. ``conditions`` and
    ``action_params`` are serialised to JSON text before being written.

    Raises:
        HTTPException: 404 when no rule with ``rule_id`` exists, 500 on any
            other failure.
    """
    try:
        import json
        query = """
            UPDATE email_rules
            SET name = %s,
                description = %s,
                conditions = %s,
                action_type = %s,
                action_params = %s,
                priority = %s,
                enabled = %s
            WHERE id = %s
            RETURNING *
        """

        result = execute_query(query, (
            rule.name,
            rule.description,
            json.dumps(rule.conditions),
            rule.action_type,
            json.dumps(rule.action_params or {}),
            rule.priority,
            rule.enabled,
            rule_id
        ))

        if result:
            logger.info(f"✅ Updated email rule {rule_id}")
            return result[0]
        else:
            raise HTTPException(status_code=404, detail="Rule not found")

    except HTTPException:
        # Without this, the 404 above is caught by the generic handler below
        # (HTTPException subclasses Exception) and returned as a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error updating rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.delete("/email-rules/{rule_id}")
async def delete_rule(rule_id: int):
    """Delete email rule.

    Issues a hard ``DELETE`` for the rule row and reports success without
    checking whether a row was actually removed.

    Raises:
        HTTPException: 500 if the statement fails.
    """
    # NOTE(review): a DELETE without RETURNING is sent through execute_query
    # here, while other write statements in this module use execute_update;
    # confirm execute_query copes with statements that return no rows.
    delete_sql = "DELETE FROM email_rules WHERE id = %s"
    try:
        execute_query(delete_sql, (rule_id,))
        confirmation = {
            "success": True,
            "message": f"Rule {rule_id} deleted"
        }
        return confirmation
    except Exception as e:
        logger.error(f"❌ Error deleting rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# Statistics Endpoint
|
|
@router.get("/emails/stats/summary")
async def get_email_stats():
    """Get email processing statistics.

    Aggregates over all non-deleted emails: total count, counts by status
    (new / processed), counts for selected classifications (invoice,
    time_confirmation, spam), the number of auto-processed emails and the
    average confidence score. Returns an empty dict when the query yields
    no row.

    Raises:
        HTTPException: 500 if the query fails.
    """
    summary_sql = """
        SELECT
            COUNT(*) as total_emails,
            COUNT(CASE WHEN status = 'new' THEN 1 END) as new_emails,
            COUNT(CASE WHEN status = 'processed' THEN 1 END) as processed_emails,
            COUNT(CASE WHEN classification = 'invoice' THEN 1 END) as invoices,
            COUNT(CASE WHEN classification = 'time_confirmation' THEN 1 END) as time_confirmations,
            COUNT(CASE WHEN classification = 'spam' THEN 1 END) as spam_emails,
            COUNT(CASE WHEN auto_processed THEN 1 END) as auto_processed,
            AVG(confidence_score) as avg_confidence
        FROM email_messages
        WHERE deleted_at IS NULL
    """
    try:
        rows = execute_query(summary_sql)
        if rows:
            return rows[0]
        return {}
    except Exception as e:
        logger.error(f"❌ Error getting stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|