bmc_hub/app/jobs/process_subscriptions.py

301 lines
11 KiB
Python

"""
Subscription Invoice Processing Job
Processes active subscriptions when next_invoice_date is reached
Creates ordre drafts and advances subscription periods
Runs daily at 04:00
"""
import logging
from datetime import datetime, date
import json
from dateutil.relativedelta import relativedelta
from app.core.database import execute_query, get_db_connection
logger = logging.getLogger(__name__)
async def process_subscriptions():
    """
    Main job: process subscriptions due for invoicing.

    Steps:
    - Find active subscriptions where next_invoice_date <= today.
    - Skip subscriptions flagged as billing_blocked (counted as "blocked").
    - Skip subscriptions whose customer_id cannot be converted to int
      (counted as "errors") so one malformed row cannot abort the whole run.
    - Group the remaining subscriptions per customer + merge key + due date
      + billing direction and pass each group to
      _process_subscription_group, which creates the ordre draft and
      advances the subscription periods.

    Returns:
        None. Results are only reported through log messages.

    Raises:
        Nothing. Failures of a single group are logged and counted; any
        other exception is logged and swallowed by the outer handler.
    """
    try:
        logger.info("💰 Processing subscription invoices...")

        # Find subscriptions due for invoicing
        query = """
            SELECT
                s.id,
                s.sag_id,
                sg.titel AS sag_name,
                s.customer_id,
                c.name AS customer_name,
                s.product_name,
                s.billing_interval,
                s.billing_direction,
                s.advance_months,
                s.price,
                s.next_invoice_date,
                s.period_start,
                s.invoice_merge_key,
                s.billing_blocked,
                s.billing_block_reason,
                COALESCE(
                    (
                        SELECT json_agg(
                            json_build_object(
                                'id', si.id,
                                'description', si.description,
                                'quantity', si.quantity,
                                'unit_price', si.unit_price,
                                'line_total', si.line_total,
                                'product_id', si.product_id,
                                'asset_id', si.asset_id,
                                'billing_blocked', si.billing_blocked,
                                'billing_block_reason', si.billing_block_reason,
                                'period_from', si.period_from,
                                'period_to', si.period_to
                            ) ORDER BY si.id
                        )
                        FROM sag_subscription_items si
                        WHERE si.subscription_id = s.id
                    ),
                    '[]'::json
                ) as line_items
            FROM sag_subscriptions s
            LEFT JOIN sag_sager sg ON sg.id = s.sag_id
            LEFT JOIN customers c ON c.id = s.customer_id
            WHERE s.status = 'active'
              AND s.next_invoice_date <= CURRENT_DATE
            ORDER BY s.next_invoice_date, s.id
        """
        subscriptions = execute_query(query)
        if not subscriptions:
            logger.info("✅ No subscriptions due for invoicing")
            return

        logger.info("📋 Found %s subscription(s) to process", len(subscriptions))

        blocked_count = 0
        processed_count = 0
        error_count = 0
        grouped_subscriptions = {}

        for sub in subscriptions:
            if sub.get('billing_blocked'):
                blocked_count += 1
                logger.warning(
                    "⚠️ Subscription %s skipped due to billing block: %s",
                    sub.get('id'),
                    sub.get('billing_block_reason') or 'unknown reason'
                )
                continue

            # A NULL/invalid customer_id must only cost this one subscription,
            # not the whole run, so the conversion is guarded here.
            try:
                customer_key = int(sub['customer_id'])
            except (KeyError, TypeError, ValueError):
                error_count += 1
                logger.error(
                    "❌ Subscription %s skipped: invalid customer_id %r",
                    sub.get('id'),
                    sub.get('customer_id'),
                )
                continue

            # Subscriptions sharing this key end up on the same ordre draft.
            group_key = (
                customer_key,
                str(sub.get('invoice_merge_key') or f"cust-{sub['customer_id']}"),
                str(sub.get('next_invoice_date')),
                str(sub.get('billing_direction') or 'forward'),
            )
            grouped_subscriptions.setdefault(group_key, []).append(sub)

        for group in grouped_subscriptions.values():
            try:
                count = await _process_subscription_group(group)
                processed_count += count
            except Exception as e:
                # One failing group must not stop the remaining groups.
                logger.error("❌ Failed processing subscription group: %s", e, exc_info=True)
                error_count += 1

        logger.info(
            "✅ Subscription processing complete: %s processed, %s blocked, %s errors",
            processed_count,
            blocked_count,
            error_count,
        )
    except Exception as e:
        # Top-level boundary of the job: log and swallow.
        logger.error("❌ Subscription processing job failed: %s", e, exc_info=True)
