from typing import Optional, Tuple

from agent.db.database import SessionLocal
from agent.libs.sys import SysUserRoleOrganBusiness
from agent.libs.user import SessionBusiness, UserBusiness
from fastapi import Depends, HTTPException, Request, status


class SessionValues:
    """Plain container for the identity resolved from a validated session.

    The four ``suro_*`` fields are copied from the record that
    ``SysUserRoleOrganBusiness.getSUROById`` returns (its ``id``,
    ``data_type``, ``role_id`` and ``organ_id`` attributes).  They are all
    ``None`` when the session has no ``user_role_organ_id``.
    """

    def __init__(
        self,
        session_id: str,
        user_id: str,
        username: str,
        full_name: str,
        suro_id: Optional[int],
        suro_type: Optional[int],
        suro_role: Optional[int],
        suro_organ: Optional[int],
    ):
        self.session_id = session_id
        self.user_id = user_id
        self.username = username
        self.full_name = full_name
        self.suro_id = suro_id
        self.suro_type = suro_type
        self.suro_role = suro_role
        self.suro_organ = suro_organ


def _unauthorized(detail: str) -> HTTPException:
    """Build the 401 response used for every authentication failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Beaver"},
    )


def _parse_authorization_header(auth_header: Optional[str]) -> Tuple[str, str]:
    """Split a ``Beaver <username> <session_id>`` header into its two values.

    Raises:
        HTTPException: 401 when the header is missing, does not start with
            ``"Beaver "``, or carries fewer than two values after the scheme.
    """
    if not auth_header:
        raise _unauthorized("Authorization header is missing")

    parts = auth_header.split(" ")
    # Fewer than three tokens means the username or session id is absent;
    # indexing blindly would raise IndexError and surface as a 500.
    if not auth_header.startswith("Beaver ") or len(parts) < 3:
        print("Invalid Authorization header format", auth_header)
        raise _unauthorized("Invalid Authorization header format")

    return parts[1], parts[2]


def verify_session_id(request: Request) -> SessionValues:
    """Authenticate a request from its ``Authorization`` header.

    The header is expected to look like ``Beaver <username> <session_id>``.
    The pair is checked with ``SessionBusiness.validate_session`` and the
    user is then loaded with ``UserBusiness.get_user_by_username``.

    Args:
        request: Incoming request whose headers carry the credentials.

    Returns:
        A ``SessionValues`` describing the session and user.  The ``suro_*``
        fields are populated only when the session record has a
        ``user_role_organ_id``.

    Raises:
        HTTPException: 401 when the header is missing or malformed, the
            session does not validate, or the user cannot be found.
    """
    # Validate the header before opening a database session so that
    # obviously bad requests never touch the database.
    session_user_id, session_id = _parse_authorization_header(
        request.headers.get("Authorization")
    )

    with SessionLocal() as db:
        session_business = SessionBusiness(db)
        user_business = UserBusiness(db)

        session_data = session_business.validate_session(session_user_id, session_id)
        if not session_data:
            print("Invalid session_id", session_user_id, session_id)
            raise _unauthorized("Invalid session_id")

        # The first header value is looked up as a username, despite the
        # "user_id" naming.
        user = user_business.get_user_by_username(session_user_id)
        if user is None:
            print("Invalid user_id", session_user_id)
            raise _unauthorized("Invalid username")

        if session_data.user_role_organ_id is not None:
            biz = SysUserRoleOrganBusiness(db)
            # NOTE(review): assumes getSUROById always returns a record for a
            # stored user_role_organ_id; a None result would raise
            # AttributeError here - confirm against the business layer.
            suro = biz.getSUROById(session_data.user_role_organ_id)
            return SessionValues(
                session_id,
                user.id,
                user.username,
                user.full_name,
                suro.id,
                suro.data_type,
                suro.role_id,
                suro.organ_id,
            )

        return SessionValues(
            session_id,
            user.id,
            user.username,
            user.full_name,
            None,
            None,
            None,
            None,
        )