浏览代码

代码提交

SGTY 3 天之前
父节点
当前提交
498dec22e1
共有 2 个文件被更改,包括 194 次插入和 63 次删除
  1. 49 2
      src/knowledge/main.py
  2. 145 61
      src/knowledge/service/search_service.py

+ 49 - 2
src/knowledge/main.py

@@ -809,16 +809,63 @@ async def suggest_appropriate_department(
 #     validate_type: str
 #     validate_type: str
 #
 #
 #
 #
-@app.post("/medical/validate_record", response_model=StandardResponse)
+@app.post("/medical/validate_record", response_model=StandardResponse, operation_id="validateMedicalRecord",
+          summary="验证病历内容的逻辑一致性",
+          description="""根据输入的病历内容,验证其逻辑一致性并返回验证结果。
+
+该接口主要用于医疗病历质控场景,例如:
+- 验证诊断与症状的相关性
+- 检查病历中的逻辑矛盾
+- 确保病历符合医疗规范
+
+典型应用场景:
+1. 病历自动质控:输入病历内容获取验证结果
+2. 临床决策支持:验证诊断与症状的匹配度
+3. 病历完整性检查:确保病历包含必要信息
+
+输入要求:
+- 病历内容应为标准医疗文本
+
+输出格式:
+- 返回标准JSON响应
+- 包含验证结果和错误列表""",
+          response_description="""返回标准响应格式,包含验证结果。
+
+成功响应示例:
+{
+    "success": true,
+    "data": {
+        "errors": [
+            "初步诊断'胆总管结石伴胆管炎'与主诉症状不相关",
+            "初步诊断'胆囊结石并胆囊炎'与主诉症状不相关"
+        ]
+    }
+}
+
+错误响应示例:
+{
+    "success": false,
+    "error": "Invalid medical content"
+}""")
 async def validate_medical_record(
 async def validate_medical_record(
         medical_content: str = Query(...,
         medical_content: str = Query(...,
-                                    description="""病历信息""")
+                                    description="""病历内容文本,应为标准医疗术语。
+
+该参数用于验证病历的逻辑一致性。
+必须使用标准医学术语。
+示例值:"患者主诉头痛3天,初步诊断:高血压病""",
+                                    min_length=10)
 ):
 ):
     try:
     try:
         # 调用search_service中的验证方法
         # 调用search_service中的验证方法
         search = SearchBusiness()
         search = SearchBusiness()
         results = search.validate_medical_record(medical_content)
         results = search.validate_medical_record(medical_content)
+        #只返回errors字段
+        results = results["errors"]
         return StandardResponse(success=True, data=results)
         return StandardResponse(success=True, data=results)
+    except ValueError as e:
+        logger.warning(f"无效的病历内容: {medical_content}")
+        raise HTTPException(400, detail=StandardResponse.error(str(e)))
     except Exception as e:
     except Exception as e:
         logger.error(f"病历验证失败: {str(e)}")
         logger.error(f"病历验证失败: {str(e)}")
         raise HTTPException(500, detail=StandardResponse.error(str(e)))
         raise HTTPException(500, detail=StandardResponse.error(str(e)))

+ 145 - 61
src/knowledge/service/search_service.py

@@ -1,5 +1,8 @@
 import re
 import re
 import sys,os
 import sys,os
+
+from src.knowledge.utils.MedicalRecordParser import MedicalRecordParser
+
 current_path = os.getcwd()
 current_path = os.getcwd()
 sys.path.append(current_path)
 sys.path.append(current_path)
 import logging
 import logging
@@ -7,6 +10,7 @@ from py_tools.logging import logger
 import json
 import json
 from ..config.site import SiteConfig
 from ..config.site import SiteConfig
 from elasticsearch import Elasticsearch, helpers, exceptions
 from elasticsearch import Elasticsearch, helpers, exceptions
+import re
 config = SiteConfig()
 config = SiteConfig()
 ELASTICSEARCH_USER = config.get_config("ELASTICSEARCH_USER", "/tmp")
 ELASTICSEARCH_USER = config.get_config("ELASTICSEARCH_USER", "/tmp")
 ELASTICSEARCH_PWD = config.get_config("ELASTICSEARCH_PWD", "/tmp")
 ELASTICSEARCH_PWD = config.get_config("ELASTICSEARCH_PWD", "/tmp")
@@ -274,25 +278,25 @@ class SearchBusiness:
         nodes = self.search_nodes(name=name, type=type)
         nodes = self.search_nodes(name=name, type=type)
         if not nodes:
         if not nodes:
             return {}
             return {}
-            
+
         result = {}
         result = {}
         for node in nodes:
         for node in nodes:
             node_name = node.get("public_kg_nodes_name", "")
             node_name = node.get("public_kg_nodes_name", "")
             if not node_name:
             if not node_name:
                 continue
                 continue
-                
+
             # 使用node id作为src_id查询edges关系表
             # 使用node id作为src_id查询edges关系表
             edges = self.search_edges(src_id=node["public_kg_nodes_id"])
             edges = self.search_edges(src_id=node["public_kg_nodes_id"])
             if not edges:
             if not edges:
                 result[node_name] = {"relations": []}
                 result[node_name] = {"relations": []}
                 continue
                 continue
-                
+
             # 收集所有目标节点ID
             # 收集所有目标节点ID
             dest_ids = [edge["public_kg_edges_dest_id"] for edge in edges]
             dest_ids = [edge["public_kg_edges_dest_id"] for edge in edges]
             # 查询目标节点信息
             # 查询目标节点信息
             dest_nodes = self.search_nodes(ids=dest_ids)
             dest_nodes = self.search_nodes(ids=dest_ids)
             node_map = {n["public_kg_nodes_id"]: n for n in dest_nodes}
             node_map = {n["public_kg_nodes_id"]: n for n in dest_nodes}
-            
+
             # 构建关系列表
             # 构建关系列表
             relations = []
             relations = []
             for edge in edges:
             for edge in edges:
@@ -302,9 +306,9 @@ class SearchBusiness:
                         "relation_type": edge["public_kg_edges_category"],
                         "relation_type": edge["public_kg_edges_category"],
                         "target_name": node_map[dest_id].get("public_kg_nodes_name", "")
                         "target_name": node_map[dest_id].get("public_kg_nodes_name", "")
                     })
                     })
