flume启动代码分析记录
具体结合如下配置文件进行讲解,配置文件如下所示:
agent1.sources=source1
agent1.channels=channel1
agent1.sinks=sink1
agent1.sources.source1.type = TAILDIR
agent1.sources.source1.channels = channel1
agent1.sources.source1.channels.skipToEnd = True
agent1.sources.source1.positionFile = /home/flume/data/taildir_position.json
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /home/flume/test/logs/abc.log
agent1.sources.source1.headers.f1.headerKey1 = value1
agent1.sources.source1.fileHeader = true
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 1000
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.brokerList=a:9092,b:9092,c:9092
agent1.sinks.sink1.topic=connect-test-one
agent1.sinks.sink1.requiredAcks = 1
agent1.sinks.sink1.batchSize = 20
agent1.sinks.sink1.channel = channel1
1.Application.java–>main方法: 先根据命令行内容加载相应的文件类别:
try {
boolean isZkConfigured = false;
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);
option = new Option("f", "conf-file", true,
"specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);
option = new Option(null, "no-reload-conf", false,
"do not reload config file if changed");
options.addOption(option);
// Options for Zookeeper
option = new Option("z", "zkConnString", true,
"specify the ZooKeeper connection to use (required if -f missing)");
option.setRequired(false);
options.addOption(option);
option = new Option("p", "zkBasePath", true,
"specify the base path in ZooKeeper for agent configs");
option.setRequired(false);
options.addOption(option);
option = new Option("h", "help", false, "display help text");
options.addOption(option);
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
...略
此处仅是agent配置信息的读取,不做介绍。接下来才是具体解析配置文件的过程:
...略
if (reload) {
//111
EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(
agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
} else {
//2222
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName, configurationFile);
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
}
application.start();
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
@Override
public void run() {
appReference.stop();
}
});
} catch (Exception e) {
logger.error("A fatal error occurred while running. Exception follows.", e);
}
...略
//111
此处的reload是boolean类型,是用来判断是否动态加载配置文件的标识(轮询间隔为30s)。如果设置为true,则会涉及EventBus相关的内容,详情见博文[Guava学习笔记:EventBus]。接下来主要讲解不循环加载配置文件的情况,也就是else分支处(//2222)的代码。
此处利用
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName, configurationFile);
生成了configurationProvider对象,构造过程中主要生成下面几个对象:
/**
 * Creates a configuration provider for the named agent.
 *
 * <p>Installs the default source, sink and channel factories and starts with an
 * empty channel cache, keyed first by channel class and then by channel name.
 *
 * @param agentName name of the agent this provider serves
 */
public AbstractConfigurationProvider(String agentName) {
  // The superclass no-arg constructor is invoked implicitly.
  this.agentName = agentName;
  this.sourceFactory = new DefaultSourceFactory();
  this.sinkFactory = new DefaultSinkFactory();
  this.channelFactory = new DefaultChannelFactory();
  this.channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
}
然后主要利用:
application.handleConfigurationEvent(configurationProvider.getConfiguration());
加载配置文件及形成components,接下来主要围绕这行代码进行讲解
/**
 * Builds the materialized configuration (channels, source runners and sink
 * runners) for this provider's agent.
 *
 * <p>If no configuration exists for the agent, a warning is logged and the
 * empty configuration is returned. An {@link InstantiationException} raised
 * while loading components is logged and not rethrown.
 *
 * @return the materialized configuration; never {@code null}
 */
