asyncio异步IO--协程(Coroutine)与任务(Task)详解

目录
  1. 1. 协程
  2. 2. “可等待”对象(Awaitables)
    1. 2.1. Coroutine(协程)
    2. 2.2. Task(任务)
    3. 2.3. Future(未来对象)
  3. 3. 执行asyncio程序
  4. 4. 创建Task
  5. 5. Sleeping
  6. 6. 并发执行Tasks
    1. 6.1. 避免取消
  7. 7. 超时(Timeouts)
  8. 8. 等待原语(Waiting Primitives)
  9. 9. 从其他线程调度执行(Scheduling From Other Threads)
  10. 10. 自查(Introspection)
  11. 11. Task对象
  12. 12. 基于生成器的协程(Generator-based Coroutines)

摘要:本文翻译自Coroutines and Tasks,主要介绍asyncio中用于处理协程和任务的方法和接口。在翻译过程中,译者在官方文档的基础上增加了部分样例代码和示意图表,以帮助读者对文档的理解。本文内容主要针对python3.7,在低版本的python中可能不适用,敬请留意。

协程

协程(coroutines)是通过async/await定义函数或方法,是使用asyncio进行异步编程的首选途径。如下,是一个协程的例子:

1
2
3
4
5
6
import asyncio

async def main():
print("hello")
await asyncio.sleep(1)
print("world")

上例中的 main 方法就是我们定义的协程 。代码的功能很简单:

我们在交互环境(Python3.7)下执行以上代码,看看效果:

1
2
3
4
5
6
7
8
9
10
>>> import asyncio

>>> async def main():
... print("hello")
... await asyncio.sleep(1)
... print("world")

>>> asyncio.run(main())
hello
world

需要注意的是:如果像执行普通代码一样直接调用main(),只会返回一个coroutine对象,main()方法内的代码不会执行:

1
2
>>> main() #直接执行main()返回的是一个coroutine对象。
<coroutine object main at 0x0000000002C97848>

实际上,asyncio提供了三种执行协程的机制:

  • 使用asyncio.run()执行协程。一般用于执行最顶层的入口函数,如main()。
  • await一个协程。一般用于在一个协程中调用另一协程。 如下是一个示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
>>> import time
>>> async def say_after(delay,what):
await asyncio.sleep(delay)
print(what)

>>> async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1,"hello")
await say_after(2,"world")
print(f"finished at {time.strftime('%X')}")

>>> asyncio.run(main())
started at 16:47:10
hello
world
finished at 16:47:13

执行耗时 3秒

  • asyncio.create_task()方法将Coroutine(协程)封装为Task(任务)。一般用于实现异步并发操作。 需要注意的是,只有在当前线程存在事件循环的时候才能创建任务(Task)。

我们修改以上的例程,并发执行两个say_after协程。

1
2
3
4
5
6
7
async def main():
task1 = asyncio.create_task(say_after(1,"hello"))
task2 = asyncio.create_task(say_after(2,"world"))
print(f"started at {time.strftime('%X')}")
await task1
await task2
print(f"finished at {time.strftime('%X')}")

执行asyncio.run(main()),结果如下:

1
2
3
4
started at 17:01:34
hello
world
finished at 17:01:36

耗时2秒

“可等待”对象(Awaitables)

如果一个对象能够被用在await表达式中,那么我们称这个对象是可等待对象(awaitable object)。很多asyncio API都被设计成了可等待的。
主要有三类可等待对象:

  • 协程coroutine
  • 任务Task
  • 未来对象Future。

Coroutine(协程)

Python的协程是可等待的(awaitable),因此能够被其他协程用在await表达式中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

async def nested():
print("something")

async def main():
# 如果直接调用 "nested()",什么都不会发生.
# 直接调用的时候只是创建了一个 协程对象 ,但这个对象没有被 await,
# 所以它并不会执行.
nested()

# 那么我们 await 这个协程,看看会是什么结果:
await nested() # 将会打印 "something".

asyncio.run(main())

重要:在这篇文章中,术语coroutine或协程指代两个关系紧密的概念:

  • 协程函数(coroutine function):由async def定义的函数;
  • 协程对象(coroutine object):调用 协程函数返回的对象。

asyncio也支持传统的基于生成器的协程。

Task(任务)

Task用来 并发的 调度协程。
当一个协程通过类似 asyncio.create_task() 的函数被封装进一个 Task时,这个协程 会很快被自动调度执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def nested():
return 42

