""" Auth API Router - Login, Logout, Me endpoints """ from fastapi import APIRouter, HTTPException, status, Request, Depends, Response from pydantic import BaseModel from typing import Optional from app.core.auth_service import AuthService from app.core.config import settings from app.core.auth_dependencies import get_current_user import logging logger = logging.getLogger(__name__) router = APIRouter() class LoginRequest(BaseModel): username: str password: str otp_code: Optional[str] = None class LoginResponse(BaseModel): access_token: str token_type: str = "bearer" user: dict requires_2fa_setup: bool = False class LogoutRequest(BaseModel): token_jti: Optional[str] = None class TwoFactorCodeRequest(BaseModel): otp_code: str @router.post("/login", response_model=LoginResponse) async def login(request: Request, credentials: LoginRequest, response: Response): """ Authenticate user and return JWT token """ ip_address = request.client.host if request.client else None # Authenticate user user, error_detail = AuthService.authenticate_user( username=credentials.username, password=credentials.password, ip_address=ip_address, otp_code=credentials.otp_code ) if error_detail: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=error_detail, headers={"WWW-Authenticate": "Bearer"}, ) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password", headers={"WWW-Authenticate": "Bearer"}, ) # Create access token access_token = AuthService.create_access_token( user_id=user['user_id'], username=user['username'], is_superadmin=user['is_superadmin'], is_shadow_admin=user.get('is_shadow_admin', False) ) requires_2fa_setup = ( not user.get("is_shadow_admin", False) and not settings.AUTH_DISABLE_2FA and AuthService.is_2fa_supported() and not user.get("is_2fa_enabled", False) ) response.set_cookie( key="access_token", value=access_token, httponly=True, samesite=settings.COOKIE_SAMESITE, secure=settings.COOKIE_SECURE ) return LoginResponse( access_token=access_token, user=user, requires_2fa_setup=requires_2fa_setup ) @router.post("/logout") async def logout( response: Response, current_user: dict = Depends(get_current_user), request: Optional[LogoutRequest] = None ): """ Revoke JWT token (logout) """ token_jti = request.token_jti if request and request.token_jti else current_user.get("token_jti") if token_jti: AuthService.revoke_token( token_jti, current_user['id'], current_user.get('is_shadow_admin', False) ) response.delete_cookie("access_token") return {"message": "Successfully logged out"} @router.get("/me") async def get_me(current_user: dict = Depends(get_current_user)): """ Get current authenticated user info """ return { "id": current_user['id'], "username": current_user['username'], "email": current_user['email'], "full_name": current_user['full_name'], "is_superadmin": current_user['is_superadmin'], "is_2fa_enabled": current_user.get('is_2fa_enabled', False), "permissions": current_user['permissions'] } @router.post("/2fa/setup") async def setup_2fa(current_user: dict = Depends(get_current_user)): """Generate and store TOTP secret (requires verification to enable)""" if current_user.get("is_shadow_admin"): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Shadow admin cannot configure 2FA", ) try: result = AuthService.setup_user_2fa( user_id=current_user["id"], username=current_user["username"] ) except RuntimeError as exc: if "2FA columns missing" in str(exc): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA er ikke tilgaengelig i denne database (mangler kolonner).", ) raise return result @router.post("/2fa/enable") async def enable_2fa( request: TwoFactorCodeRequest, current_user: dict = Depends(get_current_user) ): """Enable 2FA after verifying the provided code""" if current_user.get("is_shadow_admin"): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Shadow admin cannot configure 2FA", ) ok = AuthService.enable_user_2fa( user_id=current_user["id"], otp_code=request.otp_code ) if not ok: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid 2FA code or missing setup", ) return {"message": "2FA enabled"} @router.post("/2fa/disable") async def disable_2fa( request: TwoFactorCodeRequest, current_user: dict = Depends(get_current_user) ): """Disable 2FA after verifying the provided code""" if current_user.get("is_shadow_admin"): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Shadow admin cannot configure 2FA", ) ok = AuthService.disable_user_2fa( user_id=current_user["id"], otp_code=request.otp_code ) if not ok: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid 2FA code or missing setup", ) return {"message": "2FA disabled"}