1. 程式人生 > 程式設計 >Dubbo原始碼解析(二十五)遠端呼叫——hessian協議

Dubbo原始碼解析(二十五)遠端呼叫——hessian協議

遠端呼叫——hessian協議

目標:介紹遠端呼叫中跟hessian協議相關的設計和實現,介紹dubbo-rpc-hessian的原始碼。

前言

本文講解多是dubbo整合的第二種協議,hessian協議,Hessian 是 Caucho 開源的一個 RPC 框架,其通訊效率高於 WebService 和 Java 自帶的序列化。dubbo整合hessian所提供的hessian協議相關介紹可以參考官方檔案,我就不再贅述。

檔案地址:dubbo.apache.org/zh-cn/docs/…

原始碼分析

(一)DubboHessianURLConnectionFactory

該類繼承了HessianURLConnectionFactory類,是dubbo,用於建立與伺服器的連線的內部工廠,重寫了父類中open方法。

public class DubboHessianURLConnectionFactory extends HessianURLConnectionFactory {

    /**
     * 開啟與HTTP伺服器的新連線或迴圈連線
     * @param url
     * @return
     * @throws IOException
     */
    @Override
    public HessianConnection open(URL url) throws IOException {
        // 獲得一個連線
        HessianConnection connection = super
.open(url); // 獲得上下文 RpcContext context = RpcContext.getContext(); for (String key : context.getAttachments().keySet()) { // 在http協議頭裡面加入dubbo中附加值,key為 header+key value為附加值的value connection.addHeader(Constants.DEFAULT_EXCHANGER + key,context.getAttachment(key)); } return
connection; } } 複製程式碼

在hessian上加入dubbo自己所需要的附加值,放到協議頭裡面進行傳送。

(二)HttpClientConnection

該類是基於HttpClient封裝來實現HessianConnection介面,其中邏輯比較簡單。

public class HttpClientConnection implements HessianConnection {

    /**
     * http客戶端物件
     */
    private final HttpClient httpClient;

    /**
     * 位元組輸出流
     */
    private final ByteArrayOutputStream output;

    /**
     * http post請求物件
     */
    private final HttpPost request;

    /**
     * http 響應物件
     */
    private volatile HttpResponse response;

    public HttpClientConnection(HttpClient httpClient,URL url) {
        this.httpClient = httpClient;
        this.output = new ByteArrayOutputStream();
        this.request = new HttpPost(url.toString());
    }

    /**
     * 增加協議頭
     * @param key
     * @param value
     */
    @Override
    public void addHeader(String key,String value) {
        request.addHeader(new BasicHeader(key,value));
    }

    @Override
    public OutputStream getOutputStream() throws IOException {
        return output;
    }

    /**
     * 傳送請求
     * @throws IOException
     */
    @Override
    public void sendRequest() throws IOException {
        request.setEntity(new ByteArrayEntity(output.toByteArray()));
        this.response = httpClient.execute(request);
    }

    /**
     * 獲得請求後的狀態碼
     * @return
     */
    @Override
    public int getStatusCode() {
        return response == null || response.getStatusLine() == null ? 0 : response.getStatusLine().getStatusCode();
    }

    @Override
    public String getStatusMessage() {
        return response == null || response.getStatusLine() == null ? null : response.getStatusLine().getReasonPhrase();
    }

    @Override
    public String getContentEncoding() {
        return (response == null || response.getEntity() == null || response.getEntity().getContentEncoding() == null) ? null : response.getEntity().getContentEncoding().getValue();
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return response == null || response.getEntity() == null ? null : response.getEntity().getContent();
    }

    @Override
    public void close() throws IOException {
        HttpPost request = this.request;
        if (request != null) {
            request.abort();
        }
    }

    @Override
    public void destroy() throws IOException {
    }
複製程式碼

(三)HttpClientConnectionFactory

該類實現了HessianConnectionFactory介面,是建立HttpClientConnection的工廠類。該類的實現跟DubboHessianURLConnectionFactory類類似,但是DubboHessianURLConnectionFactory是標準的Hessian介面呼叫會採用的工廠類,而HttpClientConnectionFactory是Dubbo 的 Hessian 協議呼叫。當然Dubbo 的 Hessian 協議也是基於http的。

public class HttpClientConnectionFactory implements HessianConnectionFactory {

