"""
Opportunities (Pipeline) Router
Hub-local sales pipeline
"""
import email
import json
import logging
import os
import shutil
from datetime import date, datetime
from email.header import decode_header
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from uuid import uuid4

from fastapi import APIRouter, File, Form, HTTPException, Query, Request, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel

from app.core.config import settings
from app.core.database import execute_query, execute_query_single, execute_update
from app.services.email_service import EmailService
from app.services.opportunity_service import handle_stage_change

# Optional dependency: parsing Outlook .msg uploads is disabled when missing.
try:
    import extract_msg
except ImportError:
    extract_msg = None
# Module-level logger and the router that the FastAPI app mounts.
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/opportunities/{opportunity_id}/email-links", tags=["Opportunities"])
async def add_opportunity_email_link(opportunity_id: int, payload: dict):
    """Link an existing stored email to an opportunity and return the refreshed record."""
    email_id = payload.get("email_id")
    # Reject missing, zero, or non-integer ids up front.
    if not (email_id and isinstance(email_id, int)):
        raise HTTPException(status_code=400, detail="Invalid email_id")
    try:
        _get_opportunity(opportunity_id)
    except HTTPException:
        raise HTTPException(status_code=404, detail="Opportunity not found")
    try:
        execute_query(
            "INSERT INTO pipeline_opportunity_emails (opportunity_id, email_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
            (opportunity_id, email_id)
        )
    except Exception as exc:
        logger.error(f"Failed to add email link: {exc}")
        raise HTTPException(status_code=500, detail="Kunne ikke tilføje email-link")
    return _get_opportunity(opportunity_id)
@router.delete("/opportunities/{opportunity_id}/email-links/{email_id}", tags=["Opportunities"])
async def remove_opportunity_email_link(opportunity_id: int, email_id: int):
    """Detach a previously linked email from an opportunity."""
    delete_sql = "DELETE FROM pipeline_opportunity_emails WHERE opportunity_id = %s AND email_id = %s"
    try:
        execute_query(delete_sql, (opportunity_id, email_id))
    except Exception as exc:
        logger.error(f"Failed to remove email link: {exc}")
        raise HTTPException(status_code=500, detail="Kunne ikke fjerne email-link")
    return {"success": True}
@router.patch("/opportunities/{opportunity_id}/email-link", tags=["Opportunities"])
async def update_opportunity_email_link(opportunity_id: int, payload: dict):
    """Legacy single-email endpoint; kept for older clients."""
    # Multi-link support replaced the single link, so this just adds a link.
    return await add_opportunity_email_link(opportunity_id, payload)
def _decode_header_str(header_val):
if not header_val:
return ""
try:
decoded_list = decode_header(header_val)
result = ""
for content, encoding in decoded_list:
if isinstance(content, bytes):
if encoding:
try:
result += content.decode(encoding)
except LookupError:
result += content.decode('utf-8', errors='ignore')
except Exception:
result += content.decode('utf-8', errors='ignore')
else:
result += content.decode('utf-8', errors='ignore')
else:
result += str(content)
return result
except Exception:
return str(header_val)
async def _process_uploaded_email(file: UploadFile, opportunity_id: int) -> dict:
    """Parse an uploaded .eml/.msg file, persist it as an email message and
    link it to the opportunity.

    Returns the refreshed opportunity dict (including linked emails).
    Raises HTTPException 400 for missing/unsupported filenames and 500 when
    the email cannot be saved or linked.
    """
    content = await file.read()
    # UploadFile.filename may be None (e.g. a bare multipart body) — treat
    # that like an unsupported format instead of crashing on .lower().
    if not file.filename:
        raise HTTPException(status_code=400, detail="Unsupported file format. Use .eml or .msg")
    filename = file.filename.lower()
    email_data = {}
    # Generate a unique message ID if one doesn't exist to prevent collisions/logic errors
    temp_id = str(uuid4())
    if filename.endswith('.msg'):
        if not extract_msg:
            raise HTTPException(status_code=500, detail="Library 'extract-msg' not installed")
        # extract-msg needs a file-like object or path. BytesIO works.
        import io
        msg = extract_msg.Message(io.BytesIO(content))
        # Map Outlook .msg fields onto the email_messages schema.
        email_data = {
            'message_id': msg.messageId or f"msg-{temp_id}",
            'subject': msg.subject or "No Subject",
            'sender_email': msg.sender or "",
            'sender_name': msg.sender or "",  # msg.sender is often "Name <email>" or just email
            'recipient_email': msg.to or "",
            'cc': msg.cc or "",
            'body_text': msg.body,
            'body_html': msg.htmlBody,  # might be None
            'received_date': msg.date or datetime.now(),
            'folder': 'Imported',
            'attachment_count': len(msg.attachments),
            'has_attachments': len(msg.attachments) > 0,
            'attachments': []
        }
        # Handle msg attachments (simplified, might need more work for full fidelity)
        for att in msg.attachments:
            # Binary attachments in msg
            if hasattr(att, 'data'):
                email_data['attachments'].append({
                    'filename': att.longFilename or att.shortFilename or 'attachment',
                    'content': att.data,
                    'size': len(att.data),
                    'content_type': 'application/octet-stream'
                })
    elif filename.endswith('.eml'):
        msg = email.message_from_bytes(content)
        # Extract the first text/plain and text/html bodies.
        body_text = ""
        body_html = ""
        if msg.is_multipart():
            for part in msg.walk():
                ctype = part.get_content_type()
                if ctype == "text/plain" and not body_text:
                    payload = part.get_payload(decode=True)
                    # get_payload(decode=True) returns None for undecodable parts.
                    if payload is not None:
                        body_text = payload.decode('utf-8', errors='ignore')
                elif ctype == "text/html" and not body_html:
                    payload = part.get_payload(decode=True)
                    if payload is not None:
                        body_html = payload.decode('utf-8', errors='ignore')
        else:
            payload = msg.get_payload(decode=True)
            body_text = payload.decode('utf-8', errors='ignore') if payload else ""
        # Collect every non-body part that carries a filename as an attachment.
        attachments = []
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get_content_type() in ['text/plain', 'text/html']:
                continue
            fname = part.get_filename()
            if fname:
                payload = part.get_payload(decode=True)
                if payload:
                    attachments.append({
                        'filename': _decode_header_str(fname),
                        'content': payload,
                        'size': len(payload),
                        'content_type': part.get_content_type()
                    })
        email_data = {
            'message_id': msg.get('Message-ID', f"eml-{temp_id}"),
            'subject': _decode_header_str(msg.get('Subject', 'No Subject')),
            'sender_email': _decode_header_str(msg.get('From', '')),
            'sender_name': _decode_header_str(msg.get('From', '')),
            'recipient_email': _decode_header_str(msg.get('To', '')),
            'cc': _decode_header_str(msg.get('Cc', '')),
            'body_text': body_text,
            'body_html': body_html,
            'received_date': datetime.now(),  # replaced below when the Date header parses
            'folder': 'Imported',
            'has_attachments': len(attachments) > 0,
            'attachment_count': len(attachments),
            'attachments': attachments
        }
        # Prefer the message's own Date header when it is parseable.
        if msg.get('Date'):
            try:
                from email.utils import parsedate_to_datetime
                email_data['received_date'] = parsedate_to_datetime(msg.get('Date'))
            except (TypeError, ValueError):
                # Malformed Date header — keep the import timestamp.
                pass
    else:
        raise HTTPException(status_code=400, detail="Unsupported file format. Use .eml or .msg")
    # Save via EmailService, reusing an already-imported message with the
    # same Message-ID instead of inserting a duplicate.
    svc = EmailService()
    existing = execute_query_single("SELECT id FROM email_messages WHERE message_id = %s", (email_data['message_id'],))
    if existing:
        email_id = existing['id']
    else:
        email_id = await svc.save_email(email_data)
    if not email_id:
        raise HTTPException(status_code=500, detail="Failed to save imported email")
    # Link the (new or existing) email to the opportunity.
    try:
        execute_query(
            "INSERT INTO pipeline_opportunity_emails (opportunity_id, email_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
            (opportunity_id, email_id)
        )
    except Exception as e:
        logger.error(f"Failed to link imported email: {e}")
        raise HTTPException(status_code=500, detail="Failed to link email")
    return _get_opportunity(opportunity_id)
