123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- from agent.db.database import SessionLocal
- from agent.libs.sys import SysUserRoleOrganBusiness
- from agent.libs.user import SessionBusiness, UserBusiness
- from fastapi import Depends, HTTPException, Request, status
- class SessionValues:
- def __init__(self, session_id: str, user_id: str, username: str, full_name: str, suro_id:int,suro_type:int,suro_role:int,suro_organ:int):
- self.session_id = session_id
- self.user_id = user_id
- self.username = username
- self.full_name = full_name
- self.suro_id = suro_id
- self.suro_type = suro_type
- self.suro_role = suro_role
- self.suro_organ = suro_organ
-
-
- def verify_session_id(request: Request)-> SessionValues:
- # 获取 Authorization 头
- with SessionLocal() as db:
- session_business = SessionBusiness(db)
- user_business = UserBusiness(db)
- auth_header = request.headers.get("Authorization")
- if not auth_header:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authorization header is missing",
- headers={"WWW-Authenticate": "Beaver"}
- )
- # 检查 Authorization 头是否符合预期格式
- if not auth_header.startswith("Beaver "):
- print("Invalid Authorization header format", auth_header)
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid Authorization header format",
- headers={"WWW-Authenticate": "Beaver"}
- )
- # 提取 session_id
- session_user_id = auth_header.split(" ")[1]
- session_id = auth_header.split(" ")[2]
- # return SessionValues(session_id, '', session_user_id, '')
- # 在这里添加你的 session_id 校验逻辑
- # 例如,检查 session_id 是否在数据库中存在
- sessionData = session_business.validate_session(session_user_id, session_id)
- if not sessionData:
- print("Invalid session_id", session_user_id, session_id)
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid session_id",
- headers={"WWW-Authenticate": "Beaver"}
- )
- user = user_business.get_user_by_username(session_user_id)
- if user is None:
- print("Invalid user_id", session_user_id)
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid username",
- headers={"WWW-Authenticate": "Beaver"}
- )
- if sessionData.user_role_organ_id is not None:
- biz = SysUserRoleOrganBusiness(db)
- SURO_data = biz.getSUROById(sessionData.user_role_organ_id);
- return SessionValues(session_id, user.id, user.username, user.full_name,SURO_data.id,SURO_data.data_type,SURO_data.role_id,SURO_data.organ_id)
- return SessionValues(session_id, user.id, user.username, user.full_name,None,None,None,None,)
- # 如果校验通过,返回 session_id 或其他需要的信息
- return None
|