bmc_hub/app/modules/sag/backend/router.py
Christian 29acdf3e01 Add tests for new SAG module endpoints and module deactivation
- Implement test script for new SAG module endpoints BE-003 (Tag State Management) and BE-004 (Bulk Operations).
- Create test cases for creating, updating, and bulk operations on cases and tags.
- Add a test for module deactivation to ensure data integrity is maintained.
- Include setup and teardown for tests to clear database state before and after each test.
2026-01-31 23:16:24 +01:00

596 lines
22 KiB
Python

import logging
from typing import List, Optional
from fastapi import APIRouter, HTTPException, Query
from app.core.database import execute_query
from datetime import datetime
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that collects every endpoint declared in this file; where it is
# mounted (URL prefix, tags) is decided by whoever includes it -- not visible here.
router = APIRouter()
# ============================================================================
# CRUD Endpoints for Cases
# ============================================================================
@router.get("/cases", response_model=List[dict])
async def list_cases(status: Optional[str] = None, customer_id: Optional[int] = None):
    """Return all non-deleted cases, newest first, optionally filtered.

    A filter is only applied when its value is truthy, so an empty ``status``
    or a ``customer_id`` of 0 leaves that column unfiltered.
    """
    # (value, SQL fragment) pairs; a fragment is appended only for truthy values.
    optional_filters = (
        (status, " AND status = %s"),
        (customer_id, " AND customer_id = %s"),
    )
    sql = "SELECT * FROM sag_sager WHERE deleted_at IS NULL"
    bound_values = []
    for value, fragment in optional_filters:
        if value:
            sql += fragment
            bound_values.append(value)
    sql += " ORDER BY created_at DESC"
    return execute_query(sql, tuple(bound_values))
@router.post("/cases", response_model=dict)
async def create_case(data: dict):
    """Insert a case built from the request body and return the stored row.

    Body keys that are missing are sent to the INSERT as ``None``.

    Raises:
        HTTPException: 500 when the INSERT returns no row.
    """
    # Body keys, in the same order as the column list of the INSERT below.
    body_keys = (
        "titel",
        "beskrivelse",
        "template_key",
        "status",
        "customer_id",
        "ansvarlig_bruger_id",
        "created_by_user_id",
        "deadline",
    )
    values = tuple(data.get(key) for key in body_keys)
    inserted = execute_query(
        """
        INSERT INTO sag_sager (titel, beskrivelse, template_key, status, customer_id, ansvarlig_bruger_id, created_by_user_id, deadline)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING *
        """,
        values,
    )
    if inserted:
        return inserted[0]
    raise HTTPException(status_code=500, detail="Failed to create case.")
@router.get("/cases/{id}", response_model=dict)
async def get_case(id: int):
    """Return the non-deleted case with the given id.

    Raises:
        HTTPException: 404 when no such case exists (or it is soft-deleted).
    """
    rows = execute_query(
        "SELECT * FROM sag_sager WHERE id = %s AND deleted_at IS NULL",
        (id,),
    )
    if rows:
        return rows[0]
    raise HTTPException(status_code=404, detail="Case not found.")
@router.patch("/cases/{id}", response_model=dict)
async def update_case(id: int, updates: dict):
    """Update columns of a non-deleted case and return the updated row.

    The keys of ``updates`` end up in the SQL text as column names (only the
    values can be bound as parameters), so each key must be a plain
    identifier before the statement is built.

    Args:
        id: Primary key of the case to update.
        updates: Mapping of column name to new value, taken from the request body.

    Returns:
        The updated row as produced by ``RETURNING *``.

    Raises:
        HTTPException: 400 when ``updates`` is empty or contains a key that is
            not a valid identifier; 404 when no non-deleted case matched.
    """
    if not updates:
        # An empty SET list would otherwise yield invalid SQL ("SET , updated_at = NOW()").
        raise HTTPException(status_code=400, detail="No fields provided for update.")

    # Identifiers cannot contain quotes, whitespace, or punctuation, so this
    # blocks SQL injection through crafted key names without hard-coding the
    # table's column list (unknown columns still fail in the database).
    invalid_keys = [
        str(key) for key in updates
        if not (isinstance(key, str) and key.isidentifier())
    ]
    if invalid_keys:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid field name(s): {', '.join(invalid_keys)}",
        )

    set_clause = ", ".join(f"{column} = %s" for column in updates)
    query = f"""
        UPDATE sag_sager
        SET {set_clause}, updated_at = NOW()
        WHERE id = %s AND deleted_at IS NULL
        RETURNING *
    """
    result = execute_query(query, (*updates.values(), id))
    if not result:
        raise HTTPException(status_code=404, detail="Case not found or not updated.")
    return result[0]
