bmc_hub/app/services/anydesk.py

425 lines
16 KiB
Python
Raw Permalink Normal View History

"""
AnyDesk Remote Support Service
Handles integration with AnyDesk API for remote session management
"""
import logging
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import httpx
from app.core.config import settings
from app.core.database import execute_query
logger = logging.getLogger(__name__)
class AnyDeskService:
"""
AnyDesk API Integration Service
Handles remote session creation, monitoring, and closure.

Safety switches, both evaluated in _api_call before any request is sent:
- DRY_RUN: a simulated response is returned and the API is never contacted.
- READ_ONLY: every HTTP method other than GET is rejected with an error dict.
DRY_RUN is checked first, so it takes precedence when both are enabled.

The methods of this service report failures by returning a dict that
contains an "error" key.
"""
# Root URL that _api_call prepends to every endpoint path.
BASE_URL = "https://api.anydesk.com"
def __init__(self):
    """
    Load the AnyDesk configuration from the application settings.

    Copies the credentials, the READ_ONLY / DRY_RUN safety switches, the
    HTTP timeout and the auto-start flag onto the instance. A warning is
    logged when either credential is missing; construction never fails
    because of that.
    """
    cfg = settings

    # Credentials
    self.api_token = cfg.ANYDESK_API_TOKEN
    self.license_id = cfg.ANYDESK_LICENSE_ID

    # Safety switches
    self.read_only = cfg.ANYDESK_READ_ONLY
    self.dry_run = cfg.ANYDESK_DRY_RUN

    # Behaviour tuning
    self.timeout = cfg.ANYDESK_TIMEOUT_SECONDS
    self.auto_start = cfg.ANYDESK_AUTO_START_SESSION

    credentials_present = bool(self.api_token and self.license_id)
    if not credentials_present:
        logger.warning("⚠️ AnyDesk credentials not configured - service disabled")
def _check_enabled(self) -> bool:
    """
    Report whether both AnyDesk credentials are present.

    Returns:
        True when an API token and a license ID are set; otherwise a
        warning is logged and False is returned.
    """
    if self.api_token and self.license_id:
        return True

    logger.warning("AnyDesk service not configured (missing credentials)")
    return False