async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())

# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task

asyncio.run(main())

Future(未来对象)

Future 是一种特殊的底层可等待对象,代表一个异步操作的最终结果。
当一个Future对象被await的时候,表示当前的协程会持续等待,直到 Future对象所指向的异步操作执行完毕。
在asyncio中,Future对象能使基于回调的代码被用于asyn/await表达式中。
一般情况下,在应用层编程中,没有必要创建Future对象。
有时候,有些Future对象会被一些库和asyncio API暴露出来,我们可以await它们:

1
2
3
4
5
6
7
8
async def main():
await function_that_returns_a_future_object()

# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)

底层函数返回Future对象的一个例子是:loop.run_in_executor

执行asyncio程序

asyncio.run(coro, * , debug=False)
这个函数运行coro参数指定的协程,负责管理asyncio事件循环 , 终止异步生成器
在同一个线程中,当已经有asyncio事件循环在执行时,不能调用此函数。
如果debug=True,事件循环将运行在调试模式
此函数总是创建一个新的事件循环,并在最后关闭它。建议将它用作asyncio程序的主入口,并且只调用一次。
Python3.7新增
重要:这个函数是在Python3.7被临时添加到asyncio中的。

创建Task

asyncio.create_task(coro)
将coro参数指定的协程(coroutine)封装到一个Task中,并调度执行。返回值是一个Task对象。
任务在由get_running_loop()返回的事件循环(loop)中执行。如果当前线程中没有正在运行的事件循环,将会引发RuntimeError异常:

1
2
3
4
5
import asyncio
async def coro_1():
print("do somthing")

task = asyncio.create_task(coro_1())

因为当前线程中没有正运行的事件循环,所以引发异常:

1
2
3
4
5
6
7
8
Traceback (most recent call last):
File "C:\Program Files\Python37\lib\site-packages\IPython\core\interactiveshell.py", line 3265, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-4-456c15a4ed16>", line 1, in <module>
task = asyncio.create_task(coro_1())
File "C:\Program Files\Python37\lib\asyncio\tasks.py", line 324, in create_task
loop = events.get_running_loop()
RuntimeError: no running event loop

对以上代码稍作修改,创建main()方法,在其中创建Task对象,然后在主程序中利用asyncio.run()创建事件循环:

1
2
3
4
5
6
7
8
9
import asyncio
async def coro():
print("something is running")

async def main():
task = asyncio.create_task(coro())
print(asyncio.get_running_loop())

asyncio.run(main())

执行结果如下:

1
2
<_WindowsSelectorEventLoop running=True closed=False debug=False>
something is running

此函数已经被引入到Python3.7。在Python早期版本中,可以使用底层函数asyncio.ensure_future()代替。

1
2
3
4
5
6
7
8
9
10
async def coro():
...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

Python3.7新增

Sleeping

coroutine asyncio.sleep(delay,result=None,* ,loop=None)
阻塞delay秒,例如delay=3,则阻塞3秒。
如果指定了result参数的值,则在协程结束时,将该值返回给调用者。
sleep()通常只暂停当前task,并不影响其他task的执行。
不建议使用loop参数,因为Python计划在3.10版本中移除它。
以下是一个协程的例子,功能是在5秒钟内,每秒显示一次当前的日期:

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio
import datetime

async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
asyncio.run(display_date())

执行结果大致如下:

1
2
3
4
5
6
2018-11-20 11:27:15.961830
2018-11-20 11:27:16.961887
2018-11-20 11:27:17.961944
2018-11-20 11:27:18.962001
2018-11-20 11:27:19.962059
2018-11-20 11:27:20.962116

并发执行Tasks

awaitable asyncio.gather(* aws, loop=None, return_exceptions=False)
并发执行aws参数指定的 可等待(awaitable)对象序列。
如果aws序列中的某个awaitable对象是一个协程,则自动将这个协程封装为Task对象进行处理。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio

async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")

async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)

asyncio.run(main())

# Expected output:
#
# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24

如果所有的awaitable对象都执行完毕,则返回awaitable对象执行结果的聚合列表。返回值的顺序于aws参数的顺序一致。
简单修改以上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio

async def factorial(name, number):
f = 1
for i in range(2, number + 1):
#print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i

#print(f"Task {name}: factorial({number}) = {f}")
return number

async def main():
# Schedule three calls *concurrently*:
print(await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
))

