bmc_hub/app/ticket/backend/ticket_service.py
Christian f62cd8104a feat: Enhance time tracking with Hub Worklog integration and editing capabilities
- Added hub_customer_id to TModuleApprovalStats for better tracking.
- Introduced TModuleWizardEditRequest for editing time entries, allowing updates to description, hours, and billing method.
- Implemented approval and rejection logic for Hub Worklogs, including handling negative IDs.
- Created a new endpoint for updating entry details, supporting both Hub Worklogs and Module Times.
- Updated frontend to include an edit modal for time entries, with specific fields for Hub Worklogs and Module Times.
- Enhanced customer statistics retrieval to include pending counts from Hub Worklogs.
- Added migrations for ticket enhancements, including new fields and constraints for worklogs and prepaid cards.
2026-01-10 21:09:29 +01:00

528 lines
17 KiB
Python

"""
Ticket Service
==============
Business logic for ticket operations: status transitions, validation, audit logging.
"""
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List
from decimal import Decimal
from app.core.database import execute_query, execute_insert, execute_update, execute_query_single
from app.ticket.backend.models import (
TicketStatus,
TicketPriority,
TTicketCreate,
TTicketUpdate,
TTicket
)
logger = logging.getLogger(__name__)
class TicketService:
"""Service for ticket business logic"""
# Status transition rules (which transitions are allowed).
# Maps a ticket's current status to the list of statuses it may move to.
# validate_status_transition() looks the current status up here; a status
# that is missing from the map is treated there as having no allowed
# targets. Staying on the same status is accepted by that method without
# consulting this table.
ALLOWED_TRANSITIONS = {
    TicketStatus.OPEN: [TicketStatus.IN_PROGRESS, TicketStatus.CLOSED],
    TicketStatus.IN_PROGRESS: [
        TicketStatus.WAITING_CUSTOMER,
        TicketStatus.WAITING_INTERNAL,
        TicketStatus.RESOLVED
    ],
    # Both waiting states can resume work or be closed directly.
    TicketStatus.WAITING_CUSTOMER: [TicketStatus.IN_PROGRESS, TicketStatus.CLOSED],
    TicketStatus.WAITING_INTERNAL: [TicketStatus.IN_PROGRESS, TicketStatus.CLOSED],
    TicketStatus.RESOLVED: [TicketStatus.CLOSED, TicketStatus.IN_PROGRESS],  # Can reopen
    TicketStatus.CLOSED: []  # Cannot transition from closed (terminal state)
}
@staticmethod
def validate_status_transition(current_status: str, new_status: str) -> tuple[bool, Optional[str]]:
    """
    Check whether a ticket may move from one status to another.

    Both values are parsed as TicketStatus members. Keeping the same
    status is always accepted; any other move must be listed for the
    current status in TicketService.ALLOWED_TRANSITIONS.

    Args:
        current_status: Status the ticket has now
        new_status: Status the caller wants to set

    Returns:
        (is_valid, error_message) - error_message is None when valid
    """
    try:
        source_state = TicketStatus(current_status)
        target_state = TicketStatus(new_status)
    except ValueError as e:
        # Either value is not a known TicketStatus
        return False, f"Invalid status: {e}"

    is_noop = source_state == target_state
    if is_noop or target_state in TicketService.ALLOWED_TRANSITIONS.get(source_state, []):
        return True, None

    return False, f"Cannot transition from {source_state.value} to {target_state.value}"
