""" Case Analysis Service AI-powered case text analysis using Ollama LLM Extracts entities for QuickCreate feature """ import logging import json import hashlib from typing import Dict, Optional, List from datetime import datetime, timedelta import aiohttp import re from app.core.config import settings from app.core.database import execute_query, execute_query_single from app.models.schemas import QuickCreateAnalysis, SagPriority logger = logging.getLogger(__name__) class CaseAnalysisService: """AI-powered case text analysis for QuickCreate feature""" def __init__(self): self.ollama_endpoint = settings.OLLAMA_ENDPOINT self.ollama_model = settings.OLLAMA_MODEL self.confidence_threshold = getattr(settings, 'QUICK_CREATE_CONFIDENCE_THRESHOLD', 0.6) self.ai_timeout = getattr(settings, 'QUICK_CREATE_AI_TIMEOUT', 15) # Increased for larger models self._cache = {} # Simple in-memory cache {text_hash: (result, timestamp)} self._cache_ttl = 300 # 5 minutes async def analyze_case_text(self, text: str, user_id: int) -> QuickCreateAnalysis: """ Analyze case text and extract structured data Returns QuickCreateAnalysis with suggestions """ if not text or len(text) < 5: return self._empty_analysis(text) # Check cache first cached_result = self._get_cached_analysis(text) if cached_result: logger.info(f"✅ Using cached analysis for text hash") return cached_result # Build system prompt (Danish for accuracy) system_prompt = self._build_analysis_prompt() # Build user message user_message = self._build_text_context(text) try: # Call Ollama with timeout result = await self._call_ollama(system_prompt, user_message) if result: # Enrich with database matches analysis = await self._enrich_with_db_matches(result, user_id) # Apply business rules analysis = await self._apply_business_rules(analysis, user_id) # Cache the result self._cache_analysis(text, analysis) return analysis else: logger.warning("⚠️ Ollama returned no result, using empty analysis") return self._empty_analysis(text) except Exception as e: logger.error(f"❌ Case analysis failed: {e}", exc_info=True) return self._empty_analysis(text) def _build_analysis_prompt(self) -> str: """Build Danish system prompt for case analysis""" return """Du er en intelligent assistent der analyserer beskeder om IT-support sager og udtrækker struktureret information. VIGTIGE REGLER: 1. Returner KUN gyldig JSON - ingen forklaring eller ekstra tekst 2. Hvis et felt ikke findes, sæt det til null eller tom liste 3. Beregn confidence baseret på hvor sikker du er (0.0-1.0) - SÆT IKKE 0.0 hvis du faktisk kan udtrække information! 4. Analyser tone og urgency for at bestemme prioritet 5. OPFIND IKKE ORD - brug kun ord fra den originale tekst 6. Ved "Ring til X fra Y" mønster: Y er oftest firma, X er kontaktperson FELTER AT UDTRÆKKE: **suggested_title**: Kort beskrivende titel (max 80 tegn) - BASERET PÅ ORIGINAL TEKST **suggested_description**: Fuld beskrivelse (kan være hele teksten hvis den er kort) **priority**: "low", "normal", "high", eller "urgent" - "urgent": Kritiske ord som "nede", "virker ikke", "kritisk", "ASAP", "sur kunde", "omgående" - "high": "hurtigt", "snart", "vigtigt", "prioriter", "Haster" - "low": "når I får tid", "ikke hastende", "lavprioriteret" - "normal": Standard hvis intet tyder på andet **customer_hints**: Liste af hints om kunde (firmanavn, CVR, nummer, lokation) - f.eks. ["norva24", "Nets", "24"] **technician_hints**: Liste af INTERNE BMC Hub medarbejdere der skal tildeles sagen (Christian, Peter, osv.) - VIG: "Ring til Dennis fra norva24" betyder Dennis er KUNDE-kontaktperson, IKKE vores tekniker - Kun sæt technician_hints hvis der står "Send til Christian" eller "Christian skal ordne det" - Lad være tom liste hvis der ikke nævnes en INTERN medarbejder **tags**: Relevante nøgleord (produkt, lokation, type problem) - brug kun fra original tekst **hardware_keywords**: Hardware-relaterede termer (modeller, serial numbers) **confidence**: 0.5-0.9 hvis du kan udtrække nogen information, 0.0-0.4 kun hvis teksten er uforståelig EKSEMPLER: Input: "Ring til Dennis fra norva24. Haster" Output: { "suggested_title": "Ring til Dennis fra norva24", "suggested_description": "Ring til Dennis fra norva24. Haster", "priority": "high", "customer_hints": ["norva24"], "technician_hints": [], "tags": ["telefonbesked", "opfølgning"], "hardware_keywords": [], "confidence": 0.75 } Input: "Kunde 24 ringer. Printer i Hellerup virker ikke. Skal fixes hurtigt." Output: { "suggested_title": "Printer i Hellerup virker ikke", "suggested_description": "Kunde 24 ringer. Printer i Hellerup virker ikke. Skal fixes hurtigt.", "priority": "high", "customer_hints": ["24", "Hellerup"], "technician_hints": [], "tags": ["printer", "hellerup", "hardware"], "hardware_keywords": ["printer"], "confidence": 0.85 } Input: "Ny medarbejder starter 1. marts hos Nets. Skal have laptop og adgang til systemer." Output: { "suggested_title": "Ny medarbejder hos Nets starter 1. marts", "suggested_description": "Ny medarbejder starter 1. marts hos Nets. Skal have laptop og adgang til systemer.", "priority": "normal", "customer_hints": ["Nets"], "technician_hints": [], "tags": ["onboarding", "ny medarbejder", "laptop"], "hardware_keywords": ["laptop"], "confidence": 0.90 } Input: "Lenovo T14 serial ABC123 kører langsomt. Når I får tid." Output: { "suggested_title": "Lenovo T14 kører langsomt", "suggested_description": "Lenovo T14 serial ABC123 kører langsomt. Når I får tid.", "priority": "low", "customer_hints": [], "technician_hints": [], "tags": ["performance", "lenovo"], "hardware_keywords": ["Lenovo T14", "ABC123"], "confidence": 0.75 } Input: "Norva24 ringer - server er nede! Send Christian ud ASAP." Output: { "suggested_title": "Norva24 server er nede", "suggested_description": "Norva24 ringer - server er nede! Send Christian ud ASAP.", "priority": "urgent", "customer_hints": ["Norva24"], "technician_hints": ["Christian"], "tags": ["server", "onsite", "kritisk"], "hardware_keywords": ["server"], "confidence": 0.90 } Husk: Returner KUN JSON, ingen forklaring eller tekst udenfor JSON-objektet.""" def _build_text_context(self, text: str) -> str: """Build context for AI analysis""" # Truncate if too long (avoid token limits) if len(text) > 2000: text = text[:2000] + "..." return f"""Analyser denne sags-tekst og udtrække struktureret information: {text} Returner JSON med suggested_title, suggested_description, priority, customer_hints, technician_hints, tags, hardware_keywords, og confidence.""" async def _call_ollama(self, system_prompt: str, user_message: str) -> Optional[Dict]: """Call Ollama API for analysis""" url = f"{self.ollama_endpoint}/api/chat" payload = { "model": self.ollama_model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_message} ], "stream": False, "options": { "temperature": 0.2, # Low for consistent extraction "num_predict": 800 # Enough for detailed response } } try: start_time = datetime.now() async with aiohttp.ClientSession() as session: timeout = aiohttp.ClientTimeout(total=self.ai_timeout) async with session.post(url, json=payload, timeout=timeout) as response: if response.status != 200: error_text = await response.text() logger.error(f"❌ Ollama API error: {response.status} - {error_text}") return None data = await response.json() message_data = data.get('message', {}) content = message_data.get('content', '') or message_data.get('thinking', '') processing_time = (datetime.now() - start_time).total_seconds() * 1000 if not content: logger.error(f"❌ Ollama returned empty response") return None # Parse JSON response logger.info(f"🔍 Raw Ollama response: {content[:500]}") result = self._parse_ollama_response(content) if result: result['processing_time_ms'] = int(processing_time) logger.info(f"✅ AI analysis: priority={result.get('priority', 'unknown')} (confidence: {result.get('confidence', 0):.2f}, {processing_time:.0f}ms)") logger.info(f"🔍 Parsed result: {json.dumps(result, ensure_ascii=False)[:500]}") return result else: logger.warning(f"⚠️ Failed to parse Ollama response: {content[:200]}") return None except aiohttp.ClientError as e: logger.warning(f"⚠️ Ollama connection error: {e}") return None except Exception as e: logger.error(f"❌ Unexpected error calling Ollama: {e}", exc_info=True) return None def _parse_ollama_response(self, content: str) -> Optional[Dict]: """Parse Ollama JSON response""" try: # Try to extract JSON if wrapped in markdown if '```json' in content: json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL) if json_match: content = json_match.group(1) elif '```' in content: json_match = re.search(r'```\s*(\{.*?\})\s*```', content, re.DOTALL) if json_match: content = json_match.group(1) # Try to find JSON object json_match = re.search(r'\{.*\}', content, re.DOTALL) if json_match: content = json_match.group(0) result = json.loads(content) return result except json.JSONDecodeError as e: logger.error(f"❌ JSON parse error: {e} - Content: {content[:500]}") return None async def _enrich_with_db_matches(self, ai_result: Dict, user_id: int) -> QuickCreateAnalysis: """Enrich AI results with database matches""" # Extract base fields title = ai_result.get('suggested_title', '') description = ai_result.get('suggested_description', '') priority_str = ai_result.get('priority', 'normal') confidence = float(ai_result.get('confidence', 0.0)) # Map priority to enum try: priority = SagPriority(priority_str.lower()) except ValueError: priority = SagPriority.NORMAL # Match customer customer_hints = ai_result.get('customer_hints', []) customer_id, customer_name = await self._match_customer(customer_hints) # Match technician technician_hints = ai_result.get('technician_hints', []) technician_id, technician_name = await self._match_technician(technician_hints) # Match group (can be enhanced with rules later) group_id, group_name = None, None # Extract tags tags = ai_result.get('tags', []) # Match hardware hardware_keywords = ai_result.get('hardware_keywords', []) hardware_refs = await self._match_hardware(hardware_keywords) # Build AI reasoning for low confidence ai_reasoning = None if confidence < self.confidence_threshold: ai_reasoning = f"Lav confidence: {confidence:.2f}. Tjek alle felter grundigt." return QuickCreateAnalysis( suggested_title=title, suggested_description=description, suggested_priority=priority, suggested_customer_id=customer_id, suggested_customer_name=customer_name, suggested_technician_id=technician_id, suggested_technician_name=technician_name, suggested_group_id=group_id, suggested_group_name=group_name, suggested_tags=tags, hardware_references=hardware_refs, confidence=confidence, ai_reasoning=ai_reasoning ) async def _match_customer(self, hints: List[str]) -> tuple[Optional[int], Optional[str]]: """Match customer from hints""" if not hints: return None, None for hint in hints: hint_clean = str(hint).strip() if not hint_clean: continue # Try exact ID match first if hint_clean.isdigit(): row = execute_query_single( "SELECT id, name FROM customers WHERE id = %s AND deleted_at IS NULL AND is_active = true", (int(hint_clean),) ) if row: return row['id'], row['name'] # Try name or CVR match with intelligent sorting # Priority: 1) Exact match, 2) Starts with hint, 3) Contains hint row = execute_query_single( """SELECT id, name FROM customers WHERE (name ILIKE %s OR cvr_number ILIKE %s) AND deleted_at IS NULL AND is_active = true ORDER BY CASE WHEN LOWER(name) = LOWER(%s) THEN 1 WHEN LOWER(name) LIKE LOWER(%s) THEN 2 ELSE 3 END, name ASC LIMIT 1""", (f"%{hint_clean}%", f"%{hint_clean}%", hint_clean, f"{hint_clean}%") ) if row: logger.info(f"✅ Matched customer: {row['name']} (id={row['id']}) from hint '{hint_clean}'") return row['id'], row['name'] return None, None async def _match_technician(self, hints: List[str]) -> tuple[Optional[int], Optional[str]]: """Match technician from hints (BMC Hub employees only)""" if not hints: return None, None for hint in hints: hint_clean = str(hint).strip() if not hint_clean: continue # Match by full_name or username with intelligent sorting # Priority: 1) Exact match, 2) Starts with hint, 3) Contains hint row = execute_query_single( """SELECT user_id as id, full_name, username FROM users WHERE (full_name ILIKE %s OR username ILIKE %s) AND is_active = true ORDER BY CASE WHEN LOWER(full_name) = LOWER(%s) THEN 1 WHEN LOWER(full_name) LIKE LOWER(%s) THEN 2 WHEN LOWER(username) = LOWER(%s) THEN 1 WHEN LOWER(username) LIKE LOWER(%s) THEN 2 ELSE 3 END, full_name ASC LIMIT 1""", (f"%{hint_clean}%", f"%{hint_clean}%", hint_clean, f"{hint_clean}%", hint_clean, f"{hint_clean}%") ) if row: logger.info(f"✅ Matched technician: {row['full_name']} (user_id={row['id']}) from hint '{hint_clean}'") return row['id'], row['full_name'] or row['username'] else: logger.warning(f"⚠️ No BMC Hub employee found matching '{hint_clean}' - may be external contact") return None, None async def _match_hardware(self, keywords: List[str]) -> List[Dict]: """Match hardware from keywords""" hardware_refs = [] for keyword in keywords: keyword_clean = str(keyword).strip() if not keyword_clean or len(keyword_clean) < 3: continue # Search hardware assets rows = execute_query( """SELECT id, brand, model, serial_number FROM hardware_assets WHERE (brand ILIKE %s OR model ILIKE %s OR serial_number ILIKE %s) AND deleted_at IS NULL LIMIT 3""", (f"%{keyword_clean}%", f"%{keyword_clean}%", f"%{keyword_clean}%") ) for row in rows: hardware_refs.append({ 'id': row['id'], 'brand': row['brand'], 'model': row['model'], 'serial_number': row['serial_number'] }) return hardware_refs async def _apply_business_rules(self, analysis: QuickCreateAnalysis, user_id: int) -> QuickCreateAnalysis: """Apply business rules to enhance analysis (Phase 2 feature)""" # Rule 1: Check customer history for technician preference if analysis.suggested_customer_id and not analysis.suggested_technician_id: # Find most frequent technician for this customer row = execute_query_single( """SELECT ansvarlig_bruger_id, COUNT(*) as cnt FROM sag_sager WHERE customer_id = %s AND ansvarlig_bruger_id IS NOT NULL GROUP BY ansvarlig_bruger_id ORDER BY cnt DESC LIMIT 1""", (analysis.suggested_customer_id,) ) if row: user_row = execute_query_single( "SELECT user_id as id, full_name FROM users WHERE user_id = %s AND is_active = true", (row['ansvarlig_bruger_id'],) ) if user_row: analysis.suggested_technician_id = user_row['id'] analysis.suggested_technician_name = user_row['full_name'] logger.info(f"✨ Business rule: Assigned technician {user_row['full_name']} based on history") # Rule 2: Check for VIP customers (future: alert_notes integration) # TODO: Add alert_notes check for popup rules # Rule 3: Onboarding detection onboarding_keywords = ['onboarding', 'ny medarbejder', 'nyt bruger', 'starter den'] if any(keyword in analysis.suggested_description.lower() for keyword in onboarding_keywords): if 'onboarding' not in analysis.suggested_tags: analysis.suggested_tags.append('onboarding') logger.info(f"✨ Business rule: Detected onboarding case") return analysis def _empty_analysis(self, text: str) -> QuickCreateAnalysis: """Return empty analysis with original text""" return QuickCreateAnalysis( suggested_title="", suggested_description=text, suggested_priority=SagPriority.NORMAL, confidence=0.0, ai_reasoning="AI unavailable - fill fields manually" ) def _get_cached_analysis(self, text: str) -> Optional[QuickCreateAnalysis]: """Get cached analysis if available and not expired""" text_hash = hashlib.md5(text.encode()).hexdigest() if text_hash in self._cache: result, timestamp = self._cache[text_hash] if (datetime.now() - timestamp).total_seconds() < self._cache_ttl: return result else: # Expired del self._cache[text_hash] return None def _cache_analysis(self, text: str, analysis: QuickCreateAnalysis): """Cache analysis result""" text_hash = hashlib.md5(text.encode()).hexdigest() self._cache[text_hash] = (analysis, datetime.now()) # Clean old cache entries (simple LRU) if len(self._cache) > 100: # Remove oldest 20 entries sorted_cache = sorted(self._cache.items(), key=lambda x: x[1][1]) for key, _ in sorted_cache[:20]: del self._cache[key]