bmc_hub/app/services/email_service.py
Christian 30d1be61eb feat: Add global search functionality and email results section
- Introduced a global search button and modal for enhanced user experience.
- Added a new section for displaying email results in the global search modal.
- Implemented functionality to fetch and display emails based on user queries.
- Updated the UI to include a reminders button and improved accessibility features.

fix: Update docker-compose to allow reload configuration

- Changed ENABLE_RELOAD environment variable to default to true for easier development.

chore: Update requirements for new dependencies

- Added brother_ql, pyzbar, and pypdfium2 to requirements for label printing and PDF processing.

feat: Implement Brother label printing service

- Created a new service for printing labels using Brother QL printers.
- Supports direct printing of case hardware labels with customizable layouts.

feat: Add Vaultwarden service for credential management

- Implemented a service to interact with Vaultwarden for secure credential storage and retrieval.

sql: Add migrations for email thread keys and document tokens

- Created migrations to backfill email thread keys and manage document tokens for work orders.
- Introduced new tables and updated existing structures to support token-based linking of scanned documents.

sql: Import links into the database

- Added a script to import a predefined set of links into the database with associated categories.
2026-04-01 21:34:58 +02:00

1579 lines
66 KiB
Python

"""
Email Service
Handles email fetching from IMAP or Microsoft Graph API
Based on OmniSync architecture - READ-ONLY mode for safety
Also handles outbound SMTP email sending for reminders and notifications
"""
import logging
import imaplib
import email
from email.header import decode_header
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from typing import List, Dict, Optional, Tuple, Any
from datetime import datetime
import json
import asyncio
import base64
import re
from uuid import uuid4
# Try to import aiosmtplib, but don't fail if not available
try:
import aiosmtplib
HAS_AIOSMTPLIB = True
except ImportError:
HAS_AIOSMTPLIB = False
aiosmtplib = None
from aiohttp import ClientSession, BasicAuth
import msal
from app.core.config import settings
from app.core.database import execute_query, execute_insert
logger = logging.getLogger(__name__)
class EmailService:
"""Email fetching service with IMAP and Graph API support"""
def __init__(self):
    """Capture mail-source configuration from ``settings``.

    Builds ``imap_config`` (IMAP connection + mailbox options) and
    ``graph_config`` (Microsoft Graph app credentials and mailbox).
    ``use_graph`` decides whether ``fetch_new_emails`` prefers Graph.
    """
    cfg = settings
    self.use_graph = cfg.USE_GRAPH_API
    # IMAP options; 'folder' is also reused as the Graph mail folder name.
    self.imap_config = dict(
        server=cfg.IMAP_SERVER,
        port=cfg.IMAP_PORT,
        username=cfg.IMAP_USERNAME,
        password=cfg.IMAP_PASSWORD,
        use_ssl=cfg.IMAP_USE_SSL,
        folder=cfg.IMAP_FOLDER,
        readonly=cfg.IMAP_READ_ONLY,
    )
    # App-only (client credentials) Graph configuration.
    self.graph_config = dict(
        tenant_id=cfg.GRAPH_TENANT_ID,
        client_id=cfg.GRAPH_CLIENT_ID,
        client_secret=cfg.GRAPH_CLIENT_SECRET,
        user_email=cfg.GRAPH_USER_EMAIL,
    )
def _graph_send_available(self) -> bool:
return bool(
self.use_graph
and self.graph_config.get('tenant_id')
and self.graph_config.get('client_id')
and self.graph_config.get('client_secret')
and self.graph_config.get('user_email')
)
async def _send_via_graph(
    self,
    to_addresses: List[str],
    subject: str,
    body_text: str,
    body_html: Optional[str] = None,
    cc: Optional[List[str]] = None,
    bcc: Optional[List[str]] = None,
    reply_to: Optional[str] = None,
    in_reply_to: Optional[str] = None,
    references: Optional[str] = None,
    attachments: Optional[List[Dict]] = None,
) -> Tuple[bool, str, Optional[Dict[str, str]]]:
    """Send email via Microsoft Graph sendMail endpoint.

    Args:
        to_addresses: Primary recipients (``None`` is treated as empty).
        subject: Message subject.
        body_text: Plain-text body, used when ``body_html`` is not given.
        body_html: Optional HTML body; when set it is sent instead of ``body_text``.
        cc: Optional CC recipients.
        bcc: Optional BCC recipients.
        reply_to: Optional single Reply-To address.
        in_reply_to: Threading hint; sent only as the custom ``x-bmc-in-reply-to`` header.
        references: Threading hint; sent only as the custom ``x-bmc-references`` header.
        attachments: Optional dicts with ``content`` (bytes, or str which is
            UTF-8 encoded), ``filename`` and ``content_type``. Entries without
            content are skipped.

    Returns:
        ``(success, message, metadata)``. ``metadata`` is the best-effort
        SentItems lookup (``internet_message_id`` / ``conversation_id``) or
        ``None``.
    """
    access_token = await self._get_graph_access_token()
    if not access_token:
        return False, "Graph token acquisition failed", None

    # Normalize once so a None recipient list cannot raise after a successful
    # send (previously len(None) turned a delivered mail into a reported failure).
    recipients = list(to_addresses or [])

    def _recipient(addr: str) -> Dict:
        return {"emailAddress": {"address": addr}}

    message: Dict = {
        "subject": subject,
        "body": {
            "contentType": "HTML" if body_html else "Text",
            "content": body_html or body_text,
        },
        "toRecipients": [_recipient(addr) for addr in recipients],
    }
    if cc:
        message["ccRecipients"] = [_recipient(addr) for addr in cc]
    if bcc:
        message["bccRecipients"] = [_recipient(addr) for addr in bcc]
    if reply_to:
        message["replyTo"] = [_recipient(reply_to)]

    # Microsoft Graph only allows custom internet headers prefixed with x-.
    # Standard headers like In-Reply-To/References are rejected with
    # InvalidInternetMessageHeader, so only attach safe diagnostic metadata.
    headers = []
    if in_reply_to:
        headers.append({"name": "x-bmc-in-reply-to", "value": in_reply_to[:900]})
    if references:
        headers.append({"name": "x-bmc-references", "value": references[:900]})
    if headers:
        message["internetMessageHeaders"] = headers

    graph_attachments = []
    for attachment in (attachments or []):
        content = attachment.get("content")
        if not content:
            continue
        if isinstance(content, str):
            # b64encode needs bytes; accept text payloads instead of letting a
            # TypeError escape this method (which otherwise returns a tuple).
            content = content.encode("utf-8")
        graph_attachments.append({
            "@odata.type": "#microsoft.graph.fileAttachment",
            "name": attachment.get("filename") or "attachment.bin",
            "contentType": attachment.get("content_type") or "application/octet-stream",
            "contentBytes": base64.b64encode(content).decode("ascii"),
        })
    if graph_attachments:
        message["attachments"] = graph_attachments

    url = f"https://graph.microsoft.com/v1.0/users/{self.graph_config['user_email']}/sendMail"
    request_body = {
        "message": message,
        "saveToSentItems": True,
    }
    request_headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    try:
        async with ClientSession() as session:
            async with session.post(url, headers=request_headers, json=request_body) as response:
                if response.status in (200, 202):
                    metadata = None
                    try:
                        metadata = await self._find_recent_sent_graph_message(
                            access_token=access_token,
                            subject=subject,
                            to_addresses=recipients,
                        )
                    except Exception as metadata_error:
                        logger.warning(
                            "⚠️ Graph send succeeded but SentItems metadata lookup failed: %s",
                            metadata_error,
                        )
                    return True, f"Email sent to {len(recipients)} recipient(s) via Graph", metadata

                error_text = await response.text()
                logger.error("❌ Graph send failed: status=%s body=%s", response.status, error_text)
                return False, f"Graph send failed ({response.status}): {error_text[:300]}", None
    except Exception as e:
        # Previously returned silently; log so transport errors are visible.
        logger.error("❌ Graph send exception: %s", e)
        return False, f"Graph send exception: {str(e)}", None
