- Introduced Technician Dashboard V1 (tech_v1_overview.html) with KPI cards and new cases overview. - Implemented Technician Dashboard V2 (tech_v2_workboard.html) featuring a workboard layout for daily tasks and opportunities. - Developed Technician Dashboard V3 (tech_v3_table_focus.html) with a power table for detailed case management. - Created a dashboard selector page (technician_dashboard_selector.html) for easy navigation between dashboard versions. - Added user dashboard preferences migration (130_user_dashboard_preferences.sql) to store default dashboard paths. - Enhanced sag_sager table with assigned group ID (131_sag_assignment_group.sql) for better case management. - Updated sag_subscriptions table to include cancellation rules and billing dates (132_subscription_cancellation.sql, 134_subscription_billing_dates.sql). - Implemented subscription staging for CRM integration (136_simply_subscription_staging.sql). - Added a script to move time tracking section in detail view (move_time_section.py). - Created a test script for subscription processing (test_subscription_processing.py).
1013 lines
35 KiB
Python
1013 lines
35 KiB
Python
"""
|
|
Products API
|
|
"""
|
|
from fastapi import APIRouter, HTTPException, Query, Depends
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from app.core.database import execute_query, execute_query_single
|
|
from app.core.config import settings
|
|
from app.core.auth_dependencies import require_permission
|
|
import logging
|
|
import os
|
|
import aiohttp
|
|
import json
|
|
import asyncio
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router on which every endpoint defined in this module is registered.
router = APIRouter()
|
|
|
|
|
|
def _apigw_headers() -> Dict[str, str]:
    """Build the ``Authorization`` header used for API Gateway requests.

    The bearer token is taken from ``settings.APIGW_TOKEN`` first, then from
    the ``APIGW_TOKEN`` and ``APIGATEWAY_TOKEN`` environment variables; the
    first non-empty value wins.

    Raises:
        HTTPException: 400 when none of the sources yields a token.
    """
    candidates = (
        settings.APIGW_TOKEN,
        os.getenv("APIGW_TOKEN"),
        os.getenv("APIGATEWAY_TOKEN"),
    )
    for candidate in candidates:
        if candidate:
            return {"Authorization": f"Bearer {candidate}"}
    raise HTTPException(status_code=400, detail="APIGW_TOKEN is not configured")
|
|
|
|
|
|
def _apigw_base_url() -> str:
    """Return the API Gateway base URL without a trailing slash.

    Sources are consulted in order: ``settings.APIGW_BASE_URL``,
    ``settings.APIGATEWAY_URL``, then the ``APIGW_BASE_URL`` and
    ``APIGATEWAY_URL`` environment variables. The first non-empty value is
    used and surrounding whitespace is removed.

    Raises:
        HTTPException: 500 when the selected value is empty after stripping.
    """
    # Settings are read lazily, exactly like the short-circuiting ``or`` chain:
    # later sources are only touched when earlier ones are empty.
    configured = settings.APIGW_BASE_URL or settings.APIGATEWAY_URL
    if not configured:
        configured = os.getenv("APIGW_BASE_URL") or os.getenv("APIGATEWAY_URL") or ""

    cleaned = configured.strip()
    if cleaned:
        return cleaned.rstrip("/")
    raise HTTPException(status_code=500, detail="API Gateway base URL is not configured")
|
|
|
|
|
|
def _extract_error_detail(payload: Any) -> str:
|
|
if payload is None:
|
|
return ""
|
|
|
|
if isinstance(payload, str):
|
|
return payload.strip()
|
|
|
|
if isinstance(payload, dict):
|
|
for key in ("detail", "message", "error", "description"):
|
|
value = payload.get(key)
|
|
if isinstance(value, str) and value.strip():
|
|
return value.strip()
|
|
return json.dumps(payload, ensure_ascii=False)
|
|
|
|
if isinstance(payload, list):
|
|
parts = [_extract_error_detail(item) for item in payload]
|
|
cleaned = [part for part in parts if part]
|
|
return "; ".join(cleaned)
|
|
|
|
return str(payload).strip()
|
|
|
|
|
|
async def _read_apigw_error(response: aiohttp.ClientResponse) -> str:
    """Produce a human-readable error message from an API Gateway response.

    Tries, in order: the JSON body (any content type) reduced through
    ``_extract_error_detail``, the stripped raw text body, and finally a
    generic message containing the HTTP status code.
    """
    detail = ""
    try:
        detail = _extract_error_detail(await response.json(content_type=None))
    except Exception:
        # Best effort only: a body that cannot be decoded as JSON falls
        # through to the plain-text handling below.
        detail = ""

    if detail:
        return detail

    raw_text = await response.text()
    fallback = (raw_text or "").strip()
    if fallback:
        return fallback

    return f"API Gateway request failed (HTTP {response.status})"
