"""HTML views for the hardware module (list, detail, forms, ESET pages)."""

import logging
from datetime import date
from pathlib import Path
from typing import Optional, Any

from fastapi import APIRouter, HTTPException, Query, Request, Form, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates

from app.core.database import execute_query
from app.core.auth_dependencies import get_optional_user

logger = logging.getLogger(__name__)
router = APIRouter()

# Setup template directory - must be root "app" to allow extending shared/frontend/base.html
templates = Jinja2Templates(directory="app")

# Where installed-software entries may live in an ESET payload.
# Each row: (payload key, candidate name keys, candidate version keys,
# whether bare string entries are accepted for that key).
_SOFTWARE_SOURCES = (
    # ESET standard format
    ("deployedComponents", ("displayName", "name"), ("version",), False),
    # Alternative common payload names
    ("installedSoftware", ("displayName", "name", "softwareName"), ("version",), True),
    ("applications", ("displayName", "name", "applicationName"), ("version",), True),
    (
        "activeProducts",
        ("displayName", "name", "productName", "product"),
        ("version", "productVersion"),
        True,
    ),
    (
        "applicationInventory",
        ("displayName", "name", "applicationName", "softwareName"),
        ("version", "applicationVersion", "softwareVersion"),
        True,
    ),
    (
        "softwareInventory",
        ("displayName", "name", "applicationName", "softwareName"),
        ("version", "applicationVersion", "softwareVersion"),
        True,
    ),
)


def build_location_tree(items: list) -> list:
    """Build a nested location tree from flat location rows.

    Args:
        items: Iterable of dict-like rows with ``id``, ``name``,
            ``location_type`` and ``parent_location_id``. ``None`` is treated
            as empty; non-dict entries and rows without an ``id`` are skipped.

    Returns:
        The root nodes, each a dict with ``id``, ``name``, ``location_type``,
        ``parent_location_id`` and ``children`` (a list of nodes of the same
        shape). Every level is sorted case-insensitively by name. A node whose
        parent is missing from ``items``, or that names itself as parent, is
        returned as a root. Nodes caught in a longer parent cycle are not
        reachable from any root and are therefore omitted.
    """
    nodes = {}
    for loc in items or []:
        if not isinstance(loc, dict):
            continue
        loc_id = loc.get("id")
        if loc_id is None:
            continue
        # Copy only the fields the template needs.
        nodes[loc_id] = {
            "id": loc_id,
            "name": loc.get("name"),
            "location_type": loc.get("location_type"),
            "parent_location_id": loc.get("parent_location_id"),
            "children": [],
        }

    roots = []
    for node in nodes.values():
        parent_id = node["parent_location_id"]
        # A self-referencing parent would attach the node to itself and hide
        # it from the tree entirely, so treat it as a root instead.
        if parent_id and parent_id != node["id"] and parent_id in nodes:
            nodes[parent_id]["children"].append(node)
        else:
            roots.append(node)

    def sort_nodes(node_list: list) -> None:
        node_list.sort(key=lambda n: (n.get("name") or "").lower())
        for n in node_list:
            if n.get("children"):
                sort_nodes(n["children"])

    sort_nodes(roots)
    return roots


