diff --git a/.env.example b/.env.example index f24f769..5e124c5 100644 --- a/.env.example +++ b/.env.example @@ -74,6 +74,9 @@ TIMETRACKING_ROUND_INCREMENT=0.5 # Afrundingsinterval (0.25, 0.5, 1.0) TIMETRACKING_ROUND_METHOD=up # up (op til), nearest (nærmeste), down (ned til) TIMETRACKING_REQUIRE_APPROVAL=true # Kræv manuel godkendelse +# Order Management Security +TIMETRACKING_ADMIN_UNLOCK_CODE= # 🔐 Admin kode til at låse eksporterede ordrer op (sæt en stærk kode!) + # ===================================================== # OLLAMA AI Integration (Optional - for document extraction) # ===================================================== diff --git a/Dockerfile b/Dockerfile index fac1eb7..85839ee 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,6 +8,7 @@ RUN apt-get update && apt-get install -y \ git \ libpq-dev \ gcc \ + postgresql-client \ tesseract-ocr \ tesseract-ocr-dan \ tesseract-ocr-eng \ diff --git a/app/backups/__init__.py b/app/backups/__init__.py new file mode 100644 index 0000000..cfa5645 --- /dev/null +++ b/app/backups/__init__.py @@ -0,0 +1,10 @@ +""" +Backup System Module for BMC Hub + +Provides automated backup and restore functionality for: +- PostgreSQL database (compressed .dump + plain .sql) +- File storage (uploads/, data/, logs/) +- Automated rotation (30 days + monthly for 12 months) +- Offsite upload via SFTP/SSH +- Mattermost notifications +""" diff --git a/app/backups/backend/__init__.py b/app/backups/backend/__init__.py new file mode 100644 index 0000000..a135cce --- /dev/null +++ b/app/backups/backend/__init__.py @@ -0,0 +1 @@ +"""Backup backend services, API routes, and scheduler.""" diff --git a/app/backups/backend/notifications.py b/app/backups/backend/notifications.py new file mode 100644 index 0000000..d15c995 --- /dev/null +++ b/app/backups/backend/notifications.py @@ -0,0 +1,394 @@ +""" +Notification Service for Backup System +Sends rich formatted notifications to Mattermost webhook +""" + +import logging +import aiohttp +from 
class MattermostNotification:
    """Service for sending Mattermost webhook notifications"""

    def __init__(self):
        """Read the webhook URL, the enabled flag and the target channel from settings."""
        self.webhook_url = settings.MATTERMOST_WEBHOOK_URL
        self.enabled = settings.MATTERMOST_ENABLED
        self.channel = settings.MATTERMOST_CHANNEL

    async def send_backup_success(self, job_id: int, job_type: str, file_size_bytes: int,
                                  duration_seconds: float, checksum: str, is_monthly: bool = False):
        """
        Send success notification for completed backup

        Returns without sending anything when
        `_should_send_notification('backup_success')` is falsy; otherwise the
        payload is handed to `_send_webhook`.

        Args:
            job_id: Backup job ID
            job_type: database, files, or full
            file_size_bytes: Size of backup file
            duration_seconds: Time taken to complete backup
            checksum: SHA256 checksum of backup. A missing/empty value is
                shown as "n/a"; values of 32 characters or fewer are shown
                in full instead of being abbreviated.
            is_monthly: Whether this is a monthly backup
        """
        if not self._should_send_notification('backup_success'):
            return

        file_size_mb = file_size_bytes / 1024 / 1024
        job_label = job_type.capitalize()

        # Abbreviate only when head and tail cannot overlap; never let a
        # missing checksum turn a successful backup into a crashed notification.
        if not checksum:
            checksum_display = "n/a"
        else:
            checksum = str(checksum)
            if len(checksum) > 32:
                checksum_display = f"{checksum[:16]}...{checksum[-16:]}"
            else:
                checksum_display = checksum

        payload = {
            "channel": self.channel,
            "username": "BMC Hub Backup",
            "icon_emoji": ":white_check_mark:",
            "text": f"✅ **Backup Complete** - {job_label}",
            "attachments": [{
                "color": "#36a64f",  # Green
                "fields": [
                    {
                        "short": True,
                        "title": "Job ID",
                        "value": f"#{job_id}"
                    },
                    {
                        "short": True,
                        "title": "Type",
                        "value": f"{job_label} {'(Monthly)' if is_monthly else '(Daily)'}"
                    },
                    {
                        "short": True,
                        "title": "Size",
                        "value": f"{file_size_mb:.2f} MB"
                    },
                    {
                        "short": True,
                        "title": "Duration",
                        "value": f"{duration_seconds:.1f}s"
                    },
                    {
                        "short": False,
                        "title": "Checksum (SHA256)",
                        "value": f"`{checksum_display}`"
                    }
                ],
                "footer": "BMC Hub Backup System",
                "footer_icon": "https://platform.slack-edge.com/img/default_application_icon.png",
                # Epoch seconds at the moment the notification is built.
                "ts": int(datetime.now().timestamp())
            }]
        }

        await self._send_webhook(payload, 'backup_success', job_id)
