123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- import os
- import psycopg2
- from psycopg2 import pool
- from typing import List, Dict, Optional
- import re
- from llm_client import LLMClient
- class TextSliceProcessor:
- def __init__(self, text_dir: str, prompt_template: str, db_host="localhost", db_port=5432, db_name="postgres", db_user="postgres", db_password="your_password", min_conn=1, max_conn=10, api_key: str = None, model: str = None, base_url: str = None):
- """
- 初始化文本切片处理器
-
- Args:
- text_dir: 文本目录路径
- prompt_template: 提示词模板
- db_host: PostgreSQL数据库主机
- db_port: PostgreSQL数据库端口
- db_name: PostgreSQL数据库名称
- db_user: PostgreSQL用户名
- db_password: PostgreSQL密码
- min_conn: 连接池最小连接数
- max_conn: 连接池最大连接数
- """
- self.text_dir = text_dir
- self.prompt_template = prompt_template
-
- # 初始化LLM客户端
- self.llm_client = LLMClient(api_key=api_key, model=model, base_url=base_url)
-
- # 创建数据库连接池
- self.pool = psycopg2.pool.SimpleConnectionPool(
- min_conn,
- max_conn,
- host=db_host,
- port=db_port,
- database=db_name,
- user=db_user,
- password=db_password
- )
- # 获取一个连接和游标
- self.conn = self.pool.getconn()
- self.cursor = self.conn.cursor()
-
- # 用于缓存实体信息的字典
- self.entity_cache = {}
- # 初始化错误日志列表
- self.error_logs = []
-
- def __del__(self):
- """
- 析构函数,确保关闭数据库连接和连接池
- """
- if hasattr(self, 'cursor') and self.cursor:
- self.cursor.close()
- if hasattr(self, 'conn') and self.conn:
- self.pool.putconn(self.conn)
- if hasattr(self, 'pool') and self.pool:
- self.pool.closeall()
-
- def _get_sorted_files(self, directory: str) -> List[str]:
- """
- 获取目录下所有txt文件的列表,按章节和节数字顺序返回
-
- Args:
- directory: 目录路径
-
- Returns:
- List[str]: 文件路径列表
- """
- # 存储目录路径和对应的章节数字
- chapters_with_numbers = []
-
- # 首先获取所有章节目录并排序
- for root, dirs, _ in os.walk(directory):
- for dir_name in dirs:
- # 提取章节数字
- match = re.search(r'第(\d+)章', dir_name)
- if match:
- chapter_num = int(match.group(1))
- chapter_path = os.path.join(root, dir_name)
- chapters_with_numbers.append((chapter_num, chapter_path))
-
- # 按章节数字排序
- chapters_with_numbers.sort(key=lambda x: x[0])
-
- # 存储所有文件路径
- all_files = []
-
- # 遍历排序后的章节目录
- for chapter_num, chapter_path in chapters_with_numbers:
- # 存储当前章节下的所有节
- sections_with_numbers = []
-
- # 获取当前章节目录下的所有文件
- for root, _, filenames in os.walk(chapter_path):
- for filename in filenames:
- if filename.endswith('.txt') and '_split_' in filename:
- file_path = os.path.join(root, filename)
- # 提取节数字
- section_match = re.search(r'第(\d+)节', filename)
- if section_match:
- section_num = int(section_match.group(1))
- sections_with_numbers.append((chapter_num, section_num, file_path))
- else:
- # 如果没有找到节数字,将其放在当前章节的最后
- sections_with_numbers.append((chapter_num, float('inf'), file_path))
-
- # 先按章节号排序,再按节号排序
- sections_with_numbers.sort(key=lambda x: (x[0], x[1]))
- all_files.extend([file_path for _, _, file_path in sections_with_numbers])
-
- return all_files
-
- def _read_text_file(self, file_path: str) -> str:
- """
- 读取文本文件内容
-
- Args:
- file_path: 文件路径
-
- Returns:
- str: 文件内容
- """
- try:
- with open(file_path, 'r', encoding='utf-8') as f:
- lines = f.readlines()
- if len(lines) > 1:
- # 跳过第一行,返回其余内容
- return ''.join(lines[1:]).strip()
- return ''
- except Exception as e:
- error_msg = f"读取文件 {file_path} 失败: {str(e)}"
- print(error_msg)
- self.error_logs.append({"error": error_msg, "file": file_path})
- return ""
-
- def _create_or_get_node(self, node_name: str, category: str) -> int:
- """
- 创建或获取节点
-
- Args:
- node_name: 节点名称
- category: 节点类别
-
- Returns:
- int: 节点ID
- """
- # 检查缓存
- cache_key = f"{node_name}_{category}"
- if cache_key in self.entity_cache:
- return self.entity_cache[cache_key]
-
- # 查询是否存在
- self.cursor.execute(
- "SELECT id FROM kg_nodes WHERE name = %s AND category = %s",
- (node_name, category)
- )
- result = self.cursor.fetchone()
-
- if result:
- node_id = result[0]
- else:
- # 创建新节点
- self.cursor.execute(
- "INSERT INTO kg_nodes (name, category, status, version) "
- "VALUES (%s, %s, 0, '1.3') RETURNING id",
- (node_name, category)
- )
- node_id = self.cursor.fetchone()[0]
-
- # 更新缓存
- self.entity_cache[cache_key] = node_id
- return node_id
- def _create_relationship(self, text_content: str, prev_text: str, file_path: str) -> None:
- """
- 创建文本关系并存储到数据库
-
- Args:
- text_content: 当前文本内容
- prev_text: 前置文本内容
- file_path: 文件路径
- """
- try:
- # 构建完整的提示词内容
- full_content = f"{self.prompt_template}用户输入的\"前置医学文本\"为:\n{prev_text if prev_text else '无'}\n用户输入的\"任务医学文本\"为:\n{text_content}"
-
- # 调用LLM获取结果
- response = self.llm_client.chat_completion([{"role": "user", "content": full_content}])
-
- if not hasattr(response, 'choices') or len(response.choices) == 0:
- raise Exception("未收到有效回复")
-
- # 获取LLM返回的结果
- llm_result = response.choices[0].message.content
-
- # 开始事务
- self.cursor.execute("BEGIN")
-
- # 处理每一行结果
- for line in llm_result.strip().split('\n'):
- if not line.strip():
- continue
-
- # 解析结果
- parts = line.strip().split('---')
- if len(parts) != 5:
- continue
-
- start_node_name, relation_name, end_node_name, start_category, end_category = parts
-
- # 创建或获取起始节点和结束节点
- start_node_id = self._create_or_get_node(start_node_name, start_category)
- end_node_id = self._create_or_get_node(end_node_name, end_category)
-
- # 创建关系
- self.cursor.execute(
- "INSERT INTO kg_edges (category, src_id, dest_id, name, status, version) "
- "VALUES (%s, %s, %s, %s, 0, '1.3') RETURNING id",
- (relation_name, start_node_id, end_node_id, relation_name)
- )
- edge_id = self.cursor.fetchone()[0]
-
- # 添加文件路径属性
- self.cursor.execute(
- "INSERT INTO kg_props (category, ref_id, prop_name, prop_value, type, prop_title) "
- "VALUES (4, %s, 'file_path', %s, 1, 'file_path')",
- (edge_id, file_path)
- )
-
- # 提交事务
- self.conn.commit()
- print(f"处理文件完成: {file_path}")
-
- except Exception as e:
- self.conn.rollback()
- error_msg = f"处理文件 {file_path} 时出错: {str(e)}"
- print(error_msg)
- self.error_logs.append({"error": error_msg, "file": file_path})
-
- def process_text_files(self):
- """
- 处理目录下的所有文本文件
- """
- try:
- # 记录处理情况
- self.processed_files = 0
- self.total_files = 0
-
- # 递归处理目录
- self._process_directory(self.text_dir)
-
- # 输出处理总结
- print("\n处理完成!")
- print(f"总文件数: {self.total_files}")
- print(f"成功处理: {self.processed_files}")
- print(f"处理失败: {len(self.error_logs)}")
-
- if self.error_logs:
- print("\n失败文件列表:")
- for log in self.error_logs:
- print(f"文件: {log['file']}")
- print(f"错误: {log['error']}\n")
-
- except Exception as e:
- print(f"批量处理失败: {str(e)}")
-
- def _process_directory(self, directory: str, prev_text: str = "") -> None:
- """
- 递归处理目录下的文件
-
- Args:
- directory: 目录路径
- prev_text: 前置文本内容
- """
- try:
- # 获取目录下的所有文件和子目录
- items = sorted(os.listdir(directory))
-
- # 收集当前目录下的所有txt文件
- txt_files = []
- for item in items:
- item_path = os.path.join(directory, item)
- if os.path.isfile(item_path) and item.endswith('.txt') and '_split_' in item:
- # 提取split序号
- match = re.search(r'split_(\d+)', item)
- if match:
- split_num = int(match.group(1))
- txt_files.append((split_num, item_path))
-
- # 按split序号排序文件
- txt_files.sort(key=lambda x: x[0])
-
- # 处理排序后的文件
- for split_num, item_path in txt_files:
- self.total_files += 1
- print(f"\n开始处理文件 [{self.processed_files + 1}/{self.total_files}]: {os.path.basename(item_path)}")
-
- try:
- # 读取当前文件内容
- text_content = self._read_text_file(item_path)
- if not text_content:
- continue
-
- # 如果是第一个文件(split_0),前置文本为'无'
- if split_num == 0:
- current_prev_text = '无'
- else:
- # 获取前一个文件的路径
- prev_path = item_path.replace(f'split_{split_num}', f'split_{split_num-1}')
- # 如果前一个文件存在,读取其内容作为前置文本
- if os.path.exists(prev_path):
- current_prev_text = self._read_text_file(prev_path)
- else:
- current_prev_text = prev_text
-
- # 创建关系
- self._create_relationship(text_content, current_prev_text, item_path)
- self.processed_files += 1
-
- # 更新前置文本
- prev_text = text_content
-
- except Exception as e:
- error_msg = f"处理文件 {item_path} 时发生错误: {str(e)}"
- print(error_msg)
- self.error_logs.append({"error": error_msg, "file": item_path})
- continue
-
- # 然后递归处理子目录
- for item in items:
- item_path = os.path.join(directory, item)
- if os.path.isdir(item_path):
- # 检查目录名是否包含章节信息
- if re.search(r'第(\d+)章', item):
- self._process_directory(item_path, prev_text)
-
- except Exception as e:
- error_msg = f"处理目录 {directory} 时发生错误: {str(e)}"
- print(error_msg)
- self.error_logs.append({"error": error_msg, "file": directory})
- def main():
- # 解析命令行参数
- import argparse
- parser = argparse.ArgumentParser(description='文本切片处理工具')
- parser.add_argument('--text_dir', type=str, required=False, default='/Users/ycw/work/qwen32b知识抽取/《急诊医学(第2版)》',
- help='文本目录路径')
- prompt = """
- ##角色任务
- 你是一个医学专家,且精通中文文本的实体和关系标注(自然语言处理),用户将提供两个医学文本:“前置医学文本”和“任务医学文本”,你的任务是从用户输入的“任务医学文本”中抽取所有的“关系型”知识要点,构建符合「头部实体---关系---尾部实体」的三元组集合。
- ##请严格依次执行以下步骤及其细节要求:
- 第一步,通读“前置医学文本”和“任务医学文本”,对“任务医学文本”形成一个整体的理解,搞明白其围绕的主题是什么?主要讲的是什么?主要说了哪几块或几点内容?。此处“前置医学文本”是“任务医学文本”的上文语义背景,其有助于你对“任务医学文本”语义的理解,且在你提取三元组时,其可能作为参考或辅助信息。
- 第二步【实体识别】:
- 规则1:可参考但不限于如下医学实体类型:症状、体格检查项目(简称:体征项)、体格检查项目结果(简称:“体征”或“体征结果”)、检查、检查结果、疾病、药品、手术操作、其他治疗、科室、人群、人体结构或部位、医疗器械、食物、病理机制等。
- 规则2:复合实体要拆分:如“老年人和免疫功能低下者”不能作为一个实体,需拆分为:"老年人"、"免疫功能低下者"两个实体。
- 规则3:文字中省略了,但语义中暗含的文字内容要进行语义补全:如文本:“胸、腹部检查”中,这里的“胸”其实是指“胸部检查”,按语义补全后应该有两个实体:胸部检查、腹部检查。同理,文本:“注意有无心内膜炎、心肌炎、心包炎体征”中,应该有三个实体:内膜炎体征、心肌炎体征、心包炎体征;文本:“有无肝脏和脾脏肿大”中,应该有两个实体:肝脏肿大、脾脏肿大。
- 第三步【关系构建】:
- 规则4:可参考的“基础关系类型”如下(但不仅仅限于这些):属于(是)、包括(包含)、导致(的结果)、是由…导致(的结果)、的原因是、是…的原因、的病因是、是…的病因、基于(基础是)、是…的基础、推荐、被推荐于、区别于、相似于、关联、疾病常关联、疾病可推荐等等。
- 规则5:根据上下文整体语义分析并确定出两个实体之间的“基础关系类型”后,可能还不足以描述清楚两个实体间的“详细关系”或“精准的关系”,所以你要尽量构建字数更多的详细的“关系”,需要依据原文中真实的语义,在“基础关系”中增加“修饰词”、“限定条件词”等语义描述,避免关系构建太过粗糙,避免原文关系语义信息的衰减或丢失。
- 规则6:语义中属于该三元组关系的修饰词、限定条件词等,需要融入到该“关系”中(如:可能、少数、30%、多数、显著、轻微、手术后、治疗无效时、满足XX条件时等),如果归属于该“关系”,则需要组合到该“关系”中使得关系更丰满(如:“可能导致”、“少数由…导致”、“手术后导致”、“满足XX条件时,推荐”等等)。
- 规则7:“尾部实体”不能为复合实体或并排结构,必须合理的拆分为多个三元组。
- 规则8:强制质量检测:“头部实体---关系类型---尾部实体”中,三者必须能组合成语法通顺的一句话,不能有语病,且关系指向不能模糊或错误。如三元组1:“A---的原因是---B”中,组合成一句话“A的原因是B”没有语病,且关系明确:B是原因,A是B原因导致的结果;如三元组2:“A---是…的原因---B”中,组合成一句话“A是B的原因”没有语病,且关系明确:A是原因,B是A原因导致的结果。注意:不是所有的关系都是镜像对称的,很多关系是单向关系,请根据语义关系的方向选择正确的关系类型,否则语义会完全相反。
- ##输出格式要求:
- 对于每个三元组关系,提取以下信息:头部实体的名称(source_entity)、尾部实体的名称(target_entity)、关系(relationship_type)、头部实体的类型(source_entity_type)、尾部实体的类型(target_entity_type),最后这些信息整理成如下格式的一个“字符串”:“头部实体的名称---关系---尾部实体的名称---头部实体的类型---尾部实体的类型”,不同的三元组“字符串”之间用换行符号连接起来。
- ##质量红线
- 1.禁止出现四元组或嵌套结构
- 2.禁止合并多个“差异点”或“并列实体”到单个三元组
- 3.每个三元组必须保留原文核心逻辑
- ##示例:
- “前置医学文本”为:
- 第一节,自发性气胸
- “任务医学文本”为:
- "三、诊断要点
- 自发性气胸通过胸部 X 线片确立诊断,条件允许时,应选择直立位拍片。
- 1. 既往胸部 X 线检查无明显病变或有 COPD、肺结核、哮喘等肺部基础病变。
- 2. 突发一侧胸痛伴不同程度的胸闷、呼吸困难。患侧胸廓饱满、呼吸运动减弱,叩诊呈鼓音,肝、 肺浊音界消失,听诊呼吸音减弱,甚至消失。
- 3. 发病时胸部 X 线影像学检查是诊断气胸最为准确和可靠的方法。
- 典型自发性气胸诊断不难。继发性气胸病人可因原有基础疾病而影响诊断,因此,对临床不能用 其他原因解释或经急诊处理呼吸困难无改善者,需考虑自发性气胸的可能。因病情危重不能立即行 胸部 X 线检查时,可在胸腔积气体征最明显处进行诊断性穿刺。"
- 输出为:
- "自发性气胸---通过...确立诊断---胸部X线片---疾病---检查
- 自发性气胸---条件允许时推荐选择---直立位拍片---疾病---检查操作
- 自发性气胸---诊断要点需结合---既往胸部X线检查无明显病变---疾病---检查结果
- 自发性气胸---诊断要点需结合---或有肺部基础病变---疾病---检查结果
- 肺部基础病变---的原因包括---COPD---检查结果---疾病
- 肺部基础病变---的原因包括---肺结核---检查结果---疾病
- 肺部基础病变---的原因包括---哮喘---检查结果---疾病
- 自发性气胸---诊断要点需结合---突发一侧胸痛伴不同程度的胸闷
- 自发性气胸---诊断要点需结合---突发一侧胸痛伴不同程度的呼吸困难
- 突发一侧胸痛---伴发---胸闷---症状---症状
- 突发一侧胸痛---伴发---呼吸困难---症状---症状
- 自发性气胸---患侧表现---患侧胸廓饱满---疾病---体征结果
- 自发性气胸---患侧表现---呼吸运动减弱---疾病---体征结果
- 自发性气胸---患侧表现---叩诊呈鼓音---疾病---体征结果
- 自发性气胸---体征结果---肝脏浊音界消失---疾病---体征结果
- 自发性气胸---体征结果---肺浊音界消失---疾病---体征结果
- 自发性气胸---患侧表现---肺浊音界消失---疾病---体征结果
- 自发性气胸---患侧表现---听诊呼吸音减弱---疾病---体征结果
- 自发性气胸---患侧表现---听诊呼吸音消失---疾病---体征结果
- 胸部X线影像学检查---是诊断...最准确可靠方法---气胸---检查---疾病
- 继发性气胸病人---可因...被影响诊断---原有基础疾病---人群---疾病
- 临床不能用其他原因解释的呼吸困难者---需考虑可能为---自发性气胸---人群---疾病
- 急诊处理呼吸困难无改善者---需考虑可能为---自发性气胸---人群---疾病
- 病情危重不能立即行胸部X线检查者---推荐进行---诊断性穿刺---人群---检查操作
- 诊断性穿刺---实施部位---胸腔积气体征最明显处---检查操作---人体部位"
- """
- parser.add_argument('--prompt', type=str, required=False, default=prompt,
- help='提示词模板')
- args = parser.parse_args()
-
- # 设置PostgreSQL连接参数
- db_host = "173.18.12.203"
- db_port = 5432
- db_name = "kg_ycw"
- db_user = "knowledge"
- db_password = "qwer1234." # 请替换为实际的密码
-
- try:
- # 创建处理器实例
- processor = TextSliceProcessor(
- text_dir=args.text_dir,
- prompt_template=args.prompt,
- db_host=db_host,
- db_port=db_port,
- db_name=db_name,
- db_user=db_user,
- db_password=db_password
- )
-
- # 执行批量处理
- processor.process_text_files()
-
- except Exception as e:
- print(f"错误: {str(e)}")
- if __name__ == "__main__":
- main()
|