@staticmethod
def create_ticket(
    ticket_data: TTicketCreate,
    user_id: Optional[int] = None
) -> Dict[str, Any]:
    """
    Create a new ticket, write an audit entry and return the stored row.

    ticket_number is inserted as given; when it is NULL a database
    trigger is expected to generate it (per the original inline comment -
    the trigger itself is not visible from this module).

    Args:
        ticket_data: Ticket creation data
        user_id: User creating the ticket; when falsy,
            ticket_data.created_by_user_id is stored as the creator instead

    Returns:
        Created ticket dict, re-read from tticket_tickets after the insert

    Raises:
        RuntimeError: If the insert returns no ID or the new row cannot be
            read back
    """
    logger.info(f"🎫 Creating ticket: {ticket_data.subject}")

    # Local import: Json wraps a dict so psycopg2 sends it as JSON/JSONB
    from psycopg2.extras import Json

    # Insert ticket (trigger will auto-generate ticket_number if NULL)
    result = execute_query_single(
        """
        INSERT INTO tticket_tickets (
            ticket_number, subject, description, status, priority, category,
            customer_id, contact_id, assigned_to_user_id, created_by_user_id,
            source, tags, custom_fields, ticket_type, internal_note
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id
        """,
        (
            ticket_data.ticket_number,
            ticket_data.subject,
            ticket_data.description,
            ticket_data.status.value,
            ticket_data.priority.value,
            ticket_data.category,
            ticket_data.customer_id,
            ticket_data.contact_id,
            ticket_data.assigned_to_user_id,
            user_id or ticket_data.created_by_user_id,
            ticket_data.source.value,
            ticket_data.tags or [],  # PostgreSQL array
            Json(ticket_data.custom_fields or {}),  # PostgreSQL JSONB
            ticket_data.ticket_type.value,
            ticket_data.internal_note
        )
    )

    if not result:
        # RuntimeError is still caught by callers using `except Exception`
        raise RuntimeError("Failed to create ticket - no ID returned")

    ticket_id = result['id']

    # Log creation (log_audit never raises, so this cannot undo the insert)
    TicketService.log_audit(
        ticket_id=ticket_id,
        entity_type="ticket",
        entity_id=ticket_id,
        user_id=user_id,
        action="created",
        details={"subject": ticket_data.subject, "status": ticket_data.status.value}
    )

    # Fetch created ticket so trigger-populated columns are included
    ticket = execute_query_single(
        "SELECT * FROM tticket_tickets WHERE id = %s",
        (ticket_id,))

    if not ticket:
        # Without this guard a missing row surfaces as an opaque TypeError
        raise RuntimeError(f"Failed to create ticket - ticket {ticket_id} could not be read back")

    logger.info(f"✅ Created ticket {ticket['ticket_number']} (ID: {ticket_id})")
    return ticket
@staticmethod
def update_ticket(
    ticket_id: int,
    update_data: TTicketUpdate,
    user_id: Optional[int] = None
) -> Dict[str, Any]:
    """
    Update a ticket with partial data and record the change in the audit log.

    Only fields explicitly set on update_data are written. Status changes
    made through this method are NOT checked against
    TicketService.ALLOWED_TRANSITIONS; use update_ticket_status for that.

    Args:
        ticket_id: Ticket ID to update
        update_data: Fields to update
        user_id: User making the update

    Returns:
        Updated ticket dict, or the unchanged current row when update_data
        carries no fields

    Raises:
        ValueError: If the ticket does not exist
    """
    # Get current ticket
    current = execute_query_single(
        "SELECT * FROM tticket_tickets WHERE id = %s",
        (ticket_id,))

    if not current:
        raise ValueError(f"Ticket {ticket_id} not found")

    # Local import: Json wraps a dict so psycopg2 sends it as JSON/JSONB
    from psycopg2.extras import Json

    update_dict = update_data.model_dump(exclude_unset=True)

    if not update_dict:
        logger.warning(f"No fields to update for ticket {ticket_id}")
        return current

    # Build UPDATE query dynamically.
    # NOTE: the keys of update_dict are interpolated as column names, so
    # TTicketUpdate must only declare trusted, real column names.
    updates = []
    params = []
    audit_details = {}

    for field, value in update_dict.items():
        # Convert enums to their plain values
        plain_value = value.value if hasattr(value, 'value') else value

        # The audit entry stores the same plain values that reach the
        # database; raw enum members may not be JSON-serializable, and
        # log_audit would then silently drop the entry.
        audit_details[field] = plain_value

        # Convert dict to JSON for PostgreSQL JSONB
        if field == 'custom_fields' and isinstance(plain_value, dict):
            plain_value = Json(plain_value)
        # Arrays are handled automatically by psycopg2

        updates.append(f"{field} = %s")
        params.append(plain_value)

    # ticket_id binds to the WHERE clause placeholder
    params.append(ticket_id)

    query = f"UPDATE tticket_tickets SET {', '.join(updates)} WHERE id = %s"
    execute_update(query, tuple(params))

    # Log update
    TicketService.log_audit(
        ticket_id=ticket_id,
        entity_type="ticket",
        entity_id=ticket_id,
        user_id=user_id,
        action="updated",
        details=audit_details
    )

    # Fetch updated ticket
    updated = execute_query_single(
        "SELECT * FROM tticket_tickets WHERE id = %s",
        (ticket_id,))

    logger.info(f"✅ Updated ticket {updated['ticket_number']}")
    return updated
