SpringBoot使用@Async实现异步任务

目录
  1. 1. 使用@Async实现异步调用
    1. 1.1. 同步调用
    2. 1.2. 异步调用
    3. 1.3. 异步回调
    4. 1.4. 代码示例
  2. 2. 配置@Async异步任务的线程池
    1. 2.1. 配置默认线程池
    2. 2.2. 动手试一试
    3. 2.3. 代码示例
  3. 3. 如何隔离@Async异步任务的线程池
    1. 3.1. 什么是线程池的隔离,为什么要隔离
    2. 3.2. 不同异步任务配置不同线程池
    3. 3.3. 代码示例
  4. 4. 配置线程池的拒绝策略
    1. 4.1. 场景重现
    2. 4.2. 配置拒绝策略
    3. 4.3. 代码示例

使用@Async实现异步调用

什么是“异步调用”?“异步调用”对应的是“同步调用”,同步调用指程序按照定义顺序依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;异步调用指程序在顺序执行时,不等待异步调用的语句返回结果就执行后面的程序。

同步调用

下面通过一个简单示例来直观的理解什么是同步调用:

定义Task类,创建三个处理函数分别模拟三个执行任务的操作,操作消耗时间随机取(10秒内)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
@Component
public class AsyncTasks {

public static Random random = new Random();

public void doTaskOne() throws Exception {
log.info("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务一,耗时:" + (end - start) + "毫秒");
}

public void doTaskTwo() throws Exception {
log.info("开始做任务二");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务二,耗时:" + (end - start) + "毫秒");
}

public void doTaskThree() throws Exception {
log.info("开始做任务三");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务三,耗时:" + (end - start) + "毫秒");
}

}

在单元测试用例中,注入Task对象,并在测试用例中执行doTaskOnedoTaskTwodoTaskThree三个函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@SpringBootTest
public class Chapter75ApplicationTests {

@Autowired
private AsyncTasks asyncTasks;

@Test
public void test() throws Exception {
asyncTasks.doTaskOne();
asyncTasks.doTaskTwo();
asyncTasks.doTaskThree();
}

}

执行单元测试,可以看到类似如下输出:

1
2
3
4
5
6
2021-09-11 23:19:12.922  INFO 92539 --- [           main] com.didispace.chapter75.AsyncTasks       : 开始做任务一
2021-09-11 23:19:17.788 INFO 92539 --- [ main] com.didispace.chapter75.AsyncTasks : 完成任务一,耗时:4865毫秒
2021-09-11 23:19:17.788 INFO 92539 --- [ main] com.didispace.chapter75.AsyncTasks : 开始做任务二
2021-09-11 23:19:24.851 INFO 92539 --- [ main] com.didispace.chapter75.AsyncTasks : 完成任务二,耗时:7063毫秒
2021-09-11 23:19:24.851 INFO 92539 --- [ main] com.didispace.chapter75.AsyncTasks : 开始做任务三
2021-09-11 23:19:26.928 INFO 92539 --- [ main] com.didispace.chapter75.AsyncTasks : 完成任务三,耗时:2076毫秒

任务一、任务二、任务三顺序的执行完了,换言之doTaskOne、doTaskTwo、doTaskThree三个函数顺序的执行完成。

异步调用

上述的同步调用虽然顺利的执行完了三个任务,但是可以看到执行时间比较长,若这三个任务本身之间不存在依赖关系,可以并发执行的话,同步调用在执行效率方面就比较差,可以考虑通过异步调用的方式来并发执行。

在Spring Boot中,我们只需要通过使用@Async注解就能简单的将原来的同步函数变为异步函数,Task类改在为如下模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
@Component
public class AsyncTasks {

public static Random random = new Random();

@Async
public void doTaskOne() throws Exception {
log.info("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务一,耗时:" + (end - start) + "毫秒");
}

@Async
public void doTaskTwo() throws Exception {
log.info("开始做任务二");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务二,耗时:" + (end - start) + "毫秒");
}

@Async
public void doTaskThree() throws Exception {
log.info("开始做任务三");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务三,耗时:" + (end - start) + "毫秒");
}

}

