bmc_hub/app/backups/backend/scheduler.py

"""
Backup Scheduler
Manages scheduled backup jobs, rotation, offsite uploads, retry logic, and email fetch
"""
import logging
from datetime import datetime, time
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from app.core.config import settings
from app.core.database import execute_query
from app.backups.backend.service import backup_service
from app.backups.backend.notifications import notifications
logger = logging.getLogger(__name__)

class BackupScheduler:
    """Scheduler for automated backup operations"""

    def __init__(self):
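        # AsyncIOScheduler executes jobs on the asyncio event loop, so start()
        # should be called with a running loop (e.g. during app startup).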
        self.scheduler = AsyncIOScheduler()
        self.enabled = settings.BACKUP_ENABLED
        self.running = False

    def start(self):
        """Start the scheduler with enabled jobs (backups and/or emails)"""
        if self.running:
            logger.warning("⚠️ Scheduler already running")
            return
        logger.info("🚀 Starting unified scheduler...")
        # Add backup jobs if enabled
        if self.enabled:
            self._add_backup_jobs()
        else:
            logger.info("⏭️ Backup jobs disabled (BACKUP_ENABLED=false)")
        # Email fetch job (every N minutes if enabled)
        if settings.EMAIL_TO_TICKET_ENABLED:
            self.scheduler.add_job(
                func=self._email_fetch_job,
                trigger=IntervalTrigger(minutes=settings.EMAIL_PROCESS_INTERVAL_MINUTES),
                id='email_fetch',
                name='Email Fetch & Process',
                max_instances=1,
                replace_existing=True
            )
            logger.info("✅ Scheduled: Email fetch every %d minute(s)",
                        settings.EMAIL_PROCESS_INTERVAL_MINUTES)
        else:
            logger.info("⏭️ Email fetch disabled (EMAIL_TO_TICKET_ENABLED=false)")
        # Start the scheduler
        self.scheduler.start()
        self.running = True
        logger.info("✅ Scheduler started successfully")

    def _add_backup_jobs(self):
        """Add all backup-related jobs to scheduler"""
        # Daily full backup at 02:00 CET
        self.scheduler.add_job(
            func=self._daily_backup_job,
            trigger=CronTrigger(hour=2, minute=0),
            id='daily_backup',
            name='Daily Full Backup',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Daily backup at 02:00")
        # Monthly backup on the 1st at 02:00 CET
        self.scheduler.add_job(
            func=self._monthly_backup_job,
            trigger=CronTrigger(day=1, hour=2, minute=0),
            id='monthly_backup',
            name='Monthly Full Backup (SQL format)',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Monthly backup on 1st at 02:00")
        # Weekly offsite upload (configurable day, default Sunday at 03:00)
        offsite_day = self._get_weekday_number(settings.OFFSITE_WEEKLY_DAY)
        self.scheduler.add_job(
            func=self._offsite_upload_job,
            trigger=CronTrigger(day_of_week=offsite_day, hour=3, minute=0),
            id='offsite_upload',
            name=f'Weekly Offsite Upload ({settings.OFFSITE_WEEKLY_DAY.capitalize()})',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Weekly offsite upload on %s at 03:00",
                    settings.OFFSITE_WEEKLY_DAY.capitalize())
        # Offsite retry job (every OFFSITE_RETRY_DELAY_HOURS hours)
        self.scheduler.add_job(
            func=self._offsite_retry_job,
            trigger=IntervalTrigger(hours=settings.OFFSITE_RETRY_DELAY_HOURS),
            id='offsite_retry',
            name='Offsite Upload Retry',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Offsite retry every %d hour(s)",
                    settings.OFFSITE_RETRY_DELAY_HOURS)
        # Backup rotation (daily at 01:00)
        self.scheduler.add_job(
            func=self._rotation_job,
            trigger=CronTrigger(hour=1, minute=0),
            id='backup_rotation',
            name='Backup Rotation',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Backup rotation at 01:00")
        # Storage check (daily at 01:30)
        self.scheduler.add_job(
            func=self._storage_check_job,
            trigger=CronTrigger(hour=1, minute=30),
            id='storage_check',
            name='Storage Usage Check',
            max_instances=1,
            replace_existing=True
        )
        logger.info("✅ Scheduled: Storage check at 01:30")

    def stop(self):
        """Stop the scheduler"""
        if not self.running:
            return
        logger.info("🛑 Stopping scheduler...")
        self.scheduler.shutdown(wait=True)
        self.running = False
        logger.info("✅ Scheduler stopped")

    def pause(self):
        """Pause all scheduled jobs (for maintenance)"""
        if not self.running:
            return
        logger.info("⏸️ Pausing scheduler...")
        self.scheduler.pause()
        logger.info("✅ Scheduler paused")

    def resume(self):
        """Resume all scheduled jobs"""
        if not self.running:
            return
        logger.info("▶️ Resuming scheduler...")
        self.scheduler.resume()
        logger.info("✅ Scheduler resumed")

    async def _daily_backup_job(self):
        """Daily full backup job (database + files)"""
logger.info("🔄 Starting daily backup job...")
try:
start_time = datetime.now()
# Create full backup (database uses compressed .dump format)
db_job_id, files_job_id = await backup_service.create_full_backup(is_monthly=False)
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
if db_job_id and files_job_id:
logger.info("✅ Daily backup completed: db=%s, files=%s (%.1fs)",
db_job_id, files_job_id, duration)
# Send success notification for database backup
db_backup = execute_query_single(
"SELECT * FROM backup_jobs WHERE id = %s",
(db_job_id,))
if db_backup:
await notifications.send_backup_success(
job_id=db_job_id,
job_type='database',
file_size_bytes=db_backup['file_size_bytes'],
duration_seconds=duration / 2, # Rough estimate
checksum=db_backup['checksum_sha256'],
is_monthly=False
)
else:
logger.error("❌ Daily backup failed: db=%s, files=%s", db_job_id, files_job_id)
# Send failure notification
if not db_job_id:
await notifications.send_backup_failed(
job_id=0,
job_type='database',
error_message='Database backup failed - see logs for details'
)
if not files_job_id:
await notifications.send_backup_failed(
job_id=0,
job_type='files',
error_message='Files backup failed - see logs for details'
)
except Exception as e:
logger.error("❌ Daily backup job error: %s", str(e), exc_info=True)
await notifications.send_backup_failed(
job_id=0,
job_type='full',
error_message=str(e)
)

    async def _monthly_backup_job(self):
        """Monthly full backup job (database uses plain SQL format)"""
        logger.info("🔄 Starting monthly backup job...")
        try:
            start_time = datetime.now()
            # Create full backup with is_monthly=True (uses plain SQL format)
            db_job_id, files_job_id = await backup_service.create_full_backup(is_monthly=True)
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            if db_job_id and files_job_id:
                logger.info("✅ Monthly backup completed: db=%s, files=%s (%.1fs)",
                            db_job_id, files_job_id, duration)
                # Send success notification for database backup
                db_backup = execute_query_single(
                    "SELECT * FROM backup_jobs WHERE id = %s",
                    (db_job_id,))
                if db_backup:
                    await notifications.send_backup_success(
                        job_id=db_job_id,
                        job_type='database',
                        file_size_bytes=db_backup['file_size_bytes'],
                        duration_seconds=duration / 2,  # Rough estimate
                        checksum=db_backup['checksum_sha256'],
                        is_monthly=True
                    )
            else:
                logger.error("❌ Monthly backup failed: db=%s, files=%s", db_job_id, files_job_id)
                await notifications.send_backup_failed(
                    job_id=0,
                    job_type='monthly',
                    error_message='Monthly backup failed - see logs for details'
                )
        except Exception as e:
            logger.error("❌ Monthly backup job error: %s", str(e), exc_info=True)
            await notifications.send_backup_failed(
                job_id=0,
                job_type='monthly',
                error_message=str(e)
            )

    async def _offsite_upload_job(self):
        """Weekly offsite upload job - uploads all backups not yet uploaded"""
        if not settings.OFFSITE_ENABLED:
            logger.info("⏭️ Offsite upload skipped (OFFSITE_ENABLED=false)")
            return
        logger.info("☁️ Starting weekly offsite upload job...")
        try:
            # Find all completed backups not yet uploaded
            # (multi-row result, so execute_query, not execute_query_single)
            pending_backups = execute_query(
                """SELECT * FROM backup_jobs
                   WHERE status = 'completed'
                     AND offsite_uploaded_at IS NULL
                     AND offsite_retry_count < %s
                   ORDER BY completed_at ASC""",
                (settings.OFFSITE_RETRY_MAX_ATTEMPTS,)
            )
            if not pending_backups:
                logger.info("✅ No pending backups for offsite upload")
                return
            logger.info("📦 Found %d backups pending offsite upload", len(pending_backups))
            success_count = 0
            fail_count = 0
            for backup in pending_backups:
                success = await backup_service.upload_offsite(backup['id'])
                if success:
                    success_count += 1
                    # Send success notification
                    await notifications.send_offsite_success(
                        job_id=backup['id'],
                        backup_name=backup['file_path'].split('/')[-1],
                        file_size_bytes=backup['file_size_bytes']
                    )
                else:
                    fail_count += 1
                    # Get updated retry count (single row, so execute_query_single)
                    updated_backup = execute_query_single(
                        "SELECT offsite_retry_count FROM backup_jobs WHERE id = %s",
                        (backup['id'],))
                    # Send failure notification
                    await notifications.send_offsite_failed(
                        job_id=backup['id'],
                        backup_name=backup['file_path'].split('/')[-1],
                        error_message='Offsite upload failed - will retry',
                        retry_count=updated_backup['offsite_retry_count']
                    )
            logger.info("✅ Offsite upload job completed: %d success, %d failed",
                        success_count, fail_count)
        except Exception as e:
            logger.error("❌ Offsite upload job error: %s", str(e), exc_info=True)

    async def _offsite_retry_job(self):
        """Retry failed offsite uploads"""
        if not settings.OFFSITE_ENABLED:
            return
        # Find backups that failed offsite upload and haven't exceeded retry limit
        retry_backups = execute_query(
            """SELECT * FROM backup_jobs
               WHERE status = 'completed'
                 AND offsite_uploaded_at IS NULL
                 AND offsite_retry_count > 0
                 AND offsite_retry_count < %s
               ORDER BY offsite_retry_count ASC, completed_at ASC
               LIMIT 5""",  # Limit to 5 retries per run
            (settings.OFFSITE_RETRY_MAX_ATTEMPTS,)
        )
        if not retry_backups:
            return
        logger.info("🔁 Retrying %d failed offsite uploads...", len(retry_backups))
        for backup in retry_backups:
            logger.info("🔁 Retry attempt %d/%d for backup %s",
                        backup['offsite_retry_count'] + 1,
                        settings.OFFSITE_RETRY_MAX_ATTEMPTS,
                        backup['id'])
            success = await backup_service.upload_offsite(backup['id'])
            if success:
                logger.info("✅ Offsite upload succeeded on retry: backup %s", backup['id'])
                await notifications.send_offsite_success(
                    job_id=backup['id'],
                    backup_name=backup['file_path'].split('/')[-1],
                    file_size_bytes=backup['file_size_bytes']
                )

    async def _rotation_job(self):
        """Backup rotation job - removes expired backups"""
        logger.info("🔄 Starting backup rotation job...")
        try:
            await backup_service.rotate_backups()
            logger.info("✅ Backup rotation completed")
        except Exception as e:
            logger.error("❌ Backup rotation error: %s", str(e), exc_info=True)

    async def _storage_check_job(self):
        """Storage usage check job - warns if storage is running low"""
        logger.info("🔄 Starting storage check job...")
        try:
            stats = await backup_service.check_storage_usage()
            if stats['warning']:
                await notifications.send_storage_warning(
                    usage_pct=stats['usage_pct'],
                    used_gb=stats['total_size_gb'],
                    max_gb=settings.BACKUP_MAX_SIZE_GB
                )
            logger.info("✅ Storage check completed: %.1f%% used (%.2f GB / %d GB)",
                        stats['usage_pct'], stats['total_size_gb'], settings.BACKUP_MAX_SIZE_GB)
        except Exception as e:
            logger.error("❌ Storage check error: %s", str(e), exc_info=True)

    async def _email_fetch_job(self):
        """Email fetch and processing job"""
        try:
            logger.info("🔄 Email processing job started...")
            # Import here to avoid circular dependencies
            from app.services.email_processor_service import EmailProcessorService
            processor = EmailProcessorService()
            start_time = datetime.now()
            stats = await processor.process_inbox()
            duration = (datetime.now() - start_time).total_seconds()
            logger.info("✅ Email processing complete: %s (duration: %.1fs)", stats, duration)
        except Exception as e:
            logger.error("❌ Email processing job failed: %s", str(e), exc_info=True)

    def _get_weekday_number(self, day_name: str) -> int:
        """Convert day name to APScheduler weekday number (0=Monday, 6=Sunday)"""
        days = {
            'monday': 0,
            'tuesday': 1,
            'wednesday': 2,
            'thursday': 3,
            'friday': 4,
            'saturday': 5,
            'sunday': 6
        }
        return days.get(day_name.lower(), 6)  # Default to Sunday


# Singleton instance
backup_scheduler = BackupScheduler()
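
# Usage sketch: one way to wire the singleton into application startup/shutdown.
# This is a hedged example, not part of this module; it assumes the app is a
# FastAPI application exposing a lifespan hook (the names `app` and `lifespan`
# are illustrative only):
#
#     from contextlib import asynccontextmanager
#     from fastapi import FastAPI
#     from app.backups.backend.scheduler import backup_scheduler
#
#     @asynccontextmanager
#     async def lifespan(app: FastAPI):
#         backup_scheduler.start()  # AsyncIOScheduler needs the running loop
#         yield
#         backup_scheduler.stop()
#
#     app = FastAPI(lifespan=lifespan)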