async def _process_subscription_group(subscriptions: list[dict]) -> int:
    """Create one aggregated ordre draft for a group of subscriptions and advance all periods.

    Customer, billing direction and merge key are taken from the first
    subscription in the group. Line items flagged ``billing_blocked`` are
    left out of the draft.

    Args:
        subscriptions: Subscription rows (dict-like) that belong on the same
            draft. Each row is read for ``id``, ``billing_interval``,
            ``period_start``/``next_invoice_date`` and ``line_items`` (a list
            of dicts or a JSON string encoding one).

    Returns:
        The number of subscriptions in the group when a draft was created,
        or 0 when the group is empty or has no invoiceable lines (in which
        case nothing is written and no connection is opened).

    Raises:
        Exception: Any error raised while writing is re-raised after the
            transaction has been rolled back.
    """
    if not subscriptions:
        return 0

    first = subscriptions[0]
    customer_id = first['customer_id']
    customer_name = first.get('customer_name') or f"Customer #{customer_id}"
    billing_direction = first.get('billing_direction') or 'forward'
    invoice_aggregate_key = first.get('invoice_merge_key') or f"cust-{customer_id}"

    ordre_lines = []
    source_subscription_ids = []
    # (subscription_id, new period_start, billing_interval) for the UPDATEs below
    advances = []
    coverage_start = None
    coverage_end = None

    for sub in subscriptions:
        subscription_id = int(sub['id'])
        source_subscription_ids.append(subscription_id)

        line_items = sub.get('line_items') or []
        if isinstance(line_items, str):
            line_items = json.loads(line_items)

        period_start = sub.get('period_start') or sub.get('next_invoice_date')
        # The end of the invoiced period is also the start of the next one.
        period_end = _calculate_next_period_start(period_start, sub['billing_interval'])
        advances.append((subscription_id, period_end, sub['billing_interval']))

        if coverage_start is None or period_start < coverage_start:
            coverage_start = period_start
        if coverage_end is None or period_end > coverage_end:
            coverage_end = period_end

        for item in line_items:
            if item.get('billing_blocked'):
                logger.warning(
                    "⚠️ Skipping blocked subscription item %s on subscription %s",
                    item.get('id'),
                    subscription_id,
                )
                continue

            # Items built with json_build_object always carry every key, with
            # null for missing values, so defaults must apply to None as well
            # as to absent keys.
            product_number = str(_item_value_or_default(item, 'product_id', 'SUB'))
            ordre_lines.append({
                "product": {
                    "productNumber": product_number,
                    "description": _item_value_or_default(item, 'description', '')
                },
                "quantity": float(_item_value_or_default(item, 'quantity', 1)),
                "unitNetPrice": float(_item_value_or_default(item, 'unit_price', 0)),
                "totalNetAmount": float(_item_value_or_default(item, 'line_total', 0)),
                "discountPercentage": 0,
                "metadata": {
                    "subscription_id": subscription_id,
                    "asset_id": item.get('asset_id'),
                    "period_from": str(item.get('period_from') or period_start),
                    "period_to": str(item.get('period_to') or period_end),
                }
            })

    if not ordre_lines:
        logger.warning("⚠️ No invoiceable lines in subscription group for customer %s", customer_id)
        return 0

    title = f"Abonnementer: {customer_name}"
    notes = (
        f"Aggregated abonnement faktura\n"
        f"Kunde: {customer_name}\n"
        f"Coverage: {coverage_start} til {coverage_end}\n"
        f"Subscription IDs: {', '.join(str(sid) for sid in source_subscription_ids)}"
    )

    insert_query = """
        INSERT INTO ordre_drafts (
            title,
            customer_id,
            lines_json,
            notes,
            coverage_start,
            coverage_end,
            billing_direction,
            source_subscription_ids,
            invoice_aggregate_key,
            layout_number,
            created_by_user_id,
            sync_status,
            export_status_json,
            updated_at
        ) VALUES (%s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, CURRENT_TIMESTAMP)
        RETURNING id
    """

    conn = get_db_connection()
    cursor = None
    try:
        # Created inside the try so the connection is closed even if this fails.
        cursor = conn.cursor()

        cursor.execute(insert_query, (
            title,
            customer_id,
            json.dumps(ordre_lines, ensure_ascii=False),
            notes,
            coverage_start,
            coverage_end,
            billing_direction,
            source_subscription_ids,
            invoice_aggregate_key,
            1,  # Default layout
            None,  # System-created
            'pending',
            json.dumps({"source": "subscription", "subscription_ids": source_subscription_ids}, ensure_ascii=False)
        ))
        ordre_id = cursor.fetchone()[0]
        logger.info(
            "✅ Created aggregated ordre draft #%s for %s subscription(s)",
            ordre_id,
            len(source_subscription_ids),
        )

        # NOTE(review): every subscription in the group is advanced, including
        # ones whose items were all skipped as blocked — confirm this is intended.
        for subscription_id, new_period_start, billing_interval in advances:
            # NOTE(review): next_invoice_date ends up one interval after the
            # new period_start — confirm against the billing-direction rules.
            new_next_invoice_date = _calculate_next_period_start(new_period_start, billing_interval)
            cursor.execute(
                """
                UPDATE sag_subscriptions
                SET period_start = %s,
                    next_invoice_date = %s,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
                """,
                (new_period_start, new_next_invoice_date, subscription_id)
            )

        # Draft insert and period updates are committed as one transaction.
        conn.commit()
        return len(source_subscription_ids)
    except Exception:
        conn.rollback()
        raise
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()