为了让@Async注解能够生效,还需要在Spring Boot的主程序中配置@EnableAsync,如下所示:

1
2
3
4
5
6
7
8
9
@EnableAsync
@SpringBootApplication
public class Chapter75Application {

public static void main(String[] args) {
SpringApplication.run(Chapter75Application.class, args);
}

}

此时可以反复执行单元测试,您可能会遇到各种不同的结果,比如:

  • 没有任何任务相关的输出
  • 有部分任务相关的输出
  • 乱序的任务相关的输出
  • 原因是目前doTaskOnedoTaskTwodoTaskThree三个函数的时候已经是异步执行了。主程序在异步调用之后,主程序并不会理会这三个函数是否执行完成了,由于没有其他需要执行的内容,所以程序就自动结束了,导致了不完整或是没有输出任务相关内容的情况。

注:@Async所修饰的函数不要定义为static类型,这样异步调用不会生效

异步回调

为了让doTaskOnedoTaskTwodoTaskThree能正常结束,假设我们需要统计一下三个任务并发执行共耗时多少,这就需要等到上述三个函数都完成调动之后记录时间,并计算结果。

那么我们如何判断上述三个异步调用是否已经执行完成呢?我们需要使用CompletableFuture<T>来返回异步调用的结果,就像如下方式改造doTaskOne函数:

1
2
3
4
5
6
7
8
9
@Async
public CompletableFuture<String> doTaskOne() throws Exception {
log.info("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务一,耗时:" + (end - start) + "毫秒");
return CompletableFuture.completedFuture("任务一完成");
}

按照如上方式改造一下其他两个异步函数之后,下面我们改造一下测试用例,让测试在等待完成三个异步调用之后来做一些其他事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void test() throws Exception {
long start = System.currentTimeMillis();

CompletableFuture<String> task1 = asyncTasks.doTaskOne();
CompletableFuture<String> task2 = asyncTasks.doTaskTwo();
CompletableFuture<String> task3 = asyncTasks.doTaskThree();

CompletableFuture.allOf(task1, task2, task3).join();

long end = System.currentTimeMillis();

log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
}

看看我们做了哪些改变:

  • 在测试用例一开始记录开始时间
  • 在调用三个异步函数的时候,返回CompletableFuture<String>类型的结果对象
  • 通过CompletableFuture.allOf(task1, task2, task3).join()实现三个异步任务都结束之前的阻塞效果
  • 三个任务都完成之后,根据结束时间 - 开始时间,计算出三个任务并发执行的总耗时。

执行一下上述的单元测试,可以看到如下结果:

1
2
3
4
5
6
7
8
2021-09-11 23:33:38.842  INFO 95891 --- [         task-3] com.didispace.chapter75.AsyncTasks       : 开始做任务三
2021-09-11 23:33:38.842 INFO 95891 --- [ task-2] com.didispace.chapter75.AsyncTasks : 开始做任务二
2021-09-11 23:33:38.842 INFO 95891 --- [ task-1] com.didispace.chapter75.AsyncTasks : 开始做任务一
2021-09-11 23:33:45.155 INFO 95891 --- [ task-2] com.didispace.chapter75.AsyncTasks : 完成任务二,耗时:6312毫秒
2021-09-11 23:33:47.308 INFO 95891 --- [ task-3] com.didispace.chapter75.AsyncTasks : 完成任务三,耗时:8465毫秒
2021-09-11 23:33:47.403 INFO 95891 --- [ task-1] com.didispace.chapter75.AsyncTasks : 完成任务一,耗时:8560毫秒
2021-09-11 23:33:47.404 INFO 95891 --- [ main] c.d.chapter75.Chapter75ApplicationTests : 任务全部完成,总耗时:8590毫秒

可以看到,通过异步调用,让任务一、二、三并发执行,有效的减少了程序的总运行时间。

