- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules. - Introduced EmailScheduler for background processing of emails every 5 minutes. - Developed EmailService to handle email fetching from IMAP and Microsoft Graph API. - Created database migration for email system, including tables for email messages, rules, attachments, and analysis. - Implemented AI classification and extraction for invoices and time confirmations. - Added logging for better traceability and error handling throughout the email processing pipeline.
78 lines
2.6 KiB
Python
78 lines
2.6 KiB
Python
"""
Email Scheduler

Background job that fetches and processes emails on a fixed interval
(every 5 minutes by design; the interval is read from
settings.EMAIL_PROCESS_INTERVAL_MINUTES).

Based on the OmniSync scheduler, built on APScheduler.
"""
|
|
|
|
import logging
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
from datetime import datetime
|
|
|
|
from app.core.config import settings
|
|
from app.services.email_processor_service import EmailProcessorService
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EmailScheduler:
    """Background scheduler for email processing.

    Wraps an APScheduler ``AsyncIOScheduler`` that awaits
    ``EmailProcessorService.process_inbox()`` on a fixed interval.

    Attributes:
        scheduler: The ``AsyncIOScheduler`` that owns the recurring job.
        processor: The ``EmailProcessorService`` awaited on every run.
        enabled: Value of ``settings.EMAIL_TO_TICKET_ENABLED``; when falsy,
            ``start()`` does nothing.
        interval_minutes: Value of
            ``settings.EMAIL_PROCESS_INTERVAL_MINUTES``.
    """

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.processor = EmailProcessorService()
        self.enabled = settings.EMAIL_TO_TICKET_ENABLED
        self.interval_minutes = settings.EMAIL_PROCESS_INTERVAL_MINUTES
        # Strong references to manually triggered tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._manual_tasks = set()

    def start(self):
        """Start the background scheduler.

        Does nothing when the feature flag is off or when the scheduler is
        already running, so calling it more than once is safe.
        """
        if not self.enabled:
            logger.info("⏭️ Email scheduler disabled (EMAIL_TO_TICKET_ENABLED=false)")
            return

        if self.scheduler.running:
            # Starting an already-running APScheduler instance raises, so
            # treat a repeated start() as a no-op instead.
            logger.warning("⚠️ Email scheduler already running; start() ignored")
            return

        logger.info(f"🚀 Starting email scheduler (interval: {self.interval_minutes} minutes)")

        # Add job with interval trigger
        self.scheduler.add_job(
            func=self._process_emails_job,
            trigger=IntervalTrigger(minutes=self.interval_minutes),
            id='email_processor',
            name='Email Processing Job',
            max_instances=1,  # Prevent overlapping runs
            replace_existing=True,
        )

        self.scheduler.start()
        logger.info("✅ Email scheduler started successfully")

    def stop(self):
        """Stop the scheduler if it is running; otherwise do nothing."""
        if self.scheduler.running:
            self.scheduler.shutdown(wait=True)
            logger.info("👋 Email scheduler stopped")

    async def _process_emails_job(self):
        """Run one email-processing pass.

        Returns:
            Whatever ``self.processor.process_inbox()`` returned, or ``None``
            if the pass failed.

        Any ``Exception`` is logged (with traceback) and swallowed, so a
        single failed run never propagates to the caller or the scheduler.
        """
        try:
            logger.info("🔄 Email processing job started...")

            start_time = datetime.now()
            stats = await self.processor.process_inbox()
            duration = (datetime.now() - start_time).total_seconds()

            logger.info(f"✅ Email processing complete: {stats} (duration: {duration:.1f}s)")
            return stats

        except Exception as e:
            # Top-level job boundary: keep the broad catch, but record the
            # traceback so failures can actually be diagnosed.
            logger.error(f"❌ Email processing job failed: {e}", exc_info=True)
            return None

    def run_manual(self):
        """Manually trigger email processing (for testing).

        Must be called while an asyncio event loop is running.

        Returns:
            The ``asyncio.Task`` running the job; awaiting it yields the
            same value as ``_process_emails_job()``.

        Raises:
            RuntimeError: If there is no running event loop.
        """
        import asyncio

        logger.info("🚀 Manual email processing triggered")

        job = self._process_emails_job()
        try:
            task = asyncio.create_task(job)
        except RuntimeError:
            # No running loop: close the coroutine so it does not emit a
            # "coroutine was never awaited" warning, then re-raise.
            job.close()
            raise

        # Hold a reference until the task completes, then drop it.
        self._manual_tasks.add(task)
        task.add_done_callback(self._manual_tasks.discard)
        return task
|
|
|
|
|
|
# Global scheduler instance
|
|
# Built at import time; no job is scheduled until start() is called on it.
email_scheduler = EmailScheduler()
|