""" Authentication Service - Håndterer login, JWT tokens, password hashing Adapted from OmniSync for BMC Hub """ from typing import Optional, Dict, List from datetime import datetime, timedelta import hashlib import secrets import jwt from app.core.database import execute_query, execute_insert, execute_update from app.core.config import settings import logging logger = logging.getLogger(__name__) # JWT Settings SECRET_KEY = getattr(settings, 'JWT_SECRET_KEY', 'your-secret-key-change-in-production') ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 8 # 8 timer class AuthService: """Service for authentication and authorization""" @staticmethod def hash_password(password: str) -> str: """ Hash password using SHA256 I produktion: Brug bcrypt eller argon2! """ return hashlib.sha256(password.encode()).hexdigest() @staticmethod def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify password against hash""" return AuthService.hash_password(plain_password) == hashed_password @staticmethod def create_access_token(user_id: int, username: str, is_superadmin: bool = False) -> str: """ Create JWT access token Args: user_id: User ID username: Username is_superadmin: Whether user is superadmin Returns: JWT token string """ expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) jti = secrets.token_urlsafe(32) # JWT ID for token revocation payload = { "sub": str(user_id), "username": username, "is_superadmin": is_superadmin, "exp": expire, "iat": datetime.utcnow(), "jti": jti } token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) # Store session for token revocation execute_insert( """INSERT INTO sessions (user_id, token_jti, expires_at) VALUES (%s, %s, %s)""", (user_id, jti, expire) ) return token @staticmethod def verify_token(token: str) -> Optional[Dict]: """ Verify and decode JWT token Returns: Dict with user info or None if invalid """ try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # Check if token is revoked jti = payload.get('jti') if jti: session = execute_query( "SELECT revoked FROM sessions WHERE token_jti = %s", (jti,), fetchone=True ) if session and session.get('revoked'): logger.warning(f"⚠️ Revoked token used: {jti[:10]}...") return None return payload except jwt.ExpiredSignatureError: logger.warning("⚠️ Expired token") return None except jwt.InvalidTokenError as e: logger.warning(f"⚠️ Invalid token: {e}") return None @staticmethod def authenticate_user(username: str, password: str, ip_address: Optional[str] = None) -> Optional[Dict]: """ Authenticate user with username/password Args: username: Username password: Plain text password ip_address: Client IP address (for logging) Returns: User dict if successful, None otherwise """ # Get user user = execute_query( """SELECT id, username, email, password_hash, full_name, is_active, is_superadmin, failed_login_attempts, locked_until FROM users WHERE username = %s OR email = %s""", (username, username), fetchone=True ) if not user: logger.warning(f"❌ Login failed: User not found - {username}") return None # Check if account is active if not user['is_active']: logger.warning(f"❌ Login failed: Account disabled - {username}") return None # Check if account is locked if user['locked_until']: locked_until = user['locked_until'] if datetime.now() < locked_until: logger.warning(f"❌ Login failed: Account locked - {username}") return None else: # Unlock account execute_update( "UPDATE users SET locked_until = NULL, failed_login_attempts = 0 WHERE id = %s", (user['id'],) ) # Verify password if not AuthService.verify_password(password, user['password_hash']): # Increment failed attempts failed_attempts = user['failed_login_attempts'] + 1 if failed_attempts >= 5: # Lock account for 30 minutes locked_until = datetime.now() + timedelta(minutes=30) execute_update( """UPDATE users SET failed_login_attempts = %s, locked_until = %s WHERE id = %s""", (failed_attempts, locked_until, user['id']) ) logger.warning(f"🔒 Account locked due to failed attempts: {username}") else: execute_update( "UPDATE users SET failed_login_attempts = %s WHERE id = %s", (failed_attempts, user['id']) ) logger.warning(f"❌ Login failed: Invalid password - {username} (attempt {failed_attempts})") return None # Success! Reset failed attempts and update last login execute_update( """UPDATE users SET failed_login_attempts = 0, locked_until = NULL, last_login_at = CURRENT_TIMESTAMP WHERE id = %s""", (user['id'],) ) logger.info(f"✅ User logged in: {username} from IP: {ip_address}") return { 'user_id': user['id'], 'username': user['username'], 'email': user['email'], 'full_name': user['full_name'], 'is_superadmin': bool(user['is_superadmin']) } @staticmethod def revoke_token(jti: str, user_id: int): """Revoke a JWT token""" execute_update( "UPDATE sessions SET revoked = TRUE WHERE token_jti = %s AND user_id = %s", (jti, user_id) ) logger.info(f"🔒 Token revoked for user {user_id}") @staticmethod def get_user_permissions(user_id: int) -> List[str]: """ Get all permissions for a user (through their groups) Args: user_id: User ID Returns: List of permission codes """ # Check if user is superadmin first user = execute_query( "SELECT is_superadmin FROM users WHERE id = %s", (user_id,), fetchone=True ) # Superadmins have all permissions if user and user['is_superadmin']: all_perms = execute_query("SELECT code FROM permissions") return [p['code'] for p in all_perms] if all_perms else [] # Get permissions through groups perms = execute_query(""" SELECT DISTINCT p.code FROM permissions p JOIN group_permissions gp ON p.id = gp.permission_id JOIN user_groups ug ON gp.group_id = ug.group_id WHERE ug.user_id = %s """, (user_id,)) return [p['code'] for p in perms] if perms else [] @staticmethod def user_has_permission(user_id: int, permission_code: str) -> bool: """ Check if user has specific permission Args: user_id: User ID permission_code: Permission code (e.g., 'customers.view') Returns: True if user has permission """ # Superadmins have all permissions user = execute_query( "SELECT is_superadmin FROM users WHERE id = %s", (user_id,), fetchone=True ) if user and user['is_superadmin']: return True # Check if user has permission through groups result = execute_query(""" SELECT COUNT(*) as cnt FROM permissions p JOIN group_permissions gp ON p.id = gp.permission_id JOIN user_groups ug ON gp.group_id = ug.group_id WHERE ug.user_id = %s AND p.code = %s """, (user_id, permission_code), fetchone=True) return bool(result and result['cnt'] > 0) @staticmethod def create_user( username: str, email: str, password: str, full_name: Optional[str] = None, is_superadmin: bool = False ) -> Optional[int]: """ Create a new user Returns: New user ID or None if failed """ password_hash = AuthService.hash_password(password) user_id = execute_insert( """INSERT INTO users (username, email, password_hash, full_name, is_superadmin) VALUES (%s, %s, %s, %s, %s) RETURNING id""", (username, email, password_hash, full_name, is_superadmin) ) logger.info(f"👤 User created: {username} (ID: {user_id})") return user_id @staticmethod def change_password(user_id: int, new_password: str): """Change user password""" password_hash = AuthService.hash_password(new_password) execute_update( "UPDATE users SET password_hash = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s", (password_hash, user_id) ) # Revoke all existing sessions execute_update( "UPDATE sessions SET revoked = TRUE WHERE user_id = %s", (user_id,) ) logger.info(f"🔑 Password changed for user {user_id}")