bmc_hub/app/services/invoice2data_service.py
Christian 3a8288f5a1 feat: Implement quick analysis on PDF upload for CVR, document type, and number extraction
- Added `check_invoice_number_exists` method in `EconomicService` to verify invoice numbers in e-conomic journals.
- Introduced `quick_analysis_on_upload` method in `OllamaService` for extracting critical fields from uploaded PDFs, including CVR, document type, and document number.
- Created migration script to add new fields for storing detected CVR, vendor ID, document type, and document number in the `incoming_files` table.
- Developed comprehensive tests for the quick analysis functionality, validating CVR detection, document type identification, and invoice number extraction.
2025-12-09 14:54:33 +01:00

338 lines
16 KiB
Python

"""
Invoice2Data Service
Wrapper around invoice2data library for template-based invoice extraction
"""
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
import yaml
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class Invoice2DataService:
    """Service for extracting invoice data using invoice2data templates.

    Templates are ``*.yml`` files read from ``data/invoice_templates``
    (resolved as ``../../data/invoice_templates`` relative to this module's
    directory). A template is a mapping that may contain ``keywords`` (used
    for matching), ``fields`` (regex-based header fields), ``options``
    (currency / number / date settings) and ``lines`` (line-item sections).
    """

    # Our own (customer) CVR number; it must never be reported as the vendor's.
    OWN_CVR = '29522790'
    # Label format used to scan the whole document for CVR numbers.
    CVR_SCAN_PATTERN = r'SE/CVR-nr\.\s+(\d{8})'
    # Fraction of a template's keywords that must occur in the text.
    KEYWORD_MATCH_RATIO = 0.7
    # Date formats tried when the template's options do not list any.
    DEFAULT_DATE_FORMATS = ['%B %d, %Y', '%d-%m-%Y']
    # Danish month names mapped to the English names that ``%B`` parses
    # (assumes the process runs with the default C/English time locale).
    DANISH_MONTHS = {
        'januar': 'January',
        'februar': 'February',
        'marts': 'March',
        'april': 'April',
        'maj': 'May',
        'juni': 'June',
        'juli': 'July',
        'august': 'August',
        'september': 'September',
        'oktober': 'October',
        'november': 'November',
        'december': 'December',
    }
    # Whole-word, case-insensitive month matcher. The lookarounds reject
    # neighbouring letters so that e.g. "january" is not turned into
    # "Januaryy", while "januar2025" is still translated.
    _DANISH_MONTH_RE = re.compile(
        r'(?<![^\W\d_])(' + '|'.join(DANISH_MONTHS) + r')(?![^\W\d_])',
        re.IGNORECASE,
    )
    # Name of the capture group holding the line-item section text.
    _SECTION_GROUP = 'i2d_section'

    def __init__(self):
        self.template_dir = Path(__file__).parent.parent.parent / "data" / "invoice_templates"
        self.templates = self._load_templates()
        logger.info(f"📋 Loaded {len(self.templates)} invoice2data templates")

    def _load_templates(self) -> Dict[str, Dict]:
        """Load all YAML templates from the template directory.

        Returns:
            Mapping of template name (file stem) to the parsed template.
            Files that fail to parse, or whose top level is not a mapping
            (e.g. an empty file), are logged and skipped.
        """
        templates: Dict[str, Dict] = {}
        if not self.template_dir.exists():
            logger.warning(f"Template directory not found: {self.template_dir}")
            return templates

        # Sorted so that "first matching template wins" in match_template()
        # does not depend on filesystem enumeration order.
        for template_file in sorted(self.template_dir.glob("*.yml")):
            try:
                with open(template_file, 'r', encoding='utf-8') as f:
                    template_data = yaml.safe_load(f)
            except Exception as e:
                logger.error(f" ✗ Failed to load template {template_file}: {e}")
                continue

            if not isinstance(template_data, dict):
                logger.error(
                    f" ✗ Failed to load template {template_file}: "
                    f"expected a mapping, got {type(template_data).__name__}"
                )
                continue

            template_name = template_file.stem
            templates[template_name] = template_data
            logger.debug(f" ✓ Loaded template: {template_name}")
        return templates

    def match_template(self, text: str) -> Optional[str]:
        """Find the first template whose keywords occur in ``text``.

        A template matches when at least ``KEYWORD_MATCH_RATIO`` (70%) of its
        keywords are found, case-insensitively, as substrings of ``text``.
        Templates without keywords never match.

        Returns:
            The template name, or ``None`` when nothing matched.
        """
        text_lower = text.lower()
        for template_name, template_data in self.templates.items():
            keywords = template_data.get('keywords') or []
            if isinstance(keywords, str):
                keywords = [keywords]
            if not keywords:
                # Without this guard 0 >= 0 * ratio would match every document.
                continue

            matches = sum(1 for keyword in keywords if str(keyword).lower() in text_lower)
            if matches >= len(keywords) * self.KEYWORD_MATCH_RATIO:
                logger.info(f"✅ Matched template: {template_name} ({matches}/{len(keywords)} keywords)")
                return template_name

        logger.warning("⚠️ No template matched")
        return None

    @staticmethod
    def _parse_number(raw: str, decimal_sep: str = ',', thousands_sep: str = '.') -> float:
        """Convert a Danish- or English-formatted number string to ``float``.

        The format is detected from the value where it is unambiguous:
        * both ``,`` and ``.`` present: the one occurring last is the decimal mark;
        * one separator repeated (``1.234.567``): grouping;
        * one separator not followed by exactly three characters (``12,50``):
          decimal mark, because groups always have three digits.
        Only the ambiguous ``1.234`` / ``1,234`` shape falls back to the
        configured separators.

        Raises:
            ValueError: if the cleaned string is not a valid float.
        """
        cleaned = raw.replace(' ', '').replace('\u00a0', '')
        # A grouping character other than ',' or '.' is never ambiguous.
        if thousands_sep and thousands_sep not in (',', '.'):
            cleaned = cleaned.replace(thousands_sep, '')

        last_comma = cleaned.rfind(',')
        last_dot = cleaned.rfind('.')
        decimal_char = None  # None means "no decimal mark, only grouping"
        if last_comma >= 0 and last_dot >= 0:
            decimal_char = ',' if last_comma > last_dot else '.'
        elif last_comma >= 0 or last_dot >= 0:
            sep = ',' if last_comma >= 0 else '.'
            trailing = len(cleaned) - cleaned.rfind(sep) - 1
            if cleaned.count(sep) > 1:
                decimal_char = None
            elif trailing != 3:
                decimal_char = sep
            elif sep != thousands_sep:
                decimal_char = sep

        if decimal_char is None:
            cleaned = cleaned.replace(',', '').replace('.', '')
        else:
            grouping_char = '.' if decimal_char == ',' else ','
            cleaned = cleaned.replace(grouping_char, '').replace(decimal_char, '.')
        return float(cleaned)

    def _parse_date(self, raw: str, date_formats: List[str]) -> str:
        """Parse a date string, translating Danish month names first.

        Returns:
            The date as ``YYYY-MM-DD`` for the first format that parses.
            If no format parses, the month-translated text is returned
            unchanged and a warning is logged.
        """
        translated = self._DANISH_MONTH_RE.sub(
            lambda m: self.DANISH_MONTHS[m.group(0).lower()], raw
        )
        for date_format in date_formats:
            try:
                return datetime.strptime(translated, date_format).strftime('%Y-%m-%d')
            except ValueError:
                continue
        logger.warning(f" ⚠️ Could not parse date '{raw}' with formats {date_formats}")
        return translated

    def _pick_vendor_cvr(self, text: str, matched_value: str) -> Optional[str]:
        """Choose the vendor CVR, never returning our own CVR.

        Scans ``text`` for every ``SE/CVR-nr.`` number and returns the first
        one that is not ``OWN_CVR``. When the scan finds no numbers at all
        (the document uses another label), the template's own match is used
        as long as it is not ``OWN_CVR``.

        Returns:
            The vendor CVR, or ``None`` when only our own CVR was found.
        """
        cvr_numbers = [m.group(1) for m in re.finditer(self.CVR_SCAN_PATTERN, text, re.IGNORECASE)]
        vendor_cvrs = [cvr for cvr in cvr_numbers if cvr != self.OWN_CVR]
        if vendor_cvrs:
            logger.debug(f"vendor_vat: {vendor_cvrs[0]} (filtered from {cvr_numbers})")
            return vendor_cvrs[0]
        if not cvr_numbers and matched_value != self.OWN_CVR:
            return matched_value
        return None

    def extract_with_template(self, text: str, template_name: str) -> Dict[str, Any]:
        """Extract invoice data using a specific template.

        Only fields whose ``parser`` is ``regex`` are processed. Each is
        searched with ``re.IGNORECASE | re.MULTILINE`` and converted
        according to its ``type`` (``float``, ``int``, ``date`` or string).
        A field that fails to match or convert is logged and left out.

        Returns:
            Dict with ``template``, ``issuer``, ``country``, ``currency``,
            one entry per extracted field, and ``lines`` when the template
            defines line-item sections.

        Raises:
            ValueError: if ``template_name`` is not a loaded template.
        """
        if template_name not in self.templates:
            raise ValueError(f"Template not found: {template_name}")

        template = self.templates[template_name]
        # ``or {}`` also covers keys present in YAML with an empty value.
        fields = template.get('fields') or {}
        options = template.get('options') or {}
        extracted = {
            'template': template_name,
            'issuer': template.get('issuer'),
            'country': template.get('country'),
            'currency': options.get('currency', 'DKK')
        }

        for field_name, field_config in fields.items():
            if not isinstance(field_config, dict) or field_config.get('parser') != 'regex':
                continue
            pattern = field_config.get('regex')
            field_type = field_config.get('type', 'string')
            group = field_config.get('group', 1)
            try:
                match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE)
                if not match:
                    logger.debug(f"{field_name}: No match")
                    continue

                value = match.group(group).strip()
                logger.debug(f" 🔍 Extracted raw value for {field_name}: '{value}' (type: {field_type})")

                # The first CVR in a document may be our own, not the vendor's.
                if field_name == 'vendor_vat':
                    value = self._pick_vendor_cvr(text, value)
                    if value is None:
                        logger.warning(" ⚠️ Only customer CVR found, no vendor CVR")
                        continue

                if field_type == 'float':
                    # Header fields default to Danish separators (1.234,56).
                    value = self._parse_number(
                        value,
                        options.get('decimal_separator', ','),
                        options.get('thousands_separator', '.'),
                    )
                elif field_type == 'int':
                    value = int(value)
                elif field_type == 'date':
                    value = self._parse_date(
                        value, options.get('date_formats', self.DEFAULT_DATE_FORMATS)
                    )

                extracted[field_name] = value
                logger.debug(f"{field_name}: {value}")
            except Exception as e:
                logger.warning(f" ✗ Failed to extract {field_name}: {e}")

        lines_config = template.get('lines') or []
        if lines_config:
            extracted['lines'] = self._extract_lines(text, lines_config, options)
        return extracted

    def _find_line_section(self, text: str, start_pattern: str, end_pattern: Optional[str]) -> Optional[str]:
        """Return the text between ``start_pattern`` and ``end_pattern``.

        Without an end pattern the section runs to the end of ``text``.
        The section is captured in a named group so that capture groups
        inside the template's own patterns cannot shift which group is read.
        """
        tail = end_pattern if end_pattern else '$'
        section_pattern = f"{start_pattern}(?P<{self._SECTION_GROUP}>.*?){tail}"
        section_match = re.search(section_pattern, text, re.DOTALL | re.IGNORECASE)
        if not section_match:
            return None
        return section_match.group(self._SECTION_GROUP)

    def _read_line_fields(self, match, field_names: List[str], field_types: Dict, options: Dict) -> Dict[str, Any]:
        """Map the capture groups of one line match onto ``field_names``.

        Group ``n`` (1-based) feeds the ``n``-th field name. A field that
        cannot be read or converted is logged and left out.
        """
        line_data: Dict[str, Any] = {}
        for idx, field_name in enumerate(field_names, start=1):
            try:
                value = match.group(idx).strip()
                field_type = field_types.get(field_name, 'string')
                if field_type == 'float':
                    # NOTE(review): line items historically default to English
                    # separators while header fields default to Danish ones.
                    # Kept for backward compatibility; it only matters for
                    # the ambiguous "1.234" / "1,234" shape. Confirm intent.
                    value = self._parse_number(
                        value,
                        options.get('decimal_separator', '.'),
                        options.get('thousands_separator', ','),
                    )
                elif field_type == 'int':
                    value = int(value)
                line_data[field_name] = value
            except Exception as e:
                logger.debug(f" ✗ Failed to extract line field {field_name}: {e}")
        return line_data

    def _read_context_fields(self, section_lines: List[str], line_idx: int, context_config: Dict) -> Dict[str, str]:
        """Collect fields from the lines preceding a matched line.

        Looks at up to ``max_lines`` (default 5) lines before ``line_idx``.
        For each configured pattern the nearest preceding line that matches
        supplies the values; the search for that pattern then stops.
        """
        max_lines = context_config.get('max_lines', 5)
        patterns = context_config.get('patterns') or []
        start_idx = max(0, line_idx - max_lines)
        context_lines = section_lines[start_idx:line_idx]

        found: Dict[str, str] = {}
        for pattern_config in patterns:
            pattern_regex = pattern_config.get('regex')
            pattern_fields = pattern_config.get('fields') or []
            if not pattern_regex or not pattern_fields:
                continue
            # Most recent context line first.
            for ctx_line in reversed(context_lines):
                ctx_match = re.search(pattern_regex, ctx_line)
                if not ctx_match:
                    continue
                for ctx_idx, ctx_field_name in enumerate(pattern_fields, start=1):
                    try:
                        found[ctx_field_name] = ctx_match.group(ctx_idx).strip()
                    except Exception as e:
                        logger.debug(f" ✗ Failed to extract context field {ctx_field_name}: {e}")
                break  # Stop after first match for this pattern
        return found

    def _extract_lines(self, text: str, lines_configs: List[Dict], options: Dict) -> List[Dict]:
        """Extract line items from invoice text.

        Each configuration names a ``start`` (and optional ``end``) pattern
        delimiting a section, plus a ``line`` mapping with ``regex``,
        ``fields``, optional ``types`` and optional ``context_before``.
        A single mapping is accepted in place of a list of configurations.

        Returns:
            One dict per matched line across all configurations. A
            configuration that raises is logged and skipped.
        """
        all_lines: List[Dict] = []
        if isinstance(lines_configs, dict):
            lines_configs = [lines_configs]
        logger.debug(f"🔍 Extracting lines with {len(lines_configs)} configurations")

        for lines_config in lines_configs:
            start_pattern = lines_config.get('start')
            end_pattern = lines_config.get('end')
            line_config = lines_config.get('line') or {}
            if not start_pattern or not line_config:
                continue
            try:
                section_text = self._find_line_section(text, start_pattern, end_pattern)
                if section_text is None:
                    logger.debug(f" ✗ Line section not found (start: {start_pattern[:50]}, end: {end_pattern[:50] if end_pattern else 'None'})")
                    continue
                logger.debug(f" ✓ Found line section ({len(section_text)} chars)")

                line_pattern = line_config.get('regex')
                field_names = line_config.get('fields') or []
                field_types = line_config.get('types') or {}
                context_config = line_config.get('context_before') or {}
                if not line_pattern or not field_names:
                    continue

                # Keep line indices so context lookups can walk backwards.
                section_lines = section_text.split('\n')
                line_matches = []
                for line_idx, line_text in enumerate(section_lines):
                    match = re.search(line_pattern, line_text, re.MULTILINE)
                    if match:
                        line_matches.append((line_idx, match))
                logger.debug(f" ✓ Found {len(line_matches)} matching lines")

                for line_idx, match in line_matches:
                    line_data = self._read_line_fields(match, field_names, field_types, options)
                    if context_config and line_idx > 0:
                        line_data.update(
                            self._read_context_fields(section_lines, line_idx, context_config)
                        )
                    if line_data:
                        all_lines.append(line_data)
                logger.info(f" ✓ Extracted {len(all_lines)} line items")
            except Exception as e:
                logger.warning(f" ✗ Failed to extract lines: {e}")
        return all_lines

    def extract(self, text: str, template_name: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Extract invoice data from text.

        If ``template_name`` is ``None`` the template is auto-detected with
        :meth:`match_template`.

        Returns:
            The extracted dict, or ``None`` when no template matched or the
            extraction raised (the error is logged, not propagated).
        """
        try:
            if template_name is None:
                template_name = self.match_template(text)
                if template_name is None:
                    return None

            result = self.extract_with_template(text, template_name)
            logger.info(f"✅ Extracted {len(result)} fields using template: {template_name}")
            return result
        except Exception as e:
            logger.error(f"❌ Extraction failed: {e}")
            return None

    def get_template_list(self) -> List[Dict[str, str]]:
        """Get the name, issuer and country of every loaded template."""
        return [
            {
                'name': name,
                'issuer': template.get('issuer'),
                'country': template.get('country')
            }
            for name, template in self.templates.items()
        ]
# Module-level holder for the shared service; stays None until first requested
_invoice2data_service = None


def get_invoice2data_service() -> Invoice2DataService:
    """Return the shared Invoice2DataService, building it on the first call."""
    global _invoice2data_service
    if _invoice2data_service is not None:
        return _invoice2data_service
    _invoice2data_service = Invoice2DataService()
    return _invoice2data_service