"""Look up credentials in a Vaultwarden instance over HTTP.

The public entry point is :func:`resolve_vault_credentials`. It tries a list
of candidate item ids first and then a handful of search endpoints, and
returns a plain status dict describing the outcome.
"""

import logging
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import httpx

from app.core.config import settings

logger = logging.getLogger(__name__)

# Fields of a "direct" payload that actually carry a login secret or target.
_SECRET_FIELDS = ("username", "password", "totp", "url")

# Failures that mean "this lookup did not work, try the next one":
# transport/status errors from httpx, and ValueError from decoding a body
# that is not valid JSON (e.g. an HTML page served after a redirect).
_LOOKUP_ERRORS = (httpx.HTTPError, ValueError)


class VaultwardenServiceError(Exception):
    """Error type reserved for Vaultwarden service failures."""


def _is_configured() -> bool:
    """Return True when both the base URL and the API token are non-blank."""
    has_url = bool((settings.VAULTWARDEN_BASE_URL or "").strip())
    has_token = bool((settings.VAULTWARDEN_API_TOKEN or "").strip())
    return has_url and has_token


def _base_url() -> str:
    """Return the configured base URL without surrounding space or trailing slash."""
    return (settings.VAULTWARDEN_BASE_URL or "").strip().rstrip("/")


def _headers() -> Dict[str, str]:
    """Build the request headers carrying the API token.

    The token is sent both as a bearer token and as ``X-API-Token``.
    """
    token = (settings.VAULTWARDEN_API_TOKEN or "").strip()
    # NOTE(review): the client below follows redirects. Confirm that the
    # custom X-API-Token header cannot be forwarded to a different host.
    return {
        "Authorization": f"Bearer {token}",
        "X-API-Token": token,
        "Accept": "application/json",
    }


def _pick(mapping: dict, *keys: str) -> Any:
    """Return the first truthy value found under *keys*, else None."""
    for key in keys:
        value = mapping.get(key)
        if value:
            return value
    return None


def _extract_from_cipher(payload: dict) -> Optional[dict]:
    """Extract credential fields from a cipher-shaped payload.

    Keys are accepted in lower-case and capitalised form (``login`` /
    ``Login`` and so on). Only the first entry of ``uris`` is used for the URL.

    Returns:
        A credential dict, or None when *payload* is not a dict or carries
        none of username, password, totp, url or notes.
    """
    if not isinstance(payload, dict):
        return None

    login = _pick(payload, "login", "Login")
    if not isinstance(login, dict):
        login = {}

    username = _pick(login, "username", "Username")
    password = _pick(login, "password", "Password")
    totp = _pick(login, "totp", "Totp")
    notes = _pick(payload, "notes", "Notes")

    url = None
    uris = _pick(login, "uris", "Uris") or []
    if isinstance(uris, list) and uris:
        first = uris[0] or {}
        if isinstance(first, dict):
            url = _pick(first, "uri", "Uri")

    if not any([username, password, totp, url, notes]):
        return None

    return {
        "item_id": str(_pick(payload, "id", "Id") or "") or None,
        "item_name": _pick(payload, "name", "Name"),
        "username": username,
        "password": password,
        "totp": totp,
        "notes": notes,
        "url": url,
    }


def _extract_from_custom_payload(payload: Any) -> Optional[dict]:
    """Extract the first credential found in an arbitrarily shaped payload.

    Lists are searched element by element. For a dict, the lookup order is:

    1. flat keys (``username``, ``password``, ``totp``/``otp``, ``url``);
    2. a cipher-style ``login`` object, so that a cipher with top-level
       ``id``/``name``/``notes`` still yields its username and password;
    3. flat ``notes``;
    4. a nested ``data`` value (dict or list);
    5. cipher extraction without a ``login`` object (capitalised keys);
    6. as a last resort, a result holding only ``item_id``/``item_name``.

    Returns:
        A credential dict, or None when nothing usable is found.
    """
    if isinstance(payload, list):
        for item in payload:
            found = _extract_from_custom_payload(item)
            if found:
                return found
        return None

    if not isinstance(payload, dict):
        return None

    direct = {
        "item_id": payload.get("item_id") or payload.get("id"),
        "item_name": payload.get("item_name") or payload.get("name"),
        "username": payload.get("username"),
        "password": payload.get("password"),
        "totp": payload.get("totp") or payload.get("otp"),
        "notes": payload.get("notes"),
        "url": payload.get("url"),
    }
    if any(direct[field] for field in _SECRET_FIELDS):
        return direct

    cipher_res = _extract_from_cipher(payload)
    has_login = isinstance(_pick(payload, "login", "Login"), dict)
    if cipher_res and has_login:
        # Without this, the id/name of a cipher would count as a "direct"
        # hit and the credentials under ``login`` would never be read.
        return cipher_res

    if direct["notes"]:
        return direct

    nested = payload.get("data")
    if isinstance(nested, (dict, list)):
        nested_res = _extract_from_custom_payload(nested)
        if nested_res:
            return nested_res

    if cipher_res:
        return cipher_res

    # Backward compatible fallback: id/name only still counts as a hit.
    return direct if any(direct.values()) else None


