bmc_hub/app/auth/backend/router.py
Christian bc504b9257 feat: Add subscription management functionality and AnyDesk API integration
- Implemented subscription creation, updating, and rendering in script_9.js.
- Added functions for handling subscription line items, product selection, and total calculations.
- Integrated AnyDesk API for session management in test_anydesk.py.
- Created REST client test requests for API endpoints in api.http.
- Developed a script to check ESET machine status and save details in tmp_check_eset_machine.py.
2026-03-30 07:50:15 +02:00

309 lines
9.1 KiB
Python

"""
Auth API Router - Login, Logout, Me, 2FA, user profile and AnyDesk ID endpoints
"""
from fastapi import APIRouter, HTTPException, status, Request, Depends, Response
from pydantic import BaseModel
from typing import Optional
from app.core.auth_service import AuthService
from app.core.config import settings
from app.core.auth_dependencies import get_current_user
from app.core.database import execute_query
import logging
logger = logging.getLogger(__name__)
router = APIRouter()
class LoginRequest(BaseModel):
    # Request body of POST /login.
    username: str
    password: str
    # One-time 2FA code; may be omitted. It is forwarded unchanged to
    # AuthService.authenticate_user, which decides whether it is needed.
    otp_code: Optional[str] = None
class LoginResponse(BaseModel):
    # Response body of POST /login.
    access_token: str  # the JWT that is also set as the "access_token" cookie
    token_type: str = "bearer"
    user: dict  # the user record returned by AuthService.authenticate_user
    # True when the login handler determines the user still has to enrol in 2FA.
    requires_2fa_setup: bool = False
class LogoutRequest(BaseModel):
    # Optional body of POST /logout.
    # JTI of the token to revoke; when absent the logout handler falls back
    # to the "token_jti" entry of the current user.
    token_jti: Optional[str] = None
class TwoFactorCodeRequest(BaseModel):
    # Body shared by POST /2fa/enable and POST /2fa/disable.
    otp_code: str  # one-time code to be verified by AuthService
@router.post("/login", response_model=LoginResponse)
async def login(request: Request, credentials: LoginRequest, response: Response):
    """
    Authenticate a user and issue a JWT access token.

    The token is returned in the response body and is additionally set as an
    HTTP-only ``access_token`` cookie. Any authentication failure is answered
    with HTTP 401.
    """
    # request.client can be None, in which case no IP is passed on.
    client = request.client
    ip_address = client.host if client else None

    user, error_detail = AuthService.authenticate_user(
        username=credentials.username,
        password=credentials.password,
        ip_address=ip_address,
        otp_code=credentials.otp_code,
    )

    # A specific error from the auth service wins over the generic message;
    # a missing user without an error detail gets the generic one.
    if error_detail or not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=error_detail or "Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    is_shadow_admin = user.get('is_shadow_admin', False)
    has_2fa_enabled = user.get("is_2fa_enabled", False)

    access_token = AuthService.create_access_token(
        user_id=user['user_id'],
        username=user['username'],
        is_superadmin=user['is_superadmin'],
        is_shadow_admin=is_shadow_admin,
    )

    # Enrolment is requested only for regular users, when 2FA is neither
    # disabled by configuration nor unsupported, and not yet enabled.
    requires_2fa_setup = (
        not is_shadow_admin
        and not settings.AUTH_DISABLE_2FA
        and AuthService.is_2fa_supported()
        and not has_2fa_enabled
    )

    # Mirror the token into a cookie that scripts cannot read (httponly).
    response.set_cookie(
        key="access_token",
        value=access_token,
        httponly=True,
        samesite=settings.COOKIE_SAMESITE,
        secure=settings.COOKIE_SECURE,
    )

    return LoginResponse(
        access_token=access_token,
        user=user,
        requires_2fa_setup=requires_2fa_setup,
    )
