|
- from fastapi import APIRouter, Depends, HTTPException
- from sqlalchemy.orm import Session
- from sqlalchemy.sql import select, or_, and_, func,distinct
- from agent.models.web.graph import *
- from agent.models.db.graph import *
- from db.database import get_db
- from agent.libs.response import resp_200
- from typing import List
- from math import ceil
- import json
- from datetime import datetime
# Router that groups every graph-management endpoint under the /graph-mgr prefix.
router = APIRouter(
    prefix="/graph-mgr",
    tags=["agent job interface"],
)
def nodes_append(list, node):
    """Append ``node`` to ``list`` unless an entry with the same ``id`` exists.

    Args:
        list: Mutable list of dicts, each carrying an ``'id'`` key.
        node: Dict with an ``'id'`` key to add.

    Returns:
        None. ``list`` is modified in place.
    """
    already_present = any(entry['id'] == node["id"] for entry in list)
    if not already_present:
        list.append(node)
- ################################### GRAPH ###############################################
@router.get("/graph-list/{page}/{page_size}")
def graph_list(page: int, page_size: int, db: Session = Depends(get_db)):
    """Return one page of knowledge graphs.

    Args:
        page: 1-based page number.
        page_size: Maximum number of records on a page.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``total`` (row count), ``pages``
        (the requested page number), ``size`` and the page ``records``.

    Raises:
        HTTPException: 400 when ``page`` or ``page_size`` is not positive.
    """
    # Guard against a negative offset and a meaningless zero-sized page.
    if page < 1 or page_size < 1:
        raise HTTPException(status_code=400, detail="page and page_size must be positive")

    count = db.query(DbKgGraphs).count()
    # Offsets are zero-based. A page past the last one yields no records
    # instead of falling back to an arbitrary offset.
    start = (page - 1) * page_size
    results = db.query(DbKgGraphs).limit(page_size).offset(start).all()
    records = [KgGraphs.model_validate(row).model_dump() for row in results]
    return resp_200(
        data={"total": count, "pages": page, "size": page_size, "records": records})
@router.get("/graph-schema/{graph_id}")
def graph_list(graph_id:int, db: Session = Depends(get_db)):
    """Return the schema content attached to a graph.

    The graph's ``graph_settings`` JSON holds a ``schema_id``; the matching
    schema row's ``content`` JSON is returned.

    NOTE(review): this function reuses the name ``graph_list`` of the paged
    listing handler; the name is kept so existing imports keep working.

    Args:
        graph_id: Id of the graph whose schema is requested.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping the decoded schema content, or an
        empty ``{'entities': [], 'relations': []}`` schema when the graph,
        its settings, or the referenced schema cannot be found.
    """
    empty_schema = {'entities': [], 'relations': []}

    graph = db.query(DbKgGraphs).filter(DbKgGraphs.id == graph_id).first()
    if graph is None or not graph.graph_settings:
        return resp_200(data=empty_schema)

    settings = json.loads(graph.graph_settings)
    schema_id = settings.get('schema_id') if isinstance(settings, dict) else None
    if schema_id is None:
        return resp_200(data=empty_schema)

    schema_data = db.query(DbKgSchemas).filter(DbKgSchemas.id == schema_id).first()
    if schema_data is None or not schema_data.content:
        return resp_200(data=empty_schema)

    return resp_200(data=json.loads(schema_data.content))
@router.post("/graph-create")
def create_graph(data: KgGraphCreate, db: Session = Depends(get_db)):
    """Persist a new graph and return it.

    Args:
        data: Request payload; ``name``, ``category``, ``graph_description``
            and ``schema_id`` are read from it.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``error_code`` 0, an empty
        ``error_message`` and the stored graph under ``data``.
    """
    # The schema reference is stored as JSON text in graph_settings.
    settings_json = json.dumps({"schema_id": data.schema_id})

    record = DbKgGraphs()
    record.name = data.name
    record.category = data.category
    record.graph_description = data.graph_description
    record.graph_settings = settings_json
    record.created = datetime.now()
    record.updated = datetime.now()

    db.add(record)
    db.commit()
    # Reload so database-generated values are present on the instance.
    db.refresh(record)

    stored = KgGraphs.model_validate(record).model_dump()
    return resp_200(data={'error_code': 0, 'error_message': '', 'data': stored})
@router.get("/graph-search/{query}")
def search_projects(query:str, db: Session = Depends(get_db)):
    """Find graphs whose name contains ``query`` (case-insensitive).

    Only graphs with ``status >= 0`` are considered, and ``graph_settings``
    is removed from every returned record.

    Args:
        query: Substring to look for in graph names.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``{'records': [...]}``.
    """
    pattern = '%' + query + '%'
    rows = (
        db.query(DbKgGraphs)
        .filter(DbKgGraphs.status >= 0, DbKgGraphs.name.ilike(pattern))
        .all()
    )
    validated = [KgGraphs.model_validate(row) for row in rows]
    graphs = []
    for item in validated:
        dumped = item.model_dump()
        dumped.pop("graph_settings")
        graphs.append(dumped)
    return resp_200(data={'records': graphs})
- # @router.get("/api/graph-schema/{graph_id}")
- # def get_graph_schema(graph_id: int, db: Session = Depends(get_db)):
- # graph_data = db.query(DbKgGraphs).filter(DbKgGraphs.id == graph_id).first()
- # if graph_data:
- # settings = json.loads(graph_data.graph_settings)
- # if settings:
- # schema_id = settings['schema_id']
- # schema_data = db.query(DbKgSchemas).filter(DbKgSchemas.id == schema_id)
- # if schema_data:
- # schema_content_data = json.loads(schema_data.content)
- # return resp_200(data=schema_content_data)
- # return resp_200(data={'entities': [], 'relations': []})
- ################################### SCHEMA ###############################################
@router.get("/schemas/{page}/{page_size}")
def read_schemas(page:int, page_size:int, db: Session = Depends(get_db)):
    """Return one page of schema definitions.

    Args:
        page: 1-based page number.
        page_size: Maximum number of records on a page.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``total``, ``pages`` (the
        requested page number), ``size`` and the page ``records``.

    Raises:
        HTTPException: 400 when ``page`` or ``page_size`` is not positive.
    """
    # Guard against a negative offset and a meaningless zero-sized page.
    if page < 1 or page_size < 1:
        raise HTTPException(status_code=400, detail="page and page_size must be positive")

    count = db.query(DbKgSchemas).count()
    # Zero-based offset; pages past the end simply contain no records.
    start = (page - 1) * page_size
    results = db.query(DbKgSchemas).limit(page_size).offset(start).all()
    records = [KgSchemas.model_validate(row).model_dump() for row in results]

    return resp_200(data={"total": count, "pages": page, "size": page_size, "records": records})
