recieve.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
  2. from pydantic import BaseModel
  3. from .auth import get_current_user, User
  4. from ..db.database import get_db, DataItem
  5. from sqlalchemy.orm import Session
  6. from ..mq.tasks import process_data
  7. from celery.result import AsyncResult
  8. import time
  9. router = APIRouter()
  10. # 请求体模型
  11. class DataItemCreate(BaseModel):
  12. content: str
  13. # 受保护的接口
  14. @router.post("/data", response_model=DataItemCreate)
  15. async def create_data_item(
  16. item: DataItemCreate,
  17. current_user: User = Depends(get_current_user),
  18. db: Session = Depends(get_db)
  19. ):
  20. db_item = DataItem(content=item.content)
  21. db.add(db_item)
  22. db.commit()
  23. db.refresh(db_item)
  24. return db_item
  25. @router.post("/submit/")
  26. async def submit_data(item: DataItem, background_tasks: BackgroundTasks):
  27. task = process_data.delay(item.content)
  28. background_tasks.add_task(check_task_status, task.id)
  29. return {"task_id": task.id, "message": "Task submitted successfully"}
  30. def check_task_status(task_id: str):
  31. result = AsyncResult(task_id)
  32. while not result.ready():
  33. time.sleep(1) # 检查任务状态
  34. print(f"Task {task_id} completed with result: {result.result}")
  35. @router.get("/task/{task_id}")
  36. async def get_task_status(task_id: str):
  37. task_result = AsyncResult(task_id)
  38. if task_result.ready():
  39. return {"task_id": task_id, "status": "completed", "result": task_result.result}
  40. else:
  41. return {"task_id": task_id, "status": "processing"}
  42. protected_router = router