grpc 超时和重连

最近项目要使用grpc,但是关于grpc的超时和重连这一块很多文章都是说的不够详细,无奈只能自己看代码.顺手记录一下。

超时

建立连接

主要就2函数Dail和DialContext。

// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
    return DialContext(context.Background(), target, opts...)
}
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error){...}

DialContext 太长了不帖了.看Dial实际上也是调用DialContext来实现的.如果你想在建立连接的时候使用超时控制.就使用DialContext传入一个Timeout的context,就像下面的例子

ctx1, cel := context.WithTimeout(context.Background(), time.Second*3)
defer cel()
conn, err := grpc.DialContext(ctx1, address, grpc.WithBlock(), grpc.WithInsecure())

另外调用Dial建立连接默认只是返回一个ClientConn的指针,相当于new了一个ClientConn 把指针返回给你。并不是一定要建立真实的h2连接.至于真实的连接建立实际上是一个异步的过程。当然了如果你想等真实的链接完全建立再返回ClientConn可以通过WithBlock传入Options来实现,当然了这样的话链接如果建立不成功就会一直阻塞直到Contex超时。真正的建立链接的代码后面介绍重试的时候会再详细介绍。

调用超时

这个比较简单

ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3)
defer cancel()
 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})

如上代码传入一个timeout context就可以。

重连

假设我们想这样一个问题,刚才我们说Dial实际上是new了一个ClientConn.真实的连接建立在另外一个协程中,那这个协程是建立连接后就退出了呢,还是还在运行。另外如果我们退出服务端然后启动客户端会重新建立链接吗,如果是那又是如何重试的。

grpc调用的时候启动的协程

要回答第一个问题,很简单我们在client代码中启动pprof看看有哪些协程在跑。

   go func() {
        log.Println(http.ListenAndServe("localhost:6006", nil))
    }()
main.main.func1()
    /Users/myonlyzzy/go/src/google.golang.org/grpc/examples/helloworld/greeter_client/main.go:41 +0x3e
created by main.main
    /Users/myonlyzzy/go/src/google.golang.org/grpc/examples/helloworld/greeter_client/main.go:40 +0x47

goroutine 6 [select]:
google.golang.org/grpc.(*ccResolverWrapper).watcher(0xc4201941e0)
    /Users/myonlyzzy/go/src/google.golang.org/grpc/resolver_conn_wrapper.go:110 +0x182
created by google.golang.org/grpc.(*ccResolverWrapper).start
    /Users/myonlyzzy/go/src/google.golang.org/grpc/resolver_conn_wrapper.go:96 +0x3f

goroutine 7 [select]:
google.golang.org/grpc.(*ccBalancerWrapper).watcher(0xc42006e280)
    /Users/myonlyzzy/go/src/google.golang.org/grpc/balancer_conn_wrappers.go:122 +0x14a
created by google.golang.org/grpc.newCCBalancerWrapper
    /Users/myonlyzzy/go/src/google.golang.org/grpc/balancer_conn_wrappers.go:113 +0x14c

goroutine 8 [select]:
google.golang.org/grpc.(*addrConn).transportMonitor(0xc42019e280)
    /Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:1240 +0x235
google.golang.org/grpc.(*addrConn).connect.func1(0xc42019e280)
    /Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:839 +0x216
created by google.golang.org/grpc.(*addrConn).connect
    /Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:829 +0xe1

我们看到有一个transportMonitor的协程一直阻塞在select中.代码都在clientconn.go 中。我们进去看看其实有4个主要的方法.

func (ac *addrConn) connect() error 
func (ac *addrConn) resetTransport() error
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error)
func (ac *addrConn) transportMonitor()

connect

    // Start a goroutine connecting to the server asynchronously.
    go func() {
        if err := ac.resetTransport(); err != nil {
            log.Printf("resetTransport %v ",err)
            grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
            if err != errConnClosing {
                // Keep this ac in cc.conns, to get the reason it's torn down.
                ac.tearDown(err)
            }
            return
        }
        ac.transportMonitor()
    }()
    return nil

上面的是connect的一部分。connect会调用resetTransport来建立链接。再启动transportMonitor来监控链接的情况。

resetTransport

