|
@@ -0,0 +1,714 @@
|
|
|
+from hmac import new
|
|
|
+import os
|
|
|
+import sys
|
|
|
+import logging
|
|
|
+import json
|
|
|
+import time
|
|
|
+
|
|
|
+from service.kg_edge_service import KGEdgeService
|
|
|
+from db.session import get_db
|
|
|
+from service.kg_node_service import KGNodeService
|
|
|
+from service.kg_prop_service import KGPropService
|
|
|
+from utils.cache import Cache
|
|
|
+
|
|
|
+current_path = os.getcwd()
|
|
|
+sys.path.append(current_path)
|
|
|
+
|
|
|
+from community.graph_helper import GraphHelper
|
|
|
+from typing import List
|
|
|
+from agent.cdss.models.schemas import CDSSInput
|
|
|
+from config.site import SiteConfig
|
|
|
+import networkx as nx
|
|
|
+import pandas as pd
|
|
|
+
|
|
|
logger = logging.getLogger(__name__)

# Graph data cache directory (produced by dump_graph_data.py).
# NOTE: current_path/sys.path.append were already executed above the imports;
# repeating them here only appended a duplicate sys.path entry, so the
# duplicate has been removed and the path is derived directly.
CACHED_DATA_PATH = os.path.join(os.getcwd(), 'community', 'web', 'cached_data')
|
|
|
+
|
|
|
+
|
|
|
+class CDSSHelper(GraphHelper):
|
|
|
+
|
|
|
+ def node_search(self, node_id=None, node_type=None, filters=None, limit=1, min_degree=None):
|
|
|
+
|
|
|
+ kg_node_service = KGNodeService(next(get_db()))
|
|
|
+ es_result = kg_node_service.search_title_index("graph_entity_index", node_id, limit)
|
|
|
+ results = []
|
|
|
+ for item in es_result:
|
|
|
+ n = self.graph.nodes.get(item["id"])
|
|
|
+ score = item["score"]
|
|
|
+ if n:
|
|
|
+ results.append({
|
|
|
+ 'id': item["title"],
|
|
|
+ 'score': score,
|
|
|
+ "name": item["title"],
|
|
|
+ })
|
|
|
+ return results
|
|
|
+
|
|
|
+ def _load_entity_data(self):
|
|
|
+ config = SiteConfig()
|
|
|
+ # CACHED_DATA_PATH = config.get_config("CACHED_DATA_PATH")
|
|
|
+ print("load entity data")
|
|
|
+ # 这里设置了读取的属性
|
|
|
+ data = {"id": [], "name": [], "type": [],"is_symptom": [], "sex": [], "age": []}
|
|
|
+ with open(f"{CACHED_DATA_PATH}\\entities_med.json", "r", encoding="utf-8") as f:
|
|
|
+ entities = json.load(f)
|
|
|
+ for item in entities:
|
|
|
+ #如果id已经存在,则跳过
|
|
|
+ # if item[0] in data["id"]:
|
|
|
+ # print(f"skip {item[0]}")
|
|
|
+ # continue
|
|
|
+ data["id"].append(int(item[0]))
|
|
|
+ data["name"].append(item[1]["name"])
|
|
|
+ data["type"].append(item[1]["type"])
|
|
|
+ self._append_entity_attribute(data, item, "sex")
|
|
|
+ self._append_entity_attribute(data, item, "age")
|
|
|
+ self._append_entity_attribute(data, item, "is_symptom")
|
|
|
+ # item[1]["id"] = item[0]
|
|
|
+ # item[1]["name"] = item[0]
|
|
|
+ # attrs = item[1]
|
|
|
+ # self.graph.add_node(item[0], **attrs)
|
|
|
+ self.entity_data = pd.DataFrame(data)
|
|
|
+ self.entity_data.set_index("id", inplace=True)
|
|
|
+ print("load entity data finished")
|
|
|
+
|
|
|
+ def _append_entity_attribute(self, data, item, attr_name):
|
|
|
+ if attr_name in item[1]:
|
|
|
+ value = item[1][attr_name].split(":")
|
|
|
+ if len(value) < 2:
|
|
|
+ data[attr_name].append(value[0])
|
|
|
+ else:
|
|
|
+ data[attr_name].append(value[1])
|
|
|
+ else:
|
|
|
+ data[attr_name].append("")
|
|
|
+
|
|
|
+ def _load_relation_data(self):
|
|
|
+ config = SiteConfig()
|
|
|
+ # CACHED_DATA_PATH = config.get_config("CACHED_DATA_PATH")
|
|
|
+ print("load relationship data")
|
|
|
+
|
|
|
+ for i in range(46):
|
|
|
+ if os.path.exists(f"{CACHED_DATA_PATH}\\relationship_med_{i}.json"):
|
|
|
+ print(f"load entity data {CACHED_DATA_PATH}\\relationship_med_{i}.json")
|
|
|
+ with open(f"{CACHED_DATA_PATH}\\relationship_med_{i}.json", "r", encoding="utf-8") as f:
|
|
|
+ data = {"src": [], "dest": [], "type": [], "weight": []}
|
|
|
+ relations = json.load(f)
|
|
|
+ for item in relations:
|
|
|
+ data["src"].append(int(item[0]))
|
|
|
+ data["dest"].append(int(item[2]))
|
|
|
+ data["type"].append(item[4]["type"])
|
|
|
+ if "order" in item[4]:
|
|
|
+ order = item[4]["order"].split(":")
|
|
|
+ if len(order) < 2:
|
|
|
+ data["weight"].append(order[0])
|
|
|
+ else:
|
|
|
+ data["weight"].append(order[1])
|
|
|
+ else:
|
|
|
+ data["weight"].append(1)
|
|
|
+
|
|
|
+ self.relation_data = pd.concat([self.relation_data, pd.DataFrame(data)], ignore_index=True)
|
|
|
+
|
|
|
+ def build_graph(self):
|
|
|
+ self.entity_data = pd.DataFrame(
|
|
|
+ {"id": [], "name": [], "type": [], "sex": [], "allowed_age_range": []})
|
|
|
+ self.relation_data = pd.DataFrame({"src": [], "dest": [], "type": [], "weight": []})
|
|
|
+ self._load_entity_data()
|
|
|
+ self._load_relation_data()
|
|
|
+ self._load_local_data()
|
|
|
+
|
|
|
+ self.graph = nx.from_pandas_edgelist(self.relation_data, "src", "dest", edge_attr=True,
|
|
|
+ create_using=nx.DiGraph())
|
|
|
+
|
|
|
+ nx.set_node_attributes(self.graph, self.entity_data.to_dict(orient="index"))
|
|
|
+ # print(self.graph.in_edges('1257357',data=True))
|
|
|
+
|
|
|
+ def _load_local_data(self):
|
|
|
+ # 这里加载update数据和权重数据
|
|
|
+ config = SiteConfig()
|
|
|
+ self.update_data_path = config.get_config('UPDATE_DATA_PATH')
|
|
|
+ self.factor_data_path = config.get_config('FACTOR_DATA_PATH')
|
|
|
+ print(f"load update data from {self.update_data_path}")
|
|
|
+ for root, dirs, files in os.walk(self.update_data_path):
|
|
|
+ for file in files:
|
|
|
+ file_path = os.path.join(root, file)
|
|
|
+ if file_path.endswith(".json") and file.startswith("ent"):
|
|
|
+ self._load_update_entity_json(file_path)
|
|
|
+ if file_path.endswith(".json") and file.startswith("rel"):
|
|
|
+ self._load_update_relationship_json(file_path)
|
|
|
+
|
|
|
    def _load_update_entity_json(self, file):
        '''Apply entity overrides from an ent*.json update file.

        The file has the same format as the cached entity data: rows of
        [id, {attr: value, ...}]. For every row matching an existing entity,
        name/type/allowed_sex_list/allowed_age_range are overwritten, falling
        back to the current values for attributes the update omits.
        '''
        print(f"load entity update data from {file}")

        # Update data is a JSON file in the same shape as the cached dump.

        with open(file, "r", encoding="utf-8") as f:
            entities = json.load(f)
            for item in entities:
                # NOTE(review): item[0] comes straight from JSON (likely str)
                # while entity_data's index holds ints — confirm this
                # comparison actually matches rows.
                original_data = self.entity_data[self.entity_data.index == item[0]]
                if original_data.empty:
                    continue
                original_data = original_data.iloc[0]
                id = int(item[0])
                name = item[1]["name"] if "name" in item[1] else original_data['name']
                type = item[1]["type"] if "type" in item[1] else original_data['type']
                # NOTE(review): _load_entity_data builds columns
                # name/type/is_symptom/sex/age only — there is no
                # allowed_sex_list / allowed_age_range column, so these
                # fallbacks may KeyError whenever the update omits the
                # attribute; verify against the actual entity_data schema.
                allowed_sex_list = item[1]["allowed_sex_list"] if "allowed_sex_list" in item[1] else original_data[
                    'allowed_sex_list']
                allowed_age_range = item[1]["allowed_age_range"] if "allowed_age_range" in item[1] else original_data[
                    'allowed_age_range']

                self.entity_data.loc[id, ["name", "type", "allowed_sex_list", "allowed_age_range"]] = [name, type,
                                                                                                       allowed_sex_list,
                                                                                                       allowed_age_range]