|
|
|
|
|
|
def _upsert_product_supplier(product_id: int, payload: Dict[str, Any], source: str = "manual") -> Dict[str, Any]:
    """Insert or update one ``product_suppliers`` row for a product.

    Supplier fields are read from ``payload`` under their ``supplier_*`` key
    first, with shorter aliases as fallbacks (``sku``, ``price``,
    ``currency``, ``stock_qty``, ``supplier_link``, ``product_url``,
    ``product_link``, ``url``). The currency defaults to ``"DKK"``.

    An existing row is looked up using the first identifier set available:
    supplier code + SKU, then supplier URL, then supplier name + SKU. When no
    identifier set is available, or nothing matches, a new row is inserted.

    Args:
        product_id: Product the supplier row belongs to.
        payload: Supplier data (see key aliases above).
        source: Value written to the ``source`` column.

    Returns:
        The row produced by ``RETURNING *``. If the UPDATE returns nothing the
        previously matched row is returned; if the INSERT returns nothing an
        empty dict is returned.
    """

    def first_value(*keys: str) -> Any:
        # Numeric fields must not be coalesced with ``or``: a stock or price
        # of 0 is a real value, and ``0 or fallback`` would discard it.
        # Only ``None`` and the empty string count as "missing" here.
        for key in keys:
            value = payload.get(key)
            if value is not None and value != "":
                return value
        return None

    supplier_name = payload.get("supplier_name")
    supplier_code = payload.get("supplier_code")
    supplier_sku = payload.get("supplier_sku") or payload.get("sku")
    supplier_price = first_value("supplier_price", "price")
    supplier_currency = payload.get("supplier_currency") or payload.get("currency") or "DKK"
    supplier_stock = first_value("supplier_stock", "stock_qty")
    supplier_url = payload.get("supplier_url") or payload.get("supplier_link")
    supplier_product_url = (
        payload.get("supplier_product_url")
        or payload.get("product_url")
        or payload.get("product_link")
        or payload.get("url")
    )

    # Pick the most specific identifier set available to find an existing row.
    match_query = None
    match_params = None
    if supplier_code and supplier_sku:
        match_query = """
            SELECT * FROM product_suppliers
            WHERE product_id = %s AND supplier_code = %s AND supplier_sku = %s
            LIMIT 1
        """
        match_params = (product_id, supplier_code, supplier_sku)
    elif supplier_url:
        match_query = """
            SELECT * FROM product_suppliers
            WHERE product_id = %s AND supplier_url = %s
            LIMIT 1
        """
        match_params = (product_id, supplier_url)
    elif supplier_name and supplier_sku:
        match_query = """
            SELECT * FROM product_suppliers
            WHERE product_id = %s AND supplier_name = %s AND supplier_sku = %s
            LIMIT 1
        """
        match_params = (product_id, supplier_name, supplier_sku)

    existing = execute_query_single(match_query, match_params) if match_query else None

    # Column values shared by UPDATE and INSERT, in the column order both
    # statements use.
    supplier_values = (
        supplier_name,
        supplier_code,
        supplier_sku,
        supplier_price,
        supplier_currency,
        supplier_stock,
        supplier_url,
        supplier_product_url,
        source,
    )

    if existing:
        update_query = """
            UPDATE product_suppliers
            SET supplier_name = %s,
                supplier_code = %s,
                supplier_sku = %s,
                supplier_price = %s,
                supplier_currency = %s,
                supplier_stock = %s,
                supplier_url = %s,
                supplier_product_url = %s,
                source = %s,
                last_updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            RETURNING *
        """
        result = execute_query(update_query, (*supplier_values, existing.get("id")))
        return result[0] if result else existing

    insert_query = """
        INSERT INTO product_suppliers (
            product_id,
            supplier_name,
            supplier_code,
            supplier_sku,
            supplier_price,
            supplier_currency,
            supplier_stock,
            supplier_url,
            supplier_product_url,
            source,
            last_updated_at
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
        RETURNING *
    """
    result = execute_query(insert_query, (product_id, *supplier_values))
    return result[0] if result else {}
|
|
|
|
|
|
def _log_product_audit(product_id: int, event_type: str, user_id: Optional[int], changes: Dict[str, Any]) -> None:
    """Write one entry to ``product_audit_log``.

    Args:
        product_id: Product the event concerns.
        event_type: Event label stored as-is.
        user_id: Acting user, or ``None``.
        changes: Details of the change; stored as JSON text.
    """
    serialized_changes = json.dumps(changes)
    execute_query(
        """
        INSERT INTO product_audit_log (product_id, event_type, user_id, changes)
        VALUES (%s, %s, %s, %s)
        """,
        (product_id, event_type, user_id, serialized_changes),
    )