@router.post("/opportunities/{opportunity_id}/upload-email", tags=["Opportunities"])
async def upload_opportunity_email(opportunity_id: int, file: UploadFile = File(...)):
    """Import an .eml/.msg file as an email and link it to the opportunity."""
    result = await _process_uploaded_email(file, opportunity_id)
    return result
@router.post("/opportunities/{opportunity_id}/contacts", tags=["Opportunities"])
async def add_opportunity_contact_link(opportunity_id: int, payload: dict):
    """Link a contact to an opportunity, upserting the optional role."""
    contact_id = payload.get("contact_id")
    role = payload.get("role")
    # Mirror the email-link endpoint: require a real integer id so a bad
    # value returns 400 instead of surfacing as a 500 from the INSERT.
    if not contact_id or not isinstance(contact_id, int):
        raise HTTPException(status_code=400, detail="Invalid contact_id")
    try:
        execute_query(
            "INSERT INTO pipeline_opportunity_contacts (opportunity_id, contact_id, role) VALUES (%s, %s, %s) ON CONFLICT (opportunity_id, contact_id) DO UPDATE SET role = EXCLUDED.role",
            (opportunity_id, contact_id, role)
        )
    except Exception as e:
        logger.error(f"Failed to add contact link: {e}")
        raise HTTPException(status_code=500, detail="Kunne ikke tilføje kontaktperson")
    return _get_opportunity(opportunity_id)
@router.delete("/opportunities/{opportunity_id}/contacts/{contact_id}", tags=["Opportunities"])
async def remove_opportunity_contact_link(opportunity_id: int, contact_id: int):
    """Unlink a contact from an opportunity and return the refreshed record."""
    delete_sql = "DELETE FROM pipeline_opportunity_contacts WHERE opportunity_id = %s AND contact_id = %s"
    try:
        execute_query(delete_sql, (opportunity_id, contact_id))
    except Exception as exc:
        logger.error(f"Failed to remove contact link: {exc}")
        raise HTTPException(status_code=500, detail="Kunne ikke fjerne kontaktperson")
    return _get_opportunity(opportunity_id)
# Root directory for all uploaded files (resolved to an absolute path).
UPLOAD_BASE_PATH = Path(settings.UPLOAD_DIR).resolve()
# Sub-directories under UPLOAD_BASE_PATH per attachment category.
COMMENT_ATTACHMENT_SUBDIR = "opportunity_comments"
CONTRACT_ATTACHMENT_SUBDIR = "opportunity_contract_files"
# Ensure both target directories exist at import time.
for subdir in (COMMENT_ATTACHMENT_SUBDIR, CONTRACT_ATTACHMENT_SUBDIR):
    (UPLOAD_BASE_PATH / subdir).mkdir(parents=True, exist_ok=True)
# Allowed upload extensions, lower-cased; compared against file suffixes
# with the leading dot stripped (see _is_attachment_allowed).
ALLOWED_EXTENSIONS = {ext.lower() for ext in settings.ALLOWED_EXTENSIONS}
# Upload size cap in bytes, derived from the configured MB limit.
MAX_ATTACHMENT_SIZE = settings.EMAIL_MAX_UPLOAD_SIZE_MB * 1024 * 1024
def _is_attachment_allowed(filename: str) -> bool:
    """Return True when the file's extension is in the configured allow-list."""
    suffix = Path(filename).suffix
    return suffix.lower().lstrip(".") in ALLOWED_EXTENSIONS
