bmc_hub/app/emails/backend/router.py

616 lines
20 KiB
Python
Raw Normal View History

"""
Email Management Router
API endpoints for email viewing, classification, and rule management
"""
import logging
from fastapi import APIRouter, HTTPException, Query
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime, date
from app.core.database import execute_query, execute_insert, execute_update
from app.services.email_processor_service import EmailProcessorService
# Module-level logger, named after this module's dotted path.
logger = logging.getLogger(__name__)
# Router holding the email and email-rule endpoints defined below.
# NOTE(review): presumably included into the FastAPI app (possibly under a
# URL prefix) elsewhere — confirm against the app setup.
router = APIRouter()
# Pydantic Models
class EmailListItem(BaseModel):
    """Summary row returned by ``GET /emails``.

    Fields mirror the columns selected by ``list_emails``. ``rule_name`` and
    ``supplier_name`` come from LEFT JOINs and may be None. ``customer_name``
    is currently always None because the query selects ``NULL`` for it.

    NOTE(review): ``Optional[...]`` fields without a default are
    required-but-nullable under Pydantic v2 (optional under v1). That is fine
    here because the query always selects them.
    """
    id: int
    message_id: str
    subject: str
    sender_email: str
    sender_name: Optional[str]
    received_date: datetime
    classification: Optional[str]
    confidence_score: Optional[float]
    status: str
    is_read: bool
    has_attachments: bool
    attachment_count: int
    rule_name: Optional[str] = None  # email_rules.name of the matched rule, if any
    supplier_name: Optional[str] = None  # vendors.name joined via supplier_id
    customer_name: Optional[str] = None  # not populated by list_emails (selected as NULL)
class EmailAttachment(BaseModel):
    """Attachment metadata row from the ``email_attachments`` table.

    Embedded in ``EmailDetail.attachments``. The stored file at ``file_path``
    is what the attachment download endpoint serves.
    """
    id: int
    email_id: int  # id of the owning email_messages row
    filename: str  # name offered to the client when downloading
    content_type: Optional[str]  # used as the download's media type when set
    size_bytes: Optional[int]
    file_path: Optional[str]  # location of the stored file on the server; may be NULL
    created_at: datetime
class EmailDetail(BaseModel):
    """Full email record returned by ``GET /emails/{email_id}``.

    Built from a ``SELECT *`` on ``email_messages`` plus an ``attachments``
    list that the detail endpoint loads separately from ``email_attachments``.

    NOTE(review): ``Optional[...]`` fields without a default are
    required-but-nullable under Pydantic v2. They are presumably always
    present because the row comes from ``SELECT *`` — confirm the table
    has every column declared here.
    """
    id: int
    message_id: str
    subject: str
    sender_email: str
    sender_name: Optional[str]
    recipient_email: Optional[str]
    cc: Optional[str]
    body_text: Optional[str]
    body_html: Optional[str]
    received_date: datetime
    folder: str
    classification: Optional[str]
    confidence_score: Optional[float]
    status: str
    is_read: bool
    has_attachments: bool
    attachment_count: int
    rule_id: Optional[int]  # email_rules.id of the matched rule, if any
    supplier_id: Optional[int]  # vendors.id, if linked
    customer_id: Optional[int]
    linked_case_id: Optional[int]
    extracted_invoice_number: Optional[str]
    extracted_amount: Optional[float]
    extracted_due_date: Optional[date]
    auto_processed: bool
    created_at: datetime
    updated_at: datetime
    # Pydantic copies this default per instance, so models never share the list.
    attachments: List[EmailAttachment] = []
class EmailRule(BaseModel):
    """Email rule definition.

    Serves as both the request body and the response model of the
    ``/email-rules`` endpoints.

    ``description`` and ``last_matched_at`` default to None. Under Pydantic v2,
    an ``Optional[...]`` field without a default is *required*, which would
    force clients creating or updating a rule to send ``last_matched_at``
    explicitly. Under v1 this change is a no-op.
    """
    id: Optional[int] = None  # assigned by the database on insert (RETURNING *)
    name: str
    description: Optional[str] = None
    conditions: dict  # persisted as JSON text via json.dumps
    action_type: str
    action_params: Optional[dict] = {}  # Pydantic copies this default per instance
    priority: int = 100  # rules are listed in ascending priority order
    enabled: bool = True
    match_count: int = 0
    last_matched_at: Optional[datetime] = None
