"""
Email Service
Handles email fetching from IMAP or Microsoft Graph API
Based on OmniSync architecture - READ-ONLY mode for safety
"""

import asyncio
import base64
import email
import email.message
import hashlib
import imaplib
import json
import logging
import re
from datetime import datetime, timezone
from email.errors import HeaderParseError
from email.header import decode_header
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import msal
from aiohttp import BasicAuth, ClientSession

from app.core.config import settings
from app.core.database import execute_insert, execute_query

logger = logging.getLogger(__name__)


class EmailService:
    """Email fetching service with IMAP and Graph API support"""

    def __init__(self):
        self.use_graph = settings.USE_GRAPH_API

        self.imap_config = {
            'server': settings.IMAP_SERVER,
            'port': settings.IMAP_PORT,
            'username': settings.IMAP_USERNAME,
            'password': settings.IMAP_PASSWORD,
            'use_ssl': settings.IMAP_USE_SSL,
            'folder': settings.IMAP_FOLDER,
            'readonly': settings.IMAP_READ_ONLY
        }

        self.graph_config = {
            'tenant_id': settings.GRAPH_TENANT_ID,
            'client_id': settings.GRAPH_CLIENT_ID,
            'client_secret': settings.GRAPH_CLIENT_SECRET,
            'user_email': settings.GRAPH_USER_EMAIL
        }

        # MSAL client, created lazily on the first token request and then
        # reused instead of being rebuilt for every fetch.
        self._msal_app = None

    async def fetch_new_emails(self, limit: int = 50) -> List[Dict]:
        """
        Fetch new emails from configured source (IMAP or Graph API)

        Graph is used when it is enabled and a client id is configured;
        otherwise IMAP is used when a username is configured.

        Args:
            limit: Maximum number of most-recent messages to inspect.

        Returns:
            List of parsed email dictionaries that are not yet stored.
            Empty when no source is configured or fetching fails.
        """
        if self.use_graph and self.graph_config['client_id']:
            logger.info("📥 Fetching emails via Microsoft Graph API")
            return await self._fetch_via_graph(limit)
        elif self.imap_config['username']:
            logger.info("📥 Fetching emails via IMAP")
            return await self._fetch_via_imap(limit)
        else:
            logger.warning("⚠️ No email source configured (IMAP or Graph API)")
            return []

    async def _fetch_via_imap(self, limit: int) -> List[Dict]:
        """Fetch emails using IMAP protocol (READ-ONLY mode)

        The blocking imaplib conversation runs in the default executor so
        the event loop is not stalled. The database duplicate check stays
        in the coroutine, i.e. on the caller's thread.

        Args:
            limit: Maximum number of most-recent messages to inspect.
                Values <= 0 return an empty list.

        Returns:
            Parsed emails (newest first) that are not yet in the database.
            Empty list on any IMAP or unexpected error.
        """
        # Guard explicitly: ids[-0:] would otherwise select the WHOLE mailbox.
        if limit <= 0:
            return []

        try:
            loop = asyncio.get_running_loop()
            candidates = await loop.run_in_executor(
                None, self._fetch_imap_messages, limit
            )
        except imaplib.IMAP4.error as e:
            logger.error(f"❌ IMAP error: {e}")
            return []
        except Exception as e:
            logger.error(f"❌ Unexpected error fetching via IMAP: {e}")
            return []

        emails = []
        for parsed_email in candidates:
            try:
                already_stored = self._email_exists(parsed_email['message_id'])
            except Exception as e:
                logger.error(
                    f"❌ Error checking email {parsed_email['message_id']}: {e}"
                )
                continue

            if already_stored:
                logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}")
                continue

            emails.append(parsed_email)
            logger.info(
                f"✅ New email: {parsed_email['subject'][:50]}... "
                f"from {parsed_email['sender_email']}"
            )

        logger.info(f"📥 Fetched {len(emails)} new emails via IMAP")
        return emails

    def _fetch_imap_messages(self, limit: int) -> List[Dict]:
        """Blocking IMAP fetch of the newest ``limit`` messages.

        Intended to be run in a worker thread. Does not touch the database.

        Args:
            limit: Positive number of most-recent messages to fetch.

        Returns:
            Parsed email dictionaries, newest first.

        Raises:
            imaplib.IMAP4.error: On protocol/login failures.
            OSError: On connection failures.
        """
        config = self.imap_config
        imap_class = imaplib.IMAP4_SSL if config['use_ssl'] else imaplib.IMAP4
        mail = imap_class(config['server'], config['port'])

        try:
            mail.login(config['username'], config['password'])

            # Select folder in READ-ONLY mode (critical for safety)
            folder = config['folder']
            readonly = config['readonly']
            status, _ = mail.select(folder, readonly=readonly)
            if status != 'OK':
                logger.error(f"❌ Failed to select IMAP folder {folder}: {status}")
                return []

            if readonly:
                logger.info(
                    f"🔒 Connected to {folder} in READ-ONLY mode "
                    f"(emails will NOT be marked as read)"
                )

            status, messages = mail.search(None, 'ALL')
            if status != 'OK':
                logger.error(f"❌ IMAP search failed: {status}")
                return []

            email_ids = messages[0].split()
            logger.info(f"📊 Found {len(email_ids)} emails in {folder}")

            # Most recent `limit` ids, newest first.
            newest_first = email_ids[-limit:][::-1]

            parsed_emails = []
            for email_id in newest_first:
                try:
                    # BODY.PEEK avoids setting the \Seen flag
                    status, msg_data = mail.fetch(email_id, '(BODY.PEEK[])')
                    if status != 'OK':
                        logger.warning(f"⚠️ Failed to fetch email {email_id}: {status}")
                        continue

                    # The message literal is the second element of the first
                    # tuple in the response; other items are plain bytes/None.
                    raw_email = next(
                        (
                            item[1]
                            for item in msg_data
                            if isinstance(item, tuple) and len(item) > 1
                        ),
                        None,
                    )
                    if raw_email is None:
                        logger.warning(
                            f"⚠️ Failed to fetch email {email_id}: empty response"
                        )
                        continue

                    msg = email.message_from_bytes(raw_email)
                    parsed_emails.append(self._parse_email(msg, email_id.decode()))
                except Exception as e:
                    logger.error(f"❌ Error parsing email {email_id}: {e}")
                    continue

            return parsed_emails
        finally:
            # Always release the connection, including on early return/error.
            try:
                mail.logout()
            except (imaplib.IMAP4.error, OSError) as e:
                logger.debug(f"IMAP logout failed: {e}")

    async def _fetch_via_graph(self, limit: int) -> List[Dict]:
        """Fetch emails using Microsoft Graph API (OAuth2)

        Args:
            limit: Maximum number of most-recent messages to request.
                Values <= 0 return an empty list.

        Returns:
            Parsed emails (newest first) that are not yet in the database,
            each with its attachments loaded. Empty list on any error.
        """
        if limit <= 0:
            return []

        emails = []

        try:
            access_token = await self._get_graph_access_token()
            if not access_token:
                logger.error("❌ Failed to get Graph API access token")
                return []

            user_email = self.graph_config['user_email']
            folder = self.imap_config['folder']  # Use same folder name

            url = f"https://graph.microsoft.com/v1.0/users/{user_email}/mailFolders/{folder}/messages"
            params = {
                '$top': limit,
                '$orderby': 'receivedDateTime desc',
                '$select': 'id,subject,from,toRecipients,ccRecipients,receivedDateTime,bodyPreview,body,hasAttachments,internetMessageId'
            }
            headers = {
                'Authorization': f'Bearer {access_token}',
                'Content-Type': 'application/json'
            }

            async with ClientSession() as session:
                async with session.get(url, params=params, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"❌ Graph API error: {response.status} - {error_text}")
                        return []
                    data = await response.json()

                messages = data.get('value', [])
                logger.info(f"📊 Found {len(messages)} emails via Graph API")

                for msg in messages:
                    try:
                        parsed_email = self._parse_graph_message(msg)

                        # Check for duplicates BEFORE downloading attachments
                        # so already-stored mail costs no extra requests.
                        if self._email_exists(parsed_email['message_id']):
                            logger.debug(f"⏭️ Email already exists: {parsed_email['message_id']}")
                            continue

                        if msg.get('hasAttachments', False):
                            attachments = await self._fetch_graph_attachments(
                                user_email, msg['id'], access_token, session
                            )
                            parsed_email['attachments'] = attachments
                            parsed_email['attachment_count'] = len(attachments)
                        else:
                            parsed_email['attachments'] = []

                        emails.append(parsed_email)
                        logger.info(
                            f"✅ New email: {parsed_email['subject'][:50]}... "
                            f"from {parsed_email['sender_email']}"
                        )
                    except Exception as e:
                        logger.error(f"❌ Error parsing Graph message: {e}")
                        continue

            logger.info(f"📥 Fetched {len(emails)} new emails via Graph API")
            return emails

        except Exception as e:
            logger.error(f"❌ Unexpected error fetching via Graph API: {e}")
            return []

    async def _get_graph_access_token(self) -> Optional[str]:
        """Get OAuth2 access token for Microsoft Graph API using MSAL

        MSAL performs blocking HTTP calls, so the request runs in the
        default executor.

        Returns:
            The bearer token, or None when it could not be obtained.
        """
        try:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, self._acquire_graph_token)

            if "access_token" in result:
                logger.info("✅ Successfully obtained Graph API access token")
                return result["access_token"]

            error = result.get("error_description", result.get("error", "Unknown error"))
            logger.error(f"❌ Failed to obtain access token: {error}")
            return None

        except Exception as e:
            logger.error(f"❌ Error getting Graph access token: {e}")
            return None

    def _acquire_graph_token(self) -> Dict:
        """Blocking client-credentials token request (run in a worker thread).

        Returns:
            The raw MSAL result dictionary.
        """
        # getattr keeps this working for instances created without __init__.
        app = getattr(self, '_msal_app', None)
        if app is None:
            authority = f"https://login.microsoftonline.com/{self.graph_config['tenant_id']}"
            app = msal.ConfidentialClientApplication(
                self.graph_config['client_id'],
                authority=authority,
                client_credential=self.graph_config['client_secret']
            )
            self._msal_app = app

        # ".default" requests the application permissions granted to the app
        scopes = ["https://graph.microsoft.com/.default"]
        return app.acquire_token_for_client(scopes=scopes)

    def _parse_email(self, msg: email.message.Message, email_id: str) -> Dict:
        """Parse IMAP email message into dictionary

        Args:
            msg: Parsed message object.
            email_id: IMAP id of the message; used to build a fallback
                message id when the Message-ID header is missing or empty.

        Returns:
            Dictionary with headers, text/html body and attachments.
            Parts carrying a filename are attachments (including text
            ones); the first text/plain and first text/html part without
            a filename become the body.
        """
        subject = self._decode_header(msg.get('Subject', ''))

        from_header = self._decode_header(msg.get('From', ''))
        sender_name, sender_email = self._parse_email_address(from_header)

        to_header = self._decode_header(msg.get('To', ''))
        _, recipient_email = self._parse_email_address(to_header)

        cc_header = self._decode_header(msg.get('Cc', ''))

        # Header values may carry folding whitespace; an empty header falls
        # back to the synthetic id just like a missing one.
        message_id = str(msg.get('Message-ID') or '').strip() or f"imap-{email_id}"

        received_date = self._parse_email_date(str(msg.get('Date', '') or ''))

        body_text = ""
        body_html = ""
        attachments = []

        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_maintype() == 'multipart':
                    continue

                content_type = part.get_content_type()
                filename = part.get_filename()

                if filename:
                    # A filename marks an attachment, even for text/* parts.
                    content = part.get_payload(decode=True)
                    if content:  # Only add if we got content
                        attachments.append({
                            'filename': self._decode_header(filename),
                            'content': content,
                            'content_type': content_type,
                            'size': len(content)
                        })
                    continue

                # First body part of each kind wins, so later text parts do
                # not replace the main body.
                if content_type == "text/plain" and not body_text:
                    body_text = self._decode_part_text(part)
                elif content_type == "text/html" and not body_html:
                    body_html = self._decode_part_text(part)
        else:
            decoded_body = self._decode_part_text(msg)
            if msg.get_content_type() == "text/html":
                body_html = decoded_body
            else:
                body_text = decoded_body

        return {
            'message_id': message_id,
            'subject': subject,
            'sender_name': sender_name,
            'sender_email': sender_email,
            'recipient_email': recipient_email,
            'cc': cc_header,
            'body_text': body_text,
            'body_html': body_html,
            'received_date': received_date,
            'folder': self.imap_config['folder'],
            'has_attachments': len(attachments) > 0,
            'attachment_count': len(attachments),
            'attachments': attachments
        }

    @staticmethod
    def _decode_part_text(part: email.message.Message) -> str:
        """Decode a text MIME part using its declared charset.

        Falls back to UTF-8 when no charset is declared or the declared one
        is unknown. Undecodable bytes are dropped.

        Returns:
            The decoded text, or "" when the part has no payload.
        """
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""

        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='ignore')
        except (LookupError, ValueError):
            return payload.decode('utf-8', errors='ignore')

    def _parse_graph_message(self, msg: Dict) -> Dict:
        """Parse Microsoft Graph API message into dictionary

        Fields that are absent or null in the payload are treated as empty
        values instead of raising.

        Args:
            msg: One message object from the Graph ``messages`` response.

        Returns:
            Dictionary with the same keys as the IMAP parser, except
            ``attachments`` (added by the caller); ``attachment_count``
            starts at 0.
        """
        from_data = (msg.get('from') or {}).get('emailAddress') or {}
        sender_name = from_data.get('name') or ''
        sender_email = from_data.get('address') or ''

        # Recipient is the first TO recipient
        to_recipients = msg.get('toRecipients') or []
        recipient_email = ''
        if to_recipients:
            recipient_email = (to_recipients[0].get('emailAddress') or {}).get('address') or ''

        cc_addresses = (
            (recipient.get('emailAddress') or {}).get('address')
            for recipient in (msg.get('ccRecipients') or [])
        )
        cc = ', '.join(address for address in cc_addresses if address)

        body_data = msg.get('body') or {}
        body_content = body_data.get('content') or ''
        body_type = body_data.get('contentType', 'text')

        body_text = body_content if body_type == 'text' else ''
        body_html = body_content if body_type == 'html' else ''

        received_date_str = msg.get('receivedDateTime') or ''
        received_date = None
        if received_date_str:
            try:
                received_date = datetime.fromisoformat(received_date_str.replace('Z', '+00:00'))
            except ValueError:
                logger.warning(f"⚠️ Failed to parse date: {received_date_str}")
        if received_date is None:
            # Timezone-aware fallback, consistent with parsed values
            received_date = datetime.now(timezone.utc)

        return {
            'message_id': msg.get('internetMessageId') or msg.get('id') or '',
            'subject': msg.get('subject') or '',
            'sender_name': sender_name,
            'sender_email': sender_email,
            'recipient_email': recipient_email,
            'cc': cc,
            'body_text': body_text,
            'body_html': body_html,
            'received_date': received_date,
            'folder': self.imap_config['folder'],
            'has_attachments': msg.get('hasAttachments', False),
            'attachment_count': 0  # Will be updated after fetching attachments
        }

    async def _fetch_graph_attachments(
        self,
        user_email: str,
        message_id: str,
        access_token: str,
        session: ClientSession
    ) -> List[Dict]:
        """Fetch attachments for a specific message from Graph API

        Args:
            user_email: Mailbox owner used in the request URL.
            message_id: Graph id of the message.
            access_token: Bearer token for the request.
            session: Open aiohttp session to reuse.

        Returns:
            Attachment dictionaries (filename, content, content_type, size).
            On a non-200 response an empty list; on an exception, whatever
            was collected before it occurred.
        """
        attachments = []

        try:
            url = f"https://graph.microsoft.com/v1.0/users/{user_email}/messages/{message_id}/attachments"
            headers = {
                'Authorization': f'Bearer {access_token}',
                'Content-Type': 'application/json'
            }

            async with session.get(url, headers=headers) as response:
                if response.status != 200:
                    logger.warning(f"⚠️ Failed to fetch attachments for message {message_id}: {response.status}")
                    return []
                data = await response.json()

            for att in data.get('value', []):
                # Content arrives base64-encoded in contentBytes; entries
                # without it are recorded with empty content.
                content_bytes = att.get('contentBytes', '')
                content = base64.b64decode(content_bytes) if content_bytes else b''

                attachments.append({
                    'filename': att.get('name', 'unknown'),
                    'content': content,
                    'content_type': att.get('contentType', 'application/octet-stream'),
                    'size': att.get('size', len(content))
                })

                logger.info(f"📎 Fetched attachment: {att.get('name')} ({att.get('size', 0)} bytes)")

        except Exception as e:
            logger.error(f"❌ Error fetching attachments for message {message_id}: {e}")

        return attachments

    def _decode_header(self, header: str) -> str:
        """Decode email header (handles MIME encoding)

        Args:
            header: Raw header value; may contain RFC 2047 encoded words.

        Returns:
            The decoded, stripped header text. A header that cannot be
            parsed is returned as-is (stripped); an empty header gives "".
        """
        if not header:
            return ""

        try:
            decoded_parts = decode_header(header)
        except HeaderParseError:
            return str(header).strip()

        fragments = []
        for content, encoding in decoded_parts:
            if isinstance(content, bytes):
                try:
                    fragments.append(content.decode(encoding or 'utf-8', errors='ignore'))
                except (LookupError, ValueError):
                    # Unknown/invalid charset name: best-effort UTF-8
                    fragments.append(content.decode('utf-8', errors='ignore'))
            else:
                fragments.append(str(content))

        return "".join(fragments).strip()

    def _parse_email_address(self, header: str) -> Tuple[str, str]:
        """Parse 'Name <email@domain.com>' into (name, email)

        Only the first ``<...>`` pair is used, so a header listing several
        recipients yields the first address rather than a mangled string.

        Args:
            header: Already-decoded address header.

        Returns:
            (name, email). Without an angle-bracket pair the whole stripped
            header is returned as the email with an empty name.
        """
        if not header:
            return ("", "")

        start = header.find('<')
        end = header.find('>', start + 1) if start != -1 else -1

        if start != -1 and end != -1:
            name = header[:start].strip().strip('"')
            email_addr = header[start + 1:end].strip()
            return (name, email_addr)

        # Just email address
        return ("", header.strip())

    def _parse_email_date(self, date_str: str) -> datetime:
        """Parse email date header into datetime object

        Args:
            date_str: RFC 2822 date string.

        Returns:
            A timezone-aware datetime. A parsed value without timezone
            information is tagged as UTC; a missing or unparsable header
            falls back to the current UTC time.
        """
        if not date_str:
            return datetime.now(timezone.utc)

        try:
            parsed = parsedate_to_datetime(date_str)
        except (TypeError, ValueError):
            logger.warning(f"⚠️ Failed to parse date: {date_str}")
            return datetime.now(timezone.utc)

        if parsed.tzinfo is None:
            # parsedate_to_datetime returns a naive value for "-0000",
            # which denotes UTC; make it aware so values are comparable.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def _email_exists(self, message_id: str) -> bool:
        """Check if email already exists in database

        Args:
            message_id: Value stored in ``email_messages.message_id``.

        Returns:
            True when a non-deleted row with this message id exists.
        """
        query = "SELECT id FROM email_messages WHERE message_id = %s AND deleted_at IS NULL"
        result = execute_query(query, (message_id,))
        # Truthiness also covers a None result from the helper
        return bool(result)

    async def save_email(self, email_data: Dict) -> Optional[int]:
        """Save email to database

        Args:
            email_data: Parsed email dictionary as produced by the fetchers.

        Returns:
            The value returned by ``execute_insert`` for the new row, or
            None when saving failed.
        """
        try:
            query = """
                INSERT INTO email_messages
                (message_id, subject, sender_email, sender_name, recipient_email, cc,
                 body_text, body_html, received_date, folder, has_attachments, attachment_count,
                 status, is_read)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'new', false)
                RETURNING id
            """

            email_id = execute_insert(query, (
                email_data['message_id'],
                email_data['subject'],
                email_data['sender_email'],
                email_data['sender_name'],
                email_data['recipient_email'],
                email_data['cc'],
                email_data['body_text'],
                email_data['body_html'],
                email_data['received_date'],
                email_data['folder'],
                email_data['has_attachments'],
                email_data['attachment_count']
            ))

            logger.info(f"✅ Saved email {email_id}: {email_data['subject'][:50]}...")

            if email_data.get('attachments'):
                await self._save_attachments(email_id, email_data['attachments'])

            return email_id

        except Exception as e:
            logger.error(f"❌ Error saving email to database: {e}")
            return None

    @staticmethod
    def _safe_filename(filename: str) -> str:
        """Reduce an untrusted attachment name to a safe single path component.

        Directory parts (both separator styles) are dropped, control
        characters and characters that are invalid on common filesystems
        are replaced, and the name is capped at 200 UTF-8 bytes.

        Returns:
            The sanitised name, or "attachment" if nothing usable remains.
        """
        base_name = re.split(r'[\\/]', str(filename))[-1]
        base_name = re.sub(r'[\x00-\x1f\x7f<>:"|?*]', '_', base_name).strip(' .')
        base_name = base_name.encode('utf-8')[:200].decode('utf-8', errors='ignore')
        return base_name or 'attachment'

    async def _save_attachments(self, email_id: int, attachments: List[Dict]):
        """Save email attachments to disk and database

        Failures are logged per attachment and never raised, so a problem
        here does not make the already-inserted email look unsaved.

        Args:
            email_id: Id of the owning ``email_messages`` row.
            attachments: Attachment dictionaries (filename, content,
                content_type, size).
        """
        upload_dir = Path("uploads/email_attachments")
        try:
            upload_dir.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            logger.error(f"❌ Failed to create attachment directory {upload_dir}: {e}")
            return

        for att in attachments:
            # Bound before the try block so the error log can always use it
            filename = att.get('filename') or 'unknown'
            try:
                content = att['content']
                content_type = att.get('content_type', 'application/octet-stream')
                size_bytes = att['size']

                # MD5 is used only as a content fingerprint in the file name
                md5_hash = hashlib.md5(content).hexdigest()

                # The on-disk name is sanitised; the original name is kept
                # in the database row.
                file_path = upload_dir / f"{md5_hash}_{self._safe_filename(filename)}"
                file_path.write_bytes(content)

                query = """
                    INSERT INTO email_attachments
                    (email_id, filename, content_type, size_bytes, file_path)
                    VALUES (%s, %s, %s, %s, %s)
                """
                execute_insert(query, (
                    email_id,
                    filename,
                    content_type,
                    size_bytes,
                    str(file_path)
                ))

                logger.info(f"📎 Saved attachment: {filename} ({size_bytes} bytes)")

            except Exception as e:
                logger.error(f"❌ Failed to save attachment {filename}: {e}")

    async def get_unprocessed_emails(self, limit: int = 100) -> List[Dict]:
        """Get emails from database that haven't been processed yet

        Args:
            limit: Maximum number of rows to return.

        Returns:
            Non-deleted rows with status 'new', newest first.
        """
        query = """
            SELECT * FROM email_messages
            WHERE status = 'new' AND deleted_at IS NULL
            ORDER BY received_date DESC
            LIMIT %s
        """
        result = execute_query(query, (limit,))
        return result

    async def update_email_status(self, email_id: int, status: str):
        """Update email processing status

        Args:
            email_id: Id of the ``email_messages`` row.
            status: New status value.
        """
        # NOTE(review): an UPDATE is sent through execute_query; confirm the
        # helper supports statements that return no rows.
        query = "UPDATE email_messages SET status = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
        execute_query(query, (status, email_id))
        logger.info(f"✅ Updated email {email_id} status to: {status}")