""" FastAPI dependencies for authentication and authorization Adapted from OmniSync for BMC Hub """ from fastapi import Depends, HTTPException, status, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from typing import Optional from app.core.auth_service import AuthService from app.core.config import settings from app.core.database import execute_query_single import logging logger = logging.getLogger(__name__) security = HTTPBearer(auto_error=False) async def get_current_user( request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security) ) -> dict: """ Dependency to get current authenticated user from JWT token Usage: @router.get("/endpoint") async def my_endpoint(current_user: dict = Depends(get_current_user)): ... """ token = credentials.credentials if credentials else request.cookies.get("access_token") if not token: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated", headers={"WWW-Authenticate": "Bearer"}, ) # Verify token payload = AuthService.verify_token(token) if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, ) # Get user ID user_id = int(payload.get("sub")) username = payload.get("username") is_superadmin = payload.get("is_superadmin", False) is_shadow_admin = payload.get("shadow_admin", False) # Add IP address to user info ip_address = request.client.host if request.client else None if is_shadow_admin: return { "id": user_id, "username": username, "email": settings.SHADOW_ADMIN_EMAIL, "full_name": settings.SHADOW_ADMIN_FULL_NAME, "is_superadmin": True, "is_shadow_admin": True, "is_2fa_enabled": True, "ip_address": ip_address, "permissions": AuthService.get_all_permissions() } # Get additional user details from database user_details = execute_query_single( "SELECT email, full_name, is_2fa_enabled FROM users WHERE user_id = %s", (user_id,)) return { "id": user_id, "username": username, "email": user_details.get('email') if user_details else None, "full_name": user_details.get('full_name') if user_details else None, "is_superadmin": is_superadmin, "is_shadow_admin": False, "is_2fa_enabled": user_details.get('is_2fa_enabled') if user_details else False, "ip_address": ip_address, "permissions": AuthService.get_user_permissions(user_id) } async def get_optional_user( request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security) ) -> Optional[dict]: """ Dependency to get current user if authenticated, None otherwise Allows endpoints that work both with and without authentication """ if not credentials and not request.cookies.get("access_token"): return None try: return await get_current_user(request, credentials) except HTTPException: return None def require_permission(permission: str): """ Dependency factory to require specific permission Usage: @router.post("/products", dependencies=[Depends(require_permission("products.create"))]) async def create_product(...): ... Or with user access: @router.post("/products") async def create_product( current_user: dict = Depends(require_permission("products.create")) ): ... """ async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict: user_id = current_user["id"] username = current_user["username"] # Superadmins have all permissions if current_user.get("is_superadmin"): return current_user # Check permission if not AuthService.user_has_permission(user_id, permission): logger.warning( f"⚠️ Permission denied: {username} attempted {permission}" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing required permission: {permission}" ) return current_user return permission_checker def require_superadmin(current_user: dict = Depends(get_current_user)) -> dict: """ Dependency to require superadmin access Usage: @router.post("/admin/users") async def create_user(current_user: dict = Depends(require_superadmin)): ... """ if not current_user.get("is_superadmin"): logger.warning( f"⚠️ Superadmin required: {current_user['username']} attempted admin access" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Superadmin access required" ) return current_user def require_any_permission(*permissions: str): """ Dependency factory to require ANY of the specified permissions Usage: @router.get("/reports") async def get_reports( current_user: dict = Depends(require_any_permission("reports.view", "reports.admin")) ): ... """ async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict: user_id = current_user["id"] # Superadmins have all permissions if current_user.get("is_superadmin"): return current_user # Check if user has ANY of the permissions for permission in permissions: if AuthService.user_has_permission(user_id, permission): return current_user # None of the permissions matched logger.warning( f"⚠️ Permission denied: {current_user['username']} " f"attempted one of: {', '.join(permissions)}" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing required permission. Need one of: {', '.join(permissions)}" ) return permission_checker def require_all_permissions(*permissions: str): """ Dependency factory to require ALL of the specified permissions Usage: @router.post("/sensitive-operation") async def sensitive_op( current_user: dict = Depends(require_all_permissions("admin.access", "data.export")) ): ... """ async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict: user_id = current_user["id"] # Superadmins have all permissions if current_user.get("is_superadmin"): return current_user # Check if user has ALL permissions missing_permissions = [] for permission in permissions: if not AuthService.user_has_permission(user_id, permission): missing_permissions.append(permission) if missing_permissions: logger.warning( f"⚠️ Permission denied: {current_user['username']} " f"missing: {', '.join(missing_permissions)}" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing required permissions: {', '.join(missing_permissions)}" ) return current_user return permission_checker