1. 程式人生 > 程式設計 >Spring Cloud Gateway重試機制原理解析

Spring Cloud Gateway重試機制原理解析

重試,我相信大家並不陌生。在我們呼叫Http介面的時候,總會因為某種原因呼叫失敗,這個時候我們可以通過重試的方式,來重新請求介面。

生活中這樣的事例很多,比如打電話,對方正在通話中啊,訊號不好啊等等原因,你總會打不通,當你第一次沒打通之後,你會打第二次,第三次…第四次就通了。

重試也要注意應用場景,讀資料的介面比較適合重試的場景,寫資料的介面就需要注意介面的冪等性了。還有就是重試次數如果太多的話會導致請求量加倍,給後端造成更大的壓力,設定合理的重試機制才是最關鍵的。
今天我們來簡單的瞭解下Spring Cloud Gateway中的重試機制和使用。

使用講解

RetryGatewayFilter是Spring Cloud Gateway對請求重試提供的一個GatewayFilter Factory。

配置方式:

spring:
 cloud:
  gateway:
   routes:
   - id: fsh-house
    uri: lb://fsh-house
    predicates:
    - Path=/house/**
    filters:
    - name: Retry
     args:
      retries: 3
      series:
      - SERVER_ERROR
      statuses:
      - OK
      methods:
      - GET
      - POST
      exceptions:
      - java.io.IOException

配置講解

配置類原始碼:org.springframework.cloud.gateway.filter.factory.RetryGatewayFilterFactory.RetryConfig:

public static class RetryConfig {
  private int retries = 3;

  private List<Series> series = toList(Series.SERVER_ERROR);

  private List<HttpStatus> statuses = new ArrayList<>();

  private List<HttpMethod> methods = toList(HttpMethod.GET);

  private List<Class<? extends Throwable>> exceptions = toList(IOException.class);

  // .....
}

retries:重試次數,預設值是3次

series:狀態碼配置(分段),符合的某段狀態碼才會進行重試邏輯,預設值是SERVER_ERROR,值是5,也就是5XX(5開頭的狀態碼),共有5個值:

public enum Series {
  INFORMATIONAL(1),SUCCESSFUL(2),REDIRECTION(3),CLIENT_ERROR(4),SERVER_ERROR(5);
}

statuses:狀態碼配置,和series不同的是這邊是具體狀態碼的配置,取值請參考:org.springframework.http.HttpStatus

methods:指定哪些方法的請求需要進行重試邏輯,預設值是GET方法,取值如下:

public enum HttpMethod {
  GET,HEAD,POST,PUT,PATCH,DELETE,OPTIONS,TRACE;
}

exceptions:指定哪些異常需要進行重試邏輯,預設值是java.io.IOException

程式碼測試

就寫個介面,在介面中記錄請求次數,然後丟擲一個異常模擬500,通過閘道器訪問這個介面,如果你配置了重試次數是3,那麼介面中會輸出4次結果才是對的,證明重試生效了。

AtomicInteger ac = new AtomicInteger();

@GetMapping("/data")
public HouseInfo getData(@RequestParam("name") String name) {
  if (StringUtils.isBlank(name)) {
    throw new RuntimeException("error");
  }
  System.err.println(ac.addAndGet(1));
  return new HouseInfo(1L,"上海","虹口","XX小區");
}

原始碼欣賞

@Override
public GatewayFilter apply(RetryConfig retryConfig) {
  // 驗證重試配置格式是否正確
  retryConfig.validate();

  Repeat<ServerWebExchange> statusCodeRepeat = null;
  if (!retryConfig.getStatuses().isEmpty() || !retryConfig.getSeries().isEmpty()) {
    Predicate<RepeatContext<ServerWebExchange>> repeatPredicate = context -> {
      ServerWebExchange exchange = context.applicationContext();
      // 判斷重試次數是否已經達到了配置的最大值
      if (exceedsMaxIterations(exchange,retryConfig)) {
        return false;
      }
      // 獲取響應的狀態碼
      HttpStatus statusCode = exchange.getResponse().getStatusCode();
      // 獲取請求方法型別
      HttpMethod httpMethod = exchange.getRequest().getMethod();
      // 判斷響應狀態碼是否在配置中存在
      boolean retryableStatusCode = retryConfig.getStatuses().contains(statusCode);

      if (!retryableStatusCode && statusCode != null) { // null status code might mean a network exception?
        // try the series
        retryableStatusCode = retryConfig.getSeries().stream()
            .anyMatch(series -> statusCode.series().equals(series));
      }
      // 判斷方法是否包含在配置中
      boolean retryableMethod = retryConfig.getMethods().contains(httpMethod);
      // 決定是否要進行重試
      return retryableMethod && retryableStatusCode;
    };

    statusCodeRepeat = Repeat.onlyIf(repeatPredicate)
        .doOnRepeat(context -> reset(context.applicationContext()));
  }

  //TODO: support timeout,backoff,jitter,etc... in Builder

  Retry<ServerWebExchange> exceptionRetry = null;
  if (!retryConfig.getExceptions().isEmpty()) {
    Predicate<RetryContext<ServerWebExchange>> retryContextPredicate = context -> {
      if (exceedsMaxIterations(context.applicationContext(),retryConfig)) {
        return false;
      }
      // 異常判斷
      for (Class<? extends Throwable> clazz : retryConfig.getExceptions()) {       
        if (clazz.isInstance(context.exception())) {
          return true;
        }
      }
      return false;
    };
    // 使用reactor extra的retry元件
    exceptionRetry = Retry.onlyIf(retryContextPredicate)
        .doOnRetry(context -> reset(context.applicationContext()))
        .retryMax(retryConfig.getRetries());
   }

   return apply(statusCodeRepeat,exceptionRetry);
}

public boolean exceedsMaxIterations(ServerWebExchange exchange,RetryConfig retryConfig) {
  Integer iteration = exchange.getAttribute(RETRY_ITERATION_KEY);

  //TODO: deal with null iteration
  return iteration != null && iteration >= retryConfig.getRetries();
}

public void reset(ServerWebExchange exchange) {
  //TODO: what else to do to reset SWE?
  exchange.getAttributes().remove(ServerWebExchangeUtils.GATEWAY_ALREADY_ROUTED_ATTR);
}

public GatewayFilter apply(Repeat<ServerWebExchange> repeat,Retry<ServerWebExchange> retry) {
  return (exchange,chain) -> {
    if (log.isTraceEnabled()) {
      log.trace("Entering retry-filter");
    }

    // chain.filter returns a Mono<Void>
    Publisher<Void> publisher = chain.filter(exchange)
        //.log("retry-filter",Level.INFO)
        .doOnSuccessOrError((aVoid,throwable) -> {
          // 獲取已經重試的次數,預設值為-1
          int iteration = exchange.getAttributeOrDefault(RETRY_ITERATION_KEY,-1);
          // 增加重試次數
          exchange.getAttributes().put(RETRY_ITERATION_KEY,iteration + 1);
        });

    if (retry != null) {
      // retryWhen returns a Mono<Void>
      // retry needs to go before repeat
      publisher = ((Mono<Void>)publisher).retryWhen(retry.withApplicationContext(exchange));
    }
    if (repeat != null) {
      // repeatWhen returns a Flux<Void>
      // so this needs to be last and the variable a Publisher<Void>
      publisher = ((Mono<Void>)publisher).repeatWhen(repeat.withApplicationContext(exchange));
    }

    return Mono.fromDirect(publisher);
  };
}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。