""" Reminder Scheduler Job Processes pending time-based reminders and queue-based trigger events Runs every 5 minutes (configurable) """ import logging from datetime import datetime, timedelta import json from app.core.config import settings from app.core.database import execute_query, execute_insert from app.services.reminder_notification_service import reminder_notification_service logger = logging.getLogger(__name__) async def check_reminders(): """ Main job: Check for pending reminders and trigger notifications - Process time-based reminders (scheduled_at or next_check_at <= NOW()) - Process queued trigger events from database triggers - Handle recurring reminders (calculate next_check_at) - Respect rate limiting (max 5 per user per hour) """ if not settings.REMINDERS_ENABLED: return try: logger.info("🔔 Checking for pending reminders...") # Step 1: Process queued trigger events (status changes) queue_count = await _process_reminder_queue() # Step 2: Process time-based reminders time_based_count = await _process_time_based_reminders() logger.info(f"✅ Reminder check complete: {queue_count} queue events, {time_based_count} time-based") except Exception as e: logger.error(f"❌ Reminder check failed: {e}") async def _process_reminder_queue(): """Process queued reminder events from status change triggers""" count = 0 batch_size = settings.REMINDERS_QUEUE_BATCH_SIZE try: # Get pending queue events query = """ SELECT q.id, q.reminder_id, q.sag_id, q.event_data, r.title, r.message, r.priority, r.recipient_user_ids, r.recipient_emails, r.notify_mattermost, r.notify_email, r.notify_frontend, r.override_user_preferences, s.titel as case_title, c.name as customer_name, s.status as case_status, s.deadline, s.ansvarlig_bruger_id FROM v_pending_reminder_queue q JOIN sag_reminders r ON q.reminder_id = r.id JOIN sag_sager s ON q.sag_id = s.id JOIN customers c ON s.customer_id = c.id LIMIT %s """ events = execute_query(query, (batch_size,)) for event in events: try: # Update queue status to processing update_query = "UPDATE sag_reminder_queue SET status = 'processing' WHERE id = %s" execute_insert(update_query, (event['id'],)) # Get assigned user name assigned_user = None if event['ansvarlig_bruger_id']: user_query = "SELECT full_name FROM users WHERE id = %s" user = execute_query(user_query, (event['ansvarlig_bruger_id'],)) assigned_user = user[0]['full_name'] if user else None # Send reminder result = await reminder_notification_service.send_reminder( reminder_id=event['reminder_id'], sag_id=event['sag_id'], case_title=event['case_title'], customer_name=event['customer_name'], reminder_title=event['title'], reminder_message=event['message'], recipient_user_ids=event['recipient_user_ids'] or [], recipient_emails=event['recipient_emails'] or [], priority=event['priority'], notify_mattermost=event['notify_mattermost'], notify_email=event['notify_email'], notify_frontend=event['notify_frontend'], override_user_preferences=event['override_user_preferences'], case_status=event['case_status'], deadline=event['deadline'].isoformat() if event['deadline'] else None, assigned_user=assigned_user ) # Update queue status if result['success']: status = 'sent' log_msg = None elif result['rate_limited_users']: status = 'rate_limited' log_msg = f"Rate limited: {len(result['rate_limited_users'])} users" else: status = 'failed' log_msg = ', '.join(result['errors'])[:500] update_query = """ UPDATE sag_reminder_queue SET status = %s, processed_at = CURRENT_TIMESTAMP, error_message = %s WHERE id = %s """ execute_insert(update_query, (status, log_msg, event['id'])) count += 1 logger.info(f"✅ Processed queue event {event['id']} (reminder {event['reminder_id']})") except Exception as e: logger.error(f"❌ Failed to process queue event {event['id']}: {e}") update_query = """ UPDATE sag_reminder_queue SET status = 'failed', error_message = %s, processed_at = CURRENT_TIMESTAMP WHERE id = %s """ execute_insert(update_query, (str(e)[:500], event['id'])) except Exception as e: logger.error(f"❌ Error processing reminder queue: {e}") return count async def _process_time_based_reminders(): """Process time-based reminders with scheduling""" count = 0 batch_size = settings.REMINDERS_QUEUE_BATCH_SIZE try: # Get pending time-based reminders query = """ SELECT r.id, r.sag_id, r.title, r.message, r.priority, r.recipient_user_ids, r.recipient_emails, r.notify_mattermost, r.notify_email, r.notify_frontend, r.override_user_preferences, r.recurrence_type, r.recurrence_day_of_week, r.recurrence_day_of_month, r.next_check_at, s.titel as case_title, c.name as customer_name, s.status as case_status, s.deadline, s.ansvarlig_bruger_id FROM sag_reminders r JOIN sag_sager s ON r.sag_id = s.id JOIN customers c ON s.customer_id = c.id WHERE r.is_active = true AND r.deleted_at IS NULL AND r.trigger_type = 'time_based' AND r.next_check_at IS NOT NULL AND r.next_check_at <= CURRENT_TIMESTAMP ORDER BY r.priority DESC, r.next_check_at ASC LIMIT %s """ reminders = execute_query(query, (batch_size,)) for reminder in reminders: try: # Get assigned user name assigned_user = None if reminder['ansvarlig_bruger_id']: user_query = "SELECT full_name FROM users WHERE id = %s" user = execute_query(user_query, (reminder['ansvarlig_bruger_id'],)) assigned_user = user[0]['full_name'] if user else None # Send reminder result = await reminder_notification_service.send_reminder( reminder_id=reminder['id'], sag_id=reminder['sag_id'], case_title=reminder['case_title'], customer_name=reminder['customer_name'], reminder_title=reminder['title'], reminder_message=reminder['message'], recipient_user_ids=reminder['recipient_user_ids'] or [], recipient_emails=reminder['recipient_emails'] or [], priority=reminder['priority'], notify_mattermost=reminder['notify_mattermost'], notify_email=reminder['notify_email'], notify_frontend=reminder['notify_frontend'], override_user_preferences=reminder['override_user_preferences'], case_status=reminder['case_status'], deadline=reminder['deadline'].isoformat() if reminder['deadline'] else None, assigned_user=assigned_user ) # Calculate next check time for recurring reminders next_check_at = _calculate_next_check( reminder['recurrence_type'], reminder['recurrence_day_of_week'], reminder['recurrence_day_of_month'] ) # Update reminder update_query = """ UPDATE sag_reminders SET last_sent_at = CURRENT_TIMESTAMP, next_check_at = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s """ execute_insert(update_query, (next_check_at, reminder['id'])) count += 1 logger.info(f"✅ Processed reminder {reminder['id']} (next: {next_check_at})") except Exception as e: logger.error(f"❌ Failed to process reminder {reminder['id']}: {e}") except Exception as e: logger.error(f"❌ Error processing time-based reminders: {e}") return count def _calculate_next_check(recurrence_type: str, day_of_week: int = None, day_of_month: int = None): """Calculate when reminder should be checked next""" now = datetime.now() if recurrence_type == 'once': # One-time reminder - no next check return None elif recurrence_type == 'daily': # Next day at same time return now + timedelta(days=1) elif recurrence_type == 'weekly': # Same day next week if day_of_week is not None: # If specific day set, calculate days until that day days_ahead = day_of_week - now.weekday() if days_ahead <= 0: # Target day already happened this week days_ahead += 7 return now + timedelta(days=days_ahead) else: # Next week same day return now + timedelta(days=7) elif recurrence_type == 'monthly': # Same day next month if day_of_month is not None: try: # Try to set day in next month if now.month == 12: next_month = now.replace(year=now.year + 1, month=1, day=min(day_of_month, 28)) else: next_month = now.replace(month=now.month + 1, day=min(day_of_month, 28)) if next_month <= now: # Already passed this month, go to next next_month = next_month + timedelta(days=28) return next_month except ValueError: # Invalid date (e.g., Feb 30), use last day of month pass # Fallback: 30 days from now return now + timedelta(days=30) return None