# bmc_hub/app/auth/backend/router.py (Python, 210 lines, 5.7 KiB)

"""
Auth API Router - Login, Logout, Me and 2FA (setup/enable/disable) endpoints
"""
from fastapi import APIRouter, HTTPException, status, Request, Depends, Response
from pydantic import BaseModel
from typing import Optional
from app.core.auth_service import AuthService
from app.core.config import settings
from app.core.auth_dependencies import get_current_user
import logging
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router the endpoints below register on through their @router.<method> decorators.
router = APIRouter()
class LoginRequest(BaseModel):
    """JSON body accepted by the ``/login`` endpoint."""

    # Credentials as submitted by the client.
    username: str
    password: str
    # One-time 2FA code; optional, forwarded as ``None`` when omitted.
    otp_code: Optional[str] = None
class LoginResponse(BaseModel):
    """Response model of the ``/login`` endpoint."""

    # Token issued by ``AuthService.create_access_token`` for this session.
    access_token: str
    token_type: str = "bearer"
    # User record as returned by ``AuthService.authenticate_user``.
    user: dict
    # Set by the login endpoint when the user has no 2FA enabled although
    # 2FA is supported and not disabled in settings.
    requires_2fa_setup: bool = False
class LogoutRequest(BaseModel):
    """Optional JSON body for the ``/logout`` endpoint."""

    # Identifier of the token to revoke; when missing or empty, logout falls
    # back to the ``token_jti`` entry of the authenticated user.
    token_jti: Optional[str] = None
class TwoFactorCodeRequest(BaseModel):
    """JSON body carrying the one-time code for ``/2fa/enable`` and ``/2fa/disable``."""

    otp_code: str
@router.post("/login", response_model=LoginResponse)
async def login(request: Request, credentials: LoginRequest, response: Response):
    """
    Authenticate a user, issue a JWT access token and set it as a cookie.

    The token is returned in the response body and also stored in an
    HttpOnly ``access_token`` cookie. Responds with 401 when authentication
    fails, using the service-provided error detail when there is one.
    """
    client = request.client
    ip_address = client.host if client else None

    user, error_detail = AuthService.authenticate_user(
        username=credentials.username,
        password=credentials.password,
        ip_address=ip_address,
        otp_code=credentials.otp_code,
    )

    # A specific error from the service takes precedence over the generic
    # "invalid credentials" message used when no user came back.
    if error_detail or not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=error_detail or "Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    is_shadow_admin = user.get("is_shadow_admin", False)
    access_token = AuthService.create_access_token(
        user_id=user["user_id"],
        username=user["username"],
        is_superadmin=user["is_superadmin"],
        is_shadow_admin=is_shadow_admin,
    )

    # Short-circuit order matters: the 2FA capability check only runs for
    # regular users and only while 2FA has not been disabled in settings.
    requires_2fa_setup = (
        not is_shadow_admin
        and not settings.AUTH_DISABLE_2FA
        and AuthService.is_2fa_supported()
        and not user.get("is_2fa_enabled", False)
    )

    response.set_cookie(
        "access_token",
        access_token,
        httponly=True,
        samesite=settings.COOKIE_SAMESITE,
        secure=settings.COOKIE_SECURE,
    )

    # NOTE(review): the user dict is returned as-is; confirm that
    # AuthService.authenticate_user never includes sensitive fields in it.
    return LoginResponse(
        access_token=access_token,
        user=user,
        requires_2fa_setup=requires_2fa_setup,
    )
@router.post("/logout")
async def logout(
    response: Response,
    current_user: dict = Depends(get_current_user),
    request: Optional[LogoutRequest] = None
):
    """
    Revoke the JWT token (logout) and clear the ``access_token`` cookie.

    The token to revoke is taken from the optional request body; when the
    body or its ``token_jti`` is missing, the ``token_jti`` of the
    authenticated user is used instead. Revocation is skipped when neither
    source yields an identifier. The cookie is cleared in every case.
    """
    body_jti = request.token_jti if request else None
    token_jti = body_jti or current_user.get("token_jti")

    if token_jti:
        AuthService.revoke_token(
            token_jti,
            current_user['id'],
            current_user.get('is_shadow_admin', False)
        )

    # Expire the cookie with the same attributes login used to set it.
    # With the library defaults (SameSite=Lax, not Secure) a browser may
    # reject the expiry cookie in a cross-site response when the deployment
    # runs with SameSite=None; Secure, leaving the session cookie in place.
    response.delete_cookie(
        key="access_token",
        httponly=True,
        samesite=settings.COOKIE_SAMESITE,
        secure=settings.COOKIE_SECURE,
    )
    return {"message": "Successfully logged out"}
@router.get("/me")
async def get_me(current_user: dict = Depends(get_current_user)):
    """
    Return the profile of the currently authenticated user.

    All fields except ``is_2fa_enabled`` must be present on the user dict;
    ``is_2fa_enabled`` defaults to ``False`` when absent.
    """
    mandatory_fields = ("id", "username", "email", "full_name", "is_superadmin")
    profile = {field: current_user[field] for field in mandatory_fields}
    profile["is_2fa_enabled"] = current_user.get('is_2fa_enabled', False)
    profile["permissions"] = current_user['permissions']
    return profile
@router.post("/2fa/setup")
async def setup_2fa(current_user: dict = Depends(get_current_user)):
    """Generate and store TOTP secret (requires verification to enable)

    Returns whatever ``AuthService.setup_user_2fa`` produces for the current
    user. Responds with 403 for shadow admins and with 400 when the service
    reports that the 2FA columns are missing; any other ``RuntimeError``
    propagates unchanged.
    """
    if current_user.get("is_shadow_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    try:
        result = AuthService.setup_user_2fa(
            user_id=current_user["id"],
            username=current_user["username"]
        )
    except RuntimeError as exc:
        if "2FA columns missing" in str(exc):
            # The detail is Danish for "2FA is not available in this
            # database (missing columns)". Chain the original error so the
            # cause stays visible in tracebacks.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="2FA er ikke tilgaengelig i denne database (mangler kolonner).",
            ) from exc
        raise
    return result
@router.post("/2fa/enable")
async def enable_2fa(
    request: TwoFactorCodeRequest,
    current_user: dict = Depends(get_current_user)
):
    """Turn on 2FA for the current user using the submitted one-time code.

    Responds with 403 for shadow admins and with 400 when
    ``AuthService.enable_user_2fa`` does not accept the code.
    """
    if current_user.get("is_shadow_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    code_accepted = AuthService.enable_user_2fa(
        user_id=current_user["id"],
        otp_code=request.otp_code,
    )
    if code_accepted:
        return {"message": "2FA enabled"}

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid 2FA code or missing setup",
    )
@router.post("/2fa/disable")
async def disable_2fa(
    request: TwoFactorCodeRequest,
    current_user: dict = Depends(get_current_user)
):
    """Turn off 2FA for the current user using the submitted one-time code.

    Responds with 403 for shadow admins and with 400 when
    ``AuthService.disable_user_2fa`` does not accept the code.
    """
    if current_user.get("is_shadow_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Shadow admin cannot configure 2FA",
        )

    code_accepted = AuthService.disable_user_2fa(
        user_id=current_user["id"],
        otp_code=request.otp_code,
    )
    if code_accepted:
        return {"message": "2FA disabled"}

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid 2FA code or missing setup",
    )