public MaterializedConfiguration getConfiguration() {
  MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
  // getFlumeConfiguration() is the core step: it loads and parses the whole
  // configuration (explained below).
  FlumeConfiguration fconfig = getFlumeConfiguration();
  AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
  if (agentConf == null) {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
    return conf;
  }

  Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
  Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
  Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
  try {
    // Channels go first: the source and sink loaders receive the channel map.
    loadChannels(agentConf, channelComponentMap);
    loadSources(agentConf, channelComponentMap, sourceRunnerMap);
    loadSinks(agentConf, channelComponentMap, sinkRunnerMap);

    // Iterate over a snapshot of the names because entries are removed from
    // channelComponentMap inside the loop.
    Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
    for (String channelName : channelNames) {
      ChannelComponent channelComponent = channelComponentMap.get(channelName);
      if (!channelComponent.components.isEmpty()) {
        LOGGER.info(String.format("Channel %s connected to %s",
            channelName, channelComponent.components.toString()));
        conf.addChannel(channelName, channelComponent.channel);
        continue;
      }
      // Nothing is attached to this channel: drop it here and from the cache.
      LOGGER.warn(String.format("Channel %s has no components connected" +
          " and has been removed.", channelName));
      channelComponentMap.remove(channelName);
      Map<String, Channel> cachedByName =
          channelCache.get(channelComponent.channel.getClass());
      if (cachedByName != null) {
        cachedByName.remove(channelName);
      }
    }

    for (Map.Entry<String, SourceRunner> source : sourceRunnerMap.entrySet()) {
      conf.addSourceRunner(source.getKey(), source.getValue());
    }
    for (Map.Entry<String, SinkRunner> sink : sinkRunnerMap.entrySet()) {
      conf.addSinkRunner(sink.getKey(), sink.getValue());
    }
  } catch (InstantiationException ex) {
    LOGGER.error("Failed to instantiate component", ex);
  } finally {
    // The working maps are always emptied, whether or not loading succeeded.
    channelComponentMap.clear();
    sourceRunnerMap.clear();
    sinkRunnerMap.clear();
  }
  return conf;
}
上面的getFlumeConfiguration()最终会返回new FlumeConfiguration(toMap(properties)),代码在下边:
/**
 * Reads the configuration file, converts its properties to a map and wraps
 * that map in a {@link FlumeConfiguration}.
 *
 * <p>Every failure (I/O, unknown or non-instantiable Properties class) is
 * logged and results in a FlumeConfiguration built from an empty map.
 *
 * @return the parsed configuration, or an empty one if loading failed
 */
@Override
public FlumeConfiguration getFlumeConfiguration() {
  BufferedReader configReader = null;
  try {
    configReader = new BufferedReader(new FileReader(file));
    // The Properties implementation can be swapped through a system property.
    String implName = System.getProperty("propertiesImplementation",
        DEFAULT_PROPERTIES_IMPLEMENTATION);
    Properties loaded =
        Class.forName(implName).asSubclass(Properties.class).newInstance();
    loaded.load(configReader);
    return new FlumeConfiguration(toMap(loaded));
  } catch (IOException ioFailure) {
    LOGGER.error("Unable to load file:" + file
        + " (I/O failure) - Exception follows.", ioFailure);
  } catch (ClassNotFoundException missingClass) {
    LOGGER.error("Configuration resolver class not found", missingClass);
  } catch (InstantiationException notInstantiable) {
    LOGGER.error("Instantiation exception", notInstantiable);
  } catch (IllegalAccessException notAccessible) {
    LOGGER.error("Illegal access exception", notAccessible);
  } finally {
    if (configReader != null) {
      try {
        configReader.close();
      } catch (IOException closeFailure) {
        // A failed close is only worth a warning; the outcome is already decided.
        LOGGER.warn(
            "Unable to close file reader for file: " + file, closeFailure);
      }
    }
  }
  // Reached only after a logged failure above.
  return new FlumeConfiguration(new HashMap<String, String>());
}
new FlumeConfiguration(Map<String, String> properties)的代码如下:
/**
* Creates a populated Flume Configuration object.
*/
public FlumeConfiguration(Map<String, String> properties) {
agentConfigMap = new HashMap<String, AgentConfiguration>();
errors = new LinkedList<FlumeConfigurationError>();
// Construct the in-memory component hierarchy
for (String name : properties.keySet()) {
String value = properties.get(name);
// addRawProperty里对agentConfigMap初始化,
// 1:这里插入的是agentConfiguration对象里的contextMap
if (!addRawProperty(name, value)) {
logger.warn("Configuration property ignored: " + name + " = " + value);
}
}
// Now iterate thru the agentContext and create agent configs and add them
// to agentConfigMap
//这里插入的是agentConfiguration对象里的configMap
// validate and remove improperly configured components
//2
validateConfiguration();
}
上面代码最重要的有两处,分别为1处的addRawProperty
和2处的validateConfiguration
,接下来分别进行分析
- 进入FlumeConfiguration.addRawProperty(name,value):
/**
 * Splits a raw property name into agent name and configuration key and hands
 * the key/value pair to the matching {@code AgentConfiguration}.
 *
 * <p>Rejected input (null name or value, blank value, missing agent name,
 * missing configuration key) is recorded in {@code errors}.
 *
 * @param name full property name, expected as {@code <agent>.<config-key>}
 * @param value property value
 * @return {@code true} if the property was accepted, {@code false} otherwise
 */
