- Add SFTP upload support with paramiko
- Add database columns for offsite tracking (status, location, attempts, error)
- Add manual upload endpoint /api/v1/backups/offsite/{job_id}
- Add frontend button for offsite upload
- Add SFTP configuration in config.py
- Fix infinite loop in _ensure_remote_directory for relative paths
- Add upload verification and retry mechanism
- Add progress tracking and logging
556 lines · 20 KiB · Python
"""
|
|
Backup System API Router
|
|
REST endpoints for backup management
|
|
"""
|
|
|
|
import logging
|
|
from typing import List, Optional
|
|
from datetime import datetime, date, timedelta
|
|
from pathlib import Path
|
|
from fastapi import APIRouter, HTTPException, Query, UploadFile, File
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.core.database import execute_query, execute_update, execute_insert, execute_query_single
|
|
from app.core.config import settings
|
|
from app.backups.backend.service import backup_service
|
|
from app.backups.backend.notifications import notifications
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# Pydantic Models
|
|
class BackupCreate(BaseModel):
    """Request model for creating a new backup.

    Attributes:
        job_type: Kind of backup to run ("database", "files" or "full").
        is_monthly: When true, database backups use the plain SQL format
            (and, per the upload endpoint, get the longer monthly retention).
    """
    job_type: str = Field(..., description="Type of backup: database, files, or full")
    is_monthly: bool = Field(False, description="Create monthly backup (uses SQL format)")
class BackupJob(BaseModel):
    """Response model mirroring one row of the backup_jobs table."""
    id: int
    job_type: str                       # database, files or full
    status: str                         # pending, running, completed, failed
    backup_format: str                  # dump, sql or tar.gz
    file_path: Optional[str]            # absolute path of the stored artifact
    file_size_bytes: Optional[int]
    checksum_sha256: Optional[str]
    is_monthly: bool                    # monthly backups use SQL format / long retention
    # Content flags for file backups
    includes_uploads: bool
    includes_logs: bool
    includes_data: bool
    started_at: Optional[datetime]
    completed_at: Optional[datetime]
    error_message: Optional[str]
    retention_until: Optional[date]     # date after which the backup may be pruned
    # Offsite upload tracking
    offsite_uploaded_at: Optional[datetime]
    offsite_retry_count: int
    notification_sent: bool
    created_at: datetime
    updated_at: datetime
class RestoreRequest(BaseModel):
    """Request model for restoring from backup.

    Attributes:
        confirmation: Explicit opt-in flag; the restore endpoint rejects the
            request with HTTP 400 unless this is true.
        message: Optional free-text reason, logged for auditing.
    """
    confirmation: bool = Field(..., description="Must be true to confirm restore operation")
    message: Optional[str] = Field(None, description="Optional restore reason/notes")
class MaintenanceStatus(BaseModel):
    """Response model for maintenance mode status (system_status row, id=1).

    Consumed by the frontend to decide whether to show a maintenance overlay.
    """
    maintenance_mode: bool                   # true while the system is in maintenance
    maintenance_message: str                 # text displayed to users ("" when none)
    maintenance_eta_minutes: Optional[int]   # estimated remaining downtime, if known
    updated_at: datetime
class NotificationRecord(BaseModel):
    """Response model mirroring one row of the backup_notifications table."""
    id: int
    backup_job_id: Optional[int]        # related backup job, if any
    event_type: str
    message: str
    sent_at: datetime
    acknowledged: bool                  # set via the acknowledge endpoint
    acknowledged_at: Optional[datetime]
class StorageStats(BaseModel):
    """Response model for storage statistics.

    Produced by backup_service.check_storage_usage().
    """
    total_size_bytes: int
    total_size_gb: float
    max_size_gb: int        # configured storage cap
    usage_pct: float        # percentage of max_size_gb in use
    file_count: int
    # NOTE(review): presumably true when usage nears the cap — confirm the
    # threshold in backup_service.check_storage_usage.
    warning: bool


# API Endpoints
@router.post("/backups", response_model=dict, tags=["Backups"])
|
|
async def create_backup(backup: BackupCreate):
|
|
"""
|
|
Create a new backup manually
|
|
|
|
- **job_type**: database, files, or full
|
|
- **is_monthly**: Use plain SQL format for database (monthly backups)
|
|
"""
|
|
if not settings.BACKUP_ENABLED:
|
|
raise HTTPException(status_code=503, detail="Backup system is disabled (BACKUP_ENABLED=false)")
|
|
|
|
logger.info("📦 Manual backup requested: type=%s, monthly=%s", backup.job_type, backup.is_monthly)
|
|
|
|
try:
|
|
if backup.job_type == 'database':
|
|
job_id = await backup_service.create_database_backup(is_monthly=backup.is_monthly)
|
|
if job_id:
|
|
return {"success": True, "job_id": job_id, "message": "Database backup created successfully"}
|
|
else:
|
|
raise HTTPException(status_code=500, detail="Database backup failed - check logs")
|
|
|
|
elif backup.job_type == 'files':
|
|
job_id = await backup_service.create_files_backup()
|
|
if job_id:
|
|
return {"success": True, "job_id": job_id, "message": "Files backup created successfully"}
|
|
else:
|
|
raise HTTPException(status_code=500, detail="Files backup failed - check logs")
|
|
|
|
elif backup.job_type == 'full':
|
|
db_job_id, files_job_id = await backup_service.create_full_backup(is_monthly=backup.is_monthly)
|
|
if db_job_id and files_job_id:
|
|
return {
|
|
"success": True,
|
|
"db_job_id": db_job_id,
|
|
"files_job_id": files_job_id,
|
|
"message": "Full backup created successfully"
|
|
}
|
|
else:
|
|
raise HTTPException(status_code=500, detail=f"Full backup partially failed: db={db_job_id}, files={files_job_id}")
|
|
|
|
else:
|
|
raise HTTPException(status_code=400, detail="Invalid job_type. Must be: database, files, or full")
|
|
|
|
except Exception as e:
|
|
logger.error("❌ Manual backup error: %s", str(e), exc_info=True)
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/backups/jobs", response_model=List[BackupJob], tags=["Backups"])
|
|
async def list_backups(
|
|
status: Optional[str] = Query(None, description="Filter by status: pending, running, completed, failed"),
|
|
job_type: Optional[str] = Query(None, description="Filter by type: database, files, full"),
|
|
limit: int = Query(50, ge=1, le=500),
|
|
offset: int = Query(0, ge=0)
|
|
):
|
|
"""
|
|
List backup jobs with optional filtering and pagination
|
|
"""
|
|
# Build query
|
|
query = "SELECT * FROM backup_jobs WHERE 1=1"
|
|
params = []
|
|
|
|
if status:
|
|
query += " AND status = %s"
|
|
params.append(status)
|
|
|
|
if job_type:
|
|
query += " AND job_type = %s"
|
|
params.append(job_type)
|
|
|
|
query += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
|
|
params.extend([limit, offset])
|
|
|
|
backups = execute_query(query, tuple(params))
|
|
|
|
return backups if backups else []
|
|
|
|
|
|
@router.get("/backups/jobs/{job_id}", response_model=BackupJob, tags=["Backups"])
|
|
async def get_backup(job_id: int):
|
|
"""Get details of a specific backup job"""
|
|
backup = execute_query(
|
|
"SELECT * FROM backup_jobs WHERE id = %s",
|
|
(job_id,))
|
|
|
|
if not backup:
|
|
raise HTTPException(status_code=404, detail=f"Backup job {job_id} not found")
|
|
|
|
return backup
|
|
|
|
|
|
@router.post("/backups/upload", response_model=dict, tags=["Backups"])
|
|
async def upload_backup(
|
|
file: UploadFile = File(...),
|
|
backup_type: str = Query(..., description="Type: database or files"),
|
|
is_monthly: bool = Query(False, description="Mark as monthly backup")
|
|
):
|
|
"""
|
|
Upload a previously downloaded backup file
|
|
|
|
Validates file format and creates backup job record
|
|
"""
|
|
if settings.BACKUP_READ_ONLY:
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail="Upload blocked: BACKUP_READ_ONLY=true"
|
|
)
|
|
|
|
logger.info("📤 Backup upload: filename=%s, type=%s, size=%d bytes",
|
|
file.filename, backup_type, file.size if hasattr(file, 'size') else 0)
|
|
|
|
# Validate file type
|
|
allowed_extensions = {
|
|
'database': ['.dump', '.sql', '.sql.gz'],
|
|
'files': ['.tar.gz', '.tgz']
|
|
}
|
|
|
|
if backup_type not in allowed_extensions:
|
|
raise HTTPException(status_code=400, detail="Invalid backup_type. Must be: database or files")
|
|
|
|
file_ext = ''.join(Path(file.filename).suffixes)
|
|
if file_ext not in allowed_extensions[backup_type]:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid file extension '{file_ext}' for type '{backup_type}'. Allowed: {allowed_extensions[backup_type]}"
|
|
)
|
|
|
|
try:
|
|
# Determine storage path and format
|
|
backup_dir = Path(settings.BACKUP_STORAGE_PATH)
|
|
|
|
if backup_type == 'database':
|
|
target_dir = backup_dir / "database"
|
|
backup_format = 'dump' if file_ext == '.dump' else 'sql'
|
|
else:
|
|
target_dir = backup_dir / "files"
|
|
backup_format = 'tar.gz'
|
|
|
|
target_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Generate unique filename with timestamp
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
original_name = Path(file.filename).stem
|
|
new_filename = f"{original_name}_uploaded_{timestamp}{file_ext}"
|
|
target_path = target_dir / new_filename
|
|
|
|
# Save uploaded file
|
|
logger.info("💾 Saving upload to: %s", target_path)
|
|
|
|
content = await file.read()
|
|
with open(target_path, 'wb') as f:
|
|
f.write(content)
|
|
|
|
file_size = target_path.stat().st_size
|
|
|
|
# Calculate checksum
|
|
import hashlib
|
|
checksum = hashlib.sha256(content).hexdigest()
|
|
|
|
logger.info("✅ File saved: %d bytes, checksum=%s", file_size, checksum[:16])
|
|
|
|
# Calculate retention date
|
|
if is_monthly:
|
|
retention_until = datetime.now() + timedelta(days=settings.BACKUP_RETENTION_MONTHLY * 30)
|
|
else:
|
|
retention_until = datetime.now() + timedelta(days=settings.BACKUP_RETENTION_DAYS)
|
|
|
|
# Create backup job record
|
|
job_id = execute_insert(
|
|
"""INSERT INTO backup_jobs
|
|
(job_type, status, backup_format, file_path, file_size_bytes,
|
|
checksum_sha256, is_monthly, started_at, completed_at, retention_until)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id""",
|
|
(backup_type, 'completed', backup_format, str(target_path), file_size,
|
|
checksum, is_monthly, datetime.now(), datetime.now(), retention_until.date())
|
|
)
|
|
|
|
logger.info("✅ Backup upload registered: job_id=%s", job_id)
|
|
|
|
return {
|
|
"success": True,
|
|
"job_id": job_id,
|
|
"message": f"Backup uploaded successfully: {new_filename}",
|
|
"file_size_mb": round(file_size / 1024 / 1024, 2),
|
|
"checksum": checksum
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("❌ Upload failed: %s", str(e), exc_info=True)
|
|
# Clean up partial file
|
|
if target_path.exists():
|
|
target_path.unlink()
|
|
raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
|
|
|
|
|
|
@router.post("/backups/restore/{job_id}", response_model=dict, tags=["Backups"])
|
|
async def restore_backup(job_id: int, request: RestoreRequest):
|
|
"""
|
|
Restore from a backup (database or files)
|
|
|
|
**WARNING**: This will enable maintenance mode and temporarily shut down the system
|
|
"""
|
|
if not request.confirmation:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Restore operation requires confirmation=true"
|
|
)
|
|
|
|
if settings.BACKUP_READ_ONLY:
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail="Restore blocked: BACKUP_READ_ONLY=true. Update configuration to enable restores."
|
|
)
|
|
|
|
# Get backup job
|
|
backup = execute_query_single(
|
|
"SELECT * FROM backup_jobs WHERE id = %s",
|
|
(job_id,))
|
|
|
|
if not backup:
|
|
raise HTTPException(status_code=404, detail=f"Backup job {job_id} not found")
|
|
|
|
if backup['status'] != 'completed':
|
|
raise HTTPException(status_code=400, detail=f"Cannot restore from backup with status: {backup['status']}")
|
|
|
|
logger.warning("🔧 Restore initiated: job_id=%s, type=%s, user_message=%s",
|
|
job_id, backup['job_type'], request.message)
|
|
|
|
# Check if DRY-RUN mode is enabled
|
|
if settings.BACKUP_RESTORE_DRY_RUN:
|
|
logger.warning("🔒 DRY RUN MODE: Restore test requested but not executed")
|
|
return {
|
|
"success": True,
|
|
"dry_run": True,
|
|
"message": "DRY-RUN mode: Restore was NOT executed. Set BACKUP_RESTORE_DRY_RUN=false to actually restore.",
|
|
"job_id": job_id,
|
|
"job_type": backup['job_type']
|
|
}
|
|
|
|
try:
|
|
# Send notification
|
|
await notifications.send_restore_started(
|
|
job_id=job_id,
|
|
backup_name=backup['file_path'].split('/')[-1],
|
|
eta_minutes=5
|
|
)
|
|
|
|
# Perform restore based on type
|
|
if backup['job_type'] == 'database':
|
|
success = await backup_service.restore_database(job_id)
|
|
if success:
|
|
# Get the new database name from logs (created with timestamp)
|
|
from datetime import datetime
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
new_dbname = f"bmc_hub_restored_{timestamp}"
|
|
|
|
# Parse current DATABASE_URL to get credentials
|
|
db_url = settings.DATABASE_URL
|
|
if '@' in db_url:
|
|
creds = db_url.split('@')[0].replace('postgresql://', '')
|
|
host_part = db_url.split('@')[1]
|
|
new_url = f"postgresql://{creds}@{host_part.split('/')[0]}/{new_dbname}"
|
|
else:
|
|
new_url = f"postgresql://bmc_hub:bmc_hub@postgres:5432/{new_dbname}"
|
|
|
|
logger.info("✅ Restore completed successfully: job_id=%s", job_id)
|
|
return {
|
|
"success": True,
|
|
"message": "Database restored to NEW database (safe!)",
|
|
"new_database": new_dbname,
|
|
"instructions": [
|
|
f"1. Update .env: DATABASE_URL={new_url}",
|
|
"2. Restart: docker-compose restart api",
|
|
"3. Test system thoroughly",
|
|
"4. If OK: Drop old DB, rename new DB to 'bmc_hub'",
|
|
"5. If NOT OK: Just revert .env and restart"
|
|
]
|
|
}
|
|
elif backup['job_type'] == 'files':
|
|
success = await backup_service.restore_files(job_id)
|
|
if success:
|
|
logger.info("✅ Files restore completed: job_id=%s", job_id)
|
|
return {"success": True, "message": "Files restore completed successfully"}
|
|
elif backup['job_type'] == 'full':
|
|
# Restore both database and files
|
|
db_success = await backup_service.restore_database(job_id)
|
|
files_success = await backup_service.restore_files(job_id)
|
|
success = db_success and files_success
|
|
if success:
|
|
logger.info("✅ Full restore completed: job_id=%s", job_id)
|
|
return {"success": True, "message": "Full restore completed - check logs for database name"}
|
|
else:
|
|
raise HTTPException(status_code=400, detail=f"Unknown backup type: {backup['job_type']}")
|
|
|
|
# If we get here, restore failed
|
|
logger.error("❌ Restore failed: job_id=%s", job_id)
|
|
raise HTTPException(status_code=500, detail="Restore operation failed - check logs")
|
|
|
|
except Exception as e:
|
|
logger.error("❌ Restore error: %s", str(e), exc_info=True)
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.delete("/backups/jobs/{job_id}", response_model=dict, tags=["Backups"])
async def delete_backup(job_id: int):
    """
    Delete a backup job and its associated file

    Raises:
        HTTPException: 404 if the job does not exist, 500 when the file or
            record deletion fails.
    """
    # Get backup job
    backup = execute_query_single(
        "SELECT * FROM backup_jobs WHERE id = %s",
        (job_id,))

    if not backup:
        raise HTTPException(status_code=404, detail=f"Backup job {job_id} not found")

    logger.info("🗑️ Deleting backup: job_id=%s, file=%s", job_id, backup['file_path'])

    try:
        # Delete file if exists. (The previous local `from pathlib import
        # Path` was redundant — Path is imported at module level.)
        if backup['file_path']:
            file_path = Path(backup['file_path'])
            if file_path.exists():
                file_path.unlink()
                logger.info("✅ Deleted backup file: %s", file_path.name)

        # Delete database record
        execute_update("DELETE FROM backup_jobs WHERE id = %s", (job_id,))

        return {"success": True, "message": f"Backup {job_id} deleted successfully"}

    except Exception as e:
        logger.error("❌ Delete backup error: %s", str(e), exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/backups/offsite/{job_id}", response_model=dict, tags=["Backups"])
|
|
async def upload_offsite(job_id: int):
|
|
"""
|
|
Manually trigger offsite upload for a specific backup
|
|
"""
|
|
if not settings.OFFSITE_ENABLED:
|
|
raise HTTPException(status_code=503, detail="Offsite uploads are disabled (OFFSITE_ENABLED=false)")
|
|
|
|
logger.info("☁️ Manual offsite upload requested: job_id=%s", job_id)
|
|
|
|
try:
|
|
success = await backup_service.upload_offsite(job_id)
|
|
|
|
if success:
|
|
return {"success": True, "message": f"Backup {job_id} uploaded to offsite successfully"}
|
|
else:
|
|
raise HTTPException(status_code=500, detail="Offsite upload failed - check logs")
|
|
|
|
except Exception as e:
|
|
logger.error("❌ Manual offsite upload error: %s", str(e), exc_info=True)
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/backups/maintenance", response_model=MaintenanceStatus, tags=["System"])
|
|
async def get_maintenance_status():
|
|
"""
|
|
Get current maintenance mode status
|
|
|
|
Used by frontend to display maintenance overlay
|
|
"""
|
|
status = execute_query_single(
|
|
"SELECT * FROM system_status WHERE id = 1")
|
|
|
|
if not status:
|
|
# Return default status if not found
|
|
return {
|
|
"maintenance_mode": False,
|
|
"maintenance_message": "",
|
|
"maintenance_eta_minutes": None,
|
|
"updated_at": datetime.now()
|
|
}
|
|
|
|
return status
|
|
|
|
|
|
@router.get("/backups/notifications", response_model=List[NotificationRecord], tags=["Backups"])
|
|
async def list_notifications(
|
|
acknowledged: Optional[bool] = Query(None, description="Filter by acknowledged status"),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
offset: int = Query(0, ge=0)
|
|
):
|
|
"""
|
|
List backup notifications (alerts, warnings, errors)
|
|
"""
|
|
query = "SELECT * FROM backup_notifications WHERE 1=1"
|
|
params = []
|
|
|
|
if acknowledged is not None:
|
|
query += " AND acknowledged = %s"
|
|
params.append(acknowledged)
|
|
|
|
query += " ORDER BY sent_at DESC LIMIT %s OFFSET %s"
|
|
params.extend([limit, offset])
|
|
|
|
notifications_list = execute_query(query, tuple(params))
|
|
|
|
return notifications_list if notifications_list else []
|
|
|
|
|
|
@router.post("/backups/notifications/{notification_id}/acknowledge", response_model=dict, tags=["Backups"])
|
|
async def acknowledge_notification(notification_id: int):
|
|
"""
|
|
Acknowledge a notification (mark as read)
|
|
"""
|
|
execute_update(
|
|
"""UPDATE backup_notifications
|
|
SET acknowledged = true, acknowledged_at = %s
|
|
WHERE id = %s""",
|
|
(datetime.now(), notification_id)
|
|
)
|
|
|
|
return {"success": True, "message": f"Notification {notification_id} acknowledged"}
|
|
|
|
|
|
@router.get("/backups/storage", response_model=StorageStats, tags=["System"])
|
|
async def get_storage_stats():
|
|
"""
|
|
Get backup storage usage statistics
|
|
"""
|
|
stats = await backup_service.check_storage_usage()
|
|
return stats
|
|
|
|
|
|
@router.get("/backups/scheduler/status", response_model=dict, tags=["System"])
|
|
async def get_scheduler_status():
|
|
"""
|
|
Get backup scheduler status and job information
|
|
"""
|
|
try:
|
|
from app.backups.backend.scheduler import backup_scheduler
|
|
|
|
if not backup_scheduler.running:
|
|
return {
|
|
"enabled": settings.BACKUP_ENABLED,
|
|
"running": False,
|
|
"message": "Backup scheduler is not running"
|
|
}
|
|
|
|
jobs = []
|
|
for job in backup_scheduler.scheduler.get_jobs():
|
|
jobs.append({
|
|
"id": job.id,
|
|
"name": job.name,
|
|
"next_run": job.next_run_time.isoformat() if job.next_run_time else None,
|
|
})
|
|
|
|
return {
|
|
"enabled": settings.BACKUP_ENABLED,
|
|
"running": backup_scheduler.running,
|
|
"jobs": jobs
|
|
}
|
|
except Exception as e:
|
|
logger.warning("Scheduler not available: %s", str(e))
|
|
return {
|
|
"enabled": settings.BACKUP_ENABLED,
|
|
"running": False,
|
|
"message": f"Scheduler error: {str(e)}"
|
|
}
|