execute_query_single function does not exist in database module. All calls should use execute_query instead.
1082 lines
34 KiB
Python
1082 lines
34 KiB
Python
"""
|
|
Email Management Router
|
|
API endpoints for email viewing, classification, and rule management
|
|
"""
|
|
|
|
import logging
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
from typing import List, Optional
|
|
from pydantic import BaseModel
|
|
from datetime import datetime, date
|
|
|
|
from app.core.database import execute_query, execute_insert, execute_update
|
|
from app.services.email_processor_service import EmailProcessorService
|
|
from app.services.email_workflow_service import email_workflow_service
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# Pydantic Models
|
|
class EmailListItem(BaseModel):
    """Summary row returned by GET /emails (list view)."""

    id: int
    message_id: str
    subject: str
    sender_email: str
    sender_name: Optional[str]
    received_date: datetime
    classification: Optional[str]  # classifier label, e.g. 'invoice', 'spam'; None if unclassified
    confidence_score: Optional[float]  # classifier confidence for the label above
    status: str
    is_read: bool
    has_attachments: bool
    attachment_count: int
    # Display fields filled from LEFT JOINs in list_emails; None when no match.
    rule_name: Optional[str] = None
    supplier_name: Optional[str] = None
    customer_name: Optional[str] = None  # always NULL in the current list query
|
|
|
|
|
|
class EmailAttachment(BaseModel):
    """One row from email_attachments, nested inside EmailDetail."""

    id: int
    email_id: int  # FK to email_messages.id
    filename: str
    content_type: Optional[str]  # MIME type; download falls back to application/octet-stream
    size_bytes: Optional[int]
    file_path: Optional[str]  # on-disk location served by the download endpoint
    created_at: datetime
|
|
|
|
|
|
class EmailDetail(BaseModel):
    """Full email record returned by GET /emails/{email_id}.

    Mirrors the email_messages row plus its attachment rows.
    """

    id: int
    message_id: str
    subject: str
    sender_email: str
    sender_name: Optional[str]
    recipient_email: Optional[str]
    cc: Optional[str]
    body_text: Optional[str]  # plain-text body; classification falls back to body_html
    body_html: Optional[str]
    received_date: datetime
    folder: str  # e.g. moved to 'Processed' by mark_email_processed
    classification: Optional[str]
    confidence_score: Optional[float]
    status: str
    is_read: bool  # set true as a side effect of fetching the detail
    has_attachments: bool
    attachment_count: int
    # Links populated by rule/workflow processing; None until matched.
    rule_id: Optional[int]
    supplier_id: Optional[int]
    customer_id: Optional[int]
    linked_case_id: Optional[int]
    # Values extracted from the email content by the processor.
    extracted_invoice_number: Optional[str]
    extracted_amount: Optional[float]
    extracted_due_date: Optional[date]
    auto_processed: bool
    created_at: datetime
    updated_at: datetime
    attachments: List[EmailAttachment] = []
|
|
|
|
|
|
class EmailRule(BaseModel):
    """Classification/routing rule stored in email_rules.

    ``id`` and the match counters are server-maintained; a create request
    only needs name, conditions and action_type.
    """

    id: Optional[int] = None
    name: str
    # Defaults added to the Optional fields below so they are not required
    # on create/update requests (consistent with EmailWorkflow, and required
    # under Pydantic v2 semantics otherwise).
    description: Optional[str] = None
    conditions: dict  # JSON condition tree evaluated against incoming mail
    action_type: str
    action_params: Optional[dict] = {}
    priority: int = 100  # lower value = evaluated earlier
    enabled: bool = True
    match_count: int = 0
    last_matched_at: Optional[datetime] = None
|
|
|
|
|
|
class EmailWorkflow(BaseModel):
    """Multi-step automation triggered by an email's classification.

    ``id`` and the execution counters are server-maintained.
    """

    id: Optional[int] = None
    name: str
    # Default added: this was the only Optional field in the model without
    # one, which makes it required under Pydantic v2 semantics.
    description: Optional[str] = None
    classification_trigger: str  # classification value that fires this workflow
    sender_pattern: Optional[str] = None
    subject_pattern: Optional[str] = None
    confidence_threshold: float = 0.70  # minimum classifier confidence to fire
    workflow_steps: List[dict]  # ordered step configs, persisted as JSON
    priority: int = 100  # lower value = evaluated earlier
    enabled: bool = True
    stop_on_match: bool = True  # stop evaluating later workflows once this fires
    execution_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    last_executed_at: Optional[datetime] = None
