bmc_hub/app/modules/nextcloud/backend/router.py
Christian 693ac4cfd6 feat: Add case types to settings if not found
feat: Update frontend navigation and links for support and CRM sections

fix: Modify subscription listing and stats endpoints to support 'all' status

feat: Implement subscription status filter in the subscriptions list view

feat: Redirect ticket routes to the new sag path

feat: Integrate devportal routes into the main application

feat: Create a wizard for location creation with nested floors and rooms

feat: Add product suppliers table to track multiple suppliers per product

feat: Implement product audit log to track changes in products

feat: Extend location types to include kantine and moedelokale

feat: Add last_2fa_at column to users table for 2FA grace period tracking
2026-02-09 15:30:07 +01:00

319 lines
10 KiB
Python

"""
Nextcloud Module - API Router
"""
import json
import logging
import secrets
from typing import List, Optional
from fastapi import APIRouter, HTTPException, Query
from app.core.crypto import encrypt_secret
from app.core.database import execute_query
from app.modules.nextcloud.backend.service import NextcloudService
from app.modules.nextcloud.models.schemas import (
NextcloudInstanceCreate,
NextcloudInstanceUpdate,
NextcloudUserCreate,
NextcloudPasswordReset,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router on which the endpoint handlers in this module are registered.
router = APIRouter()
# Single NextcloudService instance shared by the endpoint handlers.
service = NextcloudService()
def _audit(customer_id: int, instance_id: int, event_type: str, request_meta: dict, response_meta: dict):
    """Insert one row into the ``nextcloud_audit_log`` table.

    Args:
        customer_id: Customer the event is recorded for.
        instance_id: Nextcloud instance the event relates to.
        event_type: Short label naming the audited action.
        request_meta: Request details; stored as a JSON string.
        response_meta: Response details; stored as a JSON string.
    """
    statement = """
INSERT INTO nextcloud_audit_log
(customer_id, instance_id, event_type, request_meta, response_meta)
VALUES (%s, %s, %s, %s, %s)
"""
    # Serialize both metadata payloads (request first) before binding them.
    request_json = json.dumps(request_meta)
    response_json = json.dumps(response_meta)
    row = (customer_id, instance_id, event_type, request_json, response_json)
    execute_query(statement, row)
@router.get("/instances")
async def list_instances(customer_id: Optional[int] = Query(None)):
    """List Nextcloud instances whose ``deleted_at`` is NULL.

    Args:
        customer_id: When given, restricts the result to that customer's rows.

    Returns:
        Whatever ``execute_query`` returns, or an empty list if that is falsy.
    """
    # NOTE(review): SELECT * presumably includes password_encrypted in the
    # response - confirm that exposing it through this endpoint is intended.
    base_query = "SELECT * FROM nextcloud_instances WHERE deleted_at IS NULL"
    if customer_id is None:
        rows = execute_query(base_query, ())
    else:
        rows = execute_query(base_query + " AND customer_id = %s", (customer_id,))
    return rows or []
@router.get("/customers/{customer_id}/instance")
async def get_instance_for_customer(customer_id: int):
    """Fetch the first non-deleted Nextcloud instance row for a customer.

    Args:
        customer_id: Customer whose instance is looked up.

    Returns:
        The first matching row, or ``None`` when the query yields nothing.
    """
    rows = execute_query(
        "SELECT * FROM nextcloud_instances WHERE customer_id = %s AND deleted_at IS NULL",
        (customer_id,),
    )
    return rows[0] if rows else None
@router.post("/instances")
async def create_instance(payload: NextcloudInstanceCreate):
    """Create a Nextcloud instance row for a customer.

    The supplied password is passed through ``encrypt_secret`` before it is
    stored in the ``password_encrypted`` column.

    Args:
        payload: Instance data (customer_id, base_url, auth_type, username,
            password).

    Returns:
        The inserted row (``RETURNING *``), or ``None`` if the query returned
        nothing.

    Raises:
        HTTPException: 500 if encryption or the insert fails; the original
            exception is attached as ``__cause__``.
    """
    try:
        password_encrypted = encrypt_secret(payload.password)
        query = """
INSERT INTO nextcloud_instances
(customer_id, base_url, auth_type, username, password_encrypted)
VALUES (%s, %s, %s, %s, %s)
RETURNING *
"""
        result = execute_query(
            query,
            (
                payload.customer_id,
                payload.base_url,
                payload.auth_type,
                payload.username,
                password_encrypted,
            ),
        )
        return result[0] if result else None
    except Exception as exc:
        # logger.exception records the traceback, which a plain error() call
        # drops; the client still only sees the generic 500 detail below.
        logger.exception("❌ Failed to create Nextcloud instance: %s", exc)
        # Chain the cause so the root error stays visible to server-side tooling.
        raise HTTPException(status_code=500, detail="Failed to create instance") from exc
@router.patch("/instances/{instance_id}")
async def update_instance(instance_id: int, payload: NextcloudInstanceUpdate):
    """Partially update a Nextcloud instance.

    Only payload fields that are not ``None`` are written; a new password is
    passed through ``encrypt_secret`` first. ``updated_at`` is always set to
    ``NOW()`` when at least one field changes.

    Args:
        instance_id: Primary key of the instance to update.
        payload: Fields to change; ``None`` means "leave untouched".

    Returns:
        The updated row.

    Raises:
        HTTPException: 400 when no field was supplied, 404 when no row with
            this id was updated.
    """
    # (SET fragment, bound value) pairs, collected in a fixed column order.
    changes = [
        (fragment, value)
        for fragment, value in (
            ("base_url = %s", payload.base_url),
            ("auth_type = %s", payload.auth_type),
            ("username = %s", payload.username),
        )
        if value is not None
    ]
    if payload.password is not None:
        changes.append(("password_encrypted = %s", encrypt_secret(payload.password)))
    if payload.is_enabled is not None:
        changes.append(("is_enabled = %s", payload.is_enabled))

    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    updates = [fragment for fragment, _ in changes]
    values = [value for _, value in changes]
    updates.append("updated_at = NOW()")
    # The instance id binds to the trailing WHERE placeholder.
    values.append(instance_id)

    query = f"UPDATE nextcloud_instances SET {', '.join(updates)} WHERE id = %s RETURNING *"
    rows = execute_query(query, tuple(values))
    if rows:
        return rows[0]
    raise HTTPException(status_code=404, detail="Instance not found")
@router.post("/instances/{instance_id}/disable")
async def disable_instance(instance_id: int):
    """Mark an instance as disabled and stamp ``disabled_at`` with ``NOW()``.

    Args:
        instance_id: Primary key of the instance to disable.

    Returns:
        The updated row.

    Raises:
        HTTPException: 404 when no row with this id was updated.
    """
    statement = """
UPDATE nextcloud_instances
SET is_enabled = false, disabled_at = NOW(), updated_at = NOW()
WHERE id = %s
RETURNING *
"""
    rows = execute_query(statement, (instance_id,))
    if rows:
        return rows[0]
    raise HTTPException(status_code=404, detail="Instance not found")
@router.post("/instances/{instance_id}/enable")
async def enable_instance(instance_id: int):
    """Mark an instance as enabled and clear its ``disabled_at`` timestamp.

    Args:
        instance_id: Primary key of the instance to enable.

    Returns:
        The updated row.

    Raises:
        HTTPException: 404 when no row with this id was updated.
    """
    statement = """
UPDATE nextcloud_instances
SET is_enabled = true, disabled_at = NULL, updated_at = NOW()
WHERE id = %s
RETURNING *
"""
    rows = execute_query(statement, (instance_id,))
    if rows:
        return rows[0]
    raise HTTPException(status_code=404, detail="Instance not found")
@router.post("/instances/{instance_id}/rotate-credentials")
async def rotate_credentials(instance_id: int, payload: NextcloudInstanceUpdate):
    """Replace the stored (encrypted) password of an instance.

    Only ``payload.password`` is used; other payload fields are ignored here.

    Args:
        instance_id: Primary key of the instance.
        payload: Update payload carrying the new password.

    Returns:
        The updated row.

    Raises:
        HTTPException: 400 when the password is missing or empty, 404 when no
            row with this id was updated.
    """
    new_password = payload.password
    if not new_password:
        raise HTTPException(status_code=400, detail="Password is required")

    statement = """
UPDATE nextcloud_instances
SET password_encrypted = %s, updated_at = NOW()
WHERE id = %s
RETURNING *
"""
    rows = execute_query(statement, (encrypt_secret(new_password), instance_id))
    if rows:
        return rows[0]
    raise HTTPException(status_code=404, detail="Instance not found")
@router.get("/instances/{instance_id}/status")
async def get_status(instance_id: int, customer_id: Optional[int] = Query(None)):
    """Return the result of ``service.get_status`` for an instance.

    Args:
        instance_id: Instance to query.
        customer_id: Optional customer id; when present the call is also
            written to the audit log.

    Returns:
        The service response, unchanged.
    """
    status = await service.get_status(instance_id, customer_id)
    if customer_id is None:
        # Without a customer there is nothing to attribute the audit row to.
        return status
    _audit(customer_id, instance_id, "status", {"instance_id": instance_id}, status)
    return status
@router.get("/instances/{instance_id}/groups")
async def list_groups(instance_id: int, customer_id: Optional[int] = Query(None)):
    """Return the result of ``service.list_groups`` for an instance.

    Args:
        instance_id: Instance to query.
        customer_id: Optional customer id; when present the call is also
            written to the audit log.

    Returns:
        The service response, unchanged.
    """
    groups = await service.list_groups(instance_id, customer_id)
    if customer_id is None:
        return groups
    _audit(customer_id, instance_id, "groups", {"instance_id": instance_id}, groups)
    return groups
@router.get("/instances/{instance_id}/users")
async def list_users(
    instance_id: int,
    customer_id: Optional[int] = Query(None),
    search: Optional[str] = Query(None),
    include_details: bool = Query(False),
    limit: int = Query(200, ge=1, le=500),
):
    """List users of an instance, with or without per-user details.

    Args:
        instance_id: Instance to query.
        customer_id: Optional customer id; when present the call is also
            written to the audit log.
        search: Optional search term forwarded to the service.
        include_details: Selects ``service.list_users_details`` instead of
            ``service.list_users``.
        limit: Forwarded only to the detailed listing (1-500, default 200).

    Returns:
        The service response, unchanged.
    """
    # Only the selected service call is created and awaited.
    pending = (
        service.list_users_details(instance_id, customer_id, search, limit)
        if include_details
        else service.list_users(instance_id, customer_id, search)
    )
    users = await pending

    if customer_id is None:
        return users

    request_meta = {
        "instance_id": instance_id,
        "search": search,
        "include_details": include_details,
        "limit": limit,
    }
    _audit(customer_id, instance_id, "users", request_meta, users)
    return users
@router.get("/instances/{instance_id}/users/{uid}")
async def get_user_details(
    instance_id: int,
    uid: str,
    customer_id: Optional[int] = Query(None),
):
    """Return the result of ``service.get_user_details`` for one user.

    Args:
        instance_id: Instance to query.
        uid: User id, taken from the URL path.
        customer_id: Optional customer id; when present the call is also
            written to the audit log.

    Returns:
        The service response, unchanged.
    """
    details = await service.get_user_details(instance_id, uid, customer_id)
    if customer_id is None:
        return details
    request_meta = {"instance_id": instance_id, "uid": uid}
    _audit(customer_id, instance_id, "user_details", request_meta, details)
    return details
@router.get("/instances/{instance_id}/shares")
async def list_shares(instance_id: int, customer_id: Optional[int] = Query(None)):
    """Return the result of ``service.list_public_shares`` for an instance.

    Args:
        instance_id: Instance to query.
        customer_id: Optional customer id; when present the call is also
            written to the audit log.

    Returns:
        The service response, unchanged.
    """
    shares = await service.list_public_shares(instance_id, customer_id)
    if customer_id is None:
        return shares
    _audit(customer_id, instance_id, "shares", {"instance_id": instance_id}, shares)
    return shares
@router.post("/instances/{instance_id}/users")
async def create_user(instance_id: int, payload: NextcloudUserCreate, customer_id: Optional[int] = Query(None)):
    """Create a user on an instance with a freshly generated password.

    Args:
        instance_id: Instance the user is created on.
        payload: New user data (uid, email, display_name, groups,
            send_welcome).
        customer_id: Optional customer id; when present the call is also
            written to the audit log (only the uid is logged as request data).

    Returns:
        A dict with the service response under ``result`` and, only when
        ``payload.send_welcome`` is truthy, the generated password under
        ``generated_password`` (otherwise ``None``).
    """
    generated = secrets.token_urlsafe(12)
    outcome = await service.create_user(
        instance_id,
        customer_id,
        {
            "userid": payload.uid,
            "password": generated,
            "email": payload.email,
            "displayName": payload.display_name,
            "groups[]": payload.groups,
        },
    )
    if customer_id is not None:
        # The password is deliberately absent from the audited request data.
        _audit(customer_id, instance_id, "create_user", {"uid": payload.uid}, outcome)

    if payload.send_welcome:
        revealed = generated
    else:
        revealed = None
    return {"result": outcome, "generated_password": revealed}
@router.post("/instances/{instance_id}/users/{uid}/reset-password")
async def reset_password(
    instance_id: int,
    uid: str,
    payload: NextcloudPasswordReset,
    customer_id: Optional[int] = Query(None),
):
    """Reset a user's password to a freshly generated one.

    Args:
        instance_id: Instance the user lives on.
        uid: User id, taken from the URL path.
        payload: Reset options; only ``send_email`` is read here.
        customer_id: Optional customer id; when present the call is also
            written to the audit log (only the uid is logged as request data).

    Returns:
        A dict with the service response under ``result`` and, only when
        ``payload.send_email`` is truthy, the generated password under
        ``generated_password`` (otherwise ``None``).
    """
    generated = secrets.token_urlsafe(12)
    outcome = await service.reset_password(instance_id, customer_id, uid, generated)
    if customer_id is not None:
        _audit(customer_id, instance_id, "reset_password", {"uid": uid}, outcome)

    if payload.send_email:
        revealed = generated
    else:
        revealed = None
    return {"result": outcome, "generated_password": revealed}
@router.post("/instances/{instance_id}/users/{uid}/disable")
async def disable_user(instance_id: int, uid: str, customer_id: Optional[int] = Query(None)):
    """Return the result of ``service.disable_user`` for one user.

    Args:
        instance_id: Instance the user lives on.
        uid: User id, taken from the URL path.
        customer_id: Optional customer id; when present the call is also
            written to the audit log.

    Returns:
        The service response, unchanged.
    """
    outcome = await service.disable_user(instance_id, customer_id, uid)
    if customer_id is None:
        return outcome
    _audit(customer_id, instance_id, "disable_user", {"uid": uid}, outcome)
    return outcome
@router.post("/instances/{instance_id}/users/{uid}/resend-guide")
async def resend_guide(instance_id: int, uid: str, customer_id: Optional[int] = Query(None)):
    """Acknowledge a "resend guide" request for a user.

    This handler only builds a static ``queued`` acknowledgement and, when a
    customer id is given, writes an audit row; no sending or queueing is
    performed in this function itself.

    Args:
        instance_id: Instance the user lives on (used for the audit row only).
        uid: User id, taken from the URL path.
        customer_id: Optional customer id; when present the call is audited.

    Returns:
        ``{"status": "queued", "uid": uid}``.
    """
    acknowledgement = {"status": "queued", "uid": uid}
    if customer_id is None:
        return acknowledgement
    _audit(customer_id, instance_id, "resend_guide", {"uid": uid}, acknowledgement)
    return acknowledgement
@router.get("/audit")
async def list_audit(
    customer_id: int = Query(...),
    instance_id: Optional[int] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """Page through a customer's audit log, newest entries first.

    Args:
        customer_id: Required customer whose log is listed.
        instance_id: Optional filter on a single instance.
        limit: Page size (1-1000, default 100).
        offset: Number of rows to skip (>= 0).

    Returns:
        Whatever ``execute_query`` returns, or an empty list if that is falsy.
    """
    # The statement is assembled from fragments; values are bound in the same
    # order as their placeholders appear.
    fragments = [
        """
SELECT * FROM nextcloud_audit_log
WHERE customer_id = %s
"""
    ]
    bound: List[object] = [customer_id]

    if instance_id is not None:
        fragments.append(" AND instance_id = %s")
        bound.append(instance_id)

    fragments.append(" ORDER BY created_at DESC LIMIT %s OFFSET %s")
    bound += [limit, offset]

    rows = execute_query("".join(fragments), tuple(bound))
    return rows or []
@router.post("/audit/purge")
async def purge_audit(data: dict):
    """Delete a customer's audit rows created before a given date.

    Args:
        data: Request body; must contain truthy ``customer_id`` and
            ``before_date`` entries.

    Returns:
        ``{"deleted": <result of execute_query>}``.

    Raises:
        HTTPException: 400 when either required entry is missing or falsy.
    """
    customer_id = data.get("customer_id")
    before_date = data.get("before_date")
    if not (customer_id and before_date):
        raise HTTPException(status_code=400, detail="customer_id and before_date are required")

    statement = """
DELETE FROM nextcloud_audit_log
WHERE customer_id = %s AND created_at < %s
"""
    # NOTE(review): what execute_query returns for a DELETE without RETURNING
    # is not visible here - confirm "deleted" carries a meaningful value.
    outcome = execute_query(statement, (customer_id, before_date))
    return {"deleted": outcome}