实现一个分布式调度系统-Quartz

上一篇介绍了整个调度系统的流程,今天先来介绍一下Quartz
简单使用
集群模式锁的使用
整合springboot

简单使用

  //一分钟执行一次
  private static void min() throws ParseException {

        //指定开始时间
        String time="2018-12-19 15:22:50";
        SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            //创建scheduler
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

            //定义一个Trigger
            Trigger trigger = newTrigger().withIdentity("trigger1", "group1")
                    //.startNow()
                    .startAt(simpleDateFormat.parse(time))
                    .withSchedule(calendarIntervalSchedule()
                            .withIntervalInMinutes(1)
                            .withMisfireHandlingInstructionDoNothing()
                             )
                    .build();
            JobDetail job = newJob(HelloQuartz.class)
                    .withIdentity("job1", "group1")
                    .usingJobData("name", "quartz")
                    .build();
            scheduler.scheduleJob(job, trigger);
            scheduler.start();
            /*//运行一段时间后关闭 Thread.sleep(10000); scheduler.shutdown(true);*/
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
	
//我们可以改变Trigger来实现不同的调度方式
//比如一天执行一次
Trigger trigger = newTrigger().withIdentity("trigger1", "group1") 
                    .startNow()                   
                    .withSchedule(calendarIntervalSchedule()
                           .withIntervalInDays(1))
                          .build();	
						  
//一次性任务
Trigger trigger = newTrigger().withIdentity("trigger1", "group1")
                    //.startNow()
                    .startAt(simpleDateFormat.parse(time))
                    .withSchedule(simpleSchedule()
                            .withRepeatCount(0)  //只执行一次
                            .repeatForever()
                            .withIntervalInSeconds(1)
                            .withMisfireHandlingInstructionFireNow()
                    )
                    .build();						  
	
//小时任务
Trigger trigger = newTrigger().withIdentity("trigger1", "group1")
                    .startNow()                  
                    .withSchedule(calendarIntervalSchedule() 
                            .withIntervalInHours(1)) 
                    .build();	
									
//定义我们的job类
/** * @author dalizu on 2018/11/1. * @version v1.0 * @desc @DisallowConcurrentExecution (测试:如果加此注解,虽然是1秒调度一次withIntervalInSeconds(1),但是还是要一个任务一个任务执行,日志打印会是5s一次, * 一定要等上次任务执行完成,才会执行下次任务) */
@DisallowConcurrentExecution
public class HelloQuartz implements Job {

    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDetail detail = context.getJobDetail();
        String name = detail.getJobDataMap().getString("name");

        SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("==============>测试 say hello to " + name + " at " + simpleDateFormat.format(new Date()).toString());
    }

}					

配置

  • 我们使用了数据库存储信息,支持集群。
org.quartz.scheduler.instanceName: FayaQuartzScheduler

#调度器实例编号自动生成
org.quartz.scheduler.instanceId = AUTO

# 持久化配置
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

#quartz相关数据表前缀名
org.quartz.jobStore.tablePrefix = QRTZ_

#开启分布式部署
org.quartz.jobStore.isClustered = true

#分布式节点有效性检查时间间隔,单位:毫秒
org.quartz.jobStore.clusterCheckinInterval = 20000

org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false

#线程池实现类
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

org.quartz.jobStore.misfireThreshold: 60000

锁的简单介绍

Quartz使用锁来实现数据库集群
可以看到quartz提供的表
CREATE TABLE `qrtz_locks` (
  `SCHED_NAME` varchar(120) COLLATE utf8mb4_unicode_ci NOT NULL,
  `LOCK_NAME` varchar(40) COLLATE utf8mb4_unicode_ci NOT NULL,
  PRIMARY KEY (`SCHED_NAME`,`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

行级锁:每次操作锁住一行数据。开销大,加锁慢;会出现死锁;锁定粒度最小,发生锁冲突的概率最低,并发度也最高
默认情况下,select语句是不会对数据加写锁的,也就是不会阻止写入,通过使用 for update (属于悲观锁)可以对数据加写锁.

Quartz获取锁执行流程
QuartzScheduler被创建时候,会创建一个QuartzSchedulerThread实例,作为调度线程
会去获取未来30s内将会被触发的trigger
triggers = this.qsRsrcs.getJobStore().acquireNextTriggers(now + this.idleWaitTime, Math.min(availThreadCount, this.qsRsrcs.getMaxBatchSize()), this.qsRsrcs.getBatchTimeWindow());

此时会先去获取TRIGGER_ACCESS相关的锁
获取连接
conn = DBConnectionManager.getInstance().getConnection(this.getDataSource());
//关闭mysql数据库的自动提交属性
if (!this.isDontSetAutoCommitFalse()) {
                    conn.setAutoCommit(false);
}

会执行SQL
public static final String SELECT_FOR_LOCK = "SELECT * FROM {0}LOCKS WHERE SCHED_NAME = {1} AND LOCK_NAME = ? FOR UPDATE";


可以看到sql使用的 FOR UPDATE,会锁住查询数据的行。
其他线程执行此操作会被阻塞,直到上一个获取锁的操作提交后。

更多数据库锁和quartz的细节大家可以去查看更多的资料,也可以一起讨论

springboot 整合quartz

/** * quartz的配置 * */
@EnableScheduling
@Configuration
public class QuartzConfiguration {

    @Bean
    public JobFactory jobFactory(ApplicationContext applicationContext)
    {
        AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
        jobFactory.setApplicationContext(applicationContext);
        return jobFactory;
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory,@Qualifier("dataSource") DataSource dataSource){

        SchedulerFactoryBean schedulerFactoryBean=new SchedulerFactoryBean();
        //将spring管理job自定义工厂交由调度器维护
        schedulerFactoryBean.setJobFactory(jobFactory);
        //设置配置文件位置
        schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties"));
        //设置覆盖已存在的任务
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        //项目启动完成后,等待2秒后开始执行调度器初始化
        schedulerFactoryBean.setStartupDelay(2);
        //设置调度器自动运行
        schedulerFactoryBean.setAutoStartup(true);

        //设置数据源,使用与项目统一数据源
        schedulerFactoryBean.setDataSource(dataSource);

        return schedulerFactoryBean;
    }

    /** * 继承org.springframework.scheduling.quartz.SpringBeanJobFactory * 实现任务实例化方式 */
    public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements
            ApplicationContextAware {

        private transient AutowireCapableBeanFactory beanFactory;
        @Override
        public void setApplicationContext(final ApplicationContext context) {
            beanFactory = context.getAutowireCapableBeanFactory();
        }
        /** * 将job实例交给spring ioc托管 * 我们在job实例实现类内可以直接使用spring注入的调用被spring ioc管理的实例 */
        @Override
        protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
            final Object job = super.createJobInstance(bundle);
            //将job实例交付给spring ioc
            beanFactory.autowireBean(job);
            return job;
        }
    }

}

//配置完成后就可以在service里面引用了
@Autowired
private Scheduler scheduler;

总结

完整代码可以去github上查看:https://github.com/lizu18xz/faya-job
点赞