123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
import hashlib
import hmac
import logging
import uuid
from typing import Optional
def hash_pwd(password):
    """Return the hex-encoded SHA-256 digest of ``password``.

    The text is encoded with ``str.encode()`` (UTF-8 by default) and hashed
    once; no salt or key stretching is applied.

    NOTE(review): a single unsalted SHA-256 round is weak for password
    storage. Switching to a dedicated KDF would invalidate every stored
    hash, so it needs a migration plan rather than an in-place change.
    """
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
# Module-level logger, named after this module, used by the classes below.
logger = logging.getLogger(__name__)
- from datetime import datetime,timedelta
- from agent.models.db.user import User,Session,Role,Permission, user_roles, role_permissions
class UserBusiness:
    """CRUD and credential helpers for ``User`` rows."""

    def __init__(self, db):
        """Keep the database session used by every method of this class.

        Args:
            db: Session-like object; the methods call ``query``, ``add``,
                ``delete``, ``commit`` and ``refresh`` on it.
        """
        self.db = db

    @staticmethod
    def _page_window(page_no, page_size):
        """Turn 1-based paging arguments into a non-negative (offset, limit)."""
        limit = max(page_size, 0)
        # Pages below 1 are treated as the first page so that OFFSET is never
        # negative (several database backends reject a negative OFFSET).
        offset = max(page_no - 1, 0) * limit
        return offset, limit

    def get_user(self, user_id):
        """Return the user whose ``id`` equals ``user_id``, or ``None``."""
        return self.db.query(User).filter(User.id == user_id).first()

    def get_user_by_username(self, username):
        """Return the first user with exactly this ``username``, or ``None``."""
        return self.db.query(User).filter(User.username == username).first()

    def get_all_users(self):
        """Return every user row as a list."""
        return self.db.query(User).all()

    def get_users_paginated(self, username: Optional[str], page_no: int, page_size: int):
        """Return one page of users plus the total number of matches.

        Args:
            username: Optional case-insensitive substring filter on the
                username; a falsy value disables the filter.
            page_no: 1-based page number; values below 1 select the first page.
            page_size: Maximum rows per page; negative values are treated as 0.

        Returns:
            A ``(users, total_count)`` tuple where ``total_count`` is the
            number of rows matching the filter across all pages.
        """
        query = self.db.query(User)
        if username:
            query = query.filter(User.username.ilike(f"%{username}%"))
        total_count = query.count()
        offset, limit = self._page_window(page_no, page_size)
        # Order explicitly: without ORDER BY the database may return rows in a
        # different order per request, so pages could overlap or skip rows.
        users = query.order_by(User.id).offset(offset).limit(limit).all()
        return users, total_count

    def create_user(self, username, password, fullname, email=""):
        """Insert a new user and return the refreshed row.

        The plain-text ``password`` is hashed with ``hash_pwd`` before it is
        stored in ``hashed_password``.
        """
        user = User(
            username=username,
            hashed_password=hash_pwd(password),
            full_name=fullname,
            email=email,
        )
        self.db.add(user)
        self.db.commit()
        self.db.refresh(user)
        return user

    def update_user(self, user_id, username=None, password=None):
        """Update the username and/or password of an existing user.

        Falsy ``username`` / ``password`` arguments leave that field unchanged.

        Returns:
            The refreshed user, or ``None`` when no user has ``user_id``.
        """
        user = self.get_user(user_id)
        if not user:
            return None
        if username:
            user.username = username
        if password:
            # Write to the same column create_user() fills and
            # verify_password() is compared against; assigning to
            # ``user.password`` would not change the stored hash.
            user.hashed_password = hash_pwd(password)
        self.db.commit()
        self.db.refresh(user)
        return user

    def delete_user(self, user_id):
        """Delete the user with ``user_id``.

        Returns:
            The deleted user object, or ``None`` when it did not exist.
        """
        user = self.get_user(user_id)
        if user:
            self.db.delete(user)
            self.db.commit()
        return user

    def verify_password(self, request_password, user_hashed_password):
        """Check a plain-text password against a stored hash.

        Args:
            request_password: Plain-text password supplied by the caller.
            user_hashed_password: Hash previously produced by ``hash_pwd``.

        Returns:
            ``True`` when the hashes match, ``False`` otherwise (including
            when either argument is ``None``).
        """
        if request_password is None or user_hashed_password is None:
            return False
        candidate = hash_pwd(request_password)
        # Password hashes are deliberately not logged, and the comparison is
        # constant-time so timing does not reveal how many characters match.
        return hmac.compare_digest(
            candidate.encode("utf-8"),
            str(user_hashed_password).encode("utf-8"),
        )
