main.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import logging
  2. import uuid
  3. from logging.handlers import RotatingFileHandler
  4. from fastapi import FastAPI, Request, Response, status
  5. from typing import Optional, Set
  6. # Import FastAPI and related modules
  7. import os
  8. import uvicorn
  9. from agent.cdss.capbility import CDSSCapability
  10. from router.knowledge_dify import dify_kb_router
  11. from router.knowledge_saas import saas_kb_router
  12. from router.text_search import text_search_router
  13. from router.graph_router import graph_router
  14. from router.knowledge_nodes_api import knowledge_nodes_api_router
  15. # 配置日志
  16. logging.basicConfig(
  17. level=logging.ERROR,
  18. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  19. handlers=[
  20. logging.StreamHandler(),
  21. RotatingFileHandler('app.log', maxBytes=10485760, backupCount=5, encoding='utf-8')
  22. ]
  23. )
  24. logger = logging.getLogger(__name__)
  25. logger.propagate = True
  26. # 创建FastAPI应用
  27. app = FastAPI(title="知识图谱")
  28. app.include_router(dify_kb_router)
  29. app.include_router(saas_kb_router)
  30. app.include_router(text_search_router)
  31. app.include_router(graph_router)
  32. app.include_router(knowledge_nodes_api_router)
  33. # 需要拦截的 URL 列表(支持通配符)
  34. INTERCEPT_URLS = {
  35. "/v1/knowledge/*"
  36. }
  37. # 白名单 URL(不需要拦截的路径)
  38. WHITE_LIST = {
  39. "/api/public",
  40. "/admin/login"
  41. }
  42. async def verify_token(authorization: str) -> Optional[dict]:
  43. """
  44. 验证 token 有效性
  45. 返回:验证成功返回用户信息字典,失败返回 None
  46. """
  47. if not authorization.startswith("Bearer "):
  48. return None
  49. token = authorization[7:]
  50. # 这里添加实际的 token 验证逻辑
  51. # 示例:简单验证 token 是否等于 secret-token
  52. if token == "secret-token":
  53. return {"id": 1, "username": "admin", "role": "admin"}
  54. return None
  55. def should_intercept(path: str) -> bool:
  56. """
  57. 判断是否需要拦截当前路径
  58. """
  59. if path in WHITE_LIST:
  60. return False
  61. for pattern in INTERCEPT_URLS:
  62. # 处理通配符匹配
  63. if pattern.endswith("/*"):
  64. if path.startswith(pattern[:-1]):
  65. return True
  66. # 精确匹配
  67. elif path == pattern:
  68. return True
  69. return False
  70. @app.middleware("http")
  71. async def interceptor_middleware(request: Request, call_next):
  72. path = request.url.path
  73. if not should_intercept(path):
  74. return await call_next(request)
  75. # 权限校验
  76. auth_header = request.headers.get("Authorization")
  77. if not auth_header:
  78. return Response(
  79. content="Missing Authorization header",
  80. status_code=status.HTTP_401_UNAUTHORIZED
  81. )
  82. user_info = await verify_token(auth_header)
  83. if not user_info:
  84. return Response(
  85. content="Invalid token",
  86. status_code=status.HTTP_401_UNAUTHORIZED
  87. )
  88. # 初始化操作:将用户信息添加到请求状态中
  89. request.state.user = user_info
  90. # 添加请求上下文(示例)
  91. request.state.context = {
  92. "request_id": request.headers.get("request-id", str(uuid.uuid4())),
  93. "client_ip": request.client.host
  94. }
  95. # 继续处理请求
  96. response = await call_next(request)
  97. # 可以在返回前添加统一响应处理(如添加头信息)
  98. response.headers["request-id"]=request.state.context["request_id"]
  99. return response
  100. #capability = CDSSCapability()
  101. if __name__ == "__main__":
  102. logger.info('Starting uvicorn server...2222')
  103. #uvicorn main:app --host 0.0.0.0 --port 8000 --reload
  104. uvicorn.run("main:app", host="0.0.0.0", port=8001, reload=False)