fabric gossip 源码解析 fabric/gossip/comm/comm_impl.go

fabric 中的 gossip 接口,最底层通信接口,实际只有两个操作,所有的 Gossip相关操作都是在这两个接口上堆砌出来的,这两个接口定义在
fabric/protos/gossip/message.proto

// Gossip
service Gossip {

    // GossipStream is the gRPC stream used for sending and receiving messages
    rpc GossipStream (stream Envelope) returns (stream Envelope) {}

    // Ping is used to probe a remote peer's aliveness
    rpc Ping (Empty) returns (Empty) {}
}

  1. GossipStream 用来通信
  2. Ping用来判断节点状态

可以看出来 fabric 通信方式使用了 grpc 的 stream,没有采用udp方式,在特定情况下可能会对性能产生一定的影响

fabric/gossip/comm/comm_impl.go 实现了最底层接口,我们来分析一下主要实现,只关注核心收发消息,不关注其他细节

消息的接受

commImpl作为Gossip模块的基本收发实现,我们只分析如何收发的,其实已经给出了 proto 文件我们应该很容易猜到,接受消息只需要实现 proto 两个接口的 server 端, 发送消息 只需要实现 proto 的client 就可以了。分别看一下收发细节

收消息 comm_impl.go GossipStream 函数,在这个函数中,前面都是做了一下准备工作,收发消息关键代码有两处

    // fabric/gossip/comm/comm_impl.go GossipStream

    h := func(m *proto.SignedGossipMessage) {
        c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
            conn:                conn,
            lock:                conn,
            SignedGossipMessage: m,
            connInfo:            connInfo,
        })
    }
    conn.handler = h
    return conn.serviceConnection()

对消息进行分发的 handler ,会在接下来的serviceConnection函数里面调用

// fabric/gossip/comm/conn.go

func (conn *connection) serviceConnection() error {
    errChan := make(chan error, 1)
    msgChan := make(chan *proto.SignedGossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
    defer close(msgChan)

    // Call stream.Recv() asynchronously in readFromStream(),
    // and wait for either the Recv() call to end,
    // or a signal to close the connection, which exits
    // the method and makes the Recv() call to fail in the
    // readFromStream() method
    go conn.readFromStream(errChan, msgChan)

    go conn.writeToStream()

    for !conn.toDie() {
        select {
        case stop := <-conn.stopChan:
            conn.logger.Debug("Closing reading from stream")
            conn.stopChan <- stop
            return nil
        case err := <-errChan:
            return err
        case msg := <-msgChan:
            conn.handler(msg)
        }
    }
    return nil
}

开启了两个goroutine, 分别进行收发, go conn.readFromStream(errChan, msgChan),这个函数相当于在 stream 上不停的接收消息,收到了 就给 msgChan, 在下面这个for循环中会调用 conn.handler(msg) 来处理收到的消息, go conn.writeToStream() 是写消息,后续再讲

conn.handler(msg) 实际就是之前说过的 c.msgPublisher.DeMultiplex 函数,那么我们接下来分析一下 这个函数对消息做了什么

// fabric/gossip/comm/demux.go

// DeMultiplex broadcasts the message to all channels that were returned
// by AddChannel calls and that hold the respected predicates.
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) {
    defer func() {
        recover()
    }() // recover from sending on a closed channel

    if m.isClosed() {
        return
    }

    m.lock.RLock()
    channels := m.channels
    m.lock.RUnlock()

    for _, ch := range channels {
        if ch.pred(msg) {
            ch.ch <- msg
        }
    }
}

根据代码可以看出, 其实就是 将所有的 channel 拿出来,每个 调用一下 ch.pred(msg) 如果返回值为真,就将消息 发送给channel,其实到这里已经比较清楚了,相当于 哪个channel 想接受消息,就调用 AddChannel ,然后就可以接收到消息了,我们先看一下 AddChannel 函数

// fabric/gossip/comm/demux.go

