- Implemented a new bottom bar feature in `bottom-bar.js` that fetches and displays various notifications and statuses in real-time. - Added functions for handling visibility, state updates, and user interactions within the bottom bar. - Introduced WebSocket connection for real-time updates and fallback polling mechanism. - Created a manual testing script `test_manual.py` to validate API endpoints for the manual module. - Included tests for various paths to ensure expected responses from the server.
122 lines
4.3 KiB
Python
122 lines
4.3 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Request, WebSocket, WebSocketDisconnect
|
|
|
|
from app.core.auth_service import AuthService
|
|
from .service import get_active_timer, get_dashboard_status, get_notifications
|
|
|
|
# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Router on which the HTTP and WebSocket handlers in this module register
# themselves through the @router.get / @router.websocket decorators.
router = APIRouter()
|
|
|
|
|
|
def _resolve_user_id_from_request(request: Request) -> Optional[int]:
    """Return the integer user id associated with ``request``, or ``None``.

    ``request.state.user_id`` takes precedence. The ``user_id`` query
    parameter is consulted only when that attribute is missing or ``None``.
    Whichever value is selected is converted with ``int()``; a value that
    cannot be converted yields ``None`` (an unparseable state value does not
    fall through to the query parameter).
    """
    candidate = getattr(request.state, "user_id", None)

    if candidate is None:
        # NOTE(review): this fallback is client-controlled, so any caller can
        # name an arbitrary user id -- confirm these routes are not exposed
        # without upstream authentication populating request.state.
        candidate = request.query_params.get("user_id")
        if not candidate:
            return None

    try:
        return int(candidate)
    except (TypeError, ValueError):
        return None
|
|
|
|
|
|
def _resolve_ws_payload(websocket: WebSocket) -> Optional[dict]:
    """Return the verified token payload for a WebSocket handshake, if any.

    Credential sources, in order:

    1. the ``token`` query parameter;
    2. a ``Bearer`` value in the ``Authorization`` header, used only when the
       query parameter is absent or empty;
    3. the ``access_token`` cookie, tried only when the token from steps 1-2
       is missing or ``AuthService.verify_token`` returns a falsy result.

    Returns whatever ``AuthService.verify_token`` produced for the winning
    credential, or ``None`` when nothing usable was supplied.
    """
    primary_token = websocket.query_params.get("token")
    header_value = (websocket.headers.get("authorization") or "").strip()
    if not primary_token and header_value.lower().startswith("bearer "):
        # Everything after the first space is the credential itself.
        primary_token = header_value.partition(" ")[2].strip()

    if primary_token:
        verified = AuthService.verify_token(primary_token)
        if verified:
            return verified

    # Query/header credential missing or rejected: fall back to the cookie.
    cookie_token = (websocket.cookies.get("access_token") or "").strip()
    if not cookie_token:
        return None
    return AuthService.verify_token(cookie_token)
|
|
|
|
|
|
@router.get("/api/v1/dashboard/status")
async def get_dashboard_status_endpoint() -> dict:
    # GET handler: returns the service layer's dashboard status unchanged.
    # Takes no request input, so the result is not user-specific here.
    status_snapshot = get_dashboard_status()
    return status_snapshot
|
|
|
|
|
|
@router.get("/api/v1/timer/active")
async def get_active_timer_endpoint(request: Request) -> dict:
    # GET handler: resolves the caller's user id (may be None when it cannot
    # be determined) and returns the service layer's active-timer result.
    return get_active_timer(_resolve_user_id_from_request(request))
|
|
|
|
|
|
@router.get("/api/v1/notifications")
async def get_notifications_endpoint(request: Request, limit: int = 20) -> dict:
    # GET handler: resolves the caller's user id (may be None) and returns
    # the service layer's notifications for it.
    # NOTE(review): `limit` is forwarded as-is, with no lower or upper bound
    # applied here -- confirm get_notifications validates it.
    requester_id = _resolve_user_id_from_request(request)
    notifications = get_notifications(requester_id, limit=limit)
    return notifications
|
|
|
|
|
|
@router.websocket("/api/v1/bottom-bar/ws")
async def bottom_bar_ws(websocket: WebSocket):
    """Stream bottom-bar updates (timer, status, notifications) to a client.

    Handshake:
        The connection is closed with code 1008 (policy violation) without
        being accepted when no verified token payload is found, or when the
        payload's ``sub`` claim cannot be converted to ``int``. A payload
        without ``sub`` is accepted with ``user_id`` set to ``None``.

    Events sent (each as ``{"event": <name>, "data": <payload>}``):
        * ``status_delta`` / ``notification_delta`` -- once right after
          accept, then again whenever the serialized value differs from the
          last one sent. These are re-checked every 5th loop iteration.
        * ``timer_tick`` -- whenever the timer's integer ``elapsed`` value
          differs from the last one sent.

    Loop pacing:
        Each iteration ends by waiting up to one second for a text frame from
        the client. The frame's content is discarded; the wait exists to pace
        the loop and to surface a client disconnect. A client that sends
        frames therefore shortens the iteration.

    A ``WebSocketDisconnect`` is logged at INFO, any other exception at
    WARNING; neither is re-raised.
    """
    payload = _resolve_ws_payload(websocket)
    if not payload:
        await websocket.close(code=1008)
        return

    subject = payload.get("sub")
    try:
        user_id = int(subject) if subject is not None else None
    except (TypeError, ValueError):
        await websocket.close(code=1008)
        return

    await websocket.accept()

    def _fingerprint(data) -> str:
        # Canonical JSON text used only to detect changes between polls;
        # default=str keeps non-JSON-native values comparable.
        return json.dumps(data, sort_keys=True, default=str)

    try:
        # The initial snapshot is sent inside the guarded region so that a
        # disconnect or service failure here is logged like any later one
        # instead of escaping the handler.
        initial_status = get_dashboard_status()
        initial_notifications = get_notifications(user_id, limit=20)
        await websocket.send_json({"event": "status_delta", "data": initial_status})
        await websocket.send_json({"event": "notification_delta", "data": initial_notifications})

        last_status_json = _fingerprint(initial_status)
        last_notifications_json = _fingerprint(initial_notifications)
        last_timer_elapsed = -1  # sentinel: guarantees the first timer_tick is sent
        status_tick = 0

        while True:
            timer = get_active_timer(user_id)
            elapsed = int(timer.get("elapsed") or 0)
            if elapsed != last_timer_elapsed:
                await websocket.send_json({"event": "timer_tick", "data": timer})
                last_timer_elapsed = elapsed

            status_tick += 1
            if status_tick >= 5:
                status = get_dashboard_status()
                notifications = get_notifications(user_id, limit=20)

                status_json = _fingerprint(status)
                if status_json != last_status_json:
                    await websocket.send_json({"event": "status_delta", "data": status})
                    last_status_json = status_json

                notifications_json = _fingerprint(notifications)
                if notifications_json != last_notifications_json:
                    await websocket.send_json({"event": "notification_delta", "data": notifications})
                    last_notifications_json = notifications_json

                status_tick = 0

            try:
                await asyncio.wait_for(websocket.receive_text(), timeout=1.0)
            except asyncio.TimeoutError:
                # asyncio.TimeoutError is only an alias of the builtin
                # TimeoutError from Python 3.11 on; catching the asyncio name
                # keeps an idle second from ending the session on older
                # interpreters.
                continue
    except WebSocketDisconnect:
        logger.info("Bottom bar websocket disconnected user_id=%s", user_id)
    except Exception as exc:
        logger.warning("Bottom bar websocket error user_id=%s error=%s", user_id, exc)
|