import os
import sys
import logging
import json
from collections import Counter, deque

current_path = os.getcwd()
# The project packages below are resolved relative to the working directory,
# so it must be on sys.path before they are imported.
sys.path.append(current_path)

from libs.graph_helper import GraphHelper
from typing import List
from cdss.models.schemas import CDSSInput
from config.site import SiteConfig
import networkx as nx
import pandas as pd

logger = logging.getLogger(__name__)


def _top_counts(items, limit=5):
    """Return the ``limit`` most frequent items as ``(item, {"count": n})`` pairs.

    Pairs are ordered by descending count; ties keep first-seen order.
    """
    return [(item, {"count": n}) for item, n in Counter(items).most_common(limit)]


class CDSSHelper(GraphHelper):
    """Graph helper that loads entity/relationship data and ranks CDSS results.

    ``build_graph`` populates ``self.entity_data`` (DataFrame indexed by entity
    id), ``self.relation_data`` (edge list DataFrame) and ``self.graph``
    (a ``networkx.DiGraph``); ``cdss_travel`` walks that graph.
    """

    def node_search(self, node_id=None, node_type=None, filters=None, limit=1, min_degree=None):
        """Search entities by title through the ES title index.

        Only ``node_id`` (used as the query text) and ``limit`` are used;
        ``node_type``, ``filters`` and ``min_degree`` are accepted but ignored.

        Returns:
            A list of ``{"id", "score", "name"}`` dicts, where both ``id`` and
            ``name`` carry the hit's title.
        """
        hits = self.es.search_title_index("graph_entity_index", node_id, limit)
        return [
            {"id": hit["title"], "score": hit["score"], "name": hit["title"]}
            for hit in hits
        ]

    def _entity_attr(self, node, attr):
        """Return column ``attr`` of entity ``node``, or None if there is no such row.

        Uses the DataFrame index instead of scanning a boolean mask.
        """
        try:
            value = self.entity_data.at[node, attr]
        except KeyError:
            return None
        if isinstance(value, pd.Series):
            # Duplicate ids: keep the first match.
            return value.iloc[0] if not value.empty else None
        return value

    def _load_entity_data(self):
        """Load ``entities_med.json`` from CACHED_DATA_PATH into ``self.entity_data``.

        Each JSON item is read as ``[id, attrs]``. Missing ``allowed_sex_list``
        / ``allowed_age_range`` attributes are stored as empty strings.
        """
        config = SiteConfig()
        cached_dir = config.get_config("CACHED_DATA_PATH")
        logger.info("load entity data")
        # The columns listed here are the only entity attributes that are loaded.
        data = {"id": [], "name": [], "type": [], "allowed_sex_list": [], "allowed_age_range": []}
        with open(os.path.join(cached_dir, "entities_med.json"), "r", encoding="utf-8") as f:
            entities = json.load(f)
        for item in entities:
            attrs = item[1]
            data["id"].append(int(item[0]))
            data["name"].append(attrs["name"])
            data["type"].append(attrs["type"])
            data["allowed_sex_list"].append(attrs.get("allowed_sex_list", ""))
            data["allowed_age_range"].append(attrs.get("allowed_age_range", ""))
        self.entity_data = pd.DataFrame(data)
        self.entity_data.set_index("id", inplace=True)
        logger.info("load entity data finished")

    def _load_relation_data(self):
        """Append every existing ``relationship_med_<i>.json`` to ``self.relation_data``.

        Files are probed for ``i`` in ``range(99)``. Each JSON item is read as
        a sequence where index 0 is the source id, index 2 the destination id
        and index 4 the edge attributes; a missing ``weight`` defaults to 1.
        """
        config = SiteConfig()
        cached_dir = config.get_config("CACHED_DATA_PATH")
        logger.info("load relationship data")
        frames = []
        for i in range(99):
            path = os.path.join(cached_dir, f"relationship_med_{i}.json")
            if not os.path.exists(path):
                continue
            logger.info(f"load relationship data {path}")
            with open(path, "r", encoding="utf-8") as f:
                relations = json.load(f)
            data = {"src": [], "dest": [], "type": [], "weight": []}
            for item in relations:
                attrs = item[4]
                data["src"].append(int(item[0]))
                data["dest"].append(int(item[2]))
                data["type"].append(attrs["type"])
                data["weight"].append(attrs.get("weight", 1))
            frames.append(pd.DataFrame(data))
        if frames:
            # Concatenate once instead of re-copying the table per file.
            self.relation_data = pd.concat([self.relation_data, *frames], ignore_index=True)

    def build_graph(self):
        """Load cached and update data, then build ``self.graph`` from it.

        Edges come from ``self.relation_data`` (all non-endpoint columns become
        edge attributes); entity columns are attached as node attributes.
        """
        self.entity_data = pd.DataFrame(
            {"id": [], "name": [], "type": [], "allowed_sex_list": [], "allowed_age_range": []}
        )
        self.relation_data = pd.DataFrame({"src": [], "dest": [], "type": [], "weight": []})
        self._load_entity_data()
        self._load_relation_data()
        self._load_local_data()
        self.graph = nx.from_pandas_edgelist(
            self.relation_data, "src", "dest", edge_attr=True, create_using=nx.DiGraph()
        )
        nx.set_node_attributes(self.graph, self.entity_data.to_dict(orient="index"))

    def _load_local_data(self):
        """Apply update files found under UPDATE_DATA_PATH.

        ``ent*.json`` files are treated as entity updates and ``rel*.json``
        files as relationship updates. FACTOR_DATA_PATH is only recorded on
        ``self.factor_data_path``; nothing is loaded from it here.
        """
        config = SiteConfig()
        self.update_data_path = config.get_config('UPDATE_DATA_PATH')
        self.factor_data_path = config.get_config('FACTOR_DATA_PATH')
        logger.info(f"load update data from {self.update_data_path}")
        for root, dirs, files in os.walk(self.update_data_path):
            for file in files:
                if not file.endswith(".json"):
                    continue
                file_path = os.path.join(root, file)
                if file.startswith("ent"):
                    self._load_update_entity_json(file_path)
                elif file.startswith("rel"):
                    self._load_update_relationship_json(file_path)

    def _load_update_entity_json(self, file):
        """Overlay entity updates from ``file`` onto ``self.entity_data``.

        The file uses the same ``[id, attrs]`` layout as the cached entity
        data. Only entities that already exist are updated; attributes missing
        from the update keep their current value.
        """
        logger.info(f"load entity update data from {file}")
        with open(file, "r", encoding="utf-8") as f:
            entities = json.load(f)
        columns = ["name", "type", "allowed_sex_list", "allowed_age_range"]
        for item in entities:
            # The index holds ints (see _load_entity_data), so convert before
            # matching; otherwise string ids from JSON never match.
            try:
                entity_id = int(item[0])
            except (TypeError, ValueError):
                logger.warning(f"skip entity update with invalid id {item[0]!r}")
                continue
            matches = self.entity_data[self.entity_data.index == entity_id]
            if matches.empty:
                continue
            current = matches.iloc[0]
            attrs = item[1]
            self.entity_data.loc[entity_id, columns] = [
                attrs[col] if col in attrs else current[col] for col in columns
            ]

    def _load_update_relationship_json(self, file):
        """Overlay relationship weight updates from ``file`` onto ``self.relation_data``.

        Rows are matched on (src, dest, type). Only existing relationships are
        updated; when the update carries no ``weight`` the first matching
        row's weight is reused.
        """
        logger.info(f"load relationship update data from {file}")
        with open(file, "r", encoding="utf-8") as f:
            relations = json.load(f)
        for item in relations:
            attrs = item[4]
            # Build the key first: the match below depends on it.
            src = int(item[0])
            dest = int(item[2])
            rel_type = attrs["type"]
            mask = (
                (self.relation_data['src'] == src)
                & (self.relation_data['dest'] == dest)
                & (self.relation_data['type'] == rel_type)
            )
            matches = self.relation_data[mask]
            if matches.empty:
                continue
            weight = attrs["weight"] if "weight" in attrs else matches.iloc[0]['weight']
            self.relation_data.loc[mask, 'weight'] = weight

    def check_sex_allowed(self, node, sex):
        """Return True if ``sex`` passes the node's ``allowed_sex_list`` filter.

        ``allowed_sex_list`` is read as a comma-separated string (the original
        note gives "0,1,2" for unknown/male/female). A missing or empty list
        allows everything. ``sex`` is compared by its string form so that an
        integer code matches the string entries.
        """
        sex_allowed = self.graph.nodes[node].get('allowed_sex_list', None)
        if not sex_allowed:
            return True
        allowed = [entry.strip() for entry in sex_allowed.split(',')]
        return str(sex) in allowed

    def check_age_allowed(self, node, age):
        """Return True if ``age`` passes the node's ``allowed_age_range`` filter.

        ``allowed_age_range`` is read as a "min-max" string (e.g. "6-88"),
        which cdss_travel compares against an age in months. The range is
        half-open: ``min <= age < max``. A missing or empty range allows
        every age.
        """
        age_allowed = self.graph.nodes[node].get('allowed_age_range', None)
        if not age_allowed:
            return True
        bounds = age_allowed.split('-')
        age_min = int(bounds[0])
        age_max = int(bounds[-1])
        return age_min <= age < age_max

    def cdss_travel(self, input: CDSSInput, start_nodes: List, max_hops=3):
        """Rank departments, diseases, checks and drugs for the given start nodes.

        Steps:
            1. Resolve ``start_nodes`` (entity names) to ids and walk incoming
               edges to collect diseases that pass the sex/age filters.
            2. For diseases hit by more than 50% of the start nodes, walk
               outgoing edges to collect departments, checks and drugs.
            3. Aggregate per department ('DEFAULT' when a disease has none).
            4. Keep the top 5 diseases/checks/drugs per department.
            5. Aggregate diseases/checks/drugs globally and sort them.

        Args:
            input: Patient data; ``pat_sex.value`` and ``pat_age`` are read.
                An age given in years is converted to months IN PLACE on
                ``input.pat_age``.
            start_nodes: Entity names to start from (treated as symptoms).
            max_hops: Traversal depth limit.

        Returns:
            dict with keys ``details``, ``diags``, ``total_diags``, ``checks``,
            ``drugs``, ``total_checks`` and ``total_drugs``.
        """
        # Accepted node types; each category may have several spellings.
        DEPARTMENT = ['科室', 'Department']
        DIESEASE = ['疾病', 'Disease']
        DRUG = ['药品', 'Drug']
        CHECK = ['检查', 'Check']
        SYMPTOM = ['症状', 'Symptom']
        allowed_types = DEPARTMENT + DIESEASE + DRUG + CHECK + SYMPTOM
        # NOTE: edge types ('has_symptom', 'need_check', 'recommend_drug',
        # 'belongs_to') are not filtered on; that is left for a later extension.

        debug_enabled = logger.isEnabledFor(logging.DEBUG)

        # Resolve names to ids. Several entities may share a name, so there
        # can be more ids than start nodes.
        node_ids = []
        node_id_names = {}
        for start_name in start_nodes:
            logger.debug("searching for node %s", start_name)
            matches = self.entity_data[self.entity_data['name'] == start_name]
            for index, row in matches.iterrows():
                node_id_names[index] = row["name"]
                node_ids.append(index)
        logger.info(f"start travel from {node_id_names}")

        queue = deque()
        for node in node_ids:
            if self.graph.has_node(node):
                queue.append((node, 0, node_id_names[node]))
            else:
                logger.debug("node %s not found", node)
        visited = set()
        results = {}

        # Ages in the graph are expressed in months, so normalise years.
        if input.pat_age.value > 0 and input.pat_age.type == 'year':
            input.pat_age.value = input.pat_age.value * 12
            input.pat_age.type = 'month'

        # STEP 1: treat the start nodes as symptoms and find their diseases.
        # TODO: the per-symptom disease lookup could be cached.
        while queue:
            node, depth, path = queue.popleft()
            node_type = self._entity_attr(node, 'type')
            node_name = self._entity_attr(node, 'name')
            if node_type is None:
                # Node exists in the graph but has no entity record.
                logger.debug("node %s has no entity data, skipped", node)
                continue
            logger.debug("node %s type %s", node, node_type)
            if node_type in DIESEASE:
                logger.debug("node %s type %s is a disease", node, node_type)
                if not self.check_sex_allowed(node, input.pat_sex.value):
                    continue
                if not self.check_age_allowed(node, input.pat_age.value):
                    continue
                if node in results:
                    results[node]["count"] += 1
                    results[node]["path"].append(path)
                else:
                    results[node] = {"type": node_type, "count": 1, "name": node_name, 'path': [path]}
                continue
            if node in visited or depth > max_hops:
                logger.debug("%s already visited or reach max hops", node)
                continue
            visited.add(node)
            logger.debug("check edges from %s", node)
            for src, dest, _edge_data in self.graph.in_edges(node, data=True):
                if src not in visited and depth + 1 < max_hops:
                    logger.debug("put into queue travel from %s to %s", src, dest)
                    queue.append((src, depth + 1, path))
                else:
                    logger.debug("skip travel from %s to %s", src, dest)
        logger.info("STEP 1 finished")

        # A disease is relevant only when more than 50% of the start nodes hit it.
        for entry in results.values():
            entry['relevant'] = entry["count"] / len(start_nodes) > 0.5

        if debug_enabled:
            log_data = ["|疾病|症状|出现次数|是否相关", "|--|--|--|--|"]
            for entry in results.values():
                log_data.append(
                    f"|{entry['name']}|{','.join(entry['path'])}|{entry['count']}|{entry['relevant']}|"
                )
            content = "疾病和症状相关性统计表格\n" + "\n".join(log_data)
            logger.debug(f"\n{content}")

        # STEP 2: find departments, checks and drugs for each relevant disease.
        # TODO: caching these per-disease results would help performance a lot.
        logger.info("STEP 2 start")
        for disease in results:
            disease_entry = results[disease]
            if not disease_entry["relevant"]:
                continue
            disease_name = disease_entry.get("name", disease)
            logger.debug("search data for %s:%s", disease, disease_entry['name'])
            queue = deque([(disease, 0)])
            # `visited` is deliberately per disease: sharing it across diseases
            # caused problems and gave no noticeable speed-up.
            visited = set()
            while queue:
                node, depth = queue.popleft()
                if node in visited or depth > max_hops:
                    continue
                visited.add(node)
                node_type = self._entity_attr(node, 'type')
                node_name = self._entity_attr(node, 'name')
                logger.debug("node %s %s type %s", disease_name, node_name, node_type)
                if node_type in DEPARTMENT:
                    # Repeat the department once per disease hit so later
                    # counting is weighted by the disease count.
                    disease_entry.setdefault("department", []).extend(
                        [node_name] * disease_entry["count"]
                    )
                    continue
                if node_type in CHECK:
                    checks_found = disease_entry.setdefault("check", [])
                    # Order-preserving dedupe keeps the output deterministic.
                    if node_name not in checks_found:
                        checks_found.append(node_name)
                    continue
                if node_type in DRUG:
                    drugs_found = disease_entry.setdefault("drug", [])
                    if node_name not in drugs_found:
                        drugs_found.append(node_name)
                    continue
                for src, dest, _edge_data in self.graph.out_edges(node, data=True):
                    dest_type = self._entity_attr(dest, 'type')
                    if dest_type not in allowed_types:
                        continue
                    if dest not in visited and depth + 1 < max_hops:
                        if debug_enabled:
                            src_name = self._entity_attr(src, 'name')
                            dest_name = self._entity_attr(dest, 'name')
                            logger.debug(
                                "put travel request in queue from %s:%s to %s:%s",
                                src, src_name, dest, dest_name,
                            )
                        queue.append((dest, depth + 1))
        # TODO: the department/check/drug lists per disease could be cached here.
        logger.info("STEP 2 finished")

        if debug_enabled:
            log_data = ["|disease|count|department|check|drug|", "|--|--|--|--|--|"]
            for item, entry in results.items():
                log_data.append(
                    f"|{entry.get('name', item)}|{entry.get('count')}"
                    f"|{','.join(entry.get('department', []))}"
                    f"|{','.join(entry.get('check', []))}"
                    f"|{','.join(entry.get('drug', []))}|"
                )
            logger.debug("疾病科室检查药品相关统计\n" + "\n".join(log_data))

        # STEP 3: aggregate the results per department.
        logger.info("STEP 3 start")
        final_results = {}
        total = 0
        for disease, disease_entry in results.items():
            # Diseases without any department are grouped under 'DEFAULT'.
            departments = disease_entry.get("department", ['DEFAULT'])
            for department in departments:
                total += 1
                bucket = final_results.setdefault(
                    department, {"diseases": [], "checks": [], "drugs": [], "count": 0}
                )
                bucket["diseases"].append(disease_entry.get("name", disease))
                bucket["checks"].extend(disease_entry.get("check", []))
                bucket["drugs"].extend(disease_entry.get("drug", []))
                bucket["count"] += 1
        # Department score is its share of all department occurrences.
        for bucket in final_results.values():
            bucket["score"] = bucket["count"] / total
        logger.info("STEP 3 finished")

        if debug_enabled:
            log_data = ["|department|disease|check|drug|count|score", "|--|--|--|--|--|--|"]
            for department, bucket in final_results.items():
                log_data.append(
                    f"|{department}|{','.join(bucket.get('diseases', []))}"
                    f"|{','.join(bucket.get('checks', []))}"
                    f"|{','.join(bucket.get('drugs', []))}"
                    f"|{bucket.get('count', 0)}|{bucket.get('score', 0)}|"
                )
            logger.debug("\n" + "\n".join(log_data))

        # STEP 4: count diseases/checks/drugs per department, keep the top 5 of each.
        logger.info("STEP 4 start")
        for department, bucket in final_results.items():
            bucket['name'] = department
            bucket["diseases"] = _top_counts(bucket["diseases"])
            bucket["checks"] = _top_counts(bucket["checks"])
            bucket["drugs"] = _top_counts(bucket["drugs"])
        # Departments sorted by occurrence count, descending.
        sorted_final_results = sorted(
            final_results.items(), key=lambda kv: kv[1]["count"], reverse=True
        )
        logger.info("STEP 4 finished")

        if debug_enabled:
            log_data = ["|department|disease|check|drug|count|score", "|--|--|--|--|--|--|"]
            for department, bucket in final_results.items():
                log_data.append(
                    f"|{department}|{bucket.get('diseases')}|{bucket.get('checks')}"
                    f"|{bucket.get('drugs')}|{bucket.get('count', 0)}|{bucket.get('score', 0)}|"
                )
            logger.debug("\n" + "\n".join(log_data))

        # STEP 5: aggregate diseases/checks/drugs across departments and sort them.
        logger.info("STEP 5 start")
        checks = {}
        drugs = {}
        diags = {}
        total_check = 0
        total_drug = 0
        total_diags = 0
        for department, bucket in final_results.items():
            # A disease's score is its count in the department multiplied by
            # the department's score; the 'DEFAULT' department uses 0.1.
            department_factor = 0.1 if department == 'DEFAULT' else bucket["score"]
            for disease, data in bucket["diseases"]:
                total_diags += 1
                if disease in diags:
                    diags[disease]["count"] += data["count"]
                    diags[disease]["score"] += data["count"] * department_factor
                else:
                    diags[disease] = {
                        "count": data["count"],
                        "score": data["count"] * department_factor,
                    }
            # Checks and drugs simply accumulate their counts.
            for check, data in bucket["checks"]:
                total_check += 1
                if check in checks:
                    checks[check]["count"] += data["count"]
                else:
                    checks[check] = {"count": data["count"]}
            for drug, data in bucket["drugs"]:
                total_drug += 1
                if drug in drugs:
                    drugs[drug]["count"] += data["count"]
                else:
                    drugs[drug] = {"count": data["count"]}

        sorted_diags = sorted(diags.items(), key=lambda kv: kv[1]["score"], reverse=True)
        sorted_checks = sorted(checks.items(), key=lambda kv: kv[1]["count"], reverse=True)
        sorted_drugs = sorted(drugs.items(), key=lambda kv: kv[1]["count"], reverse=True)
        logger.info("STEP 5 finished")

        if debug_enabled:
            log_data = ["|department|disease|check|drug|count|score", "|--|--|--|--|--|--|"]
            for department, bucket in final_results.items():
                log_data.append(
                    f"|{department}|{bucket.get('diseases')}|{bucket.get('checks')}"
                    f"|{bucket.get('drugs')}|{bucket.get('count', 0)}|{bucket.get('score', 0)}|"
                )
            logger.debug("这里是经过排序的数据\n" + "\n".join(log_data))

        # STEP 6: assemble and return the result.
        return {
            "details": sorted_final_results,
            "diags": sorted_diags,
            "total_diags": total_diags,
            "checks": sorted_checks,
            "drugs": sorted_drugs,
            "total_checks": total_check,
            "total_drugs": total_drug,
        }