# bmc_hub/app/dashboard/backend/mission_router.py
# Repository viewer residue (not code): 856 lines · 30 KiB · Python · Raw / Normal View / History

import hmac
import io
import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, Optional
from urllib.parse import urlparse

from fastapi import APIRouter, HTTPException, Query, Request, WebSocket, WebSocketDisconnect
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

from app.core.auth_service import AuthService
from app.core.config import settings
from app.core.database import execute_query, execute_query_single

from .mission_service import MissionService
from .mission_ws import mission_ws_manager
logger = logging.getLogger(__name__)
router = APIRouter()
class MissionCallEvent(BaseModel):
    """Body of the telephony webhooks (ringing / answered / hangup)."""

    # Required call identifier, 1-128 characters.
    call_id: str = Field(..., min_length=1, max_length=128)
    # Caller's number, when the sender provides one.
    caller_number: Optional[str] = None
    # Queue the call arrived in, when the sender provides one.
    queue_name: Optional[str] = None
    # Event time reported by the sender; the ringing handler substitutes the
    # current UTC time when this is missing.
    timestamp: Optional[datetime] = None
class MissionUptimeWebhook(BaseModel):
    """Body of the uptime-monitor webhook.

    Every top-level field is optional; ``_normalize_uptime_payload`` falls back
    to keys inside ``payload`` when a top-level value is missing.
    """

    # Reported status; normalized to UP / DOWN / DEGRADED / UNKNOWN downstream.
    status: Optional[str] = None
    # Name of the monitored service.
    service_name: Optional[str] = None
    # Customer the service belongs to, if any.
    customer_name: Optional[str] = None
    # Event time reported by the sender.
    timestamp: Optional[datetime] = None
    # Free-form sender payload; each instance gets its own empty dict by default.
    payload: Dict[str, Any] = Field(default_factory=dict)
class MissionCameraConfigUpdate(BaseModel):
    """Body of ``PUT /mission/camera/config``."""

    # Whether the camera feed is switched on; off unless stated otherwise.
    enabled: bool = False
    # Display name; the handler substitutes "Mission Kamera" when blank.
    camera_name: Optional[str] = None
    # Feed URL; the handler only accepts rtsp/http/https URLs when non-empty.
    feed_url: Optional[str] = None
    # Spotlight duration in seconds; the handler clamps it to 5..120.
    spotlight_seconds: Optional[int] = 20
class MissionCameraMotionWebhook(BaseModel):
    """Body of the camera motion webhook."""

    # Camera that raised the event; the handler falls back to the configured name.
    camera_name: Optional[str] = None
    # Motion flag; treated as "motion detected" when omitted.
    motion: Optional[bool] = True
    # Optional event label; "no_motion", "idle" and "clear" override ``motion`` to False.
    event_type: Optional[str] = None
    # Event time reported by the sender.
    timestamp: Optional[datetime] = None
    # Optional URL of a snapshot image, forwarded to WebSocket clients.
    snapshot_url: Optional[str] = None
    # Free-form sender payload, forwarded to WebSocket clients.
    payload: Dict[str, Any] = Field(default_factory=dict)
class MissionAccessPinUpdate(BaseModel):
    """Body of ``PUT /mission/access-pin``."""

    # New PIN, 4-10 characters here; the handler additionally requires digits only.
    pin: str = Field(..., min_length=4, max_length=10)
class MissionTemperatureWebhook(BaseModel):
    """Body of the environment temperature webhook."""

    # Stable sensor identifier; used to replace that sensor's previous reading.
    sensor_id: Optional[str] = None
    # Human-readable sensor name; used for matching when no sensor_id is given.
    sensor_name: Optional[str] = None
    # Measured temperature (required).
    temperature: float
    # Unit label shown next to the value.
    unit: Optional[str] = "°C"
    # Measurement time reported by the sender.
    timestamp: Optional[datetime] = None
    # Free-form sender payload, stored alongside the reading.
    payload: Dict[str, Any] = Field(default_factory=dict)
def _first_query_param(request: Request, *names: str) -> Optional[str]:
    """Return the first non-blank query parameter among ``names``.

    Names are tried in the order given. The winning value is returned with
    surrounding whitespace removed; ``None`` means every candidate was missing
    or blank.
    """
    for candidate in names:
        raw = request.query_params.get(candidate)
        if not raw:
            continue
        cleaned = str(raw).strip()
        if cleaned:
            return cleaned
    return None
