""" AnyDesk Remote Support Service Handles integration with AnyDesk API for remote session management """ import logging import json from datetime import datetime, timedelta from typing import Optional, Dict, Any import httpx from app.core.config import settings from app.core.database import execute_query logger = logging.getLogger(__name__) class AnyDeskService: """ AnyDesk API Integration Service Handles remote session creation, monitoring, and closure. Respects safety switches: READ_ONLY and DRY_RUN """ BASE_URL = "https://api.anydesk.com" def __init__(self): self.api_token = settings.ANYDESK_API_TOKEN self.license_id = settings.ANYDESK_LICENSE_ID self.read_only = settings.ANYDESK_READ_ONLY self.dry_run = settings.ANYDESK_DRY_RUN self.timeout = settings.ANYDESK_TIMEOUT_SECONDS self.auto_start = settings.ANYDESK_AUTO_START_SESSION if not self.api_token or not self.license_id: logger.warning("⚠️ AnyDesk credentials not configured - service disabled") def _check_enabled(self) -> bool: """Check if AnyDesk is properly configured""" if not self.api_token or not self.license_id: logger.warning("AnyDesk service not configured (missing credentials)") return False return True async def _api_call(self, method: str, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]: """ Make HTTP call to AnyDesk API Args: method: HTTP method (GET, POST, PUT, DELETE) endpoint: API endpoint (e.g., "/v1/sessions") data: Request body data Returns: Response JSON dictionary """ if not self._check_enabled(): return {"error": "AnyDesk not configured"} # Log the intent log_msg = f"🔗 AnyDesk API: {method} {endpoint}" if data: log_msg += f" | Data: {json.dumps(data, indent=2)}" logger.info(log_msg) # DRY RUN: Don't actually call API if self.dry_run: logger.warning("⚠️ DRY_RUN=true: Simulating API response (no actual call)") return self._simulate_response(method, endpoint, data) # READ ONLY: Allow gets but not mutations if self.read_only and method != "GET": logger.warning(f"🔒 READ_ONLY=true: Blocking {method} request") return {"error": "Read-only mode: mutations disabled"} try: headers = { "Authorization": f"Bearer {self.api_token}", "Content-Type": "application/json" } url = f"{self.BASE_URL}{endpoint}" async with httpx.AsyncClient(timeout=self.timeout) as client: if method == "GET": response = await client.get(url, headers=headers) elif method == "POST": response = await client.post(url, headers=headers, json=data) elif method == "PUT": response = await client.put(url, headers=headers, json=data) elif method == "DELETE": response = await client.delete(url, headers=headers) else: return {"error": f"Unsupported method: {method}"} response.raise_for_status() return response.json() except httpx.HTTPError as e: logger.error(f"❌ AnyDesk API error: {str(e)}") return {"error": str(e)} except Exception as e: logger.error(f"❌ Unexpected error calling AnyDesk API: {str(e)}") return {"error": str(e)} def _simulate_response(self, method: str, endpoint: str, data: Optional[Dict]) -> Dict[str, Any]: """Generate simulated AnyDesk API response for DRY_RUN mode""" import uuid if "/sessions" in endpoint and method == "POST": # Simulate session creation session_id = f"session_{uuid.uuid4().hex[:12]}" return { "id": session_id, "status": "active", "access_code": f"AD-{uuid.uuid4().hex[:8].upper()}", "link": f"https://anydesk.com/?phone={uuid.uuid4().hex[:8]}", "created_at": datetime.utcnow().isoformat(), "expires_at": (datetime.utcnow() + timedelta(hours=24)).isoformat(), "simulator": True } elif "/sessions" in endpoint and method == "GET": # Simulate session retrieval return { "id": "session_abc123", "status": "active", "device_name": "Customer PC", "duration_seconds": 300, "simulator": True } elif "/sessions" in endpoint and method == "DELETE": # Simulate session termination return {"status": "terminated", "simulator": True} return {"status": "ok", "simulator": True} async def create_session( self, customer_id: int, contact_id: Optional[int] = None, sag_id: Optional[int] = None, description: Optional[str] = None, created_by_user_id: Optional[int] = None ) -> Dict[str, Any]: """ Create a new unattended remote session Args: customer_id: BMC Hub customer ID contact_id: Optional contact ID sag_id: Optional case ID description: Session description/purpose created_by_user_id: User creating the session Returns: Session data with session_id, link, access_code, etc. """ # Prepare session data session_data = { "name": f"BMC Support - Customer {customer_id}", "description": description or f"Support session for customer {customer_id}", "license_id": self.license_id, "auto_accept": True # Auto-accept connection requests } # Call AnyDesk API result = await self._api_call("POST", "/v1/sessions", session_data) if "error" in result: logger.error(f"Failed to create AnyDesk session: {result['error']}") return result # Store session in database session_id = result.get("id") session_link = result.get("link") or result.get("access_code") try: query = """ INSERT INTO anydesk_sessions (anydesk_session_id, customer_id, contact_id, sag_id, session_link, status, created_by_user_id, device_info, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id, anydesk_session_id, customer_id, contact_id, sag_id, session_link, status, started_at, ended_at, duration_minutes, created_by_user_id, created_at, updated_at """ device_info = { "created_via": "api", "auto_start": self.auto_start, "dry_run_mode": self.dry_run } metadata = { "api_response": { "expires_at": result.get("expires_at"), "access_code": result.get("access_code") } } db_result = execute_query( query, (session_id, customer_id, contact_id, sag_id, session_link, "active", created_by_user_id, json.dumps(device_info), json.dumps(metadata)) ) if db_result: logger.info(f"✅ Created AnyDesk session {session_id} in database") return { **db_result[0], "api_response": result } else: logger.error("Failed to store session in database") return {"error": "Database storage failed"} except Exception as e: logger.error(f"Error storing session: {str(e)}") return {"error": str(e)} async def get_session(self, anydesk_session_id: str) -> Dict[str, Any]: """ Get session details from AnyDesk API Args: anydesk_session_id: AnyDesk session ID Returns: Session status and details """ result = await self._api_call("GET", f"/v1/sessions/{anydesk_session_id}") return result async def check_session_status(self, db_session_id: int) -> Dict[str, Any]: """ Check current status of a session in database Args: db_session_id: Database session ID Returns: Current session status, running time, etc. """ try: query = """ SELECT id, anydesk_session_id, status, started_at, ended_at, duration_minutes FROM anydesk_sessions WHERE id = %s """ result = execute_query(query, (db_session_id,)) if result: session = result[0] # If session still active, try to get live status from AnyDesk if session["status"] == "active": api_result = await self.get_session(session["anydesk_session_id"]) if "error" not in api_result: return { "db_id": session["id"], "status": "active", "started_at": str(session["started_at"]), "api_status": api_result } return { "db_id": session["id"], "status": session["status"], "started_at": str(session["started_at"]), "ended_at": str(session["ended_at"]) if session["ended_at"] else None, "duration_minutes": session["duration_minutes"] } else: return {"error": "Session not found"} except Exception as e: logger.error(f"Error checking session status: {str(e)}") return {"error": str(e)} async def end_session(self, db_session_id: int) -> Dict[str, Any]: """ End a remote session (stop AnyDesk connection and mark as completed) Args: db_session_id: Database session ID Returns: Confirmation with duration and suggested worklog """ try: # Get session from DB query = """ SELECT id, anydesk_session_id, started_at FROM anydesk_sessions WHERE id = %s AND status = 'active' """ result = execute_query(query, (db_session_id,)) if not result: return {"error": "Session not found or already ended"} session = result[0] anydesk_session_id = session["anydesk_session_id"] started_at = session["started_at"] # Call AnyDesk API to terminate session api_result = await self._api_call("DELETE", f"/v1/sessions/{anydesk_session_id}") # Calculate duration now = datetime.utcnow() started = started_at.replace(tzinfo=None) if isinstance(started_at, datetime) else started_at duration_seconds = int((now - started).total_seconds()) duration_minutes = round(duration_seconds / 60, 1) # Update database update_query = """ UPDATE anydesk_sessions SET status = 'completed', ended_at = %s, duration_minutes = %s, updated_at = %s WHERE id = %s RETURNING id, anydesk_session_id, duration_minutes """ update_result = execute_query( update_query, (datetime.utcnow(), duration_minutes, datetime.utcnow(), db_session_id) ) logger.info(f"✅ Ended AnyDesk session {anydesk_session_id} (Duration: {duration_minutes} min)") return { "id": db_session_id, "status": "completed", "duration_minutes": duration_minutes, "duration_hours": round(duration_minutes / 60, 2), "ended_at": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Error ending session: {str(e)}") return {"error": str(e)} async def get_session_history( self, contact_id: Optional[int] = None, customer_id: Optional[int] = None, sag_id: Optional[int] = None, limit: int = 50, offset: int = 0 ) -> Dict[str, Any]: """ Get session history for a contact/customer/case Args: contact_id: Filter by contact customer_id: Filter by customer sag_id: Filter by case limit: Number of sessions to return offset: Pagination offset Returns: List of sessions with details """ try: # Build dynamic query based on filters where_clauses = [] params = [] if contact_id: where_clauses.append("contact_id = %s") params.append(contact_id) if customer_id: where_clauses.append("customer_id = %s") params.append(customer_id) if sag_id: where_clauses.append("sag_id = %s") params.append(sag_id) where_clause = " AND ".join(where_clauses) if where_clauses else "1=1" query = f""" SELECT s.id, s.anydesk_session_id, s.contact_id, s.customer_id, s.sag_id, s.session_link, s.status, s.started_at, s.ended_at, s.duration_minutes, s.created_by_user_id, s.created_at, s.updated_at, c.first_name || ' ' || c.last_name as contact_name, cust.name as customer_name, sag.title as sag_title, u.full_name as created_by_user_name, s.device_info, s.metadata FROM anydesk_sessions s LEFT JOIN contacts c ON s.contact_id = c.id LEFT JOIN customers cust ON s.customer_id = cust.id LEFT JOIN sag_sager sag ON s.sag_id = sag.id LEFT JOIN users u ON s.created_by_user_id = u.user_id WHERE {where_clause} ORDER BY s.started_at DESC LIMIT %s OFFSET %s """ params.extend([limit, offset]) result = execute_query(query, tuple(params)) # Count total count_query = f""" SELECT COUNT(*) as total FROM anydesk_sessions WHERE {where_clause} """ count_result = execute_query(count_query, tuple(params[:-2])) total = count_result[0]["total"] if count_result else 0 return { "sessions": result or [], "total": total, "limit": limit, "offset": offset } except Exception as e: logger.error(f"Error fetching session history: {str(e)}") return {"error": str(e), "sessions": []}