flume启动源码分析

flume启动代码分析记录

具体结合如下配置文件进行讲解,配置文件如下所示:

agent1.sources=source1
agent1.channels=channel1
agent1.sinks=sink1

agent1.sources.source1.type = TAILDIR
agent1.sources.source1.channels = channel1
agent1.sources.source1.channels.skipToEnd = True
agent1.sources.source1.positionFile = /home/flume/data/taildir_position.json
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /home/flume/test/logs/abc.log
agent1.sources.source1.headers.f1.headerKey1 = value1
agent1.sources.source1.fileHeader = true
  
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 1000
  
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.brokerList=a:9092,b:9092,c:9092
agent1.sinks.sink1.topic=connect-test-one
agent1.sinks.sink1.requiredAcks = 1
agent1.sinks.sink1.batchSize = 20
agent1.sinks.sink1.channel = channel1      

1.Application.java–>main方法: 先根据命令行内容加载相应的文件类别:

try {
      boolean isZkConfigured = false;
      Options options = new Options();
      Option option = new Option("n", "name", true, "the name of this agent");
      option.setRequired(true);
      options.addOption(option);
      option = new Option("f", "conf-file", true,
          "specify a config file (required if -z missing)");
      option.setRequired(false);
      options.addOption(option);
      option = new Option(null, "no-reload-conf", false,
          "do not reload config file if changed");
      options.addOption(option);
      // Options for Zookeeper
      option = new Option("z", "zkConnString", true,
          "specify the ZooKeeper connection to use (required if -f missing)");
      option.setRequired(false);
      options.addOption(option);
      option = new Option("p", "zkBasePath", true,
          "specify the base path in ZooKeeper for agent configs");
      option.setRequired(false);
      options.addOption(option);
      option = new Option("h", "help", false, "display help text");
      options.addOption(option);
      CommandLineParser parser = new GnuParser();
      CommandLine commandLine = parser.parse(options, args);
      ...略

此处仅是agent配置信息的读取,不做介绍。接下来才是具体解析配置文件的过程:

...略
if (reload) {
//111
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          PollingPropertiesFileConfigurationProvider configurationProvider =
              new PollingPropertiesFileConfigurationProvider(
                  agentName, configurationFile, eventBus, 30);
          components.add(configurationProvider);
          application = new Application(components);
         eventBus.register(application);
        } else {
       //2222 
          PropertiesFileConfigurationProvider configurationProvider =
              new PropertiesFileConfigurationProvider(agentName, configurationFile);
          application = new Application();
          application.handleConfigurationEvent(configurationProvider.getConfiguration());
        }
      }
      application.start();
      final Application appReference = application;
      Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
        @Override
        public void run() {
          appReference.stop();
        }
      });

    } catch (Exception e) {
      logger.error("A fatal error occurred while running. Exception follows.", e);
    }
    ...略

//111此处的reload是boolean类型,用来判断是否动态加载配置文件的标示(间隔30s),如果设置为true,则设计EventBus块的内容,详情见博文[Guava学习笔记:EventBus] 接下来主要讲解不循环加载配置文件的类型,也就是代码else处的代码//2222 此处利用

 PropertiesFileConfigurationProvider configurationProvider =
              new PropertiesFileConfigurationProvider(agentName, configurationFile);

生成了configurationProvider对象,主要主要生成下面几个对象

public AbstractConfigurationProvider(String agentName) {
    super();
    this.agentName = agentName;
    this.sourceFactory = new DefaultSourceFactory();
    this.sinkFactory = new DefaultSinkFactory();
    this.channelFactory = new DefaultChannelFactory();

    channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
  }

然后主要利用:

application.handleConfigurationEvent(configurationProvider.getConfiguration());

