123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
import logging
import re
from typing import List, Optional

from fastapi import APIRouter, Depends, File, Header, HTTPException, UploadFile
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session

from db.database import get_db
from models.web.response import StandardResponse

# Router for the Dify external knowledge base endpoints; every route below is
# mounted under the "/dify" prefix and grouped under one OpenAPI tag.
router = APIRouter(
    prefix="/dify",
    tags=["Dify Knowledge Base"],
)


# --- Data Models ---
class RetrievalSetting(BaseModel):
    """Retrieval tuning parameters sent by Dify with each query."""

    # Maximum number of records to return (bound as the SQL LIMIT).
    top_k: int
    # Minimum relevance score requested by the caller.
    # NOTE(review): not currently applied by the retrieval endpoint.
    score_threshold: float
class MetadataCondition(BaseModel):
    """A single metadata filter clause from a Dify retrieval request."""

    # Metadata field names the comparison applies to; each field yields its
    # own SQL predicate.
    name: List[str]
    # Dify comparison operator, e.g. "contains", "is", "empty", ">".
    comparison_operator: str
    # Value to compare against; not used by "empty" / "not empty".
    value: Optional[str] = None
class MetadataFilter(BaseModel):
    """A group of metadata conditions combined by one logical operator."""

    # How the conditions are combined; defaults to "and".
    logical_operator: str = "and"
    # The individual comparison clauses.
    conditions: List[MetadataCondition]
class DifyRetrievalRequest(BaseModel):
    """Request body of the Dify external knowledge retrieval call."""

    # Knowledge base identifier; looked up as kg_graphs.id.
    knowledge_id: str
    # Query text; compared for equality against kg_nodes.name.
    query: str
    # top_k / score_threshold settings.
    retrieval_setting: RetrievalSetting
    # Optional metadata filter layered on top of the name match.
    metadata_condition: Optional[MetadataFilter] = None
class KnowledgeRecord(BaseModel):
    """Shape of one retrieved record returned to Dify.

    NOTE(review): the retrieval endpoint builds plain dicts with these keys
    rather than instances of this model.
    """

    # Text body of the record.
    content: str
    # Relevance score of the record.
    score: float
    # Display title of the record.
    title: str
    # Free-form extra attributes.
    metadata: dict

# --- Authentication ---
async def verify_api_key(authorization: str = Header(...)):
    """FastAPI dependency that checks Dify's bearer-token ``Authorization`` header.

    Args:
        authorization: Raw value of the required ``Authorization`` header.

    Returns:
        The token text that follows the ``"Bearer "`` prefix.

    Raises:
        HTTPException: 403 with error_code 1001 if the header does not start
            with ``"Bearer "``; 403 with error_code 1002 if the token after the
            prefix is empty.

    NOTE(review): ``detail`` is a ``StandardResponse`` object rather than a
    str/dict; confirm the app installs an exception handler that can
    serialize it.
    """
    bearer_prefix = "Bearer "
    if authorization.startswith(bearer_prefix):
        token = authorization[len(bearer_prefix):]
    else:
        raise HTTPException(
            status_code=403,
            detail=StandardResponse(
                success=False,
                error_code=1001,
                error_msg="Invalid Authorization header format",
            ),
        )

    # TODO: Implement actual API key validation logic -- at present any
    # non-empty token is accepted, so this is not real access control yet.
    if token:
        return token

    raise HTTPException(
        status_code=403,
        detail=StandardResponse(
            success=False,
            error_code=1002,
            error_msg="Authorization failed",
        ),
    )

_logger = logging.getLogger(__name__)

# Dify operators that compare a column with a bound value.  Operators not
# present in any of the three tables below fall back to "=", as before.
_SQL_COMPARISON_OPS = {
    "is": "=",
    "=": "=",
    "is not": "<>",
    "≠": "<>",
    ">": ">",
    "<": "<",
    "≥": ">=",
    "≤": "<=",
    "before": "<",
    "after": ">",
}

# Dify string-matching operators -> (SQL keyword, LIKE pattern template).
# The user value is wildcard-escaped before being placed into the template.
_SQL_LIKE_OPS = {
    "contains": ("LIKE", "%{}%"),
    "not contains": ("NOT LIKE", "%{}%"),
    "start with": ("LIKE", "{}%"),
    "end with": ("LIKE", "%{}"),
}

# Dify operators that take no value.
_SQL_NULL_OPS = {
    "empty": "IS NULL",
    "not empty": "IS NOT NULL",
}

# Metadata field names end up as column names in the SQL text (bind
# parameters cannot stand in for identifiers), so only plain identifiers are
# accepted -- this is what keeps request data from being injected into SQL.
_METADATA_FIELD_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def _escape_like(value: str) -> str:
    """Escape LIKE wildcards in *value* for use with ``ESCAPE '!'``."""
    return value.replace("!", "!!").replace("%", "!%").replace("_", "!_")


