使用asyncio处理并发

1、asyncio是一个异步IO非阻塞框架

2、async/await是Python提供的异步编程API,而asyncio只是一个利用 async/await API进行异步编程的框架

3、并发:一次处理多件事

4、并行:一次做多件事

5、asyncio使用事件循环驱动的协程实现并发,这是Python中最大也是最具雄心壮志的库之一

6、应该摒弃线程或进程,使用异步编程管理网络应用中的高并发

7、在异步编程中,与回调相比,协程是显著提升性能的方式

8、除非想阻塞主线程,否则不要在asyncio协程中使用time.sleep(), 应该使用yield from asyncip.sleep()

9、asyncio.wait()协程的参数是一个由Feture或Coroutine构成的可迭代对象;wait会分别把各个协程包装进一个Task对象。最终的结果是,wait处理的所有对象都通过某种方式变成Future类的实例。

10、为了使用asyncio包,我们必须把每个访问网络的函数变成异步版,使用yield from处理网络操作,这样才能把控制权交还给事件循环。

11、yield from foo句法能防止阻塞,是因为当前协程暂停后,控制权回到事件循环手中,再去驱动其他协程;foo期物或协程运行完毕后把结果返回给暂停的协程,将其恢复。

12、yield from两点陈述:
(1)使用yield from链接的多个协程最终必须由不是协程的调用方驱动,调用方显示或隐式(例for循环中)在最外委派生成器上调用next()函数或send()方法。
(2)链条中最内层的子生成器必须是简单的生成器(只使用yield)或可迭代的对象。
(3)我们编写的协程链条始终通过把最外层委派生成器传给asyncio包API中的某个函数(如:loop.run_until_complete(...))驱动
(4)使用asyncio包时,我们编写的代码不通过调用next()函数或者.send()方法驱动协程 ——这一点由asyncio包实现的事件循环(loop)去做。
(5)最内层的子生成器是库中真正执行IO操作的函数,而不是我们自己编写的函数。

13、asyncio.ensure_future(coroutine)loop.create_task(coroutine)都可以创建一个task,python3.7增加了asyncio.create_task(coro)

14、run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成task,task是Future的子类。

15、asyncio.wait(...) 通过它可以获取一个协同程序的列表,同时返回一个将它们全包括在内的单独的协同程序,并交给loop_run_until_complete处理。

线程与协程对比

示例1: 多个协程切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import threading

@asyncio.coroutine
def hello():
print("Hello world! (%s)" % threading.currentThread())
yield from asyncio.sleep(10)
print('Hello again! (%s)' % threading.currentThread())

@asyncio.coroutine
def hello2():
print("Hello world2! (%s)" % threading.currentThread())
yield from asyncio.sleep(10)
print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello2()]
wait = asyncio.wait(tasks)
res, _ = loop.run_until_complete(wait)
loop.close()
print(res)

>>>>
Hello world2! (<_MainThread(MainThread, started 35116)>)
Hello world! (<_MainThread(MainThread, started 35116)>)
Hello again! (<_MainThread(MainThread, started 35116)>)
Hello again! (<_MainThread(MainThread, started 35116)>)
{<Task finished coro=<hello() done, defined at C:/Users/gck1d6o/Desktop/FTP/Speed Python/16/async for.py:5> result=None>, <Task finished coro=<hello2() done, defined at C:/Users/gck1d6o/Desktop/FTP/Speed Python/16/async for.py:12> result=None>}

示例2: 异步IO执行HTTP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio

@asyncio.coroutine
def wget(host):
print("wget %s..." % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = "GET / HTTP/1.0\r\nHost: %s\r\n\r\n" % host
writer.write(header.encode("utf-8"))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b"\r\n":
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
writer.close()


loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ('www.sina.com.cn', 'www.sohu.com', 'www.163.com')]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

>>>>
wget www.sina.com.cn...
wget www.163.com...
wget www.sohu.com...
...

示例3: 异步IO爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import asyncio
import aiohttp
import os
import sys
import time

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"

def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, "wb") as fp:
fp.write(img)

def show(text):
print(text, end=" ")
sys.stdout.flush()

def main(download):
t0 = time.time()
count = download(POP20_CC)
elapsed = time.time() - t0
msg = "\n{} flags downloaded in {:.2f}s"
print(msg.format(count, elapsed))

