本文主要是介绍fastapi 结合 celery 将异步过程转变为同步过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
使用 asyncio 的协程方式查询任务结果,查询超时时间为 120 s,每 200 ms 查询一次:
# 使用 asyncio 的协程方式查询任务结果,查询超时时间为 120 s,每 200 ms 查询一次
@router.post('/backgroud_type', response_model=BackgroundTypeResponse)
async def backgroud_type(request: Request, backgroundTypeRequest: BackgroundTypeRequest):logging.info(f"{backgroundTypeRequest}")task = request.app.state.celery_app.send_task(name="tasks.backgroud_type",priority=backgroundTypeRequest.priority,args=[backgroundTypeRequest.dict()])try:return await async_check_task_result(task.id, request.app.state.celery_app)except asyncio.TimeoutError:raise HTTPException(status_code=408, detail="Request timed out")async def async_check_task_result(task_id, celery_app, timeout=120):start_time = asyncio.get_running_loop().time()result_async = AsyncResult(task_id, app=celery_app)while True:if result_async.ready():return {'task_id': task_id,'result': result_async.get()}if (asyncio.get_running_loop().time() - start_time) > timeout:raise asyncio.TimeoutErrorawait asyncio.sleep(0.2)
这篇关于fastapi 结合 celery 将异步过程转变为同步过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!