import os import psycopg2 from psycopg2 import pool from typing import List, Dict, Optional import re from llm_client import LLMClient class TextSliceProcessor: def __init__(self, text_dir: str, prompt_template: str, db_host="localhost", db_port=5432, db_name="postgres", db_user="postgres", db_password="your_password", min_conn=1, max_conn=10, api_key: str = None, model: str = None, base_url: str = None): """ 初始化文本切片处理器 Args: text_dir: 文本目录路径 prompt_template: 提示词模板 db_host: PostgreSQL数据库主机 db_port: PostgreSQL数据库端口 db_name: PostgreSQL数据库名称 db_user: PostgreSQL用户名 db_password: PostgreSQL密码 min_conn: 连接池最小连接数 max_conn: 连接池最大连接数 """ self.text_dir = text_dir self.prompt_template = prompt_template # 初始化LLM客户端 self.llm_client = LLMClient(api_key=api_key, model=model, base_url=base_url) # 创建数据库连接池 self.pool = psycopg2.pool.SimpleConnectionPool( min_conn, max_conn, host=db_host, port=db_port, database=db_name, user=db_user, password=db_password ) # 获取一个连接和游标 self.conn = self.pool.getconn() self.cursor = self.conn.cursor() # 用于缓存实体信息的字典 self.entity_cache = {} # 初始化错误日志列表 self.error_logs = [] def __del__(self): """ 析构函数,确保关闭数据库连接和连接池 """ if hasattr(self, 'cursor') and self.cursor: self.cursor.close() if hasattr(self, 'conn') and self.conn: self.pool.putconn(self.conn) if hasattr(self, 'pool') and self.pool: self.pool.closeall() def _get_sorted_files(self, directory: str) -> List[str]: """ 获取目录下所有txt文件的列表,按章节和节数字顺序返回 Args: directory: 目录路径 Returns: List[str]: 文件路径列表 """ # 存储目录路径和对应的章节数字 chapters_with_numbers = [] # 首先获取所有章节目录并排序 for root, dirs, _ in os.walk(directory): for dir_name in dirs: # 提取章节数字 match = re.search(r'第(\d+)章', dir_name) if match: chapter_num = int(match.group(1)) chapter_path = os.path.join(root, dir_name) chapters_with_numbers.append((chapter_num, chapter_path)) # 按章节数字排序 chapters_with_numbers.sort(key=lambda x: x[0]) # 存储所有文件路径 all_files = [] # 遍历排序后的章节目录 for chapter_num, chapter_path in chapters_with_numbers: # 存储当前章节下的所有节 sections_with_numbers = [] # 获取当前章节目录下的所有文件 for root, _, filenames in os.walk(chapter_path): for filename in filenames: if filename.endswith('.txt') and '_split_' in filename: file_path = os.path.join(root, filename) # 提取节数字 section_match = re.search(r'第(\d+)节', filename) if section_match: section_num = int(section_match.group(1)) sections_with_numbers.append((chapter_num, section_num, file_path)) else: # 如果没有找到节数字,将其放在当前章节的最后 sections_with_numbers.append((chapter_num, float('inf'), file_path)) # 先按章节号排序,再按节号排序 sections_with_numbers.sort(key=lambda x: (x[0], x[1])) all_files.extend([file_path for _, _, file_path in sections_with_numbers]) return all_files def _read_text_file(self, file_path: str) -> str: """ 读取文本文件内容 Args: file_path: 文件路径 Returns: str: 文件内容 """ try: with open(file_path, 'r', encoding='utf-8') as f: lines = f.readlines() if len(lines) > 1: # 跳过第一行,返回其余内容 return ''.join(lines[1:]).strip() return '' except Exception as e: error_msg = f"读取文件 {file_path} 失败: {str(e)}" print(error_msg) self.error_logs.append({"error": error_msg, "file": file_path}) return "" def _create_or_get_node(self, node_name: str, category: str) -> int: """ 创建或获取节点 Args: node_name: 节点名称 category: 节点类别 Returns: int: 节点ID """ # 检查缓存 cache_key = f"{node_name}_{category}" if cache_key in self.entity_cache: return self.entity_cache[cache_key] # 查询是否存在 self.cursor.execute( "SELECT id FROM kg_nodes WHERE name = %s AND category = %s", (node_name, category) ) result = self.cursor.fetchone() if result: node_id = result[0] else: # 创建新节点 self.cursor.execute( "INSERT INTO kg_nodes (name, category, status, version) " "VALUES (%s, %s, 0, '1.3') RETURNING id", (node_name, category) ) node_id = self.cursor.fetchone()[0] # 更新缓存 self.entity_cache[cache_key] = node_id return node_id def _create_relationship(self, text_content: str, prev_text: str, file_path: str) -> None: """ 创建文本关系并存储到数据库 Args: text_content: 当前文本内容 prev_text: 前置文本内容 file_path: 文件路径 """ try: # 构建完整的提示词内容 full_content = f"{self.prompt_template}用户输入的\"前置医学文本\"为:\n{prev_text if prev_text else '无'}\n用户输入的\"任务医学文本\"为:\n{text_content}" # 调用LLM获取结果 response = self.llm_client.chat_completion([{"role": "user", "content": full_content}]) if not hasattr(response, 'choices') or len(response.choices) == 0: raise Exception("未收到有效回复") # 获取LLM返回的结果 llm_result = response.choices[0].message.content # 开始事务 self.cursor.execute("BEGIN") # 处理每一行结果 for line in llm_result.strip().split('\n'): if not line.strip(): continue # 解析结果 parts = line.strip().split('---') if len(parts) != 5: continue start_node_name, relation_name, end_node_name, start_category, end_category = parts # 创建或获取起始节点和结束节点 start_node_id = self._create_or_get_node(start_node_name, start_category) end_node_id = self._create_or_get_node(end_node_name, end_category) # 创建关系 self.cursor.execute( "INSERT INTO kg_edges (category, src_id, dest_id, name, status, version) " "VALUES (%s, %s, %s, %s, 0, '1.3') RETURNING id", (relation_name, start_node_id, end_node_id, relation_name) ) edge_id = self.cursor.fetchone()[0] # 添加文件路径属性 self.cursor.execute( "INSERT INTO kg_props (category, ref_id, prop_name, prop_value, type, prop_title) " "VALUES (4, %s, 'file_path', %s, 1, 'file_path')", (edge_id, file_path) ) # 提交事务 self.conn.commit() print(f"处理文件完成: {file_path}") except Exception as e: self.conn.rollback() error_msg = f"处理文件 {file_path} 时出错: {str(e)}" print(error_msg) self.error_logs.append({"error": error_msg, "file": file_path}) def process_text_files(self): """ 处理目录下的所有文本文件 """ try: # 记录处理情况 self.processed_files = 0 self.total_files = 0 # 递归处理目录 self._process_directory(self.text_dir) # 输出处理总结 print("\n处理完成!") print(f"总文件数: {self.total_files}") print(f"成功处理: {self.processed_files}") print(f"处理失败: {len(self.error_logs)}") if self.error_logs: print("\n失败文件列表:") for log in self.error_logs: print(f"文件: {log['file']}") print(f"错误: {log['error']}\n") except Exception as e: print(f"批量处理失败: {str(e)}") def _process_directory(self, directory: str, prev_text: str = "") -> None: """ 递归处理目录下的文件 Args: directory: 目录路径 prev_text: 前置文本内容 """ try: # 获取目录下的所有文件和子目录 items = sorted(os.listdir(directory)) # 收集当前目录下的所有txt文件 txt_files = [] for item in items: item_path = os.path.join(directory, item) if os.path.isfile(item_path) and item.endswith('.txt') and '_split_' in item: # 提取split序号 match = re.search(r'split_(\d+)', item) if match: split_num = int(match.group(1)) txt_files.append((split_num, item_path)) # 按split序号排序文件 txt_files.sort(key=lambda x: x[0]) # 处理排序后的文件 for split_num, item_path in txt_files: self.total_files += 1 print(f"\n开始处理文件 [{self.processed_files + 1}/{self.total_files}]: {os.path.basename(item_path)}") try: # 读取当前文件内容 text_content = self._read_text_file(item_path) if not text_content: continue # 如果是第一个文件(split_0),前置文本为'无' if split_num == 0: current_prev_text = '无' else: # 获取前一个文件的路径 prev_path = item_path.replace(f'split_{split_num}', f'split_{split_num-1}') # 如果前一个文件存在,读取其内容作为前置文本 if os.path.exists(prev_path): current_prev_text = self._read_text_file(prev_path) else: current_prev_text = prev_text # 创建关系 self._create_relationship(text_content, current_prev_text, item_path) self.processed_files += 1 # 更新前置文本 prev_text = text_content except Exception as e: error_msg = f"处理文件 {item_path} 时发生错误: {str(e)}" print(error_msg) self.error_logs.append({"error": error_msg, "file": item_path}) continue # 然后递归处理子目录 for item in items: item_path = os.path.join(directory, item) if os.path.isdir(item_path): # 检查目录名是否包含章节信息 if re.search(r'第(\d+)章', item): self._process_directory(item_path, prev_text) except Exception as e: error_msg = f"处理目录 {directory} 时发生错误: {str(e)}" print(error_msg) self.error_logs.append({"error": error_msg, "file": directory}) def main(): # 解析命令行参数 import argparse parser = argparse.ArgumentParser(description='文本切片处理工具') parser.add_argument('--text_dir', type=str, required=False, default='/Users/ycw/work/qwen32b知识抽取/《急诊医学(第2版)》', help='文本目录路径') prompt = """ ##角色任务 你是一个医学专家,且精通中文文本的实体和关系标注(自然语言处理),用户将提供两个医学文本:“前置医学文本”和“任务医学文本”,你的任务是从用户输入的“任务医学文本”中抽取所有的“关系型”知识要点,构建符合「头部实体---关系---尾部实体」的三元组集合。 ##请严格依次执行以下步骤及其细节要求: 第一步,通读“前置医学文本”和“任务医学文本”,对“任务医学文本”形成一个整体的理解,搞明白其围绕的主题是什么?主要讲的是什么?主要说了哪几块或几点内容?。此处“前置医学文本”是“任务医学文本”的上文语义背景,其有助于你对“任务医学文本”语义的理解,且在你提取三元组时,其可能作为参考或辅助信息。 第二步【实体识别】: 规则1:可参考但不限于如下医学实体类型:症状、体格检查项目(简称:体征项)、体格检查项目结果(简称:“体征”或“体征结果”)、检查、检查结果、疾病、药品、手术操作、其他治疗、科室、人群、人体结构或部位、医疗器械、食物、病理机制等。 规则2:复合实体要拆分:如“老年人和免疫功能低下者”不能作为一个实体,需拆分为:"老年人"、"免疫功能低下者"两个实体。 规则3:文字中省略了,但语义中暗含的文字内容要进行语义补全:如文本:“胸、腹部检查”中,这里的“胸”其实是指“胸部检查”,按语义补全后应该有两个实体:胸部检查、腹部检查。同理,文本:“注意有无心内膜炎、心肌炎、心包炎体征”中,应该有三个实体:内膜炎体征、心肌炎体征、心包炎体征;文本:“有无肝脏和脾脏肿大”中,应该有两个实体:肝脏肿大、脾脏肿大。 第三步【关系构建】: 规则4:可参考的“基础关系类型”如下(但不仅仅限于这些):属于(是)、包括(包含)、导致(的结果)、是由…导致(的结果)、的原因是、是…的原因、的病因是、是…的病因、基于(基础是)、是…的基础、推荐、被推荐于、区别于、相似于、关联、疾病常关联、疾病可推荐等等。 规则5:根据上下文整体语义分析并确定出两个实体之间的“基础关系类型”后,可能还不足以描述清楚两个实体间的“详细关系”或“精准的关系”,所以你要尽量构建字数更多的详细的“关系”,需要依据原文中真实的语义,在“基础关系”中增加“修饰词”、“限定条件词”等语义描述,避免关系构建太过粗糙,避免原文关系语义信息的衰减或丢失。 规则6:语义中属于该三元组关系的修饰词、限定条件词等,需要融入到该“关系”中(如:可能、少数、30%、多数、显著、轻微、手术后、治疗无效时、满足XX条件时等),如果归属于该“关系”,则需要组合到该“关系”中使得关系更丰满(如:“可能导致”、“少数由…导致”、“手术后导致”、“满足XX条件时,推荐”等等)。 规则7:“尾部实体”不能为复合实体或并排结构,必须合理的拆分为多个三元组。 规则8:强制质量检测:“头部实体---关系类型---尾部实体”中,三者必须能组合成语法通顺的一句话,不能有语病,且关系指向不能模糊或错误。如三元组1:“A---的原因是---B”中,组合成一句话“A的原因是B”没有语病,且关系明确:B是原因,A是B原因导致的结果;如三元组2:“A---是…的原因---B”中,组合成一句话“A是B的原因”没有语病,且关系明确:A是原因,B是A原因导致的结果。注意:不是所有的关系都是镜像对称的,很多关系是单向关系,请根据语义关系的方向选择正确的关系类型,否则语义会完全相反。 ##输出格式要求: 对于每个三元组关系,提取以下信息:头部实体的名称(source_entity)、尾部实体的名称(target_entity)、关系(relationship_type)、头部实体的类型(source_entity_type)、尾部实体的类型(target_entity_type),最后这些信息整理成如下格式的一个“字符串”:“头部实体的名称---关系---尾部实体的名称---头部实体的类型---尾部实体的类型”,不同的三元组“字符串”之间用换行符号连接起来。 ##质量红线 1.禁止出现四元组或嵌套结构 2.禁止合并多个“差异点”或“并列实体”到单个三元组 3.每个三元组必须保留原文核心逻辑 ##示例: “前置医学文本”为: 第一节,自发性气胸 “任务医学文本”为: "三、诊断要点 自发性气胸通过胸部 X 线片确立诊断,条件允许时,应选择直立位拍片。 1. 既往胸部 X 线检查无明显病变或有 COPD、肺结核、哮喘等肺部基础病变。 2. 突发一侧胸痛伴不同程度的胸闷、呼吸困难。患侧胸廓饱满、呼吸运动减弱,叩诊呈鼓音,肝、 肺浊音界消失,听诊呼吸音减弱,甚至消失。 3. 发病时胸部 X 线影像学检查是诊断气胸最为准确和可靠的方法。 典型自发性气胸诊断不难。继发性气胸病人可因原有基础疾病而影响诊断,因此,对临床不能用 其他原因解释或经急诊处理呼吸困难无改善者,需考虑自发性气胸的可能。因病情危重不能立即行 胸部 X 线检查时,可在胸腔积气体征最明显处进行诊断性穿刺。" 输出为: "自发性气胸---通过...确立诊断---胸部X线片---疾病---检查 自发性气胸---条件允许时推荐选择---直立位拍片---疾病---检查操作 自发性气胸---诊断要点需结合---既往胸部X线检查无明显病变---疾病---检查结果 自发性气胸---诊断要点需结合---或有肺部基础病变---疾病---检查结果 肺部基础病变---的原因包括---COPD---检查结果---疾病 肺部基础病变---的原因包括---肺结核---检查结果---疾病 肺部基础病变---的原因包括---哮喘---检查结果---疾病 自发性气胸---诊断要点需结合---突发一侧胸痛伴不同程度的胸闷 自发性气胸---诊断要点需结合---突发一侧胸痛伴不同程度的呼吸困难 突发一侧胸痛---伴发---胸闷---症状---症状 突发一侧胸痛---伴发---呼吸困难---症状---症状 自发性气胸---患侧表现---患侧胸廓饱满---疾病---体征结果 自发性气胸---患侧表现---呼吸运动减弱---疾病---体征结果 自发性气胸---患侧表现---叩诊呈鼓音---疾病---体征结果 自发性气胸---体征结果---肝脏浊音界消失---疾病---体征结果 自发性气胸---体征结果---肺浊音界消失---疾病---体征结果 自发性气胸---患侧表现---肺浊音界消失---疾病---体征结果 自发性气胸---患侧表现---听诊呼吸音减弱---疾病---体征结果 自发性气胸---患侧表现---听诊呼吸音消失---疾病---体征结果 胸部X线影像学检查---是诊断...最准确可靠方法---气胸---检查---疾病 继发性气胸病人---可因...被影响诊断---原有基础疾病---人群---疾病 临床不能用其他原因解释的呼吸困难者---需考虑可能为---自发性气胸---人群---疾病 急诊处理呼吸困难无改善者---需考虑可能为---自发性气胸---人群---疾病 病情危重不能立即行胸部X线检查者---推荐进行---诊断性穿刺---人群---检查操作 诊断性穿刺---实施部位---胸腔积气体征最明显处---检查操作---人体部位" """ parser.add_argument('--prompt', type=str, required=False, default=prompt, help='提示词模板') args = parser.parse_args() # 设置PostgreSQL连接参数 db_host = "173.18.12.203" db_port = 5432 db_name = "kg_ycw" db_user = "knowledge" db_password = "qwer1234." # 请替换为实际的密码 try: # 创建处理器实例 processor = TextSliceProcessor( text_dir=args.text_dir, prompt_template=args.prompt, db_host=db_host, db_port=db_port, db_name=db_name, db_user=db_user, db_password=db_password ) # 执行批量处理 processor.process_text_files() except Exception as e: print(f"错误: {str(e)}") if __name__ == "__main__": main()