本系列教程《Spring Boot 2.x基础教程》点击直达!,欢迎收藏与转发!如果学习过程中如遇困难?可以加入我们的 Spring技术交流群,参与交流与讨论,更好的学习与进步!

代码示例

本文的完整工程可以查看下面仓库中2.x目录下的chapter7-5工程:

配置@Async异步任务的线程池

上一篇我们介绍了如何使用@Async注解来创建异步任务,我可以用这种方法来实现一些并发操作,以加速任务的执行效率。但是,如果只是如前文那样直接简单的创建来使用,可能还是会碰到一些问题。存在有什么问题呢?先来思考下,下面的这个接口,通过异步任务加速执行的实现,是否存在问题或风险呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RestController
public class HelloController {

@Autowired
private AsyncTasks asyncTasks;

@GetMapping("/hello")
public String hello() {
// 将可以并行的处理逻辑,拆分成三个异步任务同时执行
CompletableFuture<String> task1 = asyncTasks.doTaskOne();
CompletableFuture<String> task2 = asyncTasks.doTaskTwo();
CompletableFuture<String> task3 = asyncTasks.doTaskThree();

CompletableFuture.allOf(task1, task2, task3).join();
return "Hello World";
}
}

虽然,从单次接口调用来说,是没有问题的。但当接口被客户端频繁调用的时候,异步任务的数量就会大量增长:3 x n(n为请求数量),如果任务处理不够快,就很可能会出现内存溢出的情况。那么为什么会内存溢出呢?根本原因是由于Spring Boot默认用于异步任务的线程池是这样配置的:

图中我标出的两个重要参数是需要关注的:

  • queueCapacity:缓冲队列的容量,默认为INT的最大值(2的31次方-1)。
  • maxSize:允许的最大线程数,默认为INT的最大值(2的31次方-1)。

所以,默认情况下,一般任务队列就可能把内存给堆满了。所以,我们真正使用的时候,还需要对异步任务的执行线程池做一些基础配置,以防止出现内存溢出导致服务不可用的问题。

配置默认线程池

默认线程池的配置很简单,只需要在配置文件中完成即可,主要有以下这些参数:

1
2
3
4
5
6
7
8
spring.task.execution.pool.core-size=2 # 线程池创建时的初始化线程数,默认为8
spring.task.execution.pool.max-size=5 # 线程池的最大线程数,默认为int最大值
spring.task.execution.pool.queue-capacity=10 # 用来缓冲执行任务的队列,默认为int最大值
spring.task.execution.pool.keep-alive=60s # 线程终止前允许保持空闲的时间
spring.task.execution.pool.allow-core-thread-timeout=true # 线程终止前允许保持空闲的时间
spring.task.execution.shutdown.await-termination=false # 是否等待剩余任务完成后才关闭应用
spring.task.execution.shutdown.await-termination-period= # 等待剩余任务完成的最大时间
spring.task.execution.thread-name-prefix=task- # 线程名的前缀,设置好了之后可以方便我们在日志中查看处理任务所在的线程池

动手试一试

我们直接基于之前chapter7-5的结果来进行如下操作。

首先,在没有进行线程池配置之前,可以先执行一下单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void test1() throws Exception {
long start = System.currentTimeMillis();

CompletableFuture<String> task1 = asyncTasks.doTaskOne();
CompletableFuture<String> task2 = asyncTasks.doTaskTwo();
CompletableFuture<String> task3 = asyncTasks.doTaskThree();

CompletableFuture.allOf(task1, task2, task3).join();

long end = System.currentTimeMillis();

log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
}

由于默认线程池的核心线程数是8,所以3个任务会同时开始执行,日志输出是这样的:

1
2
3
4
5
6
7
2021-09-15 00:30:14.819  INFO 77614 --- [         task-2] com.didispace.chapter76.AsyncTasks       : 开始做任务二
2021-09-15 00:30:14.819 INFO 77614 --- [ task-3] com.didispace.chapter76.AsyncTasks : 开始做任务三
2021-09-15 00:30:14.819 INFO 77614 --- [ task-1] com.didispace.chapter76.AsyncTasks : 开始做任务一
2021-09-15 00:30:15.491 INFO 77614 --- [ task-2] com.didispace.chapter76.AsyncTasks : 完成任务二,耗时:672毫秒
2021-09-15 00:30:19.496 INFO 77614 --- [ task-3] com.didispace.chapter76.AsyncTasks : 完成任务三,耗时:4677毫秒
2021-09-15 00:30:20.443 INFO 77614 --- [ task-1] com.didispace.chapter76.AsyncTasks : 完成任务一,耗时:5624毫秒
2021-09-15 00:30:20.443 INFO 77614 --- [ main] c.d.chapter76.Chapter76ApplicationTests : 任务全部完成,总耗时:5653毫秒

接着,可以尝试在配置文件中增加如下的线程池配置

1
2
3
4
5
6
spring.task.execution.pool.core-size=2
spring.task.execution.pool.max-size=5
spring.task.execution.pool.queue-capacity=10
spring.task.execution.pool.keep-alive=60s
spring.task.execution.pool.allow-core-thread-timeout=true
spring.task.execution.thread-name-prefix=task-

日志输出的顺序会变成如下的顺序:

1
2
3
4
5
6
7
2021-09-15 00:31:50.013  INFO 77985 --- [         task-1] com.didispace.chapter76.AsyncTasks       : 开始做任务一
2021-09-15 00:31:50.013 INFO 77985 --- [ task-2] com.didispace.chapter76.AsyncTasks : 开始做任务二
2021-09-15 00:31:52.452 INFO 77985 --- [ task-1] com.didispace.chapter76.AsyncTasks : 完成任务一,耗时:2439毫秒
2021-09-15 00:31:52.452 INFO 77985 --- [ task-1] com.didispace.chapter76.AsyncTasks : 开始做任务三
2021-09-15 00:31:55.880 INFO 77985 --- [ task-2] com.didispace.chapter76.AsyncTasks : 完成任务二,耗时:5867毫秒
2021-09-15 00:32:00.346 INFO 77985 --- [ task-1] com.didispace.chapter76.AsyncTasks : 完成任务三,耗时:7894毫秒
2021-09-15 00:32:00.347 INFO 77985 --- [ main] c.d.chapter76.Chapter76ApplicationTests : 任务全部完成,总耗时:10363毫秒
  • 任务一和任务二会马上占用核心线程,任务三进入队列等待
  • 任务一完成,释放出一个核心线程,任务三从队列中移出,并占用核心线程开始处理

注意:这里可能有的小伙伴会问,最大线程不是5么,为什么任务三是进缓冲队列,不是创建新线程来处理吗?这里要理解缓冲队列与最大线程间的关系:只有在缓冲队列满了之后才会申请超过核心线程数的线程来进行处理。所以,这里只有缓冲队列中10个任务满了,再来第11个任务的时候,才会在线程池中创建第三个线程来处理。这个这里就不具体写列子了,读者可以自己调整下参数,或者调整下单元测试来验证这个逻辑。

好了,今天的学习就到这里!如果您学习过程中如遇困难?可以加入我们超高质量的Spring技术交流群,参与交流与讨论,更好的学习与进步!更多 Spring Boot教程可以点击直达!,欢迎收藏与转发支持!

代码示例

本文的完整工程可以查看下面仓库中2.x目录下的chapter7-6工程:

如何隔离@Async异步任务的线程池

通过上一篇:配置@Async异步任务的线程池的介绍,你应该已经了解到异步任务的执行背后有一个线程池来管理执行任务。为了控制异步任务的并发不影响到应用的正常运作,我们必须要对线程池做好相应的配置,防止资源的过渡使用。除了默认线程池的配置之外,还有一类场景,也是很常见的,那就是多任务情况下的线程池隔离。

什么是线程池的隔离,为什么要隔离