@router.post("/logout")
async def logout(
    response: Response,
    current_user: dict = Depends(get_current_user),
    request: Optional[LogoutRequest] = None
):
    """
    Revoke the JWT (logout) and clear the ``access_token`` cookie.

    The token to revoke is taken from the optional request body; if none is
    supplied, the ``token_jti`` entry of the current user is used instead.
    """
    # Prefer an explicitly supplied JTI, otherwise fall back to the current user's.
    token_jti = request.token_jti if request else None
    if not token_jti:
        token_jti = current_user.get("token_jti")

    # Without any JTI there is nothing to revoke; the cookie is cleared regardless.
    if token_jti:
        AuthService.revoke_token(
            token_jti,
            current_user['id'],
            current_user.get('is_shadow_admin', False),
        )

    response.delete_cookie("access_token")
    return {"message": "Successfully logged out"}
@router.get("/me")
async def get_me(current_user: dict = Depends(get_current_user)):
    """
    Return the basic info of the currently authenticated user.

    ``is_2fa_enabled`` defaults to False when the current-user dict does not
    carry it; every other field must be present.
    """
    mandatory_keys = ("id", "username", "email", "full_name", "is_superadmin")
    info = {key: current_user[key] for key in mandatory_keys}
    info["is_2fa_enabled"] = current_user.get('is_2fa_enabled', False)
    info["permissions"] = current_user['permissions']
    return info
@router.post("/2fa/setup")
async def setup_2fa(current_user: dict = Depends(get_current_user)):
    """
    Generate and store TOTP secret (requires verification to enable).

    Returns whatever ``AuthService.setup_user_2fa`` produces for the current
    user. Shadow admins are rejected with 403. A ``RuntimeError`` whose
    message mentions missing 2FA columns is reported as 400; any other
    ``RuntimeError`` propagates unchanged.
    """
    if current_user.get("is_shadow_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    try:
        result = AuthService.setup_user_2fa(
            user_id=current_user["id"],
            username=current_user["username"],
        )
    except RuntimeError as exc:
        # Only the "columns missing" case is a client-visible 400; everything
        # else is re-raised as-is.
        if "2FA columns missing" not in str(exc):
            raise
        # Chain explicitly so the original RuntimeError is kept as the cause.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="2FA er ikke tilgaengelig i denne database (mangler kolonner).",
        ) from exc

    return result
@router.post("/2fa/enable")
async def enable_2fa(
    request: TwoFactorCodeRequest,
    current_user: dict = Depends(get_current_user)
):
    """
    Enable 2FA for the current user after verifying the supplied code.

    Shadow admins get 403; a falsy result from
    ``AuthService.enable_user_2fa`` is reported as 400.
    """
    if current_user.get("is_shadow_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    enabled = AuthService.enable_user_2fa(
        user_id=current_user["id"],
        otp_code=request.otp_code,
    )
    if enabled:
        return {"message": "2FA enabled"}

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid 2FA code or missing setup",
    )
@router.post("/2fa/disable")
async def disable_2fa(
    request: TwoFactorCodeRequest,
    current_user: dict = Depends(get_current_user)
):
    """
    Disable 2FA for the current user after verifying the supplied code.

    Shadow admins get 403; a falsy result from
    ``AuthService.disable_user_2fa`` is reported as 400.
    """
    if current_user.get("is_shadow_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    disabled = AuthService.disable_user_2fa(
        user_id=current_user["id"],
        otp_code=request.otp_code,
    )
    if disabled:
        return {"message": "2FA disabled"}

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid 2FA code or missing setup",
    )
# ─── User Profile ─────────────────────────────────────────────────────────────
class UserProfileUpdate(BaseModel):
    # Body of PATCH /me/profile. Every field is optional; the handler skips
    # fields left as None and stores blank strings as NULL.
    full_name: Optional[str] = None
    phone: Optional[str] = None
    title: Optional[str] = None
    anydesk_id: Optional[str] = None
@router.get("/me/profile")
async def get_my_profile(current_user: dict = Depends(get_current_user)):
    """
    Return the current user's extended profile fields.

    Reads ``full_name``, ``phone``, ``title`` and ``anydesk_id`` from the
    ``users`` table; answers 404 when no row matches the current user.
    """
    matches = execute_query(
        "SELECT full_name, phone, title, anydesk_id FROM users WHERE user_id = %s",
        (current_user["id"],),
    )
    if matches:
        return dict(matches[0])
    raise HTTPException(status_code=404, detail="User not found")