class ProcessingStats(BaseModel):
    """Presumably the counters reported by one inbox-processing run.

    NOTE(review): no endpoint in this module uses this model.
    ``process_emails`` returns the processor's stats untyped. Confirm it
    matches what ``EmailProcessorService.process_inbox`` actually returns.
    """
    status: str
    fetched: int = 0
    saved: int = 0
    classified: int = 0
    rules_matched: int = 0
    errors: int = 0
# Email Endpoints
@router.get("/emails", response_model=List[EmailListItem])
async def list_emails(
    status: Optional[str] = Query(None),
    classification: Optional[str] = Query(None),
    limit: int = Query(50, ge=0, le=500),
    offset: int = Query(0, ge=0)
):
    """Get list of emails with filtering.

    Returns non-deleted emails, newest first. Results can be filtered by an
    exact ``status`` and/or ``classification`` match and are paginated with
    ``limit``/``offset``.

    ``limit`` is bounded to 0..500. A negative value is now rejected with a
    422 instead of being sent to the database as an invalid LIMIT, which
    surfaced as a 500.

    Raises:
        HTTPException: 500 if the database query fails.
    """
    try:
        # WHERE fragments are fixed strings; caller-supplied values are bound via params.
        where_clauses = ["em.deleted_at IS NULL"]
        params = []
        if status:
            where_clauses.append("em.status = %s")
            params.append(status)
        if classification:
            where_clauses.append("em.classification = %s")
            params.append(classification)
        where_sql = " AND ".join(where_clauses)
        # customer_name is always NULL here: no customer table is joined.
        query = f"""
SELECT
em.id, em.message_id, em.subject, em.sender_email, em.sender_name,
em.received_date, em.classification, em.confidence_score, em.status,
em.is_read, em.has_attachments, em.attachment_count,
er.name as rule_name,
v.name as supplier_name,
NULL as customer_name
FROM email_messages em
LEFT JOIN email_rules er ON em.rule_id = er.id
LEFT JOIN vendors v ON em.supplier_id = v.id
WHERE {where_sql}
ORDER BY em.received_date DESC
LIMIT %s OFFSET %s
"""
        params.extend([limit, offset])
        result = execute_query(query, tuple(params))
        return result
    except Exception as e:
        logger.error(f"❌ Error listing emails: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/emails/{email_id}", response_model=EmailDetail)
