import json import logging from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Query from app.core.auth_dependencies import get_current_user, require_permission from app.core.database import execute_query from app.modules.links.backend.service import ( build_action_result, get_link_category_ids, get_relevant_links, log_access, update_link_categories, ) from app.modules.links.models.schemas import ( Link, LinkActionLogCreate, LinkActionResult, LinkCategory, LinkCategoryCreate, LinkCreate, LinkUpdate, RelevantLink, ) logger = logging.getLogger(__name__) router = APIRouter() def _with_categories(link_row: dict) -> dict: out = dict(link_row) out["vault_item_ids"] = out.get("vault_item_ids") or [] out["category_ids"] = get_link_category_ids(int(out["id"])) return out @router.get("/links/health") async def links_health(): execute_query("SELECT 1", ()) return {"status": "healthy", "service": "links-module"} @router.get("/links/categories", response_model=List[LinkCategory]) async def list_categories(current_user: dict = Depends(require_permission("links.read"))): del current_user rows = execute_query( "SELECT * FROM link_categories ORDER BY sort_order ASC, name ASC", (), ) or [] return rows @router.post("/links/categories", response_model=LinkCategory) async def create_category( payload: LinkCategoryCreate, current_user: dict = Depends(require_permission("links.create")), ): del current_user rows = execute_query( """ INSERT INTO link_categories (name, icon, sort_order) VALUES (%s, %s, %s) RETURNING * """, (payload.name, payload.icon, payload.sort_order), ) return rows[0] @router.get("/links", response_model=List[Link]) async def list_links( q: Optional[str] = Query(None), customer_id: Optional[int] = Query(None), case_id: Optional[int] = Query(None), hardware_id: Optional[int] = Query(None), category_id: Optional[int] = Query(None), is_favorite: Optional[bool] = Query(None), current_user: dict = Depends(require_permission("links.read")), ): del current_user query = """ SELECT l.* FROM links l WHERE l.deleted_at IS NULL """ params: List[object] = [] if q: query += " AND (l.name ILIKE %s OR l.url ILIKE %s OR l.host ILIKE %s)" term = f"%{q}%" params.extend([term, term, term]) if customer_id is not None: query += " AND l.customer_id = %s" params.append(customer_id) if case_id is not None: query += " AND l.case_id = %s" params.append(case_id) if hardware_id is not None: query += " AND l.hardware_id = %s" params.append(hardware_id) if is_favorite is not None: query += " AND l.is_favorite = %s" params.append(is_favorite) if category_id is not None: query += " AND EXISTS (SELECT 1 FROM link_category_map lcm WHERE lcm.link_id = l.id AND lcm.category_id = %s)" params.append(category_id) query += " ORDER BY l.is_critical DESC, l.updated_at DESC" rows = execute_query(query, tuple(params) if params else ()) or [] return [_with_categories(row) for row in rows] @router.get("/links/{link_id}", response_model=Link) async def get_link(link_id: int, current_user: dict = Depends(require_permission("links.read"))): del current_user rows = execute_query("SELECT * FROM links WHERE id = %s AND deleted_at IS NULL", (link_id,)) if not rows: raise HTTPException(status_code=404, detail="Link not found") return _with_categories(rows[0]) @router.post("/links", response_model=Link) async def create_link(payload: LinkCreate, current_user: dict = Depends(require_permission("links.create"))): rows = execute_query( """ INSERT INTO links ( name, description, type, url, host, port, username, icon, color, customer_id, case_id, hardware_id, vault_item_id, vault_item_ids, is_critical, is_favorite, environment ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, %s, %s) RETURNING * """, ( payload.name, payload.description, payload.type.value, payload.url, payload.host, payload.port, payload.username, payload.icon, payload.color, payload.customer_id, payload.case_id, payload.hardware_id, payload.vault_item_id, json.dumps(payload.vault_item_ids), payload.is_critical, payload.is_favorite, payload.environment.value, ), ) created = rows[0] update_link_categories(int(created["id"]), payload.category_ids) execute_query( """ INSERT INTO links_audit_log (link_id, event_type, actor_user_id, changes) VALUES (%s, %s, %s, %s::jsonb) """, (created["id"], "created", current_user["id"], json.dumps({"name": payload.name})), ) return _with_categories(created) @router.patch("/links/{link_id}", response_model=Link) async def update_link( link_id: int, payload: LinkUpdate, current_user: dict = Depends(require_permission("links.update")), ): fields = payload.model_dump(exclude_unset=True) category_ids = fields.pop("category_ids", None) updates = [] params: List[object] = [] for field_name, value in fields.items(): if field_name == "type" and value is not None: updates.append("type = %s") params.append(value.value) elif field_name == "environment" and value is not None: updates.append("environment = %s") params.append(value.value) elif field_name == "vault_item_ids" and value is not None: updates.append("vault_item_ids = %s::jsonb") params.append(json.dumps(value)) else: updates.append(f"{field_name} = %s") params.append(value) if updates: updates.append("updated_at = NOW()") params.append(link_id) query = f"UPDATE links SET {', '.join(updates)} WHERE id = %s AND deleted_at IS NULL RETURNING *" rows = execute_query(query, tuple(params)) or [] if not rows: raise HTTPException(status_code=404, detail="Link not found") updated = rows[0] else: rows = execute_query("SELECT * FROM links WHERE id = %s AND deleted_at IS NULL", (link_id,)) if not rows: raise HTTPException(status_code=404, detail="Link not found") updated = rows[0] if category_ids is not None: update_link_categories(link_id, category_ids) execute_query( """ INSERT INTO links_audit_log (link_id, event_type, actor_user_id, changes) VALUES (%s, %s, %s, %s::jsonb) """, (link_id, "updated", current_user["id"], json.dumps(fields or {"category_ids": category_ids})), ) return _with_categories(updated) @router.delete("/links/{link_id}") async def delete_link(link_id: int, current_user: dict = Depends(require_permission("links.delete"))): rows = execute_query( "UPDATE links SET deleted_at = NOW(), updated_at = NOW() WHERE id = %s AND deleted_at IS NULL RETURNING id", (link_id,), ) or [] if not rows: raise HTTPException(status_code=404, detail="Link not found") execute_query( """ INSERT INTO links_audit_log (link_id, event_type, actor_user_id, changes) VALUES (%s, %s, %s, %s::jsonb) """, (link_id, "deleted", current_user["id"], json.dumps({"deleted": True})), ) return {"status": "deleted", "id": link_id} @router.get("/links/cases/{case_id}/relevant", response_model=List[RelevantLink]) async def case_relevant_links( case_id: int, limit: int = Query(50, ge=1, le=200), current_user: dict = Depends(require_permission("links.read")), ): del current_user return get_relevant_links(case_id, limit=limit) @router.post("/links/{link_id}/access", response_model=LinkActionResult) async def access_link( link_id: int, payload: LinkActionLogCreate, current_user: dict = Depends(require_permission("links.use")), ): rows = execute_query("SELECT * FROM links WHERE id = %s AND deleted_at IS NULL", (link_id,)) or [] if not rows: raise HTTPException(status_code=404, detail="Link not found") link_row = rows[0] action_result = build_action_result(link_row, payload.action_type) log_access( link_id=link_id, user_id=current_user["id"], action_type=payload.action_type, case_id=payload.case_id, customer_id=payload.customer_id, metadata=payload.metadata, ) return action_result