Dubbo原始碼解析(二十六)遠端呼叫——http協議
阿新 • • 發佈:2020-06-24
遠端呼叫——http協議
目標:介紹遠端呼叫中跟http協議相關的設計和實現,介紹dubbo-rpc-http的原始碼。
前言
基於HTTP表單的遠端呼叫協議,採用 Spring 的HttpInvoker實現,關於http協議就不用多說了吧。
原始碼分析
(一)HttpRemoteInvocation
該類繼承了RemoteInvocation類,是在RemoteInvocation上增加了泛化呼叫的引數設定,以及增加了dubbo本身需要的附加值設定。
public class HttpRemoteInvocation extends RemoteInvocation {
private static final long serialVersionUID = 1L;
/**
* dubbo的附加值名稱
*/
private static final String dubboAttachmentsAttrName = "dubbo.attachments";
public HttpRemoteInvocation(MethodInvocation methodInvocation) {
super(methodInvocation);
// 把附加值加入到會話域的屬性裡面
addAttribute(dubboAttachmentsAttrName,new HashMap<String,String>(RpcContext.getContext().getAttachments()));
}
@Override
public Object invoke(Object targetObject) throws NoSuchMethodException,IllegalAccessException,InvocationTargetException {
// 獲得上下文
RpcContext context = RpcContext.getContext();
// 獲得附加值
context.setAttachments((Map<String,String>) getAttribute(dubboAttachmentsAttrName));
// 泛化標誌
String generic = (String) getAttribute(Constants.GENERIC_KEY);
// 如果不為空,則設定泛化標誌
if (StringUtils.isNotEmpty(generic)) {
context.setAttachment(Constants.GENERIC_KEY,generic);
}
try {
// 呼叫下一個呼叫鏈
return super.invoke(targetObject);
} finally {
context.setAttachments(null);
}
}
}
複製程式碼
(二)HttpProtocol
該類是http實現的核心,跟我在《dubbo原始碼解析(二十五)遠端呼叫——hessian協議》中講到的HessianProtocol實現有很多地方相似。
1.屬性
/**
* 預設的埠號
*/
public static final int DEFAULT_PORT = 80;
/**
* http伺服器集合
*/
private final Map<String,HttpServer> serverMap = new ConcurrentHashMap<String,HttpServer>();
/**
* Spring HttpInvokerServiceExporter 集合
*/
private final Map<String,HttpInvokerServiceExporter> skeletonMap = new ConcurrentHashMap<String,HttpInvokerServiceExporter>();
/**
* HttpBinder物件
*/
private HttpBinder httpBinder;
複製程式碼
2.doExport
@Override
protected <T> Runnable doExport(final T impl,Class<T> type,URL url) throws RpcException {
// 獲得ip地址
String addr = getAddr(url);
// 獲得http伺服器
HttpServer server = serverMap.get(addr);
// 如果伺服器為空,則重新建立伺服器,並且加入到集合
if (server == null) {
server = httpBinder.bind(url,new InternalHandler());
serverMap.put(addr,server);
}
// 獲得服務path
final String path = url.getAbsolutePath();
// 加入集合
skeletonMap.put(path,createExporter(impl,type));
// 通用path
final String genericPath = path + "/" + Constants.GENERIC_KEY;
// 新增泛化的服務呼叫
skeletonMap.put(genericPath,GenericService.class));
return new Runnable() {
@Override
public void run() {
skeletonMap.remove(path);
skeletonMap.remove(genericPath);
}
};
}
複製程式碼
該方法是暴露服務等邏輯,因為dubbo實現http協議採用了Spring 的HttpInvoker實現,所以呼叫了createExporter方法來建立建立HttpInvokerServiceExporter。
3.createExporter
private <T> HttpInvokerServiceExporter createExporter(T impl,Class<?> type) {
// 建立HttpInvokerServiceExporter
final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter();
// 設定要訪問的服務的介面
httpServiceExporter.setServiceInterface(type);
// 設定服務實現
httpServiceExporter.setService(impl);
try {
// 在BeanFactory設定了所有提供的bean屬性,初始化bean的時候執行,可以針對某個具體的bean進行配
httpServiceExporter.afterPropertiesSet();
} catch (Exception e) {
throw new RpcException(e.getMessage(),e);
}
return httpServiceExporter;
}
複製程式碼
該方法是建立一個spring 的HttpInvokerServiceExporter。
4.doRefer
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType,final URL url) throws RpcException {
// 獲得泛化配置
final String generic = url.getParameter(Constants.GENERIC_KEY);
// 是否為泛化呼叫
final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
// 建立HttpInvokerProxyFactoryBean
final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean();
// 設定RemoteInvocation的工廠類
httpProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() {
/**
* 為給定的AOP方法呼叫建立一個新的RemoteInvocation物件。
* @param methodInvocation
* @return
*/
@Override
public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) {
// 新建一個HttpRemoteInvocation
RemoteInvocation invocation = new HttpRemoteInvocation(methodInvocation);
// 如果是泛化呼叫
if (isGeneric) {
// 設定標誌
invocation.addAttribute(Constants.GENERIC_KEY,generic);
}
return invocation;
}
});
// 獲得identity message
String key = url.toIdentityString();
// 如果是泛化呼叫
if (isGeneric) {
key = key + "/" + Constants.GENERIC_KEY;
}
// 設定服務url
httpProxyFactoryBean.setServiceUrl(key);
// 設定服務介面
httpProxyFactoryBean.setServiceInterface(serviceType);
// 獲得客戶端引數
String client = url.getParameter(Constants.CLIENT_KEY);
if (client == null || client.length() == 0 || "simple".equals(client)) {
// 建立SimpleHttpInvokerRequestExecutor連線池 使用的是JDK HttpClient
SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() {
@Override
protected void prepareConnection(HttpURLConnection con,int contentLength) throws IOException {
super.prepareConnection(con,contentLength);
// 設定讀取超時時間
con.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT));
// 設定連線超時時間
con.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY,Constants.DEFAULT_CONNECT_TIMEOUT));
}
};
httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
} else if ("commons".equals(client)) {
// 建立 HttpComponentsHttpInvokerRequestExecutor連線池 使用的是Apache HttpClient
HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor();
// 設定讀取超時時間
httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT));
// 設定連線超時時間
httpInvokerRequestExecutor.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY,Constants.DEFAULT_CONNECT_TIMEOUT));
httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
} else {
throw new IllegalStateException("Unsupported http protocol client " + client + ",only supported: simple,commons");
}
httpProxyFactoryBean.afterPropertiesSet();
// 返回HttpInvokerProxyFactoryBean物件
return (T) httpProxyFactoryBean.getObject();
}
複製程式碼
該方法是服務引用的方法,其中根據url配置simple還是commons來選擇建立連線池的方式。其中的區別就是SimpleHttpInvokerRequestExecutor使用的是JDK HttpClient,HttpComponentsHttpInvokerRequestExecutor 使用的是Apache HttpClient。
5.getErrorCode
@Override
protected int getErrorCode(Throwable e) {
if (e instanceof RemoteAccessException) {
e = e.getCause();
}
if (e != null) {
Class<?> cls = e.getClass();
if (SocketTimeoutException.class.equals(cls)) {
// 返回超時異常
return RpcException.TIMEOUT_EXCEPTION;
} else if (IOException.class.isAssignableFrom(cls)) {
// 返回網路異常
return RpcException.NETWORK_EXCEPTION;
} else if (ClassNotFoundException.class.isAssignableFrom(cls)) {
// 返回序列化異常
return RpcException.SERIALIZATION_EXCEPTION;
}
}
return super.getErrorCode(e);
}
複製程式碼
該方法是處理異常情況,設定錯誤碼。
6.InternalHandler
private class InternalHandler implements HttpHandler {
@Override
public void handle(HttpServletRequest request,HttpServletResponse response)
throws IOException,ServletException {
// 獲得請求uri
String uri = request.getRequestURI();
// 獲得服務暴露者HttpInvokerServiceExporter物件
HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
// 如果不是post,則返回碼設定500
if (!request.getMethod().equalsIgnoreCase("POST")) {
response.setStatus(500);
} else {
// 遠端地址放到上下文
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(),request.getRemotePort());
try {
// 呼叫下一個呼叫
skeleton.handleRequest(request,response);
} catch (Throwable e) {
throw new ServletException(e);
}
}
}
}
複製程式碼
該內部類實現了HttpHandler,做了設定遠端地址的邏輯。
後記
該部分相關的原始碼解析地址:github.com/CrazyHZM/in…
該文章講解了遠端呼叫中關於http協議的部分,內容比較簡單,可以參考著官方檔案瞭解一下。接下來我將開始對rpc模組關於injvm本地呼叫部分進行講解。