1. 程式人生 > >Dubbo原始碼實現五:RPC中的服務消費方實現

Dubbo原始碼實現五:RPC中的服務消費方實現

        剛開始使用Dubbo的人,可能對Dubbo的第一印象就是它是一個RPC框架,當然,所有的分散式框架都少不了相互通訊的過程,何況Dubbo的任務就是幫助分散式業務系統完成服務的通訊、負載、註冊、發現和監控等功能。不得不承認,RPC是Dubbo提供服務的核心流程,為了相容多種使用場景,Dubbo顯然需要提供多種RPC方式(協議).

        開發一個簡單的RPC框架,重點需要考慮的是兩點,即編解碼方式和底層通訊協議的選型,編解碼方式指的是需要傳輸的資料在呼叫方將以什麼組織形式拆解成位元組流並在服務提供方以什麼形式解析出來。編解碼方式的設計需要考慮到後期的版本升級,所以很多RPC協議在設計時都會帶上當前協議的版本資訊。而底層通訊協議的選型都大同小異,一般都是TCP(當然也可以選擇建立於TCP之上更高階的協議,比如Avro、Thrift和HTTP等),在Java語言中就是指套接字Socket,當然,在Netty出現後,很少RPC框架會直接以自己寫Socket作為預設實現的通訊方式,但通常也會自己實現一個aio、nio或bio版本給那些“不方便”依賴Netty庫的應用系統來使用。

        在Dubbo的原始碼中,有一個單獨模組dubbo-rpc,其中,最重要的應該是Protocol和Invoker兩個介面,代表著協議(編解碼方式)和呼叫過程(通訊方式)。Invoker介面繼承於Node介面,Node介面規範了Dubbo體系中各元件之間通訊的基本要素: 

public interface Node {
    // 協議資料載體
    URL getUrl();
    // 狀態監測,當前是否可用
    boolean isAvailable();
    // 銷燬方法
    void destroy();
}

而Invoker介面則更簡單:

public interface Invoker<T> extends Node {
    // 獲取呼叫的介面
    Class<T> getInterface();
    // 呼叫過程
    Result invoke(Invocation invocation) throws RpcException;
}

從原始碼dubbo-rpc下的子模組來看,我們能知道目前Dubbo支援dubbo(預設)、hessian、http、injvm(本地呼叫)、memcached、redis、rmi、thrift和webservice等9中RPC方式。根據Dubbo的官方手冊,injvm是一個偽協議,它不開啟埠,不發起遠端呼叫,只在JVM內直接關聯,但執行Dubbo的Filter鏈,所以這一般用於線下測試。可是為啥Memcached和Redis也能用作RPC?這裡是指Dubbo端作為服務消費方,而Memcached或Redis作為服務提供方。

       我們這裡重點看呼叫方(服務消費方)部分的程式碼。

      雖然Invoker介面中定義的是invoke方法,invoker方法的實現理應RPC的整個操作,但為了狀態檢查、上下文切換和準備、異常捕獲等,抽象類AbstractInvoker中定義了一個doInvoker抽象方法來支援不同的RPC方式所應做的純粹而具體的RPC過程,我們直接看AbstractInvoker中的invoker實現:

public Result invoke(Invocation inv) throws RpcException {
    if(destroyed) {
        throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() 
                                        + " use dubbo version " + Version.getVersion()
                                        + " is DESTROYED, can not be invoked any more!");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    // 填充介面引數
    if (attachment != null && attachment.size() > 0) {
       invocation.addAttachmentsIfAbsent(attachment);
    }
    // 填充業務系統需要透傳的引數
    Map<String, String> context = RpcContext.getContext().getAttachments();
    if (context != null) {
       invocation.addAttachmentsIfAbsent(context);
    }
    // 預設是同步呼叫,但也支援非同步
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
       invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }

    /**
     * 冪等操作:非同步操作預設新增invocation id,它是一個自增的AtomicLong
     * 可以在RpcContext中設定attachments的{@link Constants.ASYNC_KEY}值來設定是同步還是非同步
     */
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    try {
        
        // 執行具體的RPC操作
        return doInvoke(invocation);

    // 異常處理的程式碼略去
    } catch (InvocationTargetException e) {
    } catch (RpcException e) {
    } catch (Throwable e) {
    }
}

      可以看出主要是用來做引數填充(包括方法引數、業務引數和Dubbo內定的引數),然後就直接呼叫具體的doInvoker方法了。Dubbo所支援的RPC協議都需繼承AbstractInvoker類。

         我們先來看看Dubbo中預設的dubbo協議的實現,即DubboInvoker,直接看其doInvoker的實現: 

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    // 確定此次呼叫該使用哪個client(一個client代表一個connection)
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        // 如果是多個client,則使用簡單的輪詢方式來決定
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 是否非同步呼叫
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // 是否單向呼叫,注意,單向呼叫和非同步呼叫相比不同,單向呼叫不等待被呼叫方的應答就直接返回
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
           boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            // 單向呼叫只負責傳送訊息,不等待服務端應答,所以沒有返回值
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
           ResponseFuture future = currentClient.request(inv, timeout);
            // 非同步呼叫先儲存future,便於後期處理
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            // 預設的同步呼叫
           RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

從上面的程式碼可以看出,dubbo協議中分為三種呼叫方式同步(預設)、非同步和OneWay,同步好理解,就是阻塞等拿到被呼叫方的結果再返回,非同步也好理解,不等待被呼叫者的處理結果就直接返回,但需要等到被呼叫者接收到非同步請求的應答,OneWay(單向呼叫)在很多MQRPC框架中都有出現,即呼叫方只負責呼叫一次,不管被呼叫方是否接收到該請求,更不會去理會被呼叫方的任何應答,OneWay一般只會在無需保證呼叫結果的時候使用。在《Dubbo原始碼實現二》中我們已經提到過,負載的策略決定此次服務呼叫是請求哪個服務提供方(也就是哪臺伺服器),當確定了呼叫哪個服務提供房後,其實也就是確定了使用哪個Invoker,這裡指DubboInvoker例項。RPC框架為了提高服務的吞吐量,通常服務消費方和服務提供方的伺服器之間會建立多個連線,如上面程式碼中的clients所以在確定使用哪個DubboInvoker例項後,會從中選擇一個(如上面程式碼的取模輪詢)client來進行RPC呼叫。從上面給出的程式碼可以看出,同步和非同步的區別只是同步直接在currentClient.request返回的Future物件上進行了get操作來直接等待結果的返回。

       Dubbo中的Client例項都是ExchangeClient的實現,而每個Client例項都會繫結一個Channel的例項,來處理通訊的具體細節,而所有的Channel例項都實現了ExchangeChannel介面。這裡我們先來看看HeaderExchangeChannel#request的實現: 

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    // 相比OneWay,同步和非同步呼叫屬於TwoWay
    req.setTwoWay(true);
    req.setData(request);
    // 建立DefaultFuture,用於將請求和應答關聯起來
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try{
        // 直接傳送呼叫請求
        channel.send(req);
    }catch (RemotingException e) {
        future.cancel();
        throw e;
    }

    // 將future返回,用於拿到服務呼叫的返回值
    return future;
}

從上面程式碼可以看出,在直接呼叫channel.send傳送資料時,先建立了一個DefaultFuture,它主要用於關聯請求和應答,DefaultFuture將稍後分析。後面,直接呼叫了Channel的send方法,dubbo協議底層直接使用了Netty框架,所以這裡指的是NettyChannel,見NettyChannel#send的程式碼:

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);
    
    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.write(message);
        /**
         * sent值只是為了效能調優,預設是false
         */
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    
    // 當sent為true且資料傳送時間超過指定的超時時間時,由Dubbo負責丟擲異常
    if(! success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

根據Dubbo使用者手冊中所說,sent引數的配置主要用於效能調優,這裡當sent為true時(預設為false),將直接使用Netty的ChannelFuture來實現在給定的超時時間內等待,如果資料傳送時間超過指定的超時時間,則丟擲異常。之所以這樣做,是為了將Netty框架處理時間控制在超時時間範圍內,否則Dubbo框架在外圍做的超時機制(DefaultFuture)將徒勞。

      接下來,我們看看Dubbo如何將請求和應答關聯起來的,前面看到的HeaderExchangeChannel#request實現中,建立了一個Request物件,Request中有一個mId,用來唯一表示一個請求物件,而該mId在new的時候就會建立:

public Request() {
    mId = newId();
}
 
private static long newId() {
    // getAndIncrement()增長到MAX_VALUE時,再增長會變為MIN_VALUE,負數也可以做為ID
    return INVOKE_ID.getAndIncrement();
}

而DefaultFuture靠的就是這個mId來關聯請求和應答訊息,DefaultFuture中有兩個很重要的屬性:FUTURS和CHANNELS,它們型別都是ConcurrentHashMap,key為mId,在新建DefaultFuture物件時會把mId和相關的Future和Channel塞到這兩個Map中,還有一個ReentrantLock型別的lock屬性,用於阻塞來等待應答,我們直接看DefaultFuture中獲取結果和接收到應答後的實現:

public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        // 預設的超時時間是1秒
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (! isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (! isDone()) {
                // 最多等制定的超時時間
                done.await(timeout, TimeUnit.MILLISECONDS);
                // 如果已經有結果或者已經超過超時時間,則break
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (! isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}
 
public static void received(Channel channel, Response response) {
    try {
        // 獲取並移除該mId的Future
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " 
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                        + ", response " + response 
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                            + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        // 獲取並移除該mId的Channel
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            // 釋放訊號
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}

由於received是靜態方法,所以可以直接在Netty中註冊的Handler中使用。

      那服務消費方和服務提供方的連線數量是由誰決定的呢?這個我們可以直接看DubboInvoker的建立方DubboProtocol中的程式碼:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

private ExchangeClient[] getClients(URL url){
    //是否共享連線
    boolean service_share_connect = false;
    /** 如果在dubbo:reference中沒有設定{@link Constants.CONNECTIONS_KEY},則預設是共享連線  */
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    //如果connections不配置,則共享連線,否則每服務每連線
    if (connections == 0){
        service_share_connect = true;
        connections = 1;
    }

    // 一個client維護一個connection
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect){
            // 使用共享的TCP長連線
            clients[i] = getSharedClient(url);
        } else {
            // 單獨為該URL建立TCP長連線
            clients[i] = initClient(url);
        }
    }
    return clients;
}

從getClients的程式碼可以看出,服務消費方和服務提供方的伺服器之間的連線數量是可以配置的,服務消費方和服務提供方都可以配置,當然服務消費方優先順序更高,例如:

服務消費方A:<dubbo:reference   interface="com.foo.BarServiceA"  /> 

服務消費方A:<dubbo:reference   interface="com.foo.BarServiceB"  connections="5"  /> 

服務提供方B:<dubbo:service  interface="com.foo.BarServiceA"  /> 

服務提供方B:<dubbo:service  interface="com.foo.BarServiceB"  connections="10"  /> 

對於服務BarServiceA,由於消費方和提供方都沒有配置connections,所以,所有類似於BarServiceA這樣沒有配置connections的服務,消費方伺服器和提供方伺服器將公用一個TCP長連線,即上面程式碼說提到的共享連線。而對於服務BarServiceA,因為配置了connections屬性,消費方A和提供方B之間將單獨建立5個(消費方配置優先順序高於服務端配置,所以這裡是5而不是10TCP長連線來專門給服務BarServiceA使用,以提高吞吐量和效能,至於每次呼叫應該如何從這5個連線中選,前面已經提到,這裡不再闡述。所以,為了提高某個服務的吞吐量,可以試著配置connections屬性,當然,前提是服務提供方效能過剩。

         對於非同步呼叫,Dubbo的預設呼叫過濾鏈中有一個FutureFilter,當我們在dubbo:reference中配置了async="true"後,將會執行FutureFilter中的非同步邏輯,這裡不再闡述,感興趣的同學可以去閱讀FutureFilter#asyncCallback部分的程式碼。