asyncio.run(main())

# Expected output:
#
#[2, 3, 4]#await asyncio.gather()的返回值是一个列表,
#分别对应factorial("A", 2),factorial("B", 3),factorial("C", 4)的执行结果。

如果return_execptions参数为False(默认值即为False),引发的第一个异常会立即传播给等待gather()的任务,即调用await asyncio.gather()对象。序列中其他awaitable对象的执行不会受影响。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio

async def division(divisor, dividend):
if divisor == 0:
raise ZeroDivisionError
else:
print(f"{dividend}/{divisor}={dividend/divisor}")
return dividend/divisor

async def main():
# Schedule three calls *concurrently*:
print(await asyncio.gather(
division(0, 2),
division(1, 2),
division(2, 2),
))

asyncio.run(main())

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2/1=2.0
2/2=1.0
Traceback (most recent call last):
File "test.py", line 19, in <module>
asyncio.run(main())
File "c:\Program Files\Python37\lib\asyncio\runners.py", line 43, in run
return loop.run_until_complete(main)
File "c:\Program Files\Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
return future.result()
File "test.py", line 16, in main
division(2, 2),
File "test.py", line 6, in division
raise ZeroDivisionError
ZeroDivisionError

如果return_exceptions参数为True,异常会和正常结果一样,被聚合到结果列表中返回。
对以上代码稍作修改,将return_exceptions设为True:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def division(divisor, dividend):
if divisor == 0:
raise ZeroDivisionError
else:
print(f"{dividend}/{divisor}={dividend/divisor}")
return dividend/divisor

async def main():
# Schedule three calls *concurrently*:
print(await asyncio.gather(
division(0, 2),
division(1, 2),
division(2, 2),
return_exceptions=True
))

asyncio.run(main())

执行结果如下:

1
2
3
2/1=2.0
2/2=1.0
[ZeroDivisionError(), 2.0, 1.0]#错误不会向上传播,而是作为结果返回

如果gather()被取消,则提交的所有awaitable对象(尚未执行完成的)都会被取消。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio

async def division(divisor, dividend):
if divisor == 0:
raise ZeroDivisionError
else:
await asyncio.sleep(divisor)
print(f"{dividend}/{divisor}={dividend/divisor}")
return dividend/divisor

async def main():
# Schedule three calls *concurrently*:
t = asyncio.gather(
division(0, 2),
division(1, 5),
division(3, 6),
return_exceptions=True
)
await asyncio.sleep(2)
t.cancel()
await t

asyncio.run(main())

执行结果:

1
2
3
4
5
6
7
8
9
10
5/1=5.0 #除已执行的之外,其他的任务全部被取消
Traceback (most recent call last):
File "test.py", line 23, in <module>
asyncio.run(main())
File "c:\Program Files\Python37\lib\asyncio\runners.py", line 43, in run
return loop.run_until_complete(main)
File "c:\Program Files\Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
return future.result()
concurrent.futures._base.CancelledError
#在return_exceptions=True的情况下,异常依然向上传播。

如果aws中某些Task或Future被取消,gather()调用不会被取消,被取消的Task或Future会以引发CancelledError的方式被处理。这样可以避免个别awaitable对象的取消操作影响其他awaitable对象的执行。
例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio

async def division(divisor, dividend):
if divisor == 0:
raise ZeroDivisionError
else:
await asyncio.sleep(divisor)
print(f"{dividend}/{divisor}={dividend/divisor}")
return dividend/divisor

async def main():
# Schedule three calls *concurrently*:
task1 = asyncio.create_task(division(0, 2))
task2 = asyncio.create_task(division(1, 5))
task3 = asyncio.create_task(division(3, 6))
t = asyncio.gather(
task1,
task2,
task3,
return_exceptions=True
)
task1.cancel()

print(await t)

asyncio.run(main())

预期执行结果如下:

1
2
3
5/1=5.0
6/3=2.0
[CancelledError(), 5.0, 2.0] # 仅task1被取消,其他任务不受影响。

避免取消

awaitable asyncio.shield(aw, * , loop=None)
防止awaitable对象被取消(cancelled)执行。
如果aw参数是一个协程(coroutines),该对象会被自动封装为Task对象进行处理。
通常,代码:

1
2
#code 1
res = await shield(something())

同代码:

1
2
#code 2
res = await something()

