214 lines
7.5 KiB
Python
214 lines
7.5 KiB
Python
|
|
"""
|
||
|
|
Email Activity Logger
|
||
|
|
Helper service for logging all email events
|
||
|
|
"""
|
||
|
|
|
||
|
|
import logging
|
||
|
|
from typing import Optional, Dict, Any
|
||
|
|
import json
|
||
|
|
from app.core.database import execute_query, execute_insert
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class EmailActivityLogger:
|
||
|
|
"""Centralized email activity logging"""
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log(
|
||
|
|
email_id: int,
|
||
|
|
event_type: str,
|
||
|
|
description: str,
|
||
|
|
category: str = 'system',
|
||
|
|
metadata: Optional[Dict[str, Any]] = None,
|
||
|
|
user_id: Optional[int] = None,
|
||
|
|
created_by: str = 'system'
|
||
|
|
) -> Optional[int]:
|
||
|
|
"""
|
||
|
|
Log an email activity event
|
||
|
|
|
||
|
|
Args:
|
||
|
|
email_id: Email ID
|
||
|
|
event_type: Type of event (fetched, classified, workflow_executed, etc.)
|
||
|
|
description: Human-readable description
|
||
|
|
category: Event category (system, user, workflow, rule, integration)
|
||
|
|
metadata: Additional event data as dict
|
||
|
|
user_id: User ID if user-triggered
|
||
|
|
created_by: Who/what created this log entry
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Log ID or None on failure
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
metadata_json = json.dumps(metadata) if metadata else None
|
||
|
|
|
||
|
|
log_id = execute_insert(
|
||
|
|
"""INSERT INTO email_activity_log
|
||
|
|
(email_id, event_type, event_category, description, metadata, user_id, created_by)
|
||
|
|
VALUES (%s, %s, %s, %s, %s::jsonb, %s, %s)""",
|
||
|
|
(email_id, event_type, category, description, metadata_json, user_id, created_by)
|
||
|
|
)
|
||
|
|
|
||
|
|
logger.debug(f"📝 Logged email event: {event_type} for email {email_id}")
|
||
|
|
return log_id
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"❌ Failed to log email activity: {e}")
|
||
|
|
return None
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log_fetched(email_id: int, source: str, message_id: str):
|
||
|
|
"""Log email fetch event"""
|
||
|
|
return await EmailActivityLogger.log(
|
||
|
|
email_id=email_id,
|
||
|
|
event_type='fetched',
|
||
|
|
category='system',
|
||
|
|
description=f'Email fetched from {source}',
|
||
|
|
metadata={'source': source, 'message_id': message_id}
|
||
|
|
)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log_classified(email_id: int, classification: str, confidence: float, method: str = 'ai'):
|
||
|
|
"""Log email classification"""
|
||
|
|
return await EmailActivityLogger.log(
|
||
|
|
email_id=email_id,
|
||
|
|
event_type='classified',
|
||
|
|
category='system',
|
||
|
|
description=f'Classified as {classification} (confidence: {confidence:.2%})',
|
||
|
|
metadata={
|
||
|
|
'classification': classification,
|
||
|
|
'confidence': confidence,
|
||
|
|
'method': method
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log_workflow_executed(email_id: int, workflow_id: int, workflow_name: str,
|
||
|
|
status: str, steps_completed: int, execution_time_ms: int):
|
||
|
|
"""Log workflow execution"""
|
||
|
|
return await EmailActivityLogger.log(
|
||
|
|
email_id=email_id,
|
||
|
|
event_type='workflow_executed',
|
||
|
|
category='workflow',
|
||
|
|
description=f'Workflow "{workflow_name}" {status} ({steps_completed} steps, {execution_time_ms}ms)',
|
||
|
|
metadata={
|
||
|
|
'workflow_id': workflow_id,
|
||
|
|
'workflow_name': workflow_name,
|
||
|
|
'status': status,
|
||
|
|
'steps_completed': steps_completed,
|
||
|
|
'execution_time_ms': execution_time_ms
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log_rule_matched(email_id: int, rule_id: int, rule_name: str, action_type: str):
|
||
|
|
"""Log rule match"""
|
||
|
|
return await EmailActivityLogger.log(
|
||
|
|
email_id=email_id,
|
||
|
|
event_type='rule_matched',
|
||
|
|
category='rule',
|
||
|
|
description=f'Matched rule "{rule_name}" → action: {action_type}',
|
||
|
|
metadata={
|
||
|
|
'rule_id': rule_id,
|
||
|
|
'rule_name': rule_name,
|
||
|
|
'action_type': action_type
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log_status_changed(email_id: int, old_status: str, new_status: str, reason: Optional[str] = None):
|
||
|
|
"""Log status change"""
|
||
|
|
desc = f'Status changed: {old_status} → {new_status}'
|
||
|
|
if reason:
|
||
|
|
desc += f' ({reason})'
|
||
|
|
|
||
|
|
return await EmailActivityLogger.log(
|
||
|
|
email_id=email_id,
|
||
|
|
event_type='status_changed',
|
||
|
|
category='system',
|
||
|
|
description=desc,
|
||
|
|
metadata={
|
||
|
|
'old_status': old_status,
|
||
|
|
'new_status': new_status,
|
||
|
|
'reason': reason
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log_read(email_id: int, user_id: Optional[int] = None, username: Optional[str] = None):
|
||
|
|
"""Log email read"""
|
||
|
|
return await EmailActivityLogger.log(
|
||
|
|
email_id=email_id,
|
||
|
|
event_type='read',
|
||
|
|
category='user',
|
||
|
|
description=f'Email read by {username or "user"}',
|
||
|
|
metadata={'username': username},
|
||
|
|
user_id=user_id,
|
||
|
|
created_by=username or 'user'
|
||
|
|
)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log_attachment_action(email_id: int, filename: str, action: str = 'downloaded'):
|
||
|
|
"""Log attachment action"""
|
||
|
|
return await EmailActivityLogger.log(
|
||
|
|
email_id=email_id,
|
||
|
|
event_type=f'attachment_{action}',
|
||
|
|
category='user',
|
||
|
|
description=f'Attachment {action}: {filename}',
|
||
|
|
metadata={'filename': filename, 'action': action}
|
||
|
|
)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log_linked(email_id: int, entity_type: str, entity_id: int, entity_name: str):
|
||
|
|
"""Log entity linking"""
|
||
|
|
return await EmailActivityLogger.log(
|
||
|
|
email_id=email_id,
|
||
|
|
event_type='linked',
|
||
|
|
category='system',
|
||
|
|
description=f'Linked to {entity_type}: {entity_name}',
|
||
|
|
metadata={
|
||
|
|
'entity_type': entity_type,
|
||
|
|
'entity_id': entity_id,
|
||
|
|
'entity_name': entity_name
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log_invoice_extracted(email_id: int, invoice_number: Optional[str],
|
||
|
|
amount: Optional[float], success: bool):
|
||
|
|
"""Log invoice data extraction"""
|
||
|
|
desc = f'Invoice data extraction {"succeeded" if success else "failed"}'
|
||
|
|
if success and invoice_number:
|
||
|
|
desc += f' - #{invoice_number}'
|
||
|
|
|
||
|
|
return await EmailActivityLogger.log(
|
||
|
|
email_id=email_id,
|
||
|
|
event_type='invoice_extracted',
|
||
|
|
category='integration',
|
||
|
|
description=desc,
|
||
|
|
metadata={
|
||
|
|
'success': success,
|
||
|
|
'invoice_number': invoice_number,
|
||
|
|
'amount': amount
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
async def log_error(email_id: int, error_type: str, error_message: str, context: Optional[Dict] = None):
|
||
|
|
"""Log error event"""
|
||
|
|
return await EmailActivityLogger.log(
|
||
|
|
email_id=email_id,
|
||
|
|
event_type='error',
|
||
|
|
category='system',
|
||
|
|
description=f'Error: {error_type} - {error_message}',
|
||
|
|
metadata={
|
||
|
|
'error_type': error_type,
|
||
|
|
'error_message': error_message,
|
||
|
|
'context': context
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
# Export singleton instance
|
||
|
|
email_activity_logger = EmailActivityLogger()
|