SGTY 4 روز پیش
والد
کامیت
d909bac075
2فایلهای تغییر یافته به همراه389 افزوده شده و 15 حذف شده
  1. 387 13
      src/knowledge/main.py
  2. 2 2
      src/knowledge/router/medical_knowledge_api.py

+ 387 - 13
src/knowledge/main.py

@@ -21,9 +21,7 @@ app = FastAPI(
     lifespan=lifespan,
     middleware=register_middlewares(),  # 注册web中间件
 )
-mcp = FastApiMCP(app)
-mcp.mount()
-mcp.setup_server()
+
 
 async def startup(app: FastAPI):
     """项目启动时准备环境"""
@@ -35,7 +33,6 @@ async def startup(app: FastAPI):
     from .router.knowledge_nodes_api import knowledge_nodes_api_router
     from .router.knowledge_saas import saas_kb_router
     from .router.text_search import text_search_router
-    from .router.medical_knowledge_api import medical_knowledge_router
     from .utils import log_util
     
     log_util.setup_logger()
@@ -45,7 +42,7 @@ async def startup(app: FastAPI):
     app.include_router(text_search_router)
     app.include_router(graph_router)
     app.include_router(saas_kb_router)
-    app.include_router(medical_knowledge_router)
+    #app.include_router(medical_knowledge_router)
     # 挂载静态文件目录
     config = SiteConfig()
     books_path = Path(config.get_config("BOOKS"))
@@ -62,14 +59,391 @@ async def shutdown():
     logger.error("app shutdown")
 
 