加载配置文件及形成components,接下来主要围绕这行代码进行讲解

  public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    // getFlumeConfiguration()方法,是关键核心,负责整个配置加载,下边代码说明
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
        for (String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(channelName);
          if (channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap =
                channelCache.get(channelComponent.channel.getClass());
            if (nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }

new FlumeConfiguration(toMap(properties)),代码在下边:

//读取配置文件并转换为map,从而转换为FlumeConfiguration对象
  @Override
  public FlumeConfiguration getFlumeConfiguration() {
    BufferedReader reader = null;
    try {
      reader = new BufferedReader(new FileReader(file));
      String resolverClassName = System.getProperty("propertiesImplementation",
          DEFAULT_PROPERTIES_IMPLEMENTATION);
      Class<? extends Properties> propsclass = Class.forName(resolverClassName)
          .asSubclass(Properties.class);
      Properties properties = propsclass.newInstance();
      properties.load(reader);
      return new FlumeConfiguration(toMap(properties));
    } catch (IOException ex) {
      LOGGER.error("Unable to load file:" + file
          + " (I/O failure) - Exception follows.", ex);
    } catch (ClassNotFoundException e) {
      LOGGER.error("Configuration resolver class not found", e);
    } catch (InstantiationException e) {
      LOGGER.error("Instantiation exception", e);
    } catch (IllegalAccessException e) {
      LOGGER.error("Illegal access exception", e);
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException ex) {
          LOGGER.warn(
              "Unable to close file reader for file: " + file, ex);
        }
      }
    }
    return new FlumeConfiguration(new HashMap<String, String>());
  }

new FlumeConfiguration(Map<String, String> properties)的代码如下:

/**
   * Creates a populated Flume Configuration object.
   */
  public FlumeConfiguration(Map<String, String> properties) {
    agentConfigMap = new HashMap<String, AgentConfiguration>();
    errors = new LinkedList<FlumeConfigurationError>();
    // Construct the in-memory component hierarchy
    for (String name : properties.keySet()) {
      String value = properties.get(name);
// addRawProperty里对agentConfigMap初始化,  
      // 1:这里插入的是agentConfiguration对象里的contextMap
      if (!addRawProperty(name, value)) {
        logger.warn("Configuration property ignored: " + name + " = " + value);
      }
    }
    // Now iterate thru the agentContext and create agent configs and add them
    // to agentConfigMap
   //这里插入的是agentConfiguration对象里的configMap
    // validate and remove improperly configured components
    //2
    validateConfiguration();
  }

上面代码最重要的有两处,分别为1处的addRawProperty和2处的validateConfiguration,接下来分别进行分析

  1. 进入FlumeConfiguration.addRawProperty(name,value):
private boolean addRawProperty(String name, String value) {
    // Null names and values not supported
    if (name == null || value == null) {
      errors
          .add(new FlumeConfigurationError("", "",
              FlumeConfigurationErrorType.AGENT_NAME_MISSING,
              ErrorOrWarning.ERROR));
      return false;
    }
    // Empty values are not supported
    if (value.trim().length() == 0) {
      errors
          .add(new FlumeConfigurationError(name, "",
              FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
              ErrorOrWarning.ERROR));
      return false;
    }
    // Remove leading and trailing spaces
    name = name.trim();
    value = value.trim();
    int index = name.indexOf('.');//获得每一个key的第一个`.`处的索引
    // All configuration keys must have a prefix defined as agent name
    if (index == -1) {
      errors
          .add(new FlumeConfigurationError(name, "",
              FlumeConfigurationErrorType.AGENT_NAME_MISSING,
              ErrorOrWarning.ERROR));
      return false;
    }
    String agentName = name.substring(0, index);//根据`.`切割出来agentName的名称。
    // Agent name must be specified for all properties
    if (agentName.length() == 0) {
      errors
          .add(new FlumeConfigurationError(name, "",
              FlumeConfigurationErrorType.AGENT_NAME_MISSING,
              ErrorOrWarning.ERROR));
      return false;
    }
    String configKey = name.substring(index + 1);//333然后将所有的key值都将agentName切分出去
    // Configuration key must be specified for every property
    if (configKey.length() == 0) {
      errors
          .add(new FlumeConfigurationError(name, "",
              FlumeConfigurationErrorType.PROPERTY_NAME_NULL,
              ErrorOrWarning.ERROR));
      return false;
    }
    AgentConfiguration aconf = agentConfigMap.get(agentName);
    // 这里创建AgentConfiguration,并插入到FlumeConfiguration的Map<String, AgentConfiguration> agentConfigMap中。此处的AgentConfiguration对象就是该agentName对应的所有agent的配置信息。具体可深入看下
    if (aconf == null) {
      aconf = new AgentConfiguration(agentName, errors);
      agentConfigMap.put(agentName, aconf);
    }
    // Each configuration key must begin with one of the three prefixes:
    // sources, sinks, or channels.
    // 最终,键值对被加载到agentConfiguration中,此处是最重要的。
    return aconf.addProperty(configKey, value);
  }

