SpringBoot线程池异步调用

SpringBoot线程池异步调用

异步调用就是在不阻塞主线程的情况下执行高耗时方法
在Springboot中启用异步方法
需要4个注解

@EnableAsync 开启异步
@Component 注册异步组件
@Async 标注异步方法
@Autowired 注入异步组件

异步线程需要在启动类上加上@EnableAsync注解(没有 @Async则不会起效):

@EnableAsync

线程池的使用:

springboot的线程池配置

1. 创建一个配置类ExecutorConfig

用来定义如何创建一个ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync这两个注解,表示这是个配置类,并且是线程池的配置类,如下所示:



import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author Judge
 * @date 2020/11/3 17:00
 */

@Configuration
@EnableAsync
public class ExecutorConfig {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Bean
    public Executor asyncServiceExecutor() {
        logger.info("start asyncServiceExecutor");
        //使用VisiableThreadPoolTaskExecutor
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(5);
        //配置最大线程数
        executor.setMaxPoolSize(10);
        // 线程最大空闲时间
//        executor.setKeepAliveSeconds(300);
        //配置队列大小
        executor.setQueueCapacity(100);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");

     // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
//        AbortPolicy,用于被拒绝任务的处理程序,它将抛出RejectedExecutionException。
//        CallerRunsPolicy,用于被拒绝任务的处理程序,它直接在execute方法的调用线程中运行被拒绝的任务。
//        DiscardOldestPolicy,用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试execute。
//        DiscardPolicy,用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }

  

}


2.创建Service层的接口和实现

创建一个service层的接口AsyncService,如下:

/**
 * @author Judge
 * @date 2020/11/3 16:58
 */
public interface AsyncService {

    /**
     * 执行异步任务
     */
    void executeAsync(String a);

}


3.将Service层的服务异步化

打开AsyncServiceImpl.java,在executeAsync方法上增加注解@Async(“asyncServiceExecutor”),asyncServiceExecutor是前面ExecutorConfig.java中的方法名,表明executeAsync方法进入的线程池是asyncServiceExecutor方法创建的,如下:

对应的AsyncServiceImpl,实现如下:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @author Judge
 * @date 2020/11/3 16:59
 */
@Service
public class AsyncServiceImpl implements AsyncService {

    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);


    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        logger.info("start executeAsync");
        try{
        	// 逻辑代码处理
            Thread.sleep(2000);
        }catch(Exception e){
            e.printStackTrace();
        }
        logger.info("end executeAsync");
    }
}

如需测试异步线程是否起效,可添加Thread.sleep(2000),睡眠2秒

4.创建控制器controller

创建一个controller为Hello,里面定义一个http接口,做的事情是调用Service层的服务,如下:

@RestController
public class Hello {

private static final Logger logger = LoggerFactory.getLogger(Hello.class);

@Autowired
private AsyncService asyncService;

@RequestMapping("/")
public String submit(){
    logger.info("---------------start------------");
    //调用服务service层的任务
    asyncService.executeAsync();
    logger.info("---------end---------");

    return "Hello World";
}

}

-可自行启动项目访问控制器验证效果-

5.扩展ThreadPoolTaskExecutor

虽然我们已经用上了线程池,但是还不清楚线程池当时的情况,有多少线程在执行,多少在队列中等待呢?这里我创建了一个ThreadPoolTaskExecutor的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来,代码如下:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 若是想在使用连接池的时候,打印出连接池的各项参数,应当如何设置:
 *
 * @author Judge
 * @date 2020/11/3 17:08
 */
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (null == threadPoolExecutor) {
            return;
        }

//        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
        logger.info("{}, {},提交数量 [{}], 完成 [{}], 处理中 [{}], 等待 [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

如上所示,showThreadPoolInfo方法中将任务总数、已完成数、活跃线程数,队列大小都打印出来了,然后Override了父类的execute、submit等方法,在里面调用showThreadPoolInfo方法,这样每次有任务被提交到线程池的时候,都会将当前线程池的基本情况打印到日志中;

注意需要修改原ExeutorConfig的配置
修改ExecutorConfig.java的asyncServiceExecutor方法,

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()
改为
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(),
如下所示:

``` 
@Bean
public Executor asyncServiceExecutor() {
    logger.info("start asyncServiceExecutor");
    //使用VisiableThreadPoolTaskExecutor
    ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
    //配置核心线程数
    executor.setCorePoolSize(5);
    //配置最大线程数
    executor.setMaxPoolSize(10);
    // 线程最大空闲时间
// executor.setKeepAliveSeconds(300);
    //配置队列大小
    executor.setQueueCapacity(100);
    //配置线程池中的线程的名称前缀
    executor.setThreadNamePrefix("async-service-");

    // rejection-policy:当pool已经达到max size的时候,如何处理新任务
    // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //执行初始化
    executor.initialize();
    return executor;
}

再次启动该工程,访问上面创建的控制器方法,看到的日志如下:

2020-11-04 10:28:51.677  INFO 106996 --- [main] c.t.conf.VisiableThreadPoolTaskExecutor  : 
async-service-test-, 2. do submit,提交数量 [9], 完成 [0], 处理中 [2], 等待 [7]

这说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了19个任务,完成了0个,当前有2个线程在处理任务,还剩7个任务在队列中等待,线程池的基本情况一路了然;

注:

在类内创建一个异步方法,打上Async 标记。这个方法必须是实例方法。
然后就跟注入Service一样一样的了。
注意:事务调用
在Async 方法上标注@Transactional是无效的,只有在Service上标注@Transactional 有效。
异步方法不支持内部调用,也就是异步方法不能写在需要调用他的类的内部。
比如Class A 有a,b,c。b有Async标注。此时a对b的异步调用是失效的。
为什么异步方法必须是实例方法

GAME OVER.

Copyright: 采用 知识共享署名4.0 国际许可协议进行许可

Links: https://qingjiaqi.com/2020/11/03/springboot线程池异步调用

Buy me a cup of coffee ☕.