SpringBoot利用@Async注解实现异步调用

目录
  1. 1. 一、异步编程
    1. 1.1. 1.1、什么是异步调用?
    2. 1.2. 1.2、如何实现异步调用?
  2. 2. 二、Java 实现异步编程的几种方式
    1. 2.1. 2.1、非Spring项目利用多线程
    2. 2.2. 2.2、Spring项目异步任务处理,@Async的配置和使用
  3. 3. 三、SpringBoot利用@Async注解实现异步调用
    1. 3.1. 3.1、新建配置类,开启@Async功能支持
    2. 3.2. 3.2、在方法上标记异步调用
    3. 3.3. 3.3、在Controller中进行异步方法调用
  4. 4. 四、@Async自定义线程池
    1. 4.1. 4.1、为什么要给@Async自定义线程池?
    2. 4.2. 4.2、为@Async实现一个自定义线程池
    3. 4.3. 4.3、@Async配置默认线程池和多个线程池处理

前言:异步编程是让程序并发运行的一种手段,使用异步编程可以大大提高我们程序的吞吐量,减少用户的等待时间。在Java并发编程中实现异步功能,一般是需要使用线程或者线程池。而实现一个线程,要么继承Thread类,要么实现Runnable接口,然后在run方法中写具体的业务逻辑代码。开发Spring的大神们,为了简化这类异步操作,已经帮我们把异步功能封装好了。Spring中提供了@Async注解,我们可以通过它即可开启异步功能,使用起来非常方便。

一、异步编程

异步编程允许多个事情同时发生,当程序调用需要长时间运行的方法时,它不会阻塞当前的执行流程,程序可以继续运行,当方法执行完成时通知给主线程根据需要获取其执行结果或者失败异常的原因。使用异步编程可以大大提高我们程序的吞吐量,可以更好的面对更高的并发场景并更好的利用现有的系统资源,同时也会一定程度上减少用户的等待时间等。

1.1、什么是异步调用?

异步调用是相对于同步调用而言的,同步调用是指程序按预定顺序一步步执行,每一步必须等到上一步执行完后才能执行,异步调用则无需等待上一步程序执行完即可执行。异步调用可以减少程序执行时间。

1.2、如何实现异步调用?

多线程,这是很多人第一眼想到的关键词,没错,多线程就是一种实现异步调用的方式。

  • 在非spring目项目中我们要实现异步调用的就是使用多线程方式,可以自己实现Runable接口或者集成Thread类,或者使用jdk1.5以上提供了的Executors线程池;
  • 从Spring3开始提供了@Async注解,用于标注某个方法或某个类里面的所有方法都是需要异步处理的。被注解的方法被调用的时候,会在新线程中执行,而调用它的方法会在原来的线程中执行。这样可以避免阻塞、以及保证任务的实时性。适用于处理log、发送邮件、短信……等。

二、Java 实现异步编程的几种方式

2.1、非Spring项目利用多线程

①直接new线程

在 Java 语言中最简单使用异步编程的方式就是创建一个 Thread 来实现,如果你使用的 JDK 版本是 8 以上的话,可以使用 Lambda 表达式会更加简洁。

1
2
3
4
5
6
7
8
Thread t = new Thread()
{
@Override
public void run() {
longTimeMethod();
}
};
t.start();

但是new Thread()只能作为示例使用,如果用到了生产环境发生事故后果自负,使用上面这种 Thread 方式异步编程存在两个明显的问题。

  • 创建线程没有复用。我们知道频繁的线程创建与销毁是需要一部分开销的,而且示例里也没有限制线程的个数,如果使用不当可能会把系统线程用尽,从而引发事故,这个问题使用线程池可以解决。
  • 异步任务无法获取最终的执行结果,可以使用FutureTask 的方式。

②使用线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private ExecutorService executor = Executors.newCachedThreadPool(); 
public void fun() throws Exception {
executor.submit(new Runnable(){
@override
public void run() {
try {
//要执行的业务代码,我们这里没有写方法,可以让线程休息几秒进行测试
Thread.sleep(10000);
System.out.print("睡够啦~");
}catch(Exception e) {
throw new RuntimeException("报错啦!!");
}
}
});
}

Executors是concurrent包下的一个类,为我们提供了创建线程池的简便方法。

Executors可以创建我们常用的四种线程池:
(1)newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。不设上限,提交的任务将立即执行。
(2)newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
(3)newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
(4)newSingleThreadExecutor 创建一个单线程化的线程池执行任务。

