2026-01-29 00:36:32 +01:00
2026-01-28 07:48:10 +01:00
"""
Opportunities (Pipeline) Router
Hub - local sales pipeline
"""
2026-01-29 00:36:32 +01:00
from pathlib import Path
from uuid import uuid4
from fastapi import APIRouter , HTTPException , Query , UploadFile , File , Form , Request
from fastapi . responses import FileResponse
2026-01-28 07:48:10 +01:00
from pydantic import BaseModel
2026-01-29 00:36:32 +01:00
from typing import Optional , List , Dict , Any , Tuple
2026-01-28 14:37:47 +01:00
from datetime import date , datetime
import json
2026-01-28 07:48:10 +01:00
import logging
2026-01-29 00:36:32 +01:00
import os
import shutil
2026-01-28 07:48:10 +01:00
2026-01-29 23:07:33 +01:00
import psycopg2
2026-01-29 00:36:32 +01:00
from app . core . config import settings
2026-01-28 07:48:10 +01:00
from app . core . database import execute_query , execute_query_single , execute_update
from app . services . opportunity_service import handle_stage_change
2026-01-29 00:36:32 +01:00
from app . services . email_service import EmailService
import email
from email . header import decode_header
try :
import extract_msg
except ImportError :
extract_msg = None
2026-01-28 07:48:10 +01:00
logger = logging . getLogger ( __name__ )
router = APIRouter ( )
2026-01-29 23:07:33 +01:00
def _is_undefined_table_error ( exc : Exception ) - > bool :
# Postgres undefined_table SQLSTATE
if getattr ( exc , " pgcode " , None ) == " 42P01 " :
return True
undefined_table = getattr ( psycopg2 , " errors " , None )
if undefined_table is not None :
try :
return isinstance ( exc , psycopg2 . errors . UndefinedTable )
except Exception :
return False
return False
2026-01-29 00:36:32 +01:00
@router.post("/opportunities/{opportunity_id}/email-links", tags=["Opportunities"])
async def add_opportunity_email_link(opportunity_id: int, payload: dict):
    """Link an existing email to an opportunity and return the opportunity.

    ``payload`` must carry a truthy integer ``email_id``; otherwise 400.
    Responds 404 when the opportunity lookup raises, 409 when the link table
    is missing (migration not applied) and 500 on any other database error.
    """
    email_id = payload.get("email_id")
    if not (email_id and isinstance(email_id, int)):
        raise HTTPException(status_code=400, detail="Invalid email_id")

    # Existence check only; the return value is fetched again after the insert.
    try:
        _get_opportunity(opportunity_id)
    except HTTPException:
        raise HTTPException(status_code=404, detail="Opportunity not found")

    insert_sql = (
        "INSERT INTO pipeline_opportunity_emails (opportunity_id, email_id) "
        "VALUES (%s, %s) ON CONFLICT DO NOTHING"
    )
    try:
        execute_query(insert_sql, (opportunity_id, email_id))
    except Exception as exc:
        if _is_undefined_table_error(exc):
            raise HTTPException(
                status_code=409,
                detail="Database migration required: run 032_opportunity_emails_m2m.sql",
            )
        logger.error(f"Failed to add email link: {exc}")
        raise HTTPException(status_code=500, detail="Kunne ikke tilføje email-link")

    return _get_opportunity(opportunity_id)
@router.delete("/opportunities/{opportunity_id}/email-links/{email_id}", tags=["Opportunities"])
async def remove_opportunity_email_link(opportunity_id: int, email_id: int):
    """Delete the link row between an opportunity and an email.

    Returns ``{"success": True}`` whenever the statement runs without error.
    Responds 409 when the link table is missing and 500 on other failures.
    """
    delete_sql = (
        "DELETE FROM pipeline_opportunity_emails "
        "WHERE opportunity_id = %s AND email_id = %s"
    )
    try:
        execute_query(delete_sql, (opportunity_id, email_id))
    except Exception as exc:
        if _is_undefined_table_error(exc):
            raise HTTPException(
                status_code=409,
                detail="Database migration required: run 032_opportunity_emails_m2m.sql",
            )
        logger.error(f"Failed to remove email link: {exc}")
        raise HTTPException(status_code=500, detail="Kunne ikke fjerne email-link")

    return {"success": True}
@router.patch("/opportunities/{opportunity_id}/email-link", tags=["Opportunities"])
async def update_opportunity_email_link(opportunity_id: int, payload: dict):
    """Legacy single-email endpoint, kept for backward compatibility.

    It no longer replaces a link; it delegates to the "add link" handler.
    """
    result = await add_opportunity_email_link(opportunity_id, payload)
    return result
def _decode_header_str ( header_val ) :
if not header_val :
return " "
try :
decoded_list = decode_header ( header_val )
result = " "
for content , encoding in decoded_list :
if isinstance ( content , bytes ) :
if encoding :
try :
result + = content . decode ( encoding )
except LookupError :
result + = content . decode ( ' utf-8 ' , errors = ' ignore ' )
except Exception :
result + = content . decode ( ' utf-8 ' , errors = ' ignore ' )
else :
result + = content . decode ( ' utf-8 ' , errors = ' ignore ' )
else :
result + = str ( content )
return result
except Exception :
return str ( header_val )
def _decode_email_part_text(part) -> str:
    """Decode one MIME part's payload to text using its declared charset."""
    raw = part.get_payload(decode=True)
    if not raw:
        # get_payload(decode=True) yields None for multipart containers.
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return raw.decode(charset)
    except (LookupError, UnicodeDecodeError):
        # Unknown or mislabeled charset: keep what UTF-8 can read.
        return raw.decode("utf-8", errors="ignore")


def _parse_msg_upload(content: bytes, temp_id: str) -> dict:
    """Build the email dict for an Outlook ``.msg`` upload."""
    if not extract_msg:
        raise HTTPException(status_code=500, detail="Library 'extract-msg' not installed")

    # extract-msg needs a path or file-like object; wrap the uploaded bytes.
    import io

    msg = extract_msg.Message(io.BytesIO(content))

    attachments = []
    for att in msg.attachments:
        data = getattr(att, "data", None)
        # Only raw binary attachments can be stored; skip anything else
        # instead of failing on len().
        if not isinstance(data, (bytes, bytearray)):
            continue
        attachments.append({
            'filename': att.longFilename or att.shortFilename or 'attachment',
            'content': data,
            'size': len(data),
            'content_type': 'application/octet-stream',
        })

    return {
        'message_id': msg.messageId or f"msg-{temp_id}",
        'subject': msg.subject or "No Subject",
        'sender_email': msg.sender or "",
        # msg.sender is often "Name <email>" or just the address.
        'sender_name': msg.sender or "",
        'recipient_email': msg.to or "",
        'cc': msg.cc or "",
        'body_text': msg.body,
        'body_html': msg.htmlBody,  # may be None
        'received_date': msg.date or datetime.now(),
        'folder': 'Imported',
        'attachment_count': len(msg.attachments),
        'has_attachments': len(msg.attachments) > 0,
        'attachments': attachments,
    }


