Java的线程池一般是基于concurrent包下的ThreadPoolExecutor类实现的,
不过当我们基于spring框架开发程序时,
通常会使用其包装类ThreadPoolTaskExecutor,
这里有一个小问题就是当使用线程池执行任务的时候,
任务的消费速度小于生产速度时,任务通常会被阻塞到阻塞队列,
而阻塞队列大小通常是固定的,当阻塞队列满的时候,execute方法并不会阻塞,
默认是使用RejectedExecutionHandler去处理拒绝的任务,默认策略是AbortPolicy,直接抛出RejectedExecutionException异常,
很多情况下,这不符合我们的业务需求,我们希望被拒绝的任务能够阻塞执行,从而阻止任务的生产速度;
一个比较巧妙的解决方案如下,仅供参考,具体还需要根据实际场景来应用:
import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration public class TaskExecutorConfig { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Bean("vgcThreadPoolTaskExecutor") public Executor threadPoolTaskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(6); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(1000); taskExecutor.setRejectedExecutionHandler((Runnable r, ThreadPoolExecutor executor) -> { if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (InterruptedException e) { logger.error(e.toString(), e); Thread.currentThread().interrupt(); } } } ); taskExecutor.initialize(); return taskExecutor; } }
这里之所以能实现阻塞,是基于BlockingQueue的put方法来实现的,当阻塞队列满时,put方法会一直等待…