bmc_hub/app/services/email_scheduler.py
Christian 8791e34f4e feat: Implement email processing system with scheduler, fetching, classification, and rule matching
- Added EmailProcessorService to orchestrate email workflow: fetching, saving, classifying, and matching rules.
- Introduced EmailScheduler for background processing of emails every 5 minutes.
- Developed EmailService to handle email fetching from IMAP and Microsoft Graph API.
- Created database migration for email system, including tables for email messages, rules, attachments, and analysis.
- Implemented AI classification and extraction for invoices and time confirmations.
- Added logging for better traceability and error handling throughout the email processing pipeline.
2025-12-11 02:31:29 +01:00

78 lines
2.6 KiB
Python

"""
Email Scheduler
Background job that fetches and processes emails at a fixed, configurable
interval (settings.EMAIL_PROCESS_INTERVAL_MINUTES, e.g. every 5 minutes)
Based on OmniSync scheduler with APScheduler
"""
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from datetime import datetime
from app.core.config import settings
from app.services.email_processor_service import EmailProcessorService
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class EmailScheduler:
    """Background scheduler for email processing.

    Wraps an APScheduler ``AsyncIOScheduler`` that periodically awaits
    ``EmailProcessorService.process_inbox()``. Whether the scheduler may
    start and how often the job runs are taken from
    ``settings.EMAIL_TO_TICKET_ENABLED`` and
    ``settings.EMAIL_PROCESS_INTERVAL_MINUTES``; both are read once, when the
    instance is constructed.
    """

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.processor = EmailProcessorService()
        self.enabled = settings.EMAIL_TO_TICKET_ENABLED
        self.interval_minutes = settings.EMAIL_PROCESS_INTERVAL_MINUTES
        # Strong references to tasks spawned by run_manual(). The event loop
        # only keeps weak references to tasks, so a task nobody else holds
        # could be garbage-collected before it finishes.
        self._manual_tasks = set()

    def start(self):
        """Register the periodic job and start the background scheduler.

        Does nothing (apart from logging) when the feature is disabled or
        when the scheduler is already running, so calling it more than once
        is safe.
        """
        if not self.enabled:
            logger.info("⏭️ Email scheduler disabled (EMAIL_TO_TICKET_ENABLED=false)")
            return

        # Starting an already-running APScheduler scheduler raises
        # SchedulerAlreadyRunningError; treat a repeated start() as a no-op.
        if self.scheduler.running:
            logger.warning("⚠️ Email scheduler already running, start() ignored")
            return

        logger.info(f"🚀 Starting email scheduler (interval: {self.interval_minutes} minutes)")

        # Add job with interval trigger
        self.scheduler.add_job(
            func=self._process_emails_job,
            trigger=IntervalTrigger(minutes=self.interval_minutes),
            id='email_processor',
            name='Email Processing Job',
            max_instances=1,  # Prevent overlapping scheduled runs
            replace_existing=True,
        )

        self.scheduler.start()
        logger.info("✅ Email scheduler started successfully")

    def stop(self):
        """Shut the scheduler down if it is currently running."""
        if self.scheduler.running:
            self.scheduler.shutdown(wait=True)
            logger.info("👋 Email scheduler stopped")

    async def _process_emails_job(self):
        """Run one inbox-processing pass and log its result and duration.

        Exceptions raised by the processor are logged (with traceback) and
        swallowed, so a failing run never propagates into the caller.
        """
        try:
            logger.info("🔄 Email processing job started...")
            start_time = datetime.now()

            stats = await self.processor.process_inbox()

            duration = (datetime.now() - start_time).total_seconds()
            logger.info(f"✅ Email processing complete: {stats} (duration: {duration:.1f}s)")
        except Exception as e:
            # Broad catch is deliberate: this is the top-level job boundary.
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception(f"❌ Email processing job failed: {e}")

    def run_manual(self):
        """Manually trigger email processing (for testing).

        Schedules ``_process_emails_job`` as an asyncio task and returns
        immediately without waiting for it. Must be called while an event
        loop is running; ``asyncio.create_task`` raises ``RuntimeError``
        otherwise.

        NOTE: the task is created directly on the event loop, not through the
        scheduler, so the ``max_instances=1`` limit configured in ``start()``
        does not prevent it from overlapping with a scheduled run.
        """
        import asyncio

        logger.info("🚀 Manual email processing triggered")

        task = asyncio.create_task(self._process_emails_job())
        # Hold a reference until the task completes, then drop it so the set
        # does not grow without bound.
        self._manual_tasks.add(task)
        task.add_done_callback(self._manual_tasks.discard)
# Global scheduler instance, created when this module is imported.
# Construction only builds the scheduler and processor objects and reads the
# settings; no job runs until start() is called on this instance.
email_scheduler = EmailScheduler()