import logging
import re
from typing import Dict, List, Optional, Tuple

from fastapi import APIRouter, UploadFile, File, HTTPException, Depends, Header
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session

from models.web.response import StandardResponse
from db.database import get_db

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/dify", tags=["Dify Knowledge Base"])


# --- Data Models ---

class RetrievalSetting(BaseModel):
    """Retrieval tuning parameters sent with a retrieval request."""

    top_k: int  # maximum number of records to return (used as SQL LIMIT)
    score_threshold: float  # minimum score requested by the caller


class MetadataCondition(BaseModel):
    """One metadata filter condition, applied to every field listed in `name`."""

    name: List[str]  # field names on kg_nodes the condition applies to
    comparison_operator: str  # e.g. "contains", "is", "empty", "≥", "before"
    value: Optional[str] = None  # comparison value; ignored for "empty"/"not empty"


class MetadataFilter(BaseModel):
    """A list of metadata conditions combined with `logical_operator`."""

    logical_operator: str = "and"  # "and" or "or"
    conditions: List[MetadataCondition]


class DifyRetrievalRequest(BaseModel):
    """Request body of the POST /dify/retrieval endpoint."""

    knowledge_id: str  # looked up as kg_graphs.id
    query: str  # matched exactly against kg_nodes.name
    retrieval_setting: RetrievalSetting
    metadata_condition: Optional[MetadataFilter] = None


class KnowledgeRecord(BaseModel):
    """Shape of a single retrieved record."""

    content: str
    score: float
    title: str
    metadata: dict


# --- Authentication ---

async def verify_api_key(authorization: str = Header(...)):
    """Validate the `Authorization: Bearer <key>` header and return the key.

    Raises:
        HTTPException: 403 with error_code 1001 when the header does not start
            with "Bearer ", or 1002 when the key after the prefix is empty.
    """
    if not authorization.startswith("Bearer "):
        raise HTTPException(
            status_code=403,
            detail=StandardResponse(
                success=False,
                error_code=1001,
                error_msg="Invalid Authorization header format"
            )
        )
    api_key = authorization[7:]  # strip the "Bearer " prefix
    # TODO: Implement actual API key validation logic
    if not api_key:
        raise HTTPException(
            status_code=403,
            detail=StandardResponse(
                success=False,
                error_code=1002,
                error_msg="Authorization failed"
            )
        )
    return api_key


# --- Metadata filtering helpers ---

# Field names are interpolated into the SQL text (identifiers cannot be bound
# parameters), so only plain SQL identifiers are accepted to prevent injection.
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

# LIKE escape character. "!" is used instead of a backslash because backslash
# handling inside SQL string literals differs between database engines.
_LIKE_ESCAPE = "!"

# Comparison operator -> (SQL operator, LIKE pattern template or None).
# Operators not listed here fall back to "=", as in the original implementation.
_COMPARISON_OPERATORS = {
    "contains": ("LIKE", "%{}%"),
    "not contains": ("NOT LIKE", "%{}%"),
    "start with": ("LIKE", "{}%"),
    "end with": ("LIKE", "%{}"),
    "is": ("=", None),
    "is not": ("<>", None),
    ">": (">", None),
    "<": ("<", None),
    "≥": (">=", None),
    "≤": ("<=", None),
    "before": ("<", None),
    "after": (">", None),
}

# Operators that take no comparison value.
_NULL_OPERATORS = {"empty": "IS NULL", "not empty": "IS NOT NULL"}


def _escape_like(value: str) -> str:
    """Escape LIKE wildcards in *value* so it is matched literally."""
    return (
        value.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", _LIKE_ESCAPE + "%")
        .replace("_", _LIKE_ESCAPE + "_")
    )


