- Added a button to sync ESET data in the hardware detail view. - Introduced a new tab for ESET specifications, displaying relevant device information. - Included ESET UUID and group details in the hardware information section. - Implemented a JavaScript function to handle ESET data synchronization via API. - Updated the ESET import template to improve device listing and inline contact selection. - Enhanced the Nextcloud and locations routers to support customer ID resolution from contacts. - Added utility functions for retrieving customer IDs linked to contacts. - Removed debug information from the service contract wizard for cleaner output.
146 lines
5.6 KiB
Python
146 lines
5.6 KiB
Python
"""
|
|
ESET PROTECT Integration Service
|
|
"""
|
|
import logging
|
|
import time
|
|
import httpx
|
|
from typing import Dict, Optional, Any
|
|
from app.core.config import settings
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
class EsetService:
    """Client for the ESET Connect / ESET PROTECT REST endpoints.

    Configuration (endpoints, credentials, feature flag, timeout) is read from
    ``settings``. An OAuth access token is obtained from the IAM endpoint with
    the password grant and cached on the instance until shortly before the
    lifetime reported by the server runs out.
    """

    def __init__(self):
        """Load endpoints, credentials and flags from ``settings``."""
        self.base_url = settings.ESET_API_URL.rstrip('/')
        self.iam_url = settings.ESET_IAM_URL.rstrip('/')
        self.incidents_url = settings.ESET_INCIDENTS_URL.rstrip('/')
        self.username = settings.ESET_USERNAME
        self.password = settings.ESET_PASSWORD
        self.client_id = settings.ESET_OAUTH_CLIENT_ID
        self.client_secret = settings.ESET_OAUTH_CLIENT_SECRET
        self.scope = settings.ESET_OAUTH_SCOPE
        self.enabled = settings.ESET_ENABLED
        # TLS verification used to be hard-coded off (self-signed certs are
        # common for ESET). It is now taken from the optional ESET_VERIFY_SSL
        # setting and handed to httpx's ``verify`` argument unchanged; when
        # the setting is absent the previous default (False) still applies.
        # NOTE(review): credentials travel over this connection - enable
        # verification wherever the server certificate can be validated.
        self.verify_ssl = getattr(settings, "ESET_VERIFY_SSL", False)
        self._access_token: Optional[str] = None
        self._access_token_expires_at = 0.0

    def _new_client(self) -> httpx.AsyncClient:
        """Build an HTTP client with this service's TLS and timeout settings."""
        return httpx.AsyncClient(
            verify=self.verify_ssl,
            timeout=settings.ESET_TIMEOUT_SECONDS,
        )

    def _token_is_fresh(self) -> bool:
        """Return True while a cached token exists and has not expired locally."""
        return bool(self._access_token) and time.time() < self._access_token_expires_at

    async def _authenticate(self, client: httpx.AsyncClient) -> Optional[str]:
        """Request a new access token from ESET IAM and cache it.

        Args:
            client: Open HTTP client used for the token request.

        Returns:
            The access token, or None when the integration is disabled, the
            request fails, or the response contains no ``access_token``.
            Errors are logged, never raised.
        """
        if not self.enabled:
            return None

        try:
            # OAuth token endpoint via ESET Connect IAM
            url = f"{self.iam_url}/oauth/token"

            payload = {
                "grant_type": "password",
                "username": self.username,
                "password": self.password,
            }
            if self.scope:
                payload["scope"] = self.scope
            if self.client_id:
                payload["client_id"] = self.client_id

            # HTTP basic auth is only sent when both client id and secret exist.
            auth = None
            if self.client_id and self.client_secret:
                auth = (self.client_id, self.client_secret)

            logger.info("Authenticating with ESET IAM at %s", url)
            response = await client.post(
                url,
                data=payload,
                auth=auth,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )

            if response.status_code != 200:
                logger.error(
                    "❌ ESET Auth failed: %s - %s",
                    response.status_code,
                    response.text[:200],
                )
                return None

            token_data = response.json()
            access_token = token_data.get("access_token")
            if not access_token:
                logger.error("❌ ESET Auth failed: missing access_token")
                return None

            # Compute the expiry before touching the cache so a malformed
            # ``expires_in`` cannot leave a new token paired with an old expiry.
            # The 30 second margin makes the token expire locally a bit early.
            expires_in = token_data.get("expires_in", 3600)
            expires_at = time.time() + int(expires_in) - 30

            self._access_token = access_token
            self._access_token_expires_at = expires_at
            logger.info("✅ ESET Authentication successful")
            return access_token
        except Exception as e:
            logger.error("❌ ESET Auth error: %s", e)
            return None

    async def _get_access_token(self, client: httpx.AsyncClient) -> Optional[str]:
        """Return the cached token while it is fresh, otherwise authenticate."""
        if self._token_is_fresh():
            return self._access_token
        return await self._authenticate(client)

    async def _get_json(self, client: httpx.AsyncClient, url: str, params: Optional[dict] = None) -> Optional[Dict[str, Any]]:
        """GET ``url`` with a bearer token and return the decoded JSON body.

        If a *cached* token is rejected with HTTP 401, the cache is cleared
        and the request is retried once with a newly issued token.

        Args:
            client: Open HTTP client used for the request.
            url: Absolute URL to fetch.
            params: Optional query parameters.

        Returns:
            The parsed JSON on HTTP 200; None when no token could be obtained
            or the final response status is not 200 (the failure is logged).
            Exceptions raised by the HTTP client or by JSON decoding are not
            caught here.
        """
        token_was_cached = self._token_is_fresh()
        token = await self._get_access_token(client)
        if not token:
            return None

        response = await client.get(url, params=params, headers={"Authorization": f"Bearer {token}"})

        if response.status_code == 401 and token_was_cached:
            # The server no longer accepts the cached token even though it has
            # not expired locally: forget it and try once with a fresh one.
            self._access_token = None
            self._access_token_expires_at = 0.0
            token = await self._authenticate(client)
            if not token:
                return None
            response = await client.get(url, params=params, headers={"Authorization": f"Bearer {token}"})

        if response.status_code == 200:
            return response.json()

        logger.error(
            "ESET API error %s for %s: %s",
            response.status_code,
            url,
            (response.text or "")[:500],
        )
        return None

    async def list_devices(self, page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """List devices from ESET Device Management.

        Args:
            page_size: Sent as ``pageSize`` when truthy.
            page_token: Sent as ``pageToken`` when truthy.

        Returns:
            The decoded response, or None when the integration is disabled or
            the request did not succeed. Exceptions raised by the HTTP client
            are not caught here.
        """
        if not self.enabled:
            logger.warning("ESET not enabled")
            return None

        url = f"{self.base_url}/v1/devices"
        params: Dict[str, Any] = {}
        if page_size:
            params["pageSize"] = page_size
        if page_token:
            params["pageToken"] = page_token

        async with self._new_client() as client:
            payload = await self._get_json(client, url, params=params or None)
            if not payload:
                logger.warning("ESET devices payload empty")
            return payload

    async def list_incidents(self) -> Optional[Dict[str, Any]]:
        """List incidents from ESET Incident Management.

        Returns:
            The decoded response, or None when the integration is disabled or
            the request did not succeed. Exceptions raised by the HTTP client
            are not caught here.
        """
        if not self.enabled:
            logger.warning("ESET not enabled")
            return None

        url = f"{self.incidents_url}/v1/incidents"
        async with self._new_client() as client:
            return await self._get_json(client, url)

    async def get_device_details(self, device_uuid: str) -> Optional[Dict[str, Any]]:
        """Fetch device details from ESET by UUID.

        Args:
            device_uuid: Identifier of the device; it is percent-encoded
                before being placed in the URL path.

        Returns:
            The decoded response, or None when the integration is disabled,
            the request did not succeed, or any exception occurred (logged).
        """
        # Local import: keeps this block self-contained without touching the
        # module's import section.
        from urllib.parse import quote

        if not self.enabled:
            logger.warning("ESET not enabled")
            return None

        # Encode the identifier so characters such as "/", "?" or "#" cannot
        # alter the request path; a regular UUID is left unchanged.
        safe_uuid = quote(str(device_uuid), safe="")
        url = f"{self.base_url}/v1/devices/{safe_uuid}"

        try:
            async with self._new_client() as client:
                return await self._get_json(client, url)
        except Exception as e:
            logger.error("ESET API error: %s", e)
            return None
|
|
|
|
# Module-level EsetService instance, constructed once when this module is imported.
eset_service = EsetService()
|