// AddChannel registers a channel with a certain predicate
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} {
    m.lock.Lock()
    defer m.lock.Unlock()
    ch := &channel{ch: make(chan interface{}, 10), pred: predicate}
    m.channels = append(m.channels, ch)
    return ch.ch
}

根据代码可以很清晰的看出来,这个函数值需要一个 common.MessageAcceptor 函数类型的参数,返回 接收到的消息 chan (上面已经分析过了返回的这个 chan 就是接收到的消息的 chan), 对于 common.MessageAcceptor 也很容易看出来就是一个消息过滤器,可以自定义规则想接收哪些消息, 全文搜索一下 AddChannel 一下,可以很容易发现 就是实现 gossip service 的 Accept

到这里已经很清晰了,接收消息总过就进行如下几个过程,

  1. 实现message.proto 的 GossipStream 接口,启动一个goroutine 不停的在 grpc stream 上面接收消息(go conn.readFromStream(errChan, msgChan))
  2. 接收到消息以后,使用 DeMultiplex 函数 像 注册过的Channel中分发(也就是调用了 AddChannel) 的分发
  3. 在 AddChannel的时候给消息添加了一个过滤器

消息的发送

发送代码很明显在下面这个函数

// fabric/gossip/comm/comm_impl.go

func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
    if c.isStopping() || len(peers) == 0 {
        return
    }

    c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")

    for _, peer := range peers {
        go func(peer *RemotePeer, msg *proto.SignedGossipMessage) {
            c.sendToEndpoint(peer, msg)
        }(peer, msg)
    }
}

这个函数很简单就是 msg, peers 两个参数,将 msg 发给 所有的 peer,有一点需要注意下 在发送使用了 go ,这样可以提高发送效率,相当于同时给 所有 peer 发送。接下来 看看 sendToEndpoint 函数

// fabric/gossip/comm/comm_impl.go

func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage) {
    if c.isStopping() {
        return
    }
    c.logger.Debug("Entering, Sending to", peer.Endpoint, ", msg:", msg)
    defer c.logger.Debug("Exiting")
    var err error

    conn, err := c.connStore.getConnection(peer)
    if err == nil {
        disConnectOnErr := func(err error) {
            c.logger.Warning(peer, "isn't responsive:", err)
            c.disconnect(peer.PKIID)
        }
        conn.send(msg, disConnectOnErr)
        return
    }
    c.logger.Warning("Failed obtaining connection for", peer, "reason:", err)
    c.disconnect(peer.PKIID)
}

这个函数我们只分析两句代码 getConnection, conn.send(), 一个是获取conn, 一个是发送消息

getConnection

// fabric/gossip/comm/conn.go

func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
    cs.RLock()
    isClosing := cs.isClosing
    cs.RUnlock()

    if isClosing {
        return nil, errors.New("Shutting down")
    }

    pkiID := peer.PKIID
    endpoint := peer.Endpoint

    cs.Lock()
    destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
    if !hasConnected {
        destinationLock = &sync.RWMutex{}
        cs.destinationLocks[string(pkiID)] = destinationLock
    }
    cs.Unlock()

    destinationLock.Lock()

    cs.RLock()
    conn, exists := cs.pki2Conn[string(pkiID)]
    if exists {
        cs.RUnlock()
        destinationLock.Unlock()
        return conn, nil
    }
    cs.RUnlock()

    createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)

    destinationLock.Unlock()

    cs.RLock()
    isClosing = cs.isClosing
    cs.RUnlock()
    if isClosing {
        return nil, errors.New("ConnStore is closing")
    }

    cs.Lock()
    delete(cs.destinationLocks, string(pkiID))
    defer cs.Unlock()

    // check again, maybe someone connected to us during the connection creation?
    conn, exists = cs.pki2Conn[string(pkiID)]

    if exists {
        if createdConnection != nil {
            createdConnection.close()
        }
        return conn, nil
    }

    // no one connected to us AND we failed connecting!
    if err != nil {
        return nil, err
    }

    // at this point in the code, we created a connection to a remote peer
    conn = createdConnection
    cs.pki2Conn[string(createdConnection.pkiID)] = conn

    go conn.serviceConnection()

    return conn, nil
}

