1. 程式人生 > 程式設計 >Dubbo原始碼解析(八)遠端通訊——開篇

Dubbo原始碼解析(八)遠端通訊——開篇

遠端通訊——開篇

目標:介紹之後解讀遠端通訊模組的內容如何編排、介紹dubbo-remoting-api中的包結構設計以及最外層的的原始碼解析。

前言

服務治理框架中可以大致分為服務通訊和服務管理兩個部分,前面我先講到有關註冊中心的內容,也就是服務管理,當然dubbo的服務管理還包括監控中心、 telnet 命令,它們起到的是人工的服務管理作用,這個後續再介紹。接下來我要講解的就是跟服務通訊有關的部分,也就是遠端通訊模組。我在《dubbo原始碼解析(一)Hello,Dubbo》的"(六)dubbo-remoting——遠端通訊模組“中提到過一些內容。該模組中提供了多種客戶端和服務端通訊的功能,而在對NIO框架選型上,dubbo交由使用者選擇,它集成了mina、netty、grizzly等各類NIO框架來搭建NIO伺服器和客戶端,並且利用dubbo的SPI擴充套件機制可以讓使用者自定義選擇。如果對SPI不太瞭解的朋友可以檢視

《dubbo原始碼解析(二)Dubbo擴充套件機制SPI》

接下來我們先來看看dubbo-remoting的包結構:

remoting目錄

我接下來解讀遠端通訊模組的內容並不是按照一個包一篇文章的編排,先來看看dubbo-remoting-api的包結構:

dubbo-remoting-api

可以看到,大篇幅的邏輯在dubbo-remoting-api中,所以我對於dubbo-remoting-api的解讀會分為下面五個部分來說明,其中第五點會在本文介紹,其他四點會分別用四篇文章來介紹:

  1. buffer包:緩衝在NIO框架中是很重要的存在,各個NIO框架都實現了自己相應的快取操作。這個buffer包下包括了緩衝區的介面以及抽象
  2. exchange包:資訊交換層,其中封裝了請求響應模式,在傳輸層之上重新封裝了 Request-Response 語義,為了滿足RPC的需求。這層可以認為專注在Request和Response攜帶的資訊上。該層是RPC呼叫的通訊基礎之一。
  3. telnet包:dubbo支援通過telnet命令來進行服務治理,該包下就封裝了這些通用指令的邏輯實現。
  4. transport包:網路傳輸層,它只負責單向訊息傳輸,是對 Mina,Netty,Grizzly 的抽象,它也可以擴充套件 UDP 傳輸。該層是RPC呼叫的通訊基礎之一。
  5. 最外層的原始碼:該部分我會在下面之間給出介紹。

為什麼我要把一個api分成這麼多文章來講解,我們先來看看下面的圖:

dubbo-framework

我們可以看到紅框內的是遠端通訊的框架,序列化我會在後面的主題中介紹,而Exchange層和Transport層在框架設計中起到了很重要的作用,也是支撐Remoting的核心,所以我要分開來介紹。

除了上述的五點外,根據慣例,我還是會分別介紹dubbo支援的實現客戶端和服務端通訊的七種方案,也就是說該遠端通訊模組我會用12篇文章詳細的講解。

最外層原始碼解析

(一)介面Endpoint

dubbo抽象出一個端的概念,也就是Endpoint介面,這個端就是一個點,而點對點之間是可以雙向傳輸。在端的基礎上在衍生出通道、客戶端以及服務端的概念,也就是下面要介紹的Channel、Client、Server三個介面。在傳輸層,其實Client和Server的區別只是在語義上區別,並不區分請求和應答職責,在交換層客戶端和服務端也是一個點,但是已經是有方向的點,所以區分了明確的請求和應答職責。兩者都具備傳送的能力,只是客戶端和服務端所關注的事情不一樣,這個在後面會分開介紹,而Endpoint介面抽象的方法就是它們共同擁有的方法。這也就是它們都能被抽象成端的原因。

來看一下它的原始碼:

public interface Endpoint {

    // 獲得該端的url
    URL getUrl();

    // 獲得該端的通道處理器
    ChannelHandler getChannelHandler();
    
    // 獲得該端的本地地址
    InetSocketAddress getLocalAddress();
    
    // 傳送訊息
    void send(Object message) throws RemotingException;
    
    // 傳送訊息,sent是是否已經傳送的標記
    void send(Object message,boolean sent) throws RemotingException;
    
    // 關閉
    void close();
    
    // 優雅的關閉,也就是加入了等待時間
    void close(int timeout);
    
    // 開始關閉
    void startClose();
    
    // 判斷是否已經關閉
    boolean isClosed();

}
複製程式碼
  1. 前三個方法是獲得該端本身的一些屬性,
  2. 兩個send方法是傳送訊息,其中第二個方法多了一個sent的引數,為了區分是否是第一次傳送訊息。
  3. 後面幾個方法是提供了關閉通道的操作以及判斷通道是否關閉的操作。

(二)介面Channel

