bmc_hub/app/alert_notes/backend/router.py

516 lines
16 KiB
Python
Raw Normal View History

"""
Alert Notes Router
API endpoints for contextual customer/contact alert system
"""
from fastapi import APIRouter, HTTPException, Depends, Query
from typing import List, Optional, Dict
import logging
from datetime import datetime
from app.core.database import execute_query, execute_update
from app.core.auth_dependencies import require_permission, get_current_user
from app.alert_notes.backend.schemas import (
AlertNoteCreate, AlertNoteUpdate, AlertNoteFull, AlertNoteCheck,
AlertNoteRestriction, AlertNoteAcknowledgement, EntityType, Severity
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that the endpoints defined in this module register themselves on.
router = APIRouter()
def _check_user_can_handle(alert_id: int, current_user: dict) -> bool:
"""
Check if current user is allowed to handle the entity based on restrictions.
Returns True if no restrictions exist OR user matches a restriction.
"""
# Superadmins bypass restrictions
if current_user.get("is_superadmin"):
return True
# Get restrictions for this alert
restrictions = execute_query(
"""
SELECT restriction_type, restriction_id
FROM alert_note_restrictions
WHERE alert_note_id = %s
""",
(alert_id,)
)
# No restrictions = everyone can handle
if not restrictions:
return True
user_id = current_user["id"]
# Get user's group IDs
user_groups = execute_query(
"SELECT group_id FROM user_groups WHERE user_id = %s",
(user_id,)
)
user_group_ids = [g["group_id"] for g in user_groups]
# Check if user matches any restriction
for restriction in restrictions:
if restriction["restriction_type"] == "user" and restriction["restriction_id"] == user_id:
return True
if restriction["restriction_type"] == "group" and restriction["restriction_id"] in user_group_ids:
return True
return False
def _get_entity_name(entity_type: str, entity_id: int) -> Optional[str]:
"""Get the name of the entity (customer or contact)"""
if entity_type == "customer":
result = execute_query(
"SELECT name FROM customers WHERE id = %s",
(entity_id,)
)
return result[0]["name"] if result else None
elif entity_type == "contact":
result = execute_query(
"SELECT first_name, last_name FROM contacts WHERE id = %s",
(entity_id,)
)
if result:
return f"{result[0]['first_name']} {result[0]['last_name']}"
return None
def _get_alert_with_relations(alert_id: int, current_user: dict) -> Optional[Dict]:
    """
    Load one alert note together with its entity name, restrictions and
    acknowledgements.

    Returns a plain dict built from the alert row with three extra keys
    (``entity_name``, ``restrictions``, ``acknowledgements``), or None when
    no alert with ``alert_id`` exists. ``current_user`` is accepted for
    interface compatibility but is not consulted here.
    """
    rows = execute_query(
        """
        SELECT an.*, u.full_name as created_by_user_name
        FROM alert_notes an
        LEFT JOIN users u ON an.created_by_user_id = u.user_id
        WHERE an.id = %s
        """,
        (alert_id,)
    )
    if not rows:
        return None

    record = dict(rows[0])
    record["entity_name"] = _get_entity_name(record["entity_type"], record["entity_id"])

    # Restrictions, each labelled with the group or user it points at
    record["restrictions"] = execute_query(
        """
        SELECT anr.*,
        CASE
        WHEN anr.restriction_type = 'group' THEN g.name
        WHEN anr.restriction_type = 'user' THEN u.full_name
        END as restriction_name
        FROM alert_note_restrictions anr
        LEFT JOIN groups g ON anr.restriction_type = 'group' AND anr.restriction_id = g.id
        LEFT JOIN users u ON anr.restriction_type = 'user' AND anr.restriction_id = u.user_id
        WHERE anr.alert_note_id = %s
        """,
        (alert_id,)
    )

    # Acknowledgements, newest first, with the acknowledging user's name
    record["acknowledgements"] = execute_query(
        """
        SELECT ana.*, u.full_name as user_name
        FROM alert_note_acknowledgements ana
        LEFT JOIN users u ON ana.user_id = u.user_id
        WHERE ana.alert_note_id = %s
        ORDER BY ana.acknowledged_at DESC
        """,
        (alert_id,)
    )
    return record
@router.get("/alert-notes/check", response_model=AlertNoteCheck)
async def check_alerts(
    entity_type: EntityType = Query(..., description="Entity type (customer/contact)"),
    entity_id: int = Query(..., description="Entity ID"),
    current_user: dict = Depends(get_current_user)
):
    """
    Check whether a specific entity has active alert notes.

    All active alerts for the entity are returned, ordered by severity
    (critical, warning, info) and then newest first. Restrictions do NOT
    filter the returned list; they only determine ``user_can_handle``.

    Returns an AlertNoteCheck with:
    - has_alerts: whether any active alert exists for the entity
    - alerts: the alerts enriched with entity name, restrictions and
      acknowledgements
    - user_can_handle: True only if the user passes the restriction check
      for every returned alert
    - user_has_acknowledged: True only if the user has acknowledged every
      returned alert that requires acknowledgement
    """
    # Get active alerts for this entity
    alerts = execute_query(
        """
        SELECT an.*, u.full_name as created_by_user_name
        FROM alert_notes an
        LEFT JOIN users u ON an.created_by_user_id = u.user_id
        WHERE an.entity_type = %s
        AND an.entity_id = %s
        AND an.active = TRUE
        ORDER BY
        CASE an.severity
        WHEN 'critical' THEN 1
        WHEN 'warning' THEN 2
        WHEN 'info' THEN 3
        END,
        an.created_at DESC
        """,
        (entity_type.value, entity_id)
    )
    if not alerts:
        return AlertNoteCheck(
            has_alerts=False,
            alerts=[],
            user_can_handle=True,
            user_has_acknowledged=False
        )

    # Every row was selected for the same (entity_type, entity_id), so the
    # entity name is loop-invariant: resolve it once instead of per alert.
    entity_name = _get_entity_name(entity_type.value, entity_id)

    # Enrich alerts with relations
    enriched_alerts = []
    for alert in alerts:
        alert_dict = dict(alert)
        alert_dict["entity_name"] = entity_name
        # Restrictions, labelled with the group or user name they refer to
        alert_dict["restrictions"] = execute_query(
            """
            SELECT anr.*,
            CASE
            WHEN anr.restriction_type = 'group' THEN g.name
            WHEN anr.restriction_type = 'user' THEN u.full_name
            END as restriction_name
            FROM alert_note_restrictions anr
            LEFT JOIN groups g ON anr.restriction_type = 'group' AND anr.restriction_id = g.id
            LEFT JOIN users u ON anr.restriction_type = 'user' AND anr.restriction_id = u.user_id
            WHERE anr.alert_note_id = %s
            """,
            (alert["id"],)
        )
        # Acknowledgements, newest first
        alert_dict["acknowledgements"] = execute_query(
            """
            SELECT ana.*, u.full_name as user_name
            FROM alert_note_acknowledgements ana
            LEFT JOIN users u ON ana.user_id = u.user_id
            WHERE ana.alert_note_id = %s
            ORDER BY ana.acknowledged_at DESC
            """,
            (alert["id"],)
        )
        enriched_alerts.append(alert_dict)

    # The user can handle the entity only if no alert's restrictions exclude them
    user_can_handle = all(_check_user_can_handle(a["id"], current_user) for a in alerts)

    # Check if user has acknowledged all alerts that require it
    user_id = current_user["id"]
    user_has_acknowledged = True
    for alert in alerts:
        if not alert["requires_acknowledgement"]:
            continue
        ack = execute_query(
            "SELECT id FROM alert_note_acknowledgements WHERE alert_note_id = %s AND user_id = %s",
            (alert["id"], user_id)
        )
        if not ack:
            # One missing acknowledgement is enough; no need to check the rest
            user_has_acknowledged = False
            break

    return AlertNoteCheck(
        has_alerts=True,
        alerts=enriched_alerts,
        user_can_handle=user_can_handle,
        user_has_acknowledged=user_has_acknowledged
    )
@router.post("/alert-notes/{alert_id}/acknowledge")
async def acknowledge_alert(
    alert_id: int,
    current_user: dict = Depends(get_current_user)
):
    """
    Record that the current user has acknowledged an alert note.

    Responds with status "acknowledged" on a new acknowledgement, or
    "already_acknowledged" when one already exists for this user.

    Raises:
        HTTPException 404: the alert note does not exist.
        HTTPException 400: the alert note exists but is not active.
    """
    rows = execute_query(
        "SELECT id, active FROM alert_notes WHERE id = %s",
        (alert_id,)
    )
    if not rows:
        raise HTTPException(status_code=404, detail="Alert note not found")

    note = rows[0]
    if not note["active"]:
        raise HTTPException(status_code=400, detail="Alert note is not active")

    acting_user = current_user["id"]

    # NOTE(review): this lookup followed by the insert is not atomic; whether
    # duplicates are prevented depends on a DB constraint not visible here.
    prior = execute_query(
        "SELECT id FROM alert_note_acknowledgements WHERE alert_note_id = %s AND user_id = %s",
        (alert_id, acting_user)
    )
    if prior:
        return {"status": "already_acknowledged", "alert_id": alert_id}

    execute_update(
        """
        INSERT INTO alert_note_acknowledgements (alert_note_id, user_id)
        VALUES (%s, %s)
        """,
        (alert_id, acting_user)
    )
    logger.info(f"Alert {alert_id} acknowledged by user {acting_user}")
    return {"status": "acknowledged", "alert_id": alert_id}
@router.get("/alert-notes", response_model=List[AlertNoteFull])
async def list_alerts(
    entity_type: Optional[EntityType] = Query(None),
    entity_id: Optional[int] = Query(None),
    severity: Optional[Severity] = Query(None),
    active: Optional[bool] = Query(None),
    limit: int = Query(default=50, ge=1, le=500),
    offset: int = Query(default=0, ge=0),
    current_user: dict = Depends(require_permission("alert_notes.view"))
):
    """
    List alert notes with filtering (admin endpoint).
    Requires alert_notes.view permission.

    Each filter is applied only when it is supplied (not None). Results are
    ordered newest first and paginated with ``limit`` / ``offset``. Every
    returned alert is enriched with its entity name, restrictions and
    acknowledgements.
    """
    conditions = []
    params = []
    # Compare against None explicitly: a truthiness test would silently drop
    # a supplied filter whose value is falsy (e.g. entity_id=0) and return
    # unfiltered results instead.
    if entity_type is not None:
        conditions.append("an.entity_type = %s")
        params.append(entity_type.value)
    if entity_id is not None:
        conditions.append("an.entity_id = %s")
        params.append(entity_id)
    if severity is not None:
        conditions.append("an.severity = %s")
        params.append(severity.value)
    if active is not None:
        conditions.append("an.active = %s")
        params.append(active)

    # Only fixed condition fragments are interpolated; all values stay bound
    where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
    query = f"""
        SELECT an.*, u.full_name as created_by_user_name
        FROM alert_notes an
        LEFT JOIN users u ON an.created_by_user_id = u.user_id
        {where_clause}
        ORDER BY an.created_at DESC
        LIMIT %s OFFSET %s
    """
    params.extend([limit, offset])
    alerts = execute_query(query, tuple(params))

    # Enrich with relations; skip any alert that can no longer be loaded
    enriched_alerts = []
    for alert in alerts:
        alert_full = _get_alert_with_relations(alert["id"], current_user)
        if alert_full:
            enriched_alerts.append(alert_full)
    return enriched_alerts
@router.post("/alert-notes", response_model=AlertNoteFull)
async def create_alert(
    alert: AlertNoteCreate,
    current_user: dict = Depends(require_permission("alert_notes.create"))
):
    """
    Create a new alert note.
    Requires alert_notes.create permission.

    Verifies that the target entity exists, inserts the alert note, inserts
    one restriction row per supplied group ID and user ID, and returns the
    stored alert with its relations.

    Raises:
        HTTPException 404: the referenced customer/contact does not exist.
        HTTPException 500: the insert did not return the new alert's id.
    """
    # Verify entity exists
    entity_name = _get_entity_name(alert.entity_type.value, alert.entity_id)
    if not entity_name:
        raise HTTPException(
            status_code=404,
            detail=f"{alert.entity_type.value.capitalize()} with ID {alert.entity_id} not found"
        )

    # Insert alert note; execute_query is used because the RETURNING row is needed
    result = execute_query(
        """
        INSERT INTO alert_notes (
        entity_type, entity_id, title, message, severity,
        requires_acknowledgement, active, created_by_user_id
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id
        """,
        (
            alert.entity_type.value, alert.entity_id, alert.title, alert.message,
            alert.severity.value, alert.requires_acknowledgement, alert.active,
            current_user["id"]
        )
    )
    if not result:
        raise HTTPException(status_code=500, detail="Failed to create alert note")
    alert_id = result[0]["id"]

    # Insert restrictions. These statements return no rows, so they go through
    # execute_update, matching how update_alert issues the same INSERTs.
    for group_id in alert.restriction_group_ids:
        execute_update(
            """
            INSERT INTO alert_note_restrictions (alert_note_id, restriction_type, restriction_id)
            VALUES (%s, 'group', %s)
            """,
            (alert_id, group_id)
        )
    for user_id in alert.restriction_user_ids:
        execute_update(
            """
            INSERT INTO alert_note_restrictions (alert_note_id, restriction_type, restriction_id)
            VALUES (%s, 'user', %s)
            """,
            (alert_id, user_id)
        )

    logger.info(f"Alert note {alert_id} created for {alert.entity_type.value} {alert.entity_id} by user {current_user['id']}")

    # Return full alert with relations
    alert_full = _get_alert_with_relations(alert_id, current_user)
    return alert_full
@router.patch("/alert-notes/{alert_id}", response_model=AlertNoteFull)
async def update_alert(
    alert_id: int,
    alert_update: AlertNoteUpdate,
    current_user: dict = Depends(require_permission("alert_notes.edit"))
):
    """
    Update an existing alert note.
    Requires alert_notes.edit permission.

    Only fields that are supplied (not None) are changed. Group and user
    restrictions are replaced independently: supplying a list for one kind
    replaces that kind only (an empty list clears it) and leaves the other
    kind untouched.

    Raises:
        HTTPException 404: the alert note does not exist.
    """
    # Check if alert exists
    existing = execute_query(
        "SELECT id FROM alert_notes WHERE id = %s",
        (alert_id,)
    )
    if not existing:
        raise HTTPException(status_code=404, detail="Alert note not found")

    # Build update query from the supplied scalar fields
    update_fields = []
    params = []
    if alert_update.title is not None:
        update_fields.append("title = %s")
        params.append(alert_update.title)
    if alert_update.message is not None:
        update_fields.append("message = %s")
        params.append(alert_update.message)
    if alert_update.severity is not None:
        update_fields.append("severity = %s")
        params.append(alert_update.severity.value)
    if alert_update.requires_acknowledgement is not None:
        update_fields.append("requires_acknowledgement = %s")
        params.append(alert_update.requires_acknowledgement)
    if alert_update.active is not None:
        update_fields.append("active = %s")
        params.append(alert_update.active)
    if update_fields:
        # Only fixed column fragments are interpolated; values stay bound
        query = f"UPDATE alert_notes SET {', '.join(update_fields)} WHERE id = %s"
        params.append(alert_id)
        execute_update(query, tuple(params))

    # Replace group restrictions only when a group list was supplied. Deleting
    # every restriction here would silently wipe user restrictions whenever
    # the caller sent only restriction_group_ids.
    if alert_update.restriction_group_ids is not None:
        execute_update(
            "DELETE FROM alert_note_restrictions WHERE alert_note_id = %s AND restriction_type = 'group'",
            (alert_id,)
        )
        for group_id in alert_update.restriction_group_ids:
            execute_update(
                """
                INSERT INTO alert_note_restrictions (alert_note_id, restriction_type, restriction_id)
                VALUES (%s, 'group', %s)
                """,
                (alert_id, group_id)
            )

    # Same for user restrictions: untouched unless a user list was supplied
    if alert_update.restriction_user_ids is not None:
        execute_update(
            "DELETE FROM alert_note_restrictions WHERE alert_note_id = %s AND restriction_type = 'user'",
            (alert_id,)
        )
        for user_id in alert_update.restriction_user_ids:
            execute_update(
                """
                INSERT INTO alert_note_restrictions (alert_note_id, restriction_type, restriction_id)
                VALUES (%s, 'user', %s)
                """,
                (alert_id, user_id)
            )

    logger.info(f"Alert note {alert_id} updated by user {current_user['id']}")

    # Return updated alert
    alert_full = _get_alert_with_relations(alert_id, current_user)
    return alert_full
@router.delete("/alert-notes/{alert_id}")
async def delete_alert(
    alert_id: int,
    current_user: dict = Depends(require_permission("alert_notes.delete"))
):
    """
    Soft delete an alert note by setting ``active`` to false.
    Requires alert_notes.delete permission.

    The row is kept; an alert that is already inactive is deactivated again
    without error.

    Raises:
        HTTPException 404: the alert note does not exist.
    """
    found = execute_query(
        "SELECT id, active FROM alert_notes WHERE id = %s",
        (alert_id,)
    )
    if not found:
        raise HTTPException(status_code=404, detail="Alert note not found")

    # Deactivate rather than remove the row
    execute_update(
        "UPDATE alert_notes SET active = FALSE WHERE id = %s",
        (alert_id,)
    )
    logger.info(f"Alert note {alert_id} deactivated by user {current_user['id']}")

    response = {"status": "deleted", "alert_id": alert_id}
    return response