import logging
from typing import Optional

from cachetools import TTLCache
from cachetools.keys import hashkey
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from model.kg_node import KGNode
from db.session import get_db
from service.kg_node_service import KGNodeService
from utils import vectorizer
from utils.vectorizer import Vectorizer
from service.kg_prop_service import KGPropService
from service.kg_edge_service import KGEdgeService

logger = logging.getLogger(__name__)

DISTANCE_THRESHOLD = 0.65

# Separators found inside stored property values: the first splits a value
# into records, the second splits one record into its fields.
_RECORD_SEPARATOR = '^|~|^'
_FIELD_SEPARATOR = '---'

# (result key, edge category, property names attached to each edge).
# The categories are, in order: disease-related laboratory tests,
# disease-related auxiliary examinations, disease-related differential
# diagnoses.
_DISEASE_RELATIONS = (
    ('laboratory_tests', '疾病相关实验室检查项目', ('sysjclcmd', 'timelimit')),
    ('examinations', '疾病相关辅助检查项目', ('fzjclcmd', 'timelimit')),
    ('differential_diagnosis', '疾病相关鉴别诊断', ('keydiagnosis',)),
)


class CdssService:
    """Assemble disease details from knowledge-graph nodes, edges and props."""

    # Not referenced by the methods below; kept because code outside this
    # file may rely on it. Holds up to 100000 entries, ttl of 30 days.
    _cache = TTLCache(maxsize=100000, ttl=60*60*24*30)

    # NOTE(review): this service (and its DB session) is created once, when
    # the class body executes, and is shared by every instance. The session
    # obtained via next(get_db()) is never explicitly closed here -- confirm
    # that this lifetime is intended.
    kgPropService = KGPropService(next(get_db()))

    def get_disease_detail(self, diease_name: str, category: str):
        """Return the detail record for the first node matching a disease name.

        Args:
            diease_name: Title to search for (the misspelling is part of the
                public signature and is kept for keyword-argument callers).
            category: Node category forwarded to the title search.

        Returns:
            ``None`` when the search yields nothing; otherwise a dict with
            the keys ``disease``, ``laboratory_tests``, ``examinations``,
            ``differential_diagnosis`` and ``treatments``.
        """
        node_service = KGNodeService(next(get_db()))
        diseases = node_service.search_title_index("", diease_name, category, 1, 0)
        # An empty result is a miss too; indexing it would raise IndexError.
        if not diseases:
            return None

        disease = diseases[0]
        disease_id = disease["id"]

        edge_service = KGEdgeService(next(get_db()))
        # Fetch every relation first, then enrich, mirroring the original
        # call order against the edge service.
        related = {
            key: edge_service.get_edges_by_nodes(src_id=disease_id, category=edge_category)
            for key, edge_category, _ in _DISEASE_RELATIONS
        }
        for key, _, prop_names in _DISEASE_RELATIONS:
            if related[key]:
                self.add_props_to_edges(related[key], True, list(prop_names))

        return {
            'disease': disease,
            'laboratory_tests': related['laboratory_tests'],
            'examinations': related['examinations'],
            'differential_diagnosis': related['differential_diagnosis'],
            'treatments': self.get_disease_treatments(disease_id),
        }

    def get_disease_treatments(self, disease_id: str) -> Optional[dict]:
        """Build a nested treatment tree from the ``distreatment`` property.

        The property value is split into records, and each record into
        fields. Records with fewer than five fields are ignored. For the
        rest, field 0 is the order, the last field is the reference, the
        second-to-last is the name, and the fields in between are the type
        path. Records are grouped by (field 1, field -3) when there are
        exactly five fields, otherwise by (field 1, field -4, field -3).

        Args:
            disease_id: Reference id whose ``distreatment`` property is read.

        Returns:
            ``None`` when the property is missing or empty; otherwise a dict
            nested by the grouping key, where the innermost dict stores its
            records under ``'items'``, sorted ascending by integer order.

        Raises:
            ValueError: If a kept record's order field is not an integer.
        """
        treatment = self.kgPropService.get_props_by_ref_id(disease_id, ['distreatment'])
        if not (treatment and treatment[0] and treatment[0]['prop_value']):
            # Explicit "no data" result; previously `tree` was only bound on
            # the has-data path.
            return None

        type_groups = {}
        for record in treatment[0]['prop_value'].split(_RECORD_SEPARATOR):
            fields = record.split(_FIELD_SEPARATOR)
            if len(fields) < 5:
                continue
            if len(fields) == 5:
                type_key = (fields[1], fields[-3])
            else:
                type_key = (fields[1], fields[-4], fields[-3])
            type_groups.setdefault(type_key, []).append({
                'order': fields[0],
                'name': fields[-2],
                'reference': fields[-1],
                'type': fields[1:-2],
            })

        # Convert the flat groups into a tree, one nesting level per key part.
        tree = {}
        for type_key, items in type_groups.items():
            node = tree
            for level in type_key:
                node = node.setdefault(level, {})
            # Leaf records are sorted ascending by their numeric order.
            node['items'] = sorted(items, key=lambda item: int(item['order']))
        return tree

    def add_props_to_edges(self, edges, edge_prop, prop_names=None):
        """Attach looked-up properties to each edge, or to its destination node.

        Mutates ``edges`` in place and returns ``None``. String property
        values are split before being attached: ``timelimit`` keeps only the
        text before the first field separator, every other property becomes
        a list of records.

        Args:
            edges: Iterable of edge dicts; each needs ``'id'`` when
                ``edge_prop`` is truthy, else ``['dest_node']['id']``.
            edge_prop: Truthy to look up by edge id and store under
                ``edge['props']``; falsy to look up by destination-node id
                and store under ``edge['dest_node']['props']``.
            prop_names: Property names forwarded to the lookup (``None`` is
                passed through unchanged).
        """
        for edge in edges:
            target = edge if edge_prop else edge['dest_node']
            props = self.kgPropService.get_props_by_ref_id(target['id'], prop_names)
            for prop in props or ():
                value = prop['prop_value']
                # Leave missing or already-split values alone rather than
                # crashing on a non-string.
                if not isinstance(value, str):
                    continue
                if prop['prop_name'] == 'timelimit':
                    prop['prop_value'] = value.split(_FIELD_SEPARATOR)[0]
                else:
                    prop['prop_value'] = value.split(_RECORD_SEPARATOR)
            # Always attached, even when the lookup returned nothing, so
            # consumers can rely on the key being present.
            target['props'] = props