bmc_hub/app/modules/links/backend/service.py
Christian bc504b9257 feat: Add subscription management functionality and AnyDesk API integration
- Implemented subscription creation, updating, and rendering in script_9.js.
- Added functions for handling subscription line items, product selection, and total calculations.
- Integrated AnyDesk API for session management in test_anydesk.py.
- Created REST client test requests for API endpoints in api.http.
- Developed a script to check ESET machine status and save details in tmp_check_eset_machine.py.
2026-03-30 07:50:15 +02:00

230 lines
7.3 KiB
Python

import json
import logging
from typing import Dict, List, Optional
from app.core.database import execute_query, execute_query_single
from app.modules.links.models.schemas import LinkActionResult, LinkScope, LinkType
logger = logging.getLogger(__name__)
def _get_case(case_id: int) -> Optional[dict]:
    """Fetch the non-deleted case row for ``case_id``.

    Only ``id`` and ``customer_id`` are selected from ``sag_sager``. The
    result of ``execute_query_single`` is returned as-is; callers in this
    module treat a falsy result as "case not found".
    """
    return execute_query_single(
        "SELECT id, customer_id FROM sag_sager WHERE id = %s AND deleted_at IS NULL",
        (case_id,),
    )
def _get_case_hardware_ids(case_id: int) -> List[int]:
    """Return the hardware ids linked to a case through ``sag_hardware``.

    Rows with a NULL ``hardware_id`` are skipped, and a falsy query result is
    treated as "no rows".
    """
    rows = execute_query(
        "SELECT hardware_id FROM sag_hardware WHERE sag_id = %s",
        (case_id,),
    ) or []
    return [int(row["hardware_id"]) for row in rows if row.get("hardware_id") is not None]
def _get_tag_ids_for_entity(entity_type: str, entity_id: int) -> List[int]:
    """Return the tag ids attached to one entity in ``entity_tags``.

    Args:
        entity_type: Value matched against ``entity_tags.entity_type``
            (this module uses ``"case"``).
        entity_id: Id of the entity within that type.

    Returns:
        The tag ids as ints; rows with a NULL ``tag_id`` are skipped and a
        falsy query result is treated as "no rows".
    """
    rows = execute_query(
        "SELECT tag_id FROM entity_tags WHERE entity_type = %s AND entity_id = %s",
        (entity_type, entity_id),
    ) or []
    return [int(row["tag_id"]) for row in rows if row.get("tag_id") is not None]
def _get_link_tag_map(link_ids: List[int]) -> Dict[int, List[int]]:
if not link_ids:
return {}
rows = execute_query(
"""
SELECT entity_id AS link_id, tag_id
FROM entity_tags
WHERE entity_type = 'link'
AND entity_id = ANY(%s)
""",
(link_ids,),
) or []
out: Dict[int, List[int]] = {link_id: [] for link_id in link_ids}
for row in rows:
link_id = int(row.get("link_id"))
tag_id = int(row.get("tag_id"))
out.setdefault(link_id, []).append(tag_id)
return out
def _get_link_category_map(link_ids: List[int]) -> Dict[int, List[int]]:
if not link_ids:
return {}
rows = execute_query(
"""
SELECT link_id, category_id
FROM link_category_map
WHERE link_id = ANY(%s)
""",
(link_ids,),
) or []
out: Dict[int, List[int]] = {link_id: [] for link_id in link_ids}
for row in rows:
link_id = int(row.get("link_id"))
category_id = int(row.get("category_id"))
out.setdefault(link_id, []).append(category_id)
return out
def _resolve_scope(
    link_row: dict,
    case_id: int,
    case_customer_id: Optional[int],
    case_hardware_ids: List[int],
) -> tuple[LinkScope, int]:
    """Classify how a link row relates to a case.

    Checks run most-specific first and the first match wins:
    case (priority 1), customer (2), hardware (3), otherwise global (4).

    Args:
        link_row: Link row; ``case_id``, ``customer_id`` and ``hardware_id``
            are read with ``.get`` so missing keys count as no binding.
        case_id: Id of the case being viewed.
        case_customer_id: Customer of that case; a falsy value disables the
            customer check.
        case_hardware_ids: Hardware ids attached to the case.

    Returns:
        ``(scope, priority)`` where a lower priority number is more specific.
    """
    if link_row.get("case_id") == case_id:
        return (LinkScope.case, 1)
    if case_customer_id and link_row.get("customer_id") == case_customer_id:
        return (LinkScope.customer, 2)

    hardware_id = link_row.get("hardware_id")
    # Explicit None guard: a link without hardware must never be classified
    # as a hardware match, whatever the id list contains.
    if hardware_id is not None and hardware_id in case_hardware_ids:
        return (LinkScope.hardware, 3)
    return (LinkScope.global_scope, 4)
