230 lines
7.3 KiB
Python
230 lines
7.3 KiB
Python
|
|
import json
|
||
|
|
import logging
|
||
|
|
from typing import Dict, List, Optional
|
||
|
|
|
||
|
|
from app.core.database import execute_query, execute_query_single
|
||
|
|
from app.modules.links.models.schemas import LinkActionResult, LinkScope, LinkType
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
def _get_case(case_id: int) -> Optional[dict]:
|
||
|
|
return execute_query_single(
|
||
|
|
"SELECT id, customer_id FROM sag_sager WHERE id = %s AND deleted_at IS NULL",
|
||
|
|
(case_id,),
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def _get_case_hardware_ids(case_id: int) -> List[int]:
|
||
|
|
rows = execute_query(
|
||
|
|
"SELECT hardware_id FROM sag_hardware WHERE sag_id = %s",
|
||
|
|
(case_id,),
|
||
|
|
) or []
|
||
|
|
return [int(row["hardware_id"]) for row in rows if row.get("hardware_id") is not None]
|
||
|
|
|
||
|
|
|
||
|
|
def _get_tag_ids_for_entity(entity_type: str, entity_id: int) -> List[int]:
|
||
|
|
rows = execute_query(
|
||
|
|
"SELECT tag_id FROM entity_tags WHERE entity_type = %s AND entity_id = %s",
|
||
|
|
(entity_type, entity_id),
|
||
|
|
) or []
|
||
|
|
return [int(row["tag_id"]) for row in rows if row.get("tag_id") is not None]
|
||
|
|
|
||
|
|
|
||
|
|
def _get_link_tag_map(link_ids: List[int]) -> Dict[int, List[int]]:
|
||
|
|
if not link_ids:
|
||
|
|
return {}
|
||
|
|
|
||
|
|
rows = execute_query(
|
||
|
|
"""
|
||
|
|
SELECT entity_id AS link_id, tag_id
|
||
|
|
FROM entity_tags
|
||
|
|
WHERE entity_type = 'link'
|
||
|
|
AND entity_id = ANY(%s)
|
||
|
|
""",
|
||
|
|
(link_ids,),
|
||
|
|
) or []
|
||
|
|
|
||
|
|
out: Dict[int, List[int]] = {link_id: [] for link_id in link_ids}
|
||
|
|
for row in rows:
|
||
|
|
link_id = int(row.get("link_id"))
|
||
|
|
tag_id = int(row.get("tag_id"))
|
||
|
|
out.setdefault(link_id, []).append(tag_id)
|
||
|
|
return out
|
||
|
|
|
||
|
|
|
||
|
|
def _get_link_category_map(link_ids: List[int]) -> Dict[int, List[int]]:
|
||
|
|
if not link_ids:
|
||
|
|
return {}
|
||
|
|
|
||
|
|
rows = execute_query(
|
||
|
|
"""
|
||
|
|
SELECT link_id, category_id
|
||
|
|
FROM link_category_map
|
||
|
|
WHERE link_id = ANY(%s)
|
||
|
|
""",
|
||
|
|
(link_ids,),
|
||
|
|
) or []
|
||
|
|
|
||
|
|
out: Dict[int, List[int]] = {link_id: [] for link_id in link_ids}
|
||
|
|
for row in rows:
|
||
|
|
link_id = int(row.get("link_id"))
|
||
|
|
category_id = int(row.get("category_id"))
|
||
|
|
out.setdefault(link_id, []).append(category_id)
|
||
|
|
return out
|
||
|
|
|
||
|
|
|
||
|
|
def _resolve_scope(link_row: dict, case_id: int, case_customer_id: Optional[int], case_hardware_ids: List[int]) -> tuple[LinkScope, int]:
|
||
|
|
if link_row.get("case_id") == case_id:
|
||
|
|
return (LinkScope.case, 1)
|
||
|
|
if case_customer_id and link_row.get("customer_id") == case_customer_id:
|
||
|
|
return (LinkScope.customer, 2)
|
||
|
|
if link_row.get("hardware_id") in case_hardware_ids:
|
||
|
|
return (LinkScope.hardware, 3)
|
||
|
|
return (LinkScope.global_scope, 4)
|
||
|
|
|
||
|
|
|
||
|
|
def get_relevant_links(case_id: int, limit: int = 50) -> List[dict]:
|
||
|
|
case_row = _get_case(case_id)
|
||
|
|
if not case_row:
|
||
|
|
return []
|
||
|
|
|
||
|
|
case_customer_id = case_row.get("customer_id")
|
||
|
|
case_hardware_ids = _get_case_hardware_ids(case_id)
|
||
|
|
case_tag_ids = set(_get_tag_ids_for_entity("case", case_id))
|
||
|
|
|
||
|
|
candidate_query = """
|
||
|
|
SELECT *
|
||
|
|
FROM links
|
||
|
|
WHERE deleted_at IS NULL
|
||
|
|
AND (
|
||
|
|
case_id = %s
|
||
|
|
OR (%s IS NOT NULL AND customer_id = %s)
|
||
|
|
OR (hardware_id IS NOT NULL AND hardware_id = ANY(%s))
|
||
|
|
OR (case_id IS NULL AND customer_id IS NULL AND hardware_id IS NULL)
|
||
|
|
)
|
||
|
|
"""
|
||
|
|
candidate_rows = execute_query(
|
||
|
|
candidate_query,
|
||
|
|
(case_id, case_customer_id, case_customer_id, case_hardware_ids or [0]),
|
||
|
|
) or []
|
||
|
|
|
||
|
|
link_ids = [int(row["id"]) for row in candidate_rows]
|
||
|
|
link_tag_map = _get_link_tag_map(link_ids)
|
||
|
|
link_category_map = _get_link_category_map(link_ids)
|
||
|
|
|
||
|
|
scored: List[dict] = []
|
||
|
|
for row in candidate_rows:
|
||
|
|
link_id = int(row["id"])
|
||
|
|
link_tags = set(link_tag_map.get(link_id, []))
|
||
|
|
matched_tags = sorted(case_tag_ids.intersection(link_tags))
|
||
|
|
|
||
|
|
scope, scope_priority = _resolve_scope(row, case_id, case_customer_id, case_hardware_ids)
|
||
|
|
|
||
|
|
if not matched_tags and scope != LinkScope.case and not row.get("is_critical"):
|
||
|
|
continue
|
||
|
|
|
||
|
|
score = 0
|
||
|
|
if case_customer_id and row.get("customer_id") == case_customer_id:
|
||
|
|
score += 3
|
||
|
|
if row.get("is_critical"):
|
||
|
|
score += 2
|
||
|
|
score += len(matched_tags)
|
||
|
|
|
||
|
|
row["scope"] = scope.value
|
||
|
|
row["scope_priority"] = scope_priority
|
||
|
|
row["score"] = score
|
||
|
|
row["match_count"] = len(matched_tags)
|
||
|
|
row["matched_tag_ids"] = matched_tags
|
||
|
|
row["category_ids"] = link_category_map.get(link_id, [])
|
||
|
|
scored.append(row)
|
||
|
|
|
||
|
|
scored.sort(
|
||
|
|
key=lambda item: (
|
||
|
|
item["scope_priority"],
|
||
|
|
-int(item.get("is_critical") is True),
|
||
|
|
-item["score"],
|
||
|
|
item.get("name") or "",
|
||
|
|
)
|
||
|
|
)
|
||
|
|
return scored[:limit]
|
||
|
|
|
||
|
|
|
||
|
|
def update_link_categories(link_id: int, category_ids: List[int]) -> None:
|
||
|
|
execute_query("DELETE FROM link_category_map WHERE link_id = %s", (link_id,))
|
||
|
|
if not category_ids:
|
||
|
|
return
|
||
|
|
|
||
|
|
values = []
|
||
|
|
params: List[int] = []
|
||
|
|
for category_id in category_ids:
|
||
|
|
values.append("(%s, %s)")
|
||
|
|
params.extend([link_id, category_id])
|
||
|
|
|
||
|
|
query = f"INSERT INTO link_category_map (link_id, category_id) VALUES {', '.join(values)} ON CONFLICT DO NOTHING"
|
||
|
|
execute_query(query, tuple(params))
|
||
|
|
|
||
|
|
|
||
|
|
def get_link_category_ids(link_id: int) -> List[int]:
|
||
|
|
rows = execute_query(
|
||
|
|
"SELECT category_id FROM link_category_map WHERE link_id = %s ORDER BY category_id",
|
||
|
|
(link_id,),
|
||
|
|
) or []
|
||
|
|
return [int(row["category_id"]) for row in rows]
|
||
|
|
|
||
|
|
|
||
|
|
def log_access(link_id: int, user_id: Optional[int], action_type: str, case_id: Optional[int], customer_id: Optional[int], metadata: Optional[dict]) -> None:
|
||
|
|
execute_query(
|
||
|
|
"""
|
||
|
|
INSERT INTO link_access_log (link_id, user_id, action_type, case_id, customer_id, metadata)
|
||
|
|
VALUES (%s, %s, %s, %s, %s, %s::jsonb)
|
||
|
|
""",
|
||
|
|
(link_id, user_id, action_type, case_id, customer_id, json.dumps(metadata or {})),
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def build_action_result(link_row: dict, action_type: str) -> LinkActionResult:
|
||
|
|
link_type = LinkType(link_row["type"])
|
||
|
|
host = link_row.get("host")
|
||
|
|
port = link_row.get("port")
|
||
|
|
username = link_row.get("username")
|
||
|
|
|
||
|
|
ssh_command = None
|
||
|
|
rdp_content = None
|
||
|
|
command_text = None
|
||
|
|
open_url = link_row.get("url")
|
||
|
|
|
||
|
|
if link_type == LinkType.ssh:
|
||
|
|
if host:
|
||
|
|
base = "ssh"
|
||
|
|
if username:
|
||
|
|
base += f" {username}@{host}"
|
||
|
|
else:
|
||
|
|
base += f" {host}"
|
||
|
|
if port:
|
||
|
|
base += f" -p {port}"
|
||
|
|
ssh_command = base
|
||
|
|
|
||
|
|
if link_type == LinkType.rdp and host:
|
||
|
|
rdp_port = port or 3389
|
||
|
|
rdp_content = f"full address:s:{host}:{rdp_port}\nusername:s:{username or ''}\nprompt for credentials:i:1\n"
|
||
|
|
|
||
|
|
if link_type == LinkType.command:
|
||
|
|
command_text = link_row.get("url") or link_row.get("description") or ""
|
||
|
|
|
||
|
|
if link_type in (LinkType.ssh, LinkType.rdp) and not open_url and host:
|
||
|
|
open_url = host
|
||
|
|
|
||
|
|
return LinkActionResult(
|
||
|
|
link_id=int(link_row["id"]),
|
||
|
|
action_type=action_type,
|
||
|
|
type=link_type,
|
||
|
|
open_url=open_url,
|
||
|
|
ssh_command=ssh_command,
|
||
|
|
rdp_content=rdp_content,
|
||
|
|
command_text=command_text,
|
||
|
|
username=username,
|
||
|
|
vault_item_id=link_row.get("vault_item_id"),
|
||
|
|
vault_search_hint=host or link_row.get("url") or None,
|
||
|
|
)
|