def _parse_query_timestamp(request: Request) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp from the query string.

    Looks at ``timestamp``, ``time`` and ``event_time`` (first non-blank wins).
    Returns ``None`` when no value is present or it cannot be parsed.
    """
    text = _first_query_param(request, "timestamp", "time", "event_time")
    if not text:
        return None
    # fromisoformat() on older Pythons rejects the "Z" suffix, so spell out UTC.
    iso_text = text.replace("Z", "+00:00")
    try:
        parsed = datetime.fromisoformat(iso_text)
    except Exception:
        parsed = None
    return parsed
def _event_from_query(request: Request) -> MissionCallEvent:
    """Build a ``MissionCallEvent`` from query parameters (GET webhooks).

    Several aliases are accepted for each field. Raises ``HTTPException`` (400)
    and logs the query keys that were present when no call id can be found.
    """
    call_id = _first_query_param(request, "call_id", "callid", "id", "session_id", "uuid")
    if call_id:
        caller = _first_query_param(request, "caller_number", "caller", "from", "number", "phone")
        queue = _first_query_param(request, "queue_name", "queue", "group", "line")
        when = _parse_query_timestamp(request)
        return MissionCallEvent(
            call_id=call_id,
            caller_number=caller,
            queue_name=queue,
            timestamp=when,
        )

    present_keys = ",".join(sorted(request.query_params.keys()))
    logger.warning(
        "⚠️ Mission webhook invalid query path=%s reason=missing_call_id keys=%s",
        request.url.path,
        present_keys,
    )
    raise HTTPException(status_code=400, detail="Missing call_id query parameter")
def _get_webhook_token() -> str:
    """Return the configured webhook token.

    The ``mission_webhook_token`` setting wins; the ``MISSION_WEBHOOK_TOKEN``
    attribute on ``settings`` is the fallback. Both are whitespace-stripped, and
    an empty string means no token is configured.
    """
    stored = MissionService.get_setting_value("mission_webhook_token", "") or ""
    fallback = (getattr(settings, "MISSION_WEBHOOK_TOKEN", "") or "").strip()
    stored = stored.strip()
    if stored:
        return stored
    return fallback
def _validate_mission_webhook_token(request: Request, token: Optional[str] = None) -> None:
    """Reject the request unless it carries the configured webhook token.

    The candidate token is taken from, in order: the ``token`` argument, the
    ``x-mission-token`` header, the ``token`` query parameter.

    Args:
        request: Incoming request (used for headers, query string and log path).
        token: Token already extracted by the caller, if any.

    Raises:
        HTTPException: 403 when no token is configured on the server, or when
            the supplied token is missing or does not match.
    """
    configured = _get_webhook_token()
    path = request.url.path
    if not configured:
        logger.warning("❌ Mission webhook rejected path=%s reason=token_not_configured", path)
        raise HTTPException(status_code=403, detail="Mission webhook token not configured")

    header_token = request.headers.get("x-mission-token")
    candidate = token or header_token or request.query_params.get("token")
    supplied = candidate.strip() if candidate else ""

    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched. Compare bytes: compare_digest() rejects non-ASCII str.
    if supplied and hmac.compare_digest(supplied.encode("utf-8"), configured.encode("utf-8")):
        return

    source = "header" if (not token and header_token) else "query_or_arg"
    masked = "<empty>"
    if candidate:
        # Never log the full submitted value; short values are fully masked.
        masked = "***" if len(supplied) <= 8 else f"{supplied[:4]}...{supplied[-4:]}"
    logger.warning(
        "❌ Mission webhook forbidden path=%s reason=token_mismatch source=%s token=%s",
        path,
        source,
        masked,
    )
    raise HTTPException(status_code=403, detail="Forbidden")
def _normalize_uptime_payload(payload: MissionUptimeWebhook) -> Dict[str, Any]:
    """Flatten an uptime webhook body into a fixed-shape dict.

    Top-level fields win; otherwise values are looked up in ``payload.payload``
    (including a nested ``monitor`` dict for status and name). The status is
    upper-cased and anything other than UP / DOWN / DEGRADED becomes UNKNOWN.

    Returns:
        Dict with keys ``status``, ``service_name``, ``customer_name``,
        ``timestamp`` and ``raw`` (a shallow copy of the sender payload).
    """
    raw = dict(payload.payload or {})
    monitor = raw.get("monitor")
    has_monitor = isinstance(monitor, dict)

    status_value = payload.status or raw.get("status") or raw.get("event")
    if has_monitor and not status_value:
        status_value = monitor.get("status")

    name = payload.service_name or raw.get("service_name") or raw.get("monitor_name")
    if has_monitor and not name:
        name = monitor.get("name")

    customer = payload.customer_name or raw.get("customer_name") or raw.get("customer")
    when = payload.timestamp or raw.get("timestamp")

    status = str(status_value or "UNKNOWN").upper().strip()
    status = status if status in {"UP", "DOWN", "DEGRADED"} else "UNKNOWN"

    return {
        "status": status,
        "service_name": str(name or "Unknown Service"),
        "customer_name": str(customer or "").strip() or None,
        "timestamp": when,
        "raw": raw,
    }
def _is_valid_feed_url(candidate: Optional[str]) -> bool:
if not candidate:
return False
try:
parsed = urlparse(candidate.strip())
except Exception:
return False
return parsed.scheme in {"http", "https", "rtsp"} and bool(parsed.netloc)
def _require_authenticated_user(request: Request) -> Dict[str, Any]:
    """Return the verified token payload of a fully authenticated user.

    The token is read from an ``Authorization: Bearer`` header first and from
    the ``access_token`` cookie otherwise. Tokens whose scope is
    ``mission_pin`` are not accepted here.

    Raises:
        HTTPException: 401 when the token is missing, fails verification, has
            the ``mission_pin`` scope, or carries no user identifier.
    """
    header = (request.headers.get("authorization") or "").strip()
    token = header.split(" ", 1)[1].strip() if header.lower().startswith("bearer ") else None
    if not token:
        token = (request.cookies.get("access_token") or "").strip()

    claims = AuthService.verify_token(token) if token else None
    if not claims or claims.get("scope") == "mission_pin":
        raise HTTPException(status_code=401, detail="Not authenticated")

    if not (claims.get("sub") or claims.get("user_id")):
        raise HTTPException(status_code=401, detail="Invalid token")
    return claims
def _is_valid_access_pin(pin: str) -> bool:
return pin.isdigit() and 4 <= len(pin) <= 10
def _iter_mjpeg_frames(feed_url: str, target_fps: float = 5.0):
    """Yield the camera feed as multipart MJPEG chunks for browser playback.

    Opens ``feed_url`` with PyAV, decodes the video and re-encodes selected
    frames as JPEG. Each yielded ``bytes`` object is one multipart part:
    boundary line, ``Content-Type``/``Content-Length`` headers, JPEG data.

    Args:
        feed_url: URL handed to ``av.open``.
        target_fps: Upper bound on emitted frames per second; values below 1
            are treated as 1.

    Raises:
        HTTPException: 503 when PyAV cannot be imported, 400 when the feed has
            no video stream, 502 for any other failure while streaming.

    NOTE(review): this is a generator, so the exceptions above are raised while
    it is being iterated, not when it is called. When it is iterated by a
    streaming response the status line has presumably already been sent, so
    the status codes may never reach the client - confirm.
    """
    # Imported lazily so the module still loads on servers without PyAV.
    try:
        import av
    except Exception as exc:
        logger.error("❌ PyAV import failed for camera stream: %s", exc)
        raise HTTPException(status_code=503, detail="PyAV ikke installeret på serveren")
    # Options passed through to av.open.
    # NOTE(review): "stimeout" looks like a 5 s socket timeout in microseconds -
    # confirm against the FFmpeg version in use.
    options = {
        "rtsp_transport": "tcp",
        "fflags": "nobuffer",
        "flags": "low_delay",
        "stimeout": "5000000",
    }
    # Must match the boundary the caller declares in the response media type.
    boundary = b"frame"
    # Minimum spacing between emitted frames (target_fps floored at 1.0).
    frame_interval = 1.0 / max(1.0, float(target_fps))
    last_emit = 0.0
    container = None
    try:
        container = av.open(feed_url, options=options)
        video_stream = next((s for s in container.streams if s.type == "video"), None)
        if video_stream is None:
            raise HTTPException(status_code=400, detail="Feed indeholder ingen video stream")
        for frame in container.decode(video=0):
            # Every frame is decoded, but frames arriving sooner than
            # frame_interval after the last emitted one are dropped.
            now = time.monotonic()
            if now - last_emit < frame_interval:
                continue
            last_emit = now
            image = frame.to_image()
            buffer = io.BytesIO()
            image.save(buffer, format="JPEG", quality=80)
            jpeg = buffer.getvalue()
            yield (
                b"--" + boundary + b"\r\n"
                + b"Content-Type: image/jpeg\r\n"
                + f"Content-Length: {len(jpeg)}\r\n\r\n".encode("ascii")
                + jpeg
                + b"\r\n"
            )
    except HTTPException:
        # Already carries the intended status; do not wrap it as a 502.
        raise
    except Exception as exc:
        logger.error("❌ Camera MJPEG stream failed: %s", exc)
        raise HTTPException(status_code=502, detail="Kunne ikke åbne kamera stream")
    finally:
        # Best-effort close of the container; close errors are ignored.
        if container is not None:
            try:
                container.close()
            except Exception:
                pass
def _probe_camera_stream(feed_url: str) -> Dict[str, Any]:
    """Open the feed and decode one frame to produce a diagnostic result.

    This function never raises; every failure is reported in the result.

    Args:
        feed_url: URL handed to ``av.open``.

    Returns:
        ``{"ok": bool, "detail": str}``. ``detail`` is a short human-readable
        message; on unexpected errors it contains the exception text with the
        feed URL redacted.
    """
    # Imported lazily so the module still loads on servers without PyAV.
    try:
        import av
    except Exception:
        return {"ok": False, "detail": "PyAV ikke installeret på serveren"}
    options = {
        "rtsp_transport": "tcp",
        "fflags": "nobuffer",
        "flags": "low_delay",
        "stimeout": "5000000",
    }
    container = None
    try:
        container = av.open(feed_url, options=options)
        video_stream = next((s for s in container.streams if s.type == "video"), None)
        if video_stream is None:
            return {"ok": False, "detail": "Feed indeholder ingen video stream"}
        # One decoded frame is enough to prove the stream is usable.
        frame_found = False
        for _ in container.decode(video=0):
            frame_found = True
            break
        if not frame_found:
            return {"ok": False, "detail": "Ingen frames modtaget fra kamera"}
        return {"ok": True, "detail": "Stream OK"}
    except Exception as exc:
        # The detail is returned to API clients. Library errors can echo the
        # URL that was opened, and camera URLs often embed credentials, so
        # strip it from the message before handing it back.
        message = str(exc)
        if feed_url:
            message = message.replace(feed_url, "<feed_url>")
        return {"ok": False, "detail": f"Kamera stream fejl: {message}"}
    finally:
        # Best-effort close of the container; close errors are ignored.
        if container is not None:
            try:
                container.close()
            except Exception:
                pass
@router.get("/mission/state")
async def get_mission_state():
    """Return the current Mission Control state as provided by ``MissionService``."""
    state = MissionService.get_state()
    return state
@router.get("/mission/camera/mjpeg")
async def mission_camera_mjpeg_stream(fps: float = Query(5.0, ge=1.0, le=15.0)):
    """Stream the configured camera feed as MJPEG.

    Responds with 400 when the camera is disabled, the feed URL is missing, or
    the URL is not a valid rtsp/http/https URL. ``fps`` (1-15) caps the number
    of frames emitted per second.
    """
    feed_url = (MissionService.get_setting_value("mission_camera_feed_url", "") or "").strip()
    enabled_flag = str(MissionService.get_setting_value("mission_camera_enabled", "false")).lower()

    # Checks run in this order; the first failing one decides the message.
    problem = None
    if enabled_flag != "true":
        problem = "Kamera feed er ikke aktiveret"
    elif not feed_url:
        problem = "Kamera feed URL mangler"
    elif not _is_valid_feed_url(feed_url):
        problem = "Ugyldig kamera feed URL"
    if problem is not None:
        raise HTTPException(status_code=400, detail=problem)

    frames = _iter_mjpeg_frames(feed_url=feed_url, target_fps=fps)
    no_cache = {"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"}
    return StreamingResponse(
        frames,
        media_type="multipart/x-mixed-replace; boundary=frame",
        headers=no_cache,
    )
@router.get("/mission/camera/status")
async def mission_camera_status():
    """Report whether the configured camera feed can currently be opened.

    Returns:
        Dict with ``ok``, ``detail`` and ``enabled``. When the stream was
        actually probed it also contains ``feed_scheme`` (the URL scheme, or
        "unknown"). Configuration problems are reported as ``ok: False``
        rather than as HTTP errors.
    """
    feed_url = (MissionService.get_setting_value("mission_camera_feed_url", "") or "").strip()
    enabled = str(MissionService.get_setting_value("mission_camera_enabled", "false")).lower() == "true"
    if not enabled:
        return {"ok": False, "detail": "Kamera feed er ikke aktiveret", "enabled": False}
    if not feed_url:
        return {"ok": False, "detail": "Kamera feed URL mangler", "enabled": True}
    if not _is_valid_feed_url(feed_url):
        return {"ok": False, "detail": "Ugyldig kamera feed URL", "enabled": True}
    # The probe does blocking network I/O (connect + decode one frame). Run it
    # in the threadpool so an unreachable camera cannot stall the event loop
    # and with it every other request and WebSocket.
    probe = await run_in_threadpool(_probe_camera_stream, feed_url)
    return {
        "ok": bool(probe.get("ok")),
        "detail": probe.get("detail") or "Ukendt status",
        "enabled": True,
        "feed_scheme": feed_url.split(":", 1)[0].lower() if ":" in feed_url else "unknown",
    }
@router.put("/mission/camera/config")
async def update_mission_camera_config(config: MissionCameraConfigUpdate):
    """Persist the camera settings and push the new state to WebSocket clients.

    The camera name defaults to "Mission Kamera", the spotlight duration is
    clamped to 5..120 seconds, and a non-empty feed URL must be rtsp/http/https
    (otherwise 400). The four settings are upserted in a single statement.
    """
    feed_url = (config.feed_url or "").strip()
    camera_name = (config.camera_name or "Mission Kamera").strip() or "Mission Kamera"
    spotlight_seconds = min(120, max(5, int(config.spotlight_seconds or 20)))

    if feed_url and not _is_valid_feed_url(feed_url):
        raise HTTPException(status_code=400, detail="Ugyldig feed URL. Brug rtsp/http/https")

    upsert_sql = """
