import regex from pathlib import Path import pytest from service.kg_edge_service import KGEdgeService from service.kg_node_service import KGNodeService from service.kg_prop_service import KGPropService from service.trunks_service import TrunksService from model.trunks_model import Trunks from sqlalchemy.exc import IntegrityError from utils import DeepseekUtil from db.session import get_db @pytest.fixture(scope="module") def trunks_service(): return TrunksService() @pytest.fixture def test_trunk_data(): return { "content": """测试""", "file_path": "test_path.pdf", "type": "default" } class TestTrunksServiceCRUD: def test_create_and_get_trunk(self, trunks_service, test_trunk_data): # 测试创建和查询 created = trunks_service.create_trunk(test_trunk_data) assert created.id is not None def test_update_trunk(self, trunks_service, test_trunk_data): trunk = trunks_service.create_trunk(test_trunk_data) updated = trunks_service.update_trunk(trunk.id, {"content": "更新内容"}) assert updated.content == "更新内容" def test_delete_trunk(self, trunks_service, test_trunk_data): trunk = trunks_service.create_trunk(test_trunk_data) assert trunks_service.delete_trunk(trunk.id) assert trunks_service.get_trunk_by_id(trunk.id) is None class TestSearchOperations: def test_vector_search2(self, trunks_service): page = 1 limit = 100 file_path = '急诊医学(第2版' while True: results = trunks_service.paginated_search_by_type_and_filepath( {'pageNo': page, 'limit': limit, 'type': 'trunk', 'file_path': file_path}) if not results['data']: break for record in results['data']: print(f"{record['id']}{record['type']}{record['title']}{record['file_path']}") if record['type'] != 'trunk' or file_path not in record['file_path']: print('出现异常数据') break page_no = self.get_page_no(record['content'], trunks_service, file_path) if page_no is None: print(f"{record['id']}找到page_no: {page_no}") continue trunks_service.update_trunk(record['id'], {'page_no': page_no}) page += 1 def test_vector_search(self, trunks_service): page = 1 limit = 100 file_path='trunk2' while True: results = trunks_service.paginated_search_by_type_and_filepath({'pageNo': page, 'limit': limit, 'type': 'trunk', 'file_path': file_path}) if not results['data']: break for record in results['data']: print(f"{record['id']}{record['type']}{record['title']}{record['file_path']}") if record['type'] != 'trunk' or file_path not in record['file_path']: print('出现异常数据') break page_no = self.get_page_no(record['content'],trunks_service,file_path) if page_no is None: print(f"{record['id']}找到page_no: {page_no}") continue trunks_service.update_trunk(record['id'], {'page_no': page_no}) page += 1 def test_trunk_search(self, trunks_service): page = 1 limit = 100 while True: results = trunks_service.paginated_search_by_type_and_filepath({'pageNo': page, 'limit': limit, 'type': 'trunk'}) if not results['data']: break for record in results['data']: prompt = ''' ##角色任务 你是一个医学专家,且精通中文文本的实体和关系标注(自然语言处理),用户将提供两个医学文本:“前置医学文本”和“任务医学文本”,你的任务是从用户输入的“任务医学文本”中抽取所有的“关系型”知识要点,构建符合「头部实体---关系---尾部实体」的三元组集合。 ##请严格依次执行以下步骤及其细节要求: 第一步,通读“前置医学文本”和“任务医学文本”,对“任务医学文本”形成一个整体的理解,搞明白其围绕的主题是什么?主要讲的是什么?主要说了哪几块或几点内容?。此处“前置医学文本”是“任务医学文本”的上文语义背景,其有助于你对“任务医学文本”语义的理解,且在你提取三元组时,其可能作为参考或辅助信息。 第二步【实体识别】: 规则1:可参考但不限于如下医学实体类型:疾病、药品、药品剂型、症状、体格检查项目、体格检查结果、手术和操作、实验室检查套餐、实验室检查子项目、辅助检查项目、辅助检查子项目、辅助检查描述、辅助检查结果、输血类型、麻醉、科室、性别、人群、食物、其他过敏原、医疗器械及物品、给药途径、部位、护理、量表、单位、ICD10疾病类别、药品化学物质类别、药品治疗学类别、药品药理学类别、药品解剖学类别、症状类别、手术和操作类别、ICD10疾病类别根节点、科室疾病类别根节点、药品化学物质类别根节点、药品治疗学类别根节点、药品药理学类别根节点、药品解剖学类别根节点、症状类别根节点、手术和操作类别根节点、实验室检查类别根节点、辅助检查类别根节点、年龄、疾病系统分类、性质、中医疾病、中医证候、诱因、政策法规、否定词、疾病集合、药品通用名集合、药品剂型集合、症状集合、体格检查项目集合、体格检查结果集合、手术和操作集合、实验室检查套餐集合、辅助检查项目集合、辅助检查子项目集合、辅助检查描述集合、辅助检查结果集合、麻醉集合、科室集合、食物集合、其他过敏原集合、医疗器械及物品集合、部位集合、中医疾病集合、中医证候集合、诱因集合、给药途径集合、物理治疗、经典病例、历史病例检查、检查结果、手术操作、其他治疗、人群、人体结构或部位、医疗器械、食物、病理机制等。 规则2:复合实体要拆分:如“老年人和免疫功能低下者”不能作为一个实体,需拆分为:"老年人"、"免疫功能低下者"两个实体。 规则3:文字中省略了,但语义中暗含的文字内容要进行语义补全:如文本:“胸、腹部检查”中,这里的“胸”其实是指“胸部检查”,按语义补全后应该有两个实体:胸部检查、腹部检查。同理,文本:“注意有无心内膜炎、心肌炎、心包炎体征”中,应该有三个实体:内膜炎体征、心肌炎体征、心包炎体征;文本:“有无肝脏和脾脏肿大”中,应该有两个实体:肝脏肿大、脾脏肿大。 第三步【关系构建】: 规则4:可参考的“基础关系类型”如下(但不仅仅限于这些):属于(是)、包括(包含)、导致(的结果)、是由…导致(的结果)、的原因是、是…的原因、的病因是、是…的病因、基于(基础是)、是…的基础、推荐、被推荐于、区别于、相似于、关联、疾病常关联、疾病可推荐等等。 规则5:根据上下文整体语义分析并确定出两个实体之间的“基础关系类型”后,可能还不足以描述清楚两个实体间的“详细关系”或“精准的关系”,所以你要尽量构建字数更多的详细的“关系”,需要依据原文中真实的语义,在“基础关系”中增加“修饰词”、“限定条件词”等语义描述,避免关系构建太过粗糙,避免原文关系语义信息的衰减或丢失。 规则6:语义中属于该三元组关系的修饰词、限定条件词等,需要融入到该“关系”中(如:可能、少数、30%、多数、显著、轻微、手术后、治疗无效时、满足XX条件时等),如果归属于该“关系”,则需要组合到该“关系”中使得关系更丰满(如:“可能导致”、“少数由…导致”、“手术后导致”、“满足XX条件时,推荐”等等)。 规则7:“尾部实体”不能为复合实体或并排结构,必须合理的拆分为多个三元组。 规则8:强制质量检测:“头部实体---关系类型---尾部实体”中,三者必须能组合成语法通顺的一句话,不能有语病,且关系指向不能模糊或错误。如三元组1:“A---的原因是---B”中,组合成一句话“A的原因是B”没有语病,且关系明确:B是原因,A是B原因导致的结果;如三元组2:“A---是…的原因---B”中,组合成一句话“A是B的原因”没有语病,且关系明确:A是原因,B是A原因导致的结果。注意:不是所有的关系都是镜像对称的,很多关系是单向关系,请根据语义关系的方向选择正确的关系类型,否则语义会完全相反。 ##输出格式要求: 对于每个三元组关系,提取以下信息:头部实体的名称(source_entity)、尾部实体的名称(target_entity)、关系(relationship_type)、头部实体的类型(source_entity_type)、尾部实体的类型(target_entity_type),最后这些信息整理成如下格式的一个“字符串”:“头部实体的名称---关系---尾部实体的名称---头部实体的类型---尾部实体的类型”,不同的三元组“字符串”之间用换行符号连接起来。 ##质量红线 1.禁止出现四元组或嵌套结构 2.禁止合并多个“差异点”或“并列实体”到单个三元组 3.每个三元组必须保留原文核心逻辑 ##示例: “前置医学文本”为: 第一节,自发性气胸 “任务医学文本”为: "三、诊断要点 自发性气胸通过胸部 X 线片确立诊断,条件允许时,应选择直立位拍片。 1. 既往胸部 X 线检查无明显病变或有 COPD、肺结核、哮喘等肺部基础病变。 2. 突发一侧胸痛伴不同程度的胸闷、呼吸困难。患侧胸廓饱满、呼吸运动减弱,叩诊呈鼓音,肝、 肺浊音界消失,听诊呼吸音减弱,甚至消失。 3. 发病时胸部 X 线影像学检查是诊断气胸最为准确和可靠的方法。 典型自发性气胸诊断不难。继发性气胸病人可因原有基础疾病而影响诊断,因此,对临床不能用 其他原因解释或经急诊处理呼吸困难无改善者,需考虑自发性气胸的可能。因病情危重不能立即行 胸部 X 线检查时,可在胸腔积气体征最明显处进行诊断性穿刺。" 输出为: 自发性气胸---通过...确立诊断---胸部X线片---疾病---检查 自发性气胸---条件允许时推荐选择---直立位拍片---疾病---检查操作 自发性气胸---诊断要点需结合---既往胸部X线检查无明显病变---疾病---检查结果 自发性气胸---诊断要点需结合---或有肺部基础病变---疾病---检查结果 肺部基础病变---的原因包括---COPD---检查结果---疾病 肺部基础病变---的原因包括---肺结核---检查结果---疾病 肺部基础病变---的原因包括---哮喘---检查结果---疾病 自发性气胸---诊断要点需结合---突发一侧胸痛伴不同程度的胸闷 自发性气胸---诊断要点需结合---突发一侧胸痛伴不同程度的呼吸困难 突发一侧胸痛---伴发---胸闷---症状---症状 突发一侧胸痛---伴发---呼吸困难---症状---症状 自发性气胸---患侧表现---患侧胸廓饱满---疾病---体征结果 自发性气胸---患侧表现---呼吸运动减弱---疾病---体征结果 自发性气胸---患侧表现---叩诊呈鼓音---疾病---体征结果 自发性气胸---体征结果---肝脏浊音界消失---疾病---体征结果 自发性气胸---体征结果---肺浊音界消失---疾病---体征结果 自发性气胸---患侧表现---肺浊音界消失---疾病---体征结果 自发性气胸---患侧表现---听诊呼吸音减弱---疾病---体征结果 自发性气胸---患侧表现---听诊呼吸音消失---疾病---体征结果 胸部X线影像学检查---是诊断...最准确可靠方法---气胸---检查---疾病 继发性气胸病人---可因...被影响诊断---原有基础疾病---人群---疾病 临床不能用其他原因解释的呼吸困难者---需考虑可能为---自发性气胸---人群---疾病 急诊处理呼吸困难无改善者---需考虑可能为---自发性气胸---人群---疾病 病情危重不能立即行胸部X线检查者---推荐进行---诊断性穿刺---人群---检查操作 诊断性穿刺---实施部位---胸腔积气体征最明显处---检查操作---人体部位 用户输入的“前置医学文本”为: ''' prompt+=record['meta_header']+'\n任务文本为:\n' prompt = prompt + record['content'] llm_result = DeepseekUtil.chat(prompt) if not llm_result or not isinstance(llm_result, str): print(f"LLM返回结果无效: {type(llm_result)}") continue for line in llm_result.strip().split('\n'): try: if not line.strip(): continue # 验证行格式 if line.count('---') != 4: print(f"无效的三元组格式: {line}") continue # 解析结果 parts = line.strip().split('---') if len(parts) != 5: print(f"解析失败,部分数量不符: {len(parts)} parts in {line}") continue start_node_name, relation_name, end_node_name, start_category, end_category = parts # 创建或获取起始节点和结束节点 start_node_id = self._create_or_get_node(start_node_name, start_category) end_node_id = self._create_or_get_node(end_node_name, end_category) edgeService = KGEdgeService(next(get_db())) #edges = edgeService.get_edges_by_nodes(start_node_id, end_node_id, relation_name) #if len(edges) == 0: edge_data = {} edge_data['src_id'] = start_node_id edge_data['dest_id'] = end_node_id edge_data['name'] = relation_name edge_data['category'] = relation_name edge_data['version'] = 'unk' edge = edgeService.create_edge(edge_data) propService = KGPropService(next(get_db())) props_by_ref_id = propService.get_prop_by_ref_id(edge.id, 'trunk_ids') if props_by_ref_id: if record['id'] not in props_by_ref_id['trunk_ids']: props_by_ref_id['trunk_ids'].append(record['id']) propService.update_prop(edge.id, {'trunk_ids': props_by_ref_id['trunk_ids']}) continue prop_data = {} prop_data['ref_id'] = edge.id prop_data['category'] = 2 prop_data['type'] = 2 prop_data['prop_name'] = 'trunk_ids' prop_data['prop_value'] = [record['id']] prop_data['prop_title'] = '切片id列表' propService.create_prop(prop_data) except Exception as e: print(f"处理行时发生异常: {str(e)}") continue page += 1 def _create_or_get_node(self, node_name: str, category: str) -> int: node_service = KGNodeService(next(get_db())) node = node_service.get_node_by_name_category(node_name, category) if node: return node['id'] node_data = {} node_data['name'] = node_name node_data['category'] = category node_data['version'] = 'unk' node_data['status'] = 0 return node_service.create_node(node_data).id def get_page_no(self, text: str, trunks_service,file_path:str) -> int: results = trunks_service.search_by_vector(text,1000,type='page',file_path=file_path,conversation_id="1111111aaaa") sentences = self.split_text(text) count = 0 for r in results: #将r["content"]的所有空白字符去掉 content = regex.sub(r'[^\w\d\p{L}]', '', r["content"]) count+=1 match_count = 0 length = len(sentences)/2 for sentence in sentences: sentence = regex.sub(r'[^\w\d\p{L}]', '', sentence) if sentence in content: match_count += 1 if match_count >= 2: return r["page_no"] def test_match_trunk(self,trunks_service) -> int: must_matchs = ['心肌梗死'] keywords = [ '概述'] text = '''- 主要病因: 1. 冠状动脉粥样硬化(占90%以上) 2. 冠状动脉栓塞(如房颤血栓脱落) 3. 冠状动脉痉挛(可卡因滥用等) - 危险因素: 1. 吸烟(RR=2.87) 2. 高血压(RR=2.50) 3. LDL-C≥190mg/dL(RR=4.48) - 遗传因素: 家族性高胆固醇血症(OMIM#143890)''' text = regex.sub(r'[^\w\d\p{L}]', '', text) results = trunks_service.search_by_vector(text,1000,distance=0.72,type='trunk') print(f"原结果: {results[0]["meta_header"]}") print(results[0]["content"]) max_match_count = 0 best_match = None for r in results: if all(must_match in r["content"] or must_match in r["meta_header"] for must_match in must_matchs): match_count = sum(keyword in r["content"] for keyword in keywords) if match_count > max_match_count: max_match_count = match_count best_match = r elif best_match is None and max_match_count == 0: best_match = r if best_match: print(f"最佳匹配: {best_match["title"]}") print(best_match["content"]) return best_match def split_text(self, text): """将文本分割成句子""" print(text) # 使用常见的标点符号作为分隔符 delimiters = ['!', '?', '。', '!', '?', '\n', ';', '。', ';'] sentences = [text] for delimiter in delimiters: new_sentences = [] for sentence in sentences: parts = sentence.split(delimiter) new_sentences.extend([part + delimiter if i < len(parts) - 1 else part for i, part in enumerate(parts)]) sentences = [s.strip() for s in new_sentences if s.strip()] # 合并短句子 merged_sentences = [] buffer = "" for sentence in sentences: buffer += " " + sentence if buffer else sentence if len(buffer) >= 10: merged_sentences.append(buffer) buffer = "" if buffer: merged_sentences.append(buffer) # 打印最终句子 for i, sentence in enumerate(merged_sentences): print(f"句子{i+1}: {sentence.replace(" ","").replace("\u2003", "").replace("\u2002", "").replace("\u2009", "").replace("\n", "").replace("\r", "")}") return merged_sentences class TestExceptionCases: def test_duplicate_id(self, trunks_service, test_trunk_data): with pytest.raises(IntegrityError): trunk1 = trunks_service.create_trunk(test_trunk_data) test_trunk_data["id"] = trunk1.id trunks_service.create_trunk(test_trunk_data) def test_invalid_vector_dimension(self, trunks_service, test_trunk_data): with pytest.raises(ValueError): invalid_data = test_trunk_data.copy() invalid_data["embedding"] = [0.1]*100 trunks_service.create_trunk(invalid_data) @pytest.fixture def trunk_factory(): class TrunkFactory: @staticmethod def create(**overrides): defaults = { "content": "工厂内容", "file_path": "factory_path.pdf", "type": "default" } return {**defaults, **overrides} return TrunkFactory() class TestBatchCreateFromDirectory: def test_batch_create_from_directory(self, trunks_service): # 使用现有目录路径 base_path = Path(r'E:\project\vscode\《急诊医学(第2版)》') # 遍历目录并创建trunk created_ids = [] for txt_path in base_path.glob('**/*_split_*.txt'): relative_path = txt_path.relative_to(base_path.parent.parent) with open(txt_path, 'r', encoding='utf-8') as f: trunk_data = { "content": f.read(), "file_path": str(relative_path).replace('\\', '/') } trunk = trunks_service.create_trunk(trunk_data) created_ids.append(trunk.id) # 验证数据库记录 for trunk_id in created_ids: db_trunk = trunks_service.get_trunk_by_id(trunk_id) assert db_trunk is not None assert ".txt" in db_trunk.file_path assert "_split_" in db_trunk.file_path assert len(db_trunk.content) > 0