@router.delete("/cases/{id}", response_model=dict)
async def delete_case(id: int):
    """Soft-delete a case by stamping ``deleted_at`` and return the row.

    Raises:
        HTTPException: 404 when the case does not exist or is already deleted.
    """
    soft_deleted = execute_query(
        """
        UPDATE sag_sager
        SET deleted_at = NOW()
        WHERE id = %s AND deleted_at IS NULL
        RETURNING *
        """,
        (id,),
    )
    if soft_deleted:
        return soft_deleted[0]
    raise HTTPException(status_code=404, detail="Case not found or already deleted.")
# ============================================================================
# BULK OPERATIONS
# ============================================================================
def _bulk_set_status(case_ids: list, status: str):
    """Set ``status`` on every non-deleted case whose id is in ``case_ids``.

    Returns whatever ``execute_query`` returns with ``fetch=False``
    (presumably the affected-row count -- confirm against app.core.database).
    """
    placeholders = ", ".join(["%s"] * len(case_ids))
    query = f"""
        UPDATE sag_sager
        SET status = %s, updated_at = NOW()
        WHERE id IN ({placeholders}) AND deleted_at IS NULL
    """
    return execute_query(query, (status, *case_ids), fetch=False)


@router.post("/cases/bulk", response_model=dict)
async def bulk_operations(data: dict):
    """Apply one action to many cases at once.

    Expected body keys:
        case_ids: Non-empty list of case ids.
        action: ``"update_status"``, ``"add_tag"`` or ``"close_all"``.
        params: Action arguments -- ``status`` for ``update_status``,
            ``tag_navn`` for ``add_tag``; unused by ``close_all``.

    Returns:
        ``{"success": True, "affected_cases": <count>, "action": <action>}``.

    Raises:
        HTTPException: 400 for missing/invalid input or an unknown action;
            500 when the underlying query fails.
    """
    case_ids = data.get("case_ids", [])
    action = data.get("action")
    # Treat an explicit JSON null the same as a missing "params" key.
    params = data.get("params") or {}

    if not case_ids:
        raise HTTPException(status_code=400, detail="case_ids list is required")
    if not isinstance(case_ids, list):
        # A string or object here would otherwise blow up later as a 500.
        raise HTTPException(status_code=400, detail="case_ids must be a list")
    if not action:
        raise HTTPException(status_code=400, detail="action is required")
    if not isinstance(params, dict):
        raise HTTPException(status_code=400, detail="params must be an object")

    affected_cases = 0
    try:
        if action == "update_status":
            status = params.get("status")
            if not status:
                raise HTTPException(status_code=400, detail="status parameter is required for update_status action")
            affected_cases = _bulk_set_status(case_ids, status)
            logger.info("✅ Bulk update_status: %s cases set to '%s'", affected_cases, status)
        elif action == "add_tag":
            tag_navn = params.get("tag_navn")
            if not tag_navn:
                raise HTTPException(status_code=400, detail="tag_navn parameter is required for add_tag action")
            insert_tag = """
                INSERT INTO sag_tags (sag_id, tag_navn, state)
                VALUES (%s, %s, 'open')
            """
            # Deliberately best-effort: a failure on one case (e.g. the tag
            # already exists) is logged and must not abort the other cases.
            for case_id in case_ids:
                try:
                    execute_query(insert_tag, (case_id, tag_navn), fetch=False)
                    affected_cases += 1
                except Exception as e:
                    logger.warning("⚠️ Could not add tag to case %s: %s", case_id, e)
            logger.info("✅ Bulk add_tag: tag '%s' added to %s cases", tag_navn, affected_cases)
        elif action == "close_all":
            affected_cases = _bulk_set_status(case_ids, "lukket")
            logger.info("✅ Bulk close_all: %s cases closed", affected_cases)
        else:
            raise HTTPException(status_code=400, detail=f"Unknown action: {action}")
    except HTTPException:
        # Validation errors raised above must reach the client unchanged.
        raise
    except Exception as e:
        logger.error("❌ Error in bulk operations: %s", e)
        raise HTTPException(status_code=500, detail=f"Bulk operation failed: {str(e)}") from e

    return {
        "success": True,
        "affected_cases": affected_cases,
        "action": action,
    }
