- Added PostgreSQL client installation to Dockerfile for database interactions.
- Updated BackupScheduler to manage both backup jobs and email fetching jobs.
- Implemented email fetching job with logging and error handling.
- Enhanced the frontend to display scheduled jobs, including email fetch status.
- Introduced email upload functionality with drag-and-drop support and progress tracking.
- Added import_method tracking to email_messages for better source identification.
- Updated email parsing logic for .eml and .msg files, including attachment handling.
- Removed obsolete email scheduler service as functionality is integrated into BackupScheduler.
- Updated requirements for extract-msg to the latest version.
- Created migration script to add import_method column to email_messages table.
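
A sketch of what that migration might look like (hypothetical: the column type, default value, and helper signature are assumptions, not taken from the actual script):

    # migrations/add_import_method.py (hypothetical path)
    from app.core.database import execute_query  # signature assumed from its use below

    def upgrade() -> None:
        # Record how each message entered the system, e.g. fetched vs. uploaded.
        execute_query(
            "ALTER TABLE email_messages "
            "ADD COLUMN IF NOT EXISTS import_method VARCHAR(32) DEFAULT 'imap'",
            ()
        )
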
"""
|
|
Backup Scheduler
|
|
Manages scheduled backup jobs, rotation, offsite uploads, retry logic, and email fetch
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, time
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import execute_query
|
|
from app.backups.backend.service import backup_service
|
|
from app.backups.backend.notifications import notifications
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
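# Assumed contract for the database helpers imported above (defined in
# app.core.database, not shown in this file):
#   execute_query(sql, params)        -> list of dict rows
#   execute_query_single(sql, params) -> first row as a dict, or None
# The jobs below iterate over execute_query results and index directly into
# execute_query_single results, so the distinction matters.
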
class BackupScheduler:
    """Scheduler for automated backup operations"""

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.enabled = settings.BACKUP_ENABLED
        self.running = False

    def start(self):
        """Start the scheduler with enabled jobs (backups and/or emails)"""
        if self.running:
            logger.warning("⚠️ Scheduler already running")
            return

        logger.info("🚀 Starting unified scheduler...")

        # Add backup jobs if enabled
        if self.enabled:
            self._add_backup_jobs()
        else:
            logger.info("⏭️ Backup jobs disabled (BACKUP_ENABLED=false)")

        # Email fetch job (every N minutes if enabled)
        if settings.EMAIL_TO_TICKET_ENABLED:
            self.scheduler.add_job(
                func=self._email_fetch_job,
                trigger=IntervalTrigger(minutes=settings.EMAIL_PROCESS_INTERVAL_MINUTES),
                id='email_fetch',
                name='Email Fetch & Process',
                max_instances=1,
                replace_existing=True
            )
            logger.info("✅ Scheduled: Email fetch every %d minute(s)",
                        settings.EMAIL_PROCESS_INTERVAL_MINUTES)
        else:
            logger.info("⏭️ Email fetch disabled (EMAIL_TO_TICKET_ENABLED=false)")

        # Start the scheduler
        self.scheduler.start()
        self.running = True

        logger.info("✅ Scheduler started successfully")
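
    # Note: IntervalTrigger fires its first run one full interval after the
    # scheduler starts, so the first email fetch happens after
    # EMAIL_PROCESS_INTERVAL_MINUTES. If an immediate fetch on startup were
    # wanted, APScheduler 3.x accepts an explicit first run time, e.g.:
    #
    #     self.scheduler.add_job(
    #         func=self._email_fetch_job,
    #         trigger=IntervalTrigger(minutes=settings.EMAIL_PROCESS_INTERVAL_MINUTES),
    #         id='email_fetch',
    #         next_run_time=datetime.now(),  # fire once right away
    #         replace_existing=True,
    #     )
    #
    # Illustrative only; the module as written waits one interval.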

    def _add_backup_jobs(self):
        """Add all backup-related jobs to scheduler"""
        # Daily full backup at 02:00 CET
        self.scheduler.add_job(
            func=self._daily_backup_job,
            trigger=CronTrigger(hour=2, minute=0),
            id='daily_backup',
            name='Daily Full Backup',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Daily backup at 02:00")

        # Monthly backup on the 1st at 02:00 CET
        self.scheduler.add_job(
            func=self._monthly_backup_job,
            trigger=CronTrigger(day=1, hour=2, minute=0),
            id='monthly_backup',
            name='Monthly Full Backup (SQL format)',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Monthly backup on 1st at 02:00")

        # Weekly offsite upload (configurable day, default Sunday at 03:00)
        offsite_day = self._get_weekday_number(settings.OFFSITE_WEEKLY_DAY)
        self.scheduler.add_job(
            func=self._offsite_upload_job,
            trigger=CronTrigger(day_of_week=offsite_day, hour=3, minute=0),
            id='offsite_upload',
            name=f'Weekly Offsite Upload ({settings.OFFSITE_WEEKLY_DAY.capitalize()})',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Weekly offsite upload on %s at 03:00",
                    settings.OFFSITE_WEEKLY_DAY.capitalize())

        # Offsite retry job (every OFFSITE_RETRY_DELAY_HOURS hours)
        self.scheduler.add_job(
            func=self._offsite_retry_job,
            trigger=IntervalTrigger(hours=settings.OFFSITE_RETRY_DELAY_HOURS),
            id='offsite_retry',
            name='Offsite Upload Retry',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Offsite retry every %d hour(s)",
                    settings.OFFSITE_RETRY_DELAY_HOURS)

        # Backup rotation (daily at 01:00)
        self.scheduler.add_job(
            func=self._rotation_job,
            trigger=CronTrigger(hour=1, minute=0),
            id='backup_rotation',
            name='Backup Rotation',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Backup rotation at 01:00")

        # Storage check (daily at 01:30)
        self.scheduler.add_job(
            func=self._storage_check_job,
            trigger=CronTrigger(hour=1, minute=30),
            id='storage_check',
            name='Storage Usage Check',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Storage check at 01:30")
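
    # Resulting schedule at a glance (server-local time, per the triggers above):
    #   01:00 daily       - backup rotation
    #   01:30 daily       - storage usage check
    #   02:00 daily       - full backup (compressed .dump)
    #   02:00 on the 1st  - monthly full backup (plain SQL)
    #   03:00 weekly      - offsite upload (day set by OFFSITE_WEEKLY_DAY)
    #   every OFFSITE_RETRY_DELAY_HOURS hours - offsite upload retry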

    def stop(self):
        """Stop the backup scheduler"""
        if not self.running:
            return

        logger.info("🛑 Stopping backup scheduler...")
        self.scheduler.shutdown(wait=True)
        self.running = False
        logger.info("✅ Backup scheduler stopped")

    def pause(self):
        """Pause all scheduled jobs (for maintenance)"""
        if not self.running:
            return

        logger.info("⏸️ Pausing backup scheduler...")
        self.scheduler.pause()
        logger.info("✅ Backup scheduler paused")

    def resume(self):
        """Resume all scheduled jobs"""
        if not self.running:
            return

        logger.info("▶️ Resuming backup scheduler...")
        self.scheduler.resume()
        logger.info("✅ Backup scheduler resumed")

    async def _daily_backup_job(self):
        """Daily full backup job (database + files)"""
        logger.info("🔄 Starting daily backup job...")

        try:
            start_time = datetime.now()

            # Create full backup (database uses compressed .dump format)
            db_job_id, files_job_id = await backup_service.create_full_backup(is_monthly=False)

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            if db_job_id and files_job_id:
                logger.info("✅ Daily backup completed: db=%s, files=%s (%.1fs)",
                            db_job_id, files_job_id, duration)

                # Send success notification for database backup
                db_backup = execute_query_single(
                    "SELECT * FROM backup_jobs WHERE id = %s",
                    (db_job_id,))

                if db_backup:
                    await notifications.send_backup_success(
                        job_id=db_job_id,
                        job_type='database',
                        file_size_bytes=db_backup['file_size_bytes'],
                        duration_seconds=duration / 2,  # Rough estimate
                        checksum=db_backup['checksum_sha256'],
                        is_monthly=False
                    )
            else:
                logger.error("❌ Daily backup failed: db=%s, files=%s", db_job_id, files_job_id)

                # Send failure notification for whichever phase failed
                if not db_job_id:
                    await notifications.send_backup_failed(
                        job_id=0,
                        job_type='database',
                        error_message='Database backup failed - see logs for details'
                    )

                if not files_job_id:
                    await notifications.send_backup_failed(
                        job_id=0,
                        job_type='files',
                        error_message='Files backup failed - see logs for details'
                    )

        except Exception as e:
            logger.error("❌ Daily backup job error: %s", str(e), exc_info=True)
            await notifications.send_backup_failed(
                job_id=0,
                job_type='full',
                error_message=str(e)
            )

    async def _monthly_backup_job(self):
        """Monthly full backup job (database uses plain SQL format)"""
        logger.info("🔄 Starting monthly backup job...")

        try:
            start_time = datetime.now()

            # Create full backup with is_monthly=True (uses plain SQL format)
            db_job_id, files_job_id = await backup_service.create_full_backup(is_monthly=True)

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            if db_job_id and files_job_id:
                logger.info("✅ Monthly backup completed: db=%s, files=%s (%.1fs)",
                            db_job_id, files_job_id, duration)

                # Send success notification for database backup
                db_backup = execute_query_single(
                    "SELECT * FROM backup_jobs WHERE id = %s",
                    (db_job_id,))

                if db_backup:
                    await notifications.send_backup_success(
                        job_id=db_job_id,
                        job_type='database',
                        file_size_bytes=db_backup['file_size_bytes'],
                        duration_seconds=duration / 2,  # Rough estimate
                        checksum=db_backup['checksum_sha256'],
                        is_monthly=True
                    )
            else:
                logger.error("❌ Monthly backup failed: db=%s, files=%s", db_job_id, files_job_id)

                await notifications.send_backup_failed(
                    job_id=0,
                    job_type='monthly',
                    error_message='Monthly backup failed - see logs for details'
                )

        except Exception as e:
            logger.error("❌ Monthly backup job error: %s", str(e), exc_info=True)
            await notifications.send_backup_failed(
                job_id=0,
                job_type='monthly',
                error_message=str(e)
            )

    async def _offsite_upload_job(self):
        """Weekly offsite upload job - uploads all backups not yet uploaded"""
        if not settings.OFFSITE_ENABLED:
            logger.info("⏭️ Offsite upload skipped (OFFSITE_ENABLED=false)")
            return

        logger.info("☁️ Starting weekly offsite upload job...")

        try:
            # Find all completed backups not yet uploaded (list of rows)
            pending_backups = execute_query(
                """SELECT * FROM backup_jobs
                   WHERE status = 'completed'
                   AND offsite_uploaded_at IS NULL
                   AND offsite_retry_count < %s
                   ORDER BY completed_at ASC""",
                (settings.OFFSITE_RETRY_MAX_ATTEMPTS,)
            )

            if not pending_backups:
                logger.info("✅ No pending backups for offsite upload")
                return

            logger.info("📦 Found %d backups pending offsite upload", len(pending_backups))

            success_count = 0
            fail_count = 0

            for backup in pending_backups:
                success = await backup_service.upload_offsite(backup['id'])

                if success:
                    success_count += 1

                    # Send success notification
                    await notifications.send_offsite_success(
                        job_id=backup['id'],
                        backup_name=backup['file_path'].split('/')[-1],
                        file_size_bytes=backup['file_size_bytes']
                    )
                else:
                    fail_count += 1

                    # Re-read the retry count, which upload_offsite is assumed
                    # to increment on failure (that service is not shown here)
                    updated_backup = execute_query_single(
                        "SELECT offsite_retry_count FROM backup_jobs WHERE id = %s",
                        (backup['id'],))

                    # Send failure notification
                    await notifications.send_offsite_failed(
                        job_id=backup['id'],
                        backup_name=backup['file_path'].split('/')[-1],
                        error_message='Offsite upload failed - will retry',
                        retry_count=updated_backup['offsite_retry_count']
                    )

            logger.info("✅ Offsite upload job completed: %d success, %d failed",
                        success_count, fail_count)

        except Exception as e:
            logger.error("❌ Offsite upload job error: %s", str(e), exc_info=True)

    async def _offsite_retry_job(self):
        """Retry failed offsite uploads"""
        if not settings.OFFSITE_ENABLED:
            return

        # Find backups that failed offsite upload and haven't exceeded the retry
        # limit (offsite_retry_count > 0 means at least one attempt was made;
        # fresh backups wait for the weekly upload job)
        retry_backups = execute_query(
            """SELECT * FROM backup_jobs
               WHERE status = 'completed'
               AND offsite_uploaded_at IS NULL
               AND offsite_retry_count > 0
               AND offsite_retry_count < %s
               ORDER BY offsite_retry_count ASC, completed_at ASC
               LIMIT 5""",  # Limit to 5 retries per run
            (settings.OFFSITE_RETRY_MAX_ATTEMPTS,)
        )

        if not retry_backups:
            return

        logger.info("🔁 Retrying %d failed offsite uploads...", len(retry_backups))

        for backup in retry_backups:
            logger.info("🔁 Retry attempt %d/%d for backup %s",
                        backup['offsite_retry_count'] + 1,
                        settings.OFFSITE_RETRY_MAX_ATTEMPTS,
                        backup['id'])

            success = await backup_service.upload_offsite(backup['id'])

            if success:
                logger.info("✅ Offsite upload succeeded on retry: backup %s", backup['id'])

                await notifications.send_offsite_success(
                    job_id=backup['id'],
                    backup_name=backup['file_path'].split('/')[-1],
                    file_size_bytes=backup['file_size_bytes']
                )

    async def _rotation_job(self):
        """Backup rotation job - removes expired backups"""
        logger.info("🔄 Starting backup rotation job...")

        try:
            await backup_service.rotate_backups()
            logger.info("✅ Backup rotation completed")

        except Exception as e:
            logger.error("❌ Backup rotation error: %s", str(e), exc_info=True)

    async def _storage_check_job(self):
        """Storage usage check job - warns if storage is running low"""
        logger.info("🔄 Starting storage check job...")

        try:
            stats = await backup_service.check_storage_usage()

            if stats['warning']:
                await notifications.send_storage_warning(
                    usage_pct=stats['usage_pct'],
                    used_gb=stats['total_size_gb'],
                    max_gb=settings.BACKUP_MAX_SIZE_GB
                )

            logger.info("✅ Storage check completed: %.1f%% used (%.2f GB / %d GB)",
                        stats['usage_pct'], stats['total_size_gb'], settings.BACKUP_MAX_SIZE_GB)

        except Exception as e:
            logger.error("❌ Storage check error: %s", str(e), exc_info=True)

    async def _email_fetch_job(self):
        """Email fetch and processing job"""
        try:
            logger.info("🔄 Email processing job started...")

            # Import here to avoid circular dependencies
            from app.services.email_processor_service import EmailProcessorService

            processor = EmailProcessorService()
            start_time = datetime.now()
            stats = await processor.process_inbox()

            duration = (datetime.now() - start_time).total_seconds()

            logger.info("✅ Email processing complete: %s (duration: %.1fs)", stats, duration)

        except Exception as e:
            logger.error("❌ Email processing job failed: %s", str(e), exc_info=True)

    def _get_weekday_number(self, day_name: str) -> int:
        """Convert day name to APScheduler weekday number (0=Monday, 6=Sunday)"""
        days = {
            'monday': 0,
            'tuesday': 1,
            'wednesday': 2,
            'thursday': 3,
            'friday': 4,
            'saturday': 5,
            'sunday': 6
        }
        return days.get(day_name.lower(), 6)  # Default to Sunday


# Singleton instance
backup_scheduler = BackupScheduler()
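
# --- Usage sketch (illustrative, not part of this module) ---
# A minimal sketch of how an application might wire the scheduler into its
# startup/shutdown path. The FastAPI lifespan wiring and the import path are
# assumptions; the actual hookup lives elsewhere in this codebase.
#
#     from contextlib import asynccontextmanager
#     from fastapi import FastAPI
#     from app.backups.backend.scheduler import backup_scheduler  # path assumed
#
#     @asynccontextmanager
#     async def lifespan(app: FastAPI):
#         backup_scheduler.start()  # register jobs and start APScheduler
#         yield
#         backup_scheduler.stop()   # shutdown(wait=True) lets running jobs finish
#
#     app = FastAPI(lifespan=lifespan)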