""" Invoice2Data Service Wrapper around invoice2data library for template-based invoice extraction """ import logging import re from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any import yaml logger = logging.getLogger(__name__) class Invoice2DataService: """Service for extracting invoice data using invoice2data templates""" def __init__(self): self.template_dir = Path(__file__).parent.parent.parent / "data" / "invoice_templates" self.templates = self._load_templates() logger.info(f"📋 Loaded {len(self.templates)} invoice2data templates") def _load_templates(self) -> Dict[str, Dict]: """Load all YAML templates from template directory""" templates = {} if not self.template_dir.exists(): logger.warning(f"Template directory not found: {self.template_dir}") return templates for template_file in self.template_dir.glob("*.yml"): try: with open(template_file, 'r', encoding='utf-8') as f: template_data = yaml.safe_load(f) template_name = template_file.stem templates[template_name] = template_data logger.debug(f" ✓ Loaded template: {template_name}") except Exception as e: logger.error(f" ✗ Failed to load template {template_file}: {e}") return templates def match_template(self, text: str) -> Optional[str]: """ Find matching template based on keywords Returns template name or None """ text_lower = text.lower() for template_name, template_data in self.templates.items(): keywords = template_data.get('keywords', []) # Check if all keywords are present matches = sum(1 for keyword in keywords if str(keyword).lower() in text_lower) if matches >= len(keywords) * 0.7: # 70% of keywords must match logger.info(f"✅ Matched template: {template_name} ({matches}/{len(keywords)} keywords)") return template_name logger.warning("⚠️ No template matched") return None def extract_with_template(self, text: str, template_name: str) -> Dict[str, Any]: """ Extract invoice data using specific template """ if template_name not in self.templates: raise ValueError(f"Template not found: {template_name}") template = self.templates[template_name] fields = template.get('fields', {}) options = template.get('options', {}) extracted = { 'template': template_name, 'issuer': template.get('issuer'), 'country': template.get('country'), 'currency': options.get('currency', 'DKK') } # Extract each field using its regex for field_name, field_config in fields.items(): if field_config.get('parser') != 'regex': continue pattern = field_config.get('regex') field_type = field_config.get('type', 'string') group = field_config.get('group', 1) try: match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE) if match: value = match.group(group).strip() logger.debug(f" 🔍 Extracted raw value for {field_name}: '{value}' (type: {field_type})") # Handle CVR filtering (avoid customer CVR) if field_name == 'vendor_vat': # Find ALL CVR numbers all_cvr_matches = re.finditer(r'SE/CVR-nr\.\s+(\d{8})', text, re.IGNORECASE) cvr_numbers = [m.group(1) for m in all_cvr_matches] # Filter out BMC's CVR (29522790) vendor_cvrs = [cvr for cvr in cvr_numbers if cvr != '29522790'] if vendor_cvrs: value = vendor_cvrs[0] logger.debug(f" ✓ {field_name}: {value} (filtered from {cvr_numbers})") else: logger.warning(f" ⚠️ Only customer CVR found, no vendor CVR") continue # Convert type if field_type == 'float': # Handle Danish number format (1.234,56 → 1234.56) # OR (148,587.98 → 148587.98) - handle both formats decimal_sep = options.get('decimal_separator', ',') thousands_sep = options.get('thousands_separator', '.') # Remove all 
                        # Remove all spaces first
                        value = value.replace(' ', '')

                        # If both separators are present, we can determine the format
                        # Danish:  148.587,98 (thousands='.', decimal=',')
                        # English: 148,587.98 (thousands=',', decimal='.')
                        if thousands_sep in value and decimal_sep in value:
                            # Remove thousands separator, then convert decimal separator to '.'
                            value = value.replace(thousands_sep, '').replace(decimal_sep, '.')
                        elif thousands_sep in value:
                            # Only thousands separator present - just remove it
                            value = value.replace(thousands_sep, '')
                        elif decimal_sep in value and decimal_sep == ',':
                            # Only decimal separator and it's the Danish comma - convert to '.'
                            value = value.replace(',', '.')

                        value = float(value)

                    elif field_type == 'int':
                        value = int(value)

                    elif field_type == 'date':
                        # Try to parse Danish dates
                        date_formats = options.get('date_formats', ['%B %d, %Y', '%d-%m-%Y'])

                        # Danish month names
                        value = value.replace('januar', 'January').replace('februar', 'February')
                        value = value.replace('marts', 'March').replace('april', 'April')
                        value = value.replace('maj', 'May').replace('juni', 'June')
                        value = value.replace('juli', 'July').replace('august', 'August')
                        value = value.replace('september', 'September').replace('oktober', 'October')
                        value = value.replace('november', 'November').replace('december', 'December')

                        for date_format in date_formats:
                            try:
                                parsed_date = datetime.strptime(value, date_format)
                                value = parsed_date.strftime('%Y-%m-%d')
                                break
                            except ValueError:
                                continue

                    extracted[field_name] = value
                    logger.debug(f"  ✓ {field_name}: {value}")
                else:
                    logger.debug(f"  ✗ {field_name}: No match")

            except Exception as e:
                logger.warning(f"  ✗ Failed to extract {field_name}: {e}")

        # Extract line items if defined in the template
        lines_config = template.get('lines', [])
        if lines_config:
            extracted['lines'] = self._extract_lines(text, lines_config, options)

        return extracted

    def _extract_lines(self, text: str, lines_configs: List[Dict], options: Dict) -> List[Dict]:
        """Extract line items from invoice text"""
        all_lines = []

        logger.debug(f"🔍 Extracting lines with {len(lines_configs)} configurations")

        for lines_config in lines_configs:
            start_pattern = lines_config.get('start')
            end_pattern = lines_config.get('end')
            line_config = lines_config.get('line', {})

            if not start_pattern or not line_config:
                continue

            try:
                # Find the section between the start and end patterns
                if end_pattern:
                    section_pattern = f"{start_pattern}(.*?){end_pattern}"
                else:
                    section_pattern = f"{start_pattern}(.*?)$"
                section_match = re.search(section_pattern, text, re.DOTALL | re.IGNORECASE)

                if not section_match:
                    logger.debug(
                        f"  ✗ Line section not found "
                        f"(start: {start_pattern[:50]}, end: {end_pattern[:50] if end_pattern else 'None'})"
                    )
                    continue

                section_text = section_match.group(1)
                logger.debug(f"  ✓ Found line section ({len(section_text)} chars)")

                # Extract individual lines
                line_pattern = line_config.get('regex')
                field_names = line_config.get('fields', [])
                field_types = line_config.get('types', {})
                context_config = line_config.get('context_before', {})

                if not line_pattern or not field_names:
                    continue

                # Split the section into lines for context processing
                section_lines = section_text.split('\n')
                line_matches = []

                # Find all matching lines with their indices
                for line_idx, line_text in enumerate(section_lines):
                    match = re.search(line_pattern, line_text, re.MULTILINE)
                    if match:
                        line_matches.append((line_idx, line_text, match))

                logger.debug(f"  ✓ Found {len(line_matches)} matching lines")

                for line_idx, line_text, match in line_matches:
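                    # For each matched line: regex groups 1..N map positionally onto the
                    # template's `fields` list, and optional `context_before` patterns are
                    # matched against up to `max_lines` preceding lines (nearest first) to
                    # pick up extra fields, e.g. a description printed above the amounts.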
                    line_data = {}

                    # Extract main line fields
                    for idx, field_name in enumerate(field_names, start=1):
                        try:
                            value = match.group(idx).strip()
                            field_type = field_types.get(field_name, 'string')

                            # Convert type
                            if field_type == 'float':
                                # Same Danish-format defaults as in extract_with_template
                                thousands_sep = options.get('thousands_separator', '.')
                                decimal_sep = options.get('decimal_separator', ',')

                                value = value.replace(' ', '')
                                if thousands_sep in value and decimal_sep in value:
                                    value = value.replace(thousands_sep, '').replace(decimal_sep, '.')
                                elif thousands_sep in value:
                                    value = value.replace(thousands_sep, '')
                                elif decimal_sep in value and decimal_sep == ',':
                                    value = value.replace(',', '.')
                                value = float(value)
                            elif field_type == 'int':
                                value = int(value)

                            line_data[field_name] = value
                        except Exception as e:
                            logger.debug(f"  ✗ Failed to extract line field {field_name}: {e}")

                    # Extract context_before if configured
                    if context_config and line_idx > 0:
                        max_lines = context_config.get('max_lines', 5)
                        patterns = context_config.get('patterns', [])

                        # Look at lines BEFORE this line
                        start_idx = max(0, line_idx - max_lines)
                        context_lines = section_lines[start_idx:line_idx]

                        for pattern_config in patterns:
                            pattern_regex = pattern_config.get('regex')
                            pattern_fields = pattern_config.get('fields', [])

                            if not pattern_regex or not pattern_fields:
                                continue

                            # Try to match against context lines (most recent first)
                            for ctx_line in reversed(context_lines):
                                ctx_match = re.search(pattern_regex, ctx_line)
                                if ctx_match:
                                    # Extract fields from context
                                    for ctx_idx, ctx_field_name in enumerate(pattern_fields, start=1):
                                        try:
                                            ctx_value = ctx_match.group(ctx_idx).strip()
                                            line_data[ctx_field_name] = ctx_value
                                        except Exception as e:
                                            logger.debug(f"  ✗ Failed to extract context field {ctx_field_name}: {e}")
                                    break  # Stop after first match for this pattern

                    if line_data:
                        all_lines.append(line_data)

                logger.info(f"  ✓ Extracted {len(all_lines)} line items")

            except Exception as e:
                logger.warning(f"  ✗ Failed to extract lines: {e}")

        return all_lines

    def extract(self, text: str, template_name: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Extract invoice data from text.

        If template_name is None, the template is auto-detected.
        """
        try:
            # Auto-detect the template if none is specified
            if template_name is None:
                template_name = self.match_template(text)
                if template_name is None:
                    return None

            # Extract with the template
            result = self.extract_with_template(text, template_name)
            logger.info(f"✅ Extracted {len(result)} fields using template: {template_name}")
            return result

        except Exception as e:
            logger.error(f"❌ Extraction failed: {e}")
            return None

    def get_template_list(self) -> List[Dict[str, str]]:
        """Get a list of available templates"""
        return [
            {
                'name': name,
                'issuer': template.get('issuer'),
                'country': template.get('country')
            }
            for name, template in self.templates.items()
        ]


# Singleton instance
_invoice2data_service = None


def get_invoice2data_service() -> Invoice2DataService:
    """Get the singleton instance of the Invoice2Data service"""
    global _invoice2data_service
    if _invoice2data_service is None:
        _invoice2data_service = Invoice2DataService()
    return _invoice2data_service
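

# Minimal usage sketch (illustrative only): run extraction on previously OCR'd invoice
# text with template auto-detection. The input path below is an assumption made for this
# example, not a path used elsewhere in the project.
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)

    text_path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("sample_invoice.txt")
    invoice_text = text_path.read_text(encoding="utf-8")

    service = get_invoice2data_service()
    result = service.extract(invoice_text)  # auto-detects the template via keywords

    if result is None:
        print("No template matched or extraction failed")
    else:
        for key, val in result.items():
            print(f"{key}: {val}")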