# ============================================================================
# CRUD Endpoints for Relations
# ============================================================================
@router.get("/cases/{id}/relations", response_model=List[dict])
async def list_relations(id: int):
    """Return the non-deleted relations in which the case is source or target.

    Each row is joined with the titles of both cases (``kilde_titel`` and
    ``mål_titel``) and the list is ordered newest first.
    """
    relations_sql = """
        SELECT sr.*, ss_kilde.titel AS kilde_titel, ss_mål.titel AS mål_titel
        FROM sag_relationer sr
        JOIN sag_sager ss_kilde ON sr.kilde_sag_id = ss_kilde.id
        JOIN sag_sager ss_mål ON sr.målsag_id = ss_mål.id
        WHERE (sr.kilde_sag_id = %s OR sr.målsag_id = %s) AND sr.deleted_at IS NULL
        ORDER BY sr.created_at DESC
    """
    # The case id is bound twice: once for the source side, once for the target side.
    both_sides = (id,) * 2
    return execute_query(relations_sql, both_sides)
@router.post("/cases/{id}/relations", response_model=dict)
async def create_relation(id: int, data: dict):
    """Create a relation with the path case as source and return the new row.

    The target case (``målsag_id``) and ``relationstype`` are read from the
    request body; missing keys are inserted as ``None``.

    Raises:
        HTTPException: 500 when the INSERT returns no row.
    """
    target_case_id = data.get("målsag_id")
    relation_type = data.get("relationstype")
    inserted = execute_query(
        """
        INSERT INTO sag_relationer (kilde_sag_id, målsag_id, relationstype)
        VALUES (%s, %s, %s)
        RETURNING *
        """,
        (id, target_case_id, relation_type),
    )
    if inserted:
        return inserted[0]
    raise HTTPException(status_code=500, detail="Failed to create relation.")
@router.delete("/cases/{id}/relations/{relation_id}", response_model=dict)
async def delete_relation(id: int, relation_id: int):
    """Soft-delete a relation that belongs to the given case.

    Args:
        id: Case id from the path; the relation must have this case as its
            source (``kilde_sag_id``) or target (``målsag_id``).
        relation_id: Primary key of the relation to delete.

    Returns:
        The soft-deleted relation row.

    Raises:
        HTTPException: 404 when the relation does not exist, is already
            deleted, or does not involve this case.
    """
    # Scope the delete to the case in the URL so a relation cannot be removed
    # through an unrelated case's endpoint. Both directions are accepted
    # because the relation listing for a case also includes both.
    query = """
        UPDATE sag_relationer
        SET deleted_at = NOW()
        WHERE id = %s
          AND (kilde_sag_id = %s OR målsag_id = %s)
          AND deleted_at IS NULL
        RETURNING *
    """
    result = execute_query(query, (relation_id, id, id))
    if not result:
        raise HTTPException(status_code=404, detail="Relation not found or already deleted.")
    return result[0]
# CRUD Endpoints for Tags
@router.get("/cases/{id}/tags", response_model=List[dict])
async def list_tags(id: int):
    """Return the non-deleted tags of a case, newest first."""
    return execute_query(
        "SELECT * FROM sag_tags WHERE sag_id = %s AND deleted_at IS NULL ORDER BY created_at DESC",
        (id,),
    )
@router.post("/cases/{id}/tags", response_model=dict)
async def create_tag(id: int, data: dict):
    """Attach a tag to a case and return the new tag row.

    ``tag_navn`` is read from the request body; ``state`` defaults to
    ``"open"`` when the body does not provide it.

    Raises:
        HTTPException: 500 when the INSERT returns no row.
    """
    tag_name = data.get("tag_navn")
    initial_state = data.get("state", "open")
    inserted = execute_query(
        """
        INSERT INTO sag_tags (sag_id, tag_navn, state)
        VALUES (%s, %s, %s)
        RETURNING *
        """,
        (id, tag_name, initial_state),
    )
    if inserted:
        return inserted[0]
    raise HTTPException(status_code=500, detail="Failed to create tag.")