def _as_dict(value: Any) -> dict:
    """Return ``value`` if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _entries(value: Any) -> Any:
    """Return ``value`` if it is a safely iterable collection, else ``[]``.

    Lists, tuples and dicts are passed through (iterating a dict yields its
    keys). Strings and scalars are rejected, so a malformed payload cannot be
    iterated character by character or raise ``TypeError``.
    """
    return value if isinstance(value, (list, tuple, dict)) else []


def _first_truthy(mapping: dict, keys: tuple) -> Any:
    """Mimic ``mapping.get(k1) or mapping.get(k2) or ...`` for ``keys``."""
    value = None
    for key in keys:
        value = mapping.get(key)
        if value:
            break
    return value


def _format_bytes(raw_value: Any) -> Optional[str]:
    """Format a byte count as ``"<x.y> GB"``; ``None`` if missing or invalid."""
    if not raw_value:
        return None
    try:
        value = int(raw_value)
    except (TypeError, ValueError):
        return None
    if value <= 0:
        return None
    gb = value / (1024 ** 3)
    return f"{gb:.1f} GB"


def _normalize_version(value: Any) -> str:
    """Turn a version payload (dict, scalar or ``None``) into a plain string.

    For dicts the first available of ``name``, ``id`` or
    ``major.minor.patch`` is used; otherwise an empty string is returned.
    """
    if isinstance(value, dict):
        name = str(value.get("name") or "").strip()
        if name:
            return name
        version_id = str(value.get("id") or "").strip()
        if version_id:
            return version_id
        major = value.get("major")
        minor = value.get("minor")
        patch = value.get("patch")
        if major is not None and minor is not None and patch is not None:
            return f"{major}.{minor}.{patch}"
        return ""
    if value is None:
        return ""
    return str(value).strip()


def _collect_software(specs: dict) -> list:
    """Collect ``(name, version)`` pairs from every known software key."""
    collected = []

    def add(name: Any, version: Any = None) -> None:
        if not name:
            return
        item_name = str(name).strip()
        if item_name:
            collected.append((item_name, _normalize_version(version)))

    for payload_key, name_keys, version_keys, allow_strings in _SOFTWARE_SOURCES:
        for comp in _entries(specs.get(payload_key)):
            if isinstance(comp, dict):
                add(_first_truthy(comp, name_keys), _first_truthy(comp, version_keys))
            elif allow_strings and isinstance(comp, str):
                add(comp)
    return collected


def extract_eset_specs_summary(hardware: dict) -> dict:
    """Summarise the ESET payload stored under ``hardware["hardware_specs"]``.

    Args:
        hardware: Hardware row; ``hardware_specs`` is expected to be a dict.
            If it wraps the actual data in a ``device`` dict, that inner dict
            is used.

    Returns:
        ``{}`` when ``hardware_specs`` is missing or not a dict. Otherwise a
        dict with OS, network, BIOS, CPU, disk and adapter fields plus
        ``deployed_components`` (de-duplicated ``"name version"`` strings in
        first-seen order) and ``installed_software_details`` (a list of
        ``{"name", "version"}`` dicts). Nested values of an unexpected type
        are treated as absent instead of raising.
    """
    specs = hardware.get("hardware_specs") or {}
    if not isinstance(specs, dict):
        return {}
    if isinstance(specs.get("device"), dict):
        specs = specs["device"]

    os_info = _as_dict(specs.get("operatingSystem"))
    os_name = os_info.get("displayName")
    os_version = _as_dict(os_info.get("version")).get("name")

    # Only the first hardware profile is summarised.
    hardware_profiles = specs.get("hardwareProfiles")
    if isinstance(hardware_profiles, list) and hardware_profiles:
        profile = _as_dict(hardware_profiles[0])
    else:
        profile = {}
    bios = _as_dict(profile.get("bios"))
    processors = [p for p in _entries(profile.get("processors")) if isinstance(p, dict)]
    hard_drives = [d for d in _entries(profile.get("hardDrives")) if isinstance(d, dict)]
    network_adapters = [
        n for n in _entries(profile.get("networkAdapters")) if isinstance(n, dict)
    ]

    cpu_models = [p.get("caption") for p in processors if p.get("caption")]
    disk_models = [d.get("displayName") for d in hard_drives if d.get("displayName")]
    # One entry per drive; None when the capacity is missing or invalid.
    disk_sizes = [_format_bytes(d.get("capacityBytes")) for d in hard_drives]

    disk_summaries = []
    for drive in hard_drives:
        name = drive.get("displayName")
        size = _format_bytes(drive.get("capacityBytes"))
        if name and size:
            disk_summaries.append(f"{name} ({size})")
        elif name:
            disk_summaries.append(name)

    adapter_names = [n.get("caption") for n in network_adapters if n.get("caption")]
    macs = [n.get("macAddress") for n in network_adapters if n.get("macAddress")]

    # Keep ordering but remove duplicates (case-insensitive on the label).
    seen = set()
    deduped_installed_software = []
    installed_software_details = []
    for name, version in _collect_software(specs):
        label = f"{name} {version}" if version else name
        key = label.lower()
        if key in seen:
            continue
        seen.add(key)
        deduped_installed_software.append(label)

        if version:
            # Use the explicit version as-is; re-splitting the joined label
            # would mangle versions containing spaces or no digits.
            installed_software_details.append({"name": name, "version": version})
            continue
        # No explicit version: fall back to treating a trailing token that
        # contains a digit as the version (e.g. "Product 7.3").
        parts = name.rsplit(" ", 1)
        if len(parts) == 2 and any(ch.isdigit() for ch in parts[1]):
            installed_software_details.append({"name": parts[0], "version": parts[1]})
        else:
            installed_software_details.append({"name": name, "version": ""})

    return {
        "os_name": os_name,
        "os_version": os_version,
        "primary_local_ip": specs.get("primaryLocalIpAddress"),
        "public_ip": specs.get("publicIpAddress"),
        "device_name": specs.get("displayName"),
        "manufacturer": profile.get("manufacturer"),
        "model": profile.get("model"),
        "bios_serial": bios.get("serialNumber"),
        "bios_vendor": bios.get("manufacturer"),
        "cpu_models": cpu_models,
        "disk_models": disk_models,
        "disk_summaries": disk_summaries,
        "disk_sizes": disk_sizes,
        "adapter_names": adapter_names,
        "macs": macs,
        "functionality_status": specs.get("functionalityStatus"),
        "last_sync_time": specs.get("lastSyncTime"),
        "device_type": specs.get("deviceType"),
        "deployed_components": deduped_installed_software,
        "installed_software_details": installed_software_details,
    }


def _attach_customer_names(rows: Any, id_key: str) -> None:
    """Set ``customer_name`` on each row that has a customer id in ``id_key``.

    Issues one lookup for all distinct ids; ids with no matching customer get
    the name ``'Unknown'``. Rows are modified in place. Does nothing when
    ``rows`` is empty or no row carries an id.
    """
    if not rows:
        return
    customer_ids = sorted({row[id_key] for row in rows if row.get(id_key)})
    if not customer_ids:
        return
    customers = execute_query(
        "SELECT id, name AS navn FROM customers WHERE id = ANY(%s)",
        (customer_ids,),
    )
    customer_map = {c['id']: c['navn'] for c in customers or []}
    for row in rows:
        if row.get(id_key):
            row['customer_name'] = customer_map.get(row[id_key], 'Unknown')


@router.get("/hardware", response_class=HTMLResponse)
async def hardware_list(
    request: Request,
    status: Optional[str] = Query(None),
    asset_type: Optional[str] = Query(None),
    customer_id: Optional[int] = Query(None),
    q: Optional[str] = Query(None),
):
    """Display list of all hardware.

    Optional filters: ``status``, ``asset_type``, ``customer_id`` (current
    owner) and ``q`` (case-insensitive substring match on serial number,
    model or brand). All values are passed as bound parameters.
    """
    query = "SELECT * FROM hardware_assets WHERE deleted_at IS NULL"
    params = []

    if status:
        query += " AND status = %s"
        params.append(status)
    if asset_type:
        query += " AND asset_type = %s"
        params.append(asset_type)
    if customer_id:
        query += " AND current_owner_customer_id = %s"
        params.append(customer_id)
    if q:
        query += " AND (serial_number ILIKE %s OR model ILIKE %s OR brand ILIKE %s)"
        search_param = f"%{q}%"
        params.extend([search_param, search_param, search_param])

    query += " ORDER BY created_at DESC"
    hardware = execute_query(query, tuple(params))

    # Get customer names for display
    _attach_customer_names(hardware, 'current_owner_customer_id')

    return templates.TemplateResponse("modules/hardware/templates/index.html", {
        "request": request,
        "hardware": hardware,
        "current_status": status,
        "current_asset_type": asset_type,
        "current_customer_id": customer_id,
        "search_query": q,
    })


# NOTE: the fixed paths below ("/hardware/new", "/hardware/eset...") are
# registered before "/hardware/{hardware_id}" so they are not captured by the
# parameterised route.
@router.get("/hardware/new", response_class=HTMLResponse)
async def create_hardware_form(request: Request):
    """Display create hardware form."""
    # Get customers for dropdown
    customers = execute_query(
        "SELECT id, name AS navn FROM customers WHERE deleted_at IS NULL ORDER BY name"
    )
    return templates.TemplateResponse("modules/hardware/templates/create.html", {
        "request": request,
        "customers": customers or [],
    })


@router.get("/hardware/eset", response_class=HTMLResponse)
async def hardware_eset_overview(
    request: Request,
    current_user: dict = Depends(get_optional_user),
):
    """Display ESET sync overview (matches + incidents).

    Shows up to 500 hardware assets with their linked contacts and customers,
    and the 5 most recently updated incidents whose severity is critical,
    high or severe.
    """
    matches_query = """
        SELECT
            h.id,
            h.asset_type,
            h.brand,
            h.model,
            h.serial_number,
            h.eset_uuid,
            h.eset_group,
            h.updated_at,
            hc.contact_id,
            c.first_name,
            c.last_name,
            c.user_company,
            cc.customer_id,
            cust.name AS customer_name
        FROM hardware_assets h
        LEFT JOIN hardware_contacts hc ON hc.hardware_id = h.id
        LEFT JOIN contacts c ON c.id = hc.contact_id
        LEFT JOIN contact_companies cc ON cc.contact_id = c.id
        LEFT JOIN customers cust ON cust.id = cc.customer_id
        WHERE h.deleted_at IS NULL
        ORDER BY h.updated_at DESC NULLS LAST
        LIMIT 500
    """
    matches = execute_query(matches_query)

    # The lateral join picks at most one (the most recently updated) hardware
    # asset per incident, matched case-insensitively on the ESET device UUID.
    incidents_query = """
        SELECT
            i.*,
            hw.id AS hardware_id,
            hw.current_owner_customer_id AS customer_id,
            cust.name AS customer_name
        FROM eset_incidents i
        LEFT JOIN LATERAL (
            SELECT h.id, h.current_owner_customer_id
            FROM hardware_assets h
            WHERE h.deleted_at IS NULL
              AND LOWER(COALESCE(h.eset_uuid, '')) = LOWER(COALESCE(i.device_uuid, ''))
            ORDER BY h.updated_at DESC NULLS LAST, h.id DESC
            LIMIT 1
        ) hw ON TRUE
        LEFT JOIN customers cust ON cust.id = hw.current_owner_customer_id
        WHERE LOWER(COALESCE(i.severity, '')) IN ('critical', 'high', 'severe')
        ORDER BY i.updated_at DESC NULLS LAST
        LIMIT 5
    """
    incidents = execute_query(incidents_query)

    return templates.TemplateResponse("modules/hardware/templates/eset_overview.html", {
        "request": request,
        "matches": matches or [],
        "incidents": incidents or [],
        "current_user": current_user,
    })


@router.get("/hardware/eset/test", response_class=HTMLResponse)
async def hardware_eset_test(request: Request):
    """Display ESET API test page."""
    return templates.TemplateResponse("modules/hardware/templates/eset_test.html", {
        "request": request
    })


@router.get("/hardware/eset/import", response_class=HTMLResponse)
async def hardware_eset_import(request: Request):
    """Display ESET import page."""
    return templates.TemplateResponse("modules/hardware/templates/eset_import.html", {
        "request": request
    })


@router.get("/hardware/{hardware_id}", response_class=HTMLResponse)
async def hardware_detail(request: Request, hardware_id: int):
    """Display hardware details.

    Raises:
        HTTPException: 404 when the asset does not exist or is soft-deleted.
    """
    # Get hardware
    query = "SELECT * FROM hardware_assets WHERE id = %s AND deleted_at IS NULL"
    result = execute_query(query, (hardware_id,))
    if not result:
        raise HTTPException(status_code=404, detail="Hardware not found")
    hardware = result[0]

    # Get customer name if applicable
    if hardware.get('current_owner_customer_id'):
        customer_query = "SELECT name AS navn FROM customers WHERE id = %s"
        customer_result = execute_query(
            customer_query, (hardware['current_owner_customer_id'],)
        )
        if customer_result:
            hardware['customer_name'] = customer_result[0]['navn']

    # Get ownership history
    ownership_query = """
        SELECT * FROM hardware_ownership_history
        WHERE hardware_id = %s AND deleted_at IS NULL
        ORDER BY start_date DESC
    """
    ownership = execute_query(ownership_query, (hardware_id,))

    # Get customer names for ownership history
    _attach_customer_names(ownership, 'owner_customer_id')

    # Get location history
    location_query = """
        SELECT * FROM hardware_location_history
        WHERE hardware_id = %s AND deleted_at IS NULL
        ORDER BY start_date DESC
    """
    locations = execute_query(location_query, (hardware_id,))

    # Get attachments
    attachment_query = """
        SELECT * FROM hardware_attachments
        WHERE hardware_id = %s AND deleted_at IS NULL
        ORDER BY uploaded_at DESC
    """
    attachments = execute_query(attachment_query, (hardware_id,))

    # Get related cases
    case_query = """
        SELECT hcr.*, s.titel, s.status, s.customer_id
        FROM hardware_case_relations hcr
        LEFT JOIN sag_sager s ON hcr.case_id = s.id
        WHERE hcr.hardware_id = %s AND hcr.deleted_at IS NULL AND s.deleted_at IS NULL
        ORDER BY hcr.created_at DESC
    """
    cases = execute_query(case_query, (hardware_id,))

    # Get tags
    tag_query = """
        SELECT * FROM hardware_tags
        WHERE hardware_id = %s AND deleted_at IS NULL
        ORDER BY created_at DESC
    """
    tags = execute_query(tag_query, (hardware_id,))

    # Get all active locations for the tree (including parent_id for structure)
    all_locations_query = """
        SELECT id, name, location_type, parent_location_id
        FROM locations_locations
        WHERE deleted_at IS NULL
        ORDER BY name
    """
    all_locations_flat = execute_query(all_locations_query)
    location_tree = build_location_tree(all_locations_flat)

    return templates.TemplateResponse("modules/hardware/templates/detail.html", {
        "request": request,
        "hardware": hardware,
        "ownership": ownership or [],
        "locations": locations or [],
        "attachments": attachments or [],
        "cases": cases or [],
        "tags": tags or [],
        "location_tree": location_tree or [],
        "eset_specs": extract_eset_specs_summary(hardware),
    })


@router.get("/hardware/{hardware_id}/edit", response_class=HTMLResponse)
async def edit_hardware_form(request: Request, hardware_id: int):
    """Display edit hardware form.

    Raises:
        HTTPException: 404 when the asset does not exist or is soft-deleted.
    """
    # Get hardware
    query = "SELECT * FROM hardware_assets WHERE id = %s AND deleted_at IS NULL"
    result = execute_query(query, (hardware_id,))
    if not result:
        raise HTTPException(status_code=404, detail="Hardware not found")
    hardware = result[0]

    # Get customers for dropdown
    customers = execute_query(
        "SELECT id, name AS navn FROM customers WHERE deleted_at IS NULL ORDER BY name"
    )

    return templates.TemplateResponse("modules/hardware/templates/edit.html", {
        "request": request,
        "hardware": hardware,
        "customers": customers or [],
    })


@router.post("/hardware/{hardware_id}/location")
async def update_hardware_location(
    request: Request,
    hardware_id: int,
    location_id: int = Form(...),
    notes: Optional[str] = Form(None),
):
    """Update hardware location.

    Closes the open location-history row, inserts a new one for
    ``location_id`` and updates the asset's current location, then redirects
    (303) to the hardware detail page.

    Raises:
        HTTPException: 404 when the hardware or the location does not exist
            or is soft-deleted.
    """
    # Verify hardware exists
    check_query = "SELECT id FROM hardware_assets WHERE id = %s AND deleted_at IS NULL"
    if not execute_query(check_query, (hardware_id,)):
        raise HTTPException(status_code=404, detail="Hardware not found")

    # Verify location exists and has not been soft-deleted, so hardware
    # cannot be moved to a location that is hidden from the location tree.
    loc_check = "SELECT name FROM locations_locations WHERE id = %s AND deleted_at IS NULL"
    loc_result = execute_query(loc_check, (location_id,))
    if not loc_result:
        raise HTTPException(status_code=404, detail="Location not found")
    location_name = loc_result[0]['name']

    # Resolve the date once so the closed and the new history row agree even
    # if the request straddles midnight.
    today = date.today()

    # NOTE(review): the three writes below are separate execute_query calls;
    # whether they share a transaction depends on execute_query - confirm.

    # 1. Close current location history
    close_history_query = """
        UPDATE hardware_location_history
        SET end_date = %s
        WHERE hardware_id = %s AND end_date IS NULL
    """
    execute_query(close_history_query, (today, hardware_id))

    # 2. Add new location history
    add_history_query = """
        INSERT INTO hardware_location_history (hardware_id, location_id, location_name, start_date, notes)
        VALUES (%s, %s, %s, %s, %s)
    """
    execute_query(add_history_query, (hardware_id, location_id, location_name, today, notes))

    # 3. Update current location on asset
    update_asset_query = """
        UPDATE hardware_assets
        SET current_location_id = %s, updated_at = NOW()
        WHERE id = %s
    """
    execute_query(update_asset_query, (location_id, hardware_id))

    return RedirectResponse(url=f"/hardware/{hardware_id}", status_code=303)