SpringBoot自定义线程池

1. Spring 默认提供的异步执行器(SimpleAsyncTaskExecutor)只做了简单处理:它不复用线程,每次执行任务都会 new 一个新的线程,效率不高,所以我们需要自定义一个线程池。

2. 自定义线程池有两种方法:第一种是自己定义一个线程池,并在使用时显式指定它;第二种是重写 Spring 默认的线程池,之后直接使用重写后的线程池。

一:自定义线程池

1.1 创建线程池 


/**
 * Spring configuration that registers a fixed-size, bounded-queue thread pool
 * under the bean name {@code customThreadExecutor}.
 *
 * @author liu
 */
@Configuration
public class ThreadPoolConfig {

    /** Core and maximum pool size; both are the same, so the pool never grows or shrinks. */
    private static final int POOL_SIZE = 4;

    /** Maximum number of tasks that may wait in the queue before the rejection policy applies. */
    private static final int QUEUE_CAPACITY = 100;

    /**
     * Creates the executor that other beans inject by the name {@code customThreadExecutor}.
     *
     * <p>When all threads are busy and the queue is full,
     * {@link ThreadPoolExecutor.CallerRunsPolicy} makes the submitting thread run the task itself.
     *
     * @return the configured thread pool
     * @throws Exception kept in the signature for compatibility; nothing in the body throws a
     *                   checked exception
     */
    @Bean(name = "customThreadExecutor")
    public ExecutorService executor() throws Exception {
        // Core size equals max size, so the 1-minute keep-alive only matters if core-thread
        // timeout is enabled later.
        return new ThreadPoolExecutor(
                POOL_SIZE, POOL_SIZE,
                1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

}

1.2 使用它

import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


/**
 * Demonstrates three ways of handing work to the injected {@code customThreadExecutor} pool:
 * a {@link Callable} with a result, a {@link Runnable} via {@code execute}, and a
 * {@link Runnable} via {@code submit}.
 */
@Component
public class Consumer1 {

    // Looked up by bean name, which selects the pool without a separate qualifier annotation.
    @Resource(name = "customThreadExecutor")
    private ExecutorService executorService;

    /**
     * Prints {@code msg} on the calling thread, then prints it again from tasks handed to the pool.
     *
     * <p>The method blocks until the first task has finished, because it reads that task's
     * result; the two {@link Runnable} tasks are fire-and-forget.
     *
     * @param msg text printed next to the name of the executing thread
     */
    public void test(String msg) {
        System.out.println(Thread.currentThread().getName() + ":" + msg);

        // Variant 1: a Callable, whose return value is read back through the Future.
        Future<String> result = executorService.submit(new Callable<String>() {
            @Override
            public String call() {
                System.out.println(Thread.currentThread().getName() + ":" + msg);
                return "处理成功!";
            }
        });

        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
            System.out.println(e);
        } catch (Exception e) {
            System.out.println(e);
        }

        // Variant 2: a Runnable passed to execute(); no result and no Future.
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + ":" + msg);
            }
        });

        // Variant 3: a Runnable passed to submit(); the returned Future is ignored here.
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + ":" + msg);
            }
        });
    }
}

或者这样使用也可以


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

@RestController
@RequestMapping("/custom")
@Slf4j
public class ThreadPoolTest {


    @Autowired
    @Qualifier(value = "customThreadExecutor")
    ExecutorService executorService;


    @RequestMapping(value = "/thread", method = RequestMethod.GET)
    public String task() {
        Future fal = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println(Thread.currentThread().getName() + ":");
                return "处理成功!";
            }
        });
        try {
            fal.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return "ok";
    }
}

二:配置默认的线程池

2.1 如果通过 @Async 使用自定义的线程池,每次都要在注解里指定线程池的名字,例如 @Async("taskExecutor");而这种方式是重写 Spring 默认的线程池,使用的时候只需要加 @Async 注解就可以,不用再去声明线程池的名字。