@router.get("/schemas-search/{sys_name}")
def read_all_schemas(sys_name: str, db: Session = Depends(get_db)):
    """List schemas, optionally restricted to one schema system.

    Args:
        sys_name: ``'all'`` to return every schema, otherwise the exact
            ``schema_system`` value to match.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``total``, ``pages`` (always 1),
        ``size`` and all matching ``records``.
    """
    query = db.query(DbKgSchemas)
    if sys_name != 'all':
        # Use equality: ``is_()`` renders SQL ``IS``, which is meant for
        # NULL/boolean comparison and is not portable for string values.
        query = query.filter(DbKgSchemas.schema_system == sys_name)

    records = [KgSchemas.model_validate(row).model_dump() for row in query.all()]

    return resp_200(data={"total": len(records), "pages": 1, "size": len(records), "records": records})
@router.get("/schemas-load/{schema_id}")
def load_schemas(schema_id: int, db: Session = Depends(get_db)):
    """Load the decoded content of a GraphWork schema.

    Args:
        schema_id: Id of the schema row to load.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping the decoded ``content`` JSON when
        the row exists, has ``schema_system == 'GraphWork'`` and
        ``schema_type == 'GW.Schema'``; otherwise an empty
        ``{'entities': [], 'relations': []}`` schema.
    """
    schema = db.query(DbKgSchemas).filter(DbKgSchemas.id == schema_id).first()

    # A missing row or empty content is treated like any other
    # non-GraphWork schema: there is nothing to load.
    is_graphwork_schema = (
        schema is not None
        and schema.schema_system == 'GraphWork'
        and schema.schema_type == 'GW.Schema'
        and bool(schema.content)
    )
    if is_graphwork_schema:
        return resp_200(data=json.loads(schema.content))
    return resp_200(data={'entities': [], 'relations': []})
- # if sys_type == 'entity' or sys_type=='relation':
- # results = db.query(DbKgSchemas).filter(DbKgSchemas.schema_system==sys_name,DbKgSchemas.schema_type==sys_type).all()
- # else:
- # results = db.query(DbKgSchemas).filter(DbKgSchemas.schema_system==sys_name).all()
- #
- # schemas = [KgSchemas.model_validate(node) for node in results]
- # entities = []
- # relations = []
- # records = [item.model_dump() for item in schemas]
- # try:
- # for data in records:
- # if data['schema_type'] == 'ENTITY':
- # content = data['content']
- # props = []
- # lines = content.split("\n")
- # for line in lines:
- # line.strip()
- # if len(line) > 1:
- # parts = line.split("|")
- # if len(parts)<2:
- # parts.append(parts[0])
- # props.append({"category":1, "prop_title":parts[1], "prop_name": parts[0], "prop_value":parts[1]})
- # data['props'] = props
- # entities.append(data)
- # if data['schema_type'] == 'RELATION':
- # relations.append(data)
- # except Exception as e:
- # print("Exception", e)
- # return resp_200(data= {'entities':entities, 'relations': relations})
@router.get("/schemas-get/{page_size}")
def get_schema_definitions(page_size:int, db: Session = Depends(get_db)):
    """Return up to ``page_size`` schemas with their content parsed into props.

    Each schema's ``content`` is read as one property per line in the form
    ``prop_name|prop_title``; a line without ``|`` uses the name as title.
    The parsed list is attached to every record under ``props``.

    Args:
        page_size: Maximum number of schemas to return.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``total``, ``pages`` (always 1),
        ``size`` and the ``records``.
    """
    rows = db.query(DbKgSchemas).limit(page_size).all()
    records = [KgSchemas.model_validate(row).model_dump() for row in rows]

    for record in records:
        content = record.get('content')
        # Missing or non-text content yields an empty prop list instead of
        # aborting the parsing of all remaining records.
        lines = content.split("\n") if isinstance(content, str) else []
        props = []
        for raw_line in lines:
            # str.strip() returns a new string; keep the result so stray
            # whitespace and carriage returns do not leak into names/titles.
            line = raw_line.strip()
            if len(line) > 1:
                parts = line.split("|")
                if len(parts) < 2:
                    parts.append(parts[0])
                props.append({"category": 1, "prop_title": parts[1], "prop_name": parts[0], "prop_value": parts[1]})
        record['props'] = props

    return resp_200(data={"total": len(records), "pages": 1, "size": page_size, "records": records})
@router.post("/schemas-create")
def create_schemas(data :KgSchemasCreate, db: Session = Depends(get_db)):
    """Store a new schema definition.

    The row is built from the payload fields, then ``schema_system`` is
    forced to ``'DEFAULT'`` and ``schema_type`` to ``'ENTITY'``.

    Args:
        data: Request payload describing the schema.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping the stored schema.
    """
    payload_fields = data.model_dump()
    record = DbKgSchemas(**payload_fields)
    record.schema_system = 'DEFAULT'
    record.schema_type = 'ENTITY'

    db.add(record)
    db.commit()
    db.refresh(record)

    stored = KgSchemas.model_validate(record)
    return resp_200(data=stored.model_dump())
@router.post("/schemas-update")
def update_schemas(data :KgSchemasUpdate, db: Session = Depends(get_db)):
    """Update a schema and propagate renamed property titles.

    The schema's ``name``, ``content``, ``category`` and ``version`` are
    overwritten. Afterwards every ``prop_name|prop_title`` line of the stored
    content updates ``prop_title`` of all ``DbKgProp`` rows with that name.

    Args:
        data: Request payload carrying the schema id and the new values.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping the updated schema.

    Raises:
        HTTPException: 404 when no schema with the given id exists.
    """
    # A transient model instance is only used to read the payload's fields.
    incoming = DbKgSchemas(**data.model_dump())
    schema_id = incoming.id
    db.query(DbKgSchemas).filter(DbKgSchemas.id == schema_id).update({
        'name': incoming.name,
        'content': incoming.content,
        'category': incoming.category,
        'version': incoming.version,
    })
    db.commit()

    db_node = db.query(DbKgSchemas).filter(DbKgSchemas.id == schema_id).first()
    if db_node is None:
        # Nothing matched the id, so there is no content to process.
        raise HTTPException(status_code=404, detail="Schema not found")

    content = db_node.content or ""
    for raw_line in content.split("\n"):
        # Keep the stripped value; str.strip() does not modify in place.
        line = raw_line.strip()
        if len(line) > 1:
            parts = line.split("|")
            if len(parts) > 1:
                new_title = parts[1]
                db.query(DbKgProp).filter(DbKgProp.prop_name == parts[0]).update({'prop_title': new_title})
                print("update title ", new_title)
    db.commit()

    return resp_200(data=KgSchemas.model_validate(db_node).model_dump())
