12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 |
- from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
- from pydantic import BaseModel
- from .auth import get_current_user, User
- from ..db.database import get_db, DataItem
- from sqlalchemy.orm import Session
- from ..mq.tasks import process_data
- from celery.result import AsyncResult
- import time
- router = APIRouter()
- # 请求体模型
- class DataItemCreate(BaseModel):
- content: str
- # 受保护的接口
- @router.post("/data", response_model=DataItemCreate)
- async def create_data_item(
- item: DataItemCreate,
- current_user: User = Depends(get_current_user),
- db: Session = Depends(get_db)
- ):
- db_item = DataItem(content=item.content)
- db.add(db_item)
- db.commit()
- db.refresh(db_item)
- return db_item
- @router.post("/submit/")
- async def submit_data(item: DataItem, background_tasks: BackgroundTasks):
- task = process_data.delay(item.content)
- background_tasks.add_task(check_task_status, task.id)
- return {"task_id": task.id, "message": "Task submitted successfully"}
- def check_task_status(task_id: str):
- result = AsyncResult(task_id)
- while not result.ready():
- time.sleep(1) # 检查任务状态
- print(f"Task {task_id} completed with result: {result.result}")
- @router.get("/task/{task_id}")
- async def get_task_status(task_id: str):
- task_result = AsyncResult(task_id)
- if task_result.ready():
- return {"task_id": task_id, "status": "completed", "result": task_result.result}
- else:
- return {"task_id": task_id, "status": "processing"}
-
-
- protected_router = router
|