"""
FastAPI dependencies for authentication and authorization.

Adapted from OmniSync for BMC Hub.
"""
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
from app.core.auth_service import AuthService
import logging

logger = logging.getLogger(__name__)

# Strict scheme: with the default auto_error=True, HTTPBearer rejects a request
# that carries no bearer credentials before the dependant function runs.
security = HTTPBearer()

# Lenient scheme for endpoints where authentication is optional: with
# auto_error=False, missing credentials are passed to the dependant as None.
optional_security = HTTPBearer(auto_error=False)


def _invalid_token_error() -> HTTPException:
    """Build the 401 response used for every token-validation failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or expired token",
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
    """
    Dependency to get the current authenticated user from a JWT token.

    Returns a dict with the keys ``id``, ``username``, ``email``,
    ``full_name``, ``is_superadmin``, ``ip_address`` and ``permissions``.

    Raises:
        HTTPException: 401 when ``AuthService.verify_token`` returns a falsy
            payload, or when the payload's ``sub`` claim is missing or cannot
            be converted to an integer user ID.

    Usage:
        @router.get("/endpoint")
        async def my_endpoint(current_user: dict = Depends(get_current_user)):
            ...
    """
    token = credentials.credentials

    # Verify token
    payload = AuthService.verify_token(token)
    if not payload:
        raise _invalid_token_error()

    # Get user ID. A missing or non-numeric "sub" claim means the token is
    # unusable, so report it as an authentication failure instead of letting
    # int() raise and surface as a 500.
    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        raise _invalid_token_error() from None

    username = payload.get("username")
    is_superadmin = payload.get("is_superadmin", False)

    # Add IP address to user info (request.client may be None)
    ip_address = request.client.host if request.client else None

    # Get additional user details from database.
    # Imported inside the function, as in the original; presumably to avoid a
    # circular import at module load time -- confirm before hoisting.
    from app.core.database import execute_query

    user_details = execute_query(
        "SELECT email, full_name FROM users WHERE id = %s",
        (user_id,),
        fetchone=True
    )

    # NOTE(review): a token whose user row is missing is still accepted, with
    # email/full_name set to None -- confirm this is intended.
    return {
        "id": user_id,
        "username": username,
        "email": user_details.get('email') if user_details else None,
        "full_name": user_details.get('full_name') if user_details else None,
        "is_superadmin": is_superadmin,
        "ip_address": ip_address,
        "permissions": AuthService.get_user_permissions(user_id)
    }


async def get_optional_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(optional_security)
) -> Optional[dict]:
    """
    Dependency to get the current user if authenticated, None otherwise.

    Allows endpoints that work both with and without authentication. Uses the
    non-raising bearer scheme so that a request without credentials reaches
    this function and yields None instead of being rejected up front.

    Returns:
        The user dict from ``get_current_user``, or None when no credentials
        were supplied or ``get_current_user`` raised an HTTPException.
    """
    if not credentials:
        return None

    try:
        return await get_current_user(request, credentials)
    except HTTPException:
        # Deliberate best-effort: a bad token is treated as "anonymous".
        return None


def require_permission(permission: str):
    """
    Dependency factory to require a specific permission.

    Returns an async dependency that resolves to the current user dict, or
    raises HTTPException 403 when the user is not a superadmin and
    ``AuthService.user_has_permission`` is falsy for ``permission``.

    Usage:
        @router.post("/products", dependencies=[Depends(require_permission("products.create"))])
        async def create_product(...):
            ...

    Or with user access:
        @router.post("/products")
        async def create_product(
            current_user: dict = Depends(require_permission("products.create"))
        ):
            ...
    """
    async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict:
        user_id = current_user["id"]
        username = current_user["username"]

        # Superadmins have all permissions
        if current_user.get("is_superadmin"):
            return current_user

        # Check permission
        if not AuthService.user_has_permission(user_id, permission):
            logger.warning(
                "⚠️ Permission denied: %s attempted %s", username, permission
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing required permission: {permission}"
            )

        return current_user

    return permission_checker


def require_superadmin(current_user: dict = Depends(get_current_user)) -> dict:
    """
    Dependency to require superadmin access.

    Returns the current user dict unchanged when its ``is_superadmin`` entry
    is truthy.

    Raises:
        HTTPException: 403 when the user is not a superadmin.

    Usage:
        @router.post("/admin/users")
        async def create_user(current_user: dict = Depends(require_superadmin)):
            ...
    """
    if not current_user.get("is_superadmin"):
        logger.warning(
            "⚠️ Superadmin required: %s attempted admin access",
            current_user['username'],
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Superadmin access required"
        )

    return current_user


def require_any_permission(*permissions: str):
    """
    Dependency factory to require ANY of the specified permissions.

    Returns an async dependency that resolves to the current user dict, or
    raises HTTPException 403 when the user is not a superadmin and holds none
    of ``permissions``.

    Usage:
        @router.get("/reports")
        async def get_reports(
            current_user: dict = Depends(require_any_permission("reports.view", "reports.admin"))
        ):
            ...
    """
    async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict:
        user_id = current_user["id"]

        # Superadmins have all permissions
        if current_user.get("is_superadmin"):
            return current_user

        # Check if user has ANY of the permissions; any() stops at the first
        # match, exactly like the early return it replaces.
        if any(
            AuthService.user_has_permission(user_id, permission)
            for permission in permissions
        ):
            return current_user

        # None of the permissions matched
        wanted = ', '.join(permissions)
        logger.warning(
            "⚠️ Permission denied: %s attempted one of: %s",
            current_user['username'],
            wanted,
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Missing required permission. Need one of: {wanted}"
        )

    return permission_checker


def require_all_permissions(*permissions: str):
    """
    Dependency factory to require ALL of the specified permissions.

    Returns an async dependency that resolves to the current user dict, or
    raises HTTPException 403 listing the missing permissions when the user is
    not a superadmin and lacks at least one of ``permissions``.

    Usage:
        @router.post("/sensitive-operation")
        async def sensitive_op(
            current_user: dict = Depends(require_all_permissions("admin.access", "data.export"))
        ):
            ...
    """
    async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict:
        user_id = current_user["id"]

        # Superadmins have all permissions
        if current_user.get("is_superadmin"):
            return current_user

        # Check if user has ALL permissions; collect every missing one so the
        # log line and the error detail can name them all.
        missing_permissions = [
            permission
            for permission in permissions
            if not AuthService.user_has_permission(user_id, permission)
        ]

        if missing_permissions:
            missing = ', '.join(missing_permissions)
            logger.warning(
                "⚠️ Permission denied: %s missing: %s",
                current_user['username'],
                missing,
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing required permissions: {missing}"
            )

        return current_user

    return permission_checker