1. 程式人生 > >微服務優化之並行呼叫

微服務優化之並行呼叫

微服務優化之並行呼叫

原文連結

網際網路產品隨著使用者的增加,系統對服務的高效能、高可用、可伸縮、可擴充套件的支援,大都採用分散式RPC框架。然而隨著業務的增加,系統越來越多,系統之間的呼叫也越來越複雜,原本一個系統中一次請求就可以完成的工作,現在可能被分散在多個系統中,一次請求需要多個系統響應。這樣就會放大RPC呼叫延遲帶來的副作用,影響系統的高效能需求。

例如:一個RPC介面中需要依賴另外三個系統的RPC服務,各RPC服務的響應時間分別是20ms、10ms、10ms,那麼這個介面的對外系統依賴的耗時40ms。如果介面依賴越多,響應時間就會越長。

對此,需要在業務範圍內進行效能優化,優化思路總的來說有兩種:

第一:如果對RPC介面呼叫,不需要關心介面的返回值,那麼可以採用非同步RPC呼叫。

第二:如果依賴RPC介面返回值,並且連續呼叫的多個RPC之間沒有依賴關係,可以採用並行化處理。

本文主要分享一下通過並行化處理,來優化RPC介面響應時間,如上例子中的RPC採用並行呼叫,對外系統介面的依賴耗時會降低到20ms。

第一:因為Java對執行緒的使用非常方便,所以完成並行呼叫對於Java語言來說是相對簡單,根據依賴外部介面分別建立一個執行緒來呼叫就可以完成。

第二:那麼問題來:如果我的介面在完成其他介面呼叫後,還需要完成額外的功能而且需要依賴其他介面呼叫結果,該怎麼處理呢?Thread類通過join呼叫,可以讓主執行緒等待子執行緒處理結果。

第三:那麼問題又來了:子執行緒內部的異常無法在外部獲取,而需要依賴外部介面的呼叫結果的情況下,如果RPC介面丟擲異常,必須在主執行緒中獲取並作出相應處理,這個工作可以通過FutureTask來完成。

第四:那麼問題又來了:如果一個介面依賴十個外部系統,那麼每次請求就需要建立十個執行緒,隨著介面TPS增加,系統建立執行緒和銷燬的執行緒耗費的資源越來越高,這個時候需要考慮採用執行緒池方案了。

第五:那麼問題又來了:以上例項只是一個單應用的測試Demo,真實應用情況下如上這樣在程式碼中建立執行緒池並沒太大意義,應該建立全域性的執行緒池,所有請求共用執行緒池才能達到執行緒資源共用。但是Executors中執行緒池都預設採用AbortPolicy 的拒絕策略,在高併發情況下,就會頻繁出現的執行緒池拒絕服務異常。此時可以考慮自定義執行緒池,採用CallerRunsPolicy拒絕策略,在高併發量,當執行緒池無法提供服務的情況下,採用主執行緒自己建立執行緒,達到併發量和計算資源的最優協調。

第六:完成以上操作就可以完美了嗎?然而情況並非如此,如果細心測試發現,如果其中一個介面丟擲異常時,主執行緒就結束了,而其他還沒有執行結束的子執行緒將繼續執行,一開始我們通過Thread.join()來協調主子執行緒的先後順序,而現在採用執行緒池,無法在獲取執行緒並且呼叫join方法,而是採用FutureTask.get()來協調先後順序,那麼還可以採用哪些方式保證主執行緒最後結束呢?此時可以採用一些特有的併發工具,如:閉鎖,柵欄,訊號量。如下為網路摘抄的三個工具對比:

閉鎖(CountDownLatch)

類似於門。門初始是關閉的,試圖進門的執行緒掛起等待開門。當負責開門程序將門開啟後,所有等待執行緒被喚醒。

門一旦開啟就不能再關閉了。

CountDownLatch(int n):指定閉鎖計數器

await() :掛起等待閉鎖計數器為0

countDown():閉鎖計數器減1

柵欄(CyclicBarrier)

和閉鎖有類似之處。閉鎖是等待“開門”事件;柵欄是等待其他執行緒。例如有N個執行緒檢視通過柵欄,此時先到的要等待,直到所有執行緒到到達後,柵欄開啟,所有等待執行緒被喚醒通過柵欄。

CyclicBarrier(int n):需要等待的執行緒數量

await():掛起等待達到執行緒數量

訊號量(Semaphore)

和鎖的作用類似。區別是鎖只允許被一個執行緒獲取,但是訊號量可以設定資源數量。當沒有可用資源時,才被掛起等待。

Semaphore(int n):指定初始的資源數量

acquire():試圖獲取資源。當沒有可用資源時掛起

release():釋放一個資源

       本文采用柵欄完成例項程式碼如下:

package com.halfworlders.test.domo;

import java.util.concurrent.Callable;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.FutureTask;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

 

import com.halfworlders.test.exp.AppException;

import com.halfworlders.test.impl.ServiceImpl;

import com.halfworlders.test.intf.ServiceInterface;

 

publicclass App {

    /**

     * 外介面總數

     */

    privatestaticfinalintINTERFACE_COUNT = 10;

    ExecutorService executorService = new ThreadPoolExecutor(INTERFACE_COUNT, INTERFACE_COUNT*3, 10L,

           TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), newThreadPoolExecutor.CallerRunsPolicy());

 

    publicstaticvoid main(String[] args) {

       longstart = System.currentTimeMillis();

       App test = new App();

       test.test();

       longend = System.currentTimeMillis();

       System.out.println("總耗時:"+(end-start)+"ms");

    }

 

    @SuppressWarnings("unchecked")

    publicvoid test() {

       final CyclicBarrier cb = new CyclicBarrier(INTERFACE_COUNT + 1);

       final ServiceInterface[] services = assembles();

       final FutureTask<Integer>[] futureTasks = new FutureTask[INTERFACE_COUNT];

       for (inti = 0; i < INTERFACE_COUNT; i++) {

           final Integer fi = i;

           futureTasks[i] = new FutureTask<Integer>(new Callable<Integer>() {

              @Override

              public Integer call() throws Exception {

                  try {

                     returnservices[fi].service();

                  } finally {

                     cb.await();

                  }

              }

           });

           executorService.submit(futureTasks[i]);

       }

       String serviceName = null;

       try {

           // 開啟柵欄

           cb.await();

           // 如果有其他系統呼叫異常,則將該異常向外層丟擲

           for (inti = 0; i < INTERFACE_COUNT; i++) {

              serviceName = services[i].getName();

              futureTasks[i].get();

           }

       } catch (Exception e) {

           if ((einstanceof ExecutionException) && (e.getCause() instanceof AppException)) {

              throw (AppException) e.getCause();

           } else {

              thrownew RuntimeException(serviceName+"系統異常", e);

           }

       }

    }

   

    private ServiceInterface[] assembles(){

       ServiceInterface[] service = new ServiceInterface[INTERFACE_COUNT];

       for (inti = 0; i < INTERFACE_COUNT; i++) {

           service[i] = new ServiceImpl("介面"+i);

       }

        returnservice;

    }

}