INSERT INTO settings (key, value, category, description, value_type, is_public)
VALUES
(%s, %s, 'mission', 'Enable one camera feed in Mission Control', 'boolean', true),
(%s, %s, 'mission', 'Camera name for Mission Control', 'string', true),
(%s, %s, 'mission', 'Camera feed URL for Mission Control', 'string', true),
(%s, %s, 'mission', 'Camera spotlight duration in seconds for motion events', 'integer', true)
ON CONFLICT (key)
DO UPDATE SET
value = EXCLUDED.value,
updated_at = CURRENT_TIMESTAMP
"""
    # One (key, value) pair per VALUES row above, in the same order.
    setting_rows = (
        ("mission_camera_enabled", "true" if config.enabled else "false"),
        ("mission_camera_name", camera_name),
        ("mission_camera_feed_url", feed_url),
        ("mission_camera_spotlight_seconds", str(spotlight_seconds)),
    )
    params = tuple(item for row in setting_rows for item in row)
    execute_query(upsert_sql, params)

    await mission_ws_manager.broadcast("mission_state", MissionService.get_state())

    camera = {
        "enabled": config.enabled,
        "camera_name": camera_name,
        "feed_url": feed_url,
        "spotlight_seconds": spotlight_seconds,
    }
    return {"status": "ok", "camera": camera}
@router.put("/mission/access-pin")
async def update_mission_access_pin(request: Request, payload: MissionAccessPinUpdate):
    """Store a new access PIN; requires a fully authenticated user.

    Responds with 401 when the caller is not authenticated and with 400 when
    the whitespace-stripped PIN fails ``_is_valid_access_pin``.
    """
    _require_authenticated_user(request)

    candidate_pin = (payload.pin or "").strip()
    if _is_valid_access_pin(candidate_pin):
        upsert_sql = """