|
|
|
|
|
|
def _normalize_query(raw_query: str) -> Tuple[str, List[str]]:
|
|
normalized = " ".join(
|
|
"".join(ch.lower() if ch.isalnum() else " " for ch in raw_query).split()
|
|
)
|
|
tokens = [token for token in normalized.split() if len(token) > 1]
|
|
return normalized, tokens
|
|
|
|
|
|
def _score_apigw_product(product: Dict[str, Any], normalized_query: str, tokens: List[str]) -> int:
|
|
if not normalized_query and not tokens:
|
|
return 0
|
|
|
|
name = str(product.get("product_name") or product.get("name") or "")
|
|
sku = str(product.get("sku") or "")
|
|
manufacturer = str(product.get("manufacturer") or "")
|
|
category = str(product.get("category") or "")
|
|
supplier = str(product.get("supplier_name") or "")
|
|
|
|
haystack = " ".join(
|
|
" ".join("".join(ch.lower() if ch.isalnum() else " " for ch in value).split())
|
|
for value in (name, sku, manufacturer, category, supplier)
|
|
if value
|
|
)
|
|
|
|
score = 0
|
|
if normalized_query and normalized_query in haystack:
|
|
score += 100
|
|
|
|
if tokens:
|
|
if all(token in haystack for token in tokens):
|
|
score += 50
|
|
for token in tokens:
|
|
if token in name.lower():
|
|
score += 5
|
|
elif token in haystack:
|
|
score += 2
|
|
|
|
if sku and sku.lower() == normalized_query:
|
|
score += 120
|
|
|
|
return score
|
|
|
|
|
|
@router.get("/products/apigateway/search", response_model=Dict[str, Any])
async def search_apigw_products(
    q: Optional[str] = Query(None),
    supplier_code: Optional[str] = Query(None),
    min_price: Optional[float] = Query(None),
    max_price: Optional[float] = Query(None),
    in_stock: Optional[bool] = Query(None),
    category: Optional[str] = Query(None),
    manufacturer: Optional[str] = Query(None),
    sort: Optional[str] = Query(None),
    page: Optional[int] = Query(None),
    per_page: Optional[int] = Query(None),
):
    """Search products through the API Gateway.

    Supplied filters are forwarded as query parameters to
    ``/api/v1/products/search``. When ``q`` is given and the response is a
    dict with a ``products`` list, that list is re-ordered by local relevance
    score (highest first); otherwise the gateway payload is returned as-is.

    Raises:
        HTTPException: 400 when no filter is supplied; 502 when the gateway
            answers with status >= 400 or the connection fails; 504 on
            timeout; 500 on any other error.
    """
    # Text filters are dropped when empty, numeric/boolean ones only when
    # absent (so 0 / False are still forwarded).
    candidates: Dict[str, Any] = {
        "q": q or None,
        "supplier_code": supplier_code or None,
        "min_price": min_price,
        "max_price": max_price,
        "in_stock": None if in_stock is None else str(in_stock).lower(),
        "category": category or None,
        "manufacturer": manufacturer or None,
        "sort": sort or None,
        "page": page,
        "per_page": per_page,
    }
    params: Dict[str, Any] = {
        key: value for key, value in candidates.items() if value is not None
    }
    if not params:
        raise HTTPException(status_code=400, detail="Provide at least one search parameter")

    url = f"{_apigw_base_url()}/api/v1/products/search"
    logger.info("🔍 APIGW product search: %s", params)
    timeout = aiohttp.ClientTimeout(total=settings.APIGW_TIMEOUT_SECONDS)

    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, headers=_apigw_headers(), params=params) as response:
                if response.status >= 400:
                    upstream_detail = await _read_apigw_error(response)
                    raise HTTPException(
                        status_code=502,
                        detail=f"API Gateway product search failed ({response.status}): {upstream_detail}",
                    )
                data = await response.json()

        products = data.get("products") if isinstance(data, dict) else None
        if q and isinstance(products, list):
            normalized_query, tokens = _normalize_query(q)

            def relevance(candidate: Dict[str, Any]) -> int:
                return _score_apigw_product(candidate, normalized_query, tokens)

            # In-place, stable sort: equally scored products keep gateway order.
            products.sort(key=relevance, reverse=True)

        return data
    except HTTPException:
        raise
    except asyncio.TimeoutError:
        logger.error("❌ APIGW product search timeout for params: %s", params, exc_info=True)
        raise HTTPException(status_code=504, detail="API Gateway product search timed out")
    except aiohttp.ClientError as e:
        logger.error("❌ APIGW product search connection error: %s", e, exc_info=True)
        raise HTTPException(status_code=502, detail=f"API Gateway connection failed: {e}")
    except Exception as e:
        logger.error("❌ Error searching APIGW products: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/products/apigateway/import", response_model=Dict[str, Any])