def _build_metadata_clause(
    metadata_filter: MetadataFilter,
) -> Tuple[str, Dict[str, Optional[str]]]:
    """Translate a metadata filter into a parenthesised SQL predicate.

    Returns:
        ``(clause, params)``. ``clause`` is "" when there is nothing to filter
        on. ``params`` holds the bound values for the ``:meta_N`` placeholders
        used in ``clause``.

    Raises:
        HTTPException: 400 when a field name is not a plain SQL identifier.
    """
    predicates = []
    params: Dict[str, Optional[str]] = {}
    for cond in metadata_filter.conditions:
        null_op = _NULL_OPERATORS.get(cond.comparison_operator)
        sql_op, pattern = _COMPARISON_OPERATORS.get(cond.comparison_operator, ("=", None))
        for field in cond.name:
            if not _IDENTIFIER_RE.fullmatch(field):
                raise HTTPException(
                    status_code=400,
                    detail=StandardResponse(
                        success=False,
                        error_msg=f"Invalid metadata field name: {field!r}"
                    )
                )
            column = f"node.{field}"
            if null_op is not None:
                predicates.append(f"{column} {null_op}")
                continue
            # Indexed parameter names cannot collide with each other or with
            # the base query's graph_id / node_name / top_k parameters.
            param_name = f"meta_{len(params)}"
            if pattern is not None:
                # A None value is bound as NULL, so the LIKE simply matches nothing.
                params[param_name] = (
                    pattern.format(_escape_like(cond.value))
                    if cond.value is not None else None
                )
                predicates.append(
                    f"{column} {sql_op} :{param_name} ESCAPE '{_LIKE_ESCAPE}'"
                )
            else:
                params[param_name] = cond.value
                predicates.append(f"{column} {sql_op} :{param_name}")

    if not predicates:
        return "", {}
    joiner = " OR " if metadata_filter.logical_operator.lower() == "or" else " AND "
    return "(" + joiner.join(predicates) + ")", params


# --- Query helpers ---

def _ensure_graph_exists(db: Session, graph_id: str) -> None:
    """Raise a 404 (error_code 2001) if no kg_graphs row has id == graph_id."""
    found = db.execute(
        text("select 1 from kg_graphs where id = :graph_id"),
        {"graph_id": graph_id}
    ).scalar()
    # scalar() returns None when the query yields no rows.
    if found is None:
        raise HTTPException(
            status_code=404,
            detail=StandardResponse(
                success=False,
                error_code=2001,
                error_msg="The knowledge does not exist"
            )
        )


def _query_nodes(db: Session, request: DifyRetrievalRequest) -> list:
    """Return (id, name, category) rows of nodes in the graph named exactly like the query."""
    sql = (
        "select node.id, node.name, node.category from kg_nodes as node"
        " where node.graph_id = :graph_id and node.name = :node_name"
    )
    params = {
        "graph_id": request.knowledge_id,
        "node_name": request.query,
        # A negative LIMIT is rejected by some databases; treat it as "no rows".
        "top_k": max(request.retrieval_setting.top_k, 0),
    }
    if request.metadata_condition:
        clause, meta_params = _build_metadata_clause(request.metadata_condition)
        if clause:
            sql += f" AND {clause}"
            params.update(meta_params)
    sql += " ORDER BY node.name DESC LIMIT :top_k"
    # Materialise all rows before issuing the per-node property queries.
    return db.execute(text(sql), params).fetchall()


def _load_node_content(db: Session, node_id) -> str:
    """Render a node's category-1 properties as newline-separated "title:value" lines."""
    rows = db.execute(
        text(
            "select prop_title, prop_value from kg_props as prop"
            " where prop.category=1 and prop.ref_id = :node_id"
        ),
        {"node_id": node_id}
    )
    return "\n".join(f"{prop_title}:{prop_value}" for prop_title, prop_value in rows)


@router.post("/retrieval", response_model=StandardResponse)
async def dify_retrieval(
    request: DifyRetrievalRequest,
    api_key: str = Depends(verify_api_key),
    db: Session = Depends(get_db)
):
    """Implement the external knowledge base retrieval endpoint for Dify.

    Looks up nodes of graph ``request.knowledge_id`` whose name equals
    ``request.query``, optionally narrowed by the metadata filter. Each node is
    returned as a record whose content is built from its category-1 properties
    in kg_props.

    Raises:
        HTTPException: 404 (2001) for an unknown knowledge_id, 400 for an
            invalid metadata field name, 500 for any other failure.
    """
    # NOTE(review): this is an async handler doing blocking Session I/O, which
    # stalls the event loop. Consider a plain `def` handler (FastAPI then runs
    # it in a threadpool) once no caller awaits it directly.
    # NOTE(review): `detail=StandardResponse(...)` assumes a custom exception
    # handler serialises pydantic models — confirm in the app setup.
    logger.debug("dify_retrieval start")
    try:
        _ensure_graph_exists(db, request.knowledge_id)
        logger.debug("knowledge base %s exists", request.knowledge_id)

        records = []
        for node_id, name, category in _query_nodes(db, request):
            records.append({
                "content": _load_node_content(db, node_id),
                # Matching is by exact name, so every hit gets a fixed score of
                # 1.0; score_threshold is therefore not applied here.
                "score": 1.0,
                "title": name,
                "metadata": {
                    "id": node_id,
                    "category": category
                }
            })

        return StandardResponse(records=records)
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("dify_retrieval failed")
        raise HTTPException(
            status_code=500,
            detail=StandardResponse(
                success=False,
                error_msg=str(e)
            )
        ) from e


dify_kb_router = router