123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- from re import S
- from fastapi import FastAPI, HTTPException
- from fastapi import APIRouter, Depends, Query
- # from networkx import graph
- from pydantic import BaseModel
- from typing import List, Optional, Dict
- from sqlalchemy import create_engine, Column, Integer, String, Boolean, JSON
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import properties, sessionmaker
- from sqlalchemy.orm import Session
- import logging
- from agent.libs.graph import GraphBusiness
- from agent.libs.user_data_relation import UserDataRelationBusiness
- from agent.libs.user import UserBusiness
- from agent.db.database import get_db
- from agent.models.web.response import StandardResponse,FAILED,SUCCESS
# Router that groups every knowledge-graph management endpoint defined in
# this module under the "/graph_mg" URL prefix.
router = APIRouter(
    prefix="/graph_mg",
    tags=["knowledge graph management interface"],
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Pydantic request models
class CreateEntity(BaseModel):
    """Request item for ``/entity/create``: one node to add to a graph."""

    # User the new node is linked to through a user/data relation.
    user_id: int
    # Graph that receives the node.
    graph_id: int
    # Forwarded to ``create_node`` as the node's ``category``.
    label: str
    # Node name.
    name: str
    # Optional node properties; the handler substitutes ``{}`` when this is
    # omitted or empty.
    properties: Optional[Dict] = None
class DeleteEntity(BaseModel):
    """Request item for ``/entity/delete``: identifies one node to remove."""

    # Not read by the delete handler in this file.
    user_id: int
    # Graph that owns the node.
    graph_id: int
    # Identifier of the node to delete.
    node_id: int
class UpdateEntity(BaseModel):
    """Request item for ``/entity/update``: a node and its new name."""

    # Not read by the update handler in this file.
    user_id: int
    # Graph that owns the node.
    graph_id: int
    # Identifier of the node to update (passed to ``update_node`` as ``id``).
    node_id: int
    # New name for the node.
    name: str
class FindEntity(BaseModel):
    """Request body for ``/entity/find``: identifies a single node."""

    # Not read by the find handler in this file.
    user_id: int
    # Graph to look in.
    graph_id: int
    # Identifier of the node to fetch.
    node_id: int
class SearchEntity(BaseModel):
    """Request body for ``/entity/search``: name/label search criteria."""

    # Not read by the search handler in this file.
    user_id: int
    # Graph to search in.
    graph_id: int
    # Passed to ``search_like_node_by_name`` as its second argument.
    label: str
    # Passed to ``search_like_node_by_name`` as its first argument.
    name: str
class EntityVO(BaseModel):
    """Request item for ``/property/create``: a node plus one property."""

    # Not read by the property-create handler in this file.
    user_id: int
    # Graph that owns the node.
    graph_id: int
    # Forwarded to ``create_node_prop`` as ``category``.
    label: str
    # Node receiving the property (forwarded as ``ref_id``).
    node_id: int
    # The property-create handler reads the "name" and "value" keys of this
    # mapping.
    properties: Dict
class DeleteProperty(BaseModel):
    """Identifies one property of one node for deletion.

    In this file the model is referenced only by the commented-out
    ``/property/delete`` endpoint.
    """

    user_id: int
    # Graph that owns the node.
    graph_id: int
    # Node that carries the property.
    node_id: int
    # Name of the property to remove.
    property_name: str
class UpdateProperty(BaseModel):
    """Request body for ``/property/update``: a new value for one property."""

    # Not read by the property-update handler in this file.
    user_id: int
    # Graph that owns the node.
    graph_id: int
    # Node that carries the property.
    node_id: int
    # Name of the property to change.
    property_name: str
    # New value, always transported as a string.
    property_value: str
class RelationshipVO(BaseModel):
    """Request item for ``/relationship/create`` and ``/relationship/delete``."""

    # Not read by the relationship handlers in this file.
    user_id: int
    # Graph that owns the edge.
    graph_id: int
    # Source node id (forwarded as ``src_id``).
    start_id: int
    # Destination node id (forwarded as ``dest_id``).
    end_id: int
    # start_label / end_label are accepted but not read by the handlers in
    # this file.
    start_label: str
    end_label: str
    # Used as both the edge ``category`` and the edge ``name``.
    relationship_type: str
    # Optional edge properties; only the create handler reads them and it
    # substitutes ``{}`` when this is omitted or empty.
    property: Optional[Dict] = None
class RelationshipNameVO(BaseModel):
    """Request body for ``/relationship/search``: a relationship type."""

    # Not read by the search handler in this file.
    user_id: int
    # Graph to search in.
    graph_id: int
    # Forwarded to ``search_edges`` as ``category``.
    relationship_type: str
class UpdateRelationTypeVO(BaseModel):
    """Request item for ``/relationship/update``: rename a relationship type."""

    # Not read by the update handler in this file.
    user_id: int
    # Graph whose edges are updated.
    graph_id: int
    # Forwarded to ``update_edges`` as ``old_category``.
    old_relationship_type: str
    # Forwarded to ``update_edges`` as ``new_category``.
    new_relationship_type: str
# Entity endpoints
@router.post("/entity/create", response_model=StandardResponse)
def create_entity(entities: List[CreateEntity], db: Session = Depends(get_db)):
    """Create one graph node per request item and link each node to a user.

    The user is looked up once, from the ``user_id`` of the first item; that
    user's name and first role are recorded on every user/data relation.

    Args:
        entities: Nodes to create.
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse whose ``records`` hold ``id``, ``label`` and ``name``
        for every created node. An empty request yields empty ``records``.

    Raises:
        HTTPException: 404 when the user does not exist; 500 on any other
            failure.
    """
    # An empty batch has no first item to take the user from, and there is
    # nothing to create, so answer directly instead of failing on entities[0].
    if not entities:
        return StandardResponse(records=[], code=SUCCESS, message="Success")

    try:
        results = []
        graph_business = GraphBusiness(db)
        user_data_relation_business = UserDataRelationBusiness(db)
        user_business = UserBusiness(db)

        # NOTE(review): only the first item's user is validated, while each
        # relation is stored with the item's own user_id - confirm that mixed
        # user_ids in one batch are not expected.
        user = user_business.get_user(entities[0].user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")

        for entity in entities:
            node = graph_business.create_node(
                graph_id=entity.graph_id,
                name=entity.name,
                category=entity.label,
                props=entity.properties if entity.properties else {},
            )
            user_data_relation_business.create_relation(
                user_id=entity.user_id,
                data_category="DbKgNode",
                data_id=node.id,
                user_name=user.username,
                role_id=user.roles[0].id,
                role_name=user.roles[0].name,
            )
            results.append({"id": node.id, "label": node.category, "name": node.name})
        return StandardResponse(records=results, code=SUCCESS, message="Success")
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the client as-is
        # rather than being rewritten as a 500 by the generic handler below.
        raise
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error creating entity: %s", e)
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
@router.post("/entity/delete", response_model=StandardResponse)
def delete_entity(entities: List[DeleteEntity], db: Session = Depends(get_db)):
    """Delete every node named in the request body.

    Args:
        entities: Graph/node id pairs to delete, processed in order.
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse with empty ``records`` once all deletions ran.

    Raises:
        HTTPException: 500 when any step fails.
    """
    try:
        graph = GraphBusiness(db)
        for target in entities:
            graph.delete_node(target.graph_id, target.node_id)
        response = StandardResponse(records=[], code=SUCCESS, message="Success")
        return response
    except Exception as e:
        logger.error(f"Error deleting entity: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
@router.post("/entity/update", response_model=StandardResponse)
def update_entity(entities: List[UpdateEntity], db: Session = Depends(get_db)):
    """Rename the nodes listed in the request body.

    Args:
        entities: Items carrying a graph id, a node id and the new name.
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse whose ``records`` hold ``id``, ``label`` and ``name``
        of each node for which ``update_node`` returned a truthy result;
        items with a falsy result are left out.

    Raises:
        HTTPException: 500 when any step fails.
    """
    try:
        graph = GraphBusiness(db)
        # Lazy generator: each update runs only when the comprehension below
        # pulls the next item, so updates and record building stay interleaved
        # in request order.
        updated_nodes = (
            graph.update_node(graph_id=item.graph_id, id=item.node_id, name=item.name)
            for item in entities
        )
        records = [
            {"id": node.id, "label": node.category, "name": node.name}
            for node in updated_nodes
            if node
        ]
        return StandardResponse(records=records, code=SUCCESS, message="Success")
    except Exception as e:
        logger.error(f"Error updating entity: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
@router.post("/entity/find", response_model=StandardResponse)
def find_entity(entity: FindEntity, db: Session = Depends(get_db)):
    """Fetch a single node together with its properties.

    Args:
        entity: Graph id and node id of the node to fetch.
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse with exactly one record containing ``id``, ``label``,
        ``name`` and ``properties`` (a list of ``name``/``title``/``value``
        dicts, empty when the node has no props).

    Raises:
        HTTPException: 404 when the node does not exist; 500 on any other
            failure.
    """
    try:
        graph_business = GraphBusiness(db)
        node = graph_business.get_node_by_id(entity.graph_id, entity.node_id)
        if not node:
            raise HTTPException(status_code=404, detail="Entity not found")

        # A node without props (None or empty) yields an empty list.
        props = [
            {
                "name": prop.prop_name,
                "title": prop.prop_title,
                "value": prop.prop_value,
            }
            for prop in (node.props or [])
        ]
        result = {"id": node.id, "label": node.category, "name": node.name, "properties": props}
        return StandardResponse(records=[result], code=SUCCESS, message="Success")
    except HTTPException:
        # Keep the 404 above intact; without this clause the generic handler
        # below would turn it into a 500.
        raise
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error finding entity: %s", e)
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
@router.post("/entity/search", response_model=StandardResponse)
def search_entity(entity: SearchEntity, db: Session = Depends(get_db)):
    """Search nodes by name and label inside one graph.

    Args:
        entity: Search criteria (name, label, graph id).
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse whose ``records`` hold ``id``, ``label`` and ``name``
        of every node returned by ``search_like_node_by_name``.

    Raises:
        HTTPException: 500 when any step fails.
    """
    try:
        graph = GraphBusiness(db)
        # NOTE(review): 20 is passed positionally as the fourth argument -
        # presumably a result limit; confirm against GraphBusiness.
        matches = graph.search_like_node_by_name(entity.name, entity.label, entity.graph_id, 20)
        records = []
        for match in matches:
            records.append({"id": match.id, "label": match.category, "name": match.name})
        return StandardResponse(records=records, code=SUCCESS, message="Success")
    except Exception as e:
        logger.error(f"Error searching entity: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
# Property endpoints
@router.post("/property/create", response_model=StandardResponse)
def create_property(entities: List[EntityVO], db: Session = Depends(get_db)):
    """Attach one property to each node named in the request body.

    Each item's ``properties`` mapping must provide ``"name"`` and
    ``"value"``; the name is also used as the property title. Items whose
    node cannot be found are skipped without error.

    Args:
        entities: Items naming a node and the property to add to it.
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse with one record per processed node, holding the node
        ``id`` and the ``properties`` returned by ``create_node_prop`` as
        ``name``/``title``/``value`` dicts.

    Raises:
        HTTPException: 400 when an item's ``properties`` lacks ``"name"`` or
            ``"value"``; 500 on any other failure. Items processed before the
            failing one are not undone by this function.
    """
    try:
        results = []
        graph_business = GraphBusiness(db)
        for entity in entities:
            node = graph_business.get_node_by_id(entity.graph_id, entity.node_id)
            if not node:
                continue

            # A missing key is a malformed request, so report it as a client
            # error instead of letting the KeyError surface as a 500.
            prop_spec = entity.properties
            try:
                prop_name = prop_spec["name"]
                prop_value = prop_spec["value"]
            except KeyError as missing:
                raise HTTPException(
                    status_code=400,
                    detail=f"Missing property field: {missing}",
                ) from missing

            created = graph_business.create_node_prop(
                category=entity.label,
                ref_id=entity.node_id,
                prop_name=prop_name,
                prop_value=prop_value,
                prop_title=prop_name,
            )
            serialized = [
                {
                    "name": prop.prop_name,
                    "title": prop.prop_title,
                    "value": prop.prop_value,
                }
                for prop in created
            ]
            results.append({"id": node.id, "properties": serialized})

        return StandardResponse(records=results, code=SUCCESS, message="Success")
    except HTTPException:
        # Let the 400 above reach the client unchanged.
        raise
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error creating property: %s", e)
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
# NOTE: the /property/delete endpoint below is disabled. As written it never
# returns a response on the success path, so it must be completed before it
# is re-enabled.
# @router.post("/property/delete", response_model=StandardResponse)
# def delete_property(properties: List[DeleteProperty], db: Session = Depends(get_db)):
#     try:
#         results = []
#         graph_business = GraphBusiness(db)
#         for property in properties:
#             node = graph_business.get_node_by_id(property.graph_id, property.node_id)
#             if not node:
#                 continue
#             graph_business.delete_node_prop(property.graph_id, property.node_id, property.property_name)
#     except Exception as e:
#         logger.error(f"Error deleting property: {str(e)}")
#         raise HTTPException(status_code=500, detail="Internal Server Error")
@router.post("/property/update", response_model=StandardResponse)
def update_property(property: UpdateProperty, db: Session = Depends(get_db)):
    """Change the value of one property on one node.

    The 404 check only verifies that the node exists and carries at least one
    property; it does not verify that ``property_name`` itself is present.

    Args:
        property: Node id, property name and new value. The parameter name
            shadows the ``property`` builtin but is kept so the endpoint
            signature stays unchanged.
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse with empty ``records``.

    Raises:
        HTTPException: 404 when the node is missing or has no properties;
            500 on any other failure.
    """
    try:
        graph_business = GraphBusiness(db)
        node = graph_business.get_node_by_id(property.graph_id, property.node_id)
        if not node or not node.props:
            raise HTTPException(status_code=404, detail="Entity or property not found")
        graph_business.update_node_prop(property.node_id, property.property_name, property.property_value)
        return StandardResponse(records=[], code=SUCCESS, message="Success")
    except HTTPException:
        # Keep the 404 above intact; without this clause the generic handler
        # below would turn it into a 500.
        raise
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error updating property: %s", e)
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
# Relationship endpoints
@router.post("/relationship/create", response_model=StandardResponse)
def create_relationship(relationships: List[RelationshipVO], db: Session = Depends(get_db)):
    """Create one edge per request item.

    The relationship type is used as both the edge category and the edge
    name; a missing or empty ``property`` mapping is replaced by ``{}``.

    Args:
        relationships: Edges to create, processed in order.
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse with empty ``records``.

    Raises:
        HTTPException: 500 when any step fails.
    """
    try:
        graph = GraphBusiness(db)
        for link in relationships:
            edge_type = link.relationship_type
            graph.create_edge(
                graph_id=link.graph_id,
                src_id=link.start_id,
                dest_id=link.end_id,
                category=edge_type,
                name=edge_type,
                props=link.property or {},
            )
        return StandardResponse(records=[], code=SUCCESS, message="Success")
    except Exception as e:
        logger.error(f"Error creating relationship: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
@router.post("/relationship/delete", response_model=StandardResponse)
def delete_relationship(relationships: List[RelationshipVO], db: Session = Depends(get_db)):
    """Delete the edges described by the request items.

    Each edge is addressed by graph id, source id, destination id and the
    relationship type, which is passed as both category and name.

    Args:
        relationships: Edges to delete, processed in order.
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse with empty ``records``.

    Raises:
        HTTPException: 500 when any step fails.
    """
    try:
        graph = GraphBusiness(db)
        for link in relationships:
            edge_type = link.relationship_type
            graph.delete_edge(
                graph_id=link.graph_id,
                src_id=link.start_id,
                dest_id=link.end_id,
                category=edge_type,
                name=edge_type,
            )
        return StandardResponse(records=[], code=SUCCESS, message="Success")
    except Exception as e:
        logger.error(f"Error deleting relationship: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
@router.post("/relationship/search", response_model=StandardResponse)
def search_relationship(relation: RelationshipNameVO, db: Session = Depends(get_db)):
    """Look up edges of a given relationship type inside one graph.

    Args:
        relation: Graph id and relationship type to search for.
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse whose ``records`` list the ``category`` of every
        edge returned by ``search_edges`` (one entry per edge).

    Raises:
        HTTPException: 404 when no edge is returned; 500 on any other
            failure.
    """
    try:
        graph_business = GraphBusiness(db)
        edges = graph_business.search_edges(graph_id=relation.graph_id, category=relation.relationship_type)
        if not edges:
            raise HTTPException(status_code=404, detail="Relationship not found")
        return StandardResponse(records=[edge.category for edge in edges], code=SUCCESS, message="Success")
    except HTTPException:
        # Keep the 404 above intact; without this clause the generic handler
        # below would turn it into a 500.
        raise
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error searching relationship: %s", e)
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
@router.post("/relationship/update", response_model=StandardResponse)
def update_relationship(updates: List[UpdateRelationTypeVO], db: Session = Depends(get_db)):
    """Rename relationship types within the given graphs.

    Args:
        updates: Items naming a graph, the old relationship type and the new
            one; processed in order.
        db: Database session injected through ``Depends(get_db)``.

    Returns:
        StandardResponse with empty ``records``.

    Raises:
        HTTPException: 500 when any step fails.
    """
    try:
        graph = GraphBusiness(db)
        for change in updates:
            graph.update_edges(
                graph_id=change.graph_id,
                old_category=change.old_relationship_type,
                new_category=change.new_relationship_type,
            )
        return StandardResponse(records=[], code=SUCCESS, message="Success")
    except Exception as e:
        logger.error(f"Error updating relationship: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
# Second module-level name bound to the same APIRouter object as ``router``.
# NOTE(review): presumably the name other modules import - confirm.
graph_router = router
|