""" AnyDesk Remote Support Router REST API endpoints for managing remote support sessions """ import logging import json import re from uuid import uuid4 from typing import Optional from datetime import timedelta from fastapi import APIRouter, HTTPException from fastapi.responses import JSONResponse from app.models.schemas import ( AnyDeskSessionCreate, AnyDeskSession, AnyDeskSessionDetail, AnyDeskSessionHistory, AnyDeskSessionWithWorklog ) from app.services.anydesk import AnyDeskService from app.core.database import execute_query logger = logging.getLogger(__name__) router = APIRouter() anydesk_service = AnyDeskService() def _normalize_anydesk_id(raw_value: Optional[str]) -> str: """Normalize AnyDesk ID by stripping non-digits when possible.""" raw_text = str(raw_value or "").strip() digits_only = re.sub(r"\D", "", raw_text) return digits_only or raw_text def _ensure_case_exists(sag_id: int) -> dict: case_row = execute_query( "SELECT id, customer_id, titel FROM sag_sager WHERE id = %s AND deleted_at IS NULL", (sag_id,), ) if not case_row: raise HTTPException(status_code=404, detail="Case not found") return case_row[0] # ===================================================== # Session Management Endpoints # ===================================================== @router.post("/anydesk/start-session", response_model=AnyDeskSession, tags=["Remote Support"]) async def start_remote_session(session_data: AnyDeskSessionCreate): """ Start a new AnyDesk remote support session - **customer_id**: Required - Customer to provide support to - **contact_id**: Optional - Specific contact person - **sag_id**: Optional - Link to case/ticket for time tracking - **description**: Optional - Purpose of session - **created_by_user_id**: Optional - User initiating session Returns session details with access link for sharing with customer """ try: logger.info(f"🔗 Starting AnyDesk session for customer {session_data.customer_id}") # Verify customer exists cust_query = "SELECT id FROM customers WHERE id = %s" customer = execute_query(cust_query, (session_data.customer_id,)) if not customer: raise HTTPException(status_code=404, detail="Customer not found") # Verify contact exists if provided if session_data.contact_id: contact_query = "SELECT id FROM contacts WHERE id = %s" contact = execute_query(contact_query, (session_data.contact_id,)) if not contact: raise HTTPException(status_code=404, detail="Contact not found") # Verify sag exists if provided if session_data.sag_id: sag_query = "SELECT id FROM sag_sager WHERE id = %s" sag = execute_query(sag_query, (session_data.sag_id,)) if not sag: raise HTTPException(status_code=404, detail="Case not found") # Create session via AnyDesk service result = await anydesk_service.create_session( customer_id=session_data.customer_id, contact_id=session_data.contact_id, sag_id=session_data.sag_id, description=session_data.description, created_by_user_id=session_data.created_by_user_id ) if "error" in result: raise HTTPException(status_code=400, detail=result["error"]) return result except HTTPException: raise except Exception as e: logger.error(f"Error starting session: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/anydesk/sessions/{session_id}", response_model=AnyDeskSessionDetail, tags=["Remote Support"]) async def get_session_details(session_id: int): """ Get details of a specific AnyDesk session Includes current status, duration, and linked entities (contact, customer, case) """ try: query = """ SELECT s.id, s.anydesk_session_id, s.customer_id, s.contact_id, s.sag_id, s.session_link, s.status, s.started_at, s.ended_at, s.duration_minutes, s.created_by_user_id, s.created_at, s.updated_at, c.first_name || ' ' || c.last_name as contact_name, cust.name as customer_name, sag.titel as sag_title, u.full_name as created_by_user_name, s.device_info, s.metadata FROM anydesk_sessions s LEFT JOIN contacts c ON s.contact_id = c.id LEFT JOIN customers cust ON s.customer_id = cust.id LEFT JOIN sag_sager sag ON s.sag_id = sag.id LEFT JOIN users u ON s.created_by_user_id = u.user_id WHERE s.id = %s """ result = execute_query(query, (session_id,)) if not result: raise HTTPException(status_code=404, detail="Session not found") session = result[0] return AnyDeskSessionDetail(**session) except HTTPException: raise except Exception as e: logger.error(f"Error fetching session: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/anydesk/sessions/{session_id}/end", tags=["Remote Support"]) async def end_remote_session(session_id: int): """ End a remote support session and calculate duration Returns completed session with duration in minutes and hours """ try: logger.info(f"🛑 Ending AnyDesk session {session_id}") result = await anydesk_service.end_session(session_id) if "error" in result: raise HTTPException(status_code=400, detail=result["error"]) return JSONResponse(content=result) except HTTPException: raise except Exception as e: logger.error(f"Error ending session: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @router.patch("/anydesk/sessions/{session_id}", tags=["Remote Support"]) async def update_session(session_id: int, data: dict): """ Update a session — assign/re-assign to a sag, contact, or add notes. Accepted fields: sag_id, contact_id, customer_id, notes, status """ try: allowed = {"sag_id", "contact_id", "customer_id", "notes", "status"} updates = {k: v for k, v in data.items() if k in allowed} if not updates: raise HTTPException(status_code=400, detail="No valid fields to update") # Verify session exists existing = execute_query("SELECT id FROM anydesk_sessions WHERE id = %s", (session_id,)) if not existing: raise HTTPException(status_code=404, detail="Session not found") # Verify sag exists if provided if "sag_id" in updates and updates["sag_id"] is not None: sag = execute_query("SELECT id FROM sag_sager WHERE id = %s", (updates["sag_id"],)) if not sag: raise HTTPException(status_code=404, detail="Case not found") set_clauses = ", ".join([f"{k} = %s" for k in updates]) params = list(updates.values()) + [session_id] query = f""" UPDATE anydesk_sessions SET {set_clauses}, updated_at = NOW() WHERE id = %s RETURNING id, anydesk_session_id, customer_id, contact_id, sag_id, session_link, status, started_at, ended_at, duration_minutes, created_by_user_id, created_at, updated_at """ result = execute_query(query, tuple(params)) if not result: raise HTTPException(status_code=500, detail="Update failed") logger.info(f"✅ Updated AnyDesk session {session_id}: {list(updates.keys())}") return result[0] except HTTPException: raise except Exception as e: logger.error(f"Error updating session: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/anydesk/register-manual-session", tags=["Remote Support"]) async def register_manual_session(data: dict): """ Register a manual AnyDesk support session directly on a case. Expected payload: - customer_id (required) - sag_id (required) - anydesk_id (required) - assisted_device (required) - device_type (required, e.g. placebo/desktop/server) - contact_id (optional) - notes (optional) - created_by_user_id (optional) """ try: customer_id = data.get("customer_id") sag_id = data.get("sag_id") contact_id = data.get("contact_id") created_by_user_id = data.get("created_by_user_id") anydesk_id = str(data.get("anydesk_id") or "").strip() assisted_device = str(data.get("assisted_device") or "").strip() device_type = str(data.get("device_type") or "placebo").strip().lower() notes = str(data.get("notes") or "").strip() if not customer_id: raise HTTPException(status_code=400, detail="customer_id is required") if not sag_id: raise HTTPException(status_code=400, detail="sag_id is required") if not anydesk_id: raise HTTPException(status_code=400, detail="anydesk_id is required") if not assisted_device: raise HTTPException(status_code=400, detail="assisted_device is required") customer = execute_query("SELECT id FROM customers WHERE id = %s", (customer_id,)) if not customer: raise HTTPException(status_code=404, detail="Customer not found") sag = execute_query("SELECT id FROM sag_sager WHERE id = %s AND deleted_at IS NULL", (sag_id,)) if not sag: raise HTTPException(status_code=404, detail="Case not found") if contact_id: contact = execute_query("SELECT id FROM contacts WHERE id = %s", (contact_id,)) if not contact: raise HTTPException(status_code=404, detail="Contact not found") manual_external_id = f"manual-{uuid4().hex[:12]}" is_placeholder = device_type in {"placebo", "placeholder", "ukendt", "unknown"} device_info = { "to_id": anydesk_id, "customer_machine_id": anydesk_id, "assisted_device_name": assisted_device, "assisted_device_type": device_type, "is_placeholder_device": is_placeholder, "source": "manual_case_registration" } metadata = { "notes": notes, "source": "manual_case_registration" } insert_q = """ INSERT INTO anydesk_sessions ( anydesk_session_id, contact_id, customer_id, sag_id, session_link, device_info, created_by_user_id, started_at, ended_at, duration_minutes, status, metadata, created_at, updated_at ) VALUES ( %s, %s, %s, %s, NULL, %s::jsonb, %s, NOW(), NOW(), 0, 'completed', %s::jsonb, NOW(), NOW() ) RETURNING id, anydesk_session_id, customer_id, contact_id, sag_id, status, started_at """ created = execute_query( insert_q, ( manual_external_id, contact_id, customer_id, sag_id, json.dumps(device_info), created_by_user_id, json.dumps(metadata), ), ) if not created: raise HTTPException(status_code=500, detail="Failed to register session") comment_lines = [ f"đŸ–Ĩī¸ AnyDesk session registreret manuelt (AnyDesk ID: {anydesk_id})", f"Enhed: {assisted_device}", f"Type: {device_type}", ] if notes: comment_lines.append(f"Notat: {notes}") if is_placeholder: comment_lines.append("Info: Enhedstype er placeholder og kan linkes til hardware senere.") execute_query( """ INSERT INTO sag_kommentarer (sag_id, forfatter, indhold, er_system_besked) VALUES (%s, %s, %s, %s) """, (sag_id, "System", "\n".join(comment_lines), True), ) logger.info("✅ Manual AnyDesk session registered for case %s", sag_id) return {"ok": True, "session": created[0]} except HTTPException: raise except Exception as e: logger.error("Error registering manual AnyDesk session: %s", str(e)) raise HTTPException(status_code=500, detail=str(e)) @router.get("/anydesk/cases/{sag_id}/ids", tags=["Remote Support"]) async def list_case_anydesk_ids(sag_id: int): """List saved AnyDesk IDs for a case (multi-ID support).""" try: _ensure_case_exists(sag_id) rows = execute_query( """ SELECT sai.id, sai.sag_id, sai.anydesk_id, sai.hardware_asset_id, sai.is_primary, sai.note, sai.created_by_user_id, sai.created_at, sai.updated_at, h.brand, h.model, h.serial_number, h.anydesk_id AS hardware_anydesk_id FROM sag_anydesk_ids sai LEFT JOIN hardware_assets h ON h.id = sai.hardware_asset_id WHERE sai.sag_id = %s AND sai.deleted_at IS NULL ORDER BY sai.is_primary DESC, sai.updated_at DESC, sai.id DESC """, (sag_id,), ) or [] for row in rows: brand = (row.get("brand") or "").strip() model = (row.get("model") or "").strip() serial = (row.get("serial_number") or "").strip() fragments = [f for f in [brand, model] if f] if serial: fragments.append(f"SN: {serial}") row["hardware_label"] = " - ".join(fragments) if fragments else None return {"ids": rows} except HTTPException: raise except Exception as e: logger.error("Error listing case AnyDesk IDs: %s", e) raise HTTPException(status_code=500, detail="Could not load case AnyDesk IDs") @router.post("/anydesk/cases/{sag_id}/ids", tags=["Remote Support"]) async def upsert_case_anydesk_id(sag_id: int, data: dict): """Create or update a saved AnyDesk ID on a case.""" try: _ensure_case_exists(sag_id) anydesk_id = _normalize_anydesk_id(data.get("anydesk_id")) if not anydesk_id: raise HTTPException(status_code=400, detail="anydesk_id is required") hardware_asset_id = data.get("hardware_asset_id") created_by_user_id = data.get("created_by_user_id") note = (data.get("note") or "").strip() or None is_primary = bool(data.get("is_primary")) if hardware_asset_id is not None: asset = execute_query( "SELECT id FROM hardware_assets WHERE id = %s AND deleted_at IS NULL", (hardware_asset_id,), ) if not asset: raise HTTPException(status_code=404, detail="Hardware asset not found") row = execute_query( """ INSERT INTO sag_anydesk_ids ( sag_id, anydesk_id, hardware_asset_id, is_primary, note, created_by_user_id, deleted_at ) VALUES (%s, %s, %s, %s, %s, %s, NULL) ON CONFLICT (sag_id, anydesk_id) DO UPDATE SET hardware_asset_id = EXCLUDED.hardware_asset_id, is_primary = EXCLUDED.is_primary, note = COALESCE(EXCLUDED.note, sag_anydesk_ids.note), created_by_user_id = COALESCE(EXCLUDED.created_by_user_id, sag_anydesk_ids.created_by_user_id), deleted_at = NULL, updated_at = NOW() RETURNING id, sag_id, anydesk_id, hardware_asset_id, is_primary, note, created_by_user_id, created_at, updated_at """, (sag_id, anydesk_id, hardware_asset_id, is_primary, note, created_by_user_id), ) if not row: raise HTTPException(status_code=500, detail="Could not save AnyDesk ID") if is_primary: execute_query( """ UPDATE sag_anydesk_ids SET is_primary = FALSE, updated_at = NOW() WHERE sag_id = %s AND id != %s AND deleted_at IS NULL """, (sag_id, row[0]["id"]), ) return {"ok": True, "entry": row[0]} except HTTPException: raise except Exception as e: logger.error("Error upserting case AnyDesk ID: %s", e) raise HTTPException(status_code=500, detail="Could not save case AnyDesk ID") @router.delete("/anydesk/cases/{sag_id}/ids/{entry_id}", tags=["Remote Support"]) async def delete_case_anydesk_id(sag_id: int, entry_id: int): """Soft-delete a saved AnyDesk ID from a case.""" try: _ensure_case_exists(sag_id) result = execute_query( """ UPDATE sag_anydesk_ids SET deleted_at = NOW(), updated_at = NOW(), is_primary = FALSE WHERE id = %s AND sag_id = %s AND deleted_at IS NULL RETURNING id """, (entry_id, sag_id), ) if not result: raise HTTPException(status_code=404, detail="AnyDesk ID entry not found") return {"ok": True} except HTTPException: raise except Exception as e: logger.error("Error deleting case AnyDesk ID: %s", e) raise HTTPException(status_code=500, detail="Could not delete case AnyDesk ID") @router.get("/anydesk/cases/{sag_id}/hardware-options", tags=["Remote Support"]) async def get_case_anydesk_hardware_options(sag_id: int): """Get hardware options: case-linked assets first, then customer fallback assets.""" try: case_row = _ensure_case_exists(sag_id) customer_id = case_row.get("customer_id") case_hardware = execute_query( """ SELECT h.id, h.brand, h.model, h.serial_number, h.anydesk_id, h.current_owner_customer_id FROM sag_hardware sh JOIN hardware_assets h ON h.id = sh.hardware_id WHERE sh.sag_id = %s AND sh.deleted_at IS NULL AND h.deleted_at IS NULL ORDER BY h.brand, h.model, h.id """, (sag_id,), ) or [] case_ids = {row["id"] for row in case_hardware} customer_hardware = [] if customer_id: customer_hardware = execute_query( """ SELECT h.id, h.brand, h.model, h.serial_number, h.anydesk_id, h.current_owner_customer_id FROM hardware_assets h WHERE h.deleted_at IS NULL AND h.current_owner_customer_id = %s ORDER BY h.brand, h.model, h.id """, (customer_id,), ) or [] customer_hardware = [row for row in customer_hardware if row["id"] not in case_ids] def _with_label(row: dict) -> dict: brand = (row.get("brand") or "").strip() model = (row.get("model") or "").strip() serial = (row.get("serial_number") or "").strip() aid = (row.get("anydesk_id") or "").strip() parts = [p for p in [brand, model] if p] if serial: parts.append(f"SN: {serial}") if aid: parts.append(f"AD: {aid}") row["label"] = " - ".join(parts) if parts else f"Hardware #{row.get('id')}" return row return { "case_hardware": [_with_label(row) for row in case_hardware], "customer_hardware": [_with_label(row) for row in customer_hardware], } except HTTPException: raise except Exception as e: logger.error("Error loading AnyDesk hardware options: %s", e) raise HTTPException(status_code=500, detail="Could not load hardware options") @router.post("/anydesk/cases/{sag_id}/connect", tags=["Remote Support"]) async def connect_case_anydesk(sag_id: int, data: dict): """ Quick connect flow for a case: - Save AnyDesk ID on case - Create local session row for enrichment - Return AnyDesk deep link for app open """ try: case_row = _ensure_case_exists(sag_id) customer_id = case_row.get("customer_id") if not customer_id: raise HTTPException(status_code=400, detail="Case has no customer") anydesk_id = _normalize_anydesk_id(data.get("anydesk_id")) if not anydesk_id: raise HTTPException(status_code=400, detail="anydesk_id is required") created_by_user_id = data.get("created_by_user_id") contact_id = data.get("contact_id") hardware_asset_id = data.get("hardware_asset_id") note = (data.get("note") or "").strip() or None make_primary = bool(data.get("make_primary", True)) if contact_id is not None: contact = execute_query("SELECT id FROM contacts WHERE id = %s", (contact_id,)) if not contact: raise HTTPException(status_code=404, detail="Contact not found") if hardware_asset_id is not None: asset = execute_query( "SELECT id FROM hardware_assets WHERE id = %s AND deleted_at IS NULL", (hardware_asset_id,), ) if not asset: raise HTTPException(status_code=404, detail="Hardware asset not found") case_anydesk_row = execute_query( """ INSERT INTO sag_anydesk_ids ( sag_id, anydesk_id, hardware_asset_id, is_primary, note, created_by_user_id, deleted_at ) VALUES (%s, %s, %s, %s, %s, %s, NULL) ON CONFLICT (sag_id, anydesk_id) DO UPDATE SET hardware_asset_id = EXCLUDED.hardware_asset_id, is_primary = EXCLUDED.is_primary, note = COALESCE(EXCLUDED.note, sag_anydesk_ids.note), created_by_user_id = COALESCE(EXCLUDED.created_by_user_id, sag_anydesk_ids.created_by_user_id), deleted_at = NULL, updated_at = NOW() RETURNING id """, (sag_id, anydesk_id, hardware_asset_id, make_primary, note, created_by_user_id), ) case_anydesk_id = case_anydesk_row[0]["id"] if case_anydesk_row else None if make_primary and case_anydesk_id: execute_query( """ UPDATE sag_anydesk_ids SET is_primary = FALSE, updated_at = NOW() WHERE sag_id = %s AND id != %s AND deleted_at IS NULL """, (sag_id, case_anydesk_id), ) manual_external_id = f"case-{uuid4().hex[:12]}" deep_link = f"anydesk:{anydesk_id}" # Idempotency guard: avoid creating duplicate rows when users double-click connect. recent_existing = execute_query( """ SELECT id, anydesk_session_id, customer_id, contact_id, sag_id, status, started_at, session_link FROM anydesk_sessions WHERE sag_id = %s AND status IN ('active', 'pending') AND ( COALESCE(device_info->>'to_id', '') = %s OR COALESCE(device_info->>'customer_machine_id', '') = %s ) AND started_at >= NOW() - INTERVAL '10 minutes' ORDER BY started_at DESC LIMIT 1 """, (sag_id, anydesk_id, anydesk_id), ) if recent_existing: existing = recent_existing[0] logger.info("â„šī¸ Reusing in-flight AnyDesk session for case %s (session %s)", sag_id, existing.get("id")) return { "ok": True, "already_registering": True, "deep_link": existing.get("session_link") or deep_link, "anydesk_id": anydesk_id, "session": { "id": existing.get("id"), "anydesk_session_id": existing.get("anydesk_session_id"), "customer_id": existing.get("customer_id"), "contact_id": existing.get("contact_id"), "sag_id": existing.get("sag_id"), "status": existing.get("status"), "started_at": existing.get("started_at"), }, "case_anydesk_id": case_anydesk_id, } device_info = { "to_id": anydesk_id, "customer_machine_id": anydesk_id, "hardware_asset_id": hardware_asset_id, "case_anydesk_id": case_anydesk_id, "source": "case_quick_connect", } metadata = { "note": note, "source": "case_quick_connect", "needs_local_sync_enrichment": True, } created = execute_query( """ INSERT INTO anydesk_sessions ( anydesk_session_id, contact_id, customer_id, sag_id, session_link, device_info, created_by_user_id, started_at, ended_at, duration_minutes, status, metadata, created_at, updated_at ) VALUES ( %s, %s, %s, %s, %s, %s::jsonb, %s, NOW(), NULL, NULL, 'active', %s::jsonb, NOW(), NOW() ) RETURNING id, anydesk_session_id, customer_id, contact_id, sag_id, status, started_at """, ( manual_external_id, contact_id, customer_id, sag_id, deep_link, json.dumps(device_info), created_by_user_id, json.dumps(metadata), ), ) if not created: raise HTTPException(status_code=500, detail="Failed to create AnyDesk session row") execute_query( """ INSERT INTO sag_kommentarer (sag_id, forfatter, indhold, er_system_besked) VALUES (%s, %s, %s, %s) """, ( sag_id, "System", f"đŸ–Ĩī¸ AnyDesk quick-connect startet (ID: {anydesk_id})", True, ), ) logger.info("✅ AnyDesk quick-connect prepared for case %s", sag_id) return { "ok": True, "deep_link": deep_link, "anydesk_id": anydesk_id, "session": created[0], "case_anydesk_id": case_anydesk_id, } except HTTPException: raise except Exception as e: logger.error("Error in AnyDesk quick-connect: %s", e) raise HTTPException(status_code=500, detail="Could not start AnyDesk quick-connect") @router.get("/anydesk/sessions", response_model=AnyDeskSessionHistory, tags=["Remote Support"]) async def get_session_history( contact_id: Optional[int] = None, customer_id: Optional[int] = None, sag_id: Optional[int] = None, limit: int = 50, offset: int = 0 ): """ Get session history filtered by contact, customer, or case At least one filter parameter should be provided. Results are paginated and sorted by date (newest first). - **contact_id**: Get all sessions for a specific contact - **customer_id**: Get all sessions for a company - **sag_id**: Get sessions linked to a specific case - **limit**: Number of sessions per page (default 50, max 100) - **offset**: Pagination offset """ try: if limit > 100: limit = 100 result = await anydesk_service.get_session_history( contact_id=contact_id, customer_id=customer_id, sag_id=sag_id, limit=limit, offset=offset ) if "error" in result: raise HTTPException(status_code=400, detail=result["error"]) # Enrich session data with contact/customer/user names enriched_sessions = [] for session in result.get("sessions", []): enriched_sessions.append(AnyDeskSessionDetail(**session)) return AnyDeskSessionHistory( sessions=enriched_sessions, total=result["total"], limit=limit, offset=offset ) except HTTPException: raise except Exception as e: logger.error(f"Error fetching session history: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # ===================================================== # Worklog Integration Endpoints # ===================================================== @router.get("/anydesk/sessions/{session_id}/worklog-suggestion", tags=["Remote Support", "Time Tracking"]) async def suggest_worklog_from_session(session_id: int): """ Get suggested worklog entry from a completed session Calculates billable hours from session duration and provides a pre-filled worklog suggestion for review/approval. The worklog still needs to be created separately via the timetracking/worklog endpoints after user approval. """ try: # Get session query = """ SELECT id, duration_minutes, customer_id, sag_id, contact_id, started_at, ended_at, status FROM anydesk_sessions WHERE id = %s """ result = execute_query(query, (session_id,)) if not result: raise HTTPException(status_code=404, detail="Session not found") session = result[0] if session["status"] != "completed": raise HTTPException( status_code=400, detail=f"Cannot suggest worklog for non-completed session (status: {session['status']})" ) if not session["duration_minutes"]: raise HTTPException( status_code=400, detail="Session duration not calculated yet" ) # Build worklog suggestion duration_hours = round(session["duration_minutes"] / 60, 2) suggestion = { "session_id": session_id, "duration_minutes": session["duration_minutes"], "duration_hours": duration_hours, "start_time": str(session["started_at"]), "end_time": str(session["ended_at"]), "description": f"Remote support session via AnyDesk", "work_type": "remote_support", "billable": True, "linked_to": { "customer_id": session["customer_id"], "contact_id": session["contact_id"], "sag_id": session["sag_id"] } } logger.info(f"Generated worklog suggestion for session {session_id}: {duration_hours}h") return JSONResponse(content=suggestion) except HTTPException: raise except Exception as e: logger.error(f"Error generating worklog suggestion: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # ===================================================== # Status & Analytics Endpoints # ===================================================== @router.get("/anydesk/stats", tags=["Remote Support", "Analytics"]) async def get_anydesk_stats(): """ Get AnyDesk integration statistics - Total sessions today/this week/this month - Active sessions count - Average session duration - Most supported customers """ try: stats = { "sessions_today": 0, "sessions_this_week": 0, "sessions_this_month": 0, "active_sessions": 0, "average_duration_minutes": 0, "total_support_hours": 0.0 } # Get today's sessions query = """ SELECT COUNT(*) as count FROM anydesk_sessions WHERE DATE(started_at) = CURRENT_DATE """ result = execute_query(query) stats["sessions_today"] = result[0]["count"] if result else 0 # Get this week's sessions query = """ SELECT COUNT(*) as count FROM anydesk_sessions WHERE started_at >= CURRENT_DATE - INTERVAL '7 days' """ result = execute_query(query) stats["sessions_this_week"] = result[0]["count"] if result else 0 # Get this month's sessions query = """ SELECT COUNT(*) as count FROM anydesk_sessions WHERE started_at >= DATE_TRUNC('month', CURRENT_DATE) """ result = execute_query(query) stats["sessions_this_month"] = result[0]["count"] if result else 0 # Get active sessions query = """ SELECT COUNT(*) as count FROM anydesk_sessions WHERE status = 'active' """ result = execute_query(query) stats["active_sessions"] = result[0]["count"] if result else 0 # Get average duration query = """ SELECT AVG(duration_minutes) as avg_duration FROM anydesk_sessions WHERE status = 'completed' AND duration_minutes IS NOT NULL """ result = execute_query(query) stats["average_duration_minutes"] = round(result[0]["avg_duration"], 1) if result and result[0]["avg_duration"] else 0 # Get total support hours this month query = """ SELECT SUM(duration_minutes) as total_minutes FROM anydesk_sessions WHERE status = 'completed' AND started_at >= DATE_TRUNC('month', CURRENT_DATE) """ result = execute_query(query) total_minutes = result[0]["total_minutes"] if result and result[0]["total_minutes"] else 0 stats["total_support_hours"] = round(total_minutes / 60, 2) return JSONResponse(content=stats) except Exception as e: logger.error(f"Error calculating stats: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/anydesk/health", tags=["Remote Support", "Health"]) async def anydesk_health_check(): """ Health check for AnyDesk integration Returns configuration status, API connectivity, and last sync time """ creds = anydesk_service._get_credentials() return JSONResponse(content={ "service": "AnyDesk Remote Support", "status": "operational", "configured": bool(creds["api_token"] and creds["license_id"]), "dry_run_mode": creds["dry_run"], "read_only_mode": creds["read_only"], "auto_start_enabled": anydesk_service.auto_start }) @router.post("/anydesk/fetch-from-api", tags=["Remote Support"]) async def fetch_sessions_from_anydesk( days: int = 30, limit: int = 1000, after: Optional[str] = None, before: Optional[str] = None, ): """ Pull session log from the live AnyDesk REST API and import into local DB. - **days**: How many days back to fetch (default 30) - **limit**: Max entries to fetch (default 1000) - **after**: Override start as ISO-8601 timestamp (e.g. 2024-01-01T00:00:00Z) - **before**: Override end as ISO-8601 timestamp Requires `dry_run=false` in AnyDesk settings. Auth uses HMAC-SHA1 (AnyDesk native format), not Bearer token. Returns count of newly imported and updated sessions. """ try: result = await anydesk_service.fetch_sessions_from_api( days=days, limit=limit, after=after, before=before, ) if "error" in result: raise HTTPException(status_code=400, detail=result["error"]) return JSONResponse(content=result) except HTTPException: raise except Exception as e: logger.error(f"Error fetching from AnyDesk API: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # ===================================================== # Sessions Dashboard Endpoints # ===================================================== @router.get("/anydesk/sessions-overview", tags=["Remote Support"]) async def sessions_overview( days: int = 90, unregistered_only: bool = False, limit: int = 200, offset: int = 0, ): """ Enriched session list for the dashboard page. Joins hardware_assets (via remote_id/anydesk_id), contacts, customers, sag. """ try: where = "WHERE s.started_at >= NOW() - INTERVAL '%s days'" % int(days) if unregistered_only: where += " AND s.sag_id IS NULL AND s.contact_id IS NULL AND s.hardware_asset_id IS NULL" query = f""" SELECT s.id, s.anydesk_session_id, s.started_at, s.ended_at, s.duration_minutes, s.status, s.notes, -- linked hardware s.hardware_asset_id, ha.brand AS hw_brand, ha.model AS hw_model, ha.anydesk_id AS hw_anydesk_id, ha.current_owner_customer_id AS hw_customer_id, -- linked contact s.contact_id, c.first_name || ' ' || c.last_name AS contact_name, c.email AS contact_email, -- linked customer s.customer_id, cust.name AS customer_name, -- linked sag s.sag_id, sag.titel AS sag_titel, sag.status AS sag_status, -- raw remote_id from import (s.device_info->>'remote_id')::TEXT AS remote_id, (s.device_info->>'remote_alias')::TEXT AS remote_alias, (s.device_info->>'from_id')::TEXT AS technician_id, (s.device_info->>'to_id')::TEXT AS customer_machine_id, (s.device_info->>'local_alias')::TEXT AS customer_alias, -- technician resolved from user_anydesk_ids tech_u.user_id AS tech_user_id, COALESCE(tech_u.full_name, tech_u.username) AS tech_name FROM anydesk_sessions s LEFT JOIN hardware_assets ha ON s.hardware_asset_id = ha.id LEFT JOIN contacts c ON s.contact_id = c.id LEFT JOIN customers cust ON s.customer_id = cust.id LEFT JOIN sag_sager sag ON s.sag_id = sag.id LEFT JOIN user_anydesk_ids uad ON regexp_replace(COALESCE(uad.anydesk_id, ''), '[^0-9]', '', 'g') = regexp_replace(COALESCE((s.device_info->>'from_id')::TEXT, ''), '[^0-9]', '', 'g') LEFT JOIN users tech_u ON tech_u.user_id = uad.user_id {where} ORDER BY s.started_at DESC LIMIT {int(limit)} OFFSET {int(offset)} """ rows = execute_query(query) # count count_q = f"SELECT COUNT(*) AS total FROM anydesk_sessions s {where}" total = (execute_query(count_q) or [{"total": 0}])[0]["total"] def _is_synthetic_session_id(session_id: Optional[str]) -> bool: sid = str(session_id or "") return sid.startswith("case-") or sid.startswith("manual-") or sid.startswith("local-") def _dedupe_key(row: dict) -> str: # Prefer stable external AnyDesk session IDs when they are not synthetic. sid = str(row.get("anydesk_session_id") or "").strip() if sid and not _is_synthetic_session_id(sid): return f"sid:{sid}" machine_id = str(row.get("customer_machine_id") or row.get("remote_id") or "").strip() tech_id = str(row.get("technician_id") or "").strip() # Use 15-minute buckets to collapse short-lived duplicates from hub/import/local sync. started = row.get("started_at") if started: minutes = int(started.timestamp() // (15 * 60)) bucket = str(minutes) else: bucket = "unknown" return f"heur:{machine_id}:{tech_id}:{bucket}" def _row_score(row: dict) -> int: score = 0 if row.get("sag_id"): score += 200 if row.get("customer_id"): score += 120 if row.get("contact_id"): score += 100 if row.get("hardware_asset_id"): score += 80 duration = row.get("duration_minutes") if duration is not None: try: if float(duration) > 0: score += 60 else: score += 20 except Exception: score += 10 if row.get("ended_at"): score += 25 if not _is_synthetic_session_id(row.get("anydesk_session_id")): score += 40 return score def _backfill_enrichment(winner: dict, loser: dict) -> None: """Copy missing enriched fields from loser into winner.""" for field in ( "tech_name", "technician_id", "remote_alias", "remote_id", "customer_machine_id", "customer_alias", "contact_id", "contact_name", "contact_email", "customer_id", "customer_name", "sag_id", "sag_titel", "sag_status", "hardware_asset_id", "hw_brand", "hw_model", "hw_anydesk_id", "hw_customer_id", "notes", ): if not winner.get(field) and loser.get(field): winner[field] = loser.get(field) deduped_rows: dict[str, dict] = {} for row in (rows or []): key = _dedupe_key(row) if key not in deduped_rows: deduped_rows[key] = row continue current = deduped_rows[key] current_score = _row_score(current) incoming_score = _row_score(row) if incoming_score > current_score: winner = row other = current else: winner = current other = row # Merge useful enrichment from the losing row into the winner. if not winner.get("duration_minutes") and other.get("duration_minutes") is not None: winner["duration_minutes"] = other.get("duration_minutes") if not winner.get("ended_at") and other.get("ended_at"): winner["ended_at"] = other.get("ended_at") if (winner.get("status") in (None, "", "active", "pending")) and other.get("status"): if other.get("status") in ("completed", "failed", "cancelled"): winner["status"] = other.get("status") if not winner.get("notes") and other.get("notes"): winner["notes"] = other.get("notes") _backfill_enrichment(winner, other) deduped_rows[key] = winner # Second pass: merge neighboring buckets for same machine ID when timestamps are close. # This catches duplicates where one row lands in an adjacent 15-minute bucket. second_pass_rows = list(deduped_rows.values()) second_pass_rows.sort(key=lambda item: item.get("started_at") or "", reverse=True) clustered: list[dict] = [] for row in second_pass_rows: row_machine = str(row.get("customer_machine_id") or row.get("remote_id") or "").strip() row_started = row.get("started_at") attached = False if row_machine and row_started: for target in clustered: target_machine = str(target.get("customer_machine_id") or target.get("remote_id") or "").strip() target_started = target.get("started_at") if not target_machine or not target_started: continue if row_machine != target_machine: continue if abs(target_started - row_started) <= timedelta(minutes=20): target_score = _row_score(target) row_score = _row_score(row) winner = row if row_score > target_score else target loser = target if winner is row else row if not winner.get("duration_minutes") and loser.get("duration_minutes") is not None: winner["duration_minutes"] = loser.get("duration_minutes") if not winner.get("ended_at") and loser.get("ended_at"): winner["ended_at"] = loser.get("ended_at") if (winner.get("status") in (None, "", "active", "pending")) and loser.get("status"): if loser.get("status") in ("completed", "failed", "cancelled", "registered"): winner["status"] = loser.get("status") if not winner.get("notes") and loser.get("notes"): winner["notes"] = loser.get("notes") _backfill_enrichment(winner, loser) if winner is row: clustered.remove(target) clustered.append(winner) attached = True break if not attached: clustered.append(row) merged_rows = clustered merged_rows.sort(key=lambda item: item.get("started_at") or "", reverse=True) sessions = [] for r in merged_rows: sessions.append({ "id": r["id"], "anydesk_session_id": r["anydesk_session_id"], "started_at": str(r["started_at"]) if r["started_at"] else None, "ended_at": str(r["ended_at"]) if r["ended_at"] else None, "duration_minutes": r["duration_minutes"], "status": r["status"], "notes": r["notes"], "remote_id": r["remote_id"], "remote_alias": r["remote_alias"], "technician_id": r["technician_id"], # from.cid — teknikkerens maskine "technician_name": r["tech_name"], # resolved from user_anydesk_ids "customer_machine_id": r["customer_machine_id"], # to.cid — kundens maskine "customer_alias": r["customer_alias"], "hardware": { "id": r["hardware_asset_id"], "brand": r["hw_brand"], "model": r["hw_model"], "anydesk_id": r["hw_anydesk_id"], "customer_id": r["hw_customer_id"], } if r["hardware_asset_id"] else None, "contact": { "id": r["contact_id"], "name": r["contact_name"], "email": r["contact_email"], } if r["contact_id"] else None, "customer": { "id": r["customer_id"], "name": r["customer_name"], } if r["customer_id"] else None, "sag": { "id": r["sag_id"], "titel": r["sag_titel"], "status": r["sag_status"], } if r["sag_id"] else None, }) # Return deduplicated total for UI consistency on overview endpoint. return JSONResponse(content={"sessions": sessions, "total": len(sessions), "limit": limit, "offset": offset, "raw_total": total}) except Exception as e: logger.error(f"Error in sessions_overview: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/anydesk/auto-link", tags=["Remote Support"]) async def auto_link_sessions(): """ Auto-link unlinked sessions to hardware_assets via anydesk_id match, and carry over contact/customer from the hardware asset. Returns count of newly linked sessions. """ try: linked = 0 # Match sessions to hardware_assets where to_id (customer machine) = anydesk_id # NOTE: from.cid is the TECHNICIAN's machine, to.cid is the CUSTOMER's machine result = execute_query(""" UPDATE anydesk_sessions s SET hardware_asset_id = ha.id, customer_id = COALESCE(s.customer_id, ha.current_owner_customer_id), updated_at = NOW() FROM hardware_assets ha WHERE ha.anydesk_id IS NOT NULL AND ha.anydesk_id != '' AND ( (s.device_info->>'to_id') = ha.anydesk_id OR -- fallback: older imports without to_id — try remote_id only if it differs from technicians' known IDs (s.device_info->>'to_id' IS NULL AND (s.device_info->>'remote_id') = ha.anydesk_id) ) AND s.hardware_asset_id IS NULL RETURNING s.id """) linked = len(result) if result else 0 logger.info(f"✅ Auto-linked {linked} AnyDesk sessions to hardware assets") return JSONResponse(content={"linked": linked}) except Exception as e: logger.error(f"Error in auto_link_sessions: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.patch("/anydesk/sessions/{session_id}/link", tags=["Remote Support"]) async def link_session( session_id: int, sag_id: Optional[int] = None, contact_id: Optional[int] = None, customer_id: Optional[int] = None, hardware_asset_id: Optional[int] = None, notes: Optional[str] = None, ): """ Manually link a session to sag, contact, customer, hardware, or add notes. """ try: sets = ["updated_at = NOW()"] params = [] if sag_id is not None: sets.append("sag_id = %s"); params.append(sag_id) if contact_id is not None: sets.append("contact_id = %s"); params.append(contact_id) if customer_id is not None: sets.append("customer_id = %s"); params.append(customer_id) if hardware_asset_id is not None: sets.append("hardware_asset_id = %s"); params.append(hardware_asset_id) if notes is not None: sets.append("notes = %s"); params.append(notes) if len(sets) == 1: return JSONResponse(content={"message": "no changes"}) params.append(session_id) execute_query( f"UPDATE anydesk_sessions SET {', '.join(sets)} WHERE id = %s", tuple(params) ) return JSONResponse(content={"ok": True}) except Exception as e: logger.error(f"Error linking session {session_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/anydesk/hardware-assets", tags=["Remote Support"]) async def anydesk_hardware_list(): """List all hardware assets that have an anydesk_id (for linking dropdown)""" try: rows = execute_query(""" SELECT ha.id, ha.brand, ha.model, ha.anydesk_id, ha.serial_number, ha.current_owner_customer_id AS customer_id, cust.name AS customer_name FROM hardware_assets ha LEFT JOIN customers cust ON ha.current_owner_customer_id = cust.id WHERE ha.deleted_at IS NULL ORDER BY ha.brand, ha.model """) return JSONResponse(content={"assets": rows or []}) except Exception as e: raise HTTPException(status_code=500, detail=str(e))