""" ESET PROTECT Integration Service """ import logging import time import httpx from typing import Dict, Optional, Any, List, Mapping, Sequence, Tuple, Union from app.core.config import settings logger = logging.getLogger(__name__) class EsetService: def __init__(self): self.base_url = settings.ESET_API_URL.rstrip('/') self.iam_url = settings.ESET_IAM_URL.rstrip('/') self.incidents_url = settings.ESET_INCIDENTS_URL.rstrip('/') self.username = settings.ESET_USERNAME self.password = settings.ESET_PASSWORD self.client_id = settings.ESET_OAUTH_CLIENT_ID self.client_secret = settings.ESET_OAUTH_CLIENT_SECRET self.scope = settings.ESET_OAUTH_SCOPE self.enabled = settings.ESET_ENABLED # Disable SSL verification for ESET usually (self-signed certs common) # In production this should be configurable self.verify_ssl = False self._access_token = None self._access_token_expires_at = 0.0 async def _authenticate(self, client: httpx.AsyncClient) -> Optional[str]: """Authenticate and return access token.""" if not self.enabled: return None try: # OAuth token endpoint via ESET Connect IAM url = f"{self.iam_url}/oauth/token" payload = { "grant_type": "password", "username": self.username, "password": self.password } if self.scope: payload["scope"] = self.scope if self.client_id: payload["client_id"] = self.client_id auth = None if self.client_id and self.client_secret: auth = (self.client_id, self.client_secret) logger.info(f"Authenticating with ESET IAM at {url}") response = await client.post( url, data=payload, auth=auth, headers={"Content-Type": "application/x-www-form-urlencoded"} ) if response.status_code == 200: token_data = response.json() access_token = token_data.get("access_token") expires_in = token_data.get("expires_in", 3600) if access_token: self._access_token = access_token self._access_token_expires_at = time.time() + int(expires_in) - 30 logger.info("✅ ESET Authentication successful") return access_token logger.error("❌ ESET Auth failed: missing access_token") return None else: logger.error(f"❌ ESET Auth failed: {response.status_code} - {response.text[:200]}") return None except Exception as e: logger.error(f"❌ ESET Auth error: {str(e)}") return None async def _get_access_token(self, client: httpx.AsyncClient) -> Optional[str]: if self._access_token and time.time() < self._access_token_expires_at: return self._access_token return await self._authenticate(client) async def _get_json( self, client: httpx.AsyncClient, url: str, params: Optional[Union[Mapping[str, Any], Sequence[Tuple[str, str]]]] = None, ) -> Optional[Dict[str, Any]]: token = await self._get_access_token(client) if not token: return None response = await client.get(url, params=params, headers={"Authorization": f"Bearer {token}"}) if response.status_code == 200: return response.json() logger.error( "ESET API error %s for %s: %s", response.status_code, url, (response.text or "")[:500] ) return None async def list_devices(self, page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[Dict[str, Any]]: """List devices from ESET Device Management.""" if not self.enabled: logger.warning("ESET not enabled") return None url = f"{self.base_url}/v1/devices" params: Dict[str, Any] = {} if page_size: params["pageSize"] = page_size if page_token: params["pageToken"] = page_token async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client: payload = await self._get_json(client, url, params=params or None) if not payload: logger.warning("ESET devices payload empty") return payload async def list_incidents(self) -> Optional[Dict[str, Any]]: """List incidents from ESET Incident Management.""" if not self.enabled: logger.warning("ESET not enabled") return None url = f"{self.incidents_url}/v1/incidents" async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client: return await self._get_json(client, url) async def get_device_details(self, device_uuid: str) -> Optional[Dict[str, Any]]: """Fetch device details from ESET by UUID""" if not self.enabled: logger.warning("ESET not enabled") return None url = f"{self.base_url}/v1/devices/{device_uuid}" try: async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client: response = await self._get_json(client, url) if response is None: return None return response except Exception as e: logger.error(f"ESET API error: {str(e)}") return None async def get_devices_batch(self, device_uuids: List[str]) -> Optional[Dict[str, Any]]: """Fetch multiple devices in one call via /v1/devices:batchGet.""" if not self.enabled: logger.warning("ESET not enabled") return None uuids = [str(u).strip() for u in (device_uuids or []) if str(u).strip()] if not uuids: return {"devices": []} url = f"{self.base_url}/v1/devices:batchGet" params = [("devicesUuids", u) for u in uuids] async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client: payload = await self._get_json(client, url, params=params) if not payload: logger.warning("ESET batchGet payload empty") return payload @staticmethod def extract_installed_software(device_payload: Dict[str, Any]) -> List[str]: """Extract installed software names/versions from ESET device payload.""" if not isinstance(device_payload, dict): return [] device_raw = device_payload.get("device") if isinstance(device_payload.get("device"), dict) else device_payload if not isinstance(device_raw, dict): return [] def _normalize_version(value: Any) -> str: if isinstance(value, dict): name = str(value.get("name") or "").strip() if name: return name version_id = str(value.get("id") or "").strip() if version_id: return version_id major = value.get("major") minor = value.get("minor") patch = value.get("patch") if major is not None and minor is not None and patch is not None: return f"{major}.{minor}.{patch}" return "" if value is None: return "" return str(value).strip() result: List[str] = [] def _add_item(name: Any, version: Any = None) -> None: item_name = str(name or "").strip() if not item_name: return item_version = _normalize_version(version) result.append(f"{item_name} {item_version}".strip() if item_version else item_name) for comp in device_raw.get("deployedComponents") or []: if isinstance(comp, dict): _add_item(comp.get("displayName") or comp.get("name"), comp.get("version")) elif isinstance(comp, str): _add_item(comp) for key in ("installedSoftware", "applications", "applicationInventory", "softwareInventory", "activeProducts"): for comp in device_raw.get(key) or []: if isinstance(comp, dict): _add_item( comp.get("displayName") or comp.get("name") or comp.get("softwareName") or comp.get("applicationName") or comp.get("productName") or comp.get("product"), comp.get("version") or comp.get("applicationVersion") or comp.get("softwareVersion") or comp.get("productVersion"), ) elif isinstance(comp, str): _add_item(comp) # keep order, remove duplicates deduped: List[str] = [] seen = set() for item in result: key = item.lower() if key in seen: continue seen.add(key) deduped.append(item) return deduped eset_service = EsetService()