假设我们有一堆下载链接,每个链接可能需要不同的下载时间.我只允许使用最多3个连接下载.现在,我想确保使用asyncio有效地执行此操作.
这就是我想要实现的目标:在任何时候,尽量确保我至少运行3次下载.@H_502_3@
Connection 1: 1---------7---9--- Connection 2: 2---4----6----- Connection 3: 3-----5---8-----
from random import randint import asyncio count = 0 async def download(code,permit_download,no_concurrent,downloading_event): global count downloading_event.set() wait_time = randint(1,3) print('downloading {} will take {} second(s)'.format(code,wait_time)) await asyncio.sleep(wait_time) # I/O,context will switch to main function print('downloaded {}'.format(code)) count -= 1 if count < no_concurrent and not permit_download.is_set(): permit_download.set() async def main(loop): global count permit_download = asyncio.Event() permit_download.set() downloading_event = asyncio.Event() no_concurrent = 3 i = 0 while i < 9: if permit_download.is_set(): count += 1 if count >= no_concurrent: permit_download.clear() loop.create_task(download(i,downloading_event)) await downloading_event.wait() # To force context to switch to download function downloading_event.clear() i += 1 else: await permit_download.wait() await asyncio.sleep(9) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main(loop)) finally: loop.close()
downloading 0 will take 2 second(s) downloading 1 will take 3 second(s) downloading 2 will take 1 second(s) downloaded 2 downloading 3 will take 2 second(s) downloaded 0 downloading 4 will take 3 second(s) downloaded 1 downloaded 3 downloading 5 will take 2 second(s) downloading 6 will take 2 second(s) downloaded 5 downloaded 6 downloaded 4 downloading 7 will take 1 second(s) downloading 8 will take 1 second(s) downloaded 7 downloaded 8
但这是我的问题:@H_502_3@
>目前,我只是等待9秒钟让主功能继续运行,直到下载完成.在退出main函数之前是否有一种等待上次下载完成的有效方法? (我知道有asyncio.wait,但是我需要存储它的所有任务引用才能工作)
>做这种任务的好图书馆是什么?我知道javascript有很多异步库,但是Python呢?@H_502_3@
编辑:
2.什么是一个很好的库来处理常见的异步模式? (像https://www.npmjs.com/package/async这样的东西)@H_502_3@
解决方法
您基本上需要一个固定大小的下载任务池. asyncio没有开箱即用的这种功能,但很容易创建一个:只需保留一组任务,不要让它超过限制.虽然问题表明你不愿意沿着这条路走下去,但代码更加优雅:
async def download(code): wait_time = randint(1,context will switch to main function print('downloaded {}'.format(code)) async def main(loop): no_concurrent = 3 dltasks = set() i = 0 while i < 9: if len(dltasks) >= no_concurrent: # Wait for some download to finish before adding a new one _done,dltasks = await asyncio.wait( dltasks,return_when=asyncio.FIRST_COMPLETED) dltasks.add(loop.create_task(download(i))) i += 1 # Wait for the remaining downloads to finish await asyncio.wait(dltasks)
另一种方法是创建一个固定数量的协同程序进行下载,就像固定大小的线程池一样,并使用asyncio.Queue为它们提供工作.这消除了手动限制下载次数的需要,这将自动受到调用download()的协同程序数量的限制:@H_502_3@
# download() defined as above async def download_from(q): while True: code = await q.get() if code is None: # pass on the word that we're done,and exit await q.put(None) break await download(code) async def main(loop): q = asyncio.Queue() dltasks = [loop.create_task(download_from(q)) for _ in range(3)] i = 0 while i < 9: await q.put(i) i += 1 # Inform the consumers there is no more work. await q.put(None) await asyncio.wait(dltasks)