async def _get_json(client: httpx.AsyncClient, url: str) -> Any:
    """GET *url* and return the decoded JSON body.

    Returns:
        The decoded body, or None for a 404 response or an empty body.

    Raises:
        httpx.HTTPError: on transport failures or any other error status.
        ValueError: when the body is not valid JSON.
    """
    response = await client.get(url)
    if response.status_code == 404:
        return None
    response.raise_for_status()
    if not response.content:
        return None
    return response.json()


def _result(
    status: str,
    message: str,
    checked_item_ids: List[str],
    credential: Optional[dict],
    *,
    configured: bool = True,
) -> dict:
    """Assemble the status dict returned by resolve_vault_credentials."""
    return {
        "status": status,
        "configured": configured,
        "message": message,
        "checked_item_ids": checked_item_ids,
        "credential": credential,
    }


def _dedupe_item_ids(
    preferred_item_id: Optional[str],
    fallback_item_ids: Optional[List[str]],
) -> List[str]:
    """Return stripped, non-empty candidate ids in order, without repeats."""
    candidates: List[str] = []
    seen = set()
    for raw in [preferred_item_id, *(fallback_item_ids or [])]:
        candidate = str(raw or "").strip()
        if not candidate or candidate in seen:
            continue
        seen.add(candidate)
        candidates.append(candidate)
    return candidates


async def resolve_vault_credentials(
    *,
    preferred_item_id: Optional[str],
    fallback_item_ids: List[str],
    search_hint: Optional[str],
) -> dict:
    """Resolve credentials by item id, then by search hint.

    Each candidate id is fetched from ``/api/ciphers/{id}`` in order
    (preferred first, duplicates and blanks dropped). If none yields a
    credential and *search_hint* is non-blank, three search endpoints are
    tried in turn. A failing request is logged and skipped rather than
    aborting the lookup.

    Args:
        preferred_item_id: Item id to try first; may be None or blank.
        fallback_item_ids: Further item ids to try; None is treated as empty.
        search_hint: Free-text or URL hint used for the search endpoints.

    Returns:
        A dict with ``status`` (``"ok"``, ``"not_found"`` or
        ``"unavailable"``), ``configured``, ``message``, ``checked_item_ids``
        and ``credential`` (the extracted dict, or None).
    """
    if not _is_configured():
        return _result(
            "unavailable",
            "Vaultwarden er ikke konfigureret.",
            [],
            None,
            configured=False,
        )

    checked_item_ids: List[str] = []
    candidates = _dedupe_item_ids(preferred_item_id, fallback_item_ids)

    timeout = httpx.Timeout(connect=6.0, read=10.0, write=10.0, pool=6.0)
    async with httpx.AsyncClient(
        timeout=timeout,
        headers=_headers(),
        follow_redirects=True,
    ) as client:
        base = _base_url()

        for item_id in candidates:
            checked_item_ids.append(item_id)
            # safe="" also escapes "/", so an id cannot alter the path.
            url = f"{base}/api/ciphers/{quote(item_id, safe='')}"
            try:
                payload = await _get_json(client, url)
            except _LOOKUP_ERRORS as exc:
                logger.warning(
                    "Vaultwarden item lookup failed for id=%s: %s", item_id, exc
                )
                continue
            extracted = _extract_from_custom_payload(payload)
            if extracted:
                return _result(
                    "ok", "Vault-opslag gennemfoert.", checked_item_ids, extracted
                )

        hint = (search_hint or "").strip()
        if hint:
            encoded_hint = quote(hint)
            search_endpoints = [
                f"{base}/api/links/credentials?search={encoded_hint}",
                f"{base}/api/ciphers?search={encoded_hint}",
                f"{base}/api/ciphers?url={encoded_hint}",
            ]
            for endpoint in search_endpoints:
                try:
                    payload = await _get_json(client, endpoint)
                except _LOOKUP_ERRORS as exc:
                    logger.info(
                        "Vaultwarden search endpoint failed (%s): %s", endpoint, exc
                    )
                    continue
                extracted = _extract_from_custom_payload(payload)
                if extracted:
                    return _result(
                        "ok",
                        "Vault-opslag gennemfoert.",
                        checked_item_ids,
                        extracted,
                    )

    return _result(
        "not_found",
        "Ingen vault credentials fundet for linket.",
        checked_item_ids,
        None,
    )