async def import_apigw_product(payload: Dict[str, Any]):
    """Import a single API Gateway product into the local ``products`` table.

    The product data is ``payload["product"]`` when present, otherwise the
    payload itself. The internal SKU is ``"<supplier_code>:<sku>"`` when both
    parts exist, else the plain SKU. If a non-deleted product with that
    internal SKU already exists, only its supplier row is upserted and the
    existing product is returned; otherwise a new ``hardware`` / ``active``
    product is inserted (gateway price used for both supplier and sales
    price, VAT 25.00, billable) and its supplier row is upserted.

    Raises:
        HTTPException: 400 when no product name is given; 500 on any other
            error.
    """
    try:
        item = payload.get("product") or payload
        product_name = (item.get("product_name") or item.get("name") or "").strip()
        if not product_name:
            raise HTTPException(status_code=400, detail="product_name is required")

        supplier_code = item.get("supplier_code")
        supplier_sku = item.get("sku")
        if supplier_code and supplier_sku:
            internal_sku = f"{supplier_code}:{supplier_sku}"
        else:
            internal_sku = supplier_sku

        already_imported = None
        if internal_sku:
            already_imported = execute_query_single(
                "SELECT * FROM products WHERE sku_internal = %s AND deleted_at IS NULL",
                (internal_sku,)
            )
        if already_imported:
            _upsert_product_supplier(already_imported["id"], item, source="gateway")
            return already_imported

        # The gateway price is stored as both supplier price and sales price.
        gateway_price = item.get("price")
        rows = execute_query(
            """
            INSERT INTO products (
                name,
                short_description,
                type,
                status,
                sku_internal,
                ean,
                manufacturer,
                supplier_name,
                supplier_sku,
                supplier_price,
                supplier_currency,
                supplier_stock,
                sales_price,
                vat_rate,
                billable
            ) VALUES (
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
            )
            RETURNING *
            """,
            (
                product_name,
                item.get("category"),
                "hardware",
                "active",
                internal_sku,
                item.get("ean"),
                item.get("manufacturer"),
                item.get("supplier_name"),
                supplier_sku,
                gateway_price,
                item.get("currency") or "DKK",
                item.get("stock_qty"),
                gateway_price,
                25.00,
                True,
            ),
        )
        if not rows:
            return {}

        created = rows[0]
        if created:
            _upsert_product_supplier(created["id"], item, source="gateway")
        return created
    except HTTPException:
        raise
    except Exception as e:
        logger.error("❌ Error importing APIGW product: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/products", response_model=List[Dict[str, Any]])
async def list_products(
    status: Optional[str] = Query("active"),
    q: Optional[str] = Query(None),
    product_type: Optional[str] = Query(None, alias="type"),
    manufacturer: Optional[str] = Query(None),
    supplier_name: Optional[str] = Query(None),
    sku: Optional[str] = Query(None),
    ean: Optional[str] = Query(None),
    billable: Optional[bool] = Query(None),
    is_bundle: Optional[bool] = Query(None),
    min_price: Optional[float] = Query(None),
    max_price: Optional[float] = Query(None),
):
    """List non-deleted products, ordered by name, with optional filters.

    ``status`` filters exactly unless it is empty or ``"all"`` (case
    insensitive). ``q`` is a case-insensitive substring search over name,
    internal SKU, EAN, manufacturer and supplier name. ``manufacturer``,
    ``supplier_name``, ``sku`` and ``ean`` are substring filters; ``type``,
    ``billable`` and ``is_bundle`` are exact; ``min_price`` / ``max_price``
    bound ``sales_price`` inclusively.

    Raises:
        HTTPException: 500 on any error.
    """
    try:
        clauses = ["deleted_at IS NULL"]
        values: List[Any] = []

        def add_filter(clause: str, *args: Any) -> None:
            clauses.append(clause)
            values.extend(args)

        if status and status.lower() != "all":
            add_filter("status = %s", status)

        if q:
            pattern = f"%{q.strip()}%"
            add_filter(
                "(name ILIKE %s OR sku_internal ILIKE %s OR ean ILIKE %s OR manufacturer ILIKE %s OR supplier_name ILIKE %s)",
                *([pattern] * 5),
            )

        if product_type:
            add_filter("type = %s", product_type)

        # Substring filters: (column, user-supplied needle).
        for column, needle in (
            ("manufacturer", manufacturer),
            ("supplier_name", supplier_name),
            ("sku_internal", sku),
            ("ean", ean),
        ):
            if needle:
                add_filter(f"{column} ILIKE %s", f"%{needle.strip()}%")

        # Exact / range filters: applied whenever a value was supplied,
        # including False and 0.
        for clause, value in (
            ("billable = %s", billable),
            ("is_bundle = %s", is_bundle),
            ("sales_price >= %s", min_price),
            ("sales_price <= %s", max_price),
        ):
            if value is not None:
                add_filter(clause, value)

        where_clause = "WHERE " + " AND ".join(clauses)
        query = f"""
            SELECT
                id,
                uuid,
                name,
                short_description,
                type,
                status,
                sku_internal,
                ean,
                manufacturer,
                supplier_name,
                supplier_price,
                cost_price,
                sales_price,
                vat_rate,
                billing_period,
                auto_renew,
                minimum_term_months,
                is_bundle,
                billable,
                image_url
            FROM products
            {where_clause}
            ORDER BY name ASC
        """
        rows = execute_query(query, tuple(values))
        return rows or []
    except Exception as e:
        logger.error(f"❌ Error listing products: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/products", response_model=Dict[str, Any])