-            
+
             result[node_name] = {"relations": relations}
             result[node_name] = {"relations": relations}
-        
+
         return result
         return result
 
 
     def get_similar_concepts(self, name, top_k=5):
     def get_similar_concepts(self, name, top_k=5):
@@ -318,25 +322,25 @@ class SearchBusiness:
         ]
         ]
 
 
         return {"similar_concepts": similar_concepts}
         return {"similar_concepts": similar_concepts}
-     
+
     def get_drug_indications(self, drug_name):
     def get_drug_indications(self, drug_name):
         drugs = self.search_nodes(name=drug_name, type="药品")
         drugs = self.search_nodes(name=drug_name, type="药品")
         if not drugs:
         if not drugs:
             return []
             return []
 
 
         drug_map = {}
         drug_map = {}
-        
+
         for drug in drugs:
         for drug in drugs:
             drug_name = drug.get("public_kg_nodes_name", "")
             drug_name = drug.get("public_kg_nodes_name", "")
-            
+
             if drug_name not in drug_map:
             if drug_name not in drug_map:
                 drug_map[drug_name] = {
                 drug_map[drug_name] = {
                     "drug": drug_name,
                     "drug": drug_name,
                     "indications": []
                     "indications": []
                 }
                 }
-            
+
             drug_info = drug_map[drug_name]
             drug_info = drug_map[drug_name]
-            
+
             # 查询药物适应症关系
             # 查询药物适应症关系
             edges = self.search_edges(name="适应症", src_id=drug["public_kg_nodes_id"])
             edges = self.search_edges(name="适应症", src_id=drug["public_kg_nodes_id"])
             logger.info(f"Get indications by drug id: {drug['public_kg_nodes_id']}, result: {edges}")
             logger.info(f"Get indications by drug id: {drug['public_kg_nodes_id']}, result: {edges}")
@@ -359,7 +363,7 @@ class SearchBusiness:
                     indication_name = indication["public_kg_nodes_name"]
                     indication_name = indication["public_kg_nodes_name"]
                     if indication_name not in drug_info["indications"]:
                     if indication_name not in drug_info["indications"]:
                         drug_info["indications"].append(indication_name)
                         drug_info["indications"].append(indication_name)
