bmc_hub/app/services/subscription_matrix.py

462 lines
20 KiB
Python
Raw Normal View History

"""
Subscription Billing Matrix Service
Generate a per-customer billing matrix showing:
- Rows: Products from e-conomic customer invoices
- Columns: Months
- Cells: Expected vs invoiced amounts, payment status, invoice references
Data source: e-conomic sales invoices only
"""
import logging
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from collections import defaultdict
from app.services.economic_service import get_economic_service
from app.core.database import execute_query
logger = logging.getLogger(__name__)
class SubscriptionMatrixService:
"""Generate billing matrix for customer subscriptions"""
def __init__(self):
self.economic_service = get_economic_service()
async def generate_billing_matrix(
self,
customer_id: int,
months: int = 12
) -> Dict:
"""
Generate subscription billing matrix for a customer
Args:
customer_id: BMC Hub customer ID (local database)
months: Number of months to include (default 12)
Returns:
Dict with structure:
{
"customer_id": int,
"generated_at": str (ISO datetime),
"months_shown": int,
"products": [
{
"product_number": str,
"product_name": str,
"rows": [
{
"year_month": "2025-01",
"amount": float,
"status": "paid|invoiced|missing",
"invoice_number": str (optional),
"period_label": str,
"period_from": str (ISO date),
"period_to": str (ISO date)
}
],
"total_amount": float,
"total_paid": float
}
]
}
"""
try:
# Get customer's e-conomic number from local database
logger.info(f"🔍 [MATRIX] Starting matrix generation for customer {customer_id}")
customer = execute_query(
"SELECT economic_customer_number FROM customers WHERE id = %s",
(customer_id,)
)
logger.info(f"🔍 [MATRIX] Query result: {customer}")
if not customer or not customer[0].get('economic_customer_number'):
logger.warning(f"⚠️ Customer {customer_id} has no e-conomic number")
return {
"customer_id": customer_id,
"error": "No e-conomic customer number found",
"products": []
}
economic_customer_number = customer[0]['economic_customer_number']
logger.info(f"📊 Generating matrix for e-conomic customer {economic_customer_number}")
# Fetch invoices from e-conomic
logger.info(f"🔍 [MATRIX] About to call get_customer_invoices with customer {economic_customer_number}")
invoices = await self.economic_service.get_customer_invoices(
economic_customer_number,
include_lines=True
)
logger.info(f"🔍 [MATRIX] Returned {len(invoices)} invoices from e-conomic")
if not invoices:
logger.warning(f"⚠️ No invoices found for customer {economic_customer_number}")
return {
"customer_id": customer_id,
"generated_at": datetime.now().isoformat(),
"months_shown": months,
"products": []
}
logger.info(f"📊 Processing {len(invoices)} invoices")
# Debug: log first invoice structure
if invoices:
logger.debug(f"First invoice structure: {json.dumps(invoices[0], indent=2, default=str)[:500]}")
# Group invoices by product number
matrix_data = self._aggregate_by_product(invoices, months)
return {
"customer_id": customer_id,
"economic_customer_number": economic_customer_number,
"generated_at": datetime.now().isoformat(),
"months_shown": months,
"products": matrix_data
}
except Exception as e:
logger.error(f"❌ Error generating billing matrix: {e}")
import traceback
logger.error(traceback.format_exc())
return {
"customer_id": customer_id,
"error": str(e),
"products": []
}
def _aggregate_by_product(self, invoices: List[Dict], months: int) -> List[Dict]:
"""
Group invoice lines by product number and aggregate by month
Args:
invoices: List of e-conomic invoice objects
months: Number of months to include
Returns:
List of product records with aggregated monthly data (including empty months)
"""
# Generate list of all months to display (last N months from today)
all_months = self._generate_month_range(months)
# Structure: {product_number: {year_month: {amount, status, invoice_number, period_from, period_to}}}
product_matrix = defaultdict(dict)
product_names = {} # Cache product names
# Initialize products only if they have data within the selected month range
try:
# Fill in data from invoices, but only for months within range
for invoice in invoices:
invoice_number = invoice.get('bookedInvoiceNumber') or invoice.get('draftInvoiceNumber')
# Determine status based on invoice type/endpoint it came from
# Priority: use 'status' field, fallback to inferring from presence of certain fields
invoice_status = invoice.get('status', 'unknown')
if invoice_status == 'unknown':
# Infer status from invoice structure
if invoice.get('bookedInvoiceNumber'):
invoice_status = 'booked'
elif invoice.get('draftInvoiceNumber'):
invoice_status = 'draft'
invoice_date = invoice.get('date')
# Build invoice-level text (title/notes) for keyword matching
invoice_text_parts = []
notes = invoice.get('notes')
if isinstance(notes, dict):
for key in ["heading", "textLine1", "textLine2", "textLine3", "textLine4", "textLine5"]:
val = notes.get(key)
if val:
invoice_text_parts.append(str(val))
elif notes:
invoice_text_parts.append(str(notes))
# Common title/description fields
for key in ["heading", "description", "text", "subject"]:
val = invoice.get(key)
if val:
invoice_text_parts.append(str(val))
other_ref = invoice.get('otherReference') or invoice.get('orderNumberDb')
if other_ref:
invoice_text_parts.append(str(other_ref))
invoice_text = " ".join(invoice_text_parts).lower()
invoice_period = self._extract_period_from_text(invoice_text)
# Process invoice lines
lines = invoice.get('lines', [])
for line in lines:
product_number = line.get('product', {}).get('productNumber')
product_name = line.get('product', {}).get('name') or line.get('description')
line_description = (line.get('description') or product_name or "").lower()
line_period = self._extract_period_from_text(line_description) or invoice_period
has_period = bool(line.get('period', {}).get('from') if line.get('period') else None) or bool(line_period)
# Only include lines that indicate a period or subscription or explicit period fields
if (
"periode" not in line_description
and "abonnement" not in line_description
and "periode" not in invoice_text
and "abonnement" not in invoice_text
and not has_period
):
continue
if not product_number and not product_name:
logger.debug(f"Skipping line without product number: {line}")
continue
product_key = product_number or self._normalize_product_key(product_name or "")
# Cache product name
if product_key not in product_names:
product_names[product_key] = product_name or f"Product {product_number}"
# Extract period from line
period_from_str = line.get('period', {}).get('from')
period_to_str = line.get('period', {}).get('to')
# If no period on line, use invoice date as month
if not period_from_str:
if line_period:
period_from = line_period
period_to = self._end_of_month(period_from)
period_from_str = period_from.isoformat().split('T')[0]
period_to_str = period_to.isoformat().split('T')[0]
elif invoice_period:
period_from = invoice_period
period_to = self._end_of_month(period_from)
period_from_str = period_from.isoformat().split('T')[0]
period_to_str = period_to.isoformat().split('T')[0]
elif invoice_date:
period_from_str = invoice_date
# Assume monthly billing if no end date
period_from = datetime.fromisoformat(invoice_date.replace('Z', '+00:00'))
period_to = period_from + timedelta(days=31)
period_to_str = period_to.isoformat().split('T')[0]
else:
logger.warning(f"No period or date found for line in invoice {invoice_number}")
continue
# Parse dates
try:
period_from = datetime.fromisoformat(period_from_str.replace('Z', '+00:00'))
period_to = datetime.fromisoformat(period_to_str.replace('Z', '+00:00'))
except (ValueError, AttributeError) as e:
logger.warning(f"Could not parse dates {period_from_str} - {period_to_str}: {e}")
continue
# Get amount - use gross amount if net is 0 (line items may have gross only)
amount = float(line.get('netAmount', 0))
if amount == 0:
amount = float(line.get('grossAmount', 0))
if amount == 0:
# Try unit net price * quantity as fallback
unit_price = float(line.get('unitNetPrice', 0))
qty = float(line.get('quantity', 1))
amount = unit_price * qty
# Determine month key (use first month if multi-month)
year_month = period_from.strftime('%Y-%m')
if year_month not in all_months:
# Skip lines outside the displayed month range
continue
# Calculate period label
period_label = self._format_period_label(period_from, period_to)
# Initialize product if first time within range
if product_key not in product_matrix:
for month_key in all_months:
product_matrix[product_key][month_key] = {
"amount": 0.0,
"status": "missing",
"invoice_number": None,
"period_from": None,
"period_to": None,
"period_label": None
}
# Update cell
cell = product_matrix[product_key][year_month]
cell["amount"] += amount
cell["status"] = self._merge_status(cell["status"], self._determine_status(invoice_status))
cell["invoice_number"] = invoice_number
cell["period_from"] = period_from.isoformat().split('T')[0]
cell["period_to"] = period_to.isoformat().split('T')[0]
cell["period_label"] = period_label
except Exception as e:
logger.error(f"❌ Error aggregating data: {e}")
# Convert to output format
products = []
for product_number, months_data in sorted(product_matrix.items()):
rows = []
total_amount: float = 0.0
total_paid: float = 0.0
for year_month in sorted(months_data.keys()):
cell = months_data[year_month]
rows.append({
"year_month": year_month,
"amount": cell["amount"],
"status": cell["status"],
"invoice_number": cell["invoice_number"],
"period_label": cell["period_label"],
"period_from": cell["period_from"],
"period_to": cell["period_to"]
})
total_amount += cell["amount"]
if cell["status"] == "paid":
total_paid += cell["amount"]
products.append({
"product_number": product_number,
"product_name": product_names[product_number],
"rows": rows,
"total_amount": total_amount,
"total_paid": total_paid
})
return products
@staticmethod
def _generate_month_range(num_months: int) -> List[str]:
"""
Generate list of month keys for the last N months
Args:
num_months: Number of months to generate (e.g., 12)
Returns:
Sorted list of 'YYYY-MM' strings going back from today
"""
months = []
today = datetime.now()
for i in range(num_months):
# Go back i months from today using calendar arithmetic
year = today.year
month = today.month - i
# Adjust year and month if month goes negative
while month <= 0:
month += 12
year -= 1
month_key = f"{year:04d}-{month:02d}"
months.append(month_key)
# Return sorted chronologically (oldest first)
return sorted(months)
@staticmethod
def _end_of_month(dt: datetime) -> datetime:
"""Return last day of the month for a given date."""
next_month = dt.replace(day=28) + timedelta(days=4)
return next_month - timedelta(days=next_month.day)
@staticmethod
def _normalize_product_key(name: str) -> str:
"""Normalize product names to keep similar lines on the same row."""
base = (name or "").lower().strip()
# Remove period phrases like "periode" and trailing date ranges
for token in ["periode", "abonnement"]:
if token in base:
base = base.split(token)[0].strip()
# Collapse multiple spaces
while " " in base:
base = base.replace(" ", " ")
return base or name.lower().strip()
@staticmethod
def _extract_period_from_text(text: str) -> Optional[datetime]:
"""Extract month/year from invoice title/notes text (e.g., 'Periode May 2025')."""
if not text:
return None
month_map = {
"january": 1, "jan": 1, "januar": 1,
"february": 2, "feb": 2, "februar": 2,
"march": 3, "mar": 3, "marts": 3,
"april": 4, "apr": 4,
"may": 5, "maj": 5,
"june": 6, "jun": 6, "juni": 6,
"july": 7, "jul": 7, "juli": 7,
"august": 8, "aug": 8,
"september": 9, "sep": 9, "sept": 9,
"october": 10, "oct": 10, "okt": 10, "oktober": 10,
"november": 11, "nov": 11,
"december": 12, "dec": 12
}
tokens = text.replace(".", " ").replace("/", " ").split()
for i, token in enumerate(tokens[:-1]):
month = month_map.get(token.lower())
if month:
year_token = tokens[i + 1]
if year_token.isdigit() and len(year_token) == 4:
return datetime(int(year_token), month, 1)
return None
@staticmethod
def _merge_status(existing: str, incoming: str) -> str:
"""Keep the most 'complete' status when multiple invoices hit same cell."""
priority = {
"missing": 0,
"draft": 1,
"invoiced": 2,
"paid": 3,
"credited": 2
}
existing_key = existing or "missing"
incoming_key = incoming or "missing"
return incoming_key if priority.get(incoming_key, 0) >= priority.get(existing_key, 0) else existing_key
@staticmethod
def _format_period_label(period_from: datetime, period_to: datetime) -> str:
"""Format period as readable label (e.g., 'Jan', 'Jan-Mar', etc)"""
from_month = period_from.strftime('%b')
to_month = period_to.strftime('%b')
from_year = period_from.year
to_year = period_to.year
# Calculate number of months
months_diff = (period_to.year - period_from.year) * 12 + (period_to.month - period_from.month)
if months_diff == 0:
# Same month
return from_month
elif from_year == to_year:
# Same year, different months
return f"{from_month}-{to_month}"
else:
# Different years
return f"{from_month} {from_year} - {to_month} {to_year}"
@staticmethod
def _determine_status(invoice_status: str) -> str:
"""Map e-conomic invoice status to matrix status"""
status_map = {
"sent": "invoiced",
"paid": "paid",
"draft": "draft",
"credited": "credited",
"unpaid": "invoiced",
"booked": "invoiced"
}
result = status_map.get(invoice_status.lower() if invoice_status else 'unknown', invoice_status or 'unknown')
return result if result else "unknown"
# Singleton instance
_matrix_service_instance = None
def get_subscription_matrix_service() -> SubscriptionMatrixService:
"""Get singleton instance of SubscriptionMatrixService"""
global _matrix_service_instance
if _matrix_service_instance is None:
_matrix_service_instance = SubscriptionMatrixService()
return _matrix_service_instance