INSERT INTO settings (key, value, category, description, value_type, is_public)
VALUES (%s, %s, 'mission', 'Access PIN for Mission Control kiosk mode', 'string', false)
ON CONFLICT (key)
DO UPDATE SET
value = EXCLUDED.value,
updated_at = CURRENT_TIMESTAMP
"""
        execute_query(upsert_sql, ("mission_access_pin", candidate_pin))
        return {"status": "ok", "message": "Mission PIN opdateret"}

    raise HTTPException(status_code=400, detail="PIN skal være 4-10 cifre")
@router.websocket("/mission/ws")
async def mission_ws(websocket: WebSocket):
    """WebSocket endpoint that keeps a client subscribed to mission broadcasts.

    A token is accepted from, in order: the ``token`` query parameter (or, if
    that is absent, an ``Authorization: Bearer`` header), the ``access_token``
    cookie, the ``mission_pin_token`` cookie. The first one that verifies wins;
    if none does, the socket is closed with code 1008. After connecting, the
    current state is broadcast and the socket is read until it drops.
    """
    token = websocket.query_params.get("token")
    if not token:
        header = (websocket.headers.get("authorization") or "").strip()
        if header.lower().startswith("bearer "):
            token = header.split(" ", 1)[1].strip()

    claims = AuthService.verify_token(token) if token else None
    # Fall back to the cookies, in priority order, until one verifies.
    for cookie_name in ("access_token", "mission_pin_token"):
        if claims:
            break
        cookie_token = (websocket.cookies.get(cookie_name) or "").strip() or None
        claims = AuthService.verify_token(cookie_token) if cookie_token else None

    if not claims:
        await websocket.close(code=1008)
        return

    await mission_ws_manager.connect(websocket)
    try:
        await mission_ws_manager.broadcast("mission_state", MissionService.get_state())
        # Incoming messages are read and discarded; the loop only exists to
        # notice when the client goes away.
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        await mission_ws_manager.disconnect(websocket)
    except Exception:
        await mission_ws_manager.disconnect(websocket)
@router.post("/mission/webhook/telefoni/ringing")
async def mission_telefoni_ringing(event: MissionCallEvent, request: Request, token: Optional[str] = Query(None)):
    """Handle a "call is ringing" webhook.

    Validates the webhook token, upserts the call into ``mission_call_state``
    as ``ringing``, records an ``incoming_call`` feed event, and broadcasts
    ``call_ringing``, ``live_feed_event`` and ``kpi_update`` to WebSocket
    clients.

    Raises:
        HTTPException: 403 from token validation.
    """
    _validate_mission_webhook_token(request, token)
    logger.info(
        "☎️ Mission webhook ringing call_id=%s caller=%s queue=%s method=%s",
        event.call_id,
        event.caller_number,
        event.queue_name,
        request.method,
    )
    # Prefer the sender's timestamp; otherwise use the current (naive) UTC time.
    timestamp = event.timestamp or datetime.utcnow()
    # Caller lookup; the result is read below via contact_name / company_name /
    # customer_tag and is also spread into the event and broadcast payloads.
    context = MissionService.resolve_contact_context(event.caller_number)
    queue_name = (event.queue_name or "Ukendt kø").strip()
    # Upsert keyed on call_id: a repeated ringing event resets the call to
    # 'ringing', clears answered_at/ended_at and keeps the earliest started_at.
    execute_query(
        """
