graph_mgr_router.py 38 KB


  1. from fastapi import APIRouter, Depends, HTTPException
  2. from sqlalchemy.orm import Session
  3. from sqlalchemy.sql import select, or_, and_, func,distinct
  4. from agent.models.web.graph import *
  5. from agent.models.db.graph import *
  6. from db.database import get_db
  7. from agent.libs.response import resp_200
  8. from typing import List
  9. from math import ceil
  10. import json
  11. from datetime import datetime
# Router that every endpoint in this module registers on; all paths are
# served under the "/graph-mgr" prefix.
router = APIRouter(prefix="/graph-mgr", tags=["agent job interface"])
  13. def nodes_append(list, node):
  14. for n in list:
  15. if n['id'] == node["id"]:
  16. return
  17. list.append(node)
  18. ################################### GRAPH ###############################################
  19. @router.get("/graph-list/{page}/{page_size}")
  20. def graph_list(page: int, page_size: int, db: Session = Depends(get_db)):
  21. count = db.query(DbKgGraphs).count()
  22. total_page = ceil(count / page_size)
  23. start = 1
  24. if page <= total_page:
  25. start = (page - 1) * page_size
  26. results = db.query(DbKgGraphs).limit(page_size).offset(start).all()
  27. codes = [KgGraphs.model_validate(node) for node in results]
  28. return resp_200(
  29. data={"total": count, "pages": page, "size": page_size, "records": [item.model_dump() for item in codes]})
  30. @router.get("/graph-schema/{graph_id}")
  31. def graph_list(graph_id:int, db: Session = Depends(get_db)):
  32. data = db.query(DbKgGraphs).filter(DbKgGraphs.id == graph_id).first()
  33. settings = json.loads(data.graph_settings)
  34. schema_id = settings['schema_id']
  35. schema_data = db.query(DbKgSchemas).filter(DbKgSchemas.id == schema_id).first()
  36. schema_content = json.loads(schema_data.content)
  37. return resp_200(data=schema_content)
  38. @router.post("/graph-create")
  39. def create_graph(data: KgGraphCreate, db: Session = Depends(get_db)):
  40. graph_settings = { "schema_id": data.schema_id }
  41. db_node = DbKgGraphs()
  42. db_node.name = data.name
  43. db_node.category = data.category
  44. db_node.graph_description = data.graph_description
  45. db_node.graph_settings = json.dumps(graph_settings)
  46. db_node.created = datetime.now()
  47. db_node.updated = datetime.now()
  48. db.add(db_node)
  49. db.commit()
  50. db.refresh(db_node)
  51. return resp_200(data={'error_code': 0, 'error_message':'' ,'data':KgGraphs.model_validate(db_node).model_dump()})
  52. @router.get("/graph-search/{query}")
  53. def search_projects(query:str, db: Session = Depends(get_db)):
  54. result = db.query(DbKgGraphs).filter(DbKgGraphs.status>=0,
  55. DbKgGraphs.name.ilike('%'+query+'%')).all()
  56. data = [KgGraphs.model_validate(item) for item in result]
  57. graphs = [d.model_dump() for d in data]
  58. for proj in graphs:
  59. proj.pop("graph_settings")
  60. return resp_200(data={'records': graphs})
  61. # @router.get("/api/graph-schema/{graph_id}")
  62. # def get_graph_schema(graph_id: int, db: Session = Depends(get_db)):
  63. # graph_data = db.query(DbKgGraphs).filter(DbKgGraphs.id == graph_id).first()
  64. # if graph_data:
  65. # settings = json.loads(graph_data.graph_settings)
  66. # if settings:
  67. # schema_id = settings['schema_id']
  68. # schema_data = db.query(DbKgSchemas).filter(DbKgSchemas.id == schema_id)
  69. # if schema_data:
  70. # schema_content_data = json.loads(schema_data.content)
  71. # return resp_200(data=schema_content_data)
  72. # return resp_200(data={'entities': [], 'relations': []})
  73. ################################### SCHEMA ###############################################
  74. @router.get("/schemas/{page}/{page_size}")
  75. def read_schemas(page:int, page_size:int, db: Session = Depends(get_db)):
  76. count = db.query(DbKgSchemas).count()
  77. total_page = ceil(count / page_size)
  78. start = 1
  79. if page <= total_page:
  80. start = (page-1) * page_size
  81. results = db.query(DbKgSchemas).limit(page_size).offset(start).all()
  82. codes = [KgSchemas.model_validate(node) for node in results]
  83. return resp_200(data= {"total":count, "pages": page, "size":page_size, "records":[item.model_dump() for item in codes]})
  84. @router.get("/schemas-search/{sys_name}")
  85. def read_all_schemas(sys_name: str, db: Session = Depends(get_db)):
  86. results = None
  87. if sys_name == 'all':
  88. results = db.query(DbKgSchemas).all()
  89. else:
  90. results = db.query(DbKgSchemas).filter(DbKgSchemas.schema_system.is_(sys_name)).all()
  91. schema_systems = [KgSchemas.model_validate(node) for node in results]
  92. return resp_200(data= {"total":len(schema_systems), "pages": 1, "size":len(schema_systems), "records":[item.model_dump() for item in schema_systems]})
  93. @router.get("/schemas-load/{schema_id}")
  94. def load_schemas(schema_id: int, db: Session = Depends(get_db)):
  95. results = None
  96. schema = db.query(DbKgSchemas).filter(DbKgSchemas.id == schema_id).first()
  97. if schema.schema_system == 'GraphWork' and schema.schema_type =='GW.Schema':
  98. #this is graph work schema system
  99. schema_data = json.loads(schema.content)
  100. return resp_200(data=schema_data)
  101. return resp_200(data={'entities': [], 'relations': []})
  102. # if sys_type == 'entity' or sys_type=='relation':
  103. # results = db.query(DbKgSchemas).filter(DbKgSchemas.schema_system==sys_name,DbKgSchemas.schema_type==sys_type).all()
  104. # else:
  105. # results = db.query(DbKgSchemas).filter(DbKgSchemas.schema_system==sys_name).all()
  106. #
  107. # schemas = [KgSchemas.model_validate(node) for node in results]
  108. # entities = []
  109. # relations = []
  110. # records = [item.model_dump() for item in schemas]
  111. # try:
  112. # for data in records:
  113. # if data['schema_type'] == 'ENTITY':
  114. # content = data['content']
  115. # props = []
  116. # lines = content.split("\n")
  117. # for line in lines:
  118. # line.strip()
  119. # if len(line) > 1:
  120. # parts = line.split("|")
  121. # if len(parts)<2:
  122. # parts.append(parts[0])
  123. # props.append({"category":1, "prop_title":parts[1], "prop_name": parts[0], "prop_value":parts[1]})
  124. # data['props'] = props
  125. # entities.append(data)
  126. # if data['schema_type'] == 'RELATION':
  127. # relations.append(data)
  128. # except Exception as e:
  129. # print("Exception", e)
  130. # return resp_200(data= {'entities':entities, 'relations': relations})
  131. @router.get("/schemas-get/{page_size}")
  132. def get_schema_definitions(page_size:int, db: Session = Depends(get_db)):
  133. results = db.query(DbKgSchemas).limit(page_size)
  134. schemas = [KgSchemas.model_validate(node) for node in results]
  135. records = [item.model_dump() for item in schemas]
  136. try:
  137. for data in records:
  138. content = data['content']
  139. props = []
  140. lines = content.split("\n")
  141. for line in lines:
  142. line.strip()
  143. if len(line) > 1:
  144. parts = line.split("|")
  145. if len(parts)<2:
  146. parts.append(parts[0])
  147. props.append({"category":1, "prop_title":parts[1], "prop_name": parts[0], "prop_value":parts[1]})
  148. data['props'] = props
  149. except Exception as e:
  150. print("Exception", e)
  151. return resp_200(data= {"total":len(schemas), "pages": 1, "size":page_size, "records": records})
  152. @router.post("/schemas-create")
  153. def create_schemas(data :KgSchemasCreate, db: Session = Depends(get_db)):
  154. db_node = DbKgSchemas(**data.model_dump())
  155. db_node.schema_system = 'DEFAULT'
  156. db_node.schema_type = 'ENTITY'
  157. db.add(db_node)
  158. db.commit()
  159. db.refresh(db_node)
  160. return resp_200(data= KgSchemas.model_validate(db_node).model_dump())
  161. @router.post("/schemas-update")
  162. def update_schemas(data :KgSchemasUpdate, db: Session = Depends(get_db)):
  163. db_node = DbKgSchemas(**data.model_dump())
  164. db.query(DbKgSchemas).filter(DbKgSchemas.id == db_node.id).update({'name': db_node.name, 'content':db_node.content, 'category':db_node.category, 'version': db_node.version })
  165. db.commit()
  166. db_node = db.query(DbKgSchemas).filter(DbKgSchemas.id == db_node.id).first()
  167. content = db_node.content
  168. props = []
  169. lines = content.split("\n")
  170. for line in lines:
  171. line.strip()
  172. if len(line) > 1:
  173. parts = line.split("|")
  174. if len(parts)>1:
  175. new_title = parts[1]
  176. db.query(DbKgProp).filter(DbKgProp.prop_name==parts[0]).update({'prop_title': new_title})
  177. print("update title ",new_title)
  178. db.commit()
  179. return resp_200(data= KgSchemas.model_validate(db_node).model_dump())
  180. @router.get("/schemas-delete/{id}")
  181. def delete_schemas(id: int, db: Session = Depends(get_db)):
  182. db.query(DbKgSchemas).filter(DbKgSchemas.id == id).delete()
  183. db.commit()
  184. return resp_200(data= {"id": id})
  185. ################################### NODE ###############################################
  186. @router.get("/node-category/{graph_id}/{category}/{limit}")
  187. def read_node_by_category(graph_id: int, category: str, limit:int, db: Session = Depends(get_db)):
  188. db_nodes = db.query(DbKgNode).filter(DbKgNode.category==category, DbKgNode.graph_id==graph_id).limit(limit)
  189. if db_nodes is None:
  190. raise HTTPException(status_code=404, detail="Node not found")
  191. node_list = [KgNode.model_validate(node) for node in db_nodes]
  192. return resp_200(data={"nodes":[node.model_dump() for node in node_list], "edges":[]})
  193. @router.get("/node-delete/{graph_id}/{node_id}", response_model=KgNode)
  194. def delete_node(graph_id: int, node_id: int, db: Session = Depends(get_db)):
  195. db_node = db.query(DbKgNode).filter(DbKgNode.graph_id == graph_id, DbKgNode.id == node_id).first()
  196. if db_node is None:
  197. raise HTTPException(status_code=404, detail="Node not found")
  198. edges_out = db.query(DbKgEdge).filter(DbKgEdge.src_id == node_id).delete()
  199. edges_in= db.query(DbKgEdge).filter(DbKgEdge.dest_id == node_id).delete()
  200. db.delete(db_node)
  201. db.commit()
  202. return resp_200(data= {"id": node_id})
  203. @router.post("/node-create")
  204. def create_node(graph_id: int, node: KgNode, db: Session = Depends(get_db)):
  205. count = db.query(DbKgNode).filter(DbKgNode.category == node.category, DbKgNode.name == node.name).count()
  206. if count > 0:
  207. return resp_200(data= {"id": 0, "error_code": 1, "error_msg": "Node already existed"})
  208. db_node = DbKgNode()
  209. db_node.graph_id = node.graph_id
  210. db_node.category = node.category
  211. db_node.name = node.name
  212. db_node.layout = ''
  213. db_node.version = '1.0'
  214. db.add(db_node)
  215. db.commit()
  216. db.refresh(db_node)
  217. print(db_node)
  218. kg_props = []
  219. for prop in node.props:
  220. p = DbKgProp()
  221. p.ref_id = db_node.id
  222. p.category = 1
  223. p.prop_name = prop.prop_name
  224. p.prop_title = prop.prop_title
  225. p.prop_value = prop.prop_value
  226. kg_props.append(p)
  227. db.add_all(kg_props)
  228. db.commit()
  229. return resp_200(data= {"id": db_node.id, "error_code": 0, "error_msg": ""})
  230. @router.get("/nodes/summary/{graph_id}")
  231. def get_node_summary(graph_id: int=0, db:Session=Depends(get_db)):
  232. results = db.query(DbKgNode.category, func.count(1)).group_by(DbKgNode.category).all()
  233. ret = []
  234. for r in results:
  235. category, count = r
  236. ret.append({"category":category, "count":count})
  237. return resp_200(data=ret)
  238. @router.get("/nodes/search/{graph_id}/{node_name}/{in_out}/{deepth}")
  239. def search_node(graph_id:int, node_name: str, in_out: int =0, deepth: int=1, db: Session= Depends(get_db)):
  240. '''
  241. in_out: 0=只有节点 1=节点及进入节点的边 2=节点及出节点的边 3=节点及进出节点的边
  242. deepth: 深度,递归搜索的深度,目前只支持1
  243. '''
  244. if node_name.startswith("-"):
  245. query = db.query(DbKgNode).filter(DbKgNode.graph_id==graph_id, DbKgNode.name==node_name[1:])
  246. else:
  247. query = db.query(DbKgNode).filter(DbKgNode.graph_id==graph_id, DbKgNode.name.ilike('%'+node_name+'%'))
  248. #print("原生SQL:", query.statement.compile(compile_kwargs={"literal_binds": True}))
  249. db_nodes = query.all()
  250. if db_nodes is None:
  251. raise HTTPException(status_code=404, detail="Node not found")
  252. #print (f'%{node_name}%')
  253. #print('sql query result', db_nodes)
  254. nodes = []
  255. edges = []
  256. in_count = 0
  257. out_count = 0
  258. for node in db_nodes:
  259. nodes_append(nodes, KgNode.model_validate(node).model_dump())
  260. if in_out == 2 or in_out == 3:
  261. print("select out edges")
  262. count = db.query(DbKgEdge).filter(DbKgEdge.graph_id==graph_id, and_(DbKgEdge.src_id == node.id, DbKgEdge.status == 0)).count()
  263. if count > 0:
  264. in_count = count
  265. results = db.query(DbKgEdge).filter(DbKgEdge.graph_id==graph_id, and_(DbKgEdge.src_id == node.id, DbKgEdge.status == 0)).limit(10)
  266. edges_list = [KgEdge.model_validate(n) for n in results]
  267. edges_raw = [n.model_dump() for n in edges_list]
  268. for edge in edges_raw:
  269. nodes_append(nodes, edge["src_node"])
  270. nodes_append(nodes, edge["dest_node"])
  271. data = edge
  272. data.pop("src_node", None)
  273. data.pop("dest_node", None)
  274. nodes_append(edges, data)
  275. if in_out == 1 or in_out == 3:
  276. print("select in edges")
  277. count = db.query(DbKgEdge).filter(DbKgEdge.graph_id==graph_id, and_(DbKgEdge.dest_id == node.id, DbKgEdge.status == 0)).count()
  278. if count > 0:
  279. out_count = count
  280. results = db.query(DbKgEdge).filter(DbKgEdge.graph_id==graph_id, and_(DbKgEdge.dest_id == node.id, DbKgEdge.status == 0)).limit(10)
  281. edges_list = [KgEdge.model_validate(n) for n in results]
  282. edges_raw = [n.model_dump() for n in edges_list]
  283. for edge in edges_raw:
  284. nodes_append(nodes, KgNode.model_validate(edge["src_node"]).model_dump())
  285. nodes_append(nodes, KgNode.model_validate(edge["dest_node"]).model_dump())
  286. data = edge
  287. data.pop("src_node", None)
  288. data.pop("dest_node", None)
  289. nodes_append(edges, data)
  290. return resp_200(data={"summary":{"count_in": in_count, "count_out":out_count},"nodes":nodes, "edges":edges})
  291. @router.get("/nodes/{node_id}/{in_out}/{deepth}")
  292. def read_node(node_id: int, in_out: int =0, deepth: int=1, db: Session = Depends(get_db)):
  293. '''
  294. in_out: 0=只有节点 1=节点及进入节点的边 2=节点及出节点的边 3=节点及进出节点的边
  295. deepth: 深度,递归搜索的深度,目前只支持1
  296. '''
  297. if in_out == 0:
  298. #only current node
  299. node = db.query(DbKgNode).filter(DbKgNode.id == node_id).first()
  300. return resp_200(data={"summary":{"count_in": 0, "count_out":0},"nodes":[KgNode.model_validate(node).model_dump()], "edges":[]})
  301. nodes = []
  302. edges = []
  303. nodes_ids = []
  304. edge_ids = []
  305. count_in = 0
  306. count_out = 0
  307. if in_out == 1 or in_out == 3:
  308. #print("原生SQL:", db.query(DbKgEdge).filter(DbKgEdge.dest_id == node_id).statement.compile(compile_kwargs={"literal_binds": True}))
  309. count_in = db.query(DbKgEdge).filter(and_(DbKgEdge.dest_id == node_id, DbKgEdge.status ==0)).count()
  310. db_edges = db.query(DbKgEdge).filter(and_(DbKgEdge.dest_id == node_id, DbKgEdge.status ==0)).limit(25)
  311. for result in db_edges:
  312. edge = KgEdge.model_validate(result).model_dump()
  313. if (edge["src_node"]["id"] in nodes_ids) == False:
  314. nodes.append(edge["src_node"])
  315. nodes_ids.append(edge["src_node"]["id"])
  316. if (edge["dest_node"]["id"] in nodes_ids)==False:
  317. nodes_ids.append(edge["dest_node"]["id"])
  318. nodes.append(edge["dest_node"])
  319. data = edge
  320. data.pop("src_node", None)
  321. data.pop("dest_node", None)
  322. if (edge["id"] in edge_ids ) == False:
  323. edges.append(data)
  324. edge_ids.append(edge["id"])
  325. if in_out == 2 or in_out == 3:
  326. count_out = db.query(DbKgEdge).filter(and_(DbKgEdge.src_id == node_id, DbKgEdge.status ==0)).count()
  327. db_edges = db.query(DbKgEdge).filter(and_(DbKgEdge.src_id == node_id, DbKgEdge.status ==0)).limit(25)
  328. #print(count_out)
  329. for result in db_edges:
  330. edge = KgEdge.model_validate(result).model_dump()
  331. if (edge["src_node"]["id"] in nodes_ids) == False:
  332. nodes.append(edge["src_node"])
  333. nodes_ids.append(edge["src_node"]["id"])
  334. if (edge["dest_node"]["id"] in nodes_ids)==False:
  335. nodes_ids.append(edge["dest_node"]["id"])
  336. nodes.append(edge["dest_node"])
  337. data = edge
  338. data.pop("src_node", None)
  339. data.pop("dest_node", None)
  340. if (edge["id"] in edge_ids ) == False:
  341. edges.append(data)
  342. edge_ids.append(edge["id"])
  343. return resp_200(data={"summary":{"count_in": count_in, "count_out":count_out},"nodes":nodes, "edges":edges})
  344. @router.get("/nodes-browse/{node_id}")
  345. def browse_node(node_id: int, db: Session = Depends(get_db)):
  346. total_remain = 999
  347. db_edges_count = 0 #db.query(DbKgEdge).filter(DbKgEdge.src_id == node_id).count()
  348. db_edges0_count = db.query(DbKgEdge).filter(DbKgEdge.dest_id == node_id, DbKgEdge.status==0, DbKgEdge.category=='belongs_to').count()
  349. total = db_edges_count + db_edges0_count
  350. if total == 0:
  351. return resp_200(data={"summary":{"count_in": db_edges0_count, "count_out":db_edges_count},"nodes":[], "edges":[]})
  352. factor = db_edges_count / total
  353. db_edges_count = total_remain * factor
  354. factor = db_edges0_count / total
  355. db_edges0_count = total_remain * factor
  356. db_edges = [] #db.query(DbKgEdge).filter(DbKgEdge.src_id == node_id).limit(ceil(db_edges_count))
  357. if db_edges is None:
  358. raise HTTPException(status_code=404, detail="Edge not found")
  359. try:
  360. db_edges0 = db.query(DbKgEdge).filter(DbKgEdge.dest_id == node_id, DbKgEdge.status==0).limit(ceil(db_edges0_count))
  361. except Exception as e:
  362. print(e)
  363. if db_edges0 is None:
  364. raise HTTPException(status_code=404, detail="Edge not found")
  365. nodes_ids = []
  366. edge_ids = []
  367. nodes = []
  368. edges = []
  369. for results in [db_edges, db_edges0]:
  370. edges_list = [KgEdge.model_validate(node) for node in results]
  371. edges_raw = [node.model_dump() for node in edges_list]
  372. for edge in edges_raw:
  373. if (edge["src_node"]["id"] in nodes_ids) == False:
  374. nodes.append(edge["src_node"])
  375. nodes_ids.append(edge["src_node"]["id"])
  376. #if (edge["dest_node"]["id"] in nodes_ids)==False:
  377. # nodes_ids.append(edge["dest_node"]["id"])
  378. # nodes.append(edge["dest_node"])
  379. data = edge
  380. data.pop("src_node", None)
  381. data.pop("dest_node", None)
  382. if (edge["id"] in edge_ids ) == False:
  383. edges.append(data)
  384. edge_ids.append(edge["id"])
  385. return resp_200(data={"summary":{"count_in": len(nodes), "count_out":0},"nodes":nodes, "edges":edges})
  386. @router.post("/node-merge")
  387. def merge_node(mergeData:List[KgNodeMerge],db: Session = Depends(get_db)):
  388. edges_merge = []
  389. edges_invalid = []
  390. edges_insert = []
  391. for merge in mergeData:
  392. print("merge from %d to %d" % (merge.src_id, merge.dest_id))
  393. #原有的到源节点的边要更新到目标节点
  394. edges = db.query(DbKgEdge.id, DbKgEdge.src_id, DbKgEdge.dest_id, DbKgEdge.category, DbKgEdge.name ).filter(DbKgEdge.dest_id==merge.src_id, DbKgEdge.status==0).all()
  395. for edge in edges:
  396. id, src_id, dest_id, category, name = edge
  397. edges_merge.append({"id":id, "in_out": "src_in", "old_dest_id": dest_id, "src_id":src_id, "dest_id":merge.dest_id, "category":category, "name":name, "status":0})
  398. #原有节点的出边也要更新到目标节点
  399. edges = db.query(DbKgEdge.id, DbKgEdge.src_id, DbKgEdge.dest_id, DbKgEdge.category, DbKgEdge.name ).filter(DbKgEdge.src_id==merge.src_id, DbKgEdge.status==0).all()
  400. for edge in edges:
  401. id, src_id, dest_id, category, name = edge
  402. edges_merge.append({"id":id, "in_out": "src_out", "old_src_id": src_id, "src_id":merge.dest_id, "dest_id":dest_id, "category":category, "name":name, "status":0})
  403. #原有的两个节点之间的边需要设置为无效
  404. edges_to_update = db.query(DbKgEdge.id, DbKgEdge.src_id, DbKgEdge.dest_id, DbKgEdge.category, DbKgEdge.name, DbKgEdge.status).filter(and_(DbKgEdge.src_id==merge.src_id, DbKgEdge.dest_id==merge.dest_id, DbKgEdge.status==0)).all()
  405. for edge in edges_to_update:
  406. id, src_id, dest_id, category, name, status = edge
  407. status = -1 #delete
  408. edges_invalid.append({"id":id, "in_out": "out", "src_id":src_id, "dest_id":dest_id, "category":category, "name":name, "status":status})
  409. edges_to_update = db.query(DbKgEdge.id, DbKgEdge.src_id, DbKgEdge.dest_id, DbKgEdge.category, DbKgEdge.name, DbKgEdge.status).filter(and_(DbKgEdge.dest_id==merge.src_id, DbKgEdge.src_id==merge.dest_id, DbKgEdge.status==0)).all()
  410. for edge in edges_to_update:
  411. id, src_id, dest_id, category, name, status = edge
  412. status = -1 #delete
  413. edges_invalid.append({"id":id, "in_out": "in", "src_id":src_id, "dest_id":dest_id, "category":category, "name":name, "status":status})
  414. #插入一条merge的边
  415. edges_insert.append({"src_id":merge.src_id, "dest_id":merge.dest_id, "category":'MERGE_TO', "name":'MERGE', "status":0, 'version':"1.0"})
  416. edges = []
  417. for edge in edges_merge:
  418. if edge['dest_id'] == edge['src_id']:
  419. print("circle detected, skip")
  420. continue
  421. print("edge merged %d to %d %s-%s" % (edge['src_id'], edge['dest_id'], edge['category'], edge['name']))
  422. count = db.query(DbKgEdge).filter(DbKgEdge.src_id==edge['src_id'], DbKgEdge.dest_id==edge['dest_id'], DbKgEdge.category==edge['category']).count()
  423. if count > 0:
  424. print("can not move edge because of target has same edge already existed")
  425. edge['status'] = -1
  426. if edge["in_out"] == "src_in":
  427. edge["dest_id"] = edge["old_dest_id"]
  428. if edge["in_out"] == "src_out":
  429. edge["src_id"] = edge["old_src_id"]
  430. edges_invalid.append(edge)
  431. else:
  432. db.query(DbKgEdge).filter(DbKgEdge.id == edge['id']).update({'dest_id':edge['dest_id']})
  433. for edge in edges_insert:
  434. print("edge inserted %d to %d %s-%s" % (edge['src_id'], edge['dest_id'], edge['category'], edge['name']))
  435. count = db.query(DbKgEdge).filter(DbKgEdge.src_id==edge['src_id'], DbKgEdge.dest_id==edge['dest_id'], DbKgEdge.category==edge['category']).count()
  436. if count > 0:
  437. print("can insert edge because of edge already existed")
  438. continue
  439. else:
  440. edgeData = DbKgEdge()
  441. edgeData.src_id = edge['src_id']
  442. edgeData.dest_id = edge['dest_id']
  443. edgeData.category = edge['category']
  444. edgeData.name = edge['name']
  445. edgeData.status = edge['status']
  446. edgeData.version = edge['version']
  447. db.add(edgeData)
  448. for edge in edges_invalid:
  449. print("edge invalid %d to %d %s-%s" % (edge['src_id'], edge['dest_id'], edge['category'], edge['name']))
  450. db.query(DbKgEdge).filter(DbKgEdge.id == edge['id']).update({'status':edge['status']})
  451. db.commit()
  452. return resp_200(data= {"edges": edges, "error_code": 0, "error_msg": ""})
  453. ################################### EDGE ###############################################
  454. @router.get("/edges/c/{category}")
  455. def read_links_by_category(category: str, db: Session = Depends(get_db)):
  456. edges_names = db.query(DbKgEdge.category, DbKgEdge.name).group_by(DbKgEdge.category,DbKgEdge.name)
  457. if edges_names is None:
  458. raise HTTPException(status_code=404, detail="Node not found")
  459. names_list = [KgEdgeName.model_validate(node).model_dump() for node in edges_names]
  460. return resp_200(data={"records":names_list})
  461. @router.post("/edge-create", response_model=KgEdge)
  462. def create_edge(edges: List[KgEdgeCreate], db: Session = Depends(get_db)):
  463. try:
  464. db_edges = []
  465. db_edge_ids = []
  466. for edge in edges:
  467. if edge.src_id != edge.dest_id:
  468. existed_edges = db.query(DbKgEdge).filter(DbKgEdge.graph_id==edge.graph_id, DbKgEdge.src_id==edge.src_id, DbKgEdge.dest_id==edge.dest_id, DbKgEdge.category == edge.category).all()
  469. count = 0
  470. for ex_edge in existed_edges:
  471. ex_edge.status = 0
  472. db_edge_ids.append(ex_edge.id)
  473. count = count + 1
  474. if count == 0:
  475. db_edges.append(DbKgEdge(**edge.model_dump()))
  476. db_edge_ids.append((edge.src_id, edge.dest_id))
  477. error_msg = ""
  478. code = 0
  479. edges = []
  480. db.add_all(db_edges)
  481. db.commit()
  482. for ids in db_edge_ids:
  483. src_id, dest_id = ids
  484. db_edges = db.query(DbKgEdge).filter(and_(DbKgEdge.src_id==src_id, DbKgEdge.dest_id==dest_id)).all()
  485. for edge in db_edges:
  486. edge_raw = KgEdge.model_validate(edge).model_dump()
  487. edge_raw.pop("src_node", None)
  488. edge_raw.pop("dest_node", None)
  489. edges.append(edge_raw)
  490. except Exception as e:
  491. error_msg = str(e)
  492. code = -1
  493. return resp_200(data= {"error_code": code, "error_msg": error_msg, "edges": edges})
  494. @router.get("/edge-of-node/{node_id}")
  495. def get_edge_by_node(node_id:int, db: Session = Depends(get_db)):
  496. edges_in = db.query(DbKgEdge).filter(or_(DbKgEdge.src_id == node_id, DbKgEdge.dest_id==node_id)).order_by(DbKgEdge.id).all()
  497. edges = [KgEdge.model_validate(edge) for edge in edges_in]
  498. return resp_200(data={"edges":[edge.model_dump() for edge in edges]})
  499. @router.get("/edge-delete/{edge_id}/{status}")
  500. def delete_edge(edge_id:int, status:int, db: Session = Depends(get_db)):
  501. if status == 0 or status == -1:
  502. db.query(DbKgEdge).filter(DbKgEdge.id == edge_id).update({"status": status })
  503. db.commit()
  504. if status == -99:
  505. #delete
  506. db.query(DbKgEdge).filter(DbKgEdge.id == edge_id).delete()
  507. db.commit()
  508. return resp_200(data={"id":edge_id,'error_code':0, 'message':'Edge status updated'})
  509. ################################### SUB GRAPH ##########################################
  510. @router.get("/workspace-load")
  511. def get_all_sub_graph(db: Session = Depends(get_db)):
  512. db_datas = db.query(DbKgSubGraph).filter(DbKgSubGraph.status == 0).all()
  513. validate_data = [KgSubGraph.model_validate(data) for data in db_datas]
  514. return resp_200(data={"graphs":[data.model_dump() for data in validate_data]})
  515. @router.get("/workspace-get/{graph_id}")
  516. def sub_graph_load(graph_id: int, db: Session = Depends(get_db)):
  517. graph_data = db.query(DbKgSubGraph).filter(DbKgSubGraph.id == graph_id).first()
  518. nodes = []
  519. edges = []
  520. if graph_data:
  521. json_data = json.loads(graph_data.graph_content)
  522. node_ids = []
  523. for node in json_data["nodes"]:
  524. node_ids.append(node["id"])
  525. nodes_data = db.query(DbKgNode).filter(DbKgNode.id.in_(node_ids)).all()
  526. edges_in = db.query(DbKgEdge).filter(DbKgEdge.dest_id.in_(node_ids), DbKgEdge.status==0).all()
  527. edges_out = db.query(DbKgEdge).filter(DbKgEdge.src_id.in_(node_ids), DbKgEdge.status==0).all()
  528. all_edges = edges_in + edges_out
  529. node_ids = []
  530. for node in nodes_data:
  531. nodes.append(KgNode.model_validate(node))
  532. node_ids.append(node.id)
  533. for edge in all_edges:
  534. if edge.src_id in node_ids and edge.dest_id in node_ids and edge.status >= 0:
  535. edges.append(KgEdge.model_validate(edge))
  536. return resp_200(data={"nodes":[node.model_dump() for node in nodes],"edges":[edge.model_dump() for edge in edges]})
  537. #TODO : retrieve all data and edges
  538. @router.post("/workspace-update")
  539. def update_sub_graph(data:KgSubGraph, db:Session = Depends(get_db)):
  540. db.query(DbKgSubGraph).filter(DbKgSubGraph.id == data.id).update({'graph_name':data.graph_name,'graph_content': data.graph_content})
  541. db.commit()
  542. return resp_200(data= {"id": data.id, "error_code": 0, "error_msg": ""})
  543. @router.post("/workspace-create")
  544. def create_sub_graph(data:KgSubGraphCreate,db: Session = Depends(get_db)):
  545. count = db.query(DbKgSubGraph).filter(DbKgSubGraph.graph_name == data.graph_name, DbKgSubGraph.status==0).count()
  546. if count > 0:
  547. return resp_200(data= {"id": 0, "error_code": 1, "error_msg": "Graph already existed"})
  548. db_data = DbKgSubGraph()
  549. db_data.graph_name = data.graph_name
  550. db_data.graph_content = data.graph_content
  551. db_data.status = data.status
  552. db.add(db_data)
  553. db.commit()
  554. db.refresh(db_data)
  555. return resp_200(data= {"id": db_data.id, "error_code": 0, "error_msg": ""})
  556. @router.get("/workspace-delete/{graph_id}")
  557. def sub_graph_delete(graph_id: int, db: Session = Depends(get_db)):
  558. db.query(DbKgSubGraph).filter(DbKgSubGraph.id == graph_id).delete()
  559. db.commit()
  560. return resp_200(data= {"id": graph_id, "error_code": 0, "error_msg": ""})
  561. @router.post("/workspace-validate")
  562. def sub_graph_validate(data:KgSubGraphCreate, db: Session = Depends(get_db)):
  563. graph_data = data.graph_content
  564. nodes = []
  565. edges = []
  566. if graph_data:
  567. json_data = json.loads(graph_data)
  568. node_ids = []
  569. for node in json_data["nodes"]:
  570. node_ids.append(node["id"])
  571. nodes_data = db.query(DbKgNode).filter(DbKgNode.id.in_(node_ids)).all()
  572. edges_in = db.query(DbKgEdge).filter(and_(DbKgEdge.dest_id.in_(node_ids), DbKgEdge.status==0)).order_by(DbKgEdge.id).all()
  573. edges_out = db.query(DbKgEdge).filter(and_(DbKgEdge.src_id.in_(node_ids), DbKgEdge.status==0)).order_by(DbKgEdge.id).all()
  574. all_edges = edges_in + edges_out
  575. node_ids = []
  576. edge_ids = []
  577. for node in nodes_data:
  578. nodes.append(KgNode.model_validate(node))
  579. node_ids.append(node.id)
  580. for edge in all_edges:
  581. if edge.id in edge_ids:
  582. continue
  583. if edge.src_id in node_ids and edge.dest_id in node_ids:
  584. data = KgEdge.model_validate(edge).model_dump()
  585. data.pop("src_node", None)
  586. data.pop("dest_node", None)
  587. edges.append(data)
  588. edge_ids.append(edge.id)
  589. return resp_200(data={ "error_code": 0, "error_msg": "", "nodes":[node.model_dump() for node in nodes],"edges":edges})
  590. ################################### DICT ###############################################
  591. @router.get("/dict/icd/{page}/{page_size}")
  592. def read_icd_page(page:int, page_size:int, db:Session=Depends(get_db)):
  593. count = db.query(DbDictICD).count()
  594. total_page = ceil(count / page_size)
  595. start = 1
  596. if page <= total_page:
  597. start = (page-1) * page_size
  598. results = db.query(DbDictICD).limit(page_size).offset(start).all()
  599. codes = [DictICD.model_validate(node) for node in results]
  600. return resp_200(data= {"total":count, "pages": page, "size":page_size, "records":[item.model_dump() for item in codes]})
  601. @router.get("/dict/icd/search/{page}/{page_size}/{name}")
  602. def search_icd(page:int, page_size:int, name:str, db:Session=Depends(get_db)):
  603. count = db.query(DbDictICD).filter(DbDictICD.icd_name.like("%"+name+"%")).count()
  604. total_page = ceil(count / page_size)
  605. start = 1
  606. if page <= total_page:
  607. start = (page-1) * page_size
  608. results = db.query(DbDictICD).filter(DbDictICD.icd_name.like("%"+name+"%")).limit(page_size).offset(start).all()
  609. codes = [DictICD.model_validate(node) for node in results]
  610. return resp_200(data= {"total":count, "pages": page, "size":page_size, "records":[item.model_dump() for item in codes]})
  611. @router.get("/dict/drg/{page}/{page_size}")
  612. def get_drg_page(page:int, page_size:int, db:Session=Depends(get_db)):
  613. count = db.query(DbDictDRG).count()
  614. total_page = ceil(count / page_size)
  615. start = 1
  616. if page <= total_page:
  617. start = (page-1) * page_size
  618. results = db.query(DbDictDRG).limit(page_size).offset(start).all()
  619. codes = [DictDRG.model_validate(node) for node in results]
  620. return resp_200(data= {"total":count, "pages": page, "size":page_size, "records":[item.model_dump() for item in codes]})
  621. @router.get("/drg/search/{page}/{page_size}/{name}")
  622. def search_drg(page:int, page_size:int, name:str, db:Session=Depends(get_db)):
  623. count = db.query(DbDictDRG).filter(DbDictDRG.drg_name.like("%"+name+"%")).count()
  624. total_page = ceil(count / page_size)
  625. start = 1
  626. if page <= total_page:
  627. start = (page-1) * page_size
  628. results = db.query(DbDictDRG).filter(DbDictDRG.drg_name.like("%"+name+"%")).limit(page_size).offset(start).all()
  629. codes = [DictDRG.model_validate(node) for node in results]
  630. return resp_200(data= {"total":count, "pages": page, "size":page_size, "records":[item.model_dump() for item in codes]})
  631. @router.get("/dict/drug/{page}/{page_size}")
  632. def read_drug_page(page:int, page_size:int, db:Session=Depends(get_db)):
  633. count = db.query(DbDictDrug).count()
  634. total_page = ceil(count / page_size)
  635. start = 1
  636. if page <= total_page:
  637. start = (page-1) * page_size
  638. results = db.query(DbDictDrug).limit(page_size).offset(start).all()
  639. codes = [DictDrug.model_validate(node) for node in results]
  640. return resp_200(data= {"total":count, "pages": page, "size":page_size, "records":[item.model_dump() for item in codes]})
  641. @router.get("/dict/drug/search/{page}/{page_size}/{name}")
  642. def search_drg(page:int, page_size:int, name:str, db:Session=Depends(get_db)):
  643. count = db.query(DbDictDrug).filter(or_(DbDictDrug.reg_name.like("%"+name+"%"), DbDictDrug.prod_factory.like("%"+name+"%"))).count()
  644. total_page = ceil(count / page_size)
  645. start = 1
  646. if page <= total_page:
  647. start = (page-1) * page_size
  648. results = db.query(DbDictDrug).filter(or_(DbDictDrug.reg_name.like("%"+name+"%"), DbDictDrug.prod_factory.like("%"+name+"%"))).limit(page_size).offset(start).all()
  649. codes = [DictDrug.model_validate(node) for node in results]
  650. return resp_200(data= {"total":count, "pages": page, "size":page_size, "records":[item.model_dump() for item in codes]})
  651. #######################################################################################
  652. '''
  653. @router.put("/api/nodes/{node_id}", response_model=KgNode)
  654. def update_node(node_id: int, node: KgNodeCreate, db: Session = Depends(get_db)):
  655. db_node = db.query(DbKgNode).filter(DbKgNode.id == node_id).first()
  656. if db_node is None:
  657. raise HTTPException(status_code=404, detail="Node not found")
  658. for key, value in node.dict().items():
  659. setattr(db_node, key, value)
  660. db.commit()
  661. db.refresh(db_node)
  662. return db_node
  663. @router.get("/api/edges/{edge_id}", response_model=KgEdge)
  664. def read_edge(edge_id: int, db: Session = Depends(get_db)):
  665. db_edge = db.query(DbKgEdge).filter(DbKgEdge.id == edge_id).first()
  666. if db_edge is None:
  667. raise HTTPException(status_code=404, detail="Edge not found")
  668. return db_edge
  669. @router.put("/api/edges/{edge_id}", response_model=KgEdge)
  670. def update_edge(edge_id: int, edge: KgEdgeCreate, db: Session = Depends(get_db)):
  671. db_edge = db.query(DbKgEdge).filter(DbKgEdge.id == edge_id).first()
  672. if db_edge is None:
  673. raise HTTPException(status_code=404, detail="Edge not found")
  674. for key, value in edge.dict().items():
  675. setattr(db_edge, key, value)
  676. db.commit()
  677. db.refresh(db_edge)
  678. return db_edge
  679. @router.delete("/api/edges/{edge_id}", response_model=KgEdge)
  680. def delete_edge(edge_id: int, db: Session = Depends(get_db)):
  681. db_edge = db.query(DbKgEdge).filter(DbKgEdge.id == edge_id).first()
  682. if db_edge is None:
  683. raise HTTPException(status_code=404, detail="Edge not found")
  684. db.delete(db_edge)
  685. db.commit()
  686. return db_edge
  687. @router.post("/api/props/", response_model=KgProp)
  688. def create_prop(prop: KgPropCreate, db: Session = Depends(get_db)):
  689. db_prop = DbKgProp(**prop.dict())
  690. db.add(db_prop)
  691. db.commit()
  692. db.refresh(db_prop)
  693. return db_prop
  694. @router.get("/api/props/{prop_id}", response_model=KgProp)
  695. def read_prop(prop_id: int, db: Session = Depends(get_db)):
  696. db_prop = db.query(models.KgProp).filter(models.KgProp.id == prop_id).first()
  697. if db_prop is None:
  698. raise HTTPException(status_code=404, detail="Property not found")
  699. return db_prop
  700. @router.put("/api/props/{prop_id}", response_model=KgProp)
  701. def update_prop(prop_id: int, prop: KgPropCreate, db: Session = Depends(get_db)):
  702. db_prop = db.query(models.KgProp).filter(models.KgProp.id == prop_id).first()
  703. if db_prop is None:
  704. raise HTTPException(status_code=404, detail="Property not found")
  705. for key, value in prop.dict().items():
  706. setattr(db_prop, key, value)
  707. db.commit()
  708. db.refresh(db_prop)
  709. return db_prop
  710. @router.delete("/api/props/{prop_id}", response_model=KgProp)
  711. def delete_prop(prop_id: int, db: Session = Depends(get_db)):
  712. db_prop = db.query(models.KgProp).filter(models.KgProp.id == prop_id).first()
  713. if db_prop is None:
  714. raise HTTPException(status_code=404, detail="Property not found")
  715. db.delete(db_prop)
  716. db.commit()
  717. return db_prop
  718. '''
# Module-level alias for `router`; presumably the name other modules import
# when mounting these endpoints -- confirm against the application setup.
graph_mgr_router = router