- Implement SmsService class for sending SMS via CPSMS API. - Add SMS sending functionality in the frontend with validation and user feedback. - Create database migrations for SMS message storage and telephony features. - Introduce telephony settings and user-specific configurations for click-to-call functionality. - Enhance user experience with toast notifications for incoming calls and actions.
241 lines
9.4 KiB
Python
241 lines
9.4 KiB
Python
"""
|
|
ESET PROTECT Integration Service
|
|
"""
|
|
import logging
|
|
import time
|
|
import httpx
|
|
from typing import Dict, Optional, Any, List, Mapping, Sequence, Tuple, Union
|
|
from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class EsetService:
|
|
def __init__(self):
|
|
self.base_url = settings.ESET_API_URL.rstrip('/')
|
|
self.iam_url = settings.ESET_IAM_URL.rstrip('/')
|
|
self.incidents_url = settings.ESET_INCIDENTS_URL.rstrip('/')
|
|
self.username = settings.ESET_USERNAME
|
|
self.password = settings.ESET_PASSWORD
|
|
self.client_id = settings.ESET_OAUTH_CLIENT_ID
|
|
self.client_secret = settings.ESET_OAUTH_CLIENT_SECRET
|
|
self.scope = settings.ESET_OAUTH_SCOPE
|
|
self.enabled = settings.ESET_ENABLED
|
|
# Disable SSL verification for ESET usually (self-signed certs common)
|
|
# In production this should be configurable
|
|
self.verify_ssl = False
|
|
self._access_token = None
|
|
self._access_token_expires_at = 0.0
|
|
|
|
async def _authenticate(self, client: httpx.AsyncClient) -> Optional[str]:
|
|
"""Authenticate and return access token."""
|
|
if not self.enabled:
|
|
return None
|
|
|
|
try:
|
|
# OAuth token endpoint via ESET Connect IAM
|
|
url = f"{self.iam_url}/oauth/token"
|
|
|
|
payload = {
|
|
"grant_type": "password",
|
|
"username": self.username,
|
|
"password": self.password
|
|
}
|
|
if self.scope:
|
|
payload["scope"] = self.scope
|
|
if self.client_id:
|
|
payload["client_id"] = self.client_id
|
|
|
|
auth = None
|
|
if self.client_id and self.client_secret:
|
|
auth = (self.client_id, self.client_secret)
|
|
|
|
logger.info(f"Authenticating with ESET IAM at {url}")
|
|
response = await client.post(
|
|
url,
|
|
data=payload,
|
|
auth=auth,
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
token_data = response.json()
|
|
access_token = token_data.get("access_token")
|
|
expires_in = token_data.get("expires_in", 3600)
|
|
if access_token:
|
|
self._access_token = access_token
|
|
self._access_token_expires_at = time.time() + int(expires_in) - 30
|
|
logger.info("✅ ESET Authentication successful")
|
|
return access_token
|
|
logger.error("❌ ESET Auth failed: missing access_token")
|
|
return None
|
|
else:
|
|
logger.error(f"❌ ESET Auth failed: {response.status_code} - {response.text[:200]}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"❌ ESET Auth error: {str(e)}")
|
|
return None
|
|
|
|
async def _get_access_token(self, client: httpx.AsyncClient) -> Optional[str]:
|
|
if self._access_token and time.time() < self._access_token_expires_at:
|
|
return self._access_token
|
|
return await self._authenticate(client)
|
|
|
|
async def _get_json(
|
|
self,
|
|
client: httpx.AsyncClient,
|
|
url: str,
|
|
params: Optional[Union[Mapping[str, Any], Sequence[Tuple[str, str]]]] = None,
|
|
) -> Optional[Dict[str, Any]]:
|
|
token = await self._get_access_token(client)
|
|
if not token:
|
|
return None
|
|
|
|
response = await client.get(url, params=params, headers={"Authorization": f"Bearer {token}"})
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
logger.error(
|
|
"ESET API error %s for %s: %s",
|
|
response.status_code,
|
|
url,
|
|
(response.text or "")[:500]
|
|
)
|
|
return None
|
|
|
|
async def list_devices(self, page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[Dict[str, Any]]:
|
|
"""List devices from ESET Device Management."""
|
|
if not self.enabled:
|
|
logger.warning("ESET not enabled")
|
|
return None
|
|
|
|
url = f"{self.base_url}/v1/devices"
|
|
params: Dict[str, Any] = {}
|
|
if page_size:
|
|
params["pageSize"] = page_size
|
|
if page_token:
|
|
params["pageToken"] = page_token
|
|
async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client:
|
|
payload = await self._get_json(client, url, params=params or None)
|
|
if not payload:
|
|
logger.warning("ESET devices payload empty")
|
|
return payload
|
|
|
|
async def list_incidents(self) -> Optional[Dict[str, Any]]:
|
|
"""List incidents from ESET Incident Management."""
|
|
if not self.enabled:
|
|
logger.warning("ESET not enabled")
|
|
return None
|
|
|
|
url = f"{self.incidents_url}/v1/incidents"
|
|
async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client:
|
|
return await self._get_json(client, url)
|
|
|
|
async def get_device_details(self, device_uuid: str) -> Optional[Dict[str, Any]]:
|
|
"""Fetch device details from ESET by UUID"""
|
|
if not self.enabled:
|
|
logger.warning("ESET not enabled")
|
|
return None
|
|
|
|
url = f"{self.base_url}/v1/devices/{device_uuid}"
|
|
|
|
try:
|
|
async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client:
|
|
response = await self._get_json(client, url)
|
|
if response is None:
|
|
return None
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"ESET API error: {str(e)}")
|
|
return None
|
|
|
|
async def get_devices_batch(self, device_uuids: List[str]) -> Optional[Dict[str, Any]]:
|
|
"""Fetch multiple devices in one call via /v1/devices:batchGet."""
|
|
if not self.enabled:
|
|
logger.warning("ESET not enabled")
|
|
return None
|
|
|
|
uuids = [str(u).strip() for u in (device_uuids or []) if str(u).strip()]
|
|
if not uuids:
|
|
return {"devices": []}
|
|
|
|
url = f"{self.base_url}/v1/devices:batchGet"
|
|
params = [("devicesUuids", u) for u in uuids]
|
|
|
|
async with httpx.AsyncClient(verify=self.verify_ssl, timeout=settings.ESET_TIMEOUT_SECONDS) as client:
|
|
payload = await self._get_json(client, url, params=params)
|
|
if not payload:
|
|
logger.warning("ESET batchGet payload empty")
|
|
return payload
|
|
|
|
@staticmethod
|
|
def extract_installed_software(device_payload: Dict[str, Any]) -> List[str]:
|
|
"""Extract installed software names/versions from ESET device payload."""
|
|
if not isinstance(device_payload, dict):
|
|
return []
|
|
|
|
device_raw = device_payload.get("device") if isinstance(device_payload.get("device"), dict) else device_payload
|
|
if not isinstance(device_raw, dict):
|
|
return []
|
|
def _normalize_version(value: Any) -> str:
|
|
if isinstance(value, dict):
|
|
name = str(value.get("name") or "").strip()
|
|
if name:
|
|
return name
|
|
version_id = str(value.get("id") or "").strip()
|
|
if version_id:
|
|
return version_id
|
|
major = value.get("major")
|
|
minor = value.get("minor")
|
|
patch = value.get("patch")
|
|
if major is not None and minor is not None and patch is not None:
|
|
return f"{major}.{minor}.{patch}"
|
|
return ""
|
|
if value is None:
|
|
return ""
|
|
return str(value).strip()
|
|
|
|
result: List[str] = []
|
|
|
|
def _add_item(name: Any, version: Any = None) -> None:
|
|
item_name = str(name or "").strip()
|
|
if not item_name:
|
|
return
|
|
item_version = _normalize_version(version)
|
|
result.append(f"{item_name} {item_version}".strip() if item_version else item_name)
|
|
|
|
for comp in device_raw.get("deployedComponents") or []:
|
|
if isinstance(comp, dict):
|
|
_add_item(comp.get("displayName") or comp.get("name"), comp.get("version"))
|
|
elif isinstance(comp, str):
|
|
_add_item(comp)
|
|
|
|
for key in ("installedSoftware", "applications", "applicationInventory", "softwareInventory", "activeProducts"):
|
|
for comp in device_raw.get(key) or []:
|
|
if isinstance(comp, dict):
|
|
_add_item(
|
|
comp.get("displayName")
|
|
or comp.get("name")
|
|
or comp.get("softwareName")
|
|
or comp.get("applicationName")
|
|
or comp.get("productName")
|
|
or comp.get("product"),
|
|
comp.get("version")
|
|
or comp.get("applicationVersion")
|
|
or comp.get("softwareVersion")
|
|
or comp.get("productVersion"),
|
|
)
|
|
elif isinstance(comp, str):
|
|
_add_item(comp)
|
|
|
|
# keep order, remove duplicates
|
|
deduped: List[str] = []
|
|
seen = set()
|
|
for item in result:
|
|
key = item.lower()
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
deduped.append(item)
|
|
return deduped
|
|
|
|
eset_service = EsetService()
|