def _item_value_or_default(item: dict, key: str, default):
    """Return ``item[key]``, or ``default`` when the key is absent or its value is None."""
    value = item.get(key)
    return default if value is None else value
def _calculate_next_period_start(current_date, billing_interval: str) -> date:
    """Return the date one billing interval after ``current_date``.

    ``current_date`` may be a ``'%Y-%m-%d'`` string, a ``datetime`` (only its
    date part is used) or any other value that supports adding a
    ``relativedelta`` (normally a ``date``). Recognised intervals are
    ``daily``, ``biweekly``, ``monthly``, ``quarterly`` and ``yearly``; any
    other value is logged as a warning and treated as monthly.
    """
    # Normalise the input to a plain date where possible
    if isinstance(current_date, datetime):
        start = current_date.date()
    elif isinstance(current_date, str):
        start = datetime.strptime(current_date, '%Y-%m-%d').date()
    else:
        start = current_date

    # Interval name -> step to add
    steps = {
        'daily': relativedelta(days=1),
        'biweekly': relativedelta(weeks=2),
        'monthly': relativedelta(months=1),
        'quarterly': relativedelta(months=3),
        'yearly': relativedelta(years=1),
    }
    step = steps.get(billing_interval)
    if step is None:
        # Fall back to monthly for anything unrecognised
        logger.warning(f"Unknown billing interval '{billing_interval}', defaulting to monthly")
        step = relativedelta(months=1)

    return start + step