# Source: bmc_hub/app/services/email_service.py (855 lines, 34 KiB, Python)

"""
Email Service
Handles email fetching from IMAP or Microsoft Graph API
Based on OmniSync architecture - READ-ONLY mode for safety
"""
import logging
import imaplib
import email
from email.header import decode_header
from typing import List, Dict, Optional, Tuple
from datetime import datetime
import json
import asyncio
import base64
from aiohttp import ClientSession, BasicAuth
import msal
from app.core.config import settings
from app.core.database import execute_query, execute_insert
logger = logging.getLogger(__name__)
class EmailService:
"""Email fetching service with IMAP and Graph API support"""
def __init__(self):
    """Snapshot the email-source settings onto the instance.

    Stores the Graph/IMAP selector flag and two plain dictionaries,
    ``imap_config`` and ``graph_config``, which the fetch methods read.
    """
    self.use_graph = settings.USE_GRAPH_API

    # IMAP connection and mailbox options.
    self.imap_config = dict(
        server=settings.IMAP_SERVER,
        port=settings.IMAP_PORT,
        username=settings.IMAP_USERNAME,
        password=settings.IMAP_PASSWORD,
        use_ssl=settings.IMAP_USE_SSL,
        folder=settings.IMAP_FOLDER,
        readonly=settings.IMAP_READ_ONLY,
    )

    # Microsoft Graph application credentials and target mailbox.
    self.graph_config = dict(
        tenant_id=settings.GRAPH_TENANT_ID,
        client_id=settings.GRAPH_CLIENT_ID,
        client_secret=settings.GRAPH_CLIENT_SECRET,
        user_email=settings.GRAPH_USER_EMAIL,
    )
async def fetch_new_emails(self, limit: int = 50) -> List[Dict]:
    """
    Fetch new emails from whichever source is configured.

    Graph API is used when it is enabled and a client id is set; otherwise
    IMAP is used when a username is set. With neither configured a warning
    is logged and an empty list is returned.

    Args:
        limit: Maximum number of messages requested from the source.
    Returns:
        List of parsed email dictionaries.
    """
    graph_ready = self.use_graph and self.graph_config['client_id']
    if graph_ready:
        logger.info("📥 Fetching emails via Microsoft Graph API")
        return await self._fetch_via_graph(limit)

    if not self.imap_config['username']:
        logger.warning("⚠️ No email source configured (IMAP or Graph API)")
        return []

    logger.info("📥 Fetching emails via IMAP")
    return await self._fetch_via_imap(limit)
async def _fetch_via_imap(self, limit: int) -> List[Dict]:
    """Fetch the newest messages over IMAP without modifying the mailbox.

    Connects (SSL or plain, per ``imap_config``), selects the configured
    folder, searches ``ALL`` and downloads the last ``limit`` message ids
    (newest first) with ``BODY.PEEK[]`` so messages are not marked as read.
    Messages whose ``message_id`` is already stored are filtered out.

    Args:
        limit: Maximum number of messages to download.
    Returns:
        Parsed email dictionaries for messages not yet in the database;
        an empty list on any IMAP or unexpected error.

    NOTE(review): imaplib is synchronous, so this coroutine blocks the
    event loop while it talks to the server.
    """
    emails = []
    mail = None
    try:
        # Connect to IMAP server
        if self.imap_config['use_ssl']:
            mail = imaplib.IMAP4_SSL(self.imap_config['server'], self.imap_config['port'])
        else:
            mail = imaplib.IMAP4(self.imap_config['server'], self.imap_config['port'])

        # Login
        mail.login(self.imap_config['username'], self.imap_config['password'])

        # Select folder in READ-ONLY mode (critical for safety)
        folder = self.imap_config['folder']
        readonly = self.imap_config['readonly']
        mail.select(folder, readonly=readonly)
        if readonly:
            logger.info(f"🔒 Connected to {folder} in READ-ONLY mode (emails will NOT be marked as read)")

        # Search for all emails
        status, messages = mail.search(None, 'ALL')
        if status != 'OK':
            logger.error(f"❌ IMAP search failed: {status}")
            return emails

        email_ids = messages[0].split()
        total_emails = len(email_ids)
        logger.info(f"📊 Found {total_emails} emails in {folder}")

        # Get most recent emails (reverse order, limit)
        email_ids_to_fetch = email_ids[-limit:] if len(email_ids) > limit else email_ids
        email_ids_to_fetch.reverse()  # Newest first

        for email_id in email_ids_to_fetch:
            try:
                # Fetch email using BODY.PEEK to avoid marking as read
                status, msg_data = mail.fetch(email_id, '(BODY.PEEK[])')
                if status != 'OK':
                    logger.warning(f"⚠️ Failed to fetch email {email_id}: {status}")
                    continue

                # Parse email
                raw_email = msg_data[0][1]
                msg = email.message_from_bytes(raw_email)
                parsed_email = self._parse_email(msg, email_id.decode())

                # Check if already exists in database
                if not self._email_exists(parsed_email['message_id']):
                    emails.append(parsed_email)
                    logger.info(f"✅ New email: {parsed_email['subject'][:50]}... from {parsed_email['sender_email']}")
                else:
                    logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}")
            except Exception as e:
                logger.error(f"❌ Error parsing email {email_id}: {e}")
                continue

        logger.info(f"📥 Fetched {len(emails)} new emails via IMAP")
        return emails
    except imaplib.IMAP4.error as e:
        logger.error(f"❌ IMAP error: {e}")
        return []
    except Exception as e:
        logger.error(f"❌ Unexpected error fetching via IMAP: {e}")
        return []
    finally:
        # Always release the connection, including on the early return and
        # on errors. A failing logout must not discard fetched emails.
        if mail is not None:
            try:
                mail.logout()
            except Exception as e:
                logger.debug(f"IMAP logout failed: {e}")
async def _fetch_via_graph(self, limit: int) -> List[Dict]:
    """Fetch the newest messages through the Microsoft Graph API.

    Requests the ``limit`` most recently received messages of the
    configured user's mail folder, parses each one and downloads its
    attachments. Messages already stored in the database are skipped
    *before* their attachments are requested, so known messages cost no
    extra network calls.

    Args:
        limit: Value sent as ``$top`` (maximum number of messages).
    Returns:
        Parsed email dictionaries (each with an ``attachments`` list) for
        messages not yet stored; an empty list on any error.
    """
    emails = []
    try:
        # Get access token using MSAL
        access_token = await self._get_graph_access_token()
        if not access_token:
            logger.error("❌ Failed to get Graph API access token")
            return []

        # Build Graph API request
        user_email = self.graph_config['user_email']
        folder = self.imap_config['folder']  # Use same folder name

        # Graph API endpoint for messages
        url = f"https://graph.microsoft.com/v1.0/users/{user_email}/mailFolders/{folder}/messages"
        params = {
            '$top': limit,
            '$orderby': 'receivedDateTime desc',
            '$select': 'id,subject,from,toRecipients,ccRecipients,receivedDateTime,bodyPreview,body,hasAttachments,internetMessageId'
        }
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }

        async with ClientSession() as session:
            async with session.get(url, params=params, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"❌ Graph API error: {response.status} - {error_text}")
                    return []
                data = await response.json()

            messages = data.get('value', [])
            logger.info(f"📊 Found {len(messages)} emails via Graph API")

            for msg in messages:
                try:
                    parsed_email = self._parse_graph_message(msg)

                    # Duplicate check first: no attachment download for
                    # messages that are already stored.
                    if self._email_exists(parsed_email['message_id']):
                        logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}")
                        continue

                    # Fetch attachments if email has them
                    if msg.get('hasAttachments', False):
                        attachments = await self._fetch_graph_attachments(
                            user_email,
                            msg['id'],
                            access_token,
                            session
                        )
                        parsed_email['attachments'] = attachments
                        parsed_email['attachment_count'] = len(attachments)
                    else:
                        parsed_email['attachments'] = []

                    emails.append(parsed_email)
                    logger.info(f"✅ New email: {parsed_email['subject'][:50]}... from {parsed_email['sender_email']}")
                except Exception as e:
                    logger.error(f"❌ Error parsing Graph message: {e}")
                    continue

        logger.info(f"📥 Fetched {len(emails)} new emails via Graph API")
        return emails
    except Exception as e:
        logger.error(f"❌ Unexpected error fetching via Graph API: {e}")
        return []
async def _get_graph_access_token(self) -> Optional[str]:
    """Acquire an app-only Microsoft Graph token through MSAL.

    Builds a confidential client from ``graph_config`` and requests a
    token for the ``.default`` scope.

    Returns:
        The access token string, or None when MSAL reports an error or
        raises.
    """
    try:
        cfg = self.graph_config
        authority = f"https://login.microsoftonline.com/{cfg['tenant_id']}"
        client = msal.ConfidentialClientApplication(
            cfg['client_id'],
            authority=authority,
            client_credential=cfg['client_secret'],
        )

        # Request token with Mail.Read scope (Application permission)
        token_response = client.acquire_token_for_client(
            scopes=["https://graph.microsoft.com/.default"]
        )

        if "access_token" not in token_response:
            reason = token_response.get(
                "error_description",
                token_response.get("error", "Unknown error"),
            )
            logger.error(f"❌ Failed to obtain access token: {reason}")
            return None

        logger.info("✅ Successfully obtained Graph API access token")
        return token_response["access_token"]
    except Exception as exc:
        logger.error(f"❌ Error getting Graph access token: {exc}")
        return None
def _parse_email(self, msg: email.message.Message, email_id: str) -> Dict:
"""Parse IMAP email message into dictionary"""
# Decode subject
subject = self._decode_header(msg.get('Subject', ''))
# Decode sender
from_header = self._decode_header(msg.get('From', ''))
sender_name, sender_email = self._parse_email_address(from_header)
# Decode recipient
to_header = self._decode_header(msg.get('To', ''))
recipient_name, recipient_email = self._parse_email_address(to_header)
# Decode CC
cc_header = self._decode_header(msg.get('Cc', ''))
# Get message ID
message_id = msg.get('Message-ID', f"imap-{email_id}")
# Get date
date_str = msg.get('Date', '')
received_date = self._parse_email_date(date_str)
# Get body
body_text = ""
body_html = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
try:
body_text = part.get_payload(decode=True).decode('utf-8', errors='ignore')
except Exception:
pass
elif content_type == "text/html":
try:
body_html = part.get_payload(decode=True).decode('utf-8', errors='ignore')
except Exception:
pass
else:
try:
body_text = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
except Exception:
body_text = str(msg.get_payload())
# Extract attachments
attachments = []
if msg.is_multipart():
for part in msg.walk():
if part.get_content_maintype() == 'multipart':
continue
# Skip text parts (body content)
content_type = part.get_content_type()
if content_type in ['text/plain', 'text/html']:
continue
# Check if part has a filename (indicates attachment)
filename = part.get_filename()
# FALLBACK: If no filename but content-type is audio, generate one
if not filename and content_type.startswith('audio/'):
ext = '.mp3'
if 'wav' in content_type: ext = '.wav'
elif 'ogg' in content_type: ext = '.ogg'
elif 'm4a' in content_type: ext = '.m4a'
filename = f"audio_attachment{ext}"
logger.info(f"⚠️ Found audio attachment without filename. Generated: {filename}")
if filename:
# Decode filename if needed
filename = self._decode_header(filename)
# Get attachment content
content = part.get_payload(decode=True)
content_type = part.get_content_type()
if content: # Only add if we got content
attachments.append({
'filename': filename,
'content': content,
'content_type': content_type,
'size': len(content)
})
return {
'message_id': message_id,
'subject': subject,
'sender_name': sender_name,
'sender_email': sender_email,
'recipient_email': recipient_email,
'cc': cc_header,
'body_text': body_text,
'body_html': body_html,
'received_date': received_date,
'folder': self.imap_config['folder'],
'has_attachments': len(attachments) > 0,
'attachment_count': len(attachments),
'attachments': attachments
}
def _parse_graph_message(self, msg: Dict) -> Dict:
"""Parse Microsoft Graph API message into dictionary"""
# Extract sender
from_data = msg.get('from', {}).get('emailAddress', {})
sender_name = from_data.get('name', '')
sender_email = from_data.get('address', '')
# Extract recipient (first TO recipient)
to_recipients = msg.get('toRecipients', [])
recipient_email = to_recipients[0]['emailAddress']['address'] if to_recipients else ''
# Extract CC recipients
cc_recipients = msg.get('ccRecipients', [])
cc = ', '.join([r['emailAddress']['address'] for r in cc_recipients])
# Get body
body_data = msg.get('body', {})
body_content = body_data.get('content', '')
body_type = body_data.get('contentType', 'text')
body_text = body_content if body_type == 'text' else ''
body_html = body_content if body_type == 'html' else ''
# Parse date
received_date_str = msg.get('receivedDateTime', '')
received_date = datetime.fromisoformat(received_date_str.replace('Z', '+00:00')) if received_date_str else datetime.now()
return {
'message_id': msg.get('internetMessageId', msg.get('id', '')),
'subject': msg.get('subject', ''),
'sender_name': sender_name,
'sender_email': sender_email,
'recipient_email': recipient_email,
'cc': cc,
'body_text': body_text,
'body_html': body_html,
'received_date': received_date,
'folder': self.imap_config['folder'],
'has_attachments': msg.get('hasAttachments', False),
'attachment_count': 0 # Will be updated after fetching attachments
}
async def _fetch_graph_attachments(
    self,
    user_email: str,
    message_id: str,
    access_token: str,
    session: ClientSession
) -> List[Dict]:
    """Download every attachment of one Graph message.

    Args:
        user_email: Mailbox owner used in the request path.
        message_id: Graph id of the message.
        access_token: Bearer token for the request.
        session: Open HTTP session to reuse.
    Returns:
        List of dicts with filename, content (bytes), content_type and
        size. Empty on a non-200 response; on an exception whatever was
        collected so far is returned.
    """
    # Content-type substring -> extension, checked in order; '.mp3' otherwise.
    audio_suffixes = (('wav', '.wav'), ('ogg', '.ogg'), ('m4a', '.m4a'))
    collected = []
    try:
        # Graph API endpoint for message attachments
        endpoint = f"https://graph.microsoft.com/v1.0/users/{user_email}/messages/{message_id}/attachments"
        request_headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
        async with session.get(endpoint, headers=request_headers) as response:
            if response.status != 200:
                logger.warning(f"⚠️ Failed to fetch attachments for message {message_id}: {response.status}")
                return []

            payload = await response.json()
            for item in payload.get('value', []):
                # Graph API returns base64 content in contentBytes
                encoded = item.get('contentBytes', '')
                content = base64.b64decode(encoded) if encoded else b''

                # Handle missing filenames for audio (FALLBACK)
                filename = item.get('name')
                content_type = item.get('contentType', 'application/octet-stream')
                if not filename and content_type.startswith('audio/'):
                    ext = next(
                        (suffix for marker, suffix in audio_suffixes if marker in content_type),
                        '.mp3',
                    )
                    filename = f"audio_attachment{ext}"
                    logger.info(f"⚠️ Found (Graph) audio attachment without filename. Generated: {filename}")

                collected.append({
                    'filename': filename or 'unknown',
                    'content': content,
                    'content_type': content_type,
                    'size': item.get('size', len(content))
                })
                logger.info(f"📎 Fetched attachment: {filename} ({item.get('size', 0)} bytes)")
    except Exception as e:
        logger.error(f"❌ Error fetching attachments for message {message_id}: {e}")
    return collected
def _decode_header(self, header: str) -> str:
"""Decode email header (handles MIME encoding)"""
if not header:
return ""
decoded_parts = decode_header(header)
decoded_string = ""
for content, encoding in decoded_parts:
if isinstance(content, bytes):
try:
decoded_string += content.decode(encoding or 'utf-8', errors='ignore')
except Exception:
decoded_string += content.decode('utf-8', errors='ignore')
else:
decoded_string += str(content)
return decoded_string.strip()
def _parse_email_address(self, header: str) -> Tuple[str, str]:
"""Parse 'Name <email@domain.com>' into (name, email)"""
if not header:
return ("", "")
if '<' in header and '>' in header:
# Format: "Name <email@domain.com>"
parts = header.split('<')
name = parts[0].strip().strip('"')
email_addr = parts[1].strip('>').strip()
return (name, email_addr)
else:
# Just email address
return ("", header.strip())
def _parse_email_date(self, date_str: str) -> datetime:
"""Parse email date header into datetime object"""
if not date_str:
return datetime.now()
try:
# Use email.utils to parse RFC 2822 date
from email.utils import parsedate_to_datetime
return parsedate_to_datetime(date_str)
except Exception:
logger.warning(f"⚠️ Failed to parse date: {date_str}")
return datetime.now()
def _email_exists(self, message_id: str) -> bool:
    """Report whether a non-deleted row with this message id is stored."""
    rows = execute_query(
        "SELECT id FROM email_messages WHERE message_id = %s AND deleted_at IS NULL",
        (message_id,),
    )
    return len(rows) != 0
async def save_email(self, email_data: Dict) -> Optional[int]:
    """Insert a fetched email (and its attachments) into the database.

    Args:
        email_data: Parsed email dictionary as produced by the IMAP/Graph
            parsers; must contain ``attachment_count``.
    Returns:
        The new row id, or None when the insert fails or yields no id.
    """
    try:
        query = """
            INSERT INTO email_messages
            (message_id, subject, sender_email, sender_name, recipient_email, cc,
             body_text, body_html, received_date, folder, has_attachments, attachment_count,
             status, is_read)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'new', false)
            RETURNING id
        """
        inserted = execute_insert(query, (
            email_data['message_id'],
            email_data['subject'],
            email_data['sender_email'],
            email_data['sender_name'],
            email_data['recipient_email'],
            email_data['cc'],
            email_data['body_text'],
            email_data['body_html'],
            email_data['received_date'],
            email_data['folder'],
            email_data['has_attachments'],
            email_data['attachment_count']
        ))

        # NOTE(review): this file uses execute_insert's result both as a
        # bare id and as a list of row dicts (save_uploaded_email). Accept
        # either shape until the helper's contract is confirmed.
        while isinstance(inserted, (list, tuple)):
            inserted = inserted[0] if inserted else None
        if isinstance(inserted, dict):
            inserted = inserted.get('id')
        email_id = inserted

        if email_id is None:
            logger.error("❌ Failed to insert email - no ID returned")
            return None

        logger.info(f"✅ Saved email {email_id}: {email_data['subject'][:50]}...")

        # Save attachments if any
        if email_data.get('attachments'):
            await self._save_attachments(email_id, email_data['attachments'])
        return email_id
    except Exception as e:
        logger.error(f"❌ Error saving email to database: {e}")
        return None
async def _save_attachments(self, email_id: int, attachments: List[Dict]):
    """Write attachments to disk and record them in ``email_attachments``.

    Files go to ``uploads/email_attachments`` and are named
    ``<md5-of-content>_<filename>``. Only the base name of the supplied
    filename is used for the path, because the name comes from the email
    and must not be able to point outside the upload directory. Each
    attachment is handled independently; a failure is logged and the
    remaining attachments are still processed.

    Args:
        email_id: Id of the owning ``email_messages`` row.
        attachments: Dicts with ``filename``, ``content`` (bytes),
            optional ``content_type`` and a size under ``size`` (IMAP and
            Graph parsers) or ``size_bytes`` (upload parsers).
    """
    import hashlib
    from pathlib import Path

    # Create uploads directory if not exists
    upload_dir = Path("uploads/email_attachments")
    upload_dir.mkdir(parents=True, exist_ok=True)

    query = """
        INSERT INTO email_attachments
        (email_id, filename, content_type, size_bytes, file_path)
        VALUES (%s, %s, %s, %s, %s)
    """

    for att in attachments:
        # Bound before the try block so the error log can always name it.
        filename = att.get('filename') or 'unknown'
        try:
            content = att.get('content')
            if content is None:
                logger.warning(f"⚠️ Skipping attachment without content: {filename}")
                continue

            content_type = att.get('content_type', 'application/octet-stream')
            # The parsers in this service disagree on the key name.
            size_bytes = att.get('size', att.get('size_bytes', len(content)))

            # Generate MD5 hash for deduplication
            md5_hash = hashlib.md5(content).hexdigest()

            # Strip directory components (both separator styles) and NULs.
            safe_name = Path(str(filename).replace("\\", "/")).name.replace("\x00", "") or "unknown"

            # Save to disk with hash prefix
            file_path = upload_dir / f"{md5_hash}_{safe_name}"
            file_path.write_bytes(content)

            # Save to database
            execute_insert(query, (
                email_id,
                filename,
                content_type,
                size_bytes,
                str(file_path)
            ))
            logger.info(f"📎 Saved attachment: {filename} ({size_bytes} bytes)")
        except Exception as e:
            logger.error(f"❌ Failed to save attachment {filename}: {e}")
async def get_unprocessed_emails(self, limit: int = 100) -> List[Dict]:
    """Return stored emails with status 'new' that are not deleted.

    Args:
        limit: Maximum number of rows to return (newest received first).
    Returns:
        Whatever ``execute_query`` yields for the selection.
    """
    sql = """
        SELECT * FROM email_messages
        WHERE status = 'new' AND deleted_at IS NULL
        ORDER BY received_date DESC
        LIMIT %s
    """
    return execute_query(sql, (limit,))
async def update_email_status(self, email_id: int, status: str):
    """Set the processing status of one email and refresh ``updated_at``.

    Args:
        email_id: Row id in ``email_messages``.
        status: New status value to store.
    """
    execute_query(
        "UPDATE email_messages SET status = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s",
        (status, email_id),
    )
    logger.info(f"✅ Updated email {email_id} status to: {status}")
def parse_eml_file(self, content: bytes) -> Optional[Dict]:
    """
    Parse .eml file content into standardized email dictionary

    Text bodies are decoded with the charset declared on the part
    (UTF-8 fallback). Parts with ``Content-Disposition: attachment`` and
    a filename become attachments; each attachment dict carries its
    length under both ``size`` and ``size_bytes``.

    Args:
        content: Raw .eml file bytes
    Returns:
        Dictionary with email data or None if parsing fails
    """
    try:
        import hashlib
        from email import policy
        from email.parser import BytesParser
        from email.utils import parsedate_to_datetime

        def decode_text(part) -> Optional[str]:
            """Decode a text part with its declared charset; None if empty."""
            payload = part.get_payload(decode=True)
            if not payload:
                return None
            charset = part.get_content_charset() or 'utf-8'
            try:
                return payload.decode(charset, errors='ignore')
            except LookupError:
                return payload.decode('utf-8', errors='ignore')

        msg = BytesParser(policy=policy.default).parsebytes(content)

        # Extract basic fields
        message_id = msg.get("Message-ID", "").strip("<>")
        if not message_id:
            # Generate fallback message ID from date + subject + sender
            hash_str = f"{msg.get('Date', '')}{msg.get('Subject', '')}{msg.get('From', '')}"
            message_id = f"uploaded-{hashlib.md5(hash_str.encode()).hexdigest()}@bmchub.local"

        # Parse sender / recipient / CC
        sender_name, sender_email = self._parse_email_address(msg.get("From", ""))
        _, recipient_email = self._parse_email_address(msg.get("To", ""))
        cc_header = msg.get("Cc", "")

        # Parse date (current time when absent or unparseable)
        received_date = datetime.now()
        date_str = msg.get("Date")
        if date_str:
            try:
                received_date = parsedate_to_datetime(date_str)
            except (TypeError, ValueError):
                pass

        # Extract body and attachments
        body_text = ""
        body_html = ""
        attachments = []

        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                is_attachment = part.get_content_disposition() == "attachment"

                if content_type in ("text/plain", "text/html") and not is_attachment:
                    text = decode_text(part)
                    if text is not None:
                        if content_type == "text/plain":
                            body_text = text
                        else:
                            body_html = text
                elif is_attachment:
                    filename = part.get_filename()
                    data = part.get_payload(decode=True) if filename else None
                    if data:
                        attachments.append({
                            "filename": filename,
                            "content_type": content_type,
                            "content": data,
                            # Both key names are in use within this service.
                            "size": len(data),
                            "size_bytes": len(data)
                        })
        else:
            # Single part email
            text = decode_text(msg)
            if text is not None:
                if msg.get_content_type() == "text/html":
                    body_html = text
                else:
                    body_text = text

        return {
            "message_id": message_id,
            "subject": msg.get("Subject", "No Subject"),
            "sender_name": sender_name,
            "sender_email": sender_email,
            "recipient_email": recipient_email,
            "cc": cc_header,
            "body_text": body_text,
            "body_html": body_html,
            "received_date": received_date,
            "has_attachments": len(attachments) > 0,
            "attachment_count": len(attachments),
            "attachments": attachments,
            "folder": "uploaded"
        }
    except Exception as e:
        logger.error(f"❌ Failed to parse .eml file: {e}")
        return None
def parse_msg_file(self, content: bytes) -> Optional[Dict]:
    """
    Parse Outlook .msg file content into standardized email dictionary

    Attachments without a byte payload are skipped. Each attachment dict
    carries its length under both ``size`` and ``size_bytes``. The
    underlying message object is closed once all fields have been read.

    Args:
        content: Raw .msg file bytes
    Returns:
        Dictionary with email data or None if parsing fails
    """
    try:
        import extract_msg
        import io
        import hashlib

        # Parse from an in-memory stream
        msg = extract_msg.Message(io.BytesIO(content))
        try:
            # Generate message ID if not present
            message_id = msg.messageId
            if not message_id:
                hash_str = f"{msg.date}{msg.subject}{msg.sender}"
                message_id = f"uploaded-{hashlib.md5(hash_str.encode()).hexdigest()}@bmchub.local"
            else:
                message_id = message_id.strip("<>")

            # Parse date
            try:
                received_date = msg.date if msg.date else datetime.now()
            except Exception:
                received_date = datetime.now()

            # Extract attachments
            attachments = []
            for attachment in msg.attachments:
                try:
                    data = attachment.data
                    # Only raw bytes can be hashed and written to disk later.
                    if not isinstance(data, (bytes, bytearray)):
                        continue
                    attachments.append({
                        "filename": attachment.longFilename or attachment.shortFilename or "unknown",
                        "content_type": attachment.mimetype or "application/octet-stream",
                        "content": data,
                        # Both key names are in use within this service.
                        "size": len(data),
                        "size_bytes": len(data)
                    })
                except Exception as e:
                    logger.warning(f"⚠️ Skipping unreadable .msg attachment: {e}")

            return {
                "message_id": message_id,
                "subject": msg.subject or "No Subject",
                "sender_name": msg.sender or "",
                "sender_email": msg.senderEmail or "",
                "recipient_email": msg.to or "",
                "cc": msg.cc or "",
                "body_text": msg.body or "",
                "body_html": msg.htmlBody or "",
                "received_date": received_date,
                "has_attachments": len(attachments) > 0,
                "attachment_count": len(attachments),
                "attachments": attachments,
                "folder": "uploaded"
            }
        finally:
            # NOTE(review): close() is looked up defensively because the
            # installed extract_msg version is not visible from this file.
            closer = getattr(msg, "close", None)
            if callable(closer):
                closer()
    except Exception as e:
        logger.error(f"❌ Failed to parse .msg file: {e}")
        return None
async def save_uploaded_email(self, email_data: Dict) -> Optional[int]:
    """
    Save uploaded email to database

    Skips the insert when a row with the same ``message_id`` already
    exists. Attachments, if any, are stored after the email row.

    Args:
        email_data: Parsed email dictionary
    Returns:
        email_id if successful, None if duplicate or error
    """
    try:
        # Check if email already exists
        check_query = "SELECT id FROM email_messages WHERE message_id = %s"
        existing = execute_query(check_query, (email_data["message_id"],))
        if existing:
            logger.info(f"⏭️ Email already exists: {email_data['message_id']}")
            return None

        # Insert email
        query = """
            INSERT INTO email_messages (
                message_id, subject, sender_email, sender_name,
                recipient_email, cc, body_text, body_html,
                received_date, folder, has_attachments, attachment_count,
                status, import_method, created_at
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
            RETURNING id
        """
        inserted = execute_insert(query, (
            email_data["message_id"],
            email_data["subject"],
            email_data["sender_email"],
            email_data["sender_name"],
            email_data.get("recipient_email", ""),
            email_data.get("cc", ""),
            email_data["body_text"],
            email_data["body_html"],
            email_data["received_date"],
            email_data["folder"],
            email_data["has_attachments"],
            len(email_data.get("attachments", [])),
            "new",
            "manual_upload"
        ))

        # NOTE(review): this file uses execute_insert's result both as a
        # list of row dicts and as a bare id (save_email). Accept either
        # shape; indexing an int here would fail after the row was
        # already inserted.
        while isinstance(inserted, (list, tuple)):
            inserted = inserted[0] if inserted else None
        if isinstance(inserted, dict):
            inserted = inserted.get("id")
        email_id = inserted

        if email_id is None:
            logger.error("❌ Failed to insert email - no ID returned")
            return None

        # Save attachments
        if email_data.get("attachments"):
            await self._save_attachments(email_id, email_data["attachments"])

        logger.info(f"✅ Saved uploaded email: {email_data['subject'][:50]}... (ID: {email_id})")
        return email_id
    except Exception as e:
        logger.error(f"❌ Failed to save uploaded email: {e}")
        return None