def _recipient_addresses_match(self, graph_recipients: Optional[List[Dict[str, Any]]], to_addresses: List[str]) -> bool:
if not to_addresses:
return True
expected = {addr.strip().lower() for addr in (to_addresses or []) if addr}
if not expected:
return True
actual = set()
for recipient in graph_recipients or []:
address = (
recipient.get("emailAddress", {}).get("address")
if isinstance(recipient, dict)
else None
)
if address:
actual.add(str(address).strip().lower())
return bool(actual) and expected.issubset(actual)
async def _find_recent_sent_graph_message(
    self,
    access_token: str,
    subject: str,
    to_addresses: List[str],
) -> Optional[Dict[str, str]]:
    """Best-effort lookup of thread metadata for a message we just sent.

    Reads the 15 newest messages (by ``sentDateTime``) from the mailbox's
    SentItems folder. It returns ids from the first message whose subject
    equals ``subject`` (case-insensitive, trimmed; skipped when ``subject``
    is empty) and whose To list contains all ``to_addresses``. No time window
    is applied, so an older message with the same subject and recipients can
    match if the new one is not listed yet.

    Returns:
        ``{'internet_message_id': ..., 'conversation_id': ...}`` (normalized
        values), or ``None`` when nothing usable matches or any error occurs.
    """
    try:
        mailbox = self.graph_config['user_email']
        sent_items_url = f"https://graph.microsoft.com/v1.0/users/{mailbox}/mailFolders/SentItems/messages"
        query = {
            '$top': 15,
            '$orderby': 'sentDateTime desc',
            '$select': 'id,subject,toRecipients,internetMessageId,conversationId,sentDateTime'
        }
        request_headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json',
        }

        async with ClientSession() as session:
            async with session.get(sent_items_url, params=query, headers=request_headers) as response:
                if response.status != 200:
                    logger.warning("⚠️ Could not read SentItems metadata (status=%s)", response.status)
                    return None
                payload = await response.json()

        wanted_subject = (subject or '').strip().lower()
        for candidate in payload.get('value') or []:
            candidate_subject = str(candidate.get('subject') or '').strip().lower()
            if wanted_subject and candidate_subject != wanted_subject:
                continue
            if not self._recipient_addresses_match(candidate.get('toRecipients'), to_addresses):
                continue

            message_id = self._normalize_message_id_value(candidate.get('internetMessageId'))
            conversation = self._normalize_message_id_value(candidate.get('conversationId'))
            if not (message_id or conversation):
                # Matching message but no usable ids: keep scanning.
                continue

            logger.info(
                "🧵 Matched sent Graph metadata (conversationId=%s, messageId=%s)",
                conversation,
                message_id,
            )
            return {
                'internet_message_id': message_id,
                'conversation_id': conversation,
            }
    except Exception as e:
        logger.warning("⚠️ Failed to resolve sent Graph metadata: %s", e)
    return None
async def fetch_new_emails(self, limit: int = 50) -> List[Dict]:
    """Fetch emails not yet stored in the database from the configured source.

    Graph is used when ``use_graph`` is set and a Graph client id exists;
    otherwise IMAP is used when an IMAP username is configured. With neither,
    a warning is logged and ``[]`` is returned.

    Args:
        limit: Maximum number of newest messages to inspect.

    Returns:
        List of parsed email dictionaries.
    """
    if self.use_graph and self.graph_config['client_id']:
        logger.info("📥 Fetching emails via Microsoft Graph API")
        return await self._fetch_via_graph(limit)

    if not self.imap_config['username']:
        logger.warning("⚠️ No email source configured (IMAP or Graph API)")
        return []

    logger.info("📥 Fetching emails via IMAP")
    return await self._fetch_via_imap(limit)
async def _fetch_via_imap(self, limit: int) -> List[Dict]:
    """Fetch emails using IMAP protocol (READ-ONLY mode).

    Selects the configured folder with the configured ``readonly`` flag and
    fetches each message with ``BODY.PEEK[]`` so it is not marked as read.
    Messages whose Message-ID already exists in the database are skipped.
    The connection is always logged out, including on early returns and
    errors.

    Note: ``imaplib`` is blocking, so this coroutine blocks the event loop
    while it talks to the server.

    Args:
        limit: Maximum number of newest messages to inspect. A value <= 0
            fetches nothing. Previously ``email_ids[-0:]`` fetched the entire
            mailbox.

    Returns:
        New parsed emails, newest first; ``[]`` on connection/protocol errors.
    """
    emails: List[Dict] = []
    mail = None
    try:
        host = self.imap_config['server']
        port = self.imap_config['port']
        if self.imap_config['use_ssl']:
            mail = imaplib.IMAP4_SSL(host, port)
        else:
            mail = imaplib.IMAP4(host, port)

        mail.login(self.imap_config['username'], self.imap_config['password'])

        # Select folder in READ-ONLY mode (critical for safety)
        folder = self.imap_config['folder']
        readonly = self.imap_config['readonly']
        status, _ = mail.select(folder, readonly=readonly)
        if status != 'OK':
            # imaplib returns NO instead of raising; without this check the
            # next SEARCH fails with a confusing "illegal in state AUTH".
            logger.error(f"❌ IMAP select of {folder} failed: {status}")
            return emails
        if readonly:
            logger.info(f"🔒 Connected to {folder} in READ-ONLY mode (emails will NOT be marked as read)")

        status, messages = mail.search(None, 'ALL')
        if status != 'OK':
            logger.error(f"❌ IMAP search failed: {status}")
            return emails

        email_ids = messages[0].split()
        logger.info(f"📊 Found {len(email_ids)} emails in {folder}")

        # Highest sequence numbers are the newest: take the last ``limit``
        # and process newest first.
        email_ids_to_fetch = list(reversed(email_ids[-limit:])) if limit > 0 else []

        for email_id in email_ids_to_fetch:
            try:
                # Fetch email using BODY.PEEK to avoid marking as read
                status, msg_data = mail.fetch(email_id, '(BODY.PEEK[])')
                if status != 'OK':
                    logger.warning(f"⚠️ Failed to fetch email {email_id}: {status}")
                    continue

                raw_email = msg_data[0][1]
                msg = email.message_from_bytes(raw_email)
                parsed_email = self._parse_email(msg, email_id.decode())

                if not self._email_exists(parsed_email['message_id']):
                    emails.append(parsed_email)
                    logger.info(f"✅ New email: {parsed_email['subject'][:50]}... from {parsed_email['sender_email']}")
                else:
                    logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}")
            except Exception as e:
                logger.error(f"❌ Error parsing email {email_id}: {e}")

        logger.info(f"📥 Fetched {len(emails)} new emails via IMAP")
        return emails

    except imaplib.IMAP4.error as e:
        logger.error(f"❌ IMAP error: {e}")
        return []
    except Exception as e:
        logger.error(f"❌ Unexpected error fetching via IMAP: {e}")
        return []
    finally:
        # Release the server connection on every path. A failing LOGOUT must
        # not discard mails that were already collected.
        if mail is not None:
            try:
                mail.logout()
            except Exception as logout_error:
                logger.debug(f"IMAP logout failed: {logout_error}")
