我在下面为异步池编写了代码。在__aexit__
任务完成后,我将取消 _worker 任务。但是当我运行代码时,工作任务不会被取消并且代码会永远运行。这是任务的样子:<Task pending coro=<AsyncPool._worker() running at \async_pool.py:17> wait_for=<Future cancelled>>
. asyncio.wait_for
正在被取消,但不是工作任务。
class AsyncPool:
def __init__(self,coroutine,no_of_workers,timeout):
self._loop = asyncio.get_event_loop()
self._queue = asyncio.Queue()
self._no_of_workers = no_of_workers
self._coroutine = coroutine
self._timeout = timeout
self._workers = None
async def _worker(self):
while True:
try:
ret = False
queue_item = await self._queue.get()
ret = True
result = await asyncio.wait_for(self._coroutine(queue_item), timeout = self._timeout,loop= self._loop)
except Exception as e:
print(e)
finally:
if ret:
self._queue.task_done()
async def push_to_queue(self,item):
self._queue.put_nowait(item)
async def __aenter__(self):
assert self._workers == None
self._workers = [asyncio.create_task(self._worker()) for _ in range(self._no_of_workers)]
return self
async def __aexit__(self,type,value,traceback):
await self._queue.join()
for worker in self._workers:
worker.cancel()
await asyncio.gather(*self._workers, loop=self._loop, return_exceptions =True)
使用异步池:
async def something(item):
print("got", item)
await asyncio.sleep(item)
async def main():
async with AsyncPool(something, 5, 2) as pool:
for i in range(10):
await pool.push_to_queue(i)
asyncio.run(main())
我终端的输出:
最佳答案
问题是您的 except Exception
异常子句也会捕获取消并忽略它。更让人困惑的是,print(e)
只是在出现 CancelledError
时打印一个空行,这是输出中空行的来源。 (将其更改为 print(type(e))
显示发生了什么。)
要更正此问题,请将 except Exception
更改为更具体的内容,例如 except asyncio.TimeoutError
。在 Python 3.8 中不需要此更改,其中 asyncio.CancelledError
不再派生自 Exception
,而是派生自 BaseException
,因此 except Exception
没有捕捉到它。
https://stackoverflow.com/questions/63575632/
相关文章:
ruby-on-rails - 在 Rails 中进行 View 更新的正确方法
javascript - 在 quill-image-resize-vue 中对齐时出错
python - 从数据框创建字典,其中多列的元组作为键,另一列作为值
git - 以编程方式为新的 Azure 存储库设置默认分支名称
python-3.x - Python3 Beautifulsoup4 从多个容器兄弟中提取文本
java - 无法在数据库中使用 BCryptPassword 保存加密密码
c++ - leveldb 是否每 4KB 或 2KB 数据 block 生成 Bloom Filt