-        
+
         # 过滤掉适应症列表为空的药物
         # 过滤掉适应症列表为空的药物
         return [d for d in drug_map.values() if d["indications"]]
         return [d for d in drug_map.values() if d["indications"]]
 
 
@@ -374,48 +378,94 @@ class SearchBusiness:
             diseases = self.search_nodes(name=disease_name, type="疾病")
             diseases = self.search_nodes(name=disease_name, type="疾病")
             if not diseases:
             if not diseases:
                 return []
                 return []
-                
+
             result = []
             result = []
-            
+
             for disease in diseases:
             for disease in diseases:
                 disease_info = {
                 disease_info = {
                     "disease_name": disease.get("public_kg_nodes_name", ""),
                     "disease_name": disease.get("public_kg_nodes_name", ""),
                     "departments": []
                     "departments": []
                 }
                 }
-                
+
                 # 2. 查询每个疾病的科室关系
                 # 2. 查询每个疾病的科室关系
                 edges = self.search_edges(name="所属科室", src_id=disease["public_kg_nodes_id"])
                 edges = self.search_edges(name="所属科室", src_id=disease["public_kg_nodes_id"])
                 if not edges:
                 if not edges:
                     continue
                     continue
-                    
+
                 # 3. 收集科室节点ID
                 # 3. 收集科室节点ID
-                department_ids = [edge["public_kg_edges_dest_id"] for edge in edges 
+                department_ids = [edge["public_kg_edges_dest_id"] for edge in edges
                                 if "public_kg_edges_dest_id" in edge]
                                 if "public_kg_edges_dest_id" in edge]
                 if not department_ids:
                 if not department_ids:
                     continue
                     continue
-                    
+
                 # 4. 查询科室节点信息
                 # 4. 查询科室节点信息
                 departments = self.search_nodes(ids=department_ids)
                 departments = self.search_nodes(ids=department_ids)
                 if not departments:
                 if not departments:
                     continue
                     continue
-                    
+
                 # 5. 收集科室名称并去重
                 # 5. 收集科室名称并去重
                 department_set = set()
                 department_set = set()
                 for dept in departments:
                 for dept in departments:
                     if "public_kg_nodes_name" in dept:
                     if "public_kg_nodes_name" in dept:
                         department_set.add(dept["public_kg_nodes_name"])
                         department_set.add(dept["public_kg_nodes_name"])
-                
+
                 if department_set:
                 if department_set:
                     disease_info["departments"] = list(department_set)
                     disease_info["departments"] = list(department_set)
                     result.append(disease_info)
                     result.append(disease_info)
-            
+
             return result
             return result
-            
+
         except Exception as e:
         except Exception as e:
             logger.error(f"科室推荐失败: {str(e)}")
             logger.error(f"科室推荐失败: {str(e)}")
             return []
             return []
 
 
     def validate_medical_record(self, medical_record: str, validate_type: str) -> dict:
     def validate_medical_record(self, medical_record: str, validate_type: str) -> dict:
+
+    def _validate_chief_complaint(self, structured_data: dict) -> list:
+        """
+        Validate the chief-complaint ("主诉") section of a structured record.
+
+        Two things are checked: "主要症状" must be a non-empty list, and
+        "持续时间" must be present and accepted by ``check_time``.
+
+        :param structured_data: structured medical record; its "主诉" entry is
+            read as a dict. A missing, ``None`` or non-dict entry is treated
+            as an empty chief complaint instead of raising.
+        :return: list of error messages (empty when nothing is wrong)
+        """
+        chief_complaint = (structured_data or {}).get("主诉")
+        if not isinstance(chief_complaint, dict):
+            # Malformed or absent section: report the missing fields below
+            # rather than failing with AttributeError on ``.get``.
+            chief_complaint = {}
+        errors = []
+
+        # An empty list is already falsy, so "missing", "empty" and
+        # "not a list" are all covered by these two tests.
+        symptoms = chief_complaint.get("主要症状")
+        if not symptoms or not isinstance(symptoms, list):
+            errors.append("主诉中缺少主要症状或主要症状列表为空")
+
+        duration = chief_complaint.get("持续时间")
+        if not duration:
+            errors.append("主诉中缺少持续时间")
+        else:
+            # check_time runs a regex over its argument, so coerce
+            # non-string durations (e.g. a bare number) to text first.
+            is_accurate, error_msg = check_time(str(duration))
+            if not is_accurate:
+                errors.append(f"持续时间'{duration}'{error_msg}")
+        return errors
+
+    def _validate_diagnosis_info(self, structured_data: dict) -> list:
+        """
+        Check that each preliminary diagnosis is related to the chief-complaint symptoms.
+
+        The symptoms under "主诉" -> "主要症状" are passed to
+        ``self.get_symptom_diseases``; every entry of "诊断信息" -> "初步诊断"
+        whose name is not among the returned ``disease_name`` values is
+        reported.
+
+        :param structured_data: structured medical record
+        :return: list of error messages; empty when either section is
+            missing/empty (nothing to compare) or all diagnoses are related
+        """
+        diagnosis_info = (structured_data or {}).get("诊断信息")
+        chief_complaint = (structured_data or {}).get("主诉")
+        # Without both sections as dicts there is nothing to cross-check.
+        if not isinstance(diagnosis_info, dict) or not isinstance(chief_complaint, dict):
+            return []
+
+        diagnoses = diagnosis_info.get("初步诊断")
+        symptoms = chief_complaint.get("主要症状")
+        if not diagnoses or not symptoms:
+            return []
+
+        # A single diagnosis given as a plain string must be treated as one
+        # item; iterating it directly would flag every character separately.
+        if isinstance(diagnoses, str):
+            diagnoses = [diagnoses]
+
+        # Tolerate a lookup that returns None instead of an empty list.
+        related_diseases = self.get_symptom_diseases(symptoms) or []
+        related_disease_names = [d["disease_name"] for d in related_diseases]
+
+        return [
+            f"初步诊断'{diagnosis}'与主诉症状不相关"
+            for diagnosis in diagnoses
+            if diagnosis not in related_disease_names
+        ]
+
+    def validate_medical_record(self, medical_record: str, validate_type: str=None) -> dict:
         """
         """
         验证病历数据
         验证病历数据
         :param medical_record: 病历数据字典
         :param medical_record: 病历数据字典
@@ -423,30 +473,27 @@ class SearchBusiness:
         :return: 验证结果字典
         :return: 验证结果字典
         """
         """
         try:
         try:
-            # 基本验证逻辑
-            if not medical_record:
-                return {"valid": False, "errors": ["病历数据为空"]}
-                
-            # 根据验证类型进行不同验证
-            if validate_type == "completeness":
-                # 检查病历完整性
-                required_fields = ["patient_name", "diagnosis", "treatment"]
-                missing_fields = [field for field in required_fields if field not in medical_record]
-                if missing_fields:
-                    return {"valid": False, "errors": [f"缺少必填字段: {', '.join(missing_fields)}"]}
-                
-            elif validate_type == "consistency":
-                # 检查病历一致性
-                if "diagnosis" in medical_record and "symptoms" in medical_record:
-                    # 这里可以添加更复杂的诊断与症状一致性检查逻辑
-                    pass
+            #将病历数据结构化
+            parser = MedicalRecordParser()
+            structured_data = parser.parse_medical_record(medical_record)
+            #json打印structured_data
+            logger.info(f"结构化数据: {json.dumps(structured_data, ensure_ascii=False, indent=4)}")
+
+            # 验证主诉字段
+            errors = self._validate_chief_complaint(structured_data)
+
+            # 验证诊断信息
+            errors.extend(self._validate_diagnosis_info(structured_data))
+
+            if errors:
+                return {"valid": False, "errors": errors}
                 
                 
             return {"valid": True, "errors": []}
             return {"valid": True, "errors": []}
             
             
         except Exception as e:
         except Exception as e:
             logger.error(f"病历验证失败: {str(e)}")
             logger.error(f"病历验证失败: {str(e)}")
             return {"valid": False, "errors": [f"验证过程中发生错误: {str(e)}"]}
             return {"valid": False, "errors": [f"验证过程中发生错误: {str(e)}"]}
-            
+
     def get_lab_examinations(self, disease_name: str) -> list:
     def get_lab_examinations(self, disease_name: str) -> list:
         """
         """
         根据疾病名称获取相关实验室检查项目
         根据疾病名称获取相关实验室检查项目