|
|
|
|
|
|
class WorkflowExecution(BaseModel):
    """History row for one workflow run against one email."""

    id: int
    workflow_id: int
    email_id: int
    status: str
    steps_completed: int
    # Defaults added so the Optional fields are not required at construction
    # time (consistent with result_json, and with Pydantic v2 semantics).
    steps_total: Optional[int] = None
    result_json: Optional[List[dict]] = None  # Can be list of step results
    error_message: Optional[str] = None
    started_at: datetime
    completed_at: Optional[datetime] = None  # None while still running
    execution_time_ms: Optional[int] = None
|
|
|
|
|
|
class WorkflowAction(BaseModel):
    """Catalog entry for an available workflow step type (email_workflow_actions)."""

    id: int
    action_code: str  # machine identifier referenced from workflow_steps
    name: str
    description: Optional[str]
    category: Optional[str]  # used for grouping in the actions listing
    parameter_schema: Optional[dict]  # JSON schema describing accepted params
    example_config: Optional[dict]
    enabled: bool  # listing endpoint only returns enabled = true rows
|
|
|
|
|
|
class ProcessingStats(BaseModel):
    """Counters produced by one inbox processing run."""

    status: str
    fetched: int = 0  # messages pulled from the mailbox
    saved: int = 0  # new rows written to email_messages
    classified: int = 0
    rules_matched: int = 0
    errors: int = 0
|
|
|
|
|
|
# Email Endpoints
|
|
@router.get("/emails", response_model=List[EmailListItem])
async def list_emails(
    status: Optional[str] = Query(None),
    classification: Optional[str] = Query(None),
    limit: int = Query(50, le=500),
    offset: int = Query(0, ge=0)
):
    """Get list of emails with filtering"""
    try:
        # Collect optional filters as (clause, value) pairs; soft-deleted
        # rows are always excluded.
        optional_filters = [
            ("em.status = %s", status),
            ("em.classification = %s", classification),
        ]
        where_clauses = ["em.deleted_at IS NULL"]
        params = []
        for clause, value in optional_filters:
            if value:
                where_clauses.append(clause)
                params.append(value)

        where_sql = " AND ".join(where_clauses)

        query = f"""
            SELECT
                em.id, em.message_id, em.subject, em.sender_email, em.sender_name,
                em.received_date, em.classification, em.confidence_score, em.status,
                em.is_read, em.has_attachments, em.attachment_count,
                er.name as rule_name,
                v.name as supplier_name,
                NULL as customer_name
            FROM email_messages em
            LEFT JOIN email_rules er ON em.rule_id = er.id
            LEFT JOIN vendors v ON em.supplier_id = v.id
            WHERE {where_sql}
            ORDER BY em.received_date DESC
            LIMIT %s OFFSET %s
        """

        return execute_query(query, tuple(params + [limit, offset]))

    except Exception as e:
        logger.error(f"❌ Error listing emails: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/emails/{email_id}", response_model=EmailDetail)
