bmc_hub/app/subscriptions/backend/router.py

1105 lines
43 KiB
Python
Raw Normal View History

"""
Subscriptions API
Case-based ("sag") subscription CRUD, listing and stats, plus Simply CRM staging import, mapping and approval
"""
from fastapi import APIRouter, HTTPException, Query
from typing import List, Dict, Any, Optional
from app.core.database import execute_query, execute_query_single, get_db_connection, release_db_connection
from psycopg2.extras import RealDictCursor
import logging
import hashlib
import json
from uuid import uuid4
from datetime import datetime, date, timedelta
from dateutil.relativedelta import relativedelta
from fastapi import Request
from app.services.simplycrm_service import SimplyCRMService
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which every endpoint in this file is registered.
router = APIRouter()
# Status values accepted by the subscription update endpoints.
ALLOWED_STATUSES = {"draft", "active", "paused", "cancelled"}
# SQL expression used to group staging rows per customer: the source account
# id when present, otherwise 'name:' + the lower-cased customer name
# ('ukendt' when the name is missing as well).
STAGING_KEY_SQL = "COALESCE(source_account_id, 'name:' || LOWER(COALESCE(source_customer_name, 'ukendt')))"
def _staging_status_with_mapping(status: str, has_customer: bool) -> str:
    """Return the staging approval status for a row.

    "approved" is kept unchanged. Every other status collapses to "mapped"
    when a Hub customer is linked and to "pending" when it is not.
    """
    if status != "approved":
        if has_customer:
            return "mapped"
        return "pending"
    return "approved"
def _safe_date(value: Optional[Any]) -> Optional[date]:
    """Coerce *value* to a ``date``, returning ``None`` when that is impossible.

    Accepts ``None``, ``date``/``datetime`` objects and anything whose string
    form is an ISO-8601 date or datetime (a trailing "Z" is treated as UTC).
    Blank or unparsable text yields ``None`` instead of raising.
    """
    if value is None:
        return None
    # datetime is a subclass of date, so it has to be tested first; otherwise
    # a datetime would be returned unchanged instead of being truncated.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    text = str(value).strip()
    if not text:
        return None
    try:
        # fromisoformat() on older Python versions rejects the "Z" suffix.
        return datetime.fromisoformat(text.replace("Z", "+00:00")).date()
    except ValueError:
        return None
def _simply_to_hub_interval(frequency: Optional[str]) -> str:
    """Translate a Simply CRM billing frequency into a Hub billing interval.

    Matching ignores case and surrounding whitespace. "weekly" is folded into
    "biweekly", "annually" and "semi_annual" into "yearly", and anything
    unknown or empty falls back to "monthly".
    """
    key = "" if not frequency else frequency
    key = key.strip().lower()
    if key in ("biweekly", "weekly"):
        return "biweekly"
    if key in ("yearly", "annually", "semi_annual"):
        return "yearly"
    if key in ("daily", "quarterly"):
        return key
    return "monthly"
def _next_invoice_date(start_date: date, interval: str) -> date:
    """Return the date one billing interval after *start_date*.

    Recognised intervals are "daily", "biweekly", "quarterly" and "yearly";
    every other value (including "monthly") advances by one calendar month.
    """
    if interval == "daily":
        step = timedelta(days=1)
    elif interval == "biweekly":
        step = timedelta(days=14)
    elif interval == "quarterly":
        step = relativedelta(months=3)
    elif interval == "yearly":
        step = relativedelta(years=1)
    else:
        step = relativedelta(months=1)
    return start_date + step
def _auto_map_customer(account_id: Optional[str], customer_name: Optional[str], customer_cvr: Optional[str]) -> Optional[int]:
    """Look up the Hub customer id for a source account.

    Tries, in order, the vtiger id, the CVR number and a case-insensitive
    name match; empty criteria are skipped. Returns the first id found, or
    ``None`` when nothing matches.
    """
    lookups = (
        ("SELECT id FROM customers WHERE vtiger_id = %s LIMIT 1", account_id),
        ("SELECT id FROM customers WHERE cvr_number = %s LIMIT 1", customer_cvr),
        ("SELECT id FROM customers WHERE LOWER(name) = LOWER(%s) LIMIT 1", customer_name),
    )
    for sql, needle in lookups:
        if not needle:
            continue
        match = execute_query_single(sql, (needle,))
        if match and match.get("id"):
            return int(match["id"])
    return None
