bmc_hub/app/services/eset_service.py

241 lines
9.4 KiB
Python
Raw Normal View History

"""
ESET PROTECT Integration Service
"""
import logging
import time
import httpx
from typing import Dict, Optional, Any, List, Mapping, Sequence, Tuple, Union
from app.core.config import settings
# Module-level logger; records are emitted under this module's dotted name.
logger = logging.getLogger(__name__)
class EsetService:
    """Async client for the ESET IAM, device-management and incident APIs.

    Configuration is read from ``settings`` once, at construction time. An
    OAuth2 password-grant token is fetched on demand and cached in memory
    until shortly before it expires. Public methods report failures by
    logging and returning ``None`` rather than raising.
    """

    # Lifetime assumed when the token response has no usable ``expires_in``.
    _DEFAULT_TOKEN_LIFETIME_SECONDS = 3600
    # The cached token is treated as expired this many seconds early, so a
    # request is never sent with a token that is about to lapse.
    _TOKEN_EXPIRY_MARGIN_SECONDS = 30

    def __init__(self):
        """Copy the ESET connection settings and start with an empty token cache."""
        self.base_url = settings.ESET_API_URL.rstrip('/')
        self.iam_url = settings.ESET_IAM_URL.rstrip('/')
        self.incidents_url = settings.ESET_INCIDENTS_URL.rstrip('/')
        self.username = settings.ESET_USERNAME
        self.password = settings.ESET_PASSWORD
        self.client_id = settings.ESET_OAUTH_CLIENT_ID
        self.client_secret = settings.ESET_OAUTH_CLIENT_SECRET
        self.scope = settings.ESET_OAUTH_SCOPE
        self.enabled = settings.ESET_ENABLED
        # TLS verification stays off by default (self-signed certificates are
        # common on ESET installs), but an optional ESET_VERIFY_SSL setting
        # can now turn it on. The value is handed to httpx unchanged.
        self.verify_ssl = getattr(settings, "ESET_VERIFY_SSL", False)
        self._access_token = None
        self._access_token_expires_at = 0.0

    @classmethod
    def _token_lifetime(cls, expires_in: Any) -> int:
        """Return ``expires_in`` as whole seconds, or the default if unusable."""
        try:
            return int(expires_in)
        except (TypeError, ValueError, OverflowError):
            return cls._DEFAULT_TOKEN_LIFETIME_SECONDS

    def _invalidate_token(self) -> None:
        """Forget the cached token so the next request re-authenticates."""
        self._access_token = None
        self._access_token_expires_at = 0.0

    async def _authenticate(self, client: "httpx.AsyncClient") -> Optional[str]:
        """Request a new access token from ESET IAM and cache it.

        Returns the token, or ``None`` when the integration is disabled, the
        request fails, or the response has no ``access_token``. Never raises:
        every error is logged and reported as ``None``.
        """
        if not self.enabled:
            return None

        # OAuth token endpoint via ESET Connect IAM
        url = f"{self.iam_url}/oauth/token"
        payload = {
            "grant_type": "password",
            "username": self.username,
            "password": self.password,
        }
        if self.scope:
            payload["scope"] = self.scope
        if self.client_id:
            payload["client_id"] = self.client_id

        # HTTP Basic client credentials are only sent when both parts exist.
        auth = None
        if self.client_id and self.client_secret:
            auth = (self.client_id, self.client_secret)

        try:
            logger.info("Authenticating with ESET IAM at %s", url)
            response = await client.post(
                url,
                data=payload,
                auth=auth,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )
            if response.status_code != 200:
                logger.error(
                    "❌ ESET Auth failed: %s - %s",
                    response.status_code,
                    response.text[:200],
                )
                return None

            token_data = response.json()
            access_token = token_data.get("access_token")
            if not access_token:
                logger.error("❌ ESET Auth failed: missing access_token")
                return None
            # Parsed before the cache is touched, so a malformed expires_in
            # can no longer leave the cache half-updated.
            lifetime = self._token_lifetime(token_data.get("expires_in"))
        except Exception as e:  # boundary: authentication must never raise
            logger.error("❌ ESET Auth error: %s", e)
            return None

        self._access_token = access_token
        self._access_token_expires_at = (
            time.time() + lifetime - self._TOKEN_EXPIRY_MARGIN_SECONDS
        )
        logger.info("✅ ESET Authentication successful")
        return access_token

    async def _get_access_token(self, client: "httpx.AsyncClient") -> Optional[str]:
        """Return the cached token while it is still valid, else authenticate."""
        if self._access_token and time.time() < self._access_token_expires_at:
            return self._access_token
        return await self._authenticate(client)

    async def _get_json(
        self,
        client: "httpx.AsyncClient",
        url: str,
        params: Optional[Union[Mapping[str, Any], Sequence[Tuple[str, str]]]] = None,
    ) -> Optional[Dict[str, Any]]:
        """GET ``url`` with a bearer token and return the decoded JSON body.

        Returns ``None`` (after logging) when no token can be obtained, the
        request fails at transport level, the status is not 200, or the body
        is not valid JSON. If a *cached* token is rejected with 401, the cache
        is dropped and the request is retried once with a fresh token.
        """
        for _ in range(2):
            # Only a token served from the cache is worth retrying: a token
            # issued a moment ago would be rejected again.
            from_cache = (
                bool(self._access_token)
                and time.time() < self._access_token_expires_at
            )
            token = await self._get_access_token(client)
            if not token:
                return None

            try:
                response = await client.get(
                    url,
                    params=params,
                    headers={"Authorization": f"Bearer {token}"},
                )
            except httpx.HTTPError as exc:
                logger.error("ESET API request failed for %s: %s", url, exc)
                return None

            if response.status_code == 200:
                try:
                    return response.json()
                except ValueError as exc:
                    logger.error("ESET API returned invalid JSON for %s: %s", url, exc)
                    return None

            if response.status_code == 401 and from_cache:
                self._invalidate_token()
                continue

            logger.error(
                "ESET API error %s for %s: %s",
                response.status_code,
                url,
                (response.text or "")[:500],
            )
            return None
        return None

    async def list_devices(self, page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """List devices from ESET Device Management.

        ``page_size`` and ``page_token`` are forwarded as ``pageSize`` and
        ``pageToken`` only when truthy. Returns the decoded response, or
        ``None`` when the integration is disabled or the call fails.
        """
        if not self.enabled:
            logger.warning("ESET not enabled")
            return None

        url = f"{self.base_url}/v1/devices"
        params: Dict[str, Any] = {}
        if page_size:
            params["pageSize"] = page_size
        if page_token:
            params["pageToken"] = page_token

        async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client:
            payload = await self._get_json(client, url, params=params or None)
            if not payload:
                logger.warning("ESET devices payload empty")
            return payload

    async def list_incidents(self) -> Optional[Dict[str, Any]]:
        """List incidents from ESET Incident Management.

        Returns the decoded response, or ``None`` when the integration is
        disabled or the call fails.
        """
        if not self.enabled:
            logger.warning("ESET not enabled")
            return None

        url = f"{self.incidents_url}/v1/incidents"
        async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client:
            return await self._get_json(client, url)

    async def get_device_details(self, device_uuid: str) -> Optional[Dict[str, Any]]:
        """Fetch one device from ESET by UUID.

        Returns the decoded response, or ``None`` when the integration is
        disabled or anything goes wrong; this method never raises.
        """
        if not self.enabled:
            logger.warning("ESET not enabled")
            return None

        url = f"{self.base_url}/v1/devices/{device_uuid}"
        try:
            async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client:
                return await self._get_json(client, url)
        except Exception as e:  # boundary: keep the original never-raise contract
            logger.error("ESET API error: %s", e)
            return None

    async def get_devices_batch(self, device_uuids: List[str]) -> Optional[Dict[str, Any]]:
        """Fetch multiple devices in one call via /v1/devices:batchGet.

        Blank entries are dropped. With nothing left to query, returns
        ``{"devices": []}`` without contacting the API. Otherwise returns the
        decoded response, or ``None`` when disabled or on failure.
        """
        if not self.enabled:
            logger.warning("ESET not enabled")
            return None

        uuids = [text for text in (str(u).strip() for u in (device_uuids or [])) if text]
        if not uuids:
            return {"devices": []}

        url = f"{self.base_url}/v1/devices:batchGet"
        # One repeated query parameter per UUID.
        params = [("devicesUuids", u) for u in uuids]
        async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client:
            payload = await self._get_json(client, url, params=params)
            if not payload:
                logger.warning("ESET batchGet payload empty")
            return payload

    @staticmethod
    def extract_installed_software(device_payload: Dict[str, Any]) -> List[str]:
        """Extract installed software names/versions from an ESET device payload.

        The device record is taken from ``device_payload["device"]`` when that
        is a dict, otherwise from the payload itself. Entries are collected
        from ``deployedComponents`` first, then from ``installedSoftware``,
        ``applications``, ``applicationInventory``, ``softwareInventory`` and
        ``activeProducts``. Each entry becomes ``"<name> <version>"``, or just
        ``"<name>"`` when no version is available.

        Returns the entries in first-seen order with case-insensitive
        duplicates removed. Returns ``[]`` for a non-dict payload. Values
        under the list keys that are not lists or tuples are ignored.
        """
        if not isinstance(device_payload, dict):
            return []
        nested = device_payload.get("device")
        device_raw = nested if isinstance(nested, dict) else device_payload

        def _normalize_version(value: Any) -> str:
            # A structured version prefers "name", then "id", then a full
            # major.minor.patch triple; anything else yields no version.
            if isinstance(value, dict):
                for field in ("name", "id"):
                    text = str(value.get(field) or "").strip()
                    if text:
                        return text
                parts = [value.get(part) for part in ("major", "minor", "patch")]
                if all(part is not None for part in parts):
                    return ".".join(str(part) for part in parts)
                return ""
            if value is None:
                return ""
            return str(value).strip()

        def _entries(key: str) -> Sequence[Any]:
            # Only real sequences are iterated; a stray string or dict would
            # otherwise contribute its characters or keys as software names.
            value = device_raw.get(key)
            return value if isinstance(value, (list, tuple)) else ()

        result: List[str] = []

        def _add_item(name: Any, version: Any = None) -> None:
            item_name = str(name or "").strip()
            if not item_name:
                return
            item_version = _normalize_version(version)
            result.append(f"{item_name} {item_version}" if item_version else item_name)

        for comp in _entries("deployedComponents"):
            if isinstance(comp, dict):
                _add_item(comp.get("displayName") or comp.get("name"), comp.get("version"))
            elif isinstance(comp, str):
                _add_item(comp)

        for key in ("installedSoftware", "applications", "applicationInventory", "softwareInventory", "activeProducts"):
            for comp in _entries(key):
                if isinstance(comp, dict):
                    _add_item(
                        comp.get("displayName")
                        or comp.get("name")
                        or comp.get("softwareName")
                        or comp.get("applicationName")
                        or comp.get("productName")
                        or comp.get("product"),
                        comp.get("version")
                        or comp.get("applicationVersion")
                        or comp.get("softwareVersion")
                        or comp.get("productVersion"),
                    )
                elif isinstance(comp, str):
                    _add_item(comp)

        # Keep order, remove duplicates: a dict keeps the first spelling seen
        # for each lower-cased key and preserves insertion order.
        unique: Dict[str, str] = {}
        for item in result:
            unique.setdefault(item.lower(), item)
        return list(unique.values())
# Shared module-level instance. It is built at import time, so the settings
# read by EsetService.__init__ must already be available when this module is
# imported.
eset_service = EsetService()