Dubbo原始碼解析(二十二)遠端呼叫——Protocol
遠端呼叫——Protocol
目標:介紹遠端呼叫中協議的設計和實現,介紹dubbo-rpc-api中的各種protocol包的原始碼,是重點內容。
前言
在遠端呼叫中協議是非常重要的一層,看下面這張圖:
該層是在資訊交換層之上,分為了並且夾雜在服務暴露和服務引用中間,為了有一個約定的方式進行呼叫。
dubbo支援不同協議的擴充套件,比如http、thrift等等,具體的可以參照官方檔案。本文講解的原始碼大部分是對於公共方法的實現,而具體的服務暴露和服務引用會在各個協議實現中講到。
下面是該包下面的類圖:
原始碼分析
(一)AbstractProtocol
該類是協議的抽象類,實現了Protocol介面,其中實現了一些公共的方法,抽象方法在它的子類AbstractProxyProtocol中定義。
1.屬性
/**
* 服務暴露者集合
*/
protected final Map<String,Exporter<?>> exporterMap = new ConcurrentHashMap<String,Exporter<?>>();
/**
* 服務引用者集合
*/
//TODO SOFEREFENCE
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();
複製程式碼
2.serviceKey
protected static String serviceKey(URL url) {
// 獲得繫結的埠號
int port = url.getParameter(Constants.BIND_PORT_KEY,url.getPort());
return serviceKey(port,url.getPath(),url.getParameter(Constants.VERSION_KEY),url.getParameter(Constants.GROUP_KEY));
}
protected static String serviceKey(int port,String serviceName,String serviceVersion,String serviceGroup) {
return ProtocolUtils.serviceKey(port,serviceName,serviceVersion,serviceGroup);
}
複製程式碼
該方法是為了得到服務key group+"/"+serviceName+":"+serviceVersion+":"+port
3.destroy
@Override
public void destroy() {
// 遍歷服務引用實體
for (Invoker<?> invoker : invokers) {
if (invoker != null) {
// 從集合中移除
invokers.remove(invoker);
try {
if (logger.isInfoEnabled()) {
logger.info("Destroy reference: " + invoker.getUrl());
}
// 銷燬
invoker.destroy();
} catch (Throwable t) {
logger.warn(t.getMessage(),t);
}
}
}
// 遍歷服務暴露者
for (String key : new ArrayList<String>(exporterMap.keySet())) {
// 從集合中移除
Exporter<?> exporter = exporterMap.remove(key);
if (exporter != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Unexport service: " + exporter.getInvoker().getUrl());
}
// 取消暴露
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(),t);
}
}
}
}
複製程式碼
該方法是對invoker和exporter的銷燬。
(二)AbstractProxyProtocol
該類繼承了AbstractProtocol類,其中利用了代理工廠對AbstractProtocol中的兩個集合進行了填充,並且對異常做了處理。
1.屬性
/**
* rpc的異常類集合
*/
private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();
/**
* 代理工廠
*/
private ProxyFactory proxyFactory;
複製程式碼
2.export
@Override
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
// 獲得uri
final String uri = serviceKey(invoker.getUrl());
// 獲得服務暴露者
Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
if (exporter != null) {
return exporter;
}
// 新建一個執行緒
final Runnable runnable = doExport(proxyFactory.getProxy(invoker,true),invoker.getInterface(),invoker.getUrl());
exporter = new AbstractExporter<T>(invoker) {
/**
* 取消暴露
*/
@Override
public void unexport() {
super.unexport();
// 移除該key對應的服務暴露者
exporterMap.remove(uri);
if (runnable != null) {
try {
// 啟動執行緒
runnable.run();
} catch (Throwable t) {
logger.warn(t.getMessage(),t);
}
}
}
};
// 加入集合
exporterMap.put(uri,exporter);
return exporter;
}
複製程式碼
其中分為兩個步驟,建立一個exporter,放入到集合匯中。在建立exporter時對unexport方法進行了重寫。
3.refer
@Override
public <T> Invoker<T> refer(final Class<T> type,final URL url) throws RpcException {
// 通過代理獲得實體域
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type,url),type,url);
Invoker<T> invoker = new AbstractInvoker<T>(type,url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
// 獲得呼叫結果
Result result = target.invoke(invocation);
Throwable e = result.getException();
// 如果丟擲異常,則丟擲相應異常
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
if (rpcException.isAssignableFrom(e.getClass())) {
throw getRpcException(type,url,invocation,e);
}
}
}
return result;
} catch (RpcException e) {
// 丟擲未知異常
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
e.setCode(getErrorCode(e.getCause()));
}
throw e;
} catch (Throwable e) {
throw getRpcException(type,e);
}
}
};
// 加入集合
invokers.add(invoker);
return invoker;
}
複製程式碼
該方法是服務引用,先從代理工廠中獲得Invoker物件target,然後建立了真實的invoker在重寫方法中呼叫代理的方法,最後加入到集合。
protected abstract <T> Runnable doExport(T impl,Class<T> type,URL url) throws RpcException;
protected abstract <T> T doRefer(Class<T> type,URL url) throws RpcException;
複製程式碼
可以看到其中抽象了服務引用和暴露的方法,讓各類協議各自實現。
(三)AbstractInvoker
該類是invoker的抽象方法,因為協議被夾在服務引用和服務暴露中間,無論什麼協議都有一些通用的Invoker和exporter的方法實現,而該類就是實現了Invoker的公共方法,而把doInvoke抽象出來,讓子類只關注這個方法。
1.屬性
/**
* 服務型別
*/
private final Class<T> type;
/**
* url物件
*/
private final URL url;
/**
* 附加值
*/
private final Map<String,String> attachment;
/**
* 是否可用
*/
private volatile boolean available = true;
/**
* 是否銷燬
*/
private AtomicBoolean destroyed = new AtomicBoolean(false);
複製程式碼
2.convertAttachment
private static Map<String,String> convertAttachment(URL url,String[] keys) {
if (keys == null || keys.length == 0) {
return null;
}
Map<String,String> attachment = new HashMap<String,String>();
// 遍歷key,把值放入附加值集合中
for (String key : keys) {
String value = url.getParameter(key);
if (value != null && value.length() > 0) {
attachment.put(key,value);
}
}
return attachment;
}
複製程式碼
該方法是轉化為附加值,把url中的值轉化為服務呼叫invoker的附加值。
3.invoke
@Override
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry,let's allow the current invoke to proceed
// 如果服務引用銷燬,則列印告警日誌,但是通過
if (destroyed.get()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed,"
+ ",dubbo version is " + Version.getVersion() + ",this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
// 會話域中加入該呼叫鏈
invocation.setInvoker(this);
// 把附加值放入會話域
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
// 把上下文的附加值放入會話域
Map<String,String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,* because the {@link RpcContext#setAttachment(String,String)} is passed in the Filter when the call is triggered
* by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work,which is
* a mistake in most cases (for example,through Filter to RpcContext output traceId and spanId and other information).
*/
invocation.addAttachments(contextAttachments);
}
// 如果開啟的是非同步呼叫,則把該設定也放入附加值
if (getUrl().getMethodParameter(invocation.getMethodName(),Constants.ASYNC_KEY,false)) {
invocation.setAttachment(Constants.ASYNC_KEY,Boolean.TRUE.toString());
}
// 加入編號
RpcUtils.attachInvocationIdIfAsync(getUrl(),invocation);
try {
// 執行呼叫鏈
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
複製程式碼
該方法做了一些公共的操作,比如服務引用銷燬的檢測,加入附加值,加入呼叫鏈實體域到會話域中等。然後執行了doInvoke抽象方法。各協議自己去實現。
(四)AbstractExporter
該類和AbstractInvoker類似,也是在服務暴露中實現了一些公共方法。
1.屬性
/**
* 實體域
*/
private final Invoker<T> invoker;
/**
* 是否取消暴露服務
*/
private volatile boolean unexported = false;
複製程式碼
2.unexport
@Override
public void unexport() {
// 如果已經消取消暴露,則之間返回
if (unexported) {
return;
}
// 設定為true
unexported = true;
// 銷燬該實體域
getInvoker().destroy();
}
複製程式碼
(五)InvokerWrapper
該類是Invoker的包裝類,其中用到類裝飾模式,不過並沒有實現實際的功能增強。
public class InvokerWrapper<T> implements Invoker<T> {
/**
* invoker物件
*/
private final Invoker<T> invoker;
private final URL url;
public InvokerWrapper(Invoker<T> invoker,URL url) {
this.invoker = invoker;
this.url = url;
}
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return url;
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
}
複製程式碼
(六)ProtocolFilterWrapper
該類實現了Protocol介面,其中也用到了裝飾模式,是對Protocol的裝飾,是在服務引用和暴露的方法上加上了過濾器功能。
1.buildInvokerChain
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker,String key,String group) {
Invoker<T> last = invoker;
// 獲得過濾器的所有擴充套件實現類例項集合
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(),key,group);
if (!filters.isEmpty()) {
// 從最後一個過濾器開始迴圈,建立一個帶有過濾器鏈的invoker物件
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
// 記錄last的invoker
final Invoker<T> next = last;
// 新建last
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
/**
* 關鍵在這裡,呼叫下一個filter代表的invoker,把每一個過濾器串起來
* @param invocation
* @return
* @throws RpcException
*/
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next,invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
複製程式碼
該方法就是建立帶 Filter 鏈的 Invoker 物件。倒序的把每一個過濾器串連起來,形成一個invoker。
2.export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 如果是註冊中心,則直接暴露服務
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 服務提供側暴露服務
return protocol.export(buildInvokerChain(invoker,Constants.SERVICE_FILTER_KEY,Constants.PROVIDER));
}
複製程式碼
該方法是在服務暴露上做了過濾器鏈的增強,也就是加上了過濾器。
3.refer
@Override
public <T> Invoker<T> refer(Class<T> type,URL url) throws RpcException {
// 如果是註冊中心,則直接引用
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type,url);
}
// 消費者側引用服務
return buildInvokerChain(protocol.refer(type,Constants.REFERENCE_FILTER_KEY,Constants.CONSUMER);
}
複製程式碼
該方法是在服務引用上做了過濾器鏈的增強,也就是加上了過濾器。
(七)ProtocolListenerWrapper
該類也實現了Protocol,也是裝飾了Protocol介面,但是它是在服務引用和暴露過程中加上了監聽器的功能。
1.export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 如果是註冊中心,則暴露該invoker
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 建立一個暴露者監聽器包裝類物件
return new ListenerExporterWrapper<T>(protocol.export(invoker),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(),Constants.EXPORTER_LISTENER_KEY)));
}
複製程式碼
該方法是在服務暴露上做了監聽器功能的增強,也就是加上了監聽器。
2.refer
@Override
public <T> Invoker<T> refer(Class<T> type,URL url) throws RpcException {
// 如果是註冊中心。則直接引用服務
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type,url);
}
// 建立引用服務監聽器包裝類物件
return new ListenerInvokerWrapper<T>(protocol.refer(type,Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url,Constants.INVOKER_LISTENER_KEY)));
}
複製程式碼
該方法是在服務引用上做了監聽器功能的增強,也就是加上了監聽器。
後記
該部分相關的原始碼解析地址:github.com/CrazyHZM/in…
該文章講解了遠端呼叫中關於協議的部分,其實就是講了一些公共的方法,並且把關鍵方法抽象出來讓子類實現,具體的方法實現都在各個協議中自己實現。接下來我將開始對rpc模組的代理進行講解。