feat: Enhance email processing and backup scheduling

- Added PostgreSQL client installation to Dockerfile for database interactions.
- Updated BackupScheduler to manage both backup jobs and email fetching jobs.
- Implemented email fetching job with logging and error handling.
- Enhanced the frontend to display scheduled jobs, including email fetch status.
- Introduced email upload functionality with drag-and-drop support and progress tracking.
- Added import_method tracking to email_messages for better source identification.
- Updated email parsing logic for .eml and .msg files, including attachment handling.
- Removed obsolete email scheduler service as functionality is integrated into BackupScheduler.
- Updated requirements for extract-msg to the latest version.
- Created migration script to add import_method column to email_messages table.
This commit is contained in:
Christian 2026-01-06 15:11:28 +01:00
parent 5f603bdd2e
commit ca53573952
12 changed files with 997 additions and 127 deletions

View File

@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y \
gcc \ gcc \
g++ \ g++ \
python3-dev \ python3-dev \
postgresql-client \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Build arguments for GitHub release deployment # Build arguments for GitHub release deployment

View File

@ -1,6 +1,6 @@
""" """
Backup Scheduler Backup Scheduler
Manages scheduled backup jobs, rotation, offsite uploads, and retry logic Manages scheduled backup jobs, rotation, offsite uploads, retry logic, and email fetch
""" """
import logging import logging
@ -26,17 +26,42 @@ class BackupScheduler:
self.running = False self.running = False
def start(self): def start(self):
"""Start the backup scheduler with all jobs""" """Start the scheduler with enabled jobs (backups and/or emails)"""
if not self.enabled:
logger.info("⏭️ Backup scheduler disabled (BACKUP_ENABLED=false)")
return
if self.running: if self.running:
logger.warning("⚠️ Backup scheduler already running") logger.warning("⚠️ Scheduler already running")
return return
logger.info("🚀 Starting backup scheduler...") logger.info("🚀 Starting unified scheduler...")
# Add backup jobs if enabled
if self.enabled:
self._add_backup_jobs()
else:
logger.info("⏭️ Backup jobs disabled (BACKUP_ENABLED=false)")
# Email fetch job (every N minutes if enabled)
if settings.EMAIL_TO_TICKET_ENABLED:
self.scheduler.add_job(
func=self._email_fetch_job,
trigger=IntervalTrigger(minutes=settings.EMAIL_PROCESS_INTERVAL_MINUTES),
id='email_fetch',
name='Email Fetch & Process',
max_instances=1,
replace_existing=True
)
logger.info("✅ Scheduled: Email fetch every %d minute(s)",
settings.EMAIL_PROCESS_INTERVAL_MINUTES)
else:
logger.info("⏭️ Email fetch disabled (EMAIL_TO_TICKET_ENABLED=false)")
# Start the scheduler
self.scheduler.start()
self.running = True
logger.info("✅ Scheduler started successfully")
def _add_backup_jobs(self):
"""Add all backup-related jobs to scheduler"""
# Daily full backup at 02:00 CET # Daily full backup at 02:00 CET
self.scheduler.add_job( self.scheduler.add_job(
func=self._daily_backup_job, func=self._daily_backup_job,
@ -106,12 +131,6 @@ class BackupScheduler:
) )
logger.info("✅ Scheduled: Storage check at 01:30") logger.info("✅ Scheduled: Storage check at 01:30")
# Start the scheduler
self.scheduler.start()
self.running = True
logger.info("✅ Backup scheduler started successfully")
def stop(self): def stop(self):
"""Stop the backup scheduler""" """Stop the backup scheduler"""
if not self.running: if not self.running:
@ -377,6 +396,25 @@ class BackupScheduler:
except Exception as e: except Exception as e:
logger.error("❌ Storage check error: %s", str(e), exc_info=True) logger.error("❌ Storage check error: %s", str(e), exc_info=True)
async def _email_fetch_job(self):
    """Fetch and process inbox emails on the scheduled interval.

    Runs as an APScheduler job (max_instances=1, so runs never overlap).
    The processor is imported lazily to avoid a circular dependency
    between the scheduler and the email processing service.
    """
    try:
        logger.info("🔄 Email processing job started...")

        # Import here to avoid circular dependencies
        from app.services.email_processor_service import EmailProcessorService

        processor = EmailProcessorService()
        start_time = datetime.now()
        stats = await processor.process_inbox()
        duration = (datetime.now() - start_time).total_seconds()

        # Lazy %-style args for consistency with the other jobs in this class
        logger.info("✅ Email processing complete: %s (duration: %.1fs)", stats, duration)
    except Exception as e:
        # exc_info=True matches the error handling of the other scheduled jobs
        logger.error("❌ Email processing job failed: %s", str(e), exc_info=True)