async def _fetch_via_graph(self, limit: int) -> List[Dict]:
    """Fetch the newest messages from a mailbox via Microsoft Graph (OAuth2).

    Reads the folder named by ``imap_config['folder']`` in the mailbox
    ``graph_config['user_email']``, newest first. Attachments are downloaded
    for every message that has them. New messages (Message-ID not yet in
    the database) are returned. For already-stored ones, attachment bytes
    are passed to ``_resave_attachment_content`` to backfill missing content.

    Args:
        limit: Sent to Graph as ``$top``.

    Returns:
        New parsed emails; ``[]`` on token, HTTP or unexpected errors.
    """
    new_emails: List[Dict] = []
    try:
        token = await self._get_graph_access_token()
        if not token:
            logger.error("❌ Failed to get Graph API access token")
            return []

        mailbox = self.graph_config['user_email']
        folder = self.imap_config['folder']  # Use same folder name
        endpoint = f"https://graph.microsoft.com/v1.0/users/{mailbox}/mailFolders/{folder}/messages"
        query = {
            '$top': limit,
            '$orderby': 'receivedDateTime desc',
            '$select': 'id,subject,from,toRecipients,ccRecipients,receivedDateTime,bodyPreview,body,hasAttachments,internetMessageId,conversationId,internetMessageHeaders'
        }
        request_headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

        async with ClientSession() as session:
            async with session.get(endpoint, params=query, headers=request_headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"❌ Graph API error: {response.status} - {error_text}")
                    return []
                data = await response.json()

            graph_messages = data.get('value', [])
            logger.info(f"📊 Found {len(graph_messages)} emails via Graph API")

            for graph_msg in graph_messages:
                try:
                    parsed = self._parse_graph_message(graph_msg)

                    found_attachments: List[Dict] = []
                    if graph_msg.get('hasAttachments', False):
                        found_attachments = await self._fetch_graph_attachments(
                            mailbox,
                            graph_msg['id'],
                            token,
                            session
                        )
                        parsed['attachment_count'] = len(found_attachments)
                    parsed['attachments'] = found_attachments

                    if self._email_exists(parsed['message_id']):
                        logger.debug(f"⏭️ Email already exists: {parsed['message_id']}")
                        # Re-save attachment bytes for existing emails (fills content_data for old emails)
                        if parsed.get('attachments'):
                            await self._resave_attachment_content(
                                parsed['message_id'],
                                parsed['attachments']
                            )
                        continue

                    new_emails.append(parsed)
                    logger.info(f"✅ New email: {parsed['subject'][:50]}... from {parsed['sender_email']}")
                except Exception as e:
                    logger.error(f"❌ Error parsing Graph message: {e}")

        logger.info(f"📥 Fetched {len(new_emails)} new emails via Graph API")
        return new_emails

    except Exception as e:
        logger.error(f"❌ Unexpected error fetching via Graph API: {e}")
        return []
async def _get_graph_access_token(self) -> Optional[str]:
    """Get an app-only OAuth2 access token for Microsoft Graph using MSAL.

    Uses the client-credentials flow with the ``.default`` scope, i.e. the
    application permissions granted to the app registration. The MSAL
    application object is cached on the instance and rebuilt only when
    tenant/client/secret change, so its in-memory token cache can be reused
    instead of starting from scratch on every call. Whether
    ``acquire_token_for_client`` consults that cache depends on the MSAL
    version (recent releases do), so confirm against the installed version.
    The MSAL call is blocking network I/O, so it runs in the default executor
    to keep the event loop responsive.

    Returns:
        The access token, or ``None`` on failure (the reason is logged).
    """
    try:
        tenant_id = self.graph_config['tenant_id']
        client_id = self.graph_config['client_id']
        client_secret = self.graph_config['client_secret']
        cache_key = (tenant_id, client_id, client_secret)

        app = getattr(self, '_msal_app', None)
        if app is None or getattr(self, '_msal_app_key', None) != cache_key:
            app = msal.ConfidentialClientApplication(
                client_id,
                authority=f"https://login.microsoftonline.com/{tenant_id}",
                client_credential=client_secret
            )
            self._msal_app = app
            self._msal_app_key = cache_key

        # ".default" requests every application permission granted to the app
        # (e.g. mail read/send), not one specific named scope.
        scopes = ["https://graph.microsoft.com/.default"]
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None, lambda: app.acquire_token_for_client(scopes=scopes)
        )

        if "access_token" in result:
            logger.info("✅ Successfully obtained Graph API access token")
            return result["access_token"]

        error = result.get("error_description", result.get("error", "Unknown error"))
        logger.error(f"❌ Failed to obtain access token: {error}")
        return None

    except Exception as e:
        logger.error(f"❌ Error getting Graph access token: {e}")
        return None
def _parse_email(self, msg: email.message.Message, email_id: str) -> Dict:
"""Parse IMAP email message into dictionary"""
# Decode subject
subject = self._decode_header(msg.get('Subject', ''))
# Decode sender
from_header = self._decode_header(msg.get('From', ''))
sender_name, sender_email = self._parse_email_address(from_header)
# Decode recipient
to_header = self._decode_header(msg.get('To', ''))
recipient_name, recipient_email = self._parse_email_address(to_header)
# Decode CC
cc_header = self._decode_header(msg.get('Cc', ''))
# Get message ID
message_id = msg.get('Message-ID', f"imap-{email_id}")
in_reply_to = msg.get('In-Reply-To', '')
email_references = msg.get('References', '')
# Get date
date_str = msg.get('Date', '')
received_date = self._parse_email_date(date_str)
# Get body
body_text = ""
body_html = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
try:
body_text = part.get_payload(decode=True).decode('utf-8', errors='ignore')
except Exception:
pass
elif content_type == "text/html":
try:
body_html = part.get_payload(decode=True).decode('utf-8', errors='ignore')
except Exception:
pass
else:
try:
body_text = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
except Exception:
body_text = str(msg.get_payload())
# Extract attachments
attachments = []
if msg.is_multipart():
for part in msg.walk():
if part.get_content_maintype() == 'multipart':
continue
# Skip text parts (body content)
content_type = part.get_content_type()
if content_type in ['text/plain', 'text/html']:
continue
# Check if part has a filename (indicates attachment)
filename = part.get_filename()
# FALLBACK: If no filename but content-type is audio, generate one
if not filename and content_type.startswith('audio/'):
ext = '.mp3'
if 'wav' in content_type: ext = '.wav'
elif 'ogg' in content_type: ext = '.ogg'
elif 'm4a' in content_type: ext = '.m4a'
filename = f"audio_attachment{ext}"
logger.info(f"⚠️ Found audio attachment without filename. Generated: {filename}")
if filename:
# Decode filename if needed
filename = self._decode_header(filename)
# Get attachment content
content = part.get_payload(decode=True)
content_type = part.get_content_type()
if content: # Only add if we got content
attachments.append({
'filename': filename,
'content': content,
'content_type': content_type,
'size': len(content)
})
return {
'message_id': message_id,
'in_reply_to': in_reply_to,
'email_references': email_references,
'subject': subject,
'sender_name': sender_name,
'sender_email': sender_email,
'recipient_email': recipient_email,
'cc': cc_header,
'body_text': body_text,
'body_html': body_html,
'received_date': received_date,
'folder': self.imap_config['folder'],
'has_attachments': len(attachments) > 0,
'attachment_count': len(attachments),
'attachments': attachments
}
def _parse_graph_message(self, msg: Dict) -> Dict:
"""Parse Microsoft Graph API message into dictionary"""
# Extract sender
from_data = msg.get('from', {}).get('emailAddress', {})
sender_name = from_data.get('name', '')
sender_email = from_data.get('address', '')
# Extract recipient (first TO recipient)
to_recipients = msg.get('toRecipients', [])
recipient_email = to_recipients[0]['emailAddress']['address'] if to_recipients else ''
# Extract CC recipients
cc_recipients = msg.get('ccRecipients', [])
cc = ', '.join([r['emailAddress']['address'] for r in cc_recipients])
# Get body
body_data = msg.get('body', {})
body_content = body_data.get('content', '')
body_type = body_data.get('contentType', 'text')
body_text = body_content if body_type == 'text' else ''
body_html = body_content if body_type == 'html' else ''
# Parse date
received_date_str = msg.get('receivedDateTime', '')
received_date = datetime.fromisoformat(received_date_str.replace('Z', '+00:00')) if received_date_str else datetime.now()
headers = msg.get('internetMessageHeaders') or []
in_reply_to = None
references = None
for header in headers:
name = str(header.get('name') or '').strip().lower()
value = str(header.get('value') or '').strip()
if not value:
continue
if name == 'in-reply-to':
in_reply_to = value
elif name == 'references':
references = value
conversation_id = self._normalize_message_id_value(msg.get('conversationId'))
return {
'message_id': msg.get('internetMessageId', msg.get('id', '')),
'in_reply_to': in_reply_to,
'email_references': references,
'thread_key': conversation_id,
'subject': msg.get('subject', ''),
'sender_name': sender_name,
'sender_email': sender_email,
'recipient_email': recipient_email,
'cc': cc,
'body_text': body_text,
'body_html': body_html,
'received_date': received_date,
'folder': self.imap_config['folder'],
'has_attachments': msg.get('hasAttachments', False),
'attachment_count': 0 # Will be updated after fetching attachments
}
async def _fetch_graph_attachments(
    self,
    user_email: str,
    message_id: str,
    access_token: str,
    session: ClientSession
) -> List[Dict]:
    """Download every attachment of one Graph message.

    Args:
        user_email: Mailbox that owns the message.
        message_id: Graph item id of the message.
        access_token: Bearer token for Graph.
        session: Open aiohttp session supplied by the caller.

    Returns:
        ``filename``/``content``/``content_type``/``size`` dicts. Entries
        without ``contentBytes`` get ``b''`` content (presumably non-file
        attachment types; confirm). A missing name becomes ``'unknown'``,
        except audio, which gets a generated ``audio_attachment.<ext>`` name.
        Returns ``[]`` on a non-200 response; on other errors, whatever was
        collected before the failure.
    """
    collected: List[Dict] = []
    try:
        endpoint = f"https://graph.microsoft.com/v1.0/users/{user_email}/messages/{message_id}/attachments"
        request_headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }

        async with session.get(endpoint, headers=request_headers) as response:
            if response.status != 200:
                logger.warning(f"⚠️ Failed to fetch attachments for message {message_id}: {response.status}")
                return []

            payload = await response.json()
            for item in payload.get('value', []):
                # Graph returns the file content base64-encoded in contentBytes.
                encoded = item.get('contentBytes', '')
                raw = base64.b64decode(encoded) if encoded else b''

                name = item.get('name')
                mime = item.get('contentType', 'application/octet-stream')
                if not name and mime.startswith('audio/'):
                    suffix = next(
                        (f".{kind}" for kind in ('wav', 'ogg', 'm4a') if kind in mime),
                        '.mp3',
                    )
                    name = f"audio_attachment{suffix}"
                    logger.info(f"⚠️ Found (Graph) audio attachment without filename. Generated: {name}")

                collected.append({
                    'filename': name or 'unknown',
                    'content': raw,
                    'content_type': mime,
                    'size': item.get('size', len(raw))
                })
                logger.info(f"📎 Fetched attachment: {name} ({item.get('size', 0)} bytes)")
    except Exception as e:
        logger.error(f"❌ Error fetching attachments for message {message_id}: {e}")

    return collected