可能有的小伙伴还不太了解什么是线程池的隔离,为什么要隔离?。所以,我们先来看看下面的场景案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@RestController
public class HelloController {

@Autowired
private AsyncTasks asyncTasks;

@GetMapping("/api-1")
public String taskOne() {
CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");

CompletableFuture.allOf(task1, task2, task3).join();
return "";
}

@GetMapping("/api-2")
public String taskTwo() {
CompletableFuture<String> task1 = asyncTasks.doTaskTwo("1");
CompletableFuture<String> task2 = asyncTasks.doTaskTwo("2");
CompletableFuture<String> task3 = asyncTasks.doTaskTwo("3");

CompletableFuture.allOf(task1, task2, task3).join();
return "";
}

}

上面的代码中,有两个API接口,这两个接口的具体执行逻辑中都会把执行过程拆分为三个异步任务来实现。

好了,思考一分钟,想一下。如果这样实现,会有什么问题吗?

上面这段代码,在API请求并发不高,同时如果每个任务的处理速度也够快的时候,是没有问题的。但如果并发上来或其中某几个处理过程扯后腿了的时候。这两个提供不相干服务的接口可能会互相影响。比如:假设当前线程池配置的最大线程数有2个,这个时候/api-1接口中task1和task2处理速度很慢,阻塞了;那么此时,当用户调用api-2接口的时候,这个服务也会阻塞!

造成这种现场的原因是:默认情况下,所有用@Async创建的异步任务都是共用的一个线程池,所以当有一些异步任务碰到性能问题的时候,是会直接影响其他异步任务的。

为了解决这个问题,我们就需要对异步任务做一定的线程池隔离,让不同的异步任务互不影响。

不同异步任务配置不同线程池

下面,我们就来实际操作一下!

第一步:初始化多个线程池,比如下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@EnableAsync
@Configuration
public class TaskPoolConfig {

@Bean
public Executor taskExecutor1() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("executor-1-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
/*
在一些场景下,若需要在关闭线程池时等待当前调度任务完成后才开始关闭,可以通过简单的配置,进行优雅的停机策略配置。
关键就是通过setWaitForTasksToCompleteOnShutdown(true)和setAwaitTerminationSeconds方法。
- setWaitForTasksToCompleteOnShutdown:表明等待所有线程执行完,默认为false。
- setAwaitTerminationSeconds:等待的时间,因为不能无限的等待下去。
*/
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(1800);
return executor;
}

@Bean
public Executor taskExecutor2() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("executor-2-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}

注意:这里特地用executor.setThreadNamePrefix设置了线程名的前缀,这样可以方便观察后面具体执行的顺序。
通过实际测试所得结论:我在for循环里调用100次异步任务,异步任务里sleep-10秒钟,然后打印一句话。程序启动后,这句话最多打印QueueCapacity+setMaxPoolSize+1次,比如taskExecutor1线程池最多打印15次,然后程序就阻塞了。之后每消费完一个任务,才会再往队列里添加一个任务,直到100个任务全部完成。

第二步:创建异步任务,并指定要使用的线程池名称

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
@Component
public class AsyncTasks {

public static Random random = new Random();

@Async("taskExecutor1")
public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
log.info("开始任务:{}", taskNo);
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务:{},耗时:{} 毫秒", taskNo, end - start);
return CompletableFuture.completedFuture("任务完成");
}

@Async("taskExecutor2")
public CompletableFuture<String> doTaskTwo(String taskNo) throws Exception {
log.info("开始任务:{}", taskNo);
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务:{},耗时:{} 毫秒", taskNo, end - start);
return CompletableFuture.completedFuture("任务完成");
}

}

这里@Async注解中定义的taskExecutor1taskExecutor2就是线程池的名字。由于在第一步中,我们没有具体写两个线程池Bean的名称,所以默认会使用方法名,也就是taskExecutor1taskExecutor2

第三步:写个单元测试来验证下,比如下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
@SpringBootTest
public class Chapter77ApplicationTests {

@Autowired
private AsyncTasks asyncTasks;

@Test
public void test() throws Exception {
long start = System.currentTimeMillis();

// 线程池1
CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");

// 线程池2
CompletableFuture<String> task4 = asyncTasks.doTaskTwo("4");
CompletableFuture<String> task5 = asyncTasks.doTaskTwo("5");
CompletableFuture<String> task6 = asyncTasks.doTaskTwo("6");

// 一起执行
CompletableFuture.allOf(task1, task2, task3, task4, task5, task6).join();

long end = System.currentTimeMillis();

log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
}

}

在上面的单元测试中,一共启动了6个异步任务,前三个用的是线程池1,后三个用的是线程池2。

先不执行,根据设置的核心线程2和最大线程数2,来分析一下,大概会是怎么样的执行情况?

