来自asyncio docs:
asyncio.as_completed(aws,*,loop=None,timeout=None)
Run awaitable objects in the aws set concurrently. Return an iterator
of Future objects. Each Future object returned represents the earliest
result from the set of the remaining awaitables.
我假设这些Future对象中的每一个都具有asyncio.Future
中描述的方法:.cancelled(),. exception()和.result().但似乎所产生的元素只是协程,而不是Future对象.我错过了什么?
这似乎打败了.as_completed()的描述.如果我需要等待它,协程如何“完成”?
>>> import asyncio
>>> import aiohttp
>>>
>>> async def get(session,url):
... async with session.request('GET',url) as resp:
... t = await resp.text()
... return t
...
>>> async def bulk_as_completed(urls):
... async with aiohttp.ClientSession() as session:
... aws = [get(session,url) for url in urls]
... for future in asyncio.as_completed(aws):
... for i in ('cancelled','exception','result'):
... print(hasattr(future,i))
... print(type(future))
... try:
... result = await future
... except:
... pass
... else:
... print(type(result))
... print()
...
>>>
>>> urls = (
... 'https://docs.python.org/3/library/asyncio-task.html',... 'https://docs.python.org/3/library/select.html',... 'https://docs.python.org/3/library/this-page-will-404.html',... )
>>>
>>> asyncio.run(bulk_as_completed(urls))
False
False
False
最终,我关心这个的原因是因为我想让异常像asyncio.gather(…,return_exceptions = True)那样冒泡.考虑在调用session.request()时添加一个伪造的URL:
urls = (
'https://docs.python.org/3/library/asyncio-task.html','https://docs.python.org/3/library/select.html','https://docs.python.org/3/library/this-page-will-404.html',# This URL will raise on session.request(). How can I propagate
# that exception to the iterator of results?
'https://asdfasdfasdf-does-not-exist-asdfasdfasdf.com'
)
我希望能够做的是这样的事情(使用Future对象的方法,但这些都不是Future对象,这就是问题):
async def bulk_as_completed(urls):
async with aiohttp.ClientSession() as session:
aws = [get(session,url) for url in urls]
for future in asyncio.as_completed(aws):
if future.cancelled():
res = futures.CancelledError()
else:
exc = future.exception()
if exc is not None:
res = exc
else:
res = future.result()
# ...
# [Do something with `res`]
What I would like to be able to do is something like this […]
async def bulk_as_completed(urls):
async with aiohttp.ClientSession() as session:
aws = [get(session,url) for url in urls]
for future in asyncio.as_completed(aws):
try:
res = await future
except Exception as e:
res = e
# ...
# [Do something with `res`]
This [yielding coroutines rather than futures] seems to defeat the description of
.as_completed()
. How is the coroutine “completed” if I need to await it?
不是.首次实现asyncio.as_completed时,异步迭代器不存在.没有异步迭代就没有办法在它们完成时返回期货,所以as_completed类型通过屈服(立即)虚拟等待来伪造它,人们必须等待获得实际结果.
即使as_completed产生了实际的期货,也不会对你的用例有所帮助,因为如果没有等待他们的人,这些期货就无法完成.为了提供as_completed让步完成的期货的预期语义,as_completed需要实现异步迭代,其等价的__next__可以等待.
as_completed的惊人行为之前已经提出,我已经通过提供异步迭代提交了an issue来修复它.一旦实现,您的原始代码将只用于更改为async for.