123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- from fastapi import APIRouter, Depends, HTTPException, Path,Request
- from fastapi.responses import JSONResponse, FileResponse
- from pathlib import Path as PathLib
- from config.site import TEMP_STORAGE_PATH
- from sqlalchemy.orm import Session
- import csv
- import io
- import json
- import os
- from datetime import datetime
- from db.database import get_db
- from db.models import DbKgNode, DbKgEdge, DbKgTask
- from utils.response import resp_200
# Router for the data-export endpoints; every route below is served under
# the /api/data-export prefix.
router = APIRouter(prefix="/api/data-export")
def generate_csv(rows, fieldnames):
    """Serialize dict rows into an in-memory CSV text stream.

    Args:
        rows: Iterable of mappings, one per CSV record, keyed by column name.
        fieldnames: Column names in output order; also written as the
            header row.

    Returns:
        An ``io.StringIO`` containing the header line followed by one line
        per row, rewound to position 0 so it can be read from the start.
    """
    buffer = io.StringIO()
    csv_writer = csv.DictWriter(buffer, fieldnames=fieldnames)
    csv_writer.writeheader()
    csv_writer.writerows(rows)
    # Rewind so callers can stream the content from the beginning.
    buffer.seek(0)
    return buffer
# Maps each supported export category to the key under which the id from the
# URL is stored in the task's JSON payload.
_EXPORT_CONTENT_KEYS = {"graph": "graph_id", "labeling": "proj_id"}


@router.get("/all/{category}/{graph_id}")
def export_nodes_csv(category: str, graph_id: int, db: Session = Depends(get_db)):
    """Create a ``data_export`` task record for the given category and id.

    Args:
        category: Export category from the URL. ``"graph"`` stores the id as
            ``graph_id`` in the task payload; ``"labeling"`` stores it as
            ``proj_id``. Any other value is rejected without touching the
            database.
        graph_id: Id from the URL, written into the task payload under the
            key selected by ``category``.
        db: SQLAlchemy session injected by FastAPI.

    Returns:
        The ``resp_200`` envelope whose ``data`` holds ``task_id``,
        ``error_code`` and ``error_message``. On success ``task_id`` is the
        id of the new row and ``error_code`` is 0. On failure ``task_id`` is
        0 and ``error_code`` is 400 (unsupported category) or 500 (exception
        while persisting, after a rollback).
    """
    content_key = _EXPORT_CONTENT_KEYS.get(category)
    if content_key is None:
        # Previously an unknown category still created a task with an empty
        # payload; reject it up front using the same response envelope.
        return resp_200(data={
            "task_id": 0,
            'error_code': 400,
            'error_message': f"不支持的导出类别: {category}",
        })

    task_content = json.dumps({content_key: graph_id})
    # Single timestamp so `created` and `updated` are identical on insert.
    now = datetime.now()
    try:
        # NOTE(review): proj_id is hard-coded to 1 and status to 0 in the
        # original; their meaning is not visible here - confirm with the
        # task consumer.
        task = DbKgTask(
            proj_id=1,
            task_category=category,
            task_name="data_export",
            task_content=task_content,
            status=0,
            created=now,
            updated=now,
        )
        db.add(task)
        db.commit()
        # Reload the row so the database-assigned id is available.
        db.refresh(task)
        return resp_200(data={"task_id": task.id, 'error_code': 0, 'error_message': '任务创建成功'})
    except Exception as e:
        # Endpoint boundary: undo the partial transaction and report the
        # failure inside the usual envelope instead of raising.
        db.rollback()
        return resp_200(data={"task_id": 0, 'error_code': 500, 'error_message': str(e)})
@router.get("/download/{filename}")
def download_file(
    filename: str = Path(..., regex=r"^[a-zA-Z0-9_\-\.]+$"),
):
    """Serve a file from the temporary storage directory as a download.

    Args:
        filename: Name of the file inside ``TEMP_STORAGE_PATH``. The path
            parameter is restricted to letters, digits, ``_``, ``-`` and
            ``.`` by the regex above.

    Returns:
        A ``FileResponse`` for the requested file, sent with an
        ``Accept-Ranges: bytes`` header.

    Raises:
        HTTPException: 400 if the resolved path falls outside the storage
            directory; 404 if the path does not refer to a regular file.
    """
    # Resolve both sides before comparing. The original compared a resolved
    # file path against an unresolved base, which rejects every request when
    # TEMP_STORAGE_PATH is relative or passes through a symlink.
    base_path = PathLib(TEMP_STORAGE_PATH).resolve()
    file_path = (base_path / filename).resolve()

    # Containment check: the target must stay inside the storage directory
    # (the regex still admits names such as "..").
    try:
        file_path.relative_to(base_path)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid file path")

    # is_file() rather than exists(): a directory (e.g. filename ".") must
    # be reported as not found instead of failing inside FileResponse.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    # Advertise byte-range support for resumable downloads.
    # NOTE(review): whether Range requests are actually honoured depends on
    # the installed Starlette version's FileResponse - confirm.
    return FileResponse(
        path=file_path,
        filename=filename,
        headers={"Accept-Ranges": "bytes"},
    )
@router.get("/list-routes")
def list_routes(request: Request):
    """List every route registered on the running application.

    Args:
        request: Incoming request; its ``app.routes`` collection is walked.

    Returns:
        ``{"routes": [...]}`` where each entry has ``path``, ``methods`` and
        ``name``. ``methods`` is a sorted list of the route's HTTP methods,
        or ``['N/A']`` when the route has no ``methods`` attribute or it is
        ``None``. ``path`` and ``name`` are ``None`` for route objects that
        do not define them.
    """
    route_list = []
    for route in request.app.routes:
        # Not every route object exposes path/name/methods, and `methods`
        # can be present but None, so read all three defensively.
        methods = getattr(route, "methods", None)
        route_list.append({
            "path": getattr(route, "path", None),
            # Sorted so the output is stable (methods is a set).
            "methods": sorted(methods) if methods is not None else ['N/A'],
            "name": getattr(route, "name", None),
        })
    return {"routes": route_list}
# Alias exposing this module's router under a descriptive name.
data_export_router = router
|