221 lines
6.7 KiB
Python
221 lines
6.7 KiB
Python
|
|
"""
|
||
|
|
FastAPI dependencies for authentication and authorization
|
||
|
|
Adapted from OmniSync for BMC Hub
|
||
|
|
"""
|
||
|
|
from fastapi import Depends, HTTPException, status, Request
|
||
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||
|
|
from typing import Optional
|
||
|
|
from app.core.auth_service import AuthService
|
||
|
|
import logging
|
||
|
|
|
||
|
|
# Module-level logger named after this module; the permission dependencies
# below use it to record denied access attempts.
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
# HTTP Bearer scheme instance, injected with Depends(security) to obtain the
# request's Authorization credentials.
# NOTE(review): constructed with default arguments; per the FastAPI docs
# auto_error defaults to True, so a request without an Authorization header is
# rejected by the scheme before the dependency function runs — confirm.
security = HTTPBearer()
|
||
|
|
|
||
|
|
|
||
|
|
async def get_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
    """
    Dependency to get current authenticated user from JWT token

    The bearer token is checked with AuthService.verify_token. The returned
    dict combines claims from the token payload (sub, username,
    is_superadmin), the client IP of the request, the email / full_name row
    read from the users table, and the result of
    AuthService.get_user_permissions.

    Usage:
        @router.get("/endpoint")
        async def my_endpoint(current_user: dict = Depends(get_current_user)):
            ...

    Returns:
        dict with keys: id, username, email, full_name, is_superadmin,
        ip_address, permissions. email and full_name are None when the
        users query returns nothing.

    Raises:
        HTTPException: 401 when the token is rejected, or when its "sub"
        claim is missing or cannot be converted to an integer.
    """
    token = credentials.credentials

    # Verify token
    payload = AuthService.verify_token(token)

    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Get user ID. A payload without a usable "sub" claim is treated exactly
    # like a rejected token (401) instead of surfacing as a server error.
    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from None

    username = payload.get("username")
    is_superadmin = payload.get("is_superadmin", False)

    # Add IP address to user info (request.client may be absent)
    ip_address = request.client.host if request.client else None

    # Get additional user details from database
    # NOTE(review): the import is deferred to call time, presumably to avoid
    # a circular import — confirm before moving it to module level.
    from app.core.database import execute_query

    # NOTE(review): execute_query is called without await; if it performs
    # blocking I/O it will block the event loop — confirm.
    user_details = execute_query(
        "SELECT email, full_name FROM users WHERE id = %s",
        (user_id,),
        fetchone=True
    )

    return {
        "id": user_id,
        "username": username,
        "email": user_details.get('email') if user_details else None,
        "full_name": user_details.get('full_name') if user_details else None,
        "is_superadmin": is_superadmin,
        "ip_address": ip_address,
        "permissions": AuthService.get_user_permissions(user_id)
    }
|
||
|
|
|
||
|
|
|
||
|
|
# Bearer scheme that yields None instead of rejecting the request when no
# usable Authorization header is present. The default HTTPBearer()
# (auto_error=True) would abort such requests before the dependency runs,
# which makes anonymous access impossible.
_optional_security = HTTPBearer(auto_error=False)


async def get_optional_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(_optional_security)
) -> Optional[dict]:
    """
    Dependency to get current user if authenticated, None otherwise

    Allows endpoints that work both with and without authentication.

    Usage:
        @router.get("/endpoint")
        async def my_endpoint(user: Optional[dict] = Depends(get_optional_user)):
            ...

    Returns:
        The user dict produced by get_current_user, or None when no bearer
        credentials were supplied or get_current_user rejected them with an
        HTTPException.
    """
    # No bearer credentials supplied: treat the caller as anonymous
    if not credentials:
        return None

    try:
        return await get_current_user(request, credentials)
    except HTTPException:
        # Credentials were supplied but rejected: also anonymous
        return None