@router.patch("/me/profile")
async def update_my_profile(
    payload: UserProfileUpdate,
    current_user: dict = Depends(get_current_user)
):
    """
    Update the current user's profile fields.

    Only fields present in the payload (not None) are written. Values are
    stripped, and a blank result is stored as NULL. Answers 400 when the
    payload contains no field to update.
    """
    assignments = []
    params = []

    # Column names come from this fixed tuple, never from user input, so
    # interpolating them into the statement below is safe.
    for column in ("full_name", "phone", "title", "anydesk_id"):
        raw_value = getattr(payload, column)
        if raw_value is None:
            continue
        assignments.append(f"{column} = %s")
        params.append(raw_value.strip() or None)

    if not assignments:
        raise HTTPException(status_code=400, detail="No fields to update")

    assignments.append("updated_at = NOW()")
    params.append(current_user["id"])  # bound to the WHERE clause placeholder

    execute_query(
        f"UPDATE users SET {', '.join(assignments)} WHERE user_id = %s",
        tuple(params),
    )
    return {"message": "Profil opdateret"}
# ─── User AnyDesk IDs (multiple per technician) ───────────────────────────────
class AnyDeskIdAdd(BaseModel):
    # Body of POST /me/anydesk-ids.
    anydesk_id: str  # the handler rejects it when blank after stripping
    label: Optional[str] = None  # optional free-text label stored with the ID
@router.get("/me/anydesk-ids")
async def get_my_anydesk_ids(current_user: dict = Depends(get_current_user)):
    """
    List the AnyDesk IDs registered for the current user, oldest first.

    Returns ``{"ids": [...]}``; the list is empty when the query yields nothing.
    """
    # A falsy query result is treated as "no rows".
    entries = execute_query(
        "SELECT id, anydesk_id, label, created_at FROM user_anydesk_ids WHERE user_id = %s ORDER BY created_at",
        (current_user["id"],),
    ) or []
    return {"ids": list(map(dict, entries))}
@router.post("/me/anydesk-ids", status_code=201)
async def add_my_anydesk_id(payload: AnyDeskIdAdd, current_user: dict = Depends(get_current_user)):
    """
    Register an additional AnyDesk ID for the current user.

    The ID is stripped and must not be blank (400 otherwise). The label is
    stripped too, and a blank label is stored as NULL. A failing insert is
    reported as 409.
    """
    ad_id = payload.anydesk_id.strip()
    if not ad_id:
        raise HTTPException(status_code=400, detail="anydesk_id cannot be empty")

    # Normalise the label the same way as the ID so a whitespace-only label
    # does not end up stored as a "real" value.
    label = (payload.label or "").strip() or None

    try:
        execute_query(
            "INSERT INTO user_anydesk_ids (user_id, anydesk_id, label) VALUES (%s, %s, %s)",
            (current_user["id"], ad_id, label),
        )
    except Exception as exc:
        # NOTE(review): the catch stays broad because the database driver's
        # exception types are not available in this module; presumably the
        # expected failure is a uniqueness violation. Log it so that other
        # database errors reported as 409 remain diagnosable.
        logger.warning(
            "Failed to add AnyDesk ID for user %s",
            current_user["id"],
            exc_info=True,
        )
        raise HTTPException(status_code=409, detail="AnyDesk ID allerede tilføjet") from exc

    return {"message": "Tilføjet"}
@router.delete("/me/anydesk-ids/{entry_id}")
async def delete_my_anydesk_id(entry_id: int, current_user: dict = Depends(get_current_user)):
    """
    Delete one of the current user's AnyDesk ID entries.

    The delete is restricted to rows owned by the current user; when nothing
    was deleted the endpoint answers 404.
    """
    # RETURNING id lets the result tell us whether a row was actually removed.
    deleted = execute_query(
        "DELETE FROM user_anydesk_ids WHERE id = %s AND user_id = %s RETURNING id",
        (entry_id, current_user["id"]),
    )
    if deleted:
        return {"message": "Slettet"}
    raise HTTPException(status_code=404, detail="Ikke fundet")