# Changelog:
# - Add SFTP upload support with paramiko
# - Add database columns for offsite tracking (status, location, attempts, error)
# - Add manual upload endpoint /api/v1/backups/offsite/{job_id}
# - Add frontend button for offsite upload
# - Add SFTP configuration in config.py
# - Fix infinite loop in _ensure_remote_directory for relative paths
# - Add upload verification and retry mechanism
# - Add progress tracking and logging
# 786 lines, 32 KiB, Python
"""
|
|
Backup Service
|
|
Handles database and file backup operations, rotation, restore, and offsite uploads.
|
|
"""
|
|
|
|
import fcntl
import fnmatch
import hashlib
import logging
import os
import subprocess
import tarfile
from datetime import datetime, timedelta
from pathlib import Path
from stat import S_ISDIR
from typing import Optional, Dict, List, Tuple
from urllib.parse import urlsplit

import paramiko

from app.core.config import settings
from app.core.database import execute_query, execute_insert, execute_update, execute_query_single
|
|
|
|
# Module-level logger named after this module, per stdlib logging convention.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BackupService:
    """Service for managing backup operations."""

    def __init__(self):
        # Root backup directory comes from configuration; create it eagerly
        # so every later operation can assume it exists.
        self.backup_dir = Path(settings.BACKUP_STORAGE_PATH)
        self.backup_dir.mkdir(parents=True, exist_ok=True)

        # One subdirectory per backup type.
        self.db_dir = self.backup_dir / "database"
        self.files_dir = self.backup_dir / "files"
        for subdir in (self.db_dir, self.files_dir):
            subdir.mkdir(exist_ok=True)
