# bmc_hub/app/services/case_analysis_service.py
# (566 lines, 23 KiB, Python)
"""
Case Analysis Service
AI-powered case text analysis using Ollama LLM
Extracts entities for QuickCreate feature
"""
import asyncio
import hashlib
import json
import logging
import re
from datetime import datetime, timedelta
from typing import Dict, Optional, List

import aiohttp

from app.core.config import settings
from app.core.database import execute_query, execute_query_single
from app.models.schemas import QuickCreateAnalysis, SagPriority
logger = logging.getLogger(__name__)
class CaseAnalysisService:
    """AI-powered case text analysis for the QuickCreate feature."""

    def __init__(self):
        """Read Ollama and QuickCreate settings from application config."""
        # Ollama endpoint/model come straight from settings.
        self.ollama_endpoint = settings.OLLAMA_ENDPOINT
        self.ollama_model = settings.OLLAMA_MODEL
        # Optional tuning knobs with defaults for configs that lack them.
        self.confidence_threshold = getattr(settings, 'QUICK_CREATE_CONFIDENCE_THRESHOLD', 0.6)
        # Timeout in seconds; generous because larger models answer slowly.
        self.ai_timeout = getattr(settings, 'QUICK_CREATE_AI_TIMEOUT', 15)
        # Simple in-memory result cache: {md5(text): (analysis, stored_at)}.
        self._cache = {}
        self._cache_ttl = 300  # seconds (5 minutes)
async def analyze_case_text(self, text: str, user_id: int) -> "QuickCreateAnalysis":
    """
    Analyze free-form case text and return structured QuickCreate suggestions.

    Flow: cache lookup -> Ollama extraction -> DB enrichment -> business
    rules -> cache store. Falls back to a local heuristic analysis when
    the AI call fails or yields nothing.
    """
    # Too little text to analyze meaningfully.
    if not text or len(text) < 5:
        return self._empty_analysis(text)

    # Serve a recent identical request straight from the in-memory cache.
    cached_result = self._get_cached_analysis(text)
    if cached_result:
        logger.info(f"✅ Using cached analysis for text hash")
        return cached_result

    # Danish prompt plus the user's text wrapped in the message template.
    system_prompt = self._build_analysis_prompt()
    user_message = self._build_text_context(text)

    try:
        result = await self._call_ollama(system_prompt, user_message)
        if not result:
            logger.warning("⚠️ Ollama returned no result, using heuristic fallback analysis")
            return await self._heuristic_fallback_analysis(text)

        # Resolve hints against the database, then apply business rules.
        analysis = await self._enrich_with_db_matches(result, user_id)
        analysis = await self._apply_business_rules(analysis, user_id)
        self._cache_analysis(text, analysis)
        return analysis
    except Exception as e:
        logger.error(f"❌ Case analysis failed: {e}", exc_info=True)
        return await self._heuristic_fallback_analysis(text)