@router.delete("/cases/{id}/tags/{tag_id}", response_model=dict)
async def delete_tag(id: int, tag_id: int):
    """Soft-delete a tag that belongs to the given case.

    Args:
        id: Case id from the path; the tag's ``sag_id`` must match it.
        tag_id: Primary key of the tag to delete.

    Returns:
        The soft-deleted tag row.

    Raises:
        HTTPException: 404 when the tag does not exist, is already deleted,
            or belongs to a different case.
    """
    # Match on sag_id as well, so a tag cannot be deleted through another
    # case's URL.
    query = """
        UPDATE sag_tags
        SET deleted_at = NOW()
        WHERE id = %s AND sag_id = %s AND deleted_at IS NULL
        RETURNING *
    """
    result = execute_query(query, (tag_id, id))
    if not result:
        raise HTTPException(status_code=404, detail="Tag not found or already deleted.")
    return result[0]
@router.patch("/cases/{id}/tags/{tag_id}/state", response_model=dict)
async def update_tag_state(id: int, tag_id: int, data: dict):
    """Switch a tag between ``open`` and ``closed`` and return the updated row.

    Closing a tag stamps ``closed_at``; reopening it clears ``closed_at``.

    Raises:
        HTTPException: 400 for any other state value; 404 when the tag is
            missing, deleted, or not attached to this case; 500 when the
            update yields no row or an unexpected error occurs.
    """
    close_sql = """
        UPDATE sag_tags
        SET state = %s, closed_at = NOW()
        WHERE id = %s AND deleted_at IS NULL
        RETURNING *
    """
    reopen_sql = """
        UPDATE sag_tags
        SET state = %s, closed_at = NULL
        WHERE id = %s AND deleted_at IS NULL
        RETURNING *
    """
    try:
        new_state = data.get("state")
        if new_state not in ("open", "closed"):
            logger.error("❌ Invalid state value: %s", new_state)
            raise HTTPException(status_code=400, detail="State must be 'open' or 'closed'")

        # The tag must exist, be live, and be attached to the case in the URL.
        owned_tag = execute_query(
            """
            SELECT id FROM sag_tags
            WHERE id = %s AND sag_id = %s AND deleted_at IS NULL
            """,
            (tag_id, id),
        )
        if not owned_tag:
            logger.error("❌ Tag %s not found for case %s", tag_id, id)
            raise HTTPException(status_code=404, detail="Tag not found or doesn't belong to this case")

        update_sql = close_sql if new_state == "closed" else reopen_sql
        updated = execute_query(update_sql, (new_state, tag_id))
        if not updated:
            raise HTTPException(status_code=500, detail="Failed to update tag state")
        logger.info("✅ Tag %s state changed to '%s' for case %s", tag_id, new_state, id)
        return updated[0]
    except HTTPException:
        # Let the deliberate 4xx/5xx responses above through untouched.
        raise
    except Exception as e:
        logger.error("❌ Error updating tag state: %s", e)
        raise HTTPException(status_code=500, detail="Failed to update tag state")
# ============================================================================
# CONTACTS & CUSTOMERS - Link to Cases
# ============================================================================
@router.post("/cases/{id}/contacts", response_model=dict)
async def add_contact_to_case(id: int, data: dict):
    """Link a contact to a case and return the link row.

    ``contact_id`` is required in the body; ``role`` defaults to ``"Kontakt"``.

    Raises:
        HTTPException: 400 when ``contact_id`` is missing/falsy or the error
            text mentions ``unique_sag_contact`` (already linked); 500 for
            any other failure.
    """
    contact_id = data.get("contact_id")
    if not contact_id:
        raise HTTPException(status_code=400, detail="contact_id is required")
    role = data.get("role", "Kontakt")

    insert_sql = """
        INSERT INTO sag_kontakter (sag_id, contact_id, role)
        VALUES (%s, %s, %s)
        RETURNING *
    """
    try:
        linked = execute_query(insert_sql, (id, contact_id, role))
        if not linked:
            # Note: this is caught by the handler below, logged, and re-raised as the same 500.
            raise HTTPException(status_code=500, detail="Failed to add contact")
        logger.info("✅ Contact %s added to case %s", contact_id, id)
        return linked[0]
    except Exception as e:
        # The duplicate-link case is recognised by the constraint name in the error text.
        already_linked = "unique_sag_contact" in str(e).lower()
        if already_linked:
            raise HTTPException(status_code=400, detail="Contact already linked to this case")
        logger.error("❌ Error adding contact to case: %s", e)
        raise HTTPException(status_code=500, detail="Failed to add contact")
