Python - 在 asyncio 中取消任务?

我在下面为异步池编写了代码。在__aexit__任务完成后,我将取消 _worker 任务。但是当我运行代码时,工作任务不会被取消并且代码会永远运行。这是任务的样子:<Task pending coro=<AsyncPool._worker() running at \async_pool.py:17> wait_for=<Future cancelled>> . asyncio.wait_for正在被取消,但不是工作任务。

class AsyncPool:
    def __init__(self,coroutine,no_of_workers,timeout):
        self._loop           = asyncio.get_event_loop()
        self._queue          = asyncio.Queue()
        self._no_of_workers  = no_of_workers
        self._coroutine      = coroutine
        self._timeout        = timeout
        self._workers        = None

    async def _worker(self): 
        while True:
            try:
                ret = False
                queue_item           = await self._queue.get()
                ret = True
                result               = await asyncio.wait_for(self._coroutine(queue_item), timeout = self._timeout,loop= self._loop)
            except Exception as e:
                print(e)
            finally:
                if ret:
                    self._queue.task_done()


    async def push_to_queue(self,item):
        self._queue.put_nowait(item)
    
    async def __aenter__(self):
        assert self._workers == None
        self._workers = [asyncio.create_task(self._worker()) for _ in range(self._no_of_workers)]
        return self
    
    async def __aexit__(self,type,value,traceback):
        await self._queue.join()

        for worker in self._workers:
            worker.cancel()

        await asyncio.gather(*self._workers, loop=self._loop, return_exceptions =True)

使用异步池:

async def something(item):
    print("got", item)
    await asyncio.sleep(item)
 
async def main():
    async with AsyncPool(something, 5, 2) as pool:
        for i in range(10):
            await pool.push_to_queue(i)
 
asyncio.run(main())

我终端的输出:

最佳答案

问题是您的 except Exception 异常子句也会捕获取消并忽略它。更让人困惑的是,print(e) 只是在出现 CancelledError 时打印一个空行,这是输出中空行的来源。 (将其更改为 print(type(e)) 显示发生了什么。)

要更正此问题,请将 except Exception 更改为更具体的内容,例如 except asyncio.TimeoutError。在 Python 3.8 中不需要此更改,其中 asyncio.CancelledError 不再派生自 Exception,而是派生自 BaseException,因此 except Exception 没有捕捉到它。

https://stackoverflow.com/questions/63575632/

相关文章:

ruby-on-rails - 在 Rails 中进行 View 更新的正确方法

javascript - 在 quill-image-resize-vue 中对齐时出错

python - 从数据框创建字典,其中多列的元组作为键,另一列作为值

node.js - 跟踪 Node.js 程序执行

git - 以编程方式为新的 Azure 存储库设置默认分支名称

python-3.x - Python3 Beautifulsoup4 从多个容器兄弟中提取文本

java - 无法在数据库中使用 BCryptPassword 保存加密密码

c++ - leveldb 是否每 4KB 或 2KB 数据 block 生成 Bloom Filt

javascript - 使用javascript强制刷新rss feed

python - 如何启用对 Dash DataTable 上特定行的编辑