def _build_analysis_prompt(self) -> str:
    """Return the Danish system prompt for case analysis.

    The prompt instructs the LLM to emit strict JSON (title, description,
    priority, customer/technician hints, tags, hardware keywords and a
    confidence score) and includes few-shot examples. The string must stay
    in Danish — it is runtime data sent to the model, not documentation.
    """
    return """Du er en intelligent assistent der analyserer beskeder om IT-support sager og udtrækker struktureret information.
VIGTIGE REGLER:
1. Returner KUN gyldig JSON - ingen forklaring eller ekstra tekst
2. Hvis et felt ikke findes, sæt det til null eller tom liste
3. Beregn confidence baseret på hvor sikker du er (0.0-1.0) - SÆT IKKE 0.0 hvis du faktisk kan udtrække information!
4. Analyser tone og urgency for at bestemme prioritet
5. OPFIND IKKE ORD - brug kun ord fra den originale tekst
6. Ved "Ring til X fra Y" mønster: Y er oftest firma, X er kontaktperson
FELTER AT UDTRÆKKE:
**suggested_title**: Kort beskrivende titel (max 80 tegn) - BASERET PÅ ORIGINAL TEKST
**suggested_description**: Fuld beskrivelse (kan være hele teksten hvis den er kort)
**priority**: "low", "normal", "high", eller "urgent"
- "urgent": Kritiske ord som "nede", "virker ikke", "kritisk", "ASAP", "sur kunde", "omgående"
- "high": "hurtigt", "snart", "vigtigt", "prioriter", "Haster"
- "low": "når I får tid", "ikke hastende", "lavprioriteret"
- "normal": Standard hvis intet tyder på andet
**customer_hints**: Liste af hints om kunde (firmanavn, CVR, nummer, lokation) - f.eks. ["norva24", "Nets", "24"]
**technician_hints**: Liste af INTERNE BMC Hub medarbejdere der skal tildeles sagen (Christian, Peter, osv.)
- VIG: "Ring til Dennis fra norva24" betyder Dennis er KUNDE-kontaktperson, IKKE vores tekniker
- Kun sæt technician_hints hvis der står "Send til Christian" eller "Christian skal ordne det"
- Lad være tom liste hvis der ikke nævnes en INTERN medarbejder
**tags**: Relevante nøgleord (produkt, lokation, type problem) - brug kun fra original tekst
**hardware_keywords**: Hardware-relaterede termer (modeller, serial numbers)
**confidence**: 0.5-0.9 hvis du kan udtrække nogen information, 0.0-0.4 kun hvis teksten er uforståelig
EKSEMPLER:
Input: "Ring til Dennis fra norva24. Haster"
Output:
{
"suggested_title": "Ring til Dennis fra norva24",
"suggested_description": "Ring til Dennis fra norva24. Haster",
"priority": "high",
"customer_hints": ["norva24"],
"technician_hints": [],
"tags": ["telefonbesked", "opfølgning"],
"hardware_keywords": [],
"confidence": 0.75
}
Input: "Kunde 24 ringer. Printer i Hellerup virker ikke. Skal fixes hurtigt."
Output:
{
"suggested_title": "Printer i Hellerup virker ikke",
"suggested_description": "Kunde 24 ringer. Printer i Hellerup virker ikke. Skal fixes hurtigt.",
"priority": "high",
"customer_hints": ["24", "Hellerup"],
"technician_hints": [],
"tags": ["printer", "hellerup", "hardware"],
"hardware_keywords": ["printer"],
"confidence": 0.85
}
Input: "Ny medarbejder starter 1. marts hos Nets. Skal have laptop og adgang til systemer."
Output:
{
"suggested_title": "Ny medarbejder hos Nets starter 1. marts",
"suggested_description": "Ny medarbejder starter 1. marts hos Nets. Skal have laptop og adgang til systemer.",
"priority": "normal",
"customer_hints": ["Nets"],
"technician_hints": [],
"tags": ["onboarding", "ny medarbejder", "laptop"],
"hardware_keywords": ["laptop"],
"confidence": 0.90
}
Input: "Lenovo T14 serial ABC123 kører langsomt. Når I får tid."
Output:
{
"suggested_title": "Lenovo T14 kører langsomt",
"suggested_description": "Lenovo T14 serial ABC123 kører langsomt. Når I får tid.",
"priority": "low",
"customer_hints": [],
"technician_hints": [],
"tags": ["performance", "lenovo"],
"hardware_keywords": ["Lenovo T14", "ABC123"],
"confidence": 0.75
}
Input: "Norva24 ringer - server er nede! Send Christian ud ASAP."
Output:
{
"suggested_title": "Norva24 server er nede",
"suggested_description": "Norva24 ringer - server er nede! Send Christian ud ASAP.",
"priority": "urgent",
"customer_hints": ["Norva24"],
"technician_hints": ["Christian"],
"tags": ["server", "onsite", "kritisk"],
"hardware_keywords": ["server"],
"confidence": 0.90
}
Husk: Returner KUN JSON, ingen forklaring eller tekst udenfor JSON-objektet."""
def _build_text_context(self, text: str) -> str:
"""Build context for AI analysis"""
# Truncate if too long (avoid token limits)
if len(text) > 2000:
text = text[:2000] + "..."
return f"""Analyser denne sags-tekst og udtrække struktureret information:
{text}
Returner JSON med suggested_title, suggested_description, priority, customer_hints, technician_hints, tags, hardware_keywords, og confidence."""
async def _call_ollama(self, system_prompt: str, user_message: str) -> Optional[Dict]:
    """
    Call the Ollama chat API and parse its JSON reply.

    Returns the parsed result dict (with 'processing_time_ms' added) or
    None on any API error, timeout, or unparseable reply.
    """
    url = f"{self.ollama_endpoint}/api/chat"
    payload = {
        "model": self.ollama_model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message}
        ],
        "stream": False,
        "options": {
            "temperature": 0.2,  # low for consistent extraction
            "num_predict": 800   # enough for a detailed response
        }
    }
    try:
        start_time = datetime.now()
        async with aiohttp.ClientSession() as session:
            timeout = aiohttp.ClientTimeout(total=self.ai_timeout)
            async with session.post(url, json=payload, timeout=timeout) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"❌ Ollama API error: {response.status} - {error_text}")
                    return None
                data = await response.json()
                message_data = data.get('message', {})
                # Some "thinking" models put the answer in 'thinking' instead of 'content'.
                content = message_data.get('content', '') or message_data.get('thinking', '')
                processing_time = (datetime.now() - start_time).total_seconds() * 1000
                if not content:
                    logger.error(f"❌ Ollama returned empty response")
                    return None
                # Parse JSON response
                logger.info(f"🔍 Raw Ollama response: {content[:500]}")
                result = self._parse_ollama_response(content)
                if result:
                    result['processing_time_ms'] = int(processing_time)
                    logger.info(f"✅ AI analysis: priority={result.get('priority', 'unknown')} (confidence: {result.get('confidence', 0):.2f}, {processing_time:.0f}ms)")
                    logger.info(f"🔍 Parsed result: {json.dumps(result, ensure_ascii=False)[:500]}")
                    return result
                logger.warning(f"⚠️ Failed to parse Ollama response: {content[:200]}")
                return None
    except asyncio.TimeoutError:
        # BUGFIX: an aiohttp request timeout raises asyncio.TimeoutError, which
        # is NOT a subclass of aiohttp.ClientError; previously it fell through
        # to the generic handler and was logged as an unexpected error with a
        # full traceback. Treat it as an expected, recoverable condition.
        logger.warning(f"⚠️ Ollama request timed out after {self.ai_timeout}s")
        return None
    except aiohttp.ClientError as e:
        logger.warning(f"⚠️ Ollama connection error: {e}")
        return None
    except Exception as e:
        logger.error(f"❌ Unexpected error calling Ollama: {e}", exc_info=True)
        return None
