bmc_hub/app/modules/links/jobs/dead_link_check.py

144 lines
4.8 KiB
Python
Raw Normal View History

import asyncio
import json
import logging
import time
from typing import Optional, Tuple
import httpx
from app.core.config import settings
from app.core.database import execute_query
logger = logging.getLogger(__name__)
def _normalize_http_url(url: Optional[str], host: Optional[str]) -> Optional[str]:
candidate = (url or "").strip()
if not candidate and host:
candidate = host.strip()
if not candidate:
return None
if candidate.startswith("http://") or candidate.startswith("https://"):
return candidate
return f"http://{candidate}"
async def _check_http(client: httpx.AsyncClient, url: str) -> Tuple[str, dict]:
started = time.perf_counter()
try:
response = await client.get(url)
elapsed_ms = int((time.perf_counter() - started) * 1000)
status = "ok" if response.status_code < 400 else "down"
return status, {
"checker": "http",
"url": str(response.url),
"http_status": response.status_code,
"elapsed_ms": elapsed_ms,
}
except Exception as exc:
elapsed_ms = int((time.perf_counter() - started) * 1000)
return "down", {
"checker": "http",
"url": url,
"error": str(exc),
"elapsed_ms": elapsed_ms,
}
async def _check_tcp(host: str, port: int, timeout_seconds: int, checker: str) -> Tuple[str, dict]:
started = time.perf_counter()
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=float(timeout_seconds))
del reader
writer.close()
await writer.wait_closed()
elapsed_ms = int((time.perf_counter() - started) * 1000)
return "ok", {
"checker": checker,
"host": host,
"port": port,
"elapsed_ms": elapsed_ms,
}
except Exception as exc:
elapsed_ms = int((time.perf_counter() - started) * 1000)
return "down", {
"checker": checker,
"host": host,
"port": port,
"error": str(exc),
"elapsed_ms": elapsed_ms,
}
async def _evaluate_link(row: dict, client: httpx.AsyncClient, timeout_seconds: int) -> Tuple[str, dict]:
link_type = row.get("type")
host = row.get("host")
port = row.get("port")
url = row.get("url")
if link_type == "http":
normalized_url = _normalize_http_url(url, host)
if not normalized_url:
return "unknown", {"checker": "http", "reason": "missing_url_or_host"}
return await _check_http(client, normalized_url)
if link_type == "ssh":
if not host:
return "unknown", {"checker": "tcp", "reason": "missing_host", "type": "ssh"}
return await _check_tcp(host, int(port or 22), timeout_seconds, "tcp-ssh")
if link_type == "rdp":
if not host:
return "unknown", {"checker": "tcp", "reason": "missing_host", "type": "rdp"}
return await _check_tcp(host, int(port or 3389), timeout_seconds, "tcp-rdp")
if link_type == "command":
return "unknown", {"checker": "command", "reason": "not_probeable"}
return "unknown", {"checker": "unknown", "reason": f"unsupported_type:{link_type}"}
def _persist_status(link_id: int, status: str, details: dict) -> None:
execute_query(
"""
INSERT INTO link_status_checks (link_id, status, details)
VALUES (%s, %s, %s::jsonb)
""",
(link_id, status, json.dumps(details or {})),
)
async def check_links_health():
rows = execute_query(
"SELECT id, type, url, host, port FROM links WHERE deleted_at IS NULL",
(),
) or []
timeout_seconds = max(1, int(settings.LINKS_CHECK_TIMEOUT_SECONDS))
if settings.LINKS_DRY_RUN:
for row in rows:
_persist_status(int(row["id"]), "unknown", {"reason": "dry_run_enabled"})
logger.info("✅ Links health check skipped by dry-run for %s links", len(rows))
return {"checked": len(rows), "ok": 0, "down": 0, "unknown": len(rows), "dry_run": True}
summary = {"checked": 0, "ok": 0, "down": 0, "unknown": 0, "dry_run": False}
timeout = httpx.Timeout(connect=float(timeout_seconds), read=float(timeout_seconds), write=float(timeout_seconds), pool=float(timeout_seconds))
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
for row in rows:
link_id = int(row["id"])
status, details = await _evaluate_link(row, client, timeout_seconds)
_persist_status(link_id, status, details)
summary["checked"] += 1
summary[status] += 1
logger.info(
"✅ Links health check completed: checked=%s ok=%s down=%s unknown=%s",
summary["checked"],
summary["ok"],
summary["down"],
summary["unknown"],
)
return summary