def _validate_attachment(upload_file: UploadFile) -> None:
    """Reject uploads with a disallowed extension or an oversized body.

    Raises HTTPException(400) on violation; otherwise leaves the file
    cursor rewound to the start for subsequent reads.
    """
    if not _is_attachment_allowed(upload_file.filename):
        raise HTTPException(400, detail="Unsupported attachment type")
    # Determine the size by seeking to the end of the underlying file object.
    upload_file.file.seek(0, os.SEEK_END)
    size = upload_file.file.tell()
    upload_file.file.seek(0)
    if size > MAX_ATTACHMENT_SIZE:
        raise HTTPException(
            400,
            detail=f"Attachment exceeds size limit of {settings.EMAIL_MAX_UPLOAD_SIZE_MB} MB",
        )
def _generate_stored_name(filename: str, subdir: str) -> str:
cleaned = Path(filename).name
unique = f"{uuid4().hex}_{cleaned}"
return f"{subdir}/{unique}"
def _resolve_attachment_path(stored_name: str) -> Path:
    """Resolve a stored name to an absolute path inside UPLOAD_BASE_PATH.

    Rejects names that would escape the upload root (e.g. '..' segments or
    absolute paths) in case a tampered value ever reaches us from the DB.
    Legitimate names produced by _generate_stored_name always pass.
    """
    resolved = (UPLOAD_BASE_PATH / stored_name).resolve()
    if resolved != UPLOAD_BASE_PATH and UPLOAD_BASE_PATH not in resolved.parents:
        raise HTTPException(status_code=400, detail="Invalid attachment path")
    return resolved
def _store_upload_file(upload_file: UploadFile, subdir: str) -> Tuple[str, int]:
    """Validate and persist an upload under UPLOAD_BASE_PATH/<subdir>.

    Returns (stored_name, size_bytes), where stored_name is the relative
    path recorded in the database.  Raises HTTPException(400) when the file
    type or size is not acceptable.
    """
    _validate_attachment(upload_file)
    stored_name = _generate_stored_name(upload_file.filename, subdir)
    destination = _resolve_attachment_path(stored_name)
    destination.parent.mkdir(parents=True, exist_ok=True)
    # Rewind in case validation (or a caller) already consumed the stream.
    upload_file.file.seek(0)
    with destination.open("wb") as buffer:
        shutil.copyfileobj(upload_file.file, buffer)
    return stored_name, destination.stat().st_size
class PipelineStageBase(BaseModel):
    """Shared fields for pipeline stage payloads."""
    name: str
    description: Optional[str] = None
    sort_order: int = 0  # lower values come first (see list_stages ordering)
    default_probability: int = 0  # probability applied to opportunities entering this stage
    color: Optional[str] = "#0f4c75"
    is_won: bool = False  # marks a terminal "won" stage
    is_lost: bool = False  # marks a terminal "lost" stage
    is_active: bool = True
class PipelineStageCreate(PipelineStageBase):
    """Payload for creating a new pipeline stage."""
    pass
class PipelineStageUpdate(BaseModel):
    """Partial update payload; only supplied fields are changed."""
    name: Optional[str] = None
    description: Optional[str] = None
    sort_order: Optional[int] = None
    default_probability: Optional[int] = None
    color: Optional[str] = None
    is_won: Optional[bool] = None
    is_lost: Optional[bool] = None
    is_active: Optional[bool] = None
class OpportunityBase(BaseModel):
    """Shared fields for opportunity payloads."""
    customer_id: int
    title: str
    description: Optional[str] = None
    amount: Optional[float] = 0
    currency: Optional[str] = "DKK"
    expected_close_date: Optional[date] = None
    stage_id: Optional[int] = None  # falls back to the first active stage when omitted
    owner_user_id: Optional[int] = None
class OpportunityCreate(OpportunityBase):
    """Payload for creating an opportunity."""
    pass
class OpportunityUpdate(BaseModel):
    """Partial update payload; only supplied fields are changed."""
    title: Optional[str] = None
    description: Optional[str] = None
    amount: Optional[float] = None
    currency: Optional[str] = None
    expected_close_date: Optional[date] = None
    stage_id: Optional[int] = None
    owner_user_id: Optional[int] = None
    is_active: Optional[bool] = None
class OpportunityStageUpdate(BaseModel):
    """Payload for moving an opportunity to another stage."""
    stage_id: int
    note: Optional[str] = None  # free-text note recorded in the stage history
    user_id: Optional[int] = None  # user performing the change (stored in history)
class OpportunityLineBase(BaseModel):
    """Shared fields for opportunity line items."""
    name: str
    quantity: int = 1
    unit_price: float = 0.0
    product_number: Optional[str] = None
    description: Optional[str] = None
class OpportunityLineCreate(OpportunityLineBase):
    """Payload for adding a line item to an opportunity."""
    pass
class OpportunityCommentBase(BaseModel):
    """Shared fields for opportunity comments; may reference a stored email
    and/or a contract."""
    content: str
    author_name: Optional[str] = None
    user_id: Optional[int] = None
    email_id: Optional[int] = None
    contract_number: Optional[str] = None
    contract_context: Optional[str] = None
    contract_link: Optional[str] = None
    metadata: Optional[Dict] = None  # arbitrary JSON stored alongside the comment
class OpportunityCommentCreate(OpportunityCommentBase):
    """Payload for creating an opportunity comment."""
    pass
class OpportunityCommentAttachment(BaseModel):
    """File attached directly to a comment."""
    id: int
    filename: str
    content_type: Optional[str] = None
    size_bytes: Optional[int] = None
    created_at: datetime
    download_url: str  # API path for downloading this attachment