def _parse_ollama_response(self, content: str) -> Optional[Dict]:
"""Parse Ollama JSON response"""
try:
# Try to extract JSON if wrapped in markdown
if '```json' in content:
json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL)
if json_match:
content = json_match.group(1)
elif '```' in content:
json_match = re.search(r'```\s*(\{.*?\})\s*```', content, re.DOTALL)
if json_match:
content = json_match.group(1)
# Try to find JSON object
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
content = json_match.group(0)
result = json.loads(content)
return result
except json.JSONDecodeError as e:
logger.error(f"❌ JSON parse error: {e} - Content: {content[:500]}")
return None
async def _enrich_with_db_matches(self, ai_result: Dict, user_id: int) -> "QuickCreateAnalysis":
    """
    Convert the raw AI result dict into a QuickCreateAnalysis, resolving
    customer / technician / hardware hints against the database.

    The system prompt tells the model to emit null for missing fields, so
    every field is normalized defensively before use.
    """
    # BUGFIX: use `or` fallbacks — .get(key, default) still returns None
    # when the model explicitly emitted null for the key, which previously
    # crashed on .lower() / float(None) below.
    title = ai_result.get('suggested_title') or ''
    description = ai_result.get('suggested_description') or ''
    priority_str = ai_result.get('priority') or 'normal'
    try:
        confidence = float(ai_result.get('confidence') or 0.0)
    except (TypeError, ValueError):
        confidence = 0.0

    # Map the priority string onto the enum; unknown values become NORMAL.
    try:
        priority = SagPriority(str(priority_str).lower())
    except ValueError:
        priority = SagPriority.NORMAL

    # Resolve customer and technician hints against the database.
    customer_hints = ai_result.get('customer_hints') or []
    customer_id, customer_name = await self._match_customer(customer_hints)

    technician_hints = ai_result.get('technician_hints') or []
    technician_id, technician_name = await self._match_technician(technician_hints)

    # Group matching is not implemented yet (can be rule-driven later).
    group_id, group_name = None, None

    tags = ai_result.get('tags') or []

    hardware_keywords = ai_result.get('hardware_keywords') or []
    hardware_refs = await self._match_hardware(hardware_keywords)

    # Surface a warning for the user when the model was unsure.
    ai_reasoning = None
    if confidence < self.confidence_threshold:
        ai_reasoning = f"Lav confidence: {confidence:.2f}. Tjek alle felter grundigt."

    return QuickCreateAnalysis(
        suggested_title=title,
        suggested_description=description,
        suggested_priority=priority,
        suggested_customer_id=customer_id,
        suggested_customer_name=customer_name,
        suggested_technician_id=technician_id,
        suggested_technician_name=technician_name,
        suggested_group_id=group_id,
        suggested_group_name=group_name,
        suggested_tags=tags,
        hardware_references=hardware_refs,
        confidence=confidence,
        ai_reasoning=ai_reasoning
    )
