bmc_hub/app/modules/nextcloud/backend/service.py

302 lines
10 KiB
Python
Raw Normal View History

"""
Nextcloud Integration Service
Direct OCS API calls with DB cache and audit logging.
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, Optional
import aiohttp
from app.core.config import settings
from app.core.crypto import decrypt_secret
from app.core.database import execute_query
logger = logging.getLogger(__name__)
class NextcloudService:
def __init__(self) -> None:
self.read_only = settings.NEXTCLOUD_READ_ONLY
self.dry_run = settings.NEXTCLOUD_DRY_RUN
self.timeout = settings.NEXTCLOUD_TIMEOUT_SECONDS
self.cache_ttl = settings.NEXTCLOUD_CACHE_TTL_SECONDS
if self.read_only:
logger.warning("🔒 Nextcloud READ_ONLY MODE ENABLED")
elif self.dry_run:
logger.warning("🏃 Nextcloud DRY_RUN MODE ENABLED")
else:
logger.warning("⚠️ Nextcloud WRITE MODE ACTIVE")
def _get_instance(self, instance_id: int, customer_id: Optional[int] = None) -> Optional[dict]:
query = "SELECT * FROM nextcloud_instances WHERE id = %s AND deleted_at IS NULL"
params = [instance_id]
if customer_id is not None:
query += " AND customer_id = %s"
params.append(customer_id)
result = execute_query(query, tuple(params))
return result[0] if result else None
def _get_auth(self, instance: dict) -> Optional[aiohttp.BasicAuth]:
password = decrypt_secret(instance["password_encrypted"])
if not password:
return None
return aiohttp.BasicAuth(instance["username"], password)
def _cache_get(self, cache_key: str) -> Optional[dict]:
query = "SELECT payload FROM nextcloud_cache WHERE cache_key = %s AND expires_at > NOW()"
result = execute_query(query, (cache_key,))
if result:
return result[0]["payload"]
return None
def _cache_set(self, cache_key: str, payload: dict) -> None:
expires_at = datetime.utcnow() + timedelta(seconds=self.cache_ttl)
query = """
INSERT INTO nextcloud_cache (cache_key, payload, expires_at)
VALUES (%s, %s, %s)
ON CONFLICT (cache_key) DO UPDATE
SET payload = EXCLUDED.payload, expires_at = EXCLUDED.expires_at
"""
execute_query(query, (cache_key, json.dumps(payload), expires_at))
def _audit(
self,
customer_id: int,
instance_id: int,
event_type: str,
request_meta: dict,
response_meta: dict,
actor_user_id: Optional[int] = None,
) -> None:
query = """
INSERT INTO nextcloud_audit_log
(customer_id, instance_id, event_type, request_meta, response_meta, actor_user_id)
VALUES (%s, %s, %s, %s, %s, %s)
"""
execute_query(
query,
(
customer_id,
instance_id,
event_type,
json.dumps(request_meta),
json.dumps(response_meta),
actor_user_id,
),
)
def _check_write_permission(self, operation: str) -> bool:
if self.read_only:
logger.error("🚫 BLOCKED: %s - READ_ONLY mode is enabled", operation)
return False
if self.dry_run:
logger.warning("🏃 DRY_RUN: %s - Operation will not be executed", operation)
return False
logger.warning("⚠️ EXECUTING WRITE OPERATION: %s", operation)
return True
async def _ocs_request(
self,
instance: dict,
endpoint: str,
method: str = "GET",
params: Optional[dict] = None,
data: Optional[dict] = None,
use_cache: bool = True,
) -> dict:
cache_key = None
if use_cache and method.upper() == "GET":
cache_key = f"nextcloud:{instance['id']}:{endpoint}:{json.dumps(params or {}, sort_keys=True)}"
cached = self._cache_get(cache_key)
if cached:
cached["cache_hit"] = True
return cached
auth = self._get_auth(instance)
if not auth:
return {"error": "credentials_invalid"}
base_url = instance["base_url"].rstrip("/")
url = f"{base_url}/{endpoint.lstrip('/')}"
headers = {"OCS-APIRequest": "true", "Accept": "application/json"}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
async with session.request(
method=method.upper(),
url=url,
headers=headers,
auth=auth,
params=params,
data=data,
) as resp:
try:
payload = await resp.json()
except Exception:
payload = {"raw": await resp.text()}
response = {
"status": resp.status,
"payload": payload,
"cache_hit": False,
}
if cache_key and resp.status == 200:
self._cache_set(cache_key, response)
return response
async def get_status(self, instance_id: int, customer_id: Optional[int] = None) -> dict:
instance = self._get_instance(instance_id, customer_id)
if not instance or not instance["is_enabled"]:
return {"status": "offline", "checked_at": datetime.utcnow().isoformat()}
response = await self._ocs_request(
instance,
"/ocs/v2.php/apps/serverinfo/api/v1/info",
method="GET",
use_cache=True,
)
return {
"status": "online" if response.get("status") == 200 else "unknown",
"checked_at": datetime.utcnow().isoformat(),
"raw": response,
}
async def list_groups(self, instance_id: int, customer_id: Optional[int] = None) -> dict:
instance = self._get_instance(instance_id, customer_id)
if not instance or not instance["is_enabled"]:
return {"groups": []}
return await self._ocs_request(
instance,
"/ocs/v1.php/cloud/groups",
method="GET",
use_cache=True,
)
async def list_users(
self,
instance_id: int,
customer_id: Optional[int] = None,
search: Optional[str] = None,
) -> dict:
instance = self._get_instance(instance_id, customer_id)
if not instance or not instance["is_enabled"]:
return {"users": []}
params = {"search": search} if search else None
return await self._ocs_request(
instance,
"/ocs/v1.php/cloud/users",
method="GET",
params=params,
use_cache=False,
)
async def get_user_details(
self,
instance_id: int,
uid: str,
customer_id: Optional[int] = None,
) -> dict:
instance = self._get_instance(instance_id, customer_id)
if not instance or not instance["is_enabled"]:
return {"user": None}
return await self._ocs_request(
instance,
f"/ocs/v1.php/cloud/users/{uid}",
method="GET",
use_cache=False,
)
async def list_users_details(
self,
instance_id: int,
customer_id: Optional[int] = None,
search: Optional[str] = None,
limit: int = 200,
) -> dict:
response = await self.list_users(instance_id, customer_id, search)
users = response.get("payload", {}).get("ocs", {}).get("data", {}).get("users", [])
if not isinstance(users, list):
users = []
users = users[: max(1, min(limit, 500))]
detailed = []
for uid in users:
detail_resp = await self.get_user_details(instance_id, uid, customer_id)
data = detail_resp.get("payload", {}).get("ocs", {}).get("data", {}) if isinstance(detail_resp, dict) else {}
detailed.append({
"uid": uid,
"display_name": data.get("displayname") if isinstance(data, dict) else None,
"email": data.get("email") if isinstance(data, dict) else None,
})
return {"users": detailed}
async def list_public_shares(self, instance_id: int, customer_id: Optional[int] = None) -> dict:
instance = self._get_instance(instance_id, customer_id)
if not instance or not instance["is_enabled"]:
return {"payload": {"ocs": {"data": []}}}
return await self._ocs_request(
instance,
"/ocs/v1.php/apps/files_sharing/api/v1/shares",
method="GET",
params={"share_type": 3},
use_cache=True,
)
async def create_user(self, instance_id: int, customer_id: Optional[int], payload: dict) -> dict:
if not self._check_write_permission("create_nextcloud_user"):
return {"blocked": True, "read_only": self.read_only, "dry_run": self.dry_run}
instance = self._get_instance(instance_id, customer_id)
if not instance or not instance["is_enabled"]:
return {"error": "instance_unavailable"}
return await self._ocs_request(
instance,
"/ocs/v1.php/cloud/users",
method="POST",
data=payload,
use_cache=False,
)
async def reset_password(self, instance_id: int, customer_id: Optional[int], uid: str, password: str) -> dict:
if not self._check_write_permission("reset_nextcloud_password"):
return {"blocked": True, "read_only": self.read_only, "dry_run": self.dry_run}
instance = self._get_instance(instance_id, customer_id)
if not instance or not instance["is_enabled"]:
return {"error": "instance_unavailable"}
return await self._ocs_request(
instance,
f"/ocs/v1.php/cloud/users/{uid}",
method="PUT",
data={"password": password},
use_cache=False,
)
async def disable_user(self, instance_id: int, customer_id: Optional[int], uid: str) -> dict:
if not self._check_write_permission("disable_nextcloud_user"):
return {"blocked": True, "read_only": self.read_only, "dry_run": self.dry_run}
instance = self._get_instance(instance_id, customer_id)
if not instance or not instance["is_enabled"]:
return {"error": "instance_unavailable"}
return await self._ocs_request(
instance,
f"/ocs/v1.php/cloud/users/{uid}/disable",
method="PUT",
use_cache=False,
)