""" Backup Service Handles database and file backup operations, rotation, restore, and offsite uploads. """ import os import logging import hashlib import tarfile import subprocess import fcntl from pathlib import Path from datetime import datetime, timedelta from typing import Optional, Dict, List, Tuple import paramiko from stat import S_ISDIR from app.core.config import settings from app.core.database import execute_query, execute_insert, execute_update, execute_query_single logger = logging.getLogger(__name__) class BackupService: """Service for managing backup operations""" def __init__(self): self.backup_dir = Path(settings.BACKUP_STORAGE_PATH) self.backup_dir.mkdir(parents=True, exist_ok=True) # Subdirectories for different backup types self.db_dir = self.backup_dir / "database" self.files_dir = self.backup_dir / "files" self.db_dir.mkdir(exist_ok=True) self.files_dir.mkdir(exist_ok=True) async def create_database_backup(self, is_monthly: bool = False) -> Optional[int]: """ Create PostgreSQL database backup using pg_dump Args: is_monthly: If True, creates plain SQL backup for readability Returns: backup_job_id or None if failed """ if settings.BACKUP_DRY_RUN: logger.info("🔄 DRY RUN: Would create database backup (monthly=%s)", is_monthly) return None # Determine format based on monthly flag backup_format = settings.DB_MONTHLY_FORMAT if is_monthly else settings.DB_DAILY_FORMAT timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"db_{timestamp}_{'monthly' if is_monthly else 'daily'}.{backup_format}" backup_path = self.db_dir / filename # Create backup job record job_id = execute_insert( """INSERT INTO backup_jobs (job_type, status, backup_format, is_monthly, started_at) VALUES (%s, %s, %s, %s, %s) RETURNING id""", ('database', 'running', backup_format, is_monthly, datetime.now()) ) logger.info("🔄 Starting database backup: job_id=%s, format=%s, monthly=%s", job_id, backup_format, is_monthly) try: # Build pg_dump command - connect via network to postgres service env = os.environ.copy() env['PGPASSWORD'] = settings.DATABASE_URL.split(':')[2].split('@')[0] # Extract password # Parse database connection info from DATABASE_URL # Format: postgresql://user:pass@host:port/dbname db_parts = settings.DATABASE_URL.replace('postgresql://', '').split('@') user_pass = db_parts[0].split(':') host_db = db_parts[1].split('/') user = user_pass[0] password = user_pass[1] if len(user_pass) > 1 else '' host = host_db[0].split(':')[0] if ':' in host_db[0] else host_db[0] dbname = host_db[1] if len(host_db) > 1 else 'bmc_hub' env['PGPASSWORD'] = password if backup_format == 'dump': # Compressed custom format (-Fc) cmd = ['pg_dump', '-h', host, '-U', user, '-Fc', dbname] else: # Plain SQL format cmd = ['pg_dump', '-h', host, '-U', user, dbname] # Execute pg_dump and write to file logger.info("📦 Executing: %s > %s", ' '.join(cmd), backup_path) with open(backup_path, 'wb') as f: result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, check=True, env=env) # Calculate file size and checksum file_size = backup_path.stat().st_size checksum = self._calculate_checksum(backup_path) # Calculate retention date if is_monthly: retention_until = datetime.now() + timedelta(days=settings.BACKUP_RETENTION_MONTHLY * 30) else: retention_until = datetime.now() + timedelta(days=settings.BACKUP_RETENTION_DAYS) # Update job record execute_update( """UPDATE backup_jobs SET status = %s, completed_at = %s, file_path = %s, file_size_bytes = %s, checksum_sha256 = %s, retention_until = %s WHERE id = %s""", ('completed', datetime.now(), str(backup_path), file_size, checksum, retention_until.date(), job_id) ) logger.info("✅ Database backup completed: %s (%.2f MB)", filename, file_size / 1024 / 1024) return job_id except subprocess.CalledProcessError as e: error_msg = e.stderr.decode() if e.stderr else str(e) logger.error("❌ Database backup failed: %s", error_msg) execute_update( """UPDATE backup_jobs SET status = %s, completed_at = %s, error_message = %s WHERE id = %s""", ('failed', datetime.now(), error_msg, job_id) ) # Clean up partial backup file if backup_path.exists(): backup_path.unlink() return None async def create_files_backup(self) -> Optional[int]: """ Create tar.gz backup of file directories (uploads/, data/, logs/) Returns: backup_job_id or None if failed """ if settings.BACKUP_DRY_RUN: logger.info("🔄 DRY RUN: Would create files backup") return None timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"files_{timestamp}.tar.gz" backup_path = self.files_dir / filename # Paths to backup (relative to project root) base_path = Path.cwd() paths_to_backup = [] if settings.BACKUP_INCLUDE_UPLOADS: uploads_path = base_path / settings.UPLOAD_DIR if uploads_path.exists(): paths_to_backup.append((uploads_path, 'uploads')) if settings.BACKUP_INCLUDE_DATA: data_path = base_path / 'data' if data_path.exists(): paths_to_backup.append((data_path, 'data')) if settings.BACKUP_INCLUDE_LOGS: logs_path = base_path / 'logs' if logs_path.exists(): paths_to_backup.append((logs_path, 'logs')) if not paths_to_backup: logger.warning("⚠️ No file directories to backup") return None # Create backup job record job_id = execute_insert( """INSERT INTO backup_jobs (job_type, status, backup_format, includes_uploads, includes_logs, includes_data, started_at) VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id""", ('files', 'running', 'tar.gz', settings.BACKUP_INCLUDE_UPLOADS, settings.BACKUP_INCLUDE_LOGS, settings.BACKUP_INCLUDE_DATA, datetime.now()) ) logger.info("🔄 Starting files backup: job_id=%s, paths=%s", job_id, [name for _, name in paths_to_backup]) try: # Exclude patterns exclude_patterns = [ '__pycache__', '*.pyc', '*.pyo', '*.pyd', '.DS_Store', '.git', 'backup', # Don't backup the backup directory itself! ] # Create tar.gz archive with tarfile.open(backup_path, 'w:gz') as tar: for path, arcname in paths_to_backup: tar.add( path, arcname=arcname, recursive=True, filter=lambda ti: None if any( pattern in ti.name for pattern in exclude_patterns ) else ti ) # Calculate file size and checksum file_size = backup_path.stat().st_size checksum = self._calculate_checksum(backup_path) # Calculate retention date (files use daily retention) retention_until = datetime.now() + timedelta(days=settings.BACKUP_RETENTION_DAYS) # Update job record execute_update( """UPDATE backup_jobs SET status = %s, completed_at = %s, file_path = %s, file_size_bytes = %s, checksum_sha256 = %s, retention_until = %s WHERE id = %s""", ('completed', datetime.now(), str(backup_path), file_size, checksum, retention_until.date(), job_id) ) logger.info("✅ Files backup completed: %s (%.2f MB)", filename, file_size / 1024 / 1024) return job_id except Exception as e: logger.error("❌ Files backup failed: %s", str(e)) execute_update( """UPDATE backup_jobs SET status = %s, completed_at = %s, error_message = %s WHERE id = %s""", ('failed', datetime.now(), str(e), job_id) ) # Clean up partial backup file if backup_path.exists(): backup_path.unlink() return None async def create_full_backup(self, is_monthly: bool = False) -> Tuple[Optional[int], Optional[int]]: """ Create full backup (database + files) Returns: (db_job_id, files_job_id) tuple """ logger.info("🔄 Starting full backup (database + files)") db_job_id = await self.create_database_backup(is_monthly=is_monthly) files_job_id = await self.create_files_backup() if db_job_id and files_job_id: logger.info("✅ Full backup completed: db=%s, files=%s", db_job_id, files_job_id) else: logger.warning("⚠️ Full backup partially failed: db=%s, files=%s", db_job_id, files_job_id) return (db_job_id, files_job_id) async def rotate_backups(self): """ Remove old backups based on retention policy: - Daily backups: Keep for RETENTION_DAYS (default 30 days) - Monthly backups: Keep for MONTHLY_KEEP_MONTHS (default 12 months) """ if settings.BACKUP_DRY_RUN: logger.info("🔄 DRY RUN: Would rotate backups") return logger.info("🔄 Starting backup rotation") # Find expired backups expired_backups = execute_query_single( """SELECT id, file_path, is_monthly, retention_until FROM backup_jobs WHERE status = 'completed' AND retention_until < CURRENT_DATE ORDER BY retention_until ASC""" ) deleted_count = 0 freed_bytes = 0 for backup in expired_backups: file_path = Path(backup['file_path']) if file_path.exists(): file_size = file_path.stat().st_size file_path.unlink() freed_bytes += file_size logger.info("🗑️ Deleted expired backup: %s (%.2f MB, retention_until=%s)", file_path.name, file_size / 1024 / 1024, backup['retention_until']) # Delete from database execute_update("DELETE FROM backup_jobs WHERE id = %s", (backup['id'],)) deleted_count += 1 if deleted_count > 0: logger.info("✅ Rotation complete: deleted %d backups, freed %.2f MB", deleted_count, freed_bytes / 1024 / 1024) else: logger.info("✅ Rotation complete: no expired backups") async def restore_database(self, job_id: int) -> bool: """ Restore database from backup to NEW database with timestamp suffix Strategy: 1. Create new database: bmc_hub_restored_YYYYMMDD_HHMMSS 2. Restore backup to NEW database (no conflicts!) 3. Return new database name in response 4. User updates .env to point to new database 5. Test system, then cleanup old database Args: job_id: Backup job ID to restore from Returns: True if successful, False otherwise """ if settings.BACKUP_READ_ONLY: logger.error("❌ Restore blocked: BACKUP_READ_ONLY=true") return False if settings.BACKUP_RESTORE_DRY_RUN: logger.warning("🔄 DRY RUN MODE: Would restore database from backup job %s", job_id) logger.warning("🔄 Set BACKUP_RESTORE_DRY_RUN=false to actually restore") return False # Get backup job backup = execute_query_single( "SELECT * FROM backup_jobs WHERE id = %s AND job_type = 'database'", (job_id,)) if not backup: logger.error("❌ Backup job not found: %s", job_id) return False backup_path = Path(backup['file_path']) if not backup_path.exists(): logger.error("❌ Backup file not found: %s", backup_path) return False # Generate new database name with timestamp from datetime import datetime timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') new_dbname = f"bmc_hub_restored_{timestamp}" logger.info("🔄 Starting database restore from backup: %s", backup_path.name) logger.info("🎯 Target: NEW database '%s' (safe restore!)", new_dbname) # Enable maintenance mode await self.set_maintenance_mode(True, "Database restore i gang", eta_minutes=5) # TODO: Stop scheduler (will be implemented in scheduler.py) try: # Verify checksum current_checksum = self._calculate_checksum(backup_path) if current_checksum != backup['checksum_sha256']: raise ValueError(f"Checksum mismatch! Expected {backup['checksum_sha256']}, got {current_checksum}") logger.info("✅ Checksum verified") # Acquire file lock to prevent concurrent operations lock_file = self.backup_dir / ".restore.lock" with open(lock_file, 'w') as lock_f: fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX) # Parse database connection info env = os.environ.copy() db_parts = settings.DATABASE_URL.replace('postgresql://', '').split('@') user_pass = db_parts[0].split(':') host_db = db_parts[1].split('/') user = user_pass[0] password = user_pass[1] if len(user_pass) > 1 else '' host = host_db[0].split(':')[0] if ':' in host_db[0] else host_db[0] dbname = host_db[1] if len(host_db) > 1 else 'bmc_hub' env['PGPASSWORD'] = password # Step 1: Create new empty database logger.info("📦 Creating new database: %s", new_dbname) create_cmd = ['psql', '-h', host, '-U', user, '-d', 'postgres', '-c', f"CREATE DATABASE {new_dbname} OWNER {user};"] result = subprocess.run(create_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, text=True, env=env) if result.returncode != 0: logger.error("❌ Failed to create database: %s", result.stderr) fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN) raise RuntimeError(f"CREATE DATABASE failed: {result.stderr}") logger.info("✅ New database created: %s", new_dbname) # Step 2: Restore to NEW database (no conflicts!) # Build restore command based on format if backup['backup_format'] == 'dump': # Restore from compressed custom format cmd = ['pg_restore', '-h', host, '-U', user, '-d', new_dbname] logger.info("📥 Restoring to %s: %s < %s", new_dbname, ' '.join(cmd), backup_path) with open(backup_path, 'rb') as f: result = subprocess.run(cmd, stdin=f, stderr=subprocess.PIPE, text=True, env=env) # pg_restore returns 1 even for warnings, check if there are real errors if result.returncode != 0: logger.warning("⚠️ pg_restore returned code %s", result.returncode) if result.stderr: logger.warning("pg_restore stderr: %s", result.stderr[:500]) # Check for real errors vs harmless config warnings stderr_lower = result.stderr.lower() if result.stderr else "" # Harmless errors to ignore harmless_errors = [ "transaction_timeout", # Config parameter that may not exist in all PG versions "idle_in_transaction_session_timeout" # Another version-specific parameter ] # Check if errors are only harmless ones is_harmless = any(err in stderr_lower for err in harmless_errors) has_real_errors = "error:" in stderr_lower and not all( err in stderr_lower for err in harmless_errors ) if has_real_errors and not is_harmless: logger.error("❌ pg_restore had REAL errors: %s", result.stderr[:1000]) # Try to drop the failed database subprocess.run(['psql', '-h', host, '-U', user, '-d', 'postgres', '-c', f"DROP DATABASE IF EXISTS {new_dbname};"], env=env) raise RuntimeError(f"pg_restore failed with errors") else: logger.info("✅ Restore completed (harmless config warnings ignored)") else: # Restore from plain SQL cmd = ['psql', '-h', host, '-U', user, '-d', new_dbname] logger.info("📥 Executing: %s < %s", ' '.join(cmd), backup_path) with open(backup_path, 'rb') as f: result = subprocess.run(cmd, stdin=f, stderr=subprocess.PIPE, text=True, env=env) if result.returncode != 0: logger.error("❌ psql stderr: %s", result.stderr) raise RuntimeError(f"psql failed with code {result.returncode}") # Release file lock fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN) logger.info("✅ Database restore completed successfully to: %s", new_dbname) logger.info("🔧 NEXT STEPS:") logger.info(" 1. Update .env: DATABASE_URL=postgresql://%s:%s@%s:5432/%s", user, "***", host, new_dbname) logger.info(" 2. Restart: docker-compose restart api") logger.info(" 3. Test system thoroughly") logger.info(" 4. If OK, cleanup old database:") logger.info(" docker exec bmc-hub-postgres psql -U %s -d postgres -c 'DROP DATABASE %s;'", user, dbname) logger.info(" docker exec bmc-hub-postgres psql -U %s -d postgres -c 'ALTER DATABASE %s RENAME TO %s;'", user, new_dbname, dbname) logger.info(" 5. Revert .env and restart") # Store new database name in notification for user execute_insert( """INSERT INTO backup_notifications (backup_job_id, event_type, message) VALUES (%s, %s, %s) RETURNING id""", (job_id, 'backup_success', f'✅ Database restored to: {new_dbname}\n' f'Update .env: DATABASE_URL=postgresql://{user}:PASSWORD@{host}:5432/{new_dbname}') ) return True except Exception as e: logger.error("❌ Database restore failed: %s", str(e)) return False finally: # Disable maintenance mode await self.set_maintenance_mode(False) # TODO: Restart scheduler (will be implemented in scheduler.py) # Clean up lock file if lock_file.exists(): lock_file.unlink() async def restore_files(self, job_id: int) -> bool: """ Restore files from tar.gz backup Args: job_id: Backup job ID to restore from Returns: True if successful, False otherwise """ if settings.BACKUP_READ_ONLY: logger.error("❌ Restore blocked: BACKUP_READ_ONLY=true") return False if settings.BACKUP_RESTORE_DRY_RUN: logger.warning("🔄 DRY RUN MODE: Would restore files from backup job %s", job_id) logger.warning("🔄 Set BACKUP_RESTORE_DRY_RUN=false to actually restore") return False # Get backup job backup = execute_query_single( "SELECT * FROM backup_jobs WHERE id = %s AND job_type = 'files'", (job_id,)) if not backup: logger.error("❌ Backup job not found: %s", job_id) return False backup_path = Path(backup['file_path']) if not backup_path.exists(): logger.error("❌ Backup file not found: %s", backup_path) return False logger.info("🔄 Starting files restore from backup: %s", backup_path.name) try: # Verify checksum current_checksum = self._calculate_checksum(backup_path) if current_checksum != backup['checksum_sha256']: raise ValueError(f"Checksum mismatch! Expected {backup['checksum_sha256']}, got {current_checksum}") logger.info("✅ Checksum verified") # Acquire file lock lock_file = self.backup_dir / ".restore_files.lock" with open(lock_file, 'w') as f: fcntl.flock(f.fileno(), fcntl.LOCK_EX) # Extract tar.gz to project root base_path = Path.cwd() with tarfile.open(backup_path, 'r:gz') as tar: # Extract all files, excluding backup directory members = [m for m in tar.getmembers() if 'backup' not in m.name] tar.extractall(path=base_path, members=members) # Release file lock fcntl.flock(f.fileno(), fcntl.LOCK_UN) logger.info("✅ Files restore completed successfully") return True except Exception as e: logger.error("❌ Files restore failed: %s", str(e)) return False finally: # Clean up lock file if lock_file.exists(): lock_file.unlink() async def upload_offsite(self, job_id: int) -> bool: """ Upload backup to offsite location via SFTP/SSH Args: job_id: Backup job ID to upload Returns: True if successful, False otherwise """ if not settings.OFFSITE_ENABLED: logger.info("⏭️ Offsite upload disabled") return False if settings.BACKUP_DRY_RUN: logger.info("🔄 DRY RUN: Would upload backup to offsite") return False # Get backup job backup = execute_query_single( "SELECT * FROM backup_jobs WHERE id = %s", (job_id,)) if not backup: logger.error("❌ Backup job not found: %s", job_id) return False if backup['offsite_uploaded_at']: logger.info("⏭️ Backup already uploaded to offsite: %s", job_id) return True backup_path = Path(backup['file_path']) if not backup_path.exists(): logger.error("❌ Backup file not found: %s", backup_path) return False logger.info("☁️ Starting offsite upload: %s to %s:%s", backup_path.name, settings.SFTP_HOST, settings.SFTP_REMOTE_PATH) try: # Connect via SFTP transport = paramiko.Transport((settings.SFTP_HOST, settings.SFTP_PORT)) if settings.SSH_KEY_PATH: # Use SSH key authentication private_key = paramiko.RSAKey.from_private_key_file(settings.SSH_KEY_PATH) transport.connect(username=settings.SFTP_USER, pkey=private_key) else: # Use password authentication transport.connect(username=settings.SFTP_USER, password=settings.SFTP_PASSWORD) sftp = paramiko.SFTPClient.from_transport(transport) # Create remote directory if needed remote_path = settings.SFTP_REMOTE_PATH if remote_path and remote_path not in ('.', '/', ''): logger.info("📁 Ensuring remote directory exists: %s", remote_path) self._ensure_remote_directory(sftp, remote_path) logger.info("✅ Remote directory ready") # Upload file remote_file = f"{remote_path}/{backup_path.name}" logger.info("📤 Uploading to: %s", remote_file) sftp.put(str(backup_path), remote_file) logger.info("✅ Upload completed") # Verify upload remote_stat = sftp.stat(remote_file) local_size = backup_path.stat().st_size if remote_stat.st_size != local_size: raise ValueError(f"Upload verification failed: remote size {remote_stat.st_size} != local size {local_size}") # Close connection sftp.close() transport.close() # Update job record execute_update( """UPDATE backup_jobs SET offsite_uploaded_at = %s, offsite_retry_count = 0 WHERE id = %s""", (datetime.now(), job_id) ) logger.info("✅ Offsite upload completed: %s", backup_path.name) return True except Exception as e: logger.error("❌ Offsite upload failed: %s", str(e)) # Increment retry count execute_update( """UPDATE backup_jobs SET offsite_retry_count = offsite_retry_count + 1 WHERE id = %s""", (job_id,) ) return False async def check_storage_usage(self) -> Dict[str, any]: """ Check backup storage usage and warn if exceeding threshold Returns: Dict with storage statistics """ total_size = 0 file_count = 0 for backup_file in self.backup_dir.rglob('*'): if backup_file.is_file() and not backup_file.name.startswith('.'): total_size += backup_file.stat().st_size file_count += 1 max_size_bytes = settings.BACKUP_MAX_SIZE_GB * 1024 * 1024 * 1024 usage_pct = (total_size / max_size_bytes) * 100 if max_size_bytes > 0 else 0 stats = { 'total_size_bytes': total_size, 'total_size_gb': total_size / 1024 / 1024 / 1024, 'max_size_gb': settings.BACKUP_MAX_SIZE_GB, 'usage_pct': usage_pct, 'file_count': file_count, 'warning': usage_pct >= settings.STORAGE_WARNING_THRESHOLD_PCT } if stats['warning']: logger.warning("⚠️ Backup storage usage high: %.1f%% (%.2f GB / %d GB)", usage_pct, stats['total_size_gb'], settings.BACKUP_MAX_SIZE_GB) # Log notification execute_insert( """INSERT INTO backup_notifications (event_type, message) VALUES (%s, %s) RETURNING id""", ('storage_low', f"Backup storage usage at {usage_pct:.1f}% ({stats['total_size_gb']:.2f} GB / {settings.BACKUP_MAX_SIZE_GB} GB)") ) return stats async def set_maintenance_mode(self, enabled: bool, message: str = None, eta_minutes: int = None): """ Enable or disable system maintenance mode Args: enabled: True to enable maintenance mode, False to disable message: Custom maintenance message eta_minutes: Estimated time to completion in minutes """ if message is None: message = "System under vedligeholdelse" if enabled else "" execute_update( """UPDATE system_status SET maintenance_mode = %s, maintenance_message = %s, maintenance_eta_minutes = %s, updated_at = %s WHERE id = 1""", (enabled, message, eta_minutes, datetime.now()) ) if enabled: logger.warning("🔧 Maintenance mode ENABLED: %s (ETA: %s min)", message, eta_minutes) else: logger.info("✅ Maintenance mode DISABLED") def _calculate_checksum(self, file_path: Path) -> str: """Calculate SHA256 checksum of file""" sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() def _ensure_remote_directory(self, sftp: paramiko.SFTPClient, path: str): """Create remote directory if it doesn't exist (recursive)""" # Skip if path is root or current directory if not path or path in ('.', '/', ''): return # Try to stat the directory try: sftp.stat(path) logger.info("✅ Directory exists: %s", path) return except FileNotFoundError: # Directory doesn't exist, create it try: # Try to create parent directory first parent = os.path.dirname(path) if parent and parent != path: self._ensure_remote_directory(sftp, parent) # Create this directory sftp.mkdir(path) logger.info("📁 Created remote directory: %s", path) except Exception as e: logger.warning("⚠️ Could not create directory %s: %s", path, str(e)) # Singleton instance backup_service = BackupService()