前言:
这些天一直奔波于长沙和武汉之间,忙着腾讯的笔试、面试,以至于对hadoop RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。可以参考:
http://baike.baidu.com/view/32726.htm
)机制分析的博客一直耽搁了下来。昨天晚上胡老大和我抱怨说:最近乱的很。呵呵,老是往武汉跑,能不乱嘛。不过差不多腾讯面试的事就该告一段落了。五一期间,云计算小组的成员们,我们再搞起来吧。记住,我们还有一本hadoop的手册没出来呢。胡老大已经答应给我们写提纲了,在这期间,我们还是先把内功再修炼修炼吧。
分析对象:
hadoop版本:hadoop 0.20.203.0
必备技术点:
1. 动态代理(参考
:http://weixiaolu.iteye.com/blog/1477774
)
2. Java NIO(参考 :
http://weixiaolu.iteye.com/blog/1479656
)
3. Java网络编程
目录:
一.RPC协议
二.ipc.RPC源码分析
三.ipc.Client源码分析
四.ipc.Server源码分析
分析:
一.RPC协议
在分析协议之前,我觉得我们很有必要先搞清楚协议是什么。下面我就谈一点自己的认识吧。如果你学过java的网络编程,你一定知道:当客户端发送一个字节给服务端时,服务端必须也要有一个读字节的方法在阻塞等待;反之亦然。 这种我把它称为底层的通信协议。可是对于一个大型的网络通信系统来说,很显然这种说法的协议粒度太小,不方便我们理解整个网络通信的流程及架构,所以我造了个说法:架构层次的协议。通俗一点说,就是我把某些接口和接口中的方法称为协议,客户端和服务端只要实现这些接口中的方法就可以进行通信了,从这个角度来说,架构层次协议的说法就可以成立了(注:如果从架构层次的协议来分析系统,我们就先不要太在意方法的具体实现,呵呵,我相信你懂得~)。
Hadoop的RPC机制正是采用了这种“架构层次的协议”,有一整套作为协议的接口。如图:
下面就几个重点的协议介绍一下吧:
VersionedProtocol :它是所有RPC协议接口的父接口,其中只有一个方法:getProtocolVersion()
(1)HDFS相关
ClientDatanodeProtocol :一个客户端和datanode之间的协议接口,用于数据块恢复
ClientProtocol :client与Namenode交互的接口,所有控制流的请求均在这里,如:创建文件、删除文件等;
DatanodeProtocol : Datanode与Namenode交互的接口,如心跳、blockreport等;
NamenodeProtocol :SecondaryNode与Namenode交互的接口。
(2)Mapreduce相关
InterDatanodeProtocol :Datanode内部交互的接口,用来更新block的元数据;
InnerTrackerProtocol :TaskTracker与JobTracker交互的接口,功能与DatanodeProtocol相似;
JobSubmissionProtocol :JobClient与JobTracker交互的接口,用来提交Job、获得Job等与Job相关的操作;
TaskUmbilicalProtocol :Task中子进程与母进程交互的接口,子进程即map、reduce等操作,母进程即TaskTracker,该接口可以回报子进程的运行状态(词汇扫盲: umbilical 脐带的, 关系亲密的) 。
一下子罗列了这么多的协议,有些人可能要问了,hadoop是怎么使用它们的呢?呵呵,不要着急哦,其实本篇博客所分析的是hadoop的RPC机制底层的具体实现,而这些协议却是应用层上的东西,比如hadoop是怎么样保持“心跳”的啊。所以在我的下一篇博客:源码级分析hadoop的心跳机制中会详细说明以上协议是怎样被使用的。尽请期待哦~。现在就开始我们的RPC源码之旅吧•••
二.ipc.RPC源码分析
ipc.RPC类中有一些内部类,为了大家对RPC类有个初步的印象,就先罗列几个我们感兴趣的分析一下吧:
Invocation :用于封装方法名和参数,作为数据传输层,相当于VO吧。
ClientCache :用于存储client对象,用socket factory作为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker :是动态代理中的调用实现类,继承了InvocationHandler.
Server :是ipc.Server的实现类。
从以上的分析可以知道,Invocation类仅作为VO,ClientCache类只是作为缓存,而Server类用于服务端的处理,他们都和客户端的数据流和业务逻辑没有关系。现在就只剩下Invoker类了。如果你对动态代理(参考: http://weixiaolu.iteye.com/blog/1477774 )比较了解的话,你一下就会想到,我们接下来去研究的就是RPC.Invoker类中的invoke()方法了。代码如下:
代码一:
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- •••
- ObjectWritable value = (ObjectWritable)
- client.call(new Invocation(method, args), remoteId);
- •••
- return value.get();
- }
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ••• ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); ••• return value.get(); }
呵呵,如果你发现这个invoke()方法实现的有些奇怪的话,那你就对了。一般我们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg); 这句代码。而上面代码中却没有,这是为什么呢?其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,所以这里的invoke()方法必然需要进行网络通信。而网络通信就是下面的这段代码实现的:
代码二:
- ObjectWritable value = (ObjectWritable)
- client.call(new Invocation(method, args), remoteId);
ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);
Invocation类在这里封装了方法名和参数,充当VO。其实这里网络通信只是调用了Client类的call()方法。那我们接下来分析一下ipc.Client源码吧。不过在分析ipc.Client源码之前,为了不让我们像盲目的苍蝇一样乱撞,我想先确定一下我们分析的目的是什么,我总结出了三点需要解决的问题:
1. 客户端和服务端的连接是怎样建立的?
2. 客户端是怎样给服务端发送数据的?
3. 客户端是怎样获取服务端的返回数据的?
基于以上三个问题,我们开始吧!!!
三.ipc.Client源码分析
同样,为了对Client类有个初步的了解,我们也先罗列几个我们感兴趣的内部类:
Call :用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据
Connection :用以处理远程连接对象。继承了Thread
ConnectionId :唯一确定一个连接
问题1:客户端和服务端的连接是怎样建立的?
下面我们来看看Client类中的cal()方法吧:
代码三:
- public Writable call(Writable param, ConnectionId remoteId)
- throws InterruptedException, IOException {
- Call call = new Call(param); //将传入的数据封装成call对象
- Connection connection = getConnection(remoteId, call); //获得一个连接
- connection.sendParam(call); // 向服务端发送call对象
- boolean interrupted = false;
- synchronized (call) {
- while (!call.done) {
- try {
- call.wait(); // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程
- } catch (InterruptedException ie) {
- // 因中断异常而终止,设置标志interrupted为true
- interrupted = true;
- }
- }
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- if (call.error != null) {
- if (call.error instanceof RemoteException) {
- call.error.fillInStackTrace();
- throw call.error;
- } else { // 本地异常
- throw wrapException(remoteId.getAddress(), call.error);
- }
- } else {
- return call.value; //返回结果数据
- }
- }
- }
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param); //将传入的数据封装成call对象 Connection connection = getConnection(remoteId, call); //获得一个连接 connection.sendParam(call); // 向服务端发送call对象 boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程 } catch (InterruptedException ie) { // 因中断异常而终止,设置标志interrupted为true interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // 本地异常 throw wrapException(remoteId.getAddress(), call.error); } } else { return call.value; //返回结果数据 } } }
具体代码的作用我已做了注释,所以这里不再赘述。但到目前为止,你依然不知道RPC机制底层的网络连接是怎么建立的。呵呵,那我们只好再去深究了,分析代码后,我们会发现和网络通信有关的代码只会是下面的两句了:
代码四:
- Connection connection = getConnection(remoteId, call); //获得一个连接
- connection.sendParam(call); // 向服务端发送call对象
Connection connection = getConnection(remoteId, call); //获得一个连接 connection.sendParam(call); // 向服务端发送call对象
先看看是怎么获得一个到服务端的连接吧,下面贴出ipc.Client类中的getConnection()方法。
代码五:
- private Connection getConnection(ConnectionId remoteId,
- Call call)
- throws IOException, InterruptedException {
- if (!running.get()) {
- // 如果client关闭了
- throw new IOException(“The client is stopped”);
- }
- Connection connection;
- //如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。
- //但请注意,该//连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接。
- do {
- synchronized (connections) {
- connection = connections.get(remoteId);
- if (connection == null) {
- connection = new Connection(remoteId);
- connections.put(remoteId, connection);
- }
- }
- } while (!connection.addCall(call)); //将call对象放入对应连接中的calls池,就不贴出源码了
- //这句代码才是真正的完成了和服务端建立连接哦~
- connection.setupIOstreams();
- return connection;
- }
private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException { if (!running.get()) { // 如果client关闭了 throw new IOException("The client is stopped"); } Connection connection; //如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。 //但请注意,该//连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接。 do { synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId); connections.put(remoteId, connection); } } } while (!connection.addCall(call)); //将call对象放入对应连接中的calls池,就不贴出源码了 //这句代码才是真正的完成了和服务端建立连接哦~ connection.setupIOstreams(); return connection; }
如果你还有兴趣继续分析下去,那我们就一探建立连接的过程吧,下面贴出Client.Connection类中的setupIOstreams()方法:
代码六:
- private synchronized void setupIOstreams() throws InterruptedException {
- •••
- try {
- •••
- while (true) {
- setupConnection(); //建立连接
- InputStream inStream = NetUtils.getInputStream(socket); //获得输入流
- OutputStream outStream = NetUtils.getOutputStream(socket); //获得输出流
- writeRpcHeader(outStream);
- •••
- this.in = new DataInputStream(new BufferedInputStream
- (new PingInputStream(inStream))); //将输入流装饰成DataInputStream
- this.out = new DataOutputStream
- (new BufferedOutputStream(outStream)); //将输出流装饰成DataOutputStream
- writeHeader();
- // 跟新活动时间
- touch();
- //当连接建立时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread
- start();
- return;
- }
- } catch (IOException e) {
- markClosed(e);
- close();
- }
- }
private synchronized void setupIOstreams() throws InterruptedException { ••• try { ••• while (true) { setupConnection(); //建立连接 InputStream inStream = NetUtils.getInputStream(socket); //获得输入流 OutputStream outStream = NetUtils.getOutputStream(socket); //获得输出流 writeRpcHeader(outStream); ••• this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(inStream))); //将输入流装饰成DataInputStream this.out = new DataOutputStream (new BufferedOutputStream(outStream)); //将输出流装饰成DataOutputStream writeHeader(); // 跟新活动时间 touch(); //当连接建立时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread start(); return; } } catch (IOException e) { markClosed(e); close(); } }
再有一步我们就知道客户端的连接是怎么建立的啦,下面贴出Client.Connection类中的setupConnection()方法:
代码七:
- private synchronized void setupConnection() throws IOException {
- short ioFailures = 0;
- short timeoutFailures = 0;
- while (true) {
- try {
- this.socket = socketFactory.createSocket(); //终于看到创建socket的方法了
- this.socket.setTcpNoDelay(tcpNoDelay);
- •••
- // 设置连接超时为20s
- NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
- this.socket.setSoTimeout(pingInterval);
- return;
- } catch (SocketTimeoutException toe) {
- /* 设置最多连接重试为45次。
- * 总共有20s*45 = 15 分钟的重试时间。
- */
- handleConnectionFailure(timeoutFailures++, 45, toe);
- } catch (IOException ie) {
- handleConnectionFailure(ioFailures++, maxRetries, ie);
- }
- }
- }
private synchronized void setupConnection() throws IOException { short ioFailures = 0; short timeoutFailures = 0; while (true) { try { this.socket = socketFactory.createSocket(); //终于看到创建socket的方法了 this.socket.setTcpNoDelay(tcpNoDelay); ••• // 设置连接超时为20s NetUtils.connect(this.socket, remoteId.getAddress(), 20000); this.socket.setSoTimeout(pingInterval); return; } catch (SocketTimeoutException toe) { /* 设置最多连接重试为45次。 * 总共有20s*45 = 15 分钟的重试时间。 */ handleConnectionFailure(timeoutFailures++, 45, toe); } catch (IOException ie) { handleConnectionFailure(ioFailures++, maxRetries, ie); } } }
终于,我们知道了客户端的连接是怎样建立的了,其实就是创建一个普通的socket进行通信。呵呵,那服务端是不是也是创建一个ServerSocket进行通信的呢?呵呵,先不要急,到这里我们只解决了客户端的第一个问题,下面还有两个问题没有解决呢,我们一个一个地来解决吧。
问题2:客户端是怎样给服务端发送数据的?
我们回顾一下代码四吧。第一句为了完成连接的建立,我们已经分析完毕;而第二句是为了发送数据,呵呵,分析下去,看能不能解决我们的问题呢。下面贴出Client.Connection类的sendParam()方法吧:
代码八:
- public void sendParam(Call call) {
- if (shouldCloseConnection.get()) {
- return;
- }
- DataOutputBuffer d=null;
- try {
- synchronized (this.out) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ” sending #” + call.id);
- //创建一个缓冲区
- d = new DataOutputBuffer();
- d.writeInt(call.id);
- call.param.write(d);
- byte[] data = d.getData();
- int dataLength = d.getLength();
- out.writeInt(dataLength); //首先写出数据的长度
- out.write(data, 0, dataLength); //向服务端写数据
- out.flush();
- }
- } catch(IOException e) {
- markClosed(e);
- } finally {
- IOUtils.closeStream(d);
- }
- }
public void sendParam(Call call) { if (shouldCloseConnection.get()) { return; } DataOutputBuffer d=null; try { synchronized (this.out) { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); //创建一个缓冲区 d = new DataOutputBuffer(); d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); //首先写出数据的长度 out.write(data, 0, dataLength); //向服务端写数据 out.flush(); } } catch(IOException e) { markClosed(e); } finally { IOUtils.closeStream(d); } }
其实这就是java io的socket发送数据的一般过程哦,没有什么特别之处。到这里问题二也解决了,来看看问题三吧。
问题3:客户端是怎样获取服务端的返回数据的?
我们再回顾一下代码六吧。代码六中,当连接建立时会启动一个线程用于处理服务端返回的数据,我们看看这个处理线程是怎么实现的吧,下面贴出Client.Connection类和Client.Call类中的相关方法吧:
代码九:
- 方法一:
- public void run() {
- •••
- while (waitForWork()) {
- receiveResponse(); //具体的处理方法
- }
- close();
- •••
- }
- 方法二:
- private void receiveResponse() {
- if (shouldCloseConnection.get()) {
- return;
- }
- touch();
- try {
- int id = in.readInt(); // 阻塞读取id
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ” got value #” + id);
- Call call = calls.get(id); //在calls池中找到发送时的那个对象
- int state = in.readInt(); // 阻塞读取call对象的状态
- if (state == Status.SUCCESS.state) {
- Writable value = ReflectionUtils.newInstance(valueClass, conf);
- value.readFields(in); // 读取数据
- //将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码方法三
- call.setValue(value);
- calls.remove(id); //删除已处理的call
- } else if (state == Status.ERROR.state) {
- •••
- } else if (state == Status.FATAL.state) {
- •••
- }
- } catch (IOException e) {
- markClosed(e);
- }
- }
- 方法三:
- public synchronized void setValue(Writable value) {
- this.value = value;
- callComplete(); //具体实现
- }
- protected synchronized void callComplete() {
- this.done = true;
- notify(); // 唤醒client等待线程
- }
方法一: public void run() { ••• while (waitForWork()) { receiveResponse(); //具体的处理方法 } close(); ••• } 方法二: private void receiveResponse() { if (shouldCloseConnection.get()) { return; } touch(); try { int id = in.readInt(); // 阻塞读取id if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); Call call = calls.get(id); //在calls池中找到发送时的那个对象 int state = in.readInt(); // 阻塞读取call对象的状态 if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // 读取数据 //将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码方法三 call.setValue(value); calls.remove(id); //删除已处理的call } else if (state == Status.ERROR.state) { ••• } else if (state == Status.FATAL.state) { ••• } } catch (IOException e) { markClosed(e); } } 方法三: public synchronized void setValue(Writable value) { this.value = value; callComplete(); //具体实现 } protected synchronized void callComplete() { this.done = true; notify(); // 唤醒client等待线程 }
代码九完成的功能主要是:启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据了哦~。客户端的源码分析就到这里了哦,下面我们来分析Server端的源码吧。
四.ipc.Server源码分析
同样,为了让大家对ipc.Server有个初步的了解,我们先分析一下它的几个内部类吧:
Call :用于存储客户端发来的请求
Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection :连接类,真正的客户端请求读取逻辑在这个类中。
Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。
如果你看过ipc.Server的源码,你会发现其实ipc.Server是一个abstract修饰的抽象类。那随之而来的问题就是:hadoop是怎样初始化RPC的Server端的呢?这个问题着实也让我想了好长时间。不过后来我想到Namenode初始化时一定初始化了RPC的Sever端,那我们去看看Namenode的初始化源码吧:
1. 初始化Server
代码十:
- private void initialize(Configuration conf) throws IOException {
- •••
- // 创建 rpc server
- InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
- if (dnSocketAddr != null) {
- int serviceHandlerCount =
- conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
- DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
- //获得serviceRpcServer
- this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
- dnSocketAddr.getPort(), serviceHandlerCount,
- false, conf, namesystem.getDelegationTokenSecretManager());
- this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
- setRpcServiceServerAddress(conf);
- }
- //获得server
- this.server = RPC.getServer(this, socAddr.getHostName(),
- socAddr.getPort(), handlerCount, false, conf, namesystem
- .getDelegationTokenSecretManager());
- •••
- this.server.start(); //启动 RPC server Clients只允许连接该server
- if (serviceRpcServer != null) {
- serviceRpcServer.start(); //启动 RPC serviceRpcServer 为HDFS服务的server
- }
- startTrashEmptier(conf);
- }
private void initialize(Configuration conf) throws IOException { ••• // 创建 rpc server InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf); if (dnSocketAddr != null) { int serviceHandlerCount = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); //获得serviceRpcServer this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); setRpcServiceServerAddress(conf); } //获得server this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(), handlerCount, false, conf, namesystem .getDelegationTokenSecretManager()); ••• this.server.start(); //启动 RPC server Clients只允许连接该server if (serviceRpcServer != null) { serviceRpcServer.start(); //启动 RPC serviceRpcServer 为HDFS服务的server } startTrashEmptier(conf); }
查看Namenode初始化源码得知:RPC的server对象是通过ipc.RPC类的getServer()方法获得的。下面咱们去看看ipc.RPC类中的getServer()源码吧:
代码十一:
- public static Server getServer(final Object instance, final String bindAddress, final int port,
- final int numHandlers,
- final boolean verbose, Configuration conf,
- SecretManager<? extends TokenIdentifier> secretManager)
- throws IOException {
- return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
- }
public static Server getServer(final Object instance, final String bindAddress, final int port, final int numHandlers, final boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager); }
这时我们发现getServer()是一个创建Server对象的工厂方法,但创建的却是RPC.Server类的对象。哈哈,现在你明白了我前面说的“RPC.Server是ipc.Server的实现类”了吧。不过RPC.Server的构造函数还是调用了ipc.Server类的构造函数的,因篇幅所限,就不贴出相关源码了。
2. 运行Server
如代码十所示,初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码吧:
代码十二:
- /** 启动服务 */
- public synchronized void start() {
- responder.start(); //启动responder
- listener.start(); //启动listener
- handlers = new Handler[handlerCount];
- for (int i = 0; i < handlerCount; i++) {
- handlers[i] = new Handler(i);
- handlers[i].start(); //逐个启动Handler
- }
- }
/** 启动服务 */ public synchronized void start() { responder.start(); //启动responder listener.start(); //启动listener handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); //逐个启动Handler } }
3. Server处理请求
1)建立连接
分析过ipc.Client源码后,我们知道Client端的底层通信直接采用了阻塞式IO编程,当时我们曾做出猜测:Server端是不是也采用了阻塞式IO。现在我们仔细地分析一下吧,如果Server端也采用阻塞式IO,当连接进来的Client端很多时,势必会影响Server端的性能。hadoop的实现者们考虑到了这点,所以他们采用了java NIO来实现Server端,java NIO可参考博客: http://weixiaolu.iteye.com/blog/1479656 。那Server端采用java NIO是怎么建立连接的呢?分析源码得知,Server端采用Listener监听客户端的连接,下面先分析一下Listener的构造函数吧:
代码十三:
- public Listener() throws IOException {
- address = new InetSocketAddress(bindAddress, port);
- // 创建ServerSocketChannel,并设置成非阻塞式
- acceptChannel = ServerSocketChannel.open();
- acceptChannel.configureBlocking(false);
- // 将server socket绑定到本地端口
- bind(acceptChannel.socket(), address, backlogLength);
- port = acceptChannel.socket().getLocalPort();
- // 获得一个selector
- selector= Selector.open();
- readers = new Reader[readThreads];
- readPool = Executors.newFixedThreadPool(readThreads);
- //启动多个reader线程,为了防止请求多时服务端响应延时的问题
- for (int i = 0; i < readThreads; i++) {
- Selector readSelector = Selector.open();
- Reader reader = new Reader(readSelector);
- readers[i] = reader;
- readPool.execute(reader);
- }
- // 注册连接事件
- acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
- this.setName(“IPC Server listener on “ + port);
- this.setDaemon(true);
- }
public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // 创建ServerSocketChannel,并设置成非阻塞式 acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // 将server socket绑定到本地端口 bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); // 获得一个selector selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); //启动多个reader线程,为了防止请求多时服务端响应延时的问题 for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; readPool.execute(reader); } // 注册连接事件 acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); }
在启动Listener线程时,服务端会一直等待客户端的连接,下面贴出Server.Listener类的run()方法:
代码十四:
- public void run() {
- •••
- while (running) {
- SelectionKey key = null;
- try {
- selector.select();
- Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
- while (iter.hasNext()) {
- key = iter.next();
- iter.remove();
- try {
- if (key.isValid()) {
- if (key.isAcceptable())
- doAccept(key); //具体的连接方法
- }
- } catch (IOException e) {
- }
- key = null;
- }
- } catch (OutOfMemoryError e) {
- •••
- }
public void run() { ••• while (running) { SelectionKey key = null; try { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); //具体的连接方法 } } catch (IOException e) { } key = null; } } catch (OutOfMemoryError e) { ••• }
下面贴出Server.Listener类中doAccept ()方法中的关键源码吧:
代码十五:
- void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
- Connection c = null;
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- SocketChannel channel;
- while ((channel = server.accept()) != null) { //建立连接
- channel.configureBlocking(false);
- channel.socket().setTcpNoDelay(tcpNoDelay);
- Reader reader = getReader(); //从readers池中获得一个reader
- try {
- reader.startAdd(); // 激活readSelector,设置adding为true
- SelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件
- c = new Connection(readKey, channel, System.currentTimeMillis());//创建一个连接对象
- readKey.attach(c); //将connection对象注入readKey
- synchronized (connectionList) {
- connectionList.add(numConnections, c);
- numConnections++;
- }
- •••
- } finally {
- //设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使
- //用了wait()方法等待。因篇幅有限,就不贴出源码了。
- reader.finishAdd();
- }
- }
- }
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { //建立连接 channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); //从readers池中获得一个reader try { reader.startAdd(); // 激活readSelector,设置adding为true SelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件 c = new Connection(readKey, channel, System.currentTimeMillis());//创建一个连接对象 readKey.attach(c); //将connection对象注入readKey synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } ••• } finally { //设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使 //用了wait()方法等待。因篇幅有限,就不贴出源码了。 reader.finishAdd(); } } }
当reader被唤醒,reader接着执行doRead()方法。
2)接收请求
下面贴出Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法源码:
代码十六:
- 方法一:
- void doRead(SelectionKey key) throws InterruptedException {
- int count = 0;
- Connection c = (Connection)key.attachment(); //获得connection对象
- if (c == null) {
- return;
- }
- c.setLastContact(System.currentTimeMillis());
- try {
- count = c.readAndProcess(); // 接受并处理请求
- } catch (InterruptedException ieo) {
- •••
- }
- •••
- }
- 方法二:
- public int readAndProcess() throws IOException, InterruptedException {
- while (true) {
- •••
- if (!rpcHeaderRead) {
- if (rpcHeaderBuffer == null) {
- rpcHeaderBuffer = ByteBuffer.allocate(2);
- }
- //读取请求头
- count = channelRead(channel, rpcHeaderBuffer);
- if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
- return count;
- }
- // 读取请求版本号
- int version = rpcHeaderBuffer.get(0);
- byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
- •••
- data = ByteBuffer.allocate(dataLength);
- }
- // 读取请求
- count = channelRead(channel, data);
- if (data.remaining() == 0) {
- •••
- if (useSasl) {
- •••
- } else {
- processOneRpc(data.array());//处理请求
- }
- •••
- }
- }
- return count;
- }
- }
方法一: void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); //获得connection对象 if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); // 接受并处理请求 } catch (InterruptedException ieo) { ••• } ••• } 方法二: public int readAndProcess() throws IOException, InterruptedException { while (true) { ••• if (!rpcHeaderRead) { if (rpcHeaderBuffer == null) { rpcHeaderBuffer = ByteBuffer.allocate(2); } //读取请求头 count = channelRead(channel, rpcHeaderBuffer); if (count < 0 || rpcHeaderBuffer.remaining() > 0) { return count; } // 读取请求版本号 int version = rpcHeaderBuffer.get(0); byte[] method = new byte[] {rpcHeaderBuffer.get(1)}; ••• data = ByteBuffer.allocate(dataLength); } // 读取请求 count = channelRead(channel, data); if (data.remaining() == 0) { ••• if (useSasl) { ••• } else { processOneRpc(data.array());//处理请求 } ••• } } return count; } }
3)获得call对象
下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。
代码十七:
- 方法一:
- private void processOneRpc(byte[] buf) throws IOException,
- InterruptedException {
- if (headerRead) {
- processData(buf);
- } else {
- processHeader(buf);
- headerRead = true;
- if (!authorizeConnection()) {
- throw new AccessControlException(“Connection from “ + this
- + ” for protocol “ + header.getProtocol()
- + ” is unauthorized for user “ + user);
- }
- }
- }
- 方法二:
- private void processData(byte[] buf) throws IOException, InterruptedException {
- DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(buf));
- int id = dis.readInt(); // 尝试读取id
- Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数
- param.readFields(dis);
- Call call = new Call(id, param, this); //封装成call
- callQueue.put(call); // 将call存入callQueue
- incRpcCount(); // 增加rpc请求的计数
- }
方法一: private void processOneRpc(byte[] buf) throws IOException, InterruptedException { if (headerRead) { processData(buf); } else { processHeader(buf); headerRead = true; if (!authorizeConnection()) { throw new AccessControlException("Connection from " + this + " for protocol " + header.getProtocol() + " is unauthorized for user " + user); } } } 方法二: private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // 尝试读取id Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数 param.readFields(dis); Call call = new Call(id, param, this); //封装成call callQueue.put(call); // 将call存入callQueue incRpcCount(); // 增加rpc请求的计数 }
4)处理call对象
你还记得Server类中还有个Handler内部类吗?呵呵,对call对象的处理就是它干的。下面贴出Server.Handler类中run()方法中的关键代码:
代码十八:
- while (running) {
- try {
- final Call call = callQueue.take(); //弹出call,可能会阻塞
- •••
- //调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中
- value = call(call.connection.protocol, call.param, call.timestamp);
- synchronized (call.connection.responseQueue) {
- setupResponse(buf, call,
- (error == null) ? Status.SUCCESS : Status.ERROR,
- value, errorClass, error);
- •••
- //给客户端响应请求
- responder.doRespond(call);
- }
- }
while (running) { try { final Call call = callQueue.take(); //弹出call,可能会阻塞 ••• //调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中 value = call(call.connection.protocol, call.param, call.timestamp); synchronized (call.connection.responseQueue) { setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error); ••• //给客户端响应请求 responder.doRespond(call); } }
5)返回请求
下面贴出Server.Responder类中的doRespond()方法源码:
代码十九:
- 方法一:
- void doRespond(Call call) throws IOException {
- synchronized (call.connection.responseQueue) {
- call.connection.responseQueue.addLast(call);
- if (call.connection.responseQueue.size() == 1) {
- // 返回响应结果,并激活writeSelector
- processResponse(call.connection.responseQueue, true);
- }
- }
- }
方法一: void doRespond(Call call) throws IOException { synchronized (call.connection.responseQueue) { call.connection.responseQueue.addLast(call); if (call.connection.responseQueue.size() == 1) { // 返回响应结果,并激活writeSelector processResponse(call.connection.responseQueue, true); } } }
小结:
到这里,hadoop RPC机制的源码分析就结束了,请继续关注我的后续博客:hadoop心跳机制的源码分析。在这里需要感谢我所参考的iteye上相关博主的文章,因太多了,就不一一列举了,不过最感谢的是wikieno的博客,博客地址为: http://www.wikieno.com/2012/02/hadoop-ipc-server/ 。