HBase's RPC communication mainly involves two technologies: Google's protobuf RPC framework and Java NIO.
Startup entry point
org.apache.hadoop.hbase.regionserver.HRegionServer is the startup class of a region server.
org.apache.hadoop.hbase.master.HMaster is the startup class of the HMaster and extends HRegionServer.
HRegionServer declares an org.apache.hadoop.hbase.regionserver.RSRpcServices field:
private RSRpcServices rsRpcServices;
- RSRpcServices implements the core handling of all client requests;
- RSRpcServices uses one key class, RpcServer, to talk to clients:
rpcServer = new RpcServer(rs, name, getServices(),
bindAddress, // use final bindAddress for this server.
rs.conf,
rpcSchedulerFactory.create(rs.conf, this, rs));
So the two classes at the heart of the whole communication path are RSRpcServices and RpcServer.
Implementing RPC with Google's protobuf
HBase uses protobuf in the following way:
1. Define the service ClientService in Client.proto and generate the corresponding service interface and message classes with the protobuf compiler (the generated blocking interface is sketched right after the definition below):
service ClientService {
rpc Get(GetRequest)
returns(GetResponse);
rpc Mutate(MutateRequest)
returns(MutateResponse);
rpc Scan(ScanRequest)
returns(ScanResponse);
rpc BulkLoadHFile(BulkLoadHFileRequest)
returns(BulkLoadHFileResponse);
rpc ExecService(CoprocessorServiceRequest)
returns(CoprocessorServiceResponse);
rpc ExecRegionServerService(CoprocessorServiceRequest)
returns(CoprocessorServiceResponse);
rpc Multi(MultiRequest)
returns(MultiResponse);
}
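For each rpc declared above, protoc also generates the request/response message classes and a BlockingInterface whose methods mirror the service. Abridged, with fully qualified names shortened for readability, the generated interface looks roughly like this:
public interface BlockingInterface {
  ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
      throws ServiceException;
  ClientProtos.MutateResponse mutate(RpcController controller, ClientProtos.MutateRequest request)
      throws ServiceException;
  ClientProtos.ScanResponse scan(RpcController controller, ClientProtos.ScanRequest request)
      throws ServiceException;
  // ... bulkLoadHFile, execService, execRegionServerService and multi follow the same pattern
}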
2. On the server side, the class RSRpcServices implements the generated blocking interfaces:
- org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface
- org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface
RSRpcServices is therefore the concrete implementation class for all requests, because it implements every AdminService and ClientService method; see the outline sketched below.
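A simplified outline of the class (abridged; the real class implements further interfaces and, of course, full method bodies):
public class RSRpcServices implements
    ClientProtos.ClientService.BlockingInterface,
    AdminProtos.AdminService.BlockingInterface {

  @Override
  public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
      throws ServiceException {
    // locate the target HRegion from the request, execute the Get,
    // and wrap the Result into a GetResponse
    ...
  }

  // mutate(), scan(), multi(), ... are implemented along the same lines
}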
3. On the server side, ClientService.newReflectiveBlockingService(final BlockingInterface impl) is called to produce the corresponding com.google.protobuf.BlockingService implementation:
public static com.google.protobuf.BlockingService
newReflectiveBlockingService(final BlockingInterface impl) {
return new com.google.protobuf.BlockingService() {
public final com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptorForType() {
return getDescriptor();
}
public final com.google.protobuf.Message callBlockingMethod(
com.google.protobuf.Descriptors.MethodDescriptor method,
com.google.protobuf.RpcController controller,
com.google.protobuf.Message request)
throws com.google.protobuf.ServiceException {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.callBlockingMethod() given method descriptor for " +
"wrong service type.");
}
switch(method.getIndex()) {
case 0:
return impl.get(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest)request);
case 1:
return impl.mutate(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest)request);
case 2:
return impl.scan(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest)request);
case 3:
return impl.bulkLoadHFile(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest)request);
case 4:
return impl.execService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request);
case 5:
return impl.execRegionServerService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request);
case 6:
return impl.multi(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest)request);
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
// getRequestPrototype() and getResponsePrototype() are also generated here (omitted)
};
}
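The BlockingService objects built this way are what RpcServer receives through the getServices() call shown at the top. RSRpcServices.getServices() wires them up roughly as follows (an abridged sketch):
List<BlockingServiceAndInterface> getServices() {
  List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
  bssi.add(new BlockingServiceAndInterface(
      ClientService.newReflectiveBlockingService(this),
      ClientService.BlockingInterface.class));
  bssi.add(new BlockingServiceAndInterface(
      AdminService.newReflectiveBlockingService(this),
      AdminService.BlockingInterface.class));
  return bssi;
}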
4. Parse the request sent by the client and map it onto a MethodDescriptor and a Message object.
This step is mainly the work of the Java NIO layer discussed below; a sketch of the decoding follows.
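This is not the exact HBase code, but roughly what the decoding amounts to: the request buffer carries a varint-delimited RPCProtos.RequestHeader followed by the serialized request message, and the method name in the header selects the MethodDescriptor (a hedged sketch; cell blocks are ignored here):
static void decodeRequest(com.google.protobuf.BlockingService service, byte[] buf)
    throws java.io.IOException {
  com.google.protobuf.CodedInputStream cis =
      com.google.protobuf.CodedInputStream.newInstance(buf);
  int headerSize = cis.readRawVarint32();
  int offset = cis.getTotalBytesRead();
  RPCProtos.RequestHeader header =
      RPCProtos.RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
  offset += headerSize;

  // the header carries the method name; look up the matching MethodDescriptor
  com.google.protobuf.Descriptors.MethodDescriptor md =
      service.getDescriptorForType().findMethodByName(header.getMethodName());

  // parse the remaining bytes with that method's request prototype
  // (if the header declares a cell block, the param actually ends before it)
  com.google.protobuf.Message param = service.getRequestPrototype(md).newBuilderForType()
      .mergeFrom(buf, offset, buf.length - offset).build();
  // md and param are then wrapped into a Call and handed to the scheduler (see below)
}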
5. Call callBlockingMethod on the BlockingService to handle the client request:
Message callBlockingMethod(MethodDescriptor var1, RpcController var2, Message var3) throws ServiceException;
Java NIO
The Java NIO usage is concentrated in the RpcServer class.
RpcServer uses a Listener, but this is not the usual listener (observer) pattern; it is a thread that listens for incoming connection requests.
// Start the listener here and let it bind to the port
listener = new Listener(name);
Its constructor is implemented as follows:
public Listener(final String name) throws IOException {
super(name);
backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind the server socket to the bind address (can be different from the default interface)
bind(acceptChannel.socket(), bindAddress, backlogLength);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads,
new ThreadFactoryBuilder().setNameFormat(
"RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
",port=" + port).setDaemon(true).build());
for (int i = 0; i < readThreads; ++i) {
Reader reader = new Reader();
readers[i] = reader;
readPool.execute(reader);
}
LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("RpcServer.listener,port=" + port);
this.setDaemon(true);
}
So the constructor sets up one acceptChannel, one selector, and several Readers, each Reader with its own selector.
1. Accepting connections
The Listener's main thread watches its selector for accept requests and handles them in doAccept:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
try {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(tcpKeepAlive);
} catch (IOException ioe) {
channel.close();
throw ioe;
}
Reader reader = getReader();
try {
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);
c = getConnection(channel, System.currentTimeMillis());
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": connection from " + c.toString() +
"; # active connections: " + numConnections);
} finally {
reader.finishAdd();
}
}
}
For each accepted connection, doAccept creates a Connection object; each Connection owns the SocketChannel used for reading and writing, and that channel is registered with the selector of one of the Readers.
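For context, the accept loop that drives doAccept is roughly the following (a simplified sketch of the Listener's run loop; error handling and idle-connection cleanup are omitted):
while (running) {
  selector.select();
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while (iter.hasNext()) {
    SelectionKey key = iter.next();
    iter.remove();
    if (key.isValid() && key.isAcceptable()) {
      doAccept(key);  // create a Connection and hand its channel to one of the Readers
    }
  }
}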
2. Read and process
Each Reader thread checks the SelectionKeys registered with its own selector; when data arrives it handles the attached Connection, which ends in a call to the connection's process method (sketched below).
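The Reader threads run the same kind of select loop as the Listener, only over readable keys on their own selector; the interesting part is what happens to a readable key (again a simplified sketch):
if (key.isValid() && key.isReadable()) {
  Connection c = (Connection) key.attachment();  // attached in doAccept via readKey.attach(c)
  c.readAndProcess();  // read the bytes; Connection.process/processRequest then parses and dispatches them
}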
The received request is parsed, a Call is created, and it is dispatched through the scheduler:
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize, traceInfo, this.addr);
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
callQueueSize.add(-1 * call.getSize());
// ... (elided) respond to the client with an error indicating that the call queue is full
}
The scheduler is the core component through which the region server handles all requests. It is created with the parameters below, so hbase.regionserver.handler.count determines how many handlers process requests concurrently, i.e. the region server's request concurrency (a toy sketch of this handler model follows the snippet).
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
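Conceptually the scheduler is a set of call queues plus handlerCount handler threads draining them. The following is a hypothetical, self-contained sketch of that model (class and thread names are illustrative, not HBase's; the real SimpleRpcScheduler also keeps separate queues for priority and replication calls):
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SchedulerSketch {
  private final BlockingQueue<Runnable> callQueue;

  SchedulerSketch(int handlerCount, int maxQueueLength) {
    callQueue = new LinkedBlockingQueue<Runnable>(maxQueueLength);
    for (int i = 0; i < handlerCount; i++) {
      Thread handler = new Thread(new Runnable() {
        public void run() {
          while (true) {
            try {
              // each dequeued task plays the role of a CallRunner: it ends up invoking
              // service.callBlockingMethod(md, controller, param) and setting the response
              callQueue.take().run();
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
          }
        }
      }, "handler-" + i);
      handler.setDaemon(true);
      handler.start();
    }
  }

  boolean dispatch(Runnable callRunner) {
    // mirrors scheduler.dispatch(...): returns false when the call queue is full
    return callQueue.offer(callRunner);
  }
}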
Each handler finally invokes RpcServer's call method, which executes:
Message result = service.callBlockingMethod(md, controller, param);
3. Serializing the response
As described above, the actual execution happens inside CallRunner; when it finishes, Call.setResponse is called:
protected synchronized void setResponse(Object m, final CellScanner cells,
Throwable t, String errorMsg) {
...
Message header = headerBuilder.build();
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
(this.cellBlock == null? 0: this.cellBlock.limit());
ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
...
} catch (IOException e) {
LOG.warn("Exception while creating response " + e);
}
this.response = bc;
}
Here IPCUtil.getDelimitedMessageAsByteBuffer(result) serializes the Message into a buffer, using Google's com.google.protobuf.CodedOutputStream:
CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
// This will write out the vint preamble and the message serialized.
cos.writeMessageNoTag(m);
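Put together, the delimited encoding is just a varint length prefix followed by the message bytes; a sketch of the whole helper (not the exact IPCUtil source) looks like this:
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import java.io.IOException;
import java.nio.ByteBuffer;

final class DelimitedEncodingSketch {
  static ByteBuffer delimitedMessageToBuffer(Message m) throws IOException {
    int serializedSize = m.getSerializedSize();
    int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
    byte[] buffer = new byte[vintSize + serializedSize];
    CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
    // writeMessageNoTag writes the size varint followed by the serialized message
    cos.writeMessageNoTag(m);
    cos.flush();
    return ByteBuffer.wrap(buffer);
  }
}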
4. Writing the response back to the channel
The Responder provides the following method:
void doRespond(Call call) throws IOException {
boolean added = false;
// If there is already a write in progress, we don't wait. This allows to free the handlers
// immediately for other tasks.
if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
try {
if (call.connection.responseQueue.isEmpty()) {
// If we're alone, we can try to do a direct call to the socket. It's
// an optimisation to save on context switches and data transfer between cores..
if (processResponse(call)) {
return; // we're done.
}
// Too big to fit, putting ahead.
call.connection.responseQueue.addFirst(call);
added = true; // We will register to the selector later, outside of the lock.
}
} finally {
call.connection.responseWriteLock.unlock();
}
}
if (!added) {
call.connection.responseQueue.addLast(call);
}
call.responder.registerForWrite(call.connection);
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
}
In the end the response is written to the connection's own channel, as sketched below.
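An illustrative sketch of that final write (not HBase's BufferChain/Responder code): the buffers built in setResponse are gather-written to the connection's SocketChannel; on a non-blocking channel the write may be partial, in which case the Responder keeps the call queued and registers the connection for OP_WRITE to finish it later.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

final class GatherWriteSketch {
  // SocketChannel implements GatheringByteChannel, so the size/header/result/cellBlock
  // buffers can be written in one call without copying them into a single array
  static long writeOnce(GatheringByteChannel channel, ByteBuffer... buffers) throws IOException {
    return channel.write(buffers);
  }
}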