深入Dubbo 原始碼解析 — 負載均衡LoadBalance
技術點
- 面試中Dubbo負載均衡常問的幾點
- 常見負載均衡演算法簡介
- Dubbo 官方文件介紹
- Dubbo 負載均衡的策略
- Dubbo 負載均衡原始碼解析
面試中Dubbo負載均衡常問的幾點
負載均衡演算法 最小活躍數 一致性雜湊演算法
常見負載均衡演算法簡介
首先引出一點 負載均衡的目的是什麼?
當一臺伺服器的承受能力達到上限時,那麼就需要多臺伺服器來組成叢集,提升應用整體的吞吐量,那麼這個時候就涉及到如何合理分配客戶端請求到叢集中不同的機器,這個過程就叫做負載均衡
下面簡單介紹幾種負載均衡演算法,有利於理解原始碼中為什麼這樣設計
權重隨機演算法
策略就是根據權重佔比隨機。演算法很簡單,就是一根數軸。然後利用偽隨機數產生點, *看點落在了哪個區域從而選擇對應的 伺服器*
權重輪詢演算法
輪詢演算法是指依次訪問可用伺服器列表,其和隨機本質是一樣的處理,在無權重因素下,輪詢只是在選數軸上的點時採取自增對長度取餘方式。有權重因素下依然自增取餘,再看選取的點落在了哪個區域。
一致性Hash負載均衡演算法
利用Hash演算法定位相同的伺服器
- 普通的Hash :當客戶端請求到達是則使用 hash(client) % N,其中N是伺服器數量,利用這個表示式計算出該客戶端對應的Server處理
- 一致性Hash :一致性Hash是把伺服器分佈變成一個環形,每一個hash(clinet)的結果會在該環上順時針尋找第一個與其鄰的
Server
節點
—————————— 下面這部分是來源於dubbo 官方文件 ————————————
Dubbo 官方文件介紹
負載均衡
在叢集負載均衡時,Dubbo 提供了多種均衡策略,預設為 Random LoadBalance 隨機呼叫
負載均衡策略
Random LoadBalance
- 隨機 ,按權重設定隨機概率。
- 在一個截面上碰撞的概率高,但呼叫量越大分佈越均勻,而且按概率使用權重後也比較均勻,有利於動態調整提供者權重。
RoundRobin LoadBalance
- 輪詢 ,按公約後的權重設定輪詢比率。
- 存在慢的提供者累積請求的問題,比如:第二臺機器很慢,但沒掛,當請求調到第二臺時就卡在那,久而久之,所有請求都卡在調到第二臺上。
LeastActive LoadBalance
- 最少活躍呼叫數 ,相同活躍數的隨機,活躍數指呼叫前後計數差。
- 使慢的提供者收到更少請求,因為越慢的提供者的呼叫前後計數差會越大。
ConsistentHash LoadBalance
- 一致性 Hash ,相同引數的請求總是發到同一提供者。
- 當某一臺提供者掛時,原本發往該提供者的請求,基於虛擬節點,平攤到其它提供者,不會引起劇烈變動。
- 演算法參見: http://en.wikipedia.org/wiki/Consistent_hashing
- 預設只對第一個引數 Hash,如果要修改,請配置
<dubbo:parameter key="hash.arguments" value="0,1" />
- 預設用 160 份虛擬節點,如果要修改,請配置
<dubbo:parameter key="hash.nodes" value="320" />
配置
服務端服務級別
<dubbo:service interface="..." loadbalance="roundrobin" />
客戶端服務級別
<dubbo:reference interface="..." loadbalance="roundrobin" />
服務端方法級別
<dubbo:service interface="..."> <dubbo:method name="..." loadbalance="roundrobin"/> </dubbo:service>
客戶端方法級別
<dubbo:reference interface="..."> <dubbo:method name="..." loadbalance="roundrobin"/> </dubbo:reference>
———————————————— Dubbo 官方文件已結束 ——————————————
Dubbo 負載均衡的策略
上面官網文件已經說明 Dubbo 的負載均衡演算法總共有4種
- 隨機演算法 RandomLoadBalance (預設)
- 輪訓演算法 RoundRobinLoadBalance
- 最小活躍數演算法 LeastActiveLoadBalance
- 一致性hash演算法 ConsistentHashLoadBalance
我們先看下介面的繼承圖
LoadBalance
首先檢視 LoadBalance 介面
Invoker select(List invokers, URL url, Invocation invocation) throws RpcException;
LoadBalance 定義了一個方法就是從 invokers 列表中選取一個
AbstractLoadBalance
AbstractLoadBalance 抽象類是所有負載均衡策略實現類的父類,實現了LoadBalance介面 的方法,同時提供抽象方法交由子類實現,
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); return doSelect(invokers, url, invocation); } protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
下面對四種均衡策略依次解析
RandomLoadBalance (隨機)
- 隨機 ,按權重設定隨機概率。
- 在一個截面上碰撞的概率高,但呼叫量越大分佈越均勻,而且按概率使用權重後也比較均勻,有利於動態調整提供者權重。
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //先獲得invoker 集合大小 int length = invokers.size(); // Number of invokers //總權重 int totalWeight = 0; // The sum of weights //每個invoker是否有相同的權重 boolean sameWeight = true; // Every invoker has the same weight? // 計算總權重 for (int i = 0; i < length; i++) { //獲得單個invoker 的權重 int weight = getWeight(invokers.get(i), invocation); //累加 totalWeight += weight; // Sum if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } // 權重不相等,隨機後,判斷在哪個 Invoker 的權重區間中 if (totalWeight > 0 && !sameWeight) { // 隨機 // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = ThreadLocalRandom.current().nextInt(totalWeight); // 區間判斷 // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // 權重相等,平均隨機 // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(ThreadLocalRandom.current().nextInt(length)); }
演算法分析
假定有3臺dubbo provider: 10.0.0.1:20884, weight=2 10.0.0.1:20886, weight=3 10.0.0.1:20888, weight=4 隨機演算法的實現: totalWeight=9; 假設offset=1(即random.nextInt(9)=1) 1-2=-1<0?是,所以選中 10.0.0.1:20884, weight=2
假設offset=4(即random.nextInt(9)=4) 4-2=2<0?否,這時候offset=2, 2-3<0?是,所以選中 10.0.0.1:20886, weight=3
假設offset=7(即random.nextInt(9)=7) 7-2=5<0?否,這時候offset=5, 5-3=2<0?否,這時候offset=2, 2-4<0?是,所以選中 10.0.0.1:20888, weight=4
流程圖
RoundRobinLoadBalance#doSelect()(輪詢)
- 輪詢 ,按公約後的權重設定輪詢比率。
- 存在慢的提供者累積請求的問題,比如:第二臺機器很慢,但沒掛,當請求調到第二臺時就卡在那,久而久之,所有請求都卡在調到第二臺上。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int length = invokers.size(); // Number of invokers int maxWeight = 0; // The maximum weight int minWeight = Integer.MAX_VALUE; // The minimum weight final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); int weightSum = 0; // 計算最小、最大權重,總的權重和。 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight minWeight = Math.min(minWeight, weight); // Choose the minimum weight if (weight > 0) { invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); weightSum += weight; } } // 計算最小、最大權重,總的權重和。 AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } // 獲得當前順序號,並遞增 + 1 int currentSequence = sequence.getAndIncrement(); // 權重不相等,順序根據權重分配 if (maxWeight > 0 && minWeight < maxWeight) { int mod = currentSequence % weightSum;// 剩餘權重 for (int i = 0; i < maxWeight; i++) {// 迴圈最大權重 for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<T> k = each.getKey(); final IntegerWrapper v = each.getValue(); // 剩餘權重歸 0 ,當前 Invoker 還有剩餘權重,返回該 Invoker 物件 if (mod == 0 && v.getValue() > 0) { return k; } // 若 Invoker 還有權重值,扣除它( value )和剩餘權重( mod )。 if (v.getValue() > 0) { v.decrement(); mod--; } } } } // 權重相等,平均順序獲得 // Round robin return invokers.get(currentSequence % length); }
演算法說明
假定有3臺權重都一樣的dubbo provider: 10.0.0.1:20884, weight=100 10.0.0.1:20886, weight=100 10.0.0.1:20888, weight=100 輪詢演算法的實現: 其呼叫方法某個方法(key)的sequence從0開始: sequence=0時,選擇invokers.get(0%3)=10.0.0.1:20884 sequence=1時,選擇invokers.get(1%3)=10.0.0.1:20886 sequence=2時,選擇invokers.get(2%3)=10.0.0.1:20888 sequence=3時,選擇invokers.get(3%3)=10.0.0.1:20884 sequence=4時,選擇invokers.get(4%3)=10.0.0.1:20886 sequence=5時,選擇invokers.get(5%3)=10.0.0.1:20888
LeastActiveLoadBalance (最少活躍數)
- 最少活躍呼叫數 ,相同活躍數的隨機,活躍數指呼叫前後計數差。
- 使慢的提供者收到更少請求,因為越慢的提供者的呼叫前後計數差會越大。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 總個數 int length = invokers.size(); // Number of invokers // 最少的活躍數 int leastActive = -1; // The least active value of all invokers // 相同最小活躍數的個數 int leastCount = 0; // The number of invokers having the same least active value (leastActive) // 相同最小活躍數的下標 int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive) //總權重 int totalWeight = 0; // The sum of weights // 第一個權重,用於於計算是否相同 int firstWeight = 0; // Initial value, used for comparision // 是否所有權重相同 boolean sameWeight = true; // Every invoker has the same weight value? // 計算獲得相同最小活躍數的陣列和個數 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 活躍數 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number // 權重 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight // 發現更小的活躍數,重新開始 if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value. // 記錄最小活躍數 leastActive = active; // Record the current least active value // 重新統計相同最小活躍數的個數 leastCount = 1; // Reset leastCount, count again based on current leastCount // 重新記錄最小活躍數下標 leastIndexs[0] = i; // Reset // 重新統計總權重 totalWeight = weight; // Reset // 記錄第一個權重 firstWeight = weight; // Record the weight the first invoker // 還原權重標識 sameWeight = true; // Reset, every invoker has the same weight value? // 累計相同最小的活躍數 } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating. // 累計相同最小活躍數下標 leastIndexs[leastCount++] = i; // Record index number of this invoker // 累計總權重 totalWeight += weight; // Add this invoker's weight to totalWeight. // 判斷所有權重是否一樣 // If every invoker has the same weight? if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false; } } } // assert(leastCount > 0) if (leastCount == 1) { // 如果只有一個最小則直接返回 // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexs[0]); } if (!sameWeight && totalWeight > 0) { // 如果權重不相同且權重大於0則按總權重數隨機 // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // 並確定隨機值落在哪個片斷上 // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } } // 如果權重相同或權重為0則均等隨機 // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(leastIndexs[ThreadLocalRandom.current().nextInt(leastCount)]); }
簡單思路介紹
概括起來就兩部分,一部分是 活躍數
和 權重
的統計,另一部分是選擇 invoker
.也就是他把最小活躍數的 invoker
統計到 leastIndexs
陣列中,如果權重一致(這個一致的規則參考上面的隨機演算法)或者總權重為0,則均等隨機呼叫,如果不同,則從 leastIndexs
陣列中按照權重比例呼叫
演算法說明
最小活躍數演算法實現: 假定有3臺dubbo provider: 10.0.0.1:20884, weight=2,active=2 10.0.0.1:20886, weight=3,active=4 10.0.0.1:20888, weight=4,active=3 active=2最小,且只有一個2,所以選擇10.0.0.1:20884
假定有3臺dubbo provider: 10.0.0.1:20884, weight=2,active=2 10.0.0.1:20886, weight=3,active=2 10.0.0.1:20888, weight=4,active=3 active=2最小,且有2個,所以從[10.0.0.1:20884,10.0.0.1:20886 ]中選擇; 接下來的演算法與隨機演算法類似: 假設offset=1(即random.nextInt(5)=1) 1-2=-1<0?是,所以選中 10.0.0.1:20884, weight=2 假設offset=4(即random.nextInt(5)=4) 4-2=2<0?否,這時候offset=2, 2-3<0?是,所以選中 10.0.0.1:20886, weight=3
流程圖
ConsistentHashLoadBalance (一致性雜湊)
- 一致性 Hash ,相同引數的請求總是發到同一提供者。
- 當某一臺提供者掛時,原本發往該提供者的請求,基於虛擬節點,平攤到其它提供者,不會引起劇烈變動。
原始碼其實分為四個步驟
- 定義全域性一致性hash選擇器的
ConcurrentMap<String, ConsistentHashSelector<?>> selectors
,key為方法名稱,例如com.alibaba.dubbo.demo.TestService.getRandomNumber - 如果一致性hash選擇器不存在或者與以前儲存的一致性hash選擇器不一樣(即dubbo服務provider有變化,通過System.identityHashCode(invokers)計算一個identityHashCode值) 則需要重新構造一個一致性hash選擇器
- 構造一個一致性hash選擇器ConsistentHashSelector的原始碼如下,通過引數i和h打散Invoker在TreeMap上的位置,replicaNumber預設值為160,所以最終virtualInvokers這個TreeMap的size為
invokers.size()*replicaNumber
- 選擇Invoker的步驟
- 根據Invocation中的引數invocation.getArguments()轉成key
- 算出這個key的md5值
- 根據md5值的hash值從TreeMap中選擇一個Invoker
下面原始碼解析+註釋在此我向大家推薦一個架構學習交流裙。交流學習裙號:821169538,裡面會分享一些資深架構師錄製的視訊錄影
public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash"; /** * 服務方法與一致性雜湊選擇器的對映 * * KEY:serviceKey + "." + methodName */ private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); // 基於 invokers 集合,根據物件記憶體地址來計算定義雜湊值 String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; int identityHashCode = System.identityHashCode(invokers); // 獲得 ConsistentHashSelector 物件。若為空,或者定義雜湊值變更(說明 invokers 集合發生變化), // 進行建立新的 ConsistentHashSelector 物件 ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); } private static final class ConsistentHashSelector<T> { /** * 虛擬節點與 Invoker 的對映關係 */ private final TreeMap<Long, Invoker<T>> virtualInvokers; /** * 每個Invoker 對應的虛擬節點數 */ private final int replicaNumber; /** * 定義雜湊值 */ private final int identityHashCode; /** * 取值引數位置陣列 */ private final int[] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); // 設定 identityHashCode this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); // 初始化 replicaNumber this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); // 初始化 argumentIndex String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 初始化 virtualInvokers for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress(); // 每四個虛擬結點為一組,為什麼這樣?下面會說到 for (int i = 0; i < replicaNumber / 4; i++) { // 這組虛擬結點得到惟一名稱 byte[] digest = md5(address + i); // Md5是一個16位元組長度的陣列,將16位元組的陣列每四個位元組一組, // 分別對應一個虛擬結點,這就是為什麼上面把虛擬結點四個劃分一組的原因 for (int h = 0; h < 4; h++) { // 對於每四個位元組,組成一個long值數值,做為這個虛擬節點的在環中的惟一key long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } public Invoker<T> select(Invocation invocation) { // 基於方法引數,獲得 KEY String key = toKey(invocation.getArguments()); // 計算 MD5 值 byte[] digest = md5(key); // 計算 KEY 值 return selectForKey(hash(digest, 0)); } /** * 基於方法引數,獲得 KEY * @param args * @return */ private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } /** * 選一個 Invoker 物件 * @param hash * @return */ private Invoker<T> selectForKey(long hash) { // 得到大於當前 key 的那個子 Map ,然後從中取出第一個 key ,就是大於且離它最近的那個 key Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash); // 不存在,則取 virtualInvokers 第一個 if (entry == null) { entry = virtualInvokers.firstEntry(); } // 存在,則返回 return entry.getValue(); } /** * 對於每四個位元組,組成一個 Long 值數值,做為這個虛擬節點的在環中的惟一 KEY * @param digest * @param number * @return */ private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; } /** * MD5 是一個 16 位元組長度的陣列,將 16 位元組的陣列每四個位元組一組, * 分別對應一個虛擬結點,這就是為什麼上面把虛擬結點四個劃分一組的原因 * @param value * @return */ private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes; try { bytes = value.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } md5.update(bytes); return md5.digest(); } } }