|
|
|
+
|
|
|
+ def _load_update_relationship_json(self, file):
|
|
|
+ '''load json data from file'''
|
|
|
+ print(f"load relationship update data from {file}")
|
|
|
+
|
|
|
+ with open(file, "r", encoding="utf-8") as f:
|
|
|
+ relations = json.load(f)
|
|
|
+ for item in relations:
|
|
|
+ data = {}
|
|
|
+ original_data = self.relation_data[(self.relation_data['src'] == data['src']) &
|
|
|
+ (self.relation_data['dest'] == data['dest']) &
|
|
|
+ (self.relation_data['type'] == data['type'])]
|
|
|
+ if original_data.empty:
|
|
|
+ continue
|
|
|
+ original_data = original_data.iloc[0]
|
|
|
+ data["src"] = int(item[0])
|
|
|
+ data["dest"] = int(item[2])
|
|
|
+ data["type"] = item[4]["type"]
|
|
|
+ data["weight"] = item[4]["weight"] if "weight" in item[4] else original_data['weight']
|
|
|
+
|
|
|
+ self.relation_data.loc[(self.relation_data['src'] == data['src']) &
|
|
|
+ (self.relation_data['dest'] == data['dest']) &
|
|
|
+ (self.relation_data['type'] == data['type']), 'weight'] = data["weight"]
|
|
|
+
|
|
|
+ def check_sex_allowed(self, node, sex):
|
|
|
+ # 性别过滤,假设疾病节点有一个属性叫做allowed_sex_type,值为“0,1,2”,分别代表未知,男,女
|
|
|
+
|
|
|
+ sex_allowed = self.graph.nodes[node].get('sex', None)
|
|
|
+
|
|
|
+ #sexProps = self.propService.get_props_by_ref_id(node, 'sex')
|
|
|
+ #if len(sexProps) > 0 and sexProps[0]['prop_value'] is not None and sexProps[0][
|
|
|
+ #'prop_value'] != input.pat_sex.value:
|
|
|
+ #continue
|
|
|
+ if sex_allowed:
|
|
|
+ if len(sex_allowed) == 0:
|
|
|
+ # 如果性别列表为空,那么默认允许所有性别
|
|
|
+ return True
|
|
|
+ sex_allowed_list = sex_allowed.split(',')
|
|
|
+ if sex not in sex_allowed_list:
|
|
|
+ # 如果性别不匹配,跳过
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
+ def check_age_allowed(self, node, age):
|
|
|
+ # 年龄过滤,假设疾病节点有一个属性叫做allowed_age_range,值为“6-88”,代表年龄在0-88月之间是允许的
|
|
|
+ # 如果说年龄小于6岁,那么我们就认为是儿童,所以儿童的年龄范围是0-6月
|
|
|
+ age_allowed = self.graph.nodes[node].get('age', None)
|
|
|
+ if age_allowed:
|
|
|
+ if len(age_allowed) == 0:
|
|
|
+ # 如果年龄范围为空,那么默认允许所有年龄
|
|
|
+ return True
|
|
|
+ age_allowed_list = age_allowed.split('-')
|
|
|
+ age_min = int(age_allowed_list[0])
|
|
|
+ age_max = int(age_allowed_list[-1])
|
|
|
+ if age_max ==0:
|
|
|
+ return True
|
|
|
+ if age >= age_min and age < age_max:
|
|
|
+ # 如果年龄范围正常,那么返回True
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ # 如果没有设置年龄范围,那么默认返回True
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+ def check_diease_allowed(self, node):
|
|
|
+ is_symptom = self.graph.nodes[node].get('is_symptom', None)
|
|
|
+ if is_symptom == "是":
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
    # Class-level shared service and cache: these are created once, at class
    # definition time, so importing this module opens a DB session via
    # next(get_db()). All CDSSHelper instances share the same two objects.
    # NOTE(review): consider moving this into __init__ to avoid import-time
    # side effects — confirm no caller relies on the shared instances.
    propService = KGPropService(next(get_db()))
    cache = Cache()
