from fastapi import Depends, HTTPException, Request, status

from agent.db.database import SessionLocal
from agent.libs.user import SessionBusiness, UserBusiness


class SessionValues:
    """Identity of the authenticated caller, as returned by verify_session_id."""

    def __init__(self, session_id: str, user_id: str, username: str, full_name: str):
        self.session_id = session_id
        self.user_id = user_id
        self.username = username
        self.full_name = full_name


def _unauthorized(detail: str) -> HTTPException:
    """Build the 401 response used for every authentication failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Beaver"},
    )


def verify_session_id(request: Request) -> SessionValues:
    """Authenticate a request from its ``Authorization`` header.

    The header is expected to look like ``Beaver <username> <session_id>``.
    The header is split on single spaces; the second field is used as the
    username and the third as the session id.

    Args:
        request: Incoming request whose headers are inspected.

    Returns:
        A SessionValues built from the session id and the user record.

    Raises:
        HTTPException: 401 when the header is missing, does not start with
            ``"Beaver "``, lacks the username/session-id fields, fails
            session validation, or names a user that cannot be found.
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        raise _unauthorized("Authorization header is missing")

    # Check that the header carries the expected scheme prefix.
    if not auth_header.startswith("Beaver "):
        # NOTE(review): this writes the raw header (possibly credentials) to stdout.
        print("Invalid Authorization header format", auth_header)
        raise _unauthorized("Invalid Authorization header format")

    # Extract username and session id. A header with fewer than three
    # space-separated fields used to raise IndexError (HTTP 500); reject it
    # as a malformed header instead.
    parts = auth_header.split(" ")
    if len(parts) < 3:
        print("Invalid Authorization header format", auth_header)
        raise _unauthorized("Invalid Authorization header format")
    session_user_id = parts[1]
    session_id = parts[2]

    # Open the DB session only once the header is well-formed, so malformed
    # or anonymous requests never touch the database.
    with SessionLocal() as db:
        session_business = SessionBusiness(db)
        user_business = UserBusiness(db)

        # Validate the (username, session id) pair through the session business layer.
        if not session_business.validate_session(session_user_id, session_id):
            # NOTE(review): this writes the session id to stdout.
            print("Invalid session_id", session_user_id, session_id)
            raise _unauthorized("Invalid session_id")

        user = user_business.get_user_by_username(session_user_id)
        if user is None:
            print("Invalid user_id", session_user_id)
            raise _unauthorized("Invalid username")

        # Read the attributes while the DB session is still open.
        return SessionValues(session_id, user.id, user.username, user.full_name)