# bmc_hub/app/backups/backend/notifications.py

"""
Notification Service for Backup System
Sends rich formatted notifications to Mattermost webhook
"""
import json
import logging
from datetime import datetime
from typing import Dict, Optional, List

import aiohttp

from app.core.config import settings
from app.core.database import execute_insert
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class MattermostNotification:
    """Service for sending Mattermost webhook notifications.

    Each public ``send_*`` coroutine builds a Mattermost payload (one message
    with a single attachment) and hands it to ``_send_webhook``, which posts
    it and records the notification in the ``backup_notifications`` table.
    Delivery is best-effort: failures are logged, never raised to the caller.
    """

    _USERNAME = "BMC Hub Backup"
    _FOOTER = "BMC Hub Backup System"
    _WEBHOOK_TIMEOUT_SECONDS = 10
    # Event types gated by settings.NOTIFY_ON_FAILURE.
    _FAILURE_EVENTS = frozenset({'backup_failed', 'offsite_failed', 'storage_low'})

    def __init__(self):
        self.webhook_url = settings.MATTERMOST_WEBHOOK_URL
        self.enabled = settings.MATTERMOST_ENABLED
        self.channel = settings.MATTERMOST_CHANNEL

    async def send_backup_success(self, job_id: int, job_type: str, file_size_bytes: int,
                                  duration_seconds: float, checksum: str, is_monthly: bool = False):
        """
        Send success notification for completed backup

        Args:
            job_id: Backup job ID
            job_type: database, files, or full
            file_size_bytes: Size of backup file
            duration_seconds: Time taken to complete backup
            checksum: SHA256 checksum of backup
            is_monthly: Whether this is a monthly backup
        """
        if not self._should_send_notification('backup_success'):
            return

        schedule = '(Monthly)' if is_monthly else '(Daily)'
        payload = self._build_payload(
            icon_emoji=":white_check_mark:",
            text=f"✅ **Backup Complete** - {job_type.capitalize()}",
            color="#36a64f",  # Green
            fields=[
                self._field("Job ID", f"#{job_id}"),
                self._field("Type", f"{job_type.capitalize()} {schedule}"),
                self._field("Size", self._format_size_mb(file_size_bytes)),
                self._field("Duration", f"{duration_seconds:.1f}s"),
                self._field("Checksum (SHA256)", self._format_checksum(checksum), short=False),
            ],
            footer_icon="https://platform.slack-edge.com/img/default_application_icon.png",
        )
        await self._send_webhook(payload, 'backup_success', job_id)

    async def send_backup_failed(self, job_id: int, job_type: str, error_message: str):
        """
        Send failure notification for failed backup

        Args:
            job_id: Backup job ID
            job_type: database, files, or full
            error_message: Error message from failed backup (truncated to 500 chars)
        """
        if not self._should_send_notification('backup_failed'):
            return

        payload = self._build_payload(
            icon_emoji=":x:",
            text=f"❌ **Backup Failed** - {job_type.capitalize()}",
            color="#ff0000",  # Red
            fields=[
                self._field("Job ID", f"#{job_id}"),
                self._field("Type", job_type.capitalize()),
                self._field("Error", self._code_block(error_message, 500), short=False),
            ],
            actions=[self._dashboard_action()],
        )
        await self._send_webhook(payload, 'backup_failed', job_id)

    async def send_offsite_success(self, job_id: int, backup_name: str, file_size_bytes: int):
        """
        Send notification for successful offsite upload

        Args:
            job_id: Backup job ID
            backup_name: Name of uploaded backup file
            file_size_bytes: Size of uploaded file
        """
        # Gate through the shared check so a disabled/unconfigured webhook is
        # honoured before any offsite-specific setting is consulted.
        if not self._should_send_notification('offsite_success'):
            return

        payload = self._build_payload(
            icon_emoji=":cloud:",
            text="☁️ **Offsite Upload Complete**",
            color="#0066cc",  # Blue
            fields=[
                self._field("Job ID", f"#{job_id}"),
                self._field("Destination", self._offsite_destination()),
                self._field("Filename", backup_name),
                self._field("Size", self._format_size_mb(file_size_bytes)),
            ],
        )
        # Recorded under 'backup_success', exactly as before, so existing
        # consumers of backup_notifications.event_type see no new value.
        await self._send_webhook(payload, 'backup_success', job_id)

    async def send_offsite_failed(self, job_id: int, backup_name: str, error_message: str,
                                  retry_count: int):
        """
        Send notification for failed offsite upload

        Args:
            job_id: Backup job ID
            backup_name: Name of backup file
            error_message: Error message (truncated to 300 chars)
            retry_count: Current retry attempt number
        """
        if not self._should_send_notification('offsite_failed'):
            return

        max_retries = settings.OFFSITE_RETRY_MAX_ATTEMPTS
        payload = self._build_payload(
            icon_emoji=":warning:",
            text=f"⚠️ **Offsite Upload Failed** (Retry {retry_count}/{max_retries})",
            color="#ff9900",  # Orange
            fields=[
                self._field("Job ID", f"#{job_id}"),
                self._field("Destination", self._offsite_destination()),
                self._field("Filename", backup_name),
                self._field("Retry Status", f"{retry_count}/{max_retries}"),
                self._field("Error", self._code_block(error_message, 300), short=False),
            ],
        )
        await self._send_webhook(payload, 'offsite_failed', job_id)

    async def send_storage_warning(self, usage_pct: float, used_gb: float, max_gb: int):
        """
        Send notification for high storage usage

        Args:
            usage_pct: Percentage of storage used
            used_gb: Gigabytes used
            max_gb: Maximum storage capacity
        """
        if not self._should_send_notification('storage_low'):
            return

        payload = self._build_payload(
            icon_emoji=":warning:",
            text="⚠️ **Backup Storage Warning**",
            color="#ff9900",  # Orange
            fields=[
                self._field("Usage", f"{usage_pct:.1f}%"),
                self._field("Space Used", f"{used_gb:.2f} GB / {max_gb} GB"),
                self._field(
                    "Recommendation",
                    "Consider running backup rotation or increasing storage capacity.",
                    short=False,
                ),
            ],
            actions=[self._dashboard_action()],
        )
        await self._send_webhook(payload, 'storage_low')

    async def send_restore_started(self, job_id: int, backup_name: str, eta_minutes: int):
        """
        Send notification when restore operation starts

        Not gated by the NOTIFY_ON_* settings; only the enabled/webhook check
        inside ``_send_webhook`` applies.

        Args:
            job_id: Backup job ID being restored
            backup_name: Name of backup file
            eta_minutes: Estimated time to completion
        """
        payload = self._build_payload(
            icon_emoji=":gear:",
            text="🔧 **System Maintenance: Restore in Progress**",
            color="#ffcc00",  # Yellow
            fields=[
                self._field("Backup Job ID", f"#{job_id}"),
                self._field("ETA", f"{eta_minutes} minutes"),
                self._field("Backup File", backup_name, short=False),
                self._field(
                    "Status",
                    "System is in maintenance mode. All services temporarily unavailable.",
                    short=False,
                ),
            ],
        )
        await self._send_webhook(payload, 'restore_started', job_id)

    async def _send_webhook(self, payload: Dict, event_type: str, job_id: Optional[int] = None):
        """
        Send webhook to Mattermost and log notification

        Never raises: HTTP, connection and timeout failures are logged and
        swallowed so a notification problem cannot fail the calling job.

        Args:
            payload: Mattermost webhook payload
            event_type: Type of notification event
            job_id: Optional backup job ID
        """
        if not self.enabled or not self.webhook_url:
            logger.info("📢 Notification (disabled): %s - job_id=%s", event_type, job_id)
            return

        try:
            # aiohttp 3.x deprecates bare numeric timeouts; ClientTimeout is
            # the supported way to express a total request deadline.
            timeout = aiohttp.ClientTimeout(total=self._WEBHOOK_TIMEOUT_SECONDS)
            async with aiohttp.ClientSession() as session:
                async with session.post(self.webhook_url, json=payload,
                                        timeout=timeout) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error("❌ Notification failed: HTTP %s - %s",
                                     response.status, error_text)
                        return
        except aiohttp.ClientError as e:
            logger.error("❌ Notification connection error: %s", str(e))
            return
        except Exception as e:
            # Timeout errors stringify to "", so fall back to repr.
            logger.error("❌ Notification error: %s", str(e) or repr(e))
            return

        logger.info("📢 Notification sent: %s - job_id=%s", event_type, job_id)
        self._record_notification(payload, event_type, job_id)

    def _record_notification(self, payload: Dict, event_type: str,
                             job_id: Optional[int]) -> None:
        """Insert a row into backup_notifications for a delivered notification.

        A database failure is logged separately so it is not mistaken for a
        delivery failure; the message has already reached Mattermost.
        """
        # NOTE(review): execute_insert is called synchronously from a
        # coroutine; if it blocks on the database it stalls the event loop.
        try:
            execute_insert(
                """INSERT INTO backup_notifications
                   (backup_job_id, event_type, message, mattermost_payload)
                   VALUES (%s, %s, %s, %s)""",
                (
                    job_id,
                    event_type,
                    payload.get('text', ''),
                    # Real JSON rather than a Python repr, so the stored
                    # payload can be parsed back.
                    json.dumps(payload, ensure_ascii=False),
                ),
            )
        except Exception as e:
            logger.error("❌ Notification sent but could not be recorded: %s",
                         str(e) or repr(e))

    def _should_send_notification(self, event_type: str) -> bool:
        """Check if notification should be sent based on settings

        Args:
            event_type: Gating key. Failure events follow NOTIFY_ON_FAILURE,
                'offsite_success' follows NOTIFY_ON_SUCCESS_OFFSITE, anything
                else is sent whenever the webhook is enabled and configured.
        """
        if not self.enabled or not self.webhook_url:
            return False
        if event_type in self._FAILURE_EVENTS:
            return bool(settings.NOTIFY_ON_FAILURE)
        if event_type == 'offsite_success':
            return bool(settings.NOTIFY_ON_SUCCESS_OFFSITE)
        return True

    def _get_hub_url(self) -> str:
        """Get BMC Hub base URL for action buttons

        Uses ``settings.HUB_BASE_URL`` when that setting exists and is
        non-empty, otherwise falls back to the local development address.
        """
        base_url = getattr(settings, "HUB_BASE_URL", None) or "http://localhost:8000"
        # Strip a trailing slash so callers can append "/backups" safely.
        return str(base_url).rstrip("/")

    def _build_payload(self, *, icon_emoji: str, text: str, color: str, fields: list,
                       actions: Optional[list] = None,
                       footer_icon: Optional[str] = None) -> Dict:
        """Assemble the webhook payload: one message with one attachment."""
        attachment = {"color": color, "fields": fields}
        if actions:
            attachment["actions"] = actions
        attachment["footer"] = self._FOOTER
        if footer_icon:
            attachment["footer_icon"] = footer_icon
        attachment["ts"] = int(datetime.now().timestamp())
        return {
            "channel": self.channel,
            "username": self._USERNAME,
            "icon_emoji": icon_emoji,
            "text": text,
            "attachments": [attachment],
        }

    def _dashboard_action(self) -> Dict:
        """Build the "View Dashboard" button linking to the backups page."""
        return {
            "name": "view_dashboard",
            "type": "button",
            "text": "View Dashboard",
            "url": f"{self._get_hub_url()}/backups",
        }

    @staticmethod
    def _offsite_destination() -> str:
        """Return the offsite target as ``host:remote_path`` from settings."""
        return f"{settings.SFTP_HOST}:{settings.SFTP_REMOTE_PATH}"

    @staticmethod
    def _field(title: str, value: str, short: bool = True) -> Dict:
        """Build one attachment field entry."""
        return {"short": short, "title": title, "value": value}

    @staticmethod
    def _format_size_mb(size_bytes: float) -> str:
        """Format a byte count as mebibytes with two decimals, e.g. ``1.00 MB``."""
        return f"{size_bytes / 1024 / 1024:.2f} MB"

    @staticmethod
    def _format_checksum(checksum: Optional[str]) -> str:
        """Abbreviate a checksum to ``first16...last16`` inside inline code.

        Values of 32 characters or fewer are shown whole (the two halves
        would overlap); a missing checksum renders as ``n/a``.
        """
        if not checksum:
            return "`n/a`"
        digest = str(checksum)
        if len(digest) <= 32:
            return f"`{digest}`"
        return f"`{digest[:16]}...{digest[-16:]}`"

    @staticmethod
    def _code_block(message, limit: int) -> str:
        """Wrap ``message`` (truncated to ``limit`` chars) in a code fence.

        Accepts ``None`` and non-string values such as exception objects so a
        malformed error value cannot break the failure notification itself.
        """
        body = "Unknown error" if message is None else str(message)
        return f"```{body[:limit]}```"
# Shared module-level instance. It is constructed at import time, so the
# Mattermost settings are read once, when this module is first imported.
notifications = MattermostNotification()