经过切分后的//33处的配置文件应该变为:

//这里应该是map形式的,按`=`分割。
sources=source1
channels=channel1
sinks=sink1

sources.source1.type = TAILDIR
sources.source1.channels = channel1
sources.source1.channels.skipToEnd = True
sources.source1.positionFile = /home/flume/data/taildir_position.json
sources.source1.filegroups = f1
sources.source1.filegroups.f1 = /home/flume/test/logs/abc.log
sources.source1.headers.f1.headerKey1 = value1
sources.source1.fileHeader = true

channels.channel1.type=memory
channels.channel1.capacity = 1000
channels.channel1.transactionCapacity = 1000

sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
sinks.sink1.brokerList=a:9092,b:9092,c:9092
sinks.sink1.topic=connect-test-one
sinks.sink1.requiredAcks = 1
sinks.sink1.batchSize = 20
sinks.sink1.channel = channel1 

之后的代码如下:

    private boolean addProperty(String key, String value) {
      // Check for configFilters
      if (CONFIG_CONFIGFILTERS.equals(key)) {
        if (configFilters == null) {
          configFilters = value;
          return true;
        } else {
          LOGGER.warn("Duplicate configfilter list specified for agent: {}", agentName);
          addError(CONFIG_CONFIGFILTERS, DUPLICATE_PROPERTY, ERROR);
          return false;
        }
      }
      // Check for sources
      if (CONFIG_SOURCES.equals(key)) {
        if (sources == null) {
          sources = value;//获取sources的值
          return true;
        } else {
          LOGGER.warn("Duplicate source list specified for agent: {}", agentName);
          addError(CONFIG_SOURCES, DUPLICATE_PROPERTY, ERROR);
          return false;
        }
      }
      // Check for sinks
      if (CONFIG_SINKS.equals(key)) {
        if (sinks == null) {
          sinks = value;//获取sinks的值
          LOGGER.info("Added sinks: {} Agent: {}", sinks, agentName);
          return true;
        } else {
          LOGGER.warn("Duplicate sink list specfied for agent: {}", agentName);
          addError(CONFIG_SINKS, DUPLICATE_PROPERTY, ERROR);
          return false;
        }
      }
      // Check for channels
      if (CONFIG_CHANNELS.equals(key)) {
        if (channels == null) {
          channels = value;//获取channels的值
          return true;
        } else {
          LOGGER.warn("Duplicate channel list specified for agent: {}", agentName);
          addError(CONFIG_CHANNELS, DUPLICATE_PROPERTY, ERROR);
          return false;
        }
      }
      // Check for sinkgroups
      if (CONFIG_SINKGROUPS.equals(key)) {
        if (sinkgroups == null) {
          sinkgroups = value;//获取消费组的值
          return true;
        } else {
          LOGGER.warn("Duplicate sinkgroup list specfied for agent: {}", agentName);
          addError(CONFIG_SINKGROUPS, DUPLICATE_PROPERTY, ERROR);
          return false;
        }
      }
      //重点在这*******,其中的五个方法分别对各自的compoent进行配置的读取,
      if (addAsSourceConfig(key, value)
          || addAsChannelValue(key, value)
          || addAsSinkConfig(key, value)
          || addAsSinkGroupConfig(key, value)
          || addAsConfigFilterConfig(key, value)
      ) {
        return true;
      }
      LOGGER.warn("Invalid property specified: {}", key);
      addError(key, INVALID_PROPERTY, ERROR);
      return false;
    }

