背景
在選擇負載均衡算法時,我們希望滿足以下要求:
具備分區和機房調度親和性每次選擇的節點盡量是負載最低的每次盡可能選擇響應最快的節點無需人工干預故障節點當一個節點有故障時,負載均衡算法可以自動隔離該節點當故障節點恢複時,能夠自動恢複對該節點的流量分發
基於這些考慮,本案例選擇 p2c+EWMA 算法來實現。
算法的核心思想
這裡就說說p2c和EWMA
p2c
p2c (Pick Of 2 Choices) 二選一: 在多個節點中隨機選擇兩個節點。
go-zero 中的會隨機的選擇3次,如果其中一次選擇的節點的健康條件滿足要求,就中斷選擇,採用這兩個節點。
EWMA
EWMA (Exponentially Weighted Moving-Average) 指數移動加權平均法: 是指各數值的加權系數隨時間呈指數遞減,越靠近當前時刻的數值加權系數就越大,體現了最近一段時間内的平均值。
公式:
EWMA公式
變量解釋:
Vt: 代表的是第 t 次請求的 EWMA值Vt-1: 代表的是第 t-1 次請求的 EWMA值β: 是一個常量
EWMA 算法的優勢
相較於普通的計算平均值算法,EWMA 不需要保存過去所有的數值,計算量顯著減少,同時也減小了存儲資源。傳統的計算平均值算法對網路耗時不敏感, 而 EWMA 可以通過請求頻繁來調節 β,進而迅速監控到網路毛刺或更多的體現整體平均值。當請求較爲頻繁時, 說明節點網路負載升高了, 我們想監測到此時節點處理請求的耗時(側面反映了節點的負載情況), 我們就相應的調小β。β越小,EWMA值 就越接近本次耗時,進而迅速監測到網路毛刺;當請求較爲不頻繁時, 我們就相對的調大β值。這樣計算出來的 EWMA值 越接近平均值
β計算
go-zero 採用的是牛頓冷卻定律中的衰減函數模型計算 EWMA 算法中的 β 值:
牛頓冷卻定律中的衰減函數
其中 Δt 爲兩次請求的間隔,e,k 爲常數
gRPC 中實現自定義負載均衡器
首先我們需要實現 google.golang.org/grpc/balancer/base/base.go/PickerBuilder 接口, 這個接口是有服務節點更新的時候會調用接口裡的Build方法
type PickerBuilder interface { // Build returns a picker that will be used by gRPC to pick a SubConn. Build(info PickerBuildInfo) balancer.Picker }
還要實現 google.golang.org/grpc/balancer/balancer.go/Picker 接口。這個接口主要實現負載均衡,挑選一個節點供請求使用
type Picker interface { Pick(info PickInfo) (PickResult, error) }
最後向負載均衡 map 中注冊我們實現的負載均衡器
實現負載均衡的主要邏輯
在每次節點更新,gRPC 會調用 Build 方法,此時在 Build 裡實現保存所有的節點信息。gRPC 在獲取節點處理請求時,會調用 Pick 方法以獲取節點。go-zero 在 Pick 方法裡實現了 p2c 算法,挑選節點,並通過節點的 EWMA值 計算負載情況,返回負載低的節點供 gRPC 使用。在請求結束的時候 gRPC 會調用 PickResult.Done 方法,go-zero 在這個方法裡實現了本次請求耗時等信息的存儲,並計算出了 EWMA值 保存了起來,供下次請求時計算負載等情況的使用。
負載均衡代碼分析
保存服務的所有節點信息
我們需要保存節點處理本次請求的耗時、EWMA 等信息,go-zero 給每個節點設計了如下結構:
type subConn struct { addr resolver.Address conn balancer.SubConn lag uint64 // 用來保存 ewma 值 inflight int64 // 用在保存當前節點正在處理的請求總數 success uint64 // 用來標識一段時間内此連接的健康狀態 requests int64 // 用來保存請求總數 last int64 // 用來保存上一次請求耗時, 用於計算 ewma 值 pick int64 // 保存上一次被選中的時間點 }
p2cPicker 實現了 balancer.Picker 接口,conns 保存了服務的所有節點信息
type p2cPicker struct { conns []*subConn // 保存所有節點的信息 r *rand.Rand stamp *syncx.AtomicDuration lock sync.Mutex }
gRPC 在節點有更新的時候會調用 Build 方法,傳入所有節點信息,我們在這裡把每個節點信息用 subConn 結構保存起來。並歸並到一起用 p2cPicker 結構保存起來
func (b *p2cPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { ...... var conns []*subConn for conn, connInfo := range readySCs { conns = append(conns, &subConn{ addr: connInfo.Address, conn: conn, success: initSuccess, }) } return &p2cPicker{ conns: conns, r: rand.New(rand.NewSource(time.Now().UnixNano())), stamp: syncx.NewAtomicDuration(), } }
隨機挑選節點信息,在這裡分了三種情況:
主要實現代碼如下:
switch len(p.conns) { case 0: // 沒有節點,返回錯誤 return emptyPickResult, balancer.ErrNoSubConnAvailable case 1: // 有一個節點,直接返回這個節點 chosen = p.choose(p.conns[0], nil) case 2: // 有兩個節點,計算負載,返回負載低的節點 chosen = p.choose(p.conns[0], p.conns[1]) default: // 有多個節點,p2c 挑選兩個節點,比較這兩個節點的負載,返回負載低的節點 var node1, node2 *subConn // 3次隨機選擇兩個節點 for i := 0; i < pickTimes; i++ { a := p.r.Intn(len(p.conns)) b := p.r.Intn(len(p.conns) - 1) if b >= a { b++ } node1 = p.conns[a] node2 = p.conns[b] // 如果這次選擇的節點達到了健康要求, 就中斷選擇 if node1.healthy() && node2.healthy() { break } } // 比較兩個節點的負載情況,選擇負載低的 chosen = p.choose(node1, node2) }
只有一個服務節點,此時直接返回供 gRPC 使用即可有兩個服務節點,通過 EWMA值 計算負載,並返回負載低的節點返回供 gRPC 使用有多個服務節點,此時通過 p2c 算法選出兩個節點,比較負載情況,返回負載低的節點供 gRPC 使用
load計算節點的負載情況
上面的 choose 方法會調用 load 方法來計算節點負載。
計算負載的公式是: load = ewma * inflight
在這裡簡單解釋下:ewma 相當於平均請求耗時,inflight 是當前節點正在處理請求的數量,相乘大致計算出了當前節點的網路負載。
func (c *subConn) load() int64 { // 通過 EWMA 計算節點的負載情況; 加 1 是爲了避免爲 0 的情況 lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1))) load := lag * (atomic.LoadInt64(&c.inflight) + 1) if load == 0 { return penalty } return load }
請求結束,更新節點的 EWMA 等信息
func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) { start := int64(timex.Now()) return func(info balancer.DoneInfo) { // 正在處理的請求數減 1 atomic.AddInt64(&c.inflight, -1) now := timex.Now() // 保存本次請求結束時的時間點,並取出上次請求時的時間點 last := atomic.SwapInt64(&c.last, int64(now)) td := int64(now) - last if td < 0 { td = 0 } // 用牛頓冷卻定律中的衰減函數模型計算EWMA算法中的β值 w := math.Exp(float64(-td) / float64(decayTime)) // 保存本次請求的耗時 lag := int64(now) - start if lag < 0 { lag = 0 } olag := atomic.LoadUint64(&c.lag) if olag == 0 { w = 0 } // 計算 EWMA 值 atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w))) success := initSuccess if info.Err != nil && !codes.Acceptable(info.Err) { success = 0 } osucc := atomic.LoadUint64(&c.success) atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w))) stamp := p.stamp.Load() if now-stamp >= logInterval { if p.stamp.CompareAndSwap(stamp, now) { p.logStats() } } } }
把節點正在處理請求的總數減1保存處理請求結束的時間點,用於計算距離上次節點處理請求的差值,並算出 EWMA中的 β值計算本次請求耗時,並計算出 EWMA值 保存到節點的 lag 屬性裡計算節點的健康狀態保存到節點的 success 屬性中
項目Git地址 https://github.com/tal-tech/go-zero