写在前面的话,笔者第一次阅读框架源码,所以可能有些地方理解错误或者没有详细解释,如果在阅读过程发现错误很欢迎在文章下面评论指出。文章后续会陆续更新,可以关注或者收藏,转发请先私信我,谢谢。对了,笔者看的是2.2.1这个版本
概述
JStorm是一个分布式的实时计算引擎,是阿里巴巴根据storm的流处理模型进行重写的一个框架,支持相同的逻辑模型(也就是拓扑结构),然后底层的实现却大有不同。不过本文并不是打算对两个框架进行比较,接下来我会从源码的角度上来解析JStorm是如何工作的。
作为第一个篇章,笔者先来介绍下nimbus以及它启动的时候做了什么。JStorm的主节点上运行着nimbus的守护进程,这个进程主要负责与ZK通信,分发代码,给集群中的从节点分配任务,监视集群状态等等。此外nimbus需要维护的所有状态都会存储在ZK中,JStorm为了减少对ZK的访问次数做了一些缓存,这个后续代码分析会说到。以上是nimbus功能的简介,接下来我们从源码的角度看看Nimbus到底做了什么。首先在Nimbus启动的时候:
//设置主线程由于未捕获异常而突然中止时调用的默认程序
Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler());
//加载集群的配置信息
Map config = Utils.readStormConfig();
//这下面这个方法内部注释掉了,笔者暂时没有太在意,后续再补充
JStormServerUtils.startTaobaoJvmMonitor();
//创建一个NimbusServer实例
NimbusServer instance = new NimbusServer();
//创建一个默认的nimbus启动类
INimbus iNimbus = new DefaultInimbus();
//开始进行实际的初始化
instance.launchServer(config, iNimbus);
其实在DefaultUncaughtExceptionHandler
中也并没有太多的处理操作,简单判断是否是内存溢出,然后正常关闭,否则就是异常直接抛出然后中断。读取配置的过程就不详细讲解了。NimbusServer
这个类主要封装了一些用于操作Nimbus的成员变量和方法,Nimbus的启动操作基本都是定义在这个类内的(上述代码就是这个类中的main方法所定义的)。
最重要的方法是launchServer
,接下来就详细的解说这个方法的作用,首先来看下launchServer
这个方法内部的代码:
private void launchServer(final Map conf, INimbus inimbus) {
LOG.info("Begin to start nimbus with conf " + conf);
try {
//判断配置模式是否正确
StormConfig.validate_distributed_mode(conf);
createPid(conf);
//设置退出时的操作
initShutdownHook();
//这个方法在默认实现中没有任何操作
inimbus.prepare(conf, StormConfig.masterInimbus(conf));
//创建NimbusData对象
data = createNimbusData(conf, inimbus);
//这个方法主要负责处理当nimbus线程称为leader线程之后的操作
initFollowerThread(conf);
int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf);
hs = new Httpserver(port, conf);
hs.start();
//如果集群是运行在yarn上,也需要做一些初始化操作。
initContainerHBThread(conf);
serviceHandler = new ServiceHandler(data);
//thrift是一个分布式的RPC框架
initThrift(conf);
} catch (Throwable e) {
if (e instanceof OutOfMemoryError) {
LOG.error("Halting due to Out Of Memory Error...");
}
LOG.error("Fail to run nimbus ", e);
} finally {
cleanup();
}
LOG.info("Quit nimbus");
}
判断配置中的模式
只是判断配置信息中的一个字段名为“storm.cluster.mode”是否是“distributed”,本地模式下是“local”。
initShutdownHook
添加退出的时候一些操作,包括设置参数提醒集群要退出,清除nimbus存储下的一些工作线程(负责处理通信,分发代码,心跳的一系列守护线程),关闭打开的各种资源等。
createNimbusData
这个方法用于创建一个NimbusData
的对象,这个对象封装了Nimbus与ZK通信的一些成员变量。下面会在每个方法内部逐渐讲到NimbusData
的一些成员变量以及他们的作用。首先来看看NimbusData
的构造方法。
public NimbusData(final Map conf, INimbus inimbus) throws Exception {
this.conf = conf;
//两个方法分别处理打开的文件流和blob传输流
createFileHandler();
mkBlobCacheMap();
this.nimbusHostPortInfo = NimbusInfo.fromConf(conf);
this.blobStore = BlobStoreUtils.getNimbusBlobStore(conf, nimbusHostPortInfo);
this.isLaunchedCleaner = false;
this.isLaunchedMonitor = false;
this.submittedCount = new AtomicInteger(0);
this.stormClusterState = Cluster.mk_storm_cluster_state(conf);
createCache();
this.taskHeartbeatsCache = new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>();
//创建一个调度线程池,默认大小为12
this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM);
this.statusTransition = new StatusTransition(this);
this.startTime = TimeUtils.current_time_secs();
this.inimubs = inimbus;
localMode = StormConfig.local_mode(conf);
this.metricCache = new JStormMetricCache(conf, this.stormClusterState);
this.clusterName = ConfigExtension.getClusterName(conf);
pendingSubmitTopologies = new TimeCacheMap<String, Object>(JStormUtils.MIN_10);
topologyTaskTimeout = new ConcurrentHashMap<String, Integer>();
tasksHeartbeat = new ConcurrentHashMap<String, TopologyTaskHbInfo>();
this.metricsReporter = new JStormMetricsReporter(this);
this.metricRunnable = ClusterMetricsRunnable.mkInstance(this);
String configUpdateHandlerClass = ConfigExtension.getNimbusConfigUpdateHandlerClass(conf);
this.configUpdateHandler = (ConfigUpdateHandler) Utils.newInstance(configUpdateHandlerClass);
if (conf.containsKey(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN)) {
String string = (String) conf.get(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN);
nimbusNotify = (ITopologyActionNotifierPlugin) Utils.newInstance(string);
} else {
nimbusNotify = null;
}
}
3.1 createFileHandler:在这方法内部,实现了一个匿名的内部类ExpiredCallback
,在其内部实现了一个方法叫expire
,利用回调的方式来关闭Channel
或者BufferFileInputStream
实例对象。
public void createFileHandler() {
ExpiredCallback<Object, Object> expiredCallback = new ExpiredCallback<Object, Object>() {
@Override
public void expire(Object key, Object val) {
try {
LOG.info("Close file " + String.valueOf(key));
if (val != null) {
if (val instanceof Channel) {
Channel channel = (Channel) val;
channel.close();
} else if (val instanceof BufferFileInputStream) {
BufferFileInputStream is = (BufferFileInputStream) val;
is.close();
}
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
};
//获取超时时间
int file_copy_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30);
uploaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback);
downloaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback);
}
然后初始化NimbusData
的两个成员变量uploaders
和downloaders
,这两个分别维护需要上传的通道和需要下载的通道。TimeCacheMap
这个类的主要实现逻辑是在其构造函数内部启动一个守护线程。首先创建一个缓冲区,只要系统不关闭,则在守护线程内部不断的缓冲区获取对象,在对象不为空的情况下调用回调函数的expire方法,并执行相应的操作,这里具体传进来的expire
方法是关闭Channel
或者BufferFileInputStream
。
3.2. mkBlobCacheMap
:和上述的方法非常类似,也是申明一个匿名内部类,然后初始化几个成员变量。代码几乎和上个方法一样就不浪费拌面去贴了。这里expire方法中要关闭的是两个流AtomicOutputStream
和BufferInputStream
,blobUploaders
和blobDownloaders
分别存放着上传和下载所打开的流。blobListers
存放上传和下载的数据。
3.3. 初始化几个成员变量,包括NimbusInfo(包含了主机名,端口和标志是否是leader),BlobStore(用来存储blob数据的,使用键值存储,阿里提供了两个不同的blob存储方式,一种是本地文件系统存储,一种的hdfs存储,两种方式的区别在于,由于本地文件存储并不能保证一致性,所以需要ZK介入来保证,这是JStorm的默认配置。如果使用hdfs来存储,则不需要ZK介入,因为hdfs能保证一致性和正确性),StormClusterState(存储整个集群的状态,这个是从ZK上获取的),为了避免多次向ZK通信,还需要设置缓存信息,任务的心跳信息等等。
3.4. 初始化好metrics相关的报告线程和监听线程。
initFollowerThread
4.1. 方法首先初始化一个回调函数,这是当一个nimbus成为leader之后就会调用的一个用于初始化一系列变量的方法,包括拓扑如何在集群上分配,拓扑状态更新,清除函数,还有监控线程等。后续会有新的篇章来介绍这个init方法,这里先放这个方法的源码。
private void init(Map conf) throws Exception {
data.init();
NimbusUtils.cleanupCorruptTopologies(data);
//拓扑分配
initTopologyAssign();
//状态更新
initTopologyStatus();
//清除函数
initCleaner(conf);
initMetricRunnable();
if (!data.isLocalMode()) {
initMonitor(conf);
//mkRefreshConfThread(data);
}
}
4.2. 初始化一个Runnable的子类,在构造方法中,首先判断集群并不是使用本地模式,然后更新ZK上的节点信息(将nimbus注册到ZK上)。然后通过ZK获取集群的状态信息,毕竟nimbus是需要维护整个集群的。紧接着判断是否存在leader,两次都无法选举出leader之后,则将ZK上的nimbus信息删除并退出。如果blobstore使用的是本地文件模式(有本文模式还有hdfs模式两种)还需要添加一个回调函数,这个回调函数执行的操作是,当这个nimbus不是leader的时候,对blob进行同步。此外还需要将那些active的blob存到ZK中,而将死掉的进行清除(原因前文3.3也说到过,本地模式存储无法保证一致性,所以需要ZK进行维护,而hdfs自带容错机制,能保证数据的一致性)。
4.3. 设置该线程为守护线程,并启动这个线程。run方法首先判断当前保存在ZK上的集群中是否有leader,如果没有则选举当前nimbus为leader线程。如果有了leader线程,则需要判断是否跟当前的nimbus相同,如果不相同则停止当前的nimbus,毕竟已经有leader存在了。如果是相同的,则需要判断本地的状态中,如果还没有设置为leader,表明当前nimbus还没有进行初始化,则先设置nimbus为leader然后回调函数进行初始化,也就是调用init(conf)
方法。
获取一个端口(默认的端口是7621)用于构建HttpServer
实例对象。可以用于处理和接受tcp连接,启动一个新的线程进行httpserver的监听。(主要作用或者说在哪里用到尚且不明确)。
initContainerHBThread
这个方法的主要作用是得知是否能在资源管理器(yarn)上运行jstorm集群,如果可以的话,则需要创建一个新的线程用于处理。(其实这里使用容器的目的是可以在一个物理集群上运行多个不一样的逻辑集群甚至多个JStorm集群,能动态调整逻辑集群分到的资源,此外,资源管理器能提供非常强的可扩展性)。容器线程会被添加到NimbusServer
中,后续使用到的时候再详细讲解。这个容器线程也是守护线程,且马上就会启动,这个线程的run方法里面包含两个处理:
6.1. handleWriteDir
:这个方法的主要作用是清除掉容器上的过期心跳信息,准确的说,如果JStorm集群容器目录下的心跳信息大于10,则需要清除(从最老的开始)。
6.2. handlReadDir
:这里主要是用于维护本地是否能接受到集群上的hb信息,如果多次超时则要抛出异常。
initThrift
thrift是JStorm使用的一个分布式RPC框架。笔者后续再添加相应的源码解析。