run_prolog.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. from sqlalchemy.orm import Session
  2. from sqlalchemy.sql import select, or_, and_, func,distinct
  3. from db.schemas import *
  4. from db.models import *
  5. from pyswip import Prolog
  6. from sqlalchemy import create_engine
  7. from sqlalchemy.ext.declarative import declarative_base
  8. from sqlalchemy.orm import sessionmaker
  9. from config.site import POSTGRESQL_USER,POSTGRESQL_PASSWORD,POSTGRESQL_DATABASE
  10. POSTGRESQL_HOST = '127.0.0.1'
  11. DATABASE_URL = f"postgresql+psycopg2://{POSTGRESQL_USER}:{POSTGRESQL_PASSWORD}@{POSTGRESQL_HOST}/{POSTGRESQL_DATABASE}"
  12. engine = create_engine(DATABASE_URL)
  13. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  14. Base = declarative_base()
  15. def get_db():
  16. db = SessionLocal()
  17. try:
  18. yield db
  19. finally:
  20. db.close()
  21. db = SessionLocal()
  22. prolog = Prolog()
  23. project_dir = "D:/work/02/kg-server"
  24. prolog.consult("cdss.pl", relative_to=project_dir)
  25. # 查询Bob的母亲是谁
  26. def get_ids(category, name):
  27. data = db.query(DbKgNode).filter(DbKgNode.name == name, DbKgNode.category==category).first()
  28. if data:
  29. lead = str.lower(category)[:3]
  30. return f"{lead}_{data.id}"
  31. return None
  32. def get_name(category, id):
  33. data = db.query(DbKgNode).filter(DbKgNode.id == id, DbKgNode.category==category).first()
  34. if data:
  35. return data.name
  36. return None
  37. def get_symptoms():
  38. return ['咳嗽','呼吸困难','咳嗽伴胸痛']
  39. def put_symptoms_into_prolog(data):
  40. try:
  41. list(prolog.query("clear_reported_symptoms."))
  42. for symptom in data:
  43. id_name = get_ids('Symptom',symptom)
  44. query = f"add_reported_symptom('{id_name}')."
  45. print(f"添加症状: {symptom} {id_name}")
  46. list(prolog.query(query))
  47. except Exception as e:
  48. print(f"put_symptoms_into_prolog failed: {e}")
  49. def list_symptoms_in_prolog():
  50. print("验证已报告的症状...")
  51. try:
  52. list(prolog.query("list_reported_symptoms."))
  53. except Exception as e:
  54. print(f"列出已报告的症状失败: {e}")
  55. def reason_possible_disease():
  56. possible_diseases = list(prolog.query(f"possible_disease(DiseaseList)"))
  57. disease_score = list(prolog.query("disease_score(Score, Disease)."))
  58. dis_possible = []
  59. dis_scores = {}
  60. for data in disease_score:
  61. score = data['Score']
  62. id = data['Disease']
  63. if id in dis_scores.keys():
  64. if dis_scores[id] < score:
  65. dis_scores[id] = score
  66. else:
  67. dis_scores[id] = score
  68. if len(dis_possible) >= 10:
  69. if id in dis_possible:
  70. pass
  71. else:
  72. for i in range(len(dis_possible)):
  73. if score > dis_scores[dis_possible[i]]:
  74. dis_possible[i] = id
  75. break
  76. else:
  77. if id in dis_possible:
  78. pass
  79. else:
  80. dis_possible.append(id)
  81. return dis_possible
  82. def reason_recommend_check(dis_possible):
  83. check_data = []
  84. for dis in dis_possible:
  85. check_list_ids = [dis]
  86. recommend_data = list(prolog.query(f"disease_check({dis}, CheckList)."))
  87. checks = recommend_data[0]['CheckList']
  88. for item in checks:
  89. if item in check_list_ids:
  90. continue
  91. check_list_ids.append(item)
  92. check_data.append(check_list_ids)
  93. return check_data
  94. def subtotal(data):
  95. total = {}
  96. for row in data:
  97. for cell in row:
  98. if cell in total.keys():
  99. total[cell] = total[cell]+1
  100. else:
  101. total[cell] = 1
  102. return total
  103. symptoms = get_symptoms()
  104. put_symptoms_into_prolog(symptoms)
  105. disease_possible = reason_possible_disease()
  106. for dis in disease_possible:
  107. parts = dis.split("_")
  108. dis_name = get_name("Disease", parts[1])
  109. print(f"possible diseases:", dis, dis_name)
  110. recommend_checks = reason_recommend_check(disease_possible)
  111. checks_freq = subtotal(recommend_checks)
  112. top5 = []
  113. for check in checks_freq.keys():
  114. min_i = 0
  115. min_value = 0
  116. for i in range(len(top5)):
  117. if min_value == 0:
  118. min_i = i
  119. min_value = top5[i]['value']
  120. continue
  121. if top5[i]['value'] < min_value:
  122. min_value = top5[i]['value']
  123. min_i = i
  124. if checks_freq[check] > min_value:
  125. if len(top5) < 5:
  126. top5.append({'id': check, 'value':checks_freq[check] })
  127. else:
  128. top5[min_i]['id'] = check
  129. top5[min_i]['value'] = checks_freq[check]
  130. else:
  131. if len(top5) < 5:
  132. top5.append({'id': check, 'value':checks_freq[check] })
  133. # s = []
  134. # for item in top5:
  135. # s.append(f"[{item['id']},{item['value']}]")
  136. # print(",".join(s))
  137. for check in top5:
  138. parts = check['id'].split("_")
  139. chk_name = get_name("Check", parts[1])
  140. print(f"possible check of {chk_name}:", check['value'])
  141. # quit()
  142. #
  143. # possible_diseases = list(prolog.query(f"diagnose(DiseaseList)"))
  144. #
  145. #
  146. # dis_possible = []
  147. # for dis in possible_diseases[0]['DiseaseList']:
  148. # id = dis[0]
  149. # score = dis[1]
  150. # if id in dis_scores.keys():
  151. # if dis_scores[id] < score:
  152. # dis_scores[id] = score
  153. # else:
  154. # dis_scores[id] = score
  155. # if len(dis_possible) >= 10:
  156. # if id in dis_possible:
  157. # pass
  158. # else:
  159. # for i in range(len(dis_possible)):
  160. # if score > dis_scores[dis_possible[i]]:
  161. # dis_possible[i] = id
  162. # break
  163. # else:
  164. # if id in dis_possible:
  165. # pass
  166. # else:
  167. # dis_possible.append(id)
  168. # if id in dis_names.keys():
  169. # continue
  170. # parts = id.split("_")
  171. # dis_name = get_name("Disease", parts[1])
  172. # dis_names[id] = dis_name
  173. #
  174. # for dis in dis_possible:
  175. # print(f"possible diseases of {symptoms}:", dis, dis_names[dis], dis_scores[dis])