  1. 线程池1的三个任务,task1和task2会先获得执行线程,然后task3因为没有可分配线程进入缓冲队列
  2. 线程池2的三个任务,task4和task5会先获得执行线程,然后task6因为没有可分配线程进入缓冲队列
  3. 任务task3会在task1或task2完成之后,开始执行
  4. 任务task6会在task4或task5完成之后,开始执行

分析好之后,执行下单元测试,看看是否是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
2021-09-15 23:45:11.369  INFO 61670 --- [   executor-1-1] com.didispace.chapter77.AsyncTasks       : 开始任务:1
2021-09-15 23:45:11.369 INFO 61670 --- [ executor-2-2] com.didispace.chapter77.AsyncTasks : 开始任务:5
2021-09-15 23:45:11.369 INFO 61670 --- [ executor-2-1] com.didispace.chapter77.AsyncTasks : 开始任务:4
2021-09-15 23:45:11.369 INFO 61670 --- [ executor-1-2] com.didispace.chapter77.AsyncTasks : 开始任务:2
2021-09-15 23:45:15.905 INFO 61670 --- [ executor-2-1] com.didispace.chapter77.AsyncTasks : 完成任务:4,耗时:4532 毫秒
2021-09-15 23:45:15.905 INFO 61670 --- [ executor-2-1] com.didispace.chapter77.AsyncTasks : 开始任务:6
2021-09-15 23:45:18.263 INFO 61670 --- [ executor-1-2] com.didispace.chapter77.AsyncTasks : 完成任务:2,耗时:6890 毫秒
2021-09-15 23:45:18.263 INFO 61670 --- [ executor-1-2] com.didispace.chapter77.AsyncTasks : 开始任务:3
2021-09-15 23:45:18.896 INFO 61670 --- [ executor-2-2] com.didispace.chapter77.AsyncTasks : 完成任务:5,耗时:7523 毫秒
2021-09-15 23:45:19.842 INFO 61670 --- [ executor-1-2] com.didispace.chapter77.AsyncTasks : 完成任务:3,耗时:1579 毫秒
2021-09-15 23:45:20.551 INFO 61670 --- [ executor-1-1] com.didispace.chapter77.AsyncTasks : 完成任务:1,耗时:9178 毫秒
2021-09-15 23:45:24.117 INFO 61670 --- [ executor-2-1] com.didispace.chapter77.AsyncTasks : 完成任务:6,耗时:8212 毫秒
2021-09-15 23:45:24.117 INFO 61670 --- [ main] c.d.chapter77.Chapter77ApplicationTests : 任务全部完成,总耗时:12762毫秒

好了,今天的学习就到这里!如果您学习过程中如遇困难?可以加入我们超高质量的Spring技术交流群,参与交流与讨论,更好的学习与进步!更多Spring Boot教程可以点击直达!,欢迎收藏与转发支持!

代码示例

本文的完整工程可以查看下面仓库中2.x目录下的chapter7-7工程:

配置线程池的拒绝策略

通过之前三篇关于Spring Boot异步任务实现的博文,我们分别学会了用@Async创建异步任务为异步任务配置线程池使用多个线程池隔离不同的异步任务。今天这篇,我们继续对上面的知识进行完善和优化!

如果你已经看过上面几篇内容并已经掌握之后,一起来思考下面这个问题:

假设,线程池配置为核心线程数2、最大线程数2、缓冲队列长度2。此时,有5个异步任务同时开始,会发生什么?

场景重现

我们先来把上面的假设用代码实现一下:

第一步:创建Spring Boot应用,根据上面的假设写好线程池配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@EnableAsync
@SpringBootApplication
public class Chapter78Application {

public static void main(String[] args) {
SpringApplication.run(Chapter78Application.class, args);
}

@EnableAsync
@Configuration
class TaskPoolConfig {

@Bean
public Executor taskExecutor1() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(2);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("executor-1-");
return executor;
}

}

}

