bmc_hub/app/auth/backend/router.py
Christian 56d6d45aa2 feat(sag): Add Varekøb & Salg module with database migration and frontend template
- Created a new SQL migration for the sag_salgsvarer table to manage sales and purchase items.
- Implemented a new HTML template for the Varekøb & Salg module, including summary cards and tables for sales and purchases.
- Added JavaScript functions for loading and rendering order data dynamically.
- Introduced a new backend search module for customers, contacts, hardware, and locations with autocomplete functionality.
- Developed an email templates API for managing system and customer-specific email templates.
- Created multiple migrations for Nextcloud instances, cache, audit logs, email templates, sag comments, hardware locations, and billing methods.
- Enhanced the sag module with solutions, order lines, work types, and 2FA support for user authentication.
2026-02-02 20:23:56 +01:00

190 lines
4.9 KiB
Python

"""
Auth API Router - Login, Logout, Me, and 2FA (setup/enable/disable) endpoints
"""
from fastapi import APIRouter, HTTPException, status, Request, Depends, Response
from pydantic import BaseModel
from typing import Optional
from app.core.auth_service import AuthService
from app.core.auth_dependencies import get_current_user
import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router instance on which the endpoint decorators in this module register their routes.
router = APIRouter()
class LoginRequest(BaseModel):
    """Request body accepted by the login endpoint."""

    # Account name and password submitted by the client.
    username: str
    password: str
    # One-time code; optional, defaults to None when the client omits it.
    otp_code: Optional[str] = None
class LoginResponse(BaseModel):
    """Response body returned by the login endpoint."""

    # Token string issued for the authenticated user.
    access_token: str
    # Token scheme label; defaults to "bearer".
    token_type: str = "bearer"
    # User record handed back to the client as a plain dict.
    user: dict
class LogoutRequest(BaseModel):
    """Request body accepted by the logout endpoint."""

    # Identifier of the token to revoke (passed on to AuthService.revoke_token).
    token_jti: str
class TwoFactorCodeRequest(BaseModel):
    """Request body carrying a one-time code for the 2FA enable/disable endpoints."""

    # One-time code supplied by the user; required.
    otp_code: str
@router.post("/login", response_model=LoginResponse)
async def login(request: Request, credentials: LoginRequest, response: Response):
    """
    Authenticate user and return JWT token

    The submitted username, password and optional OTP code are passed, together
    with the caller's IP address, to ``AuthService.authenticate_user``. On
    success an access token is created, stored in the HTTP-only
    ``access_token`` cookie and also returned in the response body alongside
    the user record.

    Args:
        request: Incoming request; used for the client IP and the URL scheme.
        credentials: Parsed login payload.
        response: Outgoing response on which the cookie is set.

    Returns:
        LoginResponse with the access token and the user dict.

    Raises:
        HTTPException: 401 with the detail reported by the auth service when it
            returns an error, or 401 with a generic message when no user is
            returned.
    """
    # request.client may be absent, in which case no IP is forwarded.
    ip_address = request.client.host if request.client else None

    # Authenticate user
    user, error_detail = AuthService.authenticate_user(
        username=credentials.username,
        password=credentials.password,
        ip_address=ip_address,
        otp_code=credentials.otp_code,
    )

    # A service-provided error takes precedence; otherwise a missing user
    # yields the generic message.
    if error_detail or not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=error_detail or "Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Create access token
    access_token = AuthService.create_access_token(
        user_id=user['user_id'],
        username=user['username'],
        is_superadmin=user['is_superadmin'],
        is_shadow_admin=user.get('is_shadow_admin', False),
    )

    response.set_cookie(
        key="access_token",
        value=access_token,
        httponly=True,
        samesite="lax",
        # Mark the cookie Secure whenever the request arrived over HTTPS so the
        # token is not sent back over plaintext HTTP. Plain-HTTP deployments
        # keep working because the flag stays off for them.
        secure=request.url.scheme == "https",
    )

    return LoginResponse(
        access_token=access_token,
        user=user,
    )
@router.post("/logout")
async def logout(
    request: LogoutRequest,
    response: Response,
    current_user: dict = Depends(get_current_user),
):
    """
    Revoke JWT token (logout)

    Hands the submitted token identifier, the current user's id and the
    shadow-admin flag to ``AuthService.revoke_token``, then removes the
    ``access_token`` cookie from the response.
    """
    jti = request.token_jti
    user_id = current_user['id']
    # Flag is treated as False when the key is missing from the user dict.
    shadow_admin = current_user.get('is_shadow_admin', False)

    AuthService.revoke_token(jti, user_id, shadow_admin)
    response.delete_cookie("access_token")

    return {"message": "Successfully logged out"}
@router.get("/me")
async def get_me(current_user: dict = Depends(get_current_user)):
    """
    Get current authenticated user info

    Returns a fixed subset of the resolved user dict. ``is_2fa_enabled``
    falls back to False when absent; every other field is required.
    """
    required_fields = ("id", "username", "email", "full_name", "is_superadmin")

    # Built in the same key order as the response is expected to expose.
    profile = {field: current_user[field] for field in required_fields}
    profile["is_2fa_enabled"] = current_user.get('is_2fa_enabled', False)
    profile["permissions"] = current_user['permissions']
    return profile
@router.post("/2fa/setup")
async def setup_2fa(current_user: dict = Depends(get_current_user)):
    """Generate and store TOTP secret (requires verification to enable)

    Delegates to ``AuthService.setup_user_2fa`` and returns its result
    unchanged. Users flagged as shadow admin are rejected with 403.
    """
    is_shadow = current_user.get("is_shadow_admin")
    if is_shadow:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    return AuthService.setup_user_2fa(
        user_id=current_user["id"],
        username=current_user["username"],
    )
@router.post("/2fa/enable")
async def enable_2fa(
    request: TwoFactorCodeRequest,
    current_user: dict = Depends(get_current_user),
):
    """Enable 2FA after verifying the provided code

    Rejects shadow-admin users with 403. Otherwise the code is passed to
    ``AuthService.enable_user_2fa``; a falsy result is reported as 400.
    """
    if current_user.get("is_shadow_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    enabled = AuthService.enable_user_2fa(
        user_id=current_user["id"],
        otp_code=request.otp_code,
    )
    if enabled:
        return {"message": "2FA enabled"}

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid 2FA code or missing setup",
    )
@router.post("/2fa/disable")
async def disable_2fa(
    request: TwoFactorCodeRequest,
    current_user: dict = Depends(get_current_user),
):
    """Disable 2FA after verifying the provided code

    Rejects shadow-admin users with 403. Otherwise the code is passed to
    ``AuthService.disable_user_2fa``; a falsy result is reported as 400.
    """
    if current_user.get("is_shadow_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    disabled = AuthService.disable_user_2fa(
        user_id=current_user["id"],
        otp_code=request.otp_code,
    )
    if disabled:
        return {"message": "2FA disabled"}

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid 2FA code or missing setup",
    )