bmc_hub/app/services/vaultwarden_service.py

186 lines
6.0 KiB
Python
Raw Permalink Normal View History

import logging
from typing import Any, Dict, List, Optional
from urllib.parse import quote
import httpx
from app.core.config import settings
logger = logging.getLogger(__name__)
class VaultwardenServiceError(Exception):
    """Exception type declared by this module for Vaultwarden service failures.

    It adds nothing to ``Exception``; no code visible in this module raises it.
    """
def _is_configured() -> bool:
    """Return True when both the base URL and the API token settings are non-blank.

    A value that is ``None``, empty or whitespace-only counts as missing.
    """
    # Check the URL first; the token is only inspected when a URL is present.
    if not (settings.VAULTWARDEN_BASE_URL or "").strip():
        return False
    return bool((settings.VAULTWARDEN_API_TOKEN or "").strip())
def _base_url() -> str:
    """Return the configured base URL, whitespace-trimmed and without trailing slashes.

    Yields an empty string when the setting is unset or blank.
    """
    configured = settings.VAULTWARDEN_BASE_URL or ""
    trimmed = configured.strip()
    # Trailing slashes are dropped so callers can append "/api/..." directly.
    return trimmed.rstrip("/")
def _headers() -> Dict[str, str]:
    """Build the default request headers from the configured API token.

    The trimmed token is sent twice: as a ``Bearer`` value in ``Authorization``
    and verbatim in ``X-API-Token``. JSON responses are requested via ``Accept``.
    """
    raw_token = settings.VAULTWARDEN_API_TOKEN or ""
    token = raw_token.strip()
    headers: Dict[str, str] = {}
    headers["Authorization"] = f"Bearer {token}"
    headers["X-API-Token"] = token
    headers["Accept"] = "application/json"
    return headers
def _extract_from_cipher(payload: dict) -> Optional[dict]:
if not isinstance(payload, dict):
return None
login = payload.get("login") or payload.get("Login") or {}
if not isinstance(login, dict):
login = {}
username = login.get("username") or login.get("Username")
password = login.get("password") or login.get("Password")
totp = login.get("totp") or login.get("Totp")
uris = login.get("uris") or login.get("Uris") or []
url = None
if isinstance(uris, list) and uris:
first = uris[0] or {}
if isinstance(first, dict):
url = first.get("uri") or first.get("Uri")
if not any([username, password, totp, url, payload.get("notes") or payload.get("Notes")]):
return None
return {
"item_id": str(payload.get("id") or payload.get("Id") or "") or None,
"item_name": payload.get("name") or payload.get("Name"),
"username": username,
"password": password,
"totp": totp,
"notes": payload.get("notes") or payload.get("Notes"),
"url": url,
}
def _extract_from_custom_payload(payload: Any) -> Optional[dict]:
if isinstance(payload, dict):
direct = {
"item_id": payload.get("item_id") or payload.get("id"),
"item_name": payload.get("item_name") or payload.get("name"),
"username": payload.get("username"),
"password": payload.get("password"),
"totp": payload.get("totp") or payload.get("otp"),
"notes": payload.get("notes"),
"url": payload.get("url"),
}
if any(direct.values()):
return direct
nested = payload.get("data")
if isinstance(nested, dict):
nested_res = _extract_from_custom_payload(nested)
if nested_res:
return nested_res
cipher_res = _extract_from_cipher(payload)
if cipher_res:
return cipher_res
if isinstance(payload, list):
for item in payload:
extracted = _extract_from_custom_payload(item)
if extracted:
return extracted
return None
async def _get_json(client: httpx.AsyncClient, url: str) -> Any:
    """GET *url* and return its decoded JSON body, or None when there is none.

    None is returned for a 404 response, for an empty body, and for a body
    that cannot be decoded as JSON (the latter is logged as a warning).

    Args:
        client: The HTTP client used to issue the request.
        url: Absolute URL to fetch.

    Returns:
        The decoded JSON value, or None as described above.

    Raises:
        httpx.HTTPError: Propagated from the request itself or from
            ``raise_for_status()`` for statuses other than 404.
    """
    response = await client.get(url)
    if response.status_code == 404:
        return None
    response.raise_for_status()
    if not response.content:
        return None
    try:
        return response.json()
    except ValueError as exc:
        # JSON decoding errors are ValueError subclasses, not httpx.HTTPError,
        # so callers that only catch httpx errors would otherwise crash on a
        # non-JSON body (e.g. an HTML page). Treat it like "no data".
        logger.warning("Vaultwarden returned a non-JSON body for %s: %s", url, exc)
        return None