是等价的。
特殊情况是,如果包含以上代码的协程被 取消,code 1与code 2的执行效果就完全不同了:

  • code 1中,运行于something()中的任务不会被取消
  • code 2中,运行于something()中的任务会被取消

在code 1中,从something()的视角看,取消操作并没有发生。然而,事实上它的调用者确实被取消了,所以await shield(something())仍然会引发一个CancelledError异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import time

async def division(divisor, dividend):
if divisor == 0:
raise ZeroDivisionError
else:
await asyncio.sleep(divisor)
print(f"{time.strftime('%X')}:{dividend}/{divisor}={dividend/divisor}")
return dividend/divisor

async def main():
# Schedule three calls *concurrently*:
print(f"Start time:{time.strftime('%X')}")
task1 = asyncio.shield(division(1, 2))
task2 = asyncio.create_task(division(1, 5))
task3 = asyncio.create_task(division(3, 6))

res = asyncio.gather(task1, task2, task3, return_exceptions=True)

task1.cancel()
task2.cancel()
print(await res)

asyncio.run(main())

执行结果:

1
2
3
4
5
6
7
Start time:10:38:48
10:38:49:2/1=2.0
10:38:51:6/3=2.0
[CancelledError(), CancelledError(), 2.0]
#task1虽然被取消,但是division(1,2)依然正常执行了。
#task2被取消后,division(1,5)没有执行
#虽然task1内的协程被执行,但返回值依然为CancelledError

如果something()以其他的方式被取消,比如从自身内部取消,那么shield()也会被取消。
如果希望完全忽略取消操作(不推荐这么做),则可以将shield()与try/except结合起来使用:

1
2
3
4
try:
res = await shield(something())
except CancelledError:
res = None

超时(Timeouts)

coroutine asyncio.wait_for(aw,timeout,*,loop=None)
在timeout时间之内,等待aw参数指定的awaitable对象执行完毕。
如果aw是一个协程,则会被自动作为Task处理。
timeout可以是None也可以是一个float或int类型的数字,表示需要等待的秒数。如果timeout是None,则永不超时,一直阻塞到aw执行完毕。
如果达到timeout时间,将会取消待执行的任务,引发asyncio.TimeoutError.
如果想避免任务被取消,可以将其封装在shield()中。
程序会等待到任务确实被取消掉,所以等待的总时间会比timeout略大。
如果await_for()被取消,aw也会被取消。
loop参数将在Python3.10中删除,所以不推荐使用。
示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')

async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')

asyncio.run(main())

# Expected output:
#
# timeout!

Python3.7新特性:当aw因为超时被取消,wait_for()等到aw确实被取消之后返回异常。在以前的版本中,wait_for会立即返回异常。

等待原语(Waiting Primitives)

wait()
coroutine asyncio.wait(aws,*,loop=None,timeout=None,return_when=ALL_COMPLETED)
并发执行aws中的awaitable对象,一直阻塞到return_when指定的情况出现。
如果aws中的某些对象是协程(coroutine),则自动转换为Task对象进行处理。直接将coroutine对象传递给wait()会导致令人迷惑的执行结果,所以不建议这么做。
返回值是两个Task/Future集合:(done,pending)。
用法示例:
done,pending = await asyncio.wait(aws)
loop参数将在Python3.10中删除,所以不建议使用。
timeout参数可以是一个int或float类型的值,可以控制最大等待时间。
需要注意的是,wait()不会引发asyncio.TimeoutError错误。返回前没有被执行的Future和Task会被简单的放入pending集合。
return_when决定函数返回的时机。它只能被设置为以下常量:

Constant Description
FIRST_COMPLETED The function will return when any future finishes or is cancelled.
FIRST_EXCEPTION The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED.
ALL_COMPLETED The function will return when all futures finish or are cancelled.

与wait_for()不同,wait()不会再超时的时候取消任务。
注意:
因为wait()会自动将协程转换为Task对象进行处理,然后返回这些隐式创建的Task到(done,pending)集合,所以以下代码不会如预期的那样执行。

1
2
3
4
5
6
7
8
async def foo():
return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
# 因为wait()会自动将协程转换为Task对象进行处理,然后返回这些隐式创建的Task到(done,pending)集合,所以这个条件分支永远不会被执行。

上面的代码可以做如下修正:

1
2
3
4
5
6
7
8
async def foo():
return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
# 这回可以正常执行了.

所以,正如上文所讲,不建议将coroutine对象直接传递给wait()。