async def create_product(payload: Dict[str, Any]):
    """Create a product from the given payload.

    ``name`` is required and stored stripped. Every other column is taken
    from the payload key of the same name; keys that are absent fall back to
    the default listed in the table below (``None`` unless stated).

    Returns:
        The inserted row, or ``{}`` if the statement returned nothing.

    Raises:
        HTTPException: 400 when ``name`` is missing or blank; 500 on any
            other error.
    """
    # (column / payload key, default when the key is absent). Column list,
    # placeholders and parameters are all derived from this single table so
    # they cannot get out of step.
    optional_columns = (
        ("short_description", None),
        ("long_description", None),
        ("type", None),
        ("status", "active"),
        ("sku_internal", None),
        ("ean", None),
        ("er_number", None),
        ("manufacturer", None),
        ("manufacturer_sku", None),
        ("supplier_id", None),
        ("supplier_name", None),
        ("supplier_sku", None),
        ("supplier_price", None),
        ("supplier_currency", "DKK"),
        ("supplier_stock", None),
        ("supplier_lead_time_days", None),
        ("supplier_updated_at", None),
        ("cost_price", None),
        ("sales_price", None),
        ("vat_rate", 25.00),
        ("price_model", None),
        ("price_override_allowed", False),
        ("billing_period", None),
        ("billing_anchor_month", None),
        ("auto_renew", False),
        ("minimum_term_months", None),
        ("subscription_group_id", None),
        ("is_bundle", False),
        ("parent_product_id", None),
        ("bundle_pricing_model", None),
        ("billable", True),
        ("default_case_tag", None),
        ("default_time_rate_id", None),
        ("category_id", None),
        ("subcategory_id", None),
        ("tags", None),
        ("attributes_json", None),
        ("technical_spec_json", None),
        ("ai_classified", False),
        ("ai_confidence", None),
        ("ai_category_suggestion", None),
        ("ai_tags_suggestion", None),
        ("ai_classified_at", None),
        ("image_url", None),
        ("datasheet_url", None),
        ("manual_url", None),
        ("created_by", None),
        ("updated_by", None),
    )

    try:
        product_name = (payload.get("name") or "").strip()
        if not product_name:
            raise HTTPException(status_code=400, detail="name is required")

        columns = ["name"]
        values = [product_name]
        for column, default in optional_columns:
            columns.append(column)
            values.append(payload.get(column, default))

        column_sql = ", ".join(columns)
        placeholder_sql = ", ".join(["%s"] * len(columns))
        query = f"""
            INSERT INTO products (
                {column_sql}
            ) VALUES (
                {placeholder_sql}
            )
            RETURNING *
        """
        rows = execute_query(query, tuple(values))
        if rows:
            return rows[0]
        return {}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error creating product: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/products/{product_id}", response_model=Dict[str, Any])
async def get_product(product_id: int):
    """Return one non-deleted product by id.

    Raises:
        HTTPException: 404 when no such product exists; 500 on any other
            error.
    """
    try:
        row = execute_query_single(
            "SELECT * FROM products WHERE id = %s AND deleted_at IS NULL",
            (product_id,),
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error loading product: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))

    if row:
        return row
    raise HTTPException(status_code=404, detail="Product not found")