    /**
     * httpClient物件
     */
    private final HttpClient httpClient = new DefaultHttpClient();

    @Override
    public void setHessianProxyFactory(HessianProxyFactory factory) {
        // 設定連線超時時間
        HttpConnectionParams.setConnectionTimeout(httpClient.getParams(),(int) factory.getConnectTimeout());
        // 設定讀取資料時阻塞鏈路的超時時間
        HttpConnectionParams.setSoTimeout(httpClient.getParams(),(int) factory.getReadTimeout());
    }

    
    @Override
    public HessianConnection open(URL url) throws IOException {
        // 建立一個HttpClientConnection例項
        HttpClientConnection httpClientConnection = new HttpClientConnection(httpClient,url);
        // 獲得上下文,用來獲得附加值
        RpcContext context = RpcContext.getContext();
        // 遍歷附加值,放入到協議頭裡面
        for (String key : context.getAttachments().keySet()) {
            httpClientConnection.addHeader(Constants.DEFAULT_EXCHANGER + key,context.getAttachment(key));
        }
        return httpClientConnection;
    }

}
複製程式碼

實現了兩個方法,第一個方法是給http連線設定兩個引數配置,第二個方法是建立一個連線。

(四)HessianProtocol

該類繼承了AbstractProxyProtocol類,是hessian協議的實現類。其中實現類基於hessian協議的服務引用、服務暴露等方法。

1.屬性

/**
 * http伺服器集合
 * key為ip:port
 */
private final Map<String,HttpServer> serverMap = new ConcurrentHashMap<String,HttpServer>();

/**
 * HessianSkeleto&emsp;集合
 * key為服務名
 */
private final Map<String,HessianSkeleton> skeletonMap = new ConcurrentHashMap<String,HessianSkeleton>();

/**
 * HttpBinder物件,預設是jetty實現
 */
private HttpBinder httpBinder;
複製程式碼

2.doExport

@Override
protected <T> Runnable doExport(T impl,Class<T> type,URL url) throws RpcException {
    // 獲得ip地址
    String addr = getAddr(url);
    // 獲得http伺服器物件
    HttpServer server = serverMap.get(addr);
    // 如果為空,則重新建立一個server,然後放入集合
    if (server == null) {
        server = httpBinder.bind(url,new HessianHandler());
        serverMap.put(addr,server);
    }
    // 獲得服務path
    final String path = url.getAbsolutePath();
    // 建立Hessian服務端物件
    final HessianSkeleton skeleton = new HessianSkeleton(impl,type);
    // 加入集合
    skeletonMap.put(path,skeleton);

    // 獲得通用的path
    final String genericPath = path + "/" + Constants.GENERIC_KEY;
    // 加入集合
    skeletonMap.put(genericPath,new HessianSkeleton(impl,GenericService.class));

    // 返回一個執行緒
    return new Runnable() {
        @Override
        public void run() {
            skeletonMap.remove(path);
            skeletonMap.remove(genericPath);
        }
    };
}
複製程式碼

該方法是服務暴露的主要邏輯實現。

3.doRefer

@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(Class<T> serviceType,URL url) throws RpcException {
    // 獲得泛化的引數
    String generic = url.getParameter(Constants.GENERIC_KEY);
    // 是否是泛化呼叫
    boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
    // 如果是泛化呼叫。則設定泛化的path和附加值
    if (isGeneric) {
        RpcContext.getContext().setAttachment(Constants.GENERIC_KEY,generic);
        url = url.setPath(url.getPath() + "/" + Constants.GENERIC_KEY);
    }

    // 建立代理工廠
    HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
    // 是否是Hessian2的請求 預設為否
    boolean isHessian2Request = url.getParameter(Constants.HESSIAN2_REQUEST_KEY,Constants.DEFAULT_HESSIAN2_REQUEST);
    // 設定是否應使用Hessian協議的版本2來解析請求
    hessianProxyFactory.setHessian2Request(isHessian2Request);
    // 是否應為遠端呼叫啟用過載方法,預設為否
    boolean isOverloadEnabled = url.getParameter(Constants.HESSIAN_OVERLOAD_METHOD_KEY,Constants.DEFAULT_HESSIAN_OVERLOAD_METHOD);
    // 設定是否應為遠端呼叫啟用過載方法。
    hessianProxyFactory.setOverloadEnabled(isOverloadEnabled);
    // 獲得client實現方式,預設為jdk
    String client = url.getParameter(Constants.CLIENT_KEY,Constants.DEFAULT_HTTP_CLIENT);
    if ("httpclient".equals(client)) {
        // 用http來建立
        hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
    } else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
        // 丟擲不支援的協議異常
        throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
    } else {
        // 建立一個HessianConnectionFactory物件
        HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
        // 設定代理工廠
        factory.setHessianProxyFactory(hessianProxyFactory);
        // 設定工廠
        hessianProxyFactory.setConnectionFactory(factory);
    }
    // 獲得超時時間
    int timeout = url.getParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
    // 設定超時時間
    hessianProxyFactory.setConnectTimeout(timeout);
    hessianProxyFactory.setReadTimeout(timeout);
    // 建立代理
    return (T) hessianProxyFactory.create(serviceType,url.setProtocol("http").toJavaURL(),Thread.currentThread().getContextClassLoader());
}
複製程式碼

