123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- from sqlalchemy.orm import Session
- from sqlalchemy.sql import select, or_, and_, func,distinct
- from db.schemas import *
- from db.models import *
- from pyswip import Prolog
- from sqlalchemy import create_engine
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import sessionmaker
- from config.site import POSTGRESQL_USER,POSTGRESQL_PASSWORD,POSTGRESQL_DATABASE
# Connection settings: user, password and database name are imported from
# config.site; the host is fixed to the local machine.
POSTGRESQL_HOST = '127.0.0.1'
DATABASE_URL = f"postgresql+psycopg2://{POSTGRESQL_USER}:{POSTGRESQL_PASSWORD}@{POSTGRESQL_HOST}/{POSTGRESQL_DATABASE}"

# One engine and one session factory for the whole module. Sessions neither
# autocommit nor autoflush, so callers control both explicitly.
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
)

# Declarative base class created alongside the engine.
Base = declarative_base()
def get_db():
    """Yield a new session from ``SessionLocal`` and close it afterwards.

    This is a generator: the session is handed to the consumer by ``yield``
    and ``close()`` runs in ``finally``, so it is closed whether the consumer
    finishes normally or raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# Module-wide session used by the lookup helpers below (get_ids / get_name).
db = SessionLocal()

# Prolog engine with the rule base loaded from cdss.pl, resolved relative to
# the project directory.
# NOTE(review): project_dir is a hard-coded absolute Windows path - confirm it
# matches the deployment machine.
prolog = Prolog()
project_dir = "D:/work/02/kg-server"
prolog.consult("cdss.pl", relative_to=project_dir)
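- # NOTE: the original comment here ("query who Bob's mother is") was a leftover from a Prolog tutorial; the helpers below convert between knowledge-graph node names and the ids passed to Prolog.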
def get_ids(category, name):
    """Return the id string ``<prefix>_<node id>`` for a named graph node.

    Looks up the first ``DbKgNode`` row whose ``name`` and ``category`` both
    match. The prefix is the first three characters of the lower-cased
    category (``"Symptom"`` gives ``"sym"``).

    Returns:
        The id string, or ``None`` when no matching row exists.
    """
    node = (
        db.query(DbKgNode)
        .filter(DbKgNode.name == name, DbKgNode.category == category)
        .first()
    )
    if not node:
        return None
    prefix = str.lower(category)[:3]
    return f"{prefix}_{node.id}"
def get_name(category, id):
    """Return the name of the graph node with the given id and category.

    Looks up the first ``DbKgNode`` row whose ``id`` and ``category`` both
    match.

    Returns:
        The node's ``name``, or ``None`` when no matching row exists.
    """
    node = (
        db.query(DbKgNode)
        .filter(DbKgNode.id == id, DbKgNode.category == category)
        .first()
    )
    return node.name if node else None
def get_symptoms():
    """Return the fixed list of symptom names used for the demo run.

    A new list object is built on every call.
    """
    reported = ['咳嗽', '呼吸困难', '咳嗽伴胸痛']
    return reported
def put_symptoms_into_prolog(data):
    """Replace the symptoms reported to Prolog with those named in *data*.

    First runs ``clear_reported_symptoms``, then resolves each symptom name to
    its id with ``get_ids('Symptom', ...)`` and runs ``add_reported_symptom``
    for it.

    Names with no matching node are skipped with a message. Previously the
    ``None`` returned by ``get_ids`` was formatted into the query, so the
    bogus atom ``'None'`` was added as a reported symptom.

    Args:
        data: Iterable of symptom names.

    Any exception is caught and printed (best effort); symptoms after the
    failing one are not added.
    """
    try:
        list(prolog.query("clear_reported_symptoms."))
        for symptom in data:
            id_name = get_ids('Symptom', symptom)
            if id_name is None:
                print(f"put_symptoms_into_prolog skipped unknown symptom: {symptom}")
                continue
            query = f"add_reported_symptom('{id_name}')."
            print(f"添加症状: {symptom} {id_name}")
            # list() exhausts the query so the goal actually runs to completion.
            list(prolog.query(query))
    except Exception as e:
        print(f"put_symptoms_into_prolog failed: {e}")
def list_symptoms_in_prolog():
    """Run the Prolog goal ``list_reported_symptoms`` and discard its solutions.

    Presumably the predicate prints the reported symptoms itself - confirm in
    cdss.pl. Any exception is caught and printed rather than propagated.
    """
    print("验证已报告的症状...")
    try:
        solutions = prolog.query("list_reported_symptoms.")
        # Drain the query; only its execution matters, not its bindings.
        for _ in solutions:
            pass
    except Exception as err:
        print(f"列出已报告的症状失败: {err}")
def reason_possible_disease(limit=10):
    """Return the ids of the highest-scoring candidate diseases.

    Collects every ``disease_score(Score, Disease)`` solution from Prolog,
    keeps the best score seen for each disease, and returns up to *limit*
    disease ids ordered from highest to lowest score. Diseases with equal
    scores keep the order in which Prolog first produced them.

    The previous hand-rolled selection replaced the first held entry with a
    lower score instead of the lowest-scoring one, so a strong candidate could
    be evicted while a weaker one stayed.

    Args:
        limit: Maximum number of disease ids to return (default 10).

    Returns:
        A list of disease ids as delivered by Prolog in the ``Disease``
        binding.
    """
    # The result of this goal is not used here. The call is kept because the
    # predicate may have side effects inside the Prolog program - unverified,
    # confirm against cdss.pl before removing it.
    list(prolog.query("possible_disease(DiseaseList)"))

    best_score = {}
    for solution in prolog.query("disease_score(Score, Disease)."):
        disease_id = solution['Disease']
        score = solution['Score']
        if disease_id not in best_score or best_score[disease_id] < score:
            best_score[disease_id] = score

    # sorted() is stable, so ties stay in first-seen order.
    ranked = sorted(best_score, key=best_score.get, reverse=True)
    return ranked[:max(limit, 0)]
def reason_recommend_check(dis_possible):
    """Collect the checks Prolog recommends for each candidate disease.

    For every disease id the goal ``disease_check(<id>, CheckList)`` is run
    and the ``CheckList`` binding of its first solution is used.

    Args:
        dis_possible: Iterable of disease ids.

    Returns:
        A list with one row per disease, in input order. Each row starts with
        the disease id, followed by its recommended check ids without
        duplicates and in Prolog's order. A disease for which the goal has no
        solution yields a row holding only the disease id; previously that
        case raised ``IndexError``.
    """
    check_data = []
    for dis in dis_possible:
        row = [dis]
        solutions = list(prolog.query(f"disease_check({dis}, CheckList)."))
        checks = solutions[0]['CheckList'] if solutions else []
        for item in checks:
            # Membership is tested against the row itself, as before, so an
            # item equal to the disease id is also left out.
            if item not in row:
                row.append(item)
        check_data.append(row)
    return check_data
def subtotal(data):
    """Count how many times each cell occurs across all rows of *data*.

    Args:
        data: Iterable of rows, each an iterable of hashable cells.

    Returns:
        A dict mapping each distinct cell to its number of occurrences, with
        keys in the order the cells were first seen.
    """
    counts = {}
    for row in data:
        for cell in row:
            counts[cell] = counts.get(cell, 0) + 1
    return counts
# Demo run. It executes when the module is loaded, as in the original script.
symptoms = get_symptoms()
put_symptoms_into_prolog(symptoms)

disease_possible = reason_possible_disease()
for dis in disease_possible:
    # Ids have the form "<prefix>_<node id>"; the part after "_" is the node id.
    parts = dis.split("_")
    dis_name = get_name("Disease", parts[1])
    print("possible diseases:", dis, dis_name)

recommend_checks = reason_recommend_check(disease_possible)

# Each row is [disease_id, check_id, ...]. Only the checks are counted:
# previously the leading disease id was tallied too, so it could land in the
# top 5 and be looked up under the "Check" category.
checks_freq = subtotal(row[1:] for row in recommend_checks)

# The five most frequently recommended checks, most frequent first. The sort
# is stable, so ties keep first-seen order.
top5 = [
    {'id': check_id, 'value': count}
    for check_id, count in sorted(
        checks_freq.items(), key=lambda pair: pair[1], reverse=True
    )[:5]
]

for check in top5:
    parts = check['id'].split("_")
    chk_name = get_name("Check", parts[1])
    print(f"possible check of {chk_name}:", check['value'])
- # quit()
- #
- # possible_diseases = list(prolog.query(f"diagnose(DiseaseList)"))
- #
- #
- # dis_possible = []
- # for dis in possible_diseases[0]['DiseaseList']:
- # id = dis[0]
- # score = dis[1]
- # if id in dis_scores.keys():
- # if dis_scores[id] < score:
- # dis_scores[id] = score
- # else:
- # dis_scores[id] = score
- # if len(dis_possible) >= 10:
- # if id in dis_possible:
- # pass
- # else:
- # for i in range(len(dis_possible)):
- # if score > dis_scores[dis_possible[i]]:
- # dis_possible[i] = id
- # break
- # else:
- # if id in dis_possible:
- # pass
- # else:
- # dis_possible.append(id)
- # if id in dis_names.keys():
- # continue
- # parts = id.split("_")
- # dis_name = get_name("Disease", parts[1])
- # dis_names[id] = dis_name
- #
- # for dis in dis_possible:
- # print(f"possible diseases of {symptoms}:", dis, dis_names[dis], dis_scores[dis])
|