-
class SessionBusiness:
    """Create, look up, refresh and validate login ``Session`` rows."""

    # Longest allowed gap between two validations before a session is rejected.
    SESSION_MAX_IDLE = timedelta(minutes=30)

    def __init__(self, db):
        """Keep the database session used by every method of this class.

        Args:
            db: Session-like object; the methods call ``query``, ``add``,
                ``delete``, ``commit`` and ``refresh`` on it.
        """
        self.db = db

    @staticmethod
    def _is_expired(last_active, now, max_idle=SESSION_MAX_IDLE):
        """Return True when more than ``max_idle`` passed since ``last_active``."""
        # Compare the whole timedelta: ``timedelta.seconds`` only holds the
        # sub-day remainder, so checking it alone would accept a session that
        # has been idle for one day and five minutes.
        return (now - last_active) > max_idle

    def create_session(self, user: "User"):
        """Persist a new session for ``user`` under a random UUID4 id.

        Returns:
            The refreshed ``Session`` row.
        """
        session_id = str(uuid.uuid4())
        session = Session(
            session_id=session_id,
            user_id=user.id,
            username=user.username,
            full_name=user.full_name,
        )
        self.db.add(session)
        self.db.commit()
        self.db.refresh(session)
        return session

    def get_session(self, session_id):
        """Return the session with this ``session_id``, or ``None``."""
        return self.db.query(Session).filter(Session.session_id == session_id).first()

    def delete_session(self, session_id):
        """Delete the session with ``session_id`` if it exists.

        Returns:
            ``True`` in every case; a missing session is not an error.
        """
        session = self.get_session(session_id)
        if session:
            self.db.delete(session)
            self.db.commit()
        return True

    def update_session(self, session_id):
        """Set the session's ``updated`` timestamp to the current local time.

        Does nothing when the session does not exist.
        """
        session = self.get_session(session_id)
        if session:
            session.updated = datetime.now()
            self.db.commit()
            self.db.refresh(session)

    def get_session_by_user_id(self, user_id):
        """Return the first session belonging to ``user_id``, or ``None``."""
        return self.db.query(Session).filter(Session.user_id == user_id).first()

    def validate_session(self, username, session_id):
        """Validate a session and refresh its idle timer.

        Args:
            username: Username the caller claims to be.
            session_id: Identifier of the session to check.

        Returns:
            The session when it exists, belongs to ``username`` and has been
            idle for at most ``SESSION_MAX_IDLE``; otherwise ``None``.
        """
        session = self.get_session(session_id)
        if not session:
            return None
        if session.username != username:
            return None
        if self._is_expired(session.updated, datetime.now(), self.SESSION_MAX_IDLE):
            # The expired row is only reported here, not deleted.
            logger.info("session expired: %s", session_id)
            return None
        self.update_session(session_id)
        return session
-
class RoleBusiness:
    """CRUD for ``Role`` rows and management of their permissions."""

    def __init__(self, db):
        """Keep the database session used by every method of this class.

        Args:
            db: Session-like object; the methods call ``query``, ``add``,
                ``delete``, ``commit`` and ``refresh`` on it.
        """
        self.db = db

    def _get_permission(self, permission_id):
        """Return the permission with ``permission_id``, or ``None``."""
        return self.db.query(Permission).filter(Permission.id == permission_id).first()

    def create_role(self, name, description=""):
        """Insert a new role and return the refreshed row."""
        role = Role(name=name, description=description)
        self.db.add(role)
        self.db.commit()
        self.db.refresh(role)
        return role

    def get_role_by_name(self, name):
        """Return the first role with exactly this ``name``, or ``None``."""
        return self.db.query(Role).filter(Role.name == name).first()

    def get_role(self, role_id):
        """Return the role whose ``id`` equals ``role_id``, or ``None``."""
        return self.db.query(Role).filter(Role.id == role_id).first()

    def get_all_roles(self):
        """Return every role row as a list."""
        return self.db.query(Role).all()

    def delete_role(self, role_id):
        """Delete the role with ``role_id``.

        Returns:
            The deleted role object, or ``None`` when it did not exist.
        """
        role = self.get_role(role_id)
        if role:
            self.db.delete(role)
            self.db.commit()
        return role

    def update_role(self, role_id, name=None, description=None):
        """Update the name and/or description of an existing role.

        Falsy ``name`` / ``description`` arguments (including the empty
        string) leave that field unchanged.

        Returns:
            The refreshed role, or ``None`` when no role has ``role_id``.
        """
        role = self.get_role(role_id)
        if not role:
            return None
        if name:
            role.name = name
        if description:
            role.description = description
        self.db.commit()
        self.db.refresh(role)
        return role

    def assign_permission_to_role(self, role_id, permission_id):
        """Grant a permission to a role.

        Granting a permission the role already has is a no-op that still
        reports success.

        Returns:
            ``True`` when both rows exist, ``False`` otherwise.
        """
        role = self.get_role(role_id)
        permission = self._get_permission(permission_id)
        if not (role and permission):
            return False
        # Guard against a duplicate association row: appending the same
        # permission twice would insert the pair again on commit.
        if permission not in role.permissions:
            role.permissions.append(permission)
            self.db.commit()
        return True

    def revoke_permission_from_role(self, role_id, permission_id):
        """Remove a permission from a role.

        Returns:
            ``True`` when the role held the permission and it was removed,
            ``False`` when either row is missing or the permission was not
            assigned.
        """
        role = self.get_role(role_id)
        permission = self._get_permission(permission_id)
        if role and permission and permission in role.permissions:
            role.permissions.remove(permission)
            self.db.commit()
            return True
        return False

    def revoke_all_permissions_from_role(self, role_id):
        """Remove every permission from a role.

        Returns:
            ``True`` when the role exists, ``False`` otherwise.
        """
        role = self.get_role(role_id)
        if not role:
            return False
        role.permissions.clear()
        self.db.commit()
        return True

    def get_role_permissions(self, role_id):
        """Return the role's permissions, or ``[]`` when the role is missing."""
        role = self.get_role(role_id)
        if role:
            return role.permissions
        return []
