bmc_hub/app/auth/backend/router.py
Christian 693ac4cfd6 feat: Add case types to settings if not found
feat: Update frontend navigation and links for support and CRM sections

fix: Modify subscription listing and stats endpoints to support 'all' status

feat: Implement subscription status filter in the subscriptions list view

feat: Redirect ticket routes to the new sag path

feat: Integrate devportal routes into the main application

feat: Create a wizard for location creation with nested floors and rooms

feat: Add product suppliers table to track multiple suppliers per product

feat: Implement product audit log to track changes in products

feat: Extend location types to include kantine and moedelokale

feat: Add last_2fa_at column to users table for 2FA grace period tracking
2026-02-09 15:30:07 +01:00

193 lines
5.1 KiB
Python

"""
Auth API Router - Login, Logout, Me and 2FA (setup/enable/disable) endpoints
"""
from fastapi import APIRouter, HTTPException, status, Request, Depends, Response
from pydantic import BaseModel
from typing import Optional
from app.core.auth_service import AuthService
from app.core.config import settings
from app.core.auth_dependencies import get_current_user
import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router on which the auth endpoints defined in this file are registered.
router = APIRouter()
class LoginRequest(BaseModel):
    """Request body accepted by ``POST /login``."""

    # Account name forwarded to AuthService.authenticate_user.
    username: str
    # Password forwarded to AuthService.authenticate_user.
    password: str
    # Optional one-time code; stays None when the client sends none.
    otp_code: Optional[str] = None
class LoginResponse(BaseModel):
    """Response body returned by ``POST /login`` on success."""

    # Token produced by AuthService.create_access_token.
    access_token: str
    # Token scheme advertised to the client; always defaults to "bearer".
    token_type: str = "bearer"
    # User record as returned by AuthService.authenticate_user.
    user: dict
class LogoutRequest(BaseModel):
    """Optional request body for ``POST /logout``."""

    # Identifier of the token to revoke; when absent, the logout endpoint
    # falls back to the "token_jti" entry of the authenticated user.
    token_jti: Optional[str] = None
class TwoFactorCodeRequest(BaseModel):
    """Request body carrying a 2FA code for the enable/disable endpoints."""

    # One-time code that the caller must supply (required).
    otp_code: str
@router.post("/login", response_model=LoginResponse)
async def login(request: Request, credentials: LoginRequest, response: Response):
    """
    Authenticate a user, issue an access token and set it as a cookie.

    The submitted credentials (plus the caller's IP address, when known) are
    handed to ``AuthService.authenticate_user``. On success a token is created
    with ``AuthService.create_access_token``, stored in an HTTP-only
    ``access_token`` cookie and also returned in the response body.

    Raises:
        HTTPException: 401 when the auth service reports an error detail or
            returns no user.
    """
    # request.client may be None, in which case no IP is passed along.
    client = request.client
    ip_address = client.host if client else None

    user, error_detail = AuthService.authenticate_user(
        username=credentials.username,
        password=credentials.password,
        ip_address=ip_address,
        otp_code=credentials.otp_code,
    )

    # A specific error detail from the service wins; otherwise a missing
    # user yields the generic message.
    if error_detail or not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=error_detail if error_detail else "Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    token = AuthService.create_access_token(
        user_id=user['user_id'],
        username=user['username'],
        is_superadmin=user['is_superadmin'],
        is_shadow_admin=user.get('is_shadow_admin', False),
    )

    # Cookie attributes beyond HttpOnly come from application settings.
    response.set_cookie(
        key="access_token",
        value=token,
        httponly=True,
        samesite=settings.COOKIE_SAMESITE,
        secure=settings.COOKIE_SECURE,
    )

    return LoginResponse(access_token=token, user=user)
@router.post("/logout")
async def logout(
    response: Response,
    current_user: dict = Depends(get_current_user),
    request: Optional[LogoutRequest] = None
):
    """
    Revoke the caller's token (logout) and clear the auth cookie.

    The token identifier is taken from the optional request body when it
    carries a non-empty ``token_jti``; otherwise the ``token_jti`` entry of
    the authenticated user is used. If neither is available no revocation is
    attempted, but the cookie is still cleared.

    Returns:
        A dict with a confirmation message.
    """
    if request and request.token_jti:
        token_jti = request.token_jti
    else:
        token_jti = current_user.get("token_jti")

    if token_jti:
        AuthService.revoke_token(
            token_jti,
            current_user['id'],
            current_user.get('is_shadow_admin', False)
        )

    # Delete the cookie with the same attributes it was set with in login().
    # Without them the deletion header defaults to SameSite=Lax and no
    # Secure flag, which browsers ignore in cross-site contexts when the
    # cookie was issued as SameSite=None; Secure - leaving the user logged in.
    response.delete_cookie(
        "access_token",
        httponly=True,
        samesite=settings.COOKIE_SAMESITE,
        secure=settings.COOKIE_SECURE,
    )
    return {"message": "Successfully logged out"}
@router.get("/me")
async def get_me(current_user: dict = Depends(get_current_user)):
    """
    Return a summary of the currently authenticated user.

    All fields except ``is_2fa_enabled`` are required entries of
    ``current_user``; ``is_2fa_enabled`` falls back to ``False`` when absent.
    """
    # Required fields that precede the 2FA flag in the response.
    leading_fields = ("id", "username", "email", "full_name", "is_superadmin")
    profile = {field: current_user[field] for field in leading_fields}

    profile["is_2fa_enabled"] = current_user.get('is_2fa_enabled', False)
    profile["permissions"] = current_user['permissions']
    return profile
@router.post("/2fa/setup")
async def setup_2fa(current_user: dict = Depends(get_current_user)):
    """
    Begin 2FA enrolment for the current user.

    Delegates to ``AuthService.setup_user_2fa`` and returns its result
    unchanged. Enrolment only takes effect after the enable endpoint has
    verified a code.

    Raises:
        HTTPException: 403 when the caller is a shadow admin.
    """
    is_shadow = current_user.get("is_shadow_admin")
    if is_shadow:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    return AuthService.setup_user_2fa(
        user_id=current_user["id"],
        username=current_user["username"],
    )
@router.post("/2fa/enable")
async def enable_2fa(
    request: TwoFactorCodeRequest,
    current_user: dict = Depends(get_current_user)
):
    """
    Turn on 2FA for the current user once the supplied code is accepted.

    Raises:
        HTTPException: 403 when the caller is a shadow admin; 400 when
            ``AuthService.enable_user_2fa`` reports failure.
    """
    is_shadow = current_user.get("is_shadow_admin")
    if is_shadow:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    enabled = AuthService.enable_user_2fa(
        user_id=current_user["id"],
        otp_code=request.otp_code,
    )
    if enabled:
        return {"message": "2FA enabled"}

    # Falsy result from the service: the code was rejected or setup is missing.
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid 2FA code or missing setup",
    )
@router.post("/2fa/disable")
async def disable_2fa(
    request: TwoFactorCodeRequest,
    current_user: dict = Depends(get_current_user)
):
    """
    Turn off 2FA for the current user once the supplied code is accepted.

    Raises:
        HTTPException: 403 when the caller is a shadow admin; 400 when
            ``AuthService.disable_user_2fa`` reports failure.
    """
    is_shadow = current_user.get("is_shadow_admin")
    if is_shadow:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    disabled = AuthService.disable_user_2fa(
        user_id=current_user["id"],
        otp_code=request.otp_code,
    )
    if disabled:
        return {"message": "2FA disabled"}

    # Falsy result from the service: the code was rejected or setup is missing.
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid 2FA code or missing setup",
    )