"""Health checks for stored links.

Probes every non-deleted row of ``links`` (HTTP GET for ``http`` links, a
plain TCP connect for ``ssh``/``rdp`` links) and records one row per probe in
``link_status_checks``.
"""

import asyncio
import json
import logging
import time
from typing import Optional, Tuple

import httpx

from app.core.config import settings
from app.core.database import execute_query

logger = logging.getLogger(__name__)

# Port probed for a TCP-checked link type when the row carries no port.
_DEFAULT_TCP_PORTS = {"ssh": 22, "rdp": 3389}


def _elapsed_ms(started: float) -> int:
    """Return whole milliseconds elapsed since ``started`` (a perf_counter value)."""
    return int((time.perf_counter() - started) * 1000)


def _describe_error(exc: BaseException) -> str:
    """Return a non-empty description of ``exc``.

    Some exceptions (notably ``asyncio.TimeoutError``) stringify to ``""``;
    fall back to the class name so the stored details stay informative.
    """
    return str(exc) or type(exc).__name__


def _normalize_http_url(url: Optional[str], host: Optional[str]) -> Optional[str]:
    """Build the URL to probe from a link's ``url``, falling back to ``host``.

    Args:
        url: Stored URL, may be ``None`` or blank.
        host: Stored host, used only when ``url`` is empty.

    Returns:
        The candidate unchanged when it already has an http(s) scheme,
        ``http://<candidate>`` otherwise, or ``None`` when both inputs are
        empty.
    """
    candidate = (url or "").strip()
    if not candidate and host:
        candidate = host.strip()
    if not candidate:
        return None
    # URL schemes are case-insensitive, so "HTTP://x" must not be re-prefixed.
    if candidate.lower().startswith(("http://", "https://")):
        return candidate
    return f"http://{candidate}"


async def _check_http(client: httpx.AsyncClient, url: str) -> Tuple[str, dict]:
    """Probe ``url`` with a GET request.

    Returns:
        ``("ok", details)`` for a response status below 400,
        ``("down", details)`` for status >= 400 or any request failure.
    """
    started = time.perf_counter()
    try:
        response = await client.get(url)
    except Exception as exc:  # best-effort probe: any failure means "down"
        return "down", {
            "checker": "http",
            "url": url,
            "error": _describe_error(exc),
            "elapsed_ms": _elapsed_ms(started),
        }

    elapsed_ms = _elapsed_ms(started)
    status = "ok" if response.status_code < 400 else "down"
    return status, {
        "checker": "http",
        # Final URL as reported by the response (may differ from ``url``).
        "url": str(response.url),
        "http_status": response.status_code,
        "elapsed_ms": elapsed_ms,
    }


async def _check_tcp(host: str, port: int, timeout_seconds: int, checker: str) -> Tuple[str, dict]:
    """Probe ``host:port`` by opening (and immediately closing) a TCP connection.

    Args:
        host: Target host.
        port: Target port.
        timeout_seconds: Upper bound for establishing the connection.
        checker: Label stored in the details under ``"checker"``.

    Returns:
        ``("ok", details)`` when the connection was established,
        ``("down", details)`` when connecting failed or timed out.
    """
    started = time.perf_counter()
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=float(timeout_seconds),
        )
    except Exception as exc:  # best-effort probe: any failure means "down"
        return "down", {
            "checker": checker,
            "host": host,
            "port": port,
            "error": _describe_error(exc),
            "elapsed_ms": _elapsed_ms(started),
        }

    # The connect succeeded, so the target is reachable. An error while
    # tearing the socket down (e.g. the peer resets the connection) must not
    # turn the result into "down".
    try:
        writer.close()
        await writer.wait_closed()
    except Exception as exc:
        logger.debug("Ignoring close error for %s:%s: %s", host, port, _describe_error(exc))

    return "ok", {
        "checker": checker,
        "host": host,
        "port": port,
        "elapsed_ms": _elapsed_ms(started),
    }


async def _evaluate_link(row: dict, client: httpx.AsyncClient, timeout_seconds: int) -> Tuple[str, dict]:
    """Run the probe matching ``row["type"]`` and return ``(status, details)``.

    ``status`` is one of ``"ok"``, ``"down"`` or ``"unknown"``; ``"unknown"``
    is returned for rows that cannot be probed (missing target, invalid port,
    ``command`` links, unsupported types).
    """
    link_type = row.get("type")
    host = row.get("host")

    if link_type == "http":
        normalized_url = _normalize_http_url(row.get("url"), host)
        if not normalized_url:
            return "unknown", {"checker": "http", "reason": "missing_url_or_host"}
        return await _check_http(client, normalized_url)

    if link_type in ("ssh", "rdp"):
        if not host:
            return "unknown", {"checker": "tcp", "reason": "missing_host", "type": link_type}
        try:
            port = int(row.get("port") or _DEFAULT_TCP_PORTS[link_type])
        except (TypeError, ValueError):
            # A malformed port must not abort the whole health-check run.
            return "unknown", {"checker": "tcp", "reason": "invalid_port", "type": link_type}
        return await _check_tcp(host, port, timeout_seconds, f"tcp-{link_type}")

    if link_type == "command":
        return "unknown", {"checker": "command", "reason": "not_probeable"}

    return "unknown", {"checker": "unknown", "reason": f"unsupported_type:{link_type}"}


def _persist_status(link_id: int, status: str, details: dict) -> None:
    """Insert one probe result for ``link_id`` into ``link_status_checks``."""
    execute_query(
        """
        INSERT INTO link_status_checks (link_id, status, details)
        VALUES (%s, %s, %s::jsonb)
        """,
        (link_id, status, json.dumps(details or {})),
    )


async def check_links_health() -> dict:
    """Probe every non-deleted link and persist one status row per link.

    When ``settings.LINKS_DRY_RUN`` is truthy no probe is performed and every
    link is recorded as ``"unknown"``.

    Returns:
        Summary dict with the keys ``checked``, ``ok``, ``down``, ``unknown``
        and ``dry_run``.
    """
    # NOTE(review): execute_query is called without await, so it presumably
    # blocks the event loop while it runs -- confirm this is acceptable.
    rows = execute_query(
        "SELECT id, type, url, host, port FROM links WHERE deleted_at IS NULL",
        (),
    ) or []
    # Never probe with a timeout below one second.
    timeout_seconds = max(1, int(settings.LINKS_CHECK_TIMEOUT_SECONDS))

    if settings.LINKS_DRY_RUN:
        for row in rows:
            _persist_status(int(row["id"]), "unknown", {"reason": "dry_run_enabled"})
        logger.info("✅ Links health check skipped by dry-run for %s links", len(rows))
        return {"checked": len(rows), "ok": 0, "down": 0, "unknown": len(rows), "dry_run": True}

    summary = {"checked": 0, "ok": 0, "down": 0, "unknown": 0, "dry_run": False}
    # A single value applies to the connect, read, write and pool timeouts.
    timeout = httpx.Timeout(float(timeout_seconds))

    async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
        for row in rows:
            link_id = int(row["id"])
            status, details = await _evaluate_link(row, client, timeout_seconds)
            _persist_status(link_id, status, details)
            summary["checked"] += 1
            summary[status] += 1

    logger.info(
        "✅ Links health check completed: checked=%s ok=%s down=%s unknown=%s",
        summary["checked"],
        summary["ok"],
        summary["down"],
        summary["unknown"],
    )
    return summary