class OpportunityEmailAttachment(BaseModel):
    """Attachment belonging to an email referenced by a comment."""
    id: int
    filename: str
    content_type: Optional[str] = None
    size_bytes: Optional[int] = None
    created_at: datetime
    download_url: str
class OpportunityContractFile(BaseModel):
    """Contract document uploaded to an opportunity."""
    id: int
    filename: str
    content_type: Optional[str] = None
    size_bytes: Optional[int] = None
    created_at: datetime
    download_url: str
class OpportunityCommentResponse(BaseModel):
    """Comment as returned by the API, enriched with author/email context
    and any attachments."""
    id: int
    opportunity_id: int
    content: str
    author_name: Optional[str] = None
    user_id: Optional[int] = None
    user_full_name: Optional[str] = None  # joined from the users table
    username: Optional[str] = None
    email_id: Optional[int] = None
    email_subject: Optional[str] = None  # joined from the referenced email
    email_sender: Optional[str] = None
    contract_number: Optional[str] = None
    contract_context: Optional[str] = None
    contract_link: Optional[str] = None
    metadata: Optional[Dict] = None
    created_at: datetime
    updated_at: datetime
    attachments: List[OpportunityCommentAttachment] = []
    email_attachments: List[OpportunityEmailAttachment] = []
def _get_stage(stage_id: int):
    """Fetch an active pipeline stage by id; raise 404 when missing."""
    row = execute_query_single(
        "SELECT * FROM pipeline_stages WHERE id = %s AND is_active = TRUE",
        (stage_id,)
    )
    if not row:
        raise HTTPException(status_code=404, detail="Stage not found")
    return row
def _get_default_stage():
    """Return the first active stage (lowest sort_order); 400 if none exist."""
    row = execute_query_single(
        "SELECT * FROM pipeline_stages WHERE is_active = TRUE ORDER BY sort_order ASC LIMIT 1"
    )
    if not row:
        raise HTTPException(status_code=400, detail="No active stages configured")
    return row
def _get_opportunity(opportunity_id: int):
    """Load one opportunity with customer/stage context plus its linked
    emails and contacts.

    Raises HTTPException(404) when the id does not exist.
    """
    query = """
        SELECT o.*, c.name AS customer_name,
               s.name AS stage_name, s.color AS stage_color, s.is_won, s.is_lost
        FROM pipeline_opportunities o
        JOIN customers c ON c.id = o.customer_id
        JOIN pipeline_stages s ON s.id = o.stage_id
        WHERE o.id = %s
    """
    opportunity = execute_query_single(query, (opportunity_id,))
    if not opportunity:
        raise HTTPException(status_code=404, detail="Opportunity not found")
    # Fetch linked emails (newest first).
    email_query = """
        SELECT e.id, e.subject, e.sender_email, e.received_date, e.body_text, e.body_html
        FROM email_messages e
        JOIN pipeline_opportunity_emails poe ON e.id = poe.email_id
        WHERE poe.opportunity_id = %s
        ORDER BY e.received_date DESC
    """
    linked_emails = execute_query(email_query, (opportunity_id,))
    opportunity["linked_emails"] = linked_emails or []
    # Fetch linked contacts (alphabetical by name).
    contacts_query = """
        SELECT c.id, c.first_name, c.last_name, c.email, c.phone, c.mobile_phone, poc.role
        FROM contacts c
        JOIN pipeline_opportunity_contacts poc ON c.id = poc.contact_id
        WHERE poc.opportunity_id = %s
        ORDER BY c.first_name, c.last_name
    """
    linked_contacts = execute_query(contacts_query, (opportunity_id,))
    opportunity["linked_contacts"] = linked_contacts or []
    return opportunity
def _insert_stage_history(opportunity_id: int, from_stage_id: Optional[int], to_stage_id: int,
                          user_id: Optional[int] = None, note: Optional[str] = None):
    """Append one row to the opportunity's stage-change audit trail.

    from_stage_id is None for the initial "created" entry.
    """
    execute_query(
        """
        INSERT INTO pipeline_stage_history (opportunity_id, from_stage_id, to_stage_id, changed_by_user_id, note)
        VALUES (%s, %s, %s, %s, %s)
        """,
        (opportunity_id, from_stage_id, to_stage_id, user_id, note)
    )
def _fetch_opportunity_comments(opportunity_id: int):
    """Return all comments for an opportunity (newest first), each enriched
    with its own attachments and those of any referenced email."""
    query = """
        SELECT c.*, u.full_name AS user_full_name, u.username,
               em.subject AS email_subject, em.sender_email AS email_sender
        FROM pipeline_opportunity_comments c
        LEFT JOIN users u ON u.user_id = c.user_id
        LEFT JOIN email_messages em ON em.id = c.email_id
        WHERE c.opportunity_id = %s
        ORDER BY c.created_at DESC
    """
    comments = execute_query(query, (opportunity_id,)) or []
    if not comments:
        return []
    attachments_map = _fetch_comment_attachments_map([row["id"] for row in comments])
    referenced_email_ids = list({row["email_id"] for row in comments if row.get("email_id")})
    email_attachment_map = _fetch_email_attachments_map(referenced_email_ids)
    for row in comments:
        row["attachments"] = attachments_map.get(row["id"], [])
        email_ref = row.get("email_id")
        row["email_attachments"] = email_attachment_map.get(email_ref, []) if email_ref else []
    return comments
