# bmc_hub/app/dashboard/backend/mission_router.py
#
# 436 lines
# 16 KiB
# Python

import hmac
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

from fastapi import APIRouter, HTTPException, Query, Request, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, Field, ValidationError

from app.core.auth_service import AuthService
from app.core.config import settings
from app.core.database import execute_query, execute_query_single

from .mission_service import MissionService
from .mission_ws import mission_ws_manager
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which the mission endpoints defined below are registered.
router = APIRouter()
class MissionCallEvent(BaseModel):
    """Call event carried by the telephony ringing/answered/hangup webhooks."""

    # Required call identifier, between 1 and 128 characters long.
    call_id: str = Field(..., min_length=1, max_length=128)
    # The remaining fields are optional and default to None when omitted.
    caller_number: Optional[str] = Field(default=None)
    queue_name: Optional[str] = Field(default=None)
    timestamp: Optional[datetime] = Field(default=None)
class MissionUptimeWebhook(BaseModel):
    """Request body accepted by the uptime webhook endpoint."""

    # Explicit top-level fields; each is optional and defaults to None.
    status: Optional[str] = Field(default=None)
    service_name: Optional[str] = Field(default=None)
    customer_name: Optional[str] = Field(default=None)
    timestamp: Optional[datetime] = Field(default=None)
    # Free-form nested payload; a fresh empty dict is created per instance.
    payload: Dict[str, Any] = Field(default_factory=dict)
def _first_query_param(request: Request, *names: str) -> Optional[str]:
    """Return the first non-blank query parameter among ``names``.

    The names are tried in the order given. A parameter that is missing,
    empty, or whitespace-only is skipped. The returned value is stripped of
    surrounding whitespace. Returns ``None`` when no name yields a value.
    """
    params = request.query_params
    for key in names:
        candidate = params.get(key)
        if not candidate:
            continue
        cleaned = str(candidate).strip()
        if cleaned:
            return cleaned
    return None
