bmc_hub/app/jobs/process_subscriptions.py

217 lines
7.7 KiB
Python
Raw Normal View History

"""
Subscription Invoice Processing Job
Processes active subscriptions when next_invoice_date is reached
Creates ordre drafts and advances subscription periods
Runs daily at 04:00
"""
import logging
from datetime import datetime, date
from decimal import Decimal
import json
from dateutil.relativedelta import relativedelta
from app.core.config import settings
from app.core.database import execute_query, get_db_connection
logger = logging.getLogger(__name__)
async def process_subscriptions():
"""
Main job: Process subscriptions due for invoicing
- Find active subscriptions where next_invoice_date <= TODAY
- Create ordre draft with line items from subscription
- Advance period_start and next_invoice_date based on billing_interval
- Log all actions for audit trail
"""
try:
logger.info("💰 Processing subscription invoices...")
# Find subscriptions due for invoicing
query = """
SELECT
s.id,
s.sag_id,
sg.titel AS sag_name,
s.customer_id,
c.name AS customer_name,
s.product_name,
s.billing_interval,
s.price,
s.next_invoice_date,
s.period_start,
COALESCE(
(
SELECT json_agg(
json_build_object(
'id', si.id,
'description', si.description,
'quantity', si.quantity,
'unit_price', si.unit_price,
'line_total', si.line_total,
'product_id', si.product_id
) ORDER BY si.id
)
FROM sag_subscription_items si
WHERE si.subscription_id = s.id
),
'[]'::json
) as line_items
FROM sag_subscriptions s
LEFT JOIN sag_sager sg ON sg.id = s.sag_id
LEFT JOIN customers c ON c.id = s.customer_id
WHERE s.status = 'active'
AND s.next_invoice_date <= CURRENT_DATE
ORDER BY s.next_invoice_date, s.id
"""
subscriptions = execute_query(query)
if not subscriptions:
logger.info("✅ No subscriptions due for invoicing")
return
logger.info(f"📋 Found {len(subscriptions)} subscription(s) to process")
processed_count = 0
error_count = 0
for sub in subscriptions:
try:
await _process_single_subscription(sub)
processed_count += 1
except Exception as e:
logger.error(f"❌ Failed to process subscription {sub['id']}: {e}", exc_info=True)
error_count += 1
logger.info(f"✅ Subscription processing complete: {processed_count} processed, {error_count} errors")
except Exception as e:
logger.error(f"❌ Subscription processing job failed: {e}", exc_info=True)
async def _process_single_subscription(sub: dict):
"""Process a single subscription: create ordre draft and advance period"""
subscription_id = sub['id']
logger.info(f"Processing subscription #{subscription_id}: {sub['product_name']} for {sub['customer_name']}")
conn = get_db_connection()
cursor = conn.cursor()
try:
# Convert line_items from JSON to list
line_items = sub.get('line_items', [])
if isinstance(line_items, str):
line_items = json.loads(line_items)
# Build ordre draft lines_json
ordre_lines = []
for item in line_items:
product_number = str(item.get('product_id', 'SUB'))
ordre_lines.append({
"product": {
"productNumber": product_number,
"description": item.get('description', '')
},
"quantity": float(item.get('quantity', 1)),
"unitNetPrice": float(item.get('unit_price', 0)),
"totalNetAmount": float(item.get('line_total', 0)),
"discountPercentage": 0
})
# Create ordre draft title with period information
period_start = sub.get('period_start') or sub.get('next_invoice_date')
next_period_start = _calculate_next_period_start(period_start, sub['billing_interval'])
title = f"Abonnement: {sub['product_name']}"
notes = f"Periode: {period_start} til {next_period_start}\nAbonnement ID: {subscription_id}"
if sub.get('sag_id'):
notes += f"\nSag: {sub['sag_name']}"
# Insert ordre draft
insert_query = """
INSERT INTO ordre_drafts (
title,
customer_id,
lines_json,
notes,
layout_number,
created_by_user_id,
export_status_json,
updated_at
) VALUES (%s, %s, %s::jsonb, %s, %s, %s, %s::jsonb, CURRENT_TIMESTAMP)
RETURNING id
"""
cursor.execute(insert_query, (
title,
sub['customer_id'],
json.dumps(ordre_lines, ensure_ascii=False),
notes,
1, # Default layout
None, # System-created
json.dumps({"source": "subscription", "subscription_id": subscription_id}, ensure_ascii=False)
))
ordre_id = cursor.fetchone()[0]
logger.info(f"✅ Created ordre draft #{ordre_id} for subscription #{subscription_id}")
# Calculate new period dates
current_period_start = sub.get('period_start') or sub.get('next_invoice_date')
new_period_start = next_period_start
new_next_invoice_date = _calculate_next_period_start(new_period_start, sub['billing_interval'])
# Update subscription with new period dates
update_query = """
UPDATE sag_subscriptions
SET period_start = %s,
next_invoice_date = %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
cursor.execute(update_query, (new_period_start, new_next_invoice_date, subscription_id))
conn.commit()
logger.info(f"✅ Advanced subscription #{subscription_id}: next invoice {new_next_invoice_date}")
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
conn.close()
def _calculate_next_period_start(current_date, billing_interval: str) -> date:
"""Calculate next period start date based on billing interval"""
# Parse current_date if it's a string
if isinstance(current_date, str):
current_date = datetime.strptime(current_date, '%Y-%m-%d').date()
elif isinstance(current_date, datetime):
current_date = current_date.date()
# Calculate delta based on interval
if billing_interval == 'daily':
delta = relativedelta(days=1)
elif billing_interval == 'biweekly':
delta = relativedelta(weeks=2)
elif billing_interval == 'monthly':
delta = relativedelta(months=1)
elif billing_interval == 'quarterly':
delta = relativedelta(months=3)
elif billing_interval == 'yearly':
delta = relativedelta(years=1)
else:
# Default to monthly if unknown
logger.warning(f"Unknown billing interval '{billing_interval}', defaulting to monthly")
delta = relativedelta(months=1)
next_date = current_date + delta
return next_date