async def _match_customer(self, hints: List[str]) -> tuple[Optional[int], Optional[str]]:
    """Resolve the first matching active customer from a list of hints.

    Numeric hints are tried as customer IDs first; otherwise name/CVR are
    matched with exact > prefix > substring ranking. Returns (id, name),
    or (None, None) when nothing matches.
    """
    if not hints:
        return None, None

    for raw_hint in hints:
        hint = str(raw_hint).strip()
        if not hint:
            continue

        # A purely numeric hint may be a direct customer ID.
        if hint.isdigit():
            match = execute_query_single(
                "SELECT id, name FROM customers WHERE id = %s AND deleted_at IS NULL AND is_active = true",
                (int(hint),)
            )
            if match:
                return match['id'], match['name']

        # Fuzzy name/CVR lookup, ranked: exact name, then prefix, then substring.
        match = execute_query_single(
            """SELECT id, name FROM customers
            WHERE (name ILIKE %s OR cvr_number ILIKE %s)
            AND deleted_at IS NULL AND is_active = true
            ORDER BY
            CASE WHEN LOWER(name) = LOWER(%s) THEN 1
            WHEN LOWER(name) LIKE LOWER(%s) THEN 2
            ELSE 3 END,
            name ASC
            LIMIT 1""",
            (f"%{hint}%", f"%{hint}%", hint, f"{hint}%")
        )
        if match:
            logger.info(f"✅ Matched customer: {match['name']} (id={match['id']}) from hint '{hint}'")
            return match['id'], match['name']

    return None, None