private boolean addRawProperty(String name, String value) {
  // Null names and values not supported
  if (name == null || value == null) {
    return rejectRawProperty("", FlumeConfigurationErrorType.AGENT_NAME_MISSING);
  }

  // Empty values are not supported
  String trimmedValue = value.trim();
  if (trimmedValue.isEmpty()) {
    return rejectRawProperty(name, FlumeConfigurationErrorType.PROPERTY_VALUE_NULL);
  }

  // Remove leading and trailing spaces
  name = name.trim();
  value = trimmedValue;

  // Position of the first '.' in the key.
  int index = name.indexOf('.');

  // All configuration keys must have a prefix defined as agent name, and the
  // agent name must be non-empty: no dot (-1) and a leading dot (0) both fail.
  if (index <= 0) {
    return rejectRawProperty(name, FlumeConfigurationErrorType.AGENT_NAME_MISSING);
  }
  // Everything before the first '.' is the agent name.
  String agentName = name.substring(0, index);

  // 333: strip the agent name off the key; the remainder is the config key.
  String configKey = name.substring(index + 1);

  // Configuration key must be specified for every property
  if (configKey.isEmpty()) {
    return rejectRawProperty(name, FlumeConfigurationErrorType.PROPERTY_NAME_NULL);
  }

  // Create the AgentConfiguration on first use and register it in
  // agentConfigMap. It collects all configuration seen for this agent name.
  AgentConfiguration aconf = agentConfigMap.get(agentName);
  if (aconf == null) {
    aconf = new AgentConfiguration(agentName, errors);
    agentConfigMap.put(agentName, aconf);
  }

  // Each configuration key must begin with one of the three prefixes:
  // sources, sinks, or channels.
  // Finally the key/value pair is loaded into the AgentConfiguration; this is
  // the most important step.
  return aconf.addProperty(configKey, value);
}

/** Records an ERROR-level configuration error for a raw property; always returns false. */
private boolean rejectRawProperty(String componentName, FlumeConfigurationErrorType type) {
  errors.add(new FlumeConfigurationError(componentName, "", type, ErrorOrWarning.ERROR));
  return false;
}
经过//333处的切分后,配置文件的内容应该变为:
//这里应该是map形式的,按`=`分割。
sources=source1
channels=channel1
sinks=sink1
sources.source1.type = TAILDIR
sources.source1.channels = channel1
sources.source1.channels.skipToEnd = True
sources.source1.positionFile = /home/flume/data/taildir_position.json
sources.source1.filegroups = f1
sources.source1.filegroups.f1 = /home/flume/test/logs/abc.log
sources.source1.headers.f1.headerKey1 = value1
sources.source1.fileHeader = true
channels.channel1.type=memory
channels.channel1.capacity = 1000
channels.channel1.transactionCapacity = 1000
sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
sinks.sink1.brokerList=a:9092,b:9092,c:9092
sinks.sink1.topic=connect-test-one
sinks.sink1.requiredAcks = 1
sinks.sink1.batchSize = 20
sinks.sink1.channel = channel1
之后的代码如下:
/**
 * Stores one agent-level property (the key no longer carries the agent name).
 *
 * <p>The list keys (config filters, sources, sinks, channels, sink groups) may
 * each be set once; a second occurrence is logged, recorded as a duplicate and
 * rejected. Any other key is offered in turn to the source, channel, sink,
 * sink-group and config-filter handlers; if none accepts it, the key is
 * recorded as invalid.
 *
 * @param key configuration key without the agent-name prefix
 * @param value configuration value
 * @return {@code true} if the property was stored, {@code false} if rejected
 */
