"""
Supplier Invoice Template Service
Simple template-based invoice field extraction (no AI)
Inspired by OmniSync's invoice template system
"""

import json
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from app.core.database import execute_query, execute_insert, execute_update

logger = logging.getLogger(__name__)

# Patterns used by the multi-line fallback extractor; compiled once because
# they are applied to every line of the invoice text.
# "<position> <item number (6+ digits)> <description>"
_ITEM_LINE_RE = re.compile(r'^(\d{1,3})\s+(\d{6,})\s+(.+)')
# Column-header words; a candidate item line containing one is skipped.
_HEADER_WORDS_RE = re.compile(r'(Position|Varenr|Beskrivelse|Antal|Pris|Total)', re.IGNORECASE)
# "<quantity><unit> <unit price> <total price>", e.g. "1ST 3.708,27 3.708,27"
_PRICE_LINE_RE = re.compile(r'^(\d+)\s*(?:ST|stk|pc|pcs)\s+([\d.,]+)\s+([\d.,]+)', re.IGNORECASE)


class TemplateService:
    """Service for template-based invoice extraction"""

    def __init__(self):
        # template_id -> template row (joined with vendor name / CVR)
        self.templates_cache = {}
        # True once the templates have been loaded successfully
        self._initialized = False

    def _ensure_loaded(self):
        """Lazy load templates on first use.

        A failed load leaves the service uninitialized so the next call
        retries instead of working from an empty cache indefinitely.
        """
        if not self._initialized:
            logger.info("🔄 Lazy loading templates...")
            self._initialized = self._load_templates()

    def _load_templates(self) -> bool:
        """Load all active templates into cache.

        Returns:
            True when the query ran (even if it returned no rows), False when
            loading failed. Errors are logged, never raised.
        """
        try:
            templates = execute_query(
                """SELECT t.*, v.name as vendor_name, v.cvr_number as vendor_cvr
                   FROM supplier_invoice_templates t
                   LEFT JOIN vendors v ON t.vendor_id = v.id
                   WHERE t.is_active = TRUE"""
            )

            if templates:
                for template in templates:
                    self.templates_cache[template['template_id']] = template
                logger.info(f"📚 Loaded {len(self.templates_cache)} active templates")
            else:
                logger.warning("⚠️ No templates found")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to load templates: {e}")
            return False

    def match_template(self, pdf_text: str) -> Tuple[Optional[int], float]:
        """
        Find best matching template for PDF text

        Args:
            pdf_text: Text extracted from the invoice PDF. None is treated as
                empty text.

        Returns:
            (template_id, confidence_score); template_id is None when no
            template scored above 0.
        """
        self._ensure_loaded()  # Lazy load templates

        logger.info(f"🔍 Matching against {len(self.templates_cache)} templates")

        best_match = None
        best_score = 0.0
        # Lowercase once; text and CVR detection patterns are compared in lowercase.
        pdf_text_lower = (pdf_text or "").lower()

        for template_id, template in self.templates_cache.items():
            score = self._calculate_match_score(pdf_text_lower, template)
            logger.debug(f" Template {template_id} ({template.get('template_name')}): {score:.2f}")

            if score > best_score:
                best_score = score
                best_match = template_id

        # Compare against None explicitly: a template id of 0 is a valid match.
        if best_match is not None:
            logger.info(f"✅ Matched template {best_match} ({self.templates_cache[best_match].get('template_name')}) with {best_score:.0%} confidence")
        else:
            logger.info(f"⚠️ No template matched (best score: {best_score:.2f})")

        return best_match, best_score

    def _calculate_match_score(self, pdf_text: str, template: Dict) -> float:
        """Calculate match score based on detection patterns.

        Args:
            pdf_text: Invoice text, already lowercased by the caller.
            template: Template row; its 'detection_patterns' entries are dicts
                with 'type' ('text' | 'cvr' | 'regex'), 'pattern' or 'value',
                and an optional 'weight' (default 0.5).

        Returns:
            Sum of the weights of all matching patterns, capped at 1.0.
        """
        patterns = template.get('detection_patterns', [])
        if not patterns:
            return 0.0

        score = 0.0
        for pattern_obj in patterns:
            if not isinstance(pattern_obj, dict):
                continue

            pattern_type = pattern_obj.get('type')
            weight = pattern_obj.get('weight')
            if weight is None:
                weight = 0.5

            if pattern_type == 'text':
                # Simple text search; an empty pattern would match any text.
                pattern = (pattern_obj.get('pattern') or '').lower()
                if pattern and pattern in pdf_text:
                    score += weight

            elif pattern_type == 'cvr':
                # CVR number match (exact). Lowercased because pdf_text is,
                # so values with a letter prefix can still match.
                value = pattern_obj.get('value')
                cvr = '' if value is None else str(value).lower()
                if cvr and cvr in pdf_text:
                    score += weight  # CVR match is strong signal

            elif pattern_type == 'regex':
                # Regex pattern match; a broken pattern must not abort
                # matching for every other template.
                pattern = pattern_obj.get('pattern') or ''
                if not pattern:
                    continue
                try:
                    if re.search(pattern, pdf_text, re.IGNORECASE):
                        score += weight
                except (re.error, TypeError) as e:
                    logger.warning(f"⚠️ Invalid detection regex {pattern!r}: {e}")

        return min(score, 1.0)  # Cap at 100%

    def extract_fields(self, pdf_text: str, template_id: int) -> Dict:
        """Extract invoice fields using template's regex patterns.

        Args:
            pdf_text: Text extracted from the invoice PDF.
            template_id: Key of a cached template.

        Returns:
            {field_name: stripped captured text} for every field mapping whose
            pattern matched; empty dict when the template is unknown.
        """
        self._ensure_loaded()  # Lazy load templates

        template = self.templates_cache.get(template_id)
        if not template:
            logger.warning(f"⚠️ Template {template_id} not found in cache")
            return {}

        field_mappings = template.get('field_mappings') or {}
        extracted = {}

        for field_name, field_config in field_mappings.items():
            if not isinstance(field_config, dict):
                continue

            pattern = field_config.get('pattern')
            group = field_config.get('group', 1)

            if not pattern:
                continue

            try:
                match = re.search(pattern, pdf_text, re.IGNORECASE | re.MULTILINE)
                if match and len(match.groups()) >= group:
                    value = match.group(group)
                    # An optional group that did not participate yields None.
                    if value is None:
                        continue
                    value = value.strip()
                    extracted[field_name] = value
                    logger.debug(f" ✓ {field_name}: {value}")
            except (re.error, TypeError, IndexError) as e:
                logger.warning(f" ✗ Failed to extract {field_name}: {e}")

        return extracted

    def extract_line_items(self, pdf_text: str, template_id: int) -> List[Dict]:
        """Extract invoice line items using template's line patterns.

        The text is first narrowed to the section between the optional
        'lines_start' and 'lines_end' markers, then the 'line_item' pattern is
        applied. If that yields nothing, the multi-line fallback extractor is
        tried.

        Args:
            pdf_text: Text extracted from the invoice PDF.
            template_id: Key of a cached template.

        Returns:
            List of line-item dicts; empty when the template is unknown or has
            no 'line_item' pattern.
        """
        self._ensure_loaded()  # Lazy load templates

        template = self.templates_cache.get(template_id)
        if not template:
            logger.warning(f"⚠️ Template {template_id} not found in cache")
            return []

        field_mappings = template.get('field_mappings') or {}

        # Get line extraction config
        lines_start = (field_mappings.get('lines_start') or {}).get('pattern')
        lines_end = (field_mappings.get('lines_end') or {}).get('pattern')
        line_item_config = field_mappings.get('line_item') or {}
        line_pattern = line_item_config.get('pattern')
        line_fields = line_item_config.get('fields') or []

        if not line_pattern:
            logger.debug("No line_item pattern configured")
            return []

        # Extract section between start and end markers
        text_section = self._slice_line_section(pdf_text, lines_start, lines_end)

        # Try multiple extraction strategies
        lines = self._extract_with_pattern(text_section, line_pattern, line_fields)

        if not lines:
            # Fallback: Try smart extraction for common formats
            lines = self._smart_line_extraction(text_section, line_fields)

        logger.info(f"📦 Extracted {len(lines)} line items")
        return lines

    def _slice_line_section(self, pdf_text: str, lines_start: Optional[str], lines_end: Optional[str]) -> str:
        """Return the part of pdf_text between the start and end marker regexes.

        A missing, non-matching or invalid marker leaves that side of the text
        untouched.
        """
        text_section = pdf_text

        if lines_start:
            try:
                start_match = re.search(lines_start, pdf_text, re.IGNORECASE)
                if start_match:
                    text_section = pdf_text[start_match.end():]
                    logger.debug(f"Found lines_start, section starts at position {start_match.end()}")
            except (re.error, TypeError) as e:
                logger.warning(f"Failed to find lines_start: {e}")

        if lines_end:
            try:
                # Searched in the already-trimmed section, so the reported
                # position is relative to the section start.
                end_match = re.search(lines_end, text_section, re.IGNORECASE)
                if end_match:
                    text_section = text_section[:end_match.start()]
                    logger.debug(f"Found lines_end, section ends at position {end_match.start()}")
            except (re.error, TypeError) as e:
                logger.warning(f"Failed to find lines_end: {e}")

        return text_section

    def _extract_with_pattern(self, text: str, pattern: str, field_names: List[str]) -> List[Dict]:
        """Extract lines using regex pattern.

        Args:
            text: Text section to scan.
            pattern: Regex applied with re.MULTILINE; each match is one line item.
            field_names: Names for capture groups 1..n, in order.

        Returns:
            One dict per match with 'line_number', 'raw_text' and the named
            groups. Groups that did not participate in a match are omitted.
        """
        lines = []

        try:
            for match in re.finditer(pattern, text, re.MULTILINE):
                line_data = {
                    'line_number': len(lines) + 1,
                    'raw_text': match.group(0)
                }

                # Map captured groups to field names
                group_count = len(match.groups())
                for idx, field_name in enumerate(field_names, start=1):
                    if idx > group_count:
                        break
                    value = match.group(idx)
                    # Optional groups yield None; skip them rather than
                    # aborting the extraction of every remaining line.
                    if value is not None:
                        line_data[field_name] = value.strip()

                lines.append(line_data)

        except (re.error, TypeError) as e:
            logger.error(f"❌ Pattern extraction failed: {e}")

        return lines

    @staticmethod
    def _normalize_amount(raw: str) -> str:
        """Convert a captured amount to a plain dot-decimal string.

        "3.708,27" -> "3708.27", "708,27" -> "708.27", "1,234.56" -> "1234.56".
        Values without a comma are returned unchanged.
        """
        last_comma = raw.rfind(',')
        if last_comma == -1:
            return raw
        if last_comma > raw.rfind('.'):
            # Comma is the decimal separator; dots are thousands separators.
            return raw.replace('.', '').replace(',', '.')
        # Dot is the decimal separator; commas are thousands separators.
        return raw.replace(',', '')

    def _smart_line_extraction(self, text: str, field_names: List[str]) -> List[Dict]:
        """
        Multi-line extraction for ALSO-style invoices.

        Format:
            100 48023976 REFURB LENOVO ThinkPad P15 G1 Grde A
            ...metadata lines...
            1ST 3.708,27 3.708,27

        Combines data from description line + price line.

        Args:
            text: Text section containing the line items.
            field_names: Unused; accepted for signature parity with
                _extract_with_pattern.

        Returns:
            One dict per item that has both a description line and a price
            line. Prices are normalized to dot-decimal strings.
        """
        lines_arr = text.split('\n')
        items = []
        i = 0

        while i < len(lines_arr):
            line = lines_arr[i].strip()

            # Find the position + item number + description line
            # Match: "100 48023976 REFURB LENOVO ThinkPad P15 G1 Grde A"
            item_match = _ITEM_LINE_RE.match(line)

            if item_match:
                position = item_match.group(1)
                item_number = item_match.group(2)
                description = item_match.group(3).strip()

                # Skip if it is a header
                if _HEADER_WORDS_RE.search(line):
                    i += 1
                    continue

                # Find the next line with quantity + prices (look-ahead window)
                quantity = None
                unit_price = None
                total_price = None
                vat_note = None  # For "Omvendt betalingspligt" etc.

                for j in range(i + 1, min(i + 10, len(lines_arr))):
                    price_line = lines_arr[j].strip()
                    # Match: "1ST 3.708,27 3.708,27"
                    price_match = _PRICE_LINE_RE.match(price_line)
                    if price_match:
                        quantity = price_match.group(1)
                        unit_price = self._normalize_amount(price_match.group(2))
                        total_price = self._normalize_amount(price_match.group(3))

                        # Check next 3 lines for VAT markers
                        for k in range(j + 1, min(j + 4, len(lines_arr))):
                            vat_line = lines_arr[k].strip().lower()
                            if 'omvendt' in vat_line and 'betalingspligt' in vat_line:
                                vat_note = "reverse_charge"
                                logger.debug(f"⚠️ Found reverse charge marker for item {item_number}")
                            elif 'copydan' in vat_line:
                                vat_note = "copydan_included"
                        # The first price line belongs to this item; stop looking.
                        break

                # Only add the item if we found prices
                if quantity and unit_price:
                    item_data = {
                        'line_number': len(items) + 1,
                        'position': position,
                        'item_number': item_number,
                        'description': description,
                        'quantity': quantity,
                        'unit_price': unit_price,
                        'total_price': total_price,
                        'raw_text': f"{line} ... {quantity}ST {unit_price} {total_price}"
                    }

                    # Add VAT note if found
                    if vat_note:
                        item_data['vat_note'] = vat_note

                    items.append(item_data)
                    logger.info(f"✅ Multi-line item: {item_number} - {description[:30]}... ({quantity}ST @ {unit_price}){' [REVERSE CHARGE]' if vat_note == 'reverse_charge' else ''}")

            i += 1

        if items:
            logger.info(f"📦 Multi-line extraction found {len(items)} items")
        else:
            logger.warning("⚠️ Multi-line extraction found no items")

        return items

    def log_usage(self, template_id: int, file_id: int, matched: bool,
                  confidence: float, fields: Dict):
        """Log template usage for statistics.

        Inserts one row into template_usage_log and, when matched is true,
        increments the template's usage/success counters. Failures are logged
        and swallowed.
        """
        try:
            execute_insert(
                """INSERT INTO template_usage_log
                   (template_id, file_id, matched, confidence, fields_extracted)
                   VALUES (%s, %s, %s, %s, %s)""",
                (template_id, file_id, matched, confidence, json.dumps(fields))
            )

            if matched:
                # Update template stats
                execute_update(
                    """UPDATE supplier_invoice_templates
                       SET usage_count = usage_count + 1,
                           success_count = success_count + 1,
                           last_used_at = CURRENT_TIMESTAMP
                       WHERE template_id = %s""",
                    (template_id,)
                )
        except Exception as e:
            logger.error(f"❌ Failed to log template usage: {e}")

    def get_vendor_templates(self, vendor_id: int) -> List[Dict]:
        """Get all active templates for a vendor, most used first.

        Returns an empty list when the query yields nothing.
        """
        return execute_query(
            """SELECT * FROM supplier_invoice_templates
               WHERE vendor_id = %s AND is_active = TRUE
               ORDER BY usage_count DESC""",
            (vendor_id,),
            fetchall=True
        ) or []

    def reload_templates(self):
        """Reload templates from database"""
        self.templates_cache = {}
        self._initialized = False
        self._ensure_loaded()


# Global instance
template_service = TemplateService()