""" Subscription Billing Matrix Service Generate a per-customer billing matrix showing: - Rows: Products from e-conomic customer invoices - Columns: Months - Cells: Expected vs invoiced amounts, payment status, invoice references Data source: e-conomic sales invoices only """ import logging import json from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple from collections import defaultdict from app.services.economic_service import get_economic_service from app.core.database import execute_query logger = logging.getLogger(__name__) class SubscriptionMatrixService: """Generate billing matrix for customer subscriptions""" def __init__(self): self.economic_service = get_economic_service() async def generate_billing_matrix( self, customer_id: int, months: int = 12 ) -> Dict: """ Generate subscription billing matrix for a customer Args: customer_id: BMC Hub customer ID (local database) months: Number of months to include (default 12) Returns: Dict with structure: { "customer_id": int, "generated_at": str (ISO datetime), "months_shown": int, "products": [ { "product_number": str, "product_name": str, "rows": [ { "year_month": "2025-01", "amount": float, "status": "paid|invoiced|missing", "invoice_number": str (optional), "period_label": str, "period_from": str (ISO date), "period_to": str (ISO date) } ], "total_amount": float, "total_paid": float } ] } """ try: # Get customer's e-conomic number from local database logger.info(f"πŸ” [MATRIX] Starting matrix generation for customer {customer_id}") customer = execute_query( "SELECT economic_customer_number FROM customers WHERE id = %s", (customer_id,) ) logger.info(f"πŸ” [MATRIX] Query result: {customer}") if not customer or not customer[0].get('economic_customer_number'): logger.warning(f"⚠️ Customer {customer_id} has no e-conomic number") return { "customer_id": customer_id, "error": "No e-conomic customer number found", "products": [] } economic_customer_number = customer[0]['economic_customer_number'] logger.info(f"πŸ“Š Generating matrix for e-conomic customer {economic_customer_number}") # Fetch invoices from e-conomic logger.info(f"πŸ” [MATRIX] About to call get_customer_invoices with customer {economic_customer_number}") invoices = await self.economic_service.get_customer_invoices( economic_customer_number, include_lines=True ) logger.info(f"πŸ” [MATRIX] Returned {len(invoices)} invoices from e-conomic") if not invoices: logger.warning(f"⚠️ No invoices found for customer {economic_customer_number}") return { "customer_id": customer_id, "generated_at": datetime.now().isoformat(), "months_shown": months, "products": [] } logger.info(f"πŸ“Š Processing {len(invoices)} invoices") # Debug: log first invoice structure if invoices: logger.debug(f"First invoice structure: {json.dumps(invoices[0], indent=2, default=str)[:500]}") # Group invoices by product number matrix_data = self._aggregate_by_product(invoices, months) return { "customer_id": customer_id, "economic_customer_number": economic_customer_number, "generated_at": datetime.now().isoformat(), "months_shown": months, "products": matrix_data } except Exception as e: logger.error(f"❌ Error generating billing matrix: {e}") import traceback logger.error(traceback.format_exc()) return { "customer_id": customer_id, "error": str(e), "products": [] } def _aggregate_by_product(self, invoices: List[Dict], months: int) -> List[Dict]: """ Group invoice lines by product number and aggregate by month Args: invoices: List of e-conomic invoice objects months: Number of months to include Returns: List of product records with aggregated monthly data (including empty months) """ # Generate list of all months to display (last N months from today) all_months = self._generate_month_range(months) # Structure: {product_number: {year_month: {amount, status, invoice_number, period_from, period_to}}} product_matrix = defaultdict(dict) product_names = {} # Cache product names # Initialize products only if they have data within the selected month range try: # Fill in data from invoices, but only for months within range logger.info(f"πŸ” [MATRIX] Processing {len(invoices)} invoices") invoice_count = 0 for invoice in invoices: invoice_count += 1 invoice_number = invoice.get('bookedInvoiceNumber') or invoice.get('draftInvoiceNumber') logger.info(f"πŸ“„ [MATRIX] [{invoice_count}/{len(invoices)}] Processing invoice {invoice_number}") # Determine status based on invoice type/endpoint it came from # Priority: use 'status' field, fallback to inferring from presence of certain fields invoice_status = invoice.get('status', 'unknown') if invoice_status == 'unknown': # Infer status from invoice structure if invoice.get('bookedInvoiceNumber'): invoice_status = 'booked' elif invoice.get('draftInvoiceNumber'): invoice_status = 'draft' invoice_date = invoice.get('date') # Build invoice-level text (title/notes) for keyword matching invoice_text_parts = [] notes = invoice.get('notes') if isinstance(notes, dict): for key in ["heading", "textLine1", "textLine2", "textLine3", "textLine4", "textLine5"]: val = notes.get(key) if val: invoice_text_parts.append(str(val)) elif notes: invoice_text_parts.append(str(notes)) # Common title/description fields for key in ["heading", "description", "text", "subject"]: val = invoice.get(key) if val: invoice_text_parts.append(str(val)) other_ref = invoice.get('otherReference') or invoice.get('orderNumberDb') if other_ref: invoice_text_parts.append(str(other_ref)) invoice_text = " ".join(invoice_text_parts).lower() invoice_period = self._extract_period_from_text(invoice_text) # Process invoice lines lines = invoice.get('lines', []) logger.info(f"πŸ“‹ [MATRIX] Invoice {invoice_number} has {len(lines)} lines") line_num = 0 for line in lines: line_num += 1 product_number = line.get('product', {}).get('productNumber') product_name = line.get('product', {}).get('name') or line.get('description') line_description = (line.get('description') or product_name or "").lower() logger.info(f" πŸ“¦ [MATRIX] Line {line_num}: Product='{product_name}', Number={product_number}") # Check for period data line_period_data = line.get('period', {}) line_has_period_field = bool(line_period_data.get('from')) line_period = self._extract_period_from_text(line_description) or invoice_period has_period = line_has_period_field or bool(line_period) # Check keyword matching has_periode_keyword = "periode" in line_description or "periode" in invoice_text has_abonnement_keyword = "abonnement" in line_description or "abonnement" in invoice_text # More lenient filtering: Include if it has keywords OR period data OR appears to be a recurring product # Skip only obvious one-time items (installation, setup fees, hardware, etc.) skip_keywords = ['installation', 'opsΓ¦tning', 'setup', 'hardware', 'engangs'] should_skip = any(keyword in line_description for keyword in skip_keywords) logger.info(f" πŸ” Checks: periode={has_periode_keyword}, abon={has_abonnement_keyword}, has_period={has_period}, should_skip={should_skip}") if should_skip: logger.info(f" ❌ SKIPPED (one-time): {product_name[:60] if product_name else 'NO NAME'}") continue logger.info(f" βœ… INCLUDED: {product_name[:60] if product_name else 'NO NAME'}") if not product_number and not product_name: logger.debug(f"Skipping line without product number: {line}") continue product_key = product_number or self._normalize_product_key(product_name or "") # Cache product name if product_key not in product_names: product_names[product_key] = product_name or f"Product {product_number}" # Extract period from line period_from_str = line.get('period', {}).get('from') period_to_str = line.get('period', {}).get('to') # If no period on line, use invoice date as month if not period_from_str: if line_period: period_from = line_period period_to = self._end_of_month(period_from) period_from_str = period_from.isoformat().split('T')[0] period_to_str = period_to.isoformat().split('T')[0] elif invoice_period: period_from = invoice_period period_to = self._end_of_month(period_from) period_from_str = period_from.isoformat().split('T')[0] period_to_str = period_to.isoformat().split('T')[0] elif invoice_date: period_from_str = invoice_date # Assume monthly billing if no end date period_from = datetime.fromisoformat(invoice_date.replace('Z', '+00:00')) period_to = period_from + timedelta(days=31) period_to_str = period_to.isoformat().split('T')[0] else: logger.warning(f"No period or date found for line in invoice {invoice_number}") continue # Parse dates try: period_from = datetime.fromisoformat(period_from_str.replace('Z', '+00:00')) period_to = datetime.fromisoformat(period_to_str.replace('Z', '+00:00')) except (ValueError, AttributeError) as e: logger.warning(f"Could not parse dates {period_from_str} - {period_to_str}: {e}") continue # Get amount - use gross amount if net is 0 (line items may have gross only) amount = float(line.get('netAmount', 0)) if amount == 0: amount = float(line.get('grossAmount', 0)) if amount == 0: # Try unit net price * quantity as fallback unit_price = float(line.get('unitNetPrice', 0)) qty = float(line.get('quantity', 1)) amount = unit_price * qty logger.info(f" πŸ’° Amount: {amount} kr, Period: {period_from_str} to {period_to_str}") # Determine month key (use first month if multi-month) year_month = period_from.strftime('%Y-%m') logger.info(f" πŸ“… Month: {year_month}") if year_month not in all_months: # Skip lines outside the displayed month range logger.info(f" ⏭️ SKIPPED: Month {year_month} outside display range {all_months[0]} - {all_months[-1]}") continue # Calculate period label period_label = self._format_period_label(period_from, period_to) # Initialize product if first time within range if product_key not in product_matrix: for month_key in all_months: product_matrix[product_key][month_key] = { "amount": 0.0, "status": "missing", "invoice_number": None, "period_from": None, "period_to": None, "period_label": None } # Update cell cell = product_matrix[product_key][year_month] old_amount = cell["amount"] cell["amount"] += amount cell["status"] = self._merge_status(cell["status"], self._determine_status(invoice_status)) cell["invoice_number"] = invoice_number cell["period_from"] = period_from.isoformat().split('T')[0] cell["period_to"] = period_to.isoformat().split('T')[0] cell["period_label"] = period_label logger.info(f" ✏️ Updated {product_key} @ {year_month}: {old_amount} kr -> {cell['amount']} kr, status={cell['status']}") except Exception as e: logger.error(f"❌ Error aggregating data: {e}") # Filter out products that only appear in one month (one-time purchases, not subscriptions) # Keep products that appear in 2+ months OR have subscription keywords logger.info(f"\nπŸ” [MATRIX] Filtering products. Total collected: {len(product_matrix)}") filtered_matrix = {} for product_key, months_data in product_matrix.items(): # Count how many months have actual data (not missing status) months_with_data = sum(1 for cell in months_data.values() if cell["status"] != "missing") # Check if product name suggests subscription product_name = product_names.get(product_key, "") has_subscription_indicators = any( keyword in product_name.lower() for keyword in ['periode', 'abonnement', 'hosting', 'office', 'microsoft', 'subscription'] ) # Include if it appears in multiple months OR has subscription keywords if months_with_data >= 2 or has_subscription_indicators: filtered_matrix[product_key] = months_data logger.info(f" βœ… KEEP: '{product_name}' (months={months_with_data}, subscription_keyword={has_subscription_indicators})") else: logger.info(f" ❌ REMOVE: '{product_name}' (months={months_with_data}, subscription_keyword={has_subscription_indicators})") # Convert to output format logger.info(f"\nπŸ“Š [MATRIX] Final output: {len(filtered_matrix)} products after filtering") products = [] for product_number, months_data in sorted(filtered_matrix.items()): product_name = product_names.get(product_number, f"Product {product_number}") logger.info(f" πŸ“¦ Building output for: {product_name}") rows = [] total_amount: float = 0.0 total_paid: float = 0.0 for year_month in sorted(months_data.keys()): cell = months_data[year_month] rows.append({ "year_month": year_month, "amount": cell["amount"], "status": cell["status"], "invoice_number": cell["invoice_number"], "period_label": cell["period_label"], "period_from": cell["period_from"], "period_to": cell["period_to"] }) total_amount += cell["amount"] if cell["status"] == "paid": total_paid += cell["amount"] products.append({ "product_number": product_number, "product_name": product_names[product_number], "rows": rows, "total_amount": total_amount, "total_paid": total_paid }) return products @staticmethod def _generate_month_range(num_months: int) -> List[str]: """ Generate list of month keys for the last N months Args: num_months: Number of months to generate (e.g., 12) Returns: Sorted list of 'YYYY-MM' strings going back from today """ months = [] today = datetime.now() for i in range(num_months): # Go back i months from today using calendar arithmetic year = today.year month = today.month - i # Adjust year and month if month goes negative while month <= 0: month += 12 year -= 1 month_key = f"{year:04d}-{month:02d}" months.append(month_key) # Return sorted chronologically (oldest first) return sorted(months) @staticmethod def _end_of_month(dt: datetime) -> datetime: """Return last day of the month for a given date.""" next_month = dt.replace(day=28) + timedelta(days=4) return next_month - timedelta(days=next_month.day) @staticmethod def _normalize_product_key(name: str) -> str: """Normalize product names to keep similar lines on the same row.""" base = (name or "").lower().strip() # Remove period phrases like "periode" and trailing date ranges for token in ["periode", "abonnement"]: if token in base: base = base.split(token)[0].strip() # Collapse multiple spaces while " " in base: base = base.replace(" ", " ") return base or name.lower().strip() @staticmethod def _extract_period_from_text(text: str) -> Optional[datetime]: """Extract month/year from invoice title/notes text (e.g., 'Periode May 2025').""" if not text: return None month_map = { "january": 1, "jan": 1, "januar": 1, "february": 2, "feb": 2, "februar": 2, "march": 3, "mar": 3, "marts": 3, "april": 4, "apr": 4, "may": 5, "maj": 5, "june": 6, "jun": 6, "juni": 6, "july": 7, "jul": 7, "juli": 7, "august": 8, "aug": 8, "september": 9, "sep": 9, "sept": 9, "october": 10, "oct": 10, "okt": 10, "oktober": 10, "november": 11, "nov": 11, "december": 12, "dec": 12 } tokens = text.replace(".", " ").replace("/", " ").split() for i, token in enumerate(tokens[:-1]): month = month_map.get(token.lower()) if month: year_token = tokens[i + 1] if year_token.isdigit() and len(year_token) == 4: return datetime(int(year_token), month, 1) return None @staticmethod def _merge_status(existing: str, incoming: str) -> str: """Keep the most 'complete' status when multiple invoices hit same cell.""" priority = { "missing": 0, "draft": 1, "invoiced": 2, "paid": 3, "credited": 2 } existing_key = existing or "missing" incoming_key = incoming or "missing" return incoming_key if priority.get(incoming_key, 0) >= priority.get(existing_key, 0) else existing_key @staticmethod def _format_period_label(period_from: datetime, period_to: datetime) -> str: """Format period as readable label (e.g., 'Jan', 'Jan-Mar', etc)""" from_month = period_from.strftime('%b') to_month = period_to.strftime('%b') from_year = period_from.year to_year = period_to.year # Calculate number of months months_diff = (period_to.year - period_from.year) * 12 + (period_to.month - period_from.month) if months_diff == 0: # Same month return from_month elif from_year == to_year: # Same year, different months return f"{from_month}-{to_month}" else: # Different years return f"{from_month} {from_year} - {to_month} {to_year}" @staticmethod def _determine_status(invoice_status: str) -> str: """Map e-conomic invoice status to matrix status""" status_map = { "sent": "invoiced", "paid": "paid", "draft": "draft", "credited": "credited", "unpaid": "invoiced", "booked": "invoiced" } result = status_map.get(invoice_status.lower() if invoice_status else 'unknown', invoice_status or 'unknown') return result if result else "unknown" # Singleton instance _matrix_service_instance = None def get_subscription_matrix_service() -> SubscriptionMatrixService: """Get singleton instance of SubscriptionMatrixService""" global _matrix_service_instance if _matrix_service_instance is None: _matrix_service_instance = SubscriptionMatrixService() return _matrix_service_instance