@router.get("/sag-subscriptions/by-sag/{sag_id}", response_model=Dict[str, Any])
async def get_subscription_by_sag(sag_id: int):
    """Return the newest subscription of a case together with its line items.

    Responds with 404 when the case has no subscription and with 500
    (carrying the exception text) on any other failure.
    """
    header_sql = """
SELECT
s.id,
s.subscription_number,
s.sag_id,
sg.titel AS sag_title,
s.customer_id,
c.name AS customer_name,
s.product_name,
s.billing_interval,
s.billing_day,
s.price,
s.start_date,
s.end_date,
s.status,
s.notes
FROM sag_subscriptions s
LEFT JOIN sag_sager sg ON sg.id = s.sag_id
LEFT JOIN customers c ON c.id = s.customer_id
WHERE s.sag_id = %s
ORDER BY s.id DESC
LIMIT 1
"""
    items_sql = """
SELECT
i.id,
i.line_no,
i.product_id,
p.name AS product_name,
i.description,
i.quantity,
i.unit_price,
i.line_total
FROM sag_subscription_items i
LEFT JOIN products p ON p.id = i.product_id
WHERE i.subscription_id = %s
ORDER BY i.line_no ASC, i.id ASC
"""
    try:
        found = execute_query_single(header_sql, (sag_id,))
        if not found:
            raise HTTPException(status_code=404, detail="Subscription not found")
        # Line items are looked up by the id of the subscription just found.
        item_rows = execute_query(items_sql, (found["id"],))
        found["line_items"] = item_rows or []
        return found
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error loading subscription by case: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/sag-subscriptions", response_model=Dict[str, Any])
async def create_subscription(payload: Dict[str, Any]):
    """Create a new subscription tied to a case (status = draft).

    Payload keys read: ``sag_id``, ``billing_interval``, ``billing_day``,
    ``start_date`` (``YYYY-MM-DD``), ``line_items`` (all required) and the
    optional ``notes``. Each line item may carry ``product_id``,
    ``description``, ``quantity`` and ``unit_price``; a missing description or
    unit price is filled in from the referenced product.

    Returns the inserted subscription row with the cleaned ``line_items``.

    Raises:
        HTTPException 400: missing/invalid input, case without a customer, or
            a non-cancelled subscription already exists for the case.
        HTTPException 500: any other failure (detail is the exception text).
    """
    try:
        sag_id = payload.get("sag_id")
        billing_interval = payload.get("billing_interval")
        billing_day = payload.get("billing_day")
        start_date = payload.get("start_date")
        notes = payload.get("notes")
        line_items = payload.get("line_items") or []
        if not sag_id:
            raise HTTPException(status_code=400, detail="sag_id is required")
        if not billing_interval:
            raise HTTPException(status_code=400, detail="billing_interval is required")
        if billing_day is None:
            raise HTTPException(status_code=400, detail="billing_day is required")
        if not start_date:
            raise HTTPException(status_code=400, detail="start_date is required")
        if not line_items:
            raise HTTPException(status_code=400, detail="line_items is required")
        if not isinstance(line_items, list) or not all(isinstance(item, dict) for item in line_items):
            raise HTTPException(status_code=400, detail="line_items must be a list of objects")
        # A malformed date is a client error, so reject it before touching the
        # database instead of letting it surface as a 500.
        try:
            start_dt = datetime.strptime(start_date, "%Y-%m-%d").date()
        except (TypeError, ValueError):
            raise HTTPException(status_code=400, detail="start_date must be in YYYY-MM-DD format")
        sag = execute_query_single(
            "SELECT id, customer_id FROM sag_sager WHERE id = %s",
            (sag_id,)
        )
        if not sag or not sag.get("customer_id"):
            raise HTTPException(status_code=400, detail="Case must have a customer")
        existing = execute_query_single(
            """
SELECT id FROM sag_subscriptions
WHERE sag_id = %s AND status != 'cancelled'
ORDER BY id DESC
LIMIT 1
""",
            (sag_id,)
        )
        if existing:
            raise HTTPException(status_code=400, detail="Subscription already exists for this case")
        # Load every referenced product once so defaults can be filled in.
        product_ids = [item.get("product_id") for item in line_items if item.get("product_id")]
        product_map = {}
        if product_ids:
            rows = execute_query(
                "SELECT id, name, sales_price FROM products WHERE id = ANY(%s)",
                (product_ids,)
            )
            product_map = {row["id"]: row for row in (rows or [])}
        cleaned_items = []
        total_price = 0
        for idx, item in enumerate(line_items, start=1):
            product_id = item.get("product_id")
            description = (item.get("description") or "").strip()
            quantity = item.get("quantity")
            unit_price = item.get("unit_price")
            product = product_map.get(product_id)
            if not description and product:
                description = product.get("name") or ""
            if unit_price is None and product and product.get("sales_price") is not None:
                unit_price = product.get("sales_price")
            if not description:
                raise HTTPException(status_code=400, detail="line_items description is required")
            # Non-numeric input is reported as a validation error, not a 500.
            try:
                quantity_value = None if quantity is None else float(quantity)
                unit_price_value = None if unit_price is None else float(unit_price)
            except (TypeError, ValueError):
                raise HTTPException(status_code=400, detail="line_items quantity and unit_price must be numeric")
            if quantity_value is None or quantity_value <= 0:
                raise HTTPException(status_code=400, detail="line_items quantity must be > 0")
            if unit_price_value is None or unit_price_value < 0:
                raise HTTPException(status_code=400, detail="line_items unit_price must be >= 0")
            line_total = quantity_value * unit_price_value
            total_price += line_total
            cleaned_items.append({
                "line_no": idx,
                "product_id": product_id,
                "description": description,
                "quantity": quantity,
                "unit_price": unit_price,
                "line_total": line_total,
            })
        # The header row shows the first description plus a "(+N)" suffix.
        product_name = cleaned_items[0]["description"]
        if len(cleaned_items) > 1:
            product_name = f"{product_name} (+{len(cleaned_items) - 1})"
        period_start = start_dt
        # Same interval arithmetic as the staging approval flow.
        next_invoice_date = _next_invoice_date(start_dt, billing_interval)
        conn = get_db_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(
                    """
INSERT INTO sag_subscriptions (
sag_id,
customer_id,
product_name,
billing_interval,
billing_day,
price,
start_date,
period_start,
next_invoice_date,
status,
notes
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'draft', %s)
RETURNING *
""",
                    (
                        sag_id,
                        sag["customer_id"],
                        product_name,
                        billing_interval,
                        billing_day,
                        total_price,
                        start_date,
                        period_start,
                        next_invoice_date,
                        notes,
                    )
                )
                subscription = cursor.fetchone()
                for item in cleaned_items:
                    cursor.execute(
                        """
INSERT INTO sag_subscription_items (
subscription_id,
line_no,
product_id,
description,
quantity,
unit_price,
line_total
) VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
                        (
                            subscription["id"],
                            item["line_no"],
                            item["product_id"],
                            item["description"],
                            item["quantity"],
                            item["unit_price"],
                            item["line_total"],
                        )
                    )
            conn.commit()
            subscription["line_items"] = cleaned_items
            return subscription
        except Exception:
            # Never hand a connection with a half-finished transaction back.
            conn.rollback()
            raise
        finally:
            release_db_connection(conn)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error creating subscription: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/sag-subscriptions/{subscription_id}", response_model=Dict[str, Any])
async def get_subscription(subscription_id: int):
    """Return one subscription, looked up by id, including its line items.

    Responds with 404 when the id is unknown and with 500 (carrying the
    exception text) on any other failure.
    """
    header_sql = """
SELECT
s.id,
s.subscription_number,
s.sag_id,
sg.titel AS sag_title,
s.customer_id,
c.name AS customer_name,
s.product_name,
s.billing_interval,
s.billing_day,
s.price,
s.start_date,
s.end_date,
s.next_invoice_date,
s.period_start,
s.notice_period_days,
s.status,
s.notes,
s.cancelled_at,
s.cancellation_reason,
s.created_at,
s.updated_at
FROM sag_subscriptions s
LEFT JOIN sag_sager sg ON sg.id = s.sag_id
LEFT JOIN customers c ON c.id = s.customer_id
WHERE s.id = %s
"""
    items_sql = """
SELECT
i.id,
i.line_no,
i.product_id,
p.name AS product_name,
i.description,
i.quantity,
i.unit_price,
i.line_total
FROM sag_subscription_items i
LEFT JOIN products p ON p.id = i.product_id
WHERE i.subscription_id = %s
ORDER BY i.line_no ASC, i.id ASC
"""
    try:
        found = execute_query_single(header_sql, (subscription_id,))
        if not found:
            raise HTTPException(status_code=404, detail="Subscription not found")
        # Attach the line items, ordered by line number.
        item_rows = execute_query(items_sql, (subscription_id,))
        found["line_items"] = item_rows or []
        return found
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error loading subscription: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.patch("/sag-subscriptions/{subscription_id}", response_model=Dict[str, Any])
async def update_subscription(subscription_id: int, payload: Dict[str, Any]):
    """Update subscription - all fields editable including line items.

    Only whitelisted columns from *payload* are written. When ``line_items``
    is present the existing items are replaced; items with an empty
    description or a quantity <= 0 are skipped. Everything runs in one
    transaction.

    Returns the subscription row after the update.

    Raises:
        HTTPException 404: subscription does not exist.
        HTTPException 400: invalid status or malformed ``line_items``.
        HTTPException 500: any other failure (detail is the exception text).
    """
    try:
        subscription = execute_query_single(
            "SELECT id, status FROM sag_subscriptions WHERE id = %s",
            (subscription_id,)
        )
        if not subscription:
            raise HTTPException(status_code=404, detail="Subscription not found")
        # Validate status if provided
        if "status" in payload and payload["status"] not in ALLOWED_STATUSES:
            raise HTTPException(status_code=400, detail="Invalid status")
        # "line_items" is not a column, so it never enters the SET list below.
        line_items = payload.get("line_items")
        # Build dynamic update query; column names come from this whitelist
        # only, so interpolating them into the SQL text is safe.
        allowed_fields = {
            "product_name", "billing_interval", "billing_day", "price",
            "start_date", "end_date", "next_invoice_date", "period_start",
            "notice_period_days", "status", "notes"
        }
        updates = []
        values = []
        for field, value in payload.items():
            if field in allowed_fields:
                updates.append(f"{field} = %s")
                values.append(value)
        # Validate and normalise the line items BEFORE touching the database,
        # so bad input can no longer fail halfway through the replacement.
        new_items = None
        if line_items is not None:
            if not isinstance(line_items, list) or not all(isinstance(item, dict) for item in line_items):
                raise HTTPException(status_code=400, detail="line_items must be a list of objects")
            new_items = []
            for idx, item in enumerate(line_items, start=1):
                # Explicit nulls are treated like missing values.
                description = (item.get("description") or "").strip()
                try:
                    quantity = float(item.get("quantity") or 0)
                    unit_price = float(item.get("unit_price") or 0)
                except (TypeError, ValueError):
                    raise HTTPException(status_code=400, detail="line_items quantity and unit_price must be numeric")
                if not description or quantity <= 0:
                    continue
                new_items.append((
                    subscription_id, idx, description,
                    quantity, unit_price, quantity * unit_price,
                    item.get("product_id")
                ))
        conn = get_db_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                # Update subscription fields if any
                if updates:
                    values.append(subscription_id)
                    query = f"""
UPDATE sag_subscriptions
SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
RETURNING *
"""
                    cursor.execute(query, tuple(values))
                    result = cursor.fetchone()
                else:
                    cursor.execute("SELECT * FROM sag_subscriptions WHERE id = %s", (subscription_id,))
                    result = cursor.fetchone()
                # Replace line items if provided
                if new_items is not None:
                    cursor.execute(
                        "DELETE FROM sag_subscription_items WHERE subscription_id = %s",
                        (subscription_id,)
                    )
                    for item_params in new_items:
                        cursor.execute(
                            """
INSERT INTO sag_subscription_items (
subscription_id, line_no, description,
quantity, unit_price, line_total, product_id
) VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
                            item_params
                        )
            conn.commit()
            return result
        except Exception:
            # Undo the partial update before the connection is released.
            conn.rollback()
            raise
        finally:
            release_db_connection(conn)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating subscription: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.patch("/sag-subscriptions/{subscription_id}/status", response_model=Dict[str, Any])