def _get_weekday_number(self, day_name: str) -> int: def _get_weekday_number(self, day_name: str) -> int:
"""Convert day name to APScheduler weekday number (0=Monday, 6=Sunday)""" """Convert day name to APScheduler weekday number (0=Monday, 6=Sunday)"""
days = { days = {

View File

@ -248,11 +248,15 @@
</div> </div>
<div class="col-md-4"> <div class="col-md-4">
<div class="card"> <div class="card">
<div class="card-header"> <div class="card-header d-flex justify-content-between align-items-center">
<i class="bi bi-clock-history"></i> Scheduler Status <span><i class="bi bi-clock-history"></i> Scheduled Jobs</span>
<button class="btn btn-light btn-sm" onclick="loadSchedulerStatus()">
<i class="bi bi-arrow-clockwise"></i>
</button>
</div> </div>
<div class="card-body"> <div class="card-body p-0">
<div id="scheduler-status"> <div id="scheduler-status">
<div class="text-center p-4">
<div class="spinner-border spinner-border-sm" role="status"></div> <div class="spinner-border spinner-border-sm" role="status"></div>
<span class="ms-2">Loading...</span> <span class="ms-2">Loading...</span>
</div> </div>
@ -260,6 +264,7 @@
</div> </div>
</div> </div>
</div> </div>
</div>
<!-- Backup History --> <!-- Backup History -->
<div class="row"> <div class="row">
@ -501,29 +506,138 @@
if (!status.running) { if (!status.running) {
container.innerHTML = ` container.innerHTML = `
<div class="alert alert-warning mb-0"> <div class="alert alert-warning mb-0 m-3">
<i class="bi bi-exclamation-triangle"></i> Scheduler not running <i class="bi bi-exclamation-triangle"></i> Scheduler not running
</div> </div>
`; `;
return; return;
} }
container.innerHTML = ` // Group jobs by type
<div class="alert alert-success mb-0"> const backupJobs = status.jobs.filter(j => ['daily_backup', 'monthly_backup'].includes(j.id));
<i class="bi bi-check-circle"></i> Active const maintenanceJobs = status.jobs.filter(j => ['backup_rotation', 'storage_check', 'offsite_upload', 'offsite_retry'].includes(j.id));
const emailJob = status.jobs.find(j => j.id === 'email_fetch');
let html = `
<div class="list-group list-group-flush">
<div class="list-group-item bg-success bg-opacity-10">
<div class="d-flex align-items-center">
<i class="bi bi-check-circle-fill text-success me-2"></i>
<strong>Scheduler Active</strong>
</div>
</div> </div>
<small class="text-muted">Next jobs:</small>
<ul class="list-unstyled mb-0 mt-1">
${status.jobs.slice(0, 3).map(j => `
<li><small>${j.name}: ${j.next_run ? formatDate(j.next_run) : 'N/A'}</small></li>
`).join('')}
</ul>
`; `;
// Email Fetch Job
if (emailJob) {
const nextRun = emailJob.next_run ? new Date(emailJob.next_run) : null;
const timeUntil = nextRun ? formatTimeUntil(nextRun) : 'N/A';
html += `
<div class="list-group-item">
<div class="d-flex justify-content-between align-items-start">
<div>
<i class="bi bi-envelope text-primary"></i>
<strong class="ms-1">Email Fetch</strong>
<br>
<small class="text-muted">Every 5 minutes</small>
</div>
<span class="badge bg-primary">${timeUntil}</span>
</div>
</div>
`;
}
// Backup Jobs
if (backupJobs.length > 0) {
html += `
<div class="list-group-item bg-light">
<small class="text-muted fw-bold"><i class="bi bi-database"></i> BACKUP JOBS</small>
</div>
`;
backupJobs.forEach(job => {
const nextRun = job.next_run ? new Date(job.next_run) : null;
const timeUntil = nextRun ? formatTimeUntil(nextRun) : 'N/A';
const icon = job.id === 'daily_backup' ? 'bi-arrow-repeat' : 'bi-calendar-month';
html += `
<div class="list-group-item">
<div class="d-flex justify-content-between align-items-start">
<div>
<i class="bi ${icon} text-info"></i>
<small class="ms-1">${job.name}</small>
<br>
<small class="text-muted">${nextRun ? formatDateTime(nextRun) : 'N/A'}</small>
</div>
<span class="badge bg-info">${timeUntil}</span>
</div>
</div>
`;
});
}
// Maintenance Jobs
if (maintenanceJobs.length > 0) {
html += `
<div class="list-group-item bg-light">
<small class="text-muted fw-bold"><i class="bi bi-wrench"></i> MAINTENANCE</small>
</div>
`;
maintenanceJobs.forEach(job => {
const nextRun = job.next_run ? new Date(job.next_run) : null;
const timeUntil = nextRun ? formatTimeUntil(nextRun) : 'N/A';
html += `
<div class="list-group-item">
<div class="d-flex justify-content-between align-items-start">
<div style="max-width: 70%;">
<i class="bi bi-gear text-secondary"></i>
<small class="ms-1">${job.name}</small>
<br>
<small class="text-muted">${nextRun ? formatDateTime(nextRun) : 'N/A'}</small>
</div>
<span class="badge bg-secondary text-nowrap">${timeUntil}</span>
</div>
</div>
`;
});
}
html += `</div>`;
container.innerHTML = html;
} catch (error) { } catch (error) {
console.error('Load scheduler status error:', error); console.error('Load scheduler status error:', error);
document.getElementById('scheduler-status').innerHTML = `
<div class="alert alert-danger m-3">
<i class="bi bi-exclamation-triangle"></i> Failed to load scheduler status
</div>
`;
} }
} }
// Render the time remaining until `date` as a compact badge label:
// 'Overdue', 'Now' (< 1 minute), or the largest whole unit ('Nm'/'Nh'/'Nd').
function formatTimeUntil(date) {
    const remainingMs = date - new Date();
    if (remainingMs < 0) {
        return 'Overdue';
    }
    const totalMinutes = Math.floor(remainingMs / 60000);
    const totalHours = Math.floor(totalMinutes / 60);
    const totalDays = Math.floor(totalHours / 24);
    if (totalDays > 0) return `${totalDays}d`;
    if (totalHours > 0) return `${totalHours}h`;
    if (totalMinutes > 0) return `${totalMinutes}m`;
    return 'Now';
}
// Format a Date for display using the Danish locale (dd.mm.yyyy, hh.mm).
function formatDateTime(date) {
    const displayOptions = {
        day: '2-digit',
        month: '2-digit',
        year: 'numeric',
        hour: '2-digit',
        minute: '2-digit'
    };
    return date.toLocaleString('da-DK', displayOptions);
}
// Create manual backup // Create manual backup
async function createBackup(event) { async function createBackup(event) {
event.preventDefault(); event.preventDefault();

View File

@ -120,6 +120,9 @@ class Settings(BaseSettings):
# Offsite Backup Settings (SFTP) # Offsite Backup Settings (SFTP)
OFFSITE_ENABLED: bool = False OFFSITE_ENABLED: bool = False
OFFSITE_WEEKLY_DAY: str = "sunday"
OFFSITE_RETRY_DELAY_HOURS: int = 1
OFFSITE_RETRY_MAX_ATTEMPTS: int = 3
SFTP_HOST: str = "" SFTP_HOST: str = ""
SFTP_PORT: int = 22 SFTP_PORT: int = 22
SFTP_USER: str = "" SFTP_USER: str = ""

View File

@ -4,7 +4,7 @@ API endpoints for email viewing, classification, and rule management
""" """
import logging import logging
from fastapi import APIRouter, HTTPException, Query from fastapi import APIRouter, HTTPException, Query, UploadFile, File
from typing import List, Optional from typing import List, Optional
from pydantic import BaseModel from pydantic import BaseModel
from datetime import datetime, date from datetime import datetime, date
@ -359,30 +359,20 @@ async def reprocess_email(email_id: int):
email = result[0] email = result[0]
# Re-classify # Re-classify using processor service
processor = EmailProcessorService() processor = EmailProcessorService()
classification, confidence = await processor.classify_email( await processor._classify_and_update(email)
email['subject'],
email['body_text'] or email['body_html']
)
# Update classification # Re-fetch updated email
update_query = """ result = execute_query(query, (email_id,))
UPDATE email_messages email = result[0]
SET classification = %s,
confidence_score = %s,
classification_date = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
execute_update(update_query, (classification, confidence, email_id))
logger.info(f"🔄 Reprocessed email {email_id}: {classification} ({confidence:.2f})") logger.info(f"🔄 Reprocessed email {email_id}: {email['classification']} ({email.get('confidence_score', 0):.2f})")
return { return {
"success": True, "success": True,
"message": "Email reprocessed", "message": "Email reprocessed",
"classification": classification, "classification": email['classification'],
"confidence": confidence "confidence": email.get('confidence_score', 0)
} }
except HTTPException: except HTTPException:
@ -410,6 +400,211 @@ async def process_emails():
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/upload")
async def upload_emails(files: List[UploadFile] = File(...)):
    """
    Upload email files (.eml or .msg) via drag-and-drop.

    Accepts multiple files in one request. Each file is validated, parsed,
    stored, auto-classified (best effort), and run through the email
    workflows. Returns a per-file result list plus aggregate counts.
    Per-file failures never abort the batch; they become 'error' results.
    """
    from app.services.email_service import EmailService
    from app.services.email_processor_service import EmailProcessorService
    from app.services.email_activity_logger import EmailActivityLogger
    from app.core.config import settings

    email_service = EmailService()
    processor = EmailProcessorService()
    activity_logger = EmailActivityLogger()

    results = []
    max_size = settings.EMAIL_MAX_UPLOAD_SIZE_MB * 1024 * 1024  # Convert MB to bytes

    logger.info(f"📤 Upload started: {len(files)} file(s)")

    for file in files:
        try:
            result = await _import_uploaded_email(
                file, email_service, processor, activity_logger,
                max_size, settings.EMAIL_MAX_UPLOAD_SIZE_MB
            )
            results.append(result)
        except Exception as e:
            # Unexpected error for this file only — keep processing the rest
            logger.error(f"❌ Failed to process {file.filename}: {e}", exc_info=True)
            results.append({
                "filename": file.filename,
                "status": "error",
                "message": str(e)
            })

    # Aggregate per-status counts for the response payload
    success_count = len([r for r in results if r["status"] == "success"])
    duplicate_count = len([r for r in results if r["status"] == "duplicate"])
    error_count = len([r for r in results if r["status"] == "error"])
    skipped_count = len([r for r in results if r["status"] == "skipped"])

    logger.info(f"📊 Upload summary: {success_count} success, {duplicate_count} duplicates, {error_count} errors, {skipped_count} skipped")

    return {
        "uploaded": success_count,
        "duplicates": duplicate_count,
        "failed": error_count,
        "skipped": skipped_count,
        "results": results
    }


async def _import_uploaded_email(file, email_service, processor, activity_logger,
                                 max_size_bytes, max_size_mb):
    """Validate, parse, persist, classify, and run workflows for ONE uploaded file.

    Returns the per-file result dict ('skipped' / 'error' / 'duplicate' /
    'success'); raises on unexpected errors, which the caller converts into
    an 'error' result.
    """
    from app.services.email_workflow_service import email_workflow_service

    logger.info(f"📄 Processing file: {file.filename}")

    # Validate file type; filename may be None on a malformed multipart part,
    # so guard before calling .lower()
    filename = (file.filename or "").lower()
    if not filename.endswith(('.eml', '.msg')):
        logger.warning(f"⚠️ Skipped non-email file: {file.filename}")
        return {
            "filename": file.filename,
            "status": "skipped",
            "message": "Only .eml and .msg files are supported"
        }

    # Read file content and enforce the configured size limit
    content = await file.read()
    logger.info(f"📊 File size: {len(content)} bytes")
    if len(content) > max_size_bytes:
        logger.warning(f"⚠️ File too large: {file.filename}")
        return {
            "filename": file.filename,
            "status": "error",
            "message": f"File too large (max {max_size_mb}MB)"
        }

    # Parse email based on file type
    if filename.endswith('.eml'):
        logger.info(f"📧 Parsing .eml file: {file.filename}")
        email_data = email_service.parse_eml_file(content)
    else:  # .msg
        logger.info(f"📧 Parsing .msg file: {file.filename}")
        email_data = email_service.parse_msg_file(content)

    if not email_data:
        logger.error(f"❌ Failed to parse: {file.filename}")
        return {
            "filename": file.filename,
            "status": "error",
            "message": "Failed to parse email file"
        }

    logger.info(f"✅ Parsed: {email_data.get('subject', 'No Subject')[:50]}")

    # Save to database; None signals a duplicate message_id
    email_id = await email_service.save_uploaded_email(email_data)
    if email_id is None:
        logger.info(f"⏭️ Duplicate email: {file.filename}")
        return {
            "filename": file.filename,
            "status": "duplicate",
            "message": "Email already exists in system"
        }

    logger.info(f"💾 Saved to database with ID: {email_id}")

    # Log activity
    activity_logger.log_fetched(
        email_id=email_id,
        source="manual_upload",
        metadata={"filename": file.filename}
    )

    # Auto-classify — best effort; a classification failure must not block the import
    classification = None
    confidence = None
    try:
        logger.info(f"🤖 Classifying email {email_id}...")
        classification, confidence = await processor.classify_email(
            email_data['subject'],
            email_data['body_text'] or email_data['body_html']
        )
        logger.info(f"✅ Classified as: {classification} ({confidence:.2f})")

        update_query = """
            UPDATE email_messages
            SET classification = %s, confidence_score = %s,
                classification_date = CURRENT_TIMESTAMP
            WHERE id = %s
        """
        execute_update(update_query, (classification, confidence, email_id))

        activity_logger.log_classified(
            email_id=email_id,
            classification=classification,
            confidence=confidence,
            metadata={"method": "auto", "source": "manual_upload"}
        )
    except Exception as e:
        logger.warning(f"⚠️ Classification failed for uploaded email: {e}")

    # Execute workflows — also best effort
    try:
        logger.info(f"⚙️ Executing workflows for email {email_id}...")
        await email_workflow_service.execute_workflows_for_email(email_id)
    except Exception as e:
        logger.warning(f"⚠️ Workflow execution failed for uploaded email: {e}")

    logger.info(f"✅ Successfully processed: {file.filename} -> Email ID {email_id}")
    return {
        "filename": file.filename,
        "status": "success",
        "message": "Email imported successfully",
        "email_id": email_id,
        "subject": email_data['subject'],
        "classification": classification,
        "confidence": confidence,
        "attachments": len(email_data.get('attachments', []))
    }
@router.get("/emails/processing/stats")
async def get_processing_stats():
    """Get email processing statistics.

    Aggregates counts over the last 30 days of non-deleted emails,
    broken down by status and import method. Returns an all-zero
    payload with the same keys when the table yields no row.
    """
    try:
        query = """
            SELECT
                COUNT(*) as total_emails,
                COUNT(*) FILTER (WHERE status = 'new') as new_emails,
                COUNT(*) FILTER (WHERE status = 'processed') as processed_emails,
                COUNT(*) FILTER (WHERE status = 'error') as error_emails,
                COUNT(*) FILTER (WHERE has_attachments = true) as with_attachments,
                COUNT(*) FILTER (WHERE import_method = 'manual_upload') as manually_uploaded,
                COUNT(*) FILTER (WHERE import_method = 'imap') as from_imap,
                COUNT(*) FILTER (WHERE import_method = 'graph_api') as from_graph_api,
                MAX(received_date) as last_email_received
            FROM email_messages
            WHERE deleted_at IS NULL
            AND received_date >= NOW() - INTERVAL '30 days'
        """
        rows = execute_query(query)
        if rows:
            return rows[0]

        # No row returned — keep the response shape stable for the frontend
        return {
            "total_emails": 0,
            "new_emails": 0,
            "processed_emails": 0,
            "error_emails": 0,
            "with_attachments": 0,
            "manually_uploaded": 0,
            "from_imap": 0,
            "from_graph_api": 0,
            "last_email_received": None
        }
    except Exception as e:
        logger.error(f"❌ Error getting processing stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/emails/bulk/archive") @router.post("/emails/bulk/archive")
async def bulk_archive(email_ids: List[int]): async def bulk_archive(email_ids: List[int]):
"""Bulk archive emails""" """Bulk archive emails"""

View File

@ -713,6 +713,64 @@
max-height: calc(100vh - 250px); max-height: calc(100vh - 250px);
overflow-y: auto; overflow-y: auto;
} }
/* Email Upload Drop Zone */
/* Container around the drop zone; separated from the list below it */
.email-upload-zone {
    padding: 0.75rem 1rem;
    border-bottom: 1px solid rgba(0,0,0,0.05);
}
/* Dashed target area; the whole zone is clickable (opens the file picker) */
.drop-zone {
    border: 2px dashed var(--accent);
    border-radius: var(--border-radius);
    padding: 1.5rem 1rem;
    text-align: center;
    background: rgba(15, 76, 117, 0.03);
    cursor: pointer;
    transition: all 0.3s ease;
}
/* Subtle grow + darken on hover to signal interactivity */
.drop-zone:hover {
    background: rgba(15, 76, 117, 0.08);
    border-color: #0a3a5c;
    transform: scale(1.02);
}
/* .dragover is toggled by JS while files are dragged over the zone */
.drop-zone.dragover {
    background: rgba(15, 76, 117, 0.15);
    border-color: var(--accent);
    border-width: 3px;
    transform: scale(1.05);
}
/* Upload icon above the instructions */
.drop-zone i {
    font-size: 2rem;
    color: var(--accent);
    display: block;
    margin-bottom: 0.5rem;
}
.drop-zone p {
    margin: 0;
    font-size: 0.9rem;
}
/* Slim progress bar shown during an active upload */
.upload-progress {
    padding: 0 0.5rem;
}
.upload-progress .progress {
    height: 4px;
}
/* Dark-theme overrides keep the accent tint readable */
[data-bs-theme="dark"] .drop-zone {
    background: rgba(15, 76, 117, 0.1);
    border-color: var(--accent-light);
}
[data-bs-theme="dark"] .drop-zone:hover {
    background: rgba(15, 76, 117, 0.2);
}
</style> </style>
{% endblock %} {% endblock %}
@ -769,6 +827,28 @@
</div> </div>
</div> </div>
<!-- Email Upload Drop Zone -->
<div class="email-upload-zone" id="emailUploadZone" style="display: none;">
<div class="drop-zone" id="dropZone">
<i class="bi bi-cloud-upload"></i>
<p class="mb-1"><strong>Træk emails hertil</strong></p>
<small class="text-muted">eller klik for at vælge filer</small>
<small class="text-muted d-block mt-1">.eml og .msg filer</small>
<input type="file" id="fileInput" multiple accept=".eml,.msg" style="display: none;">
</div>
<div class="upload-progress mt-2" id="uploadProgress" style="display: none;">
<div class="progress">
<div class="progress-bar progress-bar-striped progress-bar-animated" id="progressBar" style="width: 0%"></div>
</div>
<small class="text-muted d-block text-center mt-1" id="uploadStatus">Uploader...</small>
</div>
</div>
<div class="p-2 border-bottom">
<button class="btn btn-sm btn-outline-secondary w-100" onclick="toggleUploadZone()">
<i class="bi bi-upload me-1"></i> Upload Emails
</button>
</div>
<div class="email-list-filters"> <div class="email-list-filters">
<button class="filter-pill active" data-filter="active" onclick="setFilter('active')"> <button class="filter-pill active" data-filter="active" onclick="setFilter('active')">
Aktive <span class="count" id="countActive">0</span> Aktive <span class="count" id="countActive">0</span>
@ -3877,5 +3957,143 @@ function showNotification(message, type = 'info') {
document.body.appendChild(toast); document.body.appendChild(toast);
setTimeout(() => toast.remove(), 3000); setTimeout(() => toast.remove(), 3000);
} }
// Email Upload Functionality
// Show/hide the email upload drop zone panel.
function toggleUploadZone() {
    const zone = document.getElementById('emailUploadZone');
    const isHidden = zone.style.display === 'none';
    zone.style.display = isHidden ? 'block' : 'none';
}
// Setup drag and drop
// Wire the drop zone and hidden file input once the DOM is ready.
document.addEventListener('DOMContentLoaded', () => {
    const dropZone = document.getElementById('dropZone');
    const fileInput = document.getElementById('fileInput');

    // Bail out on pages that don't render the upload widget
    if (!dropZone || !fileInput) return;

    // Click to select files
    dropZone.addEventListener('click', () => fileInput.click());

    // Drag and drop events
    dropZone.addEventListener('dragover', (e) => {
        // preventDefault is required for the browser to allow a drop here
        e.preventDefault();
        dropZone.classList.add('dragover');
    });

    dropZone.addEventListener('dragleave', () => {
        dropZone.classList.remove('dragover');
    });

    dropZone.addEventListener('drop', (e) => {
        e.preventDefault();
        dropZone.classList.remove('dragover');
        const files = Array.from(e.dataTransfer.files);
        uploadEmailFiles(files);
    });

    // Same handler for files chosen via the native picker
    fileInput.addEventListener('change', (e) => {
        const files = Array.from(e.target.files);
        uploadEmailFiles(files);
    });
});
// Upload .eml/.msg files to the backend, report per-file results as toasts,
// and reload the email list when anything new was imported.
async function uploadEmailFiles(files) {
    if (files.length === 0) return;

    console.log(`📤 Starting upload of ${files.length} file(s)`);

    const uploadProgress = document.getElementById('uploadProgress');
    const progressBar = document.getElementById('progressBar');
    const uploadStatus = document.getElementById('uploadStatus');
    // Look the input up explicitly — the original relied on the implicit
    // window.fileInput id-global, which breaks in strict/module contexts
    const fileInput = document.getElementById('fileInput');

    // Show progress
    uploadProgress.style.display = 'block';
    progressBar.style.width = '0%';
    uploadStatus.textContent = `Uploader ${files.length} fil(er)...`;

    const formData = new FormData();
    files.forEach(file => {
        console.log(`📎 Adding file to upload: ${file.name} (${file.size} bytes)`);
        formData.append('files', file);
    });

    try {
        console.log('🌐 Sending upload request to /api/v1/emails/upload');
        const response = await fetch('/api/v1/emails/upload', {
            method: 'POST',
            body: formData
        });

        console.log(`📡 Response status: ${response.status}`);
        if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }

        const result = await response.json();
        console.log('✅ Upload result:', result);

        // Update progress
        progressBar.style.width = '100%';

        // Build the summary line from the aggregate counts
        const { uploaded, duplicates, failed, skipped } = result;
        let statusMessage = [];
        if (uploaded > 0) statusMessage.push(`${uploaded} uploadet`);
        if (duplicates > 0) statusMessage.push(`${duplicates} eksisterer allerede`);
        if (failed > 0) statusMessage.push(`${failed} fejlet`);
        if (skipped > 0) statusMessage.push(`${skipped} sprunget over`);
        uploadStatus.textContent = `✅ ${statusMessage.join(', ')}`;

        // Show one toast per file with its individual outcome
        if (result.results && result.results.length > 0) {
            console.log('📋 Detailed results:');
            result.results.forEach(r => {
                console.log(`  ${r.status}: ${r.filename} - ${r.message}`);
                if (r.status === 'success') {
                    console.log(`    ✅ Email ID: ${r.email_id}, Subject: ${r.subject}`);
                    showNotification(`✅ ${r.filename} uploadet`, 'success');
                } else if (r.status === 'duplicate') {
                    showNotification(` ${r.filename} findes allerede`, 'info');
                } else if (r.status === 'error') {
                    showNotification(`❌ ${r.filename}: ${r.message}`, 'danger');
                }
            });
        }

        // Reload email list after successful uploads
        if (uploaded > 0) {
            console.log(`🔄 Reloading email list (${uploaded} new emails uploaded)`);
            setTimeout(async () => {
                await loadEmails();
                await loadStats();
                console.log('✅ Email list and stats reloaded');
                uploadProgress.style.display = 'none';
                fileInput.value = '';
                toggleUploadZone(); // Close upload zone after successful upload
            }, 2000);
        } else {
            console.log(' No new emails uploaded, not reloading');
            setTimeout(() => {
                uploadProgress.style.display = 'none';
                fileInput.value = '';
            }, 3000);
        }
    } catch (error) {
        console.error('❌ Upload failed:', error);
        uploadStatus.textContent = '❌ Upload fejlede';
        progressBar.classList.add('bg-danger');
        showNotification('Upload fejlede: ' + error.message, 'danger');
    }
}
</script> </script>
{% endblock %} {% endblock %}

View File

@ -1,77 +0,0 @@
"""
Email Scheduler
Background job that runs every 5 minutes to fetch and process emails
Based on OmniSync scheduler with APScheduler
"""
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from datetime import datetime
from app.core.config import settings
from app.services.email_processor_service import EmailProcessorService
logger = logging.getLogger(__name__)
class EmailScheduler:
    """Background scheduler for email processing"""

    def __init__(self):
        # AsyncIO-flavored APScheduler: jobs run as coroutines on the event loop
        self.scheduler = AsyncIOScheduler()
        self.processor = EmailProcessorService()
        # Feature flag: start() becomes a no-op when email-to-ticket is off
        self.enabled = settings.EMAIL_TO_TICKET_ENABLED
        self.interval_minutes = settings.EMAIL_PROCESS_INTERVAL_MINUTES

    def start(self):
        """Start the background scheduler"""
        if not self.enabled:
            logger.info("⏭️ Email scheduler disabled (EMAIL_TO_TICKET_ENABLED=false)")
            return

        logger.info(f"🚀 Starting email scheduler (interval: {self.interval_minutes} minutes)")

        # Add job with interval trigger
        self.scheduler.add_job(
            func=self._process_emails_job,
            trigger=IntervalTrigger(minutes=self.interval_minutes),
            id='email_processor',
            name='Email Processing Job',
            max_instances=1,  # Prevent overlapping runs
            replace_existing=True
        )

        self.scheduler.start()
        logger.info("✅ Email scheduler started successfully")

    def stop(self):
        """Stop the scheduler"""
        if self.scheduler.running:
            # wait=True blocks until in-flight jobs finish before shutdown
            self.scheduler.shutdown(wait=True)
            logger.info("👋 Email scheduler stopped")

    async def _process_emails_job(self):
        """Job function that processes emails"""
        try:
            logger.info("🔄 Email processing job started...")
            start_time = datetime.now()

            stats = await self.processor.process_inbox()

            duration = (datetime.now() - start_time).total_seconds()
            logger.info(f"✅ Email processing complete: {stats} (duration: {duration:.1f}s)")
        except Exception as e:
            logger.error(f"❌ Email processing job failed: {e}")

    def run_manual(self):
        """Manually trigger email processing (for testing)"""
        logger.info("🚀 Manual email processing triggered")
        import asyncio
        # Fire-and-forget: schedules the job on the running event loop
        asyncio.create_task(self._process_emails_job())


# Global scheduler instance
email_scheduler = EmailScheduler()

View File

@ -577,3 +577,255 @@ class EmailService:
query = "UPDATE email_messages SET status = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s" query = "UPDATE email_messages SET status = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
execute_query(query, (status, email_id)) execute_query(query, (status, email_id))
logger.info(f"✅ Updated email {email_id} status to: {status}") logger.info(f"✅ Updated email {email_id} status to: {status}")
def parse_eml_file(self, content: bytes) -> Optional[Dict]:
    """
    Parse .eml file content into standardized email dictionary

    Args:
        content: Raw .eml file bytes

    Returns:
        Dictionary with email data or None if parsing fails
    """
    try:
        from email import policy
        from email.parser import BytesParser

        msg = BytesParser(policy=policy.default).parsebytes(content)

        # Extract basic fields
        message_id = msg.get("Message-ID", "").strip("<>")
        if not message_id:
            # Deterministic fallback ID from date + subject + sender so
            # duplicate detection still works for mails lacking a Message-ID
            import hashlib
            hash_str = f"{msg.get('Date', '')}{msg.get('Subject', '')}{msg.get('From', '')}"
            message_id = f"uploaded-{hashlib.md5(hash_str.encode()).hexdigest()}@bmchub.local"

        # Parse sender / recipient / CC headers
        sender_name, sender_email = self._parse_email_address(msg.get("From", ""))
        recipient_name, recipient_email = self._parse_email_address(msg.get("To", ""))
        cc_header = msg.get("Cc", "")

        # Parse date; fall back to "now" when the header is missing or invalid
        received_date = datetime.now()
        date_str = msg.get("Date")
        if date_str:
            try:
                from email.utils import parsedate_to_datetime
                received_date = parsedate_to_datetime(date_str)
            except Exception:
                pass  # keep the "now" fallback

        # Extract bodies and attachments
        body_text = ""
        body_html = ""
        attachments = []

        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                content_disposition = part.get_content_disposition()

                # Plain-text body (non-attachment parts only)
                if content_type == "text/plain" and content_disposition != "attachment":
                    try:
                        body = part.get_payload(decode=True)
                        if body:
                            body_text = body.decode('utf-8', errors='ignore')
                    except Exception:
                        pass
                # HTML body
                elif content_type == "text/html" and content_disposition != "attachment":
                    try:
                        body = part.get_payload(decode=True)
                        if body:
                            body_html = body.decode('utf-8', errors='ignore')
                    except Exception:
                        pass
                # Attachments — use a distinct name; the original shadowed
                # the 'content' parameter here
                elif content_disposition == "attachment":
                    filename = part.get_filename()
                    if filename:
                        try:
                            payload = part.get_payload(decode=True)
                            if payload:
                                attachments.append({
                                    "filename": filename,
                                    "content_type": content_type,
                                    "content": payload,
                                    "size_bytes": len(payload)
                                })
                        except Exception:
                            pass
        else:
            # Single part email
            try:
                body = msg.get_payload(decode=True)
                if body:
                    if msg.get_content_type() == "text/html":
                        body_html = body.decode('utf-8', errors='ignore')
                    else:
                        body_text = body.decode('utf-8', errors='ignore')
            except Exception:
                pass

        return {
            "message_id": message_id,
            "subject": msg.get("Subject", "No Subject"),
            "sender_name": sender_name,
            "sender_email": sender_email,
            "recipient_email": recipient_email,
            "cc": cc_header,
            "body_text": body_text,
            "body_html": body_html,
            "received_date": received_date,
            "has_attachments": len(attachments) > 0,
            "attachments": attachments,
            "folder": "uploaded"
        }
    except Exception as e:
        logger.error(f"❌ Failed to parse .eml file: {e}")
        return None
def parse_msg_file(self, content: bytes) -> Optional[Dict]:
    """
    Parse Outlook .msg file content into standardized email dictionary

    Args:
        content: Raw .msg file bytes

    Returns:
        Dictionary with email data or None if parsing fails
    """
    try:
        import extract_msg
        import io
        import hashlib

        # extract_msg accepts a file-like object, so wrap the bytes
        msg = extract_msg.Message(io.BytesIO(content))

        # Reuse the original Message-ID when present; otherwise derive a
        # deterministic fallback so duplicate detection still works
        message_id = msg.messageId
        if message_id:
            message_id = message_id.strip("<>")
        else:
            hash_str = f"{msg.date}{msg.subject}{msg.sender}"
            message_id = f"uploaded-{hashlib.md5(hash_str.encode()).hexdigest()}@bmchub.local"

        # Parse date; fall back to "now" if extract_msg cannot provide one
        try:
            received_date = msg.date if msg.date else datetime.now()
        except Exception:
            received_date = datetime.now()

        # Extract attachments; skip any that extract_msg cannot materialize
        attachments = []
        for attachment in msg.attachments:
            try:
                data = attachment.data
                attachments.append({
                    "filename": attachment.longFilename or attachment.shortFilename or "unknown",
                    "content_type": attachment.mimetype or "application/octet-stream",
                    "content": data,
                    "size_bytes": len(data) if data else 0
                })
            except Exception:
                pass

        return {
            "message_id": message_id,
            "subject": msg.subject or "No Subject",
            "sender_name": msg.sender or "",
            "sender_email": msg.senderEmail or "",
            "recipient_email": msg.to or "",
            "cc": msg.cc or "",
            "body_text": msg.body or "",
            "body_html": msg.htmlBody or "",
            "received_date": received_date,
            "has_attachments": len(attachments) > 0,
            "attachments": attachments,
            "folder": "uploaded"
        }
    except Exception as e:
        logger.error(f"❌ Failed to parse .msg file: {e}")
        return None
async def save_uploaded_email(self, email_data: Dict) -> Optional[int]:
"""
Save uploaded email to database
Args:
email_data: Parsed email dictionary
Returns:
email_id if successful, None if duplicate or error
"""
try:
# Check if email already exists
check_query = "SELECT id FROM email_messages WHERE message_id = %s"
existing = execute_query(check_query, (email_data["message_id"],))
if existing:
logger.info(f"⏭️ Email already exists: {email_data['message_id']}")
return None
# Insert email
query = """
INSERT INTO email_messages (
message_id, subject, sender_email, sender_name,
recipient_email, cc, body_text, body_html,
received_date, folder, has_attachments, attachment_count,
status, import_method, created_at
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
RETURNING id
"""
result = execute_insert(query, (
email_data["message_id"],
email_data["subject"],
email_data["sender_email"],
email_data["sender_name"],
email_data.get("recipient_email", ""),
email_data.get("cc", ""),
email_data["body_text"],
email_data["body_html"],
email_data["received_date"],
email_data["folder"],
email_data["has_attachments"],
len(email_data.get("attachments", [])),
"new",
"manual_upload"
))
if not result:
logger.error("❌ Failed to insert email - no ID returned")
return None
email_id = result[0]["id"]
# Save attachments
if email_data.get("attachments"):
await self._save_attachments(email_id, email_data["attachments"])
logger.info(f"✅ Saved uploaded email: {email_data['subject'][:50]}... (ID: {email_id})")
return email_id
except Exception as e:
logger.error(f"❌ Failed to save uploaded email: {e}")
return None

View File

@ -50,6 +50,7 @@ from app.settings.backend import router as settings_api
from app.settings.backend import views as settings_views from app.settings.backend import views as settings_views
from app.backups.backend.router import router as backups_api from app.backups.backend.router import router as backups_api
from app.backups.frontend import views as backups_views from app.backups.frontend import views as backups_views
from app.backups.backend.scheduler import backup_scheduler
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
@ -72,9 +73,13 @@ async def lifespan(app: FastAPI):
init_db() init_db()
# Start unified scheduler (handles backups + email fetch)
backup_scheduler.start()
logger.info("✅ System initialized successfully") logger.info("✅ System initialized successfully")
yield yield
# Shutdown # Shutdown
backup_scheduler.stop()
logger.info("👋 Shutting down...") logger.info("👋 Shutting down...")
# Create FastAPI app # Create FastAPI app

View File

@ -0,0 +1,24 @@
-- Migration: Add import_method tracking to email_messages
-- Purpose: Track how emails were imported (IMAP, Graph API, or manual upload)
-- Date: 2026-01-06

-- Add import_method column (IF NOT EXISTS makes the migration re-runnable)
ALTER TABLE email_messages
ADD COLUMN IF NOT EXISTS import_method VARCHAR(50) DEFAULT 'imap';

-- Add comment
COMMENT ON COLUMN email_messages.import_method IS 'How the email was imported: imap, graph_api, or manual_upload';

-- Create index for filtering by import method (idempotent)
CREATE INDEX IF NOT EXISTS idx_email_messages_import_method ON email_messages(import_method);

-- Backfill existing records. Note: ADD COLUMN ... DEFAULT already fills
-- existing rows on PostgreSQL 11+, so this is a safety net for the case
-- where the column pre-existed without a default.
UPDATE email_messages
SET import_method = 'imap'
WHERE import_method IS NULL;

-- Add constraint to ensure valid values.
-- ADD CONSTRAINT has no IF NOT EXISTS form, so guard it explicitly
-- to keep the migration safe to re-run.
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1 FROM pg_constraint WHERE conname = 'chk_email_import_method'
    ) THEN
        ALTER TABLE email_messages
        ADD CONSTRAINT chk_email_import_method
        CHECK (import_method IN ('imap', 'graph_api', 'manual_upload'));
    END IF;
END $$;

View File

@ -12,4 +12,4 @@ paramiko==3.4.1
apscheduler==3.10.4 apscheduler==3.10.4
pandas==2.2.3 pandas==2.2.3
openpyxl==3.1.2 openpyxl==3.1.2
extract-msg==0.48.8 extract-msg==0.55.0

View File

@ -0,0 +1,97 @@
#!/bin/bash
# Script til at linke tmodule_customers til customers automatisk
# Matcher baseret på navn eller economic_customer_number
set -e

CONTAINER_NAME="bmc-hub-postgres-prod"
DB_USER="bmc_hub"
DB_NAME="bmc_hub"

# Farver til output
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m' # No Color

# Run a SQL statement in the container; -t -A gives unaligned tuples-only
# output (no headers, no padding, no trailing blank line).
run_sql() {
    sudo podman exec -i "$CONTAINER_NAME" psql -U "$DB_USER" -d "$DB_NAME" -t -A -c "$1"
}

# Count returned rows from stdin. Counts only non-empty lines: plain
# `wc -l` over-counts by one because psql's aligned tuples-only output
# ends with a blank line (reporting "1 linked" when 0 rows matched).
# `|| true` keeps `set -e` happy when zero rows match (grep exits 1).
count_rows() {
    grep -c . || true
}

# Check om container kører
if ! sudo podman ps | grep -q "$CONTAINER_NAME"; then
    echo -e "${RED}❌ Container $CONTAINER_NAME kører ikke!${NC}"
    exit 1
fi

echo -e "${GREEN}🔍 Finder kunder uden hub_customer_id...${NC}"

# Find alle tmodule_customers uden hub_customer_id
UNLINKED=$(run_sql "
    SELECT COUNT(*)
    FROM tmodule_customers
    WHERE hub_customer_id IS NULL;
")

echo -e "${YELLOW}📊 Fandt $UNLINKED ulinkede kunder${NC}"

if [ "$UNLINKED" -eq 0 ]; then
    echo -e "${GREEN}✅ Alle kunder er allerede linket!${NC}"
    exit 0
fi

echo -e "${GREEN}🔗 Linker kunder baseret på navn match...${NC}"

# Link kunder hvor navnet matcher præcist
LINKED=$(run_sql "
    UPDATE tmodule_customers tc
    SET hub_customer_id = c.id
    FROM customers c
    WHERE tc.hub_customer_id IS NULL
      AND LOWER(TRIM(tc.name)) = LOWER(TRIM(c.name))
    RETURNING tc.id;
" | count_rows)

echo -e "${GREEN}✅ Linkede $LINKED kunder baseret på navn${NC}"

# Link kunder baseret på economic_customer_number (hvis begge har det)
echo -e "${GREEN}🔗 Linker kunder baseret på economic_customer_number...${NC}"

LINKED_ECON=$(run_sql "
    UPDATE tmodule_customers tc
    SET hub_customer_id = c.id
    FROM customers c
    WHERE tc.hub_customer_id IS NULL
      AND tc.economic_customer_number IS NOT NULL
      AND c.economic_customer_number IS NOT NULL
      AND tc.economic_customer_number = c.economic_customer_number
    RETURNING tc.id;
" | count_rows)

echo -e "${GREEN}✅ Linkede $LINKED_ECON kunder baseret på economic_customer_number${NC}"

# Vis stadig ulinkede kunder (aligned output her, da det er til læsning)
echo -e "${YELLOW}📋 Kunder der stadig mangler link:${NC}"
sudo podman exec -i "$CONTAINER_NAME" psql -U "$DB_USER" -d "$DB_NAME" -c "
    SELECT
        tc.id,
        tc.vtiger_id,
        tc.name,
        tc.economic_customer_number
    FROM tmodule_customers tc
    WHERE tc.hub_customer_id IS NULL
    ORDER BY tc.name
    LIMIT 20;
"

REMAINING=$(run_sql "
    SELECT COUNT(*)
    FROM tmodule_customers
    WHERE hub_customer_id IS NULL;
")

echo -e "${YELLOW}⚠️ $REMAINING kunder mangler stadig link${NC}"

if [ "$REMAINING" -gt 0 ]; then
    echo -e "${YELLOW}💡 Disse skal linkes manuelt via UI eller direkte SQL${NC}"
fi

echo -e "${GREEN}✅ Linking komplet!${NC}"