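method.
(2) The innermost subgenerator in the chain must be a simple generator (one that uses only yield) or an iterable object.
(3) The coroutine chains we write are always driven by passing the outermost delegating generator to a function in the asyncio API, such as loop.run_until_complete(...).
(4) When using asyncio, our code does not drive coroutines by calling next(); the event loop implemented by asyncio does that.
(5) The innermost subgenerator is a library function that performs the actual I/O, not a function we wrote ourselves.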
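The points above can be sketched as a three-level chain. This is a minimal illustration, not code from the book: it uses `async def` and `await`, the current spelling of `@asyncio.coroutine` and `yield from`, and the names `innermost`, `middle`, `outermost`, and `run_chain` are hypothetical. `asyncio.sleep()` stands in for a library coroutine that performs real I/O.

```python
import asyncio


async def innermost():
    # Stand-in for a library coroutine that does the actual I/O;
    # asyncio.sleep() suspends without blocking the event loop.
    await asyncio.sleep(0.01)
    return 42


async def middle():
    # Delegates to the inner coroutine; `await` plays the role of `yield from`.
    value = await innermost()
    return value + 1


async def outermost():
    # The outermost delegating coroutine: the only one handed to the event loop.
    return await middle()


def run_chain():
    # We never call next() or send(); the event loop drives the whole chain.
    return asyncio.run(outermost())


if __name__ == "__main__":
    print(run_chain())
```

Only `outermost()` is passed to the asyncio API; everything below it is reached through delegation.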
and loop.create_task(coroutine)
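As a rough sketch of what `loop.create_task(coroutine)` does, the snippet below schedules one coroutine with it and another with `asyncio.ensure_future`, which the later examples in this post use. Both return a `Task`. The `work` and `demo` names are hypothetical, and the code assumes Python 3.7 or later.

```python
import asyncio


async def work(x):
    await asyncio.sleep(0.01)
    return x * 2


async def demo():
    loop = asyncio.get_running_loop()
    t1 = asyncio.ensure_future(work(1))  # wraps the coroutine in a Task
    t2 = loop.create_task(work(2))       # also returns a Task
    # ensure_future() returns an existing Task unchanged.
    same = asyncio.ensure_future(t2) is t2
    results = [await t1, await t2]
    return isinstance(t1, asyncio.Task), isinstance(t2, asyncio.Task), same, results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```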
Threads versus coroutines

Example 1: switching between multiple coroutines
```python
import asyncio
import threading


# Generator-based coroutines (Python 3.4 style). @asyncio.coroutine was
# removed in Python 3.11; newer versions use `async def` and `await`.
@asyncio.coroutine
def hello():
    print("Hello world! (%s)" % threading.current_thread())
    yield from asyncio.sleep(10)
    print("Hello again! (%s)" % threading.current_thread())


@asyncio.coroutine
def hello2():
    print("Hello world2! (%s)" % threading.current_thread())
    yield from asyncio.sleep(10)
    print("Hello again! (%s)" % threading.current_thread())


loop = asyncio.get_event_loop()
tasks = [hello(), hello2()]
wait = asyncio.wait(tasks)
res, _ = loop.run_until_complete(wait)
loop.close()
print(res)
```

Output (both coroutines run in the same main thread):

```
Hello world2! (<_MainThread(MainThread, started 35116)>)
Hello world! (<_MainThread(MainThread, started 35116)>)
Hello again! (<_MainThread(MainThread, started 35116)>)
Hello again! (<_MainThread(MainThread, started 35116)>)
{<Task finished coro=<hello() done, defined at C:/Users/gck1d6o/Desktop/FTP/Speed Python/16 /async for .py:5> result=None>, <Task finished coro=<hello2() done, defined at C:/Users/gck1d6o/Desktop/FTP/Speed Python/16 /async for .py:12> result=None>}
```
Example 2: HTTP requests with asynchronous I/O
```python
import asyncio


# Generator-based coroutine (Python 3.4 style; removed in Python 3.11).
@asyncio.coroutine
def wget(host):
    print("wget %s..." % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = "GET / HTTP/1.0\r\nHost: %s\r\n\r\n" % host
    writer.write(header.encode("utf-8"))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        # Stop at the blank line that ends the headers, or at end of stream.
        if not line or line == b"\r\n":
            break
        print("%s header > %s" % (host, line.decode("utf-8").rstrip()))
    writer.close()


loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ("www.sina.com.cn", "www.sohu.com", "www.163.com")]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
```

Output:

```
wget www.sina.com.cn...
wget www.163.com...
wget www.sohu.com...
...
```
Example 3: a crawler with asynchronous I/O
```python
import asyncio
import os
import sys
import time

import aiohttp

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp:
        fp.write(img)


def show(text):
    print(text, end=" ")
    sys.stdout.flush()


def main(download):
    t0 = time.time()
    count = download(POP20_CC)
    elapsed = time.time() - t0
    msg = "\n{} flags downloaded in {:.2f}s"
    print(msg.format(count, elapsed))


# Generator-based coroutines (Python 3.4 style), written against the
# aiohttp API of that period; current aiohttp releases differ.
@asyncio.coroutine
def get_flag(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request("GET", url)
    image = yield from resp.read()
    return image


@asyncio.coroutine
def download_one(cc):
    image = yield from get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc


def download_many(cc_list):
    loop = asyncio.get_event_loop()
    tasks = [download_one(cc) for cc in sorted(cc_list)]
    wait_coro = asyncio.wait(tasks)
    res, _ = loop.run_until_complete(wait_coro)
    loop.close()
    return len(res)


if __name__ == '__main__':
    main(download_many)
```

Output:

```
ID TR FR RU EG IN VN CN BD BR DE NG JP MX ET PK PH US CD IR
20 flags downloaded in 0.89s
```
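Avoiding blocking calls

There are two ways to keep a blocking call from halting the whole application process:
1. Run each blocking operation in a separate thread or process.
2. Turn each blocking operation into a non-blocking asynchronous call.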
Using an Executor object to keep the event loop from blocking

```python
import asyncio
import os

DEST_DIR = "downloads/"


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp:
        fp.write(img)


# Fragment: `image` and `cc` come from the enclosing download_one coroutine.
loop = asyncio.get_event_loop()
# None selects the event loop's default executor.
loop.run_in_executor(None, save_flag, image, cc.lower() + ".gif")
```
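A self-contained sketch of the same idea follows. It is an illustration under assumptions, not the book's code: `save_file`, `save_async`, and `demo` are hypothetical names, the file is written to a temporary directory, and `asyncio.run` requires Python 3.7 or later.

```python
import asyncio
import os
import tempfile


def save_file(data, path):
    # Ordinary blocking file I/O; it runs in a worker thread, not in the loop.
    with open(path, "wb") as fp:
        fp.write(data)
    return len(data)


async def save_async(data, path):
    loop = asyncio.get_running_loop()
    # None selects the loop's default executor; awaiting the returned
    # future suspends this coroutine while the event loop stays free.
    return await loop.run_in_executor(None, save_file, data, path)


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "flag.gif")
        written = asyncio.run(save_async(b"GIF89a", path))
        with open(path, "rb") as fp:
            return written, fp.read()


if __name__ == "__main__":
    print(demo())
```

Awaiting the future returned by `run_in_executor` also gives the coroutine the blocking function's return value, or re-raises its exception.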
Semaphore

```python
import asyncio
import os
import sys
import time

import aiohttp

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp:
        fp.write(img)


def show(text):
    print(text, end=" ")
    sys.stdout.flush()


def main(download):
    t0 = time.time()
    count = download(POP20_CC)
    elapsed = time.time() - t0
    msg = "\n{} flags downloaded in {:.2f}s"
    print(msg.format(count, elapsed))


# Generator-based coroutines (Python 3.4 style), written against the
# aiohttp API of that period; current aiohttp releases differ.
@asyncio.coroutine
def get_flag(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request("GET", url)
    image = yield from resp.read()
    return image


@asyncio.coroutine
def download_one(cc, semaphore):
    # At most 5 coroutines hold the semaphore at once. This `with (yield from ...)`
    # form only works on older Python; newer code uses `async with semaphore:`.
    with (yield from semaphore):
        image = yield from get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc


def download_many(cc_list):
    loop = asyncio.get_event_loop()
    semaphore = asyncio.Semaphore(5)
    tasks = [download_one(cc, semaphore) for cc in sorted(cc_list)]
    wait_coro = asyncio.wait(tasks)
    res, _ = loop.run_until_complete(wait_coro)
    loop.close()
    return len(res)


if __name__ == '__main__':
    main(download_many)
```
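To see the limit in action without a network, here is a minimal sketch that records how many coroutines hold the semaphore at once. It uses `async with`, the current form of `with (yield from semaphore)`. The names `fetch`, `fetch_all`, and the `state` dictionary are hypothetical, and `asyncio.sleep` stands in for the HTTP request.

```python
import asyncio


async def fetch(n, semaphore, state):
    async with semaphore:  # waits here while `limit` holders are active
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for a network request
        state["active"] -= 1
    return n


async def fetch_all(count, limit):
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    # gather() returns results in the order the coroutines were passed in.
    results = await asyncio.gather(
        *(fetch(n, semaphore, state) for n in range(count))
    )
    return results, state["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(fetch_all(20, 5))
    print(len(results), "requests, peak concurrency:", peak)
```

With 20 requests and a limit of 5, the recorded peak never exceeds 5, even though all 20 coroutines are scheduled at once.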
Attaching a callback

```python
import asyncio
import time

# time.clock() was removed in Python 3.8; perf_counter() replaces it.
now = lambda: time.perf_counter()


async def do_some_work(x):
    # Nothing is awaited here, so the coroutine returns immediately.
    print("Waiting:", x)
    return "Done after {}s".format(x)


def callback(future):
    # Called by the event loop once the task is done.
    print("Callback:", future.result())


start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print("Time:", now() - start)
```

Output:

```
Waiting: 2
Callback: Done after 2s
Time: 0.0015685070140109125
```
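A variant that actually waits before the callback fires is sketched below, assuming Python 3.7 or later. The `log` list and this `main` signature are hypothetical additions for illustration; the callback receives the finished task as its only argument.

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


async def main(delay, log):
    task = asyncio.ensure_future(do_some_work(delay))
    # The callback is scheduled by the event loop when the task completes.
    task.add_done_callback(lambda fut: log.append(fut.result()))
    await task
    # Defensive step: give any pending callbacks one more loop iteration.
    await asyncio.sleep(0)
    return log


if __name__ == "__main__":
    print(asyncio.run(main(0.01, [])))
```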
Getting a coroutine's result

Example 1: getting results
```python
import asyncio


async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


# Passing bare coroutines to asyncio.wait() works on older Python only;
# it is rejected since Python 3.11, where they must be wrapped in tasks.
tasks = [do_some_work(x) for x in range(1, 4)]
wait_coro = asyncio.wait(tasks)

loop = asyncio.get_event_loop()
# asyncio.wait() returns (done, pending); `done` is a set, so its order is arbitrary.
res, _ = loop.run_until_complete(wait_coro)

for task in res:
    print(task.result())
```

Output:

```
Waiting: 1
Waiting: 2
Waiting: 3
Done after 3s
Done after 2s
Done after 1s
```
Example 2: getting results
```python
import asyncio


async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


# Keep our own list of tasks so results can be read in submission order.
tasks = [asyncio.ensure_future(do_some_work(x)) for x in range(1, 4)]
wait_coro = asyncio.wait(tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(wait_coro)

for task in tasks:
    print("Task ret:", task.result())
```

Output:

```
Waiting: 1
Waiting: 2
Waiting: 3
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 3s
```
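Both examples above have to wait until every coroutine has finished before any result can be read. When a result is needed as soon as its coroutine finishes, use asyncio.as_completed: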
```python
import asyncio


async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


async def main():
    tasks = [do_some_work(x) for x in range(1, 4)]
    # as_completed() yields awaitables in the order the work finishes.
    for task in asyncio.as_completed(tasks):
        result = await task
        print("Task ret:{}".format(result))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```

Output:

```
Waiting: 2
Waiting: 3
Waiting: 1
Task ret:Done after 1s
Task ret:Done after 2s
Task ret:Done after 3s
```
Example 4: asyncio.as_completed + tqdm
```python
# Fragment: `to_do` (the list of tasks) and `cc_list` come from the surrounding code.
to_do_iter = asyncio.as_completed(to_do)
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
```
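The fragment above depends on the third-party tqdm package and on surrounding code. As a rough stand-in, the sketch below reports progress with a plain counter instead of a progress bar. The `download` and `download_all` names and the delays are hypothetical, and `asyncio.sleep` replaces the real download.

```python
import asyncio


async def download(cc, delay):
    await asyncio.sleep(delay)  # stand-in for the real download
    return cc


async def download_all(jobs):
    to_do = [asyncio.ensure_future(download(cc, delay)) for cc, delay in jobs]
    done = []
    # as_completed() yields awaitables in completion order, so the counter
    # advances as each download finishes (the role tqdm plays above).
    for i, future in enumerate(asyncio.as_completed(to_do), start=1):
        cc = await future
        done.append(cc)
        print("{}/{} {}".format(i, len(to_do), cc))
    return done


if __name__ == "__main__":
    asyncio.run(download_all([("CN", 0.15), ("IN", 0.05), ("US", 0.10)]))
```

The results come back ordered by completion time, not by submission order.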
Source: Fluent Python, Chapter 18