最近项目要使用 gRPC，但关于 gRPC 超时和重连这一块，很多文章都说得不够详细，无奈只能自己看代码，顺手记录一下。
超时
建立连接
主要就是两个函数：Dial 和 DialContext。
// Dial creates a client connection to the given target.
// It is a thin wrapper: it simply calls DialContext with
// context.Background(), so Dial itself applies no deadline to connection setup.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext is the real implementation behind Dial; pass a context with a
// timeout here to bound connection setup. (Body elided in this article.)
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error){...}
DialContext 的代码太长，这里就不贴了。可以看到 Dial 实际上也是调用 DialContext 来实现的。如果你想在建立连接时进行超时控制，就使用 DialContext 并传入一个带超时的 context，就像下面的例子：
ctx1, cel := context.WithTimeout(context.Background(), time.Second*3)
defer cel()
conn, err := grpc.DialContext(ctx1, address, grpc.WithBlock(), grpc.WithInsecure())
另外，调用 Dial 默认只会返回一个 ClientConn 指针，相当于 new 了一个 ClientConn 并把指针返回给你，并不保证已经建立了真实的 HTTP/2 连接——真实连接的建立是一个异步过程。因此在默认的非阻塞模式下，传给 DialContext 的超时 context 并不会约束真实连接的建立。如果你想等真实的连接完全建立后再返回 ClientConn，可以传入 WithBlock 选项；这样一来，如果连接建立不成功，调用就会一直阻塞，直到 context 超时。真正建立连接的代码会在后面介绍重连时再详细说明。
调用超时
这个比较简单
// Per-call deadline: if SayHello has not completed within 3 seconds it
// returns an error instead of waiting indefinitely.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
if err != nil {
	log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.Message)
如上代码所示，给调用传入一个带超时的 context 即可；如果调用在超时前没有完成，就会返回错误（gRPC 状态码为 DeadlineExceeded）。
重连
我们来思考这样几个问题：刚才说 Dial 实际上只是 new 了一个 ClientConn，真实的连接建立在另外一个协程中。那么这个协程在连接建立后就退出了，还是一直在运行？另外，如果服务端退出后再重新启动，客户端会重新建立连接吗？如果会，又是如何重试的？
grpc调用的时候启动的协程
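要回答第一个问题很简单：在 client 代码中启动 pprof，看看有哪些协程在跑。注意需要匿名导入 net/http/pprof（import _ "net/http/pprof"）来注册 /debug/pprof/ 下的处理函数，然后访问 http://localhost:6006/debug/pprof/goroutine?debug=2 即可看到下面的协程栈。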
// Serve the default mux on localhost:6006 in the background so the client's
// goroutines can be inspected while it runs. The /debug/pprof/ handlers are
// only registered when the program has: import _ "net/http/pprof".
go func() {
	serveErr := http.ListenAndServe("localhost:6006", nil)
	log.Println(serveErr)
}()
main.main.func1()
/Users/myonlyzzy/go/src/google.golang.org/grpc/examples/helloworld/greeter_client/main.go:41 +0x3e
created by main.main
/Users/myonlyzzy/go/src/google.golang.org/grpc/examples/helloworld/greeter_client/main.go:40 +0x47
goroutine 6 [select]:
google.golang.org/grpc.(*ccResolverWrapper).watcher(0xc4201941e0)
/Users/myonlyzzy/go/src/google.golang.org/grpc/resolver_conn_wrapper.go:110 +0x182
created by google.golang.org/grpc.(*ccResolverWrapper).start
/Users/myonlyzzy/go/src/google.golang.org/grpc/resolver_conn_wrapper.go:96 +0x3f
goroutine 7 [select]:
google.golang.org/grpc.(*ccBalancerWrapper).watcher(0xc42006e280)
/Users/myonlyzzy/go/src/google.golang.org/grpc/balancer_conn_wrappers.go:122 +0x14a
created by google.golang.org/grpc.newCCBalancerWrapper
/Users/myonlyzzy/go/src/google.golang.org/grpc/balancer_conn_wrappers.go:113 +0x14c
goroutine 8 [select]:
google.golang.org/grpc.(*addrConn).transportMonitor(0xc42019e280)
/Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:1240 +0x235
google.golang.org/grpc.(*addrConn).connect.func1(0xc42019e280)
/Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:839 +0x216
created by google.golang.org/grpc.(*addrConn).connect
/Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:829 +0xe1
可以看到有一个 transportMonitor 协程一直阻塞在 select 中。相关代码都在 clientconn.go 中，进去看看，主要有 4 个方法：
// connect starts a goroutine that calls resetTransport and, once that
// succeeds, stays in transportMonitor in the same goroutine.
func (ac *addrConn) connect() error

// resetTransport loops calling createTransport until a connection is
// established (returns nil) or an error ends the loop.
func (ac *addrConn) resetTransport() error

// createTransport tries to connect to addrs (starting at index ridx) within the
// given deadlines; the bool reports whether a connection was established.
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error)

// transportMonitor waits for the current transport to fail and then calls
// resetTransport to reconnect.
func (ac *addrConn) transportMonitor()
connect
// Start a goroutine connecting to the server asynchronously.
// The same goroutine then stays in transportMonitor for the lifetime of the
// connection, which is why a goroutine dump shows
// addrConn.connect.func1 -> addrConn.transportMonitor.
go func() {
	if err := ac.resetTransport(); err != nil {
		grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
		if err != errConnClosing {
			// Keep this ac in cc.conns, to get the reason it's torn down.
			ac.tearDown(err)
		}
		return
	}
	// Connected: watch the transport and reconnect when it breaks.
	ac.transportMonitor()
}()
return nil
上面是 connect 的一部分。connect 会启动一个协程，先调用 resetTransport 建立连接；连接建立成功后，同一个协程再调用 transportMonitor 来监控连接的状况。所以这个协程在连接建立后并不会退出，这也回答了第一个问题。
resetTransport
// resetTransport (excerpt): retry loop that keeps calling createTransport
// until a connection is established or an error ends the loop. Each
// iteration either computes fresh backoff/connect deadlines or reuses the
// ones previously saved on ac.
for connectRetryNum := 0; ; connectRetryNum++ {
	ac.mu.Lock()
	if ac.backoffDeadline.IsZero() {
		// This means either a successful HTTP2 connection was established
		// or this is the first time this addrConn is trying to establish a
		// connection.
		backoffFor := ac.dopts.bs.backoff(connectRetryNum) // time.Duration.
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if backoffFor > dialDuration {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		start := time.Now()
		backoffDeadline = start.Add(backoffFor)
		connectDeadline = start.Add(dialDuration)
		ridx = 0 // Start connecting from the beginning.
	} else {
		// Continue trying to connect with the same deadlines.
		// The saved values are consumed and cleared on ac here; where they
		// are stored is outside this excerpt (presumably createTransport).
		connectRetryNum = ac.connectRetryNum
		backoffDeadline = ac.backoffDeadline
		connectDeadline = ac.connectDeadline
		ac.backoffDeadline = time.Time{}
		ac.connectDeadline = time.Time{}
		ac.connectRetryNum = 0
	}
	// Stop retrying once the addrConn has been shut down.
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	ac.printf("connecting")
	// Only update the state and notify cc when it actually changes.
	if ac.state != connectivity.Connecting {
		ac.state = connectivity.Connecting
		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
	}
	// copy ac.addrs in case of race
	addrsIter := make([]resolver.Address, len(ac.addrs))
	copy(addrsIter, ac.addrs)
	copts := ac.dopts.copts
	// Dial without holding ac.mu; everything needed was snapshotted above.
	ac.mu.Unlock()
	connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
	if err != nil {
		return err
	}
	if connected {
		return nil
	}
	// Not connected and no error: go around the loop and try again.
}
resetTransport 的主体就是一个 for 循环，在循环中不断尝试建立连接：如果建立成功就返回 nil；如果 addrConn 已经被关闭（Shutdown），就返回 errConnClosing；如果 createTransport 返回错误，就直接返回该错误；否则继续重试下去。实际上，不管是开头的 Dial，还是 Dial 完成后服务端断开，都是由这段代码来建立真实的连接。这也就是为什么如果你使用了 WithBlock 却不设置超时，就会一直重试下去（阻塞不返回）；连接中途断掉也会不断重连。当然，重连过程使用了 backoff（退避）算法，而且 gRPC 的默认配置中有一个最大重试间隔，默认是 120 秒：
// DefaultBackoffConfig is the default backoff configuration used between
// connection attempts. MaxDelay caps the wait at 120s. The unexported fields
// presumably describe exponential growth from baseDelay (1s) by factor 1.6
// with +/-20% jitter; confirm against gRPC's connection-backoff spec.
var DefaultBackoffConfig = BackoffConfig{
	MaxDelay:  120 * time.Second,
	baseDelay: 1.0 * time.Second,
	factor:    1.6,
	jitter:    0.2,
}
transportMonitor
// transportMonitor (excerpt): loop that watches the current transport t.
// When t reports GoAway or an error, or the connect deadline passes while
// backoffDeadline is still set (no server preface yet), it marks the
// addrConn TransientFailure and calls resetTransport to reconnect. The loop
// returns when the addrConn is Shutdown or resetTransport fails.
for {
	var timer *time.Timer
	// Left nil when there is no connect deadline; receiving from a nil
	// channel blocks forever, which disables that select case.
	var cdeadline <-chan time.Time
	ac.mu.Lock()
	t := ac.transport
	if !ac.connectDeadline.IsZero() {
		// NOTE(review): a non-zero connectDeadline appears to mean the server
		// preface is still pending; arm a timer for the remaining time.
		timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
		cdeadline = timer.C
	}
	ac.mu.Unlock()
	// Block until we receive a goaway or an error occurs.
	select {
	case <-t.GoAway():
	case <-t.Error():
	case <-cdeadline:
		ac.mu.Lock()
		// This implies that client received server preface.
		if ac.backoffDeadline.IsZero() {
			ac.mu.Unlock()
			continue
		}
		ac.mu.Unlock()
		timer = nil
		// No server preface received until deadline.
		// Kill the connection.
		grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
		t.Close()
	}
	// Stop the timer unless the deadline case already fired it (timer = nil).
	if timer != nil {
		timer.Stop()
	}
	// If a GoAway happened, regardless of error, adjust our keepalive
	// parameters as appropriate.
	select {
	case <-t.GoAway():
		ac.adjustParams(t.GetGoAwayReason())
	default:
	}
	ac.mu.Lock()
	// The addrConn was closed deliberately; stop monitoring.
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	// Set connectivity state to TransientFailure before calling
	// resetTransport. Transition READY->CONNECTING is not valid.
	ac.state = connectivity.TransientFailure
	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
	// Presumably asks the resolver to re-resolve the target before redialing.
	ac.cc.resolveNow(resolver.ResolveNowOption{})
	ac.curAddr = resolver.Address{}
	ac.mu.Unlock()
	// Transport lost: reconnect. If that fails, log, tear down (unless the
	// connection is closing) and exit the monitor.
	if err := ac.resetTransport(); err != nil {
		ac.mu.Lock()
		ac.printf("transport exiting: %v", err)
		ac.mu.Unlock()
		grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
		if err != errConnClosing {
			// Keep this ac in cc.conns, to get the reason it's torn down.
			ac.tearDown(err)
		}
		return
	}
}
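transportMonitor 同样运行一个 for 循环：当收到 GoAway、transport 出错，或在 connectDeadline 之前没有收到服务端的 preface 时，它会把状态置为 TransientFailure，然后调用 resetTransport 重连。如果 addrConn 已被关闭（Shutdown），或者 resetTransport 返回错误，循环就退出。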
其实我们使用 etcd client 时经常要设置一个 DialTimeout 参数，这个参数就是用来生成一个带超时的 context，用来控制建立连接的超时。