""" AnyDesk Remote Support Service Handles integration with AnyDesk API for remote session management """ import logging import json import hashlib import hmac import base64 import time from uuid import uuid4 from datetime import datetime, timedelta from typing import Optional, Dict, Any, List import httpx import aiohttp from app.core.config import settings from app.core.database import execute_query logger = logging.getLogger(__name__) class AnyDeskService: """ AnyDesk API Integration Service Handles remote session creation, monitoring, and closure. Respects safety switches: READ_ONLY and DRY_RUN """ BASE_URL = "https://v1.api.anydesk.com:8081" def __init__(self): # Credentials loaded lazily from DB at call-time (via _get_credentials) # Fall back to .env values if DB has nothing self._timeout = settings.ANYDESK_TIMEOUT_SECONDS self.auto_start = settings.ANYDESK_AUTO_START_SESSION def _get_credentials(self) -> Dict[str, Any]: """Load credentials from DB settings table, fallback to .env""" try: rows = execute_query( "SELECT key, value FROM settings WHERE key LIKE 'anydesk_%'", ) db = {r["key"]: r["value"] for r in rows} if rows else {} except Exception: db = {} def _bool(val, default: bool) -> bool: if val is None: return default return str(val).lower() in ("true", "1", "yes") return { "api_token": db.get("anydesk_api_token") or settings.ANYDESK_API_TOKEN or "", "license_id": db.get("anydesk_license_id") or settings.ANYDESK_LICENSE_ID or "", "read_only": _bool(db.get("anydesk_read_only"), settings.ANYDESK_READ_ONLY), "dry_run": _bool(db.get("anydesk_dry_run"), settings.ANYDESK_DRY_RUN), } @property def timeout(self): return self._timeout def _generate_auth_header(self, resource: str, content: str = "", method: str = "GET") -> str: """ AnyDesk HMAC-SHA1 auth header. Format: AD {license_id}:{timestamp}:{signature} """ creds = self._get_credentials() sha1 = hashlib.sha1() sha1.update(content.encode("utf-8")) content_hash = base64.b64encode(sha1.digest()).decode("utf-8") timestamp = str(int(time.time())) request_string = f"{method}\n{resource}\n{timestamp}\n{content_hash}" sig = hmac.new( creds["api_token"].encode("utf-8"), request_string.encode("utf-8"), hashlib.sha1, ).digest() token = base64.b64encode(sig).decode("utf-8") return f"AD {creds['license_id']}:{timestamp}:{token}" def _check_enabled(self) -> bool: """Check if AnyDesk is properly configured""" creds = self._get_credentials() if not creds["api_token"] or not creds["license_id"]: logger.warning("AnyDesk service not configured (missing credentials)") return False return True async def _api_call(self, method: str, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]: if not self._check_enabled(): return {"error": "AnyDesk not configured"} creds = self._get_credentials() dry_run = creds["dry_run"] read_only = creds["read_only"] log_msg = f"🔗 AnyDesk API: {method} {endpoint}" if data: log_msg += f" | Data: {json.dumps(data, indent=2)}" logger.info(log_msg) # DRY RUN: Don't actually call API if dry_run: logger.warning("⚠️ DRY_RUN=true: Simulating API response (no actual call)") return self._simulate_response(method, endpoint, data) # READ ONLY: Allow gets but not mutations if read_only and method != "GET": logger.warning(f"🔒 READ_ONLY=true: Blocking {method} request") return {"error": "Read-only mode: mutations disabled"} body_str = json.dumps(data) if data else "" auth_header = self._generate_auth_header(endpoint, body_str, method) headers = { "Authorization": auth_header, "Content-Type": "application/json", } url = f"{self.BASE_URL}{endpoint}" try: async with aiohttp.ClientSession() as session: kwargs = {"headers": headers, "timeout": aiohttp.ClientTimeout(total=self.timeout)} if data: kwargs["json"] = data async with getattr(session, method.lower())(url, **kwargs) as response: response_text = await response.text() logger.info(f"📡 AnyDesk API {response.status}: {response_text[:200]}") if response.status == 200: try: return await response.json(content_type=None) except Exception: return {"raw": response_text} elif response.status == 401: logger.error(f"❌ AnyDesk auth failed — check license_id + api_token") return {"error": f"Unauthorized (401): {response_text[:200]}"} else: logger.error(f"❌ AnyDesk API error {response.status}: {response_text[:300]}") return {"error": f"HTTP {response.status}: {response_text[:300]}"} except httpx.HTTPError as e: logger.error(f"❌ AnyDesk API error: {str(e)}") return {"error": str(e)} except Exception as e: logger.error(f"❌ Unexpected error calling AnyDesk API: {str(e)}") return {"error": str(e)} def _simulate_response(self, method: str, endpoint: str, data: Optional[Dict]) -> Dict[str, Any]: """Generate simulated AnyDesk API response for DRY_RUN mode""" import uuid if "/sessions" in endpoint and method == "POST": # Simulate session creation session_id = f"session_{uuid.uuid4().hex[:12]}" return { "id": session_id, "status": "active", "access_code": f"AD-{uuid.uuid4().hex[:8].upper()}", "link": f"https://anydesk.com/?phone={uuid.uuid4().hex[:8]}", "created_at": datetime.utcnow().isoformat(), "expires_at": (datetime.utcnow() + timedelta(hours=24)).isoformat(), "simulator": True } elif "/sessions" in endpoint and method == "GET": # Simulate session retrieval return { "id": "session_abc123", "status": "active", "device_name": "Customer PC", "duration_seconds": 300, "simulator": True } elif "/sessions" in endpoint and method == "DELETE": # Simulate session termination return {"status": "terminated", "simulator": True} return {"status": "ok", "simulator": True} async def create_session( self, customer_id: int, contact_id: Optional[int] = None, sag_id: Optional[int] = None, description: Optional[str] = None, created_by_user_id: Optional[int] = None ) -> Dict[str, Any]: """ Create a new unattended remote session Args: customer_id: BMC Hub customer ID contact_id: Optional contact ID sag_id: Optional case ID description: Session description/purpose created_by_user_id: User creating the session Returns: Session data with session_id, link, access_code, etc. """ creds = self._get_credentials() # Prepare session data session_data = { "name": f"BMC Support - Customer {customer_id}", "description": description or f"Support session for customer {customer_id}", "license_id": creds["license_id"], "auto_accept": True # Auto-accept connection requests } # Call AnyDesk API result = await self._api_call("POST", "/v1/sessions", session_data) if "error" in result: logger.error(f"Failed to create AnyDesk session: {result['error']}") return result # Store session in database session_id = result.get("id") session_link = result.get("link") or result.get("access_code") try: query = """ INSERT INTO anydesk_sessions (anydesk_session_id, customer_id, contact_id, sag_id, session_link, status, created_by_user_id, device_info, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id, anydesk_session_id, customer_id, contact_id, sag_id, session_link, status, started_at, ended_at, duration_minutes, created_by_user_id, created_at, updated_at """ device_info = { "created_via": "api", "auto_start": self.auto_start, "dry_run_mode": creds["dry_run"] } metadata = { "api_response": { "expires_at": result.get("expires_at"), "access_code": result.get("access_code") } } db_result = execute_query( query, (session_id, customer_id, contact_id, sag_id, session_link, "active", created_by_user_id, json.dumps(device_info), json.dumps(metadata)) ) if db_result: logger.info(f"✅ Created AnyDesk session {session_id} in database") return { **db_result[0], "api_response": result } else: logger.error("Failed to store session in database") return {"error": "Database storage failed"} except Exception as e: logger.error(f"Error storing session: {str(e)}") return {"error": str(e)} async def get_session(self, anydesk_session_id: str) -> Dict[str, Any]: """ Get session details from AnyDesk API Args: anydesk_session_id: AnyDesk session ID Returns: Session status and details """ result = await self._api_call("GET", f"/v1/sessions/{anydesk_session_id}") return result async def check_session_status(self, db_session_id: int) -> Dict[str, Any]: """ Check current status of a session in database Args: db_session_id: Database session ID Returns: Current session status, running time, etc. """ try: query = """ SELECT id, anydesk_session_id, status, started_at, ended_at, duration_minutes FROM anydesk_sessions WHERE id = %s """ result = execute_query(query, (db_session_id,)) if result: session = result[0] # If session still active, try to get live status from AnyDesk if session["status"] == "active": api_result = await self.get_session(session["anydesk_session_id"]) if "error" not in api_result: return { "db_id": session["id"], "status": "active", "started_at": str(session["started_at"]), "api_status": api_result } return { "db_id": session["id"], "status": session["status"], "started_at": str(session["started_at"]), "ended_at": str(session["ended_at"]) if session["ended_at"] else None, "duration_minutes": session["duration_minutes"] } else: return {"error": "Session not found"} except Exception as e: logger.error(f"Error checking session status: {str(e)}") return {"error": str(e)} async def end_session(self, db_session_id: int) -> Dict[str, Any]: """ End a remote session (stop AnyDesk connection and mark as completed) Args: db_session_id: Database session ID Returns: Confirmation with duration and suggested worklog """ try: # Get session from DB query = """ SELECT id, anydesk_session_id, started_at FROM anydesk_sessions WHERE id = %s AND status = 'active' """ result = execute_query(query, (db_session_id,)) if not result: return {"error": "Session not found or already ended"} session = result[0] anydesk_session_id = session["anydesk_session_id"] started_at = session["started_at"] # Call AnyDesk API to terminate session api_result = await self._api_call("DELETE", f"/v1/sessions/{anydesk_session_id}") # Calculate duration now = datetime.utcnow() started = started_at.replace(tzinfo=None) if isinstance(started_at, datetime) else started_at duration_seconds = int((now - started).total_seconds()) duration_minutes = round(duration_seconds / 60, 1) # Update database update_query = """ UPDATE anydesk_sessions SET status = 'completed', ended_at = %s, duration_minutes = %s, updated_at = %s WHERE id = %s RETURNING id, anydesk_session_id, duration_minutes """ update_result = execute_query( update_query, (datetime.utcnow(), duration_minutes, datetime.utcnow(), db_session_id) ) logger.info(f"✅ Ended AnyDesk session {anydesk_session_id} (Duration: {duration_minutes} min)") return { "id": db_session_id, "status": "completed", "duration_minutes": duration_minutes, "duration_hours": round(duration_minutes / 60, 2), "ended_at": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Error ending session: {str(e)}") return {"error": str(e)} async def get_session_history( self, contact_id: Optional[int] = None, customer_id: Optional[int] = None, sag_id: Optional[int] = None, limit: int = 50, offset: int = 0 ) -> Dict[str, Any]: """ Get session history for a contact/customer/case Args: contact_id: Filter by contact customer_id: Filter by customer sag_id: Filter by case limit: Number of sessions to return offset: Pagination offset Returns: List of sessions with details """ try: # Build dynamic query based on filters where_clauses = [] params = [] if contact_id: where_clauses.append("contact_id = %s") params.append(contact_id) if customer_id: where_clauses.append("customer_id = %s") params.append(customer_id) if sag_id: where_clauses.append("sag_id = %s") params.append(sag_id) where_clause = " AND ".join(where_clauses) if where_clauses else "1=1" query = f""" SELECT s.id, s.anydesk_session_id, s.contact_id, s.customer_id, s.sag_id, s.session_link, s.status, s.started_at, s.ended_at, s.duration_minutes, s.created_by_user_id, s.created_at, s.updated_at, c.first_name || ' ' || c.last_name as contact_name, cust.name as customer_name, sag.titel as sag_title, u.full_name as created_by_user_name, s.device_info, s.metadata FROM anydesk_sessions s LEFT JOIN contacts c ON s.contact_id = c.id LEFT JOIN customers cust ON s.customer_id = cust.id LEFT JOIN sag_sager sag ON s.sag_id = sag.id LEFT JOIN users u ON s.created_by_user_id = u.user_id WHERE {where_clause} ORDER BY s.started_at DESC LIMIT %s OFFSET %s """ params.extend([limit, offset]) result = execute_query(query, tuple(params)) # Count total count_query = f""" SELECT COUNT(*) as total FROM anydesk_sessions WHERE {where_clause} """ count_result = execute_query(count_query, tuple(params[:-2])) total = count_result[0]["total"] if count_result else 0 return { "sessions": result or [], "total": total, "limit": limit, "offset": offset } except Exception as e: logger.error(f"Error fetching session history: {str(e)}") return {"error": str(e), "sessions": []} async def fetch_sessions_from_api( self, days: int = 30, limit: int = 1000, after: Optional[str] = None, before: Optional[str] = None, ) -> Dict[str, Any]: """ Pull session log from AnyDesk REST API and upsert into local DB. AnyDesk API: GET /sessions?from=UNIX&to=UNIX&limit=N Auth: HMAC-SHA1 signature (not Bearer token) Returns summary of imported/updated records. """ end_ts = int(time.time()) start_ts = end_ts - (days * 86400) # Allow ISO override if after: try: start_ts = int(datetime.fromisoformat(after.rstrip("Z")).timestamp()) except Exception: pass if before: try: end_ts = int(datetime.fromisoformat(before.rstrip("Z")).timestamp()) except Exception: pass qs = f"from={start_ts}&to={end_ts}&limit={limit}" result = await self._api_call("GET", f"/sessions?{qs}") if "error" in result: return result # AnyDesk returns { "list": [...] } entries = result.get("list", result if isinstance(result, list) else []) imported = 0 updated = 0 errors = [] for i, entry in enumerate(entries): if i < 3: logger.info(f"📊 AnyDesk session sample: {entry}") try: session_id = str(entry.get("sid") or "") if not session_id: continue # AnyDesk timestamps are unix integers started_raw = entry.get("start-time") ended_raw = entry.get("end-time") started = datetime.utcfromtimestamp(started_raw) if started_raw else None ended = datetime.utcfromtimestamp(ended_raw) if ended_raw else None duration_s = entry.get("duration") or 0 duration_min = round(int(duration_s) / 60, 1) if duration_s else None remote_alias = entry.get("from", {}).get("alias") if isinstance(entry.get("from"), dict) else None from_id = str(entry.get("from", {}).get("cid") or "") if isinstance(entry.get("from"), dict) else None # technician machine to_id = str(entry.get("to", {}).get("cid") or "") if isinstance(entry.get("to"), dict) else None # customer machine local_alias = entry.get("to", {}).get("alias") if isinstance(entry.get("to"), dict) else None status = "active" if entry.get("active") else "completed" device_info = json.dumps({ "remote_alias": remote_alias, # technician alias (from) "remote_id": from_id, # technician machine CID (from.cid) — kept for compat "from_id": from_id, # technician machine CID "to_id": to_id, # customer machine CID ← use for hardware linking "local_alias": local_alias, # customer alias (to) "imported_from_api": True, }) metadata = json.dumps({"raw": entry}) # Upsert: insert or update on anydesk_session_id check = execute_query( "SELECT id FROM anydesk_sessions WHERE anydesk_session_id = %s", (session_id,) ) if check: execute_query( """UPDATE anydesk_sessions SET status=%s, ended_at=%s, duration_minutes=%s, device_info=%s, metadata=%s, updated_at=NOW() WHERE anydesk_session_id=%s""", (status, ended, duration_min, device_info, metadata, session_id) ) updated += 1 else: execute_query( """INSERT INTO anydesk_sessions (anydesk_session_id, status, started_at, ended_at, duration_minutes, device_info, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s)""", (session_id, status, started, ended, duration_min, device_info, metadata) ) imported += 1 except Exception as exc: errors.append(str(exc)) logger.warning(f"⚠️ Could not import entry: {exc}") logger.info(f"✅ AnyDesk import done: {imported} new, {updated} updated, {len(errors)} errors") return { "imported": imported, "updated": updated, "total_from_api": len(entries), "errors": errors, } @staticmethod def _extract_local_sessions(payload: Any) -> List[dict]: if isinstance(payload, list): return [item for item in payload if isinstance(item, dict)] if isinstance(payload, dict): for key in ("sessions", "list", "data", "items", "results"): value = payload.get(key) if isinstance(value, list): return [item for item in value if isinstance(item, dict)] return [] @staticmethod def _parse_timestamp(value: Any) -> Optional[datetime]: if value is None: return None if isinstance(value, datetime): return value if isinstance(value, (int, float)): if value > 10_000_000_000: value = value / 1000 try: return datetime.utcfromtimestamp(value) except Exception: return None text = str(value).strip() if not text: return None try: if text.endswith("Z"): text = text[:-1] + "+00:00" return datetime.fromisoformat(text).replace(tzinfo=None) except Exception: return None async def fetch_sessions_from_local_endpoint( self, endpoint_url: str, timeout_seconds: int = 20, dry_run: bool = False, ) -> Dict[str, Any]: """ Poll local AnyDesk bridge endpoint and upsert/enrich local sessions. Endpoint expected: http://localhost:8001/anydesk/sessions """ imported = 0 updated = 0 matched = 0 errors: List[str] = [] try: logger.info("📡 Polling local AnyDesk sessions from %s", endpoint_url) async with httpx.AsyncClient(timeout=timeout_seconds) as client: response = await client.get(endpoint_url) response.raise_for_status() payload = response.json() except Exception as exc: logger.error("❌ Local AnyDesk polling failed: %s", exc) return {"error": str(exc), "imported": 0, "updated": 0, "matched": 0, "total": 0, "errors": [str(exc)]} entries = self._extract_local_sessions(payload) for entry in entries: try: sid = str(entry.get("sid") or entry.get("session_id") or entry.get("anydesk_session_id") or entry.get("id") or "").strip() to_raw = entry.get("to") to_obj = to_raw if isinstance(to_raw, dict) else {} to_id = str( entry.get("to_id") or entry.get("anydesk_id") or entry.get("customer_machine_id") or to_obj.get("cid") or "" ).strip() from_raw = entry.get("from") from_obj = from_raw if isinstance(from_raw, dict) else {} from_id = str(entry.get("from_id") or from_obj.get("cid") or "").strip() started = self._parse_timestamp( entry.get("started_at") or entry.get("start") or entry.get("start-time") or entry.get("start_time") ) ended = self._parse_timestamp( entry.get("ended_at") or entry.get("end") or entry.get("end-time") or entry.get("end_time") ) duration_minutes = entry.get("duration_minutes") if duration_minutes is None: duration_seconds = entry.get("duration_seconds") if duration_seconds is None: duration_seconds = entry.get("duration") if duration_seconds is not None: try: duration_minutes = round(float(duration_seconds) / 60, 1) except Exception: duration_minutes = None status = str(entry.get("status") or "").strip().lower() if not status: if bool(entry.get("active")): status = "active" elif ended or duration_minutes is not None: status = "completed" else: status = "pending" if not sid and not to_id: continue existing = [] if sid: existing = execute_query( "SELECT id FROM anydesk_sessions WHERE anydesk_session_id = %s", (sid,), ) or [] if not existing and to_id: existing = execute_query( """ SELECT id FROM anydesk_sessions WHERE COALESCE(device_info->>'to_id', '') = %s AND started_at >= NOW() - INTERVAL '24 hours' ORDER BY started_at DESC LIMIT 1 """, (to_id,), ) or [] device_info = { "remote_alias": entry.get("remote_alias") or from_obj.get("alias"), "from_id": from_id, "to_id": to_id, "local_alias": entry.get("local_alias") or to_obj.get("alias"), "imported_from_local_endpoint": True, } metadata = { "source": "local_anydesk_sessions", "raw": entry, } if existing: matched += 1 if dry_run: continue execute_query( """ UPDATE anydesk_sessions SET status = COALESCE(%s, status), ended_at = COALESCE(%s, ended_at), duration_minutes = COALESCE(%s, duration_minutes), device_info = COALESCE(device_info, '{}'::jsonb) || %s::jsonb, metadata = COALESCE(metadata, '{}'::jsonb) || %s::jsonb, updated_at = NOW() WHERE id = %s """, ( status, ended, duration_minutes, json.dumps(device_info), json.dumps(metadata), existing[0]["id"], ), ) updated += 1 continue if dry_run: continue insert_sid = sid or f"local-{uuid4().hex[:12]}" execute_query( """ INSERT INTO anydesk_sessions ( anydesk_session_id, session_link, started_at, ended_at, duration_minutes, status, device_info, metadata, created_at, updated_at ) VALUES (%s, %s, %s, %s, %s, %s, %s::jsonb, %s::jsonb, NOW(), NOW()) """, ( insert_sid, f"anydesk:{to_id}" if to_id else None, started or datetime.utcnow(), ended, duration_minutes, status, json.dumps(device_info), json.dumps(metadata), ), ) imported += 1 except Exception as exc: logger.warning("⚠️ Failed local AnyDesk entry sync: %s", exc) errors.append(str(exc)) logger.info( "✅ Local AnyDesk sync done: %d imported, %d updated (%d matched), %d errors", imported, updated, matched, len(errors), ) return { "imported": imported, "updated": updated, "matched": matched, "total": len(entries), "errors": errors, }