程序印象 (Program Impressions)

gRPC LoadBalancer

2018/09/07

For grpclb, an Etcd-based implementation of service registration and discovery, see: grpclb

Background

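gRPC load balancing is done per request, not per connection, which keeps the load on the servers evenly distributed. Depending on where they are implemented, load balancers generally fall into two categories: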

1. Proxy

2. Client Side

Thick Client

All load-balancing algorithms are implemented in the client.

Lookaside Load Balancing

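Also known as one-arm routing: the client asks an external (lookaside) load balancer which backends to use, then sends its RPCs directly to those backends.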
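To make "per request, not per connection" concrete, here is a minimal stdlib-only sketch of the client-side (lookaside) flow. It assumes a hypothetical lookaside service that simply returns a backend list; the names `lookasideLB`, `staticLB`, `client` and `call` are illustrative and not part of gRPC. The client asks the balancer once, then chooses a backend for every call, so successive RPCs from the same client land on different servers.

```go
package main

import (
	"errors"
	"fmt"
)

// lookasideLB is a hypothetical external balancer: it only tells the client
// which backends exist; RPCs never flow through it.
type lookasideLB interface {
	Servers() []string
}

// staticLB is a trivial lookasideLB that always returns the same list.
type staticLB struct{ servers []string }

func (s staticLB) Servers() []string { return s.servers }

// client keeps the backend list obtained from the lookaside balancer and
// picks a backend for every call, not once per connection.
type client struct {
	backends []string
	next     int
}

func newClient(lb lookasideLB) (*client, error) {
	backends := lb.Servers()
	if len(backends) == 0 {
		return nil, errors.New("lookaside balancer returned no backends")
	}
	return &client{backends: backends}, nil
}

// call simulates one RPC and returns the backend chosen for it.
func (c *client) call() string {
	backend := c.backends[c.next]
	c.next = (c.next + 1) % len(c.backends)
	return backend
}

func main() {
	c, err := newClient(staticLB{servers: []string{"10.0.0.1:50051", "10.0.0.2:50051"}})
	if err != nil {
		panic(err)
	}
	for i := 0; i < 4; i++ {
		fmt.Println(c.call()) // alternates between the two backends
	}
}
```

The sketch is single-goroutine; a real client would guard `next` with a mutex, as the round-robin picker analyzed later does.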

Etcd LoadBalancer Implementation

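The latest gRPC release is currently 1.14.0. Etcd's balancer implementation is not quite the same across versions; see client-architecture / Etcd v3.4.0 (TBD 2018-09). Depending on the combination of etcd client and gRPC versions, there are the following approaches: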

  • clientv3-grpc1.0

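    • The client keeps multiple connections to the server nodes at the same time, so when an error occurs it can quickly retry on another connection.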

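    • Limitation: keeping multiple connections wastes resources, and the client is unaware of node health and cluster membership, so it may not work correctly when a node has problems.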

  • clientv3-grpc1.7 (alternative approach)

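    • Keeps a connection to only one node. When the cluster has multiple endpoints, the client tries to connect to all of them; once one connection is selected (the pinned address), the other connections are closed, and the client stays on it until that connection is closed. If an error occurs during a call, the client enters the error-handling flow.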

      (Figures: client-architecture-balancer-figure-02 through client-architecture-balancer-figure-06)

    • Limitation: heartbeats rely on HTTP/2 keepalives, which can lead to split-brain situations.


  • clientv3-grpc1.14 (under development for v3.4.0)

gRPC 1.14 LB Analysis

dnsResolver

Registering dnsResolver (done in the package's init function):

```go
func init() {
	resolver.Register(NewBuilder())
}
```
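Registration in init() works because the resolver package keeps a scheme-to-Builder table. The sketch below mimics that idea with a simplified `Builder` that only has `Scheme()`; `registry`, `Register`, `Get` and `schemeOf` here are local stand-ins (grpc-go's resolver package does export `Register` and `Get`, but this is not its code).

```go
package main

import (
	"fmt"
	"strings"
)

// Builder is a simplified stand-in for resolver.Builder: only Scheme()
// matters for registration.
type Builder interface {
	Scheme() string
}

// registry maps a URI scheme to its Builder.
var registry = make(map[string]Builder)

// Register stores b under its scheme; in this sketch a later registration
// with the same scheme replaces the earlier one.
func Register(b Builder) { registry[b.Scheme()] = b }

// Get returns the Builder for scheme, or nil if none was registered.
func Get(scheme string) Builder { return registry[scheme] }

type dnsBuilder struct{}

func (dnsBuilder) Scheme() string { return "dns" }

// init runs when the package is loaded, so importing the package is enough
// to make the "dns" scheme available.
func init() { Register(dnsBuilder{}) }

// schemeOf extracts the scheme from a target such as "dns:///example.com:443".
func schemeOf(target string) string {
	if i := strings.Index(target, "://"); i > 0 {
		return target[:i]
	}
	return ""
}

func main() {
	b := Get(schemeOf("dns:///example.com:443"))
	fmt.Println(b != nil) // true
}
```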

The resolver Builder interface. A Builder creates a resolver that watches for name-resolution updates.

```go
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}
```

The Resolver interface is defined as follows:

```go
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOption)
	// Close closes the resolver.
	Close()
}
```
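To see how Builder and Resolver fit together, here is a hypothetical static resolver written against trimmed-down copies of the interfaces (`ResolveNow` takes no option, `Build` takes a plain endpoint string, and `ClientConn` only has `NewAddress`). `staticBuilder`, `staticResolver` and `printCC` are illustrative names. Build pushes the address list once, and ResolveNow simply re-sends it, which is allowed because ResolveNow is only a hint.

```go
package main

import "fmt"

// Address, ClientConn, Resolver and Builder are simplified copies of the
// grpc-go resolver types, trimmed so the sketch runs without grpc.
type Address struct{ Addr string }

type ClientConn interface {
	NewAddress(addrs []Address)
}

type Resolver interface {
	ResolveNow()
	Close()
}

type Builder interface {
	Build(endpoint string, cc ClientConn) (Resolver, error)
	Scheme() string
}

// staticBuilder builds resolvers that always report the same address list.
type staticBuilder struct{ addrs []Address }

func (b *staticBuilder) Scheme() string { return "static" }

func (b *staticBuilder) Build(endpoint string, cc ClientConn) (Resolver, error) {
	r := &staticResolver{addrs: b.addrs, cc: cc}
	r.ResolveNow() // push the initial address list synchronously
	return r, nil
}

type staticResolver struct {
	addrs []Address
	cc    ClientConn
}

// ResolveNow re-sends the complete (fixed) address list.
func (r *staticResolver) ResolveNow() { r.cc.NewAddress(r.addrs) }

// Close has nothing to release for a static resolver.
func (r *staticResolver) Close() {}

// printCC is a ClientConn that just records the last update.
type printCC struct{ last []Address }

func (c *printCC) NewAddress(addrs []Address) { c.last = addrs }

func main() {
	var b Builder = &staticBuilder{addrs: []Address{{"10.0.0.1:50051"}, {"10.0.0.2:50051"}}}
	cc := &printCC{}
	r, err := b.Build("my-service", cc)
	if err != nil {
		panic(err)
	}
	defer r.Close()
	fmt.Println(cc.last) // [{10.0.0.1:50051} {10.0.0.2:50051}]
}
```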

Walking through the code: NewBuilder is a factory that creates a resolver.Builder for gRPC to call:

```go
// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
	return &dnsBuilder{minFreq: defaultFreq}
}
```

The dnsResolver struct is defined as follows:

```go
// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
	freq       time.Duration
	backoff    backoff.Exponential
	retryCount int
	host       string
	port       string
	ctx        context.Context
	cancel     context.CancelFunc
	cc         resolver.ClientConn
	// rn channel is used by ResolveNow() to force an immediate resolution of the target.
	rn chan struct{}
	t  *time.Timer
	// wg is used to enforce Close() to return after the watcher() goroutine has finished.
	// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
	// replace the real lookup functions with mocked ones to facilitate testing.
	// If Close() doesn't wait for watcher() goroutine finishes, race detector sometimes
	// will warns lookup (READ the lookup function pointers) inside watcher() goroutine
	// has data race with replaceNetFunc (WRITE the lookup function pointers).
	wg                   sync.WaitGroup
	disableServiceConfig bool
}
```

Build creates and starts a DNS resolver that watches the name resolution of the target:

```go
// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	// ...
	host, port, err := parseTarget(target.Endpoint)
	if err != nil {
		return nil, err
	}

	// IP address.
	// (analysis omitted) ...

	// DNS address (non-IP).
	// Create a ctx/cancel pair that controls the lifetime of the watcher goroutine.
	ctx, cancel := context.WithCancel(context.Background())
	d := &dnsResolver{
		freq:                 b.minFreq,
		backoff:              backoff.Exponential{MaxDelay: b.minFreq},
		host:                 host,
		port:                 port,
		ctx:                  ctx,
		cancel:               cancel,
		cc:                   cc,
		t:                    time.NewTimer(0),
		rn:                   make(chan struct{}, 1),
		disableServiceConfig: opts.DisableServiceConfig,
	}

	d.wg.Add(1)
	go d.watcher() // start the watcher goroutine
	return d, nil
}
```
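Build creates `rn` with a buffer of one. A plausible reading, sketched below, is that this lets ResolveNow send without blocking while collapsing any number of back-to-back or concurrent calls into a single pending lookup. `resolverSketch` and `pending` are hypothetical; the non-blocking `select` mirrors the pattern grpc-go's dnsResolver.ResolveNow uses, but check the source of your version.

```go
package main

import "fmt"

// resolverSketch keeps only the field that ResolveNow touches.
type resolverSketch struct {
	rn chan struct{} // buffered with capacity 1, as in Build
}

// ResolveNow asks the watcher for an immediate lookup without ever blocking:
// if a request is already pending, this one is dropped (coalesced).
func (d *resolverSketch) ResolveNow() {
	select {
	case d.rn <- struct{}{}:
	default:
	}
}

// pending drains rn and reports how many lookups the watcher would run.
func (d *resolverSketch) pending() int {
	n := 0
	for {
		select {
		case <-d.rn:
			n++
		default:
			return n
		}
	}
}

func main() {
	d := &resolverSketch{rn: make(chan struct{}, 1)}
	for i := 0; i < 5; i++ {
		d.ResolveNow() // five hints in a row...
	}
	fmt.Println(d.pending()) // ...collapse into 1 pending lookup
}
```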

The cc resolver.ClientConn argument carries the callbacks the resolver uses to notify the gRPC ClientConn of any updates:

```go
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
	// NewAddress is called by resolver to notify ClientConn a new list
	// of resolved addresses.
	// The address list should be the complete list of resolved addresses.
	NewAddress(addresses []Address)
	// NewServiceConfig is called by resolver to notify ClientConn a new
	// service config. The service config should be provided as a json string.
	NewServiceConfig(serviceConfig string)
}
```

The watcher function is implemented as follows:

```go
func (d *dnsResolver) watcher() {
	defer d.wg.Done()
	// Loop forever, waiting for the next trigger.
	for {
		select {
		case <-d.ctx.Done(): // ctx was canceled (by Close): leave the loop
			return
		case <-d.t.C: // timer fired; it starts at 0, so the first lookup runs immediately
		case <-d.rn: // ResolveNow() sent on rn (case d.rn <- struct{}{}:) to force a lookup
		}
		// Perform the lookup (inside the watcher goroutine, i.e. asynchronously to the caller).
		result, sc := d.lookup()
		// Next lookup should happen within an interval defined by d.freq. It may be
		// more often due to exponential retry on empty address list.
		if len(result) == 0 {
			d.retryCount++
			d.t.Reset(d.backoff.Backoff(d.retryCount))
		} else {
			d.retryCount = 0
			d.t.Reset(d.freq)
		}
		d.cc.NewServiceConfig(sc) // notify ClientConn via callback
		d.cc.NewAddress(result)   // notify ClientConn via callback
	}
}
```

Calling Close stops the watcher: it cancels ctx, waits on wg until the watcher goroutine has exited, and then stops the timer:

```go
// Close closes the dnsResolver.
func (d *dnsResolver) Close() {
	d.cancel()
	d.wg.Wait()
	d.t.Stop()
}
```

RR Balancer

The balancer Builder interface is defined as follows:

```go
// Builder creates a balancer.
type Builder interface {
	// Build creates a new balancer with the ClientConn.
	Build(cc ClientConn, opts BuildOptions) Balancer
	// Name returns the name of balancers built by this builder.
	// It will be used to pick balancers (for example in service config).
	Name() string
}
```

The Balancer interface is defined as follows:

```go
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
	// HandleSubConnStateChange is called by gRPC when the connectivity state
	// of sc has changed.
	// Balancer is expected to aggregate all the state of SubConn and report
	// that back to gRPC.
	// Balancer should also generate and update Pickers when its internal state has
	// been changed by the new state.
	HandleSubConnStateChange(sc SubConn, state connectivity.State)
	// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
	// balancers.
	// Balancer can create new SubConn or remove SubConn with the addresses.
	// An empty address slice and a non-nil error will be passed if the resolver returns
	// non-nil error to gRPC.
	HandleResolvedAddrs([]resolver.Address, error)
	// Close closes the balancer. The balancer is not required to call
	// ClientConn.RemoveSubConn for its existing SubConns.
	Close()
}
```

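balancer/base implements most of the balancer functionality. To implement a different selection algorithm on top of base, you only need to implement the PickerBuilder and Picker interfaces.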
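A greatly simplified sketch of the bookkeeping that balancer/base performs: keep one SubConn per resolved address, drop SubConns whose address disappeared (the resolver always sends the complete list), and rebuild the picker from the READY SubConns after every change. SubConns are plain address strings here, and `baseSketch`, `state` and the `build` callback are hypothetical; the real base balancer calls `cc.NewSubConn`, `sc.Connect` and `cc.RemoveSubConn` and also aggregates the overall connectivity state.

```go
package main

import (
	"fmt"
	"sort"
)

// state is a simplified connectivity state.
type state int

const (
	idle state = iota
	connecting
	ready
	transientFailure
)

// baseSketch tracks one (hypothetical) SubConn per address and its state.
type baseSketch struct {
	subConns map[string]state     // address -> state of its SubConn
	build    func(ready []string) // plays the role of PickerBuilder.Build
}

// HandleResolvedAddrs creates SubConns for new addresses and removes SubConns
// whose address is missing from the resolver's complete list.
func (b *baseSketch) HandleResolvedAddrs(addrs []string) (added, removed []string) {
	seen := make(map[string]bool, len(addrs))
	for _, a := range addrs {
		seen[a] = true
		if _, ok := b.subConns[a]; !ok {
			b.subConns[a] = idle // real code: cc.NewSubConn + sc.Connect()
			added = append(added, a)
		}
	}
	for a := range b.subConns {
		if !seen[a] {
			delete(b.subConns, a) // real code: cc.RemoveSubConn
			removed = append(removed, a)
		}
	}
	sort.Strings(removed) // map iteration order is random
	b.regeneratePicker()
	return added, removed
}

// HandleSubConnStateChange records the new state and rebuilds the picker.
func (b *baseSketch) HandleSubConnStateChange(addr string, s state) {
	if _, ok := b.subConns[addr]; !ok {
		return
	}
	b.subConns[addr] = s
	b.regeneratePicker()
}

// regeneratePicker hands only the READY SubConns to the picker builder.
func (b *baseSketch) regeneratePicker() {
	var readyAddrs []string
	for a, s := range b.subConns {
		if s == ready {
			readyAddrs = append(readyAddrs, a)
		}
	}
	sort.Strings(readyAddrs)
	b.build(readyAddrs)
}

func main() {
	var lastReady []string
	b := &baseSketch{subConns: map[string]state{}, build: func(r []string) { lastReady = r }}
	b.HandleResolvedAddrs([]string{"a:1", "b:1"})
	b.HandleSubConnStateChange("a:1", ready)
	fmt.Println(lastReady) // [a:1]
	added, removed := b.HandleResolvedAddrs([]string{"b:1", "c:1"})
	fmt.Println(added, removed, lastReady) // [c:1] [a:1] []
}
```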

base.NewBalancerBuilder is defined as follows:

```go
// Implemented in balancer/base.
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
	return &baseBuilder{
		name:          name,
		pickerBuilder: pb,
	}
}
```

The RR balancer is built on balancer/base, which holds the core logic: it calls base.NewBalancerBuilder to obtain a balancer.Builder, which is then registered in init():

```go
// Name is the name of round_robin balancer.
const Name = "round_robin"

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
	return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
}

func init() {
	balancer.Register(newBuilder())
}
```

The PickerBuilder interface is defined as follows:

```go
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
	// Build takes a slice of ready SubConns, and returns a picker that will be
	// used by gRPC to pick a SubConn.
	Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
}
```

The round-robin picker builder (rrPickerBuilder) implements the PickerBuilder interface:

```go
func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
	grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
	// Flatten the map of READY SubConns into a slice and build an rrPicker from it.
	var scs []balancer.SubConn
	for _, sc := range readySCs {
		scs = append(scs, sc)
	}
	return &rrPicker{
		subConns: scs,
	}
}
```

balancer.Picker is implemented by rrPicker:

```go
type rrPicker struct {
	// subConns is the snapshot of the roundrobin balancer when this picker was
	// created. The slice is immutable. Each Get() will do a round robin
	// selection from it and return the selected SubConn.
	subConns []balancer.SubConn

	mu sync.Mutex
	// index of the next SubConn to return
	next int
}

func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
	if len(p.subConns) <= 0 {
		return nil, nil, balancer.ErrNoSubConnAvailable
	}

	p.mu.Lock()
	sc := p.subConns[p.next]
	p.next = (p.next + 1) % len(p.subConns)
	p.mu.Unlock()
	return sc, nil, nil
}
```
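A stdlib-only copy of rrPickerBuilder.Build and rrPicker.Pick, with SubConns replaced by strings, lets you check the distribution under concurrent picks. `newRRPicker` and `errNoSubConnAvailable` are hypothetical stand-ins for the picker builder and balancer.ErrNoSubConnAvailable.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNoSubConnAvailable stands in for balancer.ErrNoSubConnAvailable.
var errNoSubConnAvailable = errors.New("no SubConn is available")

// rrPicker mirrors the picker above; SubConns are plain strings here.
type rrPicker struct {
	subConns []string // immutable snapshot taken when the picker is built
	mu       sync.Mutex
	next     int
}

// newRRPicker plays the role of rrPickerBuilder.Build: flatten the map of
// READY SubConns into a slice.
func newRRPicker(ready map[string]string) *rrPicker {
	var scs []string
	for _, sc := range ready {
		scs = append(scs, sc)
	}
	return &rrPicker{subConns: scs}
}

// Pick returns the next SubConn in rotation; safe for concurrent use.
func (p *rrPicker) Pick() (string, error) {
	if len(p.subConns) == 0 {
		return "", errNoSubConnAvailable
	}
	p.mu.Lock()
	sc := p.subConns[p.next]
	p.next = (p.next + 1) % len(p.subConns)
	p.mu.Unlock()
	return sc, nil
}

func main() {
	p := newRRPicker(map[string]string{"10.0.0.1:50051": "sc1", "10.0.0.2:50051": "sc2", "10.0.0.3:50051": "sc3"})
	counts := map[string]int{}
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < 300; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sc, err := p.Pick()
			if err != nil {
				panic(err)
			}
			mu.Lock()
			counts[sc]++
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(counts) // map[sc1:100 sc2:100 sc3:100]
}
```

Two design points carry over from the original: `len(p.subConns)` is read without the lock because the slice is never modified after the picker is built, and a fresh picker is built whenever the set of READY SubConns changes rather than mutating the old one. Because Build ranges over a map, the rotation order is not deterministic.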

Etcd Service Registration and Discovery

clientv3-grpc1.14: Official client implementation, with grpc-go v1.14.x, which is used in the latest etcd v3.4.

Endpoint updates propagate along this call chain: etcdv3Client -> autoSync() -> Sync() -> c.SetEndpoints(eps…) -> gc.resolverGroup.SetEndpoints(eps) -> every resolver -> ClientConn update
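The last hops of that chain fan one endpoint list out to every resolver's ClientConn. Here is a hypothetical sketch of that fan-out, assuming a group that tracks its resolvers and the current endpoint list; `resolverGroup`, `endpointResolver`, `add` and `recordingCC` are illustrative names, and etcd's actual resolver group may differ in detail.

```go
package main

import (
	"fmt"
	"sync"
)

// clientConn is the callback side a resolver talks to (cf. resolver.ClientConn).
type clientConn interface {
	NewAddress(addrs []string)
}

// endpointResolver is a hypothetical per-ClientConn resolver.
type endpointResolver struct{ cc clientConn }

// resolverGroup shares one endpoint list among several resolvers.
type resolverGroup struct {
	mu        sync.Mutex
	endpoints []string
	resolvers []*endpointResolver
}

// add registers a resolver for cc and immediately sends it the current list.
func (g *resolverGroup) add(cc clientConn) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.resolvers = append(g.resolvers, &endpointResolver{cc: cc})
	if len(g.endpoints) > 0 {
		cc.NewAddress(g.endpoints)
	}
}

// SetEndpoints stores a copy of the new list and pushes it to every resolver.
func (g *resolverGroup) SetEndpoints(eps []string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.endpoints = append([]string(nil), eps...)
	for _, r := range g.resolvers {
		r.cc.NewAddress(g.endpoints)
	}
}

// recordingCC remembers the last address list it received.
type recordingCC struct{ last []string }

func (c *recordingCC) NewAddress(addrs []string) { c.last = addrs }

func main() {
	g := &resolverGroup{}
	cc1, cc2 := &recordingCC{}, &recordingCC{}
	g.add(cc1)
	g.add(cc2)
	// e.g. after autoSync() -> Sync() discovered the current cluster members
	g.SetEndpoints([]string{"10.0.0.1:2379", "10.0.0.2:2379"})
	fmt.Println(cc1.last, cc2.last)
}
```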