async def get_email(email_id: int):
    """Get email detail by ID.

    Side effect: marks the email as read. Raises 404 when the email does
    not exist or is soft-deleted.
    """
    try:
        query = """
            SELECT * FROM email_messages
            WHERE id = %s AND deleted_at IS NULL
        """
        result = execute_query(query, (email_id,))
        logger.info(f"🔍 Query result type: {type(result)}, length: {len(result) if result else 0}")

        if not result:
            raise HTTPException(status_code=404, detail="Email not found")

        email_data = result[0]

        # Nest the attachment rows (empty list when there are none).
        att_query = "SELECT * FROM email_attachments WHERE email_id = %s ORDER BY id"
        attachments = execute_query(att_query, (email_id,))
        email_data['attachments'] = attachments or []

        # Mark as read, and reflect the new state in the response — the
        # original returned the pre-update row, so clients saw a stale
        # is_read=False on first open.
        update_query = "UPDATE email_messages SET is_read = true WHERE id = %s"
        execute_update(update_query, (email_id,))
        email_data['is_read'] = True

        return email_data

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error getting email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/emails/{email_id}/mark-processed")
async def mark_email_processed(email_id: int):
    """Mark email as processed and move it to the 'Processed' folder.

    Raises 404 when the email does not exist or is soft-deleted.
    """
    try:
        # Update email status and folder; RETURNING gives us the new state.
        update_query = """
            UPDATE email_messages
            SET status = 'processed',
                folder = 'Processed',
                processed_at = CURRENT_TIMESTAMP,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
            RETURNING id, folder, status
        """
        result = execute_query(update_query, (email_id,))

        if not result:
            raise HTTPException(status_code=404, detail="Email not found")

        logger.info(f"✅ Email {email_id} marked as processed and moved to Processed folder")

        # Bug fix: execute_query returns a list of rows; the original called
        # .get() on the list itself, which raised AttributeError → 500.
        row = result[0]
        return {
            "success": True,
            "email_id": row.get('id', email_id),
            "folder": row.get('folder', 'Processed'),
            "status": row.get('status', 'processed')
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error marking email {email_id} as processed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/emails/{email_id}/attachments/{attachment_id}")
async def download_attachment(email_id: int, attachment_id: int):
    """Download email attachment"""
    from fastapi.responses import FileResponse
    import os

    try:
        # Join against email_messages so attachments of deleted emails 404.
        query = """
            SELECT a.* FROM email_attachments a
            JOIN email_messages e ON e.id = a.email_id
            WHERE a.id = %s AND a.email_id = %s AND e.deleted_at IS NULL
        """
        rows = execute_query(query, (attachment_id, email_id))
        if not rows:
            raise HTTPException(status_code=404, detail="Attachment not found")

        record = rows[0]
        stored_path = record['file_path']
        if not os.path.exists(stored_path):
            raise HTTPException(status_code=404, detail="File not found on disk")

        media_type = record.get('content_type', 'application/octet-stream')
        return FileResponse(
            path=stored_path,
            filename=record['filename'],
            media_type=media_type,
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error downloading attachment {attachment_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.put("/emails/{email_id}")
async def update_email(email_id: int, status: Optional[str] = None):
    """Update email (archive, mark as read, etc)"""
    try:
        # Column -> new value; only supplied fields are applied.
        changes = {}
        if status:
            changes["status"] = status

        if not changes:
            raise HTTPException(status_code=400, detail="No fields to update")

        set_sql = ", ".join(f"{column} = %s" for column in changes)
        values = list(changes.values()) + [email_id]
        query = f"UPDATE email_messages SET {set_sql}, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
        execute_update(query, tuple(values))

        logger.info(f"✅ Updated email {email_id}: status={status}")
        return {"success": True, "message": "Email updated"}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.delete("/emails/{email_id}")
async def delete_email(email_id: int):
    """Soft delete email"""
    try:
        # Soft delete: stamp deleted_at rather than removing the row.
        execute_update(
            """
            UPDATE email_messages
            SET deleted_at = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
        """,
            (email_id,),
        )

        logger.info(f"🗑️ Deleted email {email_id}")
        return {"success": True, "message": "Email deleted"}

    except Exception as e:
        logger.error(f"❌ Error deleting email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/emails/{email_id}/reprocess")
async def reprocess_email(email_id: int):
    """Reprocess email (re-classify and apply rules)"""
    try:
        # Load the email; soft-deleted rows are treated as missing.
        rows = execute_query(
            "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL",
            (email_id,),
        )
        if not rows:
            raise HTTPException(status_code=404, detail="Email not found")
        message = rows[0]

        # Re-run the classifier over subject + best-available body.
        body = message['body_text'] or message['body_html']
        classification, confidence = await EmailProcessorService().classify_email(
            message['subject'],
            body,
        )

        # Persist the new classification.
        execute_update(
            """
            UPDATE email_messages
            SET classification = %s,
                confidence_score = %s,
                classification_date = CURRENT_TIMESTAMP,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
        """,
            (classification, confidence, email_id),
        )

        logger.info(f"🔄 Reprocessed email {email_id}: {classification} ({confidence:.2f})")
        return {
            "success": True,
            "message": "Email reprocessed",
            "classification": classification,
            "confidence": confidence,
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error reprocessing email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/emails/process")
async def process_emails():
    """Manually trigger email processing"""
    try:
        # Fetch + classify + apply rules across the whole inbox.
        stats = await EmailProcessorService().process_inbox()
        return {
            "success": True,
            "message": "Email processing completed",
            "stats": stats,
        }

    except Exception as e:
        logger.error(f"❌ Email processing failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/emails/bulk/archive")
async def bulk_archive(email_ids: List[int]):
    """Bulk archive emails.

    Raises 400 when the ID list is empty; soft-deleted emails are skipped.
    """
    try:
        if not email_ids:
            raise HTTPException(status_code=400, detail="No email IDs provided")

        placeholders = ','.join(['%s'] * len(email_ids))
        query = f"""
            UPDATE email_messages
            SET status = 'archived', updated_at = CURRENT_TIMESTAMP
            WHERE id IN ({placeholders}) AND deleted_at IS NULL
        """
        execute_update(query, tuple(email_ids))

        logger.info(f"📦 Bulk archived {len(email_ids)} emails")
        return {"success": True, "message": f"{len(email_ids)} emails archived"}

    # Bug fix: without this re-raise the 400 above was caught by the generic
    # handler below and returned to the client as a 500.
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error bulk archiving: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# NOTE(review): this route is registered after POST /emails/{email_id}/reprocess,
# whose pattern also matches "/emails/bulk/reprocess" (email_id="bulk") and is
# matched first — requests will 422 on int conversion before reaching this
# handler. Confirm and move this registration above the parameterized route.
@router.post("/emails/bulk/reprocess")
async def bulk_reprocess(email_ids: List[int]):
    """Bulk reprocess emails.

    Re-classifies each email individually; failures are logged and skipped
    so one bad email does not abort the batch. Raises 400 on an empty list.
    """
    try:
        if not email_ids:
            raise HTTPException(status_code=400, detail="No email IDs provided")

        processor = EmailProcessorService()
        success_count = 0

        for email_id in email_ids:
            try:
                # Get email (soft-deleted rows are skipped silently).
                query = "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL"
                result = execute_query(query, (email_id,))

                if result:
                    email = result[0]
                    classification, confidence = await processor.classify_email(
                        email['subject'],
                        email['body_text'] or email['body_html']
                    )

                    update_query = """
                        UPDATE email_messages
                        SET classification = %s, confidence_score = %s,
                            classification_date = CURRENT_TIMESTAMP
                        WHERE id = %s
                    """
                    execute_update(update_query, (classification, confidence, email_id))
                    success_count += 1
            except Exception as e:
                logger.error(f"Failed to reprocess email {email_id}: {e}")

        logger.info(f"🔄 Bulk reprocessed {success_count}/{len(email_ids)} emails")
        return {"success": True, "message": f"{success_count} emails reprocessed"}

    # Bug fix: re-raise so the 400 above is not converted into a 500 by the
    # generic handler below.
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error bulk reprocessing: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/emails/bulk/delete")
async def bulk_delete(email_ids: List[int]):
    """Bulk soft delete emails.

    Raises 400 when the ID list is empty; already-deleted rows are skipped.
    """
    try:
        if not email_ids:
            raise HTTPException(status_code=400, detail="No email IDs provided")

        placeholders = ','.join(['%s'] * len(email_ids))
        query = f"""
            UPDATE email_messages
            SET deleted_at = CURRENT_TIMESTAMP
            WHERE id IN ({placeholders}) AND deleted_at IS NULL
        """
        execute_update(query, tuple(email_ids))

        logger.info(f"🗑️ Bulk deleted {len(email_ids)} emails")
        return {"success": True, "message": f"{len(email_ids)} emails deleted"}

    # Bug fix: re-raise so the 400 above is not converted into a 500 by the
    # generic handler below.
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error bulk deleting: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
class ClassificationUpdate(BaseModel):
    """Request body for PUT /emails/{email_id}/classify."""

    classification: str  # must be one of the values accepted by update_classification
    confidence: Optional[float] = None  # server defaults to 1.0 when omitted
|
|
|
|
|
|
@router.put("/emails/{email_id}/classify")
async def update_classification(email_id: int, data: ClassificationUpdate):
    """Manually update email classification"""
    try:
        valid_classifications = [
            'invoice', 'freight_note', 'order_confirmation', 'time_confirmation',
            'case_notification', 'customer_email', 'bankruptcy', 'general', 'spam', 'unknown'
        ]
        if data.classification not in valid_classifications:
            raise HTTPException(status_code=400, detail=f"Invalid classification. Must be one of: {valid_classifications}")

        # Manual overrides get full confidence unless one was supplied.
        confidence = 1.0 if data.confidence is None else data.confidence

        execute_update(
            """
            UPDATE email_messages
            SET classification = %s,
                confidence_score = %s,
                classification_date = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
        """,
            (data.classification, confidence, email_id),
        )

        logger.info(f"✏️ Manual classification: Email {email_id} → {data.classification}")
        return {
            "success": True,
            "message": f"Email {email_id} classified as '{data.classification}'",
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating classification: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# NOTE(review): duplicate of the DELETE /emails/{email_id} route defined
# earlier in this file; FastAPI serves the first registration, so this
# handler is effectively dead. Fixed in place pending removal.
@router.delete("/emails/{email_id}")
async def delete_email(email_id: int):
    """Soft delete email."""
    try:
        # Bug fix: an UPDATE is a write — route it through execute_update
        # (not execute_query), matching the rest of this module. Also skip
        # rows that are already soft-deleted, like the earlier handler.
        query = """
            UPDATE email_messages
            SET deleted_at = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
        """
        execute_update(query, (email_id,))

        return {
            "success": True,
            "message": f"Email {email_id} deleted"
        }

    except Exception as e:
        logger.error(f"❌ Error deleting email: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# Email Rules Endpoints
|
|
@router.get("/email-rules", response_model=List[EmailRule])
async def list_rules():
    """Get all email rules"""
    try:
        # Lowest priority number first, then alphabetical by name.
        return execute_query(
            """
            SELECT * FROM email_rules
            ORDER BY priority ASC, name ASC
        """
        )

    except Exception as e:
        logger.error(f"❌ Error listing rules: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/email-rules", response_model=EmailRule)
async def create_rule(rule: EmailRule):
    """Create new email rule.

    Conditions and action params are serialized to JSON; the inserted row
    (including the generated id) is returned.
    """
    import json

    try:
        query = """
            INSERT INTO email_rules
            (name, description, conditions, action_type, action_params, priority, enabled, created_by_user_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, 1)
            RETURNING *
        """

        result = execute_query(query, (
            rule.name,
            rule.description,
            json.dumps(rule.conditions),
            rule.action_type,
            json.dumps(rule.action_params or {}),
            rule.priority,
            rule.enabled
        ))

        if not result:
            raise HTTPException(status_code=500, detail="Failed to create rule")

        logger.info(f"✅ Created email rule: {rule.name}")
        return result[0]

    # Bug fix: re-raise HTTPExceptions so the explicit 500 above keeps its
    # detail message instead of being re-wrapped by the generic handler.
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error creating rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.put("/email-rules/{rule_id}", response_model=EmailRule)
async def update_rule(rule_id: int, rule: EmailRule):
    """Update existing email rule.

    Raises 404 when the rule does not exist.
    """
    try:
        import json
        query = """
            UPDATE email_rules
            SET name = %s,
                description = %s,
                conditions = %s,
                action_type = %s,
                action_params = %s,
                priority = %s,
                enabled = %s
            WHERE id = %s
            RETURNING *
        """

        result = execute_query(query, (
            rule.name,
            rule.description,
            json.dumps(rule.conditions),
            rule.action_type,
            json.dumps(rule.action_params or {}),
            rule.priority,
            rule.enabled,
            rule_id
        ))

        if not result:
            raise HTTPException(status_code=404, detail="Rule not found")

        logger.info(f"✅ Updated email rule {rule_id}")
        return result[0]

    # Bug fix: without this re-raise the 404 above was caught by the generic
    # handler below and surfaced to the client as a 500.
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.delete("/email-rules/{rule_id}")
async def delete_rule(rule_id: int):
    """Delete email rule (hard delete)."""
    try:
        # Bug fix: DELETE is a write — route it through execute_update,
        # matching every other mutating statement in this module.
        query = "DELETE FROM email_rules WHERE id = %s"
        execute_update(query, (rule_id,))

        return {
            "success": True,
            "message": f"Rule {rule_id} deleted"
        }

    except Exception as e:
        logger.error(f"❌ Error deleting rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# Statistics Endpoint
|
|
@router.get("/emails/stats/summary")
async def get_email_stats():
    """Get email processing statistics"""
    try:
        rows = execute_query(
            """
            SELECT
                COUNT(*) as total_emails,
                COUNT(CASE WHEN status = 'new' THEN 1 END) as new_emails,
                COUNT(CASE WHEN status = 'processed' THEN 1 END) as processed_emails,
                COUNT(CASE WHEN classification = 'invoice' THEN 1 END) as invoices,
                COUNT(CASE WHEN classification = 'time_confirmation' THEN 1 END) as time_confirmations,
                COUNT(CASE WHEN classification = 'spam' THEN 1 END) as spam_emails,
                COUNT(CASE WHEN auto_processed THEN 1 END) as auto_processed,
                AVG(confidence_score) as avg_confidence
            FROM email_messages
            WHERE deleted_at IS NULL
        """
        )
        # The aggregate query yields exactly one row; fall back to {} defensively.
        return rows[0] if rows else {}

    except Exception as e:
        logger.error(f"❌ Error getting stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# ========== Workflow Endpoints ==========
|
|
|
|
@router.get("/workflows", response_model=List[EmailWorkflow])
async def list_workflows():
    """Get all email workflows"""
    try:
        # Lowest priority number first, then alphabetical by name.
        return execute_query(
            """
            SELECT * FROM email_workflows
            ORDER BY priority ASC, name ASC
        """
        )

    except Exception as e:
        logger.error(f"❌ Error listing workflows: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/workflows/{workflow_id}", response_model=EmailWorkflow)
async def get_workflow(workflow_id: int):
    """Get specific workflow by ID.

    Raises 404 when no workflow has the given id.
    """
    try:
        query = "SELECT * FROM email_workflows WHERE id = %s"
        result = execute_query(query, (workflow_id,))

        if not result:
            raise HTTPException(status_code=404, detail="Workflow not found")

        # Bug fix: execute_query returns a list of rows, but the
        # response_model is a single EmailWorkflow — return the row itself.
        return result[0]

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error getting workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/workflows", response_model=EmailWorkflow)
async def create_workflow(workflow: EmailWorkflow):
    """Create new email workflow.

    Workflow steps are serialized to JSON; the inserted row (including the
    generated id) is returned.
    """
    try:
        import json

        query = """
            INSERT INTO email_workflows
            (name, description, classification_trigger, sender_pattern, subject_pattern,
             confidence_threshold, workflow_steps, priority, enabled, stop_on_match, created_by_user_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 1)
            RETURNING *
        """

        result = execute_query(query, (
            workflow.name,
            workflow.description,
            workflow.classification_trigger,
            workflow.sender_pattern,
            workflow.subject_pattern,
            workflow.confidence_threshold,
            json.dumps(workflow.workflow_steps),
            workflow.priority,
            workflow.enabled,
            workflow.stop_on_match
        ))

        if not result:
            raise HTTPException(status_code=500, detail="Failed to create workflow")

        logger.info(f"✅ Created workflow: {workflow.name}")
        # Bug fix: return the inserted row, not the row list — the
        # response_model is a single EmailWorkflow.
        return result[0]

    # Bug fix: re-raise HTTPExceptions so the explicit 500 above keeps its
    # detail instead of being re-wrapped by the generic handler.
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error creating workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.put("/workflows/{workflow_id}", response_model=EmailWorkflow)
async def update_workflow(workflow_id: int, workflow: EmailWorkflow):
    """Update existing email workflow.

    Raises 404 when the workflow does not exist.
    """
    try:
        import json

        query = """
            UPDATE email_workflows
            SET name = %s,
                description = %s,
                classification_trigger = %s,
                sender_pattern = %s,
                subject_pattern = %s,
                confidence_threshold = %s,
                workflow_steps = %s,
                priority = %s,
                enabled = %s,
                stop_on_match = %s
            WHERE id = %s
            RETURNING *
        """

        result = execute_query(query, (
            workflow.name,
            workflow.description,
            workflow.classification_trigger,
            workflow.sender_pattern,
            workflow.subject_pattern,
            workflow.confidence_threshold,
            json.dumps(workflow.workflow_steps),
            workflow.priority,
            workflow.enabled,
            workflow.stop_on_match,
            workflow_id
        ))

        if not result:
            raise HTTPException(status_code=404, detail="Workflow not found")

        logger.info(f"✅ Updated workflow {workflow_id}")
        # Bug fix: return the updated row, not the row list — the
        # response_model is a single EmailWorkflow.
        return result[0]

    # Bug fix: without this re-raise the 404 above was caught by the generic
    # handler below and surfaced as a 500.
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.delete("/workflows/{workflow_id}")
async def delete_workflow(workflow_id: int):
    """Delete email workflow"""
    try:
        # Hard delete of the workflow row.
        execute_update("DELETE FROM email_workflows WHERE id = %s", (workflow_id,))

        logger.info(f"🗑️ Deleted workflow {workflow_id}")
        return {"success": True, "message": f"Workflow {workflow_id} deleted"}

    except Exception as e:
        logger.error(f"❌ Error deleting workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/workflows/{workflow_id}/toggle")
async def toggle_workflow(workflow_id: int):
    """Toggle workflow enabled status.

    Raises 404 when the workflow does not exist.
    """
    try:
        query = """
            UPDATE email_workflows
            SET enabled = NOT enabled
            WHERE id = %s
            RETURNING enabled
        """
        result = execute_query(query, (workflow_id,))

        if not result:
            raise HTTPException(status_code=404, detail="Workflow not found")

        # Bug fix: execute_query returns a list of rows — index row 0 before
        # reading 'enabled' (result['enabled'] raised TypeError → 500).
        enabled = result[0]['enabled']
        status = "enabled" if enabled else "disabled"
        logger.info(f"🔄 Workflow {workflow_id} {status}")

        return {
            "success": True,
            "workflow_id": workflow_id,
            "enabled": enabled
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error toggling workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/emails/{email_id}/execute-workflows")
async def execute_workflows_for_email(email_id: int):
    """Manually trigger workflow execution for an email.

    Raises 404 when the email does not exist or is soft-deleted.
    """
    try:
        # Get email data
        query = """
            SELECT id, message_id, subject, sender_email, sender_name, body_text,
                   classification, confidence_score, status
            FROM email_messages
            WHERE id = %s AND deleted_at IS NULL
        """
        rows = execute_query(query, (email_id,))

        if not rows:
            raise HTTPException(status_code=404, detail="Email not found")

        # Bug fix: execute_query returns a list of rows; pass the single
        # email row to the workflow service, not the list wrapper.
        # (Assumes execute_workflows takes one email dict — consistent with
        # how rows are consumed elsewhere in this module; confirm against
        # email_workflow_service.)
        result = await email_workflow_service.execute_workflows(rows[0])

        return result

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error executing workflows: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/workflow-executions", response_model=List[WorkflowExecution])
async def list_workflow_executions(
    workflow_id: Optional[int] = Query(None),
    email_id: Optional[int] = Query(None),
    status: Optional[str] = Query(None),
    limit: int = Query(50, le=500)
):
    """Get workflow execution history"""
    try:
        # Optional filters as (clause, value) pairs; only truthy values apply.
        criteria = [
            ("workflow_id = %s", workflow_id),
            ("email_id = %s", email_id),
            ("status = %s", status),
        ]
        where_clauses = [clause for clause, value in criteria if value]
        params = [value for clause, value in criteria if value]

        where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"

        query = f"""
            SELECT * FROM email_workflow_executions
            WHERE {where_sql}
            ORDER BY started_at DESC
            LIMIT %s
        """
        params.append(limit)

        return execute_query(query, tuple(params))

    except Exception as e:
        logger.error(f"❌ Error listing workflow executions: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/workflow-actions", response_model=List[WorkflowAction])
async def list_workflow_actions():
    """Get all available workflow actions"""
    try:
        # Only enabled actions, grouped by category for the UI.
        return execute_query(
            """
            SELECT * FROM email_workflow_actions
            WHERE enabled = true
            ORDER BY category, name
        """
        )

    except Exception as e:
        logger.error(f"❌ Error listing workflow actions: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/workflows/stats/summary")
async def get_workflow_stats():
    """Get workflow execution statistics"""
    try:
        # Stats are precomputed in the v_workflow_stats database view.
        return execute_query(
            """
            SELECT * FROM v_workflow_stats
        """
        )

    except Exception as e:
        logger.error(f"❌ Error getting workflow stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# ========== Email Activity Log Endpoints ==========
|
|
|
|
class EmailActivityLog(BaseModel):
    """One audit event from email_activity_log, joined with the acting user."""

    id: int
    email_id: int  # FK to email_messages.id
    event_type: str
    event_category: str
    description: str
    metadata: Optional[dict]  # free-form event payload
    user_id: Optional[int]  # None for system-generated events
    user_name: Optional[str]  # users.username via LEFT JOIN; None when no user
    created_at: datetime
    created_by: str
|
|
|
|
|
|
@router.get("/emails/{email_id}/activity", response_model=List[EmailActivityLog])
async def get_email_activity_log(email_id: int, limit: int = Query(default=100, le=500)):
    """Get complete activity log for an email"""
    try:
        # Newest events first, joined with the acting user's name when known.
        return execute_query(
            """
            SELECT
                eal.id,
                eal.email_id,
                eal.event_type,
                eal.event_category,
                eal.description,
                eal.metadata,
                eal.user_id,
                u.username as user_name,
                eal.created_at,
                eal.created_by
            FROM email_activity_log eal
            LEFT JOIN users u ON eal.user_id = u.user_id
            WHERE eal.email_id = %s
            ORDER BY eal.created_at DESC
            LIMIT %s
        """,
            (email_id, limit),
        )

    except Exception as e:
        logger.error(f"❌ Error getting email activity log: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/emails/activity/recent", response_model=List[EmailActivityLog])
async def get_recent_activity(
    limit: int = Query(default=50, le=200),
    event_type: Optional[str] = None,
    event_category: Optional[str] = None
):
    """Get recent email activity across all emails"""
    try:
        # Optional filters as (clause, value) pairs; only truthy values apply.
        criteria = [
            ("eal.event_type = %s", event_type),
            ("eal.event_category = %s", event_category),
        ]
        conditions = [clause for clause, value in criteria if value]
        params = [value for clause, value in criteria if value]

        where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
        params.append(limit)

        query = f"""
            SELECT
                eal.id,
                eal.email_id,
                eal.event_type,
                eal.event_category,
                eal.description,
                eal.metadata,
                eal.user_id,
                u.username as user_name,
                eal.created_at,
                eal.created_by
            FROM email_activity_log eal
            LEFT JOIN users u ON eal.user_id = u.user_id
            {where_clause}
            ORDER BY eal.created_at DESC
            LIMIT %s
        """

        return execute_query(query, tuple(params))

    except Exception as e:
        logger.error(f"❌ Error getting recent activity: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/emails/activity/stats")
async def get_activity_stats():
    """Get activity statistics"""
    try:
        # Event counts per (type, category) over the trailing 7 days.
        return execute_query(
            """
            SELECT
                event_type,
                event_category,
                COUNT(*) as count,
                MAX(created_at) as last_occurrence
            FROM email_activity_log
            WHERE created_at >= NOW() - INTERVAL '7 days'
            GROUP BY event_type, event_category
            ORDER BY count DESC
        """
        )

    except Exception as e:
        logger.error(f"❌ Error getting activity stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|