Dubbo原始碼解析(二十)遠端呼叫——Filter
遠端呼叫——Filter
目標:介紹dubbo-rpc-api中的各種filter過濾器的實現邏輯。
前言
本文會介紹在dubbo中的過濾器,先來看看下面的圖:
可以看到紅色圈圈不服,在服務發現和服務引用中都會進行一些過濾器過濾。具體有哪些過濾器,就看下面的介紹。
原始碼分析
(一)AccessLogFilter
該過濾器是對記錄日誌的過濾器,它所做的工作就是把引用服務或者暴露服務的呼叫鏈資訊寫入到檔案中。日誌訊息先被放入日誌集合,然後加入到日誌佇列,然後被放入到寫入檔案到任務中,最後進入檔案。
1.屬性
private static final Logger logger = LoggerFactory.getLogger(AccessLogFilter.class);
/**
* 日誌訪問名稱,預設的日誌訪問名稱
*/
private static final String ACCESS_LOG_KEY = "dubbo.accesslog";
/**
* 日期格式
*/
private static final String FILE_DATE_FORMAT = "yyyyMMdd";
private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
/**
* 日誌佇列大小
*/
private static final int LOG_MAX_BUFFER = 5000;
/**
* 日誌輸出的頻率
*/
private static final long LOG_OUTPUT_INTERVAL = 5000;
/**
* 日誌佇列 key為訪問日誌的名稱,value為該日誌名稱對應的日誌集合
*/
private final ConcurrentMap<String,Set<String>> logQueue = new ConcurrentHashMap<String,Set<String>>();
/**
* 日誌執行緒池
*/
private final ScheduledExecutorService logScheduled = Executors.newScheduledThreadPool(2 ,new NamedThreadFactory("Dubbo-Access-Log",true));
/**
* 日誌記錄任務
*/
private volatile ScheduledFuture<?> logFuture = null;
複製程式碼
按照我上面講到日誌流向,日誌先進入到是日誌佇列中的日誌集合,再進入logQueue,在進入logFuture,最後落地到檔案。
2.init
private void init() {
// synchronized是一個重操作消耗效能,所有加上判空
if (logFuture == null) {
synchronized (logScheduled) {
// 為了不重複初始化
if (logFuture == null) {
// 建立日誌記錄任務
logFuture = logScheduled.scheduleWithFixedDelay(new LogTask(),LOG_OUTPUT_INTERVAL,TimeUnit.MILLISECONDS);
}
}
}
}
複製程式碼
該方法是初始化方法,就建立了日誌記錄任務。
3.log
private void log(String accesslog,String logmessage) {
init();
Set<String> logSet = logQueue.get(accesslog);
if (logSet == null) {
logQueue.putIfAbsent(accesslog,new ConcurrentHashSet<String>());
logSet = logQueue.get(accesslog);
}
if (logSet.size() < LOG_MAX_BUFFER) {
logSet.add(logmessage);
}
}
複製程式碼
該方法是增加日誌資訊到日誌集合中。
4.invoke
@Override
public Result invoke(Invoker<?> invoker,Invocation inv) throws RpcException {
try {
// 獲得日誌名稱
String accesslog = invoker.getUrl().getParameter(Constants.ACCESS_LOG_KEY);
if (ConfigUtils.isNotEmpty(accesslog)) {
// 獲得rpc上下文
RpcContext context = RpcContext.getContext();
// 獲得呼叫的介面名稱
String serviceName = invoker.getInterface().getName();
// 獲得版本號
String version = invoker.getUrl().getParameter(Constants.VERSION_KEY);
// 獲得組,是消費者側還是生產者側
String group = invoker.getUrl().getParameter(Constants.GROUP_KEY);
StringBuilder sn = new StringBuilder();
sn.append("[").append(new SimpleDateFormat(MESSAGE_DATE_FORMAT).format(new Date())).append("] ").append(context.getRemoteHost()).append(":").append(context.getRemotePort())
.append(" -> ").append(context.getLocalHost()).append(":").append(context.getLocalPort())
.append(" - ");
// 拼接組
if (null != group && group.length() > 0) {
sn.append(group).append("/");
}
// 拼接服務名稱
sn.append(serviceName);
// 拼接版本號
if (null != version && version.length() > 0) {
sn.append(":").append(version);
}
sn.append(" ");
// 拼接方法名
sn.append(inv.getMethodName());
sn.append("(");
// 拼接引數型別
Class<?>[] types = inv.getParameterTypes();
// 拼接引數型別
if (types != null && types.length > 0) {
boolean first = true;
for (Class<?> type : types) {
if (first) {
first = false;
} else {
sn.append(",");
}
sn.append(type.getName());
}
}
sn.append(") ");
// 拼接引數
Object[] args = inv.getArguments();
if (args != null && args.length > 0) {
sn.append(JSON.toJSONString(args));
}
String msg = sn.toString();
// 如果用預設的日誌訪問名稱
if (ConfigUtils.isDefault(accesslog)) {
LoggerFactory.getLogger(ACCESS_LOG_KEY + "." + invoker.getInterface().getName()).info(msg);
} else {
// 把日誌加入集合
log(accesslog,msg);
}
}
} catch (Throwable t) {
logger.warn("Exception in AcessLogFilter of service(" + invoker + " -> " + inv + ")",t);
}
// 呼叫下一個呼叫鏈
return invoker.invoke(inv);
}
複製程式碼
該方法是最重要的方法,從拼接了日誌資訊,把日誌加入到集合,並且呼叫下一個呼叫鏈。
4.LogTask
private class LogTask implements Runnable {
@Override
public void run() {
try {
if (logQueue != null && logQueue.size() > 0) {
// 遍歷日誌佇列
for (Map.Entry<String,Set<String>> entry : logQueue.entrySet()) {
try {
// 獲得日誌名稱
String accesslog = entry.getKey();
// 獲得日誌集合
Set<String> logSet = entry.getValue();
// 如果檔案不存在則建立檔案
File file = new File(accesslog);
File dir = file.getParentFile();
if (null != dir && !dir.exists()) {
dir.mkdirs();
}
if (logger.isDebugEnabled()) {
logger.debug("Append log to " + accesslog);
}
if (file.exists()) {
// 獲得現在的時間
String now = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date());
// 獲得檔案最後一次修改的時間
String last = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date(file.lastModified()));
// 如果檔案最後一次修改的時間不等於現在的時間
if (!now.equals(last)) {
// 獲得重新生成檔名稱
File archive = new File(file.getAbsolutePath() + "." + last);
// 因為都是file的絕對路徑,所以沒有進行移動檔案,而是修改檔名
file.renameTo(archive);
}
}
// 把日誌集合中的日誌寫入到檔案
FileWriter writer = new FileWriter(file,true);
try {
for (Iterator<String> iterator = logSet.iterator();
iterator.hasNext();
iterator.remove()) {
writer.write(iterator.next());
writer.write("\r\n");
}
writer.flush();
} finally {
writer.close();
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
}
}
複製程式碼
該內部類實現了Runnable,是把日誌訊息落地到檔案到執行緒。
(二)ActiveLimitFilter
該類時對於每個服務的每個方法的最大可並行呼叫數量限制的過濾器,它是在服務消費者側的過濾。
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
// 獲得url物件
URL url = invoker.getUrl();
// 獲得方法名稱
String methodName = invocation.getMethodName();
// 獲得併發呼叫數(單個服務的單個方法),預設為0
int max = invoker.getUrl().getMethodParameter(methodName,Constants.ACTIVES_KEY,0);
// 通過方法名來獲得對應的狀態
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(),invocation.getMethodName());
if (max > 0) {
// 獲得該方法呼叫的超時次數
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(),Constants.TIMEOUT_KEY,0);
// 獲得系統時間
long start = System.currentTimeMillis();
long remain = timeout;
// 獲得該方法的呼叫數量
int active = count.getActive();
// 如果活躍數量大於等於最大的併發呼叫數量
if (active >= max) {
synchronized (count) {
// 當活躍數量大於等於最大的併發呼叫數量時一直迴圈
while ((active = count.getActive()) >= max) {
try {
// 等待超時時間
count.wait(remain);
} catch (InterruptedException e) {
}
// 獲得累計時間
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
// 如果累計時間大於超時時間,則丟擲異常
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ",method: "
+ invocation.getMethodName() + ",elapsed: " + elapsed
+ ",timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
try {
// 獲得系統時間作為開始時間
long begin = System.currentTimeMillis();
// 開始計數
RpcStatus.beginCount(url,methodName);
try {
// 呼叫後面的呼叫鏈,如果沒有丟擲異常,則算成功
Result result = invoker.invoke(invocation);
// 結束計數,記錄時間
RpcStatus.endCount(url,methodName,System.currentTimeMillis() - begin,true);
return result;
} catch (RuntimeException t) {
RpcStatus.endCount(url,false);
throw t;
}
} finally {
if (max > 0) {
synchronized (count) {
// 喚醒count
count.notify();
}
}
}
}
複製程式碼
該類只有這一個方法。該過濾器是用來限制呼叫數量,先進行呼叫數量的檢測,如果沒有到達最大的呼叫數量,則先呼叫後面的呼叫鏈,如果在後面的呼叫鏈失敗,則記錄相關時間,如果成功也記錄相關時間和呼叫次數。
(三)ClassLoaderFilter
該過濾器是做類載入器切換的。
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
// 獲得當前的類載入器
ClassLoader ocl = Thread.currentThread().getContextClassLoader();
// 設定invoker攜帶的服務的類載入器
Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
try {
// 呼叫下面的呼叫鏈
return invoker.invoke(invocation);
} finally {
// 最後切換回原來的類載入器
Thread.currentThread().setContextClassLoader(ocl);
}
}
複製程式碼
可以看到先切換成當前的執行緒鎖攜帶的類載入器,然後呼叫結束後,再切換回原先的類載入器。
(四)CompatibleFilter
該過濾器是做相容性的過濾器。
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
// 呼叫下一個呼叫鏈
Result result = invoker.invoke(invocation);
// 如果方法前面沒有$或者結果沒有異常
if (!invocation.getMethodName().startsWith("$") && !result.hasException()) {
Object value = result.getValue();
if (value != null) {
try {
// 獲得方法
Method method = invoker.getInterface().getMethod(invocation.getMethodName(),invocation.getParameterTypes());
// 獲得返回的資料型別
Class<?> type = method.getReturnType();
Object newValue;
// 序列化方法
String serialization = invoker.getUrl().getParameter(Constants.SERIALIZATION_KEY);
// 如果是json或者fastjson形式
if ("json".equals(serialization)
|| "fastjson".equals(serialization)) {
// 獲得方法的泛型返回值型別
Type gtype = method.getGenericReturnType();
// 把資料結果進行型別轉化
newValue = PojoUtils.realize(value,type,gtype);
// 如果value不是type型別
} else if (!type.isInstance(value)) {
// 如果是pojo,則,轉化為type型別,如果不是,則進行相容型別轉化。
newValue = PojoUtils.isPojo(type)
? PojoUtils.realize(value,type)
: CompatibleTypeUtils.compatibleTypeConvert(value,type);
} else {
newValue = value;
}
// 重新設定RpcResult的result
if (newValue != value) {
result = new RpcResult(newValue);
}
} catch (Throwable t) {
logger.warn(t.getMessage(),t);
}
}
}
return result;
}
複製程式碼
可以看到對於呼叫鏈的返回結果,如果返回值型別和返回值不一樣的時候,就需要做相容型別的轉化。重新把結果放入RpcResult,返回。
(五)ConsumerContextFilter
該過濾器做的是在當前的RpcContext中記錄本地呼叫的一次狀態資訊。
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
// 設定rpc上下文
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(),0)
.setRemoteAddress(invoker.getUrl().getHost(),invoker.getUrl().getPort());
// 如果該會話域是rpc會話域
if (invocation instanceof RpcInvocation) {
// 設定實體域
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
// 呼叫下個呼叫鏈
RpcResult result = (RpcResult) invoker.invoke(invocation);
// 設定附加值
RpcContext.getServerContext().setAttachments(result.getAttachments());
return result;
} finally {
// 情況附加值
RpcContext.getContext().clearAttachments();
}
}
複製程式碼
可以看到RpcContext記錄了一次呼叫狀態資訊,然後先呼叫後面的呼叫鏈,再回來把附加值設定到RpcContext中。然後返回RpcContext,再清空,這樣是因為後面的呼叫鏈中的附加值對前面的呼叫鏈是不可見的。
(六)ContextFilter
該過濾器做的是初始化rpc上下文。
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
// 獲得會話域的附加值
Map<String,String> attachments = invocation.getAttachments();
// 刪除非同步屬性以避免傳遞給以下呼叫鏈
if (attachments != null) {
attachments = new HashMap<String,String>(attachments);
attachments.remove(Constants.PATH_KEY);
attachments.remove(Constants.GROUP_KEY);
attachments.remove(Constants.VERSION_KEY);
attachments.remove(Constants.DUBBO_VERSION_KEY);
attachments.remove(Constants.TOKEN_KEY);
attachments.remove(Constants.TIMEOUT_KEY);
attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain.
}
// 在rpc上下文新增上一個呼叫鏈的資訊
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
// .setAttachments(attachments) // merged from dubbox
.setLocalAddress(invoker.getUrl().getHost(),invoker.getUrl().getPort());
// mreged from dubbox
// we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
if (attachments != null) {
// 把會話域中的附加值全部加入RpcContext中
if (RpcContext.getContext().getAttachments() != null) {
RpcContext.getContext().getAttachments().putAll(attachments);
} else {
RpcContext.getContext().setAttachments(attachments);
}
}
// 如果會話域屬於rpc的會話域,則設定實體域
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
// 呼叫下一個呼叫鏈
RpcResult result = (RpcResult) invoker.invoke(invocation);
// pass attachments to result 把附加值加入到RpcResult
result.addAttachments(RpcContext.getServerContext().getAttachments());
return result;
} finally {
// 移除本地的上下文
RpcContext.removeContext();
// 清空附加值
RpcContext.getServerContext().clearAttachments();
}
}
複製程式碼
在《 dubbo原始碼解析(十九)遠端呼叫——開篇》中我已經介紹了RpcContext的作用,角色。該過濾器就是做了初始化RpcContext的作用。
(七)DeprecatedFilter
該過濾器的作用是呼叫了廢棄的方法時列印錯誤日誌。
private static final Logger LOGGER = LoggerFactory.getLogger(DeprecatedFilter.class);
/**
* 日誌集合
*/
private static final Set<String> logged = new ConcurrentHashSet<String>();
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
// 獲得key 服務+方法
String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
// 如果集合中沒有該key
if (!logged.contains(key)) {
// 則加入集合
logged.add(key);
// 如果該服務方法是廢棄的,則列印錯誤日誌
if (invoker.getUrl().getMethodParameter(invocation.getMethodName(),Constants.DEPRECATED_KEY,false)) {
LOGGER.error("The service method " + invoker.getInterface().getName() + "." + getMethodSignature(invocation) + " is DEPRECATED! Declare from " + invoker.getUrl());
}
}
// 呼叫下一個呼叫鏈
return invoker.invoke(invocation);
}
/**
* 獲得方法定義
* @param invocation
* @return
*/
private String getMethodSignature(Invocation invocation) {
// 方法名
StringBuilder buf = new StringBuilder(invocation.getMethodName());
buf.append("(");
// 引數型別
Class<?>[] types = invocation.getParameterTypes();
// 拼接引數
if (types != null && types.length > 0) {
boolean first = true;
for (Class<?> type : types) {
if (first) {
first = false;
} else {
buf.append(",");
}
buf.append(type.getSimpleName());
}
}
buf.append(")");
return buf.toString();
}
複製程式碼
該過濾器比較簡單。
(八)EchoFilter
該過濾器是處理回聲測試的方法。
@Override
public Result invoke(Invoker<?> invoker,Invocation inv) throws RpcException {
// 如果呼叫的方法是回聲測試的方法 則直接返回結果,否則 呼叫下一個呼叫鏈
if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
return new RpcResult(inv.getArguments()[0]);
return invoker.invoke(inv);
}
複製程式碼
如果呼叫的方法是回聲測試的方法 則直接返回結果,否則 呼叫下一個呼叫鏈。
(九)ExceptionFilter
該過濾器是作用是對異常的處理。
private final Logger logger;
public ExceptionFilter() {
this(LoggerFactory.getLogger(ExceptionFilter.class));
}
public ExceptionFilter(Logger logger) {
this.logger = logger;
}
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
try {
// 呼叫下一個呼叫鏈,返回結果
Result result = invoker.invoke(invocation);
// 如果結果有異常,並且該服務不是一個泛化呼叫
if (result.hasException() && GenericService.class != invoker.getInterface()) {
try {
// 獲得異常
Throwable exception = result.getException();
// directly throw if it's checked exception
// 如果這是一個checked的異常,則直接返回異常,也就是介面上宣告的Unchecked的異常
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
return result;
}
// directly throw if the exception appears in the signature
// 如果已經在介面方法上宣告瞭該異常,則直接返回
try {
// 獲得方法
Method method = invoker.getInterface().getMethod(invocation.getMethodName(),invocation.getParameterTypes());
// 獲得異常型別
Class<?>[] exceptionClassses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClassses) {
if (exception.getClass().equals(exceptionClass)) {
return result;
}
}
} catch (NoSuchMethodException e) {
return result;
}
// for the exception not found in method's signature,print ERROR message in server's log.
// 列印錯誤 該異常沒有在方法上申明
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ",method: " + invocation.getMethodName()
+ ",exception: " + exception.getClass().getName() + ": " + exception.getMessage(),exception);
// directly throw if exception class and interface class are in the same jar file.
// 如果異常類和介面類在同一個jar包裡面,則丟擲異常
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return result;
}
// directly throw if it's JDK exception
// 如果是jdk中定義的異常,則直接丟擲
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return result;
}
// directly throw if it's dubbo exception
// 如果 是dubbo的異常,則直接丟擲
if (exception instanceof RpcException) {
return result;
}
// otherwise,wrap with RuntimeException and throw back to the client
// 如果不是以上的異常,則包裝成為RuntimeException並且丟擲
return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {
logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ",exception: " + e.getClass().getName() + ": " + e.getMessage(),e);
return result;
}
}
return result;
} catch (RuntimeException e) {
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ",method: " + invocation.getMethodName()
+ ",e);
throw e;
}
}
複製程式碼
可以看到除了介面上宣告的Unchecked的異常和有定義的異常外,都會包裝成RuntimeException來返回,為了防止客戶端反序列化失敗。
(十)ExecuteLimitFilter
該過濾器是限制最大可並行執行請求數,該過濾器是服務提供者側,而上述講到的ActiveLimitFilter是在消費者側的限制。
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
// 獲得url物件
URL url = invoker.getUrl();
// 方法名稱
String methodName = invocation.getMethodName();
Semaphore executesLimit = null;
boolean acquireResult = false;
int max = url.getMethodParameter(methodName,Constants.EXECUTES_KEY,0);
// 如果該方法設定了executes並且值大於0
if (max > 0) {
// 獲得該方法對應的RpcStatus
RpcStatus count = RpcStatus.getStatus(url,invocation.getMethodName());
// if (count.getActive() >= max) {
/**
* http://manzhizhen.iteye.com/blog/2386408
* use semaphore for concurrency control (to limit thread number)
*/
// 獲得訊號量
executesLimit = count.getSemaphore(max);
// 如果不能獲得許可,則丟擲異常
if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ",cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
long begin = System.currentTimeMillis();
boolean isSuccess = true;
// 計數加1
RpcStatus.beginCount(url,methodName);
try {
// 呼叫下一個呼叫鏈
Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
isSuccess = false;
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter",t);
}
} finally {
// 計數減1
RpcStatus.endCount(url,isSuccess);
if(acquireResult) {
executesLimit.release();
}
}
}
複製程式碼
為什麼這裡需要用到訊號量來控制,可以看一下以下連結的介紹:manzhizhen.iteye.com/blog/238640…
(十一)GenericFilter
該過濾器就是對於泛化呼叫的請求和結果進行反序列化和序列化的操作,它是服務提供者側的。
@Override
public Result invoke(Invoker<?> invoker,Invocation inv) throws RpcException {
// 如果是泛化呼叫
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
// 獲得請求名字
String name = ((String) inv.getArguments()[0]).trim();
// 獲得請求引數型別
String[] types = (String[]) inv.getArguments()[1];
// 獲得請求引數
Object[] args = (Object[]) inv.getArguments()[2];
try {
// 獲得方法
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(),name,types);
// 獲得該方法的引數型別
Class<?>[] params = method.getParameterTypes();
if (args == null) {
args = new Object[params.length];
}
// 獲得附加值
String generic = inv.getAttachment(Constants.GENERIC_KEY);
// 如果附加值為空,在用上下文攜帶的附加值
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}
// 如果附加值還是為空或者是預設的泛化序列化型別
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
// 直接進行型別轉化
args = PojoUtils.realize(args,params,method.getGenericParameterTypes());
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try {
UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i]);
// 使用nativejava方式反序列化
args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null,is).readObject();
} catch (Exception e) {
throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.",e);
}
} else {
throw new RpcException(
"Generic serialization [" +
Constants.GENERIC_SERIALIZATION_NATIVE_JAVA +
"] only support message type " +
byte[].class +
" and your message type is " +
args[i].getClass());
}
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (args[i] instanceof JavaBeanDescriptor) {
// 用JavaBean方式反序列化
args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
} else {
throw new RpcException(
"Generic serialization [" +
Constants.GENERIC_SERIALIZATION_BEAN +
"] only support message type " +
JavaBeanDescriptor.class.getName() +
" and your message type is " +
args[i].getClass().getName());
}
}
}
// 呼叫下一個呼叫鏈
Result result = invoker.invoke(new RpcInvocation(method,args,inv.getAttachments()));
if (result.hasException()
&& !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
try {
UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
// 用nativejava方式序列化
ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.serialize(null,os).writeObject(result.getValue());
return new RpcResult(os.toByteArray());
} catch (IOException e) {
throw new RpcException("Serialize result failed.",e);
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
// 使用JavaBean方式序列化返回結果
return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(),JavaBeanAccessor.METHOD));
} else {
// 直接轉化為pojo型別然後返回
return new RpcResult(PojoUtils.generalize(result.getValue()));
}
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(),e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(),e);
}
}
// 呼叫下一個呼叫鏈
return invoker.invoke(inv);
}
複製程式碼
(十二)GenericImplFilter
該過濾器也是對於泛化呼叫的序列化檢查和處理,它是消費者側的過濾器。
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);
/**
* 引數集合
*/
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class,String[].class,Object[].class};
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
// 獲得泛化的值
String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
// 如果該值是nativejava或者bean或者true,並且不是一個返回呼叫
if (ProtocolUtils.isGeneric(generic)
&& !Constants.$INVOKE.equals(invocation.getMethodName())
&& invocation instanceof RpcInvocation) {
RpcInvocation invocation2 = (RpcInvocation) invocation;
// 獲得方法名稱
String methodName = invocation2.getMethodName();
// 獲得引數型別集合
Class<?>[] parameterTypes = invocation2.getParameterTypes();
// 獲得引數集合
Object[] arguments = invocation2.getArguments();
// 把引數型別的名稱放入集合
String[] types = new String[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i++) {
types[i] = ReflectUtils.getName(parameterTypes[i]);
}
Object[] args;
// 對引數集合進行序列化
if (ProtocolUtils.isBeanGenericSerialization(generic)) {
args = new Object[arguments.length];
for (int i = 0; i < arguments.length; i++) {
args[i] = JavaBeanSerializeUtil.serialize(arguments[i],JavaBeanAccessor.METHOD);
}
} else {
args = PojoUtils.generalize(arguments);
}
// 重新把序列化的引數放入
invocation2.setMethodName(Constants.$INVOKE);
invocation2.setParameterTypes(GENERIC_PARAMETER_TYPES);
invocation2.setArguments(new Object[]{methodName,types,args});
// 呼叫下一個呼叫鏈
Result result = invoker.invoke(invocation2);
if (!result.hasException()) {
Object value = result.getValue();
try {
Method method = invoker.getInterface().getMethod(methodName,parameterTypes);
if (ProtocolUtils.isBeanGenericSerialization(generic)) {
if (value == null) {
return new RpcResult(value);
} else if (value instanceof JavaBeanDescriptor) {
// 用javabean方式反序列化
return new RpcResult(JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) value));
} else {
throw new RpcException(
"The type of result value is " +
value.getClass().getName() +
" other than " +
JavaBeanDescriptor.class.getName() +
",and the result is " +
value);
}
} else {
// 直接轉化為pojo型別
return new RpcResult(PojoUtils.realize(value,method.getReturnType(),method.getGenericReturnType()));
}
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(),e);
}
// 如果呼叫鏈中有異常丟擲,並且是GenericException型別的異常
} else if (result.getException() instanceof GenericException) {
GenericException exception = (GenericException) result.getException();
try {
// 獲得異常類名
String className = exception.getExceptionClass();
Class<?> clazz = ReflectUtils.forName(className);
Throwable targetException = null;
Throwable lastException = null;
try {
targetException = (Throwable) clazz.newInstance();
} catch (Throwable e) {
lastException = e;
for (Constructor<?> constructor : clazz.getConstructors()) {
try {
targetException = (Throwable) constructor.newInstance(new Object[constructor.getParameterTypes().length]);
break;
} catch (Throwable e1) {
lastException = e1;
}
}
}
if (targetException != null) {
try {
Field field = Throwable.class.getDeclaredField("detailMessage");
if (!field.isAccessible()) {
field.setAccessible(true);
}
field.set(targetException,exception.getExceptionMessage());
} catch (Throwable e) {
logger.warn(e.getMessage(),e);
}
result = new RpcResult(targetException);
} else if (lastException != null) {
throw lastException;
}
} catch (Throwable e) {
throw new RpcException("Can not deserialize exception " + exception.getExceptionClass() + ",message: " + exception.getExceptionMessage(),e);
}
}
return result;
}
// 如果是泛化呼叫
if (invocation.getMethodName().equals(Constants.$INVOKE)
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic)) {
Object[] args = (Object[]) invocation.getArguments()[2];
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (Object arg : args) {
// 如果呼叫訊息不是位元組陣列型別,則丟擲異常
if (!(byte[].class == arg.getClass())) {
error(generic,byte[].class.getName(),arg.getClass().getName());
}
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if (!(arg instanceof JavaBeanDescriptor)) {
error(generic,JavaBeanDescriptor.class.getName(),arg.getClass().getName());
}
}
}
// 設定附加值
((RpcInvocation) invocation).setAttachment(
Constants.GENERIC_KEY,invoker.getUrl().getParameter(Constants.GENERIC_KEY));
}
return invoker.invoke(invocation);
}
/**
* 丟擲錯誤異常
* @param generic
* @param expected
* @param actual
* @throws RpcException
*/
private void error(String generic,String expected,String actual) throws RpcException {
throw new RpcException(
"Generic serialization [" +
generic +
"] only support message type " +
expected +
" and your message type is " +
actual);
}
複製程式碼
(十三)TimeoutFilter
該過濾器是當服務呼叫超時的時候,記錄告警日誌。
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
// 獲得開始時間
long start = System.currentTimeMillis();
// 呼叫下一個呼叫鏈
Result result = invoker.invoke(invocation);
// 獲得呼叫使用的時間
long elapsed = System.currentTimeMillis() - start;
// 如果服務呼叫超時,則列印告警日誌
if (invoker.getUrl() != null
&& elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(),"timeout",Integer.MAX_VALUE)) {
if (logger.isWarnEnabled()) {
logger.warn("invoke time out. method: " + invocation.getMethodName()
+ " arguments: " + Arrays.toString(invocation.getArguments()) + ",url is "
+ invoker.getUrl() + ",invoke elapsed " + elapsed + " ms.");
}
}
return result;
}
複製程式碼
(十四)TokenFilter
該過濾器提供了token的驗證功能,關於token的介紹可以檢視官方檔案。
@Override
public Result invoke(Invoker<?> invoker,Invocation inv)
throws RpcException {
// 獲得token值
String token = invoker.getUrl().getParameter(Constants.TOKEN_KEY);
if (ConfigUtils.isNotEmpty(token)) {
// 獲得服務型別
Class<?> serviceType = invoker.getInterface();
// 獲得附加值
Map<String,String> attachments = inv.getAttachments();
String remoteToken = attachments == null ? null : attachments.get(Constants.TOKEN_KEY);
// 如果令牌不一樣,則丟擲異常
if (!token.equals(remoteToken)) {
throw new RpcException("Invalid token! Forbid invoke remote service " + serviceType + " method " + inv.getMethodName() + "() from consumer " + RpcContext.getContext().getRemoteHost() + " to provider " + RpcContext.getContext().getLocalHost());
}
}
// 呼叫下一個呼叫鏈
return invoker.invoke(inv);
}
複製程式碼
(十五)TpsLimitFilter
該過濾器的作用是對TPS限流。
/**
* TPS 限制器物件
*/
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
@Override
public Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException {
// 如果限流器不允許,則丟擲異常
if (!tpsLimiter.isAllowable(invoker.getUrl(),invocation)) {
throw new RpcException(
"Failed to invoke service " +
invoker.getInterface().getName() +
"." +
invocation.getMethodName() +
" because exceed max service tps.");
}
// 呼叫下一個呼叫鏈
return invoker.invoke(invocation);
}
複製程式碼
其中關鍵是TPS 限制器物件,請看下面的分析。
(十六)TPSLimiter
public interface TPSLimiter {
/**
* judge if the current invocation is allowed by TPS rule
* 是否允許通過
* @param url url
* @param invocation invocation
* @return true allow the current invocation,otherwise,return false
*/
boolean isAllowable(URL url,Invocation invocation);
}
複製程式碼
該介面是tps限流器的介面,只定義了一個是否允許通過的方法。
(十七)StatItem
該類是統計的資料結構。
class StatItem {
/**
* 服務名
*/
private String name;
/**
* 最後一次重置的時間
*/
private long lastResetTime;
/**
* 週期
*/
private long interval;
/**
* 剩餘多少流量
*/
private AtomicInteger token;
/**
* 限制大小
*/
private int rate;
StatItem(String name,int rate,long interval) {
this.name = name;
this.rate = rate;
this.interval = interval;
this.lastResetTime = System.currentTimeMillis();
this.token = new AtomicInteger(rate);
}
public boolean isAllowable() {
long now = System.currentTimeMillis();
// 如果限制的時間大於最後一次時間加上週期,則重置
if (now > lastResetTime + interval) {
token.set(rate);
lastResetTime = now;
}
int value = token.get();
boolean flag = false;
// 直到有流量
while (value > 0 && !flag) {
flag = token.compareAndSet(value,value - 1);
value = token.get();
}
// 返回flag
return flag;
}
long getLastResetTime() {
return lastResetTime;
}
int getToken() {
return token.get();
}
@Override
public String toString() {
return new StringBuilder(32).append("StatItem ")
.append("[name=").append(name).append(",")
.append("rate = ").append(rate).append(",")
.append("interval = ").append(interval).append("]")
.toString();
}
}
複製程式碼
可以看到該類中記錄了一些訪問的流量,並且設定了週期重置機制。
(十八)DefaultTPSLimiter
該類實現了TPSLimiter,是預設的tps限流器實現。
public class DefaultTPSLimiter implements TPSLimiter {
/**
* 統計項集合
*/
private final ConcurrentMap<String,StatItem> stats
= new ConcurrentHashMap<String,StatItem>();
@Override
public boolean isAllowable(URL url,Invocation invocation) {
// 獲得tps限制大小,預設-1,不限制
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY,-1);
// 獲得限流週期
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,Constants.DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey();
// 如果限制
if (rate > 0) {
// 從集合中獲得統計項
StatItem statItem = stats.get(serviceKey);
// 如果為空,則新建
if (statItem == null) {
stats.putIfAbsent(serviceKey,new StatItem(serviceKey,rate,interval));
statItem = stats.get(serviceKey);
}
// 返回是否允許
return statItem.isAllowable();
} else {
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
// 移除該服務的統計項
stats.remove(serviceKey);
}
}
return true;
}
}
複製程式碼
是否允許的邏輯還是呼叫了統計項中的isAllowable方法。
本文介紹了很多的過濾器,哪些過濾器是在服務引用的,哪些伺服器是服務暴露的,可以檢視相應原始碼過濾器的實現上的註解,
例如ActiveLimitFilter上:
@Activate(group = Constants.CONSUMER,value = Constants.ACTIVES_KEY)
複製程式碼
可以看到group為consumer組的,也就是服務消費者側的,則是服務引用過程中的的過濾器。
例如ExecuteLimitFilter上:
@Activate(group = Constants.PROVIDER,value = Constants.EXECUTES_KEY)
複製程式碼
可以看到group為provider組的,也就是服務消費者側的,則是服務暴露過程中的的過濾器。
後記
該部分相關的原始碼解析地址:github.com/CrazyHZM/in…
該文章講解了在服務引用和服務暴露中的各種filter過濾器。接下來我將開始對rpc模組的監聽器進行講解。