bmc_hub/app/modules/rentals/backend/router.py

1362 lines
49 KiB
Python
Raw Normal View History

from __future__ import annotations
import logging
import json
from datetime import date
from typing import Any, Dict, List, Literal, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from psycopg2.extras import RealDictCursor
from dateutil.relativedelta import relativedelta
from app.core.database import execute_query, execute_query_single, get_db_connection, release_db_connection
from app.core.config import settings
from app.jobs.process_subscriptions import process_subscriptions
from app.subscriptions.backend.router import create_subscription as create_sag_subscription
from app.subscriptions.backend.router import create_subscription_asset_binding as create_sag_asset_binding
from app.subscriptions.backend.router import update_subscription as update_sag_subscription
logger = logging.getLogger(__name__)
router = APIRouter()
PRICE_TYPE_FIELD_MAP = {
"day": "rental_price_day",
"week": "rental_price_week",
"month": "rental_price_month",
"year": "rental_price_year",
}
def _economic_safety_state() -> Dict[str, bool]:
return {
"economic_enabled": bool(getattr(settings, "ECONOMIC_ENABLED", False)),
"economic_read_only": bool(getattr(settings, "ECONOMIC_READ_ONLY", True)),
"economic_dry_run": bool(getattr(settings, "ECONOMIC_DRY_RUN", True)),
"ordre_economic_read_only": bool(getattr(settings, "ORDRE_ECONOMIC_READ_ONLY", True)),
"ordre_economic_dry_run": bool(getattr(settings, "ORDRE_ECONOMIC_DRY_RUN", True)),
}
def _add_interval(start: date, interval: str) -> date:
normalized = (interval or "monthly").strip().lower()
if normalized == "daily":
return start + relativedelta(days=1)
if normalized == "biweekly":
return start + relativedelta(weeks=2)
if normalized == "quarterly":
return start + relativedelta(months=3)
if normalized == "yearly":
return start + relativedelta(years=1)
return start + relativedelta(months=1)
def _assert_asset_booking_available(
asset_id: int,
start_date: date,
end_date: Optional[date],
exclude_subscription_id: Optional[int] = None,
) -> None:
where_extra = ""
params: List[Any] = [asset_id, start_date, end_date]
if exclude_subscription_id is not None:
where_extra = " AND b.subscription_id != %s"
params.append(exclude_subscription_id)
overlap = execute_query_single(
f"""
SELECT b.id, b.subscription_id, b.start_date, b.end_date
FROM subscription_asset_bindings b
WHERE b.asset_id = %s
AND b.deleted_at IS NULL
AND b.status = 'active'
{where_extra}
AND NOT (
COALESCE(b.end_date, DATE '9999-12-31') < %s
OR b.start_date > COALESCE(%s, DATE '9999-12-31')
)
LIMIT 1
""",
tuple(params),
)
if overlap:
raise HTTPException(
status_code=409,
detail=(
f"Asset {asset_id} is already booked in subscription "
f"{overlap.get('subscription_id')} from {overlap.get('start_date')} to {overlap.get('end_date')}"
),
)
def _resolve_product_map(product_ids: List[int]) -> Dict[int, Dict[str, Any]]:
if not product_ids:
return {}
rows = execute_query(
"""
SELECT
id,
sales_price,
rental_price_day,
rental_price_week,
rental_price_month,
rental_price_year
FROM products
WHERE id = ANY(%s)
AND deleted_at IS NULL
""",
(product_ids,),
) or []
return {int(row["id"]): row for row in rows}
def _derive_unit_price(
product: Optional[Dict[str, Any]],
price_type: str,
custom_override: bool,
provided_unit_price: Optional[float],
) -> float:
if provided_unit_price is not None and custom_override:
return float(provided_unit_price)
normalized_type = (price_type or "manual").strip().lower()
if normalized_type in PRICE_TYPE_FIELD_MAP and not custom_override:
if not product:
raise HTTPException(status_code=400, detail="product_id is required for period price types")
price_field = PRICE_TYPE_FIELD_MAP[normalized_type]
period_price = product.get(price_field)
if period_price is None:
raise HTTPException(
status_code=400,
detail=f"Product {product.get('id')} has no {price_field} configured",
)
return float(period_price)
if provided_unit_price is not None:
return float(provided_unit_price)
if product and product.get("sales_price") is not None:
return float(product.get("sales_price"))
raise HTTPException(status_code=400, detail="unit_price is required")
def _normalize_line_items(
line_items: List[SubscriptionLineInput],
default_price_type: str,
default_custom_override: bool,
) -> List[Dict[str, Any]]:
product_ids = [line.product_id for line in line_items if line.product_id is not None]
product_map = _resolve_product_map([int(pid) for pid in product_ids])
normalized: List[Dict[str, Any]] = []
for line in line_items:
line_data = line.model_dump(exclude_none=True)
line_price_type = (line_data.get("price_type") or default_price_type or "manual").strip().lower()
line_custom_override = bool(line_data.get("custom_price_override", default_custom_override))
product = product_map.get(int(line.product_id)) if line.product_id is not None else None
unit_price = _derive_unit_price(
product=product,
price_type=line_price_type,
custom_override=line_custom_override,
provided_unit_price=line_data.get("unit_price"),
)
line_data["unit_price"] = unit_price
line_data["quantity"] = float(line_data.get("quantity", 1))
line_data["price_type"] = line_price_type
line_data["custom_price_override"] = line_custom_override
if isinstance(line_data.get("period_from"), date):
line_data["period_from"] = line_data["period_from"].isoformat()
if isinstance(line_data.get("period_to"), date):
line_data["period_to"] = line_data["period_to"].isoformat()
normalized.append(line_data)
return normalized
def _build_due_subscription_preview(as_of: date, customer_id: Optional[int]) -> Dict[str, Any]:
where = [
"s.status = 'active'",
"s.next_invoice_date <= %s",
"COALESCE(s.billing_blocked, false) = false",
]
params: List[Any] = [as_of]
if customer_id is not None:
where.append("s.customer_id = %s")
params.append(customer_id)
rows = execute_query(
f"""
SELECT
s.id,
s.customer_id,
c.name AS customer_name,
s.invoice_merge_key,
s.billing_direction,
s.next_invoice_date,
s.period_start,
s.billing_interval,
COALESCE(
(
SELECT json_agg(
json_build_object(
'id', i.id,
'line_total', i.line_total,
'billing_blocked', i.billing_blocked,
'period_from', i.period_from,
'period_to', i.period_to
) ORDER BY i.id ASC
)
FROM sag_subscription_items i
WHERE i.subscription_id = s.id
),
'[]'::json
) AS line_items
FROM sag_subscriptions s
LEFT JOIN customers c ON c.id = s.customer_id
WHERE {' AND '.join(where)}
ORDER BY s.customer_id, s.next_invoice_date, s.id
""",
tuple(params),
) or []
groups: Dict[str, Dict[str, Any]] = {}
for row in rows:
merge_key = row.get("invoice_merge_key") or f"cust-{row['customer_id']}"
group_id = f"{row['customer_id']}|{merge_key}|{row.get('next_invoice_date')}|{row.get('billing_direction') or 'forward'}"
grp = groups.setdefault(
group_id,
{
"customer_id": row["customer_id"],
"customer_name": row.get("customer_name"),
"invoice_merge_key": merge_key,
"invoice_date": str(row.get("next_invoice_date")),
"billing_direction": row.get("billing_direction") or "forward",
"subscription_ids": [],
"coverage_start": None,
"coverage_end": None,
"line_count": 0,
"amount_total": 0.0,
},
)
grp["subscription_ids"].append(int(row["id"]))
period_start = row.get("period_start") or row.get("next_invoice_date")
period_end = _add_interval(period_start, row.get("billing_interval") or "monthly")
grp["coverage_start"] = (
str(period_start)
if grp["coverage_start"] is None or str(period_start) < grp["coverage_start"]
else grp["coverage_start"]
)
grp["coverage_end"] = (
str(period_end)
if grp["coverage_end"] is None or str(period_end) > grp["coverage_end"]
else grp["coverage_end"]
)
for item in row.get("line_items") or []:
if item.get("billing_blocked"):
continue
grp["line_count"] += 1
grp["amount_total"] += float(item.get("line_total") or 0)
group_values = list(groups.values())
return {
"as_of": str(as_of),
"customer_id": customer_id,
"groups": group_values,
"group_count": len(group_values),
"amount_total": round(sum(float(g.get("amount_total") or 0) for g in group_values), 2),
}
def _generate_due_invoices_for_customer(as_of: date, customer_id: int) -> Dict[str, Any]:
conn = get_db_connection()
created_drafts = 0
touched_subscriptions = 0
try:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
SELECT
s.id,
s.customer_id,
c.name AS customer_name,
s.billing_interval,
s.billing_direction,
s.next_invoice_date,
s.period_start,
s.invoice_merge_key,
COALESCE(
(
SELECT json_agg(
json_build_object(
'description', i.description,
'quantity', i.quantity,
'unit_price', i.unit_price,
'line_total', i.line_total,
'product_id', i.product_id,
'asset_id', i.asset_id,
'period_from', i.period_from,
'period_to', i.period_to,
'billing_blocked', i.billing_blocked
) ORDER BY i.id ASC
)
FROM sag_subscription_items i
WHERE i.subscription_id = s.id
),
'[]'::json
) AS line_items
FROM sag_subscriptions s
LEFT JOIN customers c ON c.id = s.customer_id
WHERE s.status = 'active'
AND s.customer_id = %s
AND s.next_invoice_date <= %s
AND COALESCE(s.billing_blocked, false) = false
ORDER BY s.next_invoice_date ASC, s.id ASC
""",
(customer_id, as_of),
)
subscriptions = cursor.fetchall() or []
if not subscriptions:
return {
"status": "ok",
"message": "No due subscriptions for customer",
"created_drafts": 0,
"processed_subscriptions": 0,
}
grouped: Dict[str, List[Dict[str, Any]]] = {}
for sub in subscriptions:
merge_key = sub.get("invoice_merge_key") or f"cust-{sub['customer_id']}"
group_key = (
f"{sub['customer_id']}|{merge_key}|{sub.get('next_invoice_date')}|"
f"{sub.get('billing_direction') or 'forward'}"
)
grouped.setdefault(group_key, []).append(sub)
for group in grouped.values():
first = group[0]
merge_key = first.get("invoice_merge_key") or f"cust-{first['customer_id']}"
billing_direction = first.get("billing_direction") or "forward"
customer_name = first.get("customer_name") or f"Customer #{first['customer_id']}"
source_ids: List[int] = []
lines: List[Dict[str, Any]] = []
coverage_start: Optional[date] = None
coverage_end: Optional[date] = None
for sub in group:
source_ids.append(int(sub["id"]))
period_start = sub.get("period_start") or sub.get("next_invoice_date")
period_end = _add_interval(period_start, sub.get("billing_interval") or "monthly")
coverage_start = period_start if coverage_start is None or period_start < coverage_start else coverage_start
coverage_end = period_end if coverage_end is None or period_end > coverage_end else coverage_end
for item in sub.get("line_items") or []:
if item.get("billing_blocked"):
continue
lines.append(
{
"product": {
"productNumber": str(item.get("product_id") or "SUB"),
"description": item.get("description") or "",
},
"quantity": float(item.get("quantity") or 1),
"unitNetPrice": float(item.get("unit_price") or 0),
"totalNetAmount": float(item.get("line_total") or 0),
"discountPercentage": 0,
"metadata": {
"subscription_id": int(sub["id"]),
"asset_id": item.get("asset_id"),
"period_from": str(item.get("period_from") or period_start),
"period_to": str(item.get("period_to") or period_end),
},
}
)
if not lines:
continue
cursor.execute(
"""
SELECT id
FROM ordre_drafts
WHERE customer_id = %s
AND invoice_aggregate_key = %s
AND coverage_start = %s
AND coverage_end = %s
AND deleted_at IS NULL
AND sync_status IN ('pending', 'exported', 'posted', 'paid')
LIMIT 1
""",
(first["customer_id"], merge_key, coverage_start, coverage_end),
)
existing = cursor.fetchone()
if existing:
continue
cursor.execute(
"""
INSERT INTO ordre_drafts (
title,
customer_id,
lines_json,
notes,
coverage_start,
coverage_end,
billing_direction,
source_subscription_ids,
invoice_aggregate_key,
layout_number,
created_by_user_id,
sync_status,
export_status_json,
updated_at
) VALUES (%s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, CURRENT_TIMESTAMP)
""",
(
f"Abonnementer: {customer_name}",
first["customer_id"],
json.dumps(lines, ensure_ascii=False),
(
"Aggregated abonnement faktura\n"
f"Kunde: {customer_name}\n"
f"Coverage: {coverage_start} til {coverage_end}\n"
f"Subscription IDs: {', '.join(str(sid) for sid in source_ids)}"
),
coverage_start,
coverage_end,
billing_direction,
source_ids,
merge_key,
1,
None,
"pending",
json.dumps({"source": "subscription", "subscription_ids": source_ids}, ensure_ascii=False),
),
)
created_drafts += 1
for sub in group:
period_start = sub.get("period_start") or sub.get("next_invoice_date")
new_period_start = _add_interval(period_start, sub.get("billing_interval") or "monthly")
new_next_invoice_date = _add_interval(new_period_start, sub.get("billing_interval") or "monthly")
cursor.execute(
"""
UPDATE sag_subscriptions
SET period_start = %s,
next_invoice_date = %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
""",
(new_period_start, new_next_invoice_date, sub["id"]),
)
touched_subscriptions += 1
conn.commit()
return {
"status": "ok",
"message": "Customer-scoped invoice generation completed",
"created_drafts": created_drafts,
"processed_subscriptions": touched_subscriptions,
"customer_id": customer_id,
"as_of": str(as_of),
}
except Exception:
conn.rollback()
raise
finally:
release_db_connection(conn)
class SubscriptionLineInput(BaseModel):
product_id: Optional[int] = None
description: str
quantity: float = 1
unit_price: Optional[float] = None
asset_id: Optional[int] = None
period_from: Optional[date] = None
period_to: Optional[date] = None
serial_number: Optional[str] = None
price_type: Literal["manual", "day", "week", "month", "year"] = "manual"
custom_price_override: bool = False
class SubscriptionCreateInput(BaseModel):
customer_id: int
sag_id: int
product_name: str = Field(min_length=1)
price: float = Field(ge=0)
billing_interval: Literal["daily", "biweekly", "monthly", "quarterly", "yearly"] = "monthly"
billing_day: int = Field(default=1, ge=1, le=31)
start_date: date
end_date: Optional[date] = None
binding_months: int = Field(default=0, ge=0)
billing_direction: Literal["forward", "backward"] = "forward"
price_type: Literal["manual", "day", "week", "month", "year"] = "manual"
custom_price_override: bool = False
first_invoice_policy: Literal["start_date", "next_cycle"] = "start_date"
invoice_merge_key: Optional[str] = None
notes: Optional[str] = None
line_items: List[SubscriptionLineInput] = Field(default_factory=list)
class SubscriptionUpdateInput(BaseModel):
product_name: Optional[str] = None
price: Optional[float] = Field(default=None, ge=0)
billing_interval: Optional[Literal["daily", "biweekly", "monthly", "quarterly", "yearly"]] = None
billing_day: Optional[int] = Field(default=None, ge=1, le=31)
start_date: Optional[date] = None
end_date: Optional[date] = None
binding_months: Optional[int] = Field(default=None, ge=0)
billing_direction: Optional[Literal["forward", "backward"]] = None
price_type: Optional[Literal["manual", "day", "week", "month", "year"]] = None
custom_price_override: Optional[bool] = None
first_invoice_policy: Optional[Literal["start_date", "next_cycle"]] = None
invoice_merge_key: Optional[str] = None
notes: Optional[str] = None
status: Optional[Literal["draft", "active", "paused", "cancelled"]] = None
line_items: Optional[List[SubscriptionLineInput]] = None
class AssetStatusInput(BaseModel):
status: Literal["ledig", "udlejet", "defekt", "retur"]
status_reason: Optional[str] = None
class AssetBindingCreateInput(BaseModel):
asset_id: int
start_date: date
end_date: Optional[date] = None
binding_months: int = Field(default=0, ge=0)
shared_binding_key: Optional[str] = None
notice_period_days: int = Field(default=30, ge=0)
sag_id: Optional[int] = None
created_by_user_id: Optional[int] = None
class QuickRentCreateInput(BaseModel):
customer_id: int
sag_id: int
start_date: date = Field(default_factory=date.today)
start_price: float = Field(default=0, ge=0)
freight_price: float = Field(default=0, ge=0)
preparation_price: float = Field(default=0, ge=0)
operations_monthly_price: float = Field(gt=0)
initial_operations_months: int = Field(default=2, ge=1, le=12)
notice_period_days: int = Field(default=30, ge=0)
created_by_user_id: Optional[int] = None
@router.get("/hardware/{hardware_id}/quick-rent/preview", response_model=Dict[str, Any])
async def quick_rent_preview(hardware_id: int, customer_id: int = Query(...), sag_id: int = Query(...)):
"""Preview whether quick-rent will reuse an existing subscription or create a new one."""
hardware = execute_query_single(
"""
SELECT id, current_owner_type
FROM hardware_assets
WHERE id = %s
AND deleted_at IS NULL
""",
(hardware_id,),
)
if not hardware:
return {
"can_submit": False,
"action": "blocked",
"message": "Hardware blev ikke fundet.",
}
if (hardware.get("current_owner_type") or "").strip().lower() != "bmc":
return {
"can_submit": False,
"action": "blocked",
"message": "Kun BMC-ejede assets kan udlejes fra denne modal.",
}
customer = execute_query_single(
"SELECT id FROM customers WHERE id = %s AND deleted_at IS NULL",
(customer_id,),
)
if not customer:
return {
"can_submit": False,
"action": "blocked",
"message": "Kunde blev ikke fundet.",
}
sag = execute_query_single(
"SELECT id, customer_id FROM sag_sager WHERE id = %s AND deleted_at IS NULL",
(sag_id,),
)
if not sag:
return {
"can_submit": False,
"action": "blocked",
"message": "Sag blev ikke fundet.",
}
if int(sag.get("customer_id") or 0) != int(customer_id):
return {
"can_submit": False,
"action": "blocked",
"message": "Sag og kunde matcher ikke.",
}
existing_subscription = execute_query_single(
"""
SELECT id, status
FROM sag_subscriptions
WHERE sag_id = %s
AND status IN ('draft', 'active', 'paused')
ORDER BY id DESC
LIMIT 1
""",
(sag_id,),
)
if existing_subscription:
return {
"can_submit": True,
"action": "reuse",
"subscription_id": int(existing_subscription.get("id")),
"message": f"Genbruger abonnement #{int(existing_subscription.get('id'))} paa sagen.",
}
return {
"can_submit": True,
"action": "create",
"subscription_id": None,
"message": "Der findes intet aktivt abonnement paa sagen. Der oprettes et nyt.",
}
class InvoiceGenerateInput(BaseModel):
preview: bool = False
customer_id: Optional[int] = None
as_of: Optional[date] = None
push_to_economic: bool = False
@router.get("/assets", response_model=List[Dict[str, Any]])
async def list_assets(
status: str = Query("all"),
customer_id: Optional[int] = Query(default=None),
only_rental_enabled: bool = Query(default=False),
):
"""Alias endpoint for rental-focused asset listing."""
where = ["ha.deleted_at IS NULL"]
params: List[Any] = []
if status != "all":
where.append("COALESCE(ha.rental_status, 'ledig') = %s")
params.append(status)
if customer_id is not None:
where.append("ha.current_owner_customer_id = %s")
params.append(customer_id)
if only_rental_enabled:
where.append(
"EXISTS (SELECT 1 FROM products p WHERE p.rental_asset_enabled = true AND p.deleted_at IS NULL)"
)
rows = execute_query(
f"""
SELECT
ha.id,
ha.asset_type,
ha.brand,
ha.model,
ha.serial_number,
ha.customer_asset_id,
ha.internal_asset_id,
ha.current_owner_customer_id AS customer_id,
c.name AS customer_name,
COALESCE(ha.rental_status, 'ledig') AS rental_status,
ha.status AS hardware_status,
ha.status_reason,
ha.updated_at
FROM hardware_assets ha
LEFT JOIN customers c ON c.id = ha.current_owner_customer_id
WHERE {' AND '.join(where)}
ORDER BY ha.updated_at DESC, ha.id DESC
""",
tuple(params),
) or []
return rows
@router.put("/assets/{asset_id}/status", response_model=Dict[str, Any])
async def update_asset_rental_status(asset_id: int, payload: AssetStatusInput):
"""Alias endpoint for updating rental status without changing core hardware lifecycle status."""
row = execute_query(
"""
UPDATE hardware_assets
SET rental_status = %s,
status_reason = COALESCE(%s, status_reason),
updated_at = CURRENT_TIMESTAMP
WHERE id = %s AND deleted_at IS NULL
RETURNING id, COALESCE(rental_status, 'ledig') AS status, status_reason, updated_at
""",
(payload.status, payload.status_reason, asset_id),
)
if not row:
raise HTTPException(status_code=404, detail="Asset not found")
return row[0]
@router.get("/subscriptions", response_model=List[Dict[str, Any]])
async def list_subscriptions_alias(status: str = Query("all")):
where = ""
params: List[Any] = []
if status != "all":
where = "WHERE s.status = %s"
params.append(status)
rows = execute_query(
f"""
SELECT
s.id,
s.subscription_number,
s.customer_id,
c.name AS customer_name,
s.sag_id,
s.product_name,
s.price,
s.billing_interval,
s.billing_direction,
s.start_date,
s.end_date,
s.status,
s.binding_months,
s.invoice_merge_key,
COALESCE(
(
SELECT json_agg(json_build_object(
'id', i.id,
'description', i.description,
'quantity', i.quantity,
'unit_price', i.unit_price,
'line_total', i.line_total,
'asset_id', i.asset_id,
'period_from', i.period_from,
'period_to', i.period_to,
'serial_number', i.serial_number
) ORDER BY i.line_no ASC, i.id ASC)
FROM sag_subscription_items i
WHERE i.subscription_id = s.id
),
'[]'::json
) AS line_items
FROM sag_subscriptions s
LEFT JOIN customers c ON c.id = s.customer_id
{where}
ORDER BY s.start_date DESC, s.id DESC
""",
tuple(params),
) or []
return rows
@router.post("/subscriptions", response_model=Dict[str, Any])
async def create_subscription_alias(payload: SubscriptionCreateInput):
"""Alias endpoint that maps to existing sag subscription engine."""
normalized_lines = _normalize_line_items(
payload.line_items,
default_price_type=payload.price_type,
default_custom_override=payload.custom_price_override,
)
for line in normalized_lines:
asset_id = line.get("asset_id")
if asset_id is None:
continue
_assert_asset_booking_available(
int(asset_id),
start_date=payload.start_date,
end_date=payload.end_date,
)
body: Dict[str, Any] = {
"customer_id": payload.customer_id,
"sag_id": payload.sag_id,
"product_name": payload.product_name,
"price": payload.price,
"billing_interval": payload.billing_interval,
"billing_day": payload.billing_day,
"start_date": payload.start_date.isoformat(),
"end_date": payload.end_date.isoformat() if payload.end_date else None,
"binding_months": payload.binding_months,
"billing_direction": payload.billing_direction,
"price_type": payload.price_type,
"custom_price_override": payload.custom_price_override,
"first_invoice_policy": payload.first_invoice_policy,
"invoice_merge_key": payload.invoice_merge_key,
"notes": payload.notes,
"line_items": normalized_lines,
}
return await create_sag_subscription(body)
@router.post("/hardware/{hardware_id}/quick-rent", response_model=Dict[str, Any])
async def quick_rent_hardware(hardware_id: int, payload: QuickRentCreateInput):
"""Create a rental subscription + asset binding + startup order draft in one step."""
hardware = execute_query_single(
"""
SELECT id, brand, model, serial_number, current_owner_type
FROM hardware_assets
WHERE id = %s
AND deleted_at IS NULL
""",
(hardware_id,),
)
if not hardware:
raise HTTPException(status_code=404, detail="Hardware not found")
if (hardware.get("current_owner_type") or "").strip().lower() != "bmc":
raise HTTPException(status_code=409, detail="Only BMC-owned assets can be rented from this flow")
customer = execute_query_single(
"SELECT id, name FROM customers WHERE id = %s AND deleted_at IS NULL",
(payload.customer_id,),
)
if not customer:
raise HTTPException(status_code=400, detail="Customer not found")
sag = execute_query_single(
"SELECT id, customer_id, titel FROM sag_sager WHERE id = %s AND deleted_at IS NULL",
(payload.sag_id,),
)
if not sag:
raise HTTPException(status_code=400, detail="Sag not found")
if int(sag.get("customer_id") or 0) != int(payload.customer_id):
raise HTTPException(status_code=400, detail="Sag customer mismatch")
_assert_asset_booking_available(
asset_id=hardware_id,
start_date=payload.start_date,
end_date=None,
)
hardware_label = f"{hardware.get('brand') or ''} {hardware.get('model') or ''}".strip() or f"Asset {hardware_id}"
existing_subscription = execute_query_single(
"""
SELECT id, customer_id
FROM sag_subscriptions
WHERE sag_id = %s
AND status IN ('draft', 'active', 'paused')
ORDER BY id DESC
LIMIT 1
""",
(payload.sag_id,),
)
created_new_subscription = False
if existing_subscription:
subscription_id = int(existing_subscription.get("id") or 0)
if int(existing_subscription.get("customer_id") or 0) != int(payload.customer_id):
raise HTTPException(status_code=400, detail="Existing subscription customer mismatch for sag")
duplicate_line = execute_query_single(
"""
SELECT id
FROM sag_subscription_items
WHERE subscription_id = %s
AND asset_id = %s
LIMIT 1
""",
(subscription_id, hardware_id),
)
if duplicate_line:
raise HTTPException(status_code=409, detail="Asset already exists on subscription line items for this sag")
next_line_row = execute_query_single(
"SELECT COALESCE(MAX(line_no), 0) + 1 AS next_line_no FROM sag_subscription_items WHERE subscription_id = %s",
(subscription_id,),
)
next_line_no = int((next_line_row or {}).get("next_line_no") or 1)
execute_query(
"""
INSERT INTO sag_subscription_items (
subscription_id,
line_no,
product_id,
asset_id,
description,
quantity,
unit_price,
line_total,
period_from,
period_to,
price_type,
custom_price_override,
requires_serial_number,
serial_number,
billing_blocked,
billing_block_reason
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
(
subscription_id,
next_line_no,
None,
hardware_id,
f"Drift - {hardware_label}",
1,
float(payload.operations_monthly_price),
float(payload.operations_monthly_price),
payload.start_date,
None,
"manual",
True,
False,
hardware.get("serial_number"),
False,
None,
),
)
execute_query(
"""
UPDATE sag_subscriptions
SET price = COALESCE(price, 0) + %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
""",
(float(payload.operations_monthly_price), subscription_id),
)
else:
subscription_payload = {
"customer_id": payload.customer_id,
"sag_id": payload.sag_id,
"product_name": f"Udlejning - {hardware_label}",
"price": float(payload.operations_monthly_price),
"billing_interval": "monthly",
"billing_day": min(max(payload.start_date.day, 1), 28),
"start_date": payload.start_date.isoformat(),
"end_date": None,
"binding_months": 0,
"billing_direction": "forward",
"price_type": "manual",
"custom_price_override": True,
"first_invoice_policy": "next_cycle",
"invoice_merge_key": f"rental-{payload.customer_id}-{payload.sag_id}",
"notes": f"Quick rent created from hardware #{hardware_id}",
"line_items": [
{
"description": f"Drift - {hardware_label}",
"quantity": 1,
"unit_price": float(payload.operations_monthly_price),
"asset_id": hardware_id,
"serial_number": hardware.get("serial_number"),
"price_type": "manual",
"custom_price_override": True,
}
],
}
subscription = await create_sag_subscription(subscription_payload)
subscription_id = int(subscription.get("id") or 0)
if subscription_id <= 0:
raise HTTPException(status_code=500, detail="Subscription creation did not return an id")
created_new_subscription = True
binding = await create_sag_asset_binding(
subscription_id,
{
"asset_id": hardware_id,
"start_date": payload.start_date.isoformat(),
"end_date": None,
"binding_months": 0,
"shared_binding_key": f"rental-{payload.customer_id}-{payload.sag_id}",
"notice_period_days": payload.notice_period_days,
"sag_id": payload.sag_id,
"created_by_user_id": payload.created_by_user_id,
},
)
startup_lines: List[Dict[str, Any]] = []
def _append_line(key: str, description: str, quantity: float, unit_price: float) -> None:
if quantity <= 0 or unit_price < 0:
return
startup_lines.append(
{
"line_key": key,
"source_type": "manual",
"source_id": hardware_id,
"description": description,
"quantity": float(quantity),
"unit_price": float(unit_price),
"discount_percentage": 0,
"unit": "stk",
"product_id": None,
"selected": True,
}
)
_append_line("rental-start", f"Start - {hardware_label}", 1, float(payload.start_price))
_append_line("rental-freight", "Fragt", 1, float(payload.freight_price))
_append_line("rental-preparation", "Klargoring", 1, float(payload.preparation_price))
_append_line(
"rental-operations-initial",
f"Drift (forste {payload.initial_operations_months} maned(er))",
float(payload.initial_operations_months),
float(payload.operations_monthly_price),
)
if not startup_lines:
raise HTTPException(status_code=400, detail="At least one startup/order line must have a price")
coverage_end = payload.start_date + relativedelta(months=payload.initial_operations_months)
order_result = execute_query(
"""
INSERT INTO ordre_drafts (
title,
customer_id,
lines_json,
notes,
coverage_start,
coverage_end,
billing_direction,
source_subscription_ids,
invoice_aggregate_key,
layout_number,
created_by_user_id,
sync_status,
export_status_json,
updated_at
) VALUES (%s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, CURRENT_TIMESTAMP)
RETURNING id, title, customer_id, sync_status, created_at
""",
(
f"Udlejning opstart: {hardware_label}",
payload.customer_id,
json.dumps(startup_lines, ensure_ascii=False),
(
f"Quick rent startup order for {hardware_label}\n"
f"Sag: #{payload.sag_id}\n"
f"Subscription: #{subscription_id}\n"
f"Asset: #{hardware_id}"
),
payload.start_date,
coverage_end,
"forward",
[subscription_id],
f"quick-rent-{subscription_id}-{hardware_id}-{payload.start_date.isoformat()}",
1,
payload.created_by_user_id,
"pending",
json.dumps({"source": "quick_rent", "subscription_id": subscription_id}, ensure_ascii=False),
),
)
order_draft = (order_result or [None])[0]
return {
"status": "ok",
"hardware_id": hardware_id,
"subscription_id": subscription_id,
"created_new_subscription": created_new_subscription,
"asset_binding_id": binding.get("id") if isinstance(binding, dict) else None,
"ordre_draft_id": order_draft.get("id") if order_draft else None,
"message": "Rental flow completed: subscription, binding and startup order draft created",
}
@router.put("/subscriptions/{subscription_id}", response_model=Dict[str, Any])
async def update_subscription_alias(subscription_id: int, payload: SubscriptionUpdateInput):
body = payload.model_dump(exclude_none=True)
current = execute_query_single(
"SELECT id, start_date, end_date, price_type, custom_price_override FROM sag_subscriptions WHERE id = %s",
(subscription_id,),
)
if not current:
raise HTTPException(status_code=404, detail="Subscription not found")
effective_start = payload.start_date or current.get("start_date")
effective_end = payload.end_date if payload.end_date is not None else current.get("end_date")
effective_price_type = payload.price_type or current.get("price_type") or "manual"
effective_override = (
payload.custom_price_override
if payload.custom_price_override is not None
else bool(current.get("custom_price_override"))
)
if "line_items" in body:
normalized_lines = _normalize_line_items(
payload.line_items or [],
default_price_type=effective_price_type,
default_custom_override=effective_override,
)
for line in normalized_lines:
asset_id = line.get("asset_id")
if asset_id is None:
continue
_assert_asset_booking_available(
int(asset_id),
start_date=effective_start,
end_date=effective_end,
exclude_subscription_id=subscription_id,
)
body["line_items"] = normalized_lines
if isinstance(body.get("start_date"), date):
body["start_date"] = body["start_date"].isoformat()
if isinstance(body.get("end_date"), date):
body["end_date"] = body["end_date"].isoformat()
return await update_sag_subscription(subscription_id, body)
@router.post("/subscriptions/{subscription_id}/asset-bindings", response_model=Dict[str, Any])
async def create_subscription_asset_binding_alias(subscription_id: int, payload: AssetBindingCreateInput):
_assert_asset_booking_available(
asset_id=payload.asset_id,
start_date=payload.start_date,
end_date=payload.end_date,
exclude_subscription_id=subscription_id,
)
end_date = payload.end_date
if end_date is None and payload.binding_months > 0:
end_date = payload.start_date + relativedelta(months=payload.binding_months)
row = execute_query(
"""
INSERT INTO subscription_asset_bindings (
subscription_id,
asset_id,
shared_binding_key,
binding_months,
start_date,
end_date,
notice_period_days,
status,
sag_id,
created_by_user_id
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'active', %s, %s)
RETURNING *
""",
(
subscription_id,
payload.asset_id,
payload.shared_binding_key,
payload.binding_months,
payload.start_date,
end_date,
payload.notice_period_days,
payload.sag_id,
payload.created_by_user_id,
),
)
if not row:
raise HTTPException(status_code=500, detail="Failed to create binding")
return row[0]
@router.get("/invoices", response_model=List[Dict[str, Any]])
async def list_invoices_alias(
status: str = Query("all"),
customer_id: Optional[int] = Query(default=None),
limit: int = Query(default=100, ge=1, le=500),
):
where = ["1=1"]
params: List[Any] = []
if status != "all":
where.append("d.sync_status = %s")
params.append(status)
if customer_id is not None:
where.append("d.customer_id = %s")
params.append(customer_id)
params.append(limit)
rows = execute_query(
f"""
SELECT
d.id,
d.customer_id,
c.name AS customer_name,
d.title,
d.invoice_date,
d.due_date,
d.coverage_start,
d.coverage_end,
d.total_amount,
d.sync_status AS status,
d.economic_order_number,
d.economic_invoice_number,
d.last_sync_at,
d.created_at,
d.updated_at
FROM ordre_drafts d
LEFT JOIN customers c ON c.id = d.customer_id
WHERE {' AND '.join(where)}
ORDER BY d.created_at DESC, d.id DESC
LIMIT %s
""",
tuple(params),
) or []
return rows
@router.post("/invoices/generate", response_model=Dict[str, Any])
async def generate_invoices_alias(payload: InvoiceGenerateInput | None = None):
"""Generate invoices globally or scoped by customer, with preview support."""
req = payload or InvoiceGenerateInput()
as_of = req.as_of or date.today()
economic_safety = _economic_safety_state()
if req.push_to_economic:
raise HTTPException(
status_code=409,
detail={
"message": "Direct e-conomic sync is blocked in this alias endpoint to protect live accounting.",
"action": "Use draft generation here and run dedicated sync flow manually.",
"safety": economic_safety,
},
)
if req.preview:
preview = _build_due_subscription_preview(as_of=as_of, customer_id=req.customer_id)
return {"status": "preview", "economic_sync_attempted": False, "safety": economic_safety, **preview}
if req.customer_id is not None:
try:
result = _generate_due_invoices_for_customer(as_of=as_of, customer_id=req.customer_id)
result["economic_sync_attempted"] = False
result["safety"] = economic_safety
return result
except Exception as exc:
logger.error("Failed customer-scoped invoice generation: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail=f"Invoice generation failed: {exc}")
try:
await process_subscriptions()
return {
"status": "ok",
"message": "Invoice generation job completed (drafts only, no e-conomic sync)",
"economic_sync_attempted": False,
"safety": economic_safety,
}
except Exception as exc:
logger.error("Failed running invoice generation job: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail=f"Invoice generation failed: {exc}")
@router.get("/invoices/sync-safety", response_model=Dict[str, Any])
async def get_invoice_sync_safety_status(
customer_id: Optional[int] = Query(default=None),
):
"""Read-only safety overview for invoice sync readiness. Does not call e-conomic."""
safety = _economic_safety_state()
where = ["d.deleted_at IS NULL"]
params: List[Any] = []
if customer_id is not None:
where.append("d.customer_id = %s")
params.append(customer_id)
stats_rows = execute_query(
f"""
SELECT
COALESCE(d.sync_status, 'unknown') AS sync_status,
COUNT(*)::int AS count
FROM ordre_drafts d
WHERE {' AND '.join(where)}
GROUP BY COALESCE(d.sync_status, 'unknown')
ORDER BY 1
""",
tuple(params),
) or []
sample_rows = execute_query(
f"""
SELECT
d.id,
d.customer_id,
c.name AS customer_name,
d.title,
d.sync_status,
d.economic_order_number,
d.economic_invoice_number,
d.last_sync_at,
d.created_at
FROM ordre_drafts d
LEFT JOIN customers c ON c.id = d.customer_id
WHERE {' AND '.join(where)}
AND COALESCE(d.sync_status, 'pending') IN ('pending', 'failed', 'exported')
ORDER BY d.created_at DESC, d.id DESC
LIMIT 20
""",
tuple(params),
) or []
stats: Dict[str, int] = {row["sync_status"]: int(row["count"]) for row in stats_rows}
draft_sync_allowed = not (safety["ordre_economic_read_only"] or safety["ordre_economic_dry_run"])
return {
"status": "ok",
"mode": "live_write_enabled" if draft_sync_allowed else "safe_no_write",
"customer_id": customer_id,
"safety": safety,
"sync_write_allowed": draft_sync_allowed,
"sync_write_block_reason": None if draft_sync_allowed else "ORDRE_ECONOMIC_READ_ONLY or ORDRE_ECONOMIC_DRY_RUN is enabled",
"stats": stats,
"candidate_count": len(sample_rows),
"candidates": sample_rows,
}
@router.get("/invoices/{draft_id}/sync-preview", response_model=Dict[str, Any])
async def preview_invoice_sync(draft_id: int):
"""Preview sync intent for one draft without sending data to e-conomic."""
safety = _economic_safety_state()
draft = execute_query_single(
"""
SELECT
d.id,
d.customer_id,
c.name AS customer_name,
d.title,
d.sync_status,
d.economic_order_number,
d.economic_invoice_number,
d.last_sync_at,
d.created_at,
d.updated_at
FROM ordre_drafts d
LEFT JOIN customers c ON c.id = d.customer_id
WHERE d.id = %s
AND d.deleted_at IS NULL
""",
(draft_id,),
)
if not draft:
raise HTTPException(status_code=404, detail="Invoice draft not found")
next_action = "none"
if draft.get("sync_status") in {"pending", "failed"}:
next_action = "export_order"
elif draft.get("sync_status") == "exported":
next_action = "book_invoice"
draft_sync_allowed = not (safety["ordre_economic_read_only"] or safety["ordre_economic_dry_run"])
return {
"status": "preview",
"draft": draft,
"next_action": next_action,
"will_sync": False,
"safety": safety,
"sync_write_allowed": draft_sync_allowed,
"sync_write_block_reason": None if draft_sync_allowed else "ORDRE_ECONOMIC_READ_ONLY or ORDRE_ECONOMIC_DRY_RUN is enabled",
"message": "Preview only. No e-conomic API call has been made.",
}