@staticmethod
def update_ticket_status(
    ticket_id: int,
    new_status: str,
    user_id: Optional[int] = None,
    note: Optional[str] = None
) -> Dict[str, Any]:
    """
    Update ticket status after validating the transition.

    The status and, where applicable, the resolved_at / closed_at
    timestamps are written in a single UPDATE. A timestamp is only set
    when it is still NULL, so the first resolve/close time is preserved
    when a ticket is reopened and resolved again.

    Args:
        ticket_id: Ticket ID
        new_status: New status value
        user_id: User making the change
        note: Optional note for audit log

    Returns:
        Updated ticket dict

    Raises:
        ValueError: If the ticket does not exist or the transition is
            not allowed
    """
    # Get current ticket
    current = execute_query_single(
        "SELECT * FROM tticket_tickets WHERE id = %s",
        (ticket_id,))

    if not current:
        raise ValueError(f"Ticket {ticket_id} not found")

    current_status = current['status']

    # Validate transition
    is_valid, error = TicketService.validate_status_transition(current_status, new_status)
    if not is_valid:
        logger.error(f"❌ Invalid status transition: {error}")
        raise ValueError(error)

    # Build one atomic UPDATE. COALESCE keeps an existing timestamp and
    # is evaluated against the live row rather than the snapshot above.
    # Only fixed SQL fragments are joined here; values stay parameterized.
    set_clauses = ["status = %s"]
    if new_status == TicketStatus.RESOLVED.value:
        set_clauses.append("resolved_at = COALESCE(resolved_at, CURRENT_TIMESTAMP)")
    elif new_status == TicketStatus.CLOSED.value:
        set_clauses.append("closed_at = COALESCE(closed_at, CURRENT_TIMESTAMP)")

    execute_update(
        f"UPDATE tticket_tickets SET {', '.join(set_clauses)} WHERE id = %s",
        (new_status, ticket_id)
    )

    # Log status change
    TicketService.log_audit(
        ticket_id=ticket_id,
        entity_type="ticket",
        entity_id=ticket_id,
        user_id=user_id,
        action="status_changed",
        old_value=current_status,
        new_value=new_status,
        details={"note": note} if note else None
    )

    # Fetch updated ticket
    updated = execute_query_single(
        "SELECT * FROM tticket_tickets WHERE id = %s",
        (ticket_id,))

    # Separator added: the old and new status used to be printed run together
    logger.info(f"✅ Updated ticket {updated['ticket_number']} status: {current_status} → {new_status}")
    return updated
