"""HTTP endpoints for queuing data-export tasks and downloading exported files."""

import csv
import io
import json
import logging
import os
from datetime import datetime
from pathlib import Path as PathLib

from fastapi import APIRouter, Depends, HTTPException, Path, Request
from fastapi.responses import JSONResponse, FileResponse
from sqlalchemy.orm import Session

from config.site import TEMP_STORAGE_PATH
from db.database import get_db
from db.models import DbKgNode, DbKgEdge, DbKgTask
from utils.response import resp_200

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/api/data-export",
)


def generate_csv(rows, fieldnames):
    """Serialize *rows* into an in-memory CSV stream.

    Args:
        rows: Iterable of dicts; each dict is written as one CSV record.
        fieldnames: Column names, written as the header row and used to
            order the values of every record.

    Returns:
        An ``io.StringIO`` holding the CSV text, rewound to position 0 so
        the caller can read it from the start.
    """
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)
    output.seek(0)
    return output


@router.get("/all/{category}/{graph_id}")
def export_nodes_csv(category: str, graph_id: int, db: Session = Depends(get_db)):
    """Create a ``data_export`` task record for the given category and id.

    The task payload depends on ``category``:

    * ``"graph"``    -> ``{"graph_id": graph_id}``
    * ``"labeling"`` -> ``{"proj_id": graph_id}``
    * anything else  -> empty string (the task is still created).

    Args:
        category: Export category taken from the URL path.
        graph_id: Identifier taken from the URL path; stored as a graph id
            or a project id depending on ``category``.
        db: SQLAlchemy session injected by FastAPI.

    Returns:
        ``resp_200`` wrapping ``task_id``, ``error_code`` and
        ``error_message``. On failure the transaction is rolled back and
        ``task_id`` is 0 with ``error_code`` 500; no exception propagates.
    """
    try:
        task_content = ""
        if category == "graph":
            task_content = json.dumps({"graph_id": graph_id})
        elif category == "labeling":
            task_content = json.dumps({"proj_id": graph_id})
        # NOTE(review): unknown categories still produce a task with empty
        # content -- confirm whether they should be rejected instead.
        logger.debug("data_export task content: %s", task_content)

        # Use one timestamp so `created` and `updated` are identical on a
        # freshly inserted row.
        now = datetime.now()

        # Create the task record.
        # NOTE(review): proj_id is hard-coded to 1 -- confirm this is intended.
        task = DbKgTask(
            proj_id=1,
            task_category=category,
            task_name="data_export",
            task_content=task_content,
            status=0,
            created=now,
            updated=now,
        )
        db.add(task)
        db.commit()
        db.refresh(task)

        return resp_200(data={"task_id": task.id, 'error_code': 0, 'error_message': '任务创建成功'})
    except Exception as e:
        db.rollback()
        logger.exception("Failed to create data_export task")
        return resp_200(data={"task_id": 0, 'error_code': 500, 'error_message': str(e)})


@router.get("/download/{filename}")
def download_file(
    filename: str = Path(..., regex=r"^[a-zA-Z0-9_\-\.]+$"),
):
    """Serve a file from ``TEMP_STORAGE_PATH`` as a download.

    Args:
        filename: Name of the file to download; restricted by the route's
            regex to letters, digits, ``_``, ``-`` and ``.``.

    Returns:
        A ``FileResponse`` for the requested file.

    Raises:
        HTTPException: 400 if the resolved path falls outside the storage
            directory; 404 if the path does not refer to a regular file.
    """
    # Resolve the base directory so the containment check below compares two
    # absolute, symlink-free paths; otherwise a relative or symlinked storage
    # path would make every request fail the check.
    base_path = PathLib(TEMP_STORAGE_PATH).resolve()

    # Build the full path of the requested file.
    file_path = base_path / filename
    logger.debug("Attempting to access file at path: %s", file_path)

    # Safety check: the resolved target must stay inside the storage directory
    # (the regex still admits names such as "..").
    try:
        file_path.resolve().relative_to(base_path)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid file path")

    # Only regular files can be served; a bare exists() check would also let
    # directories through (e.g. filename ".").
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    # Advertise byte-range support for resumable downloads.
    # NOTE(review): whether Range requests are actually honored depends on the
    # installed Starlette version -- confirm.
    return FileResponse(
        path=file_path,
        filename=filename,
        headers={"Accept-Ranges": "bytes"},
    )


@router.get("/list-routes")
def list_routes(request: Request):
    """List every route registered on the application.

    Args:
        request: Incoming request; used to reach ``request.app.routes``.

    Returns:
        ``{"routes": [...]}`` where each entry has ``path``, ``methods`` and
        ``name``. ``methods`` is ``['N/A']`` when the route has no methods;
        ``path`` and ``name`` are ``None`` when the route lacks them.
    """
    route_list = []
    for route in request.app.routes:
        # `methods` can be missing or present but None, depending on the
        # route type, so hasattr() alone is not a sufficient guard.
        methods = getattr(route, "methods", None)
        route_list.append(
            {
                "path": getattr(route, "path", None),
                "methods": sorted(methods) if methods else ['N/A'],
                "name": getattr(route, "name", None),
            }
        )
    return {"routes": route_list}


data_export_router = router