|
||
|
|
|
||
|
|
|
||
|
|
def require_permission(permission: str):
    """
    Build a dependency that only lets through users holding `permission`

    Superadmins pass unconditionally. Everyone else is checked with
    AuthService.user_has_permission; a failed check is logged and answered
    with HTTP 403.

    Usage:
        @router.post("/products", dependencies=[Depends(require_permission("products.create"))])
        async def create_product(...):
            ...

    Or, when the endpoint also needs the user:
        @router.post("/products")
        async def create_product(
            current_user: dict = Depends(require_permission("products.create"))
        ):
            ...
    """
    async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict:
        # Both keys are read up front, before any access decision is made
        uid, who = current_user["id"], current_user["username"]

        # Superadmin flag short-circuits the permission lookup
        allowed = current_user.get("is_superadmin") or AuthService.user_has_permission(
            uid, permission
        )
        if allowed:
            return current_user

        logger.warning(f"⚠️ Permission denied: {who} attempted {permission}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Missing required permission: {permission}",
        )

    return permission_checker
|
||
|
|
|
||
|
|
|
||
|
|
def require_superadmin(current_user: dict = Depends(get_current_user)) -> dict:
    """
    Dependency that admits only users flagged as superadmin

    Returns the authenticated user dict unchanged when its "is_superadmin"
    entry is truthy; otherwise logs the attempt and responds with HTTP 403.

    Usage:
        @router.post("/admin/users")
        async def create_user(current_user: dict = Depends(require_superadmin)):
            ...
    """
    if current_user.get("is_superadmin"):
        return current_user

    denied_message = f"⚠️ Superadmin required: {current_user['username']} attempted admin access"
    logger.warning(denied_message)

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Superadmin access required",
    )
|
||
|
|
|
||
|
|
|
||
|
|
def require_any_permission(*permissions: str):
    """
    Build a dependency that passes when the user holds at least one of
    the given permissions

    Superadmins pass unconditionally. Permissions are tried in the order
    given and checking stops at the first one the user has. If none match,
    the attempt is logged and HTTP 403 is raised.

    Usage:
        @router.get("/reports")
        async def get_reports(
            current_user: dict = Depends(require_any_permission("reports.view", "reports.admin"))
        ):
            ...
    """
    async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict:
        uid = current_user["id"]

        # Superadmin flag first, then the first matching permission wins
        if current_user.get("is_superadmin") or any(
            AuthService.user_has_permission(uid, candidate) for candidate in permissions
        ):
            return current_user

        # Nothing matched: log who tried and what would have been accepted
        who = current_user['username']
        wanted = ', '.join(permissions)
        logger.warning(f"⚠️ Permission denied: {who} attempted one of: {wanted}")

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Missing required permission. Need one of: {', '.join(permissions)}",
        )

    return permission_checker
|
||
|
|
|
||
|
|
|
||
|
|
def require_all_permissions(*permissions: str):
    """
    Build a dependency that passes only when the user holds every one of
    the given permissions

    Superadmins pass unconditionally. For other users each permission is
    checked with AuthService.user_has_permission; any that fail are
    collected, logged, and reported in an HTTP 403 response.

    Usage:
        @router.post("/sensitive-operation")
        async def sensitive_op(
            current_user: dict = Depends(require_all_permissions("admin.access", "data.export"))
        ):
            ...
    """
    async def permission_checker(current_user: dict = Depends(get_current_user)) -> dict:
        uid = current_user["id"]

        if current_user.get("is_superadmin"):
            return current_user

        # Every permission is evaluated so the response can list all gaps
        lacking = [
            needed
            for needed in permissions
            if not AuthService.user_has_permission(uid, needed)
        ]
        if not lacking:
            return current_user

        who = current_user['username']
        logger.warning(f"⚠️ Permission denied: {who} missing: {', '.join(lacking)}")

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Missing required permissions: {', '.join(lacking)}",
        )

    return permission_checker
|