@router.get("/schemas-delete/{id}")
def delete_schemas(id: int, db: Session = Depends(get_db)):
    """Delete the schema with the given id.

    Args:
        id: Id of the schema row to remove.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``{"id": id}``.
    """
    matching = db.query(DbKgSchemas).filter(DbKgSchemas.id == id)
    matching.delete()
    db.commit()
    return resp_200(data={"id": id})
- ################################### NODE ###############################################
@router.get("/node-category/{graph_id}/{category}/{limit}")
def read_node_by_category(graph_id: int, category: str, limit:int, db: Session = Depends(get_db)):
    """Return up to ``limit`` nodes of one category within a graph.

    Args:
        graph_id: Graph the nodes must belong to.
        category: Exact node category to match.
        limit: Maximum number of nodes to return.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``nodes`` and an empty ``edges``
        list. No match yields an empty ``nodes`` list.
    """
    # A query object is never None, so the former "not found" check could
    # not trigger; an empty result is returned as an empty list.
    db_nodes = (
        db.query(DbKgNode)
        .filter(DbKgNode.category == category, DbKgNode.graph_id == graph_id)
        .limit(limit)
        .all()
    )
    nodes = [KgNode.model_validate(row).model_dump() for row in db_nodes]
    return resp_200(data={"nodes": nodes, "edges": []})