INSERT INTO mission_call_state (
call_id, queue_name, caller_number, contact_name, company_name, customer_tag,
state, started_at, answered_at, ended_at, updated_at, last_payload
)
VALUES (%s, %s, %s, %s, %s, %s, 'ringing', %s, NULL, NULL, NOW(), %s::jsonb)
ON CONFLICT (call_id)
DO UPDATE SET
queue_name = EXCLUDED.queue_name,
caller_number = EXCLUDED.caller_number,
contact_name = EXCLUDED.contact_name,
company_name = EXCLUDED.company_name,
customer_tag = EXCLUDED.customer_tag,
state = 'ringing',
ended_at = NULL,
answered_at = NULL,
started_at = LEAST(mission_call_state.started_at, EXCLUDED.started_at),
updated_at = NOW(),
last_payload = EXCLUDED.last_payload
""",
        (
            event.call_id,
            queue_name,
            event.caller_number,
            context.get("contact_name"),
            context.get("company_name"),
            context.get("customer_tag"),
            timestamp,
            json.dumps(event.model_dump(mode="json")),
        ),
    )
    event_row = MissionService.insert_event(
        event_type="incoming_call",
        title=f"Indgående opkald i {queue_name}",
        severity="warning",
        source="telefoni",
        customer_name=context.get("company_name"),
        payload={
            "call_id": event.call_id,
            "queue_name": queue_name,
            "caller_number": event.caller_number,
            **context,
        },
    )
    # NOTE(review): "timestamp" is a datetime object here, whereas the camera
    # and temperature webhooks in this file broadcast ISO strings - confirm the
    # WebSocket manager can serialize datetimes.
    call_payload = {
        "call_id": event.call_id,
        "queue_name": queue_name,
        "caller_number": event.caller_number,
        **context,
        "timestamp": timestamp,
    }
    await mission_ws_manager.broadcast("call_ringing", call_payload)
    await mission_ws_manager.broadcast("live_feed_event", event_row)
    await mission_ws_manager.broadcast("kpi_update", MissionService.get_kpis())
    return {"status": "ok"}
@router.get("/mission/webhook/telefoni/ringing")
async def mission_telefoni_ringing_get(request: Request, token: Optional[str] = Query(None)):
    """GET variant of the ringing webhook, driven by query parameters.

    With a call id in the query string the request is handed to the POST
    handler; without one it is answered as a ping.
    """
    _validate_mission_webhook_token(request, token)
    call_id = _first_query_param(request, "call_id", "callid", "id", "session_id", "uuid")
    if call_id:
        return await mission_telefoni_ringing(_event_from_query(request), request, token)
    # Allow token-only GET calls (no call payload) for phone webhook validation/ping.
    logger.info("☎️ Mission webhook ringing ping method=%s", request.method)
    return {"status": "ok", "mode": "ping"}
@router.post("/mission/webhook/telefoni/answered")
async def mission_telefoni_answered(event: MissionCallEvent, request: Request, token: Optional[str] = Query(None)):
    """Handle a "call answered" webhook.

    Validates the token, marks the call as ``answered`` (keeping an existing
    ``answered_at``), records a ``call_answered`` feed event and broadcasts
    ``call_answered`` and ``live_feed_event``.
    """
    _validate_mission_webhook_token(request, token)
    logger.info(
        "✅ Mission webhook answered call_id=%s caller=%s queue=%s method=%s",
        event.call_id,
        event.caller_number,
        event.queue_name,
        request.method,
    )

    update_sql = """