def _fetch_comment(comment_id: int):
    """Return a single enriched comment dict, or None when it is missing."""
    query = """
        SELECT c.*, u.full_name AS user_full_name, u.username,
               em.subject AS email_subject, em.sender_email AS email_sender
        FROM pipeline_opportunity_comments c
        LEFT JOIN users u ON u.user_id = c.user_id
        LEFT JOIN email_messages em ON em.id = c.email_id
        WHERE c.id = %s
    """
    rows = execute_query(query, (comment_id,))
    if not rows:
        return None
    comment = rows[0]
    comment["attachments"] = _fetch_comment_attachments_map([comment_id]).get(comment_id, [])
    email_ref = comment.get("email_id")
    if email_ref:
        comment["email_attachments"] = _fetch_email_attachments_map([email_ref]).get(email_ref, [])
    else:
        comment["email_attachments"] = []
    return comment
def _comment_attachment_download_url(opportunity_id: int, attachment_id: int) -> str:
return f"/api/v1/opportunities/{opportunity_id}/comment-attachments/{attachment_id}"
def _email_attachment_download_url(email_id: int, attachment_id: int) -> str:
return f"/api/v1/emails/{email_id}/attachments/{attachment_id}"
def _fetch_comment_attachments_map(comment_ids: List[int]) -> Dict[int, List[Dict[str, Any]]]:
    """Map comment id -> list of attachment descriptors (newest first)."""
    if not comment_ids:
        return {}
    query = """
        SELECT a.id, a.comment_id, a.opportunity_id, a.filename, a.content_type, a.size_bytes, a.created_at
        FROM pipeline_opportunity_comment_attachments a
        WHERE a.comment_id = ANY(%s)
        ORDER BY a.created_at DESC
    """
    grouped: Dict[int, List[Dict[str, Any]]] = {}
    for record in execute_query(query, (comment_ids,)) or []:
        key = record["comment_id"]
        if key not in grouped:
            grouped[key] = []
        grouped[key].append({
            "id": record["id"],
            "filename": record["filename"],
            "content_type": record.get("content_type"),
            "size_bytes": record.get("size_bytes"),
            "created_at": record.get("created_at"),
            "download_url": _comment_attachment_download_url(record["opportunity_id"], record["id"]),
        })
    return grouped
def _fetch_email_attachments_map(email_ids: List[int]) -> Dict[int, List[Dict[str, Any]]]:
    """Map email id -> list of attachment descriptors (ascending id order)."""
    if not email_ids:
        return {}
    query = """
        SELECT id, email_id, filename, content_type, size_bytes, created_at
        FROM email_attachments
        WHERE email_id = ANY(%s)
        ORDER BY id ASC
    """
    grouped: Dict[int, List[Dict[str, Any]]] = {}
    for record in execute_query(query, (email_ids,)) or []:
        bucket = grouped.setdefault(record["email_id"], [])
        bucket.append({
            "id": record["id"],
            "filename": record["filename"],
            "content_type": record.get("content_type"),
            "size_bytes": record.get("size_bytes"),
            "created_at": record.get("created_at"),
            "download_url": _email_attachment_download_url(record["email_id"], record["id"]),
        })
    return grouped
def _contract_file_download_url(opportunity_id: int, file_id: int) -> str:
return f"/api/v1/opportunities/{opportunity_id}/contract-files/{file_id}"
def _fetch_contract_files(opportunity_id: int) -> List[Dict[str, Any]]:
    """List contract-file descriptors for an opportunity (newest first)."""
    query = """
        SELECT id, filename, content_type, size_bytes, stored_name, created_at
        FROM pipeline_opportunity_contract_files
        WHERE opportunity_id = %s
        ORDER BY created_at DESC
    """
    descriptors = []
    for row in execute_query(query, (opportunity_id,)) or []:
        descriptors.append({
            "id": row["id"],
            "filename": row["filename"],
            "content_type": row.get("content_type"),
            "size_bytes": row.get("size_bytes"),
            "created_at": row.get("created_at"),
            "download_url": _contract_file_download_url(opportunity_id, row["id"]),
        })
    return descriptors
def _save_contract_files(opportunity_id: int, files: List[UploadFile], uploaded_by_user_id: Optional[int] = None) -> List[Dict[str, Any]]:
    """Persist contract uploads to disk and register them in the database.

    Returns descriptors (including download URLs) for every file actually
    stored; entries without a filename are skipped silently.  Raises
    HTTPException(400) via _store_upload_file for bad type/size.
    """
    if not files:
        return []
    insert_query = """
        INSERT INTO pipeline_opportunity_contract_files
        (opportunity_id, filename, content_type, size_bytes, stored_name, uploaded_by_user_id)
        VALUES (%s, %s, %s, %s, %s, %s)
        RETURNING id, filename, content_type, size_bytes, created_at
    """
    saved_files = []
    for upload_file in files:
        # Skip empty slots (e.g. a form field submitted without a file).
        if not upload_file or not upload_file.filename:
            continue
        # Validates type/size and writes the file under the contracts subdir.
        stored_name, size_bytes = _store_upload_file(upload_file, CONTRACT_ATTACHMENT_SUBDIR)
        result = execute_query(
            insert_query,
            (
                opportunity_id,
                upload_file.filename,
                upload_file.content_type,
                size_bytes,
                stored_name,
                uploaded_by_user_id,
            )
        )
        if result:
            saved = result[0]
            saved_files.append({
                "id": saved["id"],
                "filename": saved["filename"],
                "content_type": saved.get("content_type"),
                "size_bytes": saved.get("size_bytes"),
                "created_at": saved.get("created_at"),
                "download_url": _contract_file_download_url(opportunity_id, saved["id"]),
            })
    return saved_files
