""" Email Management Router API endpoints for email viewing, classification, and rule management """ import logging from fastapi import APIRouter, HTTPException, Query, UploadFile, File from typing import List, Optional, Dict from pydantic import BaseModel from datetime import datetime, date from app.core.database import execute_query, execute_insert, execute_update from app.services.email_processor_service import EmailProcessorService from app.services.email_workflow_service import email_workflow_service from app.services.ollama_service import ollama_service logger = logging.getLogger(__name__) router = APIRouter() # Pydantic Models class EmailListItem(BaseModel): id: int message_id: str subject: str sender_email: str sender_name: Optional[str] received_date: datetime classification: Optional[str] confidence_score: Optional[float] status: str is_read: bool has_attachments: bool attachment_count: int rule_name: Optional[str] = None supplier_name: Optional[str] = None customer_name: Optional[str] = None class EmailAttachment(BaseModel): id: int email_id: int filename: str content_type: Optional[str] size_bytes: Optional[int] file_path: Optional[str] created_at: datetime class EmailDetail(BaseModel): id: int message_id: str subject: str sender_email: str sender_name: Optional[str] recipient_email: Optional[str] cc: Optional[str] body_text: Optional[str] body_html: Optional[str] received_date: datetime folder: str classification: Optional[str] confidence_score: Optional[float] status: str is_read: bool has_attachments: bool attachment_count: int rule_id: Optional[int] supplier_id: Optional[int] customer_id: Optional[int] linked_case_id: Optional[int] extracted_invoice_number: Optional[str] extracted_amount: Optional[float] extracted_due_date: Optional[date] auto_processed: bool created_at: datetime updated_at: datetime attachments: List[EmailAttachment] = [] customer_name: Optional[str] = None supplier_name: Optional[str] = None class EmailRule(BaseModel): id: Optional[int] = None name: str description: Optional[str] conditions: dict action_type: str action_params: Optional[dict] = {} priority: int = 100 enabled: bool = True match_count: int = 0 last_matched_at: Optional[datetime] class EmailWorkflow(BaseModel): id: Optional[int] = None name: str description: Optional[str] classification_trigger: str sender_pattern: Optional[str] = None subject_pattern: Optional[str] = None confidence_threshold: float = 0.70 workflow_steps: List[dict] priority: int = 100 enabled: bool = True stop_on_match: bool = True execution_count: int = 0 success_count: int = 0 failure_count: int = 0 last_executed_at: Optional[datetime] = None class WorkflowExecution(BaseModel): id: int workflow_id: int email_id: int status: str steps_completed: int steps_total: Optional[int] result_json: Optional[List[dict]] = None # Can be list of step results error_message: Optional[str] started_at: datetime completed_at: Optional[datetime] execution_time_ms: Optional[int] class WorkflowAction(BaseModel): id: int action_code: str name: str description: Optional[str] category: Optional[str] parameter_schema: Optional[dict] example_config: Optional[dict] enabled: bool class ProcessingStats(BaseModel): status: str fetched: int = 0 saved: int = 0 classified: int = 0 rules_matched: int = 0 errors: int = 0 # Email Endpoints @router.get("/emails", response_model=List[EmailListItem]) async def list_emails( status: Optional[str] = Query(None), classification: Optional[str] = Query(None), q: Optional[str] = Query(None), limit: int = Query(50, 
    le=500),
    offset: int = Query(0, ge=0)
):
    """Get list of emails with filtering"""
    try:
        where_clauses = ["em.deleted_at IS NULL"]
        params = []

        if status:
            where_clauses.append("em.status = %s")
            params.append(status)
        if classification:
            where_clauses.append("em.classification = %s")
            params.append(classification)
        if q:
            where_clauses.append("(em.subject ILIKE %s OR em.sender_email ILIKE %s OR em.sender_name ILIKE %s)")
            search_term = f"%{q}%"
            params.extend([search_term, search_term, search_term])

        where_sql = " AND ".join(where_clauses)

        query = f"""
            SELECT em.id, em.message_id, em.subject, em.sender_email, em.sender_name,
                   em.received_date, em.classification, em.confidence_score, em.status,
                   em.is_read, em.has_attachments, em.attachment_count,
                   em.body_text, em.body_html,
                   er.name as rule_name,
                   v.name as supplier_name,
                   NULL as customer_name
            FROM email_messages em
            LEFT JOIN email_rules er ON em.rule_id = er.id
            LEFT JOIN vendors v ON em.supplier_id = v.id
            WHERE {where_sql}
            ORDER BY em.received_date DESC
            LIMIT %s OFFSET %s
        """
        params.extend([limit, offset])

        result = execute_query(query, tuple(params))
        return result
    except Exception as e:
        logger.error(f"❌ Error listing emails: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/emails/{email_id}", response_model=EmailDetail)
async def get_email(email_id: int):
    """Get email detail by ID"""
    try:
        query = """
            SELECT em.*,
                   c.name AS customer_name,
                   v.name AS supplier_name
            FROM email_messages em
            LEFT JOIN customers c ON em.customer_id = c.id
            LEFT JOIN vendors v ON em.supplier_id = v.id
            WHERE em.id = %s AND em.deleted_at IS NULL
        """
        result = execute_query(query, (email_id,))
        logger.info(f"🔍 Query result type: {type(result)}, length: {len(result) if result else 0}")

        if not result:
            raise HTTPException(status_code=404, detail="Email not found")

        # Store email before update
        email_data = result[0]

        # Get attachments
        att_query = "SELECT * FROM email_attachments WHERE email_id = %s ORDER BY id"
        attachments = execute_query(att_query, (email_id,))
        email_data['attachments'] = attachments or []

        # Mark as read
        update_query = "UPDATE email_messages SET is_read = true WHERE id = %s"
        execute_update(update_query, (email_id,))

        return email_data
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error getting email {email_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/emails/{email_id}/mark-processed")
async def mark_email_processed(email_id: int):
    """Mark email as processed and move to 'Processed' folder"""
    try:
        # Update email status and folder
        update_query = """
            UPDATE email_messages
            SET status = 'processed',
                folder = 'Processed',
                processed_at = CURRENT_TIMESTAMP,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
            RETURNING id, folder, status
        """
        result = execute_query(update_query, (email_id,))

        if not result:
            raise HTTPException(status_code=404, detail="Email not found")

        # execute_query returns a list of rows; take the updated row
        row = result[0]
        logger.info(f"✅ Email {email_id} marked as processed and moved to Processed folder")
        return {
            "success": True,
            "email_id": row.get('id', email_id),
            "folder": row.get('folder', 'Processed'),
            "status": row.get('status', 'processed')
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error marking email {email_id} as processed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/emails/{email_id}/attachments/{attachment_id}")
async def download_attachment(email_id: int, attachment_id: int):
    """Download email attachment"""
    from fastapi.responses
import FileResponse import os try: query = """ SELECT a.* FROM email_attachments a JOIN email_messages e ON e.id = a.email_id WHERE a.id = %s AND a.email_id = %s AND e.deleted_at IS NULL """ result = execute_query(query, (attachment_id, email_id)) if not result: raise HTTPException(status_code=404, detail="Attachment not found") attachment = result[0] file_path = attachment['file_path'] if not os.path.exists(file_path): raise HTTPException(status_code=404, detail="File not found on disk") return FileResponse( path=file_path, filename=attachment['filename'], media_type=attachment.get('content_type', 'application/octet-stream') ) except HTTPException: raise except Exception as e: logger.error(f"❌ Error downloading attachment {attachment_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.put("/emails/{email_id}") async def update_email(email_id: int, status: Optional[str] = None): """Update email (archive, mark as read, etc)""" try: # Build update fields dynamically updates = [] params = [] if status: updates.append("status = %s") params.append(status) if not updates: raise HTTPException(status_code=400, detail="No fields to update") params.append(email_id) query = f"UPDATE email_messages SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s" execute_update(query, tuple(params)) logger.info(f"✅ Updated email {email_id}: status={status}") return {"success": True, "message": "Email updated"} except HTTPException: raise except Exception as e: logger.error(f"❌ Error updating email {email_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.patch("/emails/{email_id}/link") async def link_email(email_id: int, payload: Dict): """Link email to a customer and/or vendor/supplier""" try: updates = [] params = [] if 'customer_id' in payload: updates.append("customer_id = %s") params.append(payload['customer_id']) if 'supplier_id' in payload: updates.append("supplier_id = %s") params.append(payload['supplier_id']) if not updates: raise HTTPException(status_code=400, detail="Ingen felter at opdatere") params.append(email_id) query = f"UPDATE email_messages SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP WHERE id = %s" execute_update(query, tuple(params)) logger.info(f"✅ Linked email {email_id}: {payload}") return {"success": True, "message": "Email linket"} except HTTPException: raise except Exception as e: logger.error(f"❌ Error linking email {email_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/emails/{email_id}/extract-vendor-suggestion") async def extract_vendor_suggestion(email_id: int): """ Udtrækker leverandørinfo fra email body og vedhæftede PDF-fakturaer. Bruger stærke regex-mønstre + AI for CVR, adresse, telefon, domæne. 
""" import re import os # ── Hjælpefunktioner ──────────────────────────────────────────────────── def clean_phone(raw: str) -> str: """Normaliser telefonnummer til +45 XXXX XXXX eller 8 cifre""" digits = re.sub(r'[^\d+]', '', raw) if digits.startswith('+45') and len(digits) == 11: return digits if digits.startswith('45') and len(digits) == 10: return '+' + digits bare = re.sub(r'\D', '', raw) if len(bare) == 8: return bare return raw.strip()[:20] # Kendte faktureringsplatforme der sender på vegne af leverandører PLATFORM_DOMAINS = { 'e-conomic.com', 'e-conomic.dk', 'dinero.dk', 'billy.dk', 'uniconta.com', 'visma.com', 'simplybilling.dk', 'fakturasend.dk', 'invoicecloud.com', 'invoiced.com', 'stripe.com', 'paypal.com', } # Placeholder-adresser (e-conomic og lignende skabeloner) PLACEHOLDER_ADDRESSES = { 'vejnavn 1, 1234 by', 'vejnavn 1,1234 by', 'vejnavn 1', '1234 by', 'adresse 1', 'eksempel 1', 'gadenavn 1', } def is_placeholder_cvr(cvr: str) -> bool: """Filtrer åbenlyse placeholder/dummy CVRs""" known_fakes = { '12345678', '87654321', '00000000', '11111111', '22222222', '33333333', '44444444', '55555555', '66666666', '77777777', '88888888', '99999999', '12341234', '11223344', '99887766', } if cvr in known_fakes: return True if len(set(cvr)) == 1: return True digits = [int(c) for c in cvr] if all(digits[i+1] - digits[i] == 1 for i in range(7)): return True return False def extract_cvr(text: str, own_cvr: str = '') -> Optional[str]: patterns = [ # Med label r'(?:CVR|Cvr\.?-?nr\.?|cvr|Moms(?:nr\.?|registrerings?nr\.?)|VAT\s*(?:no\.?|nr\.?|number))[:\s.\-–]*(?:DK)?[\s\-]?(\d{8})', # DK-præfiks r'\bDK[\s\-]?(\d{8})\b', # Standalone 8 cifre (sidst – mindst specifik) r'\b(\d{8})\b', ] for pat in patterns: for m in re.finditer(pat, text, re.IGNORECASE): val = m.group(1) if val != own_cvr and val.isdigit() and not is_placeholder_cvr(val): return val return None def extract_phones(text: str) -> Optional[str]: patterns = [ # Med label r'(?:Tlf\.?|Tel\.?|Telefon|Phone|Mobil|Fax)[:\s.\-–]*(\+?[\d][\d\s\-().]{6,18})', # +45 XXXXXXXX r'(\+45[\s\-]?\d{2}[\s\-]?\d{2}[\s\-]?\d{2}[\s\-]?\d{2})', # 8 cifre i grupper: 12 34 56 78 / 1234 5678 r'\b(\d{2}[\s\-]\d{2}[\s\-]\d{2}[\s\-]\d{2})\b', r'\b(\d{4}[\s\-]\d{4})\b', ] for pat in patterns: m = re.search(pat, text, re.IGNORECASE) if m: return clean_phone(m.group(1)) return None def extract_address(text: str) -> Optional[str]: # Format 1: "Vejnavn 12K[, etage/side][ -/,] 4000 By" # Håndterer: "Jernbanegade 12K, st.tv - 4000 Roskilde" # "Nørregade 5, 1. sal, 8000 Aarhus" # "Industrivej 3 - 2200 København N" m = re.search( r'([A-ZÆØÅ][a-zæøåA-ZÆØÅ\-\.]{2,}\s+\d+[A-Za-z]?' r'(?:[,\s]+[a-zæøåA-ZÆØÅ0-9][a-zæøåA-ZÆØÅ0-9\.\s]{0,15}?)?' r'(?:[,\s]+|\s*[-–]\s*)\d{4}\s+[A-ZÆØÅ][a-zæøåA-ZÆØÅ]{2,})', text ) if m: return m.group(0).strip() # Format 2: vejnavn-suffix (vej/gade/alle osv.) 
+ husnummer + postnummer m = re.search( r'([A-ZÆØÅ][a-zæøåA-ZÆØÅ]+(?:vej|gade|alle|vænge|torv|plads|stræde|boulevard|have|bakke|skov|park|strand|mark|eng)' r'\s*\d+[A-Za-z]?(?:[,\s]+|\s*[-–]\s*)\d{4}(?:\s+[A-ZÆØÅ][a-zæøåA-ZÆØÅ]+)?)', text, re.IGNORECASE ) if m: return m.group(0).strip() # Format 3: find postnummer + by, tag kontekst foran m = re.search(r'(\d{4}\s+[A-ZÆØÅ][a-zæøåA-ZÆØÅ]{2,})', text) if m: start = max(0, m.start() - 50) snippet = text[start:m.end()].strip().lstrip('-–, ') return snippet return None def is_platform_or_spam(domain: str) -> bool: """Returnerer True hvis domænet tilhører en platform/mailsystem""" spam = {'gmail.com', 'hotmail.com', 'outlook.com', 'yahoo.com', 'live.com', 'icloud.com'} return domain in spam or domain in PLATFORM_DOMAINS or 'bmc' in domain def extract_domain(text: str, sender_email: str = '') -> Optional[str]: # Eksplicit www m = re.search(r'(?:www\.|https?://)([\w\-]+\.[\w\-]+(?:\.[\w]{2,6})?)', text, re.IGNORECASE) if m: dom = m.group(1).lower() if not is_platform_or_spam(dom): return dom # Emailadresser i teksten for em in re.finditer(r'[\w.\-+]+@([\w\-]+\.[\w\-]+(?:\.[\w]{2,6})?)', text): dom = em.group(1).lower() if not is_platform_or_spam(dom): return dom # Sender email (kun hvis ikke platform) if sender_email and '@' in sender_email: dom = sender_email.split('@')[1].lower() if not is_platform_or_spam(dom): return dom return None def extract_company_name(text: str, sender_name: str = '') -> Optional[str]: """Find firmanavn via DK-suffikser, footer-mønster eller CVR-nær tekst""" # Prioritet 1: navne med DK juridiske suffikser m = re.search( r'\b([\w\s\-&\'\.]+(?:A/S|ApS|IVS|I/S|K/S|P/S|GmbH|Ltd\.?|LLC|AB|AS))\b', text ) if m: return m.group(1).strip() # Prioritet 2: e-conomic footer: "FirmaNavn - Adresse - ..." # Virker både med og uden linjeskift foran m = re.search( r'(?:^|\n)([A-ZÆØÅ][A-Za-zæøåÆØÅ][^\n\-]{1,40}?)\s*[-–]\s*[A-ZÆØÅ][a-zæøåA-ZÆØÅ]', text, re.MULTILINE ) if m: name = m.group(1).strip() if len(name) > 2 and not any(w in name.lower() for w in ('tlf', 'tel', 'mail', 'bank', 'cvr', 'mobil', 'kontonr', 'faktura')): return name # Prioritet 3: tekst umiddelbart FORAN "CVR" (typisk "FirmaNavn CVR-nr.") m = re.search( r'([A-ZÆØÅ][A-Za-zæøåÆØÅ\s&\'\.]{2,50}?)\s*[-–,]?\s*(?:CVR|cvr)', text ) if m: name = m.group(1).strip().rstrip('-–, \t') if len(name) > 2 and not any(w in name.lower() for w in ('tlf', 'mail', 'bank')): return name return sender_name or None def parse_vendor_footer(text: str, own_cvr: str = '') -> dict: """ Parser specifikt til e-conomic/Dinero footer-format: "KONI Accounting - Jernbanegade 12K, st.tv - 4000 Roskilde - DK - CVR-nr.: 35962344" Splitter på ' - ' og identificerer segmenterne. 
""" result = {} # Find linjer der indeholder både vejnavn/postnummer OG CVR-lignende mønstre # eller blot det klassiske "Firma - Adresse - Postnr By" mønster for line in text.replace('\r', '\n').split('\n'): line = line.strip() if len(line) < 10: continue # Forsøg: split på ' - ' eller ' – ' parts = re.split(r'\s*[-–]\s*', line) if len(parts) < 3: continue # Del 0 er typisk firmanavnet (ingen tal, ingen '@') # Del 1 er typisk adressen (indeholder tal + vejnavn) # Del 2 (eller del med 4 cifre) er postnummer + by name_candidate = parts[0].strip() if not name_candidate or any(c.isdigit() for c in name_candidate[:3]): continue if any(w in name_candidate.lower() for w in ('tlf', 'tel', 'mail', 'bank', 'cvr', 'mobil', 'kontonr')): continue # Find adresse-del (indeholder et vejnummer: bogstaver + tal) addr_part = None zip_city_part = None for part in parts[1:]: part = part.strip() # Postnummer-format: 4 cifre + by if re.match(r'^\d{4}\s+[A-ZÆØÅ]', part): zip_city_part = part elif re.search(r'\d', part) and addr_part is None: # Del med tal = adresse if not re.match(r'^DK$', part.strip(), re.IGNORECASE): addr_part = part if name_candidate and (addr_part or zip_city_part): result['name'] = name_candidate if addr_part and zip_city_part: result['address'] = f"{addr_part}, {zip_city_part}" elif addr_part: result['address'] = addr_part elif zip_city_part: result['address'] = zip_city_part # Find CVR i denne linje cvr_m = re.search(r'CVR[^:]*:\s*(\d{8})', line, re.IGNORECASE) if cvr_m: val = cvr_m.group(1) if val != own_cvr and not is_placeholder_cvr(val): result['cvr_number'] = val # Find telefon i denne linje phone_m = re.search(r'(?:Tlf|Tel|Mobil)[.:]?\s*(\+?[\d][\d\s\-]{6,15})', line, re.IGNORECASE) if phone_m: result['phone'] = clean_phone(phone_m.group(1)) # Find email i denne linje email_m = re.search(r'(?:Mail|E-mail|Email)[.:]?\s*([\w.\-+]+@[\w\-]+\.[\w\-]+)', line, re.IGNORECASE) if email_m: dom = email_m.group(1).split('@')[1].lower() if dom not in PLATFORM_DOMAINS and 'bmc' not in dom: result['email'] = email_m.group(1) result['domain'] = dom if result.get('name') or result.get('cvr_number'): break # Første matchende linje er nok return result # ── Hoved-logik ───────────────────────────────────────────────────────── try: email_result = execute_query( "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL", (email_id,) ) if not email_result: raise HTTPException(status_code=404, detail="Email ikke fundet") email = email_result[0] from app.core.config import settings own_cvr = getattr(settings, 'OWN_CVR', '') # Saml tekst fra body + PDF-bilag text_parts = [] if email.get('body_text'): text_parts.append(("body", email['body_text'])) attachments = execute_query( "SELECT * FROM email_attachments WHERE email_id = %s ORDER BY id", (email_id,) ) for att in (attachments or []): file_path = att.get('file_path') if file_path and os.path.exists(file_path): ct = att.get('content_type', '') if 'pdf' in ct or file_path.lower().endswith('.pdf'): try: pdf_text = await ollama_service._extract_text_from_file(file_path) if pdf_text: text_parts.append(("pdf", pdf_text)) except Exception as e: logger.warning(f"⚠️ Kunne ikke læse PDF {file_path}: {e}") # Prioriter PDF-tekst for leverandørinfo (header + footer indeholder firmainfo) # Tag: første 800 tegn (header) + sidste 800 tegn (footer) fra hvert dokument focused_parts = [] for src, txt in text_parts: if len(txt) > 1200: focused_parts.append(f"[{src} header]\n{txt[:800]}") focused_parts.append(f"[{src} footer]\n{txt[-800:]}") else: 
focused_parts.append(f"[{src}]\n{txt}") focused_text = "\n\n".join(focused_parts) combined_text = "\n\n".join(t for _, t in text_parts) # Debug: log de første 500 tegn af hvert dokument så vi kan se hvad PDF'en producerer for src, txt in text_parts: logger.info(f"📄 [{src}] tekstlængde={len(txt)} — første 300 tegn: {repr(txt[:300])}") logger.info(f"📄 [{src}] — sidste 300 tegn: {repr(txt[-300:])}") sender_name = email.get('sender_name') or '' sender_email = email.get('sender_email') or '' # Er afsenderen en faktureringsplatform? (e-conomic etc.) sender_domain = sender_email.split('@')[1].lower() if '@' in sender_email else '' is_platform_sender = sender_domain in PLATFORM_DOMAINS # ── Trin 1: Prøv dedikeret footer-parser på FULD tekst ────────────── # (finder "Firma - Adresse - PostnrBy - CVR" linjer overalt i dokumentet) footer_result = parse_vendor_footer(combined_text, own_cvr) logger.info(f"🏷️ Footer-parser resultat: {footer_result}") # Brug ikke sender_email som leverandør-email når det er en platform vendor_email = footer_result.get('email') if not vendor_email: if not is_platform_sender and sender_email: vendor_email = sender_email else: for em in re.finditer(r'[\w.\-+]+@([\w\-]+\.[\w\-]+(?:\.[\w]{2,6})?)', focused_text): dom = em.group(1).lower() if dom not in PLATFORM_DOMAINS and 'bmc' not in dom: vendor_email = em.group(0) break # ── Trin 2: Generisk regex udtræk (supplerer footer-parser) ────────── suggestion = { "name": footer_result.get('name') or extract_company_name(focused_text, sender_name) or sender_name, "email": vendor_email, "cvr_number": footer_result.get('cvr_number') or extract_cvr(focused_text, own_cvr), "phone": footer_result.get('phone') or extract_phones(focused_text), "address": footer_result.get('address') or extract_address(focused_text), "domain": footer_result.get('domain') or extract_domain(focused_text, sender_email if not is_platform_sender else ''), "source": "regex" } logger.info(f"🔍 Regex udtræk for email {email_id}: {suggestion}") # ── AI udtræk (forbedrer regex-resultat) ──────────────────────────── if focused_text.strip(): try: # Send kun den fokuserede tekst (max 4000 tegn) til AI ai_text = focused_text[:4000] # Pre-filtrer hints inden AI ser dem (undgå at sende placeholders som hints) hint_cvr = suggestion.get('cvr_number') if hint_cvr and is_placeholder_cvr(hint_cvr): hint_cvr = None hint_addr = suggestion.get('address') if hint_addr and any(ph in hint_addr.lower() for ph in PLACEHOLDER_ADDRESSES): hint_addr = None hint_domain = suggestion.get('domain') if hint_domain and is_platform_or_spam(hint_domain): hint_domain = None prompt = f"""Du er en ekspert i at udtrække firmaoplysninger fra danske fakturaer og e-mails. OPGAVE: Find LEVERANDØRENS firmaoplysninger i teksten nedenfor. Leverandøren er AFSENDEREN (sælger/udsteder) - IKKE BMC Networks og IKKE køber/modtager. VIGTIGT: E-mails kan være sendt via faktureringsplatforme som e-conomic, Dinero, Billy osv. I så fald er leverandøren den virksomhed DER EJER fakturaen - IKKE platformen selv. Ignorer alle data tilhørende: e-conomic.com, dinero.dk, billy.dk, uniconta.com RETURNER KUN DETTE JSON - ingen forklaring, ingen markdown: {{ "name": "Firmanavn ApS", "cvr_number": "87654321", "address": "Rigtig Vej 5, 2200 København", "phone": "12345678", "email": "kontakt@firma.dk", "domain": "firma.dk" }} REGLER: - name: Firmanavn med A/S, ApS, IVS osv. 
- IKKE BMC Networks, IKKE e-conomic - cvr_number: Præcis 8 cifre efter "CVR", "CVR-nr", "Moms" eller "DK" - IGNORER {own_cvr}, IGNORER 12345678 (placeholder) - address: Fuld RIGTIG adresse med postnummer og by - IGNORER "Vejnavn 1, 1234 By" (placeholder) - phone: Telefonnummer - foretrukket format: "+45 XXXX XXXX" eller "XXXX XXXX" - email: Kontakt-email til firmaet - IKKE e-conomic.com, IKKE post@e-conomic.com - domain: Hjemmeside-domæne - IKKE e-conomic.com, IKKE faktureringsplatform - Sæt null for felter der IKKE kan findes med sikkerhed KENDTE REGEX-RESULTATER (brug som hjælp - disse er allerede filtrerede): - cvr: {hint_cvr or 'ikke fundet'} - phone: {suggestion.get('phone') or 'ikke fundet'} - address: {hint_addr or 'ikke fundet'} - domain: {hint_domain or 'ikke fundet'} TEKST: {ai_text} JSON:""" ai_result = await ollama_service.extract_from_text(prompt) if ai_result and isinstance(ai_result, dict): improved = False for field in ('name', 'cvr_number', 'address', 'phone', 'email', 'domain'): val = ai_result.get(field) if val and str(val).strip() not in ('null', '', 'N/A', 'None', own_cvr): new_val = str(val).strip() if new_val != str(suggestion.get(field) or ''): suggestion[field] = new_val improved = True if improved: suggestion['source'] = 'ai' logger.info(f"✅ AI vendor suggestion for email {email_id}: {suggestion}") except Exception as e: logger.warning(f"⚠️ AI udtræk fejlede, bruger regex-resultat: {e}") # Rens: fjern platform/spam domæner if suggestion.get('domain') and is_platform_or_spam(suggestion['domain']): suggestion['domain'] = None # Fjern own_cvr og placeholder CVRs if suggestion.get('cvr_number') == own_cvr or ( suggestion.get('cvr_number') and is_placeholder_cvr(suggestion['cvr_number']) ): suggestion['cvr_number'] = None # Fjern placeholder-adresser (e-conomic og lignende skabeloner) addr_lower = (suggestion.get('address') or '').lower().strip() if any(ph in addr_lower for ph in PLACEHOLDER_ADDRESSES): suggestion['address'] = None # Fjern platform email hvis AI satte den alligevel if suggestion.get('email') and '@' in suggestion.get('email', ''): em_domain = suggestion['email'].split('@')[-1].lower() if em_domain in PLATFORM_DOMAINS: suggestion['email'] = None return suggestion except HTTPException: raise except Exception as e: logger.error(f"❌ extract-vendor-suggestion fejlede for email {email_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.delete("/emails/{email_id}") async def delete_email(email_id: int): """Soft delete email""" try: query = """ UPDATE email_messages SET deleted_at = CURRENT_TIMESTAMP WHERE id = %s AND deleted_at IS NULL """ execute_update(query, (email_id,)) logger.info(f"🗑️ Deleted email {email_id}") return {"success": True, "message": "Email deleted"} except Exception as e: logger.error(f"❌ Error deleting email {email_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/emails/{email_id}/reprocess") async def reprocess_email(email_id: int): """Reprocess email (re-classify, run workflows, and apply rules)""" try: # Get email query = "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL" result = execute_query(query, (email_id,)) if not result: raise HTTPException(status_code=404, detail="Email not found") email = result[0] # Re-classify and run full processing pipeline processor = EmailProcessorService() processing_result = await processor.process_single_email(email) # Re-fetch updated email result = execute_query(query, (email_id,)) email = result[0] logger.info(f"🔄 Reprocessed email 
{email_id}: {email['classification']} ({email.get('confidence_score', 0):.2f})") return { "success": True, "message": "Email reprocessed with workflows", "classification": email['classification'], "confidence": email.get('confidence_score', 0), "workflows_executed": processing_result.get('workflows_executed', 0) } except HTTPException: raise except Exception as e: logger.error(f"❌ Error reprocessing email {email_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/emails/process") async def process_emails(): """Manually trigger email processing""" try: processor = EmailProcessorService() stats = await processor.process_inbox() return { "success": True, "message": "Email processing completed", "stats": stats } except Exception as e: logger.error(f"❌ Email processing failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/emails/upload") async def upload_emails(files: List[UploadFile] = File(...)): """ Upload email files (.eml or .msg) via drag-and-drop Supports multiple files at once """ from app.services.email_service import EmailService from app.services.email_processor_service import EmailProcessorService from app.services.email_workflow_service import email_workflow_service from app.services.email_activity_logger import EmailActivityLogger from app.core.config import settings email_service = EmailService() processor = EmailProcessorService() activity_logger = EmailActivityLogger() results = [] max_size = settings.EMAIL_MAX_UPLOAD_SIZE_MB * 1024 * 1024 # Convert MB to bytes logger.info(f"📤 Upload started: {len(files)} file(s)") for file in files: try: logger.info(f"📄 Processing file: {file.filename}") # Validate file type if not file.filename.lower().endswith(('.eml', '.msg')): logger.warning(f"⚠️ Skipped non-email file: {file.filename}") results.append({ "filename": file.filename, "status": "skipped", "message": "Only .eml and .msg files are supported" }) continue # Read file content content = await file.read() logger.info(f"📊 File size: {len(content)} bytes") # Check file size if len(content) > max_size: logger.warning(f"⚠️ File too large: {file.filename}") results.append({ "filename": file.filename, "status": "error", "message": f"File too large (max {settings.EMAIL_MAX_UPLOAD_SIZE_MB}MB)" }) continue # Parse email based on file type if file.filename.lower().endswith('.eml'): logger.info(f"📧 Parsing .eml file: {file.filename}") email_data = email_service.parse_eml_file(content) else: # .msg logger.info(f"📧 Parsing .msg file: {file.filename}") email_data = email_service.parse_msg_file(content) if not email_data: logger.error(f"❌ Failed to parse: {file.filename}") results.append({ "filename": file.filename, "status": "error", "message": "Failed to parse email file" }) continue logger.info(f"✅ Parsed: {email_data.get('subject', 'No Subject')[:50]}") # Save to database email_id = await email_service.save_uploaded_email(email_data) if email_id is None: logger.info(f"⏭️ Duplicate email: {file.filename}") results.append({ "filename": file.filename, "status": "duplicate", "message": "Email already exists in system" }) continue logger.info(f"💾 Saved to database with ID: {email_id}") # Log activity activity_logger.log_fetched( email_id=email_id, source="manual_upload", metadata={"filename": file.filename} ) # Auto-classify classification = None confidence = None try: logger.info(f"🤖 Classifying email {email_id}...") classification, confidence = await processor.classify_email( email_data['subject'], email_data['body_text'] or 
email_data['body_html'] ) logger.info(f"✅ Classified as: {classification} ({confidence:.2f})") # Update classification update_query = """ UPDATE email_messages SET classification = %s, confidence_score = %s, classification_date = CURRENT_TIMESTAMP WHERE id = %s """ execute_update(update_query, (classification, confidence, email_id)) activity_logger.log_classified( email_id=email_id, classification=classification, confidence=confidence, metadata={"method": "auto", "source": "manual_upload"} ) except Exception as e: logger.warning(f"⚠️ Classification failed for uploaded email: {e}") # Execute workflows try: logger.info(f"⚙️ Executing workflows for email {email_id}...") await email_workflow_service.execute_workflows_for_email(email_id) except Exception as e: logger.warning(f"⚠️ Workflow execution failed for uploaded email: {e}") results.append({ "filename": file.filename, "status": "success", "message": "Email imported successfully", "email_id": email_id, "subject": email_data['subject'], "classification": classification, "confidence": confidence, "attachments": len(email_data.get('attachments', [])) }) logger.info(f"✅ Successfully processed: {file.filename} -> Email ID {email_id}") except Exception as e: logger.error(f"❌ Failed to process {file.filename}: {e}", exc_info=True) results.append({ "filename": file.filename, "status": "error", "message": str(e) }) # Summary success_count = len([r for r in results if r["status"] == "success"]) duplicate_count = len([r for r in results if r["status"] == "duplicate"]) error_count = len([r for r in results if r["status"] == "error"]) skipped_count = len([r for r in results if r["status"] == "skipped"]) logger.info(f"📊 Upload summary: {success_count} success, {duplicate_count} duplicates, {error_count} errors, {skipped_count} skipped") return { "uploaded": success_count, "duplicates": duplicate_count, "failed": error_count, "skipped": skipped_count, "results": results } @router.get("/emails/processing/stats") async def get_processing_stats(): """Get email processing statistics""" try: query = """ SELECT COUNT(*) as total_emails, COUNT(*) FILTER (WHERE status = 'new') as new_emails, COUNT(*) FILTER (WHERE status = 'processed') as processed_emails, COUNT(*) FILTER (WHERE status = 'error') as error_emails, COUNT(*) FILTER (WHERE has_attachments = true) as with_attachments, COUNT(*) FILTER (WHERE import_method = 'manual_upload') as manually_uploaded, COUNT(*) FILTER (WHERE import_method = 'imap') as from_imap, COUNT(*) FILTER (WHERE import_method = 'graph_api') as from_graph_api, MAX(received_date) as last_email_received FROM email_messages WHERE deleted_at IS NULL AND received_date >= NOW() - INTERVAL '30 days' """ result = execute_query(query) if result: return result[0] else: return { "total_emails": 0, "new_emails": 0, "processed_emails": 0, "error_emails": 0, "with_attachments": 0, "manually_uploaded": 0, "from_imap": 0, "from_graph_api": 0, "last_email_received": None } except Exception as e: logger.error(f"❌ Error getting processing stats: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/emails/bulk/archive") async def bulk_archive(email_ids: List[int]): """Bulk archive emails""" try: if not email_ids: raise HTTPException(status_code=400, detail="No email IDs provided") placeholders = ','.join(['%s'] * len(email_ids)) query = f""" UPDATE email_messages SET status = 'archived', updated_at = CURRENT_TIMESTAMP WHERE id IN ({placeholders}) AND deleted_at IS NULL """ execute_update(query, tuple(email_ids)) logger.info(f"📦 Bulk 
archived {len(email_ids)} emails") return {"success": True, "message": f"{len(email_ids)} emails archived"} except Exception as e: logger.error(f"❌ Error bulk archiving: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/emails/bulk/reprocess") async def bulk_reprocess(email_ids: List[int]): """Bulk reprocess emails""" try: if not email_ids: raise HTTPException(status_code=400, detail="No email IDs provided") processor = EmailProcessorService() success_count = 0 for email_id in email_ids: try: # Get email query = "SELECT * FROM email_messages WHERE id = %s AND deleted_at IS NULL" result = execute_query(query, (email_id,)) if result: email_data = result[0] # Use central processing logic await processor.process_single_email(email_data) success_count += 1 except Exception as e: logger.error(f"Error reprocessing email {email_id}: {e}") return {"success": True, "count": success_count} except Exception as e: logger.error(f"❌ Error bulk reprocessing: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/emails/bulk/delete") async def bulk_delete(email_ids: List[int]): """Bulk soft delete emails""" try: if not email_ids: raise HTTPException(status_code=400, detail="No email IDs provided") placeholders = ','.join(['%s'] * len(email_ids)) query = f""" UPDATE email_messages SET deleted_at = CURRENT_TIMESTAMP WHERE id IN ({placeholders}) AND deleted_at IS NULL """ execute_update(query, tuple(email_ids)) logger.info(f"🗑️ Bulk deleted {len(email_ids)} emails") return {"success": True, "message": f"{len(email_ids)} emails deleted"} except Exception as e: logger.error(f"❌ Error bulk deleting: {e}") raise HTTPException(status_code=500, detail=str(e)) class ClassificationUpdate(BaseModel): classification: str confidence: Optional[float] = None @router.put("/emails/{email_id}/classify") async def update_classification(email_id: int, data: ClassificationUpdate): """Manually update email classification""" try: valid_classifications = [ 'invoice', 'freight_note', 'order_confirmation', 'time_confirmation', 'case_notification', 'customer_email', 'bankruptcy', 'general', 'spam', 'unknown' ] if data.classification not in valid_classifications: raise HTTPException(status_code=400, detail=f"Invalid classification. 
Must be one of: {valid_classifications}")

        confidence = data.confidence if data.confidence is not None else 1.0

        query = """
            UPDATE email_messages
            SET classification = %s,
                confidence_score = %s,
                classification_date = CURRENT_TIMESTAMP
            WHERE id = %s AND deleted_at IS NULL
        """
        execute_update(query, (data.classification, confidence, email_id))

        logger.info(f"✏️ Manual classification: Email {email_id} → {data.classification}")
        return {
            "success": True,
            "message": f"Email {email_id} classified as '{data.classification}'"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error updating classification: {e}")
        raise HTTPException(status_code=500, detail=str(e))


# Email Rules Endpoints
@router.get("/email-rules", response_model=List[EmailRule])
async def list_rules():
    """Get all email rules"""
    try:
        query = """
            SELECT * FROM email_rules
            ORDER BY priority ASC, name ASC
        """
        result = execute_query(query)
        return result
    except Exception as e:
        logger.error(f"❌ Error listing rules: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/email-rules", response_model=EmailRule)
async def create_rule(rule: EmailRule):
    """Create new email rule"""
    try:
        import json
        query = """
            INSERT INTO email_rules
                (name, description, conditions, action_type, action_params, priority, enabled, created_by_user_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, 1)
            RETURNING *
        """
        result = execute_query(query, (
            rule.name, rule.description, json.dumps(rule.conditions),
            rule.action_type, json.dumps(rule.action_params or {}),
            rule.priority, rule.enabled
        ))
        if result:
            logger.info(f"✅ Created email rule: {rule.name}")
            return result[0]
        else:
            raise HTTPException(status_code=500, detail="Failed to create rule")
    except Exception as e:
        logger.error(f"❌ Error creating rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.put("/email-rules/{rule_id}", response_model=EmailRule)
async def update_rule(rule_id: int, rule: EmailRule):
    """Update existing email rule"""
    try:
        import json
        query = """
            UPDATE email_rules
            SET name = %s, description = %s, conditions = %s, action_type = %s,
                action_params = %s, priority = %s, enabled = %s
            WHERE id = %s
            RETURNING *
        """
        result = execute_query(query, (
            rule.name, rule.description, json.dumps(rule.conditions),
            rule.action_type, json.dumps(rule.action_params or {}),
            rule.priority, rule.enabled, rule_id
        ))
        if result:
            logger.info(f"✅ Updated email rule {rule_id}")
            return result[0]
        else:
            raise HTTPException(status_code=404, detail="Rule not found")
    except Exception as e:
        logger.error(f"❌ Error updating rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.delete("/email-rules/{rule_id}")
async def delete_rule(rule_id: int):
    """Delete email rule"""
    try:
        query = "DELETE FROM email_rules WHERE id = %s"
        execute_update(query, (rule_id,))
        return {
            "success": True,
            "message": f"Rule {rule_id} deleted"
        }
    except Exception as e:
        logger.error(f"❌ Error deleting rule: {e}")
        raise HTTPException(status_code=500, detail=str(e))
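# Illustrative sketch (not from the source): a request body for POST /email-rules.
# The condition keys and action_type/action_params values below are assumptions;
# the accepted set is whatever EmailProcessorService evaluates when matching rules.
#
# {
#   "name": "Invoices from e-conomic",
#   "description": "Route e-conomic invoice mails to the invoice queue",
#   "conditions": {"sender_contains": "e-conomic.com", "subject_contains": "faktura"},
#   "action_type": "classify",
#   "action_params": {"classification": "invoice"},
#   "priority": 10,
#   "enabled": true
# }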
# Statistics Endpoint
@router.get("/emails/stats/summary")
async def get_email_stats():
    """Get email processing statistics"""
    try:
        query = """
            SELECT
                COUNT(*) as total_emails,
                COUNT(CASE WHEN status = 'new' THEN 1 END) as new_emails,
                COUNT(CASE WHEN status = 'processed' THEN 1 END) as processed_emails,
                COUNT(CASE WHEN classification = 'invoice' THEN 1 END) as invoices,
                COUNT(CASE WHEN classification = 'time_confirmation' THEN 1 END) as time_confirmations,
                COUNT(CASE WHEN classification = 'newsletter' THEN 1 END) as newsletters,
                COUNT(CASE WHEN classification = 'spam' THEN 1 END) as spam_emails,
                COUNT(CASE WHEN auto_processed THEN 1 END) as auto_processed,
                AVG(confidence_score) as avg_confidence
            FROM email_messages
            WHERE deleted_at IS NULL
        """
        result = execute_query(query)
        return result[0] if result else {}
    except Exception as e:
        logger.error(f"❌ Error getting stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))


# ========== Workflow Endpoints ==========

@router.get("/workflows", response_model=List[EmailWorkflow])
async def list_workflows():
    """Get all email workflows"""
    try:
        query = """
            SELECT * FROM email_workflows
            ORDER BY priority ASC, name ASC
        """
        result = execute_query(query)
        return result
    except Exception as e:
        logger.error(f"❌ Error listing workflows: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/workflows/{workflow_id}", response_model=EmailWorkflow)
async def get_workflow(workflow_id: int):
    """Get specific workflow by ID"""
    try:
        query = "SELECT * FROM email_workflows WHERE id = %s"
        result = execute_query(query, (workflow_id,))
        if not result:
            raise HTTPException(status_code=404, detail="Workflow not found")
        # execute_query returns a list of rows; the response model expects a single row
        return result[0]
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error getting workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/workflows", response_model=EmailWorkflow)
async def create_workflow(workflow: EmailWorkflow):
    """Create new email workflow"""
    try:
        import json
        query = """
            INSERT INTO email_workflows
                (name, description, classification_trigger, sender_pattern, subject_pattern,
                 confidence_threshold, workflow_steps, priority, enabled, stop_on_match, created_by_user_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 1)
            RETURNING *
        """
        result = execute_query(query, (
            workflow.name, workflow.description, workflow.classification_trigger,
            workflow.sender_pattern, workflow.subject_pattern, workflow.confidence_threshold,
            json.dumps(workflow.workflow_steps), workflow.priority,
            workflow.enabled, workflow.stop_on_match
        ))
        if result:
            logger.info(f"✅ Created workflow: {workflow.name}")
            return result[0]
        else:
            raise HTTPException(status_code=500, detail="Failed to create workflow")
    except Exception as e:
        logger.error(f"❌ Error creating workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.put("/workflows/{workflow_id}", response_model=EmailWorkflow)
async def update_workflow(workflow_id: int, workflow: EmailWorkflow):
    """Update existing email workflow"""
    try:
        import json
        query = """
            UPDATE email_workflows
            SET name = %s, description = %s, classification_trigger = %s,
                sender_pattern = %s, subject_pattern = %s, confidence_threshold = %s,
                workflow_steps = %s, priority = %s, enabled = %s, stop_on_match = %s
            WHERE id = %s
            RETURNING *
        """
        result = execute_query(query, (
            workflow.name, workflow.description, workflow.classification_trigger,
            workflow.sender_pattern, workflow.subject_pattern, workflow.confidence_threshold,
            json.dumps(workflow.workflow_steps), workflow.priority,
            workflow.enabled, workflow.stop_on_match, workflow_id
        ))
        if result:
            logger.info(f"✅ Updated workflow {workflow_id}")
            return result[0]
        else:
            raise HTTPException(status_code=404, detail="Workflow not found")
    except Exception as e:
        logger.error(f"❌ Error updating workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.delete("/workflows/{workflow_id}")
async def delete_workflow(workflow_id: int):
    """Delete email workflow"""
    try:
        query = "DELETE FROM email_workflows WHERE id = %s"
        execute_update(query, (workflow_id,))
        logger.info(f"🗑️ Deleted workflow {workflow_id}")
        return {"success": True, "message": f"Workflow {workflow_id} deleted"}
    except Exception as e:
        logger.error(f"❌ Error deleting workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/workflows/{workflow_id}/toggle")
async def toggle_workflow(workflow_id: int):
    """Toggle workflow enabled status"""
    try:
        query = """
            UPDATE email_workflows
            SET enabled = NOT enabled
            WHERE id = %s
            RETURNING enabled
        """
        result = execute_query(query, (workflow_id,))
        if not result:
            raise HTTPException(status_code=404, detail="Workflow not found")

        # execute_query returns a list of rows; read the toggled flag from the first row
        enabled = result[0]['enabled']
        status = "enabled" if enabled else "disabled"
        logger.info(f"🔄 Workflow {workflow_id} {status}")
        return {
            "success": True,
            "workflow_id": workflow_id,
            "enabled": enabled
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error toggling workflow: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/emails/{email_id}/execute-workflows")
async def execute_workflows_for_email(email_id: int):
    """Manually trigger workflow execution for an email"""
    try:
        # Get email data
        query = """
            SELECT id, message_id, subject, sender_email, sender_name, body_text,
                   classification, confidence_score, status
            FROM email_messages
            WHERE id = %s AND deleted_at IS NULL
        """
        email_result = execute_query(query, (email_id,))
        if not email_result:
            raise HTTPException(status_code=404, detail="Email not found")

        email_data = email_result[0]  # Get first row as dict

        # Execute workflows
        result = await email_workflow_service.execute_workflows(email_data)
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error executing workflows: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/workflow-executions", response_model=List[WorkflowExecution])
async def list_workflow_executions(
    workflow_id: Optional[int] = Query(None),
    email_id: Optional[int] = Query(None),
    status: Optional[str] = Query(None),
    limit: int = Query(50, le=500)
):
    """Get workflow execution history"""
    try:
        where_clauses = []
        params = []

        if workflow_id:
            where_clauses.append("workflow_id = %s")
            params.append(workflow_id)
        if email_id:
            where_clauses.append("email_id = %s")
            params.append(email_id)
        if status:
            where_clauses.append("status = %s")
            params.append(status)

        where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"

        query = f"""
            SELECT * FROM email_workflow_executions
            WHERE {where_sql}
            ORDER BY started_at DESC
            LIMIT %s
        """
        params.append(limit)

        result = execute_query(query, tuple(params))
        return result
    except Exception as e:
        logger.error(f"❌ Error listing workflow executions: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/workflow-actions", response_model=List[WorkflowAction])
async def list_workflow_actions():
    """Get all available workflow actions"""
    try:
        query = """
            SELECT * FROM email_workflow_actions
            WHERE enabled = true
            ORDER BY category, name
        """
        result = execute_query(query)
        return result
    except Exception as e:
        logger.error(f"❌ Error listing workflow actions: {e}")
        raise HTTPException(status_code=500, detail=str(e))
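# Illustrative sketch (not from the source): a request body for POST /workflows.
# The step action codes below are hypothetical placeholders; valid codes are the
# action_code values in email_workflow_actions, exposed via GET /workflow-actions.
#
# {
#   "name": "Invoice intake",
#   "classification_trigger": "invoice",
#   "confidence_threshold": 0.8,
#   "workflow_steps": [
#     {"action": "extract_invoice_data", "params": {}},
#     {"action": "link_supplier", "params": {"match_by": "cvr_number"}}
#   ],
#   "priority": 50,
#   "enabled": true,
#   "stop_on_match": true
# }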
@router.get("/workflows/stats/summary") async def get_workflow_stats(): """Get workflow execution statistics""" try: query = """ SELECT * FROM v_workflow_stats """ result = execute_query(query) return result except Exception as e: logger.error(f"❌ Error getting workflow stats: {e}") raise HTTPException(status_code=500, detail=str(e)) # ========== Email Activity Log Endpoints ========== class EmailActivityLog(BaseModel): id: int email_id: int event_type: str event_category: str description: str metadata: Optional[dict] user_id: Optional[int] user_name: Optional[str] created_at: datetime created_by: str @router.get("/emails/{email_id}/activity", response_model=List[EmailActivityLog]) async def get_email_activity_log(email_id: int, limit: int = Query(default=100, le=500)): """Get complete activity log for an email""" try: query = """ SELECT eal.id, eal.email_id, eal.event_type, eal.event_category, eal.description, eal.metadata, eal.user_id, u.username as user_name, eal.created_at, eal.created_by FROM email_activity_log eal LEFT JOIN users u ON eal.user_id = u.user_id WHERE eal.email_id = %s ORDER BY eal.created_at DESC LIMIT %s """ result = execute_query(query, (email_id, limit)) return result except Exception as e: logger.error(f"❌ Error getting email activity log: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/emails/activity/recent", response_model=List[EmailActivityLog]) async def get_recent_activity( limit: int = Query(default=50, le=200), event_type: Optional[str] = None, event_category: Optional[str] = None ): """Get recent email activity across all emails""" try: conditions = [] params = [] if event_type: conditions.append("eal.event_type = %s") params.append(event_type) if event_category: conditions.append("eal.event_category = %s") params.append(event_category) where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else "" params.append(limit) query = f""" SELECT eal.id, eal.email_id, eal.event_type, eal.event_category, eal.description, eal.metadata, eal.user_id, u.username as user_name, eal.created_at, eal.created_by FROM email_activity_log eal LEFT JOIN users u ON eal.user_id = u.user_id {where_clause} ORDER BY eal.created_at DESC LIMIT %s """ result = execute_query(query, tuple(params)) return result except Exception as e: logger.error(f"❌ Error getting recent activity: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/emails/activity/stats") async def get_activity_stats(): """Get activity statistics""" try: query = """ SELECT event_type, event_category, COUNT(*) as count, MAX(created_at) as last_occurrence FROM email_activity_log WHERE created_at >= NOW() - INTERVAL '7 days' GROUP BY event_type, event_category ORDER BY count DESC """ result = execute_query(query) return result except Exception as e: logger.error(f"❌ Error getting activity stats: {e}") raise HTTPException(status_code=500, detail=str(e))