@router.get("/cases/{id}/contacts", response_model=list)
async def get_case_contacts(id: int):
    """Return the non-deleted contact links of a case, newest first.

    Each row carries the link columns plus ``contact_name`` and
    ``contact_email`` from the joined ``contacts`` table. An empty list is
    returned when the query yields nothing.
    """
    linked_contacts = execute_query(
        """
        SELECT sk.*, CONCAT(c.first_name, ' ', c.last_name) as contact_name, c.email as contact_email
        FROM sag_kontakter sk
        LEFT JOIN contacts c ON sk.contact_id = c.id
        WHERE sk.sag_id = %s AND sk.deleted_at IS NULL
        ORDER BY sk.created_at DESC
        """,
        (id,),
    )
    return linked_contacts if linked_contacts else []
@router.delete("/cases/{id}/contacts/{contact_id}", response_model=dict)
async def remove_contact_from_case(id: int, contact_id: int):
    """Soft-delete the link between a contact and a case.

    Args:
        id: Case id from the path.
        contact_id: Id of the contact to unlink.

    Returns:
        The soft-deleted link row.

    Raises:
        HTTPException: 404 when no live link exists for this case/contact
            pair; 500 when the query itself fails.
    """
    try:
        query = """
            UPDATE sag_kontakter
            SET deleted_at = NOW()
            WHERE sag_id = %s AND contact_id = %s AND deleted_at IS NULL
            RETURNING *
        """
        result = execute_query(query, (id, contact_id))
        if result:
            logger.info("✅ Contact %s removed from case %s", contact_id, id)
            return result[0]
        raise HTTPException(status_code=404, detail="Contact not linked to this case")
    except HTTPException:
        # Without this clause the 404 above is caught by the generic handler
        # below and reported to the client as a 500.
        raise
    except Exception as e:
        logger.error("❌ Error removing contact from case: %s", e)
        raise HTTPException(status_code=500, detail="Failed to remove contact") from e
# ============================================================================
# SEARCH - Find Customers and Contacts
# ============================================================================
@router.get("/search/customers", response_model=list)
async def search_customers(q: str = Query(..., min_length=1)):
    """Return up to 20 non-deleted customers matching ``q``.

    The match is a case-insensitive substring search over name, email and
    CVR number. An empty list is returned when nothing matches.
    """
    pattern = f"%{q}%"
    matches = execute_query(
        """
        SELECT id, name, email, cvr_number, city
        FROM customers
        WHERE deleted_at IS NULL AND (
        name ILIKE %s OR
        email ILIKE %s OR
        cvr_number ILIKE %s
        )
        LIMIT 20
        """,
        (pattern,) * 3,  # one bind per ILIKE column
    )
    return matches if matches else []
@router.get("/search/contacts", response_model=list)
async def search_contacts(q: str = Query(..., min_length=1)):
    """Return up to 20 active contacts matching ``q``.

    The match is a case-insensitive substring search over first name, last
    name, email and company. An empty list is returned when nothing matches.
    """
    pattern = f"%{q}%"
    matches = execute_query(
        """
        SELECT id, first_name, last_name, email, user_company, phone
        FROM contacts
        WHERE is_active = true AND (
        first_name ILIKE %s OR
        last_name ILIKE %s OR
        email ILIKE %s OR
        user_company ILIKE %s
        )
        LIMIT 20
        """,
        (pattern,) * 4,  # one bind per ILIKE column
    )
    return matches if matches else []
@router.post("/cases/{id}/customers", response_model=dict)
async def add_customer_to_case(id: int, data: dict):
    """Link a customer to a case and return the link row.

    ``customer_id`` is required in the body; ``role`` defaults to ``"Kunde"``.

    Raises:
        HTTPException: 400 when ``customer_id`` is missing/falsy or the error
            text mentions ``unique_sag_customer`` (already linked); 500 for
            any other failure.
    """
    customer_id = data.get("customer_id")
    if not customer_id:
        raise HTTPException(status_code=400, detail="customer_id is required")
    role = data.get("role", "Kunde")

    insert_sql = """
        INSERT INTO sag_kunder (sag_id, customer_id, role)
        VALUES (%s, %s, %s)
        RETURNING *
    """
    try:
        linked = execute_query(insert_sql, (id, customer_id, role))
        if not linked:
            # Note: this is caught by the handler below, logged, and re-raised as the same 500.
            raise HTTPException(status_code=500, detail="Failed to add customer")
        logger.info("✅ Customer %s added to case %s", customer_id, id)
        return linked[0]
    except Exception as e:
        # The duplicate-link case is recognised by the constraint name in the error text.
        already_linked = "unique_sag_customer" in str(e).lower()
        if already_linked:
            raise HTTPException(status_code=400, detail="Customer already linked to this case")
        logger.error("❌ Error adding customer to case: %s", e)
        raise HTTPException(status_code=500, detail="Failed to add customer")