@@ -458,47 +505,47 @@ class SearchBusiness:
             diseases = self.search_nodes(name=disease_name, type="疾病")
             diseases = self.search_nodes(name=disease_name, type="疾病")
             if not diseases:
             if not diseases:
                 return []
                 return []
-                
+
             result = []
             result = []
-            
+
             for disease in diseases:
             for disease in diseases:
                 disease_info = {
                 disease_info = {
                     "disease_name": disease.get("public_kg_nodes_name", ""),
                     "disease_name": disease.get("public_kg_nodes_name", ""),
                     "lab_examinations": []
                     "lab_examinations": []
                 }
                 }
-                
+
                 # 2. 查询每个疾病的实验室检查关系
                 # 2. 查询每个疾病的实验室检查关系
                 edges = self.search_edges(name="实验室检查", src_id=disease["public_kg_nodes_id"])
                 edges = self.search_edges(name="实验室检查", src_id=disease["public_kg_nodes_id"])
                 if not edges:
                 if not edges:
                     continue
                     continue
-                    
+
                 # 3. 收集实验室检查节点ID
                 # 3. 收集实验室检查节点ID
-                lab_exam_ids = [edge["public_kg_edges_dest_id"] for edge in edges 
+                lab_exam_ids = [edge["public_kg_edges_dest_id"] for edge in edges
                                if "public_kg_edges_dest_id" in edge]
                                if "public_kg_edges_dest_id" in edge]
                 if not lab_exam_ids:
                 if not lab_exam_ids:
                     continue
                     continue
-                    
+
                 # 4. 查询实验室检查节点信息
                 # 4. 查询实验室检查节点信息
                 lab_exams = self.search_nodes(ids=lab_exam_ids)
                 lab_exams = self.search_nodes(ids=lab_exam_ids)
                 if not lab_exams:
                 if not lab_exams:
                     continue
                     continue
-                    
+
                 # 5. 收集实验室检查名称并去重
                 # 5. 收集实验室检查名称并去重
                 lab_exam_set = set()
                 lab_exam_set = set()
                 for exam in lab_exams:
                 for exam in lab_exams:
                     if "public_kg_nodes_name" in exam:
                     if "public_kg_nodes_name" in exam:
                         lab_exam_set.add(exam["public_kg_nodes_name"])
                         lab_exam_set.add(exam["public_kg_nodes_name"])
-                
+
                 if lab_exam_set:
                 if lab_exam_set:
                     disease_info["lab_examinations"] = list(lab_exam_set)
                     disease_info["lab_examinations"] = list(lab_exam_set)
                     result.append(disease_info)
                     result.append(disease_info)
-            
+
             return result
             return result
-            
+
         except Exception as e:
         except Exception as e:
             logger.error(f"实验室检查查询失败: {str(e)}")
             logger.error(f"实验室检查查询失败: {str(e)}")
             return []
             return []
-            
+
     def get_auxiliary_examinations(self, disease_name: str) -> list:
     def get_auxiliary_examinations(self, disease_name: str) -> list:
         """
         """
         根据疾病名称获取相关辅助检查项目
         根据疾病名称获取相关辅助检查项目
@@ -510,48 +557,85 @@ class SearchBusiness:
             diseases = self.search_nodes(name=disease_name, type="疾病")
             diseases = self.search_nodes(name=disease_name, type="疾病")
             if not diseases:
             if not diseases:
                 return []
                 return []
-                
+
             result = []
             result = []
-            
+
             for disease in diseases:
             for disease in diseases:
                 disease_info = {
                 disease_info = {
                     "disease_name": disease.get("public_kg_nodes_name", ""),
                     "disease_name": disease.get("public_kg_nodes_name", ""),
                     "auxiliary_examinations": []
                     "auxiliary_examinations": []
                 }
                 }
-                
+
                 # 2. 查询每个疾病的辅助检查关系
                 # 2. 查询每个疾病的辅助检查关系
                 edges = self.search_edges(name="辅助检查", src_id=disease["public_kg_nodes_id"])
                 edges = self.search_edges(name="辅助检查", src_id=disease["public_kg_nodes_id"])
                 if not edges:
                 if not edges:
                     continue
                     continue
-                    
+
                 # 3. 收集辅助检查节点ID
                 # 3. 收集辅助检查节点ID