2.2 线程池参数配置类 TaskThreadPoolConfig 这里不重复给出;下面的代码要求它提供 getCorePoolSize()、getMaxPoolSize()、getQueueCapacity()、getKeepAliveSeconds() 四个方法。

2.3 NativeAsyncTaskExecutePool.java 装配线程池

**
 * 原生(Spring)异步任务线程池装配类,实现AsyncConfigurer重写他的两个方法,这样在使用默认的
 *  线程池的时候就会使用自己重写的
 */
@Slf4j
@Configuration
public class NativeAsyncTaskExecutePool implements AsyncConfigurer{
 
 
    //注入配置类
    @Autowired
    TaskThreadPoolConfig config;
 
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(config.getCorePoolSize());
        //最大线程数
        executor.setMaxPoolSize(config.getMaxPoolSize());
        //队列容量
        executor.setQueueCapacity(config.getQueueCapacity());
        //活跃时间
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        //线程名字前缀
        executor.setThreadNamePrefix("NativeAsyncTaskExecutePool-");
 
        // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
        // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
 
 
    /**
     *  异步任务中异常处理
     * @return
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
 
            @Override
            public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {
                log.error("=========================="+arg0.getMessage()+"=======================", arg0);
                log.error("exception method:"+arg1.getName());
            }
        };
    }
}

2.4 测试service方法

/**
 * Test service whose single method is annotated with a bare {@code @Async}.
 */
@Service
public class ThreadPoolService2 {
    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolService2.class);

    /**
     * Logs a start marker, prints the executing thread's name, sleeps for one second and logs
     * an end marker.
     *
     * <p>The {@code @Async} annotation names no executor bean.
     * NOTE(review): it presumably runs on the executor supplied by the application's
     * {@code AsyncConfigurer}; confirm that async support is enabled.
     */
    @Async
    public void executeAsync() {
        logger.info("start executeAsync");
        try {
            System.out.println("当前运行的线程名称:" + Thread.currentThread().getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the flag so the pool that owns this thread can still see the interruption.
            Thread.currentThread().interrupt();
            logger.error("executeAsync was interrupted while sleeping", e);
        }
        logger.info("end executeAsync");
    }

}

2.5 测试controller

/**
 * Test controller that triggers {@link ThreadPoolService2#executeAsync()} over HTTP.
 */
@Api(description = "测试控制类22222")
@RestController
@RequestMapping("/threadPoolController2")
public class ThreadPoolController2 {

    @Autowired
    private ThreadPoolService2 threadPoolService;

    /**
     * Handles {@code GET /threadPoolController2/test}: calls the service method and returns a
     * fixed greeting.
     *
     * <p>No {@code @ResponseBody} is needed on the method because the class is a
     * {@code @RestController}, which already writes return values to the response body.
     *
     * @return the fixed response text
     */
    @ApiOperation(value = "测试方法")
    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public String threadPoolTest() {
        threadPoolService.executeAsync();
        return "hello word!";
    }
}

三:另外一种方式

import com.example.recordlog.threadFactory.CustomThreadFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.*;


/**
 * Spring configuration that registers a thread pool whose rejection policy blocks the
 * submitting thread until the queue has room, instead of discarding the task.
 *
 * @author liu
 */
@Configuration
public class ThreadPoolConfig {

    /**
     * Creates the executor exposed under the bean name {@code customThreadExecutor}:
     * 4 core threads, at most 8 threads, no keep-alive for idle extra threads, and a bounded
     * queue of 5 tasks. Threads are created by {@code CustomThreadFactory}.
     *
     * @return the configured thread pool
     * @throws Exception kept in the signature for compatibility; nothing in the body throws a
     *                   checked exception
     */
    @Bean(name = "customThreadExecutor")
    public ExecutorService nlpFeignExecutor() throws Exception {
        return new ThreadPoolExecutor(4, 8,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5),
                new CustomThreadFactory("custom-Thread-pool"),
                ThreadPoolConfig::blockUntilQueued);
    }