async def update_subscription_status(subscription_id: int, payload: Dict[str, Any]):
    """Set the status of one subscription and return the updated row.

    Responds with 400 for a status outside ALLOWED_STATUSES, with 404 when no
    row was updated and with 500 (exception text) on any other failure.
    """
    try:
        new_status = payload.get("status")
        if new_status not in ALLOWED_STATUSES:
            raise HTTPException(status_code=400, detail="Invalid status")
        updated_rows = execute_query(
            """
UPDATE sag_subscriptions
SET status = %s, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
RETURNING *
""",
            (new_status, subscription_id),
        )
        if updated_rows:
            return updated_rows[0]
        raise HTTPException(status_code=404, detail="Subscription not found")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating subscription status: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/sag-subscriptions", response_model=List[Dict[str, Any]])
async def list_subscriptions(status: str = Query("all")):
    """List subscriptions, optionally filtered by status.

    A status of "all" (the default) or an empty status disables the filter.
    Each row gets a ``line_items`` list holding a single ``{"count": n}``
    entry when it has items, or an empty list when it has none.
    """
    try:
        use_filter = bool(status) and status != "all"
        where_clause = "WHERE s.status = %s" if use_filter else ""
        params: List[Any] = [status] if use_filter else []
        query = f"""
SELECT
s.id,
s.subscription_number,
s.sag_id,
sg.titel AS sag_title,
s.customer_id,
c.name AS customer_name,
s.product_name,
s.billing_interval,
s.billing_day,
s.price,
s.start_date,
s.end_date,
s.status,
(SELECT COUNT(*) FROM sag_subscription_items WHERE subscription_id = s.id) as item_count
FROM sag_subscriptions s
LEFT JOIN sag_sager sg ON sg.id = s.sag_id
LEFT JOIN customers c ON c.id = s.customer_id
{where_clause}
ORDER BY s.start_date DESC, s.id DESC
"""
        found = execute_query(query, tuple(params)) or []
        # The list view only needs to know how many items there are.
        for entry in found:
            count = entry.get('item_count', 0)
            if count > 0:
                entry['line_items'] = [{'count': count}]
            else:
                entry['line_items'] = []
        return found
    except Exception as e:
        logger.error(f"❌ Error listing subscriptions: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/sag-subscriptions/stats/summary", response_model=Dict[str, Any])
