feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
"""
|
|
|
|
|
Email Service
|
|
|
|
|
Handles email fetching from IMAP or Microsoft Graph API
|
|
|
|
|
Based on OmniSync architecture - READ-ONLY mode for safety
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import logging
|
|
|
|
|
import imaplib
|
|
|
|
|
import email
|
|
|
|
|
from email.header import decode_header
|
|
|
|
|
from typing import List, Dict, Optional, Tuple
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
import json
|
|
|
|
|
import asyncio
|
2025-12-11 12:45:29 +01:00
|
|
|
import base64
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
from aiohttp import ClientSession, BasicAuth
|
|
|
|
|
import msal
|
|
|
|
|
|
|
|
|
|
from app.core.config import settings
|
|
|
|
|
from app.core.database import execute_query, execute_insert
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EmailService:
|
|
|
|
|
"""Email fetching service with IMAP and Graph API support"""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
    """Snapshot the mail-source configuration from application settings.

    Stores the ``USE_GRAPH_API`` flag plus two plain dictionaries:
    ``imap_config`` (IMAP connection parameters) and ``graph_config``
    (Microsoft Graph credentials and mailbox address).
    """
    self.use_graph = settings.USE_GRAPH_API

    # IMAP connection parameters, including the read-only selection flag.
    self.imap_config = dict(
        server=settings.IMAP_SERVER,
        port=settings.IMAP_PORT,
        username=settings.IMAP_USERNAME,
        password=settings.IMAP_PASSWORD,
        use_ssl=settings.IMAP_USE_SSL,
        folder=settings.IMAP_FOLDER,
        readonly=settings.IMAP_READ_ONLY,
    )

    # Credentials and target mailbox for the Microsoft Graph API.
    self.graph_config = dict(
        tenant_id=settings.GRAPH_TENANT_ID,
        client_id=settings.GRAPH_CLIENT_ID,
        client_secret=settings.GRAPH_CLIENT_SECRET,
        user_email=settings.GRAPH_USER_EMAIL,
    )
|
|
|
|
|
|
|
|
|
|
async def fetch_new_emails(self, limit: int = 50) -> List[Dict]:
    """Fetch new emails from whichever source is configured.

    Graph API is used when ``use_graph`` is set and a Graph client id is
    configured; otherwise IMAP is used when an IMAP username is configured.

    Args:
        limit: Maximum number of messages requested from the source.

    Returns:
        List of parsed email dictionaries, or an empty list when no source
        is configured.
    """
    graph_selected = self.use_graph and self.graph_config['client_id']
    if graph_selected:
        logger.info("📥 Fetching emails via Microsoft Graph API")
        return await self._fetch_via_graph(limit)

    if not self.imap_config['username']:
        logger.warning("⚠️ No email source configured (IMAP or Graph API)")
        return []

    logger.info("📥 Fetching emails via IMAP")
    return await self._fetch_via_imap(limit)
|
|
|
|
|
|
|
|
|
|
async def _fetch_via_imap(self, limit: int) -> List[Dict]:
    """Fetch the most recent messages over IMAP and return the new ones.

    The folder is selected with the configured ``readonly`` flag and
    messages are retrieved with ``BODY.PEEK[]`` so they are not marked as
    read. Messages whose Message-ID is already stored are skipped.

    Args:
        limit: Maximum number of most-recent messages to inspect. Values
            of zero or less inspect nothing.

    Returns:
        Parsed email dictionaries, newest first. An empty list is returned
        on any IMAP or unexpected error.

    NOTE(review): imaplib calls are blocking and run directly inside this
    coroutine — confirm that blocking the event loop is acceptable here.
    """
    emails = []
    mail = None

    try:
        # Connect to IMAP server
        if self.imap_config['use_ssl']:
            mail = imaplib.IMAP4_SSL(self.imap_config['server'], self.imap_config['port'])
        else:
            mail = imaplib.IMAP4(self.imap_config['server'], self.imap_config['port'])

        # Login
        mail.login(self.imap_config['username'], self.imap_config['password'])

        # Select folder in READ-ONLY mode (critical for safety)
        folder = self.imap_config['folder']
        readonly = self.imap_config['readonly']
        status, _ = mail.select(folder, readonly=readonly)
        if status != 'OK':
            # Without this check the failure only surfaces later as an
            # obscure "SEARCH illegal in state AUTH" error.
            logger.error(f"❌ IMAP select failed for {folder}: {status}")
            return emails

        if readonly:
            logger.info(f"🔒 Connected to {folder} in READ-ONLY mode (emails will NOT be marked as read)")

        # Search for all emails
        status, messages = mail.search(None, 'ALL')
        if status != 'OK':
            logger.error(f"❌ IMAP search failed: {status}")
            return emails

        email_ids = messages[0].split()
        total_emails = len(email_ids)
        logger.info(f"📊 Found {total_emails} emails in {folder}")

        # Take the last `limit` ids (a slice shorter than the list is fine);
        # guard limit <= 0, where `[-0:]` would select the whole mailbox.
        recent_ids = email_ids[-limit:] if limit > 0 else []

        for email_id in reversed(recent_ids):  # Newest first
            try:
                # Fetch email using BODY.PEEK to avoid marking as read
                status, msg_data = mail.fetch(email_id, '(BODY.PEEK[])')
                if status != 'OK':
                    logger.warning(f"⚠️ Failed to fetch email {email_id}: {status}")
                    continue

                # Parse email
                raw_email = msg_data[0][1]
                msg = email.message_from_bytes(raw_email)
                parsed_email = self._parse_email(msg, email_id.decode())

                # Check if already exists in database
                if not self._email_exists(parsed_email['message_id']):
                    emails.append(parsed_email)
                    logger.info(f"✅ New email: {parsed_email['subject'][:50]}... from {parsed_email['sender_email']}")
                else:
                    logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}")

            except Exception as e:
                # One bad message must not abort the whole batch.
                logger.error(f"❌ Error parsing email {email_id}: {e}")
                continue

        logger.info(f"📥 Fetched {len(emails)} new emails via IMAP")
        return emails

    except imaplib.IMAP4.error as e:
        logger.error(f"❌ IMAP error: {e}")
        return []
    except Exception as e:
        logger.error(f"❌ Unexpected error fetching via IMAP: {e}")
        return []
    finally:
        # Always release the server connection, including on early returns
        # and errors; a failing logout must not mask the real outcome.
        if mail is not None:
            try:
                mail.logout()
            except (imaplib.IMAP4.error, OSError) as e:
                logger.debug(f"IMAP logout failed: {e}")
|
|
|
|
|
|
|
|
|
|
async def _fetch_via_graph(self, limit: int) -> List[Dict]:
    """Fetch the most recent messages through Microsoft Graph.

    Requests up to ``limit`` messages ordered by ``receivedDateTime``
    descending from the configured user's mail folder, skips messages that
    are already stored, and downloads attachments only for the new ones.

    Args:
        limit: Value sent as the ``$top`` query parameter.

    Returns:
        Parsed email dictionaries for messages not yet in the database. An
        empty list is returned when no token can be obtained, the API
        responds with a non-200 status, or an unexpected error occurs.
    """
    emails = []

    try:
        # Get access token using MSAL
        access_token = await self._get_graph_access_token()
        if not access_token:
            logger.error("❌ Failed to get Graph API access token")
            return []

        # Build Graph API request
        user_email = self.graph_config['user_email']
        folder = self.imap_config['folder']  # Use same folder name

        # Graph API endpoint for messages
        url = f"https://graph.microsoft.com/v1.0/users/{user_email}/mailFolders/{folder}/messages"
        params = {
            '$top': limit,
            '$orderby': 'receivedDateTime desc',
            '$select': 'id,subject,from,toRecipients,ccRecipients,receivedDateTime,bodyPreview,body,hasAttachments,internetMessageId'
        }
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }

        async with ClientSession() as session:
            async with session.get(url, params=params, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"❌ Graph API error: {response.status} - {error_text}")
                    return []

                data = await response.json()
                messages = data.get('value', [])
                logger.info(f"📊 Found {len(messages)} emails via Graph API")

                for msg in messages:
                    try:
                        parsed_email = self._parse_graph_message(msg)

                        # Check for duplicates first so attachment bytes are
                        # never downloaded for messages that get discarded.
                        if self._email_exists(parsed_email['message_id']):
                            logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}")
                            continue

                        # Fetch attachments if email has them
                        if msg.get('hasAttachments', False):
                            attachments = await self._fetch_graph_attachments(
                                user_email,
                                msg['id'],
                                access_token,
                                session
                            )
                            parsed_email['attachments'] = attachments
                            parsed_email['attachment_count'] = len(attachments)
                        else:
                            parsed_email['attachments'] = []

                        emails.append(parsed_email)
                        logger.info(f"✅ New email: {parsed_email['subject'][:50]}... from {parsed_email['sender_email']}")

                    except Exception as e:
                        # One bad message must not abort the whole batch.
                        logger.error(f"❌ Error parsing Graph message: {e}")
                        continue

        logger.info(f"📥 Fetched {len(emails)} new emails via Graph API")
        return emails

    except Exception as e:
        logger.error(f"❌ Unexpected error fetching via Graph API: {e}")
        return []
|
|
|
|
|
|
|
|
|
|
async def _get_graph_access_token(self) -> Optional[str]:
    """Acquire an app-only OAuth2 token for Microsoft Graph via MSAL.

    Builds a confidential client from the configured tenant, client id and
    client secret, then requests a token for the Graph ``.default`` scope.

    Returns:
        The access token string, or ``None`` when MSAL reports an error or
        an exception is raised.
    """
    try:
        tenant = self.graph_config['tenant_id']
        client = msal.ConfidentialClientApplication(
            self.graph_config['client_id'],
            authority=f"https://login.microsoftonline.com/{tenant}",
            client_credential=self.graph_config['client_secret'],
        )

        # Request token with Mail.Read scope (Application permission)
        token_response = client.acquire_token_for_client(
            scopes=["https://graph.microsoft.com/.default"]
        )

        if "access_token" not in token_response:
            # Prefer the human-readable description over the short error code.
            error = token_response.get(
                "error_description", token_response.get("error", "Unknown error")
            )
            logger.error(f"❌ Failed to obtain access token: {error}")
            return None

        logger.info("✅ Successfully obtained Graph API access token")
        return token_response["access_token"]

    except Exception as e:
        logger.error(f"❌ Error getting Graph access token: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
def _parse_email(self, msg: email.message.Message, email_id: str) -> Dict:
|
|
|
|
|
"""Parse IMAP email message into dictionary"""
|
|
|
|
|
|
|
|
|
|
# Decode subject
|
|
|
|
|
subject = self._decode_header(msg.get('Subject', ''))
|
|
|
|
|
|
|
|
|
|
# Decode sender
|
|
|
|
|
from_header = self._decode_header(msg.get('From', ''))
|
|
|
|
|
sender_name, sender_email = self._parse_email_address(from_header)
|
|
|
|
|
|
|
|
|
|
# Decode recipient
|
|
|
|
|
to_header = self._decode_header(msg.get('To', ''))
|
|
|
|
|
recipient_name, recipient_email = self._parse_email_address(to_header)
|
|
|
|
|
|
|
|
|
|
# Decode CC
|
|
|
|
|
cc_header = self._decode_header(msg.get('Cc', ''))
|
|
|
|
|
|
|
|
|
|
# Get message ID
|
|
|
|
|
message_id = msg.get('Message-ID', f"imap-{email_id}")
|
|
|
|
|
|
|
|
|
|
# Get date
|
|
|
|
|
date_str = msg.get('Date', '')
|
|
|
|
|
received_date = self._parse_email_date(date_str)
|
|
|
|
|
|
|
|
|
|
# Get body
|
|
|
|
|
body_text = ""
|
|
|
|
|
body_html = ""
|
|
|
|
|
|
|
|
|
|
if msg.is_multipart():
|
|
|
|
|
for part in msg.walk():
|
|
|
|
|
content_type = part.get_content_type()
|
|
|
|
|
|
|
|
|
|
if content_type == "text/plain":
|
|
|
|
|
try:
|
|
|
|
|
body_text = part.get_payload(decode=True).decode('utf-8', errors='ignore')
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
elif content_type == "text/html":
|
|
|
|
|
try:
|
|
|
|
|
body_html = part.get_payload(decode=True).decode('utf-8', errors='ignore')
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
else:
|
|
|
|
|
try:
|
|
|
|
|
body_text = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
|
|
|
|
|
except Exception:
|
|
|
|
|
body_text = str(msg.get_payload())
|
|
|
|
|
|
2025-12-11 12:45:29 +01:00
|
|
|
# Extract attachments
|
|
|
|
|
attachments = []
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
|
|
|
|
|
if msg.is_multipart():
|
|
|
|
|
for part in msg.walk():
|
|
|
|
|
if part.get_content_maintype() == 'multipart':
|
|
|
|
|
continue
|
2025-12-11 12:45:29 +01:00
|
|
|
|
|
|
|
|
# Skip text parts (body content)
|
|
|
|
|
if part.get_content_type() in ['text/plain', 'text/html']:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Check if part has a filename (indicates attachment)
|
|
|
|
|
filename = part.get_filename()
|
|
|
|
|
if filename:
|
|
|
|
|
# Decode filename if needed
|
|
|
|
|
filename = self._decode_header(filename)
|
|
|
|
|
|
|
|
|
|
# Get attachment content
|
|
|
|
|
content = part.get_payload(decode=True)
|
|
|
|
|
content_type = part.get_content_type()
|
|
|
|
|
|
|
|
|
|
if content: # Only add if we got content
|
|
|
|
|
attachments.append({
|
|
|
|
|
'filename': filename,
|
|
|
|
|
'content': content,
|
|
|
|
|
'content_type': content_type,
|
|
|
|
|
'size': len(content)
|
|
|
|
|
})
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
'message_id': message_id,
|
|
|
|
|
'subject': subject,
|
|
|
|
|
'sender_name': sender_name,
|
|
|
|
|
'sender_email': sender_email,
|
|
|
|
|
'recipient_email': recipient_email,
|
|
|
|
|
'cc': cc_header,
|
|
|
|
|
'body_text': body_text,
|
|
|
|
|
'body_html': body_html,
|
|
|
|
|
'received_date': received_date,
|
|
|
|
|
'folder': self.imap_config['folder'],
|
2025-12-11 12:45:29 +01:00
|
|
|
'has_attachments': len(attachments) > 0,
|
|
|
|
|
'attachment_count': len(attachments),
|
|
|
|
|
'attachments': attachments
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def _parse_graph_message(self, msg: Dict) -> Dict:
|
|
|
|
|
"""Parse Microsoft Graph API message into dictionary"""
|
|
|
|
|
|
|
|
|
|
# Extract sender
|
|
|
|
|
from_data = msg.get('from', {}).get('emailAddress', {})
|
|
|
|
|
sender_name = from_data.get('name', '')
|
|
|
|
|
sender_email = from_data.get('address', '')
|
|
|
|
|
|
|
|
|
|
# Extract recipient (first TO recipient)
|
|
|
|
|
to_recipients = msg.get('toRecipients', [])
|
|
|
|
|
recipient_email = to_recipients[0]['emailAddress']['address'] if to_recipients else ''
|
|
|
|
|
|
|
|
|
|
# Extract CC recipients
|
|
|
|
|
cc_recipients = msg.get('ccRecipients', [])
|
|
|
|
|
cc = ', '.join([r['emailAddress']['address'] for r in cc_recipients])
|
|
|
|
|
|
|
|
|
|
# Get body
|
|
|
|
|
body_data = msg.get('body', {})
|
|
|
|
|
body_content = body_data.get('content', '')
|
|
|
|
|
body_type = body_data.get('contentType', 'text')
|
|
|
|
|
|
|
|
|
|
body_text = body_content if body_type == 'text' else ''
|
|
|
|
|
body_html = body_content if body_type == 'html' else ''
|
|
|
|
|
|
|
|
|
|
# Parse date
|
|
|
|
|
received_date_str = msg.get('receivedDateTime', '')
|
|
|
|
|
received_date = datetime.fromisoformat(received_date_str.replace('Z', '+00:00')) if received_date_str else datetime.now()
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
'message_id': msg.get('internetMessageId', msg.get('id', '')),
|
|
|
|
|
'subject': msg.get('subject', ''),
|
|
|
|
|
'sender_name': sender_name,
|
|
|
|
|
'sender_email': sender_email,
|
|
|
|
|
'recipient_email': recipient_email,
|
|
|
|
|
'cc': cc,
|
|
|
|
|
'body_text': body_text,
|
|
|
|
|
'body_html': body_html,
|
|
|
|
|
'received_date': received_date,
|
|
|
|
|
'folder': self.imap_config['folder'],
|
|
|
|
|
'has_attachments': msg.get('hasAttachments', False),
|
2025-12-11 12:45:29 +01:00
|
|
|
'attachment_count': 0 # Will be updated after fetching attachments
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
}
|
|
|
|
|
|
2025-12-11 12:45:29 +01:00
|
|
|
async def _fetch_graph_attachments(
    self,
    user_email: str,
    message_id: str,
    access_token: str,
    session: ClientSession
) -> List[Dict]:
    """Fetch the attachments of one message from the Graph API.

    Only entries that carry base64 data in ``contentBytes`` are returned.
    Entries without content are skipped, matching the IMAP parser, which
    also keeps only attachments with content.

    Args:
        user_email: Mailbox address used in the request URL.
        message_id: Graph id of the message.
        access_token: Bearer token for the request.
        session: Open aiohttp session to reuse.

    Returns:
        List of dictionaries with ``filename``, ``content`` (bytes),
        ``content_type`` and ``size``. The list is empty on a non-200
        response; on an exception, whatever was collected so far is
        returned.
    """
    attachments = []

    try:
        # Graph API endpoint for message attachments
        url = f"https://graph.microsoft.com/v1.0/users/{user_email}/messages/{message_id}/attachments"
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }

        async with session.get(url, headers=headers) as response:
            if response.status != 200:
                logger.warning(f"⚠️ Failed to fetch attachments for message {message_id}: {response.status}")
                return []

            data = await response.json()

            for att in data.get('value', []):
                # Graph API returns base64 content in contentBytes
                content_bytes = att.get('contentBytes', '')
                if not content_bytes:
                    # Nothing to store; avoids writing zero-byte files later.
                    logger.debug(f"Skipping attachment without content: {att.get('name')}")
                    continue

                content = base64.b64decode(content_bytes)
                attachments.append({
                    'filename': att.get('name', 'unknown'),
                    'content': content,
                    'content_type': att.get('contentType', 'application/octet-stream'),
                    'size': att.get('size', len(content))
                })

                logger.info(f"📎 Fetched attachment: {att.get('name')} ({att.get('size', 0)} bytes)")

    except Exception as e:
        logger.error(f"❌ Error fetching attachments for message {message_id}: {e}")

    return attachments
|
|
|
|
|
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
def _decode_header(self, header: str) -> str:
|
|
|
|
|
"""Decode email header (handles MIME encoding)"""
|
|
|
|
|
if not header:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
decoded_parts = decode_header(header)
|
|
|
|
|
decoded_string = ""
|
|
|
|
|
|
|
|
|
|
for content, encoding in decoded_parts:
|
|
|
|
|
if isinstance(content, bytes):
|
|
|
|
|
try:
|
|
|
|
|
decoded_string += content.decode(encoding or 'utf-8', errors='ignore')
|
|
|
|
|
except Exception:
|
|
|
|
|
decoded_string += content.decode('utf-8', errors='ignore')
|
|
|
|
|
else:
|
|
|
|
|
decoded_string += str(content)
|
|
|
|
|
|
|
|
|
|
return decoded_string.strip()
|
|
|
|
|
|
|
|
|
|
def _parse_email_address(self, header: str) -> Tuple[str, str]:
|
|
|
|
|
"""Parse 'Name <email@domain.com>' into (name, email)"""
|
|
|
|
|
if not header:
|
|
|
|
|
return ("", "")
|
|
|
|
|
|
|
|
|
|
if '<' in header and '>' in header:
|
|
|
|
|
# Format: "Name <email@domain.com>"
|
|
|
|
|
parts = header.split('<')
|
|
|
|
|
name = parts[0].strip().strip('"')
|
|
|
|
|
email_addr = parts[1].strip('>').strip()
|
|
|
|
|
return (name, email_addr)
|
|
|
|
|
else:
|
|
|
|
|
# Just email address
|
|
|
|
|
return ("", header.strip())
|
|
|
|
|
|
|
|
|
|
def _parse_email_date(self, date_str: str) -> datetime:
|
|
|
|
|
"""Parse email date header into datetime object"""
|
|
|
|
|
if not date_str:
|
|
|
|
|
return datetime.now()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# Use email.utils to parse RFC 2822 date
|
|
|
|
|
from email.utils import parsedate_to_datetime
|
|
|
|
|
return parsedate_to_datetime(date_str)
|
|
|
|
|
except Exception:
|
|
|
|
|
logger.warning(f"⚠️ Failed to parse date: {date_str}")
|
|
|
|
|
return datetime.now()
|
|
|
|
|
|
|
|
|
|
def _email_exists(self, message_id: str) -> bool:
    """Report whether a non-deleted email with this message id is stored.

    Args:
        message_id: Value matched against the ``message_id`` column.

    Returns:
        ``True`` when the lookup returns at least one row.

    NOTE(review): rows with ``deleted_at`` set are ignored, so a
    soft-deleted email counts as new — confirm this is intended.
    """
    rows = execute_query(
        "SELECT id FROM email_messages WHERE message_id = %s AND deleted_at IS NULL",
        (message_id,),
    )
    return len(rows) > 0
|
|
|
|
|
|
|
|
|
|
async def save_email(self, email_data: Dict) -> Optional[int]:
    """Insert one parsed email into ``email_messages`` and store its attachments.

    The row is inserted with status ``'new'`` and ``is_read`` false. When
    ``email_data`` carries a non-empty ``attachments`` list, the
    attachments are saved afterwards.

    Args:
        email_data: Parsed email dictionary as produced by the parsers.

    Returns:
        The value returned by ``execute_insert`` for the new row, or
        ``None`` when any step raises.
    """
    # Order must line up with the column list of the INSERT below.
    field_order = (
        'message_id', 'subject', 'sender_email', 'sender_name',
        'recipient_email', 'cc', 'body_text', 'body_html',
        'received_date', 'folder', 'has_attachments', 'attachment_count',
    )

    try:
        insert_sql = """
            INSERT INTO email_messages
            (message_id, subject, sender_email, sender_name, recipient_email, cc,
            body_text, body_html, received_date, folder, has_attachments, attachment_count,
            status, is_read)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'new', false)
            RETURNING id
        """
        values = tuple(email_data[name] for name in field_order)
        new_id = execute_insert(insert_sql, values)

        logger.info(f"✅ Saved email {new_id}: {email_data['subject'][:50]}...")

        # Save attachments if any
        if email_data.get('attachments'):
            await self._save_attachments(new_id, email_data['attachments'])

        return new_id

    except Exception as e:
        logger.error(f"❌ Error saving email to database: {e}")
        return None
|
|
|
|
|
|
2025-12-11 12:45:29 +01:00
|
|
|
async def _save_attachments(self, email_id: int, attachments: List[Dict]):
    """Save email attachments to disk and record them in the database.

    Each file is written under ``uploads/email_attachments`` as
    ``<md5>_<name>``. The name component is reduced to its final path
    segment first, because attachment filenames come from untrusted email
    content and could otherwise point outside the upload directory. The
    original filename is still what is stored in the database row.

    A failure on one attachment is logged and does not stop the others.

    Args:
        email_id: Id of the owning ``email_messages`` row.
        attachments: Dictionaries with ``filename``, ``content`` (bytes),
            ``size`` and optionally ``content_type``.
    """
    import hashlib
    from pathlib import Path

    # Create uploads directory if not exists
    upload_dir = Path("uploads/email_attachments")
    upload_dir.mkdir(parents=True, exist_ok=True)

    # Loop-invariant statement, built once.
    query = """
        INSERT INTO email_attachments
        (email_id, filename, content_type, size_bytes, file_path)
        VALUES (%s, %s, %s, %s, %s)
    """

    for att in attachments:
        # Resolved before the try so the error log below can always name it.
        filename = att.get('filename') or 'unknown'
        try:
            content = att['content']
            content_type = att.get('content_type', 'application/octet-stream')
            size_bytes = att['size']

            # Generate MD5 hash for deduplication
            md5_hash = hashlib.md5(content).hexdigest()

            # Keep only the last path segment; backslashes are normalised
            # first so Windows-style paths are reduced as well.
            safe_name = Path(str(filename).replace('\\', '/')).name.replace('\x00', '')
            safe_name = safe_name or 'unknown'

            # Save to disk with hash prefix
            file_path = upload_dir / f"{md5_hash}_{safe_name}"
            file_path.write_bytes(content)

            # Save to database
            execute_insert(query, (
                email_id,
                filename,
                content_type,
                size_bytes,
                str(file_path)
            ))

            logger.info(f"📎 Saved attachment: {filename} ({size_bytes} bytes)")

        except Exception as e:
            logger.error(f"❌ Failed to save attachment {filename}: {e}")
|
|
|
|
|
|
feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00
|
|
|
async def get_unprocessed_emails(self, limit: int = 100) -> List[Dict]:
    """Return stored emails that still have status ``'new'``.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        Whatever ``execute_query`` yields for non-deleted rows with status
        ``'new'``, ordered by ``received_date`` descending.
    """
    pending_sql = """
        SELECT * FROM email_messages
        WHERE status = 'new' AND deleted_at IS NULL
        ORDER BY received_date DESC
        LIMIT %s
    """
    return execute_query(pending_sql, (limit,))
|
|
|
|
|
|
|
|
|
|
async def update_email_status(self, email_id: int, status: str):
    """Set the processing status of one stored email.

    Also sets ``updated_at`` to the database's current timestamp.

    Args:
        email_id: Id of the ``email_messages`` row to update.
        status: New status value.
    """
    params = (status, email_id)
    execute_query(
        "UPDATE email_messages SET status = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s",
        params,
    )
    logger.info(f"✅ Updated email {email_id} status to: {status}")
|