"""Clinical decision-support demo: symptoms -> possible diseases -> checks.

The script pushes a fixed list of symptom names into a Prolog knowledge base
(``cdss.pl``), asks Prolog for scored candidate diseases, then asks which
checks each candidate disease calls for and prints the most frequently
recommended ones. Node names/ids are resolved through the ``DbKgNode`` table.

NOTE(review): everything below the function definitions runs at import time
(DB session, Prolog consult, queries, prints). That behaviour is preserved
here; consider moving it under ``if __name__ == "__main__":``.
"""
import heapq
from collections import Counter
from operator import itemgetter
from typing import Optional

from sqlalchemy.orm import Session
from sqlalchemy.sql import select, or_, and_, func, distinct
from db.schemas import *
from db.models import *
from pyswip import Prolog

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from config.site import POSTGRESQL_USER, POSTGRESQL_PASSWORD, POSTGRESQL_DATABASE

POSTGRESQL_HOST = '127.0.0.1'
DATABASE_URL = f"postgresql+psycopg2://{POSTGRESQL_USER}:{POSTGRESQL_PASSWORD}@{POSTGRESQL_HOST}/{POSTGRESQL_DATABASE}"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


def get_db():
    """Yield a fresh session and close it once the consumer is done."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# Module-wide session used by get_ids()/get_name() below.
db = SessionLocal()

# Load the Prolog rule base. The path is machine-specific.
prolog = Prolog()
project_dir = "D:/work/02/kg-server"
prolog.consult("cdss.pl", relative_to=project_dir)


def get_ids(category, name) -> Optional[str]:
    """Return the Prolog atom for the node with this category and name.

    The atom is ``<first three lowercase letters of category>_<node id>``,
    e.g. ``sym_12`` for a ``Symptom`` node with id 12. Returns ``None`` when
    no matching node exists.
    """
    node = db.query(DbKgNode).filter(
        DbKgNode.name == name, DbKgNode.category == category
    ).first()
    if node is None:
        return None
    lead = category.lower()[:3]
    return f"{lead}_{node.id}"


def get_name(category, id):
    """Return the name of the node with this category and id, or ``None``.

    The parameter name ``id`` shadows the builtin; it is kept so that
    keyword callers keep working.
    """
    node = db.query(DbKgNode).filter(
        DbKgNode.id == id, DbKgNode.category == category
    ).first()
    return node.name if node is not None else None


def get_symptoms():
    """Return the hard-coded list of reported symptom names."""
    return ['咳嗽', '呼吸困难', '咳嗽伴胸痛']


def put_symptoms_into_prolog(data):
    """Reset Prolog's reported symptoms and assert one fact per symptom name.

    Symptom names that cannot be resolved to a node are skipped; previously
    the literal atom ``'None'`` was asserted for them. Any exception is
    reported and swallowed, so the function stays best-effort as before.
    """
    try:
        list(prolog.query("clear_reported_symptoms."))
        for symptom in data:
            id_name = get_ids('Symptom', symptom)
            if id_name is None:
                # Do not pollute the knowledge base with a bogus 'None' atom.
                print(f"unknown symptom, skipped: {symptom}")
                continue
            query = f"add_reported_symptom('{id_name}')."
            print(f"添加症状: {symptom} {id_name}")
            list(prolog.query(query))
    except Exception as e:
        print(f"put_symptoms_into_prolog failed: {e}")


def list_symptoms_in_prolog():
    """Run Prolog's ``list_reported_symptoms`` and report any failure."""
    print("验证已报告的症状...")
    try:
        list(prolog.query("list_reported_symptoms."))
    except Exception as e:
        print(f"列出已报告的症状失败: {e}")


def reason_possible_disease(limit=10):
    """Return up to ``limit`` disease ids, highest score first.

    Each disease is ranked by the best score reported for it by
    ``disease_score/2``. Ties keep the order in which Prolog first reported
    the diseases.

    The earlier hand-rolled selection replaced the *first* lower-scored
    entry once the list was full, which could evict a high-scoring disease
    while a lower-scoring one stayed in the result.
    """
    # The result is unused on the Python side. The query is kept because
    # cdss.pl is not visible here and the predicate may have side effects
    # (NOTE(review): confirm and drop if it is pure).
    list(prolog.query(f"possible_disease(DiseaseList)"))

    best_score = {}
    for solution in list(prolog.query("disease_score(Score, Disease).")):
        disease_id = solution['Disease']
        score = solution['Score']
        if disease_id not in best_score or best_score[disease_id] < score:
            best_score[disease_id] = score

    return heapq.nlargest(limit, best_score, key=best_score.get)


def reason_recommend_check(dis_possible):
    """Return one row per disease: ``[disease_id, check_id, ...]``.

    Check ids come from the first solution of ``disease_check/2`` and are
    de-duplicated in their original order. A disease with no solution gives
    a row holding only its own id; previously this raised ``IndexError``.
    """
    check_data = []
    for dis in dis_possible:
        solutions = list(prolog.query(f"disease_check({dis}, CheckList)."))
        checks = solutions[0]['CheckList'] if solutions else []
        # dict.fromkeys drops duplicates while keeping first-seen order.
        check_data.append(list(dict.fromkeys([dis, *checks])))
    return check_data


def subtotal(data):
    """Count how often each cell occurs across all rows of ``data``."""
    return dict(Counter(cell for row in data for cell in row))


symptoms = get_symptoms()
put_symptoms_into_prolog(symptoms)

disease_possible = reason_possible_disease()
for dis in disease_possible:
    parts = dis.split("_")
    dis_name = get_name("Disease", parts[1])
    print("possible diseases:", dis, dis_name)

recommend_checks = reason_recommend_check(disease_possible)
# Each row starts with the disease id itself, so count only the check ids
# after it. Otherwise disease ids are ranked as "checks" and resolve to
# the name None below.
checks_freq = subtotal(row[1:] for row in recommend_checks)

# The five most frequently recommended checks, most frequent first.
top5 = [
    {'id': check_id, 'value': count}
    for check_id, count in heapq.nlargest(5, checks_freq.items(), key=itemgetter(1))
]

for check in top5:
    parts = check['id'].split("_")
    chk_name = get_name("Check", parts[1])
    print(f"possible check of {chk_name}:", check['value'])

# NOTE: an earlier, disabled variant of this script queried
# diagnose(DiseaseList), whose single solution was read as a list of
# [disease_id, score] pairs, and printed each disease with its name and
# score. It was superseded by reason_possible_disease() above.