CompletableFuture 和@Async 配置自定义线程池

介绍,配置自定义线程池,可以处理线程异常情况 ,不配置,都走自带的线程池,不好。下面是自定义线程池的方法,可以把@Bean放到配置类中。在丰富一下线程池的几个参数,建好对应的表存入异常的执行任务。

【1】TaskExecutor

Spring异步线程池的接口类,其实质是java.util.concurrent.Executor。

Spring 已经实现的异常线程池:

① SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。

② SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作,只适用于不需要多线程的地方。

③ ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类 。

④ SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。

⑤ ThreadPoolTaskExecutor :最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装。
 

代码:

启动开启异步:

@SpringBootApplication
//开启才可以
@EnableAsync
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}
package com.example.nacosdemo;

import com.example.nacosdemo.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.*;

@RestController
public class TestController {

    //============================================CompletableFuture=====配置自定义的线程池================================
    @Qualifier("selfThreadPoolExecutor")

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Autowired
    private AsyncService asyncService;

    @RequestMapping("/getThreadPool")
    public String getThreadPool() throws ExecutionException, InterruptedException {
        System.out.println("getsgg..."+Thread.currentThread().getName());
        //1.无返回值的异步任务 runAsync()
        Executor service;
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("线程号为***" + Thread.currentThread().getId()+"线程名称..."+Thread.currentThread().getName());
            int i = 5;
            System.out.println("---------" + i);
        }, threadPoolTaskExecutor);
        System.out.println("结果是+=="+ voidCompletableFuture.get());
        return "ok";
    }

//============================================@Async=====配置自定义的线程池================================
    /**
     * 测试异步执行线程池
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @RequestMapping("/getOk")
    public String getDe() throws ExecutionException, InterruptedException {
        System.out.println("主线程名称..."+Thread.currentThread().getName());
        Future<String> future = asyncService.asyncInvokeReturnFuture(5);
        System.out.println("结果是+=="+ future.get());
        return "ok";
    }




}
package com.example.nacosdemo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class PoolConfig {
    @Bean
    public ThreadPoolTaskExecutor selfThreadPoolExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //设置线程名称
        executor.setThreadNamePrefix("completableFuture--pool");
        //设置最大线程数
        executor.setMaxPoolSize(50);
        //设置核心线程数
        executor.setCorePoolSize(10);
        //设置线程空闲时间,默认60
        executor.setKeepAliveSeconds(60);
        //设置队列容量
        executor.setQueueCapacity(1000);

        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // .....任务过多时,存入数据库,定时执行,还是...
            }
        });
        // 使用预定义的异常处理类
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        return executor;
    }

    /**
     * 自定义异步线程池
     * @return
     */
    @Bean
    public AsyncTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //设置线程名称
        executor.setThreadNamePrefix("Anno-Executor");
        //设置最大线程数
        executor.setMaxPoolSize(50);
        //设置核心线程数
        executor.setCorePoolSize(10);
        //设置线程空闲时间,默认60
        executor.setKeepAliveSeconds(60);
        //设置队列容量
        executor.setQueueCapacity(1000);

        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // .....任务过多时,存入数据库,定时执行,还是...
            }
        });
        // 使用预定义的异常处理类
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        return executor;
    }
}

package com.example.nacosdemo.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Future;
@Service
public class AsyncService {
    /**
     * 异步调用返回Future
     *
     * @param i
     * @return
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        System.out.println("进入asyncInvokeReturnFuture...线程名称=="+Thread.currentThread().getName());
        Future<String> future;
        try {
            Thread.sleep(3000);
            System.out.println("3S后asyncInvokeReturnFuture数据开始处理中。。");
            future = new AsyncResult<String>("success:" + i);
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        }
        return future;
    }

}

2. 执行结果:

completableFuture的执行结果

getsgg...http-nio-8080-exec-3
线程号为***36线程名称...completableFuture--pool1
---------5
结果是+==null
getsgg...http-nio-8080-exec-4
线程号为***38线程名称...completableFuture--pool2
---------5
结果是+==null
getsgg...http-nio-8080-exec-5
线程号为***40线程名称...completableFuture--pool3
---------5
结果是+==null
getsgg...http-nio-8080-exec-6
线程号为***51线程名称...completableFuture--pool4
---------5
结果是+==null
getsgg...http-nio-8080-exec-7
线程号为***53线程名称...completableFuture--pool5
---------5
结果是+==null
主线程名称...http-nio-8080-exec-1
进入asyncInvokeReturnFuture...线程名称==Anno-Executor1
主线程名称...http-nio-8080-exec-2
进入asyncInvokeReturnFuture...线程名称==Anno-Executor2
主线程名称...http-nio-8080-exec-3
进入asyncInvokeReturnFuture...线程名称==Anno-Executor3
主线程名称...http-nio-8080-exec-4
进入asyncInvokeReturnFuture...线程名称==Anno-Executor4
3S后asyncInvokeReturnFuture数据开始处理中。。
结果是+==success:5
3S后asyncInvokeReturnFuture数据开始处理中。。
结果是+==success:5
3S后asyncInvokeReturnFuture数据开始处理中。。
结果是+==success:5
3S后asyncInvokeReturnFuture数据开始处理中。。
结果是+==success:5

关于    CompletableFuture    使用参考网址:


多线程、线程池的创建方式,为什么阿里推荐自定义线程池?_知识分子_的博客-CSDN博客_completablefuture 线程池

SpringBoot – @Async异步任务与线程池_小小默:进无止境-CSDN博客 

    原文作者:wangfenglei123456
    原文地址: https://blog.csdn.net/wangfenglei123456/article/details/120280794
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