def _parse_eml_upload(content: bytes, temp_id: str) -> dict:
    """Build the email dict for an RFC 822 ``.eml`` upload."""
    msg = email.message_from_bytes(content)

    body_text = ""
    body_html = ""
    if msg.is_multipart():
        # First text/plain and first text/html part win.
        for part in msg.walk():
            ctype = part.get_content_type()
            if ctype == "text/plain" and not body_text:
                body_text = _decode_email_part_text(part)
            elif ctype == "text/html" and not body_html:
                body_html = _decode_email_part_text(part)
    else:
        body_text = _decode_email_part_text(msg)

    attachments = []
    for part in msg.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get_content_type() in ('text/plain', 'text/html'):
            continue
        fname = part.get_filename()
        if not fname:
            continue
        payload = part.get_payload(decode=True)
        if payload:
            attachments.append({
                'filename': _decode_header_str(fname),
                'content': payload,
                'size': len(payload),
                'content_type': part.get_content_type(),
            })

    # Default to "now"; replaced below when the Date header parses.
    received_date = datetime.now()
    date_header = msg.get('Date')
    if date_header:
        from email.utils import parsedate_to_datetime
        try:
            received_date = parsedate_to_datetime(date_header)
        except (TypeError, ValueError):
            logger.debug("Unparseable Date header in uploaded email: %r", date_header)

    return {
        # "or" also covers a present-but-empty Message-ID header.
        'message_id': msg.get('Message-ID') or f"eml-{temp_id}",
        'subject': _decode_header_str(msg.get('Subject', 'No Subject')),
        'sender_email': _decode_header_str(msg.get('From', '')),
        'sender_name': _decode_header_str(msg.get('From', '')),
        'recipient_email': _decode_header_str(msg.get('To', '')),
        'cc': _decode_header_str(msg.get('Cc', '')),
        'body_text': body_text,
        'body_html': body_html,
        'received_date': received_date,
        'folder': 'Imported',
        'has_attachments': len(attachments) > 0,
        'attachment_count': len(attachments),
        'attachments': attachments,
    }


async def _process_uploaded_email(file: UploadFile, opportunity_id: int) -> dict:
    """Import an uploaded ``.eml``/``.msg`` file and link it to an opportunity.

    The file is parsed into an email dict. If an ``email_messages`` row with
    the same ``message_id`` exists it is reused, otherwise the email is saved
    through ``EmailService.save_email``. The email is then linked to the
    opportunity and the refreshed opportunity is returned.

    Raises:
        HTTPException: 400 for an unsupported or missing file name,
            409 when the link table is missing (migration not applied),
            500 when extract-msg is unavailable, saving fails or linking fails.
    """
    content = await file.read()
    # UploadFile.filename may be None; treat that as an unsupported format.
    filename = (file.filename or "").lower()

    # Fallback id so messages without a Message-ID do not collide.
    temp_id = str(uuid4())

    if filename.endswith('.msg'):
        email_data = _parse_msg_upload(content, temp_id)
    elif filename.endswith('.eml'):
        email_data = _parse_eml_upload(content, temp_id)
    else:
        raise HTTPException(status_code=400, detail="Unsupported file format. Use .eml or .msg")

    svc = EmailService()

    # Reuse an already-imported message instead of inserting a duplicate.
    existing = execute_query_single(
        "SELECT id FROM email_messages WHERE message_id = %s",
        (email_data['message_id'],),
    )
    if existing:
        email_id = existing['id']
    else:
        email_id = await svc.save_email(email_data)
        if not email_id:
            raise HTTPException(status_code=500, detail="Failed to save imported email")

    try:
        execute_query(
            "INSERT INTO pipeline_opportunity_emails (opportunity_id, email_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
            (opportunity_id, email_id),
        )
    except Exception as e:
        # Same migration hint as the email-link endpoints.
        if _is_undefined_table_error(e):
            raise HTTPException(
                status_code=409,
                detail="Database migration required: run 032_opportunity_emails_m2m.sql",
            ) from e
        logger.error(f"Failed to link imported email: {e}")
        raise HTTPException(status_code=500, detail="Failed to link email") from e

    return _get_opportunity(opportunity_id)
@router.post("/opportunities/{opportunity_id}/upload-email", tags=["Opportunities"])
async def upload_opportunity_email(opportunity_id: int, file: UploadFile = File(...)):
    """Accept an uploaded .eml or .msg file and attach it to the opportunity."""
    opportunity = await _process_uploaded_email(file, opportunity_id)
    return opportunity
@router.post("/opportunities/{opportunity_id}/contacts", tags=["Opportunities"])
async def add_opportunity_contact_link(opportunity_id: int, payload: dict):
    """Link a contact to an opportunity, or update the role of an existing link.

    ``payload`` must contain a truthy ``contact_id``; ``role`` is optional.
    Returns the refreshed opportunity.

    Raises:
        HTTPException: 400 when ``contact_id`` is missing, 409 when the link
            table has not been migrated yet, 500 on any other database error.
    """
    contact_id = payload.get("contact_id")
    role = payload.get("role")
    if not contact_id:
        raise HTTPException(status_code=400, detail="Invalid contact_id")

    try:
        execute_query(
            "INSERT INTO pipeline_opportunity_contacts (opportunity_id, contact_id, role) VALUES (%s, %s, %s) ON CONFLICT (opportunity_id, contact_id) DO UPDATE SET role = EXCLUDED.role",
            (opportunity_id, contact_id, role),
        )
    except Exception as e:
        # Match the email-link endpoints: a missing table is a deployment
        # problem (409 with the migration to run), not a generic 500.
        if _is_undefined_table_error(e):
            raise HTTPException(
                status_code=409,
                detail="Database migration required: run 033_opportunity_contacts.sql",
            ) from e
        logger.error(f"Failed to add contact link: {e}")
        raise HTTPException(status_code=500, detail="Kunne ikke tilføje kontaktperson") from e

    return _get_opportunity(opportunity_id)
@router.delete("/opportunities/{opportunity_id}/contacts/{contact_id}", tags=["Opportunities"])
async def remove_opportunity_contact_link(opportunity_id: int, contact_id: int):
    """Remove the link between a contact and an opportunity.

    Returns the refreshed opportunity.

    Raises:
        HTTPException: 409 when the link table has not been migrated yet,
            500 on any other database error.
    """
    try:
        execute_query(
            "DELETE FROM pipeline_opportunity_contacts WHERE opportunity_id = %s AND contact_id = %s",
            (opportunity_id, contact_id),
        )
    except Exception as e:
        # Match the email-link endpoints: report a missing table as 409
        # together with the migration that creates it.
        if _is_undefined_table_error(e):
            raise HTTPException(
                status_code=409,
                detail="Database migration required: run 033_opportunity_contacts.sql",
            ) from e
        logger.error(f"Failed to remove contact link: {e}")
        raise HTTPException(status_code=500, detail="Kunne ikke fjerne kontaktperson") from e

    return _get_opportunity(opportunity_id)
