fabric 中的 gossip 接口,最底层通信接口,实际只有两个操作,所有的 Gossip相关操作都是在这两个接口上堆砌出来的,这两个接口定义在
fabric/protos/gossip/message.proto
// Gossip
service Gossip {
// GossipStream is the gRPC stream used for sending and receiving messages
rpc GossipStream (stream Envelope) returns (stream Envelope) {}
// Ping is used to probe a remote peer's aliveness
rpc Ping (Empty) returns (Empty) {}
}
- GossipStream 用来通信
- Ping用来判断节点状态
可以看出来 fabric 通信方式使用了 grpc 的 stream,没有采用udp方式,在特定情况下可能会对性能产生一定的影响
fabric/gossip/comm/comm_impl.go 实现了最底层接口,我们来分析一下主要实现,只关注核心收发消息,不关注其他细节
消息的接受
commImpl作为Gossip模块的基本收发实现,我们只分析如何收发的,其实已经给出了 proto 文件我们应该很容易猜到,接受消息只需要实现 proto 两个接口的 server 端, 发送消息 只需要实现 proto 的client 就可以了。分别看一下收发细节
收消息 comm_impl.go GossipStream 函数,在这个函数中,前面都是做了一下准备工作,收发消息关键代码有两处
// fabric/gossip/comm/comm_impl.go GossipStream
h := func(m *proto.SignedGossipMessage) {
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
lock: conn,
SignedGossipMessage: m,
connInfo: connInfo,
})
}
conn.handler = h
return conn.serviceConnection()
对消息进行分发的 handler ,会在接下来的serviceConnection函数里面调用
// fabric/gossip/comm/conn.go
func (conn *connection) serviceConnection() error {
errChan := make(chan error, 1)
msgChan := make(chan *proto.SignedGossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
defer close(msgChan)
// Call stream.Recv() asynchronously in readFromStream(),
// and wait for either the Recv() call to end,
// or a signal to close the connection, which exits
// the method and makes the Recv() call to fail in the
// readFromStream() method
go conn.readFromStream(errChan, msgChan)
go conn.writeToStream()
for !conn.toDie() {
select {
case stop := <-conn.stopChan:
conn.logger.Debug("Closing reading from stream")
conn.stopChan <- stop
return nil
case err := <-errChan:
return err
case msg := <-msgChan:
conn.handler(msg)
}
}
return nil
}
开启了两个goroutine, 分别进行收发, go conn.readFromStream(errChan, msgChan),这个函数相当于在 stream 上不停的接收消息,收到了 就给 msgChan, 在下面这个for循环中会调用 conn.handler(msg) 来处理收到的消息, go conn.writeToStream() 是写消息,后续再讲
conn.handler(msg) 实际就是之前说过的 c.msgPublisher.DeMultiplex 函数,那么我们接下来分析一下 这个函数对消息做了什么
// fabric/gossip/comm/demux.go
// DeMultiplex broadcasts the message to all channels that were returned
// by AddChannel calls and that hold the respected predicates.
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) {
defer func() {
recover()
}() // recover from sending on a closed channel
if m.isClosed() {
return
}
m.lock.RLock()
channels := m.channels
m.lock.RUnlock()
for _, ch := range channels {
if ch.pred(msg) {
ch.ch <- msg
}
}
}
根据代码可以看出, 其实就是 将所有的 channel 拿出来,每个 调用一下 ch.pred(msg) 如果返回值为真,就将消息 发送给channel,其实到这里已经比较清楚了,相当于 哪个channel 想接受消息,就调用 AddChannel ,然后就可以接收到消息了,我们先看一下 AddChannel 函数
// fabric/gossip/comm/demux.go
// AddChannel registers a channel with a certain predicate
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} {
m.lock.Lock()
defer m.lock.Unlock()
ch := &channel{ch: make(chan interface{}, 10), pred: predicate}
m.channels = append(m.channels, ch)
return ch.ch
}
根据代码可以很清晰的看出来,这个函数值需要一个 common.MessageAcceptor 函数类型的参数,返回 接收到的消息 chan (上面已经分析过了返回的这个 chan 就是接收到的消息的 chan), 对于 common.MessageAcceptor 也很容易看出来就是一个消息过滤器,可以自定义规则想接收哪些消息, 全文搜索一下 AddChannel 一下,可以很容易发现 就是实现 gossip service 的 Accept
到这里已经很清晰了,接收消息总过就进行如下几个过程,
- 实现message.proto 的 GossipStream 接口,启动一个goroutine 不停的在 grpc stream 上面接收消息(go conn.readFromStream(errChan, msgChan))
- 接收到消息以后,使用 DeMultiplex 函数 像 注册过的Channel中分发(也就是调用了 AddChannel) 的分发
- 在 AddChannel的时候给消息添加了一个过滤器
消息的发送
发送代码很明显在下面这个函数
// fabric/gossip/comm/comm_impl.go
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
if c.isStopping() || len(peers) == 0 {
return
}
c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")
for _, peer := range peers {
go func(peer *RemotePeer, msg *proto.SignedGossipMessage) {
c.sendToEndpoint(peer, msg)
}(peer, msg)
}
}
这个函数很简单就是 msg, peers 两个参数,将 msg 发给 所有的 peer,有一点需要注意下 在发送使用了 go ,这样可以提高发送效率,相当于同时给 所有 peer 发送。接下来 看看 sendToEndpoint 函数
// fabric/gossip/comm/comm_impl.go
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage) {
if c.isStopping() {
return
}
c.logger.Debug("Entering, Sending to", peer.Endpoint, ", msg:", msg)
defer c.logger.Debug("Exiting")
var err error
conn, err := c.connStore.getConnection(peer)
if err == nil {
disConnectOnErr := func(err error) {
c.logger.Warning(peer, "isn't responsive:", err)
c.disconnect(peer.PKIID)
}
conn.send(msg, disConnectOnErr)
return
}
c.logger.Warning("Failed obtaining connection for", peer, "reason:", err)
c.disconnect(peer.PKIID)
}
这个函数我们只分析两句代码 getConnection, conn.send(), 一个是获取conn, 一个是发送消息
getConnection
// fabric/gossip/comm/conn.go
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
cs.RLock()
isClosing := cs.isClosing
cs.RUnlock()
if isClosing {
return nil, errors.New("Shutting down")
}
pkiID := peer.PKIID
endpoint := peer.Endpoint
cs.Lock()
destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
if !hasConnected {
destinationLock = &sync.RWMutex{}
cs.destinationLocks[string(pkiID)] = destinationLock
}
cs.Unlock()
destinationLock.Lock()
cs.RLock()
conn, exists := cs.pki2Conn[string(pkiID)]
if exists {
cs.RUnlock()
destinationLock.Unlock()
return conn, nil
}
cs.RUnlock()
createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
destinationLock.Unlock()
cs.RLock()
isClosing = cs.isClosing
cs.RUnlock()
if isClosing {
return nil, errors.New("ConnStore is closing")
}
cs.Lock()
delete(cs.destinationLocks, string(pkiID))
defer cs.Unlock()
// check again, maybe someone connected to us during the connection creation?
conn, exists = cs.pki2Conn[string(pkiID)]
if exists {
if createdConnection != nil {
createdConnection.close()
}
return conn, nil
}
// no one connected to us AND we failed connecting!
if err != nil {
return nil, err
}
// at this point in the code, we created a connection to a remote peer
conn = createdConnection
cs.pki2Conn[string(createdConnection.pkiID)] = conn
go conn.serviceConnection()
return conn, nil
}
这个函数代码很多,对于主逻辑只有两句,一个是 createConnection,一个是 conn.serviceConnection(前文已经分析过,就是打开两个goroutine,分别监听接收,发送) ,其余的都是优化,逻辑完成性的代码,无需关心, 我们看看 createConnection
// fabric/gossip/comm/comm_impl.go
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
var err error
var cc *grpc.ClientConn
var stream proto.Gossip_GossipStreamClient
var pkiID common.PKIidType
var connInfo *proto.ConnectionInfo
c.logger.Debug("Entering", endpoint, expectedPKIID)
defer c.logger.Debug("Exiting")
if c.isStopping() {
return nil, errors.New("Stopping")
}
cc, err = grpc.Dial(endpoint, append(c.opts, grpc.WithBlock())...)
if err != nil {
return nil, err
}
cl := proto.NewGossipClient(cc)
if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {
cc.Close()
return nil, err
}
if stream, err = cl.GossipStream(context.Background()); err == nil {
connInfo, err = c.authenticateRemotePeer(stream)
if err == nil {
pkiID = connInfo.ID
if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
// PKIID is nil when we don't know the remote PKI id's
c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
return nil, errors.New("Authentication failure")
}
conn := newConnection(cl, cc, stream, nil)
conn.pkiID = pkiID
conn.info = connInfo
conn.logger = c.logger
h := func(m *proto.SignedGossipMessage) {
c.logger.Debug("Got message:", m)
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
lock: conn,
SignedGossipMessage: m,
connInfo: connInfo,
})
}
conn.handler = h
return conn, nil
}
}
cc.Close()
return nil, err
}
代码很长,不过我们只需要简单看一下,可以很明显的看出来就是我们文章开头猜测的,发送消息就是 实现 message.proto 里面的client,其余的都是对这个client的封装,对于理解代码而言不需要太关注
接下来我们只剩下一个 conn.send()
// fabric/gossip/comm/conn.go
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error)) {
conn.Lock()
defer conn.Unlock()
if len(conn.outBuff) == util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize) {
go onErr(errSendOverflow)
return
}
m := &msgSending{
envelope: msg.Envelope,
onErr: onErr,
}
conn.outBuff <- m
}
代码异常简单就是 将msg 传送给 conn.outBuff, 其实我们看到这里已经可以确定肯定有另外一个地方在 等着 conn.outBuff 发送过来的消息,然后写进去,聪明的读者已经想到了,没错,就是前面 serviceConnection 中的 go writeToStream()
func (conn *connection) writeToStream() {
for !conn.toDie() {
stream := conn.getStream()
if stream == nil {
conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
return
}
select {
case m := <-conn.outBuff:
err := stream.Send(m.envelope)
if err != nil {
go m.onErr(err)
return
}
break
case stop := <-conn.stopChan:
conn.logger.Debug("Closing writing to stream")
conn.stopChan <- stop
return
}
}
}
代码清晰明了, 利用 grpc stream 进行写消息,至此 我们已经把 fabric 中 gossip 模块的最底层收发消息分析清楚了,gossip 模块所有的功能都是在这个代码基础上进行的
总结
读代码我一直没有找到特别好的办法,我现在的方法就是 大致浏览一下代码,知道代码结构,然后从底层一层一层的读,读代码的时候一定不能纠结细节,先读懂主线,对于其他的关心的在一点点看。