目录
一、简介
在实际的开发中使用定时任务功能的不在少数,并且可以说很多地方都是滥用了,比如一个定时任务几个小时都做不完,或者是定时任务挂了,甚至导致很多的生产问题,反正我个人不是很建议的定时任务滥用的,尤其是比较核心的任务,一个就是要尽量让定时任务稳定,一个就是就是让定时任务比较轻便。比如任务的拆解,只创建任务,通过中间件,多服务的形式进行异步的处理。不管怎么样,我们还是来写个定时任务吧。
二、maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.alian</groupId>
<artifactId>scheduled</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>scheduled</name>
<description>Spring Boot整合任务调度</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${parent.version}</version>
</dependency>
<!-- 本例中你不加入容器也可以运行-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
三、核心配置
3.1 配置文件application.yml
application.yml
#仅仅是定时任务测试,你也可以不配置此项
server:
port: 8080
servlet:
context-path: /scheduled
#定时任务
app:
executor-config:
core-pool-size: 5
max-pool-size: 10
queue-capacity: 10
keep-alive-time: 60
task-query: 0/5 * * * * ?
3.2 属性配置
AppProperties.java
package com.alian.scheduled.common;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(value = "app")
public class AppProperties {
/** * 小程序配置 */
private ExecutorConfig executorConfig;
@Data
public static class ExecutorConfig {
/** * 核心线程数 */
private int corePoolSize;
/** * 最大线程数 */
private int maxPoolSize;
/** * 任务队列容量(阻塞队列) */
private int queueCapacity;
/** * 线程空闲时间 */
private int keepAliveTime;
}
}
此配置类不懂的可以参考我另一篇文章:Spring Boot读取配置文件常用方式.
3.3 配置自定义的TaskExecutor(重要)
此配置类需要注意的几个点:
- 注解@EnableScheduling绝不能少(也可以配置在主类上),表示开启任务调度
- 注解@EnableAsync是使用多线程不能少的(也可以配置在主类上),表示开启异步调用
- 配置自定义TaskExecutor(参数含义都在代码里)
ScheduledConfig.java
package com.alian.scheduled.common;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@EnableScheduling
@EnableAsync
@Configuration
public class ScheduledConfig {
@Autowired
private AppProperties appProperties;
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//设置核心线程数
executor.setCorePoolSize(appProperties.getExecutorConfig().getCorePoolSize());
//设置最大线程数
executor.setMaxPoolSize(appProperties.getExecutorConfig().getMaxPoolSize());
//设置任务队列容量
executor.setQueueCapacity(appProperties.getExecutorConfig().getQueueCapacity());
//设置线程活跃时间(秒)
executor.setKeepAliveSeconds(appProperties.getExecutorConfig().getKeepAliveTime());
//设置默认线程名称(线程前缀名称,有助于区分不同线程池之间的线程比如:taskExecutor-query-)
executor.setThreadNamePrefix("taskExecutor-query-");
//设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//设置允许核心线程超时,默认是false
executor.setAllowCoreThreadTimeOut(false);
return executor;
}
}
自定义TaskExecutor大概的工作流程如下:
- 当线程池的线程数小于corePoolSize,就新建线程,并处理请求
- 当线程池的线程数等于corePoolSize,并且任务队列workQueue也没有满,则把请求放入任务队列workQueue中,线程池里的空闲线程会去取任务并处理
- 当线程池的线程数等于corePoolSize,任务队列workQueue也满了,并且线程池中的线程数小于maxPoolSize,则创建新的线程来处理被添加的任务
- 当线程池的线程数大于corePoolSize,任务队列workQueue也满了,并且线程池中的数量等于maxPoolSize,那么通过RejectedExecutionHandler所指定的策略来处理此任务。处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程 maxPoolSize,如果三者都满了,则使用RejectedExecutionHandler处理被拒绝的任务。
- 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数
- 此外maxPoolSize的设定如果比系统支持的线程数还要大时,会抛出:java.lang.OutOfMemoryError: unable to create new native thread异常
四、使用线程池实现定时任务
只要我们加上注解@Async即可,此注解里可指定线程池的名称,@Async的默认线程池为SimpleAsyncTaskExecutor,这里我们指定我们自定义的线程池taskExecutor(也就是beanName)。我们的任务是每隔5秒执行一次。
QueryTask.java
package com.alian.scheduled.task;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
@Slf4j
@Component
public class QueryTask {
/** * 注解@Async里的"taskExecutor",就是ScheduledConfig里定义的bean: public Executor taskExecutor() */
@Async("taskExecutor")
@Scheduled(cron = "${app.task-query}")
public void query() throws Exception {
log.info("-----------开始查询任务-----------");
//记录当前任务开始时间
LocalDateTime startTime = LocalDateTime.now();
//模拟任务处理
Thread.sleep((int) (Math.random() * 10));
//记录当前任务结束时间
LocalDateTime endTime = LocalDateTime.now();
//计算两个时间的毫秒差
long millis = ChronoUnit.MILLIS.between(startTime, endTime);
log.info("执行任务耗时:{}毫秒", millis);
log.info("-----------结束查询任务-----------");
}
}
五、运行结果
我们在上面的代码上注释掉@Async(“taskExecutor”)执行结果如下:
2021-09-24 15:38:50 010 [scheduling-1] INFO :-----------开始查询任务-----------
2021-09-24 15:38:50 014 [scheduling-1] INFO :执行任务耗时:3毫秒
2021-09-24 15:38:50 015 [scheduling-1] INFO :-----------结束查询任务-----------
2021-09-24 15:38:55 010 [scheduling-1] INFO :-----------开始查询任务-----------
2021-09-24 15:38:55 013 [scheduling-1] INFO :执行任务耗时:3毫秒
2021-09-24 15:38:55 013 [scheduling-1] INFO :-----------结束查询任务-----------
2021-09-24 15:39:00 013 [scheduling-1] INFO :-----------开始查询任务-----------
2021-09-24 15:39:00 020 [scheduling-1] INFO :执行任务耗时:7毫秒
2021-09-24 15:39:00 020 [scheduling-1] INFO :-----------结束查询任务-----------
2021-09-24 15:39:05 009 [scheduling-1] INFO :-----------开始查询任务-----------
2021-09-24 15:39:05 013 [scheduling-1] INFO :执行任务耗时:4毫秒
2021-09-24 15:39:05 013 [scheduling-1] INFO :-----------结束查询任务-----------
2021-09-24 15:39:10 008 [scheduling-1] INFO :-----------开始查询任务-----------
2021-09-24 15:39:10 014 [scheduling-1] INFO :执行任务耗时:6毫秒
2021-09-24 15:39:10 014 [scheduling-1] INFO :-----------结束查询任务-----------
2021-09-24 15:39:15 011 [scheduling-1] INFO :-----------开始查询任务-----------
2021-09-24 15:39:15 013 [scheduling-1] INFO :执行任务耗时:2毫秒
2021-09-24 15:39:15 013 [scheduling-1] INFO :-----------结束查询任务-----------
从这里我们可以看到,此任务一直是同一个线程在运行。
然后我们放开@Async(“taskExecutor”)注释,执行结果如下:
2021-09-24 15:40:25 018 [taskExecutor-query-1] INFO :-----------开始查询任务-----------
2021-09-24 15:40:25 023 [taskExecutor-query-1] INFO :执行任务耗时:4毫秒
2021-09-24 15:40:25 024 [taskExecutor-query-1] INFO :-----------结束查询任务-----------
2021-09-24 15:40:30 007 [taskExecutor-query-2] INFO :-----------开始查询任务-----------
2021-09-24 15:40:30 010 [taskExecutor-query-2] INFO :执行任务耗时:2毫秒
2021-09-24 15:40:30 010 [taskExecutor-query-2] INFO :-----------结束查询任务-----------
2021-09-24 15:40:35 012 [taskExecutor-query-3] INFO :-----------开始查询任务-----------
2021-09-24 15:40:35 021 [taskExecutor-query-3] INFO :执行任务耗时:9毫秒
2021-09-24 15:40:35 021 [taskExecutor-query-3] INFO :-----------结束查询任务-----------
2021-09-24 15:40:40 007 [taskExecutor-query-4] INFO :-----------开始查询任务-----------
2021-09-24 15:40:40 013 [taskExecutor-query-4] INFO :执行任务耗时:5毫秒
2021-09-24 15:40:40 013 [taskExecutor-query-4] INFO :-----------结束查询任务-----------
2021-09-24 15:40:45 015 [taskExecutor-query-5] INFO :-----------开始查询任务-----------
2021-09-24 15:40:45 022 [taskExecutor-query-5] INFO :执行任务耗时:6毫秒
2021-09-24 15:40:45 022 [taskExecutor-query-5] INFO :-----------结束查询任务-----------
2021-09-24 15:40:50 009 [taskExecutor-query-1] INFO :-----------开始查询任务-----------
2021-09-24 15:40:50 013 [taskExecutor-query-1] INFO :执行任务耗时:4毫秒
2021-09-24 15:40:50 013 [taskExecutor-query-1] INFO :-----------结束查询任务-----------
2021-09-24 15:40:55 013 [taskExecutor-query-2] INFO :-----------开始查询任务-----------
2021-09-24 15:40:55 017 [taskExecutor-query-2] INFO :执行任务耗时:3毫秒
2021-09-24 15:40:55 017 [taskExecutor-query-2] INFO :-----------结束查询任务-----------
从上述结果看来,使用线程池后,每次的运行的线程是不一样的。但是这个线程池怎么配置呢?
六、参数设置(corePoolSize、queueCapacity、maxPoolSize)(重要)
6.1 基本参数定义
- tasks :每秒的任务数,假设为100~1000
- spendTime:每个任务花费时间,假设为0.2s
- responseTime:系统允许容忍的最大响应时间,假设为1s
6.2 核心线程数(corePoolSize)
核心线程数 = 每秒任务数 / 每个线程每秒处理的任务数
其中:每个线程每秒处理的任务数=1秒/每个任务任务花费的时间
corePoolSize = tasks/(1/spendTime) =tasksprocessTime=(100~1000)0.2 = 50~200
根据8020原则
如果80%的每秒任务数小于300时,那么corePoolSize设置为3000.2=60即可
如果80%的每秒任务数小于800时,那么corePoolSize设置为8000.2=160即可
6.3 任务队列数(queueCapacity)
我们之前就知道了,当线程池的线程数等于corePoolSize,并且任务队列workQueue也没有满,则把请求放入任务队列workQueue中,那么就相当于所有的核心线程数的任务都放到任务队列workQueue,并且可以等待1s,即响应时间,由此得出公式:
queueCapacity = (coreSizePool/spendTime)responsetime
如果80%的每秒任务数小于300时,计算可得 queueCapacity = 60/0.21 = 300
如果80%的每秒任务数小于800时,计算可得 queueCapacity = 160/0.2*1 = 800
6.4 最大线程数(maxPoolSize)
最大线程数 = (最大任务数-队列容量)/每个线程每秒处理能力
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
如果80%的每秒任务数小于300时,计算可得 maxPoolSize = (1000-300)/10 = 70
如果80%的每秒任务数小于800时,计算可得 maxPoolSize = (1000-800)/10 = 20
好了,最大的问题来了,也是所有网友的一个疑惑,为啥你计算的最大线程数比核心线程数小啊???我们来重点回顾下本章第三节的要点,你就知道为什么了。
6.4.1 知识点一
- 当线程池的线程数等于corePoolSize,任务队列workQueue也满了,并且线程池中的线程数小于maxPoolSize,则创建新的线程来处理被添加的任务
如果80%的每秒任务数小于300时
此时:corePoolSize =60,queueCapacity =300,每个线程每秒完成5个任务,每秒消耗任务数300个,当1000个任务来的时候,就有700个任务需要写入工作队列,但是工作队列大小只有300,写入300个后,此时任务队列满了,就会新建线程去消耗任务。
6.4.2 知识点二
- 当线程池的线程数等于corePoolSize,并且任务队列workQueue也没有满,则把请求放入任务队列workQueue中,线程池里的空闲线程会去取任务并处理
如果80%的每秒任务数小于800时
此时:corePoolSize =160,queueCapacity =800,每个线程每秒完成5个任务,每秒消耗任务数800个,当1000个任务来的时候,是不是有200个任务需要写入到任务队列workQueue中。这个时候,任务队列workQueue是没有满的,是不会新建线程去消耗任务的,会等空闲的核心线程去消耗。
6.4.3 建议
因为数据都是我们人为定,比如响应时间,并发数量等,比如已本例来说,我说响应时间是5秒,那么你计算的最大线程数还为负数呢。所以大家就不要因为网上重复的案例去纠结了。你只要记住,计算出的最大线程数小于等于核心线程数,就设置为核心线程数就行了,这样也省了创建线程的开支,直接进入等待队列。也不要比核心进程数小,因为可能你的系统启动就报错了。源码里有判断maximumPoolSize 和 corePoolSize
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
如果maximumPoolSize < corePoolSize则直接异常了,所以最大线程数一定是大于等于核心线程数的。
6.5 空闲时间(keepAliveTime)
keepAliveTime,指的是线程池维护线程所允许的空闲时间,当负载降低时,可减少线程数量,如果一个线程空闲时间达到keepAliveTiime,该线程就退出,默认情况下线程池最少会保持corePoolSize个线程。如果设置allowCoreThreadTimeout=true,则线程池最后的线程数量可以为0。
6.6 任务拒绝处理器(RejectedExecutionHandler)
关于RejectedExecutionHandler,需要根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理,它目前有四种策略:
- ThreadPoolExecutor.AbortPolicy,此策略是默认的策略,处理程序遭到拒绝将抛出运行时异常RejectedExecutionException。
- ThreadPoolExecutor.CallerRunsPolicy,调用者的线程会执行该任务,如果执行器已关闭,则丢弃
- ThreadPoolExecutor.DiscardPolicy,不能执行的任务将被丢弃
- ThreadPoolExecutor.DiscardOldestPolicy,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程),此策略可能导致数据丢失。
两种情况会拒绝处理任务:一个是当线程数已经达到maxPoolSize,且工作队列已满;另一个是当线程池被调用shutdown()后,和线程池真正shutdown之前提交任务(调用shutdown方法后,会等待线程池里的任务执行完毕,再shutdown)。
结语
其实定时任务的整合非常的简单,主要是要理解ThreadPoolTaskExecutor的工作流程及其参数配置。