kb_router.py 20 KB


  1. import sys,os
  2. current_path = os.getcwd()
  3. sys.path.append(current_path)
  4. from config.site import SiteConfig
  5. from fastapi import APIRouter, Depends, Query
  6. from db.database import get_db
  7. from sqlalchemy.orm import Session
  8. from sqlalchemy import text
  9. from agent.models.web.response import StandardResponse,FAILED,SUCCESS
  10. from agent.models.web.request import BasicRequest
  11. from agent.libs.graph import GraphBusiness
  12. from agent.libs.auth import verify_session_id, SessionValues
  13. import logging
  14. from typing import Optional, List
  15. from agent.models.db.graph import DbKgNode
  16. from agent.models.db.tree_structure import TreeStructure,KgGraphCategory
  17. import string
  18. import json
  19. from agent.tree_utils import get_tree_dto
  20. router = APIRouter(prefix="/kb", tags=["knowledge build interface"])
  21. logger = logging.getLogger(__name__)
  22. config = SiteConfig()
  23. LOG_DIR = config.get_config("TASK_LOG_DIR", current_path)
  24. # job_category = Column(String(64), nullable=False)
  25. # job_name = Column(String(64))
  26. # job_details = Column(Text, nullable=False)
  27. # job_creator = Column(String(64), nullable=False)
  28. # job_logs = Column(Text, nullable=True)
  29. # job_files = Column(String(300), nullable=True)
  30. @router.post('/summary', response_model=StandardResponse)
  31. def summary_func(request:BasicRequest, db: Session = Depends(get_db), sess:SessionValues = Depends(verify_session_id))->StandardResponse:
  32. if request.action != "get_summary":
  33. return StandardResponse(code=FAILED, message="invalid action")
  34. graph_id = request.get_param("graph_id",0)
  35. biz = GraphBusiness(db)
  36. summary = biz.get_graph_summary(graph_id=graph_id)
  37. if summary:
  38. logger.info(summary)
  39. return StandardResponse(code=SUCCESS, message="summary found", records=[summary])
  40. else:
  41. return StandardResponse(code=FAILED, message="summary not found",records=[])
  42. @router.post('/schemas', response_model=StandardResponse)
  43. def schemas_func(request:BasicRequest, db: Session = Depends(get_db), sess:SessionValues = Depends(verify_session_id))->StandardResponse:
  44. if request.action== "get_nodes_schemas":
  45. graph_id = request.get_param("graph_id",0)
  46. biz = GraphBusiness(db)
  47. schemas = biz.get_nodes_categories(graph_id=graph_id)
  48. if schemas:
  49. return StandardResponse(code=SUCCESS, message="schemas found", records=schemas)
  50. if request.action== "get_edges_schemas":
  51. graph_id = request.get_param("graph_id",0)
  52. biz = GraphBusiness(db)
  53. schemas = biz.get_edges_categories(graph_id=graph_id)
  54. if schemas:
  55. return StandardResponse(code=SUCCESS, message="schemas found", records=schemas)
  56. return StandardResponse(code=FAILED, message="invalid action")
  57. @router.post('/nodes', response_model=StandardResponse)
  58. def nodes_func(request:BasicRequest, db: Session = Depends(get_db), sess:SessionValues = Depends(verify_session_id))->StandardResponse:
  59. if (request.action == "search_nodes"):
  60. node_name = request.get_param("name","")
  61. category = request.get_param("category","")
  62. graph_id = request.get_param("graph_id",0)
  63. biz = GraphBusiness(db)
  64. if node_name == "":
  65. return StandardResponse(code=FAILED, message="node name is empty", records=[])
  66. if category == "":
  67. return StandardResponse(code=FAILED, message="category is empty", records=[])
  68. if graph_id == 0:
  69. return StandardResponse(code=FAILED, message="graph id is empty", records=[])
  70. nodes = biz.search_like_node_by_name(graph_id=graph_id, category=category, name=node_name)
  71. if nodes:
  72. return StandardResponse(code=SUCCESS, message="nodes found", records=nodes)
  73. else:
  74. return StandardResponse(code=FAILED, message="search job failed")
  75. elif (request.action == "get_nodes"):
  76. graph_id = request.get_param("graph_id",0)
  77. page = request.get_param("page",1)
  78. page_size = request.get_param("page_size",1)
  79. biz = GraphBusiness(db)
  80. nodes = biz.get_nodes_by_page(graph_id=graph_id, page=page, page_size=page_size)
  81. if nodes:
  82. return StandardResponse(code=SUCCESS, message="nodes found", records=nodes)
  83. elif (request.action == "neighbors"):
  84. graph_id = request.get_param("graph_id",0)
  85. node_id = request.get_param("node_id",0)
  86. if node_id>0:
  87. biz = GraphBusiness(db)
  88. node = biz.get_node_by_id(graph_id=graph_id, node_id=node_id)
  89. if node is None:
  90. return StandardResponse(code=FAILED, message="node not found", records=[])
  91. nodes_in = biz.get_neighbors(graph_id=graph_id, node_id=node_id, direction="in")
  92. nodes_out = biz.get_neighbors(graph_id=graph_id, node_id=node_id, direction="out")
  93. nodes_all = []
  94. nodes_all.append({"id": node.id, "name":node.name, "category":node.category, "direction":"self"})
  95. for node in nodes_in:
  96. nodes_all.append({"id": node.id, "name":node.name, "category":node.category, "direction":"in"})
  97. for node in nodes_out:
  98. nodes_all.append({"id": node.id, "name":node.name, "category":node.category, "direction":"out"})
  99. return StandardResponse(code=SUCCESS, message="nodes found", records=nodes_all)
  100. return StandardResponse(code=FAILED, message="invalid action")
  101. async def get_node_properties(node_id: int, db: Session) -> dict:
  102. """
  103. 查询节点的属性
  104. :param node_id: 节点ID
  105. :param db: 数据库会话
  106. :return: 属性字典
  107. """
  108. prop_sql = text("SELECT prop_title, prop_value FROM kg_props WHERE ref_id = :node_id")
  109. result = db.execute(prop_sql, {'node_id': node_id}).fetchall()
  110. properties = {}
  111. for row in result:
  112. properties[row._mapping['prop_title']] = row._mapping['prop_value']
  113. return properties
  114. @router.get("/graph_data", response_model=StandardResponse)
  115. async def get_graph_data(
  116. label_name: str,
  117. user_id: int,
  118. graph_id: int,
  119. db: Session = Depends(get_db),
  120. input_str: Optional[str] = None
  121. ):
  122. """
  123. 获取用户关联的图谱数据
  124. - 从session_id获取user_id
  125. - 查询DbUserDataRelation获取用户关联的数据
  126. - 返回与Java端一致的数据结构
  127. """
  128. try:
  129. # 1. 从session获取user_id
  130. # user_id = sess.user_id
  131. # if not user_id:
  132. # return StandardResponse(code=FAILED, message="user not found", records=[])
  133. # 处理input_str为空的情况
  134. if not input_str:
  135. # 根据user_id、graph_id和label_name从kg_nodes表中获取一个name
  136. get_name_sql = text("""
  137. SELECT n.name
  138. FROM user_data_relations udr
  139. JOIN kg_nodes n ON udr.data_id = n.id AND udr.data_category = 'DbKgNode'
  140. WHERE udr.user_id = :user_id
  141. AND n.category = :label_name
  142. AND n.status = '0'
  143. AND n.graph_id = :graph_id
  144. LIMIT 1
  145. """)
  146. name_result = db.execute(get_name_sql, {
  147. 'user_id': user_id,
  148. 'label_name': label_name,
  149. 'graph_id': graph_id
  150. }).fetchone()
  151. if not name_result:
  152. return StandardResponse(code=FAILED, message="No node found for given parameters", records=[])
  153. input_str = name_result._mapping['name']
  154. # 2. 使用JOIN查询用户关联的图谱数据
  155. sql = text("""
  156. WITH RankedRelations AS (
  157. SELECT
  158. e.name as rType,
  159. m.id as target_id,
  160. m.name as target_name,
  161. m.category as target_label,
  162. (SELECT COUNT(*) FROM kg_edges WHERE src_id = m.id) as pCount,
  163. ROW_NUMBER() OVER(PARTITION BY e.name ORDER BY m.id) as rn
  164. FROM user_data_relations udr
  165. JOIN kg_nodes n ON udr.data_id = n.id AND udr.data_category = 'DbKgNode'
  166. JOIN kg_edges e ON n.id = e.src_id
  167. JOIN kg_nodes m ON e.dest_id = m.id
  168. WHERE udr.user_id = :user_id
  169. AND n.category = :label_name
  170. AND n.name = :input_str
  171. AND n.status = '0'
  172. AND n.graph_id = :graph_id
  173. )
  174. SELECT rType, target_id, target_name, target_label, pCount
  175. FROM RankedRelations
  176. WHERE rn <= 50
  177. ORDER BY rType
  178. """)
  179. # 3. 组装返回数据结构
  180. categories = [{"name": "中心词"}, {"name": "关系"}]
  181. nodes = []
  182. links = []
  183. # 查询中心节点
  184. center_sql = text("""
  185. SELECT n.id, n.name, n.category
  186. FROM user_data_relations udr
  187. JOIN kg_nodes n ON udr.data_id = n.id AND udr.data_category = 'DbKgNode'
  188. WHERE udr.user_id = :user_id
  189. AND n.category = :label_name
  190. AND n.name = :input_str
  191. AND n.status = '0' AND n.graph_id= :graph_id limit 1
  192. """)
  193. # 执行查询并处理结果
  194. center_node = None
  195. c_map = {"中心词": 0, "关系": 1}
  196. node_id = 0
  197. # 构建graph_dto数据结构
  198. graph_dto = {
  199. "label": "",
  200. "name": "",
  201. "id": 0,
  202. "properties": {},
  203. "ENodeRSDTOS": []
  204. }
  205. # 1. 查询中心节点
  206. center_result = db.execute(center_sql, {
  207. 'user_id': user_id,
  208. 'label_name': label_name,
  209. 'input_str': input_str,
  210. 'graph_id': graph_id
  211. }).fetchall()
  212. if center_result:
  213. for row in center_result:
  214. graph_dto["label"] = row._mapping['category']
  215. graph_dto["name"] = row._mapping['name']
  216. graph_dto["id"] = row._mapping['id']
  217. graph_dto["properties"] = await get_node_properties(row._mapping['id'], db)
  218. break
  219. # 2. 查询关联的边和目标节点
  220. relation_result = db.execute(sql, {
  221. 'user_id': user_id,
  222. 'label_name': label_name,
  223. 'input_str': input_str,
  224. 'graph_id': graph_id
  225. }).fetchall()
  226. if relation_result:
  227. rs_id = 2
  228. for row in relation_result:
  229. r_type = row._mapping['rtype']
  230. # 添加到graph_dto
  231. e_node_rs = {
  232. "RType": r_type,
  233. "ENodeDTOS": [{
  234. "Label": row._mapping['target_label'],
  235. "Name": row._mapping['target_name'],
  236. "Id": row._mapping['target_id'],
  237. "PCount": row._mapping['pcount'],
  238. "properties": await get_node_properties(row._mapping['target_id'], db)
  239. }]
  240. }
  241. # 检查是否已有该关系类型
  242. existing_rs = next((rs for rs in graph_dto["ENodeRSDTOS"] if rs["RType"] == r_type), None)
  243. if existing_rs:
  244. existing_rs["ENodeDTOS"].extend(e_node_rs["ENodeDTOS"])
  245. else:
  246. graph_dto["ENodeRSDTOS"].append(e_node_rs)
  247. if r_type not in c_map:
  248. c_map[r_type] = rs_id
  249. categories.append({"name": r_type})
  250. rs_id += 1
  251. print("graph_dto:", graph_dto) # 打印graph_dto
  252. # 构建中心节点
  253. center_node = {
  254. "label": graph_dto["name"],
  255. 'type': graph_dto["label"],
  256. "category": 0,
  257. "name": "0",
  258. # "id": graph_dto["id"],
  259. "symbol": "circle",
  260. "symbolSize": 50,
  261. "properties": graph_dto["properties"],
  262. "nodeId": graph_dto["id"],
  263. "itemStyle": {"display": True}
  264. }
  265. nodes.append(center_node)
  266. # 处理关系类型
  267. rs_id = 2
  268. for rs in graph_dto["ENodeRSDTOS"]:
  269. r_type = rs["RType"]
  270. if r_type not in c_map:
  271. c_map[r_type] = rs_id
  272. categories.append({"name": r_type})
  273. rs_id += 1
  274. # 关系节点
  275. relation_node = {
  276. "label": "",
  277. 'type': graph_dto["label"],
  278. "category": 1,
  279. "name": str(len(nodes)),
  280. # "id": len(nodes),
  281. "symbol": "diamond",
  282. "symbolSize": 10,
  283. "properties": graph_dto["properties"],
  284. "nodeId": len(nodes),
  285. "itemStyle": {"display": True}
  286. }
  287. nodes.append(relation_node)
  288. # 添加链接
  289. links.append({
  290. "source": "0",
  291. "target": str(nodes.index(relation_node)),
  292. "value": r_type,
  293. "relationType": r_type
  294. })
  295. # 处理子节点
  296. for e_node in rs["ENodeDTOS"]:
  297. item_style = {"display": e_node["PCount"] > 0}
  298. child_node = {
  299. "label": e_node["Name"],
  300. "type": e_node["Label"],
  301. "category": c_map[r_type],
  302. "name": str(len(nodes)),
  303. # "id": e_node["Id"],
  304. "symbol": "circle",
  305. "symbolSize": 28,
  306. "properties": e_node["properties"],
  307. "nodeId": e_node["Id"],
  308. "itemStyle": item_style
  309. }
  310. nodes.append(child_node)
  311. links.append({
  312. "source": str(nodes.index(relation_node)),
  313. "target": str(nodes.index(child_node)),
  314. "value": "",
  315. "relationType": r_type
  316. })
  317. final_data = {
  318. "categories": categories,
  319. "node": nodes,
  320. "links": links
  321. }
  322. return StandardResponse(
  323. records=[{"records": final_data}],
  324. message="Graph data retrieved"
  325. )
  326. except Exception as e:
  327. return StandardResponse(
  328. code=500,
  329. message=str(e)
  330. )
  331. @router.get("/user_sub_graphs", response_model=StandardResponse)
  332. async def get_user_sub_graphs(
  333. user_id: int,
  334. pageNo: int = 1,
  335. pageSize: int = 10,
  336. db: Session = Depends(get_db)
  337. ):
  338. """
  339. 获取用户关联的子图列表
  340. - 根据user_id和data_category='sub_graph'查询user_data_relations表
  341. - 关联jobs表获取job_name
  342. - 返回data_id和job_name列表
  343. - 支持分页查询,参数pageNo(默认1)和pageSize(默认10)
  344. """
  345. try:
  346. # 查询用户关联的子图
  347. offset = (pageNo - 1) * pageSize
  348. sql = text("""
  349. SELECT udr.data_id, j.job_name
  350. FROM user_data_relations udr
  351. LEFT JOIN jobs j ON udr.data_id = j.id
  352. WHERE udr.user_id = :user_id
  353. AND udr.data_category = 'sub_graph' order by udr.data_id desc
  354. LIMIT :pageSize OFFSET :offset
  355. """)
  356. result = db.execute(sql, {'user_id': user_id, 'pageSize': pageSize, 'offset': offset}).fetchall()
  357. records = []
  358. for row in result:
  359. records.append({
  360. "graph_id": row._mapping['data_id'],
  361. "graph_name": row._mapping['job_name']
  362. })
  363. return StandardResponse(
  364. records=records,
  365. message="User sub graphs retrieved"
  366. )
  367. except Exception as e:
  368. return StandardResponse(
  369. code=500,
  370. message=str(e)
  371. )
  372. def build_disease_tree(disease_nodes: list, root_name: str = "疾病") -> dict:
  373. """
  374. 构建疾病树状结构的公共方法
  375. :param disease_nodes: 疾病节点列表,每个节点需包含name属性
  376. :param root_name: 根节点名称,默认为"疾病"
  377. :return: 树状结构字典
  378. """
  379. if not disease_nodes:
  380. return {"name": root_name, "sNode": []}
  381. # 按拼音首字母分类
  382. letter_groups = {letter: [] for letter in string.ascii_uppercase}
  383. letter_groups['其他'] = []
  384. for node in disease_nodes:
  385. name = node.name if hasattr(node, 'name') else str(node)
  386. first_letter = get_first_letter(name)
  387. letter_groups[first_letter].append(name)
  388. # 构建JSON结构
  389. tree_structure = {
  390. "name": root_name,
  391. "sNode": []
  392. }
  393. # 先添加A-Z的分类
  394. for letter in string.ascii_uppercase:
  395. if letter_groups[letter]:
  396. letter_node = {
  397. "name": letter,
  398. "sNode": [{"name": disease, "sNode": []} for disease in sorted(letter_groups[letter])]
  399. }
  400. tree_structure["sNode"].append(letter_node)
  401. # 最后添加"其他"分类(如果有的话)
  402. if letter_groups['其他']:
  403. other_node = {
  404. "name": "其他",
  405. "sNode": [{"name": disease, "sNode": []} for disease in sorted(letter_groups['其他'])]
  406. }
  407. tree_structure["sNode"].append(other_node)
  408. # content=json.dumps(tree_structure, ensure_ascii=False)
  409. # print(content)
  410. # tree_dto=get_tree_dto(content)
  411. # print(tree_dto)
  412. return tree_structure
  413. def get_first_letter(word):
  414. """获取中文词语的拼音首字母"""
  415. if not word:
  416. return '其他'
  417. # 获取第一个汉字的拼音首字母
  418. first_char = word[0]
  419. try:
  420. import pypinyin
  421. first_letter = pypinyin.pinyin(first_char, style=pypinyin.FIRST_LETTER)[0][0].upper()
  422. return first_letter if first_letter in string.ascii_uppercase else '其他'
  423. except Exception as e:
  424. print(str(e))
  425. return '其他'
  426. @router.get('/disease_tree')
  427. async def get_disease_tree(graph_id: int, db: Session = Depends(get_db)):
  428. """
  429. 根据graph_id查询kg_nodes表中category是疾病的数据并构建树状结构
  430. 严格按照字母A-Z顺序进行归类,中文首字母归类到对应拼音首字母
  431. """
  432. # 查询疾病节点
  433. disease_nodes = db.query(DbKgNode).filter(
  434. DbKgNode.graph_id == graph_id,
  435. DbKgNode.category == '疾病'
  436. ).all()
  437. tree_structure = build_disease_tree(disease_nodes)
  438. return StandardResponse(records=[{"records": tree_structure}])
  439. @router.get('/graph_categories')
  440. async def get_graph_categories(
  441. user_id: int,
  442. graph_id: int,
  443. db: Session = Depends(get_db)
  444. ):
  445. """
  446. 根据user_id和graph_id查询kg_graph_category表中的category列表
  447. 返回category的字符串列表(按照id正序排列)
  448. """
  449. try:
  450. # 查询category列表(按照id正序)
  451. categories = db.query(KgGraphCategory.category).filter(
  452. KgGraphCategory.user_id == user_id,
  453. KgGraphCategory.graph_id == graph_id
  454. ).order_by(KgGraphCategory.id).all()
  455. if not categories:
  456. return StandardResponse(code=FAILED, message="No categories found")
  457. # 转换为字符串列表
  458. category_list = [category[0] for category in categories]
  459. return StandardResponse(
  460. records=[{"records": category_list}],
  461. message="Graph categories retrieved"
  462. )
  463. except Exception as e:
  464. return StandardResponse(
  465. code=500,
  466. message=str(e)
  467. )
  468. @router.get('/tree_structure')
  469. async def get_tree_structure(
  470. user_id: int,
  471. graph_id: int,
  472. db: Session = Depends(get_db)
  473. ):
  474. """
  475. 根据user_id和graph_id获取树状结构数据
  476. 1. 查询kg_tree_structures表获取content
  477. 2. 调用get_tree_dto方法转换数据格式
  478. 3. 返回转换后的数据
  479. """
  480. try:
  481. # 查询树状结构数据
  482. tree_structure = db.query(TreeStructure).filter(
  483. TreeStructure.user_id == user_id,
  484. TreeStructure.graph_id == graph_id
  485. ).first()
  486. if not tree_structure:
  487. return StandardResponse(code=FAILED, message="Tree structure not found")
  488. # 转换数据格式
  489. tree_dto = get_tree_dto(tree_structure.content)
  490. return StandardResponse(
  491. records=[{"records": tree_dto}],
  492. message="Tree structure retrieved"
  493. )
  494. except Exception as e:
  495. return StandardResponse(
  496. code=500,
  497. message=str(e)
  498. )
  499. kb_router = router