# Absolute root under which every stored attachment lives.
UPLOAD_BASE_PATH = Path(settings.UPLOAD_DIR).resolve()

# Sub-folders (relative to UPLOAD_BASE_PATH) per attachment kind.
COMMENT_ATTACHMENT_SUBDIR = "opportunity_comments"
CONTRACT_ATTACHMENT_SUBDIR = "opportunity_contract_files"

# Make sure both storage folders exist as soon as the module is imported.
for subdir in (COMMENT_ATTACHMENT_SUBDIR, CONTRACT_ATTACHMENT_SUBDIR):
    UPLOAD_BASE_PATH.joinpath(subdir).mkdir(parents=True, exist_ok=True)

# Lower-cased so extension checks are case-insensitive.
ALLOWED_EXTENSIONS = {allowed.lower() for allowed in settings.ALLOWED_EXTENSIONS}

# Upload size limit in bytes, derived from the configured megabyte limit.
MAX_ATTACHMENT_SIZE = settings.EMAIL_MAX_UPLOAD_SIZE_MB * 1024 * 1024
def _is_attachment_allowed(filename: str) -> bool:
    """Return True when the file's extension (case-insensitive, no dot) is allowed."""
    suffix = Path(filename).suffix
    normalized = suffix.lower().lstrip(".")
    return normalized in ALLOWED_EXTENSIONS
def _validate_attachment(upload_file: UploadFile) -> None:
    """Reject uploads with a disallowed extension or exceeding the size limit.

    The size is measured by seeking to the end of the underlying file object;
    the position is reset to the start afterwards. Raises HTTPException(400)
    on either violation.
    """
    if not _is_attachment_allowed(upload_file.filename):
        raise HTTPException(400, detail="Unsupported attachment type")

    stream = upload_file.file
    stream.seek(0, os.SEEK_END)
    size = stream.tell()
    stream.seek(0)

    if size <= MAX_ATTACHMENT_SIZE:
        return
    raise HTTPException(
        400,
        detail=f"Attachment exceeds size limit of {settings.EMAIL_MAX_UPLOAD_SIZE_MB} MB",
    )
def _generate_stored_name ( filename : str , subdir : str ) - > str :
cleaned = Path ( filename ) . name
unique = f " { uuid4 ( ) . hex } _ { cleaned } "
return f " { subdir } / { unique } "
def _resolve_attachment_path(stored_name: str) -> Path:
    """Map a stored name onto its location below UPLOAD_BASE_PATH."""
    return UPLOAD_BASE_PATH.joinpath(stored_name)
def _store_upload_file(upload_file: UploadFile, subdir: str) -> Tuple[str, int]:
    """Validate an upload, write it below ``subdir`` and report where it went.

    Returns ``(stored_name, size_in_bytes)`` where the size is read back from
    the written file. A PermissionError while writing is logged and turned
    into HTTPException(500) with a hint about directory permissions.
    """
    _validate_attachment(upload_file)

    stored_name = _generate_stored_name(upload_file.filename, subdir)
    target = _resolve_attachment_path(stored_name)
    target.parent.mkdir(parents=True, exist_ok=True)

    source = upload_file.file
    source.seek(0)
    try:
        with target.open("wb") as sink:
            shutil.copyfileobj(source, sink)
    except PermissionError as exc:
        logger.error(
            "❌ Upload permission denied: %s (base=%s, subdir=%s)",
            str(target),
            str(UPLOAD_BASE_PATH),
            subdir,
        )
        hint = (
            "Upload directory is not writable. Fix permissions for the host-mounted 'uploads' folder "
            "(e.g. /srv/podman/bmc_hub_v1.0/uploads) and restart the API container."
        )
        raise HTTPException(status_code=500, detail=hint) from exc

    written_bytes = target.stat().st_size
    return stored_name, written_bytes
2026-01-28 07:48:10 +01:00
class PipelineStageBase(BaseModel):
    """Fields describing a pipeline stage; only ``name`` is required."""

    name: str
    description: Optional[str] = None
    # Stage listings in this module are ordered by sort_order ascending.
    sort_order: int = 0
    # Copied to an opportunity's probability when it is placed in this stage.
    default_probability: int = 0
    color: Optional[str] = "#0f4c75"
    is_won: bool = False
    is_lost: bool = False
    # Stage lookups in this module only consider rows with is_active = TRUE.
    is_active: bool = True
class PipelineStageCreate(PipelineStageBase):
    """Request body for creating a stage; adds nothing to PipelineStageBase."""
class PipelineStageUpdate(BaseModel):
    """Partial update of a pipeline stage.

    Every field is optional; the update endpoint only writes the fields the
    client actually sent (``exclude_unset=True``).
    """

    name: Optional[str] = None
    description: Optional[str] = None
    sort_order: Optional[int] = None
    default_probability: Optional[int] = None
    color: Optional[str] = None
    is_won: Optional[bool] = None
    is_lost: Optional[bool] = None
    is_active: Optional[bool] = None
class OpportunityBase(BaseModel):
    """Fields of a sales opportunity; ``customer_id`` and ``title`` are required."""

    customer_id: int
    title: str
    description: Optional[str] = None
    amount: Optional[float] = 0
    currency: Optional[str] = "DKK"
    expected_close_date: Optional[date] = None
    # When omitted, creation falls back to the first active stage by sort order.
    stage_id: Optional[int] = None
    owner_user_id: Optional[int] = None
class OpportunityCreate(OpportunityBase):
    """Request body for creating an opportunity; adds nothing to OpportunityBase."""
class OpportunityUpdate(BaseModel):
    """Partial update of an opportunity.

    Every field is optional; the update endpoint only writes the fields the
    client actually sent (``exclude_unset=True``).
    """

    title: Optional[str] = None
    description: Optional[str] = None
    amount: Optional[float] = None
    currency: Optional[str] = None
    expected_close_date: Optional[date] = None
    # Sending stage_id also resets probability to the stage's default.
    stage_id: Optional[int] = None
    owner_user_id: Optional[int] = None
    is_active: Optional[bool] = None
class OpportunityStageUpdate(BaseModel):
    """Request body for moving an opportunity to another stage."""

    stage_id: int
    # note and user_id are written to the stage-history row.
    note: Optional[str] = None
    user_id: Optional[int] = None
2026-01-28 14:37:47 +01:00
class OpportunityLineBase(BaseModel):
    """Fields of a single line item on an opportunity; only ``name`` is required."""

    name: str
    quantity: int = 1
    unit_price: float = 0.0
    product_number: Optional[str] = None
    description: Optional[str] = None
class OpportunityLineCreate(OpportunityLineBase):
    """Request body for creating a line item; adds nothing to OpportunityLineBase."""
class OpportunityCommentBase(BaseModel):
    """Fields of a comment on an opportunity; only ``content`` is required."""

    content: str
    author_name: Optional[str] = None
    user_id: Optional[int] = None
    # Optional reference to a row in email_messages.
    email_id: Optional[int] = None
    contract_number: Optional[str] = None
    contract_context: Optional[str] = None
    contract_link: Optional[str] = None
    metadata: Optional[Dict] = None