def _unique_item_ids(
    preferred_item_id: Optional[str],
    fallback_item_ids: Optional[List[str]],
) -> List[str]:
    """Return stripped, non-blank item ids in lookup order without duplicates."""
    ordered: Dict[str, None] = {}
    for raw_id in [preferred_item_id, *(fallback_item_ids or [])]:
        # str() tolerates ids handed over as non-string values.
        candidate = str(raw_id or "").strip()
        if candidate:
            ordered.setdefault(candidate, None)
    return list(ordered)


def _lookup_result(
    status: str,
    message: str,
    checked_item_ids: List[str],
    credential: Optional[dict],
    *,
    configured: bool = True,
) -> dict:
    """Assemble the response dict returned by ``resolve_vault_credentials``."""
    return {
        "status": status,
        "configured": configured,
        "message": message,
        "checked_item_ids": checked_item_ids,
        "credential": credential,
    }


async def resolve_vault_credentials(
    *,
    preferred_item_id: Optional[str],
    fallback_item_ids: List[str],
    search_hint: Optional[str],
) -> dict:
    """Look up a credential by item id first, then by search hint.

    Item ids are tried in order (preferred id first, then the fallbacks, blank
    and duplicate ids skipped) against ``/api/ciphers/{id}``. If none yields a
    credential and *search_hint* is non-blank, three search endpoints are
    queried in turn. HTTP errors on an individual request are logged and the
    next candidate is tried.

    Args:
        preferred_item_id: Item id to try first; may be None or blank.
        fallback_item_ids: Further item ids to try; None is treated as empty.
        search_hint: Free-text or URL hint for the search endpoints.

    Returns:
        A dict with the keys ``status`` (``"ok"``, ``"not_found"`` or
        ``"unavailable"``), ``configured``, ``message``, ``checked_item_ids``
        (the ids that were requested, in order) and ``credential`` (the
        extracted credential dict, or None).
    """
    if not _is_configured():
        return _lookup_result(
            "unavailable",
            "Vaultwarden er ikke konfigureret.",
            [],
            None,
            configured=False,
        )

    candidates = _unique_item_ids(preferred_item_id, fallback_item_ids)
    checked_item_ids: List[str] = []

    timeout = httpx.Timeout(connect=6.0, read=10.0, write=10.0, pool=6.0)
    async with httpx.AsyncClient(timeout=timeout, headers=_headers(), follow_redirects=True) as client:
        base = _base_url()

        for item_id in candidates:
            checked_item_ids.append(item_id)
            # safe="" also escapes "/", so an id cannot add path segments
            # (e.g. "../") to the request URL.
            encoded_id = quote(item_id, safe="")
            try:
                payload = await _get_json(client, f"{base}/api/ciphers/{encoded_id}")
            except httpx.HTTPError as exc:
                logger.warning("Vaultwarden item lookup failed for id=%s: %s", item_id, exc)
                continue
            extracted = _extract_from_custom_payload(payload)
            if extracted:
                return _lookup_result("ok", "Vault-opslag gennemfoert.", checked_item_ids, extracted)

        hint = (search_hint or "").strip()
        if hint:
            encoded_hint = quote(hint)
            search_endpoints = [
                f"{base}/api/links/credentials?search={encoded_hint}",
                f"{base}/api/ciphers?search={encoded_hint}",
                f"{base}/api/ciphers?url={encoded_hint}",
            ]
            for endpoint in search_endpoints:
                try:
                    payload = await _get_json(client, endpoint)
                except httpx.HTTPError as exc:
                    logger.info("Vaultwarden search endpoint failed (%s): %s", endpoint, exc)
                    continue
                extracted = _extract_from_custom_payload(payload)
                if extracted:
                    return _lookup_result("ok", "Vault-opslag gennemfoert.", checked_item_ids, extracted)

    return _lookup_result(
        "not_found",
        "Ingen vault credentials fundet for linket.",
        checked_item_ids,
        None,
    )