"""
Subscriptions API
Sag-based subscriptions listing and stats
"""

from fastapi import APIRouter, HTTPException, Query
from typing import List, Dict, Any, Optional
from app.core.database import execute_query, execute_query_single, get_db_connection, release_db_connection
from psycopg2.extras import RealDictCursor
import logging
import hashlib
import json
import math
from uuid import uuid4
from datetime import datetime, date, timedelta
from dateutil.relativedelta import relativedelta
from fastapi import Request
from app.services.simplycrm_service import SimplyCRMService

logger = logging.getLogger(__name__)
router = APIRouter()

ALLOWED_STATUSES = {"draft", "active", "paused", "cancelled"}
STAGING_KEY_SQL = "COALESCE(source_account_id, 'name:' || LOWER(COALESCE(source_customer_name, 'ukendt')))"

# Simply CRM billing frequency -> Hub billing interval. Frequencies without an
# entry fall back to "monthly" in _simply_to_hub_interval.
_SIMPLY_INTERVAL_MAP = {
    "daily": "daily",
    "biweekly": "biweekly",
    "weekly": "biweekly",
    "monthly": "monthly",
    "quarterly": "quarterly",
    "yearly": "yearly",
    "annually": "yearly",
    "semi_annual": "yearly",
}

_LINE_ITEMS_SQL = """
    SELECT
        i.id, i.line_no, i.product_id, p.name AS product_name, i.description, i.quantity, i.unit_price, i.line_total
    FROM sag_subscription_items i
    LEFT JOIN products p ON p.id = i.product_id
    WHERE i.subscription_id = %s
    ORDER BY i.line_no ASC, i.id ASC
"""


def _staging_status_with_mapping(status: str, has_customer: bool) -> str:
    """Return the staging approval status for a row.

    "approved" is kept as-is; any other status becomes "mapped" when a Hub
    customer is linked and "pending" otherwise.
    """
    if status == "approved":
        return "approved"
    return "mapped" if has_customer else "pending"


def _safe_date(value: Optional[Any]) -> Optional[date]:
    """Convert *value* to a ``date``; return None when it is empty or unparsable.

    Accepts ``date``/``datetime`` objects and ISO-8601 strings (a trailing
    "Z" is treated as UTC).
    """
    if value is None:
        return None
    # datetime is a subclass of date, so it has to be tested first; otherwise
    # a datetime would be returned unchanged instead of narrowed to a date.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    text = str(value).strip()
    if not text:
        return None
    try:
        return datetime.fromisoformat(text.replace("Z", "+00:00")).date()
    except ValueError:
        return None


