bmc_hub/app/services/template_service.py
Christian 3a8288f5a1 feat: Implement quick analysis on PDF upload for CVR, document type, and number extraction
- Added `check_invoice_number_exists` method in `EconomicService` to verify invoice numbers in e-conomic journals.
- Introduced `quick_analysis_on_upload` method in `OllamaService` for extracting critical fields from uploaded PDFs, including CVR, document type, and document number.
- Created migration script to add new fields for storing detected CVR, vendor ID, document type, and document number in the `incoming_files` table.
- Developed comprehensive tests for the quick analysis functionality, validating CVR detection, document type identification, and invoice number extraction.
2025-12-09 14:54:33 +01:00

401 lines
16 KiB
Python

"""
Supplier Invoice Template Service
Hybrid approach: invoice2data templates + custom regex templates
Inspired by OmniSync's invoice template system
"""
import re
import logging
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from pathlib import Path
from app.core.database import execute_query, execute_insert, execute_update
from app.services.invoice2data_service import get_invoice2data_service
logger = logging.getLogger(__name__)
class TemplateService:
    """Template-based extraction of supplier invoice data from PDF text.

    Two template sources are combined:

    * invoice2data templates, reached through ``get_invoice2data_service()``;
    * custom templates stored in ``supplier_invoice_templates`` and cached in
      ``self.templates_cache`` keyed by ``template_id``.

    Templates are loaded lazily on first use (see ``_ensure_loaded``).
    """

    # Sentinel template id returned by ``match_template`` (and accepted by
    # ``extract_fields``) when an invoice2data template matched.
    INVOICE2DATA_TEMPLATE_ID = -1

    # Item header line: "<position> <item number> <description>",
    # e.g. "100 48023976 REFURB LENOVO ThinkPad P15 G1 Grde A".
    _ITEM_LINE_RE = re.compile(r'^(\d{1,3})\s+(\d{6,})\s+(.+)')
    # Column-header words; an item-looking line containing one of these is skipped.
    _HEADER_WORDS_RE = re.compile(
        r'(Position|Varenr|Beskrivelse|Antal|Pris|Total)', re.IGNORECASE
    )
    # Quantity/price line: "<qty><unit> <unit price> <total>",
    # e.g. "1ST 3.708,27 3.708,27".
    _PRICE_LINE_RE = re.compile(
        r'^(\d+)\s*(?:ST|stk|pc|pcs)\s+([\d.,]+)\s+([\d.,]+)', re.IGNORECASE
    )

    def __init__(self):
        # template_id -> template row (dict) for every active custom template
        self.templates_cache = {}
        # True once the custom templates were loaded successfully
        self._initialized = False
        # invoice2data service wrapper, or None when it could not be obtained
        self.invoice2data = None

    def _ensure_loaded(self):
        """Lazy load templates on first use.

        The service is only marked as initialised when the template query
        succeeded, so a failed load is retried on the next call instead of
        leaving the cache permanently empty.
        """
        if self._initialized:
            return

        logger.info("🔄 Lazy loading templates...")
        loaded = self._load_templates()

        # Also load invoice2data templates
        try:
            self.invoice2data = get_invoice2data_service()
            logger.info("✅ Invoice2Data service initialized")
        except Exception as e:
            logger.warning(f"⚠️ Failed to load invoice2data: {e}")

        # Only an explicit ``False`` counts as failure, so an override of
        # ``_load_templates`` that returns ``None`` still initialises once.
        self._initialized = loaded is not False

    def _load_templates(self):
        """Load all active templates into the cache.

        Returns:
            ``True`` when the query ran (even if it returned no rows),
            ``False`` when it failed. On failure the cache is left untouched.
        """
        try:
            templates = execute_query(
                """SELECT t.*, v.name as vendor_name, v.cvr_number as vendor_cvr
                   FROM supplier_invoice_templates t
                   LEFT JOIN vendors v ON t.vendor_id = v.id
                   WHERE t.is_active = TRUE"""
            )
            # Build the mapping first so a malformed row cannot leave the
            # cache half-populated.
            fresh = {row['template_id']: row for row in (templates or [])}
        except Exception as e:
            logger.error(f"❌ Failed to load templates: {e}")
            return False

        if fresh:
            self.templates_cache.update(fresh)
            logger.info(f"📚 Loaded {len(self.templates_cache)} active templates")
        else:
            logger.warning("⚠️ No templates found")
        return True

    def match_template(self, pdf_text: str) -> Tuple[Optional[int], float]:
        """Find the best matching template for the given PDF text.

        invoice2data templates are tried first; if none matches, every cached
        custom template is scored and the highest strictly-positive score wins.

        Returns:
            ``(template_id, confidence_score)``. ``template_id`` is
            ``INVOICE2DATA_TEMPLATE_ID`` (-1) with confidence 1.0 for an
            invoice2data match, and ``None`` when nothing matched.
        """
        self._ensure_loaded()  # Lazy load templates

        if not pdf_text:
            # Nothing to match against; avoids calling .lower() on None.
            logger.info("⚠️ No template matched (best score: 0.00)")
            return None, 0.0

        # Try invoice2data templates first
        if self.invoice2data:
            try:
                template_name = self.invoice2data.match_template(pdf_text)
                if template_name:
                    logger.info(f"✅ Matched invoice2data template: {template_name}")
                    # Special ID to indicate an invoice2data template
                    return (self.INVOICE2DATA_TEMPLATE_ID, 1.0)
            except Exception as e:
                logger.warning(f"⚠️ Invoice2data matching failed: {e}")

        # Fallback to custom templates
        logger.info(f"🔍 Matching against {len(self.templates_cache)} custom templates")
        best_match = None
        best_score = 0.0
        pdf_text_lower = pdf_text.lower()

        for template_id, template in self.templates_cache.items():
            score = self._calculate_match_score(pdf_text_lower, template)
            logger.debug(f" Template {template_id} ({template['template_name']}): {score:.2f}")
            if score > best_score:
                best_score = score
                best_match = template_id

        # Compare against None: a template id of 0 is a valid match.
        if best_match is not None:
            logger.info(f"✅ Matched template {best_match} ({self.templates_cache[best_match]['template_name']}) with {best_score:.0%} confidence")
        else:
            logger.info(f"⚠️ No template matched (best score: {best_score:.2f})")
        return best_match, best_score

    def _calculate_match_score(self, pdf_text: str, template: Dict) -> float:
        """Calculate a match score from the template's detection patterns.

        Args:
            pdf_text: Text to search. ``match_template`` passes it already
                lower-cased; ``text`` patterns are lower-cased here to match.
            template: Template row; ``detection_patterns`` is read as a list
                of dicts with ``type`` (``text`` / ``cvr`` / ``regex``),
                ``pattern`` or ``value``, and an optional ``weight``
                (default 0.5).

        Returns:
            Sum of the weights of all matching patterns, capped at 1.0.
            Empty patterns never count as a match, and an invalid regex is
            logged and skipped rather than aborting the whole scoring.
        """
        patterns = template.get('detection_patterns') or []
        score = 0.0

        for pattern_obj in patterns:
            pattern_type = pattern_obj.get('type')
            weight = pattern_obj.get('weight', 0.5)
            if weight is None:
                weight = 0.5

            matched = False
            if pattern_type == 'text':
                # Simple case-insensitive substring search
                needle = (pattern_obj.get('pattern') or '').lower()
                matched = bool(needle) and needle in pdf_text
            elif pattern_type == 'cvr':
                # CVR number match (exact substring)
                cvr = str(pattern_obj.get('value') or '')
                matched = bool(cvr) and cvr in pdf_text
            elif pattern_type == 'regex':
                regex = pattern_obj.get('pattern') or ''
                if regex:
                    try:
                        matched = re.search(regex, pdf_text, re.IGNORECASE) is not None
                    except re.error as e:
                        logger.warning(f"⚠️ Invalid detection regex {regex!r}: {e}")

            if matched:
                score += weight

        return min(score, 1.0)  # Cap at 100%

    def _extract_vendor_cvr(self, pdf_text: str, pattern: str, group: int) -> Optional[str]:
        """Return the first CVR matched by ``pattern`` that is not our own CVR.

        The own CVR is read from ``settings.OWN_CVR`` (falling back to the
        built-in default) so the buyer's number printed on the invoice is not
        mistaken for the vendor's.
        """
        from app.core.config import settings
        own_cvr = getattr(settings, 'OWN_CVR', '29522790')

        # Collect every CVR-looking match, ignoring groups that did not participate
        found_cvrs = []
        for match in re.finditer(pattern, pdf_text, re.IGNORECASE | re.MULTILINE):
            if len(match.groups()) >= group and match.group(group) is not None:
                found_cvrs.append(match.group(group).strip())

        # Filter out own CVR; the first remaining one is taken as the vendor's
        vendor_cvrs = [cvr for cvr in found_cvrs if cvr != own_cvr]
        if vendor_cvrs:
            logger.debug(f"vendor_cvr: {vendor_cvrs[0]} (filtered out own CVR: {own_cvr})")
            return vendor_cvrs[0]

        logger.warning(f" ⚠️ Only found own CVR ({own_cvr}), no vendor CVR found")
        return None

    def extract_fields(self, pdf_text: str, template_id: int) -> Dict:
        """Extract invoice fields using the template's regex patterns.

        Args:
            pdf_text: Full text of the invoice.
            template_id: Id returned by ``match_template``; -1 delegates to
                invoice2data.

        Returns:
            Dict of ``field_name -> extracted string``. Fields whose pattern
            does not match are omitted. Returns ``{}`` when the template is
            unknown or invoice2data yields nothing.
        """
        self._ensure_loaded()  # Lazy load templates

        # invoice2data template: delegate the whole extraction
        if template_id == self.INVOICE2DATA_TEMPLATE_ID:
            if self.invoice2data:
                try:
                    result = self.invoice2data.extract(pdf_text)
                    if result:
                        logger.info("✅ Extracted fields using invoice2data")
                        return result
                except Exception as e:
                    logger.error(f"❌ Invoice2data extraction failed: {e}")
            return {}

        # Custom template
        template = self.templates_cache.get(template_id)
        if not template:
            logger.warning(f"⚠️ Template {template_id} not found in cache")
            return {}

        field_mappings = template.get('field_mappings') or {}
        extracted = {}

        for field_name, field_config in field_mappings.items():
            if not isinstance(field_config, dict):
                continue
            pattern = field_config.get('pattern')
            group = field_config.get('group', 1)
            if not pattern:
                continue

            try:
                if field_name == 'vendor_cvr':
                    # Special handling for CVR to avoid extracting own CVR
                    value = self._extract_vendor_cvr(pdf_text, pattern, group)
                    if value is not None:
                        extracted[field_name] = value
                else:
                    # Normal extraction: first match, configured capture group
                    match = re.search(pattern, pdf_text, re.IGNORECASE | re.MULTILINE)
                    if (
                        match
                        and len(match.groups()) >= group
                        and match.group(group) is not None
                    ):
                        value = match.group(group).strip()
                        extracted[field_name] = value
                        logger.debug(f"{field_name}: {value}")
            except Exception as e:
                # Best effort: one broken pattern must not block the other fields
                logger.warning(f" ✗ Failed to extract {field_name}: {e}")

        return extracted

    def extract_line_items(self, pdf_text: str, template_id: int) -> List[Dict]:
        """Extract invoice line items using the template's line patterns.

        The template's ``field_mappings`` may define ``lines_start`` and
        ``lines_end`` patterns that delimit the item section, and a
        ``line_item`` entry with a ``pattern`` and the ``fields`` its capture
        groups map to. If the pattern yields nothing, the multi-line fallback
        ``_smart_line_extraction`` is tried.

        Returns:
            List of line-item dicts; empty when the template is unknown or has
            no ``line_item`` pattern.
        """
        self._ensure_loaded()  # Lazy load templates

        if template_id == self.INVOICE2DATA_TEMPLATE_ID:
            # invoice2data templates are not stored in the custom template cache
            logger.debug("No line_item pattern configured")
            return []

        template = self.templates_cache.get(template_id)
        if not template:
            logger.warning(f"⚠️ Template {template_id} not found in cache")
            return []

        field_mappings = template.get('field_mappings') or {}

        # Line extraction config (tolerating missing / NULL entries)
        lines_start = (field_mappings.get('lines_start') or {}).get('pattern')
        lines_end = (field_mappings.get('lines_end') or {}).get('pattern')
        line_item_config = field_mappings.get('line_item') or {}
        line_pattern = line_item_config.get('pattern')
        line_fields = line_item_config.get('fields') or []

        if not line_pattern:
            logger.debug("No line_item pattern configured")
            return []

        # Narrow the text to the section between the start and end markers
        text_section = pdf_text
        if lines_start:
            try:
                start_match = re.search(lines_start, pdf_text, re.IGNORECASE)
                if start_match:
                    text_section = pdf_text[start_match.end():]
                    logger.debug(f"Found lines_start, section starts at position {start_match.end()}")
            except Exception as e:
                logger.warning(f"Failed to find lines_start: {e}")

        if lines_end:
            try:
                end_match = re.search(lines_end, text_section, re.IGNORECASE)
                if end_match:
                    text_section = text_section[:end_match.start()]
                    logger.debug(f"Found lines_end, section ends at position {end_match.start()}")
            except Exception as e:
                logger.warning(f"Failed to find lines_end: {e}")

        # Template pattern first, smart multi-line extraction as fallback
        lines = self._extract_with_pattern(text_section, line_pattern, line_fields)
        if not lines:
            lines = self._smart_line_extraction(text_section, line_fields)

        logger.info(f"📦 Extracted {len(lines)} line items")
        return lines

    def _extract_with_pattern(self, text: str, pattern: str, field_names: List[str]) -> List[Dict]:
        """Extract one line item per regex match.

        Args:
            text: Text section to scan (``re.MULTILINE`` is applied).
            pattern: Regex whose capture groups hold the line's fields.
            field_names: Names for capture groups 1..n, in order.

        Returns:
            One dict per match with ``line_number``, ``raw_text`` and one key
            per named group. Optional groups that did not participate are
            stored as ``None``. On a regex failure the items collected so far
            are returned.
        """
        lines = []
        try:
            for match in re.finditer(pattern, text, re.MULTILINE):
                line_data = {
                    'line_number': len(lines) + 1,
                    'raw_text': match.group(0),
                }
                groups = match.groups()
                # Map captured groups to field names
                for idx, field_name in enumerate(field_names or [], start=1):
                    if idx > len(groups):
                        break
                    captured = groups[idx - 1]
                    line_data[field_name] = captured.strip() if captured is not None else None
                lines.append(line_data)
        except Exception as e:
            logger.error(f"❌ Pattern extraction failed: {e}")
        return lines

    @staticmethod
    def _normalize_amount(raw: str) -> str:
        """Convert a printed amount to a dot-decimal string.

        When both separators occur, the right-most one is the decimal
        separator and the other one is a thousands separator that is removed
        (``3.708,27`` -> ``3708.27``, ``1,234.56`` -> ``1234.56``). With only
        commas, they become dots (``12,50`` -> ``12.50``). A dots-only string
        is ambiguous and returned unchanged.
        """
        last_comma = raw.rfind(',')
        last_dot = raw.rfind('.')
        if last_comma != -1 and last_dot != -1:
            if last_comma > last_dot:
                return raw.replace('.', '').replace(',', '.')
            return raw.replace(',', '')
        return raw.replace(',', '.')

    def _smart_line_extraction(self, text: str, field_names: List[str]) -> List[Dict]:
        """Multi-line extraction for ALSO-style invoices.

        Format:
            100 48023976 REFURB LENOVO ThinkPad P15 G1 Grde A
            ...metadata lines...
            1ST 3.708,27 3.708,27

        Data from the description line and the following price line is
        combined into one item. ``field_names`` is accepted for signature
        parity with ``_extract_with_pattern`` but is not used.

        Returns:
            List of item dicts with ``line_number``, ``position``,
            ``item_number``, ``description``, ``quantity``, ``unit_price``,
            ``total_price``, ``raw_text`` and, when a marker was found,
            ``vat_note``. Items without a price line are dropped.
        """
        rows = text.split('\n')
        items = []

        for i, raw_row in enumerate(rows):
            line = raw_row.strip()

            # Position + item number + description line
            item_match = self._ITEM_LINE_RE.match(line)
            if not item_match:
                continue
            # Skip if it is a column header
            if self._HEADER_WORDS_RE.search(line):
                continue

            position = item_match.group(1)
            item_number = item_match.group(2)
            description = item_match.group(3).strip()

            quantity = None
            unit_price = None
            total_price = None
            vat_note = None  # For "Omvendt betalingspligt" (reverse charge) etc.

            # Look for the quantity/price line in the rows i+1 .. i+9
            for j in range(i + 1, min(i + 10, len(rows))):
                price_match = self._PRICE_LINE_RE.match(rows[j].strip())
                if not price_match:
                    continue

                quantity = price_match.group(1)
                unit_price = self._normalize_amount(price_match.group(2))
                total_price = self._normalize_amount(price_match.group(3))

                # Check the next 3 rows for VAT markers
                for k in range(j + 1, min(j + 4, len(rows))):
                    vat_line = rows[k].strip().lower()
                    if 'omvendt' in vat_line and 'betalingspligt' in vat_line:
                        vat_note = "reverse_charge"
                        logger.debug(f"⚠️ Found reverse charge marker for item {item_number}")
                    elif 'copydan' in vat_line:
                        vat_note = "copydan_included"
                break

            # Only add the item if prices were found
            if quantity and unit_price:
                item_data = {
                    'line_number': len(items) + 1,
                    'position': position,
                    'item_number': item_number,
                    'description': description,
                    'quantity': quantity,
                    'unit_price': unit_price,
                    'total_price': total_price,
                    'raw_text': f"{line} ... {quantity}ST {unit_price} {total_price}",
                }
                # Add VAT note if found
                if vat_note:
                    item_data['vat_note'] = vat_note
                items.append(item_data)
                logger.info(f"✅ Multi-line item: {item_number} - {description[:30]}... ({quantity}ST @ {unit_price}){' [REVERSE CHARGE]' if vat_note == 'reverse_charge' else ''}")

        if items:
            logger.info(f"📦 Multi-line extraction found {len(items)} items")
        else:
            logger.warning("⚠️ Multi-line extraction found no items")
        return items

    def log_usage(self, template_id: int, file_id: int, matched: bool,
                  confidence: float, fields: Dict):
        """Log template usage for statistics.

        Inserts a row into ``template_usage_log`` and, when ``matched`` is
        true, bumps the template's usage/success counters. Failures are
        logged and swallowed so statistics never break invoice processing.
        """
        import json
        try:
            execute_insert(
                """INSERT INTO template_usage_log
                   (template_id, file_id, matched, confidence, fields_extracted)
                   VALUES (%s, %s, %s, %s, %s)""",
                # default=str: extracted fields may hold values that are not
                # JSON-native (e.g. dates); stringify them instead of losing
                # the whole log row.
                (template_id, file_id, matched, confidence,
                 json.dumps(fields, default=str))
            )
            if matched:
                # Update template stats
                execute_update(
                    """UPDATE supplier_invoice_templates
                       SET usage_count = usage_count + 1,
                           success_count = success_count + 1,
                           last_used_at = CURRENT_TIMESTAMP
                       WHERE template_id = %s""",
                    (template_id,)
                )
        except Exception as e:
            logger.error(f"❌ Failed to log template usage: {e}")

    def get_vendor_templates(self, vendor_id: int) -> List[Dict]:
        """Get all active templates for a vendor, most used first."""
        return execute_query(
            """SELECT * FROM supplier_invoice_templates
               WHERE vendor_id = %s AND is_active = TRUE
               ORDER BY usage_count DESC""",
            (vendor_id,),
            fetchall=True
        )

    def reload_templates(self):
        """Reload templates from the database.

        The cache is rebuilt from scratch so deactivated templates disappear.
        If the reload fails, the previous cache is restored so the service
        keeps working with the last known templates; the next use retries.
        """
        previous_cache = self.templates_cache
        self.templates_cache = {}
        self._initialized = False
        self._ensure_loaded()
        if not self._initialized:
            self.templates_cache = previous_cache
# Module-level shared instance. Constructing it performs no template loading:
# __init__ only sets empty state and templates are fetched lazily on first use.
template_service = TemplateService()