-                aux_exam_ids = [edge["public_kg_edges_dest_id"] for edge in edges 
+                aux_exam_ids = [edge["public_kg_edges_dest_id"] for edge in edges
                                if "public_kg_edges_dest_id" in edge]
                                if "public_kg_edges_dest_id" in edge]
                 if not aux_exam_ids:
                 if not aux_exam_ids:
                     continue
                     continue
-                    
+
                 # 4. 查询辅助检查节点信息
                 # 4. 查询辅助检查节点信息
                 aux_exams = self.search_nodes(ids=aux_exam_ids)
                 aux_exams = self.search_nodes(ids=aux_exam_ids)
                 if not aux_exams:
                 if not aux_exams:
                     continue
                     continue
-                    
+
                 # 5. 收集辅助检查名称并去重
                 # 5. 收集辅助检查名称并去重
                 aux_exam_set = set()
                 aux_exam_set = set()
                 for exam in aux_exams:
                 for exam in aux_exams:
                     if "public_kg_nodes_name" in exam:
                     if "public_kg_nodes_name" in exam:
                         aux_exam_set.add(exam["public_kg_nodes_name"])
                         aux_exam_set.add(exam["public_kg_nodes_name"])
-                
+
                 if aux_exam_set:
                 if aux_exam_set:
                     disease_info["auxiliary_examinations"] = list(aux_exam_set)
                     disease_info["auxiliary_examinations"] = list(aux_exam_set)
                     result.append(disease_info)
                     result.append(disease_info)
-            
+
             return result
             return result
-            
+
         except Exception as e:
         except Exception as e:
             logger.error(f"辅助检查查询失败: {str(e)}")
             logger.error(f"辅助检查查询失败: {str(e)}")
             return []
             return []
 
 
 
 
+
+def check_time(time_str: str) -> tuple:
+    """
+    Check whether a duration description in a chief complaint is plausible.
+
+    The first "<number><unit>" occurrence in the text is examined. The number
+    may be a decimal ("1.5小时") and may be followed by the counter "个"
+    ("3个月"). Leading qualifiers such as "约"/"大约" need no special handling
+    because the pattern is searched for anywhere in the text.
+
+    :param time_str: text containing the duration description
+    :return: ``(ok, message)``; ``ok`` is False when no duration is found,
+        the unit is not one of the accepted units, the value is not
+        positive, or the value is too large for its unit
+    """
+    # Accepted time units (kept as a list: its repr is part of an error message).
+    time_units = ["小时", "天", "日", "周", "月", "年"]
+
+    # Non-text input cannot contain a duration description.
+    if not isinstance(time_str, str):
+        return (False, "未找到明确的时间描述")
+
+    # The decimal part is consumed together with the integer part; matching
+    # bare \d+ would read "1.5小时" as "5小时". The unit is matched loosely
+    # as a run of unit characters so that malformed units (e.g. "时") are
+    # reported as non-standard below instead of being silently missed.
+    time_pattern = re.compile(
+        r"(\d+(?:\.\d+)?)\s*(个?)\s*([小时天日周月年]+)"
+    )
+
+    match = time_pattern.search(time_str)
+    if not match:
+        return (False, "未找到明确的时间描述")
+
+    raw_num, counter, unit = match.groups()
+    # Keep integers as int so messages for whole numbers read "3天", not "3.0天".
+    num = float(raw_num) if "." in raw_num else int(raw_num)
+
+    # Check that the unit is one of the accepted units
+    if unit not in time_units:
+        return (False, f"时间单位'{unit}'不规范,应为{time_units}")
+
+    # Check that the value is plausible
+    if num <= 0:
+        return (False, f"时间数值'{num}'不能为0或负数")
+    if unit == "小时" and num > 720:  # more than 30 days (720 hours) is treated as implausible
+        return (False, f"时间数值'{num}{unit}'可能过大")
+    if unit in ("天", "日") and num > 365:  # beyond one year should be expressed in years
+        return (False, f"时间数值'{num}{unit}'建议改用年描述")
+
+    return (True, f"时间描述'{num}{counter}{unit}'合理")
+
+
 if __name__ == "__main__":
 if __name__ == "__main__":
     search_biz = SearchBusiness()
     search_biz = SearchBusiness()
     index=""
     index=""