def _decode_header(self, header: str) -> str:
"""Decode email header (handles MIME encoding)"""
if not header:
return ""
decoded_parts = decode_header(header)
decoded_string = ""
for content, encoding in decoded_parts:
if isinstance(content, bytes):
try:
decoded_string += content.decode(encoding or 'utf-8', errors='ignore')
except Exception:
decoded_string += content.decode('utf-8', errors='ignore')
else:
decoded_string += str(content)
return decoded_string.strip()
def _parse_email_address(self, header: str) -> Tuple[str, str]:
"""Parse 'Name <email@domain.com>' into (name, email)"""
if not header:
return ("", "")
if '<' in header and '>' in header:
# Format: "Name <email@domain.com>"
parts = header.split('<')
name = parts[0].strip().strip('"')
email_addr = parts[1].strip('>').strip()
return (name, email_addr)
else:
# Just email address
return ("", header.strip())
def _normalize_message_id_value(self, value: Optional[str]) -> Optional[str]:
"""Normalize message-id like tokens for stable thread matching."""
if not value:
return None
normalized = str(value).strip().strip("<>").lower()
normalized = "".join(normalized.split())
return normalized or None
def _extract_reference_ids(self, raw_references: Optional[str]) -> List[str]:
if not raw_references:
return []
refs: List[str] = []
for token in re.split(r"[\s,]+", str(raw_references).strip()):
normalized = self._normalize_message_id_value(token)
if normalized:
refs.append(normalized)
return list(dict.fromkeys(refs))
def _derive_thread_key(self, email_data: Dict) -> Optional[str]:
"""
Derive a stable conversation key.
Priority:
1) First References token (root message id)
2) In-Reply-To
3) Explicit provider thread key (e.g. Graph conversationId)
4) Message-ID
"""
reference_ids = self._extract_reference_ids(email_data.get("email_references"))
if reference_ids:
return reference_ids[0]
in_reply_to = self._normalize_message_id_value(email_data.get("in_reply_to"))
if in_reply_to:
return in_reply_to
explicit_thread_key = self._normalize_message_id_value(email_data.get("thread_key"))
if explicit_thread_key:
return explicit_thread_key
return self._normalize_message_id_value(email_data.get("message_id"))
def _parse_email_date(self, date_str: str) -> datetime:
"""Parse email date header into datetime object"""
if not date_str:
return datetime.now()
try:
# Use email.utils to parse RFC 2822 date
from email.utils import parsedate_to_datetime
return parsedate_to_datetime(date_str)
except Exception:
logger.warning(f"⚠️ Failed to parse date: {date_str}")
return datetime.now()
def _email_exists(self, message_id: str) -> bool:
    """Return True if a non-deleted ``email_messages`` row has this Message-ID."""
    rows = execute_query(
        "SELECT id FROM email_messages WHERE message_id = %s AND deleted_at IS NULL",
        (message_id,),
    )
    return len(rows) > 0
def _adopt_parent_thread_key(self, email_data: Dict, derived_thread_key: Optional[str]) -> Optional[str]:
    """Look up parent emails by References/In-Reply-To and adopt their thread_key
    so outgoing+incoming emails share the same canonical group key.

    Resolution order (first hit wins):
      1. The normalized provider key ``email_data['thread_key']`` (e.g. a
         Graph conversationId), if some non-deleted stored row already has
         an equivalent thread_key.
      2. If the email carries no References/In-Reply-To ids: the provider
         key if present, otherwise ``derived_thread_key``.
      3. The normalized thread_key of the earliest-received stored email
         whose message_id matches one of those ids.
      4. Fallback: the provider key if present, otherwise
         ``derived_thread_key``.

    Database errors in steps 1 and 3 are logged as warnings and resolution
    continues with the next step.

    Args:
        email_data: Parsed email dict; reads ``thread_key``,
            ``email_references`` and ``in_reply_to``.
        derived_thread_key: Key computed by ``_derive_thread_key``; used
            when nothing better is found.

    Returns:
        The thread_key to store, or ``None``.
    """
    # Strategy 1: If the email has an explicit provider thread key (e.g. Graph
    # conversationId), check if ANY existing email in the DB already uses it as
    # its thread_key. ConversationId is the most reliable stable identifier
    # across all emails in an Exchange conversation.
    explicit_thread_key = self._normalize_message_id_value(email_data.get("thread_key"))
    if explicit_thread_key:
        try:
            # Stored keys are normalized in SQL (all '<', '>' and whitespace
            # removed, lower-cased) before comparing with the normalized key.
            rows = execute_query(
                """
                SELECT thread_key
                FROM email_messages
                WHERE deleted_at IS NULL
                AND LOWER(REGEXP_REPLACE(COALESCE(thread_key, ''), '[<>\\s]', '', 'g')) = %s
                LIMIT 1
                """,
                (explicit_thread_key,),
            )
            if rows:
                logger.info(
                    "🧵 Adopted conversationId thread_key '%s' for incoming email (derived was '%s')",
                    explicit_thread_key,
                    derived_thread_key,
                )
                return explicit_thread_key
        except Exception as e:
            logger.warning("⚠️ Failed conversationId thread_key lookup: %s", e)

    # Strategy 2: Look up parent emails by message_id matching our
    # References/In-Reply-To headers.
    parent_ids: List[str] = []
    ref_ids = self._extract_reference_ids(email_data.get("email_references"))
    parent_ids.extend(ref_ids)
    in_reply = self._normalize_message_id_value(email_data.get("in_reply_to"))
    if in_reply and in_reply not in parent_ids:
        parent_ids.append(in_reply)

    if not parent_ids:
        # Strategy 3: No thread headers at all — try conversationId as thread_key
        # even if no existing email has it yet (new conversation from Graph).
        if explicit_thread_key:
            return explicit_thread_key
        return derived_thread_key

    # Query parent emails that already have a thread_key stored.
    # Only "%s" placeholders are interpolated into the SQL; the ids themselves
    # are passed as bound parameters.
    placeholders = ",".join(["%s"] * len(parent_ids))
    try:
        rows = execute_query(
            f"""
            SELECT thread_key
            FROM email_messages
            WHERE deleted_at IS NULL
            AND thread_key IS NOT NULL
            AND TRIM(thread_key) != ''
            AND LOWER(REGEXP_REPLACE(COALESCE(message_id, ''), '[<>\\s]', '', 'g')) IN ({placeholders})
            ORDER BY received_date ASC
            LIMIT 1
            """,
            tuple(parent_ids),
        )
        if rows and rows[0].get("thread_key"):
            adopted = self._normalize_message_id_value(rows[0]["thread_key"])
            if adopted:
                logger.info(
                    "🧵 Adopted parent thread_key '%s' for incoming email (derived was '%s')",
                    adopted,
                    derived_thread_key,
                )
                return adopted
    except Exception as e:
        logger.warning("⚠️ Failed to adopt parent thread_key: %s", e)

    # Fallback: prefer the explicit conversationId over derived References[0]
    # since the References message-id often doesn't match any stored message_id
    if explicit_thread_key:
        return explicit_thread_key
    return derived_thread_key