UPDATE mission_call_state
SET state = 'answered',
answered_at = COALESCE(answered_at, NOW()),
updated_at = NOW(),
last_payload = %s::jsonb
WHERE call_id = %s
"""
    snapshot = json.dumps(event.model_dump(mode="json"))
    execute_query(update_sql, (snapshot, event.call_id))

    details = {
        "call_id": event.call_id,
        "queue_name": event.queue_name,
        "caller_number": event.caller_number,
    }
    feed_entry = MissionService.insert_event(
        event_type="call_answered",
        title="Opkald besvaret",
        severity="info",
        source="telefoni",
        payload=details,
    )

    await mission_ws_manager.broadcast("call_answered", {"call_id": event.call_id})
    await mission_ws_manager.broadcast("live_feed_event", feed_entry)
    return {"status": "ok"}
@router.get("/mission/webhook/telefoni/answered")
async def mission_telefoni_answered_get(request: Request, token: Optional[str] = Query(None)):
    """GET variant of the answered webhook, driven by query parameters.

    With a call id in the query string the request is handed to the POST
    handler; without one it is answered as a ping.
    """
    _validate_mission_webhook_token(request, token)
    call_id = _first_query_param(request, "call_id", "callid", "id", "session_id", "uuid")
    if call_id:
        return await mission_telefoni_answered(_event_from_query(request), request, token)
    logger.info("✅ Mission webhook answered ping method=%s", request.method)
    return {"status": "ok", "mode": "ping"}
@router.post("/mission/webhook/telefoni/hangup")
async def mission_telefoni_hangup(event: MissionCallEvent, request: Request, token: Optional[str] = Query(None)):
    """Handle a "call ended" webhook.

    Validates the token, marks the call as ``hangup`` with ``ended_at`` set to
    the database clock, records a ``call_ended`` feed event and broadcasts
    ``call_hangup`` and ``live_feed_event``.
    """
    _validate_mission_webhook_token(request, token)
    logger.info(
        "📴 Mission webhook hangup call_id=%s caller=%s queue=%s method=%s",
        event.call_id,
        event.caller_number,
        event.queue_name,
        request.method,
    )

    update_sql = """
