import logging from datetime import datetime, timezone from typing import Any, Dict, List, Optional from app.core.database import execute_query, execute_query_single logger = logging.getLogger(__name__) CLOSED_CASE_STATUSES = ("lukket", "løst", "closed", "resolved") URGENT_PRIORITIES = ("urgent", "high", "kritisk", "critical") def _safe_count(row: Optional[dict], key: str = "count") -> int: if not row: return 0 try: return int(row.get(key) or 0) except (TypeError, ValueError): return 0 def _format_elapsed(seconds: int) -> str: total = max(0, int(seconds or 0)) hours = total // 3600 minutes = (total % 3600) // 60 secs = total % 60 return f"{hours:02d}:{minutes:02d}:{secs:02d}" def _priority_rank(priority: str) -> int: normalized = str(priority or "").strip().lower() if normalized in {"urgent", "critical", "kritisk"}: return 3 if normalized in {"high", "høj"}: return 2 if normalized in {"normal", "medium", "middel"}: return 1 return 0 def _get_user_group_names(user_id: Optional[int]) -> List[str]: if user_id is None: return [] rows = execute_query( """ SELECT LOWER(g.name) AS name FROM user_groups ug JOIN groups g ON g.id = ug.group_id WHERE ug.user_id = %s """, (user_id,), ) or [] return [str(r.get("name") or "").strip() for r in rows if r.get("name")] def _can_view_boss_tab(user_id: Optional[int]) -> bool: if user_id is None: return False group_names = _get_user_group_names(user_id) if not group_names: # Fail-open for authenticated users if group mapping is missing. return True leadership_tokens = ( "admin", "manager", "leder", "chef", "teknik", "technician", "support", "drift", "it", ) return any( any(token in group for token in leadership_tokens) for group in group_names ) def is_bottom_bar_enabled(user_id: Optional[int]) -> bool: setting = execute_query_single("SELECT value FROM settings WHERE key = %s", ("bottom_bar_enabled",)) setting_value = str((setting or {}).get("value") or "").strip().lower() if setting_value not in {"1", "true", "yes", "on"}: return False if user_id is None: return True pref = execute_query_single( """ SELECT enabled FROM user_module_preferences WHERE user_id = %s AND module_name = %s LIMIT 1 """, (user_id, "bottom_bar"), ) if pref and pref.get("enabled") is not None: return bool(pref.get("enabled")) role = execute_query_single( """ SELECT mrs.enabled FROM module_role_settings mrs JOIN user_groups ug ON ug.group_id = mrs.group_id WHERE ug.user_id = %s AND mrs.module_name = %s ORDER BY mrs.enabled DESC LIMIT 1 """, (user_id, "bottom_bar"), ) if role and role.get("enabled") is not None: return bool(role.get("enabled")) return True def get_dashboard_status() -> Dict[str, int]: mails_unread = _safe_count( execute_query_single( """ SELECT COUNT(*) AS count FROM email_messages WHERE deleted_at IS NULL AND COALESCE(is_read, FALSE) = FALSE """ ) ) sager_open = _safe_count( execute_query_single( """ SELECT COUNT(*) AS count FROM sag_sager WHERE deleted_at IS NULL AND LOWER(COALESCE(status, '')) NOT IN ('lukket', 'løst', 'closed', 'resolved') """ ) ) sager_urgent = _safe_count( execute_query_single( """ SELECT COUNT(*) AS count FROM sag_sager WHERE deleted_at IS NULL AND LOWER(COALESCE(status, '')) NOT IN ('lukket', 'løst', 'closed', 'resolved') AND LOWER(COALESCE(priority::text, 'normal')) IN ('urgent', 'high', 'kritisk', 'critical') """ ) ) sager_unassigned = _safe_count( execute_query_single( """ SELECT COUNT(*) AS count FROM sag_sager WHERE deleted_at IS NULL AND LOWER(COALESCE(status, '')) NOT IN ('lukket', 'løst', 'closed', 'resolved') AND ansvarlig_bruger_id IS NULL """ ) ) return { "mails_unread": mails_unread, "sager_open": sager_open, "sager_urgent": sager_urgent, "sager_unassigned": sager_unassigned, } def get_active_timer(user_id: Optional[int]) -> Dict[str, Any]: if user_id is None: return { "active": False, "sag_id": None, "sag_navn": None, "start_tid": None, "elapsed": 0, "elapsed_hhmmss": "00:00:00", "time_entry_id": None, } timer = execute_query_single( """ SELECT t.id, t.sag_id, s.titel AS sag_navn, t.start_tid, GREATEST(EXTRACT(EPOCH FROM (NOW() - t.start_tid))::int, 0) AS elapsed FROM tmodule_times t LEFT JOIN sag_sager s ON s.id = t.sag_id WHERE t.medarbejder_id = %s AND t.aktiv_timer = TRUE AND t.slut_tid IS NULL ORDER BY t.start_tid DESC NULLS LAST, t.id DESC LIMIT 1 """, (user_id,), ) if not timer: return { "active": False, "sag_id": None, "sag_navn": None, "start_tid": None, "elapsed": 0, "elapsed_hhmmss": "00:00:00", "time_entry_id": None, } elapsed = int(timer.get("elapsed") or 0) return { "active": True, "sag_id": timer.get("sag_id"), "sag_navn": timer.get("sag_navn"), "start_tid": timer.get("start_tid"), "elapsed": elapsed, "elapsed_hhmmss": _format_elapsed(elapsed), "time_entry_id": timer.get("id"), } def get_notifications(user_id: Optional[int], limit: int = 20) -> Dict[str, Any]: if user_id is None: return {"items": [], "count": 0} limit_safe = max(1, min(int(limit or 20), 100)) reminders = execute_query( """ SELECT r.id, r.sag_id, r.title, r.message, r.priority, r.event_type, r.next_check_at, s.titel AS case_title, c.name AS customer_name FROM sag_reminders r JOIN sag_sager s ON r.sag_id = s.id JOIN customers c ON s.customer_id = c.id LEFT JOIN LATERAL ( SELECT id, snoozed_until, status, triggered_at FROM sag_reminder_logs WHERE reminder_id = r.id AND user_id = %s ORDER BY triggered_at DESC LIMIT 1 ) l ON true WHERE r.is_active = TRUE AND r.deleted_at IS NULL AND r.next_check_at <= CURRENT_TIMESTAMP AND %s = ANY(r.recipient_user_ids) AND (l.snoozed_until IS NULL OR l.snoozed_until < CURRENT_TIMESTAMP) AND (l.status IS NULL OR l.status != 'dismissed') ORDER BY CASE LOWER(COALESCE(r.priority::text, 'normal')) WHEN 'urgent' THEN 1 WHEN 'high' THEN 2 WHEN 'normal' THEN 3 ELSE 4 END, r.next_check_at ASC LIMIT %s """, (user_id, user_id, limit_safe), ) or [] unread_mail_count = _safe_count( execute_query_single( """ SELECT COUNT(*) AS count FROM email_messages em WHERE em.deleted_at IS NULL AND COALESCE(em.is_read, FALSE) = FALSE """ ) ) items: List[Dict[str, Any]] = [] if unread_mail_count > 0: items.append( { "id": f"mail-unread-{unread_mail_count}", "type": "mail", "severity": "medium" if unread_mail_count < 10 else "high", "title": f"{unread_mail_count} ulæste mails", "message": "Der er ulæste mails i indbakken", "action": "/emails", "created_at": datetime.now(timezone.utc).isoformat(), } ) for row in reminders: priority = str(row.get("priority") or "normal").lower() severity = "low" if priority in {"high", "høj"}: severity = "medium" if priority in {"urgent", "critical", "kritisk"}: severity = "high" items.append( { "id": f"reminder-{row.get('id')}", "type": row.get("event_type") or "reminder", "severity": severity, "title": row.get("title") or "Påmindelse", "message": row.get("message") or row.get("case_title") or "", "sag_id": row.get("sag_id"), "case_title": row.get("case_title"), "customer_name": row.get("customer_name"), "action": f"/sag/{row.get('sag_id')}" if row.get("sag_id") else "/sag", "created_at": row.get("next_check_at"), } ) items.sort( key=lambda item: ( {"high": 0, "medium": 1, "low": 2}.get(str(item.get("severity") or "low"), 3), str(item.get("created_at") or ""), ) ) return {"items": items[:limit_safe], "count": len(items)} def _context_actions_for_path(context_path: str) -> Dict[str, Any]: normalized = str(context_path or "").strip().lower() payload: Dict[str, Any] = { "context_key": "global", "global": [ {"id": "new_case", "label": "Ny sag", "action": "/sag"}, {"id": "new_mail", "label": "Ny mail", "action": "/emails"}, {"id": "start_timer", "label": "Start timer", "action": "/timetracking"}, {"id": "log_time", "label": "Log tid", "action": "/timetracking"}, {"id": "add_note", "label": "Tilføj note", "action": "/sag"}, ], "context": [], } if normalized.startswith("/sag"): payload["context_key"] = "sag" payload["context"] = [ {"id": "case_time", "label": "Tid", "action": "/timetracking"}, {"id": "case_mail", "label": "Mail", "action": "/emails"}, {"id": "case_relation", "label": "Relation", "action": "/customers"}, {"id": "case_tag", "label": "Tag", "action": "/tags"}, ] elif normalized.startswith("/hardware"): payload["context_key"] = "hardware" payload["context"] = [ {"id": "hardware_new", "label": "Ny enhed", "action": "/hardware"}, {"id": "hardware_history", "label": "Historik", "action": "/hardware"}, {"id": "hardware_link_case", "label": "Tilknyt sag", "action": "/sag"}, ] return payload def build_bottom_bar_state( user_id: Optional[int], context_path: str = "", force_boss_access: bool = False, ) -> Dict[str, Any]: enabled = is_bottom_bar_enabled(user_id) if not enabled: return {"enabled": False, "sections": {}} status = get_dashboard_status() timer = get_active_timer(user_id) notifications = get_notifications(user_id, limit=10) urgent_cases = execute_query( """ SELECT id, titel FROM sag_sager WHERE deleted_at IS NULL AND LOWER(COALESCE(status, '')) NOT IN ('lukket', 'løst', 'closed', 'resolved') AND LOWER(COALESCE(priority::text, 'normal')) IN ('urgent', 'high', 'kritisk', 'critical') ORDER BY updated_at DESC NULLS LAST, id DESC LIMIT 5 """ ) or [] open_cases = execute_query( """ SELECT id, titel FROM sag_sager WHERE deleted_at IS NULL AND LOWER(COALESCE(status, '')) NOT IN ('lukket', 'løst', 'closed', 'resolved') ORDER BY updated_at DESC NULLS LAST, id DESC LIMIT 5 """ ) or [] timer_list: List[Dict[str, Any]] = [] if timer.get("active"): timer_list.append( { "id": timer.get("time_entry_id"), "sag_id": timer.get("sag_id"), "desc": timer.get("sag_navn") or f"Sag #{timer.get('sag_id')}", "elapsed": timer.get("elapsed"), "elapsed_hhmmss": timer.get("elapsed_hhmmss"), } ) messages = [ { "from": "System", "text": f"{notifications.get('count', 0)} aktive notifikationer", } ] tasks = [] for n in (notifications.get("items") or [])[:5]: tasks.append( { "title": n.get("title") or "Notifikation", "deadline": n.get("severity") or "info", "action": n.get("action") or "/", } ) context_actions = _context_actions_for_path(context_path) can_view_boss = bool(force_boss_access) or _can_view_boss_tab(user_id) team_workload: List[Dict[str, Any]] = [] technicians_today: List[Dict[str, Any]] = [] escalation_cases: List[Dict[str, Any]] = [] unassigned_cases: List[Dict[str, Any]] = [] if can_view_boss: team_workload = execute_query( """ SELECT u.user_id, COALESCE(NULLIF(u.full_name, ''), u.username, ('Bruger #' || u.user_id::text)) AS owner_name, COUNT(s.id)::int AS open_cases, COUNT(CASE WHEN LOWER(COALESCE(s.priority::text, 'normal')) IN ('urgent', 'high', 'kritisk', 'critical') THEN 1 END)::int AS urgent_cases FROM users u LEFT JOIN sag_sager s ON s.ansvarlig_bruger_id = u.user_id AND s.deleted_at IS NULL AND LOWER(COALESCE(s.status, '')) NOT IN ('lukket', 'løst', 'closed', 'resolved') GROUP BY u.user_id, u.full_name, u.username HAVING COUNT(s.id) > 0 ORDER BY urgent_cases DESC, open_cases DESC, owner_name ASC LIMIT 8 """ ) or [] technicians_today = execute_query( """ SELECT u.user_id, COALESCE(NULLIF(u.full_name, ''), u.username, ('Bruger #' || u.user_id::text)) AS owner_name, COUNT(s.id)::int AS open_cases, COUNT(CASE WHEN s.deadline::date = CURRENT_DATE THEN 1 END)::int AS due_today_cases, COALESCE( ( SELECT json_agg( json_build_object( 'id', t.id, 'title', t.titel, 'priority', COALESCE(t.priority::text, 'normal'), 'deadline', t.deadline ) ORDER BY CASE WHEN t.deadline::date = CURRENT_DATE THEN 0 ELSE 1 END, COALESCE(t.deadline, t.updated_at, t.created_at) ASC, t.id ASC ) FROM ( SELECT s2.id, s2.titel, s2.priority, s2.deadline, s2.updated_at, s2.created_at FROM sag_sager s2 WHERE s2.ansvarlig_bruger_id = u.user_id AND s2.deleted_at IS NULL AND LOWER(COALESCE(s2.status, '')) NOT IN ('lukket', 'løst', 'closed', 'resolved') AND ( s2.deadline::date = CURRENT_DATE OR s2.created_at::date = CURRENT_DATE ) ORDER BY CASE WHEN s2.deadline::date = CURRENT_DATE THEN 0 ELSE 1 END, COALESCE(s2.deadline, s2.updated_at, s2.created_at) ASC, s2.id ASC LIMIT 6 ) t ), '[]'::json ) AS today_tasks FROM users u LEFT JOIN sag_sager s ON s.ansvarlig_bruger_id = u.user_id AND s.deleted_at IS NULL AND LOWER(COALESCE(s.status, '')) NOT IN ('lukket', 'løst', 'closed', 'resolved') WHERE EXISTS ( SELECT 1 FROM user_groups ug JOIN groups g ON g.id = ug.group_id WHERE ug.user_id = u.user_id AND LOWER(g.name) LIKE ANY(ARRAY['%teknik%', '%technician%', '%support%']) ) GROUP BY u.user_id, u.full_name, u.username ORDER BY due_today_cases DESC, open_cases DESC, owner_name ASC LIMIT 10 """ ) or [] escalation_cases = execute_query( """ SELECT s.id, s.titel, s.priority, s.updated_at, EXTRACT(EPOCH FROM (NOW() - COALESCE(s.updated_at, s.created_at)))::int AS age_seconds, COALESCE(NULLIF(u.full_name, ''), u.username, 'Ikke tildelt') AS owner_name FROM sag_sager s LEFT JOIN users u ON u.user_id = s.ansvarlig_bruger_id WHERE s.deleted_at IS NULL AND LOWER(COALESCE(s.status, '')) NOT IN ('lukket', 'løst', 'closed', 'resolved') AND LOWER(COALESCE(s.priority::text, 'normal')) IN ('urgent', 'high', 'kritisk', 'critical') AND NOW() - COALESCE(s.updated_at, s.created_at) > INTERVAL '24 hours' ORDER BY COALESCE(s.updated_at, s.created_at) ASC LIMIT 8 """ ) or [] unassigned_cases = execute_query( """ SELECT s.id, s.titel, s.priority, s.created_at, s.updated_at FROM sag_sager s WHERE s.deleted_at IS NULL AND LOWER(COALESCE(s.status, '')) NOT IN ('lukket', 'løst', 'closed', 'resolved') AND s.ansvarlig_bruger_id IS NULL ORDER BY COALESCE(s.updated_at, s.created_at) DESC LIMIT 8 """ ) or [] sections = { "mail": { "unread": status.get("mails_unread", 0), "customer_reply_needed": status.get("mails_unread", 0), }, "cases": { "open": status.get("sager_open", 0), "list": [{"id": row.get("id"), "title": row.get("titel") or f"Sag #{row.get('id')}"} for row in open_cases], }, "urgent": { "count": status.get("sager_urgent", 0), "list": [{"id": row.get("id"), "title": row.get("titel") or f"Sag #{row.get('id')}"} for row in urgent_cases], }, "unassigned": { "count": status.get("sager_unassigned", 0), }, "timer": { "active_count": 1 if timer.get("active") else 0, "list": timer_list, "active": timer, }, "kuma": { "down": 0, "list": [], }, "eset": { "incidents": 0, "list": [], }, "messages": { "count": len(messages), "list": messages, }, "tasks": { "count": len(tasks), "list": tasks, }, "boss": { "can_view": can_view_boss, "stats": { "unassigned": status.get("sager_unassigned", 0), "active_employees": _safe_count( execute_query_single( "SELECT COUNT(*) AS count FROM tmodule_times WHERE aktiv_timer = TRUE AND slut_tid IS NULL" ) ), "open_cases": status.get("sager_open", 0), "urgent_cases": status.get("sager_urgent", 0), "stale_urgent_cases": len(escalation_cases), } , "team_workload": [ { "user_id": row.get("user_id"), "owner_name": row.get("owner_name"), "open_cases": int(row.get("open_cases") or 0), "urgent_cases": int(row.get("urgent_cases") or 0), } for row in team_workload ], "technicians_today": [ { "user_id": row.get("user_id"), "owner_name": row.get("owner_name"), "open_cases": int(row.get("open_cases") or 0), "due_today_cases": int(row.get("due_today_cases") or 0), "today_tasks": row.get("today_tasks") or [], } for row in technicians_today ], "escalations": [ { "id": row.get("id"), "title": row.get("titel") or f"Sag #{row.get('id')}", "priority": row.get("priority") or "normal", "owner_name": row.get("owner_name") or "Ikke tildelt", "age_seconds": int(row.get("age_seconds") or 0), } for row in escalation_cases ], "unassigned_cases": [ { "id": row.get("id"), "title": row.get("titel") or f"Sag #{row.get('id')}", "priority": row.get("priority") or "normal", } for row in unassigned_cases ], }, "context_actions": context_actions, } return { "enabled": True, "sections": sections, "status": status, "active_timer": timer, "notifications": notifications, }