@router.get("/cases/{id}/customers", response_model=list)
async def get_case_customers(id: int):
    """Return the non-deleted customer links of a case, newest first.

    Each row carries the link columns plus ``customer_name`` and
    ``customer_email`` from the joined ``customers`` table. An empty list is
    returned when the query yields nothing.
    """
    linked_customers = execute_query(
        """
        SELECT sk.*, c.name as customer_name, c.email as customer_email
        FROM sag_kunder sk
        LEFT JOIN customers c ON sk.customer_id = c.id
        WHERE sk.sag_id = %s AND sk.deleted_at IS NULL
        ORDER BY sk.created_at DESC
        """,
        (id,),
    )
    return linked_customers if linked_customers else []
@router.delete("/cases/{id}/customers/{customer_id}", response_model=dict)
async def remove_customer_from_case(id: int, customer_id: int):
    """Soft-delete the link between a customer and a case.

    Args:
        id: Case id from the path.
        customer_id: Id of the customer to unlink.

    Returns:
        The soft-deleted link row.

    Raises:
        HTTPException: 404 when no live link exists for this case/customer
            pair; 500 when the query itself fails.
    """
    try:
        query = """
            UPDATE sag_kunder
            SET deleted_at = NOW()
            WHERE sag_id = %s AND customer_id = %s AND deleted_at IS NULL
            RETURNING *
        """
        result = execute_query(query, (id, customer_id))
        if result:
            logger.info("✅ Customer %s removed from case %s", customer_id, id)
            return result[0]
        raise HTTPException(status_code=404, detail="Customer not linked to this case")
    except HTTPException:
        # Without this clause the 404 above is caught by the generic handler
        # below and reported to the client as a 500.
        raise
    except Exception as e:
        logger.error("❌ Error removing customer from case: %s", e)
        raise HTTPException(status_code=500, detail="Failed to remove customer") from e
# ============================================================================
# SEARCH
# ============================================================================
@router.get("/search/cases")
async def search_cases(q: str):
    """Return up to 20 non-deleted cases whose title or description contains ``q``.

    Queries shorter than two characters return an empty list without
    touching the database. Results are ordered newest first.

    Raises:
        HTTPException: 500 when the search fails.
    """
    try:
        if q and len(q) >= 2:
            pattern = f"%{q}%"
            return execute_query(
                """
                SELECT id, titel, status, created_at
                FROM sag_sager
                WHERE deleted_at IS NULL
                AND (titel ILIKE %s OR beskrivelse ILIKE %s)
                ORDER BY created_at DESC
                LIMIT 20
                """,
                (pattern, pattern),
            )
        return []
    except Exception as e:
        logger.error("❌ Error searching cases: %s", e)
        raise HTTPException(status_code=500, detail="Failed to search cases")
# ============================================================================
# Hardware & Location Relations
# ============================================================================
@router.get("/cases/{id}/hardware", response_model=List[dict])
async def get_case_hardware(id: int):
    """Return the hardware assets linked to a case, newest link first.

    Only live links to live assets are included; each row also exposes the
    link's ``relation_type`` as ``link_type`` and its creation time as
    ``link_created_at``. An empty list is returned when nothing is linked.
    """
    linked_hardware = execute_query(
        """
        SELECT h.*, hcr.relation_type as link_type, hcr.created_at as link_created_at
        FROM hardware_assets h
        JOIN hardware_case_relations hcr ON hcr.hardware_id = h.id
        WHERE hcr.case_id = %s AND hcr.deleted_at IS NULL AND h.deleted_at IS NULL
        ORDER BY hcr.created_at DESC
        """,
        (id,),
    )
    if linked_hardware:
        return linked_hardware
    return []
