from re import S from fastapi import FastAPI, HTTPException from fastapi import APIRouter, Depends, Query # from networkx import graph from pydantic import BaseModel from typing import List, Optional, Dict from sqlalchemy import create_engine, Column, Integer, String, Boolean, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import properties, sessionmaker from sqlalchemy.orm import Session import logging from agent.libs.graph import GraphBusiness from agent.libs.user_data_relation import UserDataRelationBusiness from agent.libs.user import UserBusiness from agent.db.database import get_db from agent.models.web.response import StandardResponse,FAILED,SUCCESS router = APIRouter(prefix="/graph_mg", tags=["knowledge graph management interface"]) logger = logging.getLogger(__name__) # Pydantic models class CreateEntity(BaseModel): user_id: int graph_id: int label: str name: str properties: Optional[Dict] = None class DeleteEntity(BaseModel): user_id: int graph_id: int node_id: int class UpdateEntity(BaseModel): user_id: int graph_id: int node_id: int name: str class FindEntity(BaseModel): user_id: int graph_id: int node_id: int class SearchEntity(BaseModel): user_id: int graph_id: int label: str name: str class EntityVO(BaseModel): user_id: int graph_id: int label: str node_id: int properties: Dict class DeleteProperty(BaseModel): user_id: int graph_id: int node_id: int property_name: str class UpdateProperty(BaseModel): user_id: int graph_id: int node_id: int property_name: str property_value: str class RelationshipVO(BaseModel): user_id: int graph_id: int start_id: int end_id: int start_label: str end_label: str relationship_type: str property: Optional[Dict] = None class RelationshipNameVO(BaseModel): user_id: int graph_id: int relationship_type: str class UpdateRelationTypeVO(BaseModel): user_id: int graph_id: int old_relationship_type: str new_relationship_type: str # Entity endpoints @router.post("/entity/create", response_model=StandardResponse) def create_entity(entities: List[CreateEntity], db: Session = Depends(get_db)): try: results = [] graph_business = GraphBusiness(db) user_data_relation_business = UserDataRelationBusiness(db) user_business = UserBusiness(db) user_id = entities[0].user_id user = user_business.get_user(user_id) if not user: raise HTTPException(status_code=404, detail="User not found") for entity in entities: node = graph_business.create_node(graph_id=entity.graph_id, name=entity.name, category=entity.label, props=entity.properties if entity.properties else {}) user_data_relation_business.create_relation(user_id=entity.user_id, data_category="DbKgNode", data_id=node.id, user_name=user.username, role_id=user.roles[0].id, role_name=user.roles[0].name) results.append({"id": node.id, "label": node.category, "name": node.name}) return StandardResponse(records=results, code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error creating entity: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") @router.post("/entity/delete", response_model=StandardResponse) def delete_entity(entities: List[DeleteEntity], db: Session = Depends(get_db)): try: graph_business = GraphBusiness(db) for entity in entities: graph_business.delete_node(entity.graph_id,entity.node_id) return StandardResponse(records=[], code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error deleting entity: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") @router.post("/entity/update", response_model=StandardResponse) def update_entity(entities: List[UpdateEntity], db: Session = Depends(get_db)): try: results = [] graph_business = GraphBusiness(db) for entity in entities: node = graph_business.update_node(graph_id=entity.graph_id, id=entity.node_id, name=entity.name) if not node: continue results.append({"id": node.id, "label": node.category, "name": node.name}) return StandardResponse(records=results, code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error updating entity: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") @router.post("/entity/find", response_model=StandardResponse) def find_entity(entity: FindEntity, db: Session = Depends(get_db)): try: graph_business = GraphBusiness(db) node = graph_business.get_node_by_id(entity.graph_id, entity.node_id) if not node: raise HTTPException(status_code=404, detail="Entity not found") props = [] if node.props: for prop in node.props: props.append({ "name": prop.prop_name, "title": prop.prop_title, "value": prop.prop_value }) result = {"id": node.id, "label": node.category, "name": node.name, "properties": props} return StandardResponse(records=[result], code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error finding entity: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") @router.post("/entity/search", response_model=StandardResponse) def search_entity(entity: SearchEntity, db: Session = Depends(get_db)): try: graph_business = GraphBusiness(db) nodes = graph_business.search_like_node_by_name(entity.name, entity.label, entity.graph_id,20) results = [{"id": node.id, "label": node.category, "name": node.name} for node in nodes] return StandardResponse(records=results, code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error searching entity: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") # Property endpoints @router.post("/property/create", response_model=StandardResponse) def create_property(entities: List[EntityVO], db: Session = Depends(get_db)): try: results = [] graph_business = GraphBusiness(db) for entity in entities: node = graph_business.get_node_by_id(entity.graph_id, entity.node_id) if not node: continue property = entity.properties props = graph_business.create_node_prop(category=entity.label, ref_id=entity.node_id, prop_name=property["name"], prop_value=property["value"],prop_title=property["name"]) properties = [] for prop in props: properties.append({ "name": prop.prop_name, "title": prop.prop_title, "value": prop.prop_value }) results.append({"id": node.id, "properties": properties}) return StandardResponse(records=results, code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error creating property: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") # @router.post("/property/delete", response_model=StandardResponse) # def delete_property(properties: List[DeleteProperty], db: Session = Depends(get_db)): # try: # results = [] # graph_business = GraphBusiness(db) # for property in properties: # node = graph_business.get_node_by_id(property.graph_id, property.node_id) # if not node: # continue # graph_business.delete_node_prop(property.graph_id, property.node_id, property.property_name) # except Exception as e: # logger.error(f"Error deleting property: {str(e)}") # raise HTTPException(status_code=500, detail="Internal Server Error") @router.post("/property/update", response_model=StandardResponse) def update_property(property: UpdateProperty, db: Session = Depends(get_db)): try: graph_business = GraphBusiness(db) node = graph_business.get_node_by_id(property.graph_id, property.node_id) if not node or not node.props: raise HTTPException(status_code=404, detail="Entity or property not found") graph_business.update_node_prop(property.node_id, property.property_name, property.property_value) return StandardResponse(records=[], code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error updating property: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") # Relationship endpoints @router.post("/relationship/create", response_model=StandardResponse) def create_relationship(relationships: List[RelationshipVO], db: Session = Depends(get_db)): try: graph_business = GraphBusiness(db) for rel in relationships: graph_business.create_edge(graph_id=rel.graph_id, src_id=rel.start_id, dest_id=rel.end_id, category=rel.relationship_type, name=rel.relationship_type, props=rel.property if rel.property else {}) return StandardResponse(records=[], code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error creating relationship: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") @router.post("/relationship/delete", response_model=StandardResponse) def delete_relationship(relationships: List[RelationshipVO], db: Session = Depends(get_db)): try: graph_business = GraphBusiness(db) for rel in relationships: graph_business.delete_edge(graph_id=rel.graph_id, src_id=rel.start_id, dest_id=rel.end_id, category=rel.relationship_type, name=rel.relationship_type) return StandardResponse(records=[], code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error deleting relationship: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") @router.post("/relationship/search", response_model=StandardResponse) def search_relationship(relation: RelationshipNameVO, db: Session = Depends(get_db)): try: graph_business = GraphBusiness(db) edges = graph_business.search_edges(graph_id=relation.graph_id, category=relation.relationship_type) if not edges: raise HTTPException(status_code=404, detail="Relationship not found") return StandardResponse(records=[edge.category for edge in edges], code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error searching relationship: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") @router.post("/relationship/update", response_model=StandardResponse) def update_relationship(updates: List[UpdateRelationTypeVO], db: Session = Depends(get_db)): try: graph_business = GraphBusiness(db) for update in updates: graph_business.update_edges(graph_id=update.graph_id, old_category=update.old_relationship_type, new_category=update.new_relationship_type) return StandardResponse(records=[], code=SUCCESS, message="Success") except Exception as e: logger.error(f"Error updating relationship: {str(e)}") raise HTTPException(status_code=500, detail="Internal Server Error") graph_router = router