import json import logging from typing import Dict, List, Optional from app.core.database import execute_query, execute_query_single from app.modules.links.models.schemas import LinkActionResult, LinkScope, LinkType logger = logging.getLogger(__name__) def _get_case(case_id: int) -> Optional[dict]: return execute_query_single( "SELECT id, customer_id FROM sag_sager WHERE id = %s AND deleted_at IS NULL", (case_id,), ) def _get_case_hardware_ids(case_id: int) -> List[int]: rows = execute_query( "SELECT hardware_id FROM sag_hardware WHERE sag_id = %s", (case_id,), ) or [] return [int(row["hardware_id"]) for row in rows if row.get("hardware_id") is not None] def _get_tag_ids_for_entity(entity_type: str, entity_id: int) -> List[int]: rows = execute_query( "SELECT tag_id FROM entity_tags WHERE entity_type = %s AND entity_id = %s", (entity_type, entity_id), ) or [] return [int(row["tag_id"]) for row in rows if row.get("tag_id") is not None] def _get_link_tag_map(link_ids: List[int]) -> Dict[int, List[int]]: if not link_ids: return {} rows = execute_query( """ SELECT entity_id AS link_id, tag_id FROM entity_tags WHERE entity_type = 'link' AND entity_id = ANY(%s) """, (link_ids,), ) or [] out: Dict[int, List[int]] = {link_id: [] for link_id in link_ids} for row in rows: link_id = int(row.get("link_id")) tag_id = int(row.get("tag_id")) out.setdefault(link_id, []).append(tag_id) return out def _get_link_category_map(link_ids: List[int]) -> Dict[int, List[int]]: if not link_ids: return {} rows = execute_query( """ SELECT link_id, category_id FROM link_category_map WHERE link_id = ANY(%s) """, (link_ids,), ) or [] out: Dict[int, List[int]] = {link_id: [] for link_id in link_ids} for row in rows: link_id = int(row.get("link_id")) category_id = int(row.get("category_id")) out.setdefault(link_id, []).append(category_id) return out def _resolve_scope(link_row: dict, case_id: int, case_customer_id: Optional[int], case_hardware_ids: List[int]) -> tuple[LinkScope, int]: if link_row.get("case_id") == case_id: return (LinkScope.case, 1) if case_customer_id and link_row.get("customer_id") == case_customer_id: return (LinkScope.customer, 2) if link_row.get("hardware_id") in case_hardware_ids: return (LinkScope.hardware, 3) return (LinkScope.global_scope, 4) def get_relevant_links(case_id: int, limit: int = 50) -> List[dict]: case_row = _get_case(case_id) if not case_row: return [] case_customer_id = case_row.get("customer_id") case_hardware_ids = _get_case_hardware_ids(case_id) case_tag_ids = set(_get_tag_ids_for_entity("case", case_id)) candidate_query = """ SELECT * FROM links WHERE deleted_at IS NULL AND ( case_id = %s OR (%s IS NOT NULL AND customer_id = %s) OR (hardware_id IS NOT NULL AND hardware_id = ANY(%s)) OR (case_id IS NULL AND customer_id IS NULL AND hardware_id IS NULL) ) """ candidate_rows = execute_query( candidate_query, (case_id, case_customer_id, case_customer_id, case_hardware_ids or [0]), ) or [] link_ids = [int(row["id"]) for row in candidate_rows] link_tag_map = _get_link_tag_map(link_ids) link_category_map = _get_link_category_map(link_ids) scored: List[dict] = [] for row in candidate_rows: link_id = int(row["id"]) link_tags = set(link_tag_map.get(link_id, [])) matched_tags = sorted(case_tag_ids.intersection(link_tags)) scope, scope_priority = _resolve_scope(row, case_id, case_customer_id, case_hardware_ids) if not matched_tags and scope != LinkScope.case and not row.get("is_critical"): continue score = 0 if case_customer_id and row.get("customer_id") == case_customer_id: score += 3 if row.get("is_critical"): score += 2 score += len(matched_tags) row["scope"] = scope.value row["scope_priority"] = scope_priority row["score"] = score row["match_count"] = len(matched_tags) row["matched_tag_ids"] = matched_tags row["category_ids"] = link_category_map.get(link_id, []) scored.append(row) scored.sort( key=lambda item: ( item["scope_priority"], -int(item.get("is_critical") is True), -item["score"], item.get("name") or "", ) ) return scored[:limit] def update_link_categories(link_id: int, category_ids: List[int]) -> None: execute_query("DELETE FROM link_category_map WHERE link_id = %s", (link_id,)) if not category_ids: return values = [] params: List[int] = [] for category_id in category_ids: values.append("(%s, %s)") params.extend([link_id, category_id]) query = f"INSERT INTO link_category_map (link_id, category_id) VALUES {', '.join(values)} ON CONFLICT DO NOTHING" execute_query(query, tuple(params)) def get_link_category_ids(link_id: int) -> List[int]: rows = execute_query( "SELECT category_id FROM link_category_map WHERE link_id = %s ORDER BY category_id", (link_id,), ) or [] return [int(row["category_id"]) for row in rows] def log_access(link_id: int, user_id: Optional[int], action_type: str, case_id: Optional[int], customer_id: Optional[int], metadata: Optional[dict]) -> None: execute_query( """ INSERT INTO link_access_log (link_id, user_id, action_type, case_id, customer_id, metadata) VALUES (%s, %s, %s, %s, %s, %s::jsonb) """, (link_id, user_id, action_type, case_id, customer_id, json.dumps(metadata or {})), ) def build_action_result(link_row: dict, action_type: str) -> LinkActionResult: link_type = LinkType(link_row["type"]) host = link_row.get("host") port = link_row.get("port") username = link_row.get("username") ssh_command = None rdp_content = None command_text = None open_url = link_row.get("url") if link_type == LinkType.ssh: if host: base = "ssh" if username: base += f" {username}@{host}" else: base += f" {host}" if port: base += f" -p {port}" ssh_command = base if link_type == LinkType.rdp and host: rdp_port = port or 3389 rdp_content = f"full address:s:{host}:{rdp_port}\nusername:s:{username or ''}\nprompt for credentials:i:1\n" if link_type == LinkType.command: command_text = link_row.get("url") or link_row.get("description") or "" if link_type in (LinkType.ssh, LinkType.rdp) and not open_url and host: open_url = host return LinkActionResult( link_id=int(link_row["id"]), action_type=action_type, type=link_type, open_url=open_url, ssh_command=ssh_command, rdp_content=rdp_content, command_text=command_text, username=username, vault_item_id=link_row.get("vault_item_id"), vault_search_hint=host or link_row.get("url") or None, )