private boolean addProperty(String key, String value) {
  // Check for configFilters
  if (CONFIG_CONFIGFILTERS.equals(key)) {
    if (configFilters != null) {
      LOGGER.warn("Duplicate configfilter list specified for agent: {}", agentName);
      addError(CONFIG_CONFIGFILTERS, DUPLICATE_PROPERTY, ERROR);
      return false;
    }
    configFilters = value;
    return true;
  }

  // Check for sources
  if (CONFIG_SOURCES.equals(key)) {
    if (sources != null) {
      LOGGER.warn("Duplicate source list specified for agent: {}", agentName);
      addError(CONFIG_SOURCES, DUPLICATE_PROPERTY, ERROR);
      return false;
    }
    // Remember the declared source list.
    sources = value;
    return true;
  }

  // Check for sinks
  if (CONFIG_SINKS.equals(key)) {
    if (sinks != null) {
      LOGGER.warn("Duplicate sink list specfied for agent: {}", agentName);
      addError(CONFIG_SINKS, DUPLICATE_PROPERTY, ERROR);
      return false;
    }
    // Remember the declared sink list.
    sinks = value;
    LOGGER.info("Added sinks: {} Agent: {}", sinks, agentName);
    return true;
  }

  // Check for channels
  if (CONFIG_CHANNELS.equals(key)) {
    if (channels != null) {
      LOGGER.warn("Duplicate channel list specified for agent: {}", agentName);
      addError(CONFIG_CHANNELS, DUPLICATE_PROPERTY, ERROR);
      return false;
    }
    // Remember the declared channel list.
    channels = value;
    return true;
  }

  // Check for sinkgroups
  if (CONFIG_SINKGROUPS.equals(key)) {
    if (sinkgroups != null) {
      LOGGER.warn("Duplicate sinkgroup list specfied for agent: {}", agentName);
      addError(CONFIG_SINKGROUPS, DUPLICATE_PROPERTY, ERROR);
      return false;
    }
    // Remember the declared sink-group list.
    sinkgroups = value;
    return true;
  }

  // Key point: each of these five methods reads the configuration of its own
  // component kind; the first one that accepts the key wins.
  boolean accepted = addAsSourceConfig(key, value)
      || addAsChannelValue(key, value)
      || addAsSinkConfig(key, value)
      || addAsSinkGroupConfig(key, value)
      || addAsConfigFilterConfig(key, value);
  if (!accepted) {
    LOGGER.warn("Invalid property specified: {}", key);
    addError(key, INVALID_PROPERTY, ERROR);
  }
  return accepted;
}
此处以addAsSourceConfig(key, value)
为例来说明,代码如下:
//此处代码从上向下依次深入,可以从下向上看。
//1
/**
 * Tries to store the property as a source setting.
 *
 * @return {@code true} if the key carried the sources prefix and was stored
 *     in the source context map
 */
private boolean addAsSourceConfig(String key, String value) {
  return addComponentConfig(key, value, CONFIG_SOURCES_PREFIX, sourceContextMap);
}
--->//2
/**
 * Stores a component-level property in the context of the component it names.
 *
 * @param key configuration key, e.g. {@code sources.source1.type}
 * @param value configuration value
 * @param configPrefix prefix the key must carry to belong to this component kind
 * @param contextMap per-component contexts, keyed by component name
 * @return {@code true} if the key could be parsed with the prefix and was stored
 */
private boolean addComponentConfig(
    String key, String value, String configPrefix, Map<String, Context> contextMap
) {
  ComponentNameAndConfigKey parsed = parseConfigKey(key, configPrefix);
  if (parsed != null) {
    // Name of the component (source, channel, sink, ...) the key refers to.
    String componentName = parsed.getComponentName().trim();
    LOGGER.info("Processing:{}", componentName);

    Context componentContext = contextMap.get(componentName);
    if (componentContext == null) {
      LOGGER.debug("Created context for {}: {}", componentName, parsed.getConfigKey());
      // First property seen for this component: register a fresh Context.
      componentContext = new Context();
      contextMap.put(componentName, componentContext);
    }
    // Store the remaining config key and its value in the component's Context.
    componentContext.put(parsed.getConfigKey(), value);
  }
  return parsed != null;
}
--->//3
/**
 * Splits a key of the form {@code <prefix><component-name>.<config-key>}.
 *
 * <p>Example: key {@code sources.source1.type} with prefix {@code sources.}
 * yields component name {@code source1} and config key {@code type}.
 *
 * @param key full configuration key
 * @param prefix prefix the key must start with
 * @return the parsed pair, or {@code null} if the key does not start with the
 *     prefix, has no '.' after the component name, or has an empty part
 */
