1. 程式人生 > >Dubbo原始碼分析:RPC協議實現-客戶端併發呼叫控制

Dubbo原始碼分析:RPC協議實現-客戶端併發呼叫控制

概述

Dubbo支援在服務或者方法粒度,通過actives引數,控制客戶端對提供者服務的所有方法或者某個方法進行併發訪問控制,即在同一時刻,客戶端只允許active個請求併發呼叫服務的某個方法,超過的請求需要等待,如果在timeout時間內還是無法執行呼叫,則異常退出。

用法

服務級別

限制 com.foo.BarService 的每個方法,每客戶端併發執行(或佔用連線的請求數)不能超過 10 個:
<dubbo:service interface=“com.foo.BarService” actives=“10” />

<dubbo:reference interface=“com.foo.BarService” actives=“10” />

方法級別

限制 com.foo.BarService 的 sayHello 方法,每客戶端併發執行(或佔用連線的請求數)不能超過 10 個:
<dubbo:service interface=“com.foo.BarService”>
<dubbo:method name=“sayHello” actives=“10” />
</dubbo:service>

<dubbo:reference interface=“com.foo.BarService”>
<dubbo:method name=“sayHello” actives=“10” />
</dubbo:service>
如果dubbo:service和 dubbo:reference都配了actives,dubbo:reference優先。

原始碼實現

  1. 實現類:在rpc包下的ActiveLimitFilter,即通過過濾器的方式對請求進行過濾,當未達到actives個併發請求時,則將rpc請求直接傳給下個過濾器,否則等待直到可以執行或者超時異常。
  2. 核心實現:通過synchronized來進行執行緒同步,實現等待;方法當前併發連線數,通過執行緒安全靜態統計類RpcStatus來實現的。
  • ActiveLimitFilter類
    在這裡插入圖片描述
    從上到下分析:
    (1)根據請求的url和method,獲取該方法的狀態統計類例項count,該例項由所有請求執行緒共享;
    (2)對count進行synchronized同步,噹噹前併發請求大於配置的max時,則通過wait執行等待,同時設定超時時間。
    (3)wait:要麼是這個請求之前的其他請求呼叫成功,呼叫notify喚醒到該請求處理執行緒,要麼wait超時。
    (4)elapsed:不管是被其他執行緒notify喚醒還是wait超時喚醒,都需要計算一下從開始到現在過去了多長時間,值為elapsed,如果超過或者剛好是達到timeout,則不管是超時喚醒,還是剛好超時時刻被其他執行緒notify快取,則都丟擲超時異常。這樣設計主要是隻有不是超時喚醒,是在超時之前,被其他執行緒notify喚醒才進行執行。
    (5)執行方法呼叫:
    在這裡插入圖片描述

    • RpcStatus.beginCcount:累加當前併發呼叫數
    • 執行方法呼叫
    • 不過是呼叫成功還是異常,則均需要遞減當前併發呼叫數
    • finally:呼叫notify通知其他等待執行緒可以進行執行
  • RpcStatus:Rpc呼叫統計類,執行緒安全
    在這裡插入圖片描述
    (1)執行緒安全:主要是使用java.concorrent包的執行緒安全類保證執行緒安全,除METHOD_STATISTICS,其他屬性為對應一個服務的一個方法的呼叫相關統計,如當前併發活躍請求數active,總請求數total等。
    (2)METHOD_STATISTICS:方法統計快取。key為uri,即服務providerURL,value為ConcurrentMap<String, RpcStatus>,其中這個key為方法名methodName,value為該方法的統計RpcStatus例項。
    (3)getStatus:獲取方法的統計例項。
    在這裡插入圖片描述
    (4)AtomicInteger:原子類保證執行緒安全
    在這裡插入圖片描述