async def save_email(self, email_data: Dict) -> Optional[int]:
    """Save email (and its attachments) to the database.

    The thread_key comes from ``_derive_thread_key`` and is then reconciled
    with stored emails by ``_adopt_parent_thread_key``, so replies share
    their parent's key. If the INSERT that includes ``thread_key`` fails
    (presumably a schema without that column; the cause is now logged
    instead of silently swallowed), the row is inserted again without it.

    Args:
        email_data: Parsed email dict (optionally with ``attachments``).

    Returns:
        The new ``email_messages.id``, or ``None`` if saving failed.
    """
    try:
        thread_key = self._derive_thread_key(email_data)
        # When this email is a reply, look up the parent email(s) by
        # message_id matching our References/In-Reply-To. If the parent
        # already has a thread_key stored, adopt it so both emails share the
        # same canonical key and are grouped in the same visual thread.
        thread_key = self._adopt_parent_thread_key(email_data, thread_key)

        # Built once, outside the retry: a missing key is a data error that
        # a second INSERT cannot fix.
        base_values = (
            email_data['message_id'],
            email_data['subject'],
            email_data['sender_email'],
            email_data['sender_name'],
            email_data['recipient_email'],
            email_data['cc'],
            email_data['body_text'],
            email_data['body_html'],
            email_data['received_date'],
            email_data['folder'],
            email_data['has_attachments'],
            email_data['attachment_count'],
            email_data.get('in_reply_to'),
            email_data.get('email_references'),
        )

        try:
            query = """
                INSERT INTO email_messages
                (message_id, subject, sender_email, sender_name, recipient_email, cc,
                 body_text, body_html, received_date, folder, has_attachments, attachment_count,
                 in_reply_to, email_references, thread_key,
                 status, is_read)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'new', false)
                RETURNING id
            """
            email_id = execute_insert(query, base_values + (thread_key,))
        except Exception as insert_error:
            logger.warning(
                "⚠️ Insert with thread_key failed (%s); retrying without thread_key",
                insert_error,
            )
            query = """
                INSERT INTO email_messages
                (message_id, subject, sender_email, sender_name, recipient_email, cc,
                 body_text, body_html, received_date, folder, has_attachments, attachment_count,
                 in_reply_to, email_references,
                 status, is_read)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'new', false)
                RETURNING id
            """
            email_id = execute_insert(query, base_values)

        if email_id is None:
            # Nothing to attach attachments to.
            logger.error(f"❌ Insert returned no id for email {email_data.get('message_id')}")
            return None

        # Subject can be None (e.g. Graph null subject); never let the log
        # line turn a successful insert into a reported failure.
        subject_preview = (email_data.get('subject') or '')[:50]
        logger.info(f"✅ Saved email {email_id}: {subject_preview}...")

        if email_data.get('attachments'):
            await self._save_attachments(email_id, email_data['attachments'])

        return email_id

    except Exception as e:
        logger.error(f"❌ Error saving email to database: {e}")
        return None
async def _save_attachments(self, email_id: int, attachments: List[Dict]):
    """Save email attachments to disk and database (also stores bytes as fallback).

    Each non-empty attachment is written (best effort) to
    ``<UPLOAD_DIR>/email_attachments/<md5>_<safe name>`` and always inserted
    into ``email_attachments`` with its bytes in ``content_data``.

    The on-disk name is derived from the sender-controlled filename: path
    separators are removed, unusual characters are replaced and the length
    is capped. A crafted name therefore cannot point outside the upload
    directory or exceed filesystem name limits. The original filename is
    still what is stored in the database. Per-attachment failures are
    logged and do not stop the others.

    Args:
        email_id: ``email_messages.id`` the attachments belong to.
        attachments: Dicts with ``filename``, ``content`` (bytes) and
            optional ``content_type`` / ``size``.
    """
    import hashlib
    from pathlib import Path

    # Use absolute path based on UPLOAD_DIR setting
    from app.core.config import settings

    def _disk_name(raw_name) -> str:
        # Keep only the final path component ('/' or '\' separators), then
        # replace characters outside letters/digits/_ . - and space.
        base = re.split(r"[\\/]", str(raw_name or ""))[-1]
        base = re.sub(r"[^\w.\- ]", "_", base).strip(" .")
        if not base:
            return "attachment"
        encoded = base.encode("utf-8")
        if len(encoded) > 150:
            # Stay well below the common 255-byte name limit (md5 prefix adds
            # 33 bytes) while keeping the extension.
            suffix = Path(base).suffix[:16]
            keep = 150 - len(suffix.encode("utf-8"))
            base = encoded[:keep].decode("utf-8", errors="ignore") + suffix
        return base

    upload_dir = Path(settings.UPLOAD_DIR) / "email_attachments"
    try:
        upload_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        logger.warning(f"⚠️ Could not create upload dir {upload_dir}: {e}")

    for att in attachments:
        try:
            filename = att['filename']
            content = att['content']  # bytes
            content_type = att.get('content_type', 'application/octet-stream')
            size_bytes = att.get('size', len(content) if content else 0)
            if not content:
                continue

            # Content hash prefix for deduplication of identical files on disk
            md5_hash = hashlib.md5(content).hexdigest()

            file_path_str = None
            try:
                file_path = upload_dir / f"{md5_hash}_{_disk_name(filename)}"
                file_path.write_bytes(content)
                file_path_str = str(file_path)
            except Exception as e:
                logger.warning(f"⚠️ Could not save attachment to disk ({filename}): {e}")

            # Save to database — always store content_data as fallback
            query = """
                INSERT INTO email_attachments
                (email_id, filename, content_type, size_bytes, file_path, content_data)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT DO NOTHING
            """
            from psycopg2 import Binary
            execute_query(query, (
                email_id,
                filename,
                content_type,
                size_bytes,
                file_path_str,
                Binary(content)
            ))
            logger.info(f"📎 Saved attachment: {filename} ({size_bytes} bytes, disk={file_path_str is not None})")
        except Exception as e:
            logger.error(f"❌ Failed to save attachment {att.get('filename', '?')}: {e}")
