123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
from urllib.parse import quote

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from config.site import POSTGRESQL_USER,POSTGRESQL_PASSWORD,POSTGRESQL_DATABASE
from db.models import *
# Host of the PostgreSQL server; user, password and database name are
# imported from config.site.
POSTGRESQL_HOST = '127.0.0.1'

# Percent-encode the credentials before embedding them in the URL so that
# reserved characters in them (for example "@", ":" or "/") are not mistaken
# for URL delimiters. safe='' makes quote() encode "/" as well.
DATABASE_URL = (
    "postgresql+psycopg2://"
    f"{quote(str(POSTGRESQL_USER), safe='')}"
    f":{quote(str(POSTGRESQL_PASSWORD), safe='')}"
    f"@{POSTGRESQL_HOST}/{POSTGRESQL_DATABASE}"
)

engine = create_engine(DATABASE_URL)

# Session factory bound to the engine above; sessions neither autocommit
# nor autoflush.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
def get_db():
    """Yield a new ``SessionLocal`` session and close it afterwards.

    This is a generator: the session is created when iteration starts,
    handed to the consumer, and closed in the ``finally`` clause once the
    generator is resumed or closed, including when the consumer raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# Session created at import time; save_disease_info queries through it and
# it is the session passed to fetch_disease at the bottom of the file.
db = SessionLocal()

# Prolog fact lines appended by save_disease_info; fetch_disease sorts this
# list and writes it to main.pl.
rules = []
def get_disease_filename(id):
    """Return the relative path ``./prolog/dis_<id>.pl`` for the given id."""
    return f'./prolog/dis_{id}.pl'
def _node_fact_parts(node):
    """Return ``(predicate, atom)`` for a knowledge-graph node.

    The predicate is the node's lower-cased ``category``; the atom is the
    first three characters of that predicate, an underscore, and the node id.
    """
    node_type = str.lower(node.category)
    return node_type, f"{node_type[:3]}_{node.id}"


def save_disease_info(f, id):
    """Append grouped Prolog facts for one node to the module-level ``rules``.

    Every edge touching node ``id`` contributes the node at its other end:
    the destination node for edges whose ``src_id`` is ``id`` (queried
    ordered by edge category), then the source node for edges whose
    ``dest_id`` is ``id``. Neighbours are grouped by lower-cased category and
    one line of the form ``<category>(dis_<id>,[<atom>,...]).`` is appended
    to ``rules`` per category.

    Args:
        f: Unused; kept so existing callers (which pass ``None``) still work.
        id: Identifier of the node whose edges are exported.

    Note:
        Queries run on the module-level ``db`` session.
    """
    grouped = {}

    outgoing = (
        db.query(DbKgEdge)
        .filter(DbKgEdge.src_id == id)
        .order_by(DbKgEdge.category)
        .all()
    )
    for edge in outgoing:
        # Only the far end of the edge is needed, so the source node's
        # relationship is not touched here.
        node_type, atom = _node_fact_parts(edge.dest_node)
        grouped.setdefault(node_type, []).append(atom)

    incoming = db.query(DbKgEdge).filter(DbKgEdge.dest_id == id).all()
    for edge in incoming:
        node_type, atom = _node_fact_parts(edge.src_node)
        grouped.setdefault(node_type, []).append(atom)

    for node_type, atoms in grouped.items():
        rules.append(f"{node_type}(dis_{id},[" + ",".join(atoms) + "]).\n")
def fetch_disease(session):
    """Write the facts for all matching disease nodes to ``main.pl``.

    Loads every ``DbKgNode`` whose ``category`` is ``'Disease'`` and whose
    ``status`` is ``0``, prints a 1-based progress line ``<index>:<name>``
    for each, and lets ``save_disease_info`` append that node's facts to the
    module-level ``rules`` list. The list is then sorted in place and written
    to ``main.pl`` (UTF-8) in the current working directory. Only this single
    combined file is produced.

    The output file is opened only after all facts have been collected, so a
    failure while querying leaves an existing ``main.pl`` untouched, and the
    ``with`` block guarantees the handle is closed even if writing fails.

    Args:
        session: SQLAlchemy session used to load the disease nodes.
    """
    diseases = (
        session.query(DbKgNode)
        .filter(DbKgNode.category == 'Disease', DbKgNode.status == 0)
        .all()
    )

    for index, disease in enumerate(diseases, start=1):
        print(f"{index}:{disease.name}")
        # NOTE(review): save_disease_info queries through the module-level
        # `db` session, not through `session`; confirm this is intended.
        save_disease_info(None, disease.id)

    rules.sort()
    with open("main.pl", "w", encoding='utf-8') as main_file:
        main_file.writelines(rules)
# Run the export with the module-level session whenever this file is executed
# or imported.
fetch_disease(session=db)
|