from typing import List, Dict, Optional, Set
from app.core.database import execute_query


class RelationService:
    """Service for handling case relations (Sager)."""

    # Lower-cased relation types where the *target* case (målsag) is the parent
    # of the source case (kilde_sag): "A derived from B" => B is A's parent.
    _TARGET_IS_PARENT = frozenset({'afledt af', 'derived from'})
    # Lower-cased relation types where the *source* case is the parent of the
    # target case: "A cause of B" => A is B's parent.
    _SOURCE_IS_PARENT = frozenset({'årsag til', 'cause of'})

    @staticmethod
    def get_relation_tree(root_id: int) -> List[Dict]:
        """Build a hierarchical tree of relations for a specific case.

        The network around ``root_id`` is loaded (at most 5 relation hops and
        100 cases), turned into a directed parent/child graph and rendered as
        a single tree. Cycles and duplicate edges are handled.

        Args:
            root_id: Id of the case (``sag_sager.id``) the view is centred on.

        Returns:
            ``[]`` when the case is not found, otherwise a one-element list
            holding the tree's root node. Every node is a dict with the keys
            ``case``, ``relation_type``, ``relation_id``, ``is_current`` and
            ``children``; ``is_cycle`` / ``is_repeated`` are added to nodes
            that close a cycle or were already shown elsewhere in the tree.
        """
        # 1. Fetch all connected cases (recursive CTE, followed in both
        #    directions). Ordering by the smallest depth before applying the
        #    LIMIT guarantees the root case (depth 0) and its nearest
        #    neighbours are the ones kept; a LIMIT without ORDER BY may return
        #    an arbitrary subset that does not even contain the root.
        tree_ids_query = """
            WITH RECURSIVE CaseTree AS (
                SELECT id, 0 as depth FROM sag_sager WHERE id = %s
                UNION
                SELECT
                    CASE WHEN sr.kilde_sag_id = ct.id THEN sr.målsag_id ELSE sr.kilde_sag_id END,
                    ct.depth + 1
                FROM sag_relationer sr
                JOIN CaseTree ct ON sr.kilde_sag_id = ct.id OR sr.målsag_id = ct.id
                WHERE sr.deleted_at IS NULL AND ct.depth < 5
            )
            SELECT id FROM CaseTree GROUP BY id ORDER BY MIN(depth), id LIMIT 100;
        """
        # NOTE(review): `or []` assumes execute_query may return None for an
        # empty result - confirm against app.core.database.
        tree_ids_rows = execute_query(tree_ids_query, (root_id,)) or []
        tree_ids = [row['id'] for row in tree_ids_rows]
        if not tree_ids:
            return []

        # 2. Fetch details for these cases. Only '%s' markers are interpolated
        #    into the SQL text; the ids themselves are passed as parameters.
        placeholders = ','.join(['%s'] * len(tree_ids))
        tree_cases_query = f"SELECT id, titel, status FROM sag_sager WHERE id IN ({placeholders})"
        case_rows = execute_query(tree_cases_query, tuple(tree_ids)) or []
        tree_cases = {case['id']: case for case in case_rows}

        # 3. Fetch all (non-deleted) edges whose both ends are in the network.
        tree_edges_query = f"""
            SELECT id, kilde_sag_id, målsag_id, relationstype
            FROM sag_relationer
            WHERE deleted_at IS NULL
              AND kilde_sag_id IN ({placeholders})
              AND målsag_id IN ({placeholders})
        """
        tree_edges = execute_query(tree_edges_query, tuple(tree_ids) * 2) or []

        # 4./5. Pure in-memory graph and tree construction.
        return RelationService._assemble_tree(root_id, tree_ids, tree_cases, tree_edges)

    @staticmethod
    def _orient_edge(source_id, target_id, relation_type):
        """Return ``(parent, child, dedup_key)`` for one relation row.

        Directed relation types yield a key that includes the direction.
        Every other type is treated as symmetric ("related to"): the source
        is shown as parent, and the key ignores direction so that A->B and
        B->A collapse into a single edge instead of an artificial cycle.
        """
        normalized = (relation_type or '').lower()  # tolerate a NULL type
        if normalized in RelationService._TARGET_IS_PARENT:
            return target_id, source_id, (target_id, source_id, normalized)
        if normalized in RelationService._SOURCE_IS_PARENT:
            return source_id, target_id, (source_id, target_id, normalized)
        low, high = sorted((source_id, target_id))
        return source_id, target_id, (low, high, normalized)

    @staticmethod
    def _assemble_tree(root_id: int, tree_ids: List[int], tree_cases: Dict[int, Dict],
                       tree_edges: List[Dict]) -> List[Dict]:
        """Turn already-fetched cases and relation rows into the display tree.

        Args:
            root_id: Id of the case the view is centred on.
            tree_ids: Ids of all cases in the network.
            tree_cases: Case rows keyed by case id.
            tree_edges: Relation rows with the keys ``id``, ``kilde_sag_id``,
                ``målsag_id`` and ``relationstype``.

        Returns:
            ``[]`` or a one-element list with the root node (see
            ``get_relation_tree`` for the node layout).
        """
        # Build graph (adjacency lists in both directions).
        children_map: Dict[int, List[Dict]] = {cid: [] for cid in tree_ids}
        parents_map: Dict[int, List[int]] = {cid: [] for cid in tree_ids}
        if root_id not in parents_map:
            return []

        processed_edges = set()
        for edge in tree_edges:
            relation_type = edge['relationstype']
            parent, child, edge_key = RelationService._orient_edge(
                edge['kilde_sag_id'], edge['målsag_id'], relation_type
            )
            # Avoid self-loops.
            if parent == child:
                continue
            # Dedup: keep only the first row for each logical relation.
            if edge_key in processed_edges:
                continue
            processed_edges.add(edge_key)

            children_map.setdefault(parent, []).append({
                'id': child,
                'type': relation_type,
                'rel_id': edge['id'],
            })
            parents_map.setdefault(child, []).append(parent)

        # Every case is expanded at most once in the whole tree. Further
        # occurrences are still shown (so shared dependencies stay visible)
        # but as leaves flagged `is_repeated`.
        global_visited = set()

        def build_node(cid: int, path_visited: Set[int],
                       current_rel_type: Optional[str] = None,
                       current_rel_id: Optional[int] = None,
                       expand: bool = True):
            if cid not in tree_cases:
                return None

            node = {
                'case': tree_cases[cid],
                'relation_type': current_rel_type,
                'relation_id': current_rel_id,
                'is_current': cid == root_id,
            }

            # Cycle detection: cid is already an ancestor on the current path.
            if cid in path_visited:
                node['is_cycle'] = True
                node['children'] = []
                return node

            # Repeated node: the caller only wants a leaf reference, so do not
            # walk (and later discard) the subtree again.
            if not expand:
                node['children'] = []
                return node

            branch = path_visited | {cid}

            # Sort children for consistent display (a NULL type sorts first).
            ordered_children = sorted(children_map.get(cid, []),
                                      key=lambda info: info['type'] or '')
            children_nodes = []
            for child_info in ordered_children:
                child_id = child_info['id']
                is_repeated = child_id in global_visited
                global_visited.add(child_id)

                child_node = build_node(child_id, branch, child_info['type'],
                                        child_info['rel_id'], expand=not is_repeated)
                if child_node is None:
                    continue
                if is_repeated:
                    child_node['is_repeated'] = True
                children_nodes.append(child_node)

            node['children'] = children_nodes
            return node

        # Determine the visual root: climb from the requested case along the
        # first parent of each node (a single primary ancestry path) so the
        # full context above the case is shown.
        effective_root = root_id
        climbed = {root_id}
        while parents_map.get(effective_root):
            parent = parents_map[effective_root][0]
            if parent == root_id:
                # The ancestry loops back to the requested case: centre on it.
                effective_root = root_id
                break
            if parent in climbed:
                # Cycle among the ancestors that does not include the
                # requested case: stop here instead of looping forever.
                break
            climbed.add(parent)
            effective_root = parent

        # Build the tree starting from the effective root.
        global_visited.add(effective_root)
        full_tree = build_node(effective_root, set())
        if not full_tree:
            return []
        return [full_tree]

    @staticmethod
    def add_relation(source_id: int, target_id: int, type: str):
        """Create a relation between two cases.

        Args:
            source_id: Id of the source case (stored as ``kilde_sag_id``).
            target_id: Id of the target case (stored as ``målsag_id``).
            type: Relation type (stored as ``relationstype``). The name
                shadows the builtin but is kept because callers may pass it
                by keyword.

        Returns:
            Whatever ``execute_query`` returns for the ``RETURNING id``
            statement - presumably the inserted row(s); verify against
            app.core.database.
        """
        query = """
            INSERT INTO sag_relationer (kilde_sag_id, målsag_id, relationstype)
            VALUES (%s, %s, %s)
            RETURNING id
        """
        return execute_query(query, (source_id, target_id, type))

    @staticmethod
    def remove_relation(relation_id: int):
        """Soft delete a relation by setting its ``deleted_at`` to ``NOW()``.

        Args:
            relation_id: Id of the ``sag_relationer`` row to mark as deleted.
        """
        query = "UPDATE sag_relationer SET deleted_at = NOW() WHERE id = %s"
        execute_query(query, (relation_id,))