async def _resave_attachment_content(self, message_id: str, attachments: List[Dict]):
    """Backfill ``content_data`` for attachments of an already-stored email.

    For each attachment with a filename and non-empty content, updates the
    ``email_attachments`` rows matching that filename whose ``content_data``
    is still NULL. Only rows of the ``email_messages`` row selected by
    ``message_id`` (``LIMIT 1``) are touched. Failures are logged per
    attachment as warnings.
    """
    from psycopg2 import Binary

    backfill_sql = """
        UPDATE email_attachments
        SET content_data = %s
        WHERE email_id = (
            SELECT id FROM email_messages WHERE message_id = %s LIMIT 1
        )
        AND filename = %s
        AND content_data IS NULL
    """
    for att in attachments:
        try:
            name = att.get('filename')
            blob = att.get('content')
            if name and blob:
                execute_query(backfill_sql, (Binary(blob), message_id, name))
                logger.debug(f"💾 Re-saved content_data for attachment: {name}")
        except Exception as e:
            logger.warning(f"⚠️ Could not re-save content_data for {att.get('filename', '?')}: {e}")
async def get_unprocessed_emails(self, limit: int = 100) -> List[Dict]:
    """Return up to ``limit`` non-deleted emails with status 'new', newest first."""
    return execute_query(
        """
        SELECT * FROM email_messages
        WHERE status = 'new' AND deleted_at IS NULL
        ORDER BY received_date DESC
        LIMIT %s
        """,
        (limit,),
    )
async def update_email_status(self, email_id: int, status: str):
    """Set the processing ``status`` of one email and bump ``updated_at``."""
    execute_query(
        "UPDATE email_messages SET status = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s",
        (status, email_id),
    )
    logger.info(f"✅ Updated email {email_id} status to: {status}")
def parse_eml_file(self, content: bytes) -> Optional[Dict]:
    """
    Parse .eml file content into a standardized email dictionary.

    Text bodies are decoded with the charset declared on their MIME part
    (UTF-8 when none is declared or the charset is unknown). The FIRST
    non-attachment text/plain and text/html parts are used as the bodies.
    Later text parts, e.g. those inside a forwarded ``message/rfc822``
    attachment, no longer overwrite the top-level body.

    Args:
        content: Raw .eml file bytes

    Returns:
        Dictionary with email data or None if parsing fails
    """

    def _decode_text(part) -> str:
        # Decode a (non-multipart) part's payload using its declared charset.
        payload = part.get_payload(decode=True)
        if not payload:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="ignore")
        except LookupError:
            # Unknown/bogus charset label (e.g. "unknown-8bit"): fall back to UTF-8.
            return payload.decode("utf-8", errors="ignore")

    try:
        from email import policy
        from email.parser import BytesParser

        msg = BytesParser(policy=policy.default).parsebytes(content)

        # Extract basic fields
        message_id = str(msg.get("Message-ID", "")).strip("<>")
        if not message_id:
            # Generate fallback message ID from date + subject + sender
            import hashlib
            hash_str = f"{msg.get('Date', '')}{msg.get('Subject', '')}{msg.get('From', '')}"
            digest = hashlib.md5(hash_str.encode("utf-8", errors="replace")).hexdigest()
            message_id = f"uploaded-{digest}@bmchub.local"

        sender_name, sender_email = self._parse_email_address(msg.get("From", ""))
        recipient_name, recipient_email = self._parse_email_address(msg.get("To", ""))
        cc_header = str(msg.get("Cc", ""))

        # Parse date; keep "now" when the header is missing or unparseable.
        received_date = datetime.now()
        date_str = msg.get("Date")
        if date_str:
            try:
                from email.utils import parsedate_to_datetime
                received_date = parsedate_to_datetime(str(date_str))
            except (TypeError, ValueError) as exc:
                logger.debug(f"⚠️ Unparseable Date header {date_str!r}: {exc}")

        body_text = ""
        body_html = ""
        attachments: List[Dict] = []

        if msg.is_multipart():
            for part in msg.walk():
                if part.is_multipart():
                    # Containers (multipart/*, message/rfc822) have no payload bytes of their own.
                    continue
                content_type = part.get_content_type()
                disposition = part.get_content_disposition()
                try:
                    if content_type == "text/plain" and disposition != "attachment":
                        if not body_text:
                            body_text = _decode_text(part)
                    elif content_type == "text/html" and disposition != "attachment":
                        if not body_html:
                            body_html = _decode_text(part)
                    elif disposition == "attachment":
                        filename = part.get_filename()
                        if not filename:
                            continue
                        payload = part.get_payload(decode=True)
                        if payload:
                            attachments.append({
                                "filename": filename,
                                "content_type": content_type,
                                "content": payload,
                                "size_bytes": len(payload)
                            })
                except Exception as exc:
                    # Best-effort: one broken part must not fail the whole upload.
                    logger.debug(f"⚠️ Skipping unreadable MIME part ({content_type}): {exc}")
        else:
            # Single part email
            try:
                decoded = _decode_text(msg)
            except Exception as exc:
                logger.debug(f"⚠️ Could not decode single-part body: {exc}")
                decoded = ""
            if msg.get_content_type() == "text/html":
                body_html = decoded
            else:
                body_text = decoded

        return {
            "message_id": message_id,
            "in_reply_to": str(msg.get("In-Reply-To", "")),
            "email_references": str(msg.get("References", "")),
            "subject": str(msg.get("Subject", "No Subject")),
            "sender_name": sender_name,
            "sender_email": sender_email,
            "recipient_email": recipient_email,
            "cc": cc_header,
            "body_text": body_text,
            "body_html": body_html,
            "received_date": received_date,
            "has_attachments": len(attachments) > 0,
            "attachments": attachments,
            "folder": "uploaded"
        }

    except Exception as e:
        logger.error(f"❌ Failed to parse .eml file: {e}")
        return None
def parse_msg_file(self, content: bytes) -> Optional[Dict]:
    """
    Parse Outlook .msg file content into a standardized email dictionary.

    Uses the third-party ``extract_msg`` package and returns the same keys as
    ``parse_eml_file``. Threading headers are not extracted, so
    ``in_reply_to`` and ``email_references`` are None. The extract_msg
    handle is closed before returning.

    Args:
        content: Raw .msg file bytes

    Returns:
        Dictionary with email data or None if parsing fails
    """
    try:
        import extract_msg
        import io
        import hashlib

        msg = extract_msg.Message(io.BytesIO(content))
        try:
            # Generate message ID if not present
            message_id = msg.messageId
            if not message_id:
                hash_str = f"{msg.date}{msg.subject}{msg.sender}"
                message_id = f"uploaded-{hashlib.md5(hash_str.encode()).hexdigest()}@bmchub.local"
            else:
                message_id = message_id.strip("<>")

            # Parse date; msg.date is presumably already a datetime (TODO confirm
            # for the installed extract_msg version).
            try:
                received_date = msg.date if msg.date else datetime.now()
            except Exception as exc:
                logger.debug(f"⚠️ Could not read .msg date, using now(): {exc}")
                received_date = datetime.now()

            # Extract attachments (best-effort: unreadable ones are logged and skipped)
            attachments = []
            for attachment in msg.attachments:
                try:
                    attachments.append({
                        "filename": attachment.longFilename or attachment.shortFilename or "unknown",
                        "content_type": attachment.mimetype or "application/octet-stream",
                        "content": attachment.data,
                        "size_bytes": len(attachment.data) if attachment.data else 0
                    })
                except Exception as exc:
                    logger.debug(f"⚠️ Skipping unreadable .msg attachment: {exc}")

            # extract_msg exposes htmlBody as bytes (NOTE(review): confirm for the
            # pinned version); decode it so body_html is text like the .eml path.
            html_body = msg.htmlBody or ""
            if isinstance(html_body, (bytes, bytearray)):
                raw_html = bytes(html_body)
                try:
                    html_body = raw_html.decode("utf-8")
                except UnicodeDecodeError:
                    # Assumption: non-UTF-8 Outlook HTML is usually a Windows code page.
                    html_body = raw_html.decode("cp1252", errors="replace")

            return {
                "message_id": message_id,
                "in_reply_to": None,
                "email_references": None,
                "subject": msg.subject or "No Subject",
                "sender_name": msg.sender or "",
                "sender_email": msg.senderEmail or "",
                "recipient_email": msg.to or "",
                "cc": msg.cc or "",
                "body_text": msg.body or "",
                "body_html": html_body,
                "received_date": received_date,
                "has_attachments": len(attachments) > 0,
                "attachments": attachments,
                "folder": "uploaded"
            }
        finally:
            # Release the container opened by extract_msg; a failing close must
            # not discard an otherwise successful parse.
            try:
                msg.close()
            except Exception as exc:
                logger.debug(f"⚠️ Could not close .msg handle: {exc}")

    except Exception as e:
        logger.error(f"❌ Failed to parse .msg file: {e}")
        return None
