# bmc_hub/app/modules/sag/services/relation_service.py
#
# Original listing: 198 lines, 8.2 KiB, Python

from typing import List, Dict, Optional, Set
from app.core.database import execute_query
class RelationService:
    """Service for handling case relations (Sager).

    All methods are static and talk to the database through ``execute_query``.
    Relations live in ``sag_relationer`` (source ``kilde_sag_id``, target
    ``målsag_id``, label ``relationstype``) and are soft-deleted through the
    ``deleted_at`` column.
    """

    # Relation types (compared lower-cased) that carry a direction.
    # 'afledt af' / 'derived from': the target case is the parent of the source.
    _CHILD_TO_PARENT_TYPES = frozenset({'afledt af', 'derived from'})
    # 'årsag til' / 'cause of': the source case is the parent of the target.
    _PARENT_TO_CHILD_TYPES = frozenset({'årsag til', 'cause of'})

    @staticmethod
    def get_relation_tree(root_id: int) -> List[Dict]:
        """Build a hierarchical tree of relations around a specific case.

        Args:
            root_id: Id of the case the view is centred on.

        Returns:
            An empty list when no case is found, otherwise a one-element list
            holding the top node. Each node is a dict with the keys ``case``
            (row with ``id``, ``titel``, ``status``), ``relation_type``,
            ``relation_id``, ``is_current`` and ``children``. Nodes that close
            a cycle also carry ``is_cycle: True``; nodes that were already
            shown elsewhere in the tree carry ``is_repeated: True`` and are
            not expanded again.
        """
        # 1. Collect the ids of all cases connected to the root (recursive
        #    CTE, at most 5 hops). Ordering by the smallest depth before
        #    applying LIMIT guarantees that the root itself and the nearest
        #    cases are the ones kept when the network exceeds 100 cases;
        #    a bare LIMIT without ORDER BY may drop arbitrary rows.
        tree_ids_query = """
            WITH RECURSIVE CaseTree AS (
                SELECT id, 0 as depth FROM sag_sager WHERE id = %s
                UNION
                SELECT
                    CASE WHEN sr.kilde_sag_id = ct.id THEN sr.målsag_id ELSE sr.kilde_sag_id END,
                    ct.depth + 1
                FROM sag_relationer sr
                JOIN CaseTree ct ON sr.kilde_sag_id = ct.id OR sr.målsag_id = ct.id
                WHERE sr.deleted_at IS NULL AND ct.depth < 5
            )
            SELECT id FROM CaseTree GROUP BY id ORDER BY MIN(depth), id LIMIT 100;
        """
        # NOTE(review): execute_query presumably returns a list of dict-like
        # rows; ``or []`` additionally tolerates a None result.
        tree_ids_rows = execute_query(tree_ids_query, (root_id,)) or []
        tree_ids = [row['id'] for row in tree_ids_rows]
        if not tree_ids:
            return []

        # 2. Fetch details for these cases. Only the "%s" placeholders are
        #    interpolated into the SQL text; the ids are passed as parameters.
        placeholders = ','.join(['%s'] * len(tree_ids))
        id_params = tuple(tree_ids)
        tree_cases_query = f"SELECT id, titel, status FROM sag_sager WHERE id IN ({placeholders})"
        tree_cases = {
            case['id']: case
            for case in (execute_query(tree_cases_query, id_params) or [])
        }

        # 3. Fetch all live edges whose both endpoints are in the network.
        #    ORDER BY id keeps edge de-duplication and child order stable.
        tree_edges_query = f"""
            SELECT id, kilde_sag_id, målsag_id, relationstype
            FROM sag_relationer
            WHERE deleted_at IS NULL
            AND kilde_sag_id IN ({placeholders})
            AND målsag_id IN ({placeholders})
            ORDER BY id
        """
        tree_edges = execute_query(tree_edges_query, id_params * 2) or []

        # 4. + 5. Pure in-memory graph/tree construction.
        return RelationService._build_tree(root_id, tree_ids, tree_cases, tree_edges)

    @staticmethod
    def _normalize_type(relation_type: Optional[str]) -> str:
        """Return the lower-cased relation type; None becomes ''."""
        return (relation_type or '').lower()

    @staticmethod
    def _get_direction(source_id: int, target_id: int, relation_type: Optional[str]):
        """Return ``(parent, child)`` for an edge source -> target.

        Only the 'derived from' family flips the direction; every other type
        (including symmetric ones) treats the source as the parent.
        """
        normalized = RelationService._normalize_type(relation_type)
        if normalized in RelationService._CHILD_TO_PARENT_TYPES:
            return target_id, source_id
        return source_id, target_id

    @staticmethod
    def _build_tree(root_id: int, tree_ids: List[int], tree_cases: Dict[int, Dict],
                    tree_edges: List[Dict]) -> List[Dict]:
        """Turn already-fetched cases and edges into the display tree.

        Args:
            root_id: Id of the case the view is centred on.
            tree_ids: Ids of all cases in the network.
            tree_cases: Case rows keyed by case id.
            tree_edges: Edge rows with the keys ``id``, ``kilde_sag_id``,
                ``målsag_id`` and ``relationstype``.

        Returns:
            ``[top_node]`` or ``[]`` when the starting case is unknown.
        """
        children_map: Dict[int, List[Dict]] = {cid: [] for cid in tree_ids}
        parents_map: Dict[int, List[int]] = {cid: [] for cid in tree_ids}
        directed_types = (RelationService._CHILD_TO_PARENT_TYPES
                          | RelationService._PARENT_TO_CHILD_TYPES)

        seen_edges: Set = set()
        for edge in tree_edges:
            source, target = edge['kilde_sag_id'], edge['målsag_id']
            rtype = edge['relationstype']
            # Avoid self-loops.
            if source == target:
                continue
            normalized = RelationService._normalize_type(rtype)
            parent, child = RelationService._get_direction(source, target, rtype)
            if normalized in directed_types:
                # Directed edges are duplicates only if they point the same way.
                edge_key = (parent, child, normalized)
            else:
                # Symmetric relations stored in both directions (A-B and B-A)
                # are one edge; keeping both would fabricate an A->B->A cycle.
                edge_key = tuple(sorted((source, target))) + (normalized,)
            if edge_key in seen_edges:
                continue
            seen_edges.add(edge_key)
            children_map.setdefault(parent, []).append({
                'id': child,
                'type': rtype,
                'rel_id': edge['id'],
            })
            parents_map.setdefault(child, []).append(parent)

        # Walk up from the requested case to a top-most ancestor, always
        # following the first parent (a single primary ancestry path).
        effective_root = root_id
        seen_ancestors = {root_id}
        while True:
            parents = parents_map.get(effective_root)
            if not parents:
                break
            candidate = parents[0]
            if candidate == root_id:
                # The ancestry loops back to the requested case: centre on it.
                effective_root = root_id
                break
            if candidate in seen_ancestors:
                # Cycle among the ancestors that does not include the
                # requested case; stop here instead of looping forever.
                break
            seen_ancestors.add(candidate)
            effective_root = candidate

        # Every case is expanded at most once in the whole tree; later
        # occurrences are emitted as leaf references.
        global_visited = {effective_root}

        def make_node(cid: int, rel_type: Optional[str], rel_id: Optional[int],
                      children: List[Dict], is_cycle: bool = False) -> Dict:
            node = {
                'case': tree_cases[cid],
                'relation_type': rel_type,
                'relation_id': rel_id,
                'is_current': cid == root_id,
            }
            if is_cycle:
                node['is_cycle'] = True
            node['children'] = children
            return node

        def build_node(cid: int, path_visited: Set[int],
                       current_rel_type: Optional[str] = None,
                       current_rel_id: Optional[int] = None) -> Optional[Dict]:
            if cid not in tree_cases:
                return None
            # Cycle detection in the current path.
            if cid in path_visited:
                return make_node(cid, current_rel_type, current_rel_id, [], is_cycle=True)

            # New set per level, so sibling branches never share path state.
            path = path_visited | {cid}
            # Sort children for consistent display; a NULL type sorts first.
            ordered_children = sorted(children_map.get(cid, []),
                                      key=lambda info: info['type'] or '')
            children_nodes = []
            for child_info in ordered_children:
                child_id = child_info['id']
                if child_id in global_visited:
                    # Already shown elsewhere: emit a leaf reference without
                    # descending. Recursing only to discard the result would
                    # explore every path of the graph (exponential on DAGs).
                    if child_id in tree_cases:
                        reference = make_node(child_id, child_info['type'],
                                              child_info['rel_id'], [],
                                              is_cycle=child_id in path)
                        reference['is_repeated'] = True
                        children_nodes.append(reference)
                    continue
                global_visited.add(child_id)
                child_node = build_node(child_id, path, child_info['type'],
                                        child_info['rel_id'])
                if child_node:
                    children_nodes.append(child_node)
            return make_node(cid, current_rel_type, current_rel_id, children_nodes)

        full_tree = build_node(effective_root, set())
        if not full_tree:
            return []
        return [full_tree]

    @staticmethod
    def add_relation(source_id: int, target_id: int, type: str):
        """Create a relation between two cases.

        Args:
            source_id: Id stored as ``kilde_sag_id``.
            target_id: Id stored as ``målsag_id``.
            type: Relation label stored as ``relationstype``. The name shadows
                the builtin but is kept so keyword callers keep working.

        Returns:
            Whatever ``execute_query`` returns for the ``INSERT ... RETURNING id``
            statement (presumably the row holding the new id - confirm against
            ``execute_query``).
        """
        query = """
            INSERT INTO sag_relationer (kilde_sag_id, målsag_id, relationstype)
            VALUES (%s, %s, %s)
            RETURNING id
        """
        return execute_query(query, (source_id, target_id, type))

    @staticmethod
    def remove_relation(relation_id: int):
        """Soft-delete a relation by setting its ``deleted_at`` to NOW()."""
        query = "UPDATE sag_relationer SET deleted_at = NOW() WHERE id = %s"
        execute_query(query, (relation_id,))