class PermissionBusiness:
    """CRUD operations for ``Permission`` rows."""

    def __init__(self, db):
        """Remember the database session used by the methods below."""
        self.db = db

    def create_permission(self, name, description="", menu_name=None, menu_route=None, menu_icon=None, parent_id=None):
        """Insert a permission built from the given fields.

        Returns:
            The newly stored ``Permission`` after it has been refreshed.
        """
        fields = {
            "name": name,
            "description": description,
            "menu_name": menu_name,
            "menu_route": menu_route,
            "menu_icon": menu_icon,
            "parent_id": parent_id,
        }
        created = Permission(**fields)
        self.db.add(created)
        self.db.commit()
        self.db.refresh(created)
        return created

    def get_permission_by_name(self, name):
        """Look up the first permission whose ``name`` matches exactly."""
        by_name = self.db.query(Permission).filter(Permission.name == name)
        return by_name.first()

    def get_permission(self, permission_id):
        """Look up the permission whose ``id`` equals ``permission_id``."""
        by_id = self.db.query(Permission).filter(Permission.id == permission_id)
        return by_id.first()

    def get_all_permissions(self):
        """List every permission row."""
        return self.db.query(Permission).all()

    def delete_permission(self, permission_id):
        """Remove a permission by id.

        Returns:
            The removed object, or the falsy lookup result when nothing
            matched ``permission_id``.
        """
        target = self.get_permission(permission_id)
        if not target:
            return target
        self.db.delete(target)
        self.db.commit()
        return target
class UserRoleBusiness:
    """Manage role membership of users and derive their permissions."""

    def __init__(self, db):
        """Keep the session and build the user/role helpers that share it.

        Args:
            db: Session-like object; ``commit`` is called on it here and it is
                passed on to ``UserBusiness`` and ``RoleBusiness``.
        """
        self.db = db
        self.user_biz = UserBusiness(db)
        self.role_biz = RoleBusiness(db)

    def assign_role_to_user(self, user_id, role_id):
        """Give a role to a user.

        Assigning a role the user already has is a no-op that still reports
        success.

        Returns:
            ``True`` when both the user and the role exist, ``False`` otherwise.
        """
        user = self.user_biz.get_user(user_id)
        role = self.role_biz.get_role(role_id)
        if not (user and role):
            return False
        # Guard against a duplicate association row: appending the same role
        # twice would insert the pair again on commit.
        if role not in user.roles:
            user.roles.append(role)
            self.db.commit()
        return True

    def revoke_role_from_user(self, user_id, role_id):
        """Take a role away from a user.

        Returns:
            ``True`` when the user held the role and it was removed, ``False``
            when either row is missing or the role was not assigned.
        """
        user = self.user_biz.get_user(user_id)
        role = self.role_biz.get_role(role_id)
        if user and role and role in user.roles:
            user.roles.remove(role)
            self.db.commit()
            return True
        return False

    def get_user_roles(self, user_id):
        """Return the user's roles, or ``[]`` when the user does not exist."""
        user = self.user_biz.get_user(user_id)
        if user:
            return user.roles
        return []

    def get_user_permissions(self, user_id):
        """Return the distinct permissions granted through the user's roles.

        Permissions shared by several roles appear once. The list keeps the
        order in which permissions are first met while walking
        ``user.roles``, so repeated calls give the same ordering.

        Returns:
            A list of permissions, or ``[]`` when the user does not exist.
        """
        user = self.user_biz.get_user(user_id)
        if not user:
            return []
        # A dict is used as an insertion-ordered set; a plain ``set`` would
        # de-duplicate too but hand the permissions back in arbitrary order.
        seen = {}
        for role in user.roles:
            for perm in role.permissions:
                seen[perm] = None
        return list(seen)

    def get_user_menu_permissions(self, user_id):
        """Return the user's permissions that have a ``menu_name`` set.

        The result is a flat list; ``parent_id`` relationships are not used
        to build a hierarchy here.
        """
        user_permissions = self.get_user_permissions(user_id)
        return [p for p in user_permissions if p.menu_name is not None]
if __name__ == "__main__":
    # Placeholder entry point: running the module directly only prints a
    # greeting; none of the business classes are exercised.
    print("hello world")
|