- Introduced a global search button and modal for enhanced user experience.
- Added a new section for displaying email results in the global search modal.
- Implemented functionality to fetch and display emails based on user queries.
- Updated the UI to include a reminders button and improved accessibility features.

fix: Update docker-compose to allow reload configuration
- Changed ENABLE_RELOAD environment variable to default to true for easier development.

chore: Update requirements for new dependencies
- Added brother_ql, pyzbar, and pypdfium2 to requirements for label printing and PDF processing.

feat: Implement Brother label printing service
- Created a new service for printing labels using Brother QL printers.
- Supports direct printing of case hardware labels with customizable layouts.

feat: Add Vaultwarden service for credential management
- Implemented a service to interact with Vaultwarden for secure credential storage and retrieval.

sql: Add migrations for email thread keys and document tokens
- Created migrations to backfill email thread keys and manage document tokens for work orders.
- Introduced new tables and updated existing structures to support token-based linking of scanned documents.

sql: Import links into the database
- Added a script to import a predefined set of links into the database with associated categories.
186 lines
6.0 KiB
Python
186 lines
6.0 KiB
Python
import logging
|
|
from typing import Any, Dict, List, Optional
|
|
from urllib.parse import quote
|
|
|
|
import httpx
|
|
|
|
from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VaultwardenServiceError(Exception):
    """Error type reserved for failures of the Vaultwarden integration.

    Adds nothing to ``Exception``; it exists so callers can catch
    Vaultwarden-specific problems by type.
    """
|
|
|
|
|
|
def _is_configured() -> bool:
    """Tell whether the Vaultwarden integration has the settings it needs.

    Returns:
        True only when both ``VAULTWARDEN_BASE_URL`` and
        ``VAULTWARDEN_API_TOKEN`` hold a non-blank value; unset (``None``)
        and whitespace-only values count as missing.
    """
    base_url = (settings.VAULTWARDEN_BASE_URL or "").strip()
    if not base_url:
        # No point looking at the token when there is no server to talk to.
        return False

    api_token = (settings.VAULTWARDEN_API_TOKEN or "").strip()
    return bool(api_token)
|
|
|
|
|
|
def _base_url() -> str:
    """Return the configured Vaultwarden base URL in normalized form.

    Surrounding whitespace and any trailing slashes are removed so that
    callers can append paths beginning with ``/``. An unset value yields
    an empty string.
    """
    configured = settings.VAULTWARDEN_BASE_URL or ""
    trimmed = configured.strip()
    return trimmed.rstrip("/")
|
|
|
|
|
|
def _headers() -> Dict[str, str]:
    """Build the default request headers for Vaultwarden calls.

    The configured API token (whitespace-trimmed, empty when unset) is sent
    twice: as a ``Bearer`` value in ``Authorization`` and verbatim in
    ``X-API-Token``. JSON responses are requested via ``Accept``.
    """
    api_token = (settings.VAULTWARDEN_API_TOKEN or "").strip()
    header_pairs = [
        ("Authorization", f"Bearer {api_token}"),
        ("X-API-Token", api_token),
        ("Accept", "application/json"),
    ]
    return dict(header_pairs)
|
|
|
|
|
|
def _extract_from_cipher(payload: dict) -> Optional[dict]:
|
|
if not isinstance(payload, dict):
|
|
return None
|
|
|
|
login = payload.get("login") or payload.get("Login") or {}
|
|
if not isinstance(login, dict):
|
|
login = {}
|
|
|
|
username = login.get("username") or login.get("Username")
|
|
password = login.get("password") or login.get("Password")
|
|
totp = login.get("totp") or login.get("Totp")
|
|
|
|
uris = login.get("uris") or login.get("Uris") or []
|
|
url = None
|
|
if isinstance(uris, list) and uris:
|
|
first = uris[0] or {}
|
|
if isinstance(first, dict):
|
|
url = first.get("uri") or first.get("Uri")
|
|
|
|
if not any([username, password, totp, url, payload.get("notes") or payload.get("Notes")]):
|
|
return None
|
|
|
|
return {
|
|
"item_id": str(payload.get("id") or payload.get("Id") or "") or None,
|
|
"item_name": payload.get("name") or payload.get("Name"),
|
|
"username": username,
|
|
"password": password,
|
|
"totp": totp,
|
|
"notes": payload.get("notes") or payload.get("Notes"),
|
|
"url": url,
|
|
}
|
|
|
|
|
|
def _extract_from_custom_payload(payload: Any) -> Optional[dict]:
|
|
if isinstance(payload, dict):
|
|
direct = {
|
|
"item_id": payload.get("item_id") or payload.get("id"),
|
|
"item_name": payload.get("item_name") or payload.get("name"),
|
|
"username": payload.get("username"),
|
|
"password": payload.get("password"),
|
|
"totp": payload.get("totp") or payload.get("otp"),
|
|
"notes": payload.get("notes"),
|
|
"url": payload.get("url"),
|
|
}
|
|
if any(direct.values()):
|
|
return direct
|
|
|
|
nested = payload.get("data")
|
|
if isinstance(nested, dict):
|
|
nested_res = _extract_from_custom_payload(nested)
|
|
if nested_res:
|
|
return nested_res
|
|
|
|
cipher_res = _extract_from_cipher(payload)
|
|
if cipher_res:
|
|
return cipher_res
|
|
|
|
if isinstance(payload, list):
|
|
for item in payload:
|
|
extracted = _extract_from_custom_payload(item)
|
|
if extracted:
|
|
return extracted
|
|
|
|
return None
|
|
|
|
|
|
async def _get_json(client: httpx.AsyncClient, url: str) -> Any:
    """GET ``url`` and return its decoded JSON body, or ``None`` if unusable.

    Args:
        client: Async HTTP client used to issue the request.
        url: Absolute URL to fetch.

    Returns:
        The decoded JSON value, or ``None`` when the server answers 404,
        sends an empty body, or sends a body that is not valid JSON.

    Raises:
        httpx.HTTPError: On transport failures and on any non-404 error
            status (raised by ``raise_for_status``).
    """
    response = await client.get(url)
    if response.status_code == 404:
        # A missing item is an expected outcome, not an error.
        return None
    response.raise_for_status()
    if not response.content:
        return None
    try:
        return response.json()
    except ValueError as exc:
        # Decoding failures are ValueError subclasses, not httpx.HTTPError, so
        # they would slip past callers that only handle HTTP errors. Treat a
        # non-JSON body (e.g. an HTML page) as "no data" instead.
        logger.warning("Vaultwarden returned a non-JSON body for %s: %s", url, exc)
        return None
