本文以roundRobin为例介绍gRPC负载均衡实现。
代码
https://github.com/messixukej…
在liangzhiyang/annotate-grpc-go基础上补充了部分注释
关键interface
负载均衡器:
type Balancer interface{
//启动负载均衡器,dialing的时候调用。
Start(target string, config BalancerConfig) error
//通知负载均衡器由新地址连接ok
Up(addr Address) (down func(error))
//获取下一个有效的负载均衡地址
Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
//地址更新通知channel,这里返回的是全量地址
Notify() <-chan[]Address
Close() error
}
//地址解析器接口
type Resolver interface{
//解析器Resolver 用于创建 服务地址watcher。
Resolve(target string) (Watcher, error)
}
//服务地址发现器接口
//watcher用于观察服务地址的变化
type Watcher interface{
//阻塞接口,知道有服务地址变化或者错误发生
Next() ([]*Update, error)
Close()
}
关键数据结构
type testWatcher struct{
//用于接收地址更新
update chan*naming.Update
//用于表示多少个更新发生
side chan int
//用于通知地址注入者更新读已结束
readDone chanint
}
实现流程(以RoundRobin为例)
1、注入负载均衡规则
使用Dial时,使用WithBalance注入负载均衡规则
func WithBalancer(b Balancer) DialOption {
return func(o *dialOptions) {
o.balancer = b
}
}
例如,这里注入了RoundRobin 轮寻规则。
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
2、 启动负载均衡器
roundRobin.Start -> testNameResolver.Resolve 生成服务地址发现器 -> 建立服务地址发现任务 watchAddrUpdates(新的goroutine循环监控地址)
3、服务地址发现器(开启负载均衡后,不管是初始化还是过程中的变化,都是通过服务地址发现器来获取服务端地址)
roundRobin.watchAddrUpdates ->阻塞在watcher.Next获取地址变更动作 -> 更新roundRobin.addrs -> 写入roundRobin.addrCh
3.1、开工过程,根据balancer.Notify获取服务端地址 -> 根据地址列表创建连接resetAddrConn
地址注入方式:Resolve中将服务端初始地址注入
3.2、动态变更地址 ClientConn.lbWatcher,根据balancer.Notify获取服务端地址 -> 新增地址resetAddrConn,删除地址tearDown。
地址注入方式:testWatcher.inject
3.3 创建连接细化resetAddrConn->resetTransport->roundRobin.Up
3.4 删除连接细化tearDown->roundRobin.down
4、提交记录
Invoke->getTransport->balancer.Get获取有效地址(策略:根据connected状态,循环找到有效地址。如果找不到则阻塞在waitCh)
waitCh阻塞解除由3.3触发