async def subscription_stats(status: str = Query("all")):
    """Return count, total price and average price of subscriptions.

    A status of "all" (the default) or an empty status disables the filter.
    Falls back to an all-zero summary when the query yields no row.
    """
    try:
        use_filter = bool(status) and status != "all"
        where_clause = "WHERE status = %s" if use_filter else ""
        params: List[Any] = [status] if use_filter else []
        query = f"""
SELECT
COUNT(*) AS subscription_count,
COALESCE(SUM(price), 0) AS total_amount,
COALESCE(AVG(price), 0) AS avg_amount
FROM sag_subscriptions
{where_clause}
"""
        summary_rows = execute_query(query, tuple(params))
        if summary_rows:
            return summary_rows[0]
        return {
            "subscription_count": 0,
            "total_amount": 0,
            "avg_amount": 0
        }
    except Exception as e:
        logger.error(f"❌ Error loading subscription stats: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/sag-subscriptions/process-invoices")
async def trigger_subscription_processing():
    """Run the subscription invoice job on demand (intended for testing).

    Returns a success payload once the job has finished; any failure is
    logged and reported as a 500 carrying the exception text.
    """
    try:
        # Imported inside the handler, only when the endpoint is called.
        from app.jobs.process_subscriptions import process_subscriptions
        await process_subscriptions()
    except Exception as e:
        logger.error(f"❌ Manual subscription processing failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    else:
        return {"status": "success", "message": "Subscription processing completed"}
@router.post("/simply-subscription-staging/import", response_model=Dict[str, Any])
async def import_simply_subscriptions_to_staging():
    """Import recurring Simply CRM SalesOrders into staging (parking area).

    Every record returned by the service is normalised, auto-mapped to a Hub
    customer where possible, and upserted into ``simply_subscription_staging``
    keyed on (source_system, source_record_id). Records without a usable
    source id are skipped.

    On re-import an existing Hub customer mapping is kept, approved rows stay
    approved, and the status of other rows is derived from the mapping that
    is actually stored after the upsert.

    Returns a summary with the batch id and the fetched / upserted /
    auto-mapped / pending counts.

    Raises:
        HTTPException 500: any failure (generic message, details are logged).
    """
    try:
        async with SimplyCRMService() as service:
            raw_subscriptions = await service.fetch_active_subscriptions()
            import_batch_id = str(uuid4())
            # Account lookups are cached per account id for this batch.
            account_cache: Dict[str, Dict[str, Any]] = {}
            upserted = 0
            auto_mapped = 0
            for raw in raw_subscriptions:
                normalized = service.extract_subscription_data(raw)
                source_record_id = str(normalized.get("simplycrm_id") or raw.get("id") or "").strip()
                if not source_record_id:
                    continue
                source_account_id = normalized.get("account_id")
                source_customer_name = None
                source_customer_cvr = None
                if source_account_id:
                    if source_account_id not in account_cache:
                        account_cache[source_account_id] = await service.fetch_account_by_id(source_account_id) or {}
                    account = account_cache[source_account_id]
                    source_customer_name = (account.get("accountname") or "").strip() or None
                    source_customer_cvr = (account.get("siccode") or account.get("vat_number") or "").strip() or None
                if not source_customer_name:
                    source_customer_name = (raw.get("accountname") or raw.get("account_id") or "").strip() or None
                hub_customer_id = _auto_map_customer(source_account_id, source_customer_name, source_customer_cvr)
                if hub_customer_id:
                    auto_mapped += 1
                source_status = (normalized.get("status") or "active").strip()
                source_subject = (normalized.get("name") or raw.get("subject") or "").strip() or None
                source_total_amount = float(normalized.get("total_amount") or normalized.get("subtotal") or 0)
                source_currency = (normalized.get("currency") or "DKK").strip() or "DKK"
                source_start_date = _safe_date(normalized.get("start_date"))
                source_end_date = _safe_date(normalized.get("end_date"))
                source_binding_end_date = _safe_date(normalized.get("binding_end_date"))
                source_billing_frequency = _simply_to_hub_interval(normalized.get("billing_frequency"))
                # sort_keys makes the hash independent of key order in `raw`.
                sync_hash = hashlib.sha256(
                    json.dumps(raw, ensure_ascii=False, sort_keys=True, default=str).encode("utf-8")
                ).hexdigest()
                # Status for a freshly inserted row. On conflict the status is
                # computed in SQL from the mapping kept by COALESCE, so a
                # manual mapping is not reset to 'pending' just because
                # auto-mapping found nothing this time.
                initial_status = _staging_status_with_mapping("pending", bool(hub_customer_id))
                execute_query(
                    """
INSERT INTO simply_subscription_staging (
source_system,
source_record_id,
source_account_id,
source_customer_name,
source_customer_cvr,
source_salesorder_no,
source_subject,
source_status,
source_start_date,
source_end_date,
source_binding_end_date,
source_billing_frequency,
source_total_amount,
source_currency,
source_raw,
sync_hash,
hub_customer_id,
approval_status,
import_batch_id,
imported_at,
updated_at
) VALUES (
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s::jsonb,
%s, %s, %s, %s::uuid, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
)
ON CONFLICT (source_system, source_record_id)
DO UPDATE SET
source_account_id = EXCLUDED.source_account_id,
source_customer_name = EXCLUDED.source_customer_name,
source_customer_cvr = EXCLUDED.source_customer_cvr,
source_salesorder_no = EXCLUDED.source_salesorder_no,
source_subject = EXCLUDED.source_subject,
source_status = EXCLUDED.source_status,
source_start_date = EXCLUDED.source_start_date,
source_end_date = EXCLUDED.source_end_date,
source_binding_end_date = EXCLUDED.source_binding_end_date,
source_billing_frequency = EXCLUDED.source_billing_frequency,
source_total_amount = EXCLUDED.source_total_amount,
source_currency = EXCLUDED.source_currency,
source_raw = EXCLUDED.source_raw,
sync_hash = EXCLUDED.sync_hash,
hub_customer_id = COALESCE(simply_subscription_staging.hub_customer_id, EXCLUDED.hub_customer_id),
approval_status = CASE
WHEN simply_subscription_staging.approval_status = 'approved' THEN 'approved'
WHEN COALESCE(simply_subscription_staging.hub_customer_id, EXCLUDED.hub_customer_id) IS NOT NULL THEN 'mapped'
ELSE 'pending'
END,
approval_error = CASE
WHEN simply_subscription_staging.approval_status = 'approved' THEN simply_subscription_staging.approval_error
ELSE NULL
END,
import_batch_id = EXCLUDED.import_batch_id,
imported_at = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
""",
                    (
                        "simplycrm",
                        source_record_id,
                        source_account_id,
                        source_customer_name,
                        source_customer_cvr,
                        normalized.get("salesorder_no"),
                        source_subject,
                        source_status,
                        source_start_date,
                        source_end_date,
                        source_binding_end_date,
                        source_billing_frequency,
                        source_total_amount,
                        source_currency,
                        json.dumps(raw, ensure_ascii=False, default=str),
                        sync_hash,
                        hub_customer_id,
                        initial_status,
                        import_batch_id,
                    )
                )
                upserted += 1
            return {
                "status": "success",
                "batch_id": import_batch_id,
                "fetched": len(raw_subscriptions),
                "upserted": upserted,
                "auto_mapped": auto_mapped,
                "pending_manual": max(upserted - auto_mapped, 0),
            }
    except Exception as e:
        logger.error(f"❌ Simply staging import failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not import subscriptions from Simply CRM")
@router.get("/simply-subscription-staging/customers", response_model=List[Dict[str, Any]])
async def list_staging_customers(status: str = Query("pending")):
    """Return the staging queue aggregated per customer/account key.

    Rows are filtered on ``approval_status`` unless *status* is "all" or
    empty. Each group reports row, mapped, approved and error counts, the
    summed amount and the latest update time; newest groups come first.
    """
    try:
        where_sql = ""
        params: List[Any] = []
        if status and status != "all":
            where_sql = "WHERE approval_status = %s"
            params.append(status)
        query = f"""
SELECT
{STAGING_KEY_SQL} AS customer_key,
COALESCE(MAX(source_customer_name), 'Ukendt kunde') AS source_customer_name,
MAX(source_account_id) AS source_account_id,
COUNT(*) AS row_count,
COUNT(*) FILTER (WHERE hub_customer_id IS NOT NULL) AS mapped_count,
COUNT(*) FILTER (WHERE approval_status = 'approved') AS approved_count,
COUNT(*) FILTER (WHERE approval_status = 'error') AS error_count,
COALESCE(SUM(source_total_amount), 0) AS total_amount,
MAX(updated_at) AS updated_at
FROM simply_subscription_staging
{where_sql}
GROUP BY {STAGING_KEY_SQL}
ORDER BY MAX(updated_at) DESC
"""
        groups = execute_query(query, tuple(params))
        return groups or []
    except Exception as e:
        logger.error(f"❌ Failed listing staging customers: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not list staging customers")
@router.get("/simply-subscription-staging/customers/{customer_key}/rows", response_model=List[Dict[str, Any]])
async def list_staging_customer_rows(customer_key: str):
    """Return every staging row that belongs to one customer group.

    The group is identified by the STAGING_KEY_SQL expression; rows are
    ordered by sales order number (missing numbers last), then by id.
    """
    query = f"""
SELECT
s.id,
s.source_record_id,
s.source_salesorder_no,
s.source_subject,
s.source_status,
s.source_billing_frequency,
s.source_start_date,
s.source_end_date,
s.source_total_amount,
s.source_currency,
s.hub_customer_id,
c.name AS hub_customer_name,
s.hub_sag_id,
s.approval_status,
s.approval_error,
s.approved_at,
s.updated_at
FROM simply_subscription_staging s
LEFT JOIN customers c ON c.id = s.hub_customer_id
WHERE {STAGING_KEY_SQL} = %s
ORDER BY s.source_salesorder_no NULLS LAST, s.id ASC
"""
    try:
        matches = execute_query(query, (customer_key,))
        return matches or []
    except Exception as e:
        logger.error(f"❌ Failed listing staging rows for customer key {customer_key}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not list staging rows")
@router.get("/simply-subscription-staging/rows", response_model=List[Dict[str, Any]])
async def list_all_staging_rows(
    status: str = Query("all"),
    limit: int = Query(500, ge=1, le=2000),
):
    """Return imported staging rows for the overview table, newest first.

    Rows are filtered on ``approval_status`` unless *status* is "all" or
    empty; at most *limit* rows are returned.
    """
    try:
        where_sql = ""
        params: List[Any] = []
        if status and status != "all":
            where_sql = "WHERE s.approval_status = %s"
            params.append(status)
        # The LIMIT placeholder is the last one in the statement.
        params.append(limit)
        query = f"""
SELECT
s.id,
s.source_record_id,
s.source_salesorder_no,
s.source_account_id,
s.source_customer_name,
s.source_customer_cvr,
s.source_subject,
s.source_status,
s.source_billing_frequency,
s.source_start_date,
s.source_end_date,
s.source_total_amount,
s.source_currency,
s.hub_customer_id,
c.name AS hub_customer_name,
s.hub_sag_id,
s.approval_status,
s.approval_error,
s.approved_at,
s.import_batch_id,
s.imported_at,
s.updated_at
FROM simply_subscription_staging s
LEFT JOIN customers c ON c.id = s.hub_customer_id
{where_sql}
ORDER BY s.updated_at DESC, s.id DESC
LIMIT %s
"""
        listing = execute_query(query, tuple(params))
        return listing or []
    except Exception as e:
        logger.error(f"❌ Failed listing all staging rows: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not list imported staging rows")
@router.patch("/simply-subscription-staging/{staging_id}/map", response_model=Dict[str, Any])
async def map_staging_row(staging_id: int, payload: Dict[str, Any]):
    """Link a staging row to a Hub customer and, optionally, an existing case.

    The customer must exist; a supplied case must exist, not be deleted and
    belong to that customer. Approved rows keep their status, all others
    become "mapped", and any stored approval error is cleared.

    Responds with 400 on validation errors, 404 when the staging row does not
    exist and 500 (generic message) on any other failure.
    """
    try:
        hub_customer_id = payload.get("hub_customer_id")
        hub_sag_id = payload.get("hub_sag_id")
        if not hub_customer_id:
            raise HTTPException(status_code=400, detail="hub_customer_id is required")
        if not execute_query_single("SELECT id FROM customers WHERE id = %s", (hub_customer_id,)):
            raise HTTPException(status_code=400, detail="Hub customer not found")
        if hub_sag_id:
            case_row = execute_query_single(
                "SELECT id, customer_id FROM sag_sager WHERE id = %s AND deleted_at IS NULL",
                (hub_sag_id,)
            )
            if not case_row:
                raise HTTPException(status_code=400, detail="Hub sag not found")
            case_owner_id = int(case_row.get("customer_id") or 0)
            if case_owner_id != int(hub_customer_id):
                raise HTTPException(status_code=400, detail="Hub sag does not belong to selected customer")
        updated_rows = execute_query(
            """
UPDATE simply_subscription_staging
SET hub_customer_id = %s,
hub_sag_id = %s,
approval_status = CASE WHEN approval_status = 'approved' THEN 'approved' ELSE 'mapped' END,
approval_error = NULL,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
RETURNING *
""",
            (hub_customer_id, hub_sag_id, staging_id)
        )
        if updated_rows:
            return updated_rows[0]
        raise HTTPException(status_code=404, detail="Staging row not found")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Failed mapping staging row: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not map staging row")
def _mark_staging_row_error(row_id: int, message: str) -> None:
    """Flag one staging row as failed and store the error text (max 1000 chars)."""
    execute_query(
        """
UPDATE simply_subscription_staging
SET approval_status = 'error',
approval_error = %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
""",
        (message[:1000], row_id)
    )


@router.post("/simply-subscription-staging/customers/{customer_key}/approve", response_model=Dict[str, Any])
async def approve_staging_customer_rows(customer_key: str, payload: Dict[str, Any], request: Request):
    """Approve selected rows for one customer key and copy to Hub subscriptions.

    For each selected staging row that has a Hub customer mapping, one
    transaction creates a case (unless the row already points at one), a
    draft subscription with a single line item, and marks the row approved.
    Rows are processed independently, so one failure does not block the rest.

    Rows that are already approved are reported in ``errors`` and left
    untouched, so repeating a request cannot create duplicate cases or
    subscriptions.

    Returns a summary with the selected / approved / error counts, the
    approved row ids and a list of per-row errors.

    Raises:
        HTTPException 400: ``row_ids`` missing or not a non-empty list.
        HTTPException 404: no staging rows match the key and selection.
        HTTPException 500: unexpected failure outside per-row handling.
    """
    try:
        row_ids = payload.get("row_ids") or []
        if not isinstance(row_ids, list) or not row_ids:
            raise HTTPException(status_code=400, detail="row_ids is required")
        user_id = getattr(request.state, "user_id", None)
        # Falls back to user id 1 when the request carries no user id.
        created_by_user_id = int(user_id) if user_id is not None else 1
        rows = execute_query(
            f"""
SELECT *
FROM simply_subscription_staging
WHERE {STAGING_KEY_SQL} = %s
AND id = ANY(%s)
""",
            (customer_key, row_ids)
        ) or []
        if not rows:
            raise HTTPException(status_code=404, detail="No staging rows found for customer + selection")
        success_rows: List[int] = []
        error_rows: List[Dict[str, Any]] = []
        for row in rows:
            row_id = int(row["id"])
            if row.get("approval_status") == "approved":
                # Approving again would create a second case and subscription.
                error_rows.append({"id": row_id, "error": "Row is already approved"})
                continue
            hub_customer_id = row.get("hub_customer_id")
            if not hub_customer_id:
                error_message = "Missing hub_customer_id mapping"
                _mark_staging_row_error(row_id, error_message)
                error_rows.append({"id": row_id, "error": error_message})
                continue
            conn = get_db_connection()
            try:
                with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                    start_date = _safe_date(row.get("source_start_date")) or date.today()
                    billing_interval = _simply_to_hub_interval(row.get("source_billing_frequency"))
                    billing_day = min(max(start_date.day, 1), 31)
                    next_invoice_date = _next_invoice_date(start_date, billing_interval)
                    source_subject = (row.get("source_subject") or row.get("source_salesorder_no") or "Simply abonnement").strip()
                    source_record_id = row.get("source_record_id") or str(row_id)
                    source_salesorder_no = row.get("source_salesorder_no") or source_record_id
                    amount = float(row.get("source_total_amount") or 0)
                    sag_id = row.get("hub_sag_id")
                    if not sag_id:
                        # No case was mapped, so create one for this subscription.
                        cursor.execute(
                            """
INSERT INTO sag_sager (
titel,
beskrivelse,
template_key,
status,
customer_id,
created_by_user_id
) VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""",
                            (
                                f"Simply abonnement {source_salesorder_no}",
                                f"Auto-oprettet fra Simply CRM staging row {source_record_id}",
                                "subscription",
                                "åben",
                                hub_customer_id,
                                created_by_user_id,
                            )
                        )
                        sag_id = cursor.fetchone()["id"]
                    cursor.execute(
                        """
INSERT INTO sag_subscriptions (
sag_id,
customer_id,
product_name,
billing_interval,
billing_day,
price,
start_date,
period_start,
next_invoice_date,
status,
notes
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'draft', %s)
RETURNING id
""",
                        (
                            sag_id,
                            hub_customer_id,
                            source_subject,
                            billing_interval,
                            billing_day,
                            amount,
                            start_date,
                            start_date,
                            next_invoice_date,
                            f"Imported from Simply CRM source {source_record_id}",
                        )
                    )
                    subscription_id = cursor.fetchone()["id"]
                    # The staged total becomes one line item with quantity 1.
                    cursor.execute(
                        """
INSERT INTO sag_subscription_items (
subscription_id,
line_no,
product_id,
description,
quantity,
unit_price,
line_total
) VALUES (%s, 1, NULL, %s, 1, %s, %s)
""",
                        (
                            subscription_id,
                            source_subject,
                            amount,
                            amount,
                        )
                    )
                    cursor.execute(
                        """
UPDATE simply_subscription_staging
SET hub_sag_id = %s,
approval_status = 'approved',
approval_error = NULL,
approved_at = CURRENT_TIMESTAMP,
approved_by_user_id = %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
""",
                        (sag_id, created_by_user_id, row_id)
                    )
                conn.commit()
                success_rows.append(row_id)
            except Exception as row_exc:
                # Undo the partial case/subscription, then record the failure
                # on the staging row through a separate query.
                conn.rollback()
                error_message = str(row_exc)
                logger.error(f"❌ Failed approving staging row {row_id}: {error_message}", exc_info=True)
                _mark_staging_row_error(row_id, error_message)
                error_rows.append({"id": row_id, "error": error_message})
            finally:
                release_db_connection(conn)
        return {
            "status": "completed",
            "selected_count": len(row_ids),
            "approved_count": len(success_rows),
            "error_count": len(error_rows),
            "approved_row_ids": success_rows,
            "errors": error_rows,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Failed approving staging rows: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Could not approve selected staging rows")