def send_backup_failed(self, job_id: int, job_type: str, error_message: str): + """ + Send failure notification for failed backup + + Args: + job_id: Backup job ID + job_type: database, files, or full + error_message: Error message from failed backup + """ + if not self._should_send_notification('backup_failed'): + return + + payload = { + "channel": self.channel, + "username": "BMC Hub Backup", + "icon_emoji": ":x:", + "text": f"❌ **Backup Failed** - {job_type.capitalize()}", + "attachments": [{ + "color": "#ff0000", # Red + "fields": [ + { + "short": True, + "title": "Job ID", + "value": f"#{job_id}" + }, + { + "short": True, + "title": "Type", + "value": job_type.capitalize() + }, + { + "short": False, + "title": "Error", + "value": f"```{error_message[:500]}```" + } + ], + "actions": [ + { + "name": "view_dashboard", + "type": "button", + "text": "View Dashboard", + "url": f"{self._get_hub_url()}/backups" + } + ], + "footer": "BMC Hub Backup System", + "ts": int(datetime.now().timestamp()) + }] + } + + await self._send_webhook(payload, 'backup_failed', job_id) + + async def send_offsite_success(self, job_id: int, backup_name: str, file_size_bytes: int): + """ + Send notification for successful offsite upload + + Args: + job_id: Backup job ID + backup_name: Name of uploaded backup file + file_size_bytes: Size of uploaded file + """ + if not settings.NOTIFY_ON_SUCCESS_OFFSITE: + return + + file_size_mb = file_size_bytes / 1024 / 1024 + + payload = { + "channel": self.channel, + "username": "BMC Hub Backup", + "icon_emoji": ":cloud:", + "text": f"☁️ **Offsite Upload Complete**", + "attachments": [{ + "color": "#0066cc", # Blue + "fields": [ + { + "short": True, + "title": "Job ID", + "value": f"#{job_id}" + }, + { + "short": True, + "title": "Destination", + "value": f"{settings.SFTP_HOST}:{settings.SFTP_REMOTE_PATH}" + }, + { + "short": True, + "title": "Filename", + "value": backup_name + }, + { + "short": True, + "title": "Size", + "value": f"{file_size_mb:.2f} 
MB" + } + ], + "footer": "BMC Hub Backup System", + "ts": int(datetime.now().timestamp()) + }] + } + + await self._send_webhook(payload, 'backup_success', job_id) + + async def send_offsite_failed(self, job_id: int, backup_name: str, error_message: str, + retry_count: int): + """ + Send notification for failed offsite upload + + Args: + job_id: Backup job ID + backup_name: Name of backup file + error_message: Error message + retry_count: Current retry attempt number + """ + if not self._should_send_notification('offsite_failed'): + return + + max_retries = settings.OFFSITE_RETRY_MAX_ATTEMPTS + + payload = { + "channel": self.channel, + "username": "BMC Hub Backup", + "icon_emoji": ":warning:", + "text": f"⚠️ **Offsite Upload Failed** (Retry {retry_count}/{max_retries})", + "attachments": [{ + "color": "#ff9900", # Orange + "fields": [ + { + "short": True, + "title": "Job ID", + "value": f"#{job_id}" + }, + { + "short": True, + "title": "Destination", + "value": f"{settings.SFTP_HOST}:{settings.SFTP_REMOTE_PATH}" + }, + { + "short": True, + "title": "Filename", + "value": backup_name + }, + { + "short": True, + "title": "Retry Status", + "value": f"{retry_count}/{max_retries}" + }, + { + "short": False, + "title": "Error", + "value": f"```{error_message[:300]}```" + } + ], + "footer": "BMC Hub Backup System", + "ts": int(datetime.now().timestamp()) + }] + } + + await self._send_webhook(payload, 'offsite_failed', job_id) + + async def send_storage_warning(self, usage_pct: float, used_gb: float, max_gb: int): + """ + Send notification for high storage usage + + Args: + usage_pct: Percentage of storage used + used_gb: Gigabytes used + max_gb: Maximum storage capacity + """ + if not self._should_send_notification('storage_low'): + return + + payload = { + "channel": self.channel, + "username": "BMC Hub Backup", + "icon_emoji": ":warning:", + "text": f"⚠️ **Backup Storage Warning**", + "attachments": [{ + "color": "#ff9900", # Orange + "fields": [ + { + "short": True, + 
"title": "Usage", + "value": f"{usage_pct:.1f}%" + }, + { + "short": True, + "title": "Space Used", + "value": f"{used_gb:.2f} GB / {max_gb} GB" + }, + { + "short": False, + "title": "Recommendation", + "value": "Consider running backup rotation or increasing storage capacity." + } + ], + "actions": [ + { + "name": "view_dashboard", + "type": "button", + "text": "View Dashboard", + "url": f"{self._get_hub_url()}/backups" + } + ], + "footer": "BMC Hub Backup System", + "ts": int(datetime.now().timestamp()) + }] + } + + await self._send_webhook(payload, 'storage_low') + + async def send_restore_started(self, job_id: int, backup_name: str, eta_minutes: int): + """ + Send notification when restore operation starts + + Args: + job_id: Backup job ID being restored + backup_name: Name of backup file + eta_minutes: Estimated time to completion + """ + payload = { + "channel": self.channel, + "username": "BMC Hub Backup", + "icon_emoji": ":gear:", + "text": f"🔧 **System Maintenance: Restore in Progress**", + "attachments": [{ + "color": "#ffcc00", # Yellow + "fields": [ + { + "short": True, + "title": "Backup Job ID", + "value": f"#{job_id}" + }, + { + "short": True, + "title": "ETA", + "value": f"{eta_minutes} minutes" + }, + { + "short": False, + "title": "Backup File", + "value": backup_name + }, + { + "short": False, + "title": "Status", + "value": "System is in maintenance mode. All services temporarily unavailable." 
+ } + ], + "footer": "BMC Hub Backup System", + "ts": int(datetime.now().timestamp()) + }] + } + + await self._send_webhook(payload, 'restore_started', job_id) + + async def _send_webhook(self, payload: Dict, event_type: str, job_id: Optional[int] = None): + """ + Send webhook to Mattermost and log notification + + Args: + payload: Mattermost webhook payload + event_type: Type of notification event + job_id: Optional backup job ID + """ + if not self.enabled or not self.webhook_url: + logger.info("📢 Notification (disabled): %s - job_id=%s", event_type, job_id) + return + + try: + async with aiohttp.ClientSession() as session: + async with session.post(self.webhook_url, json=payload, timeout=10) as response: + if response.status == 200: + logger.info("📢 Notification sent: %s - job_id=%s", event_type, job_id) + + # Log to database + execute_insert( + """INSERT INTO backup_notifications + (backup_job_id, event_type, message, mattermost_payload) + VALUES (%s, %s, %s, %s)""", + (job_id, event_type, payload.get('text', ''), str(payload)) + ) + else: + error_text = await response.text() + logger.error("❌ Notification failed: HTTP %s - %s", + response.status, error_text) + + except aiohttp.ClientError as e: + logger.error("❌ Notification connection error: %s", str(e)) + except Exception as e: + logger.error("❌ Notification error: %s", str(e)) + + def _should_send_notification(self, event_type: str) -> bool: + """Check if notification should be sent based on settings""" + if not self.enabled or not self.webhook_url: + return False + + if event_type in ['backup_failed', 'offsite_failed', 'storage_low']: + return settings.NOTIFY_ON_FAILURE + + if event_type == 'backup_success' and 'offsite' in event_type: + return settings.NOTIFY_ON_SUCCESS_OFFSITE + + return True + + def _get_hub_url(self) -> str: + """Get BMC Hub base URL for action buttons""" + # TODO: Add HUB_BASE_URL to config + return "http://localhost:8000" # Fallback + + +# Singleton instance +notifications = 
MattermostNotification() diff --git a/app/backups/backend/router.py b/app/backups/backend/router.py new file mode 100644 index 0000000..3e94a91 --- /dev/null +++ b/app/backups/backend/router.py @@ -0,0 +1,513 @@ +""" +Backup System API Router +REST endpoints for backup management +""" + +import logging +from typing import List, Optional +from datetime import datetime, date, timedelta +from pathlib import Path +from fastapi import APIRouter, HTTPException, Query, UploadFile, File +from pydantic import BaseModel, Field + +from app.core.database import execute_query, execute_update, execute_insert +from app.core.config import settings +from app.backups.backend.service import backup_service +from app.backups.backend.notifications import notifications + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +# Pydantic Models +class BackupCreate(BaseModel): + """Request model for creating a new backup""" + job_type: str = Field(..., description="Type of backup: database, files, or full") + is_monthly: bool = Field(False, description="Create monthly backup (uses SQL format)") + + +class BackupJob(BaseModel): + """Response model for backup job""" + id: int + job_type: str + status: str + backup_format: str + file_path: Optional[str] + file_size_bytes: Optional[int] + checksum_sha256: Optional[str] + is_monthly: bool + includes_uploads: bool + includes_logs: bool + includes_data: bool + started_at: Optional[datetime] + completed_at: Optional[datetime] + error_message: Optional[str] + retention_until: Optional[date] + offsite_uploaded_at: Optional[datetime] + offsite_retry_count: int + notification_sent: bool + created_at: datetime + updated_at: datetime + + +class RestoreRequest(BaseModel): + """Request model for restoring from backup""" + confirmation: bool = Field(..., description="Must be true to confirm restore operation") + message: Optional[str] = Field(None, description="Optional restore reason/notes") + + +class MaintenanceStatus(BaseModel): + 
"""Response model for maintenance mode status""" + maintenance_mode: bool + maintenance_message: str + maintenance_eta_minutes: Optional[int] + updated_at: datetime + + +class NotificationRecord(BaseModel): + """Response model for notification record""" + id: int + backup_job_id: Optional[int] + event_type: str + message: str + sent_at: datetime + acknowledged: bool + acknowledged_at: Optional[datetime] + + +class StorageStats(BaseModel): + """Response model for storage statistics""" + total_size_bytes: int + total_size_gb: float + max_size_gb: int + usage_pct: float + file_count: int + warning: bool + + +# API Endpoints + +@router.post("/backups", response_model=dict, tags=["Backups"]) +async def create_backup(backup: BackupCreate): + """ + Create a new backup manually + + - **job_type**: database, files, or full + - **is_monthly**: Use plain SQL format for database (monthly backups) + """ + if not settings.BACKUP_ENABLED: + raise HTTPException(status_code=503, detail="Backup system is disabled (BACKUP_ENABLED=false)") + + logger.info("📦 Manual backup requested: type=%s, monthly=%s", backup.job_type, backup.is_monthly) + + try: + if backup.job_type == 'database': + job_id = await backup_service.create_database_backup(is_monthly=backup.is_monthly) + if job_id: + return {"success": True, "job_id": job_id, "message": "Database backup created successfully"} + else: + raise HTTPException(status_code=500, detail="Database backup failed - check logs") + + elif backup.job_type == 'files': + job_id = await backup_service.create_files_backup() + if job_id: + return {"success": True, "job_id": job_id, "message": "Files backup created successfully"} + else: + raise HTTPException(status_code=500, detail="Files backup failed - check logs") + + elif backup.job_type == 'full': + db_job_id, files_job_id = await backup_service.create_full_backup(is_monthly=backup.is_monthly) + if db_job_id and files_job_id: + return { + "success": True, + "db_job_id": db_job_id, + "files_job_id": 
files_job_id, + "message": "Full backup created successfully" + } + else: + raise HTTPException(status_code=500, detail=f"Full backup partially failed: db={db_job_id}, files={files_job_id}") + + else: + raise HTTPException(status_code=400, detail="Invalid job_type. Must be: database, files, or full") + + except Exception as e: + logger.error("❌ Manual backup error: %s", str(e), exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/backups/jobs", response_model=List[BackupJob], tags=["Backups"]) +async def list_backups( + status: Optional[str] = Query(None, description="Filter by status: pending, running, completed, failed"), + job_type: Optional[str] = Query(None, description="Filter by type: database, files, full"), + limit: int = Query(50, ge=1, le=500), + offset: int = Query(0, ge=0) +): + """ + List backup jobs with optional filtering and pagination + """ + # Build query + query = "SELECT * FROM backup_jobs WHERE 1=1" + params = [] + + if status: + query += " AND status = %s" + params.append(status) + + if job_type: + query += " AND job_type = %s" + params.append(job_type) + + query += " ORDER BY created_at DESC LIMIT %s OFFSET %s" + params.extend([limit, offset]) + + backups = execute_query(query, tuple(params)) + + return backups if backups else [] + + +@router.get("/backups/jobs/{job_id}", response_model=BackupJob, tags=["Backups"]) +async def get_backup(job_id: int): + """Get details of a specific backup job""" + backup = execute_query( + "SELECT * FROM backup_jobs WHERE id = %s", + (job_id,), + fetchone=True + ) + + if not backup: + raise HTTPException(status_code=404, detail=f"Backup job {job_id} not found") + + return backup + + +@router.post("/backups/upload", response_model=dict, tags=["Backups"]) +async def upload_backup( + file: UploadFile = File(...), + backup_type: str = Query(..., description="Type: database or files"), + is_monthly: bool = Query(False, description="Mark as monthly backup") +): + """ + Upload a 
previously downloaded backup file + + Validates file format and creates backup job record + """ + if settings.BACKUP_READ_ONLY: + raise HTTPException( + status_code=403, + detail="Upload blocked: BACKUP_READ_ONLY=true" + ) + + logger.info("📤 Backup upload: filename=%s, type=%s, size=%d bytes", + file.filename, backup_type, file.size if hasattr(file, 'size') else 0) + + # Validate file type + allowed_extensions = { + 'database': ['.dump', '.sql', '.sql.gz'], + 'files': ['.tar.gz', '.tgz'] + } + + if backup_type not in allowed_extensions: + raise HTTPException(status_code=400, detail="Invalid backup_type. Must be: database or files") + + file_ext = ''.join(Path(file.filename).suffixes) + if file_ext not in allowed_extensions[backup_type]: + raise HTTPException( + status_code=400, + detail=f"Invalid file extension '{file_ext}' for type '{backup_type}'. Allowed: {allowed_extensions[backup_type]}" + ) + + try: + # Determine storage path and format + backup_dir = Path(settings.BACKUP_STORAGE_PATH) + + if backup_type == 'database': + target_dir = backup_dir / "database" + backup_format = 'dump' if file_ext == '.dump' else 'sql' + else: + target_dir = backup_dir / "files" + backup_format = 'tar.gz' + + target_dir.mkdir(parents=True, exist_ok=True) + + # Generate unique filename with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + original_name = Path(file.filename).stem + new_filename = f"{original_name}_uploaded_{timestamp}{file_ext}" + target_path = target_dir / new_filename + + # Save uploaded file + logger.info("💾 Saving upload to: %s", target_path) + + content = await file.read() + with open(target_path, 'wb') as f: + f.write(content) + + file_size = target_path.stat().st_size + + # Calculate checksum + import hashlib + checksum = hashlib.sha256(content).hexdigest() + + logger.info("✅ File saved: %d bytes, checksum=%s", file_size, checksum[:16]) + + # Calculate retention date + if is_monthly: + retention_until = datetime.now() + 
@router.post("/backups/restore/{job_id}", response_model=dict, tags=["Backups"])
async def restore_backup(job_id: int, request: RestoreRequest):
    """
    Restore from a backup (database or files)

    **WARNING**: This will enable maintenance mode and temporarily shut down the system
    """
    # Responses: 400 (no confirmation, backup not completed, unknown type),
    # 403 (read-only mode), 404 (job not found), 500 (restore failed/raised).
    if not request.confirmation:
        raise HTTPException(
            status_code=400,
            detail="Restore operation requires confirmation=true"
        )

    if settings.BACKUP_READ_ONLY:
        raise HTTPException(
            status_code=403,
            detail="Restore blocked: BACKUP_READ_ONLY=true. Update configuration to enable restores."
        )

    # Get backup job
    backup = execute_query(
        "SELECT * FROM backup_jobs WHERE id = %s",
        (job_id,),
        fetchone=True
    )

    if not backup:
        raise HTTPException(status_code=404, detail=f"Backup job {job_id} not found")

    if backup['status'] != 'completed':
        raise HTTPException(status_code=400, detail=f"Cannot restore from backup with status: {backup['status']}")

    # Reject unknown types before announcing the restore, and outside the
    # try block so the 400 is not turned into a 500.
    if backup['job_type'] not in ('database', 'files', 'full'):
        raise HTTPException(status_code=400, detail=f"Unknown backup type: {backup['job_type']}")

    logger.warning("🔧 Restore initiated: job_id=%s, type=%s, user_message=%s",
                   job_id, backup['job_type'], request.message)

    # file_path may be NULL on the job row; avoid crashing on it here.
    file_path = backup['file_path']
    backup_name = Path(file_path).name if file_path else f"backup job {job_id}"

    try:
        # Send notification
        await notifications.send_restore_started(
            job_id=job_id,
            backup_name=backup_name,
            eta_minutes=5
        )

        # Perform restore based on type
        if backup['job_type'] == 'database':
            success = await backup_service.restore_database(job_id)
        elif backup['job_type'] == 'files':
            success = await backup_service.restore_files(job_id)
        else:
            # 'full': restore both database and files; the files restore is
            # attempted even when the database restore reports failure.
            db_success = await backup_service.restore_database(job_id)
            files_success = await backup_service.restore_files(job_id)
            success = db_success and files_success

    except Exception as e:
        logger.error("❌ Restore error: %s", str(e), exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e

    if not success:
        logger.error("❌ Restore failed: job_id=%s", job_id)
        raise HTTPException(status_code=500, detail="Restore operation failed - check logs")

    logger.info("✅ Restore completed successfully: job_id=%s", job_id)
    return {"success": True, "message": "Restore completed successfully"}
@router.post("/backups/offsite/{job_id}", response_model=dict, tags=["Backups"])
async def upload_offsite(job_id: int):
    """
    Manually trigger offsite upload for a specific backup
    """
    # Responses: 503 when offsite uploads are disabled, 500 when the upload
    # reports failure or raises.
    if not settings.OFFSITE_ENABLED:
        raise HTTPException(status_code=503, detail="Offsite uploads are disabled (OFFSITE_ENABLED=false)")

    logger.info("☁️ Manual offsite upload requested: job_id=%s", job_id)

    # Only the service call is guarded, so the failure response below is not
    # caught and re-wrapped by the generic handler.
    try:
        success = await backup_service.upload_offsite(job_id)
    except Exception as e:
        logger.error("❌ Manual offsite upload error: %s", str(e), exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e

    if not success:
        raise HTTPException(status_code=500, detail="Offsite upload failed - check logs")

    return {"success": True, "message": f"Backup {job_id} uploaded to offsite successfully"}
"", + "maintenance_eta_minutes": None, + "updated_at": datetime.now() + } + + return status + + +@router.get("/backups/notifications", response_model=List[NotificationRecord], tags=["Backups"]) +async def list_notifications( + acknowledged: Optional[bool] = Query(None, description="Filter by acknowledged status"), + limit: int = Query(50, ge=1, le=200), + offset: int = Query(0, ge=0) +): + """ + List backup notifications (alerts, warnings, errors) + """ + query = "SELECT * FROM backup_notifications WHERE 1=1" + params = [] + + if acknowledged is not None: + query += " AND acknowledged = %s" + params.append(acknowledged) + + query += " ORDER BY sent_at DESC LIMIT %s OFFSET %s" + params.extend([limit, offset]) + + notifications_list = execute_query(query, tuple(params)) + + return notifications_list if notifications_list else [] + + +@router.post("/backups/notifications/{notification_id}/acknowledge", response_model=dict, tags=["Backups"]) +async def acknowledge_notification(notification_id: int): + """ + Acknowledge a notification (mark as read) + """ + execute_update( + """UPDATE backup_notifications + SET acknowledged = true, acknowledged_at = %s + WHERE id = %s""", + (datetime.now(), notification_id) + ) + + return {"success": True, "message": f"Notification {notification_id} acknowledged"} + + +@router.get("/backups/storage", response_model=StorageStats, tags=["System"]) +async def get_storage_stats(): + """ + Get backup storage usage statistics + """ + stats = await backup_service.check_storage_usage() + return stats + + +@router.get("/backups/scheduler/status", response_model=dict, tags=["System"]) +async def get_scheduler_status(): + """ + Get backup scheduler status and job information + """ + from app.backups.backend.scheduler import backup_scheduler + + if not backup_scheduler.running: + return { + "enabled": settings.BACKUP_ENABLED, + "running": False, + "message": "Backup scheduler is not running" + } + + jobs = [] + for job in 
backup_scheduler.scheduler.get_jobs(): + jobs.append({ + "id": job.id, + "name": job.name, + "next_run": job.next_run_time.isoformat() if job.next_run_time else None, + }) + + return { + "enabled": settings.BACKUP_ENABLED, + "running": backup_scheduler.running, + "jobs": jobs + } diff --git a/app/backups/backend/scheduler.py b/app/backups/backend/scheduler.py new file mode 100644 index 0000000..b654182 --- /dev/null +++ b/app/backups/backend/scheduler.py @@ -0,0 +1,401 @@ +""" +Backup Scheduler +Manages scheduled backup jobs, rotation, offsite uploads, and retry logic +""" + +import logging +from datetime import datetime, time +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger + +from app.core.config import settings +from app.core.database import execute_query +from app.backups.backend.service import backup_service +from app.backups.backend.notifications import notifications + +logger = logging.getLogger(__name__) + + +class BackupScheduler: + """Scheduler for automated backup operations""" + + def __init__(self): + self.scheduler = AsyncIOScheduler() + self.enabled = settings.BACKUP_ENABLED + self.running = False + + def start(self): + """Start the backup scheduler with all jobs""" + if not self.enabled: + logger.info("⏭️ Backup scheduler disabled (BACKUP_ENABLED=false)") + return + + if self.running: + logger.warning("⚠️ Backup scheduler already running") + return + + logger.info("🚀 Starting backup scheduler...") + + # Daily full backup at 02:00 CET + self.scheduler.add_job( + func=self._daily_backup_job, + trigger=CronTrigger(hour=2, minute=0), + id='daily_backup', + name='Daily Full Backup', + max_instances=1, + replace_existing=True + ) + logger.info("✅ Scheduled: Daily backup at 02:00") + + # Monthly backup on the 1st at 02:00 CET + self.scheduler.add_job( + func=self._monthly_backup_job, + trigger=CronTrigger(day=1, hour=2, minute=0), + 
id='monthly_backup', + name='Monthly Full Backup (SQL format)', + max_instances=1, + replace_existing=True + ) + logger.info("✅ Scheduled: Monthly backup on 1st at 02:00") + + # Weekly offsite upload (configurable day, default Sunday at 03:00) + offsite_day = self._get_weekday_number(settings.OFFSITE_WEEKLY_DAY) + self.scheduler.add_job( + func=self._offsite_upload_job, + trigger=CronTrigger(day_of_week=offsite_day, hour=3, minute=0), + id='offsite_upload', + name=f'Weekly Offsite Upload ({settings.OFFSITE_WEEKLY_DAY.capitalize()})', + max_instances=1, + replace_existing=True + ) + logger.info("✅ Scheduled: Weekly offsite upload on %s at 03:00", + settings.OFFSITE_WEEKLY_DAY.capitalize()) + + # Offsite retry job (every hour) + self.scheduler.add_job( + func=self._offsite_retry_job, + trigger=IntervalTrigger(hours=settings.OFFSITE_RETRY_DELAY_HOURS), + id='offsite_retry', + name='Offsite Upload Retry', + max_instances=1, + replace_existing=True + ) + logger.info("✅ Scheduled: Offsite retry every %d hour(s)", + settings.OFFSITE_RETRY_DELAY_HOURS) + + # Backup rotation (daily at 01:00) + self.scheduler.add_job( + func=self._rotation_job, + trigger=CronTrigger(hour=1, minute=0), + id='backup_rotation', + name='Backup Rotation', + max_instances=1, + replace_existing=True + ) + logger.info("✅ Scheduled: Backup rotation at 01:00") + + # Storage check (daily at 01:30) + self.scheduler.add_job( + func=self._storage_check_job, + trigger=CronTrigger(hour=1, minute=30), + id='storage_check', + name='Storage Usage Check', + max_instances=1, + replace_existing=True + ) + logger.info("✅ Scheduled: Storage check at 01:30") + + # Start the scheduler + self.scheduler.start() + self.running = True + + logger.info("✅ Backup scheduler started successfully") + + def stop(self): + """Stop the backup scheduler""" + if not self.running: + return + + logger.info("🛑 Stopping backup scheduler...") + self.scheduler.shutdown(wait=True) + self.running = False + logger.info("✅ Backup scheduler 
stopped") + + def pause(self): + """Pause all scheduled jobs (for maintenance)""" + if not self.running: + return + + logger.info("⏸️ Pausing backup scheduler...") + self.scheduler.pause() + logger.info("✅ Backup scheduler paused") + + def resume(self): + """Resume all scheduled jobs""" + if not self.running: + return + + logger.info("▶️ Resuming backup scheduler...") + self.scheduler.resume() + logger.info("✅ Backup scheduler resumed") + + async def _daily_backup_job(self): + """Daily full backup job (database + files)""" + logger.info("🔄 Starting daily backup job...") + + try: + start_time = datetime.now() + + # Create full backup (database uses compressed .dump format) + db_job_id, files_job_id = await backup_service.create_full_backup(is_monthly=False) + + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + + if db_job_id and files_job_id: + logger.info("✅ Daily backup completed: db=%s, files=%s (%.1fs)", + db_job_id, files_job_id, duration) + + # Send success notification for database backup + db_backup = execute_query( + "SELECT * FROM backup_jobs WHERE id = %s", + (db_job_id,), + fetchone=True + ) + + if db_backup: + await notifications.send_backup_success( + job_id=db_job_id, + job_type='database', + file_size_bytes=db_backup['file_size_bytes'], + duration_seconds=duration / 2, # Rough estimate + checksum=db_backup['checksum_sha256'], + is_monthly=False + ) + else: + logger.error("❌ Daily backup failed: db=%s, files=%s", db_job_id, files_job_id) + + # Send failure notification + if not db_job_id: + await notifications.send_backup_failed( + job_id=0, + job_type='database', + error_message='Database backup failed - see logs for details' + ) + + if not files_job_id: + await notifications.send_backup_failed( + job_id=0, + job_type='files', + error_message='Files backup failed - see logs for details' + ) + + except Exception as e: + logger.error("❌ Daily backup job error: %s", str(e), exc_info=True) + await 
notifications.send_backup_failed( + job_id=0, + job_type='full', + error_message=str(e) + ) + + async def _monthly_backup_job(self): + """Monthly full backup job (database uses plain SQL format)""" + logger.info("🔄 Starting monthly backup job...") + + try: + start_time = datetime.now() + + # Create full backup with is_monthly=True (uses plain SQL format) + db_job_id, files_job_id = await backup_service.create_full_backup(is_monthly=True) + + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + + if db_job_id and files_job_id: + logger.info("✅ Monthly backup completed: db=%s, files=%s (%.1fs)", + db_job_id, files_job_id, duration) + + # Send success notification for database backup + db_backup = execute_query( + "SELECT * FROM backup_jobs WHERE id = %s", + (db_job_id,), + fetchone=True + ) + + if db_backup: + await notifications.send_backup_success( + job_id=db_job_id, + job_type='database', + file_size_bytes=db_backup['file_size_bytes'], + duration_seconds=duration / 2, + checksum=db_backup['checksum_sha256'], + is_monthly=True + ) + else: + logger.error("❌ Monthly backup failed: db=%s, files=%s", db_job_id, files_job_id) + + await notifications.send_backup_failed( + job_id=0, + job_type='monthly', + error_message='Monthly backup failed - see logs for details' + ) + + except Exception as e: + logger.error("❌ Monthly backup job error: %s", str(e), exc_info=True) + await notifications.send_backup_failed( + job_id=0, + job_type='monthly', + error_message=str(e) + ) + + async def _offsite_upload_job(self): + """Weekly offsite upload job - uploads all backups not yet uploaded""" + if not settings.OFFSITE_ENABLED: + logger.info("⏭️ Offsite upload skipped (OFFSITE_ENABLED=false)") + return + + logger.info("☁️ Starting weekly offsite upload job...") + + try: + # Find all completed backups not yet uploaded + pending_backups = execute_query( + """SELECT * FROM backup_jobs + WHERE status = 'completed' + AND offsite_uploaded_at IS NULL + AND 
offsite_retry_count < %s + ORDER BY completed_at ASC""", + (settings.OFFSITE_RETRY_MAX_ATTEMPTS,) + ) + + if not pending_backups: + logger.info("✅ No pending backups for offsite upload") + return + + logger.info("📦 Found %d backups pending offsite upload", len(pending_backups)) + + success_count = 0 + fail_count = 0 + + for backup in pending_backups: + success = await backup_service.upload_offsite(backup['id']) + + if success: + success_count += 1 + + # Send success notification + await notifications.send_offsite_success( + job_id=backup['id'], + backup_name=backup['file_path'].split('/')[-1], + file_size_bytes=backup['file_size_bytes'] + ) + else: + fail_count += 1 + + # Get updated retry count + updated_backup = execute_query( + "SELECT offsite_retry_count FROM backup_jobs WHERE id = %s", + (backup['id'],), + fetchone=True + ) + + # Send failure notification + await notifications.send_offsite_failed( + job_id=backup['id'], + backup_name=backup['file_path'].split('/')[-1], + error_message='Offsite upload failed - will retry', + retry_count=updated_backup['offsite_retry_count'] + ) + + logger.info("✅ Offsite upload job completed: %d success, %d failed", + success_count, fail_count) + + except Exception as e: + logger.error("❌ Offsite upload job error: %s", str(e), exc_info=True) + + async def _offsite_retry_job(self): + """Retry failed offsite uploads""" + if not settings.OFFSITE_ENABLED: + return + + # Find backups that failed offsite upload and haven't exceeded retry limit + retry_backups = execute_query( + """SELECT * FROM backup_jobs + WHERE status = 'completed' + AND offsite_uploaded_at IS NULL + AND offsite_retry_count > 0 + AND offsite_retry_count < %s + ORDER BY offsite_retry_count ASC, completed_at ASC + LIMIT 5""", # Limit to 5 retries per run + (settings.OFFSITE_RETRY_MAX_ATTEMPTS,) + ) + + if not retry_backups: + return + + logger.info("🔁 Retrying %d failed offsite uploads...", len(retry_backups)) + + for backup in retry_backups: + logger.info("🔁 Retry 
def _get_weekday_number(self, day_name: str) -> int:
    """Convert a day name to an APScheduler weekday number (0=Monday, 6=Sunday).

    Matching is case-insensitive and ignores surrounding whitespace. Both the
    full English name ("monday") and its three-letter abbreviation ("mon")
    are accepted.

    Args:
        day_name: Name of the weekday, typically read from configuration.

    Returns:
        The weekday number, or 6 (Sunday) when the value is missing, not a
        string, or not a recognised day name.
    """
    default_day = 6  # Sunday

    # A misconfigured (None / non-string) setting must not crash the scheduler.
    if not isinstance(day_name, str):
        return default_day

    names = (
        'monday',
        'tuesday',
        'wednesday',
        'thursday',
        'friday',
        'saturday',
        'sunday',
    )
    key = day_name.strip().lower()

    for number, name in enumerate(names):
        if key == name or key == name[:3]:
            return number

    return default_day
async def create_database_backup(self, is_monthly: bool = False) -> Optional[int]:
    """
    Create a PostgreSQL database backup using pg_dump.

    A `backup_jobs` row is inserted with status 'running' before pg_dump is
    started and is updated to 'completed' or 'failed' afterwards. On any
    failure the partially written backup file is removed.

    Args:
        is_monthly: If True, use the monthly format/retention settings
            (DB_MONTHLY_FORMAT, MONTHLY_KEEP_MONTHS); otherwise the daily ones.

    Returns:
        backup_job_id, or None if the backup failed or BACKUP_DRY_RUN is set.
    """
    # Local import: urllib.parse is not part of this module's import block.
    from urllib.parse import unquote, urlsplit

    if settings.BACKUP_DRY_RUN:
        logger.info("🔄 DRY RUN: Would create database backup (monthly=%s)", is_monthly)
        return None

    # Determine format based on monthly flag
    backup_format = settings.DB_MONTHLY_FORMAT if is_monthly else settings.DB_DAILY_FORMAT
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"db_{timestamp}_{'monthly' if is_monthly else 'daily'}.{backup_format}"
    backup_path = self.db_dir / filename

    # Create backup job record
    job_id = execute_insert(
        """INSERT INTO backup_jobs (job_type, status, backup_format, is_monthly, started_at)
           VALUES (%s, %s, %s, %s, %s)""",
        ('database', 'running', backup_format, is_monthly, datetime.now())
    )

    logger.info("🔄 Starting database backup: job_id=%s, format=%s, monthly=%s",
                job_id, backup_format, is_monthly)

    try:
        # Parse postgresql://user:pass@host:port/dbname with the URL parser so
        # that the port, URL-encoded characters and ':' inside the password are
        # handled correctly. Parsing happens inside the try block so that a
        # malformed URL marks the job as failed instead of leaving it 'running'.
        parts = urlsplit(settings.DATABASE_URL)
        user = unquote(parts.username or '')
        password = unquote(parts.password or '')
        host = parts.hostname or ''
        dbname = unquote(parts.path.lstrip('/')) or 'bmc_hub'

        # The password is passed through the environment, never on the command line.
        env = os.environ.copy()
        env['PGPASSWORD'] = password

        cmd = ['pg_dump', '-h', host, '-U', user]
        if parts.port:
            cmd += ['-p', str(parts.port)]
        if backup_format == 'dump':
            # Compressed custom format (-Fc); anything else is plain SQL
            cmd.append('-Fc')
        cmd.append(dbname)

        # Execute pg_dump and write to file
        logger.info("📦 Executing: %s > %s", ' '.join(cmd), backup_path)

        with open(backup_path, 'wb') as f:
            subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, check=True, env=env)

        # Calculate file size and checksum
        file_size = backup_path.stat().st_size
        checksum = self._calculate_checksum(backup_path)

        # Calculate retention date
        if is_monthly:
            retention_until = datetime.now() + timedelta(days=settings.MONTHLY_KEEP_MONTHS * 30)
        else:
            retention_until = datetime.now() + timedelta(days=settings.RETENTION_DAYS)

        # Update job record
        execute_update(
            """UPDATE backup_jobs
               SET status = %s, completed_at = %s, file_path = %s,
                   file_size_bytes = %s, checksum_sha256 = %s, retention_until = %s
               WHERE id = %s""",
            ('completed', datetime.now(), str(backup_path), file_size, checksum,
             retention_until.date(), job_id)
        )

        logger.info("✅ Database backup completed: %s (%.2f MB)",
                    filename, file_size / 1024 / 1024)

        return job_id

    except Exception as e:
        # Catch everything (missing pg_dump binary, disk errors, bad URL), not
        # only a non-zero pg_dump exit, so the job row never stays 'running'.
        if isinstance(e, subprocess.CalledProcessError) and e.stderr:
            error_msg = e.stderr.decode(errors='replace')
        else:
            error_msg = str(e)
        logger.error("❌ Database backup failed: %s", error_msg)

        execute_update(
            """UPDATE backup_jobs
               SET status = %s, completed_at = %s, error_message = %s
               WHERE id = %s""",
            ('failed', datetime.now(), error_msg, job_id)
        )

        # Clean up partial backup file
        if backup_path.exists():
            backup_path.unlink()

        return None
'__pycache__', + '*.pyc', + '*.pyo', + '*.pyd', + '.DS_Store', + '.git', + 'backup', # Don't backup the backup directory itself! + ] + + # Create tar.gz archive + with tarfile.open(backup_path, 'w:gz') as tar: + for path, arcname in paths_to_backup: + tar.add( + path, + arcname=arcname, + recursive=True, + filter=lambda ti: None if any( + pattern in ti.name for pattern in exclude_patterns + ) else ti + ) + + # Calculate file size and checksum + file_size = backup_path.stat().st_size + checksum = self._calculate_checksum(backup_path) + + # Calculate retention date (files use daily retention) + retention_until = datetime.now() + timedelta(days=settings.RETENTION_DAYS) + + # Update job record + execute_update( + """UPDATE backup_jobs + SET status = %s, completed_at = %s, file_path = %s, + file_size_bytes = %s, checksum_sha256 = %s, retention_until = %s + WHERE id = %s""", + ('completed', datetime.now(), str(backup_path), file_size, checksum, + retention_until.date(), job_id) + ) + + logger.info("✅ Files backup completed: %s (%.2f MB)", + filename, file_size / 1024 / 1024) + + return job_id + + except Exception as e: + logger.error("❌ Files backup failed: %s", str(e)) + + execute_update( + """UPDATE backup_jobs + SET status = %s, completed_at = %s, error_message = %s + WHERE id = %s""", + ('failed', datetime.now(), str(e), job_id) + ) + + # Clean up partial backup file + if backup_path.exists(): + backup_path.unlink() + + return None + + async def create_full_backup(self, is_monthly: bool = False) -> Tuple[Optional[int], Optional[int]]: + """ + Create full backup (database + files) + + Returns: + (db_job_id, files_job_id) tuple + """ + logger.info("🔄 Starting full backup (database + files)") + + db_job_id = await self.create_database_backup(is_monthly=is_monthly) + files_job_id = await self.create_files_backup() + + if db_job_id and files_job_id: + logger.info("✅ Full backup completed: db=%s, files=%s", db_job_id, files_job_id) + else: + logger.warning("⚠️ Full backup 
async def restore_database(self, job_id: int) -> bool:
    """
    Restore the database from a backup while the system is in maintenance mode.

    The backup file's SHA256 checksum is verified against the job record
    before anything is restored. An exclusive lock on `.restore.lock` in the
    backup directory is held while pg_restore/psql runs. Maintenance mode is
    always switched off again and the lock file removed, whether or not the
    restore succeeded.

    Args:
        job_id: Backup job ID to restore from

    Returns:
        True if successful, False otherwise
    """
    # Local import: urllib.parse is not part of this module's import block.
    from urllib.parse import unquote, urlsplit

    if settings.BACKUP_READ_ONLY:
        logger.error("❌ Restore blocked: BACKUP_READ_ONLY=true")
        return False

    # Get backup job
    backup = execute_query(
        "SELECT * FROM backup_jobs WHERE id = %s AND job_type = 'database'",
        (job_id,),
        fetchone=True
    )

    if not backup:
        logger.error("❌ Backup job not found: %s", job_id)
        return False

    backup_path = Path(backup['file_path'])

    if not backup_path.exists():
        logger.error("❌ Backup file not found: %s", backup_path)
        return False

    logger.info("🔄 Starting database restore from backup: %s", backup_path.name)

    # Defined before the try block so the finally clause can always refer to it,
    # even when the checksum check fails before the lock is taken.
    lock_file = self.backup_dir / ".restore.lock"

    # Enable maintenance mode
    await self.set_maintenance_mode(True, "Database restore i gang", eta_minutes=5)

    # TODO: Stop scheduler (will be implemented in scheduler.py)

    try:
        # Verify checksum
        current_checksum = self._calculate_checksum(backup_path)
        if current_checksum != backup['checksum_sha256']:
            raise ValueError(f"Checksum mismatch! Expected {backup['checksum_sha256']}, got {current_checksum}")

        logger.info("✅ Checksum verified")

        # Acquire file lock to prevent concurrent operations. The lock handle
        # has its own name so the dump file handle below cannot shadow it.
        with open(lock_file, 'w') as lock_handle:
            fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX)
            try:
                # Parse database connection info (including the port)
                parts = urlsplit(settings.DATABASE_URL)
                user = unquote(parts.username or '')
                password = unquote(parts.password or '')
                host = parts.hostname or ''
                dbname = unquote(parts.path.lstrip('/')) or 'bmc_hub'

                env = os.environ.copy()
                env['PGPASSWORD'] = password

                conn_args = ['-h', host, '-U', user]
                if parts.port:
                    conn_args += ['-p', str(parts.port)]

                # Build restore command based on format
                if backup['backup_format'] == 'dump':
                    # Restore from compressed custom format
                    cmd = ['pg_restore', *conn_args, '-d', dbname, '--clean', '--if-exists']
                else:
                    # Restore from plain SQL
                    cmd = ['psql', *conn_args, '-d', dbname]

                logger.info("📥 Executing: %s < %s", ' '.join(cmd), backup_path)

                with open(backup_path, 'rb') as dump_handle:
                    subprocess.run(cmd, stdin=dump_handle, stderr=subprocess.PIPE,
                                   check=True, env=env)
            finally:
                # Release file lock on the still-open lock handle
                fcntl.flock(lock_handle.fileno(), fcntl.LOCK_UN)

        logger.info("✅ Database restore completed successfully")

        # Log notification
        execute_insert(
            """INSERT INTO backup_notifications (backup_job_id, event_type, message)
               VALUES (%s, %s, %s)""",
            (job_id, 'restore_started', f'Database restored from backup: {backup_path.name}')
        )

        return True

    except Exception as e:
        # Prefer the tool's stderr output: it explains why pg_restore/psql failed.
        if isinstance(e, subprocess.CalledProcessError) and e.stderr:
            detail = e.stderr.decode(errors='replace')
        else:
            detail = str(e)
        logger.error("❌ Database restore failed: %s", detail)
        return False

    finally:
        # Disable maintenance mode
        await self.set_maintenance_mode(False)

        # TODO: Restart scheduler (will be implemented in scheduler.py)

        # Clean up lock file
        if lock_file.exists():
            lock_file.unlink()
async def upload_offsite(self, job_id: int) -> bool:
    """
    Upload a backup to the offsite location via SFTP/SSH.

    After the upload the remote file size is compared with the local size.
    On success `offsite_uploaded_at` is set and the retry counter reset; on
    failure `offsite_retry_count` is incremented. The SFTP client and the
    transport are closed on every path, including failures.

    Args:
        job_id: Backup job ID to upload

    Returns:
        True if the backup is (or already was) uploaded, False otherwise
    """
    if not settings.OFFSITE_ENABLED:
        logger.info("⏭️  Offsite upload disabled")
        return False

    if settings.BACKUP_DRY_RUN:
        logger.info("🔄 DRY RUN: Would upload backup to offsite")
        return False

    # Get backup job
    backup = execute_query(
        "SELECT * FROM backup_jobs WHERE id = %s",
        (job_id,),
        fetchone=True
    )

    if not backup:
        logger.error("❌ Backup job not found: %s", job_id)
        return False

    if backup['offsite_uploaded_at']:
        logger.info("⏭️  Backup already uploaded to offsite: %s", job_id)
        return True

    backup_path = Path(backup['file_path'])

    if not backup_path.exists():
        logger.error("❌ Backup file not found: %s", backup_path)
        return False

    logger.info("☁️  Starting offsite upload: %s to %s:%s",
                backup_path.name, settings.SFTP_HOST, settings.SFTP_REMOTE_PATH)

    try:
        # Connect via SFTP
        # NOTE(review): no host key is passed to connect(), so the server
        # identity does not appear to be verified here - confirm this is intended.
        transport = paramiko.Transport((settings.SFTP_HOST, settings.SFTP_PORT))
        try:
            if settings.SSH_KEY_PATH:
                # Use SSH key authentication
                private_key = paramiko.RSAKey.from_private_key_file(settings.SSH_KEY_PATH)
                transport.connect(username=settings.SFTP_USER, pkey=private_key)
            else:
                # Use password authentication
                transport.connect(username=settings.SFTP_USER, password=settings.SFTP_PASSWORD)

            sftp = paramiko.SFTPClient.from_transport(transport)
            try:
                # Create remote directory if needed
                remote_path = settings.SFTP_REMOTE_PATH
                self._ensure_remote_directory(sftp, remote_path)

                # Upload file
                remote_file = f"{remote_path}/{backup_path.name}"
                sftp.put(str(backup_path), remote_file)

                # Verify upload
                remote_stat = sftp.stat(remote_file)
                local_size = backup_path.stat().st_size

                if remote_stat.st_size != local_size:
                    raise ValueError(f"Upload verification failed: remote size {remote_stat.st_size} != local size {local_size}")
            finally:
                # Always release the SFTP channel, also when put/stat raised
                sftp.close()
        finally:
            # Always close the connection so failed attempts do not leak sockets
            transport.close()

        # Update job record
        execute_update(
            """UPDATE backup_jobs
               SET offsite_uploaded_at = %s, offsite_retry_count = 0
               WHERE id = %s""",
            (datetime.now(), job_id)
        )

        logger.info("✅ Offsite upload completed: %s", backup_path.name)

        return True

    except Exception as e:
        logger.error("❌ Offsite upload failed: %s", str(e))

        # Increment retry count
        execute_update(
            """UPDATE backup_jobs
               SET offsite_retry_count = offsite_retry_count + 1
               WHERE id = %s""",
            (job_id,)
        )

        return False
def _ensure_remote_directory(self, sftp: "paramiko.SFTPClient", path: str):
    """Create a remote directory and any missing parents.

    Each ancestor of `path` is checked with `sftp.stat`, outermost first, and
    created with `sftp.mkdir` when the stat raises FileNotFoundError.

    Args:
        sftp: Connected SFTP client (needs `stat` and `mkdir`).
        path: Remote directory path, absolute or relative. An empty path or
            a bare root ("/") results in no remote calls.
    """
    # Remote SFTP paths are POSIX-style regardless of the local OS, so use
    # posixpath rather than os.path. Local import: not in the module's imports.
    import posixpath

    current = posixpath.normpath(path) if path else ''
    pending = []

    # Walk up towards the root. Stop when the path is exhausted (relative
    # paths end at '') or when dirname() no longer changes it (root reached);
    # comparing against '/' alone would loop forever on relative paths.
    while current and current != '.':
        parent = posixpath.dirname(current)
        if parent == current:
            break
        pending.append(current)
        current = parent

    # Create from the outermost directory inwards
    for dir_path in reversed(pending):
        try:
            sftp.stat(dir_path)
        except FileNotFoundError:
            sftp.mkdir(dir_path)
            logger.info("📁 Created remote directory: %s", dir_path)
+ +
+
+
+
-
+
Total Backups
+
+
+
+
+
-
+
Completed
+
+
+
+
+
-
+
Pending Offsite
+
+
+
+
+
-
+
Storage Used
+
+
+
+ + +
+
+
+
+ Storage Usage +
+
+
+
+ 0% +
+
+

Loading...

+
+
+
+
+ + +
+
+
+
+ Manual Backup +
+
+
+
+
+ + +
+
+
+ + +
+
+
+ +
+
+
+
+ +
+ + +
Upload Backup
+
+
+
+ + +
+
+ + +
+
+
+ + +
+
+
+ +
+
+
+
+
+
+
+
+
+
+ Scheduler Status +
+
+
+
+ Loading... +
+
+
+
+
+ + +
+
+
+
+ Backup History + +
+
+
+ + + + + + + + + + + + + + + + + + +
IDTypeFormatSizeStatusOffsiteCreatedActions
+
+

Loading backups...

+
+
+
+
+
+ + +
+
+
+ Recent Notifications +
+
+
+
+
+

Loading...

+
+
+
+
+
+
+
+ + + + + + + + diff --git a/app/billing/backend/supplier_invoices.py b/app/billing/backend/supplier_invoices.py index d6e3888..4817c43 100644 --- a/app/billing/backend/supplier_invoices.py +++ b/app/billing/backend/supplier_invoices.py @@ -4,6 +4,7 @@ Backend API for managing supplier invoices that integrate with e-conomic """ from fastapi import APIRouter, HTTPException, UploadFile, File +from pydantic import BaseModel from typing import List, Dict, Optional from datetime import datetime, date, timedelta from decimal import Decimal @@ -276,22 +277,36 @@ async def get_pending_files(): logger.info(f"📋 Checking invoice2data templates: {len(invoice2data.templates)} loaded") for file in files: - # Check if there's an invoice2data template for this vendor's CVR + # Check if there's an invoice2data template for this vendor's CVR or name vendor_cvr = file.get('matched_vendor_cvr_number') or file.get('detected_vendor_cvr') or file.get('vendor_cvr') + vendor_name = file.get('vendor_name') or file.get('detected_vendor_name') or file.get('matched_vendor_name') file['has_invoice2data_template'] = False - logger.debug(f" File {file['file_id']}: CVR={vendor_cvr}") + logger.debug(f" File {file['file_id']}: CVR={vendor_cvr}, name={vendor_name}") - if vendor_cvr: - # Check all templates for this CVR in keywords - for template_name, template_data in invoice2data.templates.items(): - keywords = template_data.get('keywords', []) - logger.debug(f" Template {template_name}: keywords={keywords}") - if str(vendor_cvr) in [str(k) for k in keywords]: - file['has_invoice2data_template'] = True - file['invoice2data_template_name'] = template_name - logger.info(f" ✅ File {file['file_id']} matched template: {template_name}") - break + # Check all templates + for template_name, template_data in invoice2data.templates.items(): + keywords = template_data.get('keywords', []) + logger.debug(f" Template {template_name}: keywords={keywords}") + + # Match by CVR + if vendor_cvr and str(vendor_cvr) in 
[str(k) for k in keywords]: + file['has_invoice2data_template'] = True + file['invoice2data_template_name'] = template_name + logger.info(f" ✅ File {file['file_id']} matched template {template_name} by CVR") + break + + # Match by vendor name + if vendor_name: + for keyword in keywords: + if str(keyword).upper() in str(vendor_name).upper(): + file['has_invoice2data_template'] = True + file['invoice2data_template_name'] = template_name + logger.info(f" ✅ File {file['file_id']} matched template {template_name} by name") + break + + if file['has_invoice2data_template']: + break except Exception as e: logger.error(f"❌ Failed to check invoice2data templates: {e}", exc_info=True) # Continue without invoice2data info @@ -413,12 +428,23 @@ async def get_file_extracted_data(file_id: int): # Build llm_data response llm_data = None if llm_json_data: + # Normalize common invoice2data field names to our API schema + total_amount_value = llm_json_data.get('total_amount') + if total_amount_value is None: + total_amount_value = llm_json_data.get('amount_total') + + invoice_date_value = llm_json_data.get('invoice_date') + if invoice_date_value is None: + invoice_date_value = llm_json_data.get('document_date') + + due_date_value = llm_json_data.get('due_date') + # Use invoice_number from LLM JSON (works for both AI and template extraction) llm_data = { "invoice_number": llm_json_data.get('invoice_number'), - "invoice_date": llm_json_data.get('invoice_date'), - "due_date": llm_json_data.get('due_date'), - "total_amount": float(llm_json_data.get('total_amount')) if llm_json_data.get('total_amount') else None, + "invoice_date": invoice_date_value, + "due_date": due_date_value, + "total_amount": float(total_amount_value) if total_amount_value else None, "currency": llm_json_data.get('currency') or 'DKK', "document_type": llm_json_data.get('document_type'), "lines": formatted_lines @@ -1377,8 +1403,11 @@ async def delete_supplier_invoice(invoice_id: int): # ========== E-CONOMIC 
def _to_numeric(value):
    """Convert an extracted amount to a float, or return None if it cannot be parsed.

    Numbers (int, float, Decimal) are converted directly. Strings may use
    either Danish formatting ("25.000,00") or English formatting
    ("1,234.56"); all whitespace, including non-breaking spaces used as
    thousands separators, is ignored.

    Separator rules for strings:
      * both '.' and ',' present: the right-most one is the decimal
        separator, the other one is a grouping separator;
      * only ',' present once: decimal comma ("12,5" -> 12.5);
      * a single kind of separator repeated: grouping ("1.234.567").

    Args:
        value: Raw value taken from the extraction result.

    Returns:
        The parsed float, or None for None, empty, non-numeric or
        unsupported input.
    """
    if value is None:
        return None
    if isinstance(value, (int, float, Decimal)):
        return float(value)
    if not isinstance(value, str):
        return None

    # split() with no arguments splits on any Unicode whitespace, so this also
    # drops non-breaking spaces inside the number.
    cleaned = ''.join(value.split())
    if not cleaned:
        return None

    last_comma = cleaned.rfind(',')
    last_dot = cleaned.rfind('.')

    if last_comma != -1 and last_dot != -1:
        # Whichever separator comes last is the decimal separator.
        if last_comma > last_dot:
            cleaned = cleaned.replace('.', '').replace(',', '.')
        else:
            cleaned = cleaned.replace(',', '')
    elif last_comma != -1:
        # One comma is a decimal comma; several can only be grouping.
        cleaned = cleaned.replace(',', '' if cleaned.count(',') > 1 else '.')
    elif cleaned.count('.') > 1:
        cleaned = cleaned.replace('.', '')

    try:
        return float(cleaned)
    except ValueError:
        return None
extracted_fields.get('lines'): for idx, line in enumerate(extracted_fields['lines'], start=1): + line_total = _to_numeric(line.get('line_total')) + unit_price = _to_numeric(line.get('unit_price')) + quantity = _to_numeric(line.get('quantity')) execute_insert( """INSERT INTO extraction_lines (extraction_id, line_number, description, quantity, unit_price, @@ -2104,8 +2179,8 @@ async def reprocess_uploaded_file(file_id: int): VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING line_id""", (extraction_id, idx, line.get('description'), - line.get('quantity'), line.get('unit_price'), - line.get('line_total'), None, None, 1.0, + quantity, unit_price, + line_total, None, None, 1.0, line.get('ip_address'), line.get('contract_number'), line.get('location_street'), line.get('location_zip'), line.get('location_city')) ) @@ -2137,8 +2212,14 @@ async def reprocess_uploaded_file(file_id: int): logger.info(f"🤖 Calling Ollama for AI extraction...") llm_result = await ollama_service.extract_from_text(text) - if not llm_result or 'error' in llm_result: - error_msg = llm_result.get('error') if llm_result else 'AI extraction fejlede' + # Handle both dict and string error responses + if not llm_result or isinstance(llm_result, str) or (isinstance(llm_result, dict) and 'error' in llm_result): + if isinstance(llm_result, dict): + error_msg = llm_result.get('error', 'AI extraction fejlede') + elif isinstance(llm_result, str): + error_msg = llm_result # Error message returned as string + else: + error_msg = 'AI extraction fejlede' logger.error(f"❌ AI extraction failed: {error_msg}") execute_update( @@ -2198,6 +2279,14 @@ async def reprocess_uploaded_file(file_id: int): logger.info(f"✅ AI extraction completed for file {file_id}") # Return success with template data or AI extraction result + # Determine confidence value safely + if template_id: + final_confidence = confidence + elif 'llm_result' in locals() and isinstance(llm_result, dict): + final_confidence = 
llm_result.get('confidence', 0.75) + else: + final_confidence = 0.0 + result = { "status": "success", "file_id": file_id, @@ -2205,7 +2294,7 @@ async def reprocess_uploaded_file(file_id: int): "template_matched": template_id is not None, "template_id": template_id, "vendor_id": vendor_id, - "confidence": confidence if template_id else llm_result.get('confidence', 0.75), + "confidence": final_confidence, "extracted_fields": extracted_fields, "pdf_text": text[:1000] if not template_id else text } diff --git a/app/billing/frontend/supplier_invoices.html b/app/billing/frontend/supplier_invoices.html index 3835d8a..eb33b2b 100644 --- a/app/billing/frontend/supplier_invoices.html +++ b/app/billing/frontend/supplier_invoices.html @@ -515,7 +515,7 @@