async def get_email(email_id: int):
    """Get email detail by ID and mark the email as read.

    Returns the ``email_messages`` row plus an ``attachments`` list loaded
    from ``email_attachments`` (ordered by id). As a side effect, the email
    is flagged ``is_read = true`` if it was not already.

    NOTE: the returned ``is_read`` is the value read *before* this request
    marked the email, so the first view still reports it as unread.

    Raises:
        HTTPException: 404 if the email does not exist or is soft-deleted;
            500 on any other failure.
    """
    try:
        query = """
SELECT * FROM email_messages
WHERE id = %s AND deleted_at IS NULL
"""
        result = execute_query(query, (email_id,))
        # Diagnostic only: logged at DEBUG so normal traffic doesn't flood INFO logs.
        logger.debug(f"🔍 Query result type: {type(result)}, length: {len(result) if result else 0}")
        if not result:
            raise HTTPException(status_code=404, detail="Email not found")
        email_data = result[0]

        att_query = "SELECT * FROM email_attachments WHERE email_id = %s ORDER BY id"
        attachments = execute_query(att_query, (email_id,))
        email_data['attachments'] = attachments or []

        # Only write when the flag actually changes; repeat views skip the UPDATE.
        if not email_data.get('is_read'):
            update_query = "UPDATE email_messages SET is_read = true WHERE id = %s"
            execute_update(update_query, (email_id,))
        return email_data
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error getting email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/emails/{email_id}/attachments/{attachment_id}")
async def download_attachment(email_id: int, attachment_id: int):
    """Download email attachment.

    Serves the stored file for ``attachment_id``, but only if it belongs to
    ``email_id`` and the parent email is not soft-deleted.

    Raises:
        HTTPException: 404 if the attachment row is missing, has no stored
            ``file_path``, or the file is not on disk; 500 on other errors.
    """
    from fastapi.responses import FileResponse
    import os
    try:
        query = """
SELECT a.* FROM email_attachments a
JOIN email_messages e ON e.id = a.email_id
WHERE a.id = %s AND a.email_id = %s AND e.deleted_at IS NULL
"""
        result = execute_query(query, (attachment_id, email_id))
        if not result:
            raise HTTPException(status_code=404, detail="Attachment not found")
        attachment = result[0]
        file_path = attachment.get('file_path')
        # file_path is nullable; os.path.exists(None) raises TypeError (a 500),
        # so treat a missing path the same as a missing file.
        if not file_path or not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found on disk")
        # dict.get's default only covers a *missing* key, not a stored NULL;
        # `or` makes the octet-stream fallback apply in both cases.
        media_type = attachment.get('content_type') or 'application/octet-stream'
        return FileResponse(
            path=file_path,
            filename=attachment['filename'],
            media_type=media_type
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error downloading attachment {attachment_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.put("/emails/{email_id}")
async def update_email(email_id: int, status: Optional[str] = None):
    """Update email fields (currently only ``status``, e.g. to archive it).

    ``status`` is taken from the query string, and ``updated_at`` is bumped
    with it. Soft-deleted emails are left untouched, matching the bulk and
    classification write endpoints in this module.

    Raises:
        HTTPException: 400 if no updatable field was supplied; 500 on
            database errors.
    """
    try:
        # Only fixed column fragments go into SET; values are bound via params.
        updates = []
        params = []
        if status:
            updates.append("status = %s")
            params.append(status)
        if not updates:
            raise HTTPException(status_code=400, detail="No fields to update")
        params.append(email_id)
        query = (
            f"UPDATE email_messages SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP "
            "WHERE id = %s AND deleted_at IS NULL"
        )
        execute_update(query, tuple(params))
        logger.info(f"✅ Updated email {email_id}: status={status}")
        return {"success": True, "message": "Email updated"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/emails/{email_id}")
async def delete_email(email_id: int):
    """Soft delete email.

    Stamps ``deleted_at`` with the current time. An email that is already
    soft-deleted is not touched, so its original deletion time is kept.

    Raises:
        HTTPException: 500 if the update fails.
    """
    soft_delete_sql = """
UPDATE email_messages
SET deleted_at = CURRENT_TIMESTAMP
WHERE id = %s AND deleted_at IS NULL
"""
    try:
        execute_update(soft_delete_sql, (email_id,))
    except Exception as exc:
        logger.error(f"❌ Error deleting email {email_id}: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    logger.info(f"🗑️ Deleted email {email_id}")
    return {"success": True, "message": "Email deleted"}
@router.post("/emails/{email_id:int}/reprocess")
async def reprocess_email(email_id: int):
    """Reprocess an email by re-running classification.

    Re-classifies the email from its subject and text body (the HTML body is
    used when there is no text body). Stores the new classification,
    confidence and classification timestamp, and bumps ``updated_at``.
    Rules are not re-applied here.

    The ``:int`` path convertor is deliberate. With a plain ``{email_id}``,
    this route (registered first) also matched ``POST /emails/bulk/reprocess``
    and rejected it with a 422 because ``"bulk"`` is not an int. That left
    the bulk endpoint unreachable.

    Raises:
        HTTPException: 404 if the email does not exist or is soft-deleted;
            500 on any other failure.
    """
    try:
        query = "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL"
        result = execute_query(query, (email_id,))
        if not result:
            raise HTTPException(status_code=404, detail="Email not found")
        email = result[0]

        processor = EmailProcessorService()
        # Prefer the plain-text body; fall back to HTML when it is empty/NULL.
        classification, confidence = await processor.classify_email(
            email['subject'],
            email['body_text'] or email['body_html']
        )

        update_query = """
UPDATE email_messages
SET classification = %s,
confidence_score = %s,
classification_date = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
        execute_update(update_query, (classification, confidence, email_id))
        logger.info(f"🔄 Reprocessed email {email_id}: {classification} ({confidence:.2f})")
        return {
            "success": True,
            "message": "Email reprocessed",
            "classification": classification,
            "confidence": confidence
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error reprocessing email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/process")
async def process_emails():
    """Run inbox processing on demand.

    Delegates to ``EmailProcessorService.process_inbox`` and wraps whatever
    statistics it returns in a success envelope.

    Raises:
        HTTPException: 500 if processing raises.
    """
    try:
        inbox_stats = await EmailProcessorService().process_inbox()
    except Exception as exc:
        logger.error(f"❌ Email processing failed: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    return {
        "success": True,
        "message": "Email processing completed",
        "stats": inbox_stats,
    }
@router.post("/emails/bulk/archive")
async def bulk_archive(email_ids: List[int]):
    """Bulk archive emails.

    Sets ``status = 'archived'`` and bumps ``updated_at`` for every listed
    email that is not soft-deleted. The count in the response is the number
    of IDs submitted, not necessarily the number of rows changed.

    Raises:
        HTTPException: 400 if ``email_ids`` is empty; 500 on database errors.
    """
    try:
        if not email_ids:
            raise HTTPException(status_code=400, detail="No email IDs provided")
        # One %s placeholder per id keeps the IN list fully parameterized.
        placeholders = ','.join(['%s'] * len(email_ids))
        query = f"""
UPDATE email_messages
SET status = 'archived', updated_at = CURRENT_TIMESTAMP
WHERE id IN ({placeholders}) AND deleted_at IS NULL
"""
        execute_update(query, tuple(email_ids))
        logger.info(f"📦 Bulk archived {len(email_ids)} emails")
        return {"success": True, "message": f"{len(email_ids)} emails archived"}
    except HTTPException:
        # Pass deliberate HTTP errors (the 400 above) through unchanged; the
        # generic handler below would otherwise turn them into a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error bulk archiving: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/bulk/reprocess")
async def bulk_reprocess(email_ids: List[int]):
    """Bulk reprocess emails by re-running classification for each ID.

    Each email is handled independently. A missing or soft-deleted email is
    skipped, and a failure on one email is logged without aborting the rest.
    The response reports how many emails were actually re-classified.

    Raises:
        HTTPException: 400 if ``email_ids`` is empty; 500 on an unexpected
            error outside the per-email loop.
    """
    try:
        if not email_ids:
            raise HTTPException(status_code=400, detail="No email IDs provided")
        processor = EmailProcessorService()
        select_query = "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL"
        # updated_at is bumped too, matching the single-email reprocess endpoint.
        update_query = """
UPDATE email_messages
SET classification = %s, confidence_score = %s,
classification_date = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
        success_count = 0
        for email_id in email_ids:
            try:
                result = execute_query(select_query, (email_id,))
                if not result:
                    continue
                email = result[0]
                # Prefer the plain-text body; fall back to HTML when it is empty/NULL.
                classification, confidence = await processor.classify_email(
                    email['subject'],
                    email['body_text'] or email['body_html']
                )
                execute_update(update_query, (classification, confidence, email_id))
                success_count += 1
            except Exception as e:
                # Best effort per email: log and continue with the next ID.
                logger.error(f"Failed to reprocess email {email_id}: {e}")
        logger.info(f"🔄 Bulk reprocessed {success_count}/{len(email_ids)} emails")
        return {"success": True, "message": f"{success_count} emails reprocessed"}
    except HTTPException:
        # Keep the 400 above a 400 instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error bulk reprocessing: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/bulk/delete")
async def bulk_delete(email_ids: List[int]):
    """Bulk soft delete emails.

    Stamps ``deleted_at`` on every listed email that is not already
    soft-deleted. The count in the response is the number of IDs submitted,
    not necessarily the number of rows changed.

    Raises:
        HTTPException: 400 if ``email_ids`` is empty; 500 on database errors.
    """
    try:
        if not email_ids:
            raise HTTPException(status_code=400, detail="No email IDs provided")
        # One %s placeholder per id keeps the IN list fully parameterized.
        placeholders = ','.join(['%s'] * len(email_ids))
        query = f"""
UPDATE email_messages
SET deleted_at = CURRENT_TIMESTAMP
WHERE id IN ({placeholders}) AND deleted_at IS NULL
"""
        execute_update(query, tuple(email_ids))
        logger.info(f"🗑️ Bulk deleted {len(email_ids)} emails")
        return {"success": True, "message": f"{len(email_ids)} emails deleted"}
    except HTTPException:
        # Keep the 400 above a 400 instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error bulk deleting: {e}")
        raise HTTPException(status_code=500, detail=str(e))
class ClassificationUpdate(BaseModel):
    """Request body for ``PUT /emails/{email_id}/classify``.

    The endpoint checks ``classification`` against a fixed list of allowed
    values. When ``confidence`` is omitted, the endpoint stores 1.0.
    """
    classification: str
    confidence: Optional[float] = None
@router.put("/emails/{email_id}/classify")
async def update_classification(email_id: int, data: ClassificationUpdate):
    """Manually update email classification.

    Validates ``data.classification`` against the known categories and stores
    it with ``data.confidence`` (1.0 when omitted). Also sets the
    classification timestamp and bumps ``updated_at``. Soft-deleted emails
    are not modified.

    Raises:
        HTTPException: 400 for an unknown classification; 500 on database
            errors.
    """
    try:
        valid_classifications = [
            'invoice', 'freight_note', 'order_confirmation', 'time_confirmation',
            'case_notification', 'customer_email', 'bankruptcy', 'general', 'spam', 'unknown'
        ]
        if data.classification not in valid_classifications:
            raise HTTPException(status_code=400, detail=f"Invalid classification. Must be one of: {valid_classifications}")
        confidence = data.confidence if data.confidence is not None else 1.0
        # updated_at is bumped like the other single-email write endpoints.
        query = """
UPDATE email_messages
SET classification = %s,
confidence_score = %s,
classification_date = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s AND deleted_at IS NULL
"""
        execute_update(query, (data.classification, confidence, email_id))
        # Separator restored: the message used to read e.g. "Email 12invoice".
        logger.info(f"✏️ Manual classification: Email {email_id} → {data.classification}")
        return {
            "success": True,
            "message": f"Email {email_id} classified as '{data.classification}'"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating classification: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# NOTE: a second ``DELETE /emails/{email_id}`` handler (also named
# ``delete_email``) used to be defined here and has been removed. It
# duplicated the soft-delete endpoint defined earlier in this module. The
# router dispatches to the first matching route, so the duplicate never
# served requests. It still rebound the module-level name ``delete_email``
# and produced a duplicate OpenAPI operation. It also ran its UPDATE through
# ``execute_query`` and lacked the ``deleted_at IS NULL`` guard.
# Email Rules Endpoints
@router.get("/email-rules", response_model=List[EmailRule])
async def list_rules():
    """Return every email rule (enabled or not), lowest ``priority`` first, then by name.

    Raises:
        HTTPException: 500 if the query fails.
    """
    rules_sql = """
SELECT * FROM email_rules
ORDER BY priority ASC, name ASC
"""
    try:
        return execute_query(rules_sql)
    except Exception as exc:
        logger.error(f"❌ Error listing rules: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/email-rules", response_model=EmailRule)
async def create_rule(rule: EmailRule):
    """Create new email rule.

    ``conditions`` and ``action_params`` are serialized to JSON text before
    insertion. The inserted row (``RETURNING *``) is returned.

    Raises:
        HTTPException: 500 if the insert fails or returns no row.
    """
    import json
    try:
        # NOTE(review): created_by_user_id is hard-coded to 1 — presumably a
        # placeholder until the requesting user is available here; confirm.
        query = """
INSERT INTO email_rules
(name, description, conditions, action_type, action_params, priority, enabled, created_by_user_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, 1)
RETURNING *
"""
        result = execute_query(query, (
            rule.name,
            rule.description,
            json.dumps(rule.conditions),
            rule.action_type,
            json.dumps(rule.action_params or {}),
            rule.priority,
            rule.enabled
        ))
        if not result:
            raise HTTPException(status_code=500, detail="Failed to create rule")
        logger.info(f"✅ Created email rule: {rule.name}")
        return result[0]
    except HTTPException:
        # Without this, our own 500 was re-wrapped with detail "500: Failed to create rule".
        raise
    except Exception as e:
        logger.error(f"❌ Error creating rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.put("/email-rules/{rule_id}", response_model=EmailRule)
async def update_rule(rule_id: int, rule: EmailRule):
    """Update existing email rule.

    Overwrites every editable column of rule ``rule_id`` (``conditions`` and
    ``action_params`` as JSON text) and returns the updated row.

    Raises:
        HTTPException: 404 if no rule has ``rule_id``; 500 on database errors.
    """
    import json
    try:
        query = """
UPDATE email_rules
SET name = %s,
description = %s,
conditions = %s,
action_type = %s,
action_params = %s,
priority = %s,
enabled = %s
WHERE id = %s
RETURNING *
"""
        result = execute_query(query, (
            rule.name,
            rule.description,
            json.dumps(rule.conditions),
            rule.action_type,
            json.dumps(rule.action_params or {}),
            rule.priority,
            rule.enabled,
            rule_id
        ))
        if not result:
            raise HTTPException(status_code=404, detail="Rule not found")
        logger.info(f"✅ Updated email rule {rule_id}")
        return result[0]
    except HTTPException:
        # Without this, the 404 above was swallowed and returned as a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error updating rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/email-rules/{rule_id}")
async def delete_rule(rule_id: int):
    """Delete email rule (hard delete; the row is removed).

    Raises:
        HTTPException: 500 on database errors.
    """
    try:
        # This DELETE returns no rows, so it goes through execute_update like
        # the module's other row-less writes. execute_query is used here only
        # for statements whose rows are read back (SELECT / RETURNING).
        # NOTE(review): presumably it fetches results and could fail on a
        # row-less statement — confirm in app.core.database.
        query = "DELETE FROM email_rules WHERE id = %s"
        execute_update(query, (rule_id,))
        logger.info(f"🗑️ Deleted email rule {rule_id}")
        return {
            "success": True,
            "message": f"Rule {rule_id} deleted"
        }
    except Exception as e:
        logger.error(f"❌ Error deleting rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# Statistics Endpoint
@router.get("/emails/stats/summary")
async def get_email_stats():
    """Summarize non-deleted emails.

    Returns a single dict with total, per-status, per-classification and
    auto-processed counts plus the average confidence score, or ``{}`` when
    the query yields no row.

    Raises:
        HTTPException: 500 if the query fails.
    """
    stats_sql = """
SELECT
COUNT(*) as total_emails,
COUNT(CASE WHEN status = 'new' THEN 1 END) as new_emails,
COUNT(CASE WHEN status = 'processed' THEN 1 END) as processed_emails,
COUNT(CASE WHEN classification = 'invoice' THEN 1 END) as invoices,
COUNT(CASE WHEN classification = 'time_confirmation' THEN 1 END) as time_confirmations,
COUNT(CASE WHEN classification = 'spam' THEN 1 END) as spam_emails,
COUNT(CASE WHEN auto_processed THEN 1 END) as auto_processed,
AVG(confidence_score) as avg_confidence
FROM email_messages
WHERE deleted_at IS NULL
"""
    try:
        rows = execute_query(stats_sql)
    except Exception as exc:
        logger.error(f"❌ Error getting stats: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    if not rows:
        return {}
    return rows[0]