bmc_hub/app/opportunities/backend/router.py
Christian f059cb6c95 feat: Add product search endpoint and enhance opportunity management
- Implemented a new endpoint for searching webshop products with filters for visibility and configuration.
- Enhanced the webshop frontend to include a customer search feature for improved user experience.
- Added opportunity line items management with CRUD operations and comments functionality.
- Created database migrations for opportunity line items and comments, including necessary triggers and indexes.
2026-01-28 14:37:47 +01:00

505 lines
16 KiB
Python

"""
Opportunities (Pipeline) Router
Hub-local sales pipeline
"""
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from typing import Optional, List, Dict
from datetime import date, datetime
import json
import logging
from app.core.database import execute_query, execute_query_single, execute_update
from app.services.opportunity_service import handle_stage_change
logger = logging.getLogger(__name__)
router = APIRouter()
class PipelineStageBase(BaseModel):
name: str
description: Optional[str] = None
sort_order: int = 0
default_probability: int = 0
color: Optional[str] = "#0f4c75"
is_won: bool = False
is_lost: bool = False
is_active: bool = True
class PipelineStageCreate(PipelineStageBase):
pass
class PipelineStageUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
sort_order: Optional[int] = None
default_probability: Optional[int] = None
color: Optional[str] = None
is_won: Optional[bool] = None
is_lost: Optional[bool] = None
is_active: Optional[bool] = None
class OpportunityBase(BaseModel):
customer_id: int
title: str
description: Optional[str] = None
amount: Optional[float] = 0
currency: Optional[str] = "DKK"
expected_close_date: Optional[date] = None
stage_id: Optional[int] = None
owner_user_id: Optional[int] = None
class OpportunityCreate(OpportunityBase):
pass
class OpportunityUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
amount: Optional[float] = None
currency: Optional[str] = None
expected_close_date: Optional[date] = None
stage_id: Optional[int] = None
owner_user_id: Optional[int] = None
is_active: Optional[bool] = None
class OpportunityStageUpdate(BaseModel):
stage_id: int
note: Optional[str] = None
user_id: Optional[int] = None
class OpportunityLineBase(BaseModel):
name: str
quantity: int = 1
unit_price: float = 0.0
product_number: Optional[str] = None
description: Optional[str] = None
class OpportunityLineCreate(OpportunityLineBase):
pass
class OpportunityCommentBase(BaseModel):
content: str
author_name: Optional[str] = None
user_id: Optional[int] = None
email_id: Optional[int] = None
contract_number: Optional[str] = None
contract_context: Optional[str] = None
contract_link: Optional[str] = None
metadata: Optional[Dict] = None
class OpportunityCommentCreate(OpportunityCommentBase):
pass
class OpportunityCommentResponse(BaseModel):
id: int
opportunity_id: int
content: str
author_name: Optional[str] = None
user_id: Optional[int] = None
user_full_name: Optional[str] = None
username: Optional[str] = None
email_id: Optional[int] = None
email_subject: Optional[str] = None
email_sender: Optional[str] = None
contract_number: Optional[str] = None
contract_context: Optional[str] = None
contract_link: Optional[str] = None
metadata: Optional[Dict] = None
created_at: datetime
updated_at: datetime
def _get_stage(stage_id: int):
stage = execute_query_single(
"SELECT * FROM pipeline_stages WHERE id = %s AND is_active = TRUE",
(stage_id,)
)
if not stage:
raise HTTPException(status_code=404, detail="Stage not found")
return stage
def _get_default_stage():
stage = execute_query_single(
"SELECT * FROM pipeline_stages WHERE is_active = TRUE ORDER BY sort_order ASC LIMIT 1"
)
if not stage:
raise HTTPException(status_code=400, detail="No active stages configured")
return stage
def _get_opportunity(opportunity_id: int):
query = """
SELECT o.*, c.name AS customer_name,
s.name AS stage_name, s.color AS stage_color, s.is_won, s.is_lost
FROM pipeline_opportunities o
JOIN customers c ON c.id = o.customer_id
JOIN pipeline_stages s ON s.id = o.stage_id
WHERE o.id = %s
"""
opportunity = execute_query_single(query, (opportunity_id,))
if not opportunity:
raise HTTPException(status_code=404, detail="Opportunity not found")
return opportunity
def _insert_stage_history(opportunity_id: int, from_stage_id: Optional[int], to_stage_id: int,
user_id: Optional[int] = None, note: Optional[str] = None):
execute_query(
"""
INSERT INTO pipeline_stage_history (opportunity_id, from_stage_id, to_stage_id, changed_by_user_id, note)
VALUES (%s, %s, %s, %s, %s)
""",
(opportunity_id, from_stage_id, to_stage_id, user_id, note)
)
def _fetch_opportunity_comments(opportunity_id: int):
query = """
SELECT c.*, u.full_name AS user_full_name, u.username,
em.subject AS email_subject, em.sender_email AS email_sender
FROM pipeline_opportunity_comments c
LEFT JOIN users u ON u.user_id = c.user_id
LEFT JOIN email_messages em ON em.id = c.email_id
WHERE c.opportunity_id = %s
ORDER BY c.created_at DESC
"""
return execute_query(query, (opportunity_id,)) or []
def _fetch_comment(comment_id: int):
query = """
SELECT c.*, u.full_name AS user_full_name, u.username,
em.subject AS email_subject, em.sender_email AS email_sender
FROM pipeline_opportunity_comments c
LEFT JOIN users u ON u.user_id = c.user_id
LEFT JOIN email_messages em ON em.id = c.email_id
WHERE c.id = %s
"""
result = execute_query(query, (comment_id,))
return result[0] if result else None
# ============================
# Pipeline Stages
# ============================
@router.get("/pipeline/stages", tags=["Pipeline Stages"])
async def list_stages():
query = "SELECT * FROM pipeline_stages WHERE is_active = TRUE ORDER BY sort_order ASC"
return execute_query(query) or []
@router.post("/pipeline/stages", tags=["Pipeline Stages"])
async def create_stage(stage: PipelineStageCreate):
query = """
INSERT INTO pipeline_stages
(name, description, sort_order, default_probability, color, is_won, is_lost, is_active)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING *
"""
result = execute_query(query, (
stage.name,
stage.description,
stage.sort_order,
stage.default_probability,
stage.color,
stage.is_won,
stage.is_lost,
stage.is_active
))
logger.info("✅ Created pipeline stage: %s", stage.name)
return result[0] if result else None
@router.put("/pipeline/stages/{stage_id}", tags=["Pipeline Stages"])
async def update_stage(stage_id: int, stage: PipelineStageUpdate):
updates = []
params = []
for field, value in stage.dict(exclude_unset=True).items():
updates.append(f"{field} = %s")
params.append(value)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
params.append(stage_id)
query = f"UPDATE pipeline_stages SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s RETURNING *"
result = execute_query(query, tuple(params))
if not result:
raise HTTPException(status_code=404, detail="Stage not found")
logger.info("✅ Updated pipeline stage: %s", stage_id)
return result[0]
@router.delete("/pipeline/stages/{stage_id}", tags=["Pipeline Stages"])
async def deactivate_stage(stage_id: int):
affected = execute_update(
"UPDATE pipeline_stages SET is_active = FALSE, updated_at = CURRENT_TIMESTAMP WHERE id = %s",
(stage_id,)
)
if not affected:
raise HTTPException(status_code=404, detail="Stage not found")
logger.info("⚠️ Deactivated pipeline stage: %s", stage_id)
return {"status": "success", "stage_id": stage_id}
# ============================
# Opportunities
# ============================
@router.get("/opportunities", tags=["Opportunities"])
async def list_opportunities(customer_id: Optional[int] = None, stage_id: Optional[int] = None):
query = """
SELECT o.*, c.name AS customer_name,
s.name AS stage_name, s.color AS stage_color, s.is_won, s.is_lost
FROM pipeline_opportunities o
JOIN customers c ON c.id = o.customer_id
JOIN pipeline_stages s ON s.id = o.stage_id
WHERE o.is_active = TRUE
"""
params: List = []
if customer_id is not None:
query += " AND o.customer_id = %s"
params.append(customer_id)
if stage_id is not None:
query += " AND o.stage_id = %s"
params.append(stage_id)
query += " ORDER BY o.updated_at DESC NULLS LAST, o.created_at DESC"
if params:
return execute_query(query, tuple(params)) or []
return execute_query(query) or []
@router.get("/opportunities/{opportunity_id}", tags=["Opportunities"])
async def get_opportunity(opportunity_id: int):
return _get_opportunity(opportunity_id)
@router.post("/opportunities", tags=["Opportunities"])
async def create_opportunity(opportunity: OpportunityCreate):
stage = _get_stage(opportunity.stage_id) if opportunity.stage_id else _get_default_stage()
probability = stage["default_probability"]
query = """
INSERT INTO pipeline_opportunities
(customer_id, title, description, amount, currency, expected_close_date, stage_id, probability, owner_user_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
"""
result = execute_query(query, (
opportunity.customer_id,
opportunity.title,
opportunity.description,
opportunity.amount or 0,
opportunity.currency or "DKK",
opportunity.expected_close_date,
stage["id"],
probability,
opportunity.owner_user_id
))
if not result:
raise HTTPException(status_code=500, detail="Failed to create opportunity")
opportunity_id = result[0]["id"]
_insert_stage_history(opportunity_id, None, stage["id"], opportunity.owner_user_id, "Oprettet")
logger.info("✅ Created opportunity %s", opportunity_id)
return _get_opportunity(opportunity_id)
@router.put("/opportunities/{opportunity_id}", tags=["Opportunities"])
async def update_opportunity(opportunity_id: int, update: OpportunityUpdate):
existing = _get_opportunity(opportunity_id)
updates = []
params = []
update_dict = update.dict(exclude_unset=True)
stage_changed = False
new_stage = None
if "stage_id" in update_dict:
new_stage = _get_stage(update_dict["stage_id"])
update_dict["probability"] = new_stage["default_probability"]
stage_changed = new_stage["id"] != existing["stage_id"]
for field, value in update_dict.items():
updates.append(f"{field} = %s")
params.append(value)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
params.append(opportunity_id)
query = f"UPDATE pipeline_opportunities SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
execute_update(query, tuple(params))
if stage_changed and new_stage:
_insert_stage_history(opportunity_id, existing["stage_id"], new_stage["id"], update.owner_user_id, "Stage ændret")
updated = _get_opportunity(opportunity_id)
handle_stage_change(updated, new_stage)
return _get_opportunity(opportunity_id)
@router.patch("/opportunities/{opportunity_id}/stage", tags=["Opportunities"])
async def update_opportunity_stage(opportunity_id: int, update: OpportunityStageUpdate):
existing = _get_opportunity(opportunity_id)
new_stage = _get_stage(update.stage_id)
execute_update(
"""
UPDATE pipeline_opportunities
SET stage_id = %s,
probability = %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s
""",
(new_stage["id"], new_stage["default_probability"], opportunity_id)
)
_insert_stage_history(opportunity_id, existing["stage_id"], new_stage["id"], update.user_id, update.note)
updated = _get_opportunity(opportunity_id)
handle_stage_change(updated, new_stage)
return updated
@router.get("/opportunities/{opportunity_id}/lines", tags=["Opportunities"])
async def list_opportunity_lines(opportunity_id: int):
query = """
SELECT id, opportunity_id, product_number, name, description, quantity, unit_price,
quantity * unit_price AS total_price
FROM pipeline_opportunity_lines
WHERE opportunity_id = %s
ORDER BY id ASC
"""
return execute_query(query, (opportunity_id,)) or []
@router.post("/opportunities/{opportunity_id}/lines", tags=["Opportunities"])
async def add_opportunity_line(opportunity_id: int, line: OpportunityLineCreate):
query = """
INSERT INTO pipeline_opportunity_lines
(opportunity_id, product_number, name, description, quantity, unit_price)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id, opportunity_id, product_number, name, description, quantity, unit_price,
quantity * unit_price AS total_price
"""
result = execute_query(
query,
(
opportunity_id,
line.product_number,
line.name,
line.description,
line.quantity,
line.unit_price
)
)
if not result:
raise HTTPException(status_code=500, detail="Failed to create line item")
return result[0]
@router.delete("/opportunities/{opportunity_id}/lines/{line_id}", tags=["Opportunities"])
async def remove_opportunity_line(opportunity_id: int, line_id: int):
query = """
DELETE FROM pipeline_opportunity_lines
WHERE opportunity_id = %s AND id = %s
RETURNING id
"""
result = execute_query(query, (opportunity_id, line_id))
if not result:
raise HTTPException(status_code=404, detail="Line item not found")
return {"success": True, "line_id": line_id}
@router.get(
"/opportunities/{opportunity_id}/comments",
response_model=List[OpportunityCommentResponse],
tags=["Opportunities"]
)
async def list_opportunity_comments(opportunity_id: int):
_get_opportunity(opportunity_id)
return _fetch_opportunity_comments(opportunity_id)
@router.post(
"/opportunities/{opportunity_id}/comments",
response_model=OpportunityCommentResponse,
tags=["Opportunities"]
)
async def add_opportunity_comment(opportunity_id: int, comment: OpportunityCommentCreate):
_get_opportunity(opportunity_id)
author_name = comment.author_name or 'Hub Bruger'
metadata_json = json.dumps(comment.metadata) if comment.metadata else None
query = """
INSERT INTO pipeline_opportunity_comments
(opportunity_id, user_id, author_name, content, email_id,
contract_number, contract_context, contract_link, metadata)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
"""
result = execute_query(
query,
(
opportunity_id,
comment.user_id,
author_name,
comment.content,
comment.email_id,
comment.contract_number,
comment.contract_context,
comment.contract_link,
metadata_json
)
)
if not result:
raise HTTPException(status_code=500, detail="Kunne ikke oprette kommentar")
comment_id = result[0]["id"]
return _fetch_comment(comment_id)
@router.get(
"/contracts/search",
tags=["Opportunities"],
response_model=List[Dict]
)
async def search_contracts(query: str = Query(..., min_length=2), limit: int = Query(10, ge=1, le=50)):
sql = """
SELECT contract_number,
MAX(created_at) AS last_seen,
COUNT(*) AS hits
FROM extraction_lines
WHERE contract_number IS NOT NULL
AND contract_number <> ''
AND contract_number ILIKE %s
GROUP BY contract_number
ORDER BY MAX(created_at) DESC
LIMIT %s
"""
params = (f"%{query}%", limit)
results = execute_query(sql, params)
return results or []