def _save_comment_attachments(opportunity_id: int, comment_id: int, files: List[UploadFile], uploaded_by_user_id: Optional[int] = None) -> None:
    """Persist comment attachments to disk and register them in the database.

    Entries without a filename are skipped.  Raises HTTPException(400) via
    _store_upload_file for bad type/size.
    """
    if not files:
        return
    insert_query = """
        INSERT INTO pipeline_opportunity_comment_attachments
        (opportunity_id, comment_id, filename, content_type, size_bytes, stored_name, uploaded_by_user_id)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    for upload_file in files:
        # Skip empty slots (e.g. a form field submitted without a file).
        if not upload_file or not upload_file.filename:
            continue
        stored_name, size_bytes = _store_upload_file(upload_file, COMMENT_ATTACHMENT_SUBDIR)
        execute_query(
            insert_query,
            (
                opportunity_id,
                comment_id,
                upload_file.filename,
                upload_file.content_type,
                size_bytes,
                stored_name,
                uploaded_by_user_id,
            )
        )
# ============================
# Pipeline Stages
# ============================
@router.get("/pipeline/stages", tags=["Pipeline Stages"])
async def list_stages():
    """Return all active pipeline stages in board order."""
    rows = execute_query("SELECT * FROM pipeline_stages WHERE is_active = TRUE ORDER BY sort_order ASC")
    return rows or []
@router.post("/pipeline/stages", tags=["Pipeline Stages"])
async def create_stage(stage: PipelineStageCreate):
    """Create a pipeline stage and return the stored row."""
    query = """
        INSERT INTO pipeline_stages
        (name, description, sort_order, default_probability, color, is_won, is_lost, is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING *
    """
    values = (
        stage.name,
        stage.description,
        stage.sort_order,
        stage.default_probability,
        stage.color,
        stage.is_won,
        stage.is_lost,
        stage.is_active,
    )
    result = execute_query(query, values)
    logger.info("✅ Created pipeline stage: %s", stage.name)
    return result[0] if result else None
@router.put("/pipeline/stages/{stage_id}", tags=["Pipeline Stages"])
async def update_stage(stage_id: int, stage: PipelineStageUpdate):
    """Apply a partial update to a stage; 404 when it does not exist."""
    changes = stage.dict(exclude_unset=True)
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")
    # Column names come from the pydantic model fields, so interpolating
    # them is safe; values go through query parameters.
    set_clauses = [f"{column} = %s" for column in changes]
    values = list(changes.values())
    values.append(stage_id)
    query = f"UPDATE pipeline_stages SET {', '.join(set_clauses)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s RETURNING *"
    result = execute_query(query, tuple(values))
    if not result:
        raise HTTPException(status_code=404, detail="Stage not found")
    logger.info("✅ Updated pipeline stage: %s", stage_id)
    return result[0]
@router.delete("/pipeline/stages/{stage_id}", tags=["Pipeline Stages"])
async def deactivate_stage(stage_id: int):
    """Soft-delete a stage by flagging it inactive."""
    rows_changed = execute_update(
        "UPDATE pipeline_stages SET is_active = FALSE, updated_at = CURRENT_TIMESTAMP WHERE id = %s",
        (stage_id,)
    )
    if not rows_changed:
        raise HTTPException(status_code=404, detail="Stage not found")
    logger.info("⚠️ Deactivated pipeline stage: %s", stage_id)
    return {"status": "success", "stage_id": stage_id}
# ============================
# Opportunities
# ============================
@router.get("/opportunities", tags=["Opportunities"])
async def list_opportunities(customer_id: Optional[int] = None, stage_id: Optional[int] = None):
    """List active opportunities, optionally filtered by customer and/or stage."""
    sql = """
        SELECT o.*, c.name AS customer_name,
               s.name AS stage_name, s.color AS stage_color, s.is_won, s.is_lost
        FROM pipeline_opportunities o
        JOIN customers c ON c.id = o.customer_id
        JOIN pipeline_stages s ON s.id = o.stage_id
        WHERE o.is_active = TRUE
    """
    filters: List = []
    if customer_id is not None:
        sql += " AND o.customer_id = %s"
        filters.append(customer_id)
    if stage_id is not None:
        sql += " AND o.stage_id = %s"
        filters.append(stage_id)
    sql += " ORDER BY o.updated_at DESC NULLS LAST, o.created_at DESC"
    if filters:
        return execute_query(sql, tuple(filters)) or []
    return execute_query(sql) or []
@router.get("/opportunities/{opportunity_id}", tags=["Opportunities"])
async def get_opportunity(opportunity_id: int):
    """Return a single opportunity (404 when missing), including its
    linked emails and contacts."""
    return _get_opportunity(opportunity_id)
@router.post("/opportunities", tags=["Opportunities"])
async def create_opportunity(opportunity: OpportunityCreate):
    """Create an opportunity; the probability is seeded from the stage and
    an initial stage-history entry ("Oprettet") is recorded."""
    # Without an explicit stage_id, start in the first active stage.
    stage = _get_stage(opportunity.stage_id) if opportunity.stage_id else _get_default_stage()
    probability = stage["default_probability"]
    query = """
        INSERT INTO pipeline_opportunities
        (customer_id, title, description, amount, currency, expected_close_date, stage_id, probability, owner_user_id)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id
    """
    result = execute_query(query, (
        opportunity.customer_id,
        opportunity.title,
        opportunity.description,
        opportunity.amount or 0,
        opportunity.currency or "DKK",
        opportunity.expected_close_date,
        stage["id"],
        probability,
        opportunity.owner_user_id
    ))
    if not result:
        raise HTTPException(status_code=500, detail="Failed to create opportunity")
    opportunity_id = result[0]["id"]
    _insert_stage_history(opportunity_id, None, stage["id"], opportunity.owner_user_id, "Oprettet")
    logger.info("✅ Created opportunity %s", opportunity_id)
    return _get_opportunity(opportunity_id)
@router.put("/opportunities/{opportunity_id}", tags=["Opportunities"])
async def update_opportunity(opportunity_id: int, update: OpportunityUpdate):
    """Apply a partial update; when stage_id changes, the probability is
    reset to the new stage's default and history/side effects run."""
    existing = _get_opportunity(opportunity_id)
    updates = []
    params = []
    update_dict = update.dict(exclude_unset=True)
    stage_changed = False
    new_stage = None
    if "stage_id" in update_dict:
        new_stage = _get_stage(update_dict["stage_id"])
        # Moving stage resets the probability to the stage default.
        update_dict["probability"] = new_stage["default_probability"]
        stage_changed = new_stage["id"] != existing["stage_id"]
    # Field names come from the pydantic model, values are parameterized.
    for field, value in update_dict.items():
        updates.append(f"{field} = %s")
        params.append(value)
    if not updates:
        raise HTTPException(status_code=400, detail="No fields to update")
    params.append(opportunity_id)
    query = f"UPDATE pipeline_opportunities SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
    execute_update(query, tuple(params))
    if stage_changed and new_stage:
        # NOTE(review): owner_user_id is stored as the history "changed by"
        # user — confirm this is intended rather than the acting user's id.
        _insert_stage_history(opportunity_id, existing["stage_id"], new_stage["id"], update.owner_user_id, "Stage ændret")
        updated = _get_opportunity(opportunity_id)
        handle_stage_change(updated, new_stage)
    return _get_opportunity(opportunity_id)
@router.patch("/opportunities/{opportunity_id}/stage", tags=["Opportunities"])
async def update_opportunity_stage(opportunity_id: int, update: OpportunityStageUpdate):
    """Move an opportunity to another stage, logging history and running
    stage-change side effects."""
    current = _get_opportunity(opportunity_id)
    target_stage = _get_stage(update.stage_id)
    execute_update(
        """
        UPDATE pipeline_opportunities
        SET stage_id = %s,
            probability = %s,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = %s
        """,
        (target_stage["id"], target_stage["default_probability"], opportunity_id)
    )
    _insert_stage_history(opportunity_id, current["stage_id"], target_stage["id"], update.user_id, update.note)
    refreshed = _get_opportunity(opportunity_id)
    handle_stage_change(refreshed, target_stage)
    return refreshed
@router.get("/opportunities/{opportunity_id}/lines", tags=["Opportunities"])
async def list_opportunity_lines(opportunity_id: int):
    """Return the opportunity's line items with computed totals."""
    query = """
        SELECT id, opportunity_id, product_number, name, description, quantity, unit_price,
               quantity * unit_price AS total_price
        FROM pipeline_opportunity_lines
        WHERE opportunity_id = %s
        ORDER BY id ASC
    """
    rows = execute_query(query, (opportunity_id,))
    return rows or []
@router.post("/opportunities/{opportunity_id}/lines", tags=["Opportunities"])
async def add_opportunity_line(opportunity_id: int, line: OpportunityLineCreate):
    """Add a line item and return it, including the computed total."""
    insert_sql = """
        INSERT INTO pipeline_opportunity_lines
        (opportunity_id, product_number, name, description, quantity, unit_price)
        VALUES (%s, %s, %s, %s, %s, %s)
        RETURNING id, opportunity_id, product_number, name, description, quantity, unit_price,
                  quantity * unit_price AS total_price
    """
    values = (
        opportunity_id,
        line.product_number,
        line.name,
        line.description,
        line.quantity,
        line.unit_price,
    )
    created = execute_query(insert_sql, values)
    if not created:
        raise HTTPException(status_code=500, detail="Failed to create line item")
    return created[0]
@router.delete("/opportunities/{opportunity_id}/lines/{line_id}", tags=["Opportunities"])
async def remove_opportunity_line(opportunity_id: int, line_id: int):
    """Delete a line item; 404 when it does not belong to the opportunity."""
    deleted = execute_query(
        """
        DELETE FROM pipeline_opportunity_lines
        WHERE opportunity_id = %s AND id = %s
        RETURNING id
        """,
        (opportunity_id, line_id)
    )
    if not deleted:
        raise HTTPException(status_code=404, detail="Line item not found")
    return {"success": True, "line_id": line_id}
@router.get(
    "/opportunities/{opportunity_id}/comments",
    response_model=List[OpportunityCommentResponse],
    tags=["Opportunities"]
)
async def list_opportunity_comments(opportunity_id: int):
    """Return all comments for an opportunity (404 when it does not exist)."""
    _get_opportunity(opportunity_id)  # existence check only
    return _fetch_opportunity_comments(opportunity_id)
@router.post(
    "/opportunities/{opportunity_id}/comments",
    response_model=OpportunityCommentResponse,
    tags=["Opportunities"]
)
async def add_opportunity_comment(
    opportunity_id: int,
    request: Request,
    content: Optional[str] = Form(None),
    author_name: Optional[str] = Form(None),
    user_id: Optional[int] = Form(None),
    email_id: Optional[int] = Form(None),
    contract_number: Optional[str] = Form(None),
    contract_context: Optional[str] = Form(None),
    contract_link: Optional[str] = Form(None),
    metadata: Optional[str] = Form(None),
    files: Optional[List[UploadFile]] = File(None),
):
    """Create a comment on an opportunity.

    Accepts either a JSON body or multipart form data (only the form
    variant can carry file attachments).  Returns the enriched comment.
    """
    _get_opportunity(opportunity_id)  # 404 when the opportunity is missing
    # JSON clients send the fields in the body; form clients use the
    # Form(...) parameters above.  Normalize both into one payload dict.
    if request.headers.get("content-type", "").startswith("application/json"):
        payload: Dict[str, Any] = await request.json()
    else:
        payload = {
            "content": content,
            "author_name": author_name,
            "user_id": user_id,
            "email_id": email_id,
            "contract_number": contract_number,
            "contract_context": contract_context,
            "contract_link": contract_link,
            "metadata": metadata,
        }
    content_value = payload.get("content")
    if not content_value:
        raise HTTPException(status_code=400, detail="Kommentar er påkrævet")
    resolved_author = payload.get("author_name") or 'Hub Bruger'
    # Form values arrive as strings; coerce ids to int, dropping bad values.
    resolved_user_id = payload.get("user_id")
    if isinstance(resolved_user_id, str):
        try:
            resolved_user_id = int(resolved_user_id)
        except ValueError:
            resolved_user_id = None
    resolved_email_id = payload.get("email_id")
    if isinstance(resolved_email_id, str):
        try:
            resolved_email_id = int(resolved_email_id)
        except ValueError:
            resolved_email_id = None
    # metadata may be a JSON string (form path) or an object (JSON path).
    metadata_payload = payload.get("metadata")
    metadata_obj = None
    if metadata_payload:
        if isinstance(metadata_payload, str):
            try:
                metadata_obj = json.loads(metadata_payload)
            except json.JSONDecodeError:
                metadata_obj = None
        elif isinstance(metadata_payload, dict):
            metadata_obj = metadata_payload
    metadata_json = json.dumps(metadata_obj) if metadata_obj else None
    query = """
        INSERT INTO pipeline_opportunity_comments
        (opportunity_id, user_id, author_name, content, email_id,
         contract_number, contract_context, contract_link, metadata)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id
    """
    result = execute_query(
        query,
        (
            opportunity_id,
            resolved_user_id,
            resolved_author,
            content_value,
            resolved_email_id,
            payload.get("contract_number"),
            payload.get("contract_context"),
            payload.get("contract_link"),
            metadata_json
        )
    )
    if not result:
        raise HTTPException(status_code=500, detail="Kunne ikke oprette kommentar")
    comment_id = result[0]["id"]
    # Attachments only exist on the multipart path.
    attachment_files = files or []
    if attachment_files:
        _save_comment_attachments(opportunity_id, comment_id, attachment_files, resolved_user_id)
    return _fetch_comment(comment_id)
@router.get("/opportunities/{opportunity_id}/comment-attachments/{attachment_id}", tags=["Opportunities"])
async def download_comment_attachment(opportunity_id: int, attachment_id: int):
    """Stream a comment attachment from disk."""
    rows = execute_query(
        """
        SELECT * FROM pipeline_opportunity_comment_attachments
        WHERE id = %s AND opportunity_id = %s
        """,
        (attachment_id, opportunity_id)
    )
    if not rows:
        raise HTTPException(status_code=404, detail="Vedhæftet fil ikke fundet")
    attachment = rows[0]
    stored_name = attachment.get("stored_name")
    if not stored_name:
        raise HTTPException(status_code=500, detail="Vedhæftet fil mangler sti")
    file_path = _resolve_attachment_path(stored_name)
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Filen findes ikke på serveren")
    return FileResponse(
        path=file_path,
        filename=attachment.get("filename"),
        media_type=attachment.get("content_type") or "application/octet-stream"
    )
@router.get(
    "/contracts/search",
    tags=["Opportunities"],
    response_model=List[Dict]
)
async def search_contracts(query: str = Query(..., min_length=2), limit: int = Query(10, ge=1, le=50)):
    """Search distinct contract numbers in extraction_lines, most recent first."""
    sql = """
        SELECT contract_number,
               MAX(created_at) AS last_seen,
               COUNT(*) AS hits
        FROM extraction_lines
        WHERE contract_number IS NOT NULL
          AND contract_number <> ''
          AND contract_number ILIKE %s
        GROUP BY contract_number
        ORDER BY MAX(created_at) DESC
        LIMIT %s
    """
    matches = execute_query(sql, (f"%{query}%", limit))
    return matches or []
@router.get(
    "/opportunities/{opportunity_id}/contract-files",
    tags=["Opportunities"],
    response_model=List[OpportunityContractFile]
)
async def list_contract_files(opportunity_id: int):
    """List contract files for an opportunity (404 when it does not exist)."""
    _get_opportunity(opportunity_id)  # existence check only
    return _fetch_contract_files(opportunity_id)
@router.post(
    "/opportunities/{opportunity_id}/contract-files",
    tags=["Opportunities"],
    response_model=List[OpportunityContractFile]
)
async def upload_contract_files(opportunity_id: int, files: List[UploadFile] = File(...)):
    """Store one or more contract files on the opportunity."""
    _get_opportunity(opportunity_id)
    if not files:
        raise HTTPException(status_code=400, detail="Ingen filer at uploade")
    stored = _save_contract_files(opportunity_id, files)
    if not stored:
        raise HTTPException(status_code=500, detail="Kunne ikke gemme filer")
    return stored
@router.get(
    "/opportunities/{opportunity_id}/contract-files/{file_id}",
    tags=["Opportunities"]
)
async def download_contract_file(opportunity_id: int, file_id: int):
    """Stream a contract file inline from disk.

    Raises 404 when the record or the on-disk file is missing and 500 when
    the record lacks a stored path.
    """
    query = """
        SELECT * FROM pipeline_opportunity_contract_files
        WHERE id = %s AND opportunity_id = %s
    """
    result = execute_query(query, (file_id, opportunity_id))
    if not result:
        raise HTTPException(status_code=404, detail="Filen ikke fundet")
    row = result[0]
    stored_name = row.get("stored_name")
    if not stored_name:
        raise HTTPException(status_code=500, detail="Filen mangler lagring")
    path = _resolve_attachment_path(stored_name)
    if not path.exists():
        raise HTTPException(status_code=404, detail="Filen findes ikke på serveren")
    # Sanitize the filename before embedding it in the Content-Disposition
    # header: a quote would break the header value and CR/LF would allow
    # response-header injection.
    safe_filename = (row.get("filename") or "download").replace("\r", "").replace("\n", "").replace('"', "'")
    return FileResponse(
        path=path,
        filename=row.get("filename"),
        media_type=row.get("content_type") or "application/octet-stream",
        headers={"Content-Disposition": f"inline; filename=\"{safe_filename}\""}
    )