- Created a new SQL migration for the sag_salgsvarer table to manage sales and purchase items. - Implemented a new HTML template for the Varekøb & Salg module, including summary cards and tables for sales and purchases. - Added JavaScript functions for loading and rendering order data dynamically. - Introduced a new backend search module for customers, contacts, hardware, and locations with autocomplete functionality. - Developed an email templates API for managing system and customer-specific email templates. - Created multiple migrations for Nextcloud instances, cache, audit logs, email templates, sag comments, hardware locations, and billing methods. - Enhanced the sag module with solutions, order lines, work types, and 2FA support for user authentication.
488 lines
17 KiB
Python
488 lines
17 KiB
Python
"""
|
|
Authentication Service - Håndterer login, JWT tokens, password hashing
|
|
Adapted from OmniSync for BMC Hub
|
|
"""
|
|
from typing import Optional, Dict, List, Tuple
|
|
from datetime import datetime, timedelta
|
|
import hashlib
|
|
import secrets
|
|
import jwt
|
|
import pyotp
|
|
from passlib.context import CryptContext
|
|
from app.core.database import execute_query, execute_query_single, execute_insert, execute_update
|
|
from app.core.config import settings
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# JWT Settings
|
|
SECRET_KEY = getattr(settings, 'JWT_SECRET_KEY', 'your-secret-key-change-in-production')
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 8 # 8 timer
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
|
|
class AuthService:
|
|
"""Service for authentication and authorization"""
|
|
|
|
@staticmethod
|
|
def hash_password(password: str) -> str:
|
|
"""
|
|
Hash password using bcrypt
|
|
"""
|
|
return pwd_context.hash(password)
|
|
|
|
@staticmethod
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
|
"""Verify password against hash"""
|
|
if not hashed_password:
|
|
return False
|
|
if not hashed_password.startswith("$2"):
|
|
return False
|
|
try:
|
|
return pwd_context.verify(plain_password, hashed_password)
|
|
except Exception:
|
|
return False
|
|
|
|
@staticmethod
|
|
def verify_legacy_sha256(plain_password: str, hashed_password: str) -> bool:
|
|
"""Verify legacy SHA256 hash and upgrade when used"""
|
|
if not hashed_password or len(hashed_password) != 64:
|
|
return False
|
|
try:
|
|
return hashlib.sha256(plain_password.encode()).hexdigest() == hashed_password
|
|
except Exception:
|
|
return False
|
|
|
|
@staticmethod
|
|
def upgrade_password_hash(user_id: int, plain_password: str):
|
|
"""Upgrade legacy password hash to bcrypt"""
|
|
new_hash = AuthService.hash_password(plain_password)
|
|
execute_update(
|
|
"UPDATE users SET password_hash = %s, updated_at = CURRENT_TIMESTAMP WHERE user_id = %s",
|
|
(new_hash, user_id)
|
|
)
|
|
|
|
@staticmethod
|
|
def verify_totp_code(secret: str, code: str) -> bool:
|
|
"""Verify TOTP code"""
|
|
if not secret or not code:
|
|
return False
|
|
try:
|
|
totp = pyotp.TOTP(secret)
|
|
return totp.verify(code, valid_window=1)
|
|
except Exception:
|
|
return False
|
|
|
|
@staticmethod
|
|
def generate_2fa_secret() -> str:
|
|
"""Generate a new TOTP secret"""
|
|
return pyotp.random_base32()
|
|
|
|
@staticmethod
|
|
def get_2fa_provisioning_uri(username: str, secret: str) -> str:
|
|
"""Generate provisioning URI for authenticator apps"""
|
|
totp = pyotp.TOTP(secret)
|
|
return totp.provisioning_uri(name=username, issuer_name="BMC Hub")
|
|
|
|
@staticmethod
|
|
def setup_user_2fa(user_id: int, username: str) -> Dict:
|
|
"""Create and store a new TOTP secret (not enabled until verified)"""
|
|
secret = AuthService.generate_2fa_secret()
|
|
execute_update(
|
|
"UPDATE users SET totp_secret = %s, is_2fa_enabled = FALSE, updated_at = CURRENT_TIMESTAMP WHERE user_id = %s",
|
|
(secret, user_id)
|
|
)
|
|
|
|
return {
|
|
"secret": secret,
|
|
"provisioning_uri": AuthService.get_2fa_provisioning_uri(username, secret)
|
|
}
|
|
|
|
@staticmethod
|
|
def enable_user_2fa(user_id: int, otp_code: str) -> bool:
|
|
"""Enable 2FA after verifying TOTP code"""
|
|
user = execute_query_single(
|
|
"SELECT totp_secret FROM users WHERE user_id = %s",
|
|
(user_id,)
|
|
)
|
|
|
|
if not user or not user.get("totp_secret"):
|
|
return False
|
|
|
|
if not AuthService.verify_totp_code(user["totp_secret"], otp_code):
|
|
return False
|
|
|
|
execute_update(
|
|
"UPDATE users SET is_2fa_enabled = TRUE, updated_at = CURRENT_TIMESTAMP WHERE user_id = %s",
|
|
(user_id,)
|
|
)
|
|
return True
|
|
|
|
@staticmethod
|
|
def disable_user_2fa(user_id: int, otp_code: str) -> bool:
|
|
"""Disable 2FA after verifying TOTP code"""
|
|
user = execute_query_single(
|
|
"SELECT totp_secret FROM users WHERE user_id = %s",
|
|
(user_id,)
|
|
)
|
|
|
|
if not user or not user.get("totp_secret"):
|
|
return False
|
|
|
|
if not AuthService.verify_totp_code(user["totp_secret"], otp_code):
|
|
return False
|
|
|
|
execute_update(
|
|
"UPDATE users SET is_2fa_enabled = FALSE, totp_secret = NULL, updated_at = CURRENT_TIMESTAMP WHERE user_id = %s",
|
|
(user_id,)
|
|
)
|
|
return True
|
|
|
|
@staticmethod
|
|
def create_access_token(
|
|
user_id: int,
|
|
username: str,
|
|
is_superadmin: bool = False,
|
|
is_shadow_admin: bool = False
|
|
) -> str:
|
|
"""
|
|
Create JWT access token
|
|
|
|
Args:
|
|
user_id: User ID
|
|
username: Username
|
|
is_superadmin: Whether user is superadmin
|
|
|
|
Returns:
|
|
JWT token string
|
|
"""
|
|
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
jti = secrets.token_urlsafe(32) # JWT ID for token revocation
|
|
|
|
payload = {
|
|
"sub": str(user_id),
|
|
"username": username,
|
|
"is_superadmin": is_superadmin,
|
|
"shadow_admin": is_shadow_admin,
|
|
"exp": expire,
|
|
"iat": datetime.utcnow(),
|
|
"jti": jti
|
|
}
|
|
|
|
token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
# Store session for token revocation (skip for shadow admin)
|
|
if not is_shadow_admin:
|
|
execute_insert(
|
|
"""INSERT INTO sessions (user_id, token_jti, expires_at)
|
|
VALUES (%s, %s, %s)""",
|
|
(user_id, jti, expire)
|
|
)
|
|
|
|
return token
|
|
|
|
@staticmethod
|
|
def verify_token(token: str) -> Optional[Dict]:
|
|
"""
|
|
Verify and decode JWT token
|
|
|
|
Returns:
|
|
Dict with user info or None if invalid
|
|
"""
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
|
|
if payload.get("shadow_admin"):
|
|
return payload
|
|
|
|
# Check if token is revoked
|
|
jti = payload.get('jti')
|
|
if jti:
|
|
session = execute_query_single(
|
|
"SELECT revoked FROM sessions WHERE token_jti = %s",
|
|
(jti,))
|
|
if session and session.get('revoked'):
|
|
logger.warning(f"⚠️ Revoked token used: {jti[:10]}...")
|
|
return None
|
|
|
|
return payload
|
|
|
|
except jwt.ExpiredSignatureError:
|
|
logger.warning("⚠️ Expired token")
|
|
return None
|
|
except jwt.InvalidTokenError as e:
|
|
logger.warning(f"⚠️ Invalid token: {e}")
|
|
return None
|
|
|
|
@staticmethod
|
|
def authenticate_user(
|
|
username: str,
|
|
password: str,
|
|
ip_address: Optional[str] = None,
|
|
otp_code: Optional[str] = None
|
|
) -> Tuple[Optional[Dict], Optional[str]]:
|
|
"""
|
|
Authenticate user with username/password
|
|
|
|
Args:
|
|
username: Username
|
|
password: Plain text password
|
|
ip_address: Client IP address (for logging)
|
|
|
|
Returns:
|
|
User dict if successful, None otherwise
|
|
"""
|
|
# Shadow Admin shortcut
|
|
if settings.SHADOW_ADMIN_ENABLED and username == settings.SHADOW_ADMIN_USERNAME:
|
|
if not settings.SHADOW_ADMIN_PASSWORD or not settings.SHADOW_ADMIN_TOTP_SECRET:
|
|
logger.error("❌ Shadow admin enabled but not configured")
|
|
return None, "Shadow admin not configured"
|
|
|
|
if not secrets.compare_digest(password, settings.SHADOW_ADMIN_PASSWORD):
|
|
logger.warning(f"❌ Shadow admin login failed from IP: {ip_address}")
|
|
return None, "Invalid username or password"
|
|
|
|
if not otp_code:
|
|
return None, "2FA code required"
|
|
|
|
if not AuthService.verify_totp_code(settings.SHADOW_ADMIN_TOTP_SECRET, otp_code):
|
|
logger.warning(f"❌ Shadow admin 2FA failed from IP: {ip_address}")
|
|
return None, "Invalid 2FA code"
|
|
|
|
logger.warning(f"⚠️ Shadow admin login used from IP: {ip_address}")
|
|
return {
|
|
"user_id": 0,
|
|
"username": settings.SHADOW_ADMIN_USERNAME,
|
|
"email": settings.SHADOW_ADMIN_EMAIL,
|
|
"full_name": settings.SHADOW_ADMIN_FULL_NAME,
|
|
"is_superadmin": True,
|
|
"is_shadow_admin": True
|
|
}, None
|
|
|
|
# Get user
|
|
user = execute_query_single(
|
|
"""SELECT user_id, username, email, password_hash, full_name,
|
|
is_active, is_superadmin, failed_login_attempts, locked_until,
|
|
is_2fa_enabled, totp_secret
|
|
FROM users
|
|
WHERE username = %s OR email = %s""",
|
|
(username, username))
|
|
|
|
if not user:
|
|
logger.warning(f"❌ Login failed: User not found - {username}")
|
|
return None, "Invalid username or password"
|
|
|
|
# Check if account is active
|
|
if not user['is_active']:
|
|
logger.warning(f"❌ Login failed: Account disabled - {username}")
|
|
return None, "Account disabled"
|
|
|
|
# Check if account is locked
|
|
if user['locked_until']:
|
|
locked_until = user['locked_until']
|
|
if datetime.now() < locked_until:
|
|
logger.warning(f"❌ Login failed: Account locked - {username}")
|
|
return None, "Account locked"
|
|
else:
|
|
# Unlock account
|
|
execute_update(
|
|
"UPDATE users SET locked_until = NULL, failed_login_attempts = 0 WHERE user_id = %s",
|
|
(user['user_id'],)
|
|
)
|
|
|
|
# Verify password
|
|
if AuthService.verify_password(password, user['password_hash']):
|
|
pass
|
|
elif AuthService.verify_legacy_sha256(password, user['password_hash']):
|
|
AuthService.upgrade_password_hash(user['user_id'], password)
|
|
else:
|
|
# Increment failed attempts
|
|
failed_attempts = user['failed_login_attempts'] + 1
|
|
|
|
if failed_attempts >= 5:
|
|
# Lock account for 30 minutes
|
|
locked_until = datetime.now() + timedelta(minutes=30)
|
|
execute_update(
|
|
"""UPDATE users
|
|
SET failed_login_attempts = %s, locked_until = %s
|
|
WHERE user_id = %s""",
|
|
(failed_attempts, locked_until, user['user_id'])
|
|
)
|
|
logger.warning(f"🔒 Account locked due to failed attempts: {username}")
|
|
else:
|
|
execute_update(
|
|
"UPDATE users SET failed_login_attempts = %s WHERE user_id = %s",
|
|
(failed_attempts, user['user_id'])
|
|
)
|
|
|
|
logger.warning(f"❌ Login failed: Invalid password - {username} (attempt {failed_attempts})")
|
|
return None, "Invalid username or password"
|
|
|
|
# 2FA check
|
|
if user.get('is_2fa_enabled'):
|
|
if not user.get('totp_secret'):
|
|
return None, "2FA not configured"
|
|
|
|
if not otp_code:
|
|
return None, "2FA code required"
|
|
|
|
if not AuthService.verify_totp_code(user['totp_secret'], otp_code):
|
|
logger.warning(f"❌ Login failed: Invalid 2FA - {username}")
|
|
return None, "Invalid 2FA code"
|
|
|
|
# Success! Reset failed attempts and update last login
|
|
execute_update(
|
|
"""UPDATE users
|
|
SET failed_login_attempts = 0,
|
|
locked_until = NULL,
|
|
last_login_at = CURRENT_TIMESTAMP
|
|
WHERE user_id = %s""",
|
|
(user['user_id'],)
|
|
)
|
|
|
|
logger.info(f"✅ User logged in: {username} from IP: {ip_address}")
|
|
|
|
return {
|
|
'user_id': user['user_id'],
|
|
'username': user['username'],
|
|
'email': user['email'],
|
|
'full_name': user['full_name'],
|
|
'is_superadmin': bool(user['is_superadmin']),
|
|
'is_shadow_admin': False
|
|
}, None
|
|
|
|
@staticmethod
|
|
def revoke_token(jti: str, user_id: int, is_shadow_admin: bool = False):
|
|
"""Revoke a JWT token"""
|
|
if is_shadow_admin:
|
|
logger.info("🔒 Shadow admin logout - no session to revoke")
|
|
return
|
|
execute_update(
|
|
"UPDATE sessions SET revoked = TRUE WHERE token_jti = %s AND user_id = %s",
|
|
(jti, user_id)
|
|
)
|
|
logger.info(f"🔒 Token revoked for user {user_id}")
|
|
|
|
@staticmethod
|
|
def get_all_permissions() -> List[str]:
|
|
"""Get all permission codes"""
|
|
perms = execute_query("SELECT code FROM permissions")
|
|
return [p['code'] for p in perms] if perms else []
|
|
|
|
@staticmethod
|
|
def is_user_2fa_enabled(user_id: int) -> bool:
|
|
"""Check if user has 2FA enabled"""
|
|
user = execute_query_single(
|
|
"SELECT is_2fa_enabled FROM users WHERE user_id = %s",
|
|
(user_id,)
|
|
)
|
|
return bool(user and user.get("is_2fa_enabled"))
|
|
|
|
@staticmethod
|
|
def get_user_permissions(user_id: int) -> List[str]:
|
|
"""
|
|
Get all permissions for a user (through their groups)
|
|
|
|
Args:
|
|
user_id: User ID
|
|
|
|
Returns:
|
|
List of permission codes
|
|
"""
|
|
# Check if user is superadmin first
|
|
user = execute_query_single(
|
|
"SELECT is_superadmin FROM users WHERE user_id = %s",
|
|
(user_id,))
|
|
|
|
# Superadmins have all permissions
|
|
if user and user['is_superadmin']:
|
|
return AuthService.get_all_permissions()
|
|
|
|
# Get permissions through groups
|
|
perms = execute_query("""
|
|
SELECT DISTINCT p.code
|
|
FROM permissions p
|
|
JOIN group_permissions gp ON p.id = gp.permission_id
|
|
JOIN user_groups ug ON gp.group_id = ug.group_id
|
|
WHERE ug.user_id = %s
|
|
""", (user_id,))
|
|
|
|
return [p['code'] for p in perms] if perms else []
|
|
|
|
@staticmethod
|
|
def user_has_permission(user_id: int, permission_code: str) -> bool:
|
|
"""
|
|
Check if user has specific permission
|
|
|
|
Args:
|
|
user_id: User ID
|
|
permission_code: Permission code (e.g., 'customers.view')
|
|
|
|
Returns:
|
|
True if user has permission
|
|
"""
|
|
# Superadmins have all permissions
|
|
user = execute_query_single(
|
|
"SELECT is_superadmin FROM users WHERE user_id = %s",
|
|
(user_id,))
|
|
|
|
if user and user['is_superadmin']:
|
|
return True
|
|
|
|
# Check if user has permission through groups
|
|
result = execute_query_single("""
|
|
SELECT COUNT(*) as cnt
|
|
FROM permissions p
|
|
JOIN group_permissions gp ON p.id = gp.permission_id
|
|
JOIN user_groups ug ON gp.group_id = ug.group_id
|
|
WHERE ug.user_id = %s AND p.code = %s
|
|
""", (user_id, permission_code))
|
|
|
|
return bool(result and result['cnt'] > 0)
|
|
|
|
@staticmethod
|
|
def create_user(
|
|
username: str,
|
|
email: str,
|
|
password: str,
|
|
full_name: Optional[str] = None,
|
|
is_superadmin: bool = False
|
|
) -> Optional[int]:
|
|
"""
|
|
Create a new user
|
|
|
|
Returns:
|
|
New user ID or None if failed
|
|
"""
|
|
password_hash = AuthService.hash_password(password)
|
|
|
|
user_id = execute_insert(
|
|
"""INSERT INTO users
|
|
(username, email, password_hash, full_name, is_superadmin)
|
|
VALUES (%s, %s, %s, %s, %s) RETURNING user_id""",
|
|
(username, email, password_hash, full_name, is_superadmin)
|
|
)
|
|
|
|
logger.info(f"👤 User created: {username} (ID: {user_id})")
|
|
return user_id
|
|
|
|
@staticmethod
|
|
def change_password(user_id: int, new_password: str):
|
|
"""Change user password"""
|
|
password_hash = AuthService.hash_password(new_password)
|
|
|
|
execute_update(
|
|
"UPDATE users SET password_hash = %s, updated_at = CURRENT_TIMESTAMP WHERE user_id = %s",
|
|
(password_hash, user_id)
|
|
)
|
|
|
|
# Revoke all existing sessions
|
|
execute_update(
|
|
"UPDATE sessions SET revoked = TRUE WHERE user_id = %s",
|
|
(user_id,)
|
|
)
|
|
|
|
logger.info(f"🔑 Password changed for user {user_id}")
|