''' 这个脚本是用来从postgre数据库中导出图谱数据到json文件的。 ''' import sys,os current_path = os.getcwd() sys.path.append(current_path) from sqlalchemy import text from sqlalchemy.orm import Session import json #这个是数据库的连接 from db.session import SessionLocal #两个会话,分别是读取节点和属性的 db = SessionLocal() prop = SessionLocal() def get_props(ref_id): props = {} sql = """select prop_name, prop_value,prop_title from kg_props where ref_id=:ref_id""" result = prop.execute(text(sql), {'ref_id':ref_id}) for record in result: prop_name, prop_value,prop_title = record props[prop_name] = prop_title + ":" +prop_value return props def get_entities(): #COUNT_SQL = "select count(*) from kg_nodes where version=:version" COUNT_SQL = "select count(*) from kg_nodes where status=0" result = db.execute(text(COUNT_SQL)) count = result.scalar() print("total nodes: ", count) entities = [] batch = 100 start = 1 while start < count: #sql = """select id,name,category from kg_nodes where version=:version order by id limit :batch OFFSET :start""" sql = """select id,name,category from kg_nodes where status=0 order by id limit :batch OFFSET :start""" result = db.execute(text(sql), {'start':start, 'batch':batch}) #["发热",{"type":"症状","description":"发热是诊断的主要目的,用于明确发热病因。"}] row_count = 0 for row in result: id,name,category = row props = get_props(id) entities.append([id,{"name":name, 'type':category,'description':'', **props}]) row_count += 1 if row_count == 0: break start = start + row_count print("start: ", start, "row_count: ", row_count) with open(current_path+"\\entities_med.json", "w", encoding="utf-8") as f: f.write(json.dumps(entities, ensure_ascii=False,indent=4)) def get_names(src_id, dest_id): sql = """select id,name,category from kg_nodes where id = :src_id""" result = db.execute(text(sql), {'src_id':src_id}).first() print(result) if result is None: #返回空 return (src_id, "", "", dest_id, "", "") id,src_name,src_category = result result = db.execute(text(sql), {'src_id':dest_id}).first() id,dest_name,dest_category = result return (src_id, src_name, src_category, dest_id, dest_name, dest_category) def get_relationships(): #COUNT_SQL = "select count(*) from kg_edges where version=:version" COUNT_SQL = "select count(*) from kg_edges where status=0" result = db.execute(text(COUNT_SQL)) count = result.scalar() print("total edges: ", count) edges = [] batch = 1000 start = 1 file_index = 1 while start < count: #sql = """select id,name,category,src_id,dest_id from kg_edges where version=:version order by id limit :batch OFFSET :start""" sql = """select id,name,category,src_id,dest_id from kg_edges where status=0 order by id limit :batch OFFSET :start""" result = db.execute(text(sql), {'start':start, 'batch':batch}) #["发热",{"type":"症状","description":"发热是诊断的主要目的,用于明确发热病因。"}] row_count = 0 for row in result: id,name,category,src_id,dest_id = row props = get_props(id) #如果get_names异常,跳过 try: src_id, src_name, src_category, dest_id, dest_name, dest_category = get_names(src_id, dest_id) except Exception as e: print(e) print("src_id: ", src_id, "dest_id: ", dest_id) continue #src_name或dest_name为空,说明节点不存在,跳过 if src_name == "" or dest_name == "": continue edges.append([src_id, {"id":src_id, "name":src_name, "type":src_category}, dest_id,{"id":dest_id,"name":dest_name,"type":dest_category}, {'type':category,'name':name, **props}]) row_count += 1 if row_count == 0: break start = start + row_count print("start: ", start, "row_count: ", row_count) if len(edges) > 10000: with open(current_path+f"\\relationship_med_{file_index}.json", "w", encoding="utf-8") as f: f.write(json.dumps(edges, ensure_ascii=False,indent=4)) edges = [] file_index += 1 with open(current_path+"\\relationship_med_0.json", "w", encoding="utf-8") as f: f.write(json.dumps(edges, ensure_ascii=False,indent=4)) if __name__ == "__main__": #导出节点数据 get_entities() #导出关系数据 get_relationships()