1117 lines
45 KiB
Python
1117 lines
45 KiB
Python
"""
|
|
Email Service
|
|
Handles email fetching from IMAP or Microsoft Graph API
|
|
Based on OmniSync architecture - READ-ONLY mode for safety
|
|
Also handles outbound SMTP email sending for reminders and notifications
|
|
"""
|
|
|
|
import logging
|
|
import imaplib
|
|
import email
|
|
from email.header import decode_header
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.base import MIMEBase
|
|
from email import encoders
|
|
from typing import List, Dict, Optional, Tuple
|
|
from datetime import datetime
|
|
import json
|
|
import asyncio
|
|
import base64
|
|
from uuid import uuid4
|
|
|
|
# Try to import aiosmtplib, but don't fail if not available
|
|
try:
|
|
import aiosmtplib
|
|
HAS_AIOSMTPLIB = True
|
|
except ImportError:
|
|
HAS_AIOSMTPLIB = False
|
|
aiosmtplib = None
|
|
|
|
from aiohttp import ClientSession, BasicAuth
|
|
import msal
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import execute_query, execute_insert
|
|
|
|
# Module-level logger; every EmailService message is emitted under this module's name.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EmailService:
    """Email fetching, parsing, persistence and SMTP sending.

    Inbound mail is read from IMAP or Microsoft Graph in READ-ONLY fashion.
    IMAP folders are selected with the configured ``readonly`` flag, and
    messages are fetched with ``BODY.PEEK[]`` so they are not marked as read.
    Uploaded ``.eml`` / ``.msg`` files can also be parsed and stored.
    Outbound mail (reminders, notifications) is sent via ``aiosmtplib``.
    """

    def __init__(self):
        self.use_graph = settings.USE_GRAPH_API
        self.imap_config = {
            'server': settings.IMAP_SERVER,
            'port': settings.IMAP_PORT,
            'username': settings.IMAP_USERNAME,
            'password': settings.IMAP_PASSWORD,
            'use_ssl': settings.IMAP_USE_SSL,
            'folder': settings.IMAP_FOLDER,
            'readonly': settings.IMAP_READ_ONLY
        }
        self.graph_config = {
            'tenant_id': settings.GRAPH_TENANT_ID,
            'client_id': settings.GRAPH_CLIENT_ID,
            'client_secret': settings.GRAPH_CLIENT_SECRET,
            'user_email': settings.GRAPH_USER_EMAIL
        }
        # The MSAL client is created lazily on first use and then reused, so
        # its token cache survives between calls instead of being rebuilt.
        self._msal_app = None

    # ------------------------------------------------------------------
    # Fetching
    # ------------------------------------------------------------------

    async def fetch_new_emails(self, limit: int = 50) -> List[Dict]:
        """
        Fetch new emails from the configured source (Graph API or IMAP).

        Graph is used when ``use_graph`` is set and a Graph client id is
        configured; otherwise IMAP is used if an IMAP username is set.

        Returns a list of parsed email dictionaries (empty when no source
        is configured).
        """
        if self.use_graph and self.graph_config['client_id']:
            logger.info("📥 Fetching emails via Microsoft Graph API")
            return await self._fetch_via_graph(limit)
        elif self.imap_config['username']:
            logger.info("📥 Fetching emails via IMAP")
            return await self._fetch_via_imap(limit)
        else:
            logger.warning("⚠️ No email source configured (IMAP or Graph API)")
            return []

    async def _fetch_via_imap(self, limit: int) -> List[Dict]:
        """Fetch new emails using the IMAP protocol (READ-ONLY mode).

        The blocking imaplib work (connect, search, fetch) runs in the
        default thread-pool executor, so the event loop is not stalled.
        Parsing and the duplicate check then run on the event loop.

        Args:
            limit: Maximum number of most-recent messages to inspect.

        Returns:
            Parsed email dicts for messages not already in the database.
            Returns an empty list on connection or protocol errors.
        """
        emails = []

        try:
            loop = asyncio.get_running_loop()
            raw_messages = await loop.run_in_executor(
                None, self._imap_fetch_raw_messages, limit
            )
        except imaplib.IMAP4.error as e:
            logger.error(f"❌ IMAP error: {e}")
            return []
        except Exception as e:
            logger.error(f"❌ Unexpected error fetching via IMAP: {e}")
            return []

        for email_id, raw_email in raw_messages:
            try:
                msg = email.message_from_bytes(raw_email)
                parsed_email = self._parse_email(msg, email_id)

                # Check if already exists in database
                if not self._email_exists(parsed_email['message_id']):
                    emails.append(parsed_email)
                    logger.info(f"✅ New email: {parsed_email['subject'][:50]}... from {parsed_email['sender_email']}")
                else:
                    logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}")

            except Exception as e:
                logger.error(f"❌ Error parsing email {email_id}: {e}")
                continue

        logger.info(f"📥 Fetched {len(emails)} new emails via IMAP")
        return emails

    def _imap_fetch_raw_messages(self, limit: int) -> List[Tuple[str, bytes]]:
        """Blocking helper: connect, select, search and PEEK-fetch raw messages.

        The connection is always logged out in ``finally``, including when
        select/search fail or a fetch raises.

        Args:
            limit: Maximum number of most-recent messages to fetch. A value
                of 0 or less fetches nothing.

        Returns:
            List of ``(sequence_number, raw_message_bytes)``, newest first.

        Raises:
            imaplib.IMAP4.error / OSError on connection or login failure.
        """
        cfg = self.imap_config

        # Connect to IMAP server
        if cfg['use_ssl']:
            mail = imaplib.IMAP4_SSL(cfg['server'], cfg['port'])
        else:
            mail = imaplib.IMAP4(cfg['server'], cfg['port'])

        raw_messages: List[Tuple[str, bytes]] = []
        try:
            mail.login(cfg['username'], cfg['password'])

            # Select folder in READ-ONLY mode (critical for safety)
            folder = cfg['folder']
            readonly = cfg['readonly']
            status, _ = mail.select(folder, readonly=readonly)
            if status != 'OK':
                # e.g. the folder does not exist; searching would then fail
                # with "illegal in state AUTH".
                logger.error(f"❌ IMAP select failed for {folder}: {status}")
                return raw_messages

            if readonly:
                logger.info(f"🔒 Connected to {folder} in READ-ONLY mode (emails will NOT be marked as read)")

            status, messages = mail.search(None, 'ALL')
            if status != 'OK':
                logger.error(f"❌ IMAP search failed: {status}")
                return raw_messages

            email_ids = messages[0].split() if messages and messages[0] else []
            logger.info(f"📊 Found {len(email_ids)} emails in {folder}")

            # Most recent `limit` messages, newest first. The limit > 0 guard
            # matters: email_ids[-0:] would otherwise select *every* message.
            email_ids_to_fetch = list(reversed(email_ids[-limit:])) if limit > 0 else []

            for email_id in email_ids_to_fetch:
                try:
                    # BODY.PEEK[] avoids setting the \Seen flag
                    status, msg_data = mail.fetch(email_id, '(BODY.PEEK[])')
                    if status != 'OK':
                        logger.warning(f"⚠️ Failed to fetch email {email_id}: {status}")
                        continue
                    raw_messages.append((email_id.decode(), msg_data[0][1]))
                except Exception as e:
                    logger.error(f"❌ Error parsing email {email_id}: {e}")
                    continue
        finally:
            try:
                mail.logout()
            except Exception:
                # Best effort: the connection may already be closed.
                pass

        return raw_messages

    async def _fetch_via_graph(self, limit: int) -> List[Dict]:
        """Fetch new emails using the Microsoft Graph API (OAuth2 client credentials).

        Args:
            limit: Maximum number of newest messages to request (``$top``).

        Returns:
            Parsed email dicts, including attachment bytes, for messages not
            yet in the database. For messages that already exist, attachment
            bytes are back-filled via ``_resave_attachment_content``.
            Returns an empty list on token, HTTP or unexpected errors.
        """
        from urllib.parse import quote

        emails = []

        try:
            # Get access token using MSAL
            access_token = await self._get_graph_access_token()
            if not access_token:
                logger.error("❌ Failed to get Graph API access token")
                return []

            user_email = self.graph_config['user_email']
            folder = self.imap_config['folder']  # Use same folder name

            # Percent-encode the path segments so folder names with spaces or
            # reserved characters cannot corrupt the URL. '@' is a legal path
            # character and is kept as-is.
            url = (
                "https://graph.microsoft.com/v1.0/users/"
                f"{quote(user_email or '', safe='@')}/mailFolders/"
                f"{quote(folder or '', safe='')}/messages"
            )
            params = {
                '$top': limit,
                '$orderby': 'receivedDateTime desc',
                '$select': 'id,subject,from,toRecipients,ccRecipients,receivedDateTime,bodyPreview,body,hasAttachments,internetMessageId'
            }
            headers = {
                'Authorization': f'Bearer {access_token}',
                'Content-Type': 'application/json'
            }

            async with ClientSession() as session:
                async with session.get(url, params=params, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"❌ Graph API error: {response.status} - {error_text}")
                        return []
                    data = await response.json()

                messages = data.get('value', [])
                logger.info(f"📊 Found {len(messages)} emails via Graph API")

                for msg in messages:
                    try:
                        parsed_email = self._parse_graph_message(msg)

                        # Fetch attachments if email has them
                        if msg.get('hasAttachments', False):
                            attachments = await self._fetch_graph_attachments(
                                user_email,
                                msg['id'],
                                access_token,
                                session
                            )
                            parsed_email['attachments'] = attachments
                            parsed_email['attachment_count'] = len(attachments)
                        else:
                            parsed_email['attachments'] = []

                        # Check if already exists
                        if not self._email_exists(parsed_email['message_id']):
                            emails.append(parsed_email)
                            logger.info(f"✅ New email: {parsed_email['subject'][:50]}... from {parsed_email['sender_email']}")
                        else:
                            logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}")
                            # Re-save attachment bytes for existing emails (fills content_data for old emails)
                            if parsed_email.get('attachments'):
                                await self._resave_attachment_content(
                                    parsed_email['message_id'],
                                    parsed_email['attachments']
                                )

                    except Exception as e:
                        logger.error(f"❌ Error parsing Graph message: {e}")
                        continue

            logger.info(f"📥 Fetched {len(emails)} new emails via Graph API")
            return emails

        except Exception as e:
            logger.error(f"❌ Unexpected error fetching via Graph API: {e}")
            return []

    async def _get_graph_access_token(self) -> Optional[str]:
        """Get an OAuth2 access token for Microsoft Graph using MSAL client credentials.

        The ``ConfidentialClientApplication`` is created once and cached on
        the instance, so MSAL's token cache is reused across calls.

        Returns:
            The access token string, or None if acquisition failed.
        """
        try:
            app = getattr(self, '_msal_app', None)
            if app is None:
                authority = f"https://login.microsoftonline.com/{self.graph_config['tenant_id']}"
                app = msal.ConfidentialClientApplication(
                    self.graph_config['client_id'],
                    authority=authority,
                    client_credential=self.graph_config['client_secret']
                )
                self._msal_app = app

            # Application permissions: '.default' requests the app's granted scopes
            scopes = ["https://graph.microsoft.com/.default"]
            result = app.acquire_token_for_client(scopes=scopes)

            if "access_token" in result:
                logger.info("✅ Successfully obtained Graph API access token")
                return result["access_token"]

            error = result.get("error_description", result.get("error", "Unknown error"))
            logger.error(f"❌ Failed to obtain access token: {error}")
            return None

        except Exception as e:
            logger.error(f"❌ Error getting Graph access token: {e}")
            return None

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _decode_part_payload(part) -> Optional[str]:
        """Decode a non-multipart MIME part's payload to text.

        The part's declared charset is used, falling back to UTF-8 when it
        is absent or unknown to Python. Undecodable bytes are dropped.

        Returns:
            The decoded text, or None when the part has no payload or
            decoding fails.
        """
        try:
            payload = part.get_payload(decode=True)
            if payload is None:
                return None
            charset = part.get_content_charset() or 'utf-8'
            try:
                return payload.decode(charset, errors='ignore')
            except LookupError:
                # Bogus/unknown charset labels such as "unknown-8bit"
                return payload.decode('utf-8', errors='ignore')
        except Exception:
            return None

    @staticmethod
    def _audio_fallback_filename(content_type: str) -> str:
        """Generate a filename for an audio attachment that arrived without one."""
        ext = '.mp3'
        if 'wav' in content_type:
            ext = '.wav'
        elif 'ogg' in content_type:
            ext = '.ogg'
        elif 'm4a' in content_type:
            ext = '.m4a'
        return f"audio_attachment{ext}"

    def _parse_email(self, msg: email.message.Message, email_id: str) -> Dict:
        """Parse an IMAP message into the service's email dictionary.

        Body selection:
        - The first non-empty inline ``text/plain`` and ``text/html`` parts
          are used, each decoded with its declared charset.
        - Text parts that have a filename and
          ``Content-Disposition: attachment`` become attachments instead of
          overwriting the body.

        Attachment collection: every other part with a filename becomes an
        attachment with its decoded bytes. An unnamed ``audio/*`` part is
        also collected, under a generated name.

        Args:
            msg: Parsed message (from ``email.message_from_bytes``).
            email_id: IMAP sequence number, used for a fallback Message-ID.

        Returns:
            Dict with the keys consumed by ``save_email`` plus ``attachments``.
        """
        subject = self._decode_header(msg.get('Subject', ''))

        from_header = self._decode_header(msg.get('From', ''))
        sender_name, sender_email = self._parse_email_address(from_header)

        # For a multi-recipient To header, _parse_email_address yields the
        # address of the first "Name <addr>" entry.
        to_header = self._decode_header(msg.get('To', ''))
        _, recipient_email = self._parse_email_address(to_header)

        cc_header = self._decode_header(msg.get('Cc', ''))

        message_id = msg.get('Message-ID', f"imap-{email_id}")
        in_reply_to = msg.get('In-Reply-To', '')
        email_references = msg.get('References', '')

        received_date = self._parse_email_date(msg.get('Date', ''))

        body_text = ""
        body_html = ""
        attachments = []

        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_maintype() == 'multipart':
                    continue

                content_type = part.get_content_type()
                filename = part.get_filename()
                is_named_attachment = (
                    bool(filename) and part.get_content_disposition() == 'attachment'
                )

                if content_type in ('text/plain', 'text/html') and not is_named_attachment:
                    text = self._decode_part_payload(part)
                    # Keep the first non-empty body of each type: msg.walk()
                    # also visits parts of embedded (forwarded) messages,
                    # which come later in the walk.
                    if text:
                        if content_type == 'text/plain' and not body_text:
                            body_text = text
                        elif content_type == 'text/html' and not body_html:
                            body_html = text
                    continue

                # FALLBACK: If no filename but content-type is audio, generate one
                if not filename and content_type.startswith('audio/'):
                    filename = self._audio_fallback_filename(content_type)
                    logger.info(f"⚠️ Found audio attachment without filename. Generated: {filename}")

                if not filename:
                    continue

                # Filenames may be RFC 2047 encoded-words
                filename = self._decode_header(filename)
                content = part.get_payload(decode=True)

                if content:  # Only add if we got content
                    attachments.append({
                        'filename': filename,
                        'content': content,
                        'content_type': content_type,
                        'size': len(content)
                    })
        else:
            text = self._decode_part_payload(msg)
            if text is None:
                text = str(msg.get_payload())
            # Single-part HTML goes to body_html (consistent with parse_eml_file)
            if msg.get_content_type() == 'text/html':
                body_html = text
            else:
                body_text = text

        return {
            'message_id': message_id,
            'in_reply_to': in_reply_to,
            'email_references': email_references,
            'subject': subject,
            'sender_name': sender_name,
            'sender_email': sender_email,
            'recipient_email': recipient_email,
            'cc': cc_header,
            'body_text': body_text,
            'body_html': body_html,
            'received_date': received_date,
            'folder': self.imap_config['folder'],
            'has_attachments': len(attachments) > 0,
            'attachment_count': len(attachments),
            'attachments': attachments
        }

    def _parse_graph_message(self, msg: Dict) -> Dict:
        """Parse a Microsoft Graph message resource into the service's email dictionary.

        The parser tolerates ``null`` values for ``from``, ``subject``,
        ``internetMessageId``, body fields and recipient entries, since
        ``dict.get(key, default)`` does not replace an explicit ``None``.

        Returns:
            Email dict. ``attachment_count`` is 0 here; the caller updates it
            after fetching attachments.
        """
        from_data = (msg.get('from') or {}).get('emailAddress') or {}
        sender_name = from_data.get('name') or ''
        sender_email = from_data.get('address') or ''

        # Extract recipient (first TO recipient)
        to_recipients = msg.get('toRecipients') or []
        first_to = ((to_recipients[0] or {}).get('emailAddress') or {}) if to_recipients else {}
        recipient_email = first_to.get('address') or ''

        # Extract CC recipients, skipping entries without an address
        cc_addresses = [
            ((r or {}).get('emailAddress') or {}).get('address')
            for r in (msg.get('ccRecipients') or [])
        ]
        cc = ', '.join(addr for addr in cc_addresses if addr)

        body_data = msg.get('body') or {}
        body_content = body_data.get('content') or ''
        body_type = (body_data.get('contentType') or 'text').lower()
        body_text = body_content if body_type == 'text' else ''
        body_html = body_content if body_type == 'html' else ''

        received_date_str = msg.get('receivedDateTime') or ''
        received_date = datetime.now()
        if received_date_str:
            try:
                received_date = datetime.fromisoformat(received_date_str.replace('Z', '+00:00'))
            except ValueError:
                logger.warning(f"⚠️ Failed to parse date: {received_date_str}")

        return {
            'message_id': msg.get('internetMessageId') or msg.get('id') or '',
            'in_reply_to': None,
            'email_references': None,
            'subject': msg.get('subject') or '',
            'sender_name': sender_name,
            'sender_email': sender_email,
            'recipient_email': recipient_email,
            'cc': cc,
            'body_text': body_text,
            'body_html': body_html,
            'received_date': received_date,
            'folder': self.imap_config['folder'],
            'has_attachments': msg.get('hasAttachments', False),
            'attachment_count': 0  # Will be updated after fetching attachments
        }

    async def _fetch_graph_attachments(
        self,
        user_email: str,
        message_id: str,
        access_token: str,
        session: "ClientSession"
    ) -> List[Dict]:
        """Fetch attachments for one Graph message.

        File attachments carry base64 ``contentBytes``. Entries without it
        (e.g. item/reference attachments) are returned with empty content.

        Returns:
            List of attachment dicts (filename, content, content_type, size).
            The list is empty or partial if the request fails.
        """
        from urllib.parse import quote

        attachments = []

        try:
            url = (
                "https://graph.microsoft.com/v1.0/users/"
                f"{quote(user_email or '', safe='@')}/messages/{message_id}/attachments"
            )
            headers = {
                'Authorization': f'Bearer {access_token}',
                'Content-Type': 'application/json'
            }

            async with session.get(url, headers=headers) as response:
                if response.status != 200:
                    logger.warning(f"⚠️ Failed to fetch attachments for message {message_id}: {response.status}")
                    return []

                data = await response.json()

            for att in data.get('value', []):
                content_bytes = att.get('contentBytes') or ''
                content = base64.b64decode(content_bytes) if content_bytes else b''

                filename = att.get('name')
                # Graph may return contentType: null
                content_type = att.get('contentType') or 'application/octet-stream'

                # Handle missing filenames for audio (FALLBACK)
                if not filename and content_type.startswith('audio/'):
                    filename = self._audio_fallback_filename(content_type)
                    logger.info(f"⚠️ Found (Graph) audio attachment without filename. Generated: {filename}")

                attachments.append({
                    'filename': filename or 'unknown',
                    'content': content,
                    'content_type': content_type,
                    'size': att.get('size', len(content))
                })

                logger.info(f"📎 Fetched attachment: {filename} ({att.get('size', 0)} bytes)")

        except Exception as e:
            logger.error(f"❌ Error fetching attachments for message {message_id}: {e}")

        return attachments

    def _decode_header(self, header: str) -> str:
        """Decode a (possibly RFC 2047 MIME-encoded) header into plain text.

        Encoded chunks are decoded with their declared charset, falling back
        to UTF-8. Undecodable bytes are dropped and the result is stripped.
        """
        if not header:
            return ""

        decoded_string = ""
        for content, encoding in decode_header(header):
            if isinstance(content, bytes):
                try:
                    decoded_string += content.decode(encoding or 'utf-8', errors='ignore')
                except Exception:
                    decoded_string += content.decode('utf-8', errors='ignore')
            else:
                decoded_string += str(content)

        return decoded_string.strip()

    def _parse_email_address(self, header: str) -> Tuple[str, str]:
        """Split ``'Name <email@domain.com>'`` into ``(name, email)``.

        When the header lists several recipients
        (``"A <a@x>, B <b@y>"``), the first bracketed entry is returned.
        Text without angle brackets is returned as the address with an
        empty name.
        """
        if not header:
            return ("", "")

        header = str(header)
        lt = header.find('<')
        gt = header.find('>', lt + 1) if lt != -1 else -1

        if lt != -1 and gt != -1:
            # Everything before the first '<' is the display name; the
            # address ends at the matching '>' (not at the end of the
            # header, which may contain further recipients).
            name = header[:lt].strip().strip('"')
            email_addr = header[lt + 1:gt].strip()
            return (name, email_addr)

        # Just email address
        return ("", header.strip())

    def _parse_email_date(self, date_str: str) -> datetime:
        """Parse an RFC 2822 ``Date`` header; fall back to ``datetime.now()``."""
        if not date_str:
            return datetime.now()

        try:
            from email.utils import parsedate_to_datetime
            return parsedate_to_datetime(str(date_str))
        except Exception:
            logger.warning(f"⚠️ Failed to parse date: {date_str}")
            return datetime.now()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_inserted_id(result) -> Optional[int]:
        """Normalise an ``execute_insert`` return value to the new row id.

        The file's two insert paths disagree on what ``execute_insert``
        returns: ``save_email`` treated it as the id itself, while
        ``save_uploaded_email`` indexed ``result[0]["id"]``. This helper
        accepts a scalar id, a dict row, or a list/tuple of rows. Each row
        may be a mapping with ``'id'`` or a sequence whose first column is
        the id.
        """
        if result is None:
            return None
        if isinstance(result, dict):
            return result.get('id')
        if isinstance(result, (list, tuple)):
            if not result:
                return None
            row = result[0]
            if isinstance(row, dict):
                return row.get('id')
            if isinstance(row, (list, tuple)):
                return row[0] if row else None
            return row
        return result

    @staticmethod
    def _safe_disk_filename(filename, max_bytes: int = 200) -> str:
        """Reduce an untrusted attachment filename to one safe path component.

        Path separators and NUL bytes are replaced or removed, so the name
        cannot escape the upload directory. Leading characters are trimmed
        until the UTF-8 length is at most ``max_bytes``; trimming from the
        front keeps the extension.
        """
        cleaned = str(filename).replace('\x00', '').replace('/', '_').replace('\\', '_').strip()
        if not cleaned:
            cleaned = 'attachment'
        while len(cleaned.encode('utf-8', errors='ignore')) > max_bytes and len(cleaned) > 1:
            cleaned = cleaned[1:]
        return cleaned

    def _email_exists(self, message_id: str) -> bool:
        """Return True if a non-deleted email with this Message-ID is stored."""
        query = "SELECT id FROM email_messages WHERE message_id = %s AND deleted_at IS NULL"
        result = execute_query(query, (message_id,))
        return bool(result)

    async def save_email(self, email_data: Dict) -> Optional[int]:
        """Insert a fetched email (status 'new', unread) and its attachments.

        Returns:
            The new row id, or None on error or when no id was returned.
        """
        try:
            query = """
                INSERT INTO email_messages
                (message_id, subject, sender_email, sender_name, recipient_email, cc,
                 body_text, body_html, received_date, folder, has_attachments, attachment_count,
                 in_reply_to, email_references,
                 status, is_read)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'new', false)
                RETURNING id
            """

            attachments = email_data.get('attachments') or []
            result = execute_insert(query, (
                email_data['message_id'],
                email_data['subject'],
                email_data['sender_email'],
                email_data['sender_name'],
                email_data['recipient_email'],
                email_data['cc'],
                email_data['body_text'],
                email_data['body_html'],
                email_data['received_date'],
                email_data['folder'],
                email_data['has_attachments'],
                # The .eml/.msg parsers historically omitted this key
                email_data.get('attachment_count', len(attachments)),
                email_data.get('in_reply_to'),
                email_data.get('email_references')
            ))

            email_id = self._extract_inserted_id(result)
            if email_id is None:
                logger.error("❌ Failed to insert email - no ID returned")
                return None

            logger.info(f"✅ Saved email {email_id}: {(email_data.get('subject') or '')[:50]}...")

            if attachments:
                await self._save_attachments(email_id, attachments)

            return email_id

        except Exception as e:
            logger.error(f"❌ Error saving email to database: {e}")
            return None

    async def _save_attachments(self, email_id: int, attachments: List[Dict]):
        """Save attachments to disk and to ``email_attachments``.

        The bytes are always stored in ``content_data`` as well, as a
        fallback. The on-disk name is ``<md5>_<sanitised filename>``; the
        original filename is kept in the database. Failures for one
        attachment are logged and do not stop the others.
        """
        import hashlib
        from pathlib import Path

        upload_dir = Path(settings.UPLOAD_DIR) / "email_attachments"
        try:
            upload_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            logger.warning(f"⚠️ Could not create upload dir {upload_dir}: {e}")

        query = """
            INSERT INTO email_attachments
            (email_id, filename, content_type, size_bytes, file_path, content_data)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT DO NOTHING
        """

        for att in attachments:
            try:
                from psycopg2 import Binary

                filename = att['filename']
                content = att['content']  # bytes
                content_type = att.get('content_type', 'application/octet-stream')
                # Fetch paths use 'size'; upload parsers use 'size_bytes'
                size_bytes = att.get('size', att.get('size_bytes', len(content) if content else 0))

                if not content:
                    continue

                # Generate MD5 hash for deduplication
                md5_hash = hashlib.md5(content).hexdigest()

                file_path_str = None
                try:
                    file_path = upload_dir / f"{md5_hash}_{self._safe_disk_filename(filename)}"
                    file_path.write_bytes(content)
                    file_path_str = str(file_path)
                except Exception as e:
                    logger.warning(f"⚠️ Could not save attachment to disk ({filename}): {e}")

                execute_query(query, (
                    email_id,
                    filename,
                    content_type,
                    size_bytes,
                    file_path_str,
                    Binary(content)
                ))

                logger.info(f"📎 Saved attachment: {filename} ({size_bytes} bytes, disk={file_path_str is not None})")

            except Exception as e:
                logger.error(f"❌ Failed to save attachment {att.get('filename', '?')}: {e}")

    async def _resave_attachment_content(self, message_id: str, attachments: List[Dict]):
        """For existing emails, store attachment bytes in content_data if not already saved.

        Rows are matched by the email's Message-ID and the attachment
        filename. Only rows whose ``content_data`` is still NULL are updated.
        """
        from psycopg2 import Binary

        query = """
            UPDATE email_attachments
            SET content_data = %s
            WHERE email_id = (
                SELECT id FROM email_messages WHERE message_id = %s LIMIT 1
            )
            AND filename = %s
            AND content_data IS NULL
        """

        for att in attachments:
            try:
                filename = att.get('filename')
                content = att.get('content')
                if not filename or not content:
                    continue
                execute_query(query, (Binary(content), message_id, filename))
                logger.debug(f"💾 Re-saved content_data for attachment: {filename}")
            except Exception as e:
                logger.warning(f"⚠️ Could not re-save content_data for {att.get('filename', '?')}: {e}")

    async def get_unprocessed_emails(self, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` non-deleted emails with status 'new', newest first."""
        query = """
            SELECT * FROM email_messages
            WHERE status = 'new' AND deleted_at IS NULL
            ORDER BY received_date DESC
            LIMIT %s
        """
        result = execute_query(query, (limit,))
        return result

    async def update_email_status(self, email_id: int, status: str):
        """Set an email's processing status and bump ``updated_at``."""
        query = "UPDATE email_messages SET status = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
        execute_query(query, (status, email_id))
        logger.info(f"✅ Updated email {email_id} status to: {status}")

    # ------------------------------------------------------------------
    # Uploaded files
    # ------------------------------------------------------------------

    def parse_eml_file(self, content: bytes) -> Optional[Dict]:
        """
        Parse .eml file content into a standardized email dictionary.

        Args:
            content: Raw .eml file bytes

        Returns:
            Dictionary with email data, or None if parsing fails.
            The Message-ID has its angle brackets stripped. A deterministic
            ``uploaded-<md5>@bmchub.local`` id is generated when the header
            is missing.
        """
        try:
            from email import policy
            from email.parser import BytesParser
            import hashlib

            msg = BytesParser(policy=policy.default).parsebytes(content)

            message_id = str(msg.get("Message-ID", "")).strip("<>")
            if not message_id:
                # Generate fallback message ID from date + subject + sender
                hash_str = f"{msg.get('Date', '')}{msg.get('Subject', '')}{msg.get('From', '')}"
                digest = hashlib.md5(hash_str.encode('utf-8', errors='replace')).hexdigest()
                message_id = f"uploaded-{digest}@bmchub.local"

            sender_name, sender_email = self._parse_email_address(str(msg.get("From", "")))
            _, recipient_email = self._parse_email_address(str(msg.get("To", "")))
            cc_header = str(msg.get("Cc", ""))

            received_date = self._parse_email_date(str(msg.get("Date") or ""))

            body_text = ""
            body_html = ""
            attachments = []

            if msg.is_multipart():
                for part in msg.walk():
                    content_type = part.get_content_type()
                    content_disposition = part.get_content_disposition()

                    if content_type in ("text/plain", "text/html") and content_disposition != "attachment":
                        text = self._decode_part_payload(part)
                        # First non-empty body of each type wins
                        if text:
                            if content_type == "text/plain" and not body_text:
                                body_text = text
                            elif content_type == "text/html" and not body_html:
                                body_html = text

                    elif content_disposition == "attachment":
                        filename = part.get_filename()
                        if not filename:
                            continue
                        try:
                            payload = part.get_payload(decode=True)
                        except Exception:
                            continue
                        if payload:
                            attachments.append({
                                "filename": filename,
                                "content_type": content_type,
                                "content": payload,
                                "size_bytes": len(payload)
                            })
            else:
                # Single part email
                text = self._decode_part_payload(msg)
                if text:
                    if msg.get_content_type() == "text/html":
                        body_html = text
                    else:
                        body_text = text

            return {
                "message_id": message_id,
                "in_reply_to": str(msg.get("In-Reply-To", "")),
                "email_references": str(msg.get("References", "")),
                "subject": str(msg.get("Subject", "No Subject")),
                "sender_name": sender_name,
                "sender_email": sender_email,
                "recipient_email": recipient_email,
                "cc": cc_header,
                "body_text": body_text,
                "body_html": body_html,
                "received_date": received_date,
                "has_attachments": len(attachments) > 0,
                "attachment_count": len(attachments),
                "attachments": attachments,
                "folder": "uploaded"
            }

        except Exception as e:
            logger.error(f"❌ Failed to parse .eml file: {e}")
            return None

    def parse_msg_file(self, content: bytes) -> Optional[Dict]:
        """
        Parse Outlook .msg file content into a standardized email dictionary.

        Args:
            content: Raw .msg file bytes

        Returns:
            Dictionary with email data, or None if parsing fails.
            The underlying ``extract_msg.Message`` is closed afterwards.
        """
        try:
            import extract_msg
            import io
            import hashlib

            msg = extract_msg.Message(io.BytesIO(content))
            try:
                message_id = msg.messageId
                if not message_id:
                    hash_str = f"{msg.date}{msg.subject}{msg.sender}"
                    digest = hashlib.md5(hash_str.encode('utf-8', errors='replace')).hexdigest()
                    message_id = f"uploaded-{digest}@bmchub.local"
                else:
                    message_id = message_id.strip("<>")

                # Depending on the extract_msg version, .date may be a
                # datetime or an RFC 2822 string.
                try:
                    raw_date = msg.date
                    if isinstance(raw_date, datetime):
                        received_date = raw_date
                    elif raw_date:
                        received_date = self._parse_email_date(str(raw_date))
                    else:
                        received_date = datetime.now()
                except Exception:
                    received_date = datetime.now()

                attachments = []
                for attachment in msg.attachments:
                    try:
                        data = attachment.data
                        # Embedded .msg attachments expose a Message object, not bytes
                        if data is not None and not isinstance(data, (bytes, bytearray)):
                            continue
                        attachments.append({
                            "filename": (
                                getattr(attachment, "longFilename", None)
                                or getattr(attachment, "shortFilename", None)
                                or "unknown"
                            ),
                            "content_type": getattr(attachment, "mimetype", None) or "application/octet-stream",
                            "content": data,
                            "size_bytes": len(data) if data else 0
                        })
                    except Exception as e:
                        logger.warning(f"⚠️ Skipping unreadable .msg attachment: {e}")

                return {
                    "message_id": message_id,
                    "in_reply_to": None,
                    "email_references": None,
                    "subject": msg.subject or "No Subject",
                    "sender_name": msg.sender or "",
                    "sender_email": msg.senderEmail or "",
                    "recipient_email": msg.to or "",
                    "cc": msg.cc or "",
                    "body_text": msg.body or "",
                    "body_html": msg.htmlBody or "",
                    "received_date": received_date,
                    "has_attachments": len(attachments) > 0,
                    "attachment_count": len(attachments),
                    "attachments": attachments,
                    "folder": "uploaded"
                }
            finally:
                close = getattr(msg, "close", None)
                if callable(close):
                    try:
                        close()
                    except Exception:
                        pass

        except Exception as e:
            logger.error(f"❌ Failed to parse .msg file: {e}")
            return None

    async def save_uploaded_email(self, email_data: Dict) -> Optional[int]:
        """
        Save an uploaded email to the database (import_method 'manual_upload').

        Args:
            email_data: Parsed email dictionary

        Returns:
            email_id if successful, None if duplicate or error
        """
        try:
            # Duplicate check deliberately includes soft-deleted rows
            check_query = "SELECT id FROM email_messages WHERE message_id = %s"
            existing = execute_query(check_query, (email_data["message_id"],))

            if existing:
                logger.info(f"⏭️ Email already exists: {email_data['message_id']}")
                return None

            query = """
                INSERT INTO email_messages (
                    message_id, subject, sender_email, sender_name,
                    recipient_email, cc, body_text, body_html,
                    received_date, folder, has_attachments, attachment_count,
                    in_reply_to, email_references,
                    status, import_method, created_at
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
                RETURNING id
            """

            attachments = email_data.get("attachments") or []
            result = execute_insert(query, (
                email_data["message_id"],
                email_data["subject"],
                email_data["sender_email"],
                email_data["sender_name"],
                email_data.get("recipient_email", ""),
                email_data.get("cc", ""),
                email_data["body_text"],
                email_data["body_html"],
                email_data["received_date"],
                email_data["folder"],
                email_data["has_attachments"],
                len(attachments),
                email_data.get("in_reply_to"),
                email_data.get("email_references"),
                "new",
                "manual_upload"
            ))

            email_id = self._extract_inserted_id(result)
            if email_id is None:
                logger.error("❌ Failed to insert email - no ID returned")
                return None

            if attachments:
                await self._save_attachments(email_id, attachments)

            logger.info(f"✅ Saved uploaded email: {(email_data.get('subject') or '')[:50]}... (ID: {email_id})")
            return email_id

        except Exception as e:
            logger.error(f"❌ Failed to save uploaded email: {e}")
            return None

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    def _smtp_unavailable_reason(self) -> Optional[str]:
        """Return why SMTP sending cannot proceed (and log it), or None if it can."""
        if not HAS_AIOSMTPLIB:
            logger.error("❌ aiosmtplib not installed - cannot send email. Install with: pip install aiosmtplib")
            return "aiosmtplib not installed"
        if not all([settings.EMAIL_SMTP_HOST, settings.EMAIL_SMTP_USER, settings.EMAIL_SMTP_PASSWORD]):
            logger.error("❌ SMTP not configured - cannot send email")
            return "SMTP not configured"
        return None

    @staticmethod
    def _collect_recipients(
        to_addresses: List[str],
        cc: Optional[List[str]],
        bcc: Optional[List[str]]
    ) -> List[str]:
        """Return the SMTP envelope recipients: To, then Cc, then Bcc."""
        recipients = list(to_addresses)
        recipients.extend(cc or [])
        recipients.extend(bcc or [])
        return recipients

    def _apply_common_headers(
        self,
        msg,
        to_addresses: List[str],
        subject: str,
        cc: Optional[List[str]],
        reply_to: Optional[str]
    ) -> None:
        """Set Subject/From/To/Date (and optional Cc/Reply-To) on an outgoing message.

        ``From`` is built with ``email.utils.formataddr``, so non-ASCII
        display names are RFC 2047 encoded and names with commas are quoted.
        Bcc is intentionally never written as a header.
        """
        from email.utils import formataddr, formatdate

        from_name = settings.EMAIL_SMTP_FROM_NAME or ''
        from_address = settings.EMAIL_SMTP_FROM_ADDRESS
        try:
            from_header = formataddr((from_name, from_address))
        except (UnicodeError, AttributeError):
            # formataddr requires an ASCII str address; keep the legacy form otherwise
            from_header = f"{settings.EMAIL_SMTP_FROM_NAME} <{settings.EMAIL_SMTP_FROM_ADDRESS}>"

        msg['Subject'] = subject
        msg['From'] = from_header
        msg['To'] = ', '.join(to_addresses)
        msg['Date'] = formatdate(localtime=True)
        if cc:
            msg['Cc'] = ', '.join(cc)
        if reply_to:
            msg['Reply-To'] = reply_to

    async def _smtp_send(self, msg, recipients: List[str]) -> None:
        """Log in to the configured SMTP server and send ``msg`` to ``recipients``."""
        async with aiosmtplib.SMTP(
            hostname=settings.EMAIL_SMTP_HOST,
            port=settings.EMAIL_SMTP_PORT,
            use_tls=settings.EMAIL_SMTP_USE_TLS
        ) as smtp:
            await smtp.login(settings.EMAIL_SMTP_USER, settings.EMAIL_SMTP_PASSWORD)
            await smtp.sendmail(
                settings.EMAIL_SMTP_FROM_ADDRESS,
                recipients,
                msg.as_string()
            )

    async def send_email(
        self,
        to_addresses: List[str],
        subject: str,
        body_text: str,
        body_html: Optional[str] = None,
        cc: Optional[List[str]] = None,
        bcc: Optional[List[str]] = None,
        reply_to: Optional[str] = None
    ) -> Tuple[bool, str]:
        """
        Send email via SMTP to one or more recipients.

        Args:
            to_addresses: List of recipient email addresses
            subject: Email subject
            body_text: Plain text body
            body_html: Optional HTML body (sent as multipart/alternative)
            cc: Optional list of CC addresses
            bcc: Optional list of BCC addresses (envelope only, no header)
            reply_to: Optional reply-to address

        Returns:
            Tuple of (success: bool, message: str). When
            ``settings.REMINDERS_DRY_RUN`` is set, nothing is sent and
            (True, ...) is returned.
        """
        # Safety check
        if settings.REMINDERS_DRY_RUN:
            logger.warning(f"🔒 DRY RUN MODE: Would send email to {to_addresses} with subject '{subject}'")
            return True, "Dry run mode - email not actually sent"

        reason = self._smtp_unavailable_reason()
        if reason:
            return False, reason

        try:
            msg = MIMEMultipart('alternative')
            self._apply_common_headers(msg, to_addresses, subject, cc, reply_to)

            msg.attach(MIMEText(body_text, 'plain'))
            if body_html:
                msg.attach(MIMEText(body_html, 'html'))

            await self._smtp_send(msg, self._collect_recipients(to_addresses, cc, bcc))

            logger.info(f"✅ Email sent successfully to {len(to_addresses)} recipient(s): {subject}")
            return True, f"Email sent to {len(to_addresses)} recipient(s)"

        except Exception as e:
            error_msg = f"❌ Failed to send email: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

    async def send_email_with_attachments(
        self,
        to_addresses: List[str],
        subject: str,
        body_text: str,
        body_html: Optional[str] = None,
        cc: Optional[List[str]] = None,
        bcc: Optional[List[str]] = None,
        reply_to: Optional[str] = None,
        attachments: Optional[List[Dict]] = None,
        respect_dry_run: bool = True,
    ) -> Tuple[bool, str, str]:
        """Send email via SMTP with optional attachments and return the generated Message-ID.

        Args:
            attachments: Dicts with ``content`` (bytes), optional ``filename``
                and ``content_type``. Entries without content are skipped.
            respect_dry_run: When True, ``settings.REMINDERS_DRY_RUN``
                suppresses sending.

        Returns:
            Tuple of (success, message, message_id). The message_id is
            returned even on failure or dry run.
        """
        generated_message_id = f"<{uuid4().hex}@bmchub.local>"

        if respect_dry_run and settings.REMINDERS_DRY_RUN:
            logger.warning(
                "🔒 DRY RUN MODE: Would send email to %s with subject '%s'",
                to_addresses,
                subject,
            )
            return True, "Dry run mode - email not actually sent", generated_message_id

        reason = self._smtp_unavailable_reason()
        if reason:
            return False, reason, generated_message_id

        try:
            msg = MIMEMultipart('mixed')
            self._apply_common_headers(msg, to_addresses, subject, cc, reply_to)
            msg['Message-ID'] = generated_message_id

            content_part = MIMEMultipart('alternative')
            content_part.attach(MIMEText(body_text, 'plain'))
            if body_html:
                content_part.attach(MIMEText(body_html, 'html'))
            msg.attach(content_part)

            for attachment in (attachments or []):
                content = attachment.get("content")
                if not content:
                    continue

                filename = attachment.get("filename") or "attachment.bin"
                content_type = attachment.get("content_type") or "application/octet-stream"
                maintype, _, subtype = content_type.partition("/")
                if not maintype or not subtype:
                    maintype, subtype = "application", "octet-stream"

                mime_attachment = MIMEBase(maintype, subtype)
                mime_attachment.set_payload(content)
                encoders.encode_base64(mime_attachment)
                # add_header quotes/escapes the filename and RFC 2231-encodes
                # non-ASCII names, unlike a hand-built f-string.
                mime_attachment.add_header('Content-Disposition', 'attachment', filename=filename)
                msg.attach(mime_attachment)

            await self._smtp_send(msg, self._collect_recipients(to_addresses, cc, bcc))

            logger.info(
                "✅ Email with attachments sent successfully to %s recipient(s): %s",
                len(to_addresses),
                subject,
            )
            return True, f"Email sent to {len(to_addresses)} recipient(s)", generated_message_id

        except Exception as e:
            error_msg = f"❌ Failed to send email with attachments: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, generated_message_id