bmc_hub/app/core/auth_dependencies.py

221 lines
6.7 KiB
Python
Raw Normal View History

"""
FastAPI dependencies for authentication and authorization
Adapted from OmniSync for BMC Hub
"""
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
from app.core.auth_service import AuthService
import logging
# Module-level logger named after this module; used below for permission-denied warnings.
logger = logging.getLogger(__name__)
# Shared HTTP Bearer security scheme; injected with Depends(security) to supply
# the HTTPAuthorizationCredentials consumed by the dependencies in this module.
security = HTTPBearer()
async def get_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
    """
    Dependency to get current authenticated user from JWT token

    Usage:
        @router.get("/endpoint")
        async def my_endpoint(current_user: dict = Depends(get_current_user)):
            ...

    Returns:
        dict with the keys "id", "username", "email", "full_name",
        "is_superadmin", "ip_address" and "permissions".

    Raises:
        HTTPException: 401 when the token fails verification or when its
            "sub" claim is missing or cannot be converted to an integer.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or expired token",
        headers={"WWW-Authenticate": "Bearer"},
    )

    token = credentials.credentials

    # Verify token
    payload = AuthService.verify_token(token)
    if not payload:
        raise unauthorized

    # Get user ID. A verified payload with a missing or non-numeric "sub"
    # is treated as an invalid token (401) instead of letting int() raise
    # TypeError/ValueError and surface as an HTTP 500.
    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        raise unauthorized from None

    username = payload.get("username")
    is_superadmin = payload.get("is_superadmin", False)

    # Add IP address to user info (request.client may be None)
    ip_address = request.client.host if request.client else None

    # Get additional user details from database.
    # NOTE(review): imported locally rather than at module level -- presumably
    # to avoid a circular import; confirm before moving it.
    from app.core.database import execute_query
    user_details = execute_query(
        "SELECT email, full_name FROM users WHERE id = %s",
        (user_id,),
        fetchone=True
    )

    # NOTE(review): when no row is returned the user is still accepted, with
    # email/full_name set to None -- confirm this is intended for tokens whose
    # user no longer exists.
    return {
        "id": user_id,
        "username": username,
        "email": user_details.get('email') if user_details else None,
        "full_name": user_details.get('full_name') if user_details else None,
        "is_superadmin": is_superadmin,
        "ip_address": ip_address,
        "permissions": AuthService.get_user_permissions(user_id)
    }
# Bearer scheme for optional authentication: with auto_error=False the scheme
# yields None when the Authorization header is missing or not a Bearer token,
# instead of raising before the dependency body can run.
_optional_security = HTTPBearer(auto_error=False)


async def get_optional_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(_optional_security)
) -> Optional[dict]:
    """
    Dependency to get current user if authenticated, None otherwise

    Allows endpoints that work both with and without authentication.

    Returns:
        The user dict produced by get_current_user, or None when no bearer
        credentials were supplied or get_current_user rejected them with an
        HTTPException.
    """
    if not credentials:
        return None

    try:
        return await get_current_user(request, credentials)
    except HTTPException:
        # Invalid or expired credentials are treated as "not authenticated"
        return None
def require_permission(permission: str):
    """
    Build a dependency that admits only users holding ``permission``

    The returned dependency resolves the user through get_current_user,
    lets superadmins through unconditionally, and otherwise asks
    AuthService.user_has_permission. A denied request is logged as a
    warning and rejected with HTTP 403.

    Usage:
        @router.post("/products", dependencies=[Depends(require_permission("products.create"))])
        async def create_product(...):
            ...

    Or with user access:
        @router.post("/products")
        async def create_product(
            current_user: dict = Depends(require_permission("products.create"))
        ):
            ...
    """
    async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict:
        uid = current_user["id"]
        who = current_user["username"]

        # Superadmin status short-circuits the permission lookup
        granted = current_user.get("is_superadmin") or AuthService.user_has_permission(
            uid, permission
        )
        if granted:
            return current_user

        logger.warning(
            f"⚠️ Permission denied: {who} attempted {permission}"
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Missing required permission: {permission}"
        )

    return permission_checker
def require_superadmin(current_user: dict = Depends(get_current_user)) -> dict:
    """
    Dependency that admits only superadmin users

    Returns the current user dict unchanged when its "is_superadmin" flag
    is truthy; otherwise logs a warning and raises HTTP 403.

    Usage:
        @router.post("/admin/users")
        async def create_user(current_user: dict = Depends(require_superadmin)):
            ...
    """
    # Happy path first: superadmins pass straight through
    if current_user.get("is_superadmin"):
        return current_user

    logger.warning(
        f"⚠️ Superadmin required: {current_user['username']} attempted admin access"
    )
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Superadmin access required"
    )
def require_any_permission(*permissions: str):
    """
    Build a dependency that admits users holding AT LEAST ONE permission

    Superadmins always pass. Other users pass as soon as
    AuthService.user_has_permission succeeds for one of ``permissions``
    (checked in the order given); if none match, a warning is logged and
    HTTP 403 is raised.

    Usage:
        @router.get("/reports")
        async def get_reports(
            current_user: dict = Depends(require_any_permission("reports.view", "reports.admin"))
        ):
            ...
    """
    async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict:
        uid = current_user["id"]

        if current_user.get("is_superadmin"):
            return current_user

        # any() stops at the first permission the user holds
        if any(AuthService.user_has_permission(uid, perm) for perm in permissions):
            return current_user

        # No candidate matched: report the full list that was accepted
        who = current_user['username']
        wanted = ', '.join(permissions)
        logger.warning(
            f"⚠️ Permission denied: {who} "
            f"attempted one of: {wanted}"
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Missing required permission. Need one of: {wanted}"
        )

    return permission_checker
def require_all_permissions(*permissions: str):
    """
    Build a dependency that admits only users holding EVERY permission

    Superadmins always pass. For other users each permission is checked
    through AuthService.user_has_permission; if any are missing, a warning
    listing them is logged and HTTP 403 is raised with the same list.

    Usage:
        @router.post("/sensitive-operation")
        async def sensitive_op(
            current_user: dict = Depends(require_all_permissions("admin.access", "data.export"))
        ):
            ...
    """
    async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict:
        uid = current_user["id"]

        if current_user.get("is_superadmin"):
            return current_user

        # Collect every permission the user lacks, keeping the caller's order
        lacking = [
            perm for perm in permissions
            if not AuthService.user_has_permission(uid, perm)
        ]
        if not lacking:
            return current_user

        who = current_user['username']
        lacking_text = ', '.join(lacking)
        logger.warning(
            f"⚠️ Permission denied: {who} "
            f"missing: {lacking_text}"
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Missing required permissions: {lacking_text}"
        )

    return permission_checker