UPDATE mission_call_state
SET state = 'hangup',
ended_at = NOW(),
updated_at = NOW(),
last_payload = %s::jsonb
WHERE call_id = %s
"""
    snapshot = json.dumps(event.model_dump(mode="json"))
    execute_query(update_sql, (snapshot, event.call_id))

    details = {
        "call_id": event.call_id,
        "queue_name": event.queue_name,
        "caller_number": event.caller_number,
    }
    feed_entry = MissionService.insert_event(
        event_type="call_ended",
        title="Opkald afsluttet",
        severity="info",
        source="telefoni",
        payload=details,
    )

    await mission_ws_manager.broadcast("call_hangup", {"call_id": event.call_id})
    await mission_ws_manager.broadcast("live_feed_event", feed_entry)
    return {"status": "ok"}
@router.get("/mission/webhook/telefoni/hangup")
async def mission_telefoni_hangup_get(request: Request, token: Optional[str] = Query(None)):
    """GET variant of the hangup webhook, driven by query parameters.

    With a call id in the query string the request is handed to the POST
    handler; without one it is answered as a ping.
    """
    _validate_mission_webhook_token(request, token)
    call_id = _first_query_param(request, "call_id", "callid", "id", "session_id", "uuid")
    if call_id:
        return await mission_telefoni_hangup(_event_from_query(request), request, token)
    logger.info("📴 Mission webhook hangup ping method=%s", request.method)
    return {"status": "ok", "mode": "ping"}
@router.post("/mission/webhook/uptime")
async def mission_uptime_webhook(payload: MissionUptimeWebhook, request: Request, token: Optional[str] = Query(None)):
    """Handle an uptime-monitor webhook.

    The payload is normalized to one of UP / DOWN / DEGRADED / UNKNOWN:

    * DOWN / DEGRADED: upsert the alert as active. An incident that is already
      active keeps its ``started_at``; a new or re-opened one starts now.
    * UP: upsert the alert as resolved.
    * UNKNOWN: the alerts table is left untouched.

    In every case a feed event is recorded and ``uptime_alert`` plus
    ``live_feed_event`` are broadcast to WebSocket clients.

    Returns:
        ``{"status": "ok", "normalized": <normalized payload>}``.

    Raises:
        HTTPException: 403 from token validation.
    """
    _validate_mission_webhook_token(request, token)
    normalized = _normalize_uptime_payload(payload)
    status = normalized["status"]
    service_name = normalized["service_name"]
    customer_name = normalized["customer_name"]
    alert_key = MissionService.build_alert_key(service_name, customer_name)
    current = execute_query_single("SELECT is_active, started_at FROM mission_uptime_alerts WHERE alert_key = %s", (alert_key,))
    if status in {"DOWN", "DEGRADED"}:
        started_at = (current or {}).get("started_at")
        is_active = bool((current or {}).get("is_active"))
        # Only an incident that is still active keeps its start time; a new or
        # previously resolved alert begins a fresh incident now.
        if not started_at or not is_active:
            started_at = datetime.utcnow()
        # started_at is taken from EXCLUDED as computed above. Wrapping it in
        # COALESCE(existing, new) would keep the start time of an earlier,
        # already resolved incident, because resolving does not clear it.
        execute_query(
            """
INSERT INTO mission_uptime_alerts (
alert_key, service_name, customer_name, status, is_active, started_at, resolved_at,
updated_at, raw_payload, normalized_payload
)
VALUES (%s, %s, %s, %s, TRUE, %s, NULL, NOW(), %s::jsonb, %s::jsonb)
ON CONFLICT (alert_key)
DO UPDATE SET
status = EXCLUDED.status,
is_active = TRUE,
started_at = EXCLUDED.started_at,
resolved_at = NULL,
updated_at = NOW(),
raw_payload = EXCLUDED.raw_payload,
normalized_payload = EXCLUDED.normalized_payload
""",
            (
                alert_key,
                service_name,
                customer_name,
                status,
                started_at,
                json.dumps(payload.model_dump(mode="json")),
                # normalized may hold a datetime; default=str stringifies it.
                json.dumps(normalized, default=str),
            ),
        )
        event_type = "uptime_down" if status == "DOWN" else "uptime_degraded"
        severity = "critical" if status == "DOWN" else "warning"
        title = f"{service_name} er {status}"
    elif status == "UP":
        # Mark the alert resolved; started_at of an existing row is left as is.
        execute_query(
            """
