前言
之前写SpringBoot时,有简单介绍过分布式定时器的一些思路(SpringBoot | 第二十二章:定时任务的使用)。原来的项目本身使用dubbo实现了一个简单的实现,目前项目迁移至SpringCloud后,原来的就不适用了,但基本原理都是差不多的,都是集中管理需要调用的api及调度等相关信息。故本篇会简单介绍下一些常见的分布式定时器的实现方案,还会编写一个基于http调用的
统一调度
项目,实现简单的调用SpringCloud
项目RESTful
接口。
一些说明
本身Spring提供了
Spring Task
进行定时配置,基于注解和xml配置方式可实现简单的定时器配置,再一些场景下,若在非单机模式下,部署了多个应用时,若不加以控制,很容易造成数据的错误问题。在之前编写的文章中也有简单的提及一些分布式解决方案,比如Quartz
等,感谢的同学可点击:SpringBoot | 第二十二章:定时任务的使用,进行查看,这里就不再重复阐述了。
基于ShedLock实现轻量级分布式定时锁
ShedLock
是一个在分布式环境中使用的定时任务框架,用于解决在分布式环境中的多个实例的相同定时任务在同一时间点重复执行的问题,解决思路是通过对公用的数据库中的某个表进行记录和加锁,使得同一时间点只有第一个执行定时任务并成功在数据库表中写入相应记录的节点能够成功执行而其他节点直接跳过该任务。简单来说,ShedLock
本身只做一件事情:保证一个任务最多同时执行一次。所以如官网所说的,ShedLock
不是一个分布式调度器,只是一个锁!
注意:ShedLock
支持Mongo
,Redis
,Hazelcast
,ZooKeeper
以及任何带有JDBC驱动程序的东西。本例子为了方便,直接使用了redis
进行示例,若本身基于jdbc等,可直接参考官网给出的提示:https://github.com/lukas-krecan/ShedLock#jdbctemplate. 创建对应的表结构。
CREATE TABLE shedlock(
name VARCHAR(64),
lock_until TIMESTAMP(3) NULL,
locked_at TIMESTAMP(3) NULL,
locked_by VARCHAR(255),
PRIMARY KEY (name)
)
集成示例
创建工程名:
java-shedlock-demo
0.maven依赖(这里使用当前最新版本及使用redis进行实现),基于SpringBoot 2.0.3.RELEASE
版本。
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-spring</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-redis-spring</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--spring2.0集成redis所需common-pool2 -->
<!-- 必须加上,jedis依赖此 -->
<!-- spring boot 2.0 的操作手册有标注 大家可以去看看 地址是:https://docs.spring.io/spring-boot/docs/2.0.3.RELEASE/reference/htmlsingle/ -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
1.配置LockProvider
,同时开启@EnableSchedulerLock注解。
ShedLockRedisConfig.java
/**
*
* @ClassName 类名:ShedLockRedisConfig
* @Description 功能说明:redis配置
* <p>
* TODO
*</p>
************************************************************************
* @date 创建日期:2019年3月3日
* @author 创建人:oKong
* @version 版本号:V1.0
*<p>
***************************修订记录*************************************
*
* 2019年3月3日 oKong 创建该类功能。
*
***********************************************************************
*</p>
*/
/**
*
* @ClassName 类名:ShedLockRedisConfig
* @Description 功能说明:redis配置
* <p>
* TODO
*</p>
************************************************************************
* @date 创建日期:2019年3月3日
* @author 创建人:oKong
* @version 版本号:V1.0
*<p>
***************************修订记录*************************************
*
* 2019年3月3日 oKong 创建该类功能。
*
***********************************************************************
*</p>
*/
@Configuration
//defaultLockAtMostFor 指定在执行节点结束时应保留锁的默认时间使用ISO8601 Duration格式
//作用就是在被加锁的节点挂了时,无法释放锁,造成其他节点无法进行下一任务
//这里默认30s
//关于ISO8601 Duration格式用的不到,具体可上网查询下相关资料,应该就是一套规范,规定一些时间表达方式
@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
public class ShedLockRedisConfig {
//正常情况下 应该按实际环境来区分的
//这里为了方便 写成test便于是测试
// @Value("${spring.profiles.active}")
String env = "test";
@Bean
public LockProvider lockProvider(RedisConnectionFactory connectionFactory) {
//环境变量 -需要区分不同环境避免冲突,如dev环境和test环境,两者都部署时,只有一个实例进行,此时会造成相关环境未启动情况
return new RedisLockProvider(connectionFactory, env);
}
}
2.编写一个简单定时任务。
/**
*
* @ClassName 类名:SimpleTask
* @Description 功能说明:
* <p>
* TODO
*</p>
************************************************************************
* @date 创建日期:2019年3月3日
* @author 创建人:oKong
* @version 版本号:V1.0
*<p>
***************************修订记录*************************************
*
* 2019年3月3日 oKong 创建该类功能。
*
***********************************************************************
*</p>
*/
@Component
@Slf4j
public class SimpleTask {
//区分服务
@Value("${server.port}")
String port;
//为了方便测试 设置cron表达式
@Scheduled(cron = "*/5 * * * * ?")
//lockAtLeastFor:保证在设置的期间类不执行多次任务,单位是毫秒,此处可以根据实际任务运行情况进行设置,
//简单来说,一个每15分钟执行的任务,若每次任务执行的时间为几分钟,则可以设置lockAtLeastFor大于其最大估计最大执行时间
//避免一次任务未执行完,下一个定时任务又启动了。
//任务执行完,会自动释放锁。
@SchedulerLock(name="simpleTask",lockAtLeastFor = 1*1000)
public void getCurrentDate() {
log.info("端口({}),Scheduled定时任务执行:{}", port, new Date());
}
}
3.编写启动类开启定时任务功能,及配置文件。
/**
*
* @ClassName 类名:ShedLockApplication
* @Description 功能说明:启动类
* <p>
* TODO
* </p>
************************************************************************
* @date 创建日期:2019年3月3日
* @author 创建人:oKong
* @version 版本号:V1.0
* <p>
*************************** 修订记录*************************************
*
* 2019年3月3日 oKong 创建该类功能。
*
***********************************************************************
* </p>
*/
@SpringBootApplication
@EnableScheduling // 开启定时任务
@Slf4j
public class ShedLockApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(ShedLockApplication.class, args);
log.info("java-shedlock-demo启动!");
}
}
application.properties
server.port=8001
# REDIS (RedisProperties)
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
#spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制)Duration
spring.redis.lettuce.pool.max-wait=-1ms
# 连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
# 连接超时时间-Duration 不能设置为0 一般上设置个200ms
spring.redis.timeout=200ms
4.利用多环境启动多个服务(8001,8002),查看是否正常运行。
8001服务
2019-03-03 23:36:30.070 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定时任务执行:Mon Mar 03 23:36:30 CST 2019
2019-03-03 23:36:35.005 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定时任务执行:Mon Mar 03 23:36:35 CST 2019
2019-03-03 23:36:40.002 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定时任务执行:Mon Mar 03 23:36:40 CST 2019
2019-03-03 23:36:45.003 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定时任务执行:Mon Mar 03 23:36:45 CST 2019
2019-03-03 23:36:50.003 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定时任务执行:Mon Mar 03 23:36:50 CST 2019
2019-03-03 23:36:55.006 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定时任务执行:Mon Mar 03 23:36:55 CST 2019
2019-03-03 23:37:05.002 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定时任务执行:Mon Mar 03 23:37:05 CST 2019
2019-03-03 23:37:15.002 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定时任务执行:Mon Mar 03 23:37:15 CST 2019
8002服务
2019-03-03 23:37:00.012 INFO 24492 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8002),Scheduled定时任务执行:Mon Mar 03 23:37:00 CST 2019
2019-03-03 23:37:10.007 INFO 24492 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8002),Scheduled定时任务执行:Mon Mar 03 23:37:10 CST 2019
通过日志输出,可以看出每次任务执行时,只有一个实例在运行。具体哪个服务,看谁获取到锁了。
SchedulerLock注解说明
@SchedulerLock注解一共支持五个参数,分别是
- name:用来标注一个定时服务的名字,被用于写入数据库作为区分不同服务的标识,如果有多个同名定时任务则同一时间点只有一个执行成功
- lockAtMostFor:成功执行任务的节点所能拥有独占锁的最长时间,单位是毫秒ms
- lockAtMostForString:成功执行任务的节点所能拥有的独占锁的最长时间的字符串表达,例如“PT14M”表示为14分钟
- lockAtLeastFor:成功执行任务的节点所能拥有独占所的最短时间,单位是毫秒ms
lockAtLeastForString:成功执行任务的节点所能拥有的独占锁的最短时间的字符串表达,例如“PT14M”表示为14分钟
两种集成模式
按官网介绍,其有两种模式:
TaskScheduler
及Method
代理,具体的可以查看官网介绍,这里就不过多阐述了。简单来说,都是使用AOP
代理机制,一个是代理了taskScheduler
,一个是代理了被注解了SchedulerLock
具体的方法。可以具体场景进行设置,比如记录定时任务日志等。这里需要注意,使用Method
代理时,其不依赖于Spring环境,但普通调用此方法时也会进行锁定的,需要注意,而且目前只支持void
的方法。
TaskScheduler代理时序图
Method代理时序图
基于统一调度中心实现任务调用
统一调度中心
:一个管理定时任务配置及发起任务执行的一个服务。简单来说,就是通过维护需要执行任务的服务列表,如api地址
、dubbo
服务信息等,通过配置的定时配置进行服务调用。从而避免了定时任务重复问题,同时也能利用注册中心实现负载均衡动态调用对应任务。
技术选型
- 核心框架:
SpringBoot 2.0.3.RELEASE
、Springcloud Finchley.SR1
- 任务调度:
Quartz
- 持久层框架:
MyBatis
+MyBatis-Plus
- 数据库:
mysql
题外话:原本想延续原先
SpringBoot1.5
版本进行开发,后面考虑此服务相对简单,所以直接尝试使用webflux
进行服务开发,顺便也学习学习WebFlux
相关操作。
数据库脚本
CREATE TABLE `sched_config` (
`id` bigint(20) NOT NULL,
`name` varchar(200) DEFAULT NULL COMMENT '任务名称',
`target_service_type` varchar(2) DEFAULT NULL COMMENT '目标任务类型:01 springcloud 02 http 03 dubbo',
`targer_service` varchar(50) DEFAULT NULL COMMENT '目标服务:可为服务地址,或者dubbo服务名',
`cron_config` varchar(20) DEFAULT NULL COMMENT 'cron表达式',
`status` varchar(1) DEFAULT NULL COMMENT '状态:1启用 0 停用',
`remark` varchar(200) DEFAULT NULL COMMENT '备注说明',
`extra_dubbo_group` varchar(50) DEFAULT NULL COMMENT 'dubbo组名',
`extra_dubbo_version` varchar(50) DEFAULT NULL COMMENT 'dubbo服务版本信息',
`gmt_create` datetime DEFAULT NULL COMMENT '创建时间',
`gmt_modified` datetime DEFAULT NULL COMMENT '修改时间',
PRIMARY KEY (`id`)
)
相关类说明
quartz工厂类(QuartzJobFactory)
为了使得自定义的job能主动注入spring的相关bean,需要额外实现此工厂类,方便调用。当然也可以直接动态获取bean实例了。
public class QuartzJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
// 调用父类的方法
Object jobInstance = super.createJobInstance(bundle);
//主动注入
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
同时,配置SchedulerFactoryBean
,设置其工厂类。
QuartzConfig.java
@Configuration
public class QuartzConfig {
@Bean
public SchedulerFactoryBean schedulerFactoryBean(){
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setOverwriteExistingJobs(true);
// 延时启动
factory.setStartupDelay(20);
// 自定义Job Factory,用于Spring注入
factory.setJobFactory(quartzJobFactory());
return factory;
}
@Bean
public QuartzJobFactory quartzJobFactory() {
return new QuartzJobFactory();
}
}
初始化任务(InitJob)
在服务启动时,启动开启配置的任务,同时设置其定时器。
@Component
@Slf4j
public class InitJob {
@Autowired
ISchedConfigService schedConfigService;
@Autowired
Scheduler scheduler;
/**
*
* <p>函数名称: initJob </p>
* <p>功能说明: 启动时,进行任务初始化操作,即启动相应的任务定时器
*
* </p>
*<p>参数说明:</p>
*
* @date 创建时间:2019年3月4日
* @author 作者:oKong
*/
@PostConstruct
public void initJob() {
log.info("初始化任务开始......");
//获取所有启用任务
EntityWrapper<SchedConfig> qryWrapper = new EntityWrapper<>();
qryWrapper.eq(SchedConfig.STATUS, "1");
List<SchedConfig> schedConfigList = schedConfigService.selectList(qryWrapper);
if(schedConfigList == null || schedConfigList.isEmpty()) {
log.warn("暂无定时任务");
return;
}
for(SchedConfig config : schedConfigList) {
String name = config.getName();//任务名称
JobDetail jobDetail = newJob(TaskJob.class).withIdentity(name, "okongJobGroup").build();
//设置运行时参数
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.put("config", config);
//创建trigger触发器
Trigger trigger = newTrigger()
.withIdentity(name, "okongTriggerGroup")
.withSchedule(cronSchedule(config.getCronConfig())).build();
//启动定时器
try {
scheduler.scheduleJob(jobDetail, trigger);
log.info("任务[{}]启动成功", name);
} catch (SchedulerException e) {
log.error("任务[{}]启动失败,{}", name,e.getMessage());
}
}
log.info("初始化任务结束......");
}
}
任务类(TaskJob)
实现具体任务的执行和调用。利用
WebClient
实现http服务的调用。暂时未实现dubbo的调用,后期再补充。
- 配置普通
WebClient
和具有负载均衡的webClient
,主要是考虑到存在访问SpringCloud
服务和普通http
的需求,原先使用负载均衡的restTemplate
时,访问普通的http请求是无法访问的,不知道webClient
是否也是一样,这里直接简单粗暴的直接设置了两个webClient
。
@Configuration
public class WebClientConfig {
/**
*
* <p>函数名称: loadBalancedWebClientBuilder </p>
* <p>功能说明: 具有负载均衡的WebClient
*
* </p>
*<p>参数说明:</p>
* @return
*
* @date 创建时间:2019年3月5日
* @author 作者:oKong
*/
@Bean("balanceWebClient")
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}
/**
*
* <p>函数名称: webClientBuilder </p>
* <p>功能说明:普通WebClient
*
* </p>
*<p>参数说明:</p>
* @return
*
* @date 创建时间:2019年3月5日
* @author 作者:oKong
*/
@Bean("webClient")
public WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
}
- 具体执行任务类,根据不同的类型,进行不同的调用。
//@DisallowConcurrentExecution 说明在一个任务执行时,另一个定时点来临时不会执行任务,比如一个定时是间隔3分钟一次,但任务执行了5分钟,此时会等上个任务完成后再执行下一次定时任务
@DisallowConcurrentExecution
@Slf4j
public class TaskJob implements org.quartz.Job{
/**
* spring5中 异步restTemplate已被标记位作废了
* 这里尝试使用webClient
*/
@Autowired
@Qualifier("balanceWebClient")
private WebClient.Builder balanceWebClientBuilder;
@Autowired
@Qualifier("webClient")
private WebClient.Builder webClientBuilder;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
//执行方法
//获取任务实体对象
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
SchedConfig schedConfig = (SchedConfig) jobDataMap.get("config");
log.info("执行定时任务:{}", schedConfig);
//根据不同类型进行不同的处理逻辑
Mono<String> monoRst = null;
switch (schedConfig.getTargetServiceType()) {
case "01":
//springcloud方式
//利用loadBalancerClient 获取实际服务地址
monoRst = balanceWebClientBuilder.build().post().uri(schedConfig.getTargerService()).retrieve().bodyToMono(String.class);
break;
case "02":
//普通http方式
monoRst =webClientBuilder.build().post().uri(schedConfig.getTargerService()).retrieve().bodyToMono(String.class);//无参数
break;
case "03":
//dubbo方式
//TODO 暂时未实现
break;
default:
}
if(monoRst != null) {
log.info("调用服务结果为:{}", monoRst.block());
}
}
}
服务效果
为了测试,简单改造了
java-shedlock-demo
为SpringCloud
项目,具体就不贴代码了,可直接下载相应工程进行查看。
数据库配置:
服务启动,控制台输出:
大家可自行测试下,这里只是简单的进行控制台输出。
参考资料
总结
本文主要简单介绍了一些分布式定时任务的解决方案。对于
ShedLock
大部分的分布式场景应该是够用了,特别场景下可能需要注意,实际情况实际解决了。而对于后一种,统一调度
服务而言,本身只是个简单的示例,后续会考虑加入dubbo
的支持,及一些其他的特性,如调用反馈,失败次数等等,目前只是简单的为了满足业务需要,后需要会进行优化的,目前就且看吧,一些异常之类的都还没有进行处理⊙﹏⊙‖∣。
完整示例:
统一调度中心:okong-scheduler
schedLock-demo:
https://github.com/xie19900123/java-learning/tree/master/java-shedlock-demo
原文地址:https://blog.lqdev.cn/2019/03/06/%E6%97%A5%E5%B8%B8%E7%A7%AF%E7%B4%AF/java-scheduler/