|
|
|
    def cdss_travel(self, input: CDSSInput, start_nodes: List, max_hops=3):
        """
        Traverse the knowledge graph from the given symptom nodes and collect
        related diseases, departments, checks and drugs.

        Args:
            input: CDSSInput with basic patient info (age, sex, department).
            start_nodes: symptom node names used as traversal roots.
            max_hops: maximum traversal depth, default 3.

        Returns:
            dict with:
            - details: results aggregated by department (sorted by count)
            - score_diags: diseases sorted by relevance score
            - total_diags: total number of diseases
            (check/drug aggregation is currently disabled.)

        Steps:
            1. Init allowed node/link types; map symptom names to node ids.
            2. STEP 1: traverse the graph to find candidate diseases.
            3. STEP 2: find each disease's departments (checks/drugs disabled).
            4. STEP 3: aggregate the results by department.
            5. STEP 4-6: sort, score and assemble the returned payload.
        """
        start_time = time.time()
        # Allowed node types (bilingual labels) used for filtering and
        # traversal below.
        DEPARTMENT = ['科室', 'Department']
        DIESEASE = ['疾病', 'Disease']
        DRUG = ['药品', 'Drug']
        CHECK = ['检查', 'Check']
        SYMPTOM = ['症状', 'Symptom']
        #allowed_types = DEPARTMENT + DIESEASE + DRUG + CHECK + SYMPTOM
        allowed_types = DEPARTMENT + DIESEASE + SYMPTOM
        # Allowed relation types for the traversal.
        allowed_links = ['has_symptom', 'need_check', 'recommend_drug', 'belongs_to']
        # Map the input symptom names to node ids. Because names are not
        # unique, the id list can end up longer than the input name list.
        node_ids = []
        node_id_names = {}

        # De-duplicate repeated symptoms in start_nodes.
        start_nodes = list(set(start_nodes))
        for node in start_nodes:
            result = self.entity_data[self.entity_data['name'] == node]
            for index, data in result.iterrows():
                node_id_names[index] = data["name"]
                node_ids = node_ids + [index]

        # Keep only ids that actually exist in the graph.
        node_ids_filtered = []
        for node in node_ids:
            if self.graph.has_node(node):
                node_ids_filtered.append(node)
            else:
                logger.debug(f"node {node} not found")
        node_ids = node_ids_filtered

        end_time = time.time()
        print(f"init执行完成,耗时:{end_time - start_time:.2f}秒")
        start_time = time.time()
        results = {}
        for node in node_ids:
            visited = set()
            temp_results = {}
            cache_key = f"symptom_{node}"
            cache_data = self.cache.get(cache_key)
            if cache_data:
                logger.debug(f"cache hit for {cache_key}")
                temp_results = cache_data

                # Merge the cached per-symptom result into the running totals.
                if results=={}:
                    results = temp_results
                else:
                    for disease_id in temp_results:
                        path = temp_results[disease_id]["path"][0]
                        if disease_id in results.keys():
                            results[disease_id]["count"] = temp_results[disease_id]["count"] + 1
                            results[disease_id]["path"].append(path)
                        else:
                            results[disease_id] = temp_results[disease_id]

                continue
            # BFS frontier: (node_id, depth, originating symptom name, filters).
            queue = [(node, 0, node_id_names[node], {'allowed_types': allowed_types, 'allowed_links': allowed_links})]

            # Normalize the patient input: ages in the graph are in months.
            if input.pat_age.value > 0 and input.pat_age.type == 'year':
                input.pat_age.value = input.pat_age.value * 12
                input.pat_age.type = 'month'

            # STEP 1: treat every start node as a symptom and find the
            # diseases it leads to.
            # TODO: these per-symptom results are cacheable (see cache_key).
            while queue:
                node, depth, path, data = queue.pop(0)
                node = int(node)
                # Resolve the node's name and type from the entity table.
                entity_data = self.entity_data[self.entity_data.index == node]
                # Skip nodes missing from the entity table or the graph.
                if entity_data.empty:
                    continue
                if self.graph.nodes.get(node) is None:
                    continue
                node_type = self.entity_data[self.entity_data.index == node]['type'].tolist()[0]
                node_name = self.entity_data[self.entity_data.index == node]['name'].tolist()[0]
                if node_type in DIESEASE:
                    # Record the disease hit (skipping symptom-flagged nodes)
                    # and stop expanding this branch.
                    if self.check_diease_allowed(node) == False:
                        continue
                    if node in temp_results.keys():
                        temp_results[node]["count"] = temp_results[node]["count"] + 1
                        temp_results[node]["path"].append(path)
                    else:
                        temp_results[node] = {"type": node_type, "count": 1, "name": node_name, 'path': [path]}

                    continue

                if node in visited or depth > max_hops:
                    continue

                visited.add(node)
                if node not in self.graph:
                    continue
                # TODO: only in-edges are followed today — are out-edges
                # useful here as well?
                for edge in self.graph.in_edges(node, data=True):
                    src, dest, edge_data = edge
                    if src not in visited and depth + 1 < max_hops:
                        queue.append((src, depth + 1, path, data))
            self.cache.set(cache_key, temp_results)
            # Merge this symptom's results into the running totals.
            if results == {}:
                results = temp_results
            else:
                for disease_id in temp_results:
                    path = temp_results[disease_id]["path"][0]
                    if disease_id in results.keys():
                        results[disease_id]["count"] = temp_results[disease_id]["count"] + 1
                        results[disease_id]["path"].append(path)
                    else:
                        results[disease_id] = temp_results[disease_id]

        end_time = time.time()

        # Filter out diseases that do not satisfy the patient constraints.
        new_results = {}
        print(len(results))
        for item in results:
            # NOTE(review): both checks below pass `node` — the leftover loop
            # variable from the traversal above — instead of `item`, so every
            # disease is filtered against the SAME node's sex/age attributes.
            # This looks like a bug; confirm whether `item` was intended.
            if self.check_sex_allowed(node, input.pat_sex.value) == False:
                continue
            if self.check_age_allowed(node, input.pat_age.value) == False:
                continue
            new_results[item] = results[item]
        results = new_results
        print(len(results))
        print(f"STEP 1 执行完成,耗时:{end_time - start_time:.2f}秒")
        print(f"STEP 1 遍历图谱查找相关疾病 finished")
        # Emit a markdown-formatted log table (disease | symptoms | count | relevant).
        log_data = ["|疾病|症状|出现次数|是否相关"]
        log_data.append("|--|--|--|--|")
        for item in results:
            data = results[item]
            data['relevant'] = False
            if data["count"] / len(start_nodes) > 0.5:
                # A disease is "relevant" when more than half of the input
                # symptoms point at it.
                data['relevant'] = True
            disease_name = data["name"]
            # Cache the disease's parent-category name for the roll-up in STEP 5.
            key = 'disease_name_parent_' +disease_name
            cached_value = self.cache.get(key)
            if cached_value is None:
                out_edges = self.graph.out_edges(item, data=True)

                for edge in out_edges:
                    src, dest, edge_data = edge
                    if edge_data["type"] != '疾病相关父类':
                        continue
                    dest_data = self.entity_data[self.entity_data.index == dest]
                    if dest_data.empty:
                        continue
                    dest_name = self.entity_data[self.entity_data.index == dest]['name'].tolist()[0]
                    self.cache.set(key, dest_name)
                    break

            # Irrelevant diseases are excluded from the log table.
            if data['relevant'] == False:
                continue

            log_data.append(f"|{data['name']}|{','.join(data['path'])}|{data['count']}|{data['relevant']}|")

        content = "疾病和症状相关性统计表格\n" + "\n".join(log_data)
        #print(f"\n{content}")
        # STEP 2: find the departments (and, when re-enabled, checks/drugs)
        # for the relevant diseases. Per-disease results are cacheable.
        start_time = time.time()
        print("STEP 2 查找疾病对应的科室、检查和药品 start")

        for disease in results.keys():
            # TODO: caching the department/check/drug lookup per disease
            # would improve performance significantly.
            if results[disease]["relevant"] == False:
                continue
            print(f"search data for {disease}:{results[disease]['name']}")
            queue = []
            queue.append((disease, 0, disease, {'allowed_types': DEPARTMENT, 'allowed_links': ['belongs_to']}))

            # A shared `visited` across diseases was tried; it caused problems
            # and no measurable speedup, so it stays per-disease.
            visited = set()

            while queue:
                node, depth, disease, data = queue.pop(0)

                if node in visited or depth > max_hops:
                    continue
                visited.add(node)

                entity_data = self.entity_data[self.entity_data.index == node]

                # Skip nodes missing from the entity table.
                if entity_data.empty:
                    continue
                node_type = self.entity_data[self.entity_data.index == node]['type'].tolist()[0]
                node_name = self.entity_data[self.entity_data.index == node]['name'].tolist()[0]

                if node_type in DEPARTMENT:
                    # Repeat the department once per disease occurrence so the
                    # later frequency statistics are a simple count.
                    department_data = [node_name] * results[disease]["count"]
                    if 'department' in results[disease].keys():
                        results[disease]["department"] = results[disease]["department"] + department_data
                    else:
                        results[disease]["department"] = department_data
                    continue
                # (CHECK/DRUG collection is currently disabled here.)
                out_edges = self.graph.out_edges(node, data=True)

                for edge in out_edges:
                    src, dest, edge_data = edge
                    src_data = self.entity_data[self.entity_data.index == src]
                    if src_data.empty:
                        continue
                    dest_data = self.entity_data[self.entity_data.index == dest]
                    if dest_data.empty:
                        continue
                    src_name = self.entity_data[self.entity_data.index == src]['name'].tolist()[0]
                    dest_name = self.entity_data[self.entity_data.index == dest]['name'].tolist()[0]
                    dest_type = self.entity_data[self.entity_data.index == dest]['type'].tolist()[0]

                    if dest_type in allowed_types:
                        if dest not in visited and depth + 1 < max_hops:
                            queue.append((edge[1], depth + 1, disease, data))

        # TODO: cache each disease's department/check/drug results here for
        # later reuse.

        print(f"STEP 2 finished")
        end_time = time.time()
        print(f"STEP 2 执行完成,耗时:{end_time - start_time:.2f}秒")
        # Markdown log of disease -> department/check/drug.
        log_data = ["|disease|count|department|check|drug|"]
        log_data.append("|--|--|--|--|--|")
        for item in results.keys():
            department_data = results[item].get("department", [])
            count_data = results[item].get("count")
            check_data = results[item].get("check", [])
            drug_data = results[item].get("drug", [])
            # NOTE(review): the nested double quotes inside this f-string are
            # only valid on Python 3.12+ (PEP 701) — confirm the target runtime.
            log_data.append(
                f"|{results[item].get("name", item)}|{count_data}|{','.join(department_data)}|{','.join(check_data)}|{','.join(drug_data)}|")

        #print("疾病科室检查药品相关统计\n" + "\n".join(log_data))

        # STEP 3: aggregate the results by department.
        start_time = time.time()
        print(f"STEP 3 对于结果按照科室维度进行汇总 start")
        final_results = {}
        total = 0
        for disease in results.keys():
            disease = int(disease)
            # Some diseases have no department; bucket those under 'DEFAULT'.
            departments = ['DEFAULT']
            if 'department' in results[disease].keys():
                departments = results[disease]["department"]
            # Fold this disease into every department it belongs to.
            for department in departments:
                total += 1
                if not department in final_results.keys():
                    final_results[department] = {
                        "diseases": [results[disease].get("name", disease)],
                        "checks": results[disease].get("check", []),
                        "drugs": results[disease].get("drug", []),
                        "count": 1
                    }
                else:
                    final_results[department]["diseases"] = final_results[department]["diseases"] + [
                        results[disease].get("name", disease)]
                    final_results[department]["checks"] = final_results[department]["checks"] + results[disease].get(
                        "check", [])
                    final_results[department]["drugs"] = final_results[department]["drugs"] + results[disease].get(
                        "drug", [])
                    final_results[department]["count"] += 1

        # Department distribution: share of all (disease, department) pairs.
        for department in final_results.keys():
            final_results[department]["score"] = final_results[department]["count"] / total

        print(f"STEP 3 finished")
        end_time = time.time()
        print(f"STEP 3 执行完成,耗时:{end_time - start_time:.2f}秒")
        # Markdown log of the per-department aggregation.
        log_data = ["|department|disease|check|drug|count|score"]
        log_data.append("|--|--|--|--|--|--|")
        for department in final_results.keys():
            diesease_data = final_results[department].get("diseases", [])
            check_data = final_results[department].get("checks", [])
            drug_data = final_results[department].get("drugs", [])
            count_data = final_results[department].get("count", 0)
            score_data = final_results[department].get("score", 0)
            log_data.append(
                f"|{department}|{','.join(diesease_data)}|{','.join(check_data)}|{','.join(drug_data)}|{count_data}|{score_data}|")

        #print("\n" + "\n".join(log_data))

        # STEP 4: per department, count occurrences of diseases/checks/drugs
        # and keep the top entries, sorted by count descending.
        print(f"STEP 4 start")
        start_time = time.time()
        def sort_data(data, count=5):
            # Count occurrences and return the `count` most frequent items as
            # (item, {"count": n}) pairs.
            tmp = {}
            for item in data:
                if item in tmp.keys():
                    tmp[item]["count"] += 1
                else:
                    tmp[item] = {"count": 1}
            sorted_data = sorted(tmp.items(), key=lambda x: x[1]["count"], reverse=True)
            return sorted_data[:count]

        for department in final_results.keys():
            final_results[department]['name'] = department
            final_results[department]["diseases"] = sort_data(final_results[department]["diseases"])
            final_results[department]["checks"] = sort_data(final_results[department]["checks"])
            final_results[department]["drugs"] = sort_data(final_results[department]["drugs"])

        # Sort the departments themselves by occurrence count, descending.
        sorted_final_results = sorted(final_results.items(), key=lambda x: x[1]["count"], reverse=True)

        print(f"STEP 4 finished")
        end_time = time.time()
        print(f"STEP 4 执行完成,耗时:{end_time - start_time:.2f}秒")
        # Markdown log of the sorted per-department data.
        log_data = ["|department|disease|check|drug|count|score"]
        log_data.append("|--|--|--|--|--|--|")
        for department in final_results.keys():
            diesease_data = final_results[department].get("diseases")
            check_data = final_results[department].get("checks")
            drug_data = final_results[department].get("drugs")
            count_data = final_results[department].get("count", 0)
            score_data = final_results[department].get("score", 0)
            log_data.append(f"|{department}|{diesease_data}|{check_data}|{drug_data}|{count_data}|{score_data}|")

        #print("\n" + "\n".join(log_data))
        # STEP 5: global disease counts/scores across departments, sorted
        # by score descending (check/drug accumulation is disabled).
        print(f"STEP 5 start")
        start_time = time.time()
        checks = {}
        drugs = {}
        diags = {}
        total_check = 0
        total_drug = 0
        total_diags = 0
        for department in final_results.keys():
            # Department probability factor; the DEFAULT bucket gets a fixed
            # 0.1. A disease's score is its in-department count times this.
            if department == 'DEFAULT':
                department_factor = 0.1
            else:
                department_factor = final_results[department]["score"]
            # Boost the factor when the patient's department matches.
            # TODO: department synonyms should receive the boost as well.
            if input.department.value == department:
                department_factor = department_factor*1.1
            for disease, data in final_results[department]["diseases"]:
                total_diags += 1
                # Roll the disease up to its cached parent-category name
                # (populated during the STEP 1 logging pass).
                key = 'disease_name_parent_' + disease
                cached_value = self.cache.get(key)
                if cached_value is not None:
                    disease = cached_value
                if disease in diags.keys():
                    diags[disease]["count"] += data["count"]
                    diags[disease]["score"] += data["count"] * department_factor
                else:
                    diags[disease] = {"count": data["count"], "score": data["count"] * department_factor}
            # (check/drug accumulation is currently disabled.)

        sorted_score_diags = sorted(diags.items(), key=lambda x: x[1]["score"], reverse=True)

        print(f"STEP 5 finished")
        end_time = time.time()
        print(f"STEP 5 执行完成,耗时:{end_time - start_time:.2f}秒")
        # Markdown log of the scored data.
        log_data = ["|department|disease|check|drug|count|score"]
        log_data.append("|--|--|--|--|--|--|")
        for department in final_results.keys():
            diesease_data = final_results[department].get("diseases")
            # NOTE(review): check_data/drug_data below are STALE values left
            # over from the STEP 4 logging loop — their assignments in this
            # loop are commented out. Confirm whether this log line is still
            # meant to include them.
            count_data = final_results[department].get("count", 0)
            score_data = final_results[department].get("score", 0)
            log_data.append(f"|{department}|{diesease_data}|{check_data}|{drug_data}|{count_data}|{score_data}|")

        #print("这里是经过排序的数据\n" + "\n".join(log_data))
        # STEP 6: assemble and return the payload.
        return {"details": sorted_final_results,
                "score_diags": sorted_score_diags,"total_diags": total_diags,
                # "checks":sorted_checks, "drugs":sorted_drugs,
                # "total_checks":total_check, "total_drugs":total_drug
                }
|