bmc_hub/app/services/eset_service.py
Christian 297a8ef2d6 feat: Implement ESET integration for hardware management
- Added ESET sync functionality to periodically fetch devices and incidents.
- Created new ESET service for API interactions, including authentication and data retrieval.
- Introduced new database tables for storing ESET incidents and hardware contacts.
- Updated hardware assets schema to include ESET-specific fields (UUID, specs, group).
- Developed frontend templates for ESET overview, import, and testing.
- Enhanced existing hardware creation form to auto-generate AnyDesk links.
- Added global logout functionality to clear user session data.
- Improved error handling and logging for ESET API interactions.
2026-02-11 13:23:32 +01:00

141 lines
5.4 KiB
Python

"""
ESET PROTECT Integration Service
"""
import logging
import time
import httpx
from typing import Dict, Optional, Any
from app.core.config import settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class EsetService:
    """Async client for the ESET PROTECT REST endpoints used by this app.

    Connection details and credentials are read from ``settings`` once, at
    construction time. An OAuth access token is obtained from the ESET
    Connect IAM endpoint with the password grant and cached on the instance
    until shortly before it expires.

    Every public method returns the decoded JSON payload on success and
    ``None`` on any failure (integration disabled, authentication failure,
    non-200 response, transport or decoding error); failures are logged
    rather than raised.
    """

    # Token lifetime (seconds) assumed when the IAM response carries no
    # usable ``expires_in`` value.
    _DEFAULT_TOKEN_LIFETIME = 3600
    # The cached token is considered expired this many seconds early so a
    # request is never sent with a token that lapses in flight.
    _EXPIRY_MARGIN = 30

    def __init__(self) -> None:
        """Snapshot the ESET configuration from ``settings``."""
        self.base_url = self._clean_url(settings.ESET_API_URL)
        self.iam_url = self._clean_url(settings.ESET_IAM_URL)
        self.incidents_url = self._clean_url(settings.ESET_INCIDENTS_URL)
        self.username = settings.ESET_USERNAME
        self.password = settings.ESET_PASSWORD
        self.client_id = settings.ESET_OAUTH_CLIENT_ID
        self.client_secret = settings.ESET_OAUTH_CLIENT_SECRET
        self.scope = settings.ESET_OAUTH_SCOPE
        self.enabled = settings.ESET_ENABLED
        # SSL verification stays off by default (self-signed certs are
        # common on ESET installations). It can be switched on by defining
        # an optional ``ESET_VERIFY_SSL`` setting; when the setting does not
        # exist the previous behaviour (no verification) is kept.
        self.verify_ssl = getattr(settings, "ESET_VERIFY_SSL", False)
        self._access_token = None
        self._access_token_expires_at = 0.0

    @staticmethod
    def _clean_url(value: Optional[str]) -> str:
        """Return *value* without trailing slashes; unset values become ``""``.

        Tolerating ``None`` keeps construction from raising when a URL
        setting has not been configured.
        """
        return (value or "").rstrip("/")

    @classmethod
    def _token_lifetime(cls, raw: Any) -> int:
        """Convert an ``expires_in`` value to whole seconds.

        Falls back to ``_DEFAULT_TOKEN_LIFETIME`` when *raw* is missing or
        not numeric, so a valid token is not thrown away just because its
        lifetime field is malformed.
        """
        try:
            return int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return cls._DEFAULT_TOKEN_LIFETIME

    def _cached_token(self) -> Optional[str]:
        """Return the cached access token if it is still valid, else ``None``."""
        if self._access_token and time.time() < self._access_token_expires_at:
            return self._access_token
        return None

    def _forget_token(self) -> None:
        """Drop the cached access token so the next call re-authenticates."""
        self._access_token = None
        self._access_token_expires_at = 0.0

    async def _authenticate(self, client: "httpx.AsyncClient") -> Optional[str]:
        """Authenticate and return access token.

        Posts a password-grant request to ``{iam_url}/oauth/token``. On
        success the token and its expiry are cached on the instance.

        Returns:
            The access token, or ``None`` when the integration is disabled,
            the request fails, or the response carries no token.
        """
        if not self.enabled:
            return None

        try:
            # OAuth token endpoint via ESET Connect IAM
            url = f"{self.iam_url}/oauth/token"
            payload = {
                "grant_type": "password",
                "username": self.username,
                "password": self.password,
            }
            if self.scope:
                payload["scope"] = self.scope
            if self.client_id:
                payload["client_id"] = self.client_id

            # HTTP basic auth is only sent when both client id and secret
            # are configured.
            auth = None
            if self.client_id and self.client_secret:
                auth = (self.client_id, self.client_secret)

            logger.info("Authenticating with ESET IAM at %s", url)
            response = await client.post(
                url,
                data=payload,
                auth=auth,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )

            if response.status_code != 200:
                logger.error(
                    "❌ ESET Auth failed: %s - %s",
                    response.status_code,
                    response.text[:200],
                )
                return None

            token_data = response.json()
            access_token = token_data.get("access_token")
            if not access_token:
                logger.error("❌ ESET Auth failed: missing access_token")
                return None

            lifetime = self._token_lifetime(
                token_data.get("expires_in", self._DEFAULT_TOKEN_LIFETIME)
            )
            self._access_token = access_token
            self._access_token_expires_at = time.time() + lifetime - self._EXPIRY_MARGIN
            logger.info("✅ ESET Authentication successful")
            return access_token
        except Exception as e:
            # Boundary handler: authentication problems must never crash the
            # caller, they only make the request return ``None``.
            logger.error("❌ ESET Auth error: %s", e)
            return None

    async def _get_access_token(self, client: "httpx.AsyncClient") -> Optional[str]:
        """Return a usable access token, authenticating only when needed."""
        return self._cached_token() or await self._authenticate(client)

    async def _get_json(
        self,
        client: "httpx.AsyncClient",
        url: str,
        params: Optional[dict] = None,
    ) -> Optional[Dict[str, Any]]:
        """GET *url* with a bearer token and return the decoded JSON body.

        If a *cached* token is rejected with HTTP 401, the cache is cleared
        and the request is retried once with a freshly issued token. A 401
        for a token that was issued during this very call is not retried.

        Returns:
            The JSON payload for a 200 response, otherwise ``None`` (the
            failing status and the start of the body are logged).
        """
        cached = self._cached_token()
        token = cached or await self._authenticate(client)
        if not token:
            return None

        response = await client.get(
            url, params=params, headers={"Authorization": f"Bearer {token}"}
        )

        if response.status_code == 401 and cached:
            # The server no longer accepts the cached token; without this
            # every call would keep failing until the local expiry passes.
            self._forget_token()
            token = await self._authenticate(client)
            if not token:
                return None
            response = await client.get(
                url, params=params, headers={"Authorization": f"Bearer {token}"}
            )

        if response.status_code == 200:
            return response.json()

        logger.error(
            "ESET API error %s for %s: %s",
            response.status_code,
            url,
            (response.text or "")[:500],
        )
        return None

    async def _fetch(self, url: str, params: Optional[dict] = None) -> Optional[Dict[str, Any]]:
        """Open a short-lived HTTP client, GET *url* and return its JSON.

        Shared by all public methods so that transport and decoding errors
        are handled the same way everywhere: logged, then ``None``.
        """
        try:
            async with httpx.AsyncClient(
                verify=self.verify_ssl,
                timeout=settings.ESET_TIMEOUT_SECONDS,
            ) as client:
                return await self._get_json(client, url, params)
        except Exception as e:
            logger.error("ESET API error: %s", e)
            return None

    async def list_devices(self) -> Optional[Dict[str, Any]]:
        """List devices from ESET Device Management."""
        if not self.enabled:
            logger.warning("ESET not enabled")
            return None

        payload = await self._fetch(f"{self.base_url}/v1/devices")
        if not payload:
            logger.warning("ESET devices payload empty")
        return payload

    async def list_incidents(self) -> Optional[Dict[str, Any]]:
        """List incidents from ESET Incident Management."""
        if not self.enabled:
            logger.warning("ESET not enabled")
            return None

        return await self._fetch(f"{self.incidents_url}/v1/incidents")

    async def get_device_details(self, device_uuid: str) -> Optional[Dict[str, Any]]:
        """Fetch device details from ESET by UUID."""
        if not self.enabled:
            logger.warning("ESET not enabled")
            return None

        return await self._fetch(f"{self.base_url}/v1/devices/{device_uuid}")
# Shared module-level instance; constructed at import time, so the ESET
# settings are read when this module is first imported.
eset_service = EsetService()