該介面是通道介面,通道是通訊的載體。還是用自動販賣機的例子,自動販賣機就好比是一個通道,訊息傳送端會往通道輸入訊息,而接收端會從通道讀訊息。並且接收端發現通道沒有訊息,就去做其他事情了,不會造成阻塞。所以channel可以讀也可以寫,並且可以非同步讀寫。channel是client和server的傳輸橋樑。channel和client是一一對應的,也就是一個client對應一個channel,但是channel和server是多對一對關係,也就是一個server可以對應多個channel。

public interface Channel extends Endpoint {

    // 獲得遠端地址
    InetSocketAddress getRemoteAddress();

    // 判斷通道是否連線
    boolean isConnected();

    // 判斷是否有該key的值
    boolean hasAttribute(String key);

    // 獲得該key對應的值
    Object getAttribute(String key);

    // 新增屬性
    void setAttribute(String key,Object value);

    // 移除屬性
    void removeAttribute(String key);

}
複製程式碼

可以看到Channel繼承了Endpoint,也就是端抽象出來的方法也同樣是channel所需要的。上面的幾個方法很好理解,我就不多介紹了。

(三)介面ChannelHandler

@SPI
public interface ChannelHandler {

    // 連線該通道
    void connected(Channel channel) throws RemotingException;

    // 斷開該通道
    void disconnected(Channel channel) throws RemotingException;

    // 傳送給這個通道訊息
    void sent(Channel channel,Object message) throws RemotingException;

    // 從這個通道內接收訊息
    void received(Channel channel,Object message) throws RemotingException;

    // 從這個通道內捕獲異常
    void caught(Channel channel,Throwable exception) throws RemotingException;

}
複製程式碼

該介面是負責channel中的邏輯處理,並且可以看到這個介面有註解@SPI,是個可擴充套件介面,到時候都會在下面介紹各類NIO框架的時候會具體講到它的實現類。

(四)介面Client

public interface Client extends Endpoint,Channel,Resetable {
    
    // 重連
    void reconnect() throws RemotingException;

    // 重置,不推薦使用
    @Deprecated
    void reset(com.alibaba.dubbo.common.Parameters parameters);

}
複製程式碼

客戶端介面,可以看到它繼承了Endpoint、Channel和Resetable介面,繼承Endpoint的原因上面我已經提到過了,客戶端和服務端其實只是語義上的不同,客戶端就是一個點。繼承Channel是因為客戶端跟通道是一一對應的,所以做了這樣的設計,還繼承了Resetable介面是為了實現reset方法,該方法,不過已經打上@Deprecated註解,不推薦使用。除了這些客戶端就只需要關注一個重連的操作。

這裡插播一個公共模組下的介面Resetable:

public interface Resetable {

    // 用於根據新傳入的 url 屬性,重置自己內部的一些屬性
    void reset(URL url);

}
複製程式碼

該方法就是根據新的url來重置內部的屬性。

(五)介面Server

public interface Server extends Endpoint,Resetable {

    // 判斷是否繫結到本地埠,也就是該伺服器是否啟動成功,能夠連線、接收訊息,提供服務。
    boolean isBound();

    // 獲得連線該伺服器的通道
    Collection<Channel> getChannels();

    // 通過遠端地址獲得該地址對應的通道
    Channel getChannel(InetSocketAddress remoteAddress);

    @Deprecated
    void reset(com.alibaba.dubbo.common.Parameters parameters);

}
複製程式碼

該介面是服務端介面,繼承了Endpoint和Resetable,繼承Endpoint是因為服務端也是一個點,繼承Resetable介面是為了繼承reset方法。除了這些以外,服務端獨有的是檢測是否啟動成功,還有事獲得連線該伺服器上所有通道,這裡獲得所有通道其實就意味著獲得了所有連線該伺服器的客戶端,因為客戶端和通道是一一對應的。

(六)介面Codec && Codec2

這兩個都是編解碼器,那麼什麼叫做編解碼器,在網路中只是講資料看成是原始的位元組序列,但是我們的應用程式會把這些位元組組織成有意義的資訊,那麼網路位元組流和資料間的轉化就是很常見的任務。而編碼器是講應用程式的資料轉化為網路格式,解碼器則是講網路格式轉化為應用程式,同時具備這兩種功能的單一元件就叫編解碼器。在dubbo中Codec是老的編解碼器介面,而Codec2是新的編解碼器介面,並且dubbo已經用CodecAdapter把Codec適配成Codec2了。所以在這裡我就介紹Codec2介面,畢竟人總要往前看。

@SPI
public interface Codec2 {
  	//編碼
    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel,ChannelBuffer buffer,Object message) throws IOException;
    //解碼
    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel,ChannelBuffer buffer) throws IOException;

    enum DecodeResult {
        // 需要更多輸入和忽略一些輸入
        NEED_MORE_INPUT,SKIP_SOME_INPUT
    }

}
複製程式碼