async def _match_technician(self, hints: List[str]) -> tuple[Optional[int], Optional[str]]:
    """Resolve an internal BMC Hub employee from hint names.

    Matches on full_name or username, preferring exact over prefix over
    substring matches. Returns (user_id, display_name) or (None, None).
    Hints that match no employee are logged — they are often external
    customer contacts rather than technicians.
    """
    if not hints:
        return None, None

    for raw_hint in hints:
        hint = str(raw_hint).strip()
        if not hint:
            continue

        # Ranked lookup: exact full_name/username first, then prefix, then substring.
        match = execute_query_single(
            """SELECT user_id as id, full_name, username FROM users
            WHERE (full_name ILIKE %s OR username ILIKE %s)
            AND is_active = true
            ORDER BY
            CASE WHEN LOWER(full_name) = LOWER(%s) THEN 1
            WHEN LOWER(full_name) LIKE LOWER(%s) THEN 2
            WHEN LOWER(username) = LOWER(%s) THEN 1
            WHEN LOWER(username) LIKE LOWER(%s) THEN 2
            ELSE 3 END,
            full_name ASC
            LIMIT 1""",
            (f"%{hint}%", f"%{hint}%", hint, f"{hint}%", hint, f"{hint}%")
        )
        if match:
            logger.info(f"✅ Matched technician: {match['full_name']} (user_id={match['id']}) from hint '{hint}'")
            return match['id'], match['full_name'] or match['username']
        logger.warning(f"⚠️ No BMC Hub employee found matching '{hint}' - may be external contact")

    return None, None
async def _match_hardware(self, keywords: List[str]) -> List[Dict]:
    """Look up hardware assets whose brand/model/serial matches any keyword.

    Keywords shorter than 3 characters are skipped; at most 3 assets are
    collected per keyword.
    """
    matches: List[Dict] = []
    for raw_keyword in keywords:
        keyword = str(raw_keyword).strip()
        # Too-short keywords (and empty ones) produce noisy matches.
        if len(keyword) < 3:
            continue
        pattern = f"%{keyword}%"
        rows = execute_query(
            """SELECT id, brand, model, serial_number
            FROM hardware_assets
            WHERE (brand ILIKE %s OR model ILIKE %s OR serial_number ILIKE %s)
            AND deleted_at IS NULL
            LIMIT 3""",
            (pattern, pattern, pattern)
        )
        matches.extend(
            {
                'id': row['id'],
                'brand': row['brand'],
                'model': row['model'],
                'serial_number': row['serial_number'],
            }
            for row in rows
        )
    return matches
async def _apply_business_rules(self, analysis: "QuickCreateAnalysis", user_id: int) -> "QuickCreateAnalysis":
    """
    Post-process an analysis with BMC-specific business rules.

    Rule 1: if a customer matched but no technician did, suggest the
    technician who has handled most of that customer's cases.
    Rule 3: tag obvious onboarding cases.
    (Rule 2, VIP/alert_notes popups, is still TODO.)
    """
    # Rule 1: Check customer history for technician preference
    if analysis.suggested_customer_id and not analysis.suggested_technician_id:
        # Find most frequent technician for this customer
        row = execute_query_single(
            """SELECT ansvarlig_bruger_id, COUNT(*) as cnt
            FROM sag_sager
            WHERE customer_id = %s AND ansvarlig_bruger_id IS NOT NULL
            GROUP BY ansvarlig_bruger_id
            ORDER BY cnt DESC LIMIT 1""",
            (analysis.suggested_customer_id,)
        )
        if row:
            user_row = execute_query_single(
                "SELECT user_id as id, full_name FROM users WHERE user_id = %s AND is_active = true",
                (row['ansvarlig_bruger_id'],)
            )
            if user_row:
                analysis.suggested_technician_id = user_row['id']
                analysis.suggested_technician_name = user_row['full_name']
                logger.info(f"✨ Business rule: Assigned technician {user_row['full_name']} based on history")

    # Rule 2: Check for VIP customers (future: alert_notes integration)
    # TODO: Add alert_notes check for popup rules

    # Rule 3: Onboarding detection
    # BUGFIX: guard against a None description (the AI may return null),
    # which previously raised AttributeError on .lower().
    description = (analysis.suggested_description or "").lower()
    onboarding_keywords = ['onboarding', 'ny medarbejder', 'nyt bruger', 'starter den']
    if any(keyword in description for keyword in onboarding_keywords):
        if 'onboarding' not in analysis.suggested_tags:
            analysis.suggested_tags.append('onboarding')
            logger.info(f"✨ Business rule: Detected onboarding case")
    return analysis