    /**
     * Rejection policy: waits on the submitting thread until the task fits into the queue.
     *
     * @param task the task the pool could not accept
     * @param pool the pool that rejected it
     * @throws RejectedExecutionException if the pool is shut down, or if the submitting thread
     *                                    is interrupted while waiting; the task is never dropped
     *                                    silently
     */
    private static void blockUntilQueued(Runnable task, ThreadPoolExecutor pool) {
        // A task queued after shutdown might never be picked up by a worker.
        if (pool.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down: " + pool);
        }
        try {
            pool.getQueue().put(task);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            // Report the failure so a caller holding a Future does not wait on it forever.
            throw new RejectedExecutionException("Interrupted while waiting for queue space", interrupted);
        }
        // The pool may have been shut down while this thread was blocked in put(); take the
        // task back out if it is still queued and report the rejection.
        if (pool.isShutdown() && pool.getQueue().remove(task)) {
            throw new RejectedExecutionException("Executor has been shut down: " + pool);
        }
    }

}

自定义实现ThreadFactory

import java.util.concurrent.atomic.AtomicInteger;


/**
 * Thread factory that creates daemon threads of normal priority, named
 * {@code [<poolName>-<n>]} where {@code n} starts at 1 and increases by one per thread.
 */
public class CustomThreadFactory implements java.util.concurrent.ThreadFactory {

    /** Closing bracket appended after the sequence number. */
    private static final String NAME_END = "]";

    /** Sequence number given to the next thread; atomic because factories are shared across threads. */
    private final AtomicInteger threadNo = new AtomicInteger(1);

    /** Leading part of every thread name: {@code "[" + poolName + "-"}. */
    private final String nameStart;

    /**
     * @param poolName label placed at the start of every thread name created by this factory
     */
    public CustomThreadFactory(String poolName) {
        this.nameStart = "[" + poolName + "-";
    }

    /**
     * Creates a new, not yet started thread for the given task.
     *
     * @param r the task the thread will run
     * @return a daemon thread with priority {@link Thread#NORM_PRIORITY} and the next name in sequence
     */
    @Override
    public Thread newThread(Runnable r) {
        String threadName = nameStart + threadNo.getAndIncrement() + NAME_END;
        Thread newThread = new Thread(r, threadName);
        newThread.setDaemon(true);
        // A new thread inherits the priority of its creator, so reset it if that was not normal.
        if (newThread.getPriority() != Thread.NORM_PRIORITY) {
            newThread.setPriority(Thread.NORM_PRIORITY);
        }
        return newThread;
    }
}

使用自定义线程池


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutorService;

/**
 * REST endpoint that hands five fire-and-forget tasks to the {@code customThreadExecutor} pool.
 *
 * <p>Declared as a {@code @RestController} so that the returned string is written to the
 * response body; under a plain {@code @Controller} it would be treated as a view name.
 */
@RestController
@RequestMapping("/custom")
@Slf4j
public class ThreadPoolTest {

    /** Number of tasks submitted per request. */
    private static final int TASK_COUNT = 5;

    @Autowired
    @Qualifier(value = "customThreadExecutor")
    ExecutorService executorService;

    /**
     * Handles {@code GET /custom/thread}: submits the tasks and returns without waiting for them.
     *
     * @return the literal {@code "OK"}
     */
    @RequestMapping(value = "/thread", method = RequestMethod.GET)
    public String task() {

        for (int i = 0; i < TASK_COUNT; i++) {
            executorService.execute(() -> {
                // Placeholder for the real work; here it only reports the executing thread.
                System.out.println(Thread.currentThread() + "正在执行");
            });
        }
        return "OK";
    }

}

    原文作者:人生就像一场戏!
    原文地址: https://blog.csdn.net/qq_40428665/article/details/121658386
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