for connectRetryNum := 0; ; connectRetryNum++ {
       ac.mu.Lock()
       if ac.backoffDeadline.IsZero() {
           // This means either a successful HTTP2 connection was established
           // or this is the first time this addrConn is trying to establish a
           // connection.
           backoffFor := ac.dopts.bs.backoff(connectRetryNum) // time.Duration.
           // This will be the duration that dial gets to finish.
           dialDuration := minConnectTimeout
           if backoffFor > dialDuration {
               // Give dial more time as we keep failing to connect.
               dialDuration = backoffFor
           }
           start := time.Now()
           backoffDeadline = start.Add(backoffFor)
           connectDeadline = start.Add(dialDuration)
           ridx = 0 // Start connecting from the beginning.
       } else {
           // Continue trying to conect with the same deadlines.
           connectRetryNum = ac.connectRetryNum
           backoffDeadline = ac.backoffDeadline
           connectDeadline = ac.connectDeadline
           ac.backoffDeadline = time.Time{}
           ac.connectDeadline = time.Time{}
           ac.connectRetryNum = 0
       }
       if ac.state == connectivity.Shutdown {
           ac.mu.Unlock()
           return errConnClosing
       }
       ac.printf("connecting")
       if ac.state != connectivity.Connecting {
           ac.state = connectivity.Connecting
           ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
       }
       // copy ac.addrs in case of race
       addrsIter := make([]resolver.Address, len(ac.addrs))
       copy(addrsIter, ac.addrs)
       copts := ac.dopts.copts
       ac.mu.Unlock()
       connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
       if err != nil {
           return err
       }
       if connected {
           return nil
       }

   }

resetTransport 主要内容就是一个for 循环,可以看到在这个for循环中会尝试建立链接。如果建立成功就返回一个nil。如果不成功会不断重试下去。实际上不管是开头的Dial或者Dial完了关闭服务器后都是由这段代码来建立真实的链接。这也就是如果你使用withBlock 但是不使用超时的话会不断的重试下去。中途断掉也会不断重联。当然了重连的过程中是使用了backoff算法来重连。而且默认会在grpc的配置中有个默认最大重试间隔时间。默认是120.

var DefaultBackoffConfig = BackoffConfig{
    MaxDelay:  120 * time.Second,
    baseDelay: 1.0 * time.Second,
    factor:    1.6,
    jitter:    0.2,
}

transportMonitor

for {
        var timer *time.Timer
        var cdeadline <-chan time.Time
        ac.mu.Lock()
        t := ac.transport
        if !ac.connectDeadline.IsZero() {
            timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
            cdeadline = timer.C
        }
        ac.mu.Unlock()
        // Block until we receive a goaway or an error occurs.
        select {
        case <-t.GoAway():
        case <-t.Error():
        case <-cdeadline:
            ac.mu.Lock()
            // This implies that client received server preface.
            if ac.backoffDeadline.IsZero() {
                ac.mu.Unlock()
                continue
            }
            ac.mu.Unlock()
            timer = nil
            // No server preface received until deadline.
            // Kill the connection.
            grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
            t.Close()
        }

        if timer != nil {
            timer.Stop()
        }
        // If a GoAway happened, regardless of error, adjust our keepalive
        // parameters as appropriate.
        select {
        case <-t.GoAway():
            ac.adjustParams(t.GetGoAwayReason())
        default:
        }
        ac.mu.Lock()
        if ac.state == connectivity.Shutdown {
            ac.mu.Unlock()
            return
        }
        // Set connectivity state to TransientFailure before calling
        // resetTransport. Transition READY->CONNECTING is not valid.
        ac.state = connectivity.TransientFailure
        ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
        ac.cc.resolveNow(resolver.ResolveNowOption{})
        ac.curAddr = resolver.Address{}
        ac.mu.Unlock()
        if err := ac.resetTransport(); err != nil {
            ac.mu.Lock()
            ac.printf("transport exiting: %v", err)
            ac.mu.Unlock()
            grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
            if err != errConnClosing {
                // Keep this ac in cc.conns, to get the reason it's torn down.
                ac.tearDown(err)
            }
            return
        }

    }

monitor也是运行一个for 循环如果连接断开就调用resetTransport重试。

其实我们使用etcdclient的时候的经常要使用一个DialTimeout参数其实那个参数就是用来生成一个TimeOut的Context.用来控制建立链接的超时。

    原文作者:myonlyzzy
    原文地址: https://www.jianshu.com/p/a5dec04d042b
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