class OpportunityCommentCreate(OpportunityCommentBase):
    """Request body for creating a comment; adds nothing to OpportunityCommentBase."""
2026-01-29 00:36:32 +01:00
class OpportunityCommentAttachment(BaseModel):
    """Metadata of a file attached to an opportunity comment."""

    id: int
    filename: str
    content_type: Optional[str] = None
    size_bytes: Optional[int] = None
    created_at: datetime
    # API path from which the file can be downloaded.
    download_url: str
class OpportunityEmailAttachment(BaseModel):
    """Metadata of an attachment belonging to an email referenced by a comment."""

    id: int
    filename: str
    content_type: Optional[str] = None
    size_bytes: Optional[int] = None
    created_at: datetime
    # API path from which the file can be downloaded.
    download_url: str
class OpportunityContractFile(BaseModel):
    """Metadata of a contract file stored for an opportunity."""

    id: int
    filename: str
    content_type: Optional[str] = None
    size_bytes: Optional[int] = None
    created_at: datetime
    # API path from which the file can be downloaded.
    download_url: str
2026-01-28 14:37:47 +01:00
class OpportunityCommentResponse(BaseModel):
    """A stored comment together with joined user, email and attachment data."""

    id: int
    opportunity_id: int
    content: str
    author_name: Optional[str] = None
    user_id: Optional[int] = None
    # Joined from the users table by the comment queries.
    user_full_name: Optional[str] = None
    username: Optional[str] = None
    email_id: Optional[int] = None
    # Joined from email_messages by the comment queries.
    email_subject: Optional[str] = None
    email_sender: Optional[str] = None
    contract_number: Optional[str] = None
    contract_context: Optional[str] = None
    contract_link: Optional[str] = None
    metadata: Optional[Dict] = None
    created_at: datetime
    updated_at: datetime
    # Files uploaded with the comment, and attachments of the referenced email.
    attachments: List[OpportunityCommentAttachment] = []
    email_attachments: List[OpportunityEmailAttachment] = []
2026-01-28 14:37:47 +01:00
2026-01-28 07:48:10 +01:00
def _get_stage(stage_id: int):
    """Load an active pipeline stage by id or raise HTTPException(404)."""
    row = execute_query_single(
        "SELECT * FROM pipeline_stages WHERE id = %s AND is_active = TRUE",
        (stage_id,),
    )
    if row:
        return row
    raise HTTPException(status_code=404, detail="Stage not found")
def _get_default_stage():
    """Return the active stage with the lowest sort order.

    Raises HTTPException(400) when no active stage exists.
    """
    first_stage = execute_query_single(
        "SELECT * FROM pipeline_stages WHERE is_active = TRUE ORDER BY sort_order ASC LIMIT 1"
    )
    if first_stage:
        return first_stage
    raise HTTPException(status_code=400, detail="No active stages configured")
def _query_rows_unless_table_missing(query: str, params: tuple, missing_table_warning: str):
    """Run ``query``; if its table does not exist yet, log a warning and return []."""
    try:
        return execute_query(query, params)
    except Exception as exc:
        if not _is_undefined_table_error(exc):
            raise
        logger.warning(missing_table_warning)
        return []


def _get_opportunity(opportunity_id: int):
    """Load one opportunity with customer/stage info, linked emails and contacts.

    The link tables come from later migrations. When one of them is missing
    the corresponding list is returned empty (with a warning logged) instead
    of failing the whole lookup. Each link query runs exactly once, inside
    that guard.

    Returns:
        The opportunity row, extended with ``linked_emails`` and
        ``linked_contacts`` lists.

    Raises:
        HTTPException: 404 when no opportunity has the given id.
    """
    query = """
        SELECT o.*, c.name AS customer_name,
               s.name AS stage_name, s.color AS stage_color, s.is_won, s.is_lost
        FROM pipeline_opportunities o
        JOIN customers c ON c.id = o.customer_id
        JOIN pipeline_stages s ON s.id = o.stage_id
        WHERE o.id = %s
    """
    opportunity = execute_query_single(query, (opportunity_id,))
    if not opportunity:
        raise HTTPException(status_code=404, detail="Opportunity not found")

    # Linked emails, newest first.
    email_query = """
        SELECT e.id, e.subject, e.sender_email, e.received_date, e.body_text, e.body_html
        FROM email_messages e
        JOIN pipeline_opportunity_emails poe ON e.id = poe.email_id
        WHERE poe.opportunity_id = %s
        ORDER BY e.received_date DESC
    """
    linked_emails = _query_rows_unless_table_missing(
        email_query,
        (opportunity_id,),
        "⚠️ Missing table pipeline_opportunity_emails; linked_emails disabled until migration 032_opportunity_emails_m2m.sql is applied",
    )
    opportunity["linked_emails"] = linked_emails or []

    # Linked contacts, ordered by name.
    contacts_query = """
        SELECT c.id, c.first_name, c.last_name, c.email, c.phone, c.mobile, poc.role
        FROM contacts c
        JOIN pipeline_opportunity_contacts poc ON c.id = poc.contact_id
        WHERE poc.opportunity_id = %s
        ORDER BY c.first_name, c.last_name
    """
    linked_contacts = _query_rows_unless_table_missing(
        contacts_query,
        (opportunity_id,),
        "⚠️ Missing table pipeline_opportunity_contacts; linked_contacts disabled until migration 033_opportunity_contacts.sql is applied",
    )
    opportunity["linked_contacts"] = linked_contacts or []

    return opportunity
def _insert_stage_history(opportunity_id: int, from_stage_id: Optional[int], to_stage_id: int,
                          user_id: Optional[int] = None, note: Optional[str] = None):
    """Record a stage transition of an opportunity in pipeline_stage_history."""
    history_sql = """
        INSERT INTO pipeline_stage_history (opportunity_id, from_stage_id, to_stage_id, changed_by_user_id, note)
        VALUES (%s, %s, %s, %s, %s)
    """
    values = (opportunity_id, from_stage_id, to_stage_id, user_id, note)
    execute_query(history_sql, values)
2026-01-28 14:37:47 +01:00
def _fetch_opportunity_comments(opportunity_id: int):
    """Return all comments of an opportunity, newest first, with attachments.

    Each comment gets an ``attachments`` list (files uploaded with it) and an
    ``email_attachments`` list (attachments of the email it references, or
    empty when it references none).
    """
    query = """
        SELECT c.*, u.full_name AS user_full_name, u.username,
               em.subject AS email_subject, em.sender_email AS email_sender
        FROM pipeline_opportunity_comments c
        LEFT JOIN users u ON u.user_id = c.user_id
        LEFT JOIN email_messages em ON em.id = c.email_id
        WHERE c.opportunity_id = %s
        ORDER BY c.created_at DESC
    """
    comments = execute_query(query, (opportunity_id,))
    if not comments:
        return []

    # Two batched lookups instead of one query per comment.
    files_by_comment = _fetch_comment_attachments_map([row["id"] for row in comments])
    referenced_emails = {row["email_id"] for row in comments if row.get("email_id")}
    files_by_email = _fetch_email_attachments_map(list(referenced_emails))

    for row in comments:
        row["attachments"] = files_by_comment.get(row["id"], [])
        linked_email = row.get("email_id")
        row["email_attachments"] = (
            files_by_email.get(row["email_id"], []) if linked_email else []
        )

    return comments