-@app.get("/health")
-async def health_check():
-    """健康检查接口"""
-    return {
-        "status": "ok",
-        "timestamp": datetime.utcnow().isoformat(),
-        "service": "knowledge-graph"
-    }
+# @app.get("/health")
+# async def health_check():
+#     """健康检查接口"""
+#     return {
+#         "status": "ok",
+#         "timestamp": datetime.utcnow().isoformat(),
+#         "service": "knowledge-graph"
+#     }
+
+from fastapi import APIRouter, Depends, HTTPException
+from pydantic import BaseModel
+from typing import List, Optional, Dict, Any
+
+
+from .model.response import StandardResponse
+from .db.session import get_db
+from sqlalchemy.orm import Session
+import logging
+
+
+from .service.search_service import SearchBusiness
+logger = logging.getLogger(__name__)
+
+
+# 1. 疾病与症状相关接口
+class SymptomDiseasesRequest(BaseModel):
+    symptoms: List[str]
+
+
+class DiseaseSymptomsRequest(BaseModel):
+    disease_id: str
+
+
+class DiseaseInfoRequest(BaseModel):
+    query: str
+    type: Optional[str] = None
+
+
+@app.post("/symptom_diseases", response_model=StandardResponse, operation_id="根据症状获取相关疾病列表",
+          summary="根据症状获取相关疾病列表",
+          description="""根据输入的症状列表,查询可能相关的疾病列表。
+         该接口主要用于医疗知识图谱查询场景,例如:通过输入症状名称,
+         返回可能相关的疾病信息。
+         典型应用场景包括:
+         - 初步诊断:输入患者症状获取可能疾病
+         - 鉴别诊断:通过症状缩小疾病范围
+         - 健康教育:了解症状可能对应的疾病""",
+          response_description="""返回疾病名称的字符串列表,格式为:
+         ["疾病名称1", "疾病名称2", ...]""")
+async def get_symptom_diseases(
+        request: SymptomDiseasesRequest
+):
+    try:
+        # 实现获取症状相关疾病的逻辑
+        search = SearchBusiness()
+        results = search.get_symptom_diseases(request.symptoms)
+        return StandardResponse(success=True, data=results)
+    except Exception as e:
+        logger.exception(f"获取症状相关疾病失败: {str(e)}")
+        raise HTTPException(500, detail=StandardResponse.error(str(e)))
+
+@app.post("/disease_symptoms",
+          response_model=StandardResponse,
+          operation_id="获取疾病相关症状",
+          summary="获取疾病相关症状",
+          description="根据疾病ID获取该疾病的所有症状及其严重程度权重",
+          response_description="""返回包含症状列表和症状严重程度权重的JSON结构,格式为:
+    {
+        "success": bool,
+        "data": {
+            {"symptoms": [], "severity_weights": {}}
+        }
+    }"""
+          )
+async def get_disease_symptoms(
+        request: DiseaseSymptomsRequest
+):
+    try:
+        # 实现获取疾病症状的逻辑
+        search = SearchBusiness()
+        results = search.get_disease_symptoms(request.disease_id)
+        return StandardResponse(success=True, data=results)
+    except Exception as e:
+        logger.error(f"获取疾病症状失败: {str(e)}")
+        raise HTTPException(500, detail=StandardResponse.error(str(e)))
+
+
+# @app.post("/disease_info", response_model=StandardResponse)
+# async def get_disease_info(
+#         request: DiseaseInfoRequest,
+#         db: Session = Depends(get_db)
+# ):
+#     try:
+#         # 实现搜索医学概念的逻辑
+#         return StandardResponse(success=True, data=[])
+#     except Exception as e:
+#         logger.error(f"搜索医学概念失败: {str(e)}")
+#         raise HTTPException(500, detail=StandardResponse.error(str(e)))
+#
+#
+# # 2. 药物相关接口
+# class DrugRequest(BaseModel):
+#     drug_id: str
+#
+#
+# @app.post("/drug_indications", response_model=StandardResponse)
+# async def get_drug_indications(
+#         request: DrugRequest,
+#         db: Session = Depends(get_db)
+# ):
+#     try:
+#         # 实现获取药物适应症的逻辑
+#         return StandardResponse(success=True, data=[])
+#     except Exception as e:
+#         logger.error(f"获取药物适应症失败: {str(e)}")
+#         raise HTTPException(500, detail=StandardResponse.error(str(e)))
+
+
+# @app.post("/drug_contraindications", response_model=StandardResponse)
+# async def get_drug_contraindications(
+#         request: DrugRequest,
+#         db: Session = Depends(get_db)
+# ):
+#     try:
+#         # 实现获取药物禁忌症的逻辑
+#         return StandardResponse(success=True, data=[])
+#     except Exception as e:
+#         logger.error(f"获取药物禁忌症失败: {str(e)}")
+#         raise HTTPException(500, detail=StandardResponse.error(str(e)))
+
+
+# @app.post("/drug_interactions", response_model=StandardResponse)
+# async def get_drug_interactions(
+#         request: DrugRequest,
+#         db: Session = Depends(get_db)
+# ):
+#     try:
+#         # 实现获取药物相互作用的逻辑
+#         return StandardResponse(success=True, data=[])
+#     except Exception as e:
+#         logger.error(f"获取药物相互作用失败: {str(e)}")
+#         raise HTTPException(500, detail=StandardResponse.error(str(e)))
+
+
+# 3. 关系与概念处理接口
+class ConceptSearchRequest(BaseModel):
+    query: str
+    type: Optional[str] = None
+
+
+class ConceptRelationsRequest(BaseModel):
+    concept_id: str
+
+
+class SimilarConceptsRequest(BaseModel):
+    concept_id: str
+    top_k: int = 5
+
+
+@app.post("/search_concept",
+          operation_id="医学概念搜索",
+          summary="根据名称和类型搜索医学概念",
+          description="""根据概念名称和类型搜索医学概念节点。
+    该接口主要用于医疗知识图谱中的概念搜索场景,例如:
+    - 通过输入疾病名称和类型搜索相关疾病概念
+    - 通过输入症状名称和类型搜索相关症状概念
+    - 通过输入药物名称和类型搜索相关药物概念
+
+    典型应用场景包括:
+    - 疾病概念搜索
+    - 症状概念搜索
+    - 药物概念搜索""",
+          response_description="""返回标准响应格式,包含匹配的概念列表。
+    格式为:
+    {
+        "success": bool,
+        "data": {
+            "concepts": [
+                {"id": str, "name": str, "type": str}
+            ]
+        }
+    }""",
+          response_model=StandardResponse
+          )
+async def search_concept(
+        request: ConceptSearchRequest
+):
+    try:
+        # 实现搜索医学概念的逻辑
+        search = SearchBusiness()
+        results = search.search_concept(name=request.query, type=request.type)
+        return StandardResponse(success=True, data=results)
+    except Exception as e:
+        logger.error(f"搜索医学概念失败: {str(e)}")
+        raise HTTPException(500, detail=StandardResponse.error(str(e)))
+
+
+@app.post("/get_relations",
+          operation_id="医学概念关系查询",
+          summary="根据概念ID查询相关关系",
+          description="""根据概念ID查询该概念的相关关系。
+    该接口主要用于医疗知识图谱中的关系查询场景,例如:
+    - 通过输入疾病概念ID查询相关症状概念ID
+    - 通过输入症状概念ID查询相关疾病概念ID
+    - 通过输入药物概念ID查询相关治疗方法概念ID
+
+    典型应用场景包括:
+    - 疾病症状关系查询
+    - 症状疾病关系查询
+    - 药物治疗方法关系查询""",
+          response_description="""返回标准响应格式,包含相关关系列表。
+    格式为:
+    {
+        "success": bool,
+        "data": {
+            "relations": [
+                {"relation_type": str, "target_id": str, "target_name": str}
+            ]
+        }
+    }""",
+          response_model=StandardResponse
+          )
+async def get_relations(
+        request: ConceptRelationsRequest
+):
+    try:
+        # 实现获取概念关系的逻辑
+        search = SearchBusiness()
+        results = search.get_relations(src_id=request.concept_id)
+        return StandardResponse(success=True, data=results)
+    except Exception as e:
+        logger.error(f"获取概念关系失败: {str(e)}")
+        raise HTTPException(500, detail=StandardResponse.error(str(e)))
+
+
+@app.post("/get_similar_concepts", response_model=StandardResponse,
+          operation_id="相似概念查询",
+          summary="根据概念ID查询相似概念",
+          description="""根据概念ID查询该概念的相似概念。
+    该接口主要用于医疗知识图谱中的相似概念查询场景,例如:
+    - 通过输入疾病概念ID查询相关症状概念ID
+    - 通过输入症状概念ID查询相关疾病概念ID
+    - 通过输入药物概念ID查询相关治疗方法概念ID
+
+    典型应用场景包括:
+    - 疾病相似概念查询
+    - 症状相似概念查询
+    - 药物相似概念查询""",
+          response_description="""返回标准响应格式,包含相似概念列表。
+    格式为:
+    {
+        "success": bool,
+        "data": {
+            "similar_concepts": [
+                {"id": str, "name": str, "similarity": str}
+            ]
+        }
+    }""",
+          )
+async def get_similar_concepts(
+        request: SimilarConceptsRequest
+):
+    try:
+        # 1. 先根据ID获取节点名称
+        search = SearchBusiness()
+        results = search.get_similar_concepts(id=request.concept_id, top_k=request.top_k)
+
+        return StandardResponse(success=True, data=results)
+    except Exception as e:
+        logger.error(f"获取相似概念失败: {str(e)}")
+        raise HTTPException(500, detail=StandardResponse.error(str(e)))
+
+
+# 4. 临床辅助决策接口
+class DiagnosisCheckRequest(BaseModel):
+    diagnosis: str
+    symptoms: List[str]
+    lab_results: Optional[Dict[str, Any]] = None
+
+
+class NextQuestionsRequest(BaseModel):
+    current_input: Dict[str, Any]
+
+
+class TriageRequest(BaseModel):
+    symptoms: List[str]
+    vital_signs: Dict[str, Any]
+
+
+class CodingRequest(BaseModel):
+    clinical_term: str
+
+
+class DepartmentRequest(BaseModel):
+    diagnosis: str
+    symptoms: Optional[List[str]] = None
+
+
+# @app.post("/medical/check_diagnosis", response_model=StandardResponse)
+# async def check_diagnosis_consistency(
+#         request: DiagnosisCheckRequest,
+#         db: Session = Depends(get_db)
+# ):
+#     try:
+#         # 实现诊断一致性检查的逻辑
+#         return StandardResponse(success=True, data=[])
+#     except Exception as e:
+#         logger.error(f"诊断一致性检查失败: {str(e)}")
+#         raise HTTPException(500, detail=StandardResponse.error(str(e)))
+#
+#
+# @app.post("/medical/suggest_questions", response_model=StandardResponse)
+# async def suggest_next_questions(
+#         request: NextQuestionsRequest,
+#         db: Session = Depends(get_db)
+# ):
+#     try:
+#         # 实现建议下一步问题的逻辑
+#         return StandardResponse(success=True, data=[])
+#     except Exception as e:
+#         logger.error(f"建议下一步问题失败: {str(e)}")
+#         raise HTTPException(500, detail=StandardResponse.error(str(e)))
+#
+#
+# @app.post("/medical/triage_guidance", response_model=StandardResponse)
+# async def get_triage_guidance(
+#         request: TriageRequest,
+#         db: Session = Depends(get_db)
+# ):
+#     try:
+#         # 实现分诊建议的逻辑
+#         return StandardResponse(success=True, data=[])
+#     except Exception as e:
+#         logger.error(f"获取分诊建议失败: {str(e)}")
+#         raise HTTPException(500, detail=StandardResponse.error(str(e)))
+#
+#
+# @app.post("/medical/coding_recommendation", response_model=StandardResponse)
+# async def get_coding_recommendation(
+#         request: CodingRequest,
+#         db: Session = Depends(get_db)
+# ):
+#     try:
+#         # 实现编码推荐的逻辑
+#         return StandardResponse(success=True, data=[])
+#     except Exception as e:
+#         logger.error(f"获取编码推荐失败: {str(e)}")
+#         raise HTTPException(500, detail=StandardResponse.error(str(e)))
+#
+#
+# @app.post("/medical/suggest_department", response_model=StandardResponse)
+# async def suggest_appropriate_department(
+#         request: DepartmentRequest,
+#         db: Session = Depends(get_db)
+# ):
+#     try:
+#         # 实现科室推荐的逻辑
+#         return StandardResponse(success=True, data=[])
+#     except Exception as e:
+#         logger.error(f"科室推荐失败: {str(e)}")
+#         raise HTTPException(500, detail=StandardResponse.error(str(e)))
+#
+#
+# # 5. 病历质控接口
+# class MedicalRecordRequest(BaseModel):
+#     medical_record: Dict[str, Any]
+#     validate_type: str
+#
+#
+# @app.post("/medical/validate_record", response_model=StandardResponse)
+# async def validate_medical_record(
+#         request: MedicalRecordRequest,
+#         db: Session = Depends(get_db)
+# ):
+#     try:
+#         # 实现病历验证的逻辑
+#         return StandardResponse(success=True, data=[])
+#     except Exception as e:
+#         logger.error(f"病历验证失败: {str(e)}")
+#         raise HTTPException(500, detail=StandardResponse.error(str(e)))
+
+
+mcp = FastApiMCP(app)
+mcp.mount()
+mcp.setup_server()
 
 def main():
     logger.info(f"project run {base_setting.server_host}:{base_setting.server_port}")

+ 2 - 2
src/knowledge/router/medical_knowledge_api.py

@@ -11,7 +11,7 @@ import logging
 
 from ..service.search_service import SearchBusiness
 
-router = APIRouter(prefix="/medical", tags=["Medical Knowledge API"])
+#router = APIRouter(prefix="/medical", tags=["Medical Knowledge API"])
 logger = logging.getLogger(__name__)
 
 # 1. 疾病与症状相关接口
@@ -372,4 +372,4 @@ async def validate_medical_record(
         logger.error(f"病历验证失败: {str(e)}")
         raise HTTPException(500, detail=StandardResponse.error(str(e)))
 
-medical_knowledge_router=router
+#medical_knowledge_router=router