private ComponentNameAndConfigKey parseConfigKey(String key, String prefix) {
  // key must start with prefix
  if (!key.startsWith(prefix)) {
    return null;
  }

  // key must have a component name part after the prefix of the format:
  // <prefix><component-name>.<config-key>
  int nameStart = prefix.length();
  int dot = key.indexOf('.', nameStart + 1);
  if (dot == -1) {
    return null;
  }

  // Component name sits between the prefix and the dot; the config key is
  // everything after that dot.
  String name = key.substring(nameStart, dot);
  String configKey = key.substring(dot + 1);

  // name and config key must be non-empty
  if (name.isEmpty() || configKey.isEmpty()) {
    return null;
  }

  return new ComponentNameAndConfigKey(name, configKey);
}
-->//4
/**
 * Holds the two parts of a parsed component key.
 *
 * @param name component name, e.g. {@code source1}
 * @param configKey remaining configuration key, e.g. {@code type}
 */
private ComponentNameAndConfigKey(String name, String configKey) {
  this.configKey = configKey;
  this.componentName = name;
}
然后又回到//2
处的if (parsed != null)
处了,继续深入。
所以最后是加载到AgentConfiguration中对应的*ContextMap中了;
- 最后在代码中还有一处
// validate and remove improperly configured components
validateConfiguration();
对应详细代码有:
/**
 * Validates every agent configuration and removes the invalid ones from
 * {@code agentConfigMap}, recording an error for each removed agent.
 */
private void validateConfiguration() {
  Iterator<Map.Entry<String, AgentConfiguration>> entries =
      agentConfigMap.entrySet().iterator();
  while (entries.hasNext()) {
    Map.Entry<String, AgentConfiguration> entry = entries.next();
    // Read key and value up front; the entry may be removed just below.
    String agentName = entry.getKey();
    AgentConfiguration aconf = entry.getValue();

    if (!aconf.isValid()) {
      logger.warn("Agent configuration invalid for agent '" + agentName
          + "'. It will be removed.");
      errors.add(new FlumeConfigurationError(agentName, "",
          FlumeConfigurationErrorType.AGENT_CONFIGURATION_INVALID,
          ErrorOrWarning.ERROR));
      // Removing through the iterator keeps the traversal valid.
      entries.remove();
    }
    // Logged for valid and removed agents alike.
    logger.debug("Channels:" + aconf.channels + "\n");
    logger.debug("Sinks " + aconf.sinks + "\n");
    logger.debug("Sources " + aconf.sources + "\n");
  }

  logger.info("Post-validation flume configuration contains configuration"
      + " for agents: " + agentConfigMap.keySet());
}
aconf.isValid()
/**
 * Validates this agent's configuration.
 *
 * <p>The agent is invalid when it declares no channels, when none of the
 * declared channels survives validation, or when it ends up with neither
 * sources nor sinks. On success the source, channel, sink and sink-group lists
 * are rewritten from the validated sets.
 *
 * @return {@code true} if the configuration is usable, {@code false} otherwise
 */
private boolean isValid() {
  logger.debug("Starting validation of configuration for agent: {}", agentName);
  if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
    logger.debug("Initial configuration: {}", this.getPrevalidationConfig());
  }

  // Make sure that at least one channel is specified
  if (channels == null || channels.trim().isEmpty()) {
    logger.warn("Agent configuration for '" + agentName
        + "' does not contain any channels. Marking it as invalid.");
    recordNullValueError(BasicConfigurationConstants.CONFIG_CHANNELS);
    return false;
  }

  // The channel list is whitespace-separated.
  channelSet = new HashSet<String>(Arrays.asList(channels.split("\\s+")));
  channelSet = validateChannels(channelSet);
  if (channelSet.isEmpty()) {
    logger.warn("Agent configuration for '" + agentName
        + "' does not contain any valid channels. Marking it as invalid.");
    recordNullValueError(BasicConfigurationConstants.CONFIG_CHANNELS);
    return false;
  }

  // Sources and sinks are validated against the surviving channels, and sink
  // groups against the surviving sinks.
  sourceSet = validateSources(channelSet);
  sinkSet = validateSinks(channelSet);
  sinkgroupSet = validateGroups(sinkSet);

  // If no sources or sinks are present, then this is invalid
  if (sourceSet.isEmpty() && sinkSet.isEmpty()) {
    logger.warn("Agent configuration for '" + agentName
        + "' has no sources or sinks. Will be marked invalid.");
    recordNullValueError(BasicConfigurationConstants.CONFIG_SOURCES);
    recordNullValueError(BasicConfigurationConstants.CONFIG_SINKS);
    return false;
  }

  // Now rewrite the sources/sinks/channels
  this.sources = getSpaceDelimitedList(sourceSet);
  this.channels = getSpaceDelimitedList(channelSet);
  this.sinks = getSpaceDelimitedList(sinkSet);
  this.sinkgroups = getSpaceDelimitedList(sinkgroupSet);

  if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
    logger.debug("Post validation configuration for {}", agentName);
    logger.debug(this.getPostvalidationConfig());
  }

  return true;
}