async def create_database_backup(self, is_monthly: bool = False) -> Optional[int]:
|
|
"""
|
|
Create PostgreSQL database backup using pg_dump
|
|
|
|
Args:
|
|
is_monthly: If True, creates plain SQL backup for readability
|
|
|
|
Returns:
|
|
backup_job_id or None if failed
|
|
"""
|
|
if settings.BACKUP_DRY_RUN:
|
|
logger.info("🔄 DRY RUN: Would create database backup (monthly=%s)", is_monthly)
|
|
return None
|
|
|
|
# Determine format based on monthly flag
|
|
backup_format = settings.DB_MONTHLY_FORMAT if is_monthly else settings.DB_DAILY_FORMAT
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
filename = f"db_{timestamp}_{'monthly' if is_monthly else 'daily'}.{backup_format}"
|
|
backup_path = self.db_dir / filename
|
|
|
|
# Create backup job record
|
|
job_id = execute_insert(
|
|
"""INSERT INTO backup_jobs (job_type, status, backup_format, is_monthly, started_at)
|
|
VALUES (%s, %s, %s, %s, %s) RETURNING id""",
|
|
('database', 'running', backup_format, is_monthly, datetime.now())
|
|
)
|
|
|
|
logger.info("🔄 Starting database backup: job_id=%s, format=%s, monthly=%s",
|
|
job_id, backup_format, is_monthly)
|
|
|
|
try:
|
|
# Build pg_dump command - connect via network to postgres service
|
|
env = os.environ.copy()
|
|
env['PGPASSWORD'] = settings.DATABASE_URL.split(':')[2].split('@')[0] # Extract password
|
|
|
|
# Parse database connection info from DATABASE_URL
|
|
# Format: postgresql://user:pass@host:port/dbname
|
|
db_parts = settings.DATABASE_URL.replace('postgresql://', '').split('@')
|
|
user_pass = db_parts[0].split(':')
|
|
host_db = db_parts[1].split('/')
|
|
|
|
user = user_pass[0]
|
|
password = user_pass[1] if len(user_pass) > 1 else ''
|
|
host = host_db[0].split(':')[0] if ':' in host_db[0] else host_db[0]
|
|
dbname = host_db[1] if len(host_db) > 1 else 'bmc_hub'
|
|
|
|
env['PGPASSWORD'] = password
|
|
|
|
if backup_format == 'dump':
|
|
# Compressed custom format (-Fc)
|
|
cmd = ['pg_dump', '-h', host, '-U', user, '-Fc', dbname]
|
|
else:
|
|
# Plain SQL format
|
|
cmd = ['pg_dump', '-h', host, '-U', user, dbname]
|
|
|
|
# Execute pg_dump and write to file
|
|
logger.info("📦 Executing: %s > %s", ' '.join(cmd), backup_path)
|
|
|
|
with open(backup_path, 'wb') as f:
|
|
result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, check=True, env=env)
|
|
|
|
# Calculate file size and checksum
|
|
file_size = backup_path.stat().st_size
|
|
checksum = self._calculate_checksum(backup_path)
|
|
|
|
# Calculate retention date
|
|
if is_monthly:
|
|
retention_until = datetime.now() + timedelta(days=settings.BACKUP_RETENTION_MONTHLY * 30)
|
|
else:
|
|
retention_until = datetime.now() + timedelta(days=settings.BACKUP_RETENTION_DAYS)
|
|
|
|
# Update job record
|
|
execute_update(
|
|
"""UPDATE backup_jobs
|
|
SET status = %s, completed_at = %s, file_path = %s,
|
|
file_size_bytes = %s, checksum_sha256 = %s, retention_until = %s
|
|
WHERE id = %s""",
|
|
('completed', datetime.now(), str(backup_path), file_size, checksum,
|
|
retention_until.date(), job_id)
|
|
)
|
|
|
|
logger.info("✅ Database backup completed: %s (%.2f MB)",
|
|
filename, file_size / 1024 / 1024)
|
|
|
|
return job_id
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
error_msg = e.stderr.decode() if e.stderr else str(e)
|
|
logger.error("❌ Database backup failed: %s", error_msg)
|
|
|
|
execute_update(
|
|
"""UPDATE backup_jobs
|
|
SET status = %s, completed_at = %s, error_message = %s
|
|
WHERE id = %s""",
|
|
('failed', datetime.now(), error_msg, job_id)
|
|
)
|
|
|
|
# Clean up partial backup file
|
|
if backup_path.exists():
|
|
backup_path.unlink()
|
|
|
|
return None
|
|
|
|
async def create_files_backup(self) -> Optional[int]:
|
|
"""
|
|
Create tar.gz backup of file directories (uploads/, data/, logs/)
|
|
|
|
Returns:
|
|
backup_job_id or None if failed
|
|
"""
|
|
if settings.BACKUP_DRY_RUN:
|
|
logger.info("🔄 DRY RUN: Would create files backup")
|
|
return None
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
filename = f"files_{timestamp}.tar.gz"
|
|
backup_path = self.files_dir / filename
|
|
|
|
# Paths to backup (relative to project root)
|
|
base_path = Path.cwd()
|
|
paths_to_backup = []
|
|
|
|
if settings.BACKUP_INCLUDE_UPLOADS:
|
|
uploads_path = base_path / settings.UPLOAD_DIR
|
|
if uploads_path.exists():
|
|
paths_to_backup.append((uploads_path, 'uploads'))
|
|
|
|
if settings.BACKUP_INCLUDE_DATA:
|
|
data_path = base_path / 'data'
|
|
if data_path.exists():
|
|
paths_to_backup.append((data_path, 'data'))
|
|
|
|
if settings.BACKUP_INCLUDE_LOGS:
|
|
logs_path = base_path / 'logs'
|
|
if logs_path.exists():
|
|
paths_to_backup.append((logs_path, 'logs'))
|
|
|
|
if not paths_to_backup:
|
|
logger.warning("⚠️ No file directories to backup")
|
|
return None
|
|
|
|
# Create backup job record
|
|
job_id = execute_insert(
|
|
"""INSERT INTO backup_jobs
|
|
(job_type, status, backup_format, includes_uploads, includes_logs, includes_data, started_at)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id""",
|
|
('files', 'running', 'tar.gz',
|
|
settings.BACKUP_INCLUDE_UPLOADS,
|
|
settings.BACKUP_INCLUDE_LOGS,
|
|
settings.BACKUP_INCLUDE_DATA,
|
|
datetime.now())
|
|
)
|
|
|
|
logger.info("🔄 Starting files backup: job_id=%s, paths=%s",
|
|
job_id, [name for _, name in paths_to_backup])
|
|
|
|
try:
|
|
# Exclude patterns
|
|
exclude_patterns = [
|
|
'__pycache__',
|
|
'*.pyc',
|
|
'*.pyo',
|
|
'*.pyd',
|
|
'.DS_Store',
|
|
'.git',
|
|
'backup', # Don't backup the backup directory itself!
|
|
]
|
|
|
|
# Create tar.gz archive
|
|
with tarfile.open(backup_path, 'w:gz') as tar:
|
|
for path, arcname in paths_to_backup:
|
|
tar.add(
|
|
path,
|
|
arcname=arcname,
|
|
recursive=True,
|
|
filter=lambda ti: None if any(
|
|
pattern in ti.name for pattern in exclude_patterns
|
|
) else ti
|
|
)
|
|
|
|
# Calculate file size and checksum
|
|
file_size = backup_path.stat().st_size
|
|
checksum = self._calculate_checksum(backup_path)
|
|
|
|
# Calculate retention date (files use daily retention)
|
|
retention_until = datetime.now() + timedelta(days=settings.BACKUP_RETENTION_DAYS)
|
|
|
|
# Update job record
|
|
execute_update(
|
|
"""UPDATE backup_jobs
|
|
SET status = %s, completed_at = %s, file_path = %s,
|
|
file_size_bytes = %s, checksum_sha256 = %s, retention_until = %s
|
|
WHERE id = %s""",
|
|
('completed', datetime.now(), str(backup_path), file_size, checksum,
|
|
retention_until.date(), job_id)
|
|
)
|
|
|
|
logger.info("✅ Files backup completed: %s (%.2f MB)",
|
|
filename, file_size / 1024 / 1024)
|
|
|
|
return job_id
|
|
|
|
except Exception as e:
|
|
logger.error("❌ Files backup failed: %s", str(e))
|
|
|
|
execute_update(
|
|
"""UPDATE backup_jobs
|
|
SET status = %s, completed_at = %s, error_message = %s
|
|
WHERE id = %s""",
|
|
('failed', datetime.now(), str(e), job_id)
|
|
)
|
|
|
|
# Clean up partial backup file
|
|
if backup_path.exists():
|
|
backup_path.unlink()
|
|
|
|
return None
|
|
|
|
async def create_full_backup(self, is_monthly: bool = False) -> Tuple[Optional[int], Optional[int]]:
|
|
"""
|
|
Create full backup (database + files)
|
|
|
|
Returns:
|
|
(db_job_id, files_job_id) tuple
|
|
"""
|
|
logger.info("🔄 Starting full backup (database + files)")
|
|
|
|
db_job_id = await self.create_database_backup(is_monthly=is_monthly)
|
|
files_job_id = await self.create_files_backup()
|
|
|
|
if db_job_id and files_job_id:
|
|
logger.info("✅ Full backup completed: db=%s, files=%s", db_job_id, files_job_id)
|
|
else:
|
|
logger.warning("⚠️ Full backup partially failed: db=%s, files=%s",
|
|
db_job_id, files_job_id)
|
|
|
|
return (db_job_id, files_job_id)
|
|
|
|
async def rotate_backups(self):
|
|
"""
|
|
Remove old backups based on retention policy:
|
|
- Daily backups: Keep for RETENTION_DAYS (default 30 days)
|
|
- Monthly backups: Keep for MONTHLY_KEEP_MONTHS (default 12 months)
|
|
"""
|
|
if settings.BACKUP_DRY_RUN:
|
|
logger.info("🔄 DRY RUN: Would rotate backups")
|
|
return
|
|
|
|
logger.info("🔄 Starting backup rotation")
|
|
|
|
# Find expired backups
|
|
expired_backups = execute_query_single(
|
|
"""SELECT id, file_path, is_monthly, retention_until
|
|
FROM backup_jobs
|
|
WHERE status = 'completed'
|
|
AND retention_until < CURRENT_DATE
|
|
ORDER BY retention_until ASC"""
|
|
)
|
|
|
|
deleted_count = 0
|
|
freed_bytes = 0
|
|
|
|
for backup in expired_backups:
|
|
file_path = Path(backup['file_path'])
|
|
|
|
if file_path.exists():
|
|
file_size = file_path.stat().st_size
|
|
file_path.unlink()
|
|
freed_bytes += file_size
|
|
logger.info("🗑️ Deleted expired backup: %s (%.2f MB, retention_until=%s)",
|
|
file_path.name, file_size / 1024 / 1024, backup['retention_until'])
|
|
|
|
# Delete from database
|
|
execute_update("DELETE FROM backup_jobs WHERE id = %s", (backup['id'],))
|
|
deleted_count += 1
|
|
|
|
if deleted_count > 0:
|
|
logger.info("✅ Rotation complete: deleted %d backups, freed %.2f MB",
|
|
deleted_count, freed_bytes / 1024 / 1024)
|
|
else:
|
|
logger.info("✅ Rotation complete: no expired backups")
|
|
|
|
async def restore_database(self, job_id: int) -> bool:
|
|
"""
|
|
Restore database from backup to NEW database with timestamp suffix
|
|
|
|
Strategy:
|
|
1. Create new database: bmc_hub_restored_YYYYMMDD_HHMMSS
|
|
2. Restore backup to NEW database (no conflicts!)
|
|
3. Return new database name in response
|
|
4. User updates .env to point to new database
|
|
5. Test system, then cleanup old database
|
|
|
|
Args:
|
|
job_id: Backup job ID to restore from
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
if settings.BACKUP_READ_ONLY:
|
|
logger.error("❌ Restore blocked: BACKUP_READ_ONLY=true")
|
|
return False
|
|
if settings.BACKUP_RESTORE_DRY_RUN:
|
|
logger.warning("🔄 DRY RUN MODE: Would restore database from backup job %s", job_id)
|
|
logger.warning("🔄 Set BACKUP_RESTORE_DRY_RUN=false to actually restore")
|
|
return False
|
|
# Get backup job
|
|
backup = execute_query_single(
|
|
"SELECT * FROM backup_jobs WHERE id = %s AND job_type = 'database'",
|
|
(job_id,))
|
|
|
|
if not backup:
|
|
logger.error("❌ Backup job not found: %s", job_id)
|
|
return False
|
|
|
|
backup_path = Path(backup['file_path'])
|
|
|
|
if not backup_path.exists():
|
|
logger.error("❌ Backup file not found: %s", backup_path)
|
|
return False
|
|
|
|
# Generate new database name with timestamp
|
|
from datetime import datetime
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
new_dbname = f"bmc_hub_restored_{timestamp}"
|
|
|
|
logger.info("🔄 Starting database restore from backup: %s", backup_path.name)
|
|
logger.info("🎯 Target: NEW database '%s' (safe restore!)", new_dbname)
|
|
|
|
# Enable maintenance mode
|
|
await self.set_maintenance_mode(True, "Database restore i gang", eta_minutes=5)
|
|
|
|
# TODO: Stop scheduler (will be implemented in scheduler.py)
|
|
|
|
try:
|
|
# Verify checksum
|
|
current_checksum = self._calculate_checksum(backup_path)
|
|
if current_checksum != backup['checksum_sha256']:
|
|
raise ValueError(f"Checksum mismatch! Expected {backup['checksum_sha256']}, got {current_checksum}")
|
|
|
|
logger.info("✅ Checksum verified")
|
|
|
|
# Acquire file lock to prevent concurrent operations
|
|
lock_file = self.backup_dir / ".restore.lock"
|
|
with open(lock_file, 'w') as lock_f:
|
|
fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX)
|
|
|
|
# Parse database connection info
|
|
env = os.environ.copy()
|
|
db_parts = settings.DATABASE_URL.replace('postgresql://', '').split('@')
|
|
user_pass = db_parts[0].split(':')
|
|
host_db = db_parts[1].split('/')
|
|
|
|
user = user_pass[0]
|
|
password = user_pass[1] if len(user_pass) > 1 else ''
|
|
host = host_db[0].split(':')[0] if ':' in host_db[0] else host_db[0]
|
|
dbname = host_db[1] if len(host_db) > 1 else 'bmc_hub'
|
|
|
|
env['PGPASSWORD'] = password
|
|
|
|
# Step 1: Create new empty database
|
|
logger.info("📦 Creating new database: %s", new_dbname)
|
|
create_cmd = ['psql', '-h', host, '-U', user, '-d', 'postgres', '-c',
|
|
f"CREATE DATABASE {new_dbname} OWNER {user};"]
|
|
result = subprocess.run(create_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
|
|
text=True, env=env)
|
|
|
|
if result.returncode != 0:
|
|
logger.error("❌ Failed to create database: %s", result.stderr)
|
|
fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN)
|
|
raise RuntimeError(f"CREATE DATABASE failed: {result.stderr}")
|
|
|
|
logger.info("✅ New database created: %s", new_dbname)
|
|
|
|
# Step 2: Restore to NEW database (no conflicts!)
|
|
# Build restore command based on format
|
|
if backup['backup_format'] == 'dump':
|
|
# Restore from compressed custom format
|
|
cmd = ['pg_restore', '-h', host, '-U', user, '-d', new_dbname]
|
|
|
|
logger.info("📥 Restoring to %s: %s < %s", new_dbname, ' '.join(cmd), backup_path)
|
|
|
|
with open(backup_path, 'rb') as f:
|
|
result = subprocess.run(cmd, stdin=f, stderr=subprocess.PIPE, text=True, env=env)
|
|
|
|
# pg_restore returns 1 even for warnings, check if there are real errors
|
|
if result.returncode != 0:
|
|
logger.warning("⚠️ pg_restore returned code %s", result.returncode)
|
|
if result.stderr:
|
|
logger.warning("pg_restore stderr: %s", result.stderr[:500])
|
|
|
|
# Check for real errors vs harmless config warnings
|
|
stderr_lower = result.stderr.lower() if result.stderr else ""
|
|
|
|
# Harmless errors to ignore
|
|
harmless_errors = [
|
|
"transaction_timeout", # Config parameter that may not exist in all PG versions
|
|
"idle_in_transaction_session_timeout" # Another version-specific parameter
|
|
]
|
|
|
|
# Check if errors are only harmless ones
|
|
is_harmless = any(err in stderr_lower for err in harmless_errors)
|
|
has_real_errors = "error:" in stderr_lower and not all(
|
|
err in stderr_lower for err in harmless_errors
|
|
)
|
|
|
|
if has_real_errors and not is_harmless:
|
|
logger.error("❌ pg_restore had REAL errors: %s", result.stderr[:1000])
|
|
# Try to drop the failed database
|
|
subprocess.run(['psql', '-h', host, '-U', user, '-d', 'postgres', '-c',
|
|
f"DROP DATABASE IF EXISTS {new_dbname};"], env=env)
|
|
raise RuntimeError(f"pg_restore failed with errors")
|
|
else:
|
|
logger.info("✅ Restore completed (harmless config warnings ignored)")
|
|
|
|
else:
|
|
# Restore from plain SQL
|
|
cmd = ['psql', '-h', host, '-U', user, '-d', new_dbname]
|
|
|
|
logger.info("📥 Executing: %s < %s", ' '.join(cmd), backup_path)
|
|
|
|
with open(backup_path, 'rb') as f:
|
|
result = subprocess.run(cmd, stdin=f, stderr=subprocess.PIPE, text=True, env=env)
|
|
|
|
if result.returncode != 0:
|
|
logger.error("❌ psql stderr: %s", result.stderr)
|
|
raise RuntimeError(f"psql failed with code {result.returncode}")
|
|
|
|
# Release file lock
|
|
fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN)
|
|
|
|
logger.info("✅ Database restore completed successfully to: %s", new_dbname)
|
|
logger.info("🔧 NEXT STEPS:")
|
|
logger.info(" 1. Update .env: DATABASE_URL=postgresql://%s:%s@%s:5432/%s",
|
|
user, "***", host, new_dbname)
|
|
logger.info(" 2. Restart: docker-compose restart api")
|
|
logger.info(" 3. Test system thoroughly")
|
|
logger.info(" 4. If OK, cleanup old database:")
|
|
logger.info(" docker exec bmc-hub-postgres psql -U %s -d postgres -c 'DROP DATABASE %s;'",
|
|
user, dbname)
|
|
logger.info(" docker exec bmc-hub-postgres psql -U %s -d postgres -c 'ALTER DATABASE %s RENAME TO %s;'",
|
|
user, new_dbname, dbname)
|
|
logger.info(" 5. Revert .env and restart")
|
|
|
|
# Store new database name in notification for user
|
|
execute_insert(
|
|
"""INSERT INTO backup_notifications (backup_job_id, event_type, message)
|
|
VALUES (%s, %s, %s) RETURNING id""",
|
|
(job_id, 'backup_success',
|
|
f'✅ Database restored to: {new_dbname}\n'
|
|
f'Update .env: DATABASE_URL=postgresql://{user}:PASSWORD@{host}:5432/{new_dbname}')
|
|
)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error("❌ Database restore failed: %s", str(e))
|
|
return False
|
|
|
|
finally:
|
|
# Disable maintenance mode
|
|
await self.set_maintenance_mode(False)
|
|
|
|
# TODO: Restart scheduler (will be implemented in scheduler.py)
|
|
|
|
# Clean up lock file
|
|
if lock_file.exists():
|
|
lock_file.unlink()
|
|
|
|
async def restore_files(self, job_id: int) -> bool:
|
|
"""
|
|
Restore files from tar.gz backup
|
|
|
|
Args:
|
|
job_id: Backup job ID to restore from
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
if settings.BACKUP_READ_ONLY:
|
|
logger.error("❌ Restore blocked: BACKUP_READ_ONLY=true")
|
|
return False
|
|
|
|
if settings.BACKUP_RESTORE_DRY_RUN:
|
|
logger.warning("🔄 DRY RUN MODE: Would restore files from backup job %s", job_id)
|
|
logger.warning("🔄 Set BACKUP_RESTORE_DRY_RUN=false to actually restore")
|
|
return False
|
|
|
|
# Get backup job
|
|
backup = execute_query_single(
|
|
"SELECT * FROM backup_jobs WHERE id = %s AND job_type = 'files'",
|
|
(job_id,))
|
|
|
|
if not backup:
|
|
logger.error("❌ Backup job not found: %s", job_id)
|
|
return False
|
|
|
|
backup_path = Path(backup['file_path'])
|
|
|
|
if not backup_path.exists():
|
|
logger.error("❌ Backup file not found: %s", backup_path)
|
|
return False
|
|
|
|
logger.info("🔄 Starting files restore from backup: %s", backup_path.name)
|
|
|
|
try:
|
|
# Verify checksum
|
|
current_checksum = self._calculate_checksum(backup_path)
|
|
if current_checksum != backup['checksum_sha256']:
|
|
raise ValueError(f"Checksum mismatch! Expected {backup['checksum_sha256']}, got {current_checksum}")
|
|
|
|
logger.info("✅ Checksum verified")
|
|
|
|
# Acquire file lock
|
|
lock_file = self.backup_dir / ".restore_files.lock"
|
|
with open(lock_file, 'w') as f:
|
|
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
|
|
|
|
# Extract tar.gz to project root
|
|
base_path = Path.cwd()
|
|
|
|
with tarfile.open(backup_path, 'r:gz') as tar:
|
|
# Extract all files, excluding backup directory
|
|
members = [m for m in tar.getmembers() if 'backup' not in m.name]
|
|
tar.extractall(path=base_path, members=members)
|
|
|
|
# Release file lock
|
|
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
|
|
|
|
logger.info("✅ Files restore completed successfully")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error("❌ Files restore failed: %s", str(e))
|
|
return False
|
|
|
|
finally:
|
|
# Clean up lock file
|
|
if lock_file.exists():
|
|
lock_file.unlink()
|
|
|
|
async def upload_offsite(self, job_id: int) -> bool:
|
|
"""
|
|
Upload backup to offsite location via SFTP/SSH
|
|
|
|
Args:
|
|
job_id: Backup job ID to upload
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
if not settings.OFFSITE_ENABLED:
|
|
logger.info("⏭️ Offsite upload disabled")
|
|
return False
|
|
|
|
if settings.BACKUP_DRY_RUN:
|
|
logger.info("🔄 DRY RUN: Would upload backup to offsite")
|
|
return False
|
|
|
|
# Get backup job
|
|
backup = execute_query_single(
|
|
"SELECT * FROM backup_jobs WHERE id = %s",
|
|
(job_id,))
|
|
|
|
if not backup:
|
|
logger.error("❌ Backup job not found: %s", job_id)
|
|
return False
|
|
|
|
if backup['offsite_uploaded_at']:
|
|
logger.info("⏭️ Backup already uploaded to offsite: %s", job_id)
|
|
return True
|
|
|
|
backup_path = Path(backup['file_path'])
|
|
|
|
if not backup_path.exists():
|
|
logger.error("❌ Backup file not found: %s", backup_path)
|
|
return False
|
|
|
|
logger.info("☁️ Starting offsite upload: %s to %s:%s",
|
|
backup_path.name, settings.SFTP_HOST, settings.SFTP_REMOTE_PATH)
|
|
|
|
try:
|
|
# Connect via SFTP
|
|
transport = paramiko.Transport((settings.SFTP_HOST, settings.SFTP_PORT))
|
|
|
|
if settings.SSH_KEY_PATH:
|
|
# Use SSH key authentication
|
|
private_key = paramiko.RSAKey.from_private_key_file(settings.SSH_KEY_PATH)
|
|
transport.connect(username=settings.SFTP_USER, pkey=private_key)
|
|
else:
|
|
# Use password authentication
|
|
transport.connect(username=settings.SFTP_USER, password=settings.SFTP_PASSWORD)
|
|
|
|
sftp = paramiko.SFTPClient.from_transport(transport)
|
|
|
|
# Create remote directory if needed
|
|
remote_path = settings.SFTP_REMOTE_PATH
|
|
if remote_path and remote_path not in ('.', '/', ''):
|
|
logger.info("📁 Ensuring remote directory exists: %s", remote_path)
|
|
self._ensure_remote_directory(sftp, remote_path)
|
|
logger.info("✅ Remote directory ready")
|
|
|
|
# Upload file
|
|
remote_file = f"{remote_path}/{backup_path.name}"
|
|
logger.info("📤 Uploading to: %s", remote_file)
|
|
sftp.put(str(backup_path), remote_file)
|
|
logger.info("✅ Upload completed")
|
|
|
|
# Verify upload
|
|
remote_stat = sftp.stat(remote_file)
|
|
local_size = backup_path.stat().st_size
|
|
|
|
if remote_stat.st_size != local_size:
|
|
raise ValueError(f"Upload verification failed: remote size {remote_stat.st_size} != local size {local_size}")
|
|
|
|
# Close connection
|
|
sftp.close()
|
|
transport.close()
|
|
|
|
# Update job record
|
|
execute_update(
|
|
"""UPDATE backup_jobs
|
|
SET offsite_uploaded_at = %s, offsite_retry_count = 0
|
|
WHERE id = %s""",
|
|
(datetime.now(), job_id)
|
|
)
|
|
|
|
logger.info("✅ Offsite upload completed: %s", backup_path.name)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error("❌ Offsite upload failed: %s", str(e))
|
|
|
|
# Increment retry count
|
|
execute_update(
|
|
"""UPDATE backup_jobs
|
|
SET offsite_retry_count = offsite_retry_count + 1
|
|
WHERE id = %s""",
|
|
(job_id,)
|
|
)
|
|
|
|
return False
|
|
|
|
async def check_storage_usage(self) -> Dict[str, any]:
|
|
"""
|
|
Check backup storage usage and warn if exceeding threshold
|
|
|
|
Returns:
|
|
Dict with storage statistics
|
|
"""
|
|
total_size = 0
|
|
file_count = 0
|
|
|
|
for backup_file in self.backup_dir.rglob('*'):
|
|
if backup_file.is_file() and not backup_file.name.startswith('.'):
|
|
total_size += backup_file.stat().st_size
|
|
file_count += 1
|
|
|
|
max_size_bytes = settings.BACKUP_MAX_SIZE_GB * 1024 * 1024 * 1024
|
|
usage_pct = (total_size / max_size_bytes) * 100 if max_size_bytes > 0 else 0
|
|
|
|
stats = {
|
|
'total_size_bytes': total_size,
|
|
'total_size_gb': total_size / 1024 / 1024 / 1024,
|
|
'max_size_gb': settings.BACKUP_MAX_SIZE_GB,
|
|
'usage_pct': usage_pct,
|
|
'file_count': file_count,
|
|
'warning': usage_pct >= settings.STORAGE_WARNING_THRESHOLD_PCT
|
|
}
|
|
|
|
if stats['warning']:
|
|
logger.warning("⚠️ Backup storage usage high: %.1f%% (%.2f GB / %d GB)",
|
|
usage_pct, stats['total_size_gb'], settings.BACKUP_MAX_SIZE_GB)
|
|
|
|
# Log notification
|
|
execute_insert(
|
|
"""INSERT INTO backup_notifications (event_type, message)
|
|
VALUES (%s, %s) RETURNING id""",
|
|
('storage_low',
|
|
f"Backup storage usage at {usage_pct:.1f}% ({stats['total_size_gb']:.2f} GB / {settings.BACKUP_MAX_SIZE_GB} GB)")
|
|
)
|
|
|
|
return stats
|
|
|
|
async def set_maintenance_mode(self, enabled: bool, message: str = None, eta_minutes: int = None):
|
|
"""
|
|
Enable or disable system maintenance mode
|
|
|
|
Args:
|
|
enabled: True to enable maintenance mode, False to disable
|
|
message: Custom maintenance message
|
|
eta_minutes: Estimated time to completion in minutes
|
|
"""
|
|
if message is None:
|
|
message = "System under vedligeholdelse" if enabled else ""
|
|
|
|
execute_update(
|
|
"""UPDATE system_status
|
|
SET maintenance_mode = %s, maintenance_message = %s,
|
|
maintenance_eta_minutes = %s, updated_at = %s
|
|
WHERE id = 1""",
|
|
(enabled, message, eta_minutes, datetime.now())
|
|
)
|
|
|
|
if enabled:
|
|
logger.warning("🔧 Maintenance mode ENABLED: %s (ETA: %s min)", message, eta_minutes)
|
|
else:
|
|
logger.info("✅ Maintenance mode DISABLED")
|
|
|
|
def _calculate_checksum(self, file_path: Path) -> str:
|
|
"""Calculate SHA256 checksum of file"""
|
|
sha256_hash = hashlib.sha256()
|
|
|
|
with open(file_path, "rb") as f:
|
|
for byte_block in iter(lambda: f.read(4096), b""):
|
|
sha256_hash.update(byte_block)
|
|
|
|
return sha256_hash.hexdigest()
|
|
|
|
def _ensure_remote_directory(self, sftp: paramiko.SFTPClient, path: str):
|
|
"""Create remote directory if it doesn't exist (recursive)"""
|
|
# Skip if path is root or current directory
|
|
if not path or path in ('.', '/', ''):
|
|
return
|
|
|
|
# Try to stat the directory
|
|
try:
|
|
sftp.stat(path)
|
|
logger.info("✅ Directory exists: %s", path)
|
|
return
|
|
except FileNotFoundError:
|
|
# Directory doesn't exist, create it
|
|
try:
|
|
# Try to create parent directory first
|
|
parent = os.path.dirname(path)
|
|
if parent and parent != path:
|
|
self._ensure_remote_directory(sftp, parent)
|
|
|
|
# Create this directory
|
|
sftp.mkdir(path)
|
|
logger.info("📁 Created remote directory: %s", path)
|
|
except Exception as e:
|
|
logger.warning("⚠️ Could not create directory %s: %s", path, str(e))
|
|
|
|
|
|
# Module-level singleton. Note: instantiation happens at import time and
# creates the backup directory tree (see BackupService.__init__).
backup_service = BackupService()
|