from __future__ import annotations import logging import json from datetime import date from typing import Any, Dict, List, Literal, Optional from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel, Field from psycopg2.extras import RealDictCursor from dateutil.relativedelta import relativedelta from app.core.database import execute_query, execute_query_single, get_db_connection, release_db_connection from app.core.config import settings from app.jobs.process_subscriptions import process_subscriptions from app.subscriptions.backend.router import create_subscription as create_sag_subscription from app.subscriptions.backend.router import update_subscription as update_sag_subscription logger = logging.getLogger(__name__) router = APIRouter() PRICE_TYPE_FIELD_MAP = { "day": "rental_price_day", "week": "rental_price_week", "month": "rental_price_month", "year": "rental_price_year", } def _economic_safety_state() -> Dict[str, bool]: return { "economic_enabled": bool(getattr(settings, "ECONOMIC_ENABLED", False)), "economic_read_only": bool(getattr(settings, "ECONOMIC_READ_ONLY", True)), "economic_dry_run": bool(getattr(settings, "ECONOMIC_DRY_RUN", True)), "ordre_economic_read_only": bool(getattr(settings, "ORDRE_ECONOMIC_READ_ONLY", True)), "ordre_economic_dry_run": bool(getattr(settings, "ORDRE_ECONOMIC_DRY_RUN", True)), } def _add_interval(start: date, interval: str) -> date: normalized = (interval or "monthly").strip().lower() if normalized == "daily": return start + relativedelta(days=1) if normalized == "biweekly": return start + relativedelta(weeks=2) if normalized == "quarterly": return start + relativedelta(months=3) if normalized == "yearly": return start + relativedelta(years=1) return start + relativedelta(months=1) def _assert_asset_booking_available( asset_id: int, start_date: date, end_date: Optional[date], exclude_subscription_id: Optional[int] = None, ) -> None: where_extra = "" params: List[Any] = [asset_id, start_date, end_date] if exclude_subscription_id is not None: where_extra = " AND b.subscription_id != %s" params.append(exclude_subscription_id) overlap = execute_query_single( f""" SELECT b.id, b.subscription_id, b.start_date, b.end_date FROM subscription_asset_bindings b WHERE b.asset_id = %s AND b.deleted_at IS NULL AND b.status = 'active' {where_extra} AND NOT ( COALESCE(b.end_date, DATE '9999-12-31') < %s OR b.start_date > COALESCE(%s, DATE '9999-12-31') ) LIMIT 1 """, tuple(params), ) if overlap: raise HTTPException( status_code=409, detail=( f"Asset {asset_id} is already booked in subscription " f"{overlap.get('subscription_id')} from {overlap.get('start_date')} to {overlap.get('end_date')}" ), ) def _resolve_product_map(product_ids: List[int]) -> Dict[int, Dict[str, Any]]: if not product_ids: return {} rows = execute_query( """ SELECT id, sales_price, rental_price_day, rental_price_week, rental_price_month, rental_price_year FROM products WHERE id = ANY(%s) AND deleted_at IS NULL """, (product_ids,), ) or [] return {int(row["id"]): row for row in rows} def _derive_unit_price( product: Optional[Dict[str, Any]], price_type: str, custom_override: bool, provided_unit_price: Optional[float], ) -> float: if provided_unit_price is not None and custom_override: return float(provided_unit_price) normalized_type = (price_type or "manual").strip().lower() if normalized_type in PRICE_TYPE_FIELD_MAP and not custom_override: if not product: raise HTTPException(status_code=400, detail="product_id is required for period price types") price_field = PRICE_TYPE_FIELD_MAP[normalized_type] period_price = product.get(price_field) if period_price is None: raise HTTPException( status_code=400, detail=f"Product {product.get('id')} has no {price_field} configured", ) return float(period_price) if provided_unit_price is not None: return float(provided_unit_price) if product and product.get("sales_price") is not None: return float(product.get("sales_price")) raise HTTPException(status_code=400, detail="unit_price is required") def _normalize_line_items( line_items: List[SubscriptionLineInput], default_price_type: str, default_custom_override: bool, ) -> List[Dict[str, Any]]: product_ids = [line.product_id for line in line_items if line.product_id is not None] product_map = _resolve_product_map([int(pid) for pid in product_ids]) normalized: List[Dict[str, Any]] = [] for line in line_items: line_data = line.model_dump(exclude_none=True) line_price_type = (line_data.get("price_type") or default_price_type or "manual").strip().lower() line_custom_override = bool(line_data.get("custom_price_override", default_custom_override)) product = product_map.get(int(line.product_id)) if line.product_id is not None else None unit_price = _derive_unit_price( product=product, price_type=line_price_type, custom_override=line_custom_override, provided_unit_price=line_data.get("unit_price"), ) line_data["unit_price"] = unit_price line_data["quantity"] = float(line_data.get("quantity", 1)) line_data["price_type"] = line_price_type line_data["custom_price_override"] = line_custom_override if isinstance(line_data.get("period_from"), date): line_data["period_from"] = line_data["period_from"].isoformat() if isinstance(line_data.get("period_to"), date): line_data["period_to"] = line_data["period_to"].isoformat() normalized.append(line_data) return normalized def _build_due_subscription_preview(as_of: date, customer_id: Optional[int]) -> Dict[str, Any]: where = [ "s.status = 'active'", "s.next_invoice_date <= %s", "COALESCE(s.billing_blocked, false) = false", ] params: List[Any] = [as_of] if customer_id is not None: where.append("s.customer_id = %s") params.append(customer_id) rows = execute_query( f""" SELECT s.id, s.customer_id, c.name AS customer_name, s.invoice_merge_key, s.billing_direction, s.next_invoice_date, s.period_start, s.billing_interval, COALESCE( ( SELECT json_agg( json_build_object( 'id', i.id, 'line_total', i.line_total, 'billing_blocked', i.billing_blocked, 'period_from', i.period_from, 'period_to', i.period_to ) ORDER BY i.id ASC ) FROM sag_subscription_items i WHERE i.subscription_id = s.id ), '[]'::json ) AS line_items FROM sag_subscriptions s LEFT JOIN customers c ON c.id = s.customer_id WHERE {' AND '.join(where)} ORDER BY s.customer_id, s.next_invoice_date, s.id """, tuple(params), ) or [] groups: Dict[str, Dict[str, Any]] = {} for row in rows: merge_key = row.get("invoice_merge_key") or f"cust-{row['customer_id']}" group_id = f"{row['customer_id']}|{merge_key}|{row.get('next_invoice_date')}|{row.get('billing_direction') or 'forward'}" grp = groups.setdefault( group_id, { "customer_id": row["customer_id"], "customer_name": row.get("customer_name"), "invoice_merge_key": merge_key, "invoice_date": str(row.get("next_invoice_date")), "billing_direction": row.get("billing_direction") or "forward", "subscription_ids": [], "coverage_start": None, "coverage_end": None, "line_count": 0, "amount_total": 0.0, }, ) grp["subscription_ids"].append(int(row["id"])) period_start = row.get("period_start") or row.get("next_invoice_date") period_end = _add_interval(period_start, row.get("billing_interval") or "monthly") grp["coverage_start"] = ( str(period_start) if grp["coverage_start"] is None or str(period_start) < grp["coverage_start"] else grp["coverage_start"] ) grp["coverage_end"] = ( str(period_end) if grp["coverage_end"] is None or str(period_end) > grp["coverage_end"] else grp["coverage_end"] ) for item in row.get("line_items") or []: if item.get("billing_blocked"): continue grp["line_count"] += 1 grp["amount_total"] += float(item.get("line_total") or 0) group_values = list(groups.values()) return { "as_of": str(as_of), "customer_id": customer_id, "groups": group_values, "group_count": len(group_values), "amount_total": round(sum(float(g.get("amount_total") or 0) for g in group_values), 2), } def _generate_due_invoices_for_customer(as_of: date, customer_id: int) -> Dict[str, Any]: conn = get_db_connection() created_drafts = 0 touched_subscriptions = 0 try: with conn.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute( """ SELECT s.id, s.customer_id, c.name AS customer_name, s.billing_interval, s.billing_direction, s.next_invoice_date, s.period_start, s.invoice_merge_key, COALESCE( ( SELECT json_agg( json_build_object( 'description', i.description, 'quantity', i.quantity, 'unit_price', i.unit_price, 'line_total', i.line_total, 'product_id', i.product_id, 'asset_id', i.asset_id, 'period_from', i.period_from, 'period_to', i.period_to, 'billing_blocked', i.billing_blocked ) ORDER BY i.id ASC ) FROM sag_subscription_items i WHERE i.subscription_id = s.id ), '[]'::json ) AS line_items FROM sag_subscriptions s LEFT JOIN customers c ON c.id = s.customer_id WHERE s.status = 'active' AND s.customer_id = %s AND s.next_invoice_date <= %s AND COALESCE(s.billing_blocked, false) = false ORDER BY s.next_invoice_date ASC, s.id ASC """, (customer_id, as_of), ) subscriptions = cursor.fetchall() or [] if not subscriptions: return { "status": "ok", "message": "No due subscriptions for customer", "created_drafts": 0, "processed_subscriptions": 0, } grouped: Dict[str, List[Dict[str, Any]]] = {} for sub in subscriptions: merge_key = sub.get("invoice_merge_key") or f"cust-{sub['customer_id']}" group_key = ( f"{sub['customer_id']}|{merge_key}|{sub.get('next_invoice_date')}|" f"{sub.get('billing_direction') or 'forward'}" ) grouped.setdefault(group_key, []).append(sub) for group in grouped.values(): first = group[0] merge_key = first.get("invoice_merge_key") or f"cust-{first['customer_id']}" billing_direction = first.get("billing_direction") or "forward" customer_name = first.get("customer_name") or f"Customer #{first['customer_id']}" source_ids: List[int] = [] lines: List[Dict[str, Any]] = [] coverage_start: Optional[date] = None coverage_end: Optional[date] = None for sub in group: source_ids.append(int(sub["id"])) period_start = sub.get("period_start") or sub.get("next_invoice_date") period_end = _add_interval(period_start, sub.get("billing_interval") or "monthly") coverage_start = period_start if coverage_start is None or period_start < coverage_start else coverage_start coverage_end = period_end if coverage_end is None or period_end > coverage_end else coverage_end for item in sub.get("line_items") or []: if item.get("billing_blocked"): continue lines.append( { "product": { "productNumber": str(item.get("product_id") or "SUB"), "description": item.get("description") or "", }, "quantity": float(item.get("quantity") or 1), "unitNetPrice": float(item.get("unit_price") or 0), "totalNetAmount": float(item.get("line_total") or 0), "discountPercentage": 0, "metadata": { "subscription_id": int(sub["id"]), "asset_id": item.get("asset_id"), "period_from": str(item.get("period_from") or period_start), "period_to": str(item.get("period_to") or period_end), }, } ) if not lines: continue cursor.execute( """ SELECT id FROM ordre_drafts WHERE customer_id = %s AND invoice_aggregate_key = %s AND coverage_start = %s AND coverage_end = %s AND deleted_at IS NULL AND sync_status IN ('pending', 'exported', 'posted', 'paid') LIMIT 1 """, (first["customer_id"], merge_key, coverage_start, coverage_end), ) existing = cursor.fetchone() if existing: continue cursor.execute( """ INSERT INTO ordre_drafts ( title, customer_id, lines_json, notes, coverage_start, coverage_end, billing_direction, source_subscription_ids, invoice_aggregate_key, layout_number, created_by_user_id, sync_status, export_status_json, updated_at ) VALUES (%s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, CURRENT_TIMESTAMP) """, ( f"Abonnementer: {customer_name}", first["customer_id"], json.dumps(lines, ensure_ascii=False), ( "Aggregated abonnement faktura\n" f"Kunde: {customer_name}\n" f"Coverage: {coverage_start} til {coverage_end}\n" f"Subscription IDs: {', '.join(str(sid) for sid in source_ids)}" ), coverage_start, coverage_end, billing_direction, source_ids, merge_key, 1, None, "pending", json.dumps({"source": "subscription", "subscription_ids": source_ids}, ensure_ascii=False), ), ) created_drafts += 1 for sub in group: period_start = sub.get("period_start") or sub.get("next_invoice_date") new_period_start = _add_interval(period_start, sub.get("billing_interval") or "monthly") new_next_invoice_date = _add_interval(new_period_start, sub.get("billing_interval") or "monthly") cursor.execute( """ UPDATE sag_subscriptions SET period_start = %s, next_invoice_date = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s """, (new_period_start, new_next_invoice_date, sub["id"]), ) touched_subscriptions += 1 conn.commit() return { "status": "ok", "message": "Customer-scoped invoice generation completed", "created_drafts": created_drafts, "processed_subscriptions": touched_subscriptions, "customer_id": customer_id, "as_of": str(as_of), } except Exception: conn.rollback() raise finally: release_db_connection(conn) class SubscriptionLineInput(BaseModel): product_id: Optional[int] = None description: str quantity: float = 1 unit_price: Optional[float] = None asset_id: Optional[int] = None period_from: Optional[date] = None period_to: Optional[date] = None serial_number: Optional[str] = None price_type: Literal["manual", "day", "week", "month", "year"] = "manual" custom_price_override: bool = False class SubscriptionCreateInput(BaseModel): customer_id: int sag_id: int product_name: str = Field(min_length=1) price: float = Field(ge=0) billing_interval: Literal["daily", "biweekly", "monthly", "quarterly", "yearly"] = "monthly" billing_day: int = Field(default=1, ge=1, le=31) start_date: date end_date: Optional[date] = None binding_months: int = Field(default=0, ge=0) billing_direction: Literal["forward", "backward"] = "forward" price_type: Literal["manual", "day", "week", "month", "year"] = "manual" custom_price_override: bool = False first_invoice_policy: Literal["start_date", "next_cycle"] = "start_date" invoice_merge_key: Optional[str] = None notes: Optional[str] = None line_items: List[SubscriptionLineInput] = Field(default_factory=list) class SubscriptionUpdateInput(BaseModel): product_name: Optional[str] = None price: Optional[float] = Field(default=None, ge=0) billing_interval: Optional[Literal["daily", "biweekly", "monthly", "quarterly", "yearly"]] = None billing_day: Optional[int] = Field(default=None, ge=1, le=31) start_date: Optional[date] = None end_date: Optional[date] = None binding_months: Optional[int] = Field(default=None, ge=0) billing_direction: Optional[Literal["forward", "backward"]] = None price_type: Optional[Literal["manual", "day", "week", "month", "year"]] = None custom_price_override: Optional[bool] = None first_invoice_policy: Optional[Literal["start_date", "next_cycle"]] = None invoice_merge_key: Optional[str] = None notes: Optional[str] = None status: Optional[Literal["draft", "active", "paused", "cancelled"]] = None line_items: Optional[List[SubscriptionLineInput]] = None class AssetStatusInput(BaseModel): status: Literal["ledig", "udlejet", "defekt", "retur"] status_reason: Optional[str] = None class AssetBindingCreateInput(BaseModel): asset_id: int start_date: date end_date: Optional[date] = None binding_months: int = Field(default=0, ge=0) shared_binding_key: Optional[str] = None notice_period_days: int = Field(default=30, ge=0) sag_id: Optional[int] = None created_by_user_id: Optional[int] = None class InvoiceGenerateInput(BaseModel): preview: bool = False customer_id: Optional[int] = None as_of: Optional[date] = None push_to_economic: bool = False @router.get("/assets", response_model=List[Dict[str, Any]]) async def list_assets( status: str = Query("all"), customer_id: Optional[int] = Query(default=None), only_rental_enabled: bool = Query(default=False), ): """Alias endpoint for rental-focused asset listing.""" where = ["ha.deleted_at IS NULL"] params: List[Any] = [] if status != "all": where.append("COALESCE(ha.rental_status, 'ledig') = %s") params.append(status) if customer_id is not None: where.append("ha.current_owner_customer_id = %s") params.append(customer_id) if only_rental_enabled: where.append( "EXISTS (SELECT 1 FROM products p WHERE p.rental_asset_enabled = true AND p.deleted_at IS NULL)" ) rows = execute_query( f""" SELECT ha.id, ha.asset_type, ha.brand, ha.model, ha.serial_number, ha.customer_asset_id, ha.internal_asset_id, ha.current_owner_customer_id AS customer_id, c.name AS customer_name, COALESCE(ha.rental_status, 'ledig') AS rental_status, ha.status AS hardware_status, ha.status_reason, ha.updated_at FROM hardware_assets ha LEFT JOIN customers c ON c.id = ha.current_owner_customer_id WHERE {' AND '.join(where)} ORDER BY ha.updated_at DESC, ha.id DESC """, tuple(params), ) or [] return rows @router.put("/assets/{asset_id}/status", response_model=Dict[str, Any]) async def update_asset_rental_status(asset_id: int, payload: AssetStatusInput): """Alias endpoint for updating rental status without changing core hardware lifecycle status.""" row = execute_query( """ UPDATE hardware_assets SET rental_status = %s, status_reason = COALESCE(%s, status_reason), updated_at = CURRENT_TIMESTAMP WHERE id = %s AND deleted_at IS NULL RETURNING id, COALESCE(rental_status, 'ledig') AS status, status_reason, updated_at """, (payload.status, payload.status_reason, asset_id), ) if not row: raise HTTPException(status_code=404, detail="Asset not found") return row[0] @router.get("/subscriptions", response_model=List[Dict[str, Any]]) async def list_subscriptions_alias(status: str = Query("all")): where = "" params: List[Any] = [] if status != "all": where = "WHERE s.status = %s" params.append(status) rows = execute_query( f""" SELECT s.id, s.subscription_number, s.customer_id, c.name AS customer_name, s.sag_id, s.product_name, s.price, s.billing_interval, s.billing_direction, s.start_date, s.end_date, s.status, s.binding_months, s.invoice_merge_key, COALESCE( ( SELECT json_agg(json_build_object( 'id', i.id, 'description', i.description, 'quantity', i.quantity, 'unit_price', i.unit_price, 'line_total', i.line_total, 'asset_id', i.asset_id, 'period_from', i.period_from, 'period_to', i.period_to, 'serial_number', i.serial_number ) ORDER BY i.line_no ASC, i.id ASC) FROM sag_subscription_items i WHERE i.subscription_id = s.id ), '[]'::json ) AS line_items FROM sag_subscriptions s LEFT JOIN customers c ON c.id = s.customer_id {where} ORDER BY s.start_date DESC, s.id DESC """, tuple(params), ) or [] return rows @router.post("/subscriptions", response_model=Dict[str, Any]) async def create_subscription_alias(payload: SubscriptionCreateInput): """Alias endpoint that maps to existing sag subscription engine.""" normalized_lines = _normalize_line_items( payload.line_items, default_price_type=payload.price_type, default_custom_override=payload.custom_price_override, ) for line in normalized_lines: asset_id = line.get("asset_id") if asset_id is None: continue _assert_asset_booking_available( int(asset_id), start_date=payload.start_date, end_date=payload.end_date, ) body: Dict[str, Any] = { "customer_id": payload.customer_id, "sag_id": payload.sag_id, "product_name": payload.product_name, "price": payload.price, "billing_interval": payload.billing_interval, "billing_day": payload.billing_day, "start_date": payload.start_date.isoformat(), "end_date": payload.end_date.isoformat() if payload.end_date else None, "binding_months": payload.binding_months, "billing_direction": payload.billing_direction, "price_type": payload.price_type, "custom_price_override": payload.custom_price_override, "first_invoice_policy": payload.first_invoice_policy, "invoice_merge_key": payload.invoice_merge_key, "notes": payload.notes, "line_items": normalized_lines, } return await create_sag_subscription(body) @router.put("/subscriptions/{subscription_id}", response_model=Dict[str, Any]) async def update_subscription_alias(subscription_id: int, payload: SubscriptionUpdateInput): body = payload.model_dump(exclude_none=True) current = execute_query_single( "SELECT id, start_date, end_date, price_type, custom_price_override FROM sag_subscriptions WHERE id = %s", (subscription_id,), ) if not current: raise HTTPException(status_code=404, detail="Subscription not found") effective_start = payload.start_date or current.get("start_date") effective_end = payload.end_date if payload.end_date is not None else current.get("end_date") effective_price_type = payload.price_type or current.get("price_type") or "manual" effective_override = ( payload.custom_price_override if payload.custom_price_override is not None else bool(current.get("custom_price_override")) ) if "line_items" in body: normalized_lines = _normalize_line_items( payload.line_items or [], default_price_type=effective_price_type, default_custom_override=effective_override, ) for line in normalized_lines: asset_id = line.get("asset_id") if asset_id is None: continue _assert_asset_booking_available( int(asset_id), start_date=effective_start, end_date=effective_end, exclude_subscription_id=subscription_id, ) body["line_items"] = normalized_lines if isinstance(body.get("start_date"), date): body["start_date"] = body["start_date"].isoformat() if isinstance(body.get("end_date"), date): body["end_date"] = body["end_date"].isoformat() return await update_sag_subscription(subscription_id, body) @router.post("/subscriptions/{subscription_id}/asset-bindings", response_model=Dict[str, Any]) async def create_subscription_asset_binding_alias(subscription_id: int, payload: AssetBindingCreateInput): _assert_asset_booking_available( asset_id=payload.asset_id, start_date=payload.start_date, end_date=payload.end_date, exclude_subscription_id=subscription_id, ) end_date = payload.end_date if end_date is None and payload.binding_months > 0: end_date = payload.start_date + relativedelta(months=payload.binding_months) row = execute_query( """ INSERT INTO subscription_asset_bindings ( subscription_id, asset_id, shared_binding_key, binding_months, start_date, end_date, notice_period_days, status, sag_id, created_by_user_id ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'active', %s, %s) RETURNING * """, ( subscription_id, payload.asset_id, payload.shared_binding_key, payload.binding_months, payload.start_date, end_date, payload.notice_period_days, payload.sag_id, payload.created_by_user_id, ), ) if not row: raise HTTPException(status_code=500, detail="Failed to create binding") return row[0] @router.get("/invoices", response_model=List[Dict[str, Any]]) async def list_invoices_alias( status: str = Query("all"), customer_id: Optional[int] = Query(default=None), limit: int = Query(default=100, ge=1, le=500), ): where = ["1=1"] params: List[Any] = [] if status != "all": where.append("d.sync_status = %s") params.append(status) if customer_id is not None: where.append("d.customer_id = %s") params.append(customer_id) params.append(limit) rows = execute_query( f""" SELECT d.id, d.customer_id, c.name AS customer_name, d.title, d.invoice_date, d.due_date, d.coverage_start, d.coverage_end, d.total_amount, d.sync_status AS status, d.economic_order_number, d.economic_invoice_number, d.last_sync_at, d.created_at, d.updated_at FROM ordre_drafts d LEFT JOIN customers c ON c.id = d.customer_id WHERE {' AND '.join(where)} ORDER BY d.created_at DESC, d.id DESC LIMIT %s """, tuple(params), ) or [] return rows @router.post("/invoices/generate", response_model=Dict[str, Any]) async def generate_invoices_alias(payload: InvoiceGenerateInput | None = None): """Generate invoices globally or scoped by customer, with preview support.""" req = payload or InvoiceGenerateInput() as_of = req.as_of or date.today() economic_safety = _economic_safety_state() if req.push_to_economic: raise HTTPException( status_code=409, detail={ "message": "Direct e-conomic sync is blocked in this alias endpoint to protect live accounting.", "action": "Use draft generation here and run dedicated sync flow manually.", "safety": economic_safety, }, ) if req.preview: preview = _build_due_subscription_preview(as_of=as_of, customer_id=req.customer_id) return {"status": "preview", "economic_sync_attempted": False, "safety": economic_safety, **preview} if req.customer_id is not None: try: result = _generate_due_invoices_for_customer(as_of=as_of, customer_id=req.customer_id) result["economic_sync_attempted"] = False result["safety"] = economic_safety return result except Exception as exc: logger.error("Failed customer-scoped invoice generation: %s", exc, exc_info=True) raise HTTPException(status_code=500, detail=f"Invoice generation failed: {exc}") try: await process_subscriptions() return { "status": "ok", "message": "Invoice generation job completed (drafts only, no e-conomic sync)", "economic_sync_attempted": False, "safety": economic_safety, } except Exception as exc: logger.error("Failed running invoice generation job: %s", exc, exc_info=True) raise HTTPException(status_code=500, detail=f"Invoice generation failed: {exc}") @router.get("/invoices/sync-safety", response_model=Dict[str, Any]) async def get_invoice_sync_safety_status( customer_id: Optional[int] = Query(default=None), ): """Read-only safety overview for invoice sync readiness. Does not call e-conomic.""" safety = _economic_safety_state() where = ["d.deleted_at IS NULL"] params: List[Any] = [] if customer_id is not None: where.append("d.customer_id = %s") params.append(customer_id) stats_rows = execute_query( f""" SELECT COALESCE(d.sync_status, 'unknown') AS sync_status, COUNT(*)::int AS count FROM ordre_drafts d WHERE {' AND '.join(where)} GROUP BY COALESCE(d.sync_status, 'unknown') ORDER BY 1 """, tuple(params), ) or [] sample_rows = execute_query( f""" SELECT d.id, d.customer_id, c.name AS customer_name, d.title, d.sync_status, d.economic_order_number, d.economic_invoice_number, d.last_sync_at, d.created_at FROM ordre_drafts d LEFT JOIN customers c ON c.id = d.customer_id WHERE {' AND '.join(where)} AND COALESCE(d.sync_status, 'pending') IN ('pending', 'failed', 'exported') ORDER BY d.created_at DESC, d.id DESC LIMIT 20 """, tuple(params), ) or [] stats: Dict[str, int] = {row["sync_status"]: int(row["count"]) for row in stats_rows} draft_sync_allowed = not (safety["ordre_economic_read_only"] or safety["ordre_economic_dry_run"]) return { "status": "ok", "mode": "live_write_enabled" if draft_sync_allowed else "safe_no_write", "customer_id": customer_id, "safety": safety, "sync_write_allowed": draft_sync_allowed, "sync_write_block_reason": None if draft_sync_allowed else "ORDRE_ECONOMIC_READ_ONLY or ORDRE_ECONOMIC_DRY_RUN is enabled", "stats": stats, "candidate_count": len(sample_rows), "candidates": sample_rows, } @router.get("/invoices/{draft_id}/sync-preview", response_model=Dict[str, Any]) async def preview_invoice_sync(draft_id: int): """Preview sync intent for one draft without sending data to e-conomic.""" safety = _economic_safety_state() draft = execute_query_single( """ SELECT d.id, d.customer_id, c.name AS customer_name, d.title, d.sync_status, d.economic_order_number, d.economic_invoice_number, d.last_sync_at, d.created_at, d.updated_at FROM ordre_drafts d LEFT JOIN customers c ON c.id = d.customer_id WHERE d.id = %s AND d.deleted_at IS NULL """, (draft_id,), ) if not draft: raise HTTPException(status_code=404, detail="Invoice draft not found") next_action = "none" if draft.get("sync_status") in {"pending", "failed"}: next_action = "export_order" elif draft.get("sync_status") == "exported": next_action = "book_invoice" draft_sync_allowed = not (safety["ordre_economic_read_only"] or safety["ordre_economic_dry_run"]) return { "status": "preview", "draft": draft, "next_action": next_action, "will_sync": False, "safety": safety, "sync_write_allowed": draft_sync_allowed, "sync_write_block_reason": None if draft_sync_allowed else "ORDRE_ECONOMIC_READ_ONLY or ORDRE_ECONOMIC_DRY_RUN is enabled", "message": "Preview only. No e-conomic API call has been made.", }