From ca535739522059c010b36d5c67be51cf215ee67d Mon Sep 17 00:00:00 2001 From: Christian Date: Tue, 6 Jan 2026 15:11:28 +0100 Subject: [PATCH] feat: Enhance email processing and backup scheduling - Added PostgreSQL client installation to Dockerfile for database interactions. - Updated BackupScheduler to manage both backup jobs and email fetching jobs. - Implemented email fetching job with logging and error handling. - Enhanced the frontend to display scheduled jobs, including email fetch status. - Introduced email upload functionality with drag-and-drop support and progress tracking. - Added import_method tracking to email_messages for better source identification. - Updated email parsing logic for .eml and .msg files, including attachment handling. - Removed obsolete email scheduler service as functionality is integrated into BackupScheduler. - Updated requirements for extract-msg to the latest version. - Created migration script to add import_method column to email_messages table. --- Dockerfile | 1 + app/backups/backend/scheduler.py | 66 +++++-- app/backups/templates/index.html | 146 ++++++++++++-- app/core/config.py | 3 + app/emails/backend/router.py | 233 +++++++++++++++++++++-- app/emails/frontend/emails.html | 218 +++++++++++++++++++++ app/services/email_scheduler.py | 77 -------- app/services/email_service.py | 252 +++++++++++++++++++++++++ main.py | 5 + migrations/056_email_import_method.sql | 24 +++ requirements.txt | 2 +- scripts/link_tmodule_customers.sh | 97 ++++++++++ 12 files changed, 997 insertions(+), 127 deletions(-) delete mode 100644 app/services/email_scheduler.py create mode 100644 migrations/056_email_import_method.sql create mode 100644 scripts/link_tmodule_customers.sh diff --git a/Dockerfile b/Dockerfile index 10eb97d..a0343d6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y \ gcc \ g++ \ python3-dev \ + postgresql-client \ && rm -rf /var/lib/apt/lists/* # Build arguments for GitHub release 
deployment diff --git a/app/backups/backend/scheduler.py b/app/backups/backend/scheduler.py index 6963702..ad88754 100644 --- a/app/backups/backend/scheduler.py +++ b/app/backups/backend/scheduler.py @@ -1,6 +1,6 @@ """ Backup Scheduler -Manages scheduled backup jobs, rotation, offsite uploads, and retry logic +Manages scheduled backup jobs, rotation, offsite uploads, retry logic, and email fetch """ import logging @@ -26,17 +26,42 @@ class BackupScheduler: self.running = False def start(self): - """Start the backup scheduler with all jobs""" - if not self.enabled: - logger.info("⏭️ Backup scheduler disabled (BACKUP_ENABLED=false)") - return - + """Start the scheduler with enabled jobs (backups and/or emails)""" if self.running: - logger.warning("⚠️ Backup scheduler already running") + logger.warning("⚠️ Scheduler already running") return - logger.info("🚀 Starting backup scheduler...") + logger.info("🚀 Starting unified scheduler...") + # Add backup jobs if enabled + if self.enabled: + self._add_backup_jobs() + else: + logger.info("⏭️ Backup jobs disabled (BACKUP_ENABLED=false)") + + # Email fetch job (every N minutes if enabled) + if settings.EMAIL_TO_TICKET_ENABLED: + self.scheduler.add_job( + func=self._email_fetch_job, + trigger=IntervalTrigger(minutes=settings.EMAIL_PROCESS_INTERVAL_MINUTES), + id='email_fetch', + name='Email Fetch & Process', + max_instances=1, + replace_existing=True + ) + logger.info("✅ Scheduled: Email fetch every %d minute(s)", + settings.EMAIL_PROCESS_INTERVAL_MINUTES) + else: + logger.info("⏭️ Email fetch disabled (EMAIL_TO_TICKET_ENABLED=false)") + + # Start the scheduler + self.scheduler.start() + self.running = True + + logger.info("✅ Scheduler started successfully") + + def _add_backup_jobs(self): + """Add all backup-related jobs to scheduler""" # Daily full backup at 02:00 CET self.scheduler.add_job( func=self._daily_backup_job, @@ -105,12 +130,6 @@ class BackupScheduler: replace_existing=True ) logger.info("✅ Scheduled: Storage 
check at 01:30") - - # Start the scheduler - self.scheduler.start() - self.running = True - - logger.info("✅ Backup scheduler started successfully") def stop(self): """Stop the backup scheduler""" @@ -377,6 +396,25 @@ class BackupScheduler: except Exception as e: logger.error("❌ Storage check error: %s", str(e), exc_info=True) + async def _email_fetch_job(self): + """Email fetch and processing job""" + try: + logger.info("🔄 Email processing job started...") + + # Import here to avoid circular dependencies + from app.services.email_processor_service import EmailProcessorService + + processor = EmailProcessorService() + start_time = datetime.now() + stats = await processor.process_inbox() + + duration = (datetime.now() - start_time).total_seconds() + + logger.info(f"✅ Email processing complete: {stats} (duration: {duration:.1f}s)") + + except Exception as e: + logger.error(f"❌ Email processing job failed: {e}") + def _get_weekday_number(self, day_name: str) -> int: """Convert day name to APScheduler weekday number (0=Monday, 6=Sunday)""" days = { diff --git a/app/backups/templates/index.html b/app/backups/templates/index.html index 30f5372..990bedd 100644 --- a/app/backups/templates/index.html +++ b/app/backups/templates/index.html @@ -248,13 +248,18 @@
-
- Scheduler Status +
+ Scheduled Jobs +
-
+
-
- Loading... +
+
+ Loading... +
@@ -501,28 +506,137 @@ if (!status.running) { container.innerHTML = ` -
+
Scheduler not running
`; return; } - container.innerHTML = ` -
- Active -
- Next jobs: -
    - ${status.jobs.slice(0, 3).map(j => ` -
  • ${j.name}: ${j.next_run ? formatDate(j.next_run) : 'N/A'}
  • - `).join('')} -
+ // Group jobs by type + const backupJobs = status.jobs.filter(j => ['daily_backup', 'monthly_backup'].includes(j.id)); + const maintenanceJobs = status.jobs.filter(j => ['backup_rotation', 'storage_check', 'offsite_upload', 'offsite_retry'].includes(j.id)); + const emailJob = status.jobs.find(j => j.id === 'email_fetch'); + + let html = ` +
+
+
+ + Scheduler Active +
+
`; + + // Email Fetch Job + if (emailJob) { + const nextRun = emailJob.next_run ? new Date(emailJob.next_run) : null; + const timeUntil = nextRun ? formatTimeUntil(nextRun) : 'N/A'; + html += ` +
+
+
+ + Email Fetch +
+ Interval set by EMAIL_PROCESS_INTERVAL_MINUTES
+ ${timeUntil} +
+
+ `; + } + + // Backup Jobs + if (backupJobs.length > 0) { + html += ` +
+ BACKUP JOBS +
+ `; + backupJobs.forEach(job => { + const nextRun = job.next_run ? new Date(job.next_run) : null; + const timeUntil = nextRun ? formatTimeUntil(nextRun) : 'N/A'; + const icon = job.id === 'daily_backup' ? 'bi-arrow-repeat' : 'bi-calendar-month'; + html += ` +
+
+
+ + ${job.name} +
+ ${nextRun ? formatDateTime(nextRun) : 'N/A'} +
+ ${timeUntil} +
+
+ `; + }); + } + + // Maintenance Jobs + if (maintenanceJobs.length > 0) { + html += ` +
+ MAINTENANCE +
+ `; + maintenanceJobs.forEach(job => { + const nextRun = job.next_run ? new Date(job.next_run) : null; + const timeUntil = nextRun ? formatTimeUntil(nextRun) : 'N/A'; + html += ` +
+
+
+ + ${job.name} +
+ ${nextRun ? formatDateTime(nextRun) : 'N/A'} +
+ ${timeUntil} +
+
+ `; + }); + } + + html += `
`; + container.innerHTML = html; } catch (error) { console.error('Load scheduler status error:', error); + document.getElementById('scheduler-status').innerHTML = ` +
+ Failed to load scheduler status +
+ `; } } + + function formatTimeUntil(date) { + const now = new Date(); + const diff = date - now; + + if (diff < 0) return 'Overdue'; + + const minutes = Math.floor(diff / 60000); + const hours = Math.floor(minutes / 60); + const days = Math.floor(hours / 24); + + if (days > 0) return `${days}d`; + if (hours > 0) return `${hours}h`; + if (minutes > 0) return `${minutes}m`; + return 'Now'; + } + + function formatDateTime(date) { + return date.toLocaleString('da-DK', { + day: '2-digit', + month: '2-digit', + year: 'numeric', + hour: '2-digit', + minute: '2-digit' + }); + } // Create manual backup async function createBackup(event) { diff --git a/app/core/config.py b/app/core/config.py index 7729129..0ecbca8 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -120,6 +120,9 @@ class Settings(BaseSettings): # Offsite Backup Settings (SFTP) OFFSITE_ENABLED: bool = False + OFFSITE_WEEKLY_DAY: str = "sunday" + OFFSITE_RETRY_DELAY_HOURS: int = 1 + OFFSITE_RETRY_MAX_ATTEMPTS: int = 3 SFTP_HOST: str = "" SFTP_PORT: int = 22 SFTP_USER: str = "" diff --git a/app/emails/backend/router.py b/app/emails/backend/router.py index 4deaa3a..e75108a 100644 --- a/app/emails/backend/router.py +++ b/app/emails/backend/router.py @@ -4,7 +4,7 @@ API endpoints for email viewing, classification, and rule management """ import logging -from fastapi import APIRouter, HTTPException, Query +from fastapi import APIRouter, HTTPException, Query, UploadFile, File from typing import List, Optional from pydantic import BaseModel from datetime import datetime, date @@ -359,30 +359,20 @@ async def reprocess_email(email_id: int): email = result[0] - # Re-classify + # Re-classify using processor service processor = EmailProcessorService() - classification, confidence = await processor.classify_email( - email['subject'], - email['body_text'] or email['body_html'] - ) + await processor._classify_and_update(email) - # Update classification - update_query = """ - UPDATE email_messages - SET 
classification = %s, - confidence_score = %s, - classification_date = CURRENT_TIMESTAMP, - updated_at = CURRENT_TIMESTAMP - WHERE id = %s - """ - execute_update(update_query, (classification, confidence, email_id)) + # Re-fetch updated email + result = execute_query(query, (email_id,)) + email = result[0] - logger.info(f"🔄 Reprocessed email {email_id}: {classification} ({confidence:.2f})") + logger.info(f"🔄 Reprocessed email {email_id}: {email['classification']} ({email.get('confidence_score', 0):.2f})") return { "success": True, "message": "Email reprocessed", - "classification": classification, - "confidence": confidence + "classification": email['classification'], + "confidence": email.get('confidence_score', 0) } except HTTPException: @@ -410,6 +400,211 @@ async def process_emails(): raise HTTPException(status_code=500, detail=str(e)) +@router.post("/emails/upload") +async def upload_emails(files: List[UploadFile] = File(...)): + """ + Upload email files (.eml or .msg) via drag-and-drop + Supports multiple files at once + """ + from app.services.email_service import EmailService + from app.services.email_processor_service import EmailProcessorService + from app.services.email_workflow_service import email_workflow_service + from app.services.email_activity_logger import EmailActivityLogger + from app.core.config import settings + + email_service = EmailService() + processor = EmailProcessorService() + activity_logger = EmailActivityLogger() + + results = [] + max_size = settings.EMAIL_MAX_UPLOAD_SIZE_MB * 1024 * 1024 # Convert MB to bytes + + logger.info(f"📤 Upload started: {len(files)} file(s)") + + for file in files: + try: + logger.info(f"📄 Processing file: {file.filename}") + + # Validate file type + if not file.filename.lower().endswith(('.eml', '.msg')): + logger.warning(f"⚠️ Skipped non-email file: {file.filename}") + results.append({ + "filename": file.filename, + "status": "skipped", + "message": "Only .eml and .msg files are supported" + }) + continue 
+ + # Read file content + content = await file.read() + logger.info(f"📊 File size: {len(content)} bytes") + + # Check file size + if len(content) > max_size: + logger.warning(f"⚠️ File too large: {file.filename}") + results.append({ + "filename": file.filename, + "status": "error", + "message": f"File too large (max {settings.EMAIL_MAX_UPLOAD_SIZE_MB}MB)" + }) + continue + + # Parse email based on file type + if file.filename.lower().endswith('.eml'): + logger.info(f"📧 Parsing .eml file: {file.filename}") + email_data = email_service.parse_eml_file(content) + else: # .msg + logger.info(f"📧 Parsing .msg file: {file.filename}") + email_data = email_service.parse_msg_file(content) + + if not email_data: + logger.error(f"❌ Failed to parse: {file.filename}") + results.append({ + "filename": file.filename, + "status": "error", + "message": "Failed to parse email file" + }) + continue + + logger.info(f"✅ Parsed: {email_data.get('subject', 'No Subject')[:50]}") + + # Save to database + email_id = await email_service.save_uploaded_email(email_data) + + if email_id is None: + logger.info(f"⏭️ Duplicate email: {file.filename}") + results.append({ + "filename": file.filename, + "status": "duplicate", + "message": "Email already exists in system" + }) + continue + + logger.info(f"💾 Saved to database with ID: {email_id}") + + # Log activity + activity_logger.log_fetched( + email_id=email_id, + source="manual_upload", + metadata={"filename": file.filename} + ) + + # Auto-classify + classification = None + confidence = None + try: + logger.info(f"🤖 Classifying email {email_id}...") + classification, confidence = await processor.classify_email( + email_data['subject'], + email_data['body_text'] or email_data['body_html'] + ) + + logger.info(f"✅ Classified as: {classification} ({confidence:.2f})") + + # Update classification + update_query = """ + UPDATE email_messages + SET classification = %s, confidence_score = %s, + classification_date = CURRENT_TIMESTAMP + WHERE id = %s + """ + 
execute_update(update_query, (classification, confidence, email_id)) + + activity_logger.log_classified( + email_id=email_id, + classification=classification, + confidence=confidence, + metadata={"method": "auto", "source": "manual_upload"} + ) + except Exception as e: + logger.warning(f"⚠️ Classification failed for uploaded email: {e}") + + # Execute workflows + try: + logger.info(f"⚙️ Executing workflows for email {email_id}...") + await email_workflow_service.execute_workflows_for_email(email_id) + except Exception as e: + logger.warning(f"⚠️ Workflow execution failed for uploaded email: {e}") + + results.append({ + "filename": file.filename, + "status": "success", + "message": "Email imported successfully", + "email_id": email_id, + "subject": email_data['subject'], + "classification": classification, + "confidence": confidence, + "attachments": len(email_data.get('attachments', [])) + }) + + logger.info(f"✅ Successfully processed: {file.filename} -> Email ID {email_id}") + + except Exception as e: + logger.error(f"❌ Failed to process {file.filename}: {e}", exc_info=True) + results.append({ + "filename": file.filename, + "status": "error", + "message": str(e) + }) + + # Summary + success_count = len([r for r in results if r["status"] == "success"]) + duplicate_count = len([r for r in results if r["status"] == "duplicate"]) + error_count = len([r for r in results if r["status"] == "error"]) + skipped_count = len([r for r in results if r["status"] == "skipped"]) + + logger.info(f"📊 Upload summary: {success_count} success, {duplicate_count} duplicates, {error_count} errors, {skipped_count} skipped") + + return { + "uploaded": success_count, + "duplicates": duplicate_count, + "failed": error_count, + "skipped": skipped_count, + "results": results + } + + +@router.get("/emails/processing/stats") +async def get_processing_stats(): + """Get email processing statistics""" + try: + query = """ + SELECT + COUNT(*) as total_emails, + COUNT(*) FILTER (WHERE status = 'new') 
as new_emails, + COUNT(*) FILTER (WHERE status = 'processed') as processed_emails, + COUNT(*) FILTER (WHERE status = 'error') as error_emails, + COUNT(*) FILTER (WHERE has_attachments = true) as with_attachments, + COUNT(*) FILTER (WHERE import_method = 'manual_upload') as manually_uploaded, + COUNT(*) FILTER (WHERE import_method = 'imap') as from_imap, + COUNT(*) FILTER (WHERE import_method = 'graph_api') as from_graph_api, + MAX(received_date) as last_email_received + FROM email_messages + WHERE deleted_at IS NULL + AND received_date >= NOW() - INTERVAL '30 days' + """ + result = execute_query(query) + + if result: + return result[0] + else: + return { + "total_emails": 0, + "new_emails": 0, + "processed_emails": 0, + "error_emails": 0, + "with_attachments": 0, + "manually_uploaded": 0, + "from_imap": 0, + "from_graph_api": 0, + "last_email_received": None + } + + except Exception as e: + logger.error(f"❌ Error getting processing stats: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.post("/emails/bulk/archive") async def bulk_archive(email_ids: List[int]): """Bulk archive emails""" diff --git a/app/emails/frontend/emails.html b/app/emails/frontend/emails.html index 2073b47..154ff15 100644 --- a/app/emails/frontend/emails.html +++ b/app/emails/frontend/emails.html @@ -713,6 +713,64 @@ max-height: calc(100vh - 250px); overflow-y: auto; } + + /* Email Upload Drop Zone */ + .email-upload-zone { + padding: 0.75rem 1rem; + border-bottom: 1px solid rgba(0,0,0,0.05); + } + + .drop-zone { + border: 2px dashed var(--accent); + border-radius: var(--border-radius); + padding: 1.5rem 1rem; + text-align: center; + background: rgba(15, 76, 117, 0.03); + cursor: pointer; + transition: all 0.3s ease; + } + + .drop-zone:hover { + background: rgba(15, 76, 117, 0.08); + border-color: #0a3a5c; + transform: scale(1.02); + } + + .drop-zone.dragover { + background: rgba(15, 76, 117, 0.15); + border-color: var(--accent); + border-width: 3px; + transform: 
scale(1.05); + } + + .drop-zone i { + font-size: 2rem; + color: var(--accent); + display: block; + margin-bottom: 0.5rem; + } + + .drop-zone p { + margin: 0; + font-size: 0.9rem; + } + + .upload-progress { + padding: 0 0.5rem; + } + + .upload-progress .progress { + height: 4px; + } + + [data-bs-theme="dark"] .drop-zone { + background: rgba(15, 76, 117, 0.1); + border-color: var(--accent-light); + } + + [data-bs-theme="dark"] .drop-zone:hover { + background: rgba(15, 76, 117, 0.2); + } {% endblock %} @@ -769,6 +827,28 @@
+ + +
+ +
+