bmc_hub/app/modules/fedex/backend/service.py

676 lines
24 KiB
Python
Raw Permalink Normal View History

import json
import logging
from decimal import Decimal
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from uuid import uuid4
from fastapi import HTTPException
from app.core.config import settings
from app.core.database import execute_query, execute_query_single, table_has_column
from app.modules.fedex.backend.api_client import FedExApiClient, parse_tracking_events
from app.modules.fedex.models.schemas import FedExBookingCreate
logger = logging.getLogger(__name__)
def _json_default(value: Any) -> Any:
if isinstance(value, Decimal):
return float(value)
if isinstance(value, datetime):
return value.isoformat()
return str(value)
def _json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, default=_json_default)
def _to_float(value: Any) -> Optional[float]:
try:
if value is None or value == "":
return None
return float(value)
except (TypeError, ValueError):
return None
def _extract_price_info(payload: Dict[str, Any]) -> tuple[Optional[float], Optional[str]]:
if not isinstance(payload, dict):
return None, None
direct_amount = _to_float(
payload.get("total_amount")
or payload.get("totalAmount")
or payload.get("total_cost")
or payload.get("totalCost")
or payload.get("price")
or payload.get("amount")
)
direct_currency = (
payload.get("currency")
or payload.get("currencyCode")
or payload.get("total_cost_currency")
)
if direct_amount is not None:
return direct_amount, str(direct_currency or "").upper() or None
stack: List[Any] = [payload]
visited: set[int] = set()
while stack:
node = stack.pop()
node_id = id(node)
if node_id in visited:
continue
visited.add(node_id)
if isinstance(node, dict):
amount = _to_float(node.get("amount") or node.get("value"))
currency = node.get("currency") or node.get("currencyCode")
if amount is not None and currency:
return amount, str(currency).upper()
# Prioritize common FedEx charge keys if present.
for key in (
"totalNetCharge",
"totalNetFedExCharge",
"totalBaseCharge",
"totalSurcharges",
"netCharge",
):
nested = node.get(key)
if isinstance(nested, dict):
nested_amount = _to_float(nested.get("amount") or nested.get("value"))
nested_currency = nested.get("currency") or nested.get("currencyCode")
if nested_amount is not None:
return nested_amount, str(nested_currency or "").upper() or None
stack.extend(node.values())
elif isinstance(node, list):
stack.extend(node)
return None, None
def _extract_label_url(payload: Dict[str, Any]) -> Optional[str]:
if not isinstance(payload, dict):
return None
direct = payload.get("label_url") or payload.get("labelUrl") or payload.get("label")
if isinstance(direct, str) and direct.strip().lower().startswith(("http://", "https://")):
return direct.strip()
stack: List[Any] = [payload]
visited: set[int] = set()
while stack:
node = stack.pop()
node_id = id(node)
if node_id in visited:
continue
visited.add(node_id)
if isinstance(node, dict):
for key, value in node.items():
key_lower = str(key).lower()
if isinstance(value, str):
v = value.strip()
if v.lower().startswith(("http://", "https://")) and (
"label" in key_lower or "document" in key_lower or "url" in key_lower
):
return v
elif isinstance(value, (dict, list)):
stack.append(value)
elif isinstance(node, list):
stack.extend(node)
return None
def _extract_tracking_number(payload: Dict[str, Any]) -> Optional[str]:
if not isinstance(payload, dict):
return None
direct = (
payload.get("tracking_number")
or payload.get("trackingNumber")
or payload.get("masterTrackingNumber")
)
if direct is not None:
value = str(direct).strip()
if value:
return value
stack: List[Any] = [payload]
visited: set[int] = set()
while stack:
node = stack.pop()
node_id = id(node)
if node_id in visited:
continue
visited.add(node_id)
if isinstance(node, dict):
for key, value in node.items():
key_lower = str(key).lower()
if "tracking" in key_lower and value is not None and not isinstance(value, (dict, list)):
candidate = str(value).strip()
if candidate:
return candidate
if isinstance(value, (dict, list)):
stack.append(value)
elif isinstance(node, list):
stack.extend(node)
return None
def _build_tracking_url(tracking_number: Optional[str]) -> Optional[str]:
if not tracking_number:
return None
return f"https://www.fedex.com/fedextrack/?trknbr={tracking_number}"
def _estimate_dry_run_price(payload: Dict[str, Any]) -> tuple[float, str]:
packages = payload.get("packages") if isinstance(payload, dict) else []
if not isinstance(packages, list) or not packages:
return 99.0, "DKK"
total_weight = 0.0
for p in packages:
if isinstance(p, dict):
total_weight += _to_float(p.get("weight_kg")) or 0.0
estimated = round(79.0 + (total_weight * 8.5), 2)
return max(estimated, 79.0), "DKK"
class FedExService:
def __init__(self) -> None:
self.client = FedExApiClient()
@property
def enabled(self) -> bool:
return bool(settings.FEDEX_ENABLED)
@property
def read_only(self) -> bool:
return bool(settings.FEDEX_READ_ONLY)
@property
def dry_run(self) -> bool:
return bool(settings.FEDEX_DRY_RUN)
def _assert_enabled(self) -> None:
if not self.enabled:
raise HTTPException(status_code=503, detail="FedEx integration is disabled")
def _booking_ref(self) -> str:
stamp = datetime.now(timezone.utc).strftime("%Y%m%d")
return f"FDX-{stamp}-{uuid4().hex[:8].upper()}"
def _validate_relations(self, payload: FedExBookingCreate) -> None:
case_exists = execute_query_single("SELECT id FROM sag_sager WHERE id = %s", (payload.case_id,))
if not case_exists:
raise HTTPException(status_code=404, detail="Case not found")
if payload.customer_id:
customer_exists = execute_query_single("SELECT id FROM customers WHERE id = %s", (payload.customer_id,))
if not customer_exists:
raise HTTPException(status_code=404, detail="Customer not found")
if payload.contact_id:
contact_exists = execute_query_single("SELECT id FROM contacts WHERE id = %s", (payload.contact_id,))
if not contact_exists:
raise HTTPException(status_code=404, detail="Contact not found")
def _fetch_packages(self, shipment_id: int) -> List[Dict[str, Any]]:
rows = execute_query(
"""
SELECT weight_kg, length_cm, width_cm, height_cm, description
FROM fedex_shipment_packages
WHERE shipment_id = %s
ORDER BY id ASC
""",
(shipment_id,),
) or []
return [dict(row) for row in rows]
def _shipment_row_to_dict(self, row: Dict[str, Any]) -> Dict[str, Any]:
mapped = dict(row)
mapped["packages"] = self._fetch_packages(int(row["id"]))
api_response = mapped.get("api_response")
if isinstance(api_response, str):
try:
api_response = json.loads(api_response)
except Exception:
api_response = None
if isinstance(api_response, dict):
if not mapped.get("tracking_number"):
mapped["tracking_number"] = _extract_tracking_number(api_response)
if not mapped.get("label_url"):
mapped["label_url"] = _extract_label_url(api_response)
if mapped.get("total_amount") is None:
fallback_amount, fallback_currency = _extract_price_info(api_response)
if fallback_amount is not None:
mapped["total_amount"] = fallback_amount
if not mapped.get("currency") and fallback_currency:
mapped["currency"] = fallback_currency
mapped["tracking_url"] = _build_tracking_url(mapped.get("tracking_number"))
# Ensure older dry-run rows still expose useful test outputs in UI.
if mapped.get("dry_run") and mapped.get("shipment_status") in {"submitted", "booked"}:
if not mapped.get("label_url") and mapped.get("tracking_url"):
mapped["label_url"] = mapped["tracking_url"]
if mapped.get("total_amount") is None:
estimated_amount, estimated_currency = _estimate_dry_run_price({"packages": mapped.get("packages") or []})
mapped["total_amount"] = estimated_amount
if not mapped.get("currency"):
mapped["currency"] = estimated_currency
return mapped
def create_booking_draft(self, payload: FedExBookingCreate, created_by_user_id: Optional[int]) -> Dict[str, Any]:
self._assert_enabled()
self._validate_relations(payload)
if payload.pickup_window_end <= payload.pickup_window_start:
raise HTTPException(status_code=400, detail="pickup_window_end must be after pickup_window_start")
booking_ref = self._booking_ref()
shipment_row = execute_query_single(
"""
INSERT INTO fedex_shipments (
booking_ref,
case_id,
customer_id,
contact_id,
service_type,
shipment_status,
pickup_window_start,
pickup_window_end,
recipient_name,
company_name,
address_line1,
address_line2,
postal_code,
city,
country_code,
phone,
email,
dry_run,
created_by_user_id,
updated_by_user_id,
api_payload,
api_response
) VALUES (
%s, %s, %s, %s, %s, 'draft',
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s::jsonb, %s::jsonb
)
RETURNING *
""",
(
booking_ref,
payload.case_id,
payload.customer_id,
payload.contact_id,
payload.service_type,
payload.pickup_window_start,
payload.pickup_window_end,
payload.address.recipient_name,
payload.address.company_name,
payload.address.address_line1,
payload.address.address_line2,
payload.address.postal_code,
payload.address.city,
payload.address.country_code.upper(),
payload.address.phone,
payload.address.email,
self.dry_run,
created_by_user_id,
created_by_user_id,
_json_dumps(payload.model_dump(mode="json")),
_json_dumps({"status": "draft_created"}),
),
)
if not shipment_row:
raise HTTPException(status_code=500, detail="Failed to create booking draft")
shipment_id = int(shipment_row["id"])
for package in payload.packages:
execute_query(
"""
INSERT INTO fedex_shipment_packages (
shipment_id, weight_kg, length_cm, width_cm, height_cm, description
) VALUES (%s, %s, %s, %s, %s, %s)
""",
(
shipment_id,
package.weight_kg,
package.length_cm,
package.width_cm,
package.height_cm,
package.description,
),
)
logger.info("✅ FedEx draft created: %s (case=%s)", booking_ref, payload.case_id)
return self._shipment_row_to_dict(dict(shipment_row))
def list_bookings(self, case_id: Optional[int] = None) -> List[Dict[str, Any]]:
params: List[Any] = []
where_sql = "WHERE deleted_at IS NULL"
if case_id is not None:
where_sql += " AND case_id = %s"
params.append(case_id)
rows = execute_query(
f"""
SELECT *
FROM fedex_shipments
{where_sql}
ORDER BY created_at DESC
LIMIT 200
""",
tuple(params),
) or []
return [self._shipment_row_to_dict(dict(row)) for row in rows]
def get_booking(self, booking_ref: str) -> Dict[str, Any]:
row = execute_query_single(
"""
SELECT *
FROM fedex_shipments
WHERE booking_ref = %s
AND deleted_at IS NULL
""",
(booking_ref,),
)
if not row:
raise HTTPException(status_code=404, detail="Booking not found")
return self._shipment_row_to_dict(dict(row))
async def submit_booking(self, booking_ref: str, user_id: Optional[int]) -> Dict[str, Any]:
self._assert_enabled()
shipment = self.get_booking(booking_ref)
if shipment["shipment_status"] not in {"draft", "failed"}:
raise HTTPException(status_code=409, detail="Only draft/failed bookings can be submitted")
if self.read_only:
raise HTTPException(status_code=403, detail="FedEx write actions are disabled (read-only mode)")
payload = {
"booking_ref": shipment["booking_ref"],
"service_type": shipment["service_type"],
"pickup_window_start": shipment["pickup_window_start"].isoformat() if shipment.get("pickup_window_start") else None,
"pickup_window_end": shipment["pickup_window_end"].isoformat() if shipment.get("pickup_window_end") else None,
"recipient": {
"recipient_name": shipment["recipient_name"],
"company_name": shipment.get("company_name"),
"address_line1": shipment["address_line1"],
"address_line2": shipment.get("address_line2"),
"postal_code": shipment["postal_code"],
"city": shipment["city"],
"country_code": shipment["country_code"],
"phone": shipment.get("phone"),
"email": shipment.get("email"),
},
"packages": shipment["packages"],
}
if self.dry_run:
tracking_number = f"DRYRUN-{uuid4().hex[:12].upper()}"
label_url = _build_tracking_url(tracking_number)
total_amount, currency = _estimate_dry_run_price(payload)
api_response = {
"dry_run": True,
"tracking_number": tracking_number,
"label_url": label_url,
"total_amount": total_amount,
"currency": currency,
}
new_status = "submitted"
else:
try:
api_response = await self.client.create_shipment(payload)
except Exception as exc:
execute_query(
"""
UPDATE fedex_shipments
SET shipment_status = 'failed',
api_response = %s::jsonb,
updated_by_user_id = %s,
updated_at = CURRENT_TIMESTAMP
WHERE booking_ref = %s
""",
(_json_dumps({"error": str(exc)}), user_id, booking_ref),
)
raise HTTPException(status_code=502, detail="FedEx booking failed") from exc
tracking_number = _extract_tracking_number(api_response)
label_url = _extract_label_url(api_response)
new_status = "booked"
total_amount, currency = _extract_price_info(api_response)
has_total_amount = table_has_column("fedex_shipments", "total_amount")
has_currency = table_has_column("fedex_shipments", "currency")
set_clauses = [
"shipment_status = %s",
"tracking_number = COALESCE(%s, tracking_number)",
"label_url = COALESCE(%s, label_url)",
]
update_params: List[Any] = [
new_status,
tracking_number,
label_url,
]
if has_total_amount:
set_clauses.append("total_amount = COALESCE(%s, total_amount)")
update_params.append(total_amount)
if has_currency:
set_clauses.append("currency = COALESCE(%s, currency)")
update_params.append(currency)
set_clauses.extend([
"submitted_at = CURRENT_TIMESTAMP",
"api_payload = %s::jsonb",
"api_response = %s::jsonb",
"updated_by_user_id = %s",
"updated_at = CURRENT_TIMESTAMP",
])
update_params.extend([
_json_dumps(payload),
_json_dumps(api_response),
user_id,
booking_ref,
])
updated = execute_query_single(
f"""
UPDATE fedex_shipments
SET {', '.join(set_clauses)}
WHERE booking_ref = %s
RETURNING *
""",
tuple(update_params),
)
if tracking_number:
execute_query(
"""
INSERT INTO fedex_tracking_events (
shipment_id, status, description, event_timestamp
) VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
""",
(
updated["id"],
"submitted" if self.dry_run else "booked",
"Shipment submitted from BMC Hub",
),
)
return {
"booking_ref": booking_ref,
"status": new_status,
"dry_run": self.dry_run,
"tracking_number": tracking_number,
"tracking_url": _build_tracking_url(tracking_number),
"label_url": label_url,
"total_amount": total_amount,
"currency": currency,
}
async def get_tracking(self, booking_ref: str) -> Dict[str, Any]:
self._assert_enabled()
shipment = self.get_booking(booking_ref)
tracking_number = shipment.get("tracking_number")
if not tracking_number:
events = execute_query(
"""
SELECT status, event_timestamp, description, location_city, location_country
FROM fedex_tracking_events
WHERE shipment_id = %s
ORDER BY event_timestamp DESC
""",
(shipment["id"],),
) or []
return {
"booking_ref": booking_ref,
"shipment_status": shipment["shipment_status"],
"tracking_number": None,
"events": [dict(row) for row in events],
}
if self.dry_run:
events = execute_query(
"""
SELECT status, event_timestamp, description, location_city, location_country
FROM fedex_tracking_events
WHERE shipment_id = %s
ORDER BY event_timestamp DESC
""",
(shipment["id"],),
) or []
return {
"booking_ref": booking_ref,
"shipment_status": shipment["shipment_status"],
"tracking_number": tracking_number,
"events": [dict(row) for row in events],
}
try:
provider_payload = await self.client.get_tracking(tracking_number)
except Exception as exc:
raise HTTPException(status_code=502, detail="Failed to fetch FedEx tracking") from exc
events = parse_tracking_events(provider_payload)
if events:
execute_query(
"DELETE FROM fedex_tracking_events WHERE shipment_id = %s",
(shipment["id"],),
)
for event in events:
execute_query(
"""
INSERT INTO fedex_tracking_events (
shipment_id,
status,
description,
event_timestamp,
location_city,
location_country
) VALUES (
%s, %s, %s,
COALESCE(%s::timestamp, CURRENT_TIMESTAMP),
%s, %s
)
""",
(
shipment["id"],
event.get("status") or "unknown",
event.get("description"),
event.get("event_timestamp"),
event.get("location_city"),
event.get("location_country"),
),
)
status = str(provider_payload.get("shipment_status") or shipment["shipment_status"])
execute_query(
"""
UPDATE fedex_shipments
SET shipment_status = %s,
api_response = %s::jsonb,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
""",
(status, _json_dumps(provider_payload), shipment["id"]),
)
current_events = execute_query(
"""
SELECT status, event_timestamp, description, location_city, location_country
FROM fedex_tracking_events
WHERE shipment_id = %s
ORDER BY event_timestamp DESC
""",
(shipment["id"],),
) or []
return {
"booking_ref": booking_ref,
"shipment_status": status,
"tracking_number": tracking_number,
"events": [dict(row) for row in current_events],
}
async def cancel_booking(self, booking_ref: str, reason: Optional[str], user_id: Optional[int]) -> Dict[str, Any]:
self._assert_enabled()
if self.read_only:
raise HTTPException(status_code=403, detail="FedEx write actions are disabled (read-only mode)")
shipment = self.get_booking(booking_ref)
if shipment["shipment_status"] == "cancelled":
return {"booking_ref": booking_ref, "status": "cancelled", "cancelled": True}
if not self.dry_run and shipment.get("tracking_number"):
try:
await self.client.cancel_shipment(str(shipment["tracking_number"]))
except Exception as exc:
raise HTTPException(status_code=502, detail="Failed to cancel shipment at FedEx") from exc
execute_query(
"""
UPDATE fedex_shipments
SET shipment_status = 'cancelled',
cancel_reason = %s,
updated_by_user_id = %s,
updated_at = CURRENT_TIMESTAMP
WHERE booking_ref = %s
""",
(reason, user_id, booking_ref),
)
execute_query(
"""
INSERT INTO fedex_tracking_events (shipment_id, status, description, event_timestamp)
VALUES (%s, 'cancelled', %s, CURRENT_TIMESTAMP)
""",
(shipment["id"], reason or "Cancelled from BMC Hub"),
)
return {"booking_ref": booking_ref, "status": "cancelled", "cancelled": True}
fedex_service = FedExService()