# main.py (3.5 KB)
import hmac
import logging
import os
import uuid
from logging.handlers import RotatingFileHandler
from typing import Optional, Set

# FastAPI and related third-party modules
import uvicorn
from fastapi import FastAPI, Request, Response, status

from router.knowledge_dify import dify_kb_router
from router.knowledge_saas import saas_kb_router
from router.text_search import text_search_router
from router.graph_router import graph_router
from router.knowledge_nodes_api import knowledge_nodes_api_router
  14. # 配置日志
  15. logging.basicConfig(
  16. level=logging.INFO,
  17. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  18. handlers=[
  19. logging.StreamHandler(),
  20. RotatingFileHandler('app.log', maxBytes=10485760, backupCount=5, encoding='utf-8')
  21. ]
  22. )
  23. logger = logging.getLogger(__name__)
  24. logger.propagate = True
  25. # 创建FastAPI应用
  26. app = FastAPI(title="知识图谱")
  27. app.include_router(dify_kb_router)
  28. app.include_router(saas_kb_router)
  29. app.include_router(text_search_router)
  30. app.include_router(graph_router)
  31. app.include_router(knowledge_nodes_api_router)
  32. # 需要拦截的 URL 列表(支持通配符)
  33. INTERCEPT_URLS = {
  34. "/v1/knowledge/*"
  35. }
  36. # 白名单 URL(不需要拦截的路径)
  37. WHITE_LIST = {
  38. "/api/public",
  39. "/admin/login"
  40. }
  41. async def verify_token(authorization: str) -> Optional[dict]:
  42. """
  43. 验证 token 有效性
  44. 返回:验证成功返回用户信息字典,失败返回 None
  45. """
  46. if not authorization.startswith("Bearer "):
  47. return None
  48. token = authorization[7:]
  49. # 这里添加实际的 token 验证逻辑
  50. # 示例:简单验证 token 是否等于 secret-token
  51. if token == "secret-token":
  52. return {"id": 1, "username": "admin", "role": "admin"}
  53. return None
  54. def should_intercept(path: str) -> bool:
  55. """
  56. 判断是否需要拦截当前路径
  57. """
  58. if path in WHITE_LIST:
  59. return False
  60. for pattern in INTERCEPT_URLS:
  61. # 处理通配符匹配
  62. if pattern.endswith("/*"):
  63. if path.startswith(pattern[:-1]):
  64. return True
  65. # 精确匹配
  66. elif path == pattern:
  67. return True
  68. return False
  69. @app.middleware("http")
  70. async def interceptor_middleware(request: Request, call_next):
  71. path = request.url.path
  72. if not should_intercept(path):
  73. return await call_next(request)
  74. # 权限校验
  75. auth_header = request.headers.get("Authorization")
  76. if not auth_header:
  77. return Response(
  78. content="Missing Authorization header",
  79. status_code=status.HTTP_401_UNAUTHORIZED
  80. )
  81. user_info = await verify_token(auth_header)
  82. if not user_info:
  83. return Response(
  84. content="Invalid token",
  85. status_code=status.HTTP_401_UNAUTHORIZED
  86. )
  87. # 初始化操作:将用户信息添加到请求状态中
  88. request.state.user = user_info
  89. # 添加请求上下文(示例)
  90. request.state.context = {
  91. "request_id": request.headers.get("request-id", str(uuid.uuid4())),
  92. "client_ip": request.client.host
  93. }
  94. # 继续处理请求
  95. response = await call_next(request)
  96. # 可以在返回前添加统一响应处理(如添加头信息)
  97. response.headers["request-id"]=request.state.context["request_id"]
  98. return response
  99. if __name__ == "__main__":
  100. logger.info('Starting uvicorn server...2222')
  101. #uvicorn main:app --host 0.0.0.0 --port 8000 --reload
  102. uvicorn.run("main:app", host="0.0.0.0", port=8001, reload=False)