这个函数代码很多,对于主逻辑只有两句,一个是 createConnection,一个是 conn.serviceConnection(前文已经分析过,就是打开两个goroutine,分别监听接收,发送) ,其余的都是优化,逻辑完成性的代码,无需关心, 我们看看 createConnection

// fabric/gossip/comm/comm_impl.go

func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
    var err error
    var cc *grpc.ClientConn
    var stream proto.Gossip_GossipStreamClient
    var pkiID common.PKIidType
    var connInfo *proto.ConnectionInfo

    c.logger.Debug("Entering", endpoint, expectedPKIID)
    defer c.logger.Debug("Exiting")

    if c.isStopping() {
        return nil, errors.New("Stopping")
    }
    cc, err = grpc.Dial(endpoint, append(c.opts, grpc.WithBlock())...)
    if err != nil {
        return nil, err
    }

    cl := proto.NewGossipClient(cc)

    if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {
        cc.Close()
        return nil, err
    }

    if stream, err = cl.GossipStream(context.Background()); err == nil {
        connInfo, err = c.authenticateRemotePeer(stream)
        if err == nil {
            pkiID = connInfo.ID
            if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
                // PKIID is nil when we don't know the remote PKI id's
                c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
                return nil, errors.New("Authentication failure")
            }
            conn := newConnection(cl, cc, stream, nil)
            conn.pkiID = pkiID
            conn.info = connInfo
            conn.logger = c.logger

            h := func(m *proto.SignedGossipMessage) {
                c.logger.Debug("Got message:", m)
                c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
                    conn:                conn,
                    lock:                conn,
                    SignedGossipMessage: m,
                    connInfo:            connInfo,
                })
            }
            conn.handler = h
            return conn, nil
        }
    }
    cc.Close()
    return nil, err
}

代码很长,不过我们只需要简单看一下,可以很明显的看出来就是我们文章开头猜测的,发送消息就是 实现 message.proto 里面的client,其余的都是对这个client的封装,对于理解代码而言不需要太关注

接下来我们只剩下一个 conn.send()

// fabric/gossip/comm/conn.go

func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error)) {
    conn.Lock()
    defer conn.Unlock()

    if len(conn.outBuff) == util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize) {
        go onErr(errSendOverflow)
        return
    }

    m := &msgSending{
        envelope: msg.Envelope,
        onErr:    onErr,
    }

    conn.outBuff <- m
}

代码异常简单就是 将msg 传送给 conn.outBuff, 其实我们看到这里已经可以确定肯定有另外一个地方在 等着 conn.outBuff 发送过来的消息,然后写进去,聪明的读者已经想到了,没错,就是前面 serviceConnection 中的 go writeToStream()

func (conn *connection) writeToStream() {
    for !conn.toDie() {
        stream := conn.getStream()
        if stream == nil {
            conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
            return
        }
        select {
        case m := <-conn.outBuff:
            err := stream.Send(m.envelope)
            if err != nil {
                go m.onErr(err)
                return
            }
            break
        case stop := <-conn.stopChan:
            conn.logger.Debug("Closing writing to stream")
            conn.stopChan <- stop
            return
        }
    }
}

代码清晰明了, 利用 grpc stream 进行写消息,至此 我们已经把 fabric 中 gossip 模块的最底层收发消息分析清楚了,gossip 模块所有的功能都是在这个代码基础上进行的

总结

读代码我一直没有找到特别好的办法,我现在的方法就是 大致浏览一下代码,知道代码结构,然后从底层一层一层的读,读代码的时候一定不能纠结细节,先读懂主线,对于其他的关心的在一点点看。

    原文作者:上海大坤哥
    原文地址: https://www.jianshu.com/p/82db9b2fa197
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