def get_relevant_links(case_id: int, limit: int = 50) -> List[dict]:
    """Return the links relevant to a case, best candidates first.

    Candidates are non-deleted links bound to the case, to the case's
    customer, to hardware on the case, or to nothing at all (global). A
    candidate is kept when it shares at least one tag with the case, is bound
    directly to the case, or is flagged ``is_critical``.

    Each returned row is the ``links`` row extended in place with ``scope``,
    ``scope_priority``, ``score``, ``match_count``, ``matched_tag_ids`` and
    ``category_ids``.

    Args:
        case_id: Id of the case to find links for.
        limit: Maximum number of rows returned. Negative values are treated
            as 0 instead of trimming rows off the end of the result.

    Returns:
        At most ``limit`` rows ordered by scope priority, critical flag,
        descending score and name. Empty when the case is not found.
    """
    case_row = _get_case(case_id)
    if not case_row:
        return []

    case_customer_id = case_row.get("customer_id")
    case_hardware_ids = _get_case_hardware_ids(case_id)
    case_tag_ids = set(_get_tag_ids_for_entity("case", case_id))

    candidate_query = """
        SELECT *
        FROM links
        WHERE deleted_at IS NULL
        AND (
        case_id = %s
        OR (%s IS NOT NULL AND customer_id = %s)
        OR (hardware_id IS NOT NULL AND hardware_id = ANY(%s))
        OR (case_id IS NULL AND customer_id IS NULL AND hardware_id IS NULL)
        )
        """
    candidate_rows = execute_query(
        candidate_query,
        # [0] replaces an empty hardware list so ANY(%s) never receives an
        # empty array -- presumably no real hardware row has id 0.
        (case_id, case_customer_id, case_customer_id, case_hardware_ids or [0]),
    ) or []

    link_ids = [int(row["id"]) for row in candidate_rows]
    link_tag_map = _get_link_tag_map(link_ids)
    link_category_map = _get_link_category_map(link_ids)

    scored: List[dict] = []
    for row in candidate_rows:
        link_id = int(row["id"])
        matched_tags = sorted(case_tag_ids.intersection(link_tag_map.get(link_id, [])))
        scope, scope_priority = _resolve_scope(row, case_id, case_customer_id, case_hardware_ids)
        is_critical = bool(row.get("is_critical"))

        # Drop links that are neither tag-matched, case-bound nor critical.
        if not matched_tags and scope != LinkScope.case and not is_critical:
            continue

        # Score: one point per shared tag, +3 for the case's customer,
        # +2 for critical links.
        score = len(matched_tags)
        if case_customer_id and row.get("customer_id") == case_customer_id:
            score += 3
        if is_critical:
            score += 2

        row["scope"] = scope.value
        row["scope_priority"] = scope_priority
        row["score"] = score
        row["match_count"] = len(matched_tags)
        row["matched_tag_ids"] = matched_tags
        row["category_ids"] = link_category_map.get(link_id, [])
        scored.append(row)

    scored.sort(
        key=lambda item: (
            item["scope_priority"],
            # False sorts before True, so critical links come first. Uses
            # truthiness, matching the filter and the score above.
            not item.get("is_critical"),
            -item["score"],
            item.get("name") or "",
        )
    )

    # A negative limit would otherwise slice rows off the tail of the list.
    if limit is not None and limit < 0:
        limit = 0
    return scored[:limit]