2026-01-28 14:37:47 +01:00
def _fetch_comment(comment_id: int):
    """Return one comment with its attachments, or None when it does not exist."""
    query = """
        SELECT c.*, u.full_name AS user_full_name, u.username,
               em.subject AS email_subject, em.sender_email AS email_sender
        FROM pipeline_opportunity_comments c
        LEFT JOIN users u ON u.user_id = c.user_id
        LEFT JOIN email_messages em ON em.id = c.email_id
        WHERE c.id = %s
    """
    rows = execute_query(query, (comment_id,))
    if not rows:
        return None

    comment = rows[0]
    own_files = _fetch_comment_attachments_map([comment_id])
    comment["attachments"] = own_files.get(comment_id, [])

    if not comment.get("email_id"):
        comment["email_attachments"] = []
        return comment

    email_files = _fetch_email_attachments_map([comment["email_id"]])
    comment["email_attachments"] = email_files.get(comment["email_id"], [])
    return comment
def _comment_attachment_download_url ( opportunity_id : int , attachment_id : int ) - > str :
return f " /api/v1/opportunities/ { opportunity_id } /comment-attachments/ { attachment_id } "
def _email_attachment_download_url ( email_id : int , attachment_id : int ) - > str :
return f " /api/v1/emails/ { email_id } /attachments/ { attachment_id } "
def _fetch_comment_attachments_map ( comment_ids : List [ int ] ) - > Dict [ int , List [ Dict [ str , Any ] ] ] :
if not comment_ids :
return { }
query = """
SELECT a . id , a . comment_id , a . opportunity_id , a . filename , a . content_type , a . size_bytes , a . created_at
FROM pipeline_opportunity_comment_attachments a
WHERE a . comment_id = ANY ( % s )
ORDER BY a . created_at DESC
"""
rows = execute_query ( query , ( comment_ids , ) ) or [ ]
attachments_by_comment : Dict [ int , List [ Dict [ str , Any ] ] ] = { }
for row in rows :
attachments_by_comment . setdefault ( row [ " comment_id " ] , [ ] ) . append ( {
" id " : row [ " id " ] ,
" filename " : row [ " filename " ] ,
" content_type " : row . get ( " content_type " ) ,
" size_bytes " : row . get ( " size_bytes " ) ,
" created_at " : row . get ( " created_at " ) ,
" download_url " : _comment_attachment_download_url ( row [ " opportunity_id " ] , row [ " id " ] )
} )
return attachments_by_comment
def _fetch_email_attachments_map ( email_ids : List [ int ] ) - > Dict [ int , List [ Dict [ str , Any ] ] ] :
if not email_ids :
return { }
query = """
SELECT id , email_id , filename , content_type , size_bytes , created_at
FROM email_attachments
WHERE email_id = ANY ( % s )
ORDER BY id ASC
"""
rows = execute_query ( query , ( email_ids , ) ) or [ ]
email_map : Dict [ int , List [ Dict [ str , Any ] ] ] = { }
for row in rows :
email_map . setdefault ( row [ " email_id " ] , [ ] ) . append ( {
" id " : row [ " id " ] ,
" filename " : row [ " filename " ] ,
" content_type " : row . get ( " content_type " ) ,
" size_bytes " : row . get ( " size_bytes " ) ,
" created_at " : row . get ( " created_at " ) ,
" download_url " : _email_attachment_download_url ( row [ " email_id " ] , row [ " id " ] )
} )
return email_map
def _contract_file_download_url ( opportunity_id : int , file_id : int ) - > str :
return f " /api/v1/opportunities/ { opportunity_id } /contract-files/ { file_id } "
def _fetch_contract_files(opportunity_id: int) -> List[Dict[str, Any]]:
    """List the contract files of an opportunity, newest first.

    The internal ``stored_name`` is selected but not exposed; each entry
    carries a ``download_url`` instead.
    """
    query = """
        SELECT id, filename, content_type, size_bytes, stored_name, created_at
        FROM pipeline_opportunity_contract_files
        WHERE opportunity_id = %s
        ORDER BY created_at DESC
    """
    files: List[Dict[str, Any]] = []
    for record in execute_query(query, (opportunity_id,)) or []:
        files.append({
            "id": record["id"],
            "filename": record["filename"],
            "content_type": record.get("content_type"),
            "size_bytes": record.get("size_bytes"),
            "created_at": record.get("created_at"),
            "download_url": _contract_file_download_url(opportunity_id, record["id"]),
        })
    return files
def _save_contract_files(opportunity_id: int, files: List[UploadFile], uploaded_by_user_id: Optional[int] = None) -> List[Dict[str, Any]]:
    """Store uploaded contract files on disk and register them in the database.

    Entries that are empty or have no filename are skipped. Returns one
    metadata dict (including ``download_url``) per row the insert returned.
    """
    if not files:
        return []

    insert_query = """
        INSERT INTO pipeline_opportunity_contract_files
        (opportunity_id, filename, content_type, size_bytes, stored_name, uploaded_by_user_id)
        VALUES (%s, %s, %s, %s, %s, %s)
        RETURNING id, filename, content_type, size_bytes, created_at
    """
    registered: List[Dict[str, Any]] = []
    for upload in files:
        if not (upload and upload.filename):
            continue

        stored_name, size_bytes = _store_upload_file(upload, CONTRACT_ATTACHMENT_SUBDIR)
        row_values = (
            opportunity_id,
            upload.filename,
            upload.content_type,
            size_bytes,
            stored_name,
            uploaded_by_user_id,
        )
        inserted = execute_query(insert_query, row_values)
        if not inserted:
            continue

        record = inserted[0]
        registered.append({
            "id": record["id"],
            "filename": record["filename"],
            "content_type": record.get("content_type"),
            "size_bytes": record.get("size_bytes"),
            "created_at": record.get("created_at"),
            "download_url": _contract_file_download_url(opportunity_id, record["id"]),
        })
    return registered
