上一篇介绍了整个调度系统的流程,今天先来介绍一下Quartz
简单使用
集群模式锁的使用
整合springboot
简单使用
//一分钟执行一次
private static void min() throws ParseException {
//指定开始时间
String time="2018-12-19 15:22:50";
SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
//创建scheduler
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
//定义一个Trigger
Trigger trigger = newTrigger().withIdentity("trigger1", "group1")
//.startNow()
.startAt(simpleDateFormat.parse(time))
.withSchedule(calendarIntervalSchedule()
.withIntervalInMinutes(1)
.withMisfireHandlingInstructionDoNothing()
)
.build();
JobDetail job = newJob(HelloQuartz.class)
.withIdentity("job1", "group1")
.usingJobData("name", "quartz")
.build();
scheduler.scheduleJob(job, trigger);
scheduler.start();
/*//运行一段时间后关闭 Thread.sleep(10000); scheduler.shutdown(true);*/
} catch (Exception e) {
e.printStackTrace();
}
}
//我们可以改变Trigger来实现不同的调度方式
//比如一天执行一次
Trigger trigger = newTrigger().withIdentity("trigger1", "group1")
.startNow()
.withSchedule(calendarIntervalSchedule()
.withIntervalInDays(1))
.build();
//一次性任务
Trigger trigger = newTrigger().withIdentity("trigger1", "group1")
//.startNow()
.startAt(simpleDateFormat.parse(time))
.withSchedule(simpleSchedule()
.withRepeatCount(0) //只执行一次
.repeatForever()
.withIntervalInSeconds(1)
.withMisfireHandlingInstructionFireNow()
)
.build();
//小时任务
Trigger trigger = newTrigger().withIdentity("trigger1", "group1")
.startNow()
.withSchedule(calendarIntervalSchedule()
.withIntervalInHours(1))
.build();
//定义我们的job类
/** * @author dalizu on 2018/11/1. * @version v1.0 * @desc @DisallowConcurrentExecution (测试:如果加此注解,虽然是1秒调度一次withIntervalInSeconds(1),但是还是要一个任务一个任务执行,日志打印会是5s一次, * 一定要等上次任务执行完成,才会执行下次任务) */
@DisallowConcurrentExecution
public class HelloQuartz implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDetail detail = context.getJobDetail();
String name = detail.getJobDataMap().getString("name");
SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("==============>测试 say hello to " + name + " at " + simpleDateFormat.format(new Date()).toString());
}
}
配置
org.quartz.scheduler.instanceName: FayaQuartzScheduler
#调度器实例编号自动生成
org.quartz.scheduler.instanceId = AUTO
# 持久化配置
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
#quartz相关数据表前缀名
org.quartz.jobStore.tablePrefix = QRTZ_
#开启分布式部署
org.quartz.jobStore.isClustered = true
#分布式节点有效性检查时间间隔,单位:毫秒
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
#线程池实现类
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
锁的简单介绍
Quartz使用锁来实现数据库集群
可以看到quartz提供的表
CREATE TABLE `qrtz_locks` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_unicode_ci NOT NULL,
`LOCK_NAME` varchar(40) COLLATE utf8mb4_unicode_ci NOT NULL,
PRIMARY KEY (`SCHED_NAME`,`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
行级锁:每次操作锁住一行数据。开销大,加锁慢;会出现死锁;锁定粒度最小,发生锁冲突的概率最低,并发度也最高
默认情况下,select语句是不会对数据加写锁的,也就是不会阻止写入,通过使用 for update (属于悲观锁)可以对数据加写锁.
Quartz获取锁执行流程
QuartzScheduler被创建时候,会创建一个QuartzSchedulerThread实例,作为调度线程
会去获取未来30s内将会被触发的trigger
triggers = this.qsRsrcs.getJobStore().acquireNextTriggers(now + this.idleWaitTime, Math.min(availThreadCount, this.qsRsrcs.getMaxBatchSize()), this.qsRsrcs.getBatchTimeWindow());
此时会先去获取TRIGGER_ACCESS相关的锁
获取连接
conn = DBConnectionManager.getInstance().getConnection(this.getDataSource());
//关闭mysql数据库的自动提交属性
if (!this.isDontSetAutoCommitFalse()) {
conn.setAutoCommit(false);
}
会执行SQL
public static final String SELECT_FOR_LOCK = "SELECT * FROM {0}LOCKS WHERE SCHED_NAME = {1} AND LOCK_NAME = ? FOR UPDATE";
可以看到sql使用的 FOR UPDATE,会锁住查询数据的行。
其他线程执行此操作会被阻塞,直到上一个获取锁的操作提交后。
更多数据库锁和quartz的细节大家可以去查看更多的资料,也可以一起讨论
springboot 整合quartz
/** * quartz的配置 * */
@EnableScheduling
@Configuration
public class QuartzConfiguration {
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext)
{
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory,@Qualifier("dataSource") DataSource dataSource){
SchedulerFactoryBean schedulerFactoryBean=new SchedulerFactoryBean();
//将spring管理job自定义工厂交由调度器维护
schedulerFactoryBean.setJobFactory(jobFactory);
//设置配置文件位置
schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties"));
//设置覆盖已存在的任务
schedulerFactoryBean.setOverwriteExistingJobs(true);
//项目启动完成后,等待2秒后开始执行调度器初始化
schedulerFactoryBean.setStartupDelay(2);
//设置调度器自动运行
schedulerFactoryBean.setAutoStartup(true);
//设置数据源,使用与项目统一数据源
schedulerFactoryBean.setDataSource(dataSource);
return schedulerFactoryBean;
}
/** * 继承org.springframework.scheduling.quartz.SpringBeanJobFactory * 实现任务实例化方式 */
public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements
ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}
/** * 将job实例交给spring ioc托管 * 我们在job实例实现类内可以直接使用spring注入的调用被spring ioc管理的实例 */
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
//将job实例交付给spring ioc
beanFactory.autowireBean(job);
return job;
}
}
}
//配置完成后就可以在service里面引用了
@Autowired
private Scheduler scheduler;
总结
完整代码可以去github上查看:https://github.com/lizu18xz/faya-job