elastic-job是当当网开源的一个分布式弹性作业框架。
主要功能:
- 分布式:重写Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心。
- 并行调度:采用任务分片方式实现。将一个任务拆分为n个独立的任务项,由分布式的服务器并行执行各自分配到的分片项。
- 弹性扩容缩容:将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。
- 集中管理:采用基于Zookeeper的注册中心,集中管理和协调分布式作业的状态,分配和监听。外部系统可直接根据Zookeeper的数据管理和监控elastic-job。
- 定制化流程型任务:作业可分为简单和数据流处理两种模式,数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立线程,用于保证同一分片的顺序性,这点类似于kafka的分区顺序性。
快速入门
1、maven依赖
<!-- elastic-job -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.0.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.0.5</version>
</dependency>
2、定义job
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
/**
* ${DESCRIPTION}
*
* @author Ricky Fung
* @date 2017-02-28 20:15
*/
public class DemoElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
System.out.println("execute...");
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
3、job配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd
">
<!-- 引入配置文件 -->
<util:properties id="zk" location="classpath:zk_config.properties"/>
<!--configure registry center -->
<reg:zookeeper id="regCenter" server-lists="#{zk['host']}" namespace="#{zk['namespace']}"
base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
<!--configure job -->
<job:simple id="demoElasticJob" class="com.bytebeats.elasticjob.sample.job.DemoElasticJob"
registry-center-ref="regCenter"
cron="* 0/5 * * * ?"
sharding-total-count="1" />
</beans>