import sys,os,io current_path = os.getcwd() sys.path.append(current_path) from config.site import SiteConfig from fastapi import APIRouter, Depends, Query, UploadFile, File from fastapi.responses import FileResponse from db.database import get_db from sqlalchemy.orm import Session from agent.models.web.response import StandardResponse,FAILED,SUCCESS from agent.models.web.request import BasicRequest from agent.libs.agent import AgentBusiness from typing import Optional import logging import base64 import chardet router = APIRouter(prefix="/file", tags=["agent job interface"]) logger = logging.getLogger(__name__) config = SiteConfig() @router.post("/upload/{file_type}/{job_id}", response_model=StandardResponse) async def upload_file(file_type:str, job_id:int, file: UploadFile = File(...)): #if not file.filename.endswith(file_type): # return StandardResponse(code=FAILED, message="Invalid file type") logger.info(f"upload file: {file.filename} {file_type}") data = { "file_type":file_type, "file_name":file.filename} try: file_content = await file.read() os.makedirs(config.get_config('JOB_PATH')+f"/{job_id}/upload", exist_ok=True) with open(config.get_config('JOB_PATH')+f"/{job_id}/upload/{file.filename}", "wb") as f: f.write(file_content) except Exception as e: logger.error(f"upload file error: {e}") return StandardResponse(code=FAILED, message="File upload failed") return StandardResponse(code=SUCCESS, message="File uploaded successfully", records=[data]) @router.post("/browse", response_model=StandardResponse) async def browser_file(request:BasicRequest): job_id = request.get_param("job_id", 0) param_path = request.get_param("path", "") if job_id == 0: return StandardResponse(code=FAILED, message="Job id is required") job_path = os.path.join(config.get_config('JOB_PATH'),f"{job_id}") path = os.path.join(job_path, param_path.replace("..", "")) if not os.path.exists(path): return StandardResponse(code=FAILED, message="File not found") if os.path.isdir(path): files = os.listdir(path) data = [] for file in files: param_path = os.path.join(path, file) encoded_param_path = base64.b64encode(param_path.encode("utf-8")).decode('utf-8') file_type = "file" if os.path.isdir(param_path): file_type = "dir" data.append({ "name":file, "path":encoded_param_path, "type":file_type}) return StandardResponse(code=SUCCESS, message="File list", records=data) else: return StandardResponse(code=SUCCESS, message="nothing found", records=[]) @router.get("/download/{job_id}") async def download_file(job_id:int, path:str=Query("path")): logger.info(f"GET download file: {job_id} {path}") if job_id == 0: return StandardResponse(code=FAILED, message="Job id is required") job_path = os.path.join(config.get_config('JOB_PATH'),f"{job_id}") param_path = base64.b64decode(path.encode("utf-8")).decode('utf-8') param_path = param_path.replace("..", "") path = os.path.join(job_path, param_path) filename = os.path.basename(path) logger.info(f"path: {path} filename: {filename}") if not os.path.exists(path): return StandardResponse(code=FAILED, message="File not found") if os.path.isdir(path): return StandardResponse(code=FAILED, message="Can not download directory") else: return FileResponse(path, media_type='application/octet-stream', filename=filename) @router.get("/view/{job_id}", response_model=StandardResponse) def view_file(job_id:int, path:str=Query("path")): logger.info(f"view file: {job_id} {path}") if job_id == 0: return StandardResponse(code=FAILED, message="Job id is required") job_path = os.path.join(config.get_config('JOB_PATH'),f"{job_id}") param_path = base64.b64decode(path.encode("utf-8")).decode('utf-8') param_path = param_path.replace("..", "") path = os.path.join(job_path, param_path) filename = os.path.basename(path) logger.info(f"path: {path} filename: {filename}") logger.info(f"view file: {path}") if not os.path.exists(path): logger.info(f"file not exists: {path}") return StandardResponse(code=FAILED, message="File not found") if os.path.isdir(path): logger.info(f"is dir: {path}") return StandardResponse(code=FAILED, message="Can not download directory") else: logger.info("open file") with open(path, "rb") as f: raw_content = f.read() result = chardet.detect(raw_content) content = raw_content.decode(result['encoding'], errors='ignore') return StandardResponse(code=SUCCESS, message="File content", records=[{"content":content}]) return StandardResponse(code=FAILED, message="File not found") file_router = router