def _parse_number(value: Any) -> Optional[float]:
    """Return *value* as a finite float, or None when it is missing or not numeric."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None


def _simply_to_hub_interval(frequency: Optional[str]) -> str:
    """Map a Simply CRM billing frequency to a Hub billing interval (default "monthly")."""
    normalized = (frequency or "").strip().lower()
    return _SIMPLY_INTERVAL_MAP.get(normalized, "monthly")


def _next_invoice_date(start_date: date, interval: str) -> date:
    """Return the first invoice date after *start_date* for *interval*.

    Unknown intervals are treated as monthly.
    """
    if interval == "daily":
        return start_date + timedelta(days=1)
    if interval == "biweekly":
        return start_date + timedelta(days=14)
    if interval == "quarterly":
        return start_date + relativedelta(months=3)
    if interval == "yearly":
        return start_date + relativedelta(years=1)
    return start_date + relativedelta(months=1)


def _auto_map_customer(account_id: Optional[str], customer_name: Optional[str], customer_cvr: Optional[str]) -> Optional[int]:
    """Find a Hub customer id for a Simply CRM account.

    Lookups are tried in order: vtiger id, CVR number, case-insensitive name.
    Returns None when no lookup matches.
    """
    lookups = (
        (account_id, "SELECT id FROM customers WHERE vtiger_id = %s LIMIT 1"),
        (customer_cvr, "SELECT id FROM customers WHERE cvr_number = %s LIMIT 1"),
        (customer_name, "SELECT id FROM customers WHERE LOWER(name) = LOWER(%s) LIMIT 1"),
    )
    for value, query in lookups:
        if not value:
            continue
        row = execute_query_single(query, (value,))
        if row and row.get("id"):
            return int(row["id"])
    return None


def _fetch_line_items(subscription_id: Any) -> List[Dict[str, Any]]:
    """Return the line items of a subscription ordered by line number, then id."""
    return execute_query(_LINE_ITEMS_SQL, (subscription_id,)) or []


def _clean_update_items(line_items: Any) -> List[tuple]:
    """Validate PATCH line items and return insertable tuples.

    Each tuple is (line_no, description, quantity, unit_price, line_total,
    product_id). Lines without a description or with quantity <= 0 are
    skipped; their line number is still consumed.

    Raises:
        HTTPException: 400 when line_items is not a list of objects or a
            quantity/unit_price is not numeric.
    """
    if not isinstance(line_items, list) or not all(isinstance(item, dict) for item in line_items):
        raise HTTPException(status_code=400, detail="line_items must be a list of objects")

    cleaned = []
    for idx, item in enumerate(line_items, start=1):
        description = (item.get("description") or "").strip()
        raw_quantity = item.get("quantity")
        raw_unit_price = item.get("unit_price")
        # Missing/null numbers count as 0, matching the previous defaults.
        quantity = 0.0 if raw_quantity is None else _parse_number(raw_quantity)
        unit_price = 0.0 if raw_unit_price is None else _parse_number(raw_unit_price)
        if quantity is None:
            raise HTTPException(status_code=400, detail="line_items quantity must be a number")
        if unit_price is None:
            raise HTTPException(status_code=400, detail="line_items unit_price must be a number")
        if not description or quantity <= 0:
            continue
        cleaned.append((idx, description, quantity, unit_price, quantity * unit_price, item.get("product_id")))
    return cleaned


@router.get("/sag-subscriptions/by-sag/{sag_id}", response_model=Dict[str, Any])
async def get_subscription_by_sag(sag_id: int):
    """Get latest subscription for a case, including its line items.

    Raises:
        HTTPException: 404 when the case has no subscription, 500 on
            unexpected errors.
    """
    try:
        query = """
            SELECT
                s.id, s.subscription_number, s.sag_id, sg.titel AS sag_title, s.customer_id, c.name AS customer_name, s.product_name, s.billing_interval, s.billing_day, s.price, s.start_date, s.end_date, s.status, s.notes
            FROM sag_subscriptions s
            LEFT JOIN sag_sager sg ON sg.id = s.sag_id
            LEFT JOIN customers c ON c.id = s.customer_id
            WHERE s.sag_id = %s
            ORDER BY s.id DESC
            LIMIT 1
        """
        subscription = execute_query_single(query, (sag_id,))
        if not subscription:
            raise HTTPException(status_code=404, detail="Subscription not found")

        subscription["line_items"] = _fetch_line_items(subscription["id"])
        return subscription
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error loading subscription by case: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/sag-subscriptions", response_model=Dict[str, Any])
async def create_subscription(payload: Dict[str, Any]):
    """Create a new subscription tied to a case (status = draft).

    Required payload keys: sag_id, billing_interval, billing_day, start_date
    (YYYY-MM-DD) and a non-empty line_items list. Missing description or
    unit_price on a line is filled from the referenced product when possible.

    Raises:
        HTTPException: 400 on invalid input or when the case already has a
            non-cancelled subscription, 500 on unexpected errors.
    """
    try:
        sag_id = payload.get("sag_id")
        billing_interval = payload.get("billing_interval")
        billing_day = payload.get("billing_day")
        start_date = payload.get("start_date")
        notes = payload.get("notes")
        line_items = payload.get("line_items") or []

        if not sag_id:
            raise HTTPException(status_code=400, detail="sag_id is required")
        if not billing_interval:
            raise HTTPException(status_code=400, detail="billing_interval is required")
        if billing_day is None:
            raise HTTPException(status_code=400, detail="billing_day is required")
        if not start_date:
            raise HTTPException(status_code=400, detail="start_date is required")
        if not line_items:
            raise HTTPException(status_code=400, detail="line_items is required")
        if not isinstance(line_items, list) or not all(isinstance(item, dict) for item in line_items):
            raise HTTPException(status_code=400, detail="line_items must be a list of objects")

        sag = execute_query_single(
            "SELECT id, customer_id FROM sag_sager WHERE id = %s",
            (sag_id,)
        )
        if not sag or not sag.get("customer_id"):
            raise HTTPException(status_code=400, detail="Case must have a customer")

        existing = execute_query_single(
            """
            SELECT id FROM sag_subscriptions
            WHERE sag_id = %s AND status != 'cancelled'
            ORDER BY id DESC
            LIMIT 1
            """,
            (sag_id,)
        )
        if existing:
            raise HTTPException(status_code=400, detail="Subscription already exists for this case")

        product_ids = [item.get("product_id") for item in line_items if item.get("product_id")]
        product_map = {}
        if product_ids:
            rows = execute_query(
                "SELECT id, name, sales_price FROM products WHERE id = ANY(%s)",
                (product_ids,)
            )
            product_map = {row["id"]: row for row in (rows or [])}

        cleaned_items = []
        total_price = 0
        for idx, item in enumerate(line_items, start=1):
            product_id = item.get("product_id")
            description = (item.get("description") or "").strip()
            quantity = item.get("quantity")
            unit_price = item.get("unit_price")

            # Fall back to product master data for missing description/price.
            product = product_map.get(product_id)
            if not description and product:
                description = product.get("name") or ""
            if unit_price is None and product and product.get("sales_price") is not None:
                unit_price = product.get("sales_price")

            if not description:
                raise HTTPException(status_code=400, detail="line_items description is required")
            # Non-numeric values parse to None and are rejected as 400 here
            # rather than surfacing as a 500 from float().
            quantity_value = _parse_number(quantity)
            if quantity_value is None or quantity_value <= 0:
                raise HTTPException(status_code=400, detail="line_items quantity must be > 0")
            unit_price_value = _parse_number(unit_price)
            if unit_price_value is None or unit_price_value < 0:
                raise HTTPException(status_code=400, detail="line_items unit_price must be >= 0")

            line_total = quantity_value * unit_price_value
            total_price += line_total
            cleaned_items.append({
                "line_no": idx,
                "product_id": product_id,
                "description": description,
                "quantity": quantity,
                "unit_price": unit_price,
                "line_total": line_total,
            })

        # Header name: first line's description plus a "(+N)" suffix for the rest.
        product_name = cleaned_items[0]["description"]
        if len(cleaned_items) > 1:
            product_name = f"{product_name} (+{len(cleaned_items) - 1})"

        try:
            start_dt = datetime.strptime(start_date, "%Y-%m-%d").date()
        except (TypeError, ValueError):
            raise HTTPException(status_code=400, detail="start_date must be in YYYY-MM-DD format")
        period_start = start_dt
        # Unknown intervals are invoiced as monthly by the shared helper.
        next_invoice_date = _next_invoice_date(start_dt, billing_interval)

        conn = get_db_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(
                    """
                    INSERT INTO sag_subscriptions (
                        sag_id, customer_id, product_name, billing_interval, billing_day, price, start_date, period_start, next_invoice_date, status, notes
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'draft', %s)
                    RETURNING *
                    """,
                    (
                        sag_id,
                        sag["customer_id"],
                        product_name,
                        billing_interval,
                        billing_day,
                        total_price,
                        start_dt,
                        period_start,
                        next_invoice_date,
                        notes,
                    )
                )
                subscription = cursor.fetchone()

                for item in cleaned_items:
                    cursor.execute(
                        """
                        INSERT INTO sag_subscription_items (
                            subscription_id, line_no, product_id, description, quantity, unit_price, line_total
                        ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                        """,
                        (
                            subscription["id"],
                            item["line_no"],
                            item["product_id"],
                            item["description"],
                            item["quantity"],
                            item["unit_price"],
                            item["line_total"],
                        )
                    )

            conn.commit()
            subscription["line_items"] = cleaned_items
            return subscription
        except Exception:
            # Do not hand a connection with a half-finished transaction back.
            conn.rollback()
            raise
        finally:
            release_db_connection(conn)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error creating subscription: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/sag-subscriptions/{subscription_id}", response_model=Dict[str, Any])
async def get_subscription(subscription_id: int):
    """Get a single subscription by ID with all details and line items.

    Raises:
        HTTPException: 404 when the subscription does not exist, 500 on
            unexpected errors.
    """
    try:
        query = """
            SELECT
                s.id, s.subscription_number, s.sag_id, sg.titel AS sag_title, s.customer_id, c.name AS customer_name, s.product_name, s.billing_interval, s.billing_day, s.price, s.start_date, s.end_date, s.next_invoice_date, s.period_start, s.notice_period_days, s.status, s.notes, s.cancelled_at, s.cancellation_reason, s.created_at, s.updated_at
            FROM sag_subscriptions s
            LEFT JOIN sag_sager sg ON sg.id = s.sag_id
            LEFT JOIN customers c ON c.id = s.customer_id
            WHERE s.id = %s
        """
        subscription = execute_query_single(query, (subscription_id,))
        if not subscription:
            raise HTTPException(status_code=404, detail="Subscription not found")

        subscription["line_items"] = _fetch_line_items(subscription_id)
        return subscription
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error loading subscription: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.patch("/sag-subscriptions/{subscription_id}", response_model=Dict[str, Any])
async def update_subscription(subscription_id: int, payload: Dict[str, Any]):
    """Update subscription - all fields editable including line items.

    Only whitelisted header fields are written. When "line_items" is present
    the existing items are replaced; lines without a description or with
    quantity <= 0 are skipped. Returns the subscription row (without items).

    Raises:
        HTTPException: 404 when the subscription does not exist, 400 on an
            invalid status or malformed line items, 500 on unexpected errors.
    """
    try:
        subscription = execute_query_single(
            "SELECT id, status FROM sag_subscriptions WHERE id = %s",
            (subscription_id,)
        )
        if not subscription:
            raise HTTPException(status_code=404, detail="Subscription not found")

        # Line items are handled separately from the header fields.
        line_items = payload.pop("line_items", None)

        if "status" in payload:
            status = payload["status"]
            if not isinstance(status, str) or status not in ALLOWED_STATUSES:
                raise HTTPException(status_code=400, detail="Invalid status")

        # Column names come from this whitelist only, never from the payload,
        # so interpolating them into the SQL below is safe.
        allowed_fields = {
            "product_name", "billing_interval", "billing_day", "price",
            "start_date", "end_date", "next_invoice_date", "period_start",
            "notice_period_days", "status", "notes"
        }
        updates = []
        values = []
        for field, value in payload.items():
            if field in allowed_fields:
                updates.append(f"{field} = %s")
                values.append(value)

        # Validate before any write so a bad payload cannot leave the
        # subscription with its items deleted.
        new_items = _clean_update_items(line_items) if line_items is not None else None

        conn = get_db_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                if updates:
                    values.append(subscription_id)
                    query = f"""
                        UPDATE sag_subscriptions
                        SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP
                        WHERE id = %s
                        RETURNING *
                    """
                    cursor.execute(query, tuple(values))
                    result = cursor.fetchone()
                else:
                    cursor.execute("SELECT * FROM sag_subscriptions WHERE id = %s", (subscription_id,))
                    result = cursor.fetchone()

                if new_items is not None:
                    # Replace strategy: drop all existing items, insert the new set.
                    cursor.execute(
                        "DELETE FROM sag_subscription_items WHERE subscription_id = %s",
                        (subscription_id,)
                    )
                    for line_no, description, quantity, unit_price, line_total, product_id in new_items:
                        cursor.execute(
                            """
                            INSERT INTO sag_subscription_items (
                                subscription_id, line_no, description, quantity, unit_price, line_total, product_id
                            ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                            """,
                            (
                                subscription_id,
                                line_no,
                                description,
                                quantity,
                                unit_price,
                                line_total,
                                product_id,
                            )
                        )

            conn.commit()
            return result
        except Exception:
            # Do not hand a connection with a half-finished transaction back.
            conn.rollback()
            raise
        finally:
            release_db_connection(conn)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating subscription: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.patch("/sag-subscriptions/{subscription_id}/status", response_model=Dict[str, Any])
async def update_subscription_status(subscription_id: int, payload: Dict[str, Any]):
    """Update subscription status.

    Raises:
        HTTPException: 400 when the status is not one of ALLOWED_STATUSES,
            404 when the subscription does not exist, 500 on unexpected errors.
    """
    try:
        status = payload.get("status")
        # isinstance guard: an unhashable value (list/dict) would raise
        # TypeError on the set membership test.
        if not isinstance(status, str) or status not in ALLOWED_STATUSES:
            raise HTTPException(status_code=400, detail="Invalid status")

        query = """
            UPDATE sag_subscriptions
            SET status = %s, updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            RETURNING *
        """
        result = execute_query(query, (status, subscription_id))
        if not result:
            raise HTTPException(status_code=404, detail="Subscription not found")
        return result[0]
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating subscription status: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/sag-subscriptions", response_model=List[Dict[str, Any]])
async def list_subscriptions(status: str = Query("all")):
    """List subscriptions by status (default: all) with line item counts.

    Each row gets a "line_items" list holding a single {"count": n} entry,
    or an empty list when the subscription has no items.
    """
    try:
        where_clause = ""
        params: List[Any] = []
        if status and status != "all":
            where_clause = "WHERE s.status = %s"
            params.append(status)

        query = f"""
            SELECT
                s.id, s.subscription_number, s.sag_id, sg.titel AS sag_title, s.customer_id, c.name AS customer_name, s.product_name, s.billing_interval, s.billing_day, s.price, s.start_date, s.end_date, s.status,
                (SELECT COUNT(*) FROM sag_subscription_items WHERE subscription_id = s.id) as item_count
            FROM sag_subscriptions s
            LEFT JOIN sag_sager sg ON sg.id = s.sag_id
            LEFT JOIN customers c ON c.id = s.customer_id
            {where_clause}
            ORDER BY s.start_date DESC, s.id DESC
        """
        subscriptions = execute_query(query, tuple(params)) or []

        # Add line_items array with count for display
        for sub in subscriptions:
            item_count = sub.get('item_count', 0)
            sub['line_items'] = [{'count': item_count}] if item_count > 0 else []

        return subscriptions
    except Exception as e:
        logger.error(f"❌ Error listing subscriptions: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/sag-subscriptions/stats/summary", response_model=Dict[str, Any])
async def subscription_stats(status: str = Query("all")):
    """Summary stats (count, total and average price) by status (default: all)."""
    try:
        where_clause = ""
        params: List[Any] = []
        if status and status != "all":
            where_clause = "WHERE status = %s"
            params.append(status)

        query = f"""
            SELECT
                COUNT(*) AS subscription_count,
                COALESCE(SUM(price), 0) AS total_amount,
                COALESCE(AVG(price), 0) AS avg_amount
            FROM sag_subscriptions
            {where_clause}
        """
        result = execute_query(query, tuple(params))
        return result[0] if result else {
            "subscription_count": 0,
            "total_amount": 0,
            "avg_amount": 0
        }
    except Exception as e:
        logger.error(f"❌ Error loading subscription stats: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/sag-subscriptions/process-invoices")
async def trigger_subscription_processing():
    """Manual trigger for subscription invoice processing (for testing)."""
    try:
        # Imported at call time, as in the original implementation.
        from app.jobs.process_subscriptions import process_subscriptions
        await process_subscriptions()
        return {"status": "success", "message": "Subscription processing completed"}
    except Exception as e:
        logger.error(f"❌ Manual subscription processing failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/simply-subscription-staging/import", response_model=Dict[str, Any])
async def import_simply_subscriptions_to_staging():
    """Import recurring Simply CRM SalesOrders into staging (parking area).

    Every fetched record is upserted on (source_system, source_record_id).
    Rows that are already approved keep their status; for all other rows the
    status follows the effective Hub customer mapping. Returns counters for
    the import batch.

    Raises:
        HTTPException: 500 when fetching or storing fails.
    """
    try:
        async with SimplyCRMService() as service:
            raw_subscriptions = await service.fetch_active_subscriptions()
            import_batch_id = str(uuid4())
            # Per-import cache so each account is fetched at most once.
            account_cache: Dict[str, Dict[str, Any]] = {}
            upserted = 0
            auto_mapped = 0

            for raw in raw_subscriptions:
                normalized = service.extract_subscription_data(raw)
                source_record_id = str(normalized.get("simplycrm_id") or raw.get("id") or "").strip()
                if not source_record_id:
                    continue

                source_account_id = normalized.get("account_id")
                source_customer_name = None
                source_customer_cvr = None

                if source_account_id:
                    if source_account_id not in account_cache:
                        account_cache[source_account_id] = await service.fetch_account_by_id(source_account_id) or {}
                    account = account_cache[source_account_id]
                    source_customer_name = (account.get("accountname") or "").strip() or None
                    source_customer_cvr = (account.get("siccode") or account.get("vat_number") or "").strip() or None

                if not source_customer_name:
                    source_customer_name = (raw.get("accountname") or raw.get("account_id") or "").strip() or None

                hub_customer_id = _auto_map_customer(source_account_id, source_customer_name, source_customer_cvr)
                if hub_customer_id:
                    auto_mapped += 1

                source_status = (normalized.get("status") or "active").strip()
                source_subject = (normalized.get("name") or raw.get("subject") or "").strip() or None
                source_total_amount = float(normalized.get("total_amount") or normalized.get("subtotal") or 0)
                source_currency = (normalized.get("currency") or "DKK").strip() or "DKK"
                source_start_date = _safe_date(normalized.get("start_date"))
                source_end_date = _safe_date(normalized.get("end_date"))
                source_binding_end_date = _safe_date(normalized.get("binding_end_date"))
                source_billing_frequency = _simply_to_hub_interval(normalized.get("billing_frequency"))

                sync_hash = hashlib.sha256(
                    json.dumps(raw, ensure_ascii=False, sort_keys=True, default=str).encode("utf-8")
                ).hexdigest()

                # On conflict the existing hub_customer_id wins (COALESCE), so
                # the status is derived in SQL from that effective mapping.
                # Deriving it from this run's auto-map result alone would put a
                # manually mapped row back to 'pending' while it is still mapped.
                execute_query(
                    """
                    INSERT INTO simply_subscription_staging (
                        source_system, source_record_id, source_account_id, source_customer_name, source_customer_cvr,
                        source_salesorder_no, source_subject, source_status, source_start_date, source_end_date,
                        source_binding_end_date, source_billing_frequency, source_total_amount, source_currency,
                        source_raw, sync_hash, hub_customer_id, approval_status, import_batch_id,
                        imported_at, updated_at
                    ) VALUES (
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s::jsonb, %s, %s, %s, %s::uuid,
                        CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
                    )
                    ON CONFLICT (source_system, source_record_id) DO UPDATE SET
                        source_account_id = EXCLUDED.source_account_id,
                        source_customer_name = EXCLUDED.source_customer_name,
                        source_customer_cvr = EXCLUDED.source_customer_cvr,
                        source_salesorder_no = EXCLUDED.source_salesorder_no,
                        source_subject = EXCLUDED.source_subject,
                        source_status = EXCLUDED.source_status,
                        source_start_date = EXCLUDED.source_start_date,
                        source_end_date = EXCLUDED.source_end_date,
                        source_binding_end_date = EXCLUDED.source_binding_end_date,
                        source_billing_frequency = EXCLUDED.source_billing_frequency,
                        source_total_amount = EXCLUDED.source_total_amount,
                        source_currency = EXCLUDED.source_currency,
                        source_raw = EXCLUDED.source_raw,
                        sync_hash = EXCLUDED.sync_hash,
                        hub_customer_id = COALESCE(simply_subscription_staging.hub_customer_id, EXCLUDED.hub_customer_id),
                        approval_status = CASE
                            WHEN simply_subscription_staging.approval_status = 'approved' THEN 'approved'
                            WHEN COALESCE(simply_subscription_staging.hub_customer_id, EXCLUDED.hub_customer_id) IS NOT NULL THEN 'mapped'
                            ELSE 'pending'
                        END,
                        approval_error = CASE
                            WHEN simply_subscription_staging.approval_status = 'approved' THEN simply_subscription_staging.approval_error
                            ELSE NULL
                        END,
                        import_batch_id = EXCLUDED.import_batch_id,
                        imported_at = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    """,
                    (
                        "simplycrm",
                        source_record_id,
                        source_account_id,
                        source_customer_name,
                        source_customer_cvr,
                        normalized.get("salesorder_no"),
                        source_subject,
                        source_status,
                        source_start_date,
                        source_end_date,
                        source_binding_end_date,
                        source_billing_frequency,
                        source_total_amount,
                        source_currency,
                        json.dumps(raw, ensure_ascii=False, default=str),
                        sync_hash,
                        hub_customer_id,
                        _staging_status_with_mapping("pending", bool(hub_customer_id)),
                        import_batch_id,
                    )
                )
                upserted += 1

        return {
            "status": "success",
            "batch_id": import_batch_id,
            "fetched": len(raw_subscriptions),
            "upserted": upserted,
            "auto_mapped": auto_mapped,
            "pending_manual": max(upserted - auto_mapped, 0),
        }
    except Exception as e:
        logger.error(f"❌ Simply staging import failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not import subscriptions from Simply CRM")


@router.get("/simply-subscription-staging/customers", response_model=List[Dict[str, Any]])
async def list_staging_customers(status: str = Query("pending")):
    """List staging queue grouped by customer/account key.

    The group key is STAGING_KEY_SQL (account id, else "name:" + lower-cased
    customer name). Pass status="all" to disable the approval_status filter.
    """
    try:
        where_clauses = []
        params: List[Any] = []
        if status and status != "all":
            where_clauses.append("approval_status = %s")
            params.append(status)

        where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""

        query = f"""
            SELECT
                {STAGING_KEY_SQL} AS customer_key,
                COALESCE(MAX(source_customer_name), 'Ukendt kunde') AS source_customer_name,
                MAX(source_account_id) AS source_account_id,
                COUNT(*) AS row_count,
                COUNT(*) FILTER (WHERE hub_customer_id IS NOT NULL) AS mapped_count,
                COUNT(*) FILTER (WHERE approval_status = 'approved') AS approved_count,
                COUNT(*) FILTER (WHERE approval_status = 'error') AS error_count,
                COALESCE(SUM(source_total_amount), 0) AS total_amount,
                MAX(updated_at) AS updated_at
            FROM simply_subscription_staging
            {where_sql}
            GROUP BY {STAGING_KEY_SQL}
            ORDER BY MAX(updated_at) DESC
        """
        return execute_query(query, tuple(params)) or []
    except Exception as e:
        logger.error(f"❌ Failed listing staging customers: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not list staging customers")
@router.get("/simply-subscription-staging/customers/{customer_key}/rows", response_model=List[Dict[str, Any]])
async def list_staging_customer_rows(customer_key: str):
    """List staging rows for one customer group.

    *customer_key* is matched against STAGING_KEY_SQL, the same expression
    the grouped customer listing uses as its key.
    """
    try:
        query = f"""
            SELECT
                s.id, s.source_record_id, s.source_salesorder_no, s.source_subject, s.source_status, s.source_billing_frequency, s.source_start_date, s.source_end_date, s.source_total_amount, s.source_currency, s.hub_customer_id, c.name AS hub_customer_name, s.hub_sag_id, s.approval_status, s.approval_error, s.approved_at, s.updated_at
            FROM simply_subscription_staging s
            LEFT JOIN customers c ON c.id = s.hub_customer_id
            WHERE {STAGING_KEY_SQL} = %s
            ORDER BY s.source_salesorder_no NULLS LAST, s.id ASC
        """
        return execute_query(query, (customer_key,)) or []
    except Exception as e:
        logger.error(f"❌ Failed listing staging rows for customer key {customer_key}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not list staging rows")


@router.get("/simply-subscription-staging/rows", response_model=List[Dict[str, Any]])
async def list_all_staging_rows(
    status: str = Query("all"),
    limit: int = Query(500, ge=1, le=2000),
):
    """List all imported staging rows for overview page/table.

    Rows are returned newest-updated first, capped at *limit*. Pass
    status="all" to disable the approval_status filter.
    """
    try:
        where_clauses = []
        params: List[Any] = []
        if status and status != "all":
            where_clauses.append("s.approval_status = %s")
            params.append(status)

        where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""

        query = f"""
            SELECT
                s.id, s.source_record_id, s.source_salesorder_no, s.source_account_id, s.source_customer_name, s.source_customer_cvr, s.source_subject, s.source_status, s.source_billing_frequency, s.source_start_date, s.source_end_date, s.source_total_amount, s.source_currency, s.hub_customer_id, c.name AS hub_customer_name, s.hub_sag_id, s.approval_status, s.approval_error, s.approved_at, s.import_batch_id, s.imported_at, s.updated_at
            FROM simply_subscription_staging s
            LEFT JOIN customers c ON c.id = s.hub_customer_id
            {where_sql}
            ORDER BY s.updated_at DESC, s.id DESC
            LIMIT %s
        """
        params.append(limit)
        return execute_query(query, tuple(params)) or []
    except Exception as e:
        logger.error(f"❌ Failed listing all staging rows: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not list imported staging rows")


def _as_int(value: Any) -> Optional[int]:
    """Return *value* converted with int(), or None when that is not possible."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


@router.patch("/simply-subscription-staging/{staging_id}/map", response_model=Dict[str, Any])
async def map_staging_row(staging_id: int, payload: Dict[str, Any]):
    """Map a staging row to Hub customer (and optional existing sag).

    An already approved row stays 'approved'; every other row becomes
    'mapped' and its approval_error is cleared.

    Raises:
        HTTPException: 400 when hub_customer_id is missing/invalid, the
            customer or sag does not exist, or the sag belongs to another
            customer; 404 when the staging row does not exist; 500 on
            unexpected errors.
    """
    try:
        hub_customer_id = payload.get("hub_customer_id")
        hub_sag_id = payload.get("hub_sag_id")

        if not hub_customer_id:
            raise HTTPException(status_code=400, detail="hub_customer_id is required")

        # Coerce up front so malformed ids are a 400 instead of a 500 raised
        # by int() or by the database.
        customer_id = _as_int(hub_customer_id)
        if customer_id is None:
            raise HTTPException(status_code=400, detail="hub_customer_id must be an integer")

        customer = execute_query_single("SELECT id FROM customers WHERE id = %s", (customer_id,))
        if not customer:
            raise HTTPException(status_code=400, detail="Hub customer not found")

        # A missing/empty hub_sag_id is stored as NULL.
        sag_id: Optional[int] = None
        if hub_sag_id:
            sag_id = _as_int(hub_sag_id)
            if sag_id is None:
                raise HTTPException(status_code=400, detail="hub_sag_id must be an integer")
            sag = execute_query_single(
                "SELECT id, customer_id FROM sag_sager WHERE id = %s AND deleted_at IS NULL",
                (sag_id,)
            )
            if not sag:
                raise HTTPException(status_code=400, detail="Hub sag not found")
            if int(sag.get("customer_id") or 0) != customer_id:
                raise HTTPException(status_code=400, detail="Hub sag does not belong to selected customer")

        result = execute_query(
            """
            UPDATE simply_subscription_staging
            SET hub_customer_id = %s,
                hub_sag_id = %s,
                approval_status = CASE WHEN approval_status = 'approved' THEN 'approved' ELSE 'mapped' END,
                approval_error = NULL,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            RETURNING *
            """,
            (customer_id, sag_id, staging_id)
        )
        if not result:
            raise HTTPException(status_code=404, detail="Staging row not found")

        return result[0]
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Failed mapping staging row: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not map staging row")


def _mark_staging_row_error(row_id: int, message: str) -> None:
    """Flag a staging row as 'error' and store *message* as approval_error."""
    execute_query(
        """
        UPDATE simply_subscription_staging
        SET approval_status = 'error',
            approval_error = %s,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = %s
        """,
        (message, row_id)
    )


def _copy_staging_row_to_hub(cursor: Any, row: Dict[str, Any], row_id: int, created_by_user_id: int) -> None:
    """Create the Hub records for one staging row on *cursor*.

    Inserts a case when the row has no hub_sag_id, then a draft subscription
    with a single line item, and finally marks the staging row approved.
    The caller owns the transaction (commit/rollback).
    """
    hub_customer_id = row.get("hub_customer_id")
    start_date = _safe_date(row.get("source_start_date")) or date.today()
    billing_interval = _simply_to_hub_interval(row.get("source_billing_frequency"))
    billing_day = start_date.day
    next_invoice_date = _next_invoice_date(start_date, billing_interval)
    source_subject = (row.get("source_subject") or row.get("source_salesorder_no") or "Simply abonnement").strip()
    source_record_id = row.get("source_record_id") or str(row_id)
    source_salesorder_no = row.get("source_salesorder_no") or source_record_id
    amount = float(row.get("source_total_amount") or 0)

    sag_id = row.get("hub_sag_id")
    if not sag_id:
        cursor.execute(
            """
            INSERT INTO sag_sager (
                titel, beskrivelse, template_key, status, customer_id, created_by_user_id
            ) VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id
            """,
            (
                f"Simply abonnement {source_salesorder_no}",
                f"Auto-oprettet fra Simply CRM staging row {source_record_id}",
                "subscription",
                "åben",
                hub_customer_id,
                created_by_user_id,
            )
        )
        sag_id = cursor.fetchone()["id"]

    cursor.execute(
        """
        INSERT INTO sag_subscriptions (
            sag_id, customer_id, product_name, billing_interval, billing_day, price, start_date, period_start, next_invoice_date, status, notes
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'draft', %s)
        RETURNING id
        """,
        (
            sag_id,
            hub_customer_id,
            source_subject,
            billing_interval,
            billing_day,
            amount,
            start_date,
            start_date,
            next_invoice_date,
            f"Imported from Simply CRM source {source_record_id}",
        )
    )
    subscription_id = cursor.fetchone()["id"]

    # The staging row carries only a total, so it becomes one line item.
    cursor.execute(
        """
        INSERT INTO sag_subscription_items (
            subscription_id, line_no, product_id, description, quantity, unit_price, line_total
        ) VALUES (%s, 1, NULL, %s, 1, %s, %s)
        """,
        (
            subscription_id,
            source_subject,
            amount,
            amount,
        )
    )

    cursor.execute(
        """
        UPDATE simply_subscription_staging
        SET hub_sag_id = %s,
            approval_status = 'approved',
            approval_error = NULL,
            approved_at = CURRENT_TIMESTAMP,
            approved_by_user_id = %s,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = %s
        """,
        (sag_id, created_by_user_id, row_id)
    )


@router.post("/simply-subscription-staging/customers/{customer_key}/approve", response_model=Dict[str, Any])
async def approve_staging_customer_rows(customer_key: str, payload: Dict[str, Any], request: Request):
    """Approve selected rows for one customer key and copy to Hub subscriptions.

    Each row is processed in its own transaction, so one failing row does not
    block the others. Rows without a Hub customer mapping and rows whose copy
    fails are flagged 'error' in staging. Rows that are already approved are
    reported under "errors" and left untouched, so approving twice cannot
    create a second case/subscription.

    Raises:
        HTTPException: 400 when row_ids is missing or not a list of integers,
            404 when no selected row belongs to *customer_key*, 500 on
            unexpected errors.
    """
    try:
        row_ids = payload.get("row_ids") or []
        if not isinstance(row_ids, list) or not row_ids:
            raise HTTPException(status_code=400, detail="row_ids is required")

        selected_ids = [_as_int(row_id) for row_id in row_ids]
        if any(row_id is None for row_id in selected_ids):
            raise HTTPException(status_code=400, detail="row_ids must be integers")

        # Falls back to user id 1 when the request carries no user_id.
        user_id = getattr(request.state, "user_id", None)
        created_by_user_id = int(user_id) if user_id is not None else 1

        rows = execute_query(
            f"""
            SELECT *
            FROM simply_subscription_staging
            WHERE {STAGING_KEY_SQL} = %s
              AND id = ANY(%s)
            """,
            (customer_key, selected_ids)
        ) or []

        if not rows:
            raise HTTPException(status_code=404, detail="No staging rows found for customer + selection")

        success_rows: List[int] = []
        error_rows: List[Dict[str, Any]] = []

        for row in rows:
            row_id = int(row["id"])

            if row.get("approval_status") == "approved":
                # Already copied to the Hub; keep the row as it is.
                error_rows.append({"id": row_id, "error": "Row is already approved"})
                continue

            if not row.get("hub_customer_id"):
                error_message = "Missing hub_customer_id mapping"
                _mark_staging_row_error(row_id, error_message)
                error_rows.append({"id": row_id, "error": error_message})
                continue

            conn = get_db_connection()
            try:
                with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                    _copy_staging_row_to_hub(cursor, row, row_id, created_by_user_id)
                conn.commit()
                success_rows.append(row_id)
            except Exception as row_exc:
                conn.rollback()
                logger.error(f"❌ Failed approving staging row {row_id}: {row_exc}", exc_info=True)
                error_message = str(row_exc)
                # Stored message is capped at 1000 chars; the response keeps the full text.
                _mark_staging_row_error(row_id, error_message[:1000])
                error_rows.append({"id": row_id, "error": error_message})
            finally:
                release_db_connection(conn)

        return {
            "status": "completed",
            "selected_count": len(row_ids),
            "approved_count": len(success_rows),
            "error_count": len(error_rows),
            "approved_row_ids": success_rows,
            "errors": error_rows,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Failed approving staging rows: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not approve selected staging rows")