bmc_hub/app/services/economic_service.py
Christian b764224eff fix: Use /invoices/drafts endpoint instead of /sent
v1.3.142:
- Changed from /invoices/sent to /invoices/drafts
- /drafts contains most active invoices (29 vs 0 in /sent)
- Still applies pagination and 13-month filter
2026-01-27 07:20:01 +01:00

1063 lines
48 KiB
Python

"""
e-conomic Integration Service
Send invoices and supplier invoices (kassekladde) to e-conomic accounting system
🚨 SAFETY MODES:
- ECONOMIC_READ_ONLY: Blocks ALL write operations when True
- ECONOMIC_DRY_RUN: Logs operations but doesn't send to e-conomic when True
"""
import logging
import aiohttp
import json
from datetime import datetime, timedelta
from typing import Dict, Optional, List
from app.core.config import settings
logger = logging.getLogger(__name__)
class EconomicService:
"""Service for integrating with e-conomic REST API"""
def __init__(self):
self.api_url = getattr(settings, 'ECONOMIC_API_URL', 'https://restapi.e-conomic.com')
self.app_secret_token = getattr(settings, 'ECONOMIC_APP_SECRET_TOKEN', None)
self.agreement_grant_token = getattr(settings, 'ECONOMIC_AGREEMENT_GRANT_TOKEN', None)
self.read_only = getattr(settings, 'ECONOMIC_READ_ONLY', True)
self.dry_run = getattr(settings, 'ECONOMIC_DRY_RUN', True)
if not self.app_secret_token or not self.agreement_grant_token:
logger.warning("⚠️ e-conomic credentials not configured")
# Log safety status at initialization
if self.read_only:
logger.warning("🔒 e-conomic READ-ONLY MODE ENABLED - All write operations will be blocked")
elif self.dry_run:
logger.warning("🏃 e-conomic DRY-RUN MODE ENABLED - Operations will be logged but not executed")
else:
logger.warning("⚠️ e-conomic WRITE MODE ACTIVE - Changes will be sent to production!")
def _check_write_permission(self, operation: str) -> bool:
"""
Check if write operations are allowed
Args:
operation: Name of the operation being attempted
Returns:
True if operation should proceed, False if blocked
"""
if self.read_only:
logger.error(f"🚫 BLOCKED: {operation} - READ_ONLY mode is enabled")
logger.error("To enable writes, set ECONOMIC_READ_ONLY=false in .env")
return False
if self.dry_run:
logger.warning(f"🏃 DRY-RUN: {operation} - Would execute but DRY_RUN mode is enabled")
logger.warning("To actually send to e-conomic, set ECONOMIC_DRY_RUN=false in .env")
return False
# Triple-check for production writes
logger.warning(f"⚠️ EXECUTING WRITE OPERATION: {operation}")
logger.warning(f"⚠️ This will modify production e-conomic at {self.api_url}")
return True
def _log_api_call(self, method: str, endpoint: str, payload: Optional[Dict] = None,
response_data: Optional[Dict] = None, status_code: Optional[int] = None):
"""
Comprehensive logging of all API calls
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint
payload: Request payload
response_data: Response data
status_code: HTTP status code
"""
log_entry = {
"method": method,
"endpoint": endpoint,
"api_url": self.api_url,
"read_only": self.read_only,
"dry_run": self.dry_run
}
if payload:
log_entry["request_payload"] = payload
if response_data:
log_entry["response_data"] = response_data
if status_code:
log_entry["status_code"] = status_code
logger.info(f"📊 e-conomic API Call: {json.dumps(log_entry, indent=2, default=str)}")
def _get_headers(self) -> Dict[str, str]:
"""Get HTTP headers for e-conomic API"""
if not self.app_secret_token or not self.agreement_grant_token:
raise ValueError("e-conomic credentials not configured")
return {
'X-AppSecretToken': self.app_secret_token,
'X-AgreementGrantToken': self.agreement_grant_token,
'Content-Type': 'application/json'
}
async def test_connection(self) -> bool:
"""
Test e-conomic API connection
Returns:
True if connection successful
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.api_url}/self",
headers=self._get_headers()
) as response:
if response.status == 200:
data = await response.json()
logger.info(f"✅ Connected to e-conomic: {data.get('agreementNumber')}")
return True
else:
error = await response.text()
logger.error(f"❌ e-conomic connection failed: {response.status} - {error}")
return False
except Exception as e:
logger.error(f"❌ e-conomic connection error: {e}")
return False
# ========== CUSTOMER MANAGEMENT ==========
async def get_customers(self, page: int = 0, page_size: int = 1000) -> List[Dict]:
"""
Get customers from e-conomic
Args:
page: Page number (0-indexed)
page_size: Number of records per page
Returns:
List of customer records with customerNumber, corporateIdentificationNumber, name
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.api_url}/customers",
params={"skippages": page, "pagesize": page_size},
headers=self._get_headers()
) as response:
if response.status == 200:
data = await response.json()
customers = data.get('collection', [])
logger.info(f"📥 Fetched {len(customers)} customers from e-conomic (page {page})")
return customers
else:
error = await response.text()
logger.error(f"❌ Failed to fetch customers: {response.status} - {error}")
return []
except Exception as e:
logger.error(f"❌ Error fetching customers from e-conomic: {e}")
return []
async def search_customer_by_cvr(self, cvr: str) -> Optional[Dict]:
"""
Search for customer by CVR number
Args:
cvr: CVR number (8 digits)
Returns:
Customer record if found, None otherwise
"""
try:
# Clean CVR
import re
cvr_clean = re.sub(r'\D', '', str(cvr))[:8]
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.api_url}/customers",
params={"filter": f"corporateIdentificationNumber$eq:{cvr_clean}"},
headers=self._get_headers()
) as response:
if response.status == 200:
data = await response.json()
customers = data.get('collection', [])
if customers:
logger.info(f"✅ Found customer with CVR {cvr_clean}: {customers[0].get('name')}")
return customers[0]
else:
logger.debug(f"❌ No customer found with CVR {cvr_clean}")
return None
else:
error = await response.text()
logger.error(f"❌ CVR search failed: {response.status} - {error}")
return None
except Exception as e:
logger.error(f"❌ Error searching customer by CVR: {e}")
return None
async def search_customer_by_name(self, name: str) -> List[Dict]:
"""
Search for customers by name (partial match)
Args:
name: Customer name to search for
Returns:
List of matching customer records
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.api_url}/customers",
params={"filter": f"name$like:*{name}*"},
headers=self._get_headers()
) as response:
if response.status == 200:
data = await response.json()
customers = data.get('collection', [])
logger.info(f"✅ Found {len(customers)} customers matching '{name}'")
return customers
else:
error = await response.text()
logger.error(f"❌ Name search failed: {response.status} - {error}")
return []
except Exception as e:
logger.error(f"❌ Error searching customer by name: {e}")
return []
async def get_customer(self, customer_number: int) -> Optional[Dict]:
"""
Get a single customer by customer number
Args:
customer_number: e-conomic customer number
Returns:
Customer record or None
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.api_url}/customers/{customer_number}",
headers=self._get_headers()
) as response:
if response.status == 200:
customer = await response.json()
logger.info(f"✅ Found e-conomic customer {customer_number}")
return customer
elif response.status == 404:
logger.warning(f"⚠️ Customer {customer_number} not found in e-conomic")
return None
else:
error = await response.text()
logger.error(f"❌ Failed to get customer {customer_number}: {response.status} - {error}")
return None
except Exception as e:
logger.error(f"❌ Error getting customer {customer_number}: {e}")
return None
async def update_customer(self, customer_number: int, update_data: Dict) -> bool:
"""
Update a customer in e-conomic
Args:
customer_number: e-conomic customer number
update_data: Dictionary of fields to update
Returns:
True if successful, False otherwise
"""
if settings.ECONOMIC_READ_ONLY or settings.ECONOMIC_DRY_RUN:
logger.warning(f"⚠️ e-conomic update blocked by safety flags (READ_ONLY={settings.ECONOMIC_READ_ONLY}, DRY_RUN={settings.ECONOMIC_DRY_RUN})")
logger.info(f"Would update customer {customer_number} with: {update_data}")
return False
try:
async with aiohttp.ClientSession() as session:
async with session.put(
f"{self.api_url}/customers/{customer_number}",
json=update_data,
headers=self._get_headers()
) as response:
if response.status == 200:
logger.info(f"✅ Updated e-conomic customer {customer_number}")
return True
else:
error = await response.text()
logger.error(f"❌ Failed to update customer {customer_number}: {response.status} - {error}")
return False
except Exception as e:
logger.error(f"❌ Error updating e-conomic customer {customer_number}: {e}")
return False
# ========== SUPPLIER/VENDOR MANAGEMENT ==========
async def search_supplier_by_name(self, supplier_name: str) -> Optional[Dict]:
"""
Search for supplier in e-conomic based on name
Args:
supplier_name: Name of supplier to search for
Returns:
Supplier data if found, None otherwise
"""
try:
url = f"{self.api_url}/suppliers"
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self._get_headers()) as response:
if response.status != 200:
logger.error(f"❌ Failed to fetch suppliers: {response.status}")
return None
data = await response.json()
suppliers = data.get('collection', [])
# Search for supplier by name (case-insensitive)
search_name = supplier_name.lower().strip()
for supplier in suppliers:
supplier_display_name = supplier.get('name', '').lower().strip()
# Exact match or contains
if search_name in supplier_display_name or supplier_display_name in search_name:
logger.info(f"✅ Found supplier match: {supplier.get('name')} (ID: {supplier.get('supplierNumber')})")
return {
'supplierNumber': supplier.get('supplierNumber'),
'name': supplier.get('name'),
'currency': supplier.get('currency'),
'vatZone': supplier.get('vatZone')
}
logger.warning(f"⚠️ No supplier found matching '{supplier_name}'")
return None
except Exception as e:
logger.error(f"❌ Error searching supplier: {e}")
return None
async def create_supplier(self, supplier_data: Dict) -> Optional[Dict]:
"""
Create new supplier in e-conomic
🚨 WRITE OPERATION - Respects READ_ONLY and DRY_RUN modes
Args:
supplier_data: {
'name': str,
'address': str (optional),
'city': str (optional),
'zip': str (optional),
'country': str (optional),
'corporate_identification_number': str (optional - CVR),
'currency': str (default 'DKK'),
'payment_terms_number': int (default 1),
'vat_zone_number': int (default 1)
}
Returns:
Created supplier data with supplierNumber or None if failed
"""
if not self._check_write_permission("create_supplier"):
return None
try:
# Build supplier payload
payload = {
"name": supplier_data['name'],
"currency": supplier_data.get('currency', 'DKK'),
"supplierGroup": {
"supplierGroupNumber": supplier_data.get('supplier_group_number', 1)
},
"paymentTerms": {
"paymentTermsNumber": supplier_data.get('payment_terms_number', 4) # Netto 14 dage
},
"vatZone": {
"vatZoneNumber": supplier_data.get('vat_zone_number', 1)
}
}
# Optional fields
if supplier_data.get('address'):
payload['address'] = supplier_data['address']
if supplier_data.get('city'):
payload['city'] = supplier_data['city']
if supplier_data.get('zip'):
payload['zip'] = supplier_data['zip']
if supplier_data.get('country'):
payload['country'] = supplier_data['country']
if supplier_data.get('corporate_identification_number'):
payload['corporateIdentificationNumber'] = supplier_data['corporate_identification_number']
url = f"{self.api_url}/suppliers"
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=self._get_headers(), json=payload) as response:
if response.status in [200, 201]:
result = await response.json()
logger.info(f"✅ Created supplier: {result.get('name')} (ID: {result.get('supplierNumber')})")
# Save to local vendors table
try:
from app.core.database import execute_insert
vendor_id = execute_insert("""
INSERT INTO vendors (
name,
cvr,
economic_supplier_number,
created_at
) VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
ON CONFLICT (economic_supplier_number)
DO UPDATE SET
name = EXCLUDED.name,
cvr = EXCLUDED.cvr
""", (
result.get('name'),
supplier_data.get('corporate_identification_number'),
result.get('supplierNumber')
))
logger.info(f"✅ Saved supplier to local database (vendor_id: {vendor_id})")
except Exception as db_error:
logger.warning(f"⚠️ Could not save to local database: {db_error}")
return result
else:
error_text = await response.text()
logger.error(f"❌ Failed to create supplier: {response.status} - {error_text}")
return None
except Exception as e:
logger.error(f"❌ Error creating supplier: {e}")
return None
# ========== CUSTOMER INVOICES ==========
async def get_customer_invoices(self, customer_number: int, include_lines: bool = True) -> List[Dict]:
"""
Get customer invoices (sales invoices) from e-conomic (last 1 year only)
Args:
customer_number: e-conomic customer number
include_lines: Whether to include invoice lines (adds more API calls but gets full data)
Returns:
List of invoice records with lines from the last year
"""
try:
# Calculate first day of the month 13 months ago (for yearly billed items)
from dateutil.relativedelta import relativedelta
now = datetime.now()
start_date_obj = (now - relativedelta(months=13)).replace(day=1)
start_date = start_date_obj.strftime('%Y-%m-%d')
logger.info(f"📅 Will filter invoices from {start_date} onwards (13 months for yearly billed items)")
async with aiohttp.ClientSession() as session:
# Fetch from /invoices/drafts endpoint (covers active invoices)
# Then filter by customer in code
all_invoices = []
endpoint = f"{self.api_url}/invoices/drafts"
logger.info(f"📋 Fetching invoices from {endpoint}")
try:
# Pagination: Keep fetching until no more pages
page = 0
while True:
async with session.get(
endpoint,
params={"pagesize": 1000, "skippages": page},
headers=self._get_headers()
) as response:
logger.info(f"🔍 [API] Response status from {endpoint} (page {page}): {response.status}")
if response.status == 200:
data = await response.json()
invoices_from_endpoint = data.get('collection', [])
logger.info(f"✅ Fetched {len(invoices_from_endpoint)} invoices from {endpoint} page {page}")
if not invoices_from_endpoint:
# No more invoices on this page
break
# Add invoices
all_invoices.extend(invoices_from_endpoint)
# Check if we got full page (1000) - if so, there might be more pages
if len(invoices_from_endpoint) < 1000:
break # Last page
page += 1
else:
error_text = await response.text()
logger.warning(f"⚠️ Endpoint {endpoint} returned {response.status}: {error_text[:200]}")
break
except Exception as e:
logger.error(f"❌ Error fetching invoices: {e}")
logger.info(f"✅ Found {len(all_invoices)} total invoices")
# Filter invoices for this customer
customer_invoices = [
inv for inv in all_invoices
if inv.get('customer', {}).get('customerNumber') == customer_number
]
logger.info(f"📊 Filtered to {len(customer_invoices)} invoices for customer {customer_number}")
if not all_invoices:
logger.warning(f"⚠️ No invoices found in e-conomic")
return []
logger.info(f"✅ Found {len(all_invoices)} total invoices")
# Filter invoices for this customer
customer_invoices = [
inv for inv in all_invoices
if inv.get('customer', {}).get('customerNumber') == customer_number
]
logger.info(f"📊 Filtered to {len(customer_invoices)} invoices for customer {customer_number}")
# Debug: log response structure if we have invoices
if customer_invoices:
logger.info(f"🔍 [API] First invoice structure keys: {list(customer_invoices[0].keys())}")
logger.info(f"🔍 [API] First invoice date: {customer_invoices[0].get('date')}")
# Apply date filter (13 months back)
from dateutil.parser import parse as parse_date
filtered_by_date = []
for inv in all_invoices:
invoice_date_str = inv.get('date')
if invoice_date_str:
try:
invoice_date = parse_date(invoice_date_str)
if invoice_date.replace(tzinfo=None) >= start_date_obj:
filtered_by_date.append(inv)
else:
logger.debug(f" Skipped invoice {inv.get('draftInvoiceNumber') or inv.get('bookedInvoiceNumber')} dated {invoice_date_str} (before {start_date})")
except Exception as e:
logger.warning(f" Could not parse invoice date '{invoice_date_str}': {e}")
# Include if we can't parse the date
filtered_by_date.append(inv)
else:
# Include if no date field
filtered_by_date.append(inv)
logger.info(f"📅 After date filter ({start_date}+): {len(filtered_by_date)} invoices")
customer_invoices = filtered_by_date
# Fetch full invoice data with lines if requested
if include_lines and customer_invoices:
full_invoices = []
for inv in customer_invoices:
# Try to get full invoice data using the 'self' link or direct URL
invoice_self_link = inv.get('self')
invoice_number = inv.get('draftInvoiceNumber') or inv.get('bookedInvoiceNumber')
logger.debug(f"Fetching full data for invoice: {invoice_number}")
try:
# Try the self link first
fetch_url = invoice_self_link or f"{self.api_url}/invoices/sales/{invoice_number}"
async with session.get(
fetch_url,
headers=self._get_headers()
) as inv_response:
if inv_response.status == 200:
full_inv = await inv_response.json()
full_invoices.append(full_inv)
lines_count = len(full_inv.get('lines', []))
logger.debug(f"✅ Fetched invoice {invoice_number} with {lines_count} lines")
else:
# If self link fails, try with the existing data (may have lines already)
if 'lines' in inv:
full_invoices.append(inv)
logger.debug(f"Using cached data for invoice {invoice_number}")
else:
logger.warning(f"Could not fetch invoice {invoice_number}: {inv_response.status}")
except Exception as e:
# If fetch fails, try to use existing data if it has lines
if 'lines' in inv:
full_invoices.append(inv)
logger.debug(f"Using cached data for invoice {invoice_number} (fetch failed: {e})")
else:
logger.warning(f"⚠️ Could not fetch invoice {invoice_number}: {e}")
logger.info(f"✅ Fetched full data for {len(full_invoices)} invoices")
return full_invoices
return customer_invoices
except Exception as e:
logger.error(f"❌ Error getting customer invoices: {e}")
import traceback
logger.error(traceback.format_exc())
return []
# ========== KASSEKLADDE (JOURNALS/VOUCHERS) ==========
async def check_invoice_number_exists(self, invoice_number: str, journal_number: Optional[int] = None) -> Optional[Dict]:
"""
Check if an invoice number already exists in e-conomic journals
Args:
invoice_number: Invoice number to check
journal_number: Optional specific journal to search (if None, searches all)
Returns:
Dict with voucher info if found, None otherwise
"""
try:
# Search in vouchers (posted journal entries)
url = f"{self.api_url}/vouchers"
params = {
'filter': f'voucherNumber${invoice_number}', # e-conomic filter syntax
'pagesize': 100
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self._get_headers(), params=params) as response:
if response.status != 200:
logger.warning(f"⚠️ Failed to search vouchers: {response.status}")
return None
data = await response.json()
vouchers = data.get('collection', [])
# Check if any voucher matches the invoice number
for voucher in vouchers:
# Check if invoice number appears in voucher text or entries
if invoice_number in str(voucher):
logger.warning(f"⚠️ Invoice number {invoice_number} found in e-conomic voucher #{voucher.get('voucherNumber')}")
return {
'found_in': 'e-conomic',
'voucher_number': voucher.get('voucherNumber'),
'date': voucher.get('date'),
'journal': voucher.get('journal', {}).get('journalNumber')
}
logger.info(f"✅ Invoice number {invoice_number} not found in e-conomic")
return None
except Exception as e:
logger.error(f"❌ Error checking invoice number in e-conomic: {e}")
# Don't block on e-conomic errors - assume not found
return None
async def get_supplier_invoice_journals(self) -> list:
"""
Get all available journals for supplier invoices (kassekladde)
Returns:
List of journal dictionaries with journalNumber, name, and journalType
"""
try:
url = f"{self.api_url}/journals"
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self._get_headers()) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"e-conomic API error: {response.status} - {error_text}")
data = await response.json()
# Filter for supplier invoice journals
journals = []
for journal in data.get('collection', []):
journals.append({
'journalNumber': journal.get('journalNumber'),
'name': journal.get('name'),
'journalType': journal.get('journalType')
})
return journals
except Exception as e:
logger.error(f"❌ Error fetching journals: {e}")
raise
async def get_accounts(self) -> List[Dict]:
"""
Get chart of accounts (kontoplan) from e-conomic
Returns:
List of account dictionaries with accountNumber, name, accountType, vatCode
"""
try:
url = f"{self.api_url}/accounts"
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self._get_headers()) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"e-conomic API error: {response.status} - {error_text}")
data = await response.json()
# Extract relevant account info
accounts = []
for account in data.get('collection', []):
accounts.append({
'accountNumber': account.get('accountNumber'),
'name': account.get('name'),
'accountType': account.get('accountType'),
'vatCode': account.get('vatCode', {}).get('vatCode') if account.get('vatCode') else None,
'balance': account.get('balance', 0.0)
})
logger.info(f"📥 Fetched {len(accounts)} accounts from e-conomic")
return accounts
except Exception as e:
logger.error(f"❌ Error fetching accounts: {e}")
raise
async def sync_accounts_to_database(self) -> int:
"""
Fetch accounts from e-conomic and cache them in database
Returns:
Number of accounts synced
"""
from app.core.database import execute_query
try:
accounts = await self.get_accounts()
# Upsert accounts into database
for account in accounts:
execute_query("""
INSERT INTO economic_accounts
(account_number, name, account_type, vat_code, balance, last_synced)
VALUES (%s, %s, %s, %s, %s, NOW())
ON CONFLICT (account_number)
DO UPDATE SET
name = EXCLUDED.name,
account_type = EXCLUDED.account_type,
vat_code = EXCLUDED.vat_code,
balance = EXCLUDED.balance,
last_synced = NOW()
""", (
account['accountNumber'],
account['name'],
account['accountType'],
account['vatCode'],
account['balance']
))
logger.info(f"✅ Synced {len(accounts)} accounts to database")
return len(accounts)
except Exception as e:
logger.error(f"❌ Error syncing accounts to database: {e}")
raise
async def create_journal_supplier_invoice(self,
journal_number: int,
supplier_number: int,
invoice_number: str,
invoice_date: str,
total_amount: float,
vat_breakdown: Dict[str, float],
line_items: List[Dict] = None,
due_date: Optional[str] = None,
text: Optional[str] = None) -> Dict:
"""
Post supplier invoice to e-conomic kassekladde (journals API)
🚨 WRITE OPERATION - Respects READ_ONLY and DRY_RUN modes
Args:
journal_number: Journal/kassekladde number (from system_settings)
supplier_number: e-conomic supplier number
invoice_number: Supplier's invoice number
invoice_date: Invoice date (YYYY-MM-DD)
total_amount: Total invoice amount including VAT
vat_breakdown: Dict of {vat_code: {"net": X, "vat": Y, "gross": Z}} for each VAT group
line_items: List of line items with contra_account and vat_code
due_date: Payment due date (YYYY-MM-DD)
text: Invoice description
Returns:
Dict with voucher details or error info
"""
# 🚨 SAFETY CHECK
if not self._check_write_permission("create_journal_supplier_invoice"):
return {"error": True, "message": "Write operations blocked by READ_ONLY or DRY_RUN mode"}
try:
# Extract year from invoice date for accounting year
accounting_year = invoice_date[:4]
# Build supplier invoice entries - one per line item or per VAT group
supplier_invoices = []
# If we have line items with contra accounts, use those
if line_items and isinstance(line_items, list):
# Group lines by VAT code and contra account combination
line_groups = {}
for line in line_items:
vat_code = line.get('vat_code', 'I25')
contra_account = line.get('contra_account', '5810')
key = f"{vat_code}_{contra_account}"
if key not in line_groups:
line_groups[key] = {
'vat_code': vat_code,
'contra_account': contra_account,
'gross': 0,
'vat': 0,
'items': []
}
line_total = line.get('line_total', 0)
vat_amount = line.get('vat_amount', 0)
line_groups[key]['gross'] += line_total
line_groups[key]['vat'] += vat_amount
line_groups[key]['items'].append(line)
# Create entry for each group
for key, group in line_groups.items():
entry = {
"supplier": {
"supplierNumber": supplier_number
},
"amount": round(group['gross'], 2),
"contraAccount": {
"accountNumber": int(group['contra_account'])
},
"currency": {
"code": "DKK"
},
"date": invoice_date,
"supplierInvoiceNumber": invoice_number[:30] if invoice_number else ""
}
# Add text with product descriptions
descriptions = [item.get('description', '') for item in group['items'][:2]]
entry_text = text if text else f"Faktura {invoice_number}"
if descriptions:
entry_text = f"{entry_text} - {', '.join(filter(None, descriptions))}"
entry["text"] = entry_text[:250]
if due_date:
entry["dueDate"] = due_date
# Add VAT details
if group['vat'] > 0:
entry["contraVatAccount"] = {
"vatCode": group['vat_code']
}
entry["contraVatAmount"] = round(group['vat'], 2)
supplier_invoices.append(entry)
elif vat_breakdown and isinstance(vat_breakdown, dict):
# Fallback: vat_breakdown format: {"I25": {"net": 1110.672, "vat": 277.668, "rate": 25, "gross": 1388.34}, ...}
for vat_code, vat_data in vat_breakdown.items():
if not isinstance(vat_data, dict):
continue
net_amount = vat_data.get('net', 0)
vat_amount = vat_data.get('vat', 0)
gross_amount = vat_data.get('gross', net_amount + vat_amount)
if gross_amount <= 0:
continue
entry = {
"supplier": {
"supplierNumber": supplier_number
},
"amount": round(gross_amount, 2),
"contraAccount": {
"accountNumber": 5810 # Default fallback account
},
"currency": {
"code": "DKK"
},
"date": invoice_date,
"supplierInvoiceNumber": invoice_number[:30] if invoice_number else ""
}
# Add text with VAT code for clarity
entry_text = text if text else f"Faktura {invoice_number}"
if len(vat_breakdown) > 1:
entry_text = f"{entry_text} ({vat_code})"
entry["text"] = entry_text[:250]
if due_date:
entry["dueDate"] = due_date
# Add VAT details
if vat_amount > 0:
entry["contraVatAccount"] = {
"vatCode": vat_code
}
entry["contraVatAmount"] = round(vat_amount, 2)
supplier_invoices.append(entry)
else:
# No VAT breakdown - create single entry
supplier_invoice = {
"supplier": {
"supplierNumber": supplier_number
},
"amount": total_amount,
"contraAccount": {
"accountNumber": 5810 # Default fallback account
},
"currency": {
"code": "DKK"
},
"date": invoice_date,
"supplierInvoiceNumber": invoice_number[:30] if invoice_number else ""
}
if text:
supplier_invoice["text"] = text[:250]
if due_date:
supplier_invoice["dueDate"] = due_date
supplier_invoices.append(supplier_invoice)
# Build voucher payload
payload = {
"accountingYear": {
"year": accounting_year
},
"journal": {
"journalNumber": journal_number
},
"entries": {
"supplierInvoices": supplier_invoices
}
}
logger.info(f"📤 Posting supplier invoice to journal {journal_number}")
logger.debug(f"Payload: {json.dumps(payload, indent=2)}")
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.api_url}/journals/{journal_number}/vouchers",
headers=self._get_headers(),
json=payload
) as response:
response_text = await response.text()
self._log_api_call(
"POST",
f"/journals/{journal_number}/vouchers",
payload,
await response.json() if response.status in [200, 201] and response_text else None,
response.status
)
if response.status in [200, 201]:
data = await response.json() if response_text else {}
# e-conomic returns array of created vouchers
if isinstance(data, list) and len(data) > 0:
voucher_data = data[0]
else:
voucher_data = data
voucher_number = voucher_data.get('voucherNumber')
logger.info(f"✅ Supplier invoice posted to kassekladde: voucher #{voucher_number}")
return {
"success": True,
"voucher_number": voucher_number,
"journal_number": journal_number,
"accounting_year": accounting_year,
"data": voucher_data
}
else:
logger.error(f"❌ Post to kassekladde failed: {response.status}")
logger.error(f"Response: {response_text}")
return {
"error": True,
"status": response.status,
"message": response_text
}
except Exception as e:
logger.error(f"❌ create_journal_supplier_invoice error: {e}")
logger.exception("Full traceback:")
return {"error": True, "status": 500, "message": str(e)}
async def upload_voucher_attachment(self,
journal_number: int,
accounting_year: str,
voucher_number: int,
pdf_path: str,
filename: str) -> Dict:
"""
Upload PDF attachment to e-conomic voucher
🚨 WRITE OPERATION - Respects READ_ONLY and DRY_RUN modes
Args:
journal_number: Journal number
accounting_year: Accounting year (e.g., "2025")
voucher_number: Voucher number
pdf_path: Local path to PDF file
filename: Filename for attachment
Returns:
Dict with success status
"""
# 🚨 SAFETY CHECK
if not self._check_write_permission("upload_voucher_attachment"):
return {"error": True, "message": "Write operations blocked by READ_ONLY or DRY_RUN mode"}
try:
# Read PDF file
with open(pdf_path, 'rb') as f:
pdf_data = f.read()
# e-conomic attachment/file endpoint (POST is allowed here, not on /attachment)
url = f"{self.api_url}/journals/{journal_number}/vouchers/{accounting_year}-{voucher_number}/attachment/file"
headers = {
'X-AppSecretToken': self.app_secret_token,
'X-AgreementGrantToken': self.agreement_grant_token
}
# Use multipart/form-data as required by e-conomic API
form_data = aiohttp.FormData()
form_data.add_field('file',
pdf_data,
filename=filename,
content_type='application/pdf')
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, data=form_data) as response:
if response.status in [200, 201, 204]:
logger.info(f"📎 PDF attachment uploaded to voucher {accounting_year}-{voucher_number}")
return {"success": True}
else:
error_text = await response.text()
logger.error(f"❌ Failed to upload attachment: {response.status} - {error_text}")
return {"error": True, "status": response.status, "message": error_text}
except Exception as e:
logger.error(f"❌ upload_voucher_attachment error: {e}")
return {"error": True, "message": str(e)}
# Singleton instance
_economic_service_instance = None
def get_economic_service() -> EconomicService:
"""Get singleton instance of EconomicService"""
global _economic_service_instance
if _economic_service_instance is None:
_economic_service_instance = EconomicService()
return _economic_service_instance