"""FastAPI entry point for the medical knowledge service.

Configures logging, registers the API routers, serves the local ``books``
directory under ``/books``, guards the paths listed in ``INTERCEPT_URLS``
with a bearer-token check, and enables CORS.
"""
import logging
import os
import secrets
import uuid
from logging.handlers import RotatingFileHandler
from typing import Optional, Set

import uvicorn
from fastapi import FastAPI, Request, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

from router.knowledge_dify import dify_kb_router
from router.knowledge_saas import saas_kb_router
from router.text_search import text_search_router
from router.graph_router import graph_router
# from router.knowledge_nodes_api import knowledge_nodes_api_router

# Logging: errors only, written both to the console and to a rotating file
# (10 MiB per file, 5 backups).
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        RotatingFileHandler(
            'app.log', maxBytes=10485760, backupCount=5, encoding='utf-8'
        ),
    ],
)
logger = logging.getLogger(__name__)
logger.propagate = True

# Create the FastAPI application.
# app = FastAPI(title="医疗知识",root_path="/knowledge")
app = FastAPI(title="医疗知识")
app.include_router(dify_kb_router)
app.include_router(saas_kb_router)
app.include_router(text_search_router)
app.include_router(graph_router)
# app.include_router(knowledge_nodes_api_router)

# Mount the static file directory: the /books URL path maps to the local
# "books" folder.
app.mount("/books", StaticFiles(directory="books"), name="books")

# URL patterns that require authentication. A trailing "/*" is a wildcard
# matching every path below that prefix; anything else is an exact match.
INTERCEPT_URLS: Set[str] = {
    "/v1/knowledge/*",
}

# Whitelisted URLs (exact paths that are never intercepted).
WHITE_LIST: Set[str] = {
    "/api/public",
    "/admin/login",
}

# Authorization scheme prefix, compared case-insensitively.
_BEARER_PREFIX = "bearer "

# TODO: placeholder credential from the original example logic; replace with
# real token validation before production use.
_PLACEHOLDER_TOKEN = "secret-token"


async def verify_token(authorization: str) -> Optional[dict]:
    """Validate the value of an ``Authorization`` header.

    Args:
        authorization: Raw header value, expected as ``Bearer <token>``.
            The scheme name is matched case-insensitively.

    Returns:
        A user-info dict when the token is valid, otherwise ``None``.
    """
    prefix_len = len(_BEARER_PREFIX)
    if authorization[:prefix_len].lower() != _BEARER_PREFIX:
        return None

    token = authorization[prefix_len:]
    # Compare as bytes in constant time: compare_digest rejects non-ASCII
    # str arguments, and a plain == leaks timing information.
    if secrets.compare_digest(
        token.encode("utf-8"), _PLACEHOLDER_TOKEN.encode("utf-8")
    ):
        return {"id": 1, "username": "admin", "role": "admin"}
    return None


def should_intercept(path: str) -> bool:
    """Return True when *path* must pass the authentication check.

    Whitelisted paths are never intercepted. Otherwise the path is tested
    against every entry of ``INTERCEPT_URLS``: a pattern ending in ``/*``
    matches any path starting with the pattern minus the ``*`` (the trailing
    slash is kept), and any other pattern must equal the path exactly.
    """
    if path in WHITE_LIST:
        return False

    def _matches(pattern: str) -> bool:
        if pattern.endswith("/*"):
            # Drop only the "*" so the prefix keeps its trailing slash.
            return path.startswith(pattern[:-1])
        return path == pattern

    return any(_matches(pattern) for pattern in INTERCEPT_URLS)


@app.middleware("http")
async def interceptor_middleware(request: Request, call_next):
    """Authenticate protected requests and attach per-request context.

    Requests whose path is not intercepted pass straight through. Protected
    requests must carry a valid ``Authorization`` header; otherwise a 401
    response is returned. On success the user info and a request context
    are stored on ``request.state`` and the request id is echoed back in
    the ``request-id`` response header.
    """
    if not should_intercept(request.url.path):
        return await call_next(request)

    # Permission check.
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return Response(
            content="Missing Authorization header",
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    user_info = await verify_token(auth_header)
    if not user_info:
        return Response(
            content="Invalid token",
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    # Make the authenticated user available to downstream handlers.
    request.state.user = user_info

    # Request context. request.client may be None, so guard the attribute
    # access instead of assuming a peer address exists.
    client = request.client
    request_id = request.headers.get("request-id") or str(uuid.uuid4())
    request.state.context = {
        "request_id": request_id,
        "client_ip": client.host if client else None,
    }

    # Continue processing the request.
    response = await call_next(request)

    # Uniform response post-processing: echo the request id to the caller.
    response.headers["request-id"] = request_id
    return response


# CORS is registered AFTER the interceptor on purpose: the middleware added
# last is the outermost one, so CORS preflight (OPTIONS) requests, which
# carry no Authorization header, are answered before the token check, and
# 401 responses produced by the interceptor still receive CORS headers.
#
# Allows all origins (testing only; restrict this in production).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],     # all origins (or e.g. ["http://localhost:3000"])
    allow_credentials=True,  # allow cookies to be sent
    allow_methods=["*"],     # all methods (or e.g. ["GET", "POST"])
    allow_headers=["*"],     # all request headers
)

# capability = CDSSCapability()

if __name__ == "__main__":
    # NOTE: the root log level is ERROR, so this INFO message is not emitted.
    logger.info('Starting uvicorn server...2222')
    # Equivalent CLI: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
    uvicorn.run("main:app", host="0.0.0.0", port=8001, reload=False)