def _parse_query_timestamp(request: Request) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp from the request's query string.

    The value is taken from the first non-blank parameter among
    ``timestamp``, ``time`` and ``event_time``.

    Returns:
        The parsed ``datetime``, or ``None`` when no such parameter is
        present or its value is not a valid ISO-8601 string.
    """
    raw = _first_query_param(request, "timestamp", "time", "event_time")
    if not raw:
        return None
    try:
        # "Z" is rewritten to an explicit UTC offset before parsing, since
        # fromisoformat only accepts the "Z" suffix from Python 3.11 on.
        return datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except ValueError:
        # Malformed timestamps are deliberately ignored (best effort); the
        # caller decides what to use when no timestamp is available.
        logger.debug("Ignoring unparseable mission webhook timestamp %r", raw)
        return None
def _event_from_query(request: Request) -> MissionCallEvent:
    """Build a ``MissionCallEvent`` from the request's query parameters.

    Several alias names are accepted for each field; the first non-blank
    alias wins.

    Raises:
        HTTPException: 400 when no call id parameter is present, 422 when
            the collected values fail ``MissionCallEvent`` validation
            (for example a call id longer than 128 characters).
    """
    call_id = _first_query_param(request, "call_id", "callid", "id", "session_id", "uuid")
    if not call_id:
        raise HTTPException(status_code=400, detail="Missing call_id query parameter")
    try:
        return MissionCallEvent(
            call_id=call_id,
            caller_number=_first_query_param(request, "caller_number", "caller", "from", "number", "phone"),
            queue_name=_first_query_param(request, "queue_name", "queue", "group", "line"),
            timestamp=_parse_query_timestamp(request),
        )
    except ValidationError as exc:
        # The model is constructed by hand here, so FastAPI does not convert
        # validation failures into a client error for us; without this the
        # exception would surface as an HTTP 500.
        raise HTTPException(status_code=422, detail="Invalid call event query parameters") from exc
def _get_webhook_token() -> str:
    """Return the configured mission webhook token, or ``""`` if unset.

    The ``mission_webhook_token`` setting read through ``MissionService``
    takes precedence; ``settings.MISSION_WEBHOOK_TOKEN`` is the fallback.
    Both values are stripped of surrounding whitespace.
    """
    stored_raw = MissionService.get_setting_value("mission_webhook_token", "") or ""
    from_settings = (getattr(settings, "MISSION_WEBHOOK_TOKEN", "") or "").strip()
    from_db = stored_raw.strip()
    if from_db:
        return from_db
    return from_settings
def _validate_mission_webhook_token(request: Request, token: Optional[str] = None) -> None:
    """Reject the request unless it carries the configured webhook token.

    The candidate token is taken from, in order: the ``token`` argument, the
    ``x-mission-token`` request header, the ``token`` query parameter.

    Raises:
        HTTPException: 403 when no token is configured, or when the supplied
            token is missing or does not match the configured one.
    """
    configured = _get_webhook_token()
    if not configured:
        logger.warning("Mission webhook token not configured for path=%s", request.url.path)
        raise HTTPException(status_code=403, detail="Mission webhook token not configured")
    candidate = token or request.headers.get("x-mission-token") or request.query_params.get("token")
    supplied = (candidate or "").strip()
    # Constant-time comparison so response timing does not leak how much of
    # the secret matched. Compared as UTF-8 bytes because compare_digest
    # rejects str arguments containing non-ASCII characters.
    matches = bool(supplied) and hmac.compare_digest(
        supplied.encode("utf-8"), configured.encode("utf-8")
    )
    if not matches:
        logger.warning("Mission webhook forbidden for path=%s", request.url.path)
        raise HTTPException(status_code=403, detail="Forbidden")
def _normalize_uptime_payload(payload: MissionUptimeWebhook) -> Dict[str, Any]:
    """Flatten an uptime webhook body into a fixed set of keys.

    Explicit top-level fields win over values found in the nested
    ``payload`` dict; a nested ``monitor`` dict (when present) is the last
    fallback for status and service name.

    Returns:
        A dict with ``status`` (one of ``UP``, ``DOWN``, ``DEGRADED``,
        ``UNKNOWN``), ``service_name``, ``customer_name`` (``None`` when
        blank), ``timestamp`` and the ``raw`` nested payload.
    """
    raw = dict(payload.payload or {})
    monitor = raw.get("monitor")
    monitor_info = monitor if isinstance(monitor, dict) else None

    status_source = payload.status or raw.get("status") or raw.get("event")
    if not status_source and monitor_info is not None:
        status_source = monitor_info.get("status")

    service = payload.service_name or raw.get("service_name") or raw.get("monitor_name")
    if not service and monitor_info is not None:
        service = monitor_info.get("name")

    customer = payload.customer_name or raw.get("customer_name") or raw.get("customer")
    event_time = payload.timestamp or raw.get("timestamp")

    # Anything outside the recognised states collapses to UNKNOWN.
    normalized_status = str(status_source or "UNKNOWN").upper().strip()
    if normalized_status not in ("UP", "DOWN", "DEGRADED"):
        normalized_status = "UNKNOWN"

    customer_text = str(customer or "").strip()
    return {
        "status": normalized_status,
        "service_name": str(service or "Unknown Service"),
        "customer_name": customer_text if customer_text else None,
        "timestamp": event_time,
        "raw": raw,
    }
@router.get("/mission/state")
async def get_mission_state():
    """Return the current mission state as produced by ``MissionService``."""
    # NOTE(review): no token or auth check is performed in this handler;
    # confirm that access control is applied where the router is mounted.
    state = MissionService.get_state()
    return state
@router.websocket("/mission/ws")
async def mission_ws(websocket: WebSocket):
    """Authenticated WebSocket endpoint for mission updates.

    The access token is looked up in the ``token`` query parameter, then in
    an ``Authorization: Bearer ...`` header, then in the ``access_token``
    cookie. The socket is closed with code 1008 when no token is supplied or
    ``AuthService.verify_token`` does not return a payload.

    Once registered with the connection manager, the current mission state
    is broadcast and the handler keeps reading (and discarding) incoming
    text frames until the connection ends.
    """
    token = websocket.query_params.get("token")
    auth_header = (websocket.headers.get("authorization") or "").strip()
    if not token and auth_header.lower().startswith("bearer "):
        token = auth_header.split(" ", 1)[1].strip()
    if not token:
        token = (websocket.cookies.get("access_token") or "").strip() or None

    payload = AuthService.verify_token(token) if token else None
    if not payload:
        await websocket.close(code=1008)
        return

    await mission_ws_manager.connect(websocket)
    try:
        # NOTE(review): this broadcasts through the manager rather than
        # sending to the new socket only; confirm that is intended.
        await mission_ws_manager.broadcast("mission_state", MissionService.get_state())
        while True:
            # Incoming frames are read only to detect disconnects.
            await websocket.receive_text()
    except WebSocketDisconnect:
        await mission_ws_manager.disconnect(websocket)
    except Exception:
        # Unexpected failures used to be swallowed silently; record them so
        # they can be diagnosed, then still release the connection.
        logger.exception("Mission websocket failed unexpectedly")
        await mission_ws_manager.disconnect(websocket)
@router.post("/mission/webhook/telefoni/ringing")
async def mission_telefoni_ringing(event: MissionCallEvent, request: Request, token: Optional[str] = Query(None)):
    """Handle a "ringing" telephony webhook.

    Validates the webhook token, upserts the call into
    ``mission_call_state`` with state ``ringing``, records an
    ``incoming_call`` event and broadcasts ``call_ringing``,
    ``live_feed_event`` and ``kpi_update`` messages.

    Raises:
        HTTPException: 403 when the webhook token check fails.
    """
    _validate_mission_webhook_token(request, token)
    logger.info(
        "☎️ Mission webhook ringing call_id=%s caller=%s queue=%s method=%s",
        event.call_id,
        event.caller_number,
        event.queue_name,
        request.method,
    )
    timestamp = event.timestamp or datetime.utcnow()
    context = MissionService.resolve_contact_context(event.caller_number)
    # Use the placeholder for missing AND whitespace-only queue names, so
    # the stored queue and the event title are never empty.
    queue_name = (event.queue_name or "").strip() or "Ukendt kø"
    execute_query(
        """
        INSERT INTO mission_call_state (
            call_id, queue_name, caller_number, contact_name, company_name, customer_tag,
            state, started_at, answered_at, ended_at, updated_at, last_payload
        )
        VALUES (%s, %s, %s, %s, %s, %s, 'ringing', %s, NULL, NULL, NOW(), %s::jsonb)
        ON CONFLICT (call_id)
        DO UPDATE SET
            queue_name = EXCLUDED.queue_name,
            caller_number = EXCLUDED.caller_number,
            contact_name = EXCLUDED.contact_name,
            company_name = EXCLUDED.company_name,
            customer_tag = EXCLUDED.customer_tag,
            state = 'ringing',
            ended_at = NULL,
            answered_at = NULL,
            started_at = LEAST(mission_call_state.started_at, EXCLUDED.started_at),
            updated_at = NOW(),
            last_payload = EXCLUDED.last_payload
        """,
        (
            event.call_id,
            queue_name,
            event.caller_number,
            context.get("contact_name"),
            context.get("company_name"),
            context.get("customer_tag"),
            timestamp,
            json.dumps(event.model_dump(mode="json")),
        ),
    )
    event_row = MissionService.insert_event(
        event_type="incoming_call",
        title=f"Indgående opkald i {queue_name}",
        severity="warning",
        source="telefoni",
        customer_name=context.get("company_name"),
        payload={
            "call_id": event.call_id,
            "queue_name": queue_name,
            "caller_number": event.caller_number,
            **context,
        },
    )
    # NOTE(review): "timestamp" is a datetime object here; presumably the
    # websocket manager serializes it - confirm.
    call_payload = {
        "call_id": event.call_id,
        "queue_name": queue_name,
        "caller_number": event.caller_number,
        **context,
        "timestamp": timestamp,
    }
    await mission_ws_manager.broadcast("call_ringing", call_payload)
    await mission_ws_manager.broadcast("live_feed_event", event_row)
    await mission_ws_manager.broadcast("kpi_update", MissionService.get_kpis())
    return {"status": "ok"}
@router.get("/mission/webhook/telefoni/ringing")
async def mission_telefoni_ringing_get(request: Request, token: Optional[str] = Query(None)):
    """GET variant of the ringing webhook, driven by query parameters.

    After the token check, a request that carries a call id is converted to
    a ``MissionCallEvent`` and passed to the POST handler. A request without
    one is answered as a ping.
    """
    _validate_mission_webhook_token(request, token)
    has_call_id = _first_query_param(request, "call_id", "callid", "id", "session_id", "uuid") is not None
    if has_call_id:
        call_event = _event_from_query(request)
        return await mission_telefoni_ringing(call_event, request, token)
    # Token-only GET calls (no call payload) are allowed so the phone system
    # can validate/ping the webhook.
    logger.info("☎️ Mission webhook ringing ping method=%s", request.method)
    return {"status": "ok", "mode": "ping"}
@router.post("/mission/webhook/telefoni/answered")
async def mission_telefoni_answered(event: MissionCallEvent, request: Request, token: Optional[str] = Query(None)):
    """Handle an "answered" telephony webhook.

    Validates the webhook token, marks the matching ``mission_call_state``
    row as ``answered`` (keeping an already-set ``answered_at``), records a
    ``call_answered`` event and broadcasts ``call_answered`` and
    ``live_feed_event`` messages.
    """
    _validate_mission_webhook_token(request, token)
    logger.info(
        "✅ Mission webhook answered call_id=%s caller=%s queue=%s method=%s",
        event.call_id,
        event.caller_number,
        event.queue_name,
        request.method,
    )

    serialized_event = json.dumps(event.model_dump(mode="json"))
    update_sql = """
        UPDATE mission_call_state
        SET state = 'answered',
            answered_at = COALESCE(answered_at, NOW()),
            updated_at = NOW(),
            last_payload = %s::jsonb
        WHERE call_id = %s
        """
    execute_query(update_sql, (serialized_event, event.call_id))

    feed_payload = {
        "call_id": event.call_id,
        "queue_name": event.queue_name,
        "caller_number": event.caller_number,
    }
    feed_row = MissionService.insert_event(
        event_type="call_answered",
        title="Opkald besvaret",
        severity="info",
        source="telefoni",
        payload=feed_payload,
    )

    await mission_ws_manager.broadcast("call_answered", {"call_id": event.call_id})
    await mission_ws_manager.broadcast("live_feed_event", feed_row)
    return {"status": "ok"}
@router.get("/mission/webhook/telefoni/answered")
async def mission_telefoni_answered_get(request: Request, token: Optional[str] = Query(None)):
    """GET variant of the answered webhook, driven by query parameters.

    After the token check, a request that carries a call id is converted to
    a ``MissionCallEvent`` and passed to the POST handler. A request without
    one is answered as a ping.
    """
    _validate_mission_webhook_token(request, token)
    has_call_id = _first_query_param(request, "call_id", "callid", "id", "session_id", "uuid") is not None
    if has_call_id:
        call_event = _event_from_query(request)
        return await mission_telefoni_answered(call_event, request, token)
    logger.info("✅ Mission webhook answered ping method=%s", request.method)
    return {"status": "ok", "mode": "ping"}
@router.post("/mission/webhook/telefoni/hangup")
async def mission_telefoni_hangup(event: MissionCallEvent, request: Request, token: Optional[str] = Query(None)):
    """Handle a "hangup" telephony webhook.

    Validates the webhook token, marks the matching ``mission_call_state``
    row as ``hangup`` with ``ended_at`` set to the database time, records a
    ``call_ended`` event and broadcasts ``call_hangup`` and
    ``live_feed_event`` messages.
    """
    _validate_mission_webhook_token(request, token)
    logger.info(
        "📴 Mission webhook hangup call_id=%s caller=%s queue=%s method=%s",
        event.call_id,
        event.caller_number,
        event.queue_name,
        request.method,
    )

    serialized_event = json.dumps(event.model_dump(mode="json"))
    update_sql = """
        UPDATE mission_call_state
        SET state = 'hangup',
            ended_at = NOW(),
            updated_at = NOW(),
            last_payload = %s::jsonb
        WHERE call_id = %s
        """
    execute_query(update_sql, (serialized_event, event.call_id))

    feed_payload = {
        "call_id": event.call_id,
        "queue_name": event.queue_name,
        "caller_number": event.caller_number,
    }
    feed_row = MissionService.insert_event(
        event_type="call_ended",
        title="Opkald afsluttet",
        severity="info",
        source="telefoni",
        payload=feed_payload,
    )

    await mission_ws_manager.broadcast("call_hangup", {"call_id": event.call_id})
    await mission_ws_manager.broadcast("live_feed_event", feed_row)
    return {"status": "ok"}
@router.get("/mission/webhook/telefoni/hangup")
async def mission_telefoni_hangup_get(request: Request, token: Optional[str] = Query(None)):
    """GET variant of the hangup webhook, driven by query parameters.

    After the token check, a request that carries a call id is converted to
    a ``MissionCallEvent`` and passed to the POST handler. A request without
    one is answered as a ping.
    """
    _validate_mission_webhook_token(request, token)
    has_call_id = _first_query_param(request, "call_id", "callid", "id", "session_id", "uuid") is not None
    if has_call_id:
        call_event = _event_from_query(request)
        return await mission_telefoni_hangup(call_event, request, token)
    logger.info("📴 Mission webhook hangup ping method=%s", request.method)
    return {"status": "ok", "mode": "ping"}
@router.post("/mission/webhook/uptime")
async def mission_uptime_webhook(payload: MissionUptimeWebhook, request: Request, token: Optional[str] = Query(None)):
    """Handle an uptime-monitor webhook.

    Validates the webhook token, normalizes the body, and then:

    * ``DOWN`` / ``DEGRADED``: upserts an active alert row. The start time
      of an alert that is already active is kept; a new or previously
      resolved alert gets a fresh start time.
    * ``UP``: upserts the alert row as resolved.
    * anything else: leaves the alert table untouched.

    In every case an event is recorded and ``uptime_alert`` and
    ``live_feed_event`` messages are broadcast.

    Returns:
        ``{"status": "ok", "normalized": <normalized payload>}``.

    Raises:
        HTTPException: 403 when the webhook token check fails.
    """
    _validate_mission_webhook_token(request, token)
    normalized = _normalize_uptime_payload(payload)
    status = normalized["status"]
    service_name = normalized["service_name"]
    customer_name = normalized["customer_name"]
    alert_key = MissionService.build_alert_key(service_name, customer_name)

    if status in {"DOWN", "DEGRADED"}:
        # Candidate start time for a new incident. Whether it replaces the
        # stored value is decided inside the upsert itself, so the decision
        # is atomic. The previous COALESCE kept the old incident's start
        # time when a resolved alert went down again.
        incident_start = datetime.utcnow()
        execute_query(
            """
            INSERT INTO mission_uptime_alerts (
                alert_key, service_name, customer_name, status, is_active, started_at, resolved_at,
                updated_at, raw_payload, normalized_payload
            )
            VALUES (%s, %s, %s, %s, TRUE, %s, NULL, NOW(), %s::jsonb, %s::jsonb)
            ON CONFLICT (alert_key)
            DO UPDATE SET
                status = EXCLUDED.status,
                started_at = CASE
                    WHEN mission_uptime_alerts.is_active
                         AND mission_uptime_alerts.started_at IS NOT NULL
                    THEN mission_uptime_alerts.started_at
                    ELSE EXCLUDED.started_at
                END,
                is_active = TRUE,
                resolved_at = NULL,
                updated_at = NOW(),
                raw_payload = EXCLUDED.raw_payload,
                normalized_payload = EXCLUDED.normalized_payload
            """,
            (
                alert_key,
                service_name,
                customer_name,
                status,
                incident_start,
                json.dumps(payload.model_dump(mode="json")),
                # default=str: the normalized timestamp may be a datetime.
                json.dumps(normalized, default=str),
            ),
        )
        event_type = "uptime_down" if status == "DOWN" else "uptime_degraded"
        severity = "critical" if status == "DOWN" else "warning"
        title = f"{service_name} er {status}"
    elif status == "UP":
        execute_query(
            """
            INSERT INTO mission_uptime_alerts (
                alert_key, service_name, customer_name, status, is_active, started_at, resolved_at,
                updated_at, raw_payload, normalized_payload
            )
            VALUES (%s, %s, %s, %s, FALSE, NULL, NOW(), NOW(), %s::jsonb, %s::jsonb)
            ON CONFLICT (alert_key)
            DO UPDATE SET
                status = EXCLUDED.status,
                is_active = FALSE,
                resolved_at = NOW(),
                updated_at = NOW(),
                raw_payload = EXCLUDED.raw_payload,
                normalized_payload = EXCLUDED.normalized_payload
            """,
            (
                alert_key,
                service_name,
                customer_name,
                status,
                json.dumps(payload.model_dump(mode="json")),
                json.dumps(normalized, default=str),
            ),
        )
        event_type = "uptime_up"
        severity = "success"
        title = f"{service_name} er UP"
    else:
        event_type = "uptime_unknown"
        severity = "info"
        title = f"{service_name} status ukendt"

    event_row = MissionService.insert_event(
        event_type=event_type,
        title=title,
        severity=severity,
        source="uptime",
        customer_name=customer_name,
        payload={"alert_key": alert_key, **normalized},
    )
    await mission_ws_manager.broadcast(
        "uptime_alert",
        {
            "alert_key": alert_key,
            "status": status,
            "service_name": service_name,
            "customer_name": customer_name,
            "active_alerts": MissionService.get_active_alerts(),
        },
    )
    await mission_ws_manager.broadcast("live_feed_event", event_row)
    return {"status": "ok", "normalized": normalized}