""" Subscription Billing Matrix Service Generate a per-customer billing matrix showing: - Rows: Products from e-conomic customer invoices - Columns: Months - Cells: Expected vs invoiced amounts, payment status, invoice references Data source: e-conomic sales invoices only """ import logging import json from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple from collections import defaultdict from app.services.economic_service import get_economic_service from app.core.database import execute_query logger = logging.getLogger(__name__) class SubscriptionMatrixService: """Generate billing matrix for customer subscriptions""" def __init__(self): self.economic_service = get_economic_service() async def generate_billing_matrix( self, customer_id: int, months: int = 12 ) -> Dict: """ Generate subscription billing matrix for a customer Args: customer_id: BMC Hub customer ID (local database) months: Number of months to include (default 12) Returns: Dict with structure: { "customer_id": int, "generated_at": str (ISO datetime), "months_shown": int, "products": [ { "product_number": str, "product_name": str, "rows": [ { "year_month": "2025-01", "amount": float, "status": "paid|invoiced|missing", "invoice_number": str (optional), "period_label": str, "period_from": str (ISO date), "period_to": str (ISO date) } ], "total_amount": float, "total_paid": float } ] } """ try: # Get customer's e-conomic number from local database logger.info(f"🔍 [MATRIX] Starting matrix generation for customer {customer_id}") customer = execute_query( "SELECT economic_customer_number FROM customers WHERE id = %s", (customer_id,) ) logger.info(f"🔍 [MATRIX] Query result: {customer}") if not customer or not customer[0].get('economic_customer_number'): logger.warning(f"⚠️ Customer {customer_id} has no e-conomic number") return { "customer_id": customer_id, "error": "No e-conomic customer number found", "products": [] } economic_customer_number = customer[0]['economic_customer_number'] logger.info(f"📊 Generating matrix for e-conomic customer {economic_customer_number}") # Fetch invoices from e-conomic logger.info(f"🔍 [MATRIX] About to call get_customer_invoices with customer {economic_customer_number}") invoices = await self.economic_service.get_customer_invoices( economic_customer_number, include_lines=True ) logger.info(f"🔍 [MATRIX] Returned {len(invoices)} invoices from e-conomic") if not invoices: logger.warning(f"⚠️ No invoices found for customer {economic_customer_number}") return { "customer_id": customer_id, "generated_at": datetime.now().isoformat(), "months_shown": months, "products": [] } logger.info(f"📊 Processing {len(invoices)} invoices") # Debug: log first invoice structure if invoices: logger.debug(f"First invoice structure: {json.dumps(invoices[0], indent=2, default=str)[:500]}") # Group invoices by product number matrix_data = self._aggregate_by_product(invoices, months) return { "customer_id": customer_id, "economic_customer_number": economic_customer_number, "generated_at": datetime.now().isoformat(), "months_shown": months, "products": matrix_data } except Exception as e: logger.error(f"❌ Error generating billing matrix: {e}") import traceback logger.error(traceback.format_exc()) return { "customer_id": customer_id, "error": str(e), "products": [] } def _aggregate_by_product(self, invoices: List[Dict], months: int) -> List[Dict]: """ Group invoice lines by product number and aggregate by month Args: invoices: List of e-conomic invoice objects months: Number of months to include Returns: List of product records with aggregated monthly data (including empty months) """ # Generate list of all months to display (last N months from today) all_months = self._generate_month_range(months) # Structure: {product_number: {year_month: {amount, status, invoice_number, period_from, period_to}}} product_matrix = defaultdict(dict) product_names = {} # Cache product names # Initialize products only if they have data within the selected month range try: # Fill in data from invoices, but only for months within range for invoice in invoices: invoice_number = invoice.get('bookedInvoiceNumber') or invoice.get('draftInvoiceNumber') # Determine status based on invoice type/endpoint it came from # Priority: use 'status' field, fallback to inferring from presence of certain fields invoice_status = invoice.get('status', 'unknown') if invoice_status == 'unknown': # Infer status from invoice structure if invoice.get('bookedInvoiceNumber'): invoice_status = 'booked' elif invoice.get('draftInvoiceNumber'): invoice_status = 'draft' invoice_date = invoice.get('date') # Build invoice-level text (title/notes) for keyword matching invoice_text_parts = [] notes = invoice.get('notes') if isinstance(notes, dict): for key in ["heading", "textLine1", "textLine2", "textLine3", "textLine4", "textLine5"]: val = notes.get(key) if val: invoice_text_parts.append(str(val)) elif notes: invoice_text_parts.append(str(notes)) other_ref = invoice.get('otherReference') or invoice.get('orderNumberDb') if other_ref: invoice_text_parts.append(str(other_ref)) invoice_text = " ".join(invoice_text_parts).lower() # Process invoice lines lines = invoice.get('lines', []) for line in lines: product_number = line.get('product', {}).get('productNumber') product_name = line.get('product', {}).get('name') or line.get('description') line_description = (line.get('description') or product_name or "").lower() # Only include lines that indicate a period or subscription if ( "periode" not in line_description and "abonnement" not in line_description and "periode" not in invoice_text and "abonnement" not in invoice_text ): continue if not product_number: logger.debug(f"Skipping line without product number: {line}") continue # Cache product name if product_number not in product_names: product_names[product_number] = product_name or f"Product {product_number}" # Extract period from line period_from_str = line.get('period', {}).get('from') period_to_str = line.get('period', {}).get('to') # If no period on line, use invoice date as month if not period_from_str: if invoice_date: period_from_str = invoice_date # Assume monthly billing if no end date period_from = datetime.fromisoformat(invoice_date.replace('Z', '+00:00')) period_to = period_from + timedelta(days=31) period_to_str = period_to.isoformat().split('T')[0] else: logger.warning(f"No period or date found for line in invoice {invoice_number}") continue # Parse dates try: period_from = datetime.fromisoformat(period_from_str.replace('Z', '+00:00')) period_to = datetime.fromisoformat(period_to_str.replace('Z', '+00:00')) except (ValueError, AttributeError) as e: logger.warning(f"Could not parse dates {period_from_str} - {period_to_str}: {e}") continue # Get amount - use gross amount if net is 0 (line items may have gross only) amount = float(line.get('netAmount', 0)) if amount == 0: amount = float(line.get('grossAmount', 0)) if amount == 0: # Try unit net price * quantity as fallback unit_price = float(line.get('unitNetPrice', 0)) qty = float(line.get('quantity', 1)) amount = unit_price * qty # Determine month key (use first month if multi-month) year_month = period_from.strftime('%Y-%m') if year_month not in all_months: # Skip lines outside the displayed month range continue # Calculate period label period_label = self._format_period_label(period_from, period_to) # Initialize product if first time within range if product_number not in product_matrix: for month_key in all_months: product_matrix[product_number][month_key] = { "amount": 0.0, "status": "missing", "invoice_number": None, "period_from": None, "period_to": None, "period_label": None } # Update cell cell = product_matrix[product_number][year_month] cell["amount"] = amount # Take last amount (or sum if multiple?) cell["status"] = self._determine_status(invoice_status) cell["invoice_number"] = invoice_number cell["period_from"] = period_from.isoformat().split('T')[0] cell["period_to"] = period_to.isoformat().split('T')[0] cell["period_label"] = period_label except Exception as e: logger.error(f"❌ Error aggregating data: {e}") # Convert to output format products = [] for product_number, months_data in sorted(product_matrix.items()): rows = [] total_amount: float = 0.0 total_paid: float = 0.0 for year_month in sorted(months_data.keys()): cell = months_data[year_month] rows.append({ "year_month": year_month, "amount": cell["amount"], "status": cell["status"], "invoice_number": cell["invoice_number"], "period_label": cell["period_label"], "period_from": cell["period_from"], "period_to": cell["period_to"] }) total_amount += cell["amount"] if cell["status"] == "paid": total_paid += cell["amount"] products.append({ "product_number": product_number, "product_name": product_names[product_number], "rows": rows, "total_amount": total_amount, "total_paid": total_paid }) return products @staticmethod def _generate_month_range(num_months: int) -> List[str]: """ Generate list of month keys for the last N months Args: num_months: Number of months to generate (e.g., 12) Returns: Sorted list of 'YYYY-MM' strings going back from today """ months = [] today = datetime.now() for i in range(num_months): # Go back i months from today using calendar arithmetic year = today.year month = today.month - i # Adjust year and month if month goes negative while month <= 0: month += 12 year -= 1 month_key = f"{year:04d}-{month:02d}" months.append(month_key) # Return sorted chronologically (oldest first) return sorted(months) @staticmethod def _format_period_label(period_from: datetime, period_to: datetime) -> str: """Format period as readable label (e.g., 'Jan', 'Jan-Mar', etc)""" from_month = period_from.strftime('%b') to_month = period_to.strftime('%b') from_year = period_from.year to_year = period_to.year # Calculate number of months months_diff = (period_to.year - period_from.year) * 12 + (period_to.month - period_from.month) if months_diff == 0: # Same month return from_month elif from_year == to_year: # Same year, different months return f"{from_month}-{to_month}" else: # Different years return f"{from_month} {from_year} - {to_month} {to_year}" @staticmethod def _determine_status(invoice_status: str) -> str: """Map e-conomic invoice status to matrix status""" status_map = { "sent": "invoiced", "paid": "paid", "draft": "draft", "credited": "credited", "unpaid": "invoiced", "booked": "invoiced" } result = status_map.get(invoice_status.lower() if invoice_status else 'unknown', invoice_status or 'unknown') return result if result else "unknown" # Singleton instance _matrix_service_instance = None def get_subscription_matrix_service() -> SubscriptionMatrixService: """Get singleton instance of SubscriptionMatrixService""" global _matrix_service_instance if _matrix_service_instance is None: _matrix_service_instance = SubscriptionMatrixService() return _matrix_service_instance