INSERT INTO mission_uptime_alerts (
alert_key, service_name, customer_name, status, is_active, started_at, resolved_at,
updated_at, raw_payload, normalized_payload
)
VALUES (%s, %s, %s, %s, FALSE, NULL, NOW(), NOW(), %s::jsonb, %s::jsonb)
ON CONFLICT (alert_key)
DO UPDATE SET
status = EXCLUDED.status,
is_active = FALSE,
resolved_at = NOW(),
updated_at = NOW(),
raw_payload = EXCLUDED.raw_payload,
normalized_payload = EXCLUDED.normalized_payload
""",
            (
                alert_key,
                service_name,
                customer_name,
                status,
                json.dumps(payload.model_dump(mode="json")),
                json.dumps(normalized, default=str),
            ),
        )
        event_type = "uptime_up"
        severity = "success"
        title = f"{service_name} er UP"
    else:
        # Unrecognized status: record the event but do not touch the alerts table.
        event_type = "uptime_unknown"
        severity = "info"
        title = f"{service_name} status ukendt"
    event_row = MissionService.insert_event(
        event_type=event_type,
        title=title,
        severity=severity,
        source="uptime",
        customer_name=customer_name,
        payload={"alert_key": alert_key, **normalized},
    )
    await mission_ws_manager.broadcast(
        "uptime_alert",
        {
            "alert_key": alert_key,
            "status": status,
            "service_name": service_name,
            "customer_name": customer_name,
            "active_alerts": MissionService.get_active_alerts(),
        },
    )
    await mission_ws_manager.broadcast("live_feed_event", event_row)
    return {"status": "ok", "normalized": normalized}
@router.post("/mission/webhook/camera/motion")
async def mission_camera_motion_webhook(
    payload: MissionCameraMotionWebhook,
    request: Request,
    token: Optional[str] = Query(None),
):
    """Handle a camera motion webhook and relay it to WebSocket clients.

    Motion counts as detected when ``motion`` is truthy and ``event_type`` is
    not one of "no_motion", "idle" or "clear". Nothing is persisted; the event
    is only broadcast as ``camera_motion``.
    """
    _validate_mission_webhook_token(request, token)

    extra = dict(payload.payload or {})

    label = payload.event_type
    is_quiet_event = bool(label) and str(label).strip().lower() in {"no_motion", "idle", "clear"}
    motion_detected = bool(payload.motion) and not is_quiet_event

    # Only consult the stored camera name when the sender did not provide one.
    name_source = payload.camera_name
    if not name_source:
        name_source = MissionService.get_setting_value("mission_camera_name", "Mission Kamera") or "Mission Kamera"
    camera_name = name_source.strip()

    occurred_at = payload.timestamp or datetime.utcnow()
    occurred_at_iso = occurred_at.isoformat()
    snapshot_url = (payload.snapshot_url or "").strip() or None

    message = {
        "camera_name": camera_name,
        "motion": motion_detected,
        "timestamp": occurred_at_iso,
        "snapshot_url": snapshot_url,
        "payload": extra,
    }
    await mission_ws_manager.broadcast("camera_motion", message)

    return {
        "status": "ok",
        "camera_name": camera_name,
        "motion": motion_detected,
    }
@router.post("/mission/webhook/environment/temperature")
async def mission_environment_temperature_webhook(
    payload: MissionTemperatureWebhook,
    request: Request,
    token: Optional[str] = Query(None),
):
    """Handle a temperature reading webhook.

    Validates the token, merges the reading into the
    ``mission_environment_readings`` setting (one entry per sensor where
    possible, at most 12 entries), records an ``environment_temperature`` feed
    event and broadcasts the reading to WebSocket clients.

    Returns:
        Dict with ``status``, the stored ``reading`` and the ``count`` of
        readings kept.

    Raises:
        HTTPException: 403 from token validation.
    """
    _validate_mission_webhook_token(request, token)
    # Normalize the identifying fields; blank strings count as missing.
    sensor_id = (payload.sensor_id or "").strip() or None
    sensor_name = (payload.sensor_name or "").strip() or sensor_id or "Temperatur"
    unit = (payload.unit or "°C").strip() or "°C"
    timestamp = payload.timestamp or datetime.utcnow()
    raw_payload = dict(payload.payload or {})
    reading = {
        "sensor_id": sensor_id,
        "sensor_name": sensor_name,
        "temperature": float(payload.temperature),
        "unit": unit,
        "timestamp": timestamp.isoformat(),
        "payload": raw_payload,
    }
    # Anything other than a list in the stored setting is treated as empty.
    existing = MissionService.parse_json_setting("mission_environment_readings", [])
    if not isinstance(existing, list):
        existing = []
    merged: list[Dict[str, Any]] = []
    replaced = False
    for item in existing:
        # Non-dict entries are dropped from the stored list.
        if not isinstance(item, dict):
            continue
        item_sensor_id = str(item.get("sensor_id") or "").strip() or None
        item_sensor_name = str(item.get("sensor_name") or "").strip()
        # Keep one latest entry per sensor when possible.
        # Match on sensor_id when this reading has one ...
        if sensor_id and item_sensor_id == sensor_id and not replaced:
            merged.append(reading)
            replaced = True
            continue
        # ... otherwise match on sensor_name. Only the first match is replaced;
        # the position in the list is preserved.
        if (not sensor_id) and item_sensor_name and item_sensor_name == sensor_name and not replaced:
            merged.append(reading)
            replaced = True
            continue
        merged.append(item)
    # A sensor seen for the first time goes to the front of the list.
    if not replaced:
        merged.insert(0, reading)
    # Cap the stored list at 12 entries.
    merged = merged[:12]
    execute_query(
        """
INSERT INTO settings (key, value, category, description, value_type, is_public)
VALUES (%s, %s, 'mission', 'Latest environment sensor readings for Mission Control', 'json', true)
ON CONFLICT (key)
DO UPDATE SET
value = EXCLUDED.value,
updated_at = CURRENT_TIMESTAMP
""",
        ("mission_environment_readings", json.dumps(merged, ensure_ascii=False)),
    )
    event_row = MissionService.insert_event(
        event_type="environment_temperature",
        title=f"Temperatur {sensor_name}: {payload.temperature:.1f}{unit}",
        severity="info",
        source="home_assistant",
        payload=reading,
    )
    await mission_ws_manager.broadcast(
        "mission_environment_temperature",
        {"reading": reading, "environment_readings": merged},
    )
    # The feed broadcast is skipped when insert_event returned nothing.
    if event_row:
        await mission_ws_manager.broadcast("live_feed_event", event_row)
    return {
        "status": "ok",
        "reading": reading,
        "count": len(merged),
    }