def update_link_categories(link_id: int, category_ids: List[int]) -> None:
    """Replace the category assignments of a link.

    All existing ``link_category_map`` rows for ``link_id`` are deleted, then
    one row per entry in ``category_ids`` is inserted in a single statement.
    An empty or ``None`` ``category_ids`` just clears the assignments.

    NOTE(review): the delete and the insert are two separate
    ``execute_query`` calls; whether they share a transaction depends on that
    helper -- confirm.
    """
    execute_query("DELETE FROM link_category_map WHERE link_id = %s", (link_id,))
    if not category_ids:
        return

    ids = list(category_ids)
    # One "(%s, %s)" group per category; values are bound as parameters and
    # never interpolated into the SQL text.
    placeholders = ", ".join(["(%s, %s)"] * len(ids))
    params: List[int] = [value for category_id in ids for value in (link_id, category_id)]

    # ON CONFLICT DO NOTHING also tolerates duplicate ids in the input.
    query = f"INSERT INTO link_category_map (link_id, category_id) VALUES {placeholders} ON CONFLICT DO NOTHING"
    execute_query(query, tuple(params))
def get_link_category_ids(link_id: int) -> List[int]:
    """Return the category ids assigned to a link, in ascending order.

    A falsy query result is treated as "no categories".
    """
    rows = execute_query(
        "SELECT category_id FROM link_category_map WHERE link_id = %s ORDER BY category_id",
        (link_id,),
    ) or []
    return [int(row["category_id"]) for row in rows]
def log_access(
    link_id: int,
    user_id: Optional[int],
    action_type: str,
    case_id: Optional[int],
    customer_id: Optional[int],
    metadata: Optional[dict],
) -> None:
    """Insert one row into ``link_access_log`` for an action on a link.

    Args:
        link_id: Link that was acted on.
        user_id: Acting user, or ``None``.
        action_type: Stored verbatim in the ``action_type`` column.
        case_id: Case context of the action, or ``None``.
        customer_id: Customer context of the action, or ``None``.
        metadata: Extra details stored as JSONB; ``None`` or an empty dict is
            stored as ``{}``.

    Raises:
        TypeError: If ``metadata`` holds values ``json.dumps`` cannot
            serialize.
    """
    metadata_json = json.dumps(metadata or {})
    execute_query(
        """
        INSERT INTO link_access_log (link_id, user_id, action_type, case_id, customer_id, metadata)
        VALUES (%s, %s, %s, %s, %s, %s::jsonb)
        """,
        (link_id, user_id, action_type, case_id, customer_id, metadata_json),
    )
def build_action_result(link_row: dict, action_type: str) -> LinkActionResult:
    """Build the client-facing payload for performing an action on a link.

    Which fields are filled depends on the link type:

    * ssh: ``ssh_command`` is ``ssh [user@]host [-p port]`` when a host is set.
    * rdp: ``rdp_content`` is a minimal .rdp file body when a host is set;
      the port defaults to 3389.
    * command: ``command_text`` is the row's ``url``, else its
      ``description``, else ``""``.

    For ssh and rdp links without a ``url``, ``open_url`` falls back to the
    host.

    Args:
        link_row: Link row; ``type`` and ``id`` are required, the other keys
            are read with ``.get``.
        action_type: Passed through unchanged to the result.

    Raises:
        ValueError: If ``link_row["type"]`` is not a valid ``LinkType``.
    """
    link_type = LinkType(link_row["type"])
    host = link_row.get("host")
    port = link_row.get("port")
    username = link_row.get("username")

    ssh_command = None
    rdp_content = None
    command_text = None
    open_url = link_row.get("url")

    if link_type == LinkType.ssh and host:
        target = f"{username}@{host}" if username else f"{host}"
        ssh_command = f"ssh {target}"
        if port:
            ssh_command += f" -p {port}"

    if link_type == LinkType.rdp and host:
        rdp_port = port or 3389
        rdp_content = f"full address:s:{host}:{rdp_port}\nusername:s:{username or ''}\nprompt for credentials:i:1\n"

    if link_type == LinkType.command:
        command_text = link_row.get("url") or link_row.get("description") or ""

    # Remote-session links without an explicit URL open on the bare host.
    if link_type in (LinkType.ssh, LinkType.rdp) and not open_url and host:
        open_url = host

    return LinkActionResult(
        link_id=int(link_row["id"]),
        action_type=action_type,
        type=link_type,
        open_url=open_url,
        ssh_command=ssh_command,
        rdp_content=rdp_content,
        command_text=command_text,
        username=username,
        vault_item_id=link_row.get("vault_item_id"),
        # Prefer the host as the vault search hint, then the original URL.
        vault_search_hint=host or link_row.get("url") or None,
    )