/** Records an ERROR-level PROPERTY_VALUE_NULL error for the given key of this agent. */
private void recordNullValueError(String key) {
  errorList.add(new FlumeConfigurationError(agentName, key,
      FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
      ErrorOrWarning.ERROR));
}
validateChannels
/**
 * Validates the declared channels and sorts them into two groups: channels
 * with a usable {@code ChannelConfiguration} (stored in
 * {@code channelConfigMap}) and channels configured through their raw
 * {@code Context} only (kept in {@code channelContextMap}).
 *
 * <p>Channels without any context, and channels whose configuration throws a
 * {@code ConfigurationException}, are removed from {@code channelSet}. Errors
 * reported by a channel's configuration object are appended to
 * {@code errorList}.
 *
 * @param channelSet declared channel names; modified in place
 * @return the same set, reduced to the channels that remain configured
 */
private Set<String> validateChannels(Set<String> channelSet) {
  Iterator<String> iter = channelSet.iterator();
  Map<String, Context> newContextMap = new HashMap<String, Context>();
  while (iter.hasNext()) {
    String channelName = iter.next();
    Context channelContext = channelContextMap.get(channelName);
    if (channelContext == null) {
      // Declared in the channel list, but no property was given for it.
      iter.remove();
      errorList.add(new FlumeConfigurationError(agentName, channelName,
          FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR));
      continue;
    }

    // Context exists in map.
    // Get the configuration object for the channel:
    ChannelType chType = getKnownChannel(channelContext.getString(
        BasicConfigurationConstants.CONFIG_TYPE));
    boolean configSpecified = false;
    String config = null;
    if (chType == null) {
      // Not a known channel - cannot do specific validation to this channel
      config = channelContext.getString(BasicConfigurationConstants.CONFIG_CONFIG);
      if (config == null || config.isEmpty()) {
        config = "OTHER";
      } else {
        configSpecified = true;
      }
    } else {
      config = chType.toString().toUpperCase(Locale.ENGLISH);
      configSpecified = true;
    }

    // Scoped to this iteration on purpose: if create() throws, the catch block
    // must not see (and re-report the errors of) the previous channel's
    // configuration object.
    ChannelConfiguration conf = null;
    try {
      // Build the channel's configuration object.
      conf =
          (ChannelConfiguration) ComponentConfigurationFactory.create(
              channelName, config, ComponentType.CHANNEL);
      logger.debug("Created channel " + channelName);
      if (conf != null) {
        // Apply the channel's properties to the configuration object.
        conf.configure(channelContext);
      }
      // Without a dedicated configuration class (none specified, none found,
      // or no object created) the channel stays Context-configured.
      if (!configSpecified || conf == null || conf.isNotFoundConfigClass()) {
        newContextMap.put(channelName, channelContext);
      } else {
        channelConfigMap.put(channelName, conf);
      }
      if (conf != null) {
        errorList.addAll(conf.getErrors());
      }
    } catch (ConfigurationException e) {
      // Could not configure channel - skip it.
      // No need to add to error list - already added before exception is
      // thrown
      if (conf != null) {
        errorList.addAll(conf.getErrors());
      }
      iter.remove();
      logger.warn("Could not configure channel " + channelName
          + " due to: " + e.getMessage(), e);
    }
  }

  channelContextMap = newContextMap;
  // Keep only the names that ended up in one of the two maps.
  Set<String> tempchannelSet = new HashSet<String>();
  tempchannelSet.addAll(channelConfigMap.keySet());
  tempchannelSet.addAll(channelContextMap.keySet());
  channelSet.retainAll(tempchannelSet);
  return channelSet;
}
至此,一个完整的FlumeConfiguration对象已经完全解析好了;下面继续; 接下来开始创建组件了
AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());//得到对应agent的所有组件的信息
if (agentConf != null) {
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
try {
loadChannels(agentConf, channelComponentMap);
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
for (String channelName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.get(channelName);
if (channelComponent.components.isEmpty()) {
LOGGER.warn(String.format("Channel %s has no components connected" +
" and has been removed.", channelName));
channelComponentMap.remove(channelName);
Map<String, Channel> nameChannelMap =
channelCache.get(channelComponent.channel.getClass());
if (nameChannelMap != null) {
nameChannelMap.remove(channelName);
}
} else {
LOGGER.info(String.format("Channel %s connected to %s",
channelName, channelComponent.components.toString()));
conf.addChannel(channelName, channelComponent.channel);
}
}
for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
} catch (InstantiationException ex) {
LOGGER.error("Failed to instantiate component", ex);
} finally {
channelComponentMap.clear();
sourceRunnerMap.clear();
sinkRunnerMap.clear();
}
}
此处以channel为例说明问题。
/**
 * Creates and configures the channels of an agent and registers each one in
 * {@code channelComponentMap} under its name.
 *
 * <p>Channels with a {@code ComponentConfiguration} are handled first, then
 * channels that only have a {@code Context}. A channel whose configuration
 * throws is logged and left out of {@code channelComponentMap}. Finally, cached
 * channels still listed in {@code channelsNotReused} are dropped from
 * {@code channelCache}.
 *
 * @param agentConf validated configuration of the agent
 * @param channelComponentMap output map, filled with channel name to component
 * @throws InstantiationException declared by this method; presumably raised by
 *     getOrCreateChannel when a channel cannot be created (TODO confirm)
 */
