from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks from pydantic import BaseModel from .auth import get_current_user, User from ..db.database import get_db, DataItem from sqlalchemy.orm import Session from ..mq.tasks import process_data from celery.result import AsyncResult import time router = APIRouter() # 请求体模型 class DataItemCreate(BaseModel): content: str # 受保护的接口 @router.post("/data", response_model=DataItemCreate) async def create_data_item( item: DataItemCreate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): db_item = DataItem(content=item.content) db.add(db_item) db.commit() db.refresh(db_item) return db_item @router.post("/submit/") async def submit_data(item: DataItem, background_tasks: BackgroundTasks): task = process_data.delay(item.content) background_tasks.add_task(check_task_status, task.id) return {"task_id": task.id, "message": "Task submitted successfully"} def check_task_status(task_id: str): result = AsyncResult(task_id) while not result.ready(): time.sleep(1) # 检查任务状态 print(f"Task {task_id} completed with result: {result.result}") @router.get("/task/{task_id}") async def get_task_status(task_id: str): task_result = AsyncResult(task_id) if task_result.ready(): return {"task_id": task_id, "status": "completed", "result": task_result.result} else: return {"task_id": task_id, "status": "processing"} protected_router = router