submit方法
线程池建立完毕之后,我们就需要往线程池提交任务。通过线程池的submit方法即可,submit方法接收两种Runable和Callable。

区别如下:
Runable是实现该接口的run方法,callable是实现接口的call方法。
callable允许使用返回值,callable允许抛出异常。

2.2、Spring项目异步任务处理,@Async的配置和使用

从Spring3开始提供了@Async注解用于异步方法调用,该注解可以被标注在方法上,以便异步地调用该方法。调用者将在调用时立即返回,方法的实际执行将提交给Spring TaskExecutor的任务中,由指定的线程池中的线程执行。

  • @Async应用默认线程池,指在@Async注解在使用时,不指定线程池的名称。查看源码,@Async的默认线程池为SimpleAsyncTaskExecutor
  • @Async默认异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。基于默认配置,SimpleAsyncTaskExecutor并不是严格意义的线程池,达不到线程复用的功能。(注:Springboot-2.1.0 以下版本默认使用SimpleAsyncTaskExecutor不是线程池)
  • 在项目应用中,@Async调用线程池,推荐使用自定义线程池的模式。自定义线程池常用方案:重新实现接口AsyncConfigurer。

rex: 结论错误!默认线程池是ThreadPoolTaskExecutor,可同时执行8个线程。

三、SpringBoot利用@Async注解实现异步调用

使用@Async注解开启的异步功能,默认情况下,每次都会创建一个新线程。如果在高并发的场景下,可能会产生大量的线程,从而导致OOM问题。所以,大家在使用@Async注解的异步功能时,请别忘了自定义一个线程池。

3.1、新建配置类,开启@Async功能支持

使用@EnableAsync来开启异步任务支持,@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。我们这里选择使用单独的配置类SyncConfiguration。

1
2
3
4
5
6
@Configuration
//主要是为了扫描范围包下的所有 @Async注解
@EnableAsync
public class AsyncConfiguration {

}

3.2、在方法上标记异步调用

增加一个Component类,用来进行业务处理,同时添加@Async注解,代表该方法为异步处理;如果标注在类上,则类里面的所有方法都是需要异步处理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
@Slf4j
public class AsyncTask {

@SneakyThrows
@Async
public void doTask1() {
long t1 = System.currentTimeMillis();
Thread.sleep(2000);
//这样直接调用是同步的,异步无效
//doTask2();
long t2 = System.currentTimeMillis();
log.info("task1 cost {} ms" , t2-t1);
}

@SneakyThrows
@Async
public void doTask2() {
long t1 = System.currentTimeMillis();
Thread.sleep(3000);
long t2 = System.currentTimeMillis();
log.info("task2 cost {} ms" , t2-t1);
}
}

直接调用doTask2()没有走Spring的代理类。因为@Async注解的实现是基于Spring的AOP,而AOP的实现是基于动态代理模式实现的。
那么异步无效的原因就很明显了,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器。
如果要在doTask1内部调用doTask2方法,正确的方式是在doTask1的内部:

1
2
AsyncTask task = cn.hutool.extra.spring.SpringUtil.getBean(AsyncTask.class);
task.doTask2();

3.3、在Controller中进行异步方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncController {
@Autowired
private AsyncTask asyncTask;

@RequestMapping("/task")
public void task() throws InterruptedException {
long t1 = System.currentTimeMillis();
asyncTask.doTask1();
asyncTask.doTask2();
Thread.sleep(1000);
long t2 = System.currentTimeMillis();
log.info("main cost {} ms", t2-t1);
}
}

通过访问http://localhost:8080/async/task查看控制台日志:主线程不需要等待异步方法执行完成,减少了响应时间,提高了接口性能。

1
2
3
2021-11-25 15:48:37 [http-nio-8080-exec-8] INFO AsyncController:26 - main cost 1009 ms
2021-11-25 15:48:38 [task-1] INFO com.async.AsyncTask:22 - task1 cost 2005 ms
2021-11-25 15:48:39 [task-2] INFO com.async.AsyncTask:31 - task2 cost 3005 ms

通过上面三步我们就可以在SpringBoot中欢乐的使用异步方法来提高我们接口性能了,是不是很简单?不过,如果你在实际项目开发中真这样写了,肯定会被老鸟们无情嘲讽?因为上面的代码忽略了一个最大的问题,就是给@Async异步框架自定义线程池

四、@Async自定义线程池

4.1、为什么要给@Async自定义线程池?

