分布式作业系统 Elastic-Job-Cloud 源码分析 —— 本地运行模式

摘要: 原创出处 www.iocoder.cn/Elastic-Job… 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

《分布式作业系统 Elastic-Job-Cloud 源码分析 —— 本地运行模式》

🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Elastic-Job-Cloud 本地运行模式,对应《官方文档 —— 本地运行模式》

有什么用呢?引用官方解答:

在开发 Elastic-Job-Cloud 作业时,开发人员可以脱离 Mesos 环境,在本地运行和调试作业。可以利用本地运行模式充分的调试业务功能以及单元测试,完成之后再部署至 Mesos 集群。
本地运行作业无需安装 Mesos 环境。

😈 是不是很赞 + 1024?!

本文涉及到主体类的类图如下( 打开大图 ):

《分布式作业系统 Elastic-Job-Cloud 源码分析 —— 本地运行模式》

你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门

2. 配置

LocalCloudJobConfiguration,本地云作业配置,在《Elastic-Job-Cloud 源码分析 —— 作业配置》「3.2 本地云作业配置」有详细解析。

创建本地云作业配置示例代码如下(来自官方):

LocalCloudJobConfiguration config = new LocalCloudJobConfiguration(
    new SimpleJobConfiguration(
    // 配置作业类型和作业基本信息
    JobCoreConfiguration.newBuilder("FooJob", "*/2 * * * * ?", 3) 
        .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
        .jobParameter("dbName=dangdang").build(), "com.dangdang.foo.FooJob"),
        // 配置当前运行的作业是第几个分片 
        1,  
        // 配置Spring相关参数。如果不配置,代表不使用 Spring 配置。
        "testSimpleJob" , "applicationContext.xml");

3. 运行

LocalTaskExecutor,本地作业执行器。

创建本地作业执行器示例代码如下(来自官方):

new LocalTaskExecutor(localJobConfig).execute();

可以看到,调用 LocalTaskExecutor#execute() 方法,执行作业逻辑,实现代码如下:

// LocalTaskExecutor.java
public void execute() {
   AbstractElasticJobExecutor jobExecutor;
   CloudJobFacade jobFacade = new CloudJobFacade(getShardingContexts(), getJobConfigurationContext(), new JobEventBus());
   // 创建执行器
   switch (localCloudJobConfiguration.getTypeConfig().getJobType()) {
       case SIMPLE:
           jobExecutor = new SimpleJobExecutor(getJobInstance(SimpleJob.class), jobFacade);
           break;
       case DATAFLOW:
           jobExecutor = new DataflowJobExecutor(getJobInstance(DataflowJob.class), jobFacade);
           break;
       case SCRIPT:
           jobExecutor = new ScriptJobExecutor(jobFacade);
           break;
       default:
           throw new UnsupportedOperationException(localCloudJobConfiguration.getTypeConfig().getJobType().name());
   }
   // 执行作业
   jobExecutor.execute();
}
  • 调用 #getShardingContexts() 方法,创建分片上下文集合( ShardingContexts ),实现代码如下:

    private ShardingContexts getShardingContexts() {
       JobCoreConfiguration coreConfig = localCloudJobConfiguration.getTypeConfig().getCoreConfig();
       Map<Integer, String> shardingItemMap = new HashMap<>(1, 1);
       shardingItemMap.put(localCloudJobConfiguration.getShardingItem(),
               new ShardingItemParameters(coreConfig.getShardingItemParameters()).getMap().get(localCloudJobConfiguration.getShardingItem()));
       return new ShardingContexts(
               // taskId 👇
               Joiner.on("@-@").join(localCloudJobConfiguration.getJobName(), localCloudJobConfiguration.getShardingItem(), "READY", "foo_slave_id", "foo_uuid"),
               localCloudJobConfiguration.getJobName(), coreConfig.getShardingTotalCount(), coreConfig.getJobParameter(), shardingItemMap);
    }
  • 调用 #getJobConfigurationContext() 方法,创建内部的作业配置上下文( JobConfigurationContext ),实现代码如下:

    private JobConfigurationContext getJobConfigurationContext() {
       Map<String, String> jobConfigurationMap = new HashMap<>();
       jobConfigurationMap.put("jobClass", localCloudJobConfiguration.getTypeConfig().getJobClass());
       jobConfigurationMap.put("jobType", localCloudJobConfiguration.getTypeConfig().getJobType().name());
       jobConfigurationMap.put("jobName", localCloudJobConfiguration.getJobName());
       jobConfigurationMap.put("beanName", localCloudJobConfiguration.getBeanName());
       jobConfigurationMap.put("applicationContext", localCloudJobConfiguration.getApplicationContext());
       if (JobType.DATAFLOW == localCloudJobConfiguration.getTypeConfig().getJobType()) { // 数据流作业
           jobConfigurationMap.put("streamingProcess", Boolean.toString(((DataflowJobConfiguration) localCloudJobConfiguration.getTypeConfig()).isStreamingProcess()));
       } else if (JobType.SCRIPT == localCloudJobConfiguration.getTypeConfig().getJobType()) { // 脚本作业
           jobConfigurationMap.put("scriptCommandLine", ((ScriptJobConfiguration) localCloudJobConfiguration.getTypeConfig()).getScriptCommandLine());
       }
       return new JobConfigurationContext(jobConfigurationMap);
    }
  • 调用 #getJobInstance(...) 方法, 获得分布式作业( ElasticJob )实现实例,实现代码如下:

    private <T extends ElasticJob> T getJobInstance(final Class<T> clazz) {
       Object result;
       if (Strings.isNullOrEmpty(localCloudJobConfiguration.getApplicationContext())) { // 直接创建 ElasticJob
           String jobClass = localCloudJobConfiguration.getTypeConfig().getJobClass();
           try {
               result = Class.forName(jobClass).newInstance();
           } catch (final ReflectiveOperationException ex) {
               throw new JobSystemException("Elastic-Job: Class '%s' initialize failure, the error message is '%s'.", jobClass, ex.getMessage());
           }
       } else { // Spring 环境获得 ElasticJob
           result = new ClassPathXmlApplicationContext(localCloudJobConfiguration.getApplicationContext()).getBean(localCloudJobConfiguration.getBeanName());
       }
       return clazz.cast(result);
    }
  • 调用 AbstractElasticJobExecutor#execute() 方法,执行作业逻辑。 Elastic-Job-Lite 和 Elastic-Job-Cloud 作业执行基本一致,在《Elastic-Job-Lite 源码分析 —— 作业执行》有详细解析。

666. 彩蛋

芋道君:可能有点水更,和大家实际开发太相关,想想还是更新下。
旁白君:哎哟哟,哎哟喂。

《分布式作业系统 Elastic-Job-Cloud 源码分析 —— 本地运行模式》

道友,赶紧上车,分享一波朋友圈!

    原文作者:HashMap源码分析
    原文地址: https://juejin.im/entry/5a24750951882575cb73ed49
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