def _save_comment_attachments(opportunity_id: int, comment_id: int, files: List[UploadFile], uploaded_by_user_id: Optional[int] = None) -> None:
    """Store files uploaded with a comment and register each one in the database.

    Entries that are empty or have no filename are skipped.
    """
    if not files:
        return

    insert_query = """
        INSERT INTO pipeline_opportunity_comment_attachments
        (opportunity_id, comment_id, filename, content_type, size_bytes, stored_name, uploaded_by_user_id)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    for upload in files:
        if not (upload and upload.filename):
            continue

        stored_name, size_bytes = _store_upload_file(upload, COMMENT_ATTACHMENT_SUBDIR)
        row_values = (
            opportunity_id,
            comment_id,
            upload.filename,
            upload.content_type,
            size_bytes,
            stored_name,
            uploaded_by_user_id,
        )
        execute_query(insert_query, row_values)
2026-01-28 14:37:47 +01:00
2026-01-28 07:48:10 +01:00
# ============================
# Pipeline Stages
# ============================
@router.get("/pipeline/stages", tags=["Pipeline Stages"])
async def list_stages():
    """Return all active pipeline stages ordered by ``sort_order``."""
    stages = execute_query(
        "SELECT * FROM pipeline_stages WHERE is_active = TRUE ORDER BY sort_order ASC"
    )
    return stages or []
@router.post("/pipeline/stages", tags=["Pipeline Stages"])
async def create_stage(stage: PipelineStageCreate):
    """Insert a new pipeline stage and return the created row.

    Returns None when the insert yields no row.
    """
    insert_sql = """
        INSERT INTO pipeline_stages
        (name, description, sort_order, default_probability, color, is_won, is_lost, is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING *
    """
    values = (
        stage.name,
        stage.description,
        stage.sort_order,
        stage.default_probability,
        stage.color,
        stage.is_won,
        stage.is_lost,
        stage.is_active,
    )
    created = execute_query(insert_sql, values)
    logger.info("✅ Created pipeline stage: %s", stage.name)
    if created:
        return created[0]
    return None
@router.put("/pipeline/stages/{stage_id}", tags=["Pipeline Stages"])
async def update_stage(stage_id: int, stage: PipelineStageUpdate):
    """Update the fields the client sent for a stage and return the updated row.

    Raises HTTPException(400) when no field was sent and HTTPException(404)
    when no row was updated.
    """
    # Column names come from the model's own field names, values are bound.
    changes = stage.dict(exclude_unset=True)
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    assignments = ", ".join(f"{column} = %s" for column in changes)
    bound_values = (*changes.values(), stage_id)
    query = f"UPDATE pipeline_stages SET {assignments}, updated_at = CURRENT_TIMESTAMP WHERE id = %s RETURNING *"

    updated_rows = execute_query(query, bound_values)
    if not updated_rows:
        raise HTTPException(status_code=404, detail="Stage not found")

    logger.info("✅ Updated pipeline stage: %s", stage_id)
    return updated_rows[0]
@router.delete("/pipeline/stages/{stage_id}", tags=["Pipeline Stages"])
async def deactivate_stage(stage_id: int):
    """Soft-delete a stage by setting ``is_active`` to FALSE.

    Raises HTTPException(404) when no row was affected.
    """
    rows_changed = execute_update(
        "UPDATE pipeline_stages SET is_active = FALSE, updated_at = CURRENT_TIMESTAMP WHERE id = %s",
        (stage_id,),
    )
    if rows_changed:
        logger.info("⚠️ Deactivated pipeline stage: %s", stage_id)
        return {"status": "success", "stage_id": stage_id}
    raise HTTPException(status_code=404, detail="Stage not found")
# ============================
# Opportunities
# ============================
@router.get("/opportunities", tags=["Opportunities"])
async def list_opportunities(customer_id: Optional[int] = None, stage_id: Optional[int] = None):
    """List active opportunities, optionally filtered by customer and/or stage.

    Results are ordered by last update (NULLs last), then by creation time.
    """
    sql = """
        SELECT o.*, c.name AS customer_name,
               s.name AS stage_name, s.color AS stage_color, s.is_won, s.is_lost
        FROM pipeline_opportunities o
        JOIN customers c ON c.id = o.customer_id
        JOIN pipeline_stages s ON s.id = o.stage_id
        WHERE o.is_active = TRUE
    """
    bound: List = []
    for column, wanted in (("o.customer_id", customer_id), ("o.stage_id", stage_id)):
        if wanted is None:
            continue
        sql += f" AND {column} = %s"
        bound.append(wanted)
    sql += " ORDER BY o.updated_at DESC NULLS LAST, o.created_at DESC"

    # Parameters are only passed when at least one filter is active.
    rows = execute_query(sql, tuple(bound)) if bound else execute_query(sql)
    return rows or []
@router.get("/opportunities/{opportunity_id}", tags=["Opportunities"])
async def get_opportunity(opportunity_id: int):
    """Return a single opportunity including its linked emails and contacts."""
    opportunity = _get_opportunity(opportunity_id)
    return opportunity
@router.post("/opportunities", tags=["Opportunities"])
async def create_opportunity(opportunity: OpportunityCreate):
    """Create an opportunity, record its initial stage and return it.

    Without a ``stage_id`` the first active stage is used. The probability is
    taken from the chosen stage's ``default_probability``.
    """
    if opportunity.stage_id:
        stage = _get_stage(opportunity.stage_id)
    else:
        stage = _get_default_stage()

    insert_sql = """
        INSERT INTO pipeline_opportunities
        (customer_id, title, description, amount, currency, expected_close_date, stage_id, probability, owner_user_id)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id
    """
    values = (
        opportunity.customer_id,
        opportunity.title,
        opportunity.description,
        opportunity.amount or 0,
        opportunity.currency or "DKK",
        opportunity.expected_close_date,
        stage["id"],
        stage["default_probability"],
        opportunity.owner_user_id,
    )
    inserted = execute_query(insert_sql, values)
    if not inserted:
        raise HTTPException(status_code=500, detail="Failed to create opportunity")

    new_id = inserted[0]["id"]
    # First history entry: no previous stage.
    _insert_stage_history(new_id, None, stage["id"], opportunity.owner_user_id, "Oprettet")
    logger.info("✅ Created opportunity %s", new_id)
    return _get_opportunity(new_id)
@router.put("/opportunities/{opportunity_id}", tags=["Opportunities"])
async def update_opportunity(opportunity_id: int, update: OpportunityUpdate):
    """Apply the fields the client sent to an opportunity and return it.

    When ``stage_id`` is sent, the probability is reset to that stage's
    default. If the stage actually changes, a history row is written and
    ``handle_stage_change`` is called. Raises HTTPException(400) when no
    field was sent.
    """
    current = _get_opportunity(opportunity_id)
    changes = update.dict(exclude_unset=True)

    target_stage = None
    stage_differs = False
    if "stage_id" in changes:
        target_stage = _get_stage(changes["stage_id"])
        changes["probability"] = target_stage["default_probability"]
        stage_differs = target_stage["id"] != current["stage_id"]

    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    # Column names come from the model's own field names, values are bound.
    assignments = ", ".join(f"{column} = %s" for column in changes)
    bound_values = (*changes.values(), opportunity_id)
    query = f"UPDATE pipeline_opportunities SET {assignments}, updated_at = CURRENT_TIMESTAMP WHERE id = %s"
    execute_update(query, bound_values)

    if stage_differs and target_stage:
        _insert_stage_history(
            opportunity_id,
            current["stage_id"],
            target_stage["id"],
            update.owner_user_id,
            "Stage ændret",
        )
        refreshed = _get_opportunity(opportunity_id)
        handle_stage_change(refreshed, target_stage)

    # Fetched again after the stage handler has run.
    return _get_opportunity(opportunity_id)
@router.patch("/opportunities/{opportunity_id}/stage", tags=["Opportunities"])
async def update_opportunity_stage(opportunity_id: int, update: OpportunityStageUpdate):
    """Move an opportunity to another stage.

    Sets the stage and resets the probability to the stage's
    ``default_probability``, writes a stage-history row carrying the
    caller's ``user_id`` and ``note``, then invokes ``handle_stage_change``
    with the re-loaded opportunity.

    Returns the opportunity as loaded right after the UPDATE.
    """
    previous = _get_opportunity(opportunity_id)
    target_stage = _get_stage(update.stage_id)

    stage_sql = """
        UPDATE pipeline_opportunities
        SET stage_id = %s,
            probability = %s,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = %s
    """
    stage_params = (target_stage["id"], target_stage["default_probability"], opportunity_id)
    execute_update(stage_sql, stage_params)

    _insert_stage_history(
        opportunity_id,
        previous["stage_id"],
        target_stage["id"],
        update.user_id,
        update.note,
    )

    refreshed = _get_opportunity(opportunity_id)
    handle_stage_change(refreshed, target_stage)
    return refreshed
2026-01-28 14:37:47 +01:00
@router.get("/opportunities/{opportunity_id}/lines", tags=["Opportunities"])
async def list_opportunity_lines(opportunity_id: int):
    """List the line items of an opportunity, oldest id first.

    Each row carries a computed ``total_price`` (``quantity * unit_price``).
    Returns an empty list when the query yields nothing.
    """
    lines_sql = """
        SELECT id, opportunity_id, product_number, name, description, quantity, unit_price,
               quantity * unit_price AS total_price
        FROM pipeline_opportunity_lines
        WHERE opportunity_id = %s
        ORDER BY id ASC
    """
    rows = execute_query(lines_sql, (opportunity_id,))
    return rows if rows else []
@router.post("/opportunities/{opportunity_id}/lines", tags=["Opportunities"])
async def add_opportunity_line(opportunity_id: int, line: OpportunityLineCreate):
    """Insert a line item for an opportunity and return the stored row.

    The returned row includes the computed ``total_price``
    (``quantity * unit_price``).

    Raises:
        HTTPException: 500 when the INSERT returns no row.
    """
    insert_sql = """
        INSERT INTO pipeline_opportunity_lines
        (opportunity_id, product_number, name, description, quantity, unit_price)
        VALUES (%s, %s, %s, %s, %s, %s)
        RETURNING id, opportunity_id, product_number, name, description, quantity, unit_price,
                  quantity * unit_price AS total_price
    """
    values = (
        opportunity_id,
        line.product_number,
        line.name,
        line.description,
        line.quantity,
        line.unit_price,
    )
    rows = execute_query(insert_sql, values)
    if rows:
        return rows[0]
    raise HTTPException(status_code=500, detail="Failed to create line item")
@router.delete("/opportunities/{opportunity_id}/lines/{line_id}", tags=["Opportunities"])
async def remove_opportunity_line(opportunity_id: int, line_id: int):
    """Delete one line item belonging to the given opportunity.

    The DELETE is scoped to both ids, so a line id that belongs to another
    opportunity is treated as not found.

    Raises:
        HTTPException: 404 when no matching row was deleted.
    """
    delete_sql = """
        DELETE FROM pipeline_opportunity_lines
        WHERE opportunity_id = %s AND id = %s
        RETURNING id
    """
    deleted = execute_query(delete_sql, (opportunity_id, line_id))
    if deleted:
        return {"success": True, "line_id": line_id}
    raise HTTPException(status_code=404, detail="Line item not found")
@router.get(
    "/opportunities/{opportunity_id}/comments",
    response_model=List[OpportunityCommentResponse],
    tags=["Opportunities"],
)
async def list_opportunity_comments(opportunity_id: int):
    """Return the comments attached to an opportunity."""
    # Result is discarded: the call is made only so that any error the
    # lookup raises surfaces before comments are fetched
    # (presumably a 404 for unknown ids — confirm in _get_opportunity).
    _get_opportunity(opportunity_id)
    comments = _fetch_opportunity_comments(opportunity_id)
    return comments
def _coerce_comment_int(value: Any) -> Any:
    """Turn a numeric string into an int; unparsable strings become None.

    Non-string values (including None and real ints) are returned unchanged.
    """
    if isinstance(value, str):
        try:
            return int(value)
        except ValueError:
            return None
    return value


def _serialize_comment_metadata(raw: Any) -> Optional[str]:
    """Return comment metadata as a JSON string, or None when absent/empty/invalid.

    Accepts either a JSON-encoded string or a dict; anything else is dropped.
    """
    if not raw:
        return None
    parsed = None
    if isinstance(raw, str):
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            parsed = None
    elif isinstance(raw, dict):
        parsed = raw
    return json.dumps(parsed) if parsed else None


@router.post(
    "/opportunities/{opportunity_id}/comments",
    response_model=OpportunityCommentResponse,
    tags=["Opportunities"],
)
async def add_opportunity_comment(
    opportunity_id: int,
    request: Request,
    content: Optional[str] = Form(None),
    author_name: Optional[str] = Form(None),
    user_id: Optional[int] = Form(None),
    email_id: Optional[int] = Form(None),
    contract_number: Optional[str] = Form(None),
    contract_context: Optional[str] = Form(None),
    contract_link: Optional[str] = Form(None),
    metadata: Optional[str] = Form(None),
    files: Optional[List[UploadFile]] = File(None),
):
    """Create a comment on an opportunity, optionally with file attachments.

    The comment fields are read from the JSON body when the request's
    content type starts with ``application/json``; otherwise they come from
    the form fields declared in the signature. Uploaded ``files`` are handed
    to ``_save_comment_attachments`` after the comment row exists.

    Returns the stored comment as loaded by ``_fetch_comment``.

    Raises:
        HTTPException: 400 when the JSON body is malformed or not an object,
            or when ``content`` is missing/empty; 500 when the INSERT
            returns no row.
    """
    _get_opportunity(opportunity_id)

    if request.headers.get("content-type", "").startswith("application/json"):
        try:
            payload = await request.json()
        except ValueError:
            # json.JSONDecodeError (and undecodable bytes) are ValueErrors;
            # report them as a client error instead of an unhandled 500.
            raise HTTPException(status_code=400, detail="Ugyldig JSON") from None
        if not isinstance(payload, dict):
            # A JSON array/scalar has no fields to read.
            raise HTTPException(status_code=400, detail="Ugyldig JSON")
    else:
        payload = {
            "content": content,
            "author_name": author_name,
            "user_id": user_id,
            "email_id": email_id,
            "contract_number": contract_number,
            "contract_context": contract_context,
            "contract_link": contract_link,
            "metadata": metadata,
        }

    content_value = payload.get("content")
    if not content_value:
        raise HTTPException(status_code=400, detail="Kommentar er påkrævet")

    resolved_author = payload.get("author_name") or 'Hub Bruger'
    resolved_user_id = _coerce_comment_int(payload.get("user_id"))
    resolved_email_id = _coerce_comment_int(payload.get("email_id"))
    metadata_json = _serialize_comment_metadata(payload.get("metadata"))

    query = """
        INSERT INTO pipeline_opportunity_comments
        (opportunity_id, user_id, author_name, content, email_id,
         contract_number, contract_context, contract_link, metadata)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id
    """
    result = execute_query(
        query,
        (
            opportunity_id,
            resolved_user_id,
            resolved_author,
            content_value,
            resolved_email_id,
            payload.get("contract_number"),
            payload.get("contract_context"),
            payload.get("contract_link"),
            metadata_json,
        ),
    )
    if not result:
        raise HTTPException(status_code=500, detail="Kunne ikke oprette kommentar")

    comment_id = result[0]["id"]

    attachment_files = files or []
    if attachment_files:
        _save_comment_attachments(opportunity_id, comment_id, attachment_files, resolved_user_id)

    return _fetch_comment(comment_id)
2026-01-29 00:36:32 +01:00
@router.get("/opportunities/{opportunity_id}/comment-attachments/{attachment_id}", tags=["Opportunities"])
async def download_comment_attachment(opportunity_id: int, attachment_id: int):
    """Send a stored comment attachment to the client.

    The attachment row must match both the attachment id and the
    opportunity id. Its ``stored_name`` is turned into a filesystem path by
    ``_resolve_attachment_path``.

    Raises:
        HTTPException: 404 when the row or the file on disk is missing;
            500 when the row has no ``stored_name``.
    """
    lookup_sql = """
        SELECT * FROM pipeline_opportunity_comment_attachments
        WHERE id = %s AND opportunity_id = %s
    """
    rows = execute_query(lookup_sql, (attachment_id, opportunity_id))
    if not rows:
        raise HTTPException(status_code=404, detail="Vedhæftet fil ikke fundet")

    record = rows[0]
    stored_name = record.get("stored_name")
    if not stored_name:
        raise HTTPException(status_code=500, detail="Vedhæftet fil mangler sti")

    disk_path = _resolve_attachment_path(stored_name)
    if not disk_path.exists():
        raise HTTPException(status_code=404, detail="Filen findes ikke på serveren")

    # Fall back to a generic binary type when none was recorded.
    media_type = record.get("content_type") or "application/octet-stream"
    return FileResponse(
        path=disk_path,
        filename=record.get("filename"),
        media_type=media_type,
    )
2026-01-28 14:37:47 +01:00
@router.get(
    "/contracts/search",
    tags=["Opportunities"],
    response_model=List[Dict],
)
async def search_contracts(query: str = Query(..., min_length=2), limit: int = Query(10, ge=1, le=50)):
    """Search distinct contract numbers by case-insensitive substring.

    Args:
        query: Text to look for inside ``contract_number`` (at least 2 chars).
            It is matched literally: ``%``, ``_`` and ``\\`` are escaped so
            they do not act as LIKE wildcards.
        limit: Maximum number of contract numbers to return (1-50).

    Returns:
        A list of rows with ``contract_number``, ``last_seen`` (latest
        ``created_at``) and ``hits`` (row count), newest first; an empty
        list when nothing matches.
    """
    sql = """
        SELECT contract_number,
               MAX(created_at) AS last_seen,
               COUNT(*) AS hits
        FROM extraction_lines
        WHERE contract_number IS NOT NULL
          AND contract_number <> ''
          AND contract_number ILIKE %s
        GROUP BY contract_number
        ORDER BY MAX(created_at) DESC
        LIMIT %s
    """
    # Backslash is the default LIKE/ILIKE escape character in PostgreSQL;
    # escape it first so the escapes added for % and _ are not doubled.
    escaped = query.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    params = (f"%{escaped}%", limit)
    results = execute_query(sql, params)
    return results or []
2026-01-29 00:36:32 +01:00
@router.get(
    "/opportunities/{opportunity_id}/contract-files",
    tags=["Opportunities"],
    response_model=List[OpportunityContractFile],
)
async def list_contract_files(opportunity_id: int):
    """Return the contract files registered for an opportunity."""
    # Result is discarded: the lookup runs only so that any error it raises
    # surfaces first (presumably a 404 for unknown ids — confirm in
    # _get_opportunity).
    _get_opportunity(opportunity_id)
    contract_files = _fetch_contract_files(opportunity_id)
    return contract_files
@router.post(
    "/opportunities/{opportunity_id}/contract-files",
    tags=["Opportunities"],
    response_model=List[OpportunityContractFile],
)
async def upload_contract_files(opportunity_id: int, files: List[UploadFile] = File(...)):
    """Store uploaded contract files for an opportunity.

    Returns whatever ``_save_contract_files`` reports as saved.

    Raises:
        HTTPException: 400 when no files were sent; 500 when nothing was
            saved.
    """
    _get_opportunity(opportunity_id)

    if not files:
        raise HTTPException(status_code=400, detail="Ingen filer at uploade")

    stored_files = _save_contract_files(opportunity_id, files)
    if stored_files:
        return stored_files
    raise HTTPException(status_code=500, detail="Kunne ikke gemme filer")
@router.get(
    "/opportunities/{opportunity_id}/contract-files/{file_id}",
    tags=["Opportunities"],
)
async def download_contract_file(opportunity_id: int, file_id: int):
    """Serve a stored contract file for inline display.

    The file row must match both the file id and the opportunity id. Its
    ``stored_name`` is turned into a filesystem path by
    ``_resolve_attachment_path``.

    Raises:
        HTTPException: 404 when the row or the file on disk is missing;
            500 when the row has no ``stored_name``.
    """
    query = """
        SELECT * FROM pipeline_opportunity_contract_files
        WHERE id = %s AND opportunity_id = %s
    """
    result = execute_query(query, (file_id, opportunity_id))
    if not result:
        raise HTTPException(status_code=404, detail="Filen ikke fundet")

    row = result[0]
    stored_name = row.get("stored_name")
    if not stored_name:
        raise HTTPException(status_code=500, detail="Filen mangler lagring")

    path = _resolve_attachment_path(stored_name)
    if not path.exists():
        raise HTTPException(status_code=404, detail="Filen findes ikke på serveren")

    # Let FileResponse build the Content-Disposition header itself: it
    # percent-encodes names that need it (filename*=utf-8''...) and omits
    # the header when filename is None, instead of emitting filename="None"
    # or a header broken by quotes / non-latin-1 characters.
    return FileResponse(
        path=path,
        filename=row.get("filename"),
        media_type=row.get("content_type") or "application/octet-stream",
        content_disposition_type="inline",
    )