async def save_uploaded_email(self, email_data: Dict) -> Optional[int]:
    """
    Save an uploaded (parsed .eml/.msg) email to the database.

    The row is first inserted including ``thread_key``. If that insert raises,
    the error is logged and the insert is retried without the ``thread_key``
    column. Presumably this covers databases where the thread_key migration
    has not run yet (TODO confirm). Attachments are saved after the email row
    exists.

    Args:
        email_data: Parsed email dictionary (as produced by parse_eml_file /
            parse_msg_file)

    Returns:
        email_id if successful, None if duplicate or error
    """

    def _insert(fields: List[Tuple[str, Any]]):
        # Build the INSERT from a fixed column list; values are always bound
        # as parameters, never interpolated.
        columns = ", ".join(name for name, _ in fields)
        placeholders = ", ".join(["%s"] * len(fields))
        query = f"""
            INSERT INTO email_messages ({columns}, created_at)
            VALUES ({placeholders}, CURRENT_TIMESTAMP)
            RETURNING id
        """
        return execute_insert(query, tuple(value for _, value in fields))

    try:
        # Check if email already exists
        existing = execute_query(
            "SELECT id FROM email_messages WHERE message_id = %s",
            (email_data["message_id"],),
        )
        if existing:
            logger.info(f"⏭️ Email already exists: {email_data['message_id']}")
            return None

        thread_key = self._derive_thread_key(email_data)
        fields: List[Tuple[str, Any]] = [
            ("message_id", email_data["message_id"]),
            ("subject", email_data["subject"]),
            ("sender_email", email_data["sender_email"]),
            ("sender_name", email_data["sender_name"]),
            ("recipient_email", email_data.get("recipient_email", "")),
            ("cc", email_data.get("cc", "")),
            ("body_text", email_data["body_text"]),
            ("body_html", email_data["body_html"]),
            ("received_date", email_data["received_date"]),
            ("folder", email_data["folder"]),
            ("has_attachments", email_data["has_attachments"]),
            ("attachment_count", len(email_data.get("attachments") or [])),
            ("in_reply_to", email_data.get("in_reply_to")),
            ("email_references", email_data.get("email_references")),
            ("thread_key", thread_key),
            ("status", "new"),
            ("import_method", "manual_upload"),
        ]

        try:
            result = _insert(fields)
        except Exception as exc:
            # Surface the first failure instead of silently discarding it.
            # NOTE(review): if execute_insert shares an aborted transaction,
            # the retry will fail too; confirm it uses its own transaction.
            logger.warning(f"⚠️ Insert with thread_key failed, retrying without thread_key: {exc}")
            result = _insert([field for field in fields if field[0] != "thread_key"])

        if not result:
            logger.error("❌ Failed to insert email - no ID returned")
            return None

        email_id = result[0]["id"]

        # Save attachments
        if email_data.get("attachments"):
            await self._save_attachments(email_id, email_data["attachments"])

        logger.info(f"✅ Saved uploaded email: {email_data['subject'][:50]}... (ID: {email_id})")
        return email_id

    except Exception as e:
        logger.error(f"❌ Failed to save uploaded email: {e}")
        return None
async def send_email(
    self,
    to_addresses: List[str],
    subject: str,
    body_text: str,
    body_html: Optional[str] = None,
    cc: Optional[List[str]] = None,
    bcc: Optional[List[str]] = None,
    reply_to: Optional[str] = None
) -> Tuple[bool, str]:
    """
    Send email to one or more recipients, preferring Graph and falling back to SMTP.

    When ``settings.REMINDERS_DRY_RUN`` is set nothing is sent and success is
    reported. If Graph sending is available it is tried first. When Graph
    fails, the message is sent via SMTP (aiosmtplib), and the Graph error is
    included in any returned failure message.

    Args:
        to_addresses: List of recipient email addresses
        subject: Email subject
        body_text: Plain text body
        body_html: Optional HTML body (added as a multipart/alternative part)
        cc: Optional list of CC addresses
        bcc: Optional list of BCC addresses (SMTP envelope only, not written
            into the headers)
        reply_to: Optional reply-to address

    Returns:
        Tuple of (success: bool, message: str)
    """
    from email.utils import formataddr, formatdate

    # Safety check
    if settings.REMINDERS_DRY_RUN:
        logger.warning(f"🔒 DRY RUN MODE: Would send email to {to_addresses} with subject '{subject}'")
        return True, "Dry run mode - email not actually sent"

    graph_failure_message: Optional[str] = None

    # Prefer Graph send when Graph integration is enabled/configured.
    if self._graph_send_available():
        graph_result = await self._send_via_graph(
            to_addresses=to_addresses,
            subject=subject,
            body_text=body_text,
            body_html=body_html,
            cc=cc,
            bcc=bcc,
            reply_to=reply_to,
        )
        # send_email_with_attachments unpacks three values (ok, message, metadata)
        # from this same helper; read only the first two so either shape works
        # instead of raising "too many values to unpack" after a successful send.
        graph_ok, graph_message = graph_result[0], graph_result[1]
        if graph_ok:
            logger.info("✅ Email sent via Graph to %s recipient(s): %s", len(to_addresses), subject)
            return True, graph_message
        graph_failure_message = graph_message
        logger.warning("⚠️ Graph send failed, falling back to SMTP: %s", graph_message)

    # Check if aiosmtplib is available
    if not HAS_AIOSMTPLIB:
        logger.error("❌ aiosmtplib not installed - cannot send email. Install with: pip install aiosmtplib")
        if graph_failure_message:
            return False, f"Graph failed: {graph_failure_message}; SMTP fallback unavailable: aiosmtplib not installed"
        return False, "aiosmtplib not installed"

    # Validate SMTP configuration
    if not all([settings.EMAIL_SMTP_HOST, settings.EMAIL_SMTP_USER, settings.EMAIL_SMTP_PASSWORD]):
        logger.error("❌ SMTP not configured - cannot send email")
        if graph_failure_message:
            return False, f"Graph failed: {graph_failure_message}; SMTP fallback unavailable: SMTP not configured"
        return False, "SMTP not configured"

    try:
        msg = MIMEMultipart('alternative')
        msg['Subject'] = subject
        # formataddr quotes / RFC 2047-encodes only the display name, so a
        # non-ASCII or comma-containing sender name cannot corrupt the address.
        msg['From'] = formataddr((settings.EMAIL_SMTP_FROM_NAME or "", settings.EMAIL_SMTP_FROM_ADDRESS))
        msg['To'] = ', '.join(to_addresses)
        # RFC 5322 requires an origination Date; Message-ID is strongly recommended.
        msg['Date'] = formatdate(localtime=True)
        msg['Message-ID'] = f"<{uuid4().hex}@bmchub.local>"
        if cc:
            msg['Cc'] = ', '.join(cc)
        if reply_to:
            msg['Reply-To'] = reply_to

        msg.attach(MIMEText(body_text, 'plain'))
        if body_html:
            msg.attach(MIMEText(body_html, 'html'))

        # Envelope recipients: To + Cc + Bcc (Bcc never appears in headers).
        all_recipients = list(to_addresses) + list(cc or []) + list(bcc or [])

        async with aiosmtplib.SMTP(
            hostname=settings.EMAIL_SMTP_HOST,
            port=settings.EMAIL_SMTP_PORT,
            use_tls=settings.EMAIL_SMTP_USE_TLS
        ) as smtp:
            await smtp.login(settings.EMAIL_SMTP_USER, settings.EMAIL_SMTP_PASSWORD)
            await smtp.sendmail(
                settings.EMAIL_SMTP_FROM_ADDRESS,
                all_recipients,
                msg.as_string()
            )

        logger.info(f"✅ Email sent successfully to {len(to_addresses)} recipient(s): {subject}")
        return True, f"Email sent to {len(to_addresses)} recipient(s)"

    except Exception as e:
        error_msg = f"❌ SMTP send error: {str(e)}"
        logger.error(error_msg)
        if graph_failure_message:
            return False, f"Graph failed: {graph_failure_message}; SMTP fallback failed: {str(e)}"
        return False, error_msg