as_completed()
asyncio.as_completed(aws,*,loop=None,timeout=None)
并发执行aws中的awaitable对象。返回一个Future对象迭代器。每次迭代时返回的Future对象代表待执行的awaitable对象集合里最早出现的结果。注意:迭代器返回的顺序与aws列表的顺序无关,只与结果出现的早晚有关。
如果超时之前还有Future对象未完成,则引发asyncio.TimeoutError异常。
用法示例:

1
2
3
for f in as_completed(aws):
earliest_result = await f
# ...

以下为一个完整的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import time

async def eternity(delay):
await asyncio.sleep(delay)
print(f"delay for {delay} seconds.")
return delay

async def main():
print(f"Start at: {time.strftime('%X')}")
tasks = [eternity(i) for i in range(10)]
for f in asyncio.as_completed(tasks):
res = await f
print(f"End at: {time.strftime('%X')}")

asyncio.run(main())

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
Start at: 17:19:11
delay for 0 seconds.
delay for 1 seconds.
delay for 2 seconds.
delay for 3 seconds.
delay for 4 seconds.
delay for 5 seconds.
delay for 6 seconds.
delay for 7 seconds.
delay for 8 seconds.
delay for 9 seconds.
End at: 17:19:20

从其他线程调度执行(Scheduling From Other Threads)

asyncio.run_coroutine_threadsafe(coro,loop)
向loop指定的事件循环提交一个由coro指定协程。线程安全。
返回一个concurrent.futures.Future对象,等待另一个线程返回结果。
这个函数用于从当前线程向运行事件循环的线程提交协程(coroutine)。
例如:

1
2
3
4
5
6
7
8
# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果协程出现异常,返回的Future会收到通知。返回的Future也可以被用作取消事件循环中的任务:

1
2
3
4
5
6
7
8
9
try:
result = future.result(timeout)
except asyncio.TimeoutError:
print('The coroutine took too long, cancelling the task...')
future.cancel()
except Exception as exc:
print(f'The coroutine raised an exception: {exc!r}')
else:
print(f'The coroutine returned: {result!r}')

可以参考并发与多线程章节。
与其他asyncio函数不同,该函数需要 显式 传递loop参数。
新增于Python 3.5.1

自查(Introspection)

current_task()
asyncio.current_task(loop=None)
返回事件循环中正在运行的Task实例,如果没有Task在执行,则返回None。
如果loop为None,则使用get_running_loop()获取当前事件循环。
新增于Python3.7

all_tasks()
asyncio.all_tasks(loop=None)
返回事件循环中尚未运行结束的Task对象集合。
如果loop为None,则,使用get_running_loop()获取当前事件循环。
新增于Python3.7

Task对象

class asyncio.Task(coro,*,loop=None)
类似与Future对象,用于执行Python协程。非线程安全。
Tasks用于在事件循环中执行协程。如果协程等待一个Future,那么Task会暂停协程的执行,直到Future执行完成。当Future完成时,协程的执行会恢复。
事件循环的 协作调度 模式:一个事件循环同一时间只执行一个Task。当这个Task等待某个Future返回时,事件循环执行其他的Task、回调或IO操作。

可以通过高层函数asyncio.create_task()创建Task,或者通过底层函数loop.create_task()和ensure_future()创建Task。但是不建议直接实例化Task对象。

如果想要取消一个Task的执行,可以使用cancel()方法。调用cancel()会引起Task对象向被封装的协程抛出CancelledError异常。当取消行为发生时,如果协程正在等待某个Future对象执行,该Future对象将被取消。

cancelled()方法用于检查某个Task是否已被取消。如果Task封装的协程没有阻止CancelledError异常,且Task确实被取消了,则该方法返回True。

asyncio.Task继承了Future类中除Future.set_result()和Future.set_exception()以外的所有方法。

Task对象支持contextvars模块:当一个Task被创建的时候,它会复制当前的上下文,然后在复制的上下文副本中执行协程。

Python3.7中的变更:添加了对contextvars模块的支持。

cancel()
申请取消任务。
将在下一个事件循环周期中将CancelledError异常抛给封装在Task中的协程。
收到CancelledError异常后,协程有机会处理异常,甚至以try ...except CancelledError ...finally来拒绝请求。因此,与Future.cancel()不同,Task.cancel()不能保证Task一定被取消掉。当然,拒绝取消请求这种操作并不常见,而且很不提倡。

