auth.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from agent.db.database import SessionLocal
  2. from agent.libs.sys import SysUserRoleOrganBusiness
  3. from agent.libs.user import SessionBusiness, UserBusiness
  4. from fastapi import Depends, HTTPException, Request, status
  5. class SessionValues:
  6. def __init__(self, session_id: str, user_id: str, username: str, full_name: str, suro_id:int,suro_type:int,suro_role:int,suro_organ:int):
  7. self.session_id = session_id
  8. self.user_id = user_id
  9. self.username = username
  10. self.full_name = full_name
  11. self.suro_id = suro_id
  12. self.suro_type = suro_type
  13. self.suro_role = suro_role
  14. self.suro_organ = suro_organ
  15. def verify_session_id(request: Request)-> SessionValues:
  16. # 获取 Authorization 头
  17. with SessionLocal() as db:
  18. session_business = SessionBusiness(db)
  19. user_business = UserBusiness(db)
  20. auth_header = request.headers.get("Authorization")
  21. if not auth_header:
  22. raise HTTPException(
  23. status_code=status.HTTP_401_UNAUTHORIZED,
  24. detail="Authorization header is missing",
  25. headers={"WWW-Authenticate": "Beaver"}
  26. )
  27. # 检查 Authorization 头是否符合预期格式
  28. if not auth_header.startswith("Beaver "):
  29. print("Invalid Authorization header format", auth_header)
  30. raise HTTPException(
  31. status_code=status.HTTP_401_UNAUTHORIZED,
  32. detail="Invalid Authorization header format",
  33. headers={"WWW-Authenticate": "Beaver"}
  34. )
  35. # 提取 session_id
  36. session_user_id = auth_header.split(" ")[1]
  37. session_id = auth_header.split(" ")[2]
  38. # return SessionValues(session_id, '', session_user_id, '')
  39. # 在这里添加你的 session_id 校验逻辑
  40. # 例如,检查 session_id 是否在数据库中存在
  41. sessionData = session_business.validate_session(session_user_id, session_id)
  42. if not sessionData:
  43. print("Invalid session_id", session_user_id, session_id)
  44. raise HTTPException(
  45. status_code=status.HTTP_401_UNAUTHORIZED,
  46. detail="Invalid session_id",
  47. headers={"WWW-Authenticate": "Beaver"}
  48. )
  49. user = user_business.get_user_by_username(session_user_id)
  50. if user is None:
  51. print("Invalid user_id", session_user_id)
  52. raise HTTPException(
  53. status_code=status.HTTP_401_UNAUTHORIZED,
  54. detail="Invalid username",
  55. headers={"WWW-Authenticate": "Beaver"}
  56. )
  57. if sessionData.user_role_organ_id is not None:
  58. biz = SysUserRoleOrganBusiness(db)
  59. SURO_data = biz.getSUROById(sessionData.user_role_organ_id);
  60. return SessionValues(session_id, user.id, user.username, user.full_name,SURO_data.id,SURO_data.data_type,SURO_data.role_id,SURO_data.organ_id)
  61. return SessionValues(session_id, user.id, user.username, user.full_name,None,None,None,None,)
  62. # 如果校验通过,返回 session_id 或其他需要的信息
  63. return None