第二步:用@Async注解实现一个部分任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
@Component
public class AsyncTasks {

public static Random random = new Random();

@Async("taskExecutor1")
public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
log.info("开始任务:{}", taskNo);
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务:{},耗时:{} 毫秒", taskNo, end - start);
return CompletableFuture.completedFuture("任务完成");
}

}

第三步:编写测试用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
@SpringBootTest
public class Chapter78ApplicationTests {

@Autowired
private AsyncTasks asyncTasks;

@Test
public void test2() throws Exception {
// 线程池配置:core-2,max-2,queue=2,同时有5个任务,出现下面异常:
// org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2,
// active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9

long start = System.currentTimeMillis();

// 线程池1
CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");

// 一起执行
CompletableFuture.allOf(task1, task2, task3, task4, task5).join();

long end = System.currentTimeMillis();

log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
}

}

执行一下,可以类似下面这样的日志信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2021-09-22 17:33:08.159  INFO 21119 --- [   executor-1-2] com.didispace.chapter78.AsyncTasks       : 开始任务:2
2021-09-22 17:33:08.159 INFO 21119 --- [ executor-1-1] com.didispace.chapter78.AsyncTasks : 开始任务:1

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732

at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne(<generated>)
at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
......
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@64968732 rejected from java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
... 74 more

从异常信息org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task:中,可以很明确的知道,第5个任务因为超过了执行线程+缓冲队列长度,而被拒绝了。

所有,默认情况下,线程池的拒绝策略是:当线程池队列满了,会丢弃这个任务,并抛出异常。

配置拒绝策略

虽然线程池有默认的拒绝策略,但实际开发过程中,有些业务场景,直接拒绝的策略往往并不适用,有时候我们可能会选择舍弃最早开始执行而未完成的任务、也可能会选择舍弃刚开始执行而未完成的任务等更贴近业务需要的策略。所以,为线程池配置其他拒绝策略或自定义拒绝策略是很常见的需求,那么这个要怎么实现呢?

下面就来具体说说今天的正题,如何为线程池配置拒绝策略、如何自定义拒绝策略。

看下面这段代码的最后一行,setRejectedExecutionHandler方法就是为线程池设置拒绝策略的方法:

1
2
3
4
5
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

//...其他线程池配置

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

ThreadPoolExecutor中提供了4种线程的策略可以供开发者直接使用,你只需要像下面这样设置即可:

1
2
3
4
5
6
7
8
9
10
11
// AbortPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

// DiscardPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

// DiscardOldestPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

// CallerRunsPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

这四个策略对应的含义分别是:

  • AbortPolicy策略:默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
  • DiscardPolicy策略:如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
  • DiscardOldestPolicy策略:如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。
  • CallerRunsPolicy策略:如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。

而如果你要自定义一个拒绝策略,那么可以这样写:

1
2
3
4
5
6
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 拒绝策略的逻辑
}
});

当然如果你喜欢用Lamba表达式,也可以这样写:

1
2
3
executor.setRejectedExecutionHandler((r, executor1) -> {
// 拒绝策略的逻辑
});

好了,今天的学习就到这里!

如果您学习过程中如遇困难?可以加入我们超高质量的Spring技术交流群,参与交流与讨论,更好的学习与进步!更多Spring Boot教程可以点击直达!,欢迎收藏与转发支持!

代码示例

本文的完整工程可以查看下面仓库中2.x目录下的chapter7-8工程: