"""Business-logic helpers for user accounts and login sessions."""

import hashlib
import hmac
import logging
import uuid
from datetime import datetime, timedelta

from agent.models.db.user import User, Session

logger = logging.getLogger(__name__)

# Maximum idle time before validate_session rejects a session.
SESSION_TIMEOUT = timedelta(minutes=30)


def hash_pwd(password):
    """Return the hexadecimal SHA-256 digest of ``password``.

    NOTE(review): unsalted SHA-256 is weak for password storage. Switching
    to a salted KDF (e.g. ``hashlib.scrypt``) would invalidate the hashes
    already stored, so it needs a migration and is not done here.
    """
    return hashlib.sha256(password.encode()).hexdigest()


class UserBusiness:
    """CRUD and credential checks for ``User`` rows.

    ``db`` is used through ``query``/``add``/``delete``/``commit``/``refresh``
    (presumably a SQLAlchemy ORM session -- confirm against the caller).
    """

    def __init__(self, db):
        self.db = db

    def get_user(self, user_id):
        """Return the user whose ``id`` equals ``user_id``, or ``None``."""
        return self.db.query(User).filter(User.id == user_id).first()

    def get_user_by_username(self, username):
        """Return the first user with the given ``username``, or ``None``."""
        return self.db.query(User).filter(User.username == username).first()

    def create_user(self, username, password, fullname, email=""):
        """Create, commit and return a new user.

        The plain-text ``password`` is hashed with ``hash_pwd`` before it is
        stored in ``hashed_password``.
        """
        user = User(
            username=username,
            hashed_password=hash_pwd(password),
            full_name=fullname,
            email=email,
        )
        self.db.add(user)
        self.db.commit()
        self.db.refresh(user)
        return user

    def update_user(self, user_id, username=None, password=None):
        """Update the username and/or password of an existing user.

        Falsy ``username``/``password`` values leave the field untouched.
        Returns the refreshed user, or ``None`` when ``user_id`` is unknown.
        """
        user = self.get_user(user_id)
        if user:
            if username:
                user.username = username
            if password:
                # Must target the same attribute create_user populates;
                # assigning ``user.password`` would not reach that field.
                user.hashed_password = hash_pwd(password)
            self.db.commit()
            self.db.refresh(user)
        return user

    def delete_user(self, user_id):
        """Delete the user if it exists.

        Returns the deleted user object, or ``None`` when nothing matched.
        """
        user = self.get_user(user_id)
        if user:
            self.db.delete(user)
            self.db.commit()
        return user

    def verify_password(self, request_password, user_hashed_password):
        """Return ``True`` if ``request_password`` hashes to the stored hash.

        The comparison is constant-time, and neither hash is written to the
        log (only the outcome is).
        """
        # A missing / non-string stored hash can never match.
        if not isinstance(user_hashed_password, str):
            return False
        candidate = hash_pwd(request_password)
        matched = hmac.compare_digest(
            candidate.encode(), user_hashed_password.encode()
        )
        logger.info("verify password: match=%s", matched)
        return matched


class SessionBusiness:
    """Create, look up, refresh and validate ``Session`` rows."""

    def __init__(self, db):
        self.db = db

    def create_session(self, user: User):
        """Create and return a new session for ``user``.

        The session id is a random UUID4 string; ``user_id``, ``username``
        and ``full_name`` are copied from ``user``.
        """
        session = Session(
            session_id=str(uuid.uuid4()),
            user_id=user.id,
            username=user.username,
            full_name=user.full_name,
        )
        self.db.add(session)
        self.db.commit()
        self.db.refresh(session)
        return session

    def get_session(self, session_id):
        """Return the session with the given ``session_id``, or ``None``."""
        return (
            self.db.query(Session)
            .filter(Session.session_id == session_id)
            .first()
        )

    def delete_session(self, session_id):
        """Delete the session if it exists; always returns ``True``.

        NOTE(review): the original formatting was lost, so it is unclear
        whether ``True`` was meant to be returned only when a row was
        deleted -- confirm against callers.
        """
        session = self.get_session(session_id)
        if session:
            self.db.delete(session)
            self.db.commit()
        return True

    def update_session(self, session_id):
        """Set the session's ``updated`` timestamp to the current time.

        Does nothing when the session does not exist. Returns ``None``.
        """
        session = self.get_session(session_id)
        if session:
            # Naive local time, matching the arithmetic in validate_session.
            session.updated = datetime.now()
            self.db.commit()
            self.db.refresh(session)

    def get_session_by_user_id(self, user_id):
        """Return the first session belonging to ``user_id``, or ``None``."""
        return self.db.query(Session).filter(Session.user_id == user_id).first()

    def validate_session(self, username, session_id):
        """Return the session if it belongs to ``username`` and is not stale.

        Returns ``None`` when the session is unknown, owned by another
        username, or idle for longer than ``SESSION_TIMEOUT``. A valid
        session has its ``updated`` timestamp refreshed.
        """
        session = self.get_session(session_id)
        if not session:
            return session
        if session.username != username:
            return None

        # Compare the whole timedelta: ``timedelta.seconds`` is only the
        # sub-day component, so it would accept sessions idle for days.
        idle = datetime.now() - session.updated
        if idle > SESSION_TIMEOUT:
            logger.info("session expired: %s", session_id)
            # The expired row is intentionally left in the database.
            return None

        self.update_session(session_id)
        return session


if __name__ == "__main__":
    print("hello world")