@router.get("/node-delete/{graph_id}/{node_id}", response_model=KgNode)
def delete_node(graph_id: int, node_id: int, db: Session = Depends(get_db)):
    """Delete a node together with every edge that touches it.

    Args:
        graph_id: Graph the node must belong to.
        node_id: Id of the node to delete.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``{"id": node_id}``.

    Raises:
        HTTPException: 404 when the node does not exist in that graph.
    """
    target = (
        db.query(DbKgNode)
        .filter(DbKgNode.graph_id == graph_id, DbKgNode.id == node_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Node not found")

    # Remove outgoing edges first, then incoming ones, then the node itself.
    for endpoint_column in (DbKgEdge.src_id, DbKgEdge.dest_id):
        db.query(DbKgEdge).filter(endpoint_column == node_id).delete()
    db.delete(target)
    db.commit()
    return resp_200(data={"id": node_id})
@router.post("/node-create")
def create_node(graph_id: int, node: KgNode, db: Session = Depends(get_db)):
    """Create a node and its properties in a single transaction.

    Args:
        graph_id: Accepted for interface compatibility; the stored graph id
            is taken from ``node.graph_id``.
        node: Node payload; ``graph_id``, ``category``, ``name`` and
            ``props`` are read from it.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping the new node ``id`` with
        ``error_code`` 0, or ``id`` 0 with ``error_code`` 1 when a node with
        the same category and name already exists.
    """
    # NOTE(review): the duplicate check spans all graphs (no graph_id
    # filter) - confirm whether names must be unique globally.
    duplicates = db.query(DbKgNode).filter(DbKgNode.category == node.category, DbKgNode.name == node.name).count()
    if duplicates > 0:
        return resp_200(data={"id": 0, "error_code": 1, "error_msg": "Node already existed"})

    db_node = DbKgNode()
    db_node.graph_id = node.graph_id
    db_node.category = node.category
    db_node.name = node.name
    db_node.layout = ''
    db_node.version = '1.0'

    try:
        db.add(db_node)
        # flush() assigns the primary key without committing, so the node
        # and its props are stored atomically by the single commit below.
        db.flush()
        node_id = db_node.id

        kg_props = []
        for prop in (node.props or []):
            p = DbKgProp()
            p.ref_id = node_id
            p.category = 1
            p.prop_name = prop.prop_name
            p.prop_title = prop.prop_title
            p.prop_value = prop.prop_value
            kg_props.append(p)
        db.add_all(kg_props)
        db.commit()
    except Exception:
        # Leave no half-created node behind, then let the error propagate.
        db.rollback()
        raise

    return resp_200(data={"id": node_id, "error_code": 0, "error_msg": ""})
@router.get("/nodes/summary/{graph_id}")
def get_node_summary(graph_id: int=0, db:Session=Depends(get_db)):
    """Count nodes per category.

    Args:
        graph_id: Graph to summarise. A value of 0 or below (0 is the
            declared default) keeps the previous behaviour of counting
            nodes across all graphs.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping a list of
        ``{"category": ..., "count": ...}`` entries.
    """
    query = db.query(DbKgNode.category, func.count(1))
    if graph_id > 0:
        # The path parameter was previously ignored, so every graph's
        # summary contained the nodes of all graphs.
        query = query.filter(DbKgNode.graph_id == graph_id)
    rows = query.group_by(DbKgNode.category).all()

    ret = [{"category": category, "count": count} for category, count in rows]
    return resp_200(data=ret)
@router.get("/nodes/search/{graph_id}/{node_name}/{in_out}/{deepth}")
def search_node(graph_id:int, node_name: str, in_out: int =0, deepth: int=1, db: Session= Depends(get_db)):
    """Search nodes by name and optionally collect their neighbouring edges.

    Args:
        graph_id: Graph to search in.
        node_name: Name to look for. A leading ``-`` requests an exact match
            on the remainder; otherwise a case-insensitive substring match.
        in_out: 0 = nodes only, 1 = nodes plus incoming edges, 2 = nodes
            plus outgoing edges, 3 = nodes plus edges in both directions.
        deepth: Recursion depth of the search; only 1 is supported and the
            value is not used.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``summary`` (``count_in`` /
        ``count_out``: total active incoming / outgoing edges of all matched
        nodes), the de-duplicated ``nodes`` and ``edges``. At most 10 edges
        per node and direction are returned.
    """
    if node_name.startswith("-"):
        query = db.query(DbKgNode).filter(DbKgNode.graph_id==graph_id, DbKgNode.name==node_name[1:])
    else:
        query = db.query(DbKgNode).filter(DbKgNode.graph_id==graph_id, DbKgNode.name.ilike('%'+node_name+'%'))
    db_nodes = query.all()

    # (direction, column matched against the node id, re-validate endpoints).
    # Outgoing edges are processed before incoming ones. The re-validation
    # flag preserves how each direction built its endpoint nodes before;
    # NOTE(review): it is probably redundant.
    directions = []
    if in_out == 2 or in_out == 3:
        directions.append(("out", DbKgEdge.src_id, False))
    if in_out == 1 or in_out == 3:
        directions.append(("in", DbKgEdge.dest_id, True))

    nodes = []
    edges = []
    in_count = 0
    out_count = 0
    for node in db_nodes:
        nodes_append(nodes, KgNode.model_validate(node).model_dump())

        for direction, id_column, revalidate in directions:
            active_edges = db.query(DbKgEdge).filter(
                DbKgEdge.graph_id == graph_id, id_column == node.id, DbKgEdge.status == 0)

            # Incoming edges count towards count_in and outgoing edges
            # towards count_out (these were swapped), summed over all hits.
            total = active_edges.count()
            if direction == "in":
                in_count += total
            else:
                out_count += total

            for row in active_edges.limit(10):
                edge = KgEdge.model_validate(row).model_dump()
                for key in ("src_node", "dest_node"):
                    endpoint = edge.pop(key, None)
                    if endpoint is None:
                        continue
                    if revalidate:
                        endpoint = KgNode.model_validate(endpoint).model_dump()
                    nodes_append(nodes, endpoint)
                # nodes_append de-duplicates by 'id', which works for edges too.
                nodes_append(edges, edge)

    return resp_200(data={"summary":{"count_in": in_count, "count_out":out_count},"nodes":nodes, "edges":edges})
@router.get("/nodes/{node_id}/{in_out}/{deepth}")
def read_node(node_id: int, in_out: int =0, deepth: int=1, db: Session = Depends(get_db)):
    """Return a node, optionally with its neighbouring edges and nodes.

    Args:
        node_id: Id of the node to read.
        in_out: 0 = node only, 1 = incoming edges, 2 = outgoing edges,
            3 = edges in both directions.
        deepth: Recursion depth of the search; only 1 is supported and the
            value is not used.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``summary`` (total active
        incoming / outgoing edge counts), and the de-duplicated ``nodes``
        and ``edges``. At most 25 edges per direction are returned.

    Raises:
        HTTPException: 404 when ``in_out`` is 0 and the node does not exist.
    """
    if in_out == 0:
        # Only the node itself is requested.
        node = db.query(DbKgNode).filter(DbKgNode.id == node_id).first()
        if node is None:
            raise HTTPException(status_code=404, detail="Node not found")
        return resp_200(data={"summary":{"count_in": 0, "count_out":0},"nodes":[KgNode.model_validate(node).model_dump()], "edges":[]})

    # Incoming edges are processed before outgoing ones.
    directions = []
    if in_out == 1 or in_out == 3:
        directions.append(("in", DbKgEdge.dest_id))
    if in_out == 2 or in_out == 3:
        directions.append(("out", DbKgEdge.src_id))

    nodes = []
    edges = []
    seen_node_ids = set()
    seen_edge_ids = set()
    counts = {"in": 0, "out": 0}

    for direction, id_column in directions:
        active_edges = db.query(DbKgEdge).filter(id_column == node_id, DbKgEdge.status == 0)
        counts[direction] = active_edges.count()

        for row in active_edges.limit(25):
            edge = KgEdge.model_validate(row).model_dump()
            # Endpoint nodes are reported separately and removed from the edge.
            for key in ("src_node", "dest_node"):
                endpoint = edge.pop(key, None)
                if endpoint is not None and endpoint["id"] not in seen_node_ids:
                    seen_node_ids.add(endpoint["id"])
                    nodes.append(endpoint)
            if edge["id"] not in seen_edge_ids:
                seen_edge_ids.add(edge["id"])
                edges.append(edge)

    return resp_200(data={"summary":{"count_in": counts["in"], "count_out":counts["out"]},"nodes":nodes, "edges":edges})
@router.get("/nodes-browse/{node_id}")
def browse_node(node_id: int, db: Session = Depends(get_db)):
    """Return the source nodes of the active edges that point at a node.

    Args:
        node_id: Id of the node being browsed.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``summary`` (``count_in`` is the
        number of distinct source nodes, ``count_out`` is 0), the
        de-duplicated source ``nodes`` and the ``edges``. Everything is
        empty when the node has no active incoming ``belongs_to`` edge.
    """
    # Upper bound on fetched edges. The earlier proportional split between
    # outgoing and incoming edges always resolved to this value because
    # outgoing edges are not browsed.
    max_edges = 999

    belongs_to_count = db.query(DbKgEdge).filter(
        DbKgEdge.dest_id == node_id, DbKgEdge.status == 0, DbKgEdge.category == 'belongs_to').count()
    if belongs_to_count == 0:
        return resp_200(data={"summary":{"count_in": 0, "count_out":0},"nodes":[], "edges":[]})

    # NOTE(review): the gate above counts only 'belongs_to' edges while this
    # fetch returns incoming edges of every category - confirm the intent.
    incoming = db.query(DbKgEdge).filter(
        DbKgEdge.dest_id == node_id, DbKgEdge.status == 0).limit(max_edges).all()

    seen_node_ids = set()
    seen_edge_ids = set()
    nodes = []
    edges = []
    for row in incoming:
        edge = KgEdge.model_validate(row).model_dump()
        # Only the source side is collected; the destination is the browsed node.
        src_node = edge.pop("src_node", None)
        edge.pop("dest_node", None)
        if src_node is not None and src_node["id"] not in seen_node_ids:
            seen_node_ids.add(src_node["id"])
            nodes.append(src_node)
        if edge["id"] not in seen_edge_ids:
            seen_edge_ids.add(edge["id"])
            edges.append(edge)

    return resp_200(data={"summary":{"count_in": len(nodes), "count_out":0},"nodes":nodes, "edges":edges})
@router.post("/node-merge")
def merge_node(mergeData:List[KgNodeMerge],db: Session = Depends(get_db)):
    """Merge source nodes into target nodes by rewiring their edges.

    For every ``(src_id, dest_id)`` pair in ``mergeData``:

    * active edges entering or leaving the source node are re-pointed at
      the target node;
    * active edges directly between the two nodes get status -1;
    * a ``MERGE_TO`` edge from source to target is inserted unless one with
      the same endpoints and category already exists.

    An edge that would become a self-loop is skipped. An edge whose moved
    form already exists (same endpoints and category) is set to status -1
    instead of being moved.

    Args:
        mergeData: Merge instructions; ``src_id`` and ``dest_id`` are read.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping an (always empty) ``edges`` list,
        ``error_code`` 0 and an empty ``error_msg``.
    """
    edge_columns = (DbKgEdge.id, DbKgEdge.src_id, DbKgEdge.dest_id, DbKgEdge.category, DbKgEdge.name)
    edges_merge = []
    edges_invalid = []
    edges_insert = []
    for merge in mergeData:
        print("merge from %d to %d" % (merge.src_id, merge.dest_id))

        # Edges that pointed at the source node must point at the target node.
        incoming = db.query(*edge_columns).filter(DbKgEdge.dest_id == merge.src_id, DbKgEdge.status == 0).all()
        for edge_id, src_id, dest_id, category, name in incoming:
            edges_merge.append({"id": edge_id, "in_out": "src_in", "old_dest_id": dest_id, "src_id": src_id, "dest_id": merge.dest_id, "category": category, "name": name, "status": 0})

        # Edges leaving the source node must leave from the target node.
        outgoing = db.query(*edge_columns).filter(DbKgEdge.src_id == merge.src_id, DbKgEdge.status == 0).all()
        for edge_id, src_id, dest_id, category, name in outgoing:
            edges_merge.append({"id": edge_id, "in_out": "src_out", "old_src_id": src_id, "src_id": merge.dest_id, "dest_id": dest_id, "category": category, "name": name, "status": 0})

        # Edges directly between the two nodes are marked invalid (-1).
        direct_pairs = (
            ("out", merge.src_id, merge.dest_id),
            ("in", merge.dest_id, merge.src_id),
        )
        for tag, from_id, to_id in direct_pairs:
            direct = db.query(*edge_columns).filter(and_(DbKgEdge.src_id == from_id, DbKgEdge.dest_id == to_id, DbKgEdge.status == 0)).all()
            for edge_id, src_id, dest_id, category, name in direct:
                edges_invalid.append({"id": edge_id, "in_out": tag, "src_id": src_id, "dest_id": dest_id, "category": category, "name": name, "status": -1})

        # Record the merge itself as an edge.
        edges_insert.append({"src_id": merge.src_id, "dest_id": merge.dest_id, "category": 'MERGE_TO', "name": 'MERGE', "status": 0, 'version': "1.0"})

    edges = []
    for edge in edges_merge:
        if edge['dest_id'] == edge['src_id']:
            print("circle detected, skip")
            continue
        print("edge merged %d to %d %s-%s" % (edge['src_id'], edge['dest_id'], edge['category'], edge['name']))
        count = db.query(DbKgEdge).filter(DbKgEdge.src_id==edge['src_id'], DbKgEdge.dest_id==edge['dest_id'], DbKgEdge.category==edge['category']).count()

        if count > 0:
            print("can not move edge because of target has same edge already existed")
            edge['status'] = -1
            # Restore the original endpoint so the log line below shows the
            # edge as it is stored.
            if edge["in_out"] == "src_in":
                edge["dest_id"] = edge["old_dest_id"]
            if edge["in_out"] == "src_out":
                edge["src_id"] = edge["old_src_id"]
            edges_invalid.append(edge)
        else:
            # Write both endpoints: "src_out" edges change src_id, and
            # updating dest_id alone left them attached to the old node.
            db.query(DbKgEdge).filter(DbKgEdge.id == edge['id']).update({'src_id': edge['src_id'], 'dest_id': edge['dest_id']})

    for edge in edges_insert:
        print("edge inserted %d to %d %s-%s" % (edge['src_id'], edge['dest_id'], edge['category'], edge['name']))
        count = db.query(DbKgEdge).filter(DbKgEdge.src_id==edge['src_id'], DbKgEdge.dest_id==edge['dest_id'], DbKgEdge.category==edge['category']).count()
        if count > 0:
            print("can insert edge because of edge already existed")
            continue
        edgeData = DbKgEdge()
        edgeData.src_id = edge['src_id']
        edgeData.dest_id = edge['dest_id']
        edgeData.category = edge['category']
        edgeData.name = edge['name']
        edgeData.status = edge['status']
        edgeData.version = edge['version']
        db.add(edgeData)

    for edge in edges_invalid:
        print("edge invalid %d to %d %s-%s" % (edge['src_id'], edge['dest_id'], edge['category'], edge['name']))
        db.query(DbKgEdge).filter(DbKgEdge.id == edge['id']).update({'status':edge['status']})
    db.commit()
    return resp_200(data= {"edges": edges, "error_code": 0, "error_msg": ""})
- ################################### EDGE ###############################################
@router.get("/edges/c/{category}")
def read_links_by_category(category: str, db: Session = Depends(get_db)):
    """List the distinct (category, name) pairs used by edges.

    Args:
        category: Path parameter; currently not applied to the query.
            NOTE(review): the route suggests filtering by category - confirm
            with API consumers before restricting the result.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``{"records": [...]}``.
    """
    # A query object is never None, so the former 404 branch could not
    # trigger; no rows simply produce an empty list.
    edge_names = (
        db.query(DbKgEdge.category, DbKgEdge.name)
        .group_by(DbKgEdge.category, DbKgEdge.name)
        .all()
    )
    names_list = [KgEdgeName.model_validate(row).model_dump() for row in edge_names]
    return resp_200(data={"records": names_list})
@router.post("/edge-create", response_model=KgEdge)
def create_edge(edges: List[KgEdgeCreate], db: Session = Depends(get_db)):
    """Create edges, re-activating matching ones that already exist.

    An edge whose graph, endpoints and category match existing rows sets
    those rows' status to 0 instead of inserting a duplicate. Self-loops
    (``src_id == dest_id``) are ignored.

    Args:
        edges: Edge payloads to create.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``error_code`` (0 on success, -1
        on failure), ``error_msg`` and ``edges``: all stored edges between
        every affected endpoint pair, without embedded endpoint nodes.
    """
    code = 0
    error_msg = ""
    stored_edges = []
    try:
        new_rows = []
        touched_pairs = []
        for edge in edges:
            if edge.src_id == edge.dest_id:
                continue
            existing = db.query(DbKgEdge).filter(DbKgEdge.graph_id==edge.graph_id, DbKgEdge.src_id==edge.src_id, DbKgEdge.dest_id==edge.dest_id, DbKgEdge.category == edge.category).all()
            if existing:
                for ex_edge in existing:
                    ex_edge.status = 0
            else:
                new_rows.append(DbKgEdge(**edge.model_dump()))
            # Always remember the endpoint pair. Recording the bare row id
            # for existing edges broke the pair unpacking in the read-back.
            pair = (edge.src_id, edge.dest_id)
            if pair not in touched_pairs:
                touched_pairs.append(pair)

        db.add_all(new_rows)
        db.commit()

        for src_id, dest_id in touched_pairs:
            rows = db.query(DbKgEdge).filter(and_(DbKgEdge.src_id==src_id, DbKgEdge.dest_id==dest_id)).all()
            for row in rows:
                edge_raw = KgEdge.model_validate(row).model_dump()
                edge_raw.pop("src_node", None)
                edge_raw.pop("dest_node", None)
                stored_edges.append(edge_raw)
    except Exception as e:
        # Failures are reported in the payload; roll back so the session is
        # not left in a failed transaction.
        db.rollback()
        error_msg = str(e)
        code = -1

    return resp_200(data= {"error_code": code, "error_msg": error_msg, "edges": stored_edges})
@router.get("/edge-of-node/{node_id}")
def get_edge_by_node(node_id:int, db: Session = Depends(get_db)):
    """Return every edge that starts or ends at a node, ordered by edge id.

    Args:
        node_id: Id of the node whose edges are requested.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``{"edges": [...]}``.
    """
    touches_node = or_(DbKgEdge.src_id == node_id, DbKgEdge.dest_id==node_id)
    rows = db.query(DbKgEdge).filter(touches_node).order_by(DbKgEdge.id).all()
    validated = [KgEdge.model_validate(row) for row in rows]
    dumped = [item.model_dump() for item in validated]
    return resp_200(data={"edges": dumped})
@router.get("/edge-delete/{edge_id}/{status}")
def delete_edge(edge_id:int, status:int, db: Session = Depends(get_db)):
    """Change an edge's status or remove the edge.

    Args:
        edge_id: Id of the edge to change.
        status: 0 or -1 stores that value as the edge status; -99 deletes
            the row. Any other value changes nothing.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping the edge id, ``error_code`` 0 and
        a fixed message.
    """
    target = db.query(DbKgEdge).filter(DbKgEdge.id == edge_id)
    if status in (0, -1):
        target.update({"status": status })
        db.commit()
    elif status == -99:
        # Physical removal of the row.
        target.delete()
        db.commit()

    return resp_200(data={"id":edge_id,'error_code':0, 'message':'Edge status updated'})
- ################################### SUB GRAPH ##########################################
@router.get("/workspace-load")
def get_all_sub_graph(db: Session = Depends(get_db)):
    """Return every sub-graph whose status is 0.

    Args:
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``{"graphs": [...]}``.
    """
    rows = db.query(DbKgSubGraph).filter(DbKgSubGraph.status == 0).all()
    validated = [KgSubGraph.model_validate(row) for row in rows]
    graphs = [item.model_dump() for item in validated]
    return resp_200(data={"graphs": graphs})
@router.get("/workspace-get/{graph_id}")
def sub_graph_load(graph_id: int, db: Session = Depends(get_db)):
    """Load a saved sub-graph: its nodes and the active edges among them.

    The sub-graph's ``graph_content`` JSON lists node ids under ``nodes``.
    Nodes that still exist are returned together with every status-0 edge
    whose source and destination are both among those nodes.

    Args:
        graph_id: Id of the sub-graph row.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``nodes`` and ``edges``; both are
        empty when the sub-graph does not exist or lists no nodes.
    """
    graph_data = db.query(DbKgSubGraph).filter(DbKgSubGraph.id == graph_id).first()
    nodes = []
    edges = []
    if graph_data and graph_data.graph_content:
        json_data = json.loads(graph_data.graph_content)
        requested_ids = [item["id"] for item in json_data.get("nodes", [])]
        if requested_ids:
            node_rows = db.query(DbKgNode).filter(DbKgNode.id.in_(requested_ids)).all()
            nodes = [KgNode.model_validate(row) for row in node_rows]
            found_ids = [row.id for row in node_rows]
            if found_ids:
                # One query for edges fully inside the node set. Merging
                # separate "incoming" and "outgoing" result lists returned
                # every such edge twice.
                edge_rows = db.query(DbKgEdge).filter(
                    DbKgEdge.src_id.in_(found_ids),
                    DbKgEdge.dest_id.in_(found_ids),
                    DbKgEdge.status == 0,
                ).all()
                edges = [KgEdge.model_validate(row) for row in edge_rows]

    return resp_200(data={"nodes":[node.model_dump() for node in nodes],"edges":[edge.model_dump() for edge in edges]})
- #TODO : retrieve all data and edges
@router.post("/workspace-update")
def update_sub_graph(data:KgSubGraph, db:Session = Depends(get_db)):
    """Overwrite the name and content of an existing sub-graph.

    Args:
        data: Payload; ``id``, ``graph_name`` and ``graph_content`` are read.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping the id with ``error_code`` 0.
    """
    new_values = {'graph_name':data.graph_name,'graph_content': data.graph_content}
    target = db.query(DbKgSubGraph).filter(DbKgSubGraph.id == data.id)
    target.update(new_values)
    db.commit()
    return resp_200(data= {"id": data.id, "error_code": 0, "error_msg": ""})
@router.post("/workspace-create")
def create_sub_graph(data:KgSubGraphCreate,db: Session = Depends(get_db)):
    """Create a sub-graph unless an active one with the same name exists.

    Args:
        data: Payload; ``graph_name``, ``graph_content`` and ``status`` are
            read.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping the new id with ``error_code`` 0,
        or id 0 with ``error_code`` 1 when a status-0 sub-graph already uses
        the name.
    """
    same_name = db.query(DbKgSubGraph).filter(
        DbKgSubGraph.graph_name == data.graph_name, DbKgSubGraph.status==0)
    if same_name.count() > 0:
        return resp_200(data= {"id": 0, "error_code": 1, "error_msg": "Graph already existed"})

    record = DbKgSubGraph()
    record.graph_name = data.graph_name
    record.graph_content = data.graph_content
    record.status = data.status

    db.add(record)
    db.commit()
    # Reload to obtain the generated id.
    db.refresh(record)
    return resp_200(data= {"id": record.id, "error_code": 0, "error_msg": ""})
@router.get("/workspace-delete/{graph_id}")
def sub_graph_delete(graph_id: int, db: Session = Depends(get_db)):
    """Delete the sub-graph with the given id.

    Args:
        graph_id: Id of the sub-graph row to remove.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping the id with ``error_code`` 0.
    """
    target = db.query(DbKgSubGraph).filter(DbKgSubGraph.id == graph_id)
    target.delete()
    db.commit()
    return resp_200(data= {"id": graph_id, "error_code": 0, "error_msg": ""})
@router.post("/workspace-validate")
def sub_graph_validate(data:KgSubGraphCreate, db: Session = Depends(get_db)):
    """Resolve a sub-graph payload against the stored nodes and edges.

    The payload's ``graph_content`` JSON lists node ids under ``nodes``.
    Nodes that exist are returned with every status-0 edge (ordered by id)
    whose source and destination are both among those nodes. Edges are
    returned without their embedded endpoint nodes.

    Args:
        data: Payload; only ``graph_content`` is read.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``error_code`` 0, an empty
        ``error_msg``, ``nodes`` and ``edges``.
    """
    nodes = []
    edges = []
    if data.graph_content:
        json_data = json.loads(data.graph_content)
        requested_ids = [item["id"] for item in json_data.get("nodes", [])]
        if requested_ids:
            node_rows = db.query(DbKgNode).filter(DbKgNode.id.in_(requested_ids)).all()
            nodes = [KgNode.model_validate(row) for row in node_rows]
            found_ids = [row.id for row in node_rows]
            if found_ids:
                # One query restricted to both endpoints replaces fetching
                # all edges touching the nodes and filtering in Python.
                edge_rows = db.query(DbKgEdge).filter(
                    DbKgEdge.src_id.in_(found_ids),
                    DbKgEdge.dest_id.in_(found_ids),
                    DbKgEdge.status == 0,
                ).order_by(DbKgEdge.id).all()
                for row in edge_rows:
                    edge_payload = KgEdge.model_validate(row).model_dump()
                    edge_payload.pop("src_node", None)
                    edge_payload.pop("dest_node", None)
                    edges.append(edge_payload)

    return resp_200(data={ "error_code": 0, "error_msg": "", "nodes":[node.model_dump() for node in nodes],"edges":edges})
- ################################### DICT ###############################################
@router.get("/dict/icd/{page}/{page_size}")
def read_icd_page(page:int, page_size:int, db:Session=Depends(get_db)):
    """Return one page of the ICD dictionary.

    Args:
        page: 1-based page number.
        page_size: Maximum number of records on a page.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``total``, ``pages`` (the
        requested page number), ``size`` and the page ``records``.

    Raises:
        HTTPException: 400 when ``page`` or ``page_size`` is not positive.
    """
    # Guard against a negative offset and a meaningless zero-sized page.
    if page < 1 or page_size < 1:
        raise HTTPException(status_code=400, detail="page and page_size must be positive")

    count = db.query(DbDictICD).count()
    # Zero-based offset; pages past the end simply contain no records.
    start = (page - 1) * page_size
    results = db.query(DbDictICD).limit(page_size).offset(start).all()
    records = [DictICD.model_validate(row).model_dump() for row in results]

    return resp_200(data={"total": count, "pages": page, "size": page_size, "records": records})
@router.get("/dict/icd/search/{page}/{page_size}/{name}")
def search_icd(page:int, page_size:int, name:str, db:Session=Depends(get_db)):
    """Return one page of ICD entries whose name contains ``name``.

    Args:
        page: 1-based page number.
        page_size: Maximum number of records on a page.
        name: Substring matched against ``icd_name`` with SQL ``LIKE``.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``total`` (number of matches),
        ``pages`` (the requested page number), ``size`` and ``records``.

    Raises:
        HTTPException: 400 when ``page`` or ``page_size`` is not positive.
    """
    # Guard against a negative offset and a meaningless zero-sized page.
    if page < 1 or page_size < 1:
        raise HTTPException(status_code=400, detail="page and page_size must be positive")

    matches = db.query(DbDictICD).filter(DbDictICD.icd_name.like("%"+name+"%"))
    count = matches.count()
    # Zero-based offset; pages past the end simply contain no records.
    start = (page - 1) * page_size
    results = matches.limit(page_size).offset(start).all()
    records = [DictICD.model_validate(row).model_dump() for row in results]

    return resp_200(data={"total": count, "pages": page, "size": page_size, "records": records})
@router.get("/dict/drg/{page}/{page_size}")
def get_drg_page(page:int, page_size:int, db:Session=Depends(get_db)):
    """Return one page of the DRG dictionary.

    Args:
        page: 1-based page number.
        page_size: Maximum number of records on a page.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``total``, ``pages`` (the
        requested page number), ``size`` and the page ``records``.

    Raises:
        HTTPException: 400 when ``page`` or ``page_size`` is not positive.
    """
    # Guard against a negative offset and a meaningless zero-sized page.
    if page < 1 or page_size < 1:
        raise HTTPException(status_code=400, detail="page and page_size must be positive")

    count = db.query(DbDictDRG).count()
    # Zero-based offset; pages past the end simply contain no records.
    start = (page - 1) * page_size
    results = db.query(DbDictDRG).limit(page_size).offset(start).all()
    records = [DictDRG.model_validate(row).model_dump() for row in results]

    return resp_200(data={"total": count, "pages": page, "size": page_size, "records": records})
@router.get("/drg/search/{page}/{page_size}/{name}")
def search_drg(page:int, page_size:int, name:str, db:Session=Depends(get_db)):
    """Return one page of DRG entries whose name contains ``name``.

    Args:
        page: 1-based page number.
        page_size: Maximum number of records on a page.
        name: Substring matched against ``drg_name`` with SQL ``LIKE``.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``total`` (number of matches),
        ``pages`` (the requested page number), ``size`` and ``records``.

    Raises:
        HTTPException: 400 when ``page`` or ``page_size`` is not positive.
    """
    # Guard against a negative offset and a meaningless zero-sized page.
    if page < 1 or page_size < 1:
        raise HTTPException(status_code=400, detail="page and page_size must be positive")

    matches = db.query(DbDictDRG).filter(DbDictDRG.drg_name.like("%"+name+"%"))
    count = matches.count()
    # Zero-based offset; pages past the end simply contain no records.
    start = (page - 1) * page_size
    results = matches.limit(page_size).offset(start).all()
    records = [DictDRG.model_validate(row).model_dump() for row in results]

    return resp_200(data={"total": count, "pages": page, "size": page_size, "records": records})
@router.get("/dict/drug/{page}/{page_size}")
def read_drug_page(page:int, page_size:int, db:Session=Depends(get_db)):
    """Return one page of the drug dictionary.

    Args:
        page: 1-based page number.
        page_size: Maximum number of records on a page.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``total``, ``pages`` (the
        requested page number), ``size`` and the page ``records``.

    Raises:
        HTTPException: 400 when ``page`` or ``page_size`` is not positive.
    """
    # Guard against a negative offset and a meaningless zero-sized page.
    if page < 1 or page_size < 1:
        raise HTTPException(status_code=400, detail="page and page_size must be positive")

    count = db.query(DbDictDrug).count()
    # Zero-based offset; pages past the end simply contain no records.
    start = (page - 1) * page_size
    results = db.query(DbDictDrug).limit(page_size).offset(start).all()
    records = [DictDrug.model_validate(row).model_dump() for row in results]

    return resp_200(data={"total": count, "pages": page, "size": page_size, "records": records})
@router.get("/dict/drug/search/{page}/{page_size}/{name}")
def search_drg(page:int, page_size:int, name:str, db:Session=Depends(get_db)):
    """Return one page of drugs whose name or manufacturer contains ``name``.

    NOTE(review): this handler shares the function name ``search_drg`` with
    the DRG search handler; the name is kept so existing imports still work.

    Args:
        page: 1-based page number.
        page_size: Maximum number of records on a page.
        name: Substring matched with SQL ``LIKE`` against ``reg_name`` or
            ``prod_factory``.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The value of ``resp_200`` wrapping ``total`` (number of matches),
        ``pages`` (the requested page number), ``size`` and ``records``.

    Raises:
        HTTPException: 400 when ``page`` or ``page_size`` is not positive.
    """
    # Guard against a negative offset and a meaningless zero-sized page.
    if page < 1 or page_size < 1:
        raise HTTPException(status_code=400, detail="page and page_size must be positive")

    pattern = "%"+name+"%"
    matches = db.query(DbDictDrug).filter(or_(DbDictDrug.reg_name.like(pattern), DbDictDrug.prod_factory.like(pattern)))
    count = matches.count()
    # Zero-based offset; pages past the end simply contain no records.
    start = (page - 1) * page_size
    results = matches.limit(page_size).offset(start).all()
    records = [DictDrug.model_validate(row).model_dump() for row in results]

    return resp_200(data={"total": count, "pages": page, "size": page_size, "records": records})
- #######################################################################################
- '''
- @router.put("/api/nodes/{node_id}", response_model=KgNode)
- def update_node(node_id: int, node: KgNodeCreate, db: Session = Depends(get_db)):
- db_node = db.query(DbKgNode).filter(DbKgNode.id == node_id).first()
- if db_node is None:
- raise HTTPException(status_code=404, detail="Node not found")
- for key, value in node.dict().items():
- setattr(db_node, key, value)
- db.commit()
- db.refresh(db_node)
- return db_node
- @router.get("/api/edges/{edge_id}", response_model=KgEdge)
- def read_edge(edge_id: int, db: Session = Depends(get_db)):
- db_edge = db.query(DbKgEdge).filter(DbKgEdge.id == edge_id).first()
- if db_edge is None:
- raise HTTPException(status_code=404, detail="Edge not found")
- return db_edge
- @router.put("/api/edges/{edge_id}", response_model=KgEdge)
- def update_edge(edge_id: int, edge: KgEdgeCreate, db: Session = Depends(get_db)):
- db_edge = db.query(DbKgEdge).filter(DbKgEdge.id == edge_id).first()
- if db_edge is None:
- raise HTTPException(status_code=404, detail="Edge not found")
- for key, value in edge.dict().items():
- setattr(db_edge, key, value)
- db.commit()
- db.refresh(db_edge)
- return db_edge
- @router.delete("/api/edges/{edge_id}", response_model=KgEdge)
- def delete_edge(edge_id: int, db: Session = Depends(get_db)):
- db_edge = db.query(DbKgEdge).filter(DbKgEdge.id == edge_id).first()
- if db_edge is None:
- raise HTTPException(status_code=404, detail="Edge not found")
- db.delete(db_edge)
- db.commit()
- return db_edge
- @router.post("/api/props/", response_model=KgProp)
- def create_prop(prop: KgPropCreate, db: Session = Depends(get_db)):
- db_prop = DbKgProp(**prop.dict())
- db.add(db_prop)
- db.commit()
- db.refresh(db_prop)
- return db_prop
- @router.get("/api/props/{prop_id}", response_model=KgProp)
- def read_prop(prop_id: int, db: Session = Depends(get_db)):
- db_prop = db.query(models.KgProp).filter(models.KgProp.id == prop_id).first()
- if db_prop is None:
- raise HTTPException(status_code=404, detail="Property not found")
- return db_prop
- @router.put("/api/props/{prop_id}", response_model=KgProp)
- def update_prop(prop_id: int, prop: KgPropCreate, db: Session = Depends(get_db)):
- db_prop = db.query(models.KgProp).filter(models.KgProp.id == prop_id).first()
- if db_prop is None:
- raise HTTPException(status_code=404, detail="Property not found")
- for key, value in prop.dict().items():
- setattr(db_prop, key, value)
- db.commit()
- db.refresh(db_prop)
- return db_prop
- @router.delete("/api/props/{prop_id}", response_model=KgProp)
- def delete_prop(prop_id: int, db: Session = Depends(get_db)):
- db_prop = db.query(models.KgProp).filter(models.KgProp.id == prop_id).first()
- if db_prop is None:
- raise HTTPException(status_code=404, detail="Property not found")
- db.delete(db_prop)
- db.commit()
- return db_prop
- '''
# Alias under which this module's router is exposed.
graph_mgr_router = router
|