async def _api_call(self, method: str, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
    """
    Make HTTP call to AnyDesk API

    Checks run in this order before any network traffic happens:
    credentials present, DRY_RUN (returns a simulated response), READ_ONLY
    (rejects every method other than GET), supported HTTP method.
    Errors raised while sending the request or decoding the reply are
    caught and reported as ``{"error": "<message>"}``.

    Args:
        method: HTTP method (GET, POST, PUT, DELETE)
        endpoint: API endpoint (e.g., "/v1/sessions")
        data: Request body data (sent as JSON for POST and PUT only)

    Returns:
        Response JSON dictionary, an empty dict for a successful
        "204 No Content" reply, or a dict with an "error" key
    """
    if not self._check_enabled():
        return {"error": "AnyDesk not configured"}

    # Log the intent
    log_msg = f"🔗 AnyDesk API: {method} {endpoint}"
    if data:
        log_msg += f" | Data: {json.dumps(data, indent=2)}"
    logger.info(log_msg)

    # DRY RUN: Don't actually call API
    if self.dry_run:
        logger.warning("⚠️ DRY_RUN=true: Simulating API response (no actual call)")
        return self._simulate_response(method, endpoint, data)

    # READ ONLY: Allow gets but not mutations
    if self.read_only and method != "GET":
        logger.warning(f"🔒 READ_ONLY=true: Blocking {method} request")
        return {"error": "Read-only mode: mutations disabled"}

    # Reject unknown verbs before an HTTP client is opened for nothing.
    if method not in ("GET", "POST", "PUT", "DELETE"):
        return {"error": f"Unsupported method: {method}"}

    headers = {
        "Authorization": f"Bearer {self.api_token}",
        "Content-Type": "application/json"
    }
    # Only POST and PUT carry a JSON body; GET and DELETE ignore `data`.
    body_kwargs = {"json": data} if method in ("POST", "PUT") else {}

    try:
        url = f"{self.BASE_URL}{endpoint}"
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.request(method, url, headers=headers, **body_kwargs)
            response.raise_for_status()
            # A 204 reply has no body by definition; decoding it as JSON
            # would raise and turn a successful call into an error.
            if response.status_code == 204:
                return {}
            return response.json()
    except httpx.HTTPError as e:
        logger.error(f"❌ AnyDesk API error: {str(e)}")
        return {"error": str(e)}
    except Exception as e:
        logger.error(f"❌ Unexpected error calling AnyDesk API: {str(e)}")
        return {"error": str(e)}
def _simulate_response(self, method: str, endpoint: str, data: Optional[Dict]) -> Dict[str, Any]:
    """
    Build the fake payload used in place of a real API call in DRY_RUN mode.

    Args:
        method: HTTP method of the request being simulated
        endpoint: API endpoint of the request being simulated
        data: Request body (not used by the simulation)

    Returns:
        A session-shaped dict for POST/GET/DELETE on a "/sessions" endpoint,
        otherwise a generic {"status": "ok"} dict. Every simulated payload
        carries "simulator": True.
    """
    import uuid

    generic_ok = {"status": "ok", "simulator": True}
    if "/sessions" not in endpoint:
        return generic_ok

    if method == "POST":
        # Simulated session creation: random id, access code and link
        fake_id = f"session_{uuid.uuid4().hex[:12]}"
        fake_code = f"AD-{uuid.uuid4().hex[:8].upper()}"
        fake_link = f"https://anydesk.com/?phone={uuid.uuid4().hex[:8]}"
        created = datetime.utcnow().isoformat()
        expires = (datetime.utcnow() + timedelta(hours=24)).isoformat()
        return {
            "id": fake_id,
            "status": "active",
            "access_code": fake_code,
            "link": fake_link,
            "created_at": created,
            "expires_at": expires,
            "simulator": True,
        }

    if method == "GET":
        # Simulated session lookup: always the same fixed record
        return {
            "id": "session_abc123",
            "status": "active",
            "device_name": "Customer PC",
            "duration_seconds": 300,
            "simulator": True,
        }

    if method == "DELETE":
        # Simulated session termination
        return {"status": "terminated", "simulator": True}

    return generic_ok
async def create_session(
    self,
    customer_id: int,
    contact_id: Optional[int] = None,
    sag_id: Optional[int] = None,
    description: Optional[str] = None,
    created_by_user_id: Optional[int] = None
) -> Dict[str, Any]:
    """
    Request a remote session from the AnyDesk API and record it locally.

    Args:
        customer_id: BMC Hub customer ID
        contact_id: Optional contact ID
        sag_id: Optional case ID
        description: Session description/purpose; a default text naming the
            customer is used when omitted
        created_by_user_id: User creating the session

    Returns:
        The inserted anydesk_sessions row merged with the raw API reply
        under "api_response", or a dict with an "error" key when the API
        call or the database insert fails.
    """
    if description:
        purpose = description
    else:
        purpose = f"Support session for customer {customer_id}"

    payload = {
        "name": f"BMC Support - Customer {customer_id}",
        "description": purpose,
        "license_id": self.license_id,
        "auto_accept": True,  # Auto-accept connection requests
    }

    api_reply = await self._api_call("POST", "/v1/sessions", payload)
    if "error" in api_reply:
        logger.error(f"Failed to create AnyDesk session: {api_reply['error']}")
        return api_reply

    # The link is preferred; the access code is the fallback for session_link.
    remote_id = api_reply.get("id")
    link = api_reply.get("link")
    if not link:
        link = api_reply.get("access_code")

    try:
        # SQL text is kept flush-left so the statement sent to the
        # database is unchanged.
        insert_sql = """
INSERT INTO anydesk_sessions
(anydesk_session_id, customer_id, contact_id, sag_id, session_link,
status, created_by_user_id, device_info, metadata)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id, anydesk_session_id, customer_id, contact_id, sag_id,
session_link, status, started_at, ended_at, duration_minutes,
created_by_user_id, created_at, updated_at
"""
        device_json = json.dumps({
            "created_via": "api",
            "auto_start": self.auto_start,
            "dry_run_mode": self.dry_run,
        })
        metadata_json = json.dumps({
            "api_response": {
                "expires_at": api_reply.get("expires_at"),
                "access_code": api_reply.get("access_code"),
            }
        })
        row_values = (
            remote_id, customer_id, contact_id, sag_id, link,
            "active", created_by_user_id, device_json, metadata_json,
        )

        rows = execute_query(insert_sql, row_values)
        if not rows:
            logger.error("Failed to store session in database")
            return {"error": "Database storage failed"}

        logger.info(f"✅ Created AnyDesk session {remote_id} in database")
        return {**rows[0], "api_response": api_reply}
    except Exception as e:
        logger.error(f"Error storing session: {str(e)}")
        return {"error": str(e)}
async def get_session(self, anydesk_session_id: str) -> Dict[str, Any]:
    """
    Fetch one session from the AnyDesk API.

    Args:
        anydesk_session_id: AnyDesk session ID

    Returns:
        Whatever _api_call returns for GET /v1/sessions/<id>: the session
        payload, or a dict with an "error" key
    """
    endpoint = f"/v1/sessions/{anydesk_session_id}"
    return await self._api_call("GET", endpoint)
async def check_session_status(self, db_session_id: int) -> Dict[str, Any]:
    """
    Look up a stored session and, while it is active, its live API state.

    Args:
        db_session_id: Database session ID

    Returns:
        For an active session whose live lookup succeeds: db_id, status,
        started_at and the raw API payload under "api_status".
        Otherwise the stored status, timestamps and duration.
        A dict with an "error" key when the row is missing or the lookup
        raises.
    """
    try:
        # SQL text is kept flush-left so the statement sent to the
        # database is unchanged.
        lookup_sql = """
SELECT id, anydesk_session_id, status, started_at, ended_at, duration_minutes
FROM anydesk_sessions
WHERE id = %s
"""
        rows = execute_query(lookup_sql, (db_session_id,))
        if not rows:
            return {"error": "Session not found"}

        row = rows[0]

        # Active sessions: prefer the live view from AnyDesk when available
        if row["status"] == "active":
            live = await self.get_session(row["anydesk_session_id"])
            if "error" not in live:
                return {
                    "db_id": row["id"],
                    "status": "active",
                    "started_at": str(row["started_at"]),
                    "api_status": live,
                }

        # Ended sessions, or live lookup failed: answer from the stored row
        return {
            "db_id": row["id"],
            "status": row["status"],
            "started_at": str(row["started_at"]),
            "ended_at": None if not row["ended_at"] else str(row["ended_at"]),
            "duration_minutes": row["duration_minutes"],
        }
    except Exception as e:
        logger.error(f"Error checking session status: {str(e)}")
        return {"error": str(e)}
async def end_session(self, db_session_id: int) -> Dict[str, Any]:
    """
    End a remote session (stop AnyDesk connection and mark as completed)

    The local record is closed even when the AnyDesk termination call fails
    or is blocked by READ_ONLY; that outcome is logged as a warning instead
    of being dropped silently.

    Args:
        db_session_id: Database session ID

    Returns:
        Dict with id, status "completed", duration_minutes, duration_hours
        and ended_at (ISO string, same instant that is written to the
        database), or a dict with an "error" key when the session is not
        active/not found or an exception occurs
    """
    # Imported locally: the module only imports datetime and timedelta.
    from datetime import timezone

    try:
        # Get session from DB
        query = """
SELECT id, anydesk_session_id, started_at
FROM anydesk_sessions
WHERE id = %s AND status = 'active'
"""
        result = execute_query(query, (db_session_id,))
        if not result:
            return {"error": "Session not found or already ended"}

        session = result[0]
        anydesk_session_id = session["anydesk_session_id"]
        started_at = session["started_at"]

        # Call AnyDesk API to terminate session
        api_result = await self._api_call("DELETE", f"/v1/sessions/{anydesk_session_id}")
        if isinstance(api_result, dict) and "error" in api_result:
            logger.warning(
                f"⚠️ AnyDesk termination of session {anydesk_session_id} failed: "
                f"{api_result['error']} - closing local record anyway"
            )

        # One timestamp for the duration, the stored row and the response,
        # so all three agree.
        now = datetime.utcnow()

        # Calculate duration. An aware timestamp is converted to UTC before
        # its tzinfo is dropped; merely stripping tzinfo would shift the
        # result by the UTC offset. A naive timestamp is taken to be UTC
        # already, as before.
        if isinstance(started_at, datetime) and started_at.tzinfo is not None:
            started = started_at.astimezone(timezone.utc).replace(tzinfo=None)
        else:
            started = started_at
        duration_seconds = int((now - started).total_seconds())
        duration_minutes = round(duration_seconds / 60, 1)

        # Update database
        update_query = """
UPDATE anydesk_sessions
SET status = 'completed', ended_at = %s, duration_minutes = %s, updated_at = %s
WHERE id = %s
RETURNING id, anydesk_session_id, duration_minutes
"""
        execute_query(
            update_query,
            (now, duration_minutes, now, db_session_id)
        )

        logger.info(f"✅ Ended AnyDesk session {anydesk_session_id} (Duration: {duration_minutes} min)")
        return {
            "id": db_session_id,
            "status": "completed",
            "duration_minutes": duration_minutes,
            "duration_hours": round(duration_minutes / 60, 2),
            "ended_at": now.isoformat()
        }
    except Exception as e:
        logger.error(f"Error ending session: {str(e)}")
        return {"error": str(e)}
async def get_session_history(
    self,
    contact_id: Optional[int] = None,
    customer_id: Optional[int] = None,
    sag_id: Optional[int] = None,
    limit: int = 50,
    offset: int = 0
) -> Dict[str, Any]:
    """
    Get session history for a contact/customer/case

    Filters that are given are combined with AND; with no filter every
    session is returned.

    Args:
        contact_id: Filter by contact
        customer_id: Filter by customer
        sag_id: Filter by case
        limit: Number of sessions to return
        offset: Pagination offset

    Returns:
        Dict with "sessions" (newest first, joined with contact, customer,
        case and user names), "total" (count of all rows matching the
        filters, ignoring limit/offset), "limit" and "offset"; on failure a
        dict with "error" and an empty "sessions" list
    """
    try:
        # Build dynamic query based on filters.
        # Columns are qualified with the "s" alias: the main query joins four
        # other tables, and an unqualified name is rejected as ambiguous if
        # any of them has a same-named column.
        # NOTE(review): joined table schemas are not visible here - confirm.
        where_clauses = []
        filter_params = []
        if contact_id:
            where_clauses.append("s.contact_id = %s")
            filter_params.append(contact_id)
        if customer_id:
            where_clauses.append("s.customer_id = %s")
            filter_params.append(customer_id)
        if sag_id:
            where_clauses.append("s.sag_id = %s")
            filter_params.append(sag_id)
        where_clause = " AND ".join(where_clauses) if where_clauses else "1=1"

        # s.id is a tie-breaker so LIMIT/OFFSET pages stay stable when
        # several sessions share the same started_at.
        query = f"""
SELECT
s.id, s.anydesk_session_id, s.contact_id, s.customer_id, s.sag_id,
s.session_link, s.status, s.started_at, s.ended_at, s.duration_minutes,
s.created_by_user_id, s.created_at, s.updated_at,
c.first_name || ' ' || c.last_name as contact_name,
cust.name as customer_name,
sag.title as sag_title,
u.full_name as created_by_user_name,
s.device_info, s.metadata
FROM anydesk_sessions s
LEFT JOIN contacts c ON s.contact_id = c.id
LEFT JOIN customers cust ON s.customer_id = cust.id
LEFT JOIN sag_sager sag ON s.sag_id = sag.id
LEFT JOIN users u ON s.created_by_user_id = u.user_id
WHERE {where_clause}
ORDER BY s.started_at DESC, s.id DESC
LIMIT %s OFFSET %s
"""
        result = execute_query(query, tuple(filter_params) + (limit, offset))

        # Count total: same filters, no paging parameters. The table gets
        # the same "s" alias so the shared WHERE clause resolves.
        count_query = f"""
SELECT COUNT(*) as total
FROM anydesk_sessions s
WHERE {where_clause}
"""
        count_result = execute_query(count_query, tuple(filter_params))
        total = count_result[0]["total"] if count_result else 0

        return {
            "sessions": result or [],
            "total": total,
            "limit": limit,
            "offset": offset
        }
    except Exception as e:
        logger.error(f"Error fetching session history: {str(e)}")
        return {"error": str(e), "sessions": []}