@staticmethod
def assign_ticket(
    ticket_id: int,
    assigned_to_user_id: int,
    user_id: Optional[int] = None
) -> Dict[str, Any]:
    """
    Hand a ticket over to a user and record the change in the audit log.

    Args:
        ticket_id: Ticket ID
        assigned_to_user_id: User who receives the ticket
        user_id: User performing the assignment

    Returns:
        Updated ticket dict

    Raises:
        ValueError: If the ticket does not exist
    """
    # Look up who holds the ticket right now (also proves it exists)
    existing = execute_query_single(
        "SELECT assigned_to_user_id FROM tticket_tickets WHERE id = %s",
        (ticket_id,))
    if not existing:
        raise ValueError(f"Ticket {ticket_id} not found")

    previous_assignee = existing['assigned_to_user_id']

    # Write the new assignee
    execute_update(
        "UPDATE tticket_tickets SET assigned_to_user_id = %s WHERE id = %s",
        (assigned_to_user_id, ticket_id)
    )

    # Audit values are stored as strings; a falsy previous assignee is logged as None
    if previous_assignee:
        previous_label = str(previous_assignee)
    else:
        previous_label = None

    TicketService.log_audit(
        ticket_id=ticket_id,
        entity_type="ticket",
        entity_id=ticket_id,
        user_id=user_id,
        action="assigned",
        old_value=previous_label,
        new_value=str(assigned_to_user_id)
    )

    # Re-read the full row for the caller
    refreshed = execute_query_single(
        "SELECT * FROM tticket_tickets WHERE id = %s",
        (ticket_id,))

    logger.info(f"✅ Assigned ticket {refreshed['ticket_number']} to user {assigned_to_user_id}")
    return refreshed
@staticmethod
def add_comment(
    ticket_id: int,
    comment_text: str,
    user_id: Optional[int] = None,
    is_internal: bool = False
) -> Dict[str, Any]:
    """
    Add a comment to a ticket and refresh the ticket's timestamps.

    updated_at is always bumped. For a non-internal comment,
    first_response_at is also set if it is still NULL.

    Args:
        ticket_id: Ticket ID
        comment_text: Comment content
        user_id: User adding comment
        is_internal: Whether comment is internal

    Returns:
        Created comment dict

    Raises:
        ValueError: If the ticket does not exist
    """
    # Verify ticket exists
    ticket = execute_query_single(
        "SELECT id FROM tticket_tickets WHERE id = %s",
        (ticket_id,))

    if not ticket:
        raise ValueError(f"Ticket {ticket_id} not found")

    # Insert comment
    # NOTE(review): assumes execute_insert returns the new row's id even
    # though this statement has no RETURNING clause - confirm against
    # app.core.database.
    comment_id = execute_insert(
        """
        INSERT INTO tticket_comments (ticket_id, user_id, comment_text, is_internal)
        VALUES (%s, %s, %s, %s)
        """,
        (ticket_id, user_id, comment_text, is_internal)
    )

    # Touch the ticket in ONE statement. For public comments COALESCE sets
    # first_response_at only while it is NULL, which replaces the former
    # read-then-write sequence (racy, and it failed with a TypeError if the
    # ticket vanished between the two queries).
    if is_internal:
        touch_sql = "UPDATE tticket_tickets SET updated_at = CURRENT_TIMESTAMP WHERE id = %s"
    else:
        touch_sql = (
            "UPDATE tticket_tickets SET updated_at = CURRENT_TIMESTAMP, "
            "first_response_at = COALESCE(first_response_at, CURRENT_TIMESTAMP) "
            "WHERE id = %s"
        )
    execute_update(touch_sql, (ticket_id,))

    # Log comment (only metadata is stored, not the comment text itself)
    TicketService.log_audit(
        ticket_id=ticket_id,
        entity_type="comment",
        entity_id=comment_id,
        user_id=user_id,
        action="comment_added",
        details={"is_internal": is_internal, "length": len(comment_text)}
    )

    # Fetch created comment
    comment = execute_query_single(
        "SELECT * FROM tticket_comments WHERE id = %s",
        (comment_id,))

    logger.info(f"💬 Added comment to ticket {ticket_id} (internal: {is_internal})")
    return comment