def _build_metadata_clause(metadata_condition: Optional[MetadataFilter]):
    """Translate a Dify metadata filter into a parenthesised SQL fragment.

    Returns:
        ``(sql, params)``: ``sql`` is ``""`` when there is nothing to filter,
        otherwise a ``(...)`` predicate over ``node.<field>`` columns whose
        values are referenced through uniquely named bind parameters stored
        in ``params``.

    Raises:
        ValueError: if a metadata field name is not a plain identifier.
    """
    if metadata_condition is None:
        return "", {}

    is_or = (metadata_condition.logical_operator or "").lower() == "or"
    joiner = " OR " if is_or else " AND "
    clauses = []
    params = {}
    for cond in metadata_condition.conditions:
        op = cond.comparison_operator
        for field in cond.name:
            if not _METADATA_FIELD_RE.fullmatch(field):
                raise ValueError(f"Invalid metadata field name: {field!r}")
            column = f"node.{field}"

            if op in _SQL_NULL_OPS:
                clauses.append(f"{column} {_SQL_NULL_OPS[op]}")
                continue

            param = f"meta_{len(params)}"
            if op in _SQL_LIKE_OPS:
                keyword, template = _SQL_LIKE_OPS[op]
                clauses.append(f"{column} {keyword} :{param} ESCAPE '!'")
                params[param] = template.format(_escape_like(cond.value or ""))
            else:
                sql_op = _SQL_COMPARISON_OPS.get(op, "=")
                clauses.append(f"{column} {sql_op} :{param}")
                params[param] = cond.value

    if not clauses:
        # An empty conditions list must not leave a dangling "AND".
        return "", {}
    return "(" + joiner.join(clauses) + ")", params


def _load_node_content(db: Session, node_id) -> str:
    """Render a node's kg_props rows (category=1) as ``title:value`` lines."""
    rows = db.execute(
        text(
            "select prop_title, prop_value from kg_props as prop "
            "where prop.category=1 and prop.ref_id = :node_id"
        ),
        {"node_id": node_id},
    )
    return "\n".join(f"{title}:{value}" for title, value in rows)


@router.post("/retrieval", response_model=StandardResponse)
async def dify_retrieval(
    request: DifyRetrievalRequest,
    api_key: str = Depends(verify_api_key),
    db: Session = Depends(get_db),
):
    """Implement Dify's external knowledge base retrieval endpoint.

    Checks that ``request.knowledge_id`` exists in ``kg_graphs``, then returns
    up to ``retrieval_setting.top_k`` nodes of that graph whose name equals
    ``request.query``, optionally narrowed by ``metadata_condition``.  Each
    record's content is built from the node's ``kg_props`` rows with
    ``category=1``.  Every hit is an exact name match and is reported with a
    constant score of 1.0.

    NOTE(review): ``score_threshold`` is not applied.  ``db.execute`` is a
    synchronous call inside an ``async def``, so it blocks the event loop while
    it runs; consider a plain ``def`` handler.  The 500 path returns the raw
    exception text to the client.

    Raises:
        HTTPException: 404 (error_code 2001) if the knowledge base does not
            exist; 400 for an invalid metadata field name; 500 for any other
            failure.
    """
    _logger.debug("dify_retrieval start")
    try:
        kb_exists = db.execute(
            text("select 1 from kg_graphs where id = :graph_id"),
            {"graph_id": request.knowledge_id},
        ).scalar()
        # scalar() yields None when no row matches (never 0 for "select 1"),
        # so absence must be tested explicitly.
        if kb_exists is None:
            raise HTTPException(
                status_code=404,
                detail=StandardResponse(
                    success=False,
                    error_code=2001,
                    error_msg="The knowledge does not exist"
                )
            )

        try:
            metadata_sql, metadata_params = _build_metadata_clause(
                request.metadata_condition
            )
        except ValueError as err:
            raise HTTPException(
                status_code=400,
                detail=StandardResponse(success=False, error_msg=str(err)),
            ) from err

        query = (
            "select id, name, category, version from kg_nodes as node "
            "where node.graph_id = :graph_id and node.name = :node_name"
        )
        if metadata_sql:
            query += f" AND {metadata_sql}"
        query += " ORDER BY node.name DESC LIMIT :top_k"

        params = {
            "graph_id": request.knowledge_id,
            "node_name": request.query,
            "top_k": request.retrieval_setting.top_k,
            **metadata_params,
        }
        # Materialize the node rows before issuing the per-node prop queries.
        records = [
            {
                "content": "",
                "score": 1.0,
                "title": name,
                "metadata": {"id": node_id, "category": category},
            }
            for node_id, name, category, _version in db.execute(text(query), params)
        ]

        # NOTE(review): one extra query per record (N+1); acceptable for small top_k.
        for record in records:
            record["content"] = _load_node_content(db, record["metadata"]["id"])

        return StandardResponse(records=records)
    except HTTPException:
        raise
    except Exception as e:
        _logger.exception("dify_retrieval failed")
        raise HTTPException(
            status_code=500,
            detail=StandardResponse(
                success=False,
                error_msg=str(e)
            )
        ) from e


# Public alias of the router under a descriptive name, presumably imported by
# the application for registration (verify against the importing module).
dify_kb_router = router
|