"""
Ticket Service
==============

Business logic for ticket operations: status transitions, validation, audit logging.
"""

import logging
from datetime import datetime
from typing import Optional, Dict, Any, List
from decimal import Decimal

from psycopg2.extras import Json

from app.core.database import execute_query, execute_insert, execute_update, execute_query_single
from app.ticket.backend.models import (
    TicketStatus,
    TicketPriority,
    TTicketCreate,
    TTicketUpdate,
    TTicket
)

logger = logging.getLogger(__name__)


class TicketService:
    """Service for ticket business logic"""

    # Status transition rules: maps each status to the statuses it may move to.
    ALLOWED_TRANSITIONS = {
        TicketStatus.OPEN: [TicketStatus.IN_PROGRESS, TicketStatus.CLOSED],
        TicketStatus.IN_PROGRESS: [
            TicketStatus.WAITING_CUSTOMER,
            TicketStatus.WAITING_INTERNAL,
            TicketStatus.RESOLVED
        ],
        TicketStatus.WAITING_CUSTOMER: [TicketStatus.IN_PROGRESS, TicketStatus.CLOSED],
        TicketStatus.WAITING_INTERNAL: [TicketStatus.IN_PROGRESS, TicketStatus.CLOSED],
        TicketStatus.RESOLVED: [TicketStatus.CLOSED, TicketStatus.IN_PROGRESS],  # Can reopen
        TicketStatus.CLOSED: []  # Cannot transition from closed
    }

    @staticmethod
    def validate_status_transition(current_status: str, new_status: str) -> tuple[bool, Optional[str]]:
        """
        Validate if status transition is allowed

        Args:
            current_status: Status the ticket has now
            new_status: Status the ticket should move to

        Returns:
            (is_valid, error_message) - error_message is None when the
            transition is valid
        """
        try:
            current = TicketStatus(current_status)
            new = TicketStatus(new_status)
        except ValueError as e:
            return False, f"Invalid status: {e}"

        if current == new:
            return True, None  # Same status is OK

        allowed = TicketService.ALLOWED_TRANSITIONS.get(current, [])
        if new not in allowed:
            return False, f"Cannot transition from {current.value} to {new.value}"

        return True, None

    @staticmethod
    def create_ticket(
        ticket_data: TTicketCreate,
        user_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Create a new ticket with auto-generated ticket_number if not provided

        Args:
            ticket_data: Ticket creation data
            user_id: User creating the ticket

        Returns:
            Created ticket dict

        Raises:
            RuntimeError: If the INSERT did not return a row
        """
        logger.info(f"🎫 Creating ticket: {ticket_data.subject}")

        # Prefer the acting user and fall back to the creator carried in the
        # payload. Compared against None so that a user id of 0 is not dropped.
        created_by = user_id if user_id is not None else ticket_data.created_by_user_id

        # Insert ticket (trigger will auto-generate ticket_number if NULL)
        result = execute_query_single(
            """
            INSERT INTO tticket_tickets (
                ticket_number, subject, description, status, priority, category,
                customer_id, contact_id, assigned_to_user_id, created_by_user_id,
                source, tags, custom_fields, ticket_type, internal_note
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING id
            """,
            (
                ticket_data.ticket_number,
                ticket_data.subject,
                ticket_data.description,
                ticket_data.status.value,
                ticket_data.priority.value,
                ticket_data.category,
                ticket_data.customer_id,
                ticket_data.contact_id,
                ticket_data.assigned_to_user_id,
                created_by,
                ticket_data.source.value,
                ticket_data.tags or [],  # PostgreSQL array
                Json(ticket_data.custom_fields or {}),  # PostgreSQL JSONB
                ticket_data.ticket_type.value,
                ticket_data.internal_note
            )
        )

        if not result:
            raise RuntimeError("Failed to create ticket - no ID returned")

        ticket_id = result['id']

        # Log creation
        TicketService.log_audit(
            ticket_id=ticket_id,
            entity_type="ticket",
            entity_id=ticket_id,
            user_id=user_id,
            action="created",
            details={"subject": ticket_data.subject, "status": ticket_data.status.value}
        )

        # Fetch created ticket (includes the generated ticket_number)
        ticket = execute_query_single(
            "SELECT * FROM tticket_tickets WHERE id = %s",
            (ticket_id,))

        logger.info(f"✅ Created ticket {ticket['ticket_number']} (ID: {ticket_id})")
        return ticket

    @staticmethod
    def update_ticket(
        ticket_id: int,
        update_data: TTicketUpdate,
        user_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Update ticket with partial data

        Only fields explicitly set on update_data are written. If nothing is
        set, the current ticket row is returned unchanged.

        Args:
            ticket_id: Ticket ID to update
            update_data: Fields to update
            user_id: User making the update

        Returns:
            Updated ticket dict

        Raises:
            ValueError: If the ticket does not exist, or a field name is not a
                plain identifier
        """
        # Get current ticket
        current = execute_query_single(
            "SELECT * FROM tticket_tickets WHERE id = %s",
            (ticket_id,))

        if not current:
            raise ValueError(f"Ticket {ticket_id} not found")

        update_dict = update_data.model_dump(exclude_unset=True)

        # Build UPDATE query dynamically
        assignments = []
        params = []
        audit_details = {}

        for field, value in update_dict.items():
            # Column names cannot be bound as query parameters, so they are
            # interpolated into the SQL text. Refuse anything that is not a
            # plain identifier in case the model ever accepts extra keys.
            if not field.isidentifier():
                raise ValueError(f"Invalid field name: {field!r}")

            # Convert enums to values
            if hasattr(value, 'value'):
                value = value.value

            # The audit entry gets the plain value (not the enum member or the
            # Json wrapper) so it can be serialised as JSON.
            audit_details[field] = value

            # Convert dict to JSON for PostgreSQL JSONB
            if field == 'custom_fields' and isinstance(value, dict):
                value = Json(value)
            # Arrays are handled automatically by psycopg2

            assignments.append(f"{field} = %s")
            params.append(value)

        if not assignments:
            logger.warning(f"No fields to update for ticket {ticket_id}")
            return current

        # Add ticket_id to params (for the WHERE clause)
        params.append(ticket_id)

        query = f"UPDATE tticket_tickets SET {', '.join(assignments)} WHERE id = %s"
        execute_update(query, tuple(params))

        # Log update
        TicketService.log_audit(
            ticket_id=ticket_id,
            entity_type="ticket",
            entity_id=ticket_id,
            user_id=user_id,
            action="updated",
            details=audit_details
        )

        # Fetch updated ticket
        updated = execute_query_single(
            "SELECT * FROM tticket_tickets WHERE id = %s",
            (ticket_id,))

        logger.info(f"✅ Updated ticket {updated['ticket_number']}")
        return updated

    @staticmethod
    def update_ticket_status(
        ticket_id: int,
        new_status: str,
        user_id: Optional[int] = None,
        note: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Update ticket status with validation

        Sets resolved_at / closed_at the first time the ticket reaches the
        resolved / closed status; existing timestamps are left untouched.

        Args:
            ticket_id: Ticket ID
            new_status: New status value
            user_id: User making the change
            note: Optional note for audit log

        Returns:
            Updated ticket dict

        Raises:
            ValueError: If the ticket does not exist or the transition is not
                allowed
        """
        # Get current ticket
        current = execute_query_single(
            "SELECT * FROM tticket_tickets WHERE id = %s",
            (ticket_id,))

        if not current:
            raise ValueError(f"Ticket {ticket_id} not found")

        current_status = current['status']

        # Validate transition
        is_valid, error = TicketService.validate_status_transition(current_status, new_status)
        if not is_valid:
            logger.error(f"❌ Invalid status transition: {error}")
            raise ValueError(error)

        # Normalise to the plain status value so that a TicketStatus member
        # passed by a caller is compared, stored and logged like a string.
        new_status = TicketStatus(new_status).value

        # Write status and any first-time timestamps in one statement so a
        # failure cannot leave the status changed without its timestamp.
        set_clauses = ["status = %s"]
        if new_status == TicketStatus.RESOLVED.value and not current['resolved_at']:
            set_clauses.append("resolved_at = CURRENT_TIMESTAMP")
        if new_status == TicketStatus.CLOSED.value and not current['closed_at']:
            set_clauses.append("closed_at = CURRENT_TIMESTAMP")

        execute_update(
            f"UPDATE tticket_tickets SET {', '.join(set_clauses)} WHERE id = %s",
            (new_status, ticket_id)
        )

        # Log status change
        TicketService.log_audit(
            ticket_id=ticket_id,
            entity_type="ticket",
            entity_id=ticket_id,
            user_id=user_id,
            action="status_changed",
            old_value=current_status,
            new_value=new_status,
            details={"note": note} if note else None
        )

        # Fetch updated ticket
        updated = execute_query_single(
            "SELECT * FROM tticket_tickets WHERE id = %s",
            (ticket_id,))

        logger.info(f"✅ Updated ticket {updated['ticket_number']} status: {current_status} → {new_status}")
        return updated

    @staticmethod
    def assign_ticket(
        ticket_id: int,
        assigned_to_user_id: int,
        user_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Assign ticket to a user

        Args:
            ticket_id: Ticket ID
            assigned_to_user_id: User to assign to
            user_id: User making the assignment

        Returns:
            Updated ticket dict

        Raises:
            ValueError: If the ticket does not exist
        """
        # Get current assignment
        current = execute_query_single(
            "SELECT assigned_to_user_id FROM tticket_tickets WHERE id = %s",
            (ticket_id,))

        if not current:
            raise ValueError(f"Ticket {ticket_id} not found")

        # Compared against None so that a previous assignee with id 0 is
        # still recorded in the audit log.
        previous_assignee = current['assigned_to_user_id']
        old_value = str(previous_assignee) if previous_assignee is not None else None

        # Update assignment
        execute_update(
            "UPDATE tticket_tickets SET assigned_to_user_id = %s WHERE id = %s",
            (assigned_to_user_id, ticket_id)
        )

        # Log assignment
        TicketService.log_audit(
            ticket_id=ticket_id,
            entity_type="ticket",
            entity_id=ticket_id,
            user_id=user_id,
            action="assigned",
            old_value=old_value,
            new_value=str(assigned_to_user_id)
        )

        # Fetch updated ticket
        updated = execute_query_single(
            "SELECT * FROM tticket_tickets WHERE id = %s",
            (ticket_id,))

        logger.info(f"✅ Assigned ticket {updated['ticket_number']} to user {assigned_to_user_id}")
        return updated

    @staticmethod
    def add_comment(
        ticket_id: int,
        comment_text: str,
        user_id: Optional[int] = None,
        is_internal: bool = False
    ) -> Dict[str, Any]:
        """
        Add a comment to a ticket

        Also bumps the ticket's updated_at and, for the first non-internal
        comment, sets first_response_at.

        Args:
            ticket_id: Ticket ID
            comment_text: Comment content
            user_id: User adding comment
            is_internal: Whether comment is internal

        Returns:
            Created comment dict

        Raises:
            ValueError: If the ticket does not exist
        """
        # Verify ticket exists
        ticket = execute_query_single(
            "SELECT id FROM tticket_tickets WHERE id = %s",
            (ticket_id,))

        if not ticket:
            raise ValueError(f"Ticket {ticket_id} not found")

        # Insert comment
        comment_id = execute_insert(
            """
            INSERT INTO tticket_comments (ticket_id, user_id, comment_text, is_internal)
            VALUES (%s, %s, %s, %s)
            """,
            (ticket_id, user_id, comment_text, is_internal)
        )

        # Update ticket's updated_at timestamp
        execute_update(
            "UPDATE tticket_tickets SET updated_at = CURRENT_TIMESTAMP WHERE id = %s",
            (ticket_id,)
        )

        # Update first_response_at if this is the first non-internal comment.
        # The IS NULL guard lets the database decide, so no separate read is
        # needed and an existing timestamp is never overwritten.
        if not is_internal:
            execute_update(
                "UPDATE tticket_tickets SET first_response_at = CURRENT_TIMESTAMP "
                "WHERE id = %s AND first_response_at IS NULL",
                (ticket_id,)
            )

        # Log comment (only metadata, not the comment text itself)
        TicketService.log_audit(
            ticket_id=ticket_id,
            entity_type="comment",
            entity_id=comment_id,
            user_id=user_id,
            action="comment_added",
            details={"is_internal": is_internal, "length": len(comment_text)}
        )

        # Fetch created comment
        comment = execute_query_single(
            "SELECT * FROM tticket_comments WHERE id = %s",
            (comment_id,))

        logger.info(f"💬 Added comment to ticket {ticket_id} (internal: {is_internal})")
        return comment

    @staticmethod
    def log_audit(
        ticket_id: Optional[int] = None,
        entity_type: str = "ticket",
        entity_id: Optional[int] = None,
        user_id: Optional[int] = None,
        action: str = "updated",
        old_value: Optional[str] = None,
        new_value: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log an action to audit log

        Failures are logged and swallowed; this method never raises.

        Args:
            ticket_id: Related ticket ID (optional)
            entity_type: Type of entity (ticket, comment, worklog, etc.)
            entity_id: ID of the entity
            user_id: User performing action
            action: Action performed
            old_value: Old value (for updates)
            new_value: New value (for updates)
            details: Additional details as JSON
        """
        try:
            execute_insert(
                """
                INSERT INTO tticket_audit_log
                (ticket_id, entity_type, entity_id, user_id, action, old_value, new_value, details)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    ticket_id,
                    entity_type,
                    entity_id,
                    user_id,
                    action,
                    old_value,
                    new_value,
                    Json(details) if details else None
                )
            )
        except Exception as e:
            logger.error(f"❌ Failed to log audit entry: {e}")
            # Don't raise - audit logging should not break the main operation

    @staticmethod
    def get_ticket_with_stats(ticket_id: int) -> Optional[Dict[str, Any]]:
        """
        Get ticket with statistics from view

        Looks in the tticket_open_tickets view first and falls back to the
        main tticket_tickets table when the view has no row for the id.

        Args:
            ticket_id: Ticket ID

        Returns:
            Ticket dict with stats or None if not found
        """
        ticket = execute_query_single(
            "SELECT * FROM tticket_open_tickets WHERE id = %s",
            (ticket_id,))

        # If not in open_tickets view, fetch from main table
        if not ticket:
            ticket = execute_query_single(
                "SELECT * FROM tticket_tickets WHERE id = %s",
                (ticket_id,))

        return ticket

    @staticmethod
    def list_tickets(
        status: Optional[str] = None,
        priority: Optional[str] = None,
        customer_id: Optional[int] = None,
        assigned_to_user_id: Optional[int] = None,
        search: Optional[str] = None,
        limit: int = 50,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """
        List tickets with filters

        Results are ordered by created_at, newest first.

        Args:
            status: Filter by status
            priority: Filter by priority
            customer_id: Filter by customer
            assigned_to_user_id: Filter by assigned user
            search: Search in subject/description (case-insensitive substring)
            limit: Number of results
            offset: Offset for pagination

        Returns:
            List of ticket dicts
        """
        query = "SELECT * FROM tticket_tickets WHERE 1=1"
        params: List[Any] = []

        if status:
            query += " AND status = %s"
            params.append(status)

        if priority:
            query += " AND priority = %s"
            params.append(priority)

        # Integer filters are compared against None so that an id of 0 is
        # still applied as a filter instead of being silently ignored.
        if customer_id is not None:
            query += " AND customer_id = %s"
            params.append(customer_id)

        if assigned_to_user_id is not None:
            query += " AND assigned_to_user_id = %s"
            params.append(assigned_to_user_id)

        if search:
            # Escape LIKE metacharacters so the user's text is matched
            # literally; backslash is PostgreSQL's default LIKE escape.
            escaped = (
                search.replace("\\", "\\\\")
                .replace("%", "\\%")
                .replace("_", "\\_")
            )
            search_pattern = f"%{escaped}%"
            query += " AND (subject ILIKE %s OR description ILIKE %s)"
            params.extend([search_pattern, search_pattern])

        query += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
        params.extend([limit, offset])

        tickets = execute_query(query, tuple(params))
        return tickets or []