123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- import logging
- import sys,os
- import traceback
- from agent.cdss.capbility import CDSSCapability
- from agent.cdss.models.schemas import CDSSInput, CDSSInt, CDSSText
- from model.response import StandardResponse
- from service.cdss_service import CdssService
- from service.kg_node_service import KGNodeService
- from utils import DeepseekUtil
- current_path = os.getcwd()
- sys.path.append(current_path)
- import time
- from fastapi import APIRouter, Depends, Query
- from typing import Optional, List
- import sys
- sys.path.append('..')
- from utils.agent import call_chat_api,get_conversation_id
- import json
- router = APIRouter(tags=["Knowledge Graph"])
- service = CdssService()
- logger = logging.getLogger(__name__)
- @router.post("/knowledge/disease/recommend", response_model=StandardResponse)
- async def recommend(payload: dict):
- try:
- start_time = time.time()
- app_id = "256fd853-60b0-4357-b11b-8114b4e90ae0"
- conversation_id = get_conversation_id(app_id)
- desc = "主诉:" + payload["chief"]
- if "symptom" in payload:
- desc += "\n现病史:" + payload["symptom"]
- prompt = '''## 核心能力
- 1. 医学语义理解:
- - 精准解析临床表现描述
- - 将口语化表达转换为ICD-11标准术语
- - 完整提取症状特征(性质/部位/放射等),保持症状描述的临床完整性
- 2. 专业能力:
- - 精通ICD-11症状学术语体系
- - 识别症状关联性
- - 具备临床鉴别诊断思维
- ## 输出要求
- - 仅输出JSON格式结果,禁止添加解释性文字
- - 非症状不要抽取
- - 症状术语必须标准化
- - 尽量保留原始症状的临床特征(如部位、性质、放射等)
- - 抽取的症状应该简洁明了,尽量保持在5个字符以内,最多不宜超过7个字符。如果超过可以分多个症状词进行抽取。
- ## 处理流程
- 1. 接收患者主诉文本
- 2. 识别并提取所有症状描述
- 3. 转换为ICD-11标准术语
- 4. 结构化输出症状列表
- 示例1:
- 输入:突然感觉胸口压榨样疼痛,持续不缓解,向左肩和下颌放射,伴大汗、恶心,已经30分钟了。
- 输出: { "symptoms": ["胸痛", "左肩放射痛", "下颌放射痛","大汗","恶心"] }
- 本次用户输入:
-
- '''
- result = DeepseekUtil.chat(prompt+desc)
- json_data = json.loads(result)
- keyword = " ".join(json_data["symptoms"])
- sex = str(payload["sex"]) if "sex" in payload else None
- age = int(payload["age"]) if "age" in payload else None
- department = payload["dept"][0]["name"] if "dept" in payload and len(payload["dept"]) > 0 else None
- result = await neighbor_search(keyword=keyword, sex=sex, age=age, department=department)
- end_time = time.time()
- print(f"recommend执行完成,耗时:{end_time - start_time:.2f}秒")
- return result
- except Exception as e:
- traceback.print_exc()
- logger.error(f"recommend failed: {str(e)}")
- return StandardResponse(
- success=False,
- errorCode=500,
- errorMsg=str(e)
- )
- async def recommend2(
- chief: str,
- present_illness: Optional[str] = None,
- sex: Optional[str] = None,
- age: Optional[int] = None,
- department: Optional[str] = None,
- ):
- start_time = time.time()
- app_id = "256fd853-60b0-4357-b11b-8114b4e90ae0"
- conversation_id = get_conversation_id(app_id)
- desc = "主诉:"+chief
- if present_illness:
- desc+="\n现病史:" + present_illness
- result = call_chat_api(app_id, conversation_id, desc)
- json_data = json.loads(result)
- keyword = " ".join(json_data["symptoms"])
- result = await neighbor_search(keyword=keyword,sex=sex,age=age,department=department)
- end_time = time.time()
- print(f"recommend执行完成,耗时:{end_time - start_time:.2f}秒")
- return result
- @router.get("/neighbor_search", response_model=StandardResponse)
- async def neighbor_search(
- keyword: str = Query(..., min_length=2),
- sex: Optional[str] = None,
- age: Optional[int] = None,
- department: Optional[str] = None
- ):
- """
- 根据关键词和属性过滤条件搜索图谱节点
- """
- try:
- keywords = keyword.split(" ")
- record = CDSSInput(
- pat_age=CDSSInt(type="month", value=age),
- pat_sex=CDSSText(type="sex", value=sex),
- chief_complaint=keywords,
- department=CDSSText(type='department', value=department)
- )
- # 使用从main.py导入的capability实例处理CDSS逻辑
- output = capability.process(input=record)
- output.diagnosis.value = [{"name":key,"count":value["count"],"score":value["score"],"symptoms":value["symptoms"]
- } for key,value in output.diagnosis.value.items()]
- result = {}
- if len(output.diagnosis.value)>0:
- result = service.get_disease_detail(output.diagnosis.value[0]['name'], '疾病')
- return StandardResponse(
- success=True,
- data={"可能诊断":output.diagnosis.value,"症状":keywords,"disease_detail":result}
- )
- except Exception as e:
- traceback.print_exc()
- logger.error(f"get_disease_detail failed: {str(e)}")
- return StandardResponse(
- success=False,
- errorCode=500,
- errorMsg=str(e)
- )
- @router.get("/knowledge/disease/{disease_name}/detail", response_model=StandardResponse)
- async def get_disease_detail(
- disease_name: str
- ):
- try:
- result = service.get_disease_detail(disease_name,'疾病')
- return StandardResponse(success=True, data=result)
- except Exception as e:
- traceback.print_exc()
- logger.error(f"get_disease_detail failed: {str(e)}")
- return StandardResponse(
- success=False,
- errorCode=500,
- errorMsg=str(e)
- )
- capability = CDSSCapability()
- graph_router = router
|