@router.post("/cases/{id}/hardware", response_model=dict)
async def add_hardware_to_case(id: int, data: dict):
    """Link a hardware asset to a case with relation type ``'related'``.

    When a live link already exists, no row is inserted and a short
    "Already linked" payload with the existing link id is returned instead.
    An empty dict is returned when the INSERT yields no row.

    Raises:
        HTTPException: 400 when ``hardware_id`` is missing or falsy.
    """
    hardware_id = data.get("hardware_id")
    if not hardware_id:
        raise HTTPException(status_code=400, detail="hardware_id required")

    link_key = (id, hardware_id)

    # Look for an existing live link first so the same pair is not inserted twice.
    existing = execute_query(
        "SELECT id FROM hardware_case_relations WHERE case_id = %s AND hardware_id = %s AND deleted_at IS NULL",
        link_key,
    )
    if existing:
        return {"message": "Already linked", "id": existing[0]['id']}

    created = execute_query(
        """
        INSERT INTO hardware_case_relations (case_id, hardware_id, relation_type)
        VALUES (%s, %s, 'related')
        RETURNING *
        """,
        link_key,
    )
    if created:
        return created[0]
    return {}
@router.delete("/cases/{id}/hardware/{hardware_id}", response_model=dict)
async def remove_hardware_from_case(id: int, hardware_id: int):
    """Soft-delete the live link between a hardware asset and a case.

    Returns:
        ``{"message": "Unlinked"}`` on success.

    Raises:
        HTTPException: 404 when no live link exists for the pair.
    """
    unlinked = execute_query(
        """
        UPDATE hardware_case_relations
        SET deleted_at = NOW()
        WHERE case_id = %s AND hardware_id = %s AND deleted_at IS NULL
        RETURNING *
        """,
        (id, hardware_id),
    )
    if unlinked:
        return {"message": "Unlinked"}
    raise HTTPException(status_code=404, detail="Link not found")
@router.get("/cases/{id}/locations", response_model=List[dict])
async def get_case_locations(id: int):
    """Return the locations linked to a case, newest link first.

    Only live links to live locations are included; each row also exposes
    the link's ``relation_type`` as ``link_type`` and its creation time as
    ``link_created_at``. An empty list is returned when nothing is linked.
    """
    linked_locations = execute_query(
        """
        SELECT l.*, clr.relation_type as link_type, clr.created_at as link_created_at
        FROM locations_locations l
        JOIN case_location_relations clr ON clr.location_id = l.id
        WHERE clr.case_id = %s AND clr.deleted_at IS NULL AND l.deleted_at IS NULL
        ORDER BY clr.created_at DESC
        """,
        (id,),
    )
    if linked_locations:
        return linked_locations
    return []
@router.post("/cases/{id}/locations", response_model=dict)
async def add_location_to_case(id: int, data: dict):
    """Link a location to a case with relation type ``'related'``.

    If a row for the (case, location) pair already exists, the conflict
    clause clears its ``deleted_at`` instead of inserting, which revives a
    previously removed link. An empty dict is returned when the statement
    yields no row.

    Raises:
        HTTPException: 400 when ``location_id`` is missing or falsy.
    """
    location_id = data.get("location_id")
    if not location_id:
        raise HTTPException(status_code=400, detail="location_id required")

    upserted = execute_query(
        """
        INSERT INTO case_location_relations (case_id, location_id, relation_type)
        VALUES (%s, %s, 'related')
        ON CONFLICT (case_id, location_id) DO UPDATE SET deleted_at = NULL
        RETURNING *
        """,
        (id, location_id),
    )
    if upserted:
        return upserted[0]
    return {}
@router.delete("/cases/{id}/locations/{location_id}", response_model=dict)
async def remove_location_from_case(id: int, location_id: int):
    """Soft-delete the live link between a location and a case.

    Args:
        id: Case id from the path.
        location_id: Id of the location to unlink.

    Returns:
        ``{"message": "Unlinked"}`` on success.

    Raises:
        HTTPException: 404 when no live link exists for the pair (including
            when the link was already removed).
    """
    # Only touch live rows: without the deleted_at guard, a repeated delete
    # would overwrite the original removal timestamp and report success
    # instead of 404, unlike the hardware unlink endpoint.
    query = """
        UPDATE case_location_relations
        SET deleted_at = NOW()
        WHERE case_id = %s AND location_id = %s AND deleted_at IS NULL
        RETURNING *
    """
    result = execute_query(query, (id, location_id))
    if not result:
        raise HTTPException(status_code=404, detail="Link not found")
    return {"message": "Unlinked"}