|
|
|
|
|
|
def _vault_result(
    status: str,
    message: str,
    *,
    configured: bool,
    checked_item_ids: List[str],
    credential: Optional[dict] = None,
) -> dict:
    """Build the response envelope shared by every exit of the resolver."""
    return {
        "status": status,
        "configured": configured,
        "message": message,
        "checked_item_ids": checked_item_ids,
        "credential": credential,
    }


async def resolve_vault_credentials(
    *,
    preferred_item_id: Optional[str],
    fallback_item_ids: List[str],
    search_hint: Optional[str],
) -> dict:
    """Look up a credential in Vaultwarden by item id, then by search hint.

    Item ids are tried first (preferred id, then the fallbacks, with blanks
    and duplicates removed) against ``/api/ciphers/{id}``. If none yields a
    credential and a search hint is given, three search endpoints are queried
    in turn. HTTP failures are logged and the next candidate is tried; they
    never abort the lookup.

    Args:
        preferred_item_id: Item id to try first; may be ``None`` or blank.
        fallback_item_ids: Further item ids to try in order; ``None`` is
            treated as an empty list.
        search_hint: Free-text term passed to the search endpoints; skipped
            when ``None`` or blank.

    Returns:
        A dict with the keys ``status`` (``"unavailable"``, ``"ok"`` or
        ``"not_found"``), ``configured``, ``message``, ``checked_item_ids``
        (the ids actually requested, in order) and ``credential`` (the
        extracted credential dict, or ``None``).
    """
    if not _is_configured():
        return _vault_result(
            "unavailable",
            "Vaultwarden er ikke konfigureret.",
            configured=False,
            checked_item_ids=[],
        )

    # Preferred id first, then fallbacks; strip each, drop blanks, and collapse
    # duplicates while keeping first-seen order.
    stripped_ids = (
        (item_id or "").strip()
        for item_id in [preferred_item_id, *(fallback_item_ids or [])]
    )
    candidates = list(dict.fromkeys(item_id for item_id in stripped_ids if item_id))
    hint = (search_hint or "").strip()
    checked_item_ids: List[str] = []

    def _not_found() -> dict:
        return _vault_result(
            "not_found",
            "Ingen vault credentials fundet for linket.",
            configured=True,
            checked_item_ids=checked_item_ids,
        )

    def _found(credential: dict) -> dict:
        return _vault_result(
            "ok",
            "Vault-opslag gennemfoert.",
            configured=True,
            checked_item_ids=checked_item_ids,
            credential=credential,
        )

    if not candidates and not hint:
        # Nothing to look up, so do not open a connection pool for no requests.
        return _not_found()

    timeout = httpx.Timeout(connect=6.0, read=10.0, write=10.0, pool=6.0)
    # NOTE(review): with follow_redirects=True the client-level headers travel
    # along on redirects; confirm X-API-Token cannot be sent to another host.
    async with httpx.AsyncClient(timeout=timeout, headers=_headers(), follow_redirects=True) as client:
        base = _base_url()

        for item_id in candidates:
            checked_item_ids.append(item_id)
            # safe="" also escapes "/", so an id can never add path segments.
            item_url = f"{base}/api/ciphers/{quote(item_id, safe='')}"
            try:
                payload = await _get_json(client, item_url)
            except httpx.HTTPError as exc:
                logger.warning("Vaultwarden item lookup failed for id=%s: %s", item_id, exc)
                continue
            extracted = _extract_from_custom_payload(payload)
            if extracted:
                return _found(extracted)

        if hint:
            encoded_hint = quote(hint)
            search_endpoints = [
                f"{base}/api/links/credentials?search={encoded_hint}",
                f"{base}/api/ciphers?search={encoded_hint}",
                f"{base}/api/ciphers?url={encoded_hint}",
            ]

            for endpoint in search_endpoints:
                try:
                    payload = await _get_json(client, endpoint)
                except httpx.HTTPError as exc:
                    logger.info("Vaultwarden search endpoint failed (%s): %s", endpoint, exc)
                    continue
                extracted = _extract_from_custom_payload(payload)
                if extracted:
                    return _found(extracted)

    return _not_found()
|