private void loadChannels(AgentConfiguration agentConf,
Map<String, ChannelComponent> channelComponentMap)
throws InstantiationException {
LOGGER.info("Creating channels");
// Names of cached channels, grouped by channel class, that have not been
// claimed during this load.
ListMultimap<Class<? extends Channel>, String> channelsNotReused =
ArrayListMultimap.create();
// assume all channels will not be re-used
for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
channelCache.entrySet()) {
Class<? extends Channel> channelKlass = entry.getKey();
Set<String> channelNames = entry.getValue().keySet();
channelsNotReused.get(channelKlass).addAll(channelNames);
}
Set<String> channelNames = agentConf.getChannelSet();
Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
/*
* Components which have a ComponentConfiguration object
*/
for (String chName : channelNames) {
ComponentConfiguration comp = compMap.get(chName);
if (comp != null) {
// NOTE(review): getOrCreateChannel presumably removes a re-used channel
// from channelsNotReused - confirm against its implementation.
Channel channel = getOrCreateChannel(channelsNotReused,
comp.getComponentName(), comp.getType());
try {
Configurables.configure(channel, comp);
channelComponentMap.put(comp.getComponentName(),
new ChannelComponent(channel));
LOGGER.info("Created channel " + chName);
} catch (Exception e) {
// Configuration failed: log it and continue with the next channel.
String msg = String.format("Channel %s has been removed due to an " +
"error during configuration", chName);
LOGGER.error(msg, e);
}
}
}
/*
* Components which DO NOT have a ComponentConfiguration object
* and use only Context
*/
for (String chName : channelNames) {
Context context = agentConf.getChannelContext().get(chName);
if (context != null) {
// The channel type is read from the context's type property.
Channel channel = getOrCreateChannel(channelsNotReused, chName,
context.getString(BasicConfigurationConstants.CONFIG_TYPE));
try {
Configurables.configure(channel, context);
channelComponentMap.put(chName, new ChannelComponent(channel));
LOGGER.info("Created channel " + chName);
} catch (Exception e) {
// Configuration failed: log it and continue with the next channel.
String msg = String.format("Channel %s has been removed due to an " +
"error during configuration", chName);
LOGGER.error(msg, e);
}
}
}
/*
* Any channel which was not re-used, will have its reference removed
*/
for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
Map<String, Channel> channelMap = channelCache.get(channelKlass);
if (channelMap != null) {
for (String channelName : channelsNotReused.get(channelKlass)) {
if (channelMap.remove(channelName) != null) {
LOGGER.info("Removed {} of type {}", channelName, channelKlass);
}
}
// Drop the per-class map once it no longer holds any channel.
if (channelMap.isEmpty()) {
channelCache.remove(channelKlass);
}
}
}
}
上述代码是创建channel的。