|
|
|
|
|
|
@router.patch("/products/{product_id}", response_model=Dict[str, Any])
async def update_product(
    product_id: int,
    payload: Dict[str, Any],
    current_user: dict = Depends(require_permission("products.update"))
):
    """Update a product's name and record the change in the audit log.

    Only ``payload["name"]`` is applied. When it is absent the name is left
    untouched and only ``updated_at`` is refreshed. An audit entry
    (``name_updated``) is written only when the name actually changes.

    Args:
        product_id: Product to update.
        payload: Request body; ``name`` is the only key read.
        current_user: Authenticated user resolved by the
            ``products.update`` permission dependency.

    Returns:
        The updated product row.

    Raises:
        HTTPException: 400 when ``name`` is not a string or is blank; 404
            when the product does not exist or is deleted; 500 on any other
            error.
    """
    try:
        name = payload.get("name")
        if name is not None:
            # A non-string name is a client error; without this check
            # ``.strip()`` raises AttributeError and surfaces as a 500.
            if not isinstance(name, str):
                raise HTTPException(status_code=400, detail="name must be a string")
            name = name.strip()
            if not name:
                raise HTTPException(status_code=400, detail="name cannot be empty")

        # Fetch the current name first so the audit entry can record the
        # old value.
        existing = execute_query_single(
            "SELECT name FROM products WHERE id = %s AND deleted_at IS NULL",
            (product_id,)
        )
        if not existing:
            raise HTTPException(status_code=404, detail="Product not found")

        query = """
            UPDATE products
            SET name = COALESCE(%s, name),
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
            RETURNING *
        """
        result = execute_query(query, (name, product_id))
        if not result:
            raise HTTPException(status_code=404, detail="Product not found")

        if name is not None and name != existing.get("name"):
            _log_product_audit(
                product_id,
                "name_updated",
                current_user.get("id") if current_user else None,
                {"old": existing.get("name"), "new": name}
            )
        return result[0]
    except HTTPException:
        raise
    except Exception as e:
        logger.error("❌ Error updating product: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.delete("/products/{product_id}", response_model=Dict[str, Any])
async def delete_product(
    product_id: int,
    current_user: dict = Depends(require_permission("products.delete"))
):
    """Soft-delete a product.

    Sets ``deleted_at``, switches ``status`` to ``inactive`` and writes a
    ``deleted`` audit entry attributed to the current user.

    Returns:
        ``{"status": "deleted", "id": <product id>}``.

    Raises:
        HTTPException: 404 when the product does not exist or is already
            deleted; 500 on any other error.
    """
    try:
        deleted_rows = execute_query(
            """
            UPDATE products
            SET deleted_at = CURRENT_TIMESTAMP,
                status = 'inactive',
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
            RETURNING id
            """,
            (product_id,),
        )
        if not deleted_rows:
            raise HTTPException(status_code=404, detail="Product not found")

        actor_id = current_user.get("id") if current_user else None
        _log_product_audit(product_id, "deleted", actor_id, {"status": "inactive"})

        return {"status": "deleted", "id": deleted_rows[0].get("id")}
    except HTTPException:
        raise
    except Exception as e:
        logger.error("❌ Error deleting product: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/products/{product_id}/suppliers", response_model=List[Dict[str, Any]])
async def list_product_suppliers(product_id: int):
    """Return the supplier rows of a product.

    Rows are ordered by supplier name, then supplier price, with NULLs last
    for both.

    Raises:
        HTTPException: 500 on any error.
    """
    supplier_query = """
        SELECT
            id,
            product_id,
            supplier_name,
            supplier_code,
            supplier_sku,
            supplier_price,
            supplier_currency,
            supplier_stock,
            supplier_url,
            supplier_product_url,
            source,
            last_updated_at
        FROM product_suppliers
        WHERE product_id = %s
        ORDER BY supplier_name NULLS LAST, supplier_price NULLS LAST
    """
    try:
        rows = execute_query(supplier_query, (product_id,))
    except Exception as e:
        logger.error("❌ Error loading product suppliers: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    return rows or []
|
|
|
|
|
|
@router.post("/products/{product_id}/suppliers", response_model=Dict[str, Any])
async def upsert_product_supplier(product_id: int, payload: Dict[str, Any]):
    """Create or update a supplier row for a product.

    The row's ``source`` is ``payload["source"]`` when non-empty, otherwise
    ``"manual"``.

    Raises:
        HTTPException: 500 on any error.
    """
    try:
        origin = payload.get("source") or "manual"
        saved_row = _upsert_product_supplier(product_id, payload, source=origin)
        return saved_row
    except Exception as e:
        logger.error("❌ Error saving product supplier: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.delete("/products/{product_id}/suppliers/{supplier_id}", response_model=Dict[str, Any])
async def delete_product_supplier(product_id: int, supplier_id: int):
    """Delete one supplier row of a product.

    The row must match both ``supplier_id`` and ``product_id``. The response
    is ``{"status": "deleted"}`` whether or not a row matched.

    Raises:
        HTTPException: 500 on any error.
    """
    try:
        execute_query(
            "DELETE FROM product_suppliers WHERE id = %s AND product_id = %s",
            (supplier_id, product_id),
        )
    except Exception as e:
        logger.error("❌ Error deleting product supplier: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "deleted"}
|
|
|
|
|
|
@router.post("/products/{product_id}/suppliers/refresh", response_model=Dict[str, Any])
async def refresh_product_suppliers(product_id: int):
    """Refresh a product's supplier rows from the API Gateway, using its EAN.

    The product's EAN is sent as the search query (up to 25 results). Each
    distinct result - keyed by supplier code, SKU and product URL - is
    upserted as a supplier row with source ``"gateway"``.

    Gateway failures are reported the same way as in the product search
    endpoint, so an upstream status code is never relayed as if it were this
    API's own.

    Returns:
        ``{"status": "refreshed", "saved": <count>, "queries": [{"q": ean}]}``.

    Raises:
        HTTPException: 404 when the product does not exist; 400 when it has
            no EAN; 502 when the gateway answers with status >= 400 or the
            connection fails; 504 on timeout; 500 on any other error.
    """
    try:
        product = execute_query_single(
            "SELECT ean FROM products WHERE id = %s AND deleted_at IS NULL",
            (product_id,),
        )
        if not product:
            raise HTTPException(status_code=404, detail="Product not found")

        ean = (product.get("ean") or "").strip()
        if not ean:
            raise HTTPException(status_code=400, detail="Product has no EAN to search")

        # Shared helper: honours the environment fallbacks and raises a clear
        # 500 when no base URL is configured (instead of failing on
        # ``None.rstrip``).
        url = f"{_apigw_base_url()}/api/v1/products/search"
        params = {"q": ean, "per_page": 25}
        timeout = aiohttp.ClientTimeout(total=settings.APIGW_TIMEOUT_SECONDS)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, headers=_apigw_headers(), params=params) as response:
                if response.status >= 400:
                    detail = await _read_apigw_error(response)
                    # Report upstream failures as 502 (Bad Gateway); relaying
                    # e.g. a gateway 401/404 would look like the caller's own
                    # auth or routing problem.
                    raise HTTPException(
                        status_code=502,
                        detail=f"API Gateway supplier refresh failed ({response.status}): {detail}",
                    )
                data = await response.json()

        results = data.get("products") if isinstance(data, dict) else []
        if not isinstance(results, list):
            results = []

        saved = 0
        seen_keys = set()
        for item in results:
            if not isinstance(item, dict):
                # Skip malformed entries rather than failing the whole refresh.
                continue
            key = (
                item.get("supplier_code"),
                item.get("sku"),
                item.get("product_url") or item.get("url")
            )
            if key in seen_keys:
                continue
            seen_keys.add(key)
            _upsert_product_supplier(product_id, item, source="gateway")
            saved += 1

        return {
            "status": "refreshed",
            "saved": saved,
            "queries": [{"q": ean}]
        }
    except HTTPException:
        raise
    except asyncio.TimeoutError:
        logger.error("❌ APIGW supplier refresh timeout for product %s", product_id, exc_info=True)
        raise HTTPException(status_code=504, detail="API Gateway supplier refresh timed out")
    except aiohttp.ClientError as e:
        logger.error("❌ APIGW supplier refresh connection error: %s", e, exc_info=True)
        raise HTTPException(status_code=502, detail=f"API Gateway connection failed: {e}")
    except Exception as e:
        logger.error("❌ Error refreshing product suppliers: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/products/{product_id}/price-history", response_model=List[Dict[str, Any]])
async def list_product_price_history(product_id: int, limit: int = Query(100)):
    """Return the newest price-history entries of a product.

    Args:
        product_id: Product whose history is listed.
        limit: Maximum number of entries (default 100), newest first.

    Raises:
        HTTPException: 500 on any error.
    """
    history_query = """
        SELECT
            id,
            product_id,
            price_type,
            old_price,
            new_price,
            note,
            changed_by,
            changed_at
        FROM product_price_history
        WHERE product_id = %s
        ORDER BY changed_at DESC
        LIMIT %s
    """
    try:
        entries = execute_query(history_query, (product_id, limit))
    except Exception as e:
        logger.error("❌ Error loading product price history: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    return entries or []
|
|
|
|
|
|
@router.post("/products/{product_id}/price", response_model=Dict[str, Any])
async def update_product_price(product_id: int, payload: Dict[str, Any]):
    """Set a product's sales price and record the change.

    Reads ``new_price`` (required key), ``note`` and ``changed_by`` from the
    payload. When the stored price already equals ``new_price`` nothing is
    written and ``{"status": "no_change", ...}`` is returned. Otherwise the
    product is updated and a ``sales_price`` row is appended to
    ``product_price_history``.

    Raises:
        HTTPException: 400 when ``new_price`` is missing; 404 when the
            product does not exist; 500 on any other error.
    """
    try:
        if "new_price" not in payload:
            raise HTTPException(status_code=400, detail="new_price is required")
        requested_price = payload["new_price"]

        stored = execute_query_single(
            "SELECT sales_price FROM products WHERE id = %s AND deleted_at IS NULL",
            (product_id,)
        )
        if not stored:
            raise HTTPException(status_code=404, detail="Product not found")

        previous_price = stored.get("sales_price")
        if previous_price == requested_price:
            return {"status": "no_change", "sales_price": previous_price}

        product_rows = execute_query(
            """
            UPDATE products
            SET sales_price = %s, updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            RETURNING *
            """,
            (requested_price, product_id),
        )

        history_rows = execute_query(
            """
            INSERT INTO product_price_history (
                product_id,
                price_type,
                old_price,
                new_price,
                note,
                changed_by
            ) VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING *
            """,
            (
                product_id,
                "sales_price",
                previous_price,
                requested_price,
                payload.get("note"),
                payload.get("changed_by"),
            ),
        )

        response: Dict[str, Any] = {"status": "updated"}
        response["product"] = product_rows[0] if product_rows else {}
        response["history"] = history_rows[0] if history_rows else {}
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error("❌ Error updating product price: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/products/{product_id}/supplier", response_model=Dict[str, Any])
async def update_product_supplier(product_id: int, payload: Dict[str, Any]):
    """Update the supplier name/price stored on the product itself.

    ``supplier_name`` and ``supplier_price`` from the payload replace the
    stored values; a field that is absent or ``None`` keeps its current
    value. When a supplier price is given and differs from the stored one, a
    ``supplier_price`` row is appended to ``product_price_history`` together
    with the payload's ``note`` and ``changed_by``.

    Returns:
        ``{"status": "updated", "product": <row>, "history": <row or {}>}``.

    Raises:
        HTTPException: 404 when the product does not exist; 500 on any other
            error.
    """
    try:
        requested_name = payload.get("supplier_name")
        requested_price = payload.get("supplier_price")

        current = execute_query_single(
            "SELECT supplier_name, supplier_price FROM products WHERE id = %s AND deleted_at IS NULL",
            (product_id,)
        )
        if not current:
            raise HTTPException(status_code=404, detail="Product not found")

        stored_name = current.get("supplier_name")
        stored_price = current.get("supplier_price")
        effective_name = stored_name if requested_name is None else requested_name
        effective_price = stored_price if requested_price is None else requested_price

        product_rows = execute_query(
            """
            UPDATE products
            SET supplier_name = %s,
                supplier_price = %s,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            RETURNING *
            """,
            (effective_name, effective_price, product_id),
        )

        history_entry = {}
        price_changed = requested_price is not None and stored_price != requested_price
        if price_changed:
            history_rows = execute_query(
                """
                INSERT INTO product_price_history (
                    product_id,
                    price_type,
                    old_price,
                    new_price,
                    note,
                    changed_by
                ) VALUES (%s, %s, %s, %s, %s, %s)
                RETURNING *
                """,
                (
                    product_id,
                    "supplier_price",
                    stored_price,
                    requested_price,
                    payload.get("note"),
                    payload.get("changed_by"),
                ),
            )
            if history_rows:
                history_entry = history_rows[0]

        return {
            "status": "updated",
            "product": product_rows[0] if product_rows else {},
            "history": history_entry
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error("❌ Error updating supplier info: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/products/{product_id}/sales-history", response_model=List[Dict[str, Any]])
async def list_product_sales_history(product_id: int, limit: int = Query(100)):
    """Return historical sales lines for a product.

    Combines case sale lines (``sag_salgsvarer`` rows of type ``sale``) with
    subscription items (``sag_subscription_items`` joined to
    ``sag_subscriptions``), newest first. Subscription lines are reported
    with currency ``DKK``.

    Args:
        product_id: Product whose sales are listed.
        limit: Maximum number of combined rows (default 100).

    Raises:
        HTTPException: 500 on any error.
    """
    sales_query = """
        SELECT
            'case_sale' AS source,
            ss.id AS reference_id,
            ss.sag_id,
            COALESCE(ss.line_date, ss.created_at)::date AS line_date,
            ss.description,
            ss.quantity,
            ss.unit_price,
            ss.amount AS total_amount,
            ss.currency,
            ss.status
        FROM sag_salgsvarer ss
        WHERE ss.product_id = %s AND ss.type = 'sale'

        UNION ALL

        SELECT
            'subscription' AS source,
            ssi.id AS reference_id,
            ss.sag_id,
            ssi.created_at::date AS line_date,
            ssi.description,
            ssi.quantity,
            ssi.unit_price,
            ssi.line_total AS total_amount,
            'DKK' AS currency,
            ss.status
        FROM sag_subscription_items ssi
        JOIN sag_subscriptions ss ON ss.id = ssi.subscription_id
        WHERE ssi.product_id = %s

        ORDER BY line_date DESC
        LIMIT %s
    """
    # product_id is bound twice: once for each half of the UNION.
    bindings = (product_id, product_id, limit)
    try:
        sales_rows = execute_query(sales_query, bindings)
    except Exception as e:
        logger.error("❌ Error loading product sales history: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    return sales_rows or []
|