@staticmethod
def log_audit(
    ticket_id: Optional[int] = None,
    entity_type: str = "ticket",
    entity_id: Optional[int] = None,
    user_id: Optional[int] = None,
    action: str = "updated",
    old_value: Optional[str] = None,
    new_value: Optional[str] = None,
    details: Optional[Dict[str, Any]] = None
) -> None:
    """
    Write one row to tticket_audit_log on a best-effort basis.

    Any failure (including a failing import or serialization error) is
    logged and swallowed so that auditing never breaks the operation
    being audited.

    Args:
        ticket_id: Related ticket ID (optional)
        entity_type: Type of entity (ticket, comment, worklog, etc.)
        entity_id: ID of the entity
        user_id: User performing action
        action: Action performed
        old_value: Old value (for updates)
        new_value: New value (for updates)
        details: Additional details, stored as JSON; an empty or missing
            dict is stored as NULL
    """
    try:
        from psycopg2.extras import Json

        details_param = Json(details) if details else None
        row = (
            ticket_id, entity_type, entity_id, user_id,
            action, old_value, new_value, details_param,
        )
        execute_insert(
            """
            INSERT INTO tticket_audit_log
            (ticket_id, entity_type, entity_id, user_id, action, old_value, new_value, details)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """,
            row
        )
    except Exception as e:
        # Deliberately not re-raised - see docstring
        logger.error(f"❌ Failed to log audit entry: {e}")
@staticmethod
def get_ticket_with_stats(ticket_id: int) -> Optional[Dict[str, Any]]:
    """
    Load a ticket, preferring the tticket_open_tickets view.

    The view is queried first; when it yields nothing for this ID the
    plain tticket_tickets row is returned instead.

    Args:
        ticket_id: Ticket ID

    Returns:
        Ticket dict (from the view when available) or None if not found
    """
    from_view = execute_query_single(
        "SELECT * FROM tticket_open_tickets WHERE id = %s",
        (ticket_id,))

    # `or` only runs the fallback query when the view lookup came back empty
    return from_view or execute_query_single(
        "SELECT * FROM tticket_tickets WHERE id = %s",
        (ticket_id,))
@staticmethod
def list_tickets(
    status: Optional[str] = None,
    priority: Optional[str] = None,
    customer_id: Optional[int] = None,
    assigned_to_user_id: Optional[int] = None,
    search: Optional[str] = None,
    limit: int = 50,
    offset: int = 0
) -> List[Dict[str, Any]]:
    """
    List tickets with optional filters, newest first.

    Args:
        status: Filter by status (ignored when empty)
        priority: Filter by priority (ignored when empty)
        customer_id: Filter by customer; only None disables the filter
        assigned_to_user_id: Filter by assigned user; only None disables
            the filter
        search: Case-insensitive substring to look for in subject or
            description; `%`, `_` and `\\` are matched literally
        limit: Number of results
        offset: Offset for pagination

    Returns:
        List of ticket dicts (empty list when nothing matches)
    """
    query = "SELECT * FROM tticket_tickets WHERE 1=1"
    params: List[Any] = []

    if status:
        query += " AND status = %s"
        params.append(status)

    if priority:
        query += " AND priority = %s"
        params.append(priority)

    # Compare against None so that an explicit 0 still filters instead of
    # silently returning tickets for every customer / user.
    if customer_id is not None:
        query += " AND customer_id = %s"
        params.append(customer_id)

    if assigned_to_user_id is not None:
        query += " AND assigned_to_user_id = %s"
        params.append(assigned_to_user_id)

    if search:
        # Escape LIKE metacharacters (backslash first) so user input is
        # matched literally; backslash is PostgreSQL's default LIKE escape.
        escaped = (
            search.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        search_pattern = f"%{escaped}%"
        query += " AND (subject ILIKE %s OR description ILIKE %s)"
        params.extend([search_pattern, search_pattern])

    query += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
    params.extend([limit, offset])

    tickets = execute_query(query, tuple(params))
    return tickets or []