以下例子可以说明协程如何拦截取消请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
async def cancel_me():
print('cancel_me(): before sleep')

try:
# Wait for 1 hour
await asyncio.sleep(3600)
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')

async def main():
# Create a "cancel_me" Task
task = asyncio.create_task(cancel_me())

# Wait for 1 second
await asyncio.sleep(1)

task.cancel()
try:
await task
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
# cancel_me(): before sleep
# cancel_me(): cancel sleep
# cancel_me(): after sleep
# main(): cancel_me is cancelled now

cancelled()
如果Task已经被取消,则返回True。
当取消请求通过cancel()被提交,且Task封装的协程传播了抛给它的CancelledError异常,则此Task被取消。

done()
如果Task已完成,则返回True。
Task完成有三种情况:

  • 封装的协程已返回
  • 封装的协程已抛出异常
  • Task被取消

result()
返回Task的执行结果。
如果Task已经完成,则返回Task封装的协程的执行结果(如果Task封装的协程引发异常,则重新引发该异常)。
如果Task已经取消,则该方法引发CancelledError异常。
如果Task的结果还不可用,该方法引发InvalidStateError异常。

exception()
返回Task的异常。
如果封装的协程引发了异常,则返回此异常。如果封装的协程执行正常,则返回None。
如果Task已被取消,则引发CancelledError异常。
如果Task尚未完成,则引发InvalidStateError异常。

add_done_callback()
添加一个回调函数,在Task完成后执行。
这个方法只应用在基于回调的底层编程中。
具体细节可以参考Future.remove_done_callback()

get_stack( ,limit=None)*
返回此Task的堆栈帧列表。

  • 如果封装的协程未完成,此方法返回它暂停位置的堆栈。
  • 如果封装的协程已经完成或已被取消,此方法返回一个空的列表。
  • 如果封装的协程因异常而结束,此方法返回异常回溯列表。

帧的顺序总是由旧到新
暂停中的协程只返回一个堆栈帧。
可选参数limit用于限定返回帧的最大数目。默认情况下,所有有效的帧都会返回。
在返回堆栈和返回异常回溯时,列表的顺序会有所不同:

  • 最新的堆栈帧会被返回
  • 最老的回溯帧会被返回(这和异常回溯模块的机制有关)

print_stack( ,limit=None,file=None)*
打印Task的栈帧或异常回溯。
此方法用于输出由get_stack()取回的帧列表,输出形式类似于回溯(traceback)模块
limit参数会直接传递给get_stack()。
file参数指定输出的I/O流,默认为sys.stderr。

classmethod all_tasks(loop=None)
返回一个事件循环上所有任务的集合。
默认情况下,当前事件循环上所有的任务都会被返回。如果loop参数为’None’,则通过get_event_loop()方法获取当前事件循环。

此方法将在Python3.9中被移除,所以不建议使用。可以使用asyncio.all_tasks()代替。

calssmethod current_task(loop=None)
返回当前正在运行的Task或None。
如果loop参数为None,则通过get_event_loop()方法获取当前事件循环。
此方法将在Python3.9中被移除,所以不建议使用。可以使用asyncio.current_task()代替。

基于生成器的协程(Generator-based Coroutines)

提示:对基于生成器的协程的支持将在Python3.10中移除,不建议使用。
基于生成器的协程是早期的异步实现方式,出现在async/await语法之前,使用yield from表达式等待Future或其他协程。
基于生成器的协程应该用 @asyncio.coroutine来修饰,尽管这不是强制的。

** @asyncio.coroutine**
基于生成器的协程的修饰器。
这个修饰器能使传统的基于生成器的协程async/await语法兼容:

1
2
3
4
5
6
@asyncio.coroutine
def old_style_coroutine():
yield from asyncio.sleep(1)

async def main():
await old_style_coroutine()

此修饰器将在Python3.10中被移除,所以不建议再使用。
此修饰器不能用于async def的协程中。

asyncio.iscoroutine(obj)
如果obj对象是一个coroutine对象,则返回True。
此方法与inspect.iscoroutine()不同,因为它对基于生成器的协程也返回True。

asyncio.iscoroutinefunction(func)
如果func是一个coroutine方法,则返回True。
此方法inspect.iscoroutinefunction()不同,因为它对用@coroutine修饰的基于生成器的协程也返回True。

转自:asyncio异步IO–协程(Coroutine)与任务(Task)详解