# Listing metadata: 517 lines, 24 KiB, Python
"""
Subscription Billing Matrix Service

Generate a per-customer billing matrix showing:
- Rows: Products from e-conomic customer invoices
- Columns: Months
- Cells: Expected vs invoiced amounts, payment status, invoice references

Data source: e-conomic sales invoices only
"""
|
|
|
|
import logging
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple
|
|
from collections import defaultdict
|
|
from app.services.economic_service import get_economic_service
|
|
from app.core.database import execute_query
|
|
|
|
# Module-level logger, named after this module, used by everything below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SubscriptionMatrixService:
    """Generate billing matrix for customer subscriptions.

    Rows are products found on a customer's e-conomic invoices, columns are
    calendar months, and every cell carries the summed amount, a status and
    the reference of the last invoice that contributed to it.
    """

    # Substrings in a line description that mark it as a one-time item.
    _ONE_TIME_KEYWORDS = ('installation', 'opsætning', 'setup', 'hardware', 'engangs')

    # Substrings in a product name that keep the product in the matrix even
    # when it only has data in a single month.
    _SUBSCRIPTION_KEYWORDS = ('periode', 'abonnement', 'hosting', 'office', 'microsoft', 'subscription')

    # English and Danish month names/abbreviations mapped to month numbers.
    _MONTH_NUMBERS = {
        "january": 1, "jan": 1, "januar": 1,
        "february": 2, "feb": 2, "februar": 2,
        "march": 3, "mar": 3, "marts": 3,
        "april": 4, "apr": 4,
        "may": 5, "maj": 5,
        "june": 6, "jun": 6, "juni": 6,
        "july": 7, "jul": 7, "juli": 7,
        "august": 8, "aug": 8,
        "september": 9, "sep": 9, "sept": 9,
        "october": 10, "oct": 10, "okt": 10, "oktober": 10,
        "november": 11, "nov": 11,
        "december": 12, "dec": 12
    }

    def __init__(self):
        self.economic_service = get_economic_service()

    async def generate_billing_matrix(
        self,
        customer_id: int,
        months: int = 12
    ) -> Dict:
        """
        Generate subscription billing matrix for a customer

        Args:
            customer_id: BMC Hub customer ID (local database)
            months: Number of months to include (default 12)

        Returns:
            Dict with structure:
            {
                "customer_id": int,
                "economic_customer_number": ... (only when invoices were found),
                "generated_at": str (ISO datetime),
                "months_shown": int,
                "products": [
                    {
                        "product_number": str,
                        "product_name": str,
                        "rows": [
                            {
                                "year_month": "2025-01",
                                "amount": float,
                                "status": "missing|draft|invoiced|paid|credited"
                                          (unmapped e-conomic statuses pass through),
                                "invoice_number": invoice reference or None,
                                "period_label": str or None,
                                "period_from": str (ISO date) or None,
                                "period_to": str (ISO date) or None
                            }
                        ],
                        "total_amount": float,
                        "total_paid": float
                    }
                ]
            }

            On failure (or when the customer has no e-conomic number) the
            dict instead holds "customer_id", "error" and an empty
            "products" list. This method does not raise.
        """
        try:
            # Get customer's e-conomic number from local database
            logger.info(f"🔍 [MATRIX] Starting matrix generation for customer {customer_id}")

            customer = execute_query(
                "SELECT economic_customer_number FROM customers WHERE id = %s",
                (customer_id,)
            )

            logger.info(f"🔍 [MATRIX] Query result: {customer}")

            if not customer or not customer[0].get('economic_customer_number'):
                logger.warning(f"⚠️ Customer {customer_id} has no e-conomic number")
                return {
                    "customer_id": customer_id,
                    "error": "No e-conomic customer number found",
                    "products": []
                }

            economic_customer_number = customer[0]['economic_customer_number']
            logger.info(f"📊 Generating matrix for e-conomic customer {economic_customer_number}")

            # Fetch invoices from e-conomic
            logger.info(f"🔍 [MATRIX] About to call get_customer_invoices with customer {economic_customer_number}")
            invoices = await self.economic_service.get_customer_invoices(
                economic_customer_number,
                include_lines=True
            )

            if not invoices:
                logger.warning(f"⚠️ No invoices found for customer {economic_customer_number}")
                return {
                    "customer_id": customer_id,
                    "generated_at": datetime.now().isoformat(),
                    "months_shown": months,
                    "products": []
                }

            logger.info(f"🔍 [MATRIX] Returned {len(invoices)} invoices from e-conomic")
            logger.info(f"📊 Processing {len(invoices)} invoices")

            # Only serialise the sample invoice when debug output is wanted
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f"First invoice structure: {json.dumps(invoices[0], indent=2, default=str)[:500]}")

            # Group invoices by product number
            matrix_data = self._aggregate_by_product(invoices, months)

            return {
                "customer_id": customer_id,
                "economic_customer_number": economic_customer_number,
                "generated_at": datetime.now().isoformat(),
                "months_shown": months,
                "products": matrix_data
            }

        except Exception as e:
            # Top-level boundary: report the failure in the payload instead of
            # raising; logger.exception records the traceback as well.
            logger.exception(f"❌ Error generating billing matrix: {e}")
            return {
                "customer_id": customer_id,
                "error": str(e),
                "products": []
            }

    def _aggregate_by_product(self, invoices: List[Dict], months: int) -> List[Dict]:
        """
        Group invoice lines by product number and aggregate by month

        Args:
            invoices: List of e-conomic invoice objects
            months: Number of months to include

        Returns:
            List of product records with aggregated monthly data (including
            empty months). Products are kept only when they have data in at
            least two months or their name contains a subscription keyword.
            An empty list is returned when ``months`` yields no month columns.
        """
        # Generate list of all months to display (last N months from today)
        all_months = self._generate_month_range(months)
        if not all_months:
            return []
        visible_months = set(all_months)

        # Structure: {product_key: {year_month: {amount, status, invoice_number, period_from, period_to, period_label}}}
        product_matrix: Dict = {}
        product_names: Dict = {}  # Cache product names

        logger.info(f"🔍 [MATRIX] Processing {len(invoices)} invoices")
        total_invoices = len(invoices)
        for position, invoice in enumerate(invoices, start=1):
            # Guard each invoice separately so one malformed record cannot
            # silently drop every invoice that follows it.
            try:
                self._aggregate_invoice(
                    invoice, position, total_invoices,
                    all_months, visible_months,
                    product_matrix, product_names
                )
            except Exception as e:
                logger.exception(f"❌ Error aggregating data: {e}")

        # Filter out products that only appear in one month (one-time purchases, not subscriptions)
        # Keep products that appear in 2+ months OR have subscription keywords
        logger.info(f"\n🔍 [MATRIX] Filtering products. Total collected: {len(product_matrix)}")
        filtered_matrix = {}
        for product_key, months_data in product_matrix.items():
            # Count how many months have actual data (not missing status)
            months_with_data = sum(1 for cell in months_data.values() if cell["status"] != "missing")

            # Check if product name suggests subscription
            product_name = product_names.get(product_key, "")
            lowered_name = product_name.lower()
            has_subscription_indicators = any(
                keyword in lowered_name for keyword in self._SUBSCRIPTION_KEYWORDS
            )

            if months_with_data >= 2 or has_subscription_indicators:
                filtered_matrix[product_key] = months_data
                logger.info(f" ✅ KEEP: '{product_name}' (months={months_with_data}, subscription_keyword={has_subscription_indicators})")
            else:
                logger.info(f" ❌ REMOVE: '{product_name}' (months={months_with_data}, subscription_keyword={has_subscription_indicators})")

        # Convert to output format
        logger.info(f"\n📊 [MATRIX] Final output: {len(filtered_matrix)} products after filtering")
        products = []
        for product_number in sorted(filtered_matrix):
            months_data = filtered_matrix[product_number]
            product_name = product_names.get(product_number, f"Product {product_number}")
            logger.info(f" 📦 Building output for: {product_name}")
            rows = []
            total_amount: float = 0.0
            total_paid: float = 0.0

            for year_month in sorted(months_data):
                cell = months_data[year_month]
                rows.append({
                    "year_month": year_month,
                    "amount": cell["amount"],
                    "status": cell["status"],
                    "invoice_number": cell["invoice_number"],
                    "period_label": cell["period_label"],
                    "period_from": cell["period_from"],
                    "period_to": cell["period_to"]
                })
                total_amount += cell["amount"]
                if cell["status"] == "paid":
                    total_paid += cell["amount"]

            products.append({
                "product_number": product_number,
                "product_name": product_name,
                "rows": rows,
                "total_amount": total_amount,
                "total_paid": total_paid
            })

        return products

    def _aggregate_invoice(
        self,
        invoice: Dict,
        position: int,
        total_invoices: int,
        all_months: List[str],
        visible_months: set,
        product_matrix: Dict,
        product_names: Dict
    ) -> None:
        """Fold the lines of one invoice into ``product_matrix`` / ``product_names`` (both mutated in place)."""
        invoice_number = invoice.get('bookedInvoiceNumber') or invoice.get('draftInvoiceNumber')
        logger.info(f"📄 [MATRIX] [{position}/{total_invoices}] Processing invoice {invoice_number}")

        invoice_status = self._infer_invoice_status(invoice)
        invoice_date = invoice.get('date')

        # Invoice-level text (title/notes) is used for keyword and period matching
        invoice_text = self._collect_invoice_text(invoice)
        invoice_period = self._extract_period_from_text(invoice_text)

        # Process invoice lines ('lines' may be absent or explicitly None)
        lines = invoice.get('lines') or []
        logger.info(f"📋 [MATRIX] Invoice {invoice_number} has {len(lines)} lines")

        for line_num, line in enumerate(lines, start=1):
            product = line.get('product') or {}
            product_number = product.get('productNumber')
            product_name = product.get('name') or line.get('description')
            line_description = (line.get('description') or product_name or "").lower()
            logger.info(f" 📦 [MATRIX] Line {line_num}: Product='{product_name}', Number={product_number}")

            # Check for period data: explicit field first, then text on the
            # line, then text on the invoice itself
            period_field = line.get('period') or {}
            line_period = self._extract_period_from_text(line_description) or invoice_period
            has_period = bool(period_field.get('from')) or bool(line_period)

            # Keyword flags are only reported in the log; they do not filter
            has_periode_keyword = "periode" in line_description or "periode" in invoice_text
            has_abonnement_keyword = "abonnement" in line_description or "abonnement" in invoice_text

            # Skip only obvious one-time items (installation, setup fees, hardware, etc.)
            should_skip = any(keyword in line_description for keyword in self._ONE_TIME_KEYWORDS)

            logger.info(f" 🔍 Checks: periode={has_periode_keyword}, abon={has_abonnement_keyword}, has_period={has_period}, should_skip={should_skip}")

            if should_skip:
                logger.info(f" ❌ SKIPPED (one-time): {product_name[:60] if product_name else 'NO NAME'}")
                continue

            logger.info(f" ✅ INCLUDED: {product_name[:60] if product_name else 'NO NAME'}")

            if not product_number and not product_name:
                logger.debug(f"Skipping line without product number: {line}")
                continue
            product_key = product_number or self._normalize_product_key(product_name or "")

            # Cache product name (first name seen for a key wins)
            if product_key not in product_names:
                product_names[product_key] = product_name or f"Product {product_number}"

            # Resolve the billing period for this line
            period_from_str = period_field.get('from')
            period_to_str = period_field.get('to')
            try:
                if not period_from_str:
                    if line_period:
                        # Month named in the text: cover that whole month
                        period_from_str = line_period.isoformat().split('T')[0]
                        period_to_str = self._end_of_month(line_period).isoformat().split('T')[0]
                    elif invoice_date:
                        # Assume monthly billing if no end date.
                        # NOTE(review): +31 days always lands in a later
                        # calendar month, so these labels span two months
                        # (e.g. "Jan-Feb") - confirm this is intended.
                        period_from_str = invoice_date
                        fallback_start = datetime.fromisoformat(invoice_date.replace('Z', '+00:00'))
                        period_to_str = (fallback_start + timedelta(days=31)).isoformat().split('T')[0]
                    else:
                        logger.warning(f"No period or date found for line in invoice {invoice_number}")
                        continue

                period_from = datetime.fromisoformat(period_from_str.replace('Z', '+00:00'))
                period_to = datetime.fromisoformat(period_to_str.replace('Z', '+00:00'))
            except (ValueError, AttributeError, TypeError) as e:
                logger.warning(f"Could not parse dates {period_from_str} - {period_to_str}: {e}")
                continue

            try:
                amount = self._line_amount(line)
            except (TypeError, ValueError) as e:
                logger.warning(f"Could not read amount for line in invoice {invoice_number}: {e}")
                continue

            logger.info(f" 💰 Amount: {amount} kr, Period: {period_from_str} to {period_to_str}")

            # Skip lines with no amount (text/description lines only)
            if abs(amount) < 0.01:
                logger.info(" ⏭️ SKIPPED: Line with no amount (text field)")
                continue

            # Determine month key (use first month if multi-month)
            year_month = period_from.strftime('%Y-%m')
            logger.info(f" 📅 Month: {year_month}")

            if year_month not in visible_months:
                # Skip lines outside the displayed month range
                logger.info(f" ⏭️ SKIPPED: Month {year_month} outside display range {all_months[0]} - {all_months[-1]}")
                continue

            period_label = self._format_period_label(period_from, period_to)

            # Initialize product if first time within range
            if product_key not in product_matrix:
                product_matrix[product_key] = {
                    month_key: self._empty_cell() for month_key in all_months
                }

            # Update cell: amounts accumulate, references reflect the latest line
            cell = product_matrix[product_key][year_month]
            old_amount = cell["amount"]
            cell["amount"] += amount
            cell["status"] = self._merge_status(cell["status"], self._determine_status(invoice_status))
            cell["invoice_number"] = invoice_number
            cell["period_from"] = period_from.isoformat().split('T')[0]
            cell["period_to"] = period_to.isoformat().split('T')[0]
            cell["period_label"] = period_label
            logger.info(f" ✏️ Updated {product_key} @ {year_month}: {old_amount} kr -> {cell['amount']} kr, status={cell['status']}")

    @staticmethod
    def _empty_cell() -> Dict:
        """Return a fresh placeholder cell for a month without invoice data."""
        return {
            "amount": 0.0,
            "status": "missing",
            "invoice_number": None,
            "period_from": None,
            "period_to": None,
            "period_label": None
        }

    @staticmethod
    def _infer_invoice_status(invoice: Dict) -> str:
        """Return the invoice's 'status', inferring booked/draft from its number fields when absent."""
        status = invoice.get('status') or 'unknown'
        if status == 'unknown':
            if invoice.get('bookedInvoiceNumber'):
                return 'booked'
            if invoice.get('draftInvoiceNumber'):
                return 'draft'
        return status

    @staticmethod
    def _collect_invoice_text(invoice: Dict) -> str:
        """Join the invoice's notes, title-like fields and references into one lower-cased string."""
        parts = []
        notes = invoice.get('notes')
        if isinstance(notes, dict):
            for key in ("heading", "textLine1", "textLine2", "textLine3", "textLine4", "textLine5"):
                val = notes.get(key)
                if val:
                    parts.append(str(val))
        elif notes:
            parts.append(str(notes))

        # Common title/description fields
        for key in ("heading", "description", "text", "subject"):
            val = invoice.get(key)
            if val:
                parts.append(str(val))

        other_ref = invoice.get('otherReference') or invoice.get('orderNumberDb')
        if other_ref:
            parts.append(str(other_ref))
        return " ".join(parts).lower()

    @staticmethod
    def _line_amount(line: Dict) -> float:
        """Return the line amount: net, else gross, else unit net price * quantity.

        Missing or None values count as 0 (quantity defaults to 1).

        Raises:
            TypeError, ValueError: if a present value cannot be converted to float.
        """
        amount = float(line.get('netAmount') or 0)
        if amount == 0:
            amount = float(line.get('grossAmount') or 0)
        if amount == 0:
            unit_price = float(line.get('unitNetPrice') or 0)
            quantity = line.get('quantity')
            amount = unit_price * float(1 if quantity is None else quantity)
        return amount

    @staticmethod
    def _generate_month_range(num_months: int) -> List[str]:
        """
        Generate list of month keys for the last N months

        Args:
            num_months: Number of months to generate (e.g., 12)

        Returns:
            Chronologically sorted list of 'YYYY-MM' strings ending with the
            current month; empty when num_months <= 0
        """
        today = datetime.now()
        # Work on a running month index so stepping back wraps across years
        current_index = today.year * 12 + (today.month - 1)

        months = []
        for offset in range(num_months):
            year, month_zero_based = divmod(current_index - offset, 12)
            months.append(f"{year:04d}-{month_zero_based + 1:02d}")

        # Return sorted chronologically (oldest first)
        return sorted(months)

    @staticmethod
    def _end_of_month(dt: datetime) -> datetime:
        """Return last day of the month for a given date (time of day is kept)."""
        # Day 28 exists in every month; adding 4 days always reaches the next
        # month, and stepping back by that day number lands on the last day.
        next_month = dt.replace(day=28) + timedelta(days=4)
        return next_month - timedelta(days=next_month.day)

    @staticmethod
    def _normalize_product_key(name: str) -> str:
        """Normalize product names to keep similar lines on the same row.

        Lower-cases the name, cuts it at the first "periode"/"abonnement"
        (dropping trailing period phrases and date ranges) unless that would
        leave nothing, and collapses runs of whitespace.
        """
        base = (name or "").lower().strip()
        for token in ("periode", "abonnement"):
            if token in base:
                head = base.split(token)[0].strip()
                # A name that starts with the token would be wiped out; keep
                # what we have so earlier trimming is not lost.
                if head:
                    base = head
        # Collapse multiple spaces
        return " ".join(base.split())

    @staticmethod
    def _extract_period_from_text(text: str) -> Optional[datetime]:
        """Extract month/year from invoice title/notes text (e.g., 'Periode May 2025').

        Returns the first day of the first "<month name> <4-digit year>" pair
        found, or None when there is none.
        """
        if not text:
            return None
        month_numbers = SubscriptionMatrixService._MONTH_NUMBERS
        # Treat '.' and '/' as separators and drop punctuation hugging a token
        tokens = [
            token.strip(",;:()[]")
            for token in text.replace(".", " ").replace("/", " ").split()
        ]
        for month_token, year_token in zip(tokens, tokens[1:]):
            month = month_numbers.get(month_token.lower())
            if month and len(year_token) == 4 and year_token.isdigit():
                try:
                    return datetime(int(year_token), month, 1)
                except ValueError:
                    # e.g. year "0000" is outside datetime's supported range
                    continue
        return None

    @staticmethod
    def _merge_status(existing: str, incoming: str) -> str:
        """Keep the most 'complete' status when multiple invoices hit same cell.

        Ties go to the incoming status; statuses not listed rank lowest.
        """
        priority = {
            "missing": 0,
            "draft": 1,
            "invoiced": 2,
            "paid": 3,
            "credited": 2
        }
        current = existing or "missing"
        candidate = incoming or "missing"
        if priority.get(candidate, 0) >= priority.get(current, 0):
            return candidate
        return current

    @staticmethod
    def _format_period_label(period_from: datetime, period_to: datetime) -> str:
        """Format period as readable label (e.g., 'Jan', 'Jan-Mar', 'Dec 2024 - Jan 2025')"""
        from_month = period_from.strftime('%b')
        to_month = period_to.strftime('%b')

        same_year = period_from.year == period_to.year
        if same_year and period_from.month == period_to.month:
            # Same month
            return from_month
        if same_year:
            # Same year, different months
            return f"{from_month}-{to_month}"
        # Different years
        return f"{from_month} {period_from.year} - {to_month} {period_to.year}"

    @staticmethod
    def _determine_status(invoice_status: str) -> str:
        """Map e-conomic invoice status to matrix status.

        Unmapped statuses are returned unchanged; empty/None becomes "unknown".
        """
        status_map = {
            "sent": "invoiced",
            "paid": "paid",
            "draft": "draft",
            "credited": "credited",
            "unpaid": "invoiced",
            "booked": "invoiced"
        }
        if not invoice_status:
            return "unknown"
        return status_map.get(invoice_status.lower(), invoice_status)
|
|
|
|
|
|
# Shared service instance; created lazily on the first accessor call
_matrix_service_instance = None


def get_subscription_matrix_service() -> SubscriptionMatrixService:
    """Return the shared SubscriptionMatrixService, creating it on first use."""
    global _matrix_service_instance
    if _matrix_service_instance is not None:
        return _matrix_service_instance
    _matrix_service_instance = SubscriptionMatrixService()
    return _matrix_service_instance
|