- Added hub_customer_id to TModuleApprovalStats for better tracking. - Introduced TModuleWizardEditRequest for editing time entries, allowing updates to description, hours, and billing method. - Implemented approval and rejection logic for Hub Worklogs, including handling negative IDs. - Created a new endpoint for updating entry details, supporting both Hub Worklogs and Module Times. - Updated frontend to include an edit modal for time entries, with specific fields for Hub Worklogs and Module Times. - Enhanced customer statistics retrieval to include pending counts from Hub Worklogs. - Added migrations for ticket enhancements, including new fields and constraints for worklogs and prepaid cards.
528 lines
17 KiB
Python
528 lines
17 KiB
Python
"""
|
|
Ticket Service
|
|
==============
|
|
|
|
Business logic for ticket operations: status transitions, validation, audit logging.
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional, Dict, Any, List
|
|
from decimal import Decimal
|
|
|
|
from app.core.database import execute_query, execute_insert, execute_update, execute_query_single
|
|
from app.ticket.backend.models import (
|
|
TicketStatus,
|
|
TicketPriority,
|
|
TTicketCreate,
|
|
TTicketUpdate,
|
|
TTicket
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TicketService:
|
|
"""Service for ticket business logic"""
|
|
|
|
# Status transition rules (which transitions are allowed)
|
|
ALLOWED_TRANSITIONS = {
|
|
TicketStatus.OPEN: [TicketStatus.IN_PROGRESS, TicketStatus.CLOSED],
|
|
TicketStatus.IN_PROGRESS: [
|
|
TicketStatus.WAITING_CUSTOMER,
|
|
TicketStatus.WAITING_INTERNAL,
|
|
TicketStatus.RESOLVED
|
|
],
|
|
TicketStatus.WAITING_CUSTOMER: [TicketStatus.IN_PROGRESS, TicketStatus.CLOSED],
|
|
TicketStatus.WAITING_INTERNAL: [TicketStatus.IN_PROGRESS, TicketStatus.CLOSED],
|
|
TicketStatus.RESOLVED: [TicketStatus.CLOSED, TicketStatus.IN_PROGRESS], # Can reopen
|
|
TicketStatus.CLOSED: [] # Cannot transition from closed
|
|
}
|
|
|
|
@staticmethod
|
|
def validate_status_transition(current_status: str, new_status: str) -> tuple[bool, Optional[str]]:
|
|
"""
|
|
Validate if status transition is allowed
|
|
|
|
Returns:
|
|
(is_valid, error_message)
|
|
"""
|
|
try:
|
|
current = TicketStatus(current_status)
|
|
new = TicketStatus(new_status)
|
|
except ValueError as e:
|
|
return False, f"Invalid status: {e}"
|
|
|
|
if current == new:
|
|
return True, None # Same status is OK
|
|
|
|
allowed = TicketService.ALLOWED_TRANSITIONS.get(current, [])
|
|
if new not in allowed:
|
|
return False, f"Cannot transition from {current.value} to {new.value}"
|
|
|
|
return True, None
|
|
|
|
@staticmethod
|
|
def create_ticket(
|
|
ticket_data: TTicketCreate,
|
|
user_id: Optional[int] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Create a new ticket with auto-generated ticket_number if not provided
|
|
|
|
Args:
|
|
ticket_data: Ticket creation data
|
|
user_id: User creating the ticket
|
|
|
|
Returns:
|
|
Created ticket dict
|
|
"""
|
|
logger.info(f"🎫 Creating ticket: {ticket_data.subject}")
|
|
|
|
# Prepare data for PostgreSQL (convert Python types to SQL types)
|
|
import json
|
|
from psycopg2.extras import Json
|
|
|
|
# Insert ticket (trigger will auto-generate ticket_number if NULL)
|
|
result = execute_query_single(
|
|
"""
|
|
INSERT INTO tticket_tickets (
|
|
ticket_number, subject, description, status, priority, category,
|
|
customer_id, contact_id, assigned_to_user_id, created_by_user_id,
|
|
source, tags, custom_fields, ticket_type, internal_note
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id
|
|
""",
|
|
(
|
|
ticket_data.ticket_number,
|
|
ticket_data.subject,
|
|
ticket_data.description,
|
|
ticket_data.status.value,
|
|
ticket_data.priority.value,
|
|
ticket_data.category,
|
|
ticket_data.customer_id,
|
|
ticket_data.contact_id,
|
|
ticket_data.assigned_to_user_id,
|
|
user_id or ticket_data.created_by_user_id,
|
|
ticket_data.source.value,
|
|
ticket_data.tags or [], # PostgreSQL array
|
|
Json(ticket_data.custom_fields or {}), # PostgreSQL JSONB
|
|
ticket_data.ticket_type.value,
|
|
ticket_data.internal_note
|
|
)
|
|
)
|
|
|
|
if not result:
|
|
raise Exception("Failed to create ticket - no ID returned")
|
|
|
|
ticket_id = result['id']
|
|
|
|
# Log creation
|
|
TicketService.log_audit(
|
|
ticket_id=ticket_id,
|
|
entity_type="ticket",
|
|
entity_id=ticket_id,
|
|
user_id=user_id,
|
|
action="created",
|
|
details={"subject": ticket_data.subject, "status": ticket_data.status.value}
|
|
)
|
|
|
|
# Fetch created ticket
|
|
ticket = execute_query_single(
|
|
"SELECT * FROM tticket_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
|
|
logger.info(f"✅ Created ticket {ticket['ticket_number']} (ID: {ticket_id})")
|
|
return ticket
|
|
|
|
@staticmethod
|
|
def update_ticket(
|
|
ticket_id: int,
|
|
update_data: TTicketUpdate,
|
|
user_id: Optional[int] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Update ticket with partial data
|
|
|
|
Args:
|
|
ticket_id: Ticket ID to update
|
|
update_data: Fields to update
|
|
user_id: User making the update
|
|
|
|
Returns:
|
|
Updated ticket dict
|
|
"""
|
|
# Get current ticket
|
|
current = execute_query_single(
|
|
"SELECT * FROM tticket_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
|
|
if not current:
|
|
raise ValueError(f"Ticket {ticket_id} not found")
|
|
|
|
# Build UPDATE query dynamically
|
|
import json
|
|
from psycopg2.extras import Json
|
|
|
|
updates = []
|
|
params = []
|
|
|
|
update_dict = update_data.model_dump(exclude_unset=True)
|
|
|
|
for field, value in update_dict.items():
|
|
# Convert enums to values
|
|
if hasattr(value, 'value'):
|
|
value = value.value
|
|
# Convert dict to JSON for PostgreSQL JSONB
|
|
elif field == 'custom_fields' and isinstance(value, dict):
|
|
value = Json(value)
|
|
# Arrays are handled automatically by psycopg2
|
|
|
|
updates.append(f"{field} = %s")
|
|
params.append(value)
|
|
|
|
if not updates:
|
|
logger.warning(f"No fields to update for ticket {ticket_id}")
|
|
return current
|
|
|
|
# Add ticket_id to params
|
|
params.append(ticket_id)
|
|
|
|
query = f"UPDATE tticket_tickets SET {', '.join(updates)} WHERE id = %s"
|
|
execute_update(query, tuple(params))
|
|
|
|
# Log update
|
|
TicketService.log_audit(
|
|
ticket_id=ticket_id,
|
|
entity_type="ticket",
|
|
entity_id=ticket_id,
|
|
user_id=user_id,
|
|
action="updated",
|
|
details=update_dict
|
|
)
|
|
|
|
# Fetch updated ticket
|
|
updated = execute_query_single(
|
|
"SELECT * FROM tticket_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
|
|
logger.info(f"✅ Updated ticket {updated['ticket_number']}")
|
|
return updated
|
|
|
|
@staticmethod
|
|
def update_ticket_status(
|
|
ticket_id: int,
|
|
new_status: str,
|
|
user_id: Optional[int] = None,
|
|
note: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Update ticket status with validation
|
|
|
|
Args:
|
|
ticket_id: Ticket ID
|
|
new_status: New status value
|
|
user_id: User making the change
|
|
note: Optional note for audit log
|
|
|
|
Returns:
|
|
Updated ticket dict
|
|
|
|
Raises:
|
|
ValueError: If transition is not allowed
|
|
"""
|
|
# Get current ticket
|
|
current = execute_query_single(
|
|
"SELECT * FROM tticket_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
|
|
if not current:
|
|
raise ValueError(f"Ticket {ticket_id} not found")
|
|
|
|
current_status = current['status']
|
|
|
|
# Validate transition
|
|
is_valid, error = TicketService.validate_status_transition(current_status, new_status)
|
|
if not is_valid:
|
|
logger.error(f"❌ Invalid status transition: {error}")
|
|
raise ValueError(error)
|
|
|
|
# Update status
|
|
execute_update(
|
|
"UPDATE tticket_tickets SET status = %s WHERE id = %s",
|
|
(new_status, ticket_id)
|
|
)
|
|
|
|
# Update resolved_at timestamp if transitioning to resolved
|
|
if new_status == TicketStatus.RESOLVED.value and not current['resolved_at']:
|
|
execute_update(
|
|
"UPDATE tticket_tickets SET resolved_at = CURRENT_TIMESTAMP WHERE id = %s",
|
|
(ticket_id,)
|
|
)
|
|
|
|
# Update closed_at timestamp if transitioning to closed
|
|
if new_status == TicketStatus.CLOSED.value and not current['closed_at']:
|
|
execute_update(
|
|
"UPDATE tticket_tickets SET closed_at = CURRENT_TIMESTAMP WHERE id = %s",
|
|
(ticket_id,)
|
|
)
|
|
|
|
# Log status change
|
|
TicketService.log_audit(
|
|
ticket_id=ticket_id,
|
|
entity_type="ticket",
|
|
entity_id=ticket_id,
|
|
user_id=user_id,
|
|
action="status_changed",
|
|
old_value=current_status,
|
|
new_value=new_status,
|
|
details={"note": note} if note else None
|
|
)
|
|
|
|
# Fetch updated ticket
|
|
updated = execute_query_single(
|
|
"SELECT * FROM tticket_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
|
|
logger.info(f"✅ Updated ticket {updated['ticket_number']} status: {current_status} → {new_status}")
|
|
return updated
|
|
|
|
@staticmethod
|
|
def assign_ticket(
|
|
ticket_id: int,
|
|
assigned_to_user_id: int,
|
|
user_id: Optional[int] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Assign ticket to a user
|
|
|
|
Args:
|
|
ticket_id: Ticket ID
|
|
assigned_to_user_id: User to assign to
|
|
user_id: User making the assignment
|
|
|
|
Returns:
|
|
Updated ticket dict
|
|
"""
|
|
# Get current assignment
|
|
current = execute_query_single(
|
|
"SELECT assigned_to_user_id FROM tticket_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
|
|
if not current:
|
|
raise ValueError(f"Ticket {ticket_id} not found")
|
|
|
|
# Update assignment
|
|
execute_update(
|
|
"UPDATE tticket_tickets SET assigned_to_user_id = %s WHERE id = %s",
|
|
(assigned_to_user_id, ticket_id)
|
|
)
|
|
|
|
# Log assignment
|
|
TicketService.log_audit(
|
|
ticket_id=ticket_id,
|
|
entity_type="ticket",
|
|
entity_id=ticket_id,
|
|
user_id=user_id,
|
|
action="assigned",
|
|
old_value=str(current['assigned_to_user_id']) if current['assigned_to_user_id'] else None,
|
|
new_value=str(assigned_to_user_id)
|
|
)
|
|
|
|
# Fetch updated ticket
|
|
updated = execute_query_single(
|
|
"SELECT * FROM tticket_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
|
|
logger.info(f"✅ Assigned ticket {updated['ticket_number']} to user {assigned_to_user_id}")
|
|
return updated
|
|
|
|
@staticmethod
|
|
def add_comment(
|
|
ticket_id: int,
|
|
comment_text: str,
|
|
user_id: Optional[int] = None,
|
|
is_internal: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Add a comment to a ticket
|
|
|
|
Args:
|
|
ticket_id: Ticket ID
|
|
comment_text: Comment content
|
|
user_id: User adding comment
|
|
is_internal: Whether comment is internal
|
|
|
|
Returns:
|
|
Created comment dict
|
|
"""
|
|
# Verify ticket exists
|
|
ticket = execute_query_single(
|
|
"SELECT id FROM tticket_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
|
|
if not ticket:
|
|
raise ValueError(f"Ticket {ticket_id} not found")
|
|
|
|
# Insert comment
|
|
comment_id = execute_insert(
|
|
"""
|
|
INSERT INTO tticket_comments (ticket_id, user_id, comment_text, is_internal)
|
|
VALUES (%s, %s, %s, %s)
|
|
""",
|
|
(ticket_id, user_id, comment_text, is_internal)
|
|
)
|
|
|
|
# Update ticket's updated_at timestamp
|
|
execute_update(
|
|
"UPDATE tticket_tickets SET updated_at = CURRENT_TIMESTAMP WHERE id = %s",
|
|
(ticket_id,)
|
|
)
|
|
|
|
# Update first_response_at if this is the first non-internal comment
|
|
if not is_internal:
|
|
ticket = execute_query_single(
|
|
"SELECT first_response_at FROM tticket_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
if not ticket['first_response_at']:
|
|
execute_update(
|
|
"UPDATE tticket_tickets SET first_response_at = CURRENT_TIMESTAMP WHERE id = %s",
|
|
(ticket_id,)
|
|
)
|
|
|
|
# Log comment
|
|
TicketService.log_audit(
|
|
ticket_id=ticket_id,
|
|
entity_type="comment",
|
|
entity_id=comment_id,
|
|
user_id=user_id,
|
|
action="comment_added",
|
|
details={"is_internal": is_internal, "length": len(comment_text)}
|
|
)
|
|
|
|
# Fetch created comment
|
|
comment = execute_query_single(
|
|
"SELECT * FROM tticket_comments WHERE id = %s",
|
|
(comment_id,))
|
|
|
|
logger.info(f"💬 Added comment to ticket {ticket_id} (internal: {is_internal})")
|
|
return comment
|
|
|
|
@staticmethod
|
|
def log_audit(
|
|
ticket_id: Optional[int] = None,
|
|
entity_type: str = "ticket",
|
|
entity_id: Optional[int] = None,
|
|
user_id: Optional[int] = None,
|
|
action: str = "updated",
|
|
old_value: Optional[str] = None,
|
|
new_value: Optional[str] = None,
|
|
details: Optional[Dict[str, Any]] = None
|
|
) -> None:
|
|
"""
|
|
Log an action to audit log
|
|
|
|
Args:
|
|
ticket_id: Related ticket ID (optional)
|
|
entity_type: Type of entity (ticket, comment, worklog, etc.)
|
|
entity_id: ID of the entity
|
|
user_id: User performing action
|
|
action: Action performed
|
|
old_value: Old value (for updates)
|
|
new_value: New value (for updates)
|
|
details: Additional details as JSON
|
|
"""
|
|
try:
|
|
from psycopg2.extras import Json
|
|
|
|
execute_insert(
|
|
"""
|
|
INSERT INTO tticket_audit_log
|
|
(ticket_id, entity_type, entity_id, user_id, action, old_value, new_value, details)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
|
""",
|
|
(ticket_id, entity_type, entity_id, user_id, action, old_value, new_value,
|
|
Json(details) if details else None)
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"❌ Failed to log audit entry: {e}")
|
|
# Don't raise - audit logging should not break the main operation
|
|
|
|
@staticmethod
|
|
def get_ticket_with_stats(ticket_id: int) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get ticket with statistics from view
|
|
|
|
Args:
|
|
ticket_id: Ticket ID
|
|
|
|
Returns:
|
|
Ticket dict with stats or None if not found
|
|
"""
|
|
ticket = execute_query_single(
|
|
"SELECT * FROM tticket_open_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
|
|
# If not in open_tickets view, fetch from main table
|
|
if not ticket:
|
|
ticket = execute_query_single(
|
|
"SELECT * FROM tticket_tickets WHERE id = %s",
|
|
(ticket_id,))
|
|
|
|
return ticket
|
|
|
|
@staticmethod
|
|
def list_tickets(
|
|
status: Optional[str] = None,
|
|
priority: Optional[str] = None,
|
|
customer_id: Optional[int] = None,
|
|
assigned_to_user_id: Optional[int] = None,
|
|
search: Optional[str] = None,
|
|
limit: int = 50,
|
|
offset: int = 0
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
List tickets with filters
|
|
|
|
Args:
|
|
status: Filter by status
|
|
priority: Filter by priority
|
|
customer_id: Filter by customer
|
|
assigned_to_user_id: Filter by assigned user
|
|
search: Search in subject/description
|
|
limit: Number of results
|
|
offset: Offset for pagination
|
|
|
|
Returns:
|
|
List of ticket dicts
|
|
"""
|
|
query = "SELECT * FROM tticket_tickets WHERE 1=1"
|
|
params = []
|
|
|
|
if status:
|
|
query += " AND status = %s"
|
|
params.append(status)
|
|
|
|
if priority:
|
|
query += " AND priority = %s"
|
|
params.append(priority)
|
|
|
|
if customer_id:
|
|
query += " AND customer_id = %s"
|
|
params.append(customer_id)
|
|
|
|
if assigned_to_user_id:
|
|
query += " AND assigned_to_user_id = %s"
|
|
params.append(assigned_to_user_id)
|
|
|
|
if search:
|
|
query += " AND (subject ILIKE %s OR description ILIKE %s)"
|
|
search_pattern = f"%{search}%"
|
|
params.extend([search_pattern, search_pattern])
|
|
|
|
query += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
|
|
params.extend([limit, offset])
|
|
|
|
tickets = execute_query(query, tuple(params))
|
|
return tickets or []
|