此处以addAsSourceConfig(key, value)为例来说明,代码如下:

//此处代码从上向下依次深入,可以从下向上看。
//1
private boolean addAsSourceConfig(String key, String value) {
      return addComponentConfig(
          key, value, CONFIG_SOURCES_PREFIX, sourceContextMap
      );
    }
 --->//2
 private boolean addComponentConfig(
        String key, String value, String configPrefix, Map<String, Context> contextMap
    ) {
      ComponentNameAndConfigKey parsed = parseConfigKey(key, configPrefix);
      if (parsed != null) {
        String name = parsed.getComponentName().trim();//获得sourc等组件的名称
        LOGGER.info("Processing:{}", name);
        Context context = contextMap.get(name);
        if (context == null) {
          LOGGER.debug("Created context for {}: {}", name, parsed.getConfigKey());
          context = new Context();
          contextMap.put(name, context);//将组件名及key后面的值存到source的Context上下文中。
        }
        context.put(parsed.getConfigKey(), value);//将组件信息及
        return true;
      }
      return false;
    }
 --->//3
  private ComponentNameAndConfigKey parseConfigKey(String key, String prefix) {
  //例如<sources.source1.type , TAILDIR>
      // key must start with prefix
      if (!key.startsWith(prefix)) {//key:sources.source1.type,是以sources.开头的
        return null;
      }
      // key must have a component name part after the prefix of the format:
      // <prefix><component-name>.<config-key>
      int index = key.indexOf('.', prefix.length() + 1);
      if (index == -1) {
        return null;
      }
      String name = key.substring(prefix.length(), index);//将source1从key中切出来->source.name
      String configKey = key.substring(prefix.length() + name.length() + 1);//configKey就是type了
      // name and config key must be non-empty
      if (name.isEmpty() || configKey.isEmpty()) {
        return null;
      }
      return new ComponentNameAndConfigKey(name, configKey);//然后将其组成ComponentNameAndConfigKey对象
    }
    -->//4
      private ComponentNameAndConfigKey(String name, String configKey) {
      this.componentName = name;
      this.configKey = configKey;
    }

然后又回到//2处的if (parsed != null)处了。进入深入。

所以最后是加载到AgentConfiguration中对应的*ContextMap中了;

  1. 最后在代码中还有一处
// validate and remove improperly configured components
validateConfiguration();

对应详细代码有:

  private void validateConfiguration() {
    Iterator<String> it = agentConfigMap.keySet().iterator();
    while (it.hasNext()) {
      String agentName = it.next();
      AgentConfiguration aconf = agentConfigMap.get(agentName);
      if (!aconf.isValid()) {
        logger.warn("Agent configuration invalid for agent '" + agentName
            + "'. It will be removed.");
        errors.add(new FlumeConfigurationError(agentName, "",
            FlumeConfigurationErrorType.AGENT_CONFIGURATION_INVALID,
            ErrorOrWarning.ERROR));
        it.remove();
      }
      logger.debug("Channels:" + aconf.channels + "\n");
      logger.debug("Sinks " + aconf.sinks + "\n");
      logger.debug("Sources " + aconf.sources + "\n");
    }

    logger.info("Post-validation flume configuration contains configuration"
        + " for agents: " + agentConfigMap.keySet());
  }