該方法是服務引用的主要邏輯實現,根據客戶端配置,來選擇標準 Hessian 介面呼叫還是Dubbo 的 Hessian 協議呼叫。

4.getErrorCode

@Override
protected int getErrorCode(Throwable e) {
    // 如果屬於HessianConnectionException異常
    if (e instanceof HessianConnectionException) {
        if (e.getCause() != null) {
            Class<?> cls = e.getCause().getClass();
            // 如果屬於超時異常,則返回超時異常
            if (SocketTimeoutException.class.equals(cls)) {
                return RpcException.TIMEOUT_EXCEPTION;
            }
        }
        // 否則返回網路異常
        return RpcException.NETWORK_EXCEPTION;
    } else if (e instanceof HessianMethodSerializationException) {
        // 序列化異常
        return RpcException.SERIALIZATION_EXCEPTION;
    }
    return super.getErrorCode(e);
}
複製程式碼

該方法是針對異常的處理。

5.HessianHandler

private class HessianHandler implements HttpHandler {

    @Override
    public void handle(HttpServletRequest request,HttpServletResponse response)
            throws IOException,ServletException {
        // 獲得請求的uri
        String uri = request.getRequestURI();
        // 獲得對應的HessianSkeleton物件
        HessianSkeleton skeleton = skeletonMap.get(uri);
        // 如果如果不是post方法
        if (!request.getMethod().equalsIgnoreCase("POST")) {
            // 返回狀態設定為500
            response.setStatus(500);
        } else {
            // 設定遠端地址
            RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(),request.getRemotePort());

            // 獲得請求頭內容
            Enumeration<String> enumeration = request.getHeaderNames();
            // 遍歷請求頭內容
            while (enumeration.hasMoreElements()) {
                String key = enumeration.nextElement();
                // 如果key開頭是deader,則把附加值取出來放入上下文
                if (key.startsWith(Constants.DEFAULT_EXCHANGER)) {
                    RpcContext.getContext().setAttachment(key.substring(Constants.DEFAULT_EXCHANGER.length()),request.getHeader(key));
                }
            }

            try {
                // 執行下一個
                skeleton.invoke(request.getInputStream(),response.getOutputStream());
            } catch (Throwable e) {
                throw new ServletException(e);
            }
        }
    }

}
複製程式碼

該內部類是Hessian的處理器,用來處理請求中的協議頭內容。

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了遠端呼叫中關於hessian協議的部分,內容比較簡單,可以參考著官方檔案瞭解一下。接下來我將開始對rpc模組關於Http協議部分進行講解。