1. 程式人生 > >【Dubbo 源碼解析】06_Dubbo 服務調用

【Dubbo 源碼解析】06_Dubbo 服務調用

阻塞 發現 cluster iss 長連接 lean NPU void lib

Dubbo 服務調用

技術分享圖片

技術分享圖片

根據上圖,可以看出,服務調用過程為:

  1. Consumer 端的 Proxy 調用 Cluster 層選擇集群中的某一個 Invoker(負載均衡)

  2. Invoker 最終會調用 Protocol 層進行 RPC 通訊(netty,tcp 長連接),將服務調用信息和配置信息進行傳遞

  3. Provider 端 Protocol 層接收到服務調用信息後,最終會調用真實的服務實現

Consumer 端調用過程

通過前面 Dubbo 服務發現&引用 的學習,我們知道,Consumer 端的調用過程大體如下:

  1. 執行 FailoverClusterInvoker#invoke(Invocation invocation) (負載均衡。當有多個 provider 時走這一步,沒有的話,就跳過)

  2. 執行 Filter#invoke(Invoker<?> invoker, Invocation invocation) (所有 group="provider" 的 Filter)

  3. 執行 DubboInvoker#invoke(Invocation inv)

所以,最終的通訊過程是由 DubboInvoker 來完成的:

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    
final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); // 選擇 ExchangeClient(一般情況下只有一個) ExchangeClient currentClient; if (clients.length == 1) { currentClient
= clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { // dubbo 異步調用 ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { // 服務調用分支 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }

閱讀源碼可以發現,獲取 Rpc 調用結果的方法是: ExchangeClient#request(inv, timeout).get(), 最終會調用 NettyChannel#send(Object message, false) 去寫消息。而消息的編解碼是通過 com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec 來處理的。

// 對 Request 消息進行編碼(DubboCodec.class)
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;
    out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
    out.writeUTF(inv.getMethodName()); // 寫出調用方法的名稱
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); // 寫出調用方法的參數
    Object[] args = inv.getArguments();
    if (args != null)
        for (int i = 0; i < args.length; i++) {
            out.writeObject(encodeInvocationArgument(channel, inv, i)); // 寫出調用的實參
        }
    out.writeObject(inv.getAttachments()); // 寫出 attachments
}

Provider 端調用過程

Provider 端接收到 Consumer 端的消息後,會通過 DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)處理,將消息轉化為 RpcInvocation。最終通過預先設置好的 netty 的 handler 來將 RpcInvocation 轉化為 Invoker 的調用。

通過 Dubbo 服務發現&引用 的學習,我們知道,這個 handler 最終會調用 DubboProtocol#requestHandler 來處理,最終將 Invocation 轉化為 Invoker 的調用。

調用完成後,返回結果通過DubboCodec#encodeResponseData(Channel channel, ObjectOutput out, Object data)進行編碼,最終寫回到 Consumer 端:

// 對返回結果進行編碼
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
    Result result = (Result) data;
    Throwable th = result.getException();
    if (th == null) {
        Object ret = result.getValue();
        if (ret == null) {
            out.writeByte(RESPONSE_NULL_VALUE);
        } else {
            out.writeByte(RESPONSE_VALUE);
            out.writeObject(ret);
        }
    } else {
        out.writeByte(RESPONSE_WITH_EXCEPTION);
        out.writeObject(th);
    }
}

Consumer 端接收調用返回

Conmuser 端接收到調用的返回數據後,會由 DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)將數據轉化成 RpcResult。

ExchangeClient#request(inv, timeout).get() 最終會調用 DefaultFuture#get() 來獲取 Response 中的返回數據。DefaultFuture#get() 使用了同步阻塞的方式來等待 provider 端的返回數據。

官方如是說:

滿眼都是 Invoker

由於 Invoker 是 Dubbo 領域模型中非常重要的一個概念,很多設計思路都是向它靠攏。這就使得 Invoker滲透在整個實現代碼裏,對於剛開始接觸 Dubbo 的人,確實容易給搞混了。 下面我們用一個精簡的圖來說明最重要的兩種 Invoker:服務提供 Invoker 和服務消費 Invoker
技術分享圖片

技術分享圖片

服務提供 Invoker:DubboInvoker(默認)、HessianRpcInvoker等(視協議而定)

服務消費 Invoker:AbstractProxyInvoker

【Dubbo 源碼解析】06_Dubbo 服務調用