def _empty_analysis(self, text: str) -> QuickCreateAnalysis:
    """Return a zero-confidence analysis that just echoes the input text.

    Used when the input is too short to analyze; the ai_reasoning string
    tells the UI the user must fill in the fields manually.
    """
    return QuickCreateAnalysis(
        suggested_title="",
        suggested_description=text,
        suggested_priority=SagPriority.NORMAL,
        confidence=0.0,
        ai_reasoning="AI unavailable - fill fields manually"
    )
async def _heuristic_fallback_analysis(self, text: str) -> "QuickCreateAnalysis":
    """Local, rule-based fallback used when the AI service is unavailable.

    Derives priority from urgency wording, a title from the first sentence,
    tags from a small keyword table, and attempts a customer match from
    longer tokens in the text. Confidence is fixed at 0.35.
    """
    cleaned_text = (text or "").strip()
    if not cleaned_text:
        return self._empty_analysis(text)
    lowered = cleaned_text.lower()

    # Priority from urgency wording; the most urgent bucket that hits wins.
    priority_buckets = [
        (SagPriority.URGENT, ["nede", "kritisk", "asap", "omgående", "straks", "akut", "haster"]),
        (SagPriority.HIGH, ["hurtigt", "vigtigt", "snarest", "prioriter"]),
        (SagPriority.LOW, ["når i får tid", "ikke hastende", "lavprioriteret"]),
    ]
    priority = SagPriority.NORMAL
    for bucket, terms in priority_buckets:
        if any(term in lowered for term in terms):
            priority = bucket
            break

    # Title: first sentence of the first line, clipped to 80 characters.
    first_line = cleaned_text.splitlines()[0].strip()
    first_sentence = re.split(r"[.!?]", first_line)[0].strip()
    title = (first_sentence or first_line or cleaned_text)[:80].strip() or "Ny sag"

    # Tags from a small keyword -> tag lookup table (insertion order kept).
    keyword_tags = {
        "printer": "printer",
        "mail": "mail",
        "email": "mail",
        "vpn": "vpn",
        "net": "netværk",
        "wifi": "wifi",
        "server": "server",
        "laptop": "laptop",
        "adgang": "adgang",
        "onboarding": "onboarding",
    }
    suggested_tags: List[str] = []
    for needle, tag in keyword_tags.items():
        if needle in lowered and tag not in suggested_tags:
            suggested_tags.append(tag)

    # Customer matching from longish tokens, skipping common Danish filler words.
    stopwords = {"ring", "kunde", "sag", "skal", "have", "virker", "ikke"}
    candidate_hints = [
        token
        for token in re.findall(r"[A-Za-z0-9ÆØÅæøå._-]{3,}", cleaned_text)
        if token.lower() not in stopwords
    ]
    customer_id, customer_name = await self._match_customer(candidate_hints[:8])

    return QuickCreateAnalysis(
        suggested_title=title,
        suggested_description=cleaned_text,
        suggested_priority=priority,
        suggested_customer_id=customer_id,
        suggested_customer_name=customer_name,
        suggested_tags=suggested_tags,
        confidence=0.35,
        ai_reasoning="AI service unavailable - using local fallback suggestions"
    )
def _get_cached_analysis(self, text: str) -> Optional[QuickCreateAnalysis]:
"""Get cached analysis if available and not expired"""
text_hash = hashlib.md5(text.encode()).hexdigest()
if text_hash in self._cache:
result, timestamp = self._cache[text_hash]
if (datetime.now() - timestamp).total_seconds() < self._cache_ttl:
return result
else:
# Expired
del self._cache[text_hash]
return None
def _cache_analysis(self, text: str, analysis: QuickCreateAnalysis):
"""Cache analysis result"""
text_hash = hashlib.md5(text.encode()).hexdigest()
self._cache[text_hash] = (analysis, datetime.now())
# Clean old cache entries (simple LRU)
if len(self._cache) > 100:
# Remove oldest 20 entries
sorted_cache = sorted(self._cache.items(), key=lambda x: x[1][1])
for key, _ in sorted_cache[:20]:
del self._cache[key]