@asyncio.coroutine
def get_flag(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request("GET", url)
image = yield from resp.read()
return image

@asyncio.coroutine
def download_one(cc):
image = yield from get_flag(cc)
show(cc)
save_flag(image, cc.lower() + ".gif")
return cc

def download_many(cc_list):
loop = asyncio.get_event_loop()
tasks = [download_one(cc) for cc in sorted(cc_list)]
wait_coro = asyncio.wait(tasks)
res, _ = loop.run_until_complete(wait_coro)
loop.close()
return len(res)

if __name__ == '__main__':
main(download_many)

>>>>
ID TR FR RU EG IN VN CN BD BR DE NG JP MX ET PK PH US CD IR
20 flags downloaded in 0.89

避免阻塞型调用

有两种方法能避免阻塞型调用中止整个应用程序的进程:
1、在单独的线程或进程中运行各个阻塞型操作
2、把每个阻塞型操作转换为非阻塞的异步调用

使用Executor对象,防止阻塞事件循环

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio
import os

DEST_DIR = "downloads/"

def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, "wb") as fp:
fp.write(img)

loop = asyncio.get_event_loop()
loop.run_in_executor(None, save_flag, image, cc.lower() + ".gif")

Semaphore 信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import asyncio
import aiohttp
import os
import sys
import time

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"

def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, "wb") as fp:
fp.write(img)

def show(text):
print(text, end=" ")
sys.stdout.flush()

def main(download):
t0 = time.time()
count = download(POP20_CC)
elapsed = time.time() - t0
msg = "\n{} flags downloaded in {:.2f}s"
print(msg.format(count, elapsed))

@asyncio.coroutine
def get_flag(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request("GET", url)
image = yield from resp.read()
return image

@asyncio.coroutine
def download_one(cc, semaphore):
with (yield from semaphore):  # 把semaphore当成上下文管理器使用
image = yield from get_flag(cc)
show(cc)
save_flag(image, cc.lower() + ".gif")
return cc

def download_many(cc_list):
loop = asyncio.get_event_loop()
semaphore = asyncio.Semaphore(5)  # 给予5个信号
tasks = [download_one(cc, semaphore) for cc in sorted(cc_list)]
wait_coro = asyncio.wait(tasks)
res, _ = loop.run_until_complete(wait_coro)
loop.close()
return len(res)

if __name__ == '__main__':
main(download_many)

绑定回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time
import asyncio

now = lambda : time.clock()

async def do_some_work(x):
print("Waiting:", x)
return "Done after {}s".format(x)

def callback(future):
print("Callback:", future.result())

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print("Time:", now() - start)

>>>>
Waiting: 2
Callback: Done after 2s
Time: 0.0015685070140109125

获取协程的result

示例1: 获取result

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import time

now = lambda: time.clock()

async def do_some_work(x):
print("Waiting:", x)
await asyncio.sleep(x)
return "Done after {}s".format(x)

start = now()

tasks = [do_some_work(x) for x in range(1, 4)]
wait_coro = asyncio.wait(tasks)
loop = asyncio.get_event_loop()
res, _ = loop.run_until_complete(wait_coro)

for re in res:
print(re.result())

>>>>
Waiting: 1
Waiting: 2
Waiting: 3
Done after 3s
Done after 2s
Done after 1s

示例2: 获取result

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import time

now = lambda: time.clock()

async def do_some_work(x):
print("Waiting:", x)
await asyncio.sleep(x)
return "Done after {}s".format(x)

start = now()

tasks = [asyncio.ensure_future(do_some_work(x)) for x in range(1, 4)]
wait_coro = asyncio.wait(tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(wait_coro)

for task in tasks:
print("Task ret:", task.result())

>>>>
Waiting: 1
Waiting: 2
Waiting: 3
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 3s

前面两个示例都是需要等待所有协程都运行完毕后才能查看到result, 有时候我们需要协程运行结束后要求立即获取结果,这里可以用asyncio.as_completed来实现

示例3:asyncio.as_completed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import time

async def do_some_work(x):
print("Waiting:", x)
await asyncio.sleep(x)
return "Done after {}s".format(x)

async def main():
tasks = [do_some_work(x) for x in range(1, 4)]
wait_coro = asyncio.wait(tasks)

for task in asyncio.as_completed(tasks):
result = await task
print("Task ret:{}".format(result))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

>>>>
Waiting: 2
Waiting: 3
Waiting: 1
Task ret:Done after 1s
Task ret:Done after 2s
Task ret:Done after 3s

示例4:asyncio.as_completed + tqdm实现进度条

1
2
to_do_iter = asyncio.as_completed(to_do) 
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))

摘自: 《流畅的Python》第十八章