""" Subscription Invoice Processing Job Processes active subscriptions when next_invoice_date is reached Creates ordre drafts and advances subscription periods Runs daily at 04:00 """ import logging from datetime import datetime, date from decimal import Decimal import json from dateutil.relativedelta import relativedelta from app.core.config import settings from app.core.database import execute_query, get_db_connection logger = logging.getLogger(__name__) async def process_subscriptions(): """ Main job: Process subscriptions due for invoicing - Find active subscriptions where next_invoice_date <= TODAY - Create ordre draft with line items from subscription - Advance period_start and next_invoice_date based on billing_interval - Log all actions for audit trail """ try: logger.info("💰 Processing subscription invoices...") # Find subscriptions due for invoicing query = """ SELECT s.id, s.sag_id, sg.titel AS sag_name, s.customer_id, c.name AS customer_name, s.product_name, s.billing_interval, s.price, s.next_invoice_date, s.period_start, COALESCE( ( SELECT json_agg( json_build_object( 'id', si.id, 'description', si.description, 'quantity', si.quantity, 'unit_price', si.unit_price, 'line_total', si.line_total, 'product_id', si.product_id ) ORDER BY si.id ) FROM sag_subscription_items si WHERE si.subscription_id = s.id ), '[]'::json ) as line_items FROM sag_subscriptions s LEFT JOIN sag_sager sg ON sg.id = s.sag_id LEFT JOIN customers c ON c.id = s.customer_id WHERE s.status = 'active' AND s.next_invoice_date <= CURRENT_DATE ORDER BY s.next_invoice_date, s.id """ subscriptions = execute_query(query) if not subscriptions: logger.info("✅ No subscriptions due for invoicing") return logger.info(f"📋 Found {len(subscriptions)} subscription(s) to process") processed_count = 0 error_count = 0 for sub in subscriptions: try: await _process_single_subscription(sub) processed_count += 1 except Exception as e: logger.error(f"❌ Failed to process subscription {sub['id']}: {e}", exc_info=True) error_count += 1 logger.info(f"✅ Subscription processing complete: {processed_count} processed, {error_count} errors") except Exception as e: logger.error(f"❌ Subscription processing job failed: {e}", exc_info=True) async def _process_single_subscription(sub: dict): """Process a single subscription: create ordre draft and advance period""" subscription_id = sub['id'] logger.info(f"Processing subscription #{subscription_id}: {sub['product_name']} for {sub['customer_name']}") conn = get_db_connection() cursor = conn.cursor() try: # Convert line_items from JSON to list line_items = sub.get('line_items', []) if isinstance(line_items, str): line_items = json.loads(line_items) # Build ordre draft lines_json ordre_lines = [] for item in line_items: product_number = str(item.get('product_id', 'SUB')) ordre_lines.append({ "product": { "productNumber": product_number, "description": item.get('description', '') }, "quantity": float(item.get('quantity', 1)), "unitNetPrice": float(item.get('unit_price', 0)), "totalNetAmount": float(item.get('line_total', 0)), "discountPercentage": 0 }) # Create ordre draft title with period information period_start = sub.get('period_start') or sub.get('next_invoice_date') next_period_start = _calculate_next_period_start(period_start, sub['billing_interval']) title = f"Abonnement: {sub['product_name']}" notes = f"Periode: {period_start} til {next_period_start}\nAbonnement ID: {subscription_id}" if sub.get('sag_id'): notes += f"\nSag: {sub['sag_name']}" # Insert ordre draft insert_query = """ INSERT INTO ordre_drafts ( title, customer_id, lines_json, notes, layout_number, created_by_user_id, export_status_json, updated_at ) VALUES (%s, %s, %s::jsonb, %s, %s, %s, %s::jsonb, CURRENT_TIMESTAMP) RETURNING id """ cursor.execute(insert_query, ( title, sub['customer_id'], json.dumps(ordre_lines, ensure_ascii=False), notes, 1, # Default layout None, # System-created json.dumps({"source": "subscription", "subscription_id": subscription_id}, ensure_ascii=False) )) ordre_id = cursor.fetchone()[0] logger.info(f"✅ Created ordre draft #{ordre_id} for subscription #{subscription_id}") # Calculate new period dates current_period_start = sub.get('period_start') or sub.get('next_invoice_date') new_period_start = next_period_start new_next_invoice_date = _calculate_next_period_start(new_period_start, sub['billing_interval']) # Update subscription with new period dates update_query = """ UPDATE sag_subscriptions SET period_start = %s, next_invoice_date = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s """ cursor.execute(update_query, (new_period_start, new_next_invoice_date, subscription_id)) conn.commit() logger.info(f"✅ Advanced subscription #{subscription_id}: next invoice {new_next_invoice_date}") except Exception as e: conn.rollback() raise e finally: cursor.close() conn.close() def _calculate_next_period_start(current_date, billing_interval: str) -> date: """Calculate next period start date based on billing interval""" # Parse current_date if it's a string if isinstance(current_date, str): current_date = datetime.strptime(current_date, '%Y-%m-%d').date() elif isinstance(current_date, datetime): current_date = current_date.date() # Calculate delta based on interval if billing_interval == 'daily': delta = relativedelta(days=1) elif billing_interval == 'biweekly': delta = relativedelta(weeks=2) elif billing_interval == 'monthly': delta = relativedelta(months=1) elif billing_interval == 'quarterly': delta = relativedelta(months=3) elif billing_interval == 'yearly': delta = relativedelta(years=1) else: # Default to monthly if unknown logger.warning(f"Unknown billing interval '{billing_interval}', defaulting to monthly") delta = relativedelta(months=1) next_date = current_date + delta return next_date