使用@Async注解,在默认情况下用的是SimpleAsyncTaskExecutor线程池,该线程池不是真正意义上的线程池 。使用此线程池无法实现线程重用,每次调用都会新建一条线程。若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。

rex: 结论错误!默认线程池是ThreadPoolTaskExecutor,可同时执行8个线程

关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void execute(Runnable task, long startTimeout)
{
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
//判断是否开启限流,默认为否
if (this.isThrottleActive() && startTimeout > 0L) {
//执行前置操作,进行限流
this.concurrencyThrottle.beforeAccess();
this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
} else {
//未限流的情况,执行线程任务
this.doExecute(taskToUse);
}
}

protected void doExecute(Runnable task) {
//不断创建线程
Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task);
thread.start();
}

//创建线程
public Thread createThread(Runnable runnable) {
//指定线程名,task-1,task-2...
Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName());
thread.setPriority(this.getThreadPriority());
thread.setDaemon(this.isDaemon());
return thread;
}

我们也可以直接通过上面的控制台日志观察,每次打印的线程名都是[task-1]、[task-2]、[task-3]、[task-4]…..递增的。所以我们在使用Spring中的@Async异步框架时一定要自定义线程池,替代默认的SimpleAsyncTaskExecutor

Spring提供了多种线程池:
SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方。
ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
ThreadPoolTaskScheduler:可以使用cron表达式
ThreadPoolTaskExecutor :最常使用,推荐。其实质是对java.util.concurrent.ThreadPoolExecutor的包装

4.2、为@Async实现一个自定义线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
@EnableAsync
public class SyncConfiguration {
@Bean(name = "asyncPoolTaskExecutor")
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(10);
//线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
taskExecutor.setMaxPoolSize(100);
//缓存队列
taskExecutor.setQueueCapacity(50);
//许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
taskExecutor.setKeepAliveSeconds(200);
//异步方法内部线程名称
taskExecutor.setThreadNamePrefix("async-");
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
}

配置自定义线程池以后我们就可以大胆的使用@Async提供的异步处理能力了。

4.3、@Async配置默认线程池和多个线程池处理

在现实的互联网项目开发中,针对高并发的请求,一般的做法是高并发接口单独线程池隔离处理。

假设现在2个高并发接口:一个是修改用户信息接口,刷新用户redis缓存;一个是下订单接口,发送app push信息。往往会根据接口特征定义两个线程池,这时候我们在使用@Async时就需要通过指定线程池名称进行区分。

(1)为@Async指定线程池名字

1
2
3
4
5
6
7
8
@SneakyThrows
@Async("asyncPoolTaskExecutor")
public void doTask1() {
long t1 = System.currentTimeMillis();
Thread.sleep(2000);
long t2 = System.currentTimeMillis();
log.info("task1 cost {} ms" , t2-t1);
}

当系统存在多个线程池时,我们也可以配置一个默认线程池,对于非默认的异步任务再通过@Async("otherTaskExecutor")来指定线程池名称。

(2)配置默认线程池

可以修改配置类让其实现AsyncConfigurer,并重写getAsyncExecutor()方法,指定默认线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Configuration
@EnableAsync
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {

@Bean(name = "asyncPoolTaskExecutor")
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(2);
//线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
taskExecutor.setMaxPoolSize(10);
//缓存队列
taskExecutor.setQueueCapacity(50);
//许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
taskExecutor.setKeepAliveSeconds(200);
//异步方法内部线程名称
taskExecutor.setThreadNamePrefix("async-");
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}

/**
* 指定默认线程池
*/
@Override
public Executor getAsyncExecutor() {
return executor();
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) ->
log.error("线程池执行任务发送未知错误,执行方法:{}",method.getName(),ex);
}
}

如下,doTask1()方法使用默认使用线程池asyncPoolTaskExecutordoTask2()使用线程池otherTaskExecutor,非常灵活。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Async
public void doTask1() {
long t1 = System.currentTimeMillis();
Thread.sleep(2000);
long t2 = System.currentTimeMillis();
log.info("task1 cost {} ms" , t2-t1);
}

@SneakyThrows
@Async("otherTaskExecutor")
public void doTask2() {
long t1 = System.currentTimeMillis();
Thread.sleep(3000);
long t2 = System.currentTimeMillis();
log.info("task2 cost {} ms" , t2-t1);
}

@Async异步方法在日常开发中经常会用到,很有必要掌握。

参考链接:
Java异步调用方法
SpringBoot 如何实现异步编程,老鸟们都这么玩的!
@Async异步执行
Spring使用@Async注解
Java 异步编程的几种方式