async def send_email_with_attachments(
    self,
    to_addresses: List[str],
    subject: str,
    body_text: str,
    body_html: Optional[str] = None,
    cc: Optional[List[str]] = None,
    bcc: Optional[List[str]] = None,
    reply_to: Optional[str] = None,
    in_reply_to: Optional[str] = None,
    references: Optional[str] = None,
    attachments: Optional[List[Dict]] = None,
    respect_dry_run: bool = True,
) -> Tuple[bool, str, str, Optional[str]]:
    """
    Send email and return status, message, message-id, and optional provider thread key.

    Graph is tried first when available. On Graph failure the message is sent
    via SMTP as multipart/mixed, with a multipart/alternative text/HTML part
    plus one base64 part per attachment.

    Args:
        to_addresses: List of recipient email addresses
        subject: Email subject
        body_text: Plain text body
        body_html: Optional HTML body
        cc: Optional list of CC addresses
        bcc: Optional list of BCC addresses (SMTP envelope only)
        reply_to: Optional reply-to address
        in_reply_to: Optional In-Reply-To header value for threading
        references: Optional References header value for threading
        attachments: Optional list of dicts with ``content`` (presumably bytes),
            ``filename`` and ``content_type``; entries without content are skipped
        respect_dry_run: When True, honour ``settings.REMINDERS_DRY_RUN``

    Returns:
        Tuple of (success, message, message_id, provider_thread_key). message_id
        is a locally generated ``<uuid@bmchub.local>`` unless Graph reports an
        ``internet_message_id``. provider_thread_key is Graph's
        ``conversation_id`` when available, else None.
    """
    from email.utils import formataddr, formatdate

    generated_message_id = f"<{uuid4().hex}@bmchub.local>"
    provider_thread_key: Optional[str] = None

    if respect_dry_run and settings.REMINDERS_DRY_RUN:
        logger.warning(
            "🔒 DRY RUN MODE: Would send email to %s with subject '%s'",
            to_addresses,
            subject,
        )
        return True, "Dry run mode - email not actually sent", generated_message_id, provider_thread_key

    graph_failure_message: Optional[str] = None

    # Prefer Graph send when Graph integration is enabled/configured.
    if self._graph_send_available():
        graph_result = await self._send_via_graph(
            to_addresses=to_addresses,
            subject=subject,
            body_text=body_text,
            body_html=body_html,
            cc=cc,
            bcc=bcc,
            reply_to=reply_to,
            in_reply_to=in_reply_to,
            references=references,
            attachments=attachments,
        )
        # send_email unpacks only (ok, message) from this same helper; accept
        # both the 2- and 3-item shapes so a missing metadata element cannot
        # raise after Graph has already sent the message.
        graph_ok, graph_message = graph_result[0], graph_result[1]
        graph_metadata = graph_result[2] if len(graph_result) > 2 else None
        if graph_ok:
            if graph_metadata:
                graph_message_id = graph_metadata.get('internet_message_id')
                graph_thread_key = graph_metadata.get('conversation_id')
                if graph_message_id:
                    generated_message_id = graph_message_id
                if graph_thread_key:
                    provider_thread_key = graph_thread_key
            logger.info(
                "✅ Email with attachments sent via Graph to %s recipient(s): %s (thread_key=%s)",
                len(to_addresses),
                subject,
                provider_thread_key,
            )
            return True, graph_message, generated_message_id, provider_thread_key
        graph_failure_message = graph_message
        logger.warning("⚠️ Graph send with attachments failed, falling back to SMTP: %s", graph_message)

    if not HAS_AIOSMTPLIB:
        logger.error("❌ aiosmtplib not installed - cannot send email. Install with: pip install aiosmtplib")
        if graph_failure_message:
            return False, f"Graph failed: {graph_failure_message}; SMTP fallback unavailable: aiosmtplib not installed", generated_message_id, provider_thread_key
        return False, "aiosmtplib not installed", generated_message_id, provider_thread_key

    if not all([settings.EMAIL_SMTP_HOST, settings.EMAIL_SMTP_USER, settings.EMAIL_SMTP_PASSWORD]):
        logger.error("❌ SMTP not configured - cannot send email")
        if graph_failure_message:
            return False, f"Graph failed: {graph_failure_message}; SMTP fallback unavailable: SMTP not configured", generated_message_id, provider_thread_key
        return False, "SMTP not configured", generated_message_id, provider_thread_key

    try:
        msg = MIMEMultipart('mixed')
        msg['Subject'] = subject
        # formataddr encodes only the display name, keeping the address parseable.
        msg['From'] = formataddr((settings.EMAIL_SMTP_FROM_NAME or "", settings.EMAIL_SMTP_FROM_ADDRESS))
        msg['To'] = ', '.join(to_addresses)
        # RFC 5322 requires an origination Date header.
        msg['Date'] = formatdate(localtime=True)
        msg['Message-ID'] = generated_message_id
        if cc:
            msg['Cc'] = ', '.join(cc)
        if reply_to:
            msg['Reply-To'] = reply_to
        if in_reply_to:
            msg['In-Reply-To'] = in_reply_to
        if references:
            msg['References'] = references

        content_part = MIMEMultipart('alternative')
        content_part.attach(MIMEText(body_text, 'plain'))
        if body_html:
            content_part.attach(MIMEText(body_html, 'html'))
        msg.attach(content_part)

        for attachment in (attachments or []):
            content = attachment.get("content")
            if not content:
                continue
            filename = attachment.get("filename") or "attachment.bin"
            content_type = attachment.get("content_type") or "application/octet-stream"
            maintype, _, subtype = content_type.partition("/")
            if not maintype or not subtype:
                maintype, subtype = "application", "octet-stream"
            mime_attachment = MIMEBase(maintype, subtype)
            mime_attachment.set_payload(content)
            encoders.encode_base64(mime_attachment)
            # Passing filename as a parameter lets the email package quote it and
            # RFC 2231-encode non-ASCII names (e.g. "Følgeseddel.pdf").
            mime_attachment.add_header('Content-Disposition', 'attachment', filename=filename)
            msg.attach(mime_attachment)

        # Envelope recipients: To + Cc + Bcc (Bcc never appears in headers).
        all_recipients = list(to_addresses) + list(cc or []) + list(bcc or [])

        async with aiosmtplib.SMTP(
            hostname=settings.EMAIL_SMTP_HOST,
            port=settings.EMAIL_SMTP_PORT,
            use_tls=settings.EMAIL_SMTP_USE_TLS
        ) as smtp:
            await smtp.login(settings.EMAIL_SMTP_USER, settings.EMAIL_SMTP_PASSWORD)
            await smtp.sendmail(
                settings.EMAIL_SMTP_FROM_ADDRESS,
                all_recipients,
                msg.as_string()
            )

        logger.info(
            "✅ Email with attachments sent successfully to %s recipient(s): %s",
            len(to_addresses),
            subject,
        )
        return True, f"Email sent to {len(to_addresses)} recipient(s)", generated_message_id, provider_thread_key

    except Exception as e:
        error_msg = f"❌ SMTP send error (attachments): {str(e)}"
        logger.error(error_msg)
        if graph_failure_message:
            return False, f"Graph failed: {graph_failure_message}; SMTP fallback failed: {str(e)}", generated_message_id, provider_thread_key
        return False, error_msg, generated_message_id, provider_thread_key