aconf.isValid()

    private boolean isValid() {
      logger.debug("Starting validation of configuration for agent: {}", agentName);
      if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
        logger.debug("Initial configuration: {}", this.getPrevalidationConfig());
      }
      // Make sure that at least one channel is specified
      if (channels == null || channels.trim().length() == 0) {
        logger.warn("Agent configuration for '" + agentName
            + "' does not contain any channels. Marking it as invalid.");
        errorList.add(new FlumeConfigurationError(agentName,
            BasicConfigurationConstants.CONFIG_CHANNELS,
            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
            ErrorOrWarning.ERROR));
        return false;
      }
      channelSet =
          new HashSet<String>(Arrays
              .asList(channels.split("\\s+")));
      // validateComponent(channelSet, channelConfigMap, CLASS_CHANNEL,
      // ATTR_TYPE);
      channelSet = validateChannels(channelSet);
      if (channelSet.size() == 0) {
        logger.warn("Agent configuration for '" + agentName
            + "' does not contain any valid channels. Marking it as invalid.");
        errorList.add(new FlumeConfigurationError(agentName,
            BasicConfigurationConstants.CONFIG_CHANNELS,
            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
            ErrorOrWarning.ERROR));
        return false;
      }

      sourceSet = validateSources(channelSet);
      sinkSet = validateSinks(channelSet);
      sinkgroupSet = validateGroups(sinkSet);

      // If no sources or sinks are present, then this is invalid
      if (sourceSet.size() == 0 && sinkSet.size() == 0) {
        logger.warn("Agent configuration for '" + agentName
            + "' has no sources or sinks. Will be marked invalid.");
        errorList.add(new FlumeConfigurationError(agentName,
            BasicConfigurationConstants.CONFIG_SOURCES,
            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
            ErrorOrWarning.ERROR));
        errorList.add(new FlumeConfigurationError(agentName,
            BasicConfigurationConstants.CONFIG_SINKS,
            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
            ErrorOrWarning.ERROR));
        return false;
      }
      // Now rewrite the sources/sinks/channels
      this.sources = getSpaceDelimitedList(sourceSet);
      this.channels = getSpaceDelimitedList(channelSet);
      this.sinks = getSpaceDelimitedList(sinkSet);
      this.sinkgroups = getSpaceDelimitedList(sinkgroupSet);

      if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
        logger.debug("Post validation configuration for {}", agentName);
        logger.debug(this.getPostvalidationConfig());
      }
      return true;
    }

validateChannels

private Set<String> validateChannels(Set<String> channelSet) {
      Iterator<String> iter = channelSet.iterator();
      Map<String, Context> newContextMap = new HashMap<String, Context>();
      ChannelConfiguration conf = null;
      while (iter.hasNext()) {
        String channelName = iter.next();
        Context channelContext = channelContextMap.get(channelName);
        // Context exists in map.
        if (channelContext != null) {
          // Get the configuration object for the channel:
          ChannelType chType = getKnownChannel(channelContext.getString(
              BasicConfigurationConstants.CONFIG_TYPE));
          boolean configSpecified = false;
          String config = null;
          // Not a known channel - cannot do specific validation to this channel
          if (chType == null) {
            config = channelContext.getString(BasicConfigurationConstants.CONFIG_CONFIG);
            if (config == null || config.isEmpty()) {
              config = "OTHER";
            } else {
              configSpecified = true;
            }
          } else {
            config = chType.toString().toUpperCase(Locale.ENGLISH);
            configSpecified = true;
          }

          try {
          //建立channel组件
            conf =
                (ChannelConfiguration) ComponentConfigurationFactory.create(
                    channelName, config, ComponentType.CHANNEL);
            logger.debug("Created channel " + channelName);
            if (conf != null) {
            //配置组件信息
              conf.configure(channelContext);
            }
            if ((configSpecified && conf.isNotFoundConfigClass()) ||
                !configSpecified) {
              newContextMap.put(channelName, channelContext);
            } else if (configSpecified) {
              channelConfigMap.put(channelName, conf);
            }
            if (conf != null) {
              errorList.addAll(conf.getErrors());
            }
          } catch (ConfigurationException e) {
            // Could not configure channel - skip it.
            // No need to add to error list - already added before exception is
            // thrown
            if (conf != null) errorList.addAll(conf.getErrors());
            iter.remove();
            logger.warn("Could not configure channel " + channelName
                + " due to: " + e.getMessage(), e);

          }
        } else {
          iter.remove();
          errorList.add(new FlumeConfigurationError(agentName, channelName,
              FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR));
        }
      }
      channelContextMap = newContextMap;
      Set<String> tempchannelSet = new HashSet<String>();
      tempchannelSet.addAll(channelConfigMap.keySet());
      tempchannelSet.addAll(channelContextMap.keySet());
      channelSet.retainAll(tempchannelSet);
      return channelSet;
    }