因為是編解碼器,所以有兩個方法分別是編碼和解碼,上述有以下幾個關注的:

  1. Codec2是一個可擴充套件的介面,因為有@SPI註解。
  2. 用到了Adaptive機制,首先去url中尋找key為codec的value,來載入url攜帶的配置中指定的codec的實現。
  3. 該介面中有個列舉型別DecodeResult,因為解碼過程中,需要解決 TCP 拆包、粘包的場景,所以增加了這兩種解碼結果,關於TCP 拆包、粘包的場景我就不多解釋,不懂得朋友可以google一下。

(七)介面Decodeable

public interface Decodeable {

    //解碼
    public void decode() throws Exception;

}
複製程式碼

該介面是可解碼的介面,該介面有兩個作用,第一個是在呼叫真正的decode方法實現的時候會有一些校驗,判斷是否可以解碼,並且對解碼失敗會有一些訊息設定;第二個是被用來message核對用的。後面看具體的實現會更瞭解該介面的作用。

(八)介面Dispatcher

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    // 排程
    @Adaptive({Constants.DISPATCHER_KEY,"dispather","channel.handler"})
    // The last two parameters are reserved for compatibility with the old configuration
    ChannelHandler dispatch(ChannelHandler handler,URL url);

}
複製程式碼

該介面是排程器介面,dispatch是執行緒池的排程方法,這邊有幾個注意點:

  1. 該介面是一個可擴充套件介面,並且預設實現AllDispatcher,也就是所有訊息都派發到執行緒池,包括請求,響應,連線事件,斷開事件,心跳等。
  2. 用了Adaptive註解,也就是按照URL中配置來載入實現類,後面兩個引數是為了相容老版本,如果這是三個key對應的值都為空,就選擇AllDispatcher來實現。

(九)介面Transporter

@SPI("netty")
public interface Transporter {
    
    // 繫結一個伺服器
    @Adaptive({Constants.SERVER_KEY,Constants.TRANSPORTER_KEY})
    Server bind(URL url,ChannelHandler handler) throws RemotingException;
    
    // 連線一個伺服器,即建立一個客戶端
    @Adaptive({Constants.CLIENT_KEY,Constants.TRANSPORTER_KEY})
    Client connect(URL url,ChannelHandler handler) throws RemotingException;

}
複製程式碼

該介面是網路傳輸介面,有以下幾個注意點:

  1. 該介面是一個可擴充套件的介面,並且預設實現NettyTransporter。
  2. 用了dubbo SPI擴充套件機制中的Adaptive註解,載入對應的bind方法,使用url攜帶的server或者transporter屬性值,載入對應的connect方法,使用url攜帶的client或者transporter屬性值,不瞭解SPI擴充套件機制的可以檢視《dubbo原始碼解析(二)Dubbo擴充套件機制SPI》

(十)Transporters

public class Transporters {

    static {
        // check duplicate jar package
        // 檢查重複的 jar 包
        Version.checkDuplicate(Transporters.class);
        Version.checkDuplicate(RemotingException.class);
    }

    private Transporters() {
    }

    public static Server bind(String url,ChannelHandler... handler) throws RemotingException {
        return bind(URL.valueOf(url),handler);
    }

    public static Server bind(URL url,ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        // 建立handler
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // 呼叫Transporter的實現類物件的bind方法。
        // 例如實現NettyTransporter,則呼叫NettyTransporter的connect,並且返回相應的server
        return getTransporter().bind(url,handler);
    }

    public static Client connect(String url,ChannelHandler... handler) throws RemotingException {
        return connect(URL.valueOf(url),handler);
    }

    public static Client connect(URL url,ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // 呼叫Transporter的實現類物件的connect方法。
        // 例如實現NettyTransporter,則呼叫NettyTransporter的connect,並且返回相應的client
        return getTransporter().connect(url,handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

}
複製程式碼
  1. 該類用到了設計模式的外觀模式,通過該類的包裝,我們就不會看到內部具體的實現細節,這樣降低了程式的複雜度,也提高了程式的可維護性。比如這個類,包裝了呼叫各種實現Transporter介面的方法,通過getTransporter來獲得Transporter的實現物件,具體實現哪個實現類,取決於url中攜帶的配置資訊,如果url中沒有相應的配置,則預設選擇@SPI中的預設值netty。
  2. bind和connect方法分別有兩個過載方法,其中的操作只是把把字串的url轉化為URL物件。
  3. 靜態程式碼塊中檢測了一下jar包是否有重複。

(十一)RemotingException && ExecutionException && TimeoutException

這三個類是遠端通訊的異常類:

  1. RemotingException繼承了Exception類,是遠端通訊的基礎異常。
  2. ExecutionException繼承了RemotingException類,ExecutionException是遠端通訊的執行異常。
  3. TimeoutException繼承了RemotingException類,TimeoutException是超時異常。

為了不影響篇幅,這三個類原始碼我就不介紹了,因為比較簡單。

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了dubbo-remoting-api中的包結構設計以及最外層的的原始碼解析,其中關鍵的是理解端的概念,明白在哪一層才區分了傳送和接收的職責,後續文章會按照我上面的編排去寫。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見。