import uuid import logging import hashlib from typing import Optional def hash_pwd(password): return hashlib.sha256(password.encode()).hexdigest() logger = logging.getLogger(__name__) from datetime import datetime,timedelta from agent.models.db.user import User,Session,Role,Permission, user_roles, role_permissions class UserBusiness: def __init__(self, db): self.db = db def get_user(self, user_id): return self.db.query(User).filter(User.id == user_id).first() def get_user_by_username(self, username): return self.db.query(User).filter(User.username == username).first() def get_all_users(self): return self.db.query(User).all() def get_users_paginated(self, username: Optional[str], page_no: int, page_size: int): query = self.db.query(User) if username: query = query.filter(User.username.ilike(f"%{username}%")) total_count = query.count() users = query.offset((page_no - 1) * page_size).limit(page_size).all() return users, total_count def create_user(self, username, password, fullname, email=""): password = hash_pwd(password) user = User(username=username, hashed_password=password, full_name=fullname, email=email) self.db.add(user) self.db.commit() self.db.refresh(user) return user def update_user(self, user_id, username=None, password=None): user = self.get_user(user_id) if user: if username: user.username = username if password: password = hash_pwd(password) user.password = password self.db.commit() self.db.refresh(user) return user def delete_user(self, user_id): user = self.get_user(user_id) if user: self.db.delete(user) self.db.commit() return user def verify_password(self, request_password, user_hashed_password): hashed_password = hash_pwd(request_password) logger.info(f"verify password: {hashed_password} == {user_hashed_password}") return hashed_password == user_hashed_password class SessionBusiness: def __init__(self, db): self.db = db def create_session(self, user:User): session_id = str(uuid.uuid4()) session = Session(session_id=session_id, user_id=user.id, username=user.username, full_name=user.full_name, created=datetime.now(), updated=datetime.now()) self.db.add(session) self.db.commit() self.db.refresh(session) return session def get_session(self, session_id): return self.db.query(Session).filter(Session.session_id == session_id).first() def delete_session(self, session_id): session = self.get_session(session_id) if session: self.db.delete(session) self.db.commit() return True def update_session(self, session_id): session = self.get_session(session_id) if session: session.updated = datetime.now() self.db.commit() self.db.refresh(session) def get_session_by_user_id(self, user_id): return self.db.query(Session).filter(Session.user_id == user_id).first() def validate_session(self, username, session_id): session = self.get_session(session_id) if session: if session.username != username: return None expired:timedelta = datetime.now() - session.updated if expired.seconds > 1800 : # 30 minutes logger.info(f"session expired: {session_id}") #self.delete_session(session_id) return None else: self.update_session(session_id) return session class RoleBusiness: def __init__(self, db): self.db = db def create_role(self, name, description=""): role = Role(name=name, description=description) self.db.add(role) self.db.commit() self.db.refresh(role) return role def get_role_by_name(self, name): return self.db.query(Role).filter(Role.name == name).first() def get_role(self, role_id): return self.db.query(Role).filter(Role.id == role_id).first() def get_all_roles(self): return self.db.query(Role).all() def delete_role(self, role_id): role = self.get_role(role_id) if role: self.db.delete(role) self.db.commit() return role def update_role(self, role_id, name=None, description=None): role = self.get_role(role_id) if role: if name: role.name = name if description: role.description = description self.db.commit() self.db.refresh(role) return role def assign_permission_to_role(self, role_id, permission_id): role = self.get_role(role_id) permission = self.db.query(Permission).filter(Permission.id == permission_id).first() if role and permission: role.permissions.append(permission) self.db.commit() return True return False def revoke_permission_from_role(self, role_id, permission_id): role = self.get_role(role_id) permission = self.db.query(Permission).filter(Permission.id == permission_id).first() if role and permission and permission in role.permissions: role.permissions.remove(permission) self.db.commit() return True return False def revoke_all_permissions_from_role(self, role_id): role = self.get_role(role_id) if role: role.permissions.clear() self.db.commit() return True return False def get_role_permissions(self, role_id): role = self.get_role(role_id) if role: return role.permissions return [] class PermissionBusiness: def __init__(self, db): self.db = db def create_permission(self, name, description="", menu_name=None, menu_route=None, menu_icon=None, parent_id=None): permission = Permission( name=name, description=description, menu_name=menu_name, menu_route=menu_route, menu_icon=menu_icon, parent_id=parent_id ) self.db.add(permission) self.db.commit() self.db.refresh(permission) return permission def get_permission_by_name(self, name): return self.db.query(Permission).filter(Permission.name == name).first() def get_permission(self, permission_id): return self.db.query(Permission).filter(Permission.id == permission_id).first() def get_all_permissions(self): return self.db.query(Permission).all() def delete_permission(self, permission_id): permission = self.get_permission(permission_id) if permission: self.db.delete(permission) self.db.commit() return permission class UserRoleBusiness: def __init__(self, db): self.db = db self.user_biz = UserBusiness(db) self.role_biz = RoleBusiness(db) def assign_role_to_user(self, user_id, role_id): user = self.user_biz.get_user(user_id) role = self.role_biz.get_role(role_id) if user and role: user.roles.append(role) self.db.commit() return True return False def revoke_role_from_user(self, user_id, role_id): user = self.user_biz.get_user(user_id) role = self.role_biz.get_role(role_id) if user and role and role in user.roles: user.roles.remove(role) self.db.commit() return True return False def get_user_roles(self, user_id): user = self.user_biz.get_user(user_id) if user: return user.roles return [] def get_user_permissions(self, user_id): user = self.user_biz.get_user(user_id) if not user: return [] permissions = set() for role in user.roles: for perm in role.permissions: permissions.add(perm) return list(permissions) def get_user_menu_permissions(self, user_id): user_permissions = self.get_user_permissions(user_id) menu_permissions = [p for p in user_permissions if p.menu_name is not None] # You might want to structure this hierarchically if you have parent_id relationships return menu_permissions if __name__ == "__main__": print("hello world")