至此,一个完整的FlumeConfiguration对象已经完全解析好了;下面继续; 接下来开始创建组件了

AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());//得到对应agent的所有组件的信息
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
        for (String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(channelName);
          if (channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap =
                channelCache.get(channelComponent.channel.getClass());
            if (nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    }

此处以channel为例说明问题。

  private void loadChannels(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap)
          throws InstantiationException {
    LOGGER.info("Creating channels");
    ListMultimap<Class<? extends Channel>, String> channelsNotReused =
        ArrayListMultimap.create();
    // assume all channels will not be re-used
    for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
         channelCache.entrySet()) {
      Class<? extends Channel> channelKlass = entry.getKey();
      Set<String> channelNames = entry.getValue().keySet();
      channelsNotReused.get(channelKlass).addAll(channelNames);
    }

    Set<String> channelNames = agentConf.getChannelSet();
    Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
    /*
     * Components which have a ComponentConfiguration object
     */
    for (String chName : channelNames) {
      ComponentConfiguration comp = compMap.get(chName);
      if (comp != null) {
        Channel channel = getOrCreateChannel(channelsNotReused,
            comp.getComponentName(), comp.getType());
        try {
          Configurables.configure(channel, comp);
          channelComponentMap.put(comp.getComponentName(),
              new ChannelComponent(channel));
          LOGGER.info("Created channel " + chName);
        } catch (Exception e) {
          String msg = String.format("Channel %s has been removed due to an " +
              "error during configuration", chName);
          LOGGER.error(msg, e);
        }
      }
    }
    /*
     * Components which DO NOT have a ComponentConfiguration object
     * and use only Context
     */
    for (String chName : channelNames) {
      Context context = agentConf.getChannelContext().get(chName);
      if (context != null) {
        Channel channel = getOrCreateChannel(channelsNotReused, chName,
            context.getString(BasicConfigurationConstants.CONFIG_TYPE));
        try {
          Configurables.configure(channel, context);
          channelComponentMap.put(chName, new ChannelComponent(channel));
          LOGGER.info("Created channel " + chName);
        } catch (Exception e) {
          String msg = String.format("Channel %s has been removed due to an " +
              "error during configuration", chName);
          LOGGER.error(msg, e);
        }
      }
    }
    /*
     * Any channel which was not re-used, will have it's reference removed
     */
    for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
      Map<String, Channel> channelMap = channelCache.get(channelKlass);
      if (channelMap != null) {
        for (String channelName : channelsNotReused.get(channelKlass)) {
          if (channelMap.remove(channelName) != null) {
            LOGGER.info("Removed {} of type {}", channelName, channelKlass);
          }
        }
        if (channelMap.isEmpty()) {
          channelCache.remove(channelKlass);
        }
      }
    }
  }

上述代码是创建channel的。

参看:

https://www.cnblogs.com/aquariusm/p/6118976.html

    原文作者:人工智能
    原文地址: https://my.oschina.net/112612/blog/3040481#comments
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