1. 程式人生 > >API 限流器(三) 在Spring Cloud 微服務體系中整合RedisRateLimiter

API 限流器(三) 在Spring Cloud 微服務體系中整合RedisRateLimiter

  這篇是API限流器這個系列的終章,就是講述如何在Spring Cloud 微服務開發中應用我發明的先進限流器。

開篇明義,基本思路如下:

1. 定義一個annotation - RedisLimiter

2. 在RestController 中有URL Mapping 的方法上應用RedisLimiter註解

3. 定義一個攔截器 - RateLimitCheckInterceptor,繼承org.springframework.web.servlet.HandlerInterceptor

4. 定義一個WebMvcConfigurerAdapter,把上面的RateLimitCheckInterceptor啟用,攔截所有的URL請求

5. 定義一個RedisRateLimiterFactory,這個工廠對RedisLimiter物件做了一些快取。

專案的整個程式碼結構如圖所示:


基本的呼叫時序圖如下:


下面我將就一些重要的類和配置逐一講解:

1. 首先是application.yml

這個檔案是Spring Cloud 微服務的重點配置檔案,內容為:

server:
  port: 8080                           #微服務埠
spring:
  application:
    name: ratelimitersample            #微服務名稱
  jpa:                                 #JPA的配置資訊
    generate-ddl: false
    show-sql: true
    hibernate:
      ddl-auto: none
  datasource:                           # 指定資料來源
    platform: h2                        # 指定資料來源型別
    schema: classpath:schema.sql        # 指定h2資料庫的建表指令碼
    data: classpath:data.sql            # 指定h2資料庫的資料指令碼
  redis:                                # Redis配置資訊   
    host: 192.168.0.135
    port: 6379
    password: foobared
    timeout: 2000
    pool:                               #Jedis Pool的配置資訊
      maxIdle: 300
      minIdle: 100
      max-wait: -1
      max-active: 600
logging:                                # 配置日誌級別,讓hibernate打印出執行的SQL
  level:
    root: INFO
    org.hibernate: INFO
    org.hibernate.type.descriptor.sql.BasicBinder: TRACE
    org.hibernate.type.descriptor.sql.BasicExtractor: TRACE                                                        
eureka:                                 #eureka的配置資訊
  client:
    healthcheck:                        #開啟服務健康檢查
      enabled: true
    serviceUrl:
      defaultZone: http://192.168.0.118:8761/eureka/
  instance:                             #服務例項細資訊
    app-group-name: product
    prefer-ip-address: true
    instance-id: ${spring.application.name}:${server.port}
    lease-expiration-duration-in-seconds: 10
    lease-renewal-interval-in-seconds: 5
    

  

2. RateLimiter自定義annotation

package com.tay.ratelimitersample.ratelimiter;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

@Retention(RUNTIME)
@Target({ TYPE, METHOD })
public @interface RateLimiter {

	public enum Base {
		General, //統一控制 
		IP, 	 //按IP限制
		User     //按使用者控制
	};

	Base base();
	
	String path() default "";
	
	TimeUnit timeUnit() default TimeUnit.SECONDS;

	int permits();
}

這個註解有4個屬性,第一個為base,意思是限流的基準是什麼,內建的列舉型別支援三種基準,統一控制、基於IP控制、基於使用者控制。

第二屬性為path, 一般來說這個不用配置,但是如果你的RestController方法對映的URL上帶有URL引數,類似@GetMapping("/testforid/{id}") 這種,這時候這個path配置就很有必要了,本例中,你可以將path配置為path="/testforid"。

第三個屬性為timeUnit, 限流的時間單位,支援TimeUnit.SECONDS,TimeUnit.MINUTES,TimeUnit.HOURS,TimeUnit.DAYS, 即秒,分,小時,天

第四個屬性為permits,意思是單位時間允許訪問的次數限制。

3. RedisRateLimiter, 此類為核心類,演算法原理在此係列文章二中做了詳細講解,本文中略過。

4. RedisRateLimiterFactory, RedisRateLimiter的工廠類,主要是負責建立RedisRateLimiter物件,快取RedisRateLimiter物件。

package com.tay.ratelimitersample.ratelimiter;

import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import redis.clients.jedis.JedisPool;

@Component
public class RedisRateLimiterFactory {
    
    @Autowired
    private JedisPool jedisPool;
    private final WeakHashMap<String, RedisRateLimiter> limiterMap = new WeakHashMap<String, RedisRateLimiter>();
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();  
    public JedisPool getJedisPool() {
        return jedisPool;
    }

    public void setJedisPool(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }
    
    public RedisRateLimiter get(String keyPrefix, TimeUnit timeUnit, int permits) {
        RedisRateLimiter redisRateLimiter = null;
        try {
            lock.readLock().lock();
            if(limiterMap.containsKey(keyPrefix)) {
                redisRateLimiter = limiterMap.get(keyPrefix);
            }
        }
        finally {
            lock.readLock().unlock();  
        }
        
        if(redisRateLimiter == null) {
            try {
                lock.writeLock().lock();
                if(limiterMap.containsKey(keyPrefix)) {
                    redisRateLimiter = limiterMap.get(keyPrefix);
                }
                if(redisRateLimiter == null) {
                    redisRateLimiter = new RedisRateLimiter(jedisPool, timeUnit, permits);
                    limiterMap.put(keyPrefix, redisRateLimiter);
                }
            }
            finally {
                lock.writeLock().unlock();
            }
        }
        return redisRateLimiter;
    }
}


注意這裡我是用WeakHashMap作為RedisRateLimiter快取容器的,是為了垃圾收集器能回收長期沒有使用的RedisRateeLimiter物件,防止記憶體洩漏

5. RedisConfig, Redis配置資訊讀取類,並在其中聲明瞭一個RedisRateLimiterFactory Bean.

package com.tay.ratelimitersample.ratelimiter;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

@Configuration
public class RedisConfig {

	@Value("${spring.redis.host}")
	private String host;

	@Value("${spring.redis.port}")
	private int port;

	@Value("${spring.redis.password}")
	private String password;

	@Value("${spring.redis.timeout}")
	private int timeout;

	@Value("${spring.redis.pool.maxIdle}")
	private int maxIdle;

	@Value("${spring.redis.pool.minIdle}")
	private int minIdle;

	@Value("${spring.redis.pool.max-wait}")
	private long maxWaitMillis;

	@Value("${spring.redis.pool.max-active}")
	private int maxActive;

	@Bean
	public JedisPool redisPoolFactory() {
		JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
		jedisPoolConfig.setMaxIdle(maxIdle);
		jedisPoolConfig.setMinIdle(minIdle);
		jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
		jedisPoolConfig.setMaxTotal(maxActive);
		JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password);
		return jedisPool;
	}
}

6. RateLimitCheckInterceptor類,此類為Spring cloud整合的關鍵類。

package com.tay.ratelimitersample.ratelimiter;

import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;

@Component
public class RateLimitCheckInterceptor implements HandlerInterceptor {
	
	private static final String[] IP_HEADER_KYES = { 
			"X-Forwarded-For", 
			"X-Real-IP", 
			"Proxy-Client-IP",
			"WL-Proxy-Client-IP", 
			"HTTP_CLIENT_IP", 
			"HTTP_X_FORWARDED_FOR" };
	
	private static final String USER_TOKEN_KEY = "UserToken";
	
	@Autowired
	private RedisRateLimiterFactory redisRateLimiterFactory;
	
	@Override
	public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
			throws Exception {
		if (!(handler instanceof HandlerMethod)) {
			return true;
		}
		boolean isSuccess = true;
		HandlerMethod handlerMethod = (HandlerMethod) handler;
		Method method = handlerMethod.getMethod();
		if (method.isAnnotationPresent(RateLimiter.class)) {
			RateLimiter rateLimiterAnnotation = method.getAnnotation(RateLimiter.class);
			int permits = rateLimiterAnnotation.permits();
			TimeUnit timeUnit = rateLimiterAnnotation.timeUnit();
			String path = rateLimiterAnnotation.path();
			if ("".equals(path)) {
				path = request.getRequestURI();
			}

			if (rateLimiterAnnotation.base() == RateLimiter.Base.General) {
				String rateLimiterKey = path;
				RedisRateLimiter redisRatelimiter = redisRateLimiterFactory.get(path, timeUnit,
						permits);
				isSuccess = rateCheck(redisRatelimiter, rateLimiterKey, response);
			} else if (rateLimiterAnnotation.base() == RateLimiter.Base.IP) {
				String ip = getIP(request);
				if (ip != null) {
					String rateLimiterKey = path + ":" + ip;
					RedisRateLimiter redisRatelimiter = redisRateLimiterFactory.get(rateLimiterKey, timeUnit,
							permits);
					isSuccess =	rateCheck(redisRatelimiter, rateLimiterKey, response);
				}
			} else if (rateLimiterAnnotation.base() == RateLimiter.Base.User) {
				String userToken = getUserToken(request);
				if (userToken != null) {
					String rateLimiterKey = path + ":" + userToken;
					RedisRateLimiter redisRatelimiter = redisRateLimiterFactory.get(rateLimiterKey, timeUnit,
							permits);
					isSuccess =rateCheck(redisRatelimiter, rateLimiterKey, response);
				}
			}
			
		}
		return isSuccess;
	}

	
	
	private boolean rateCheck(RedisRateLimiter redisRatelimiter, String keyPrefix, HttpServletResponse response)
			throws Exception {
		if (!redisRatelimiter.acquire(keyPrefix)) {
			response.setStatus(HttpStatus.FORBIDDEN.value());
			response.getWriter().print("Access denied because of exceeding access rate");
			return false;
		}
		return true;
	}
	private String getIP(HttpServletRequest request) {
		for (String ipHeaderKey : IP_HEADER_KYES) {
			String ip = request.getHeader(ipHeaderKey);
			if (ip != null && ip.length() != 0 && (!"unknown".equalsIgnoreCase(ip))) {
				return ip;
			}
			else {
				return request.getRemoteHost();
			}
		}
		return null;
	}
	
	private String getUserToken(HttpServletRequest request) {
		String userToken = request.getHeader(USER_TOKEN_KEY);
		return userToken;
	}
	
	@Override
	public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
			ModelAndView modelAndView) throws Exception {
	}

	@Override
	public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
			throws Exception {
	}
	
}

此類尤為關鍵,當我們api限流是基於IP或使用者的話,那麼我們有一些約定:

基於IP限流的話,則http request的中header裡面必須帶有ip資訊,ip的header支援

            "X-Forwarded-For",

            "X-Real-IP",
            "Proxy-Client-IP",
            "WL-Proxy-Client-IP",
            "HTTP_CLIENT_IP",
            "HTTP_X_FORWARDED_FOR"

特別注意,現在流行會把Nginx作為分發伺服器,則Nginx forward正式客戶端請求到下游微服務時,務必要把客戶的真實IP塞入到header中往下傳遞,需要在Nginx server配置中加入

proxy_set_header    X-Real-IP        $remote_addr;
基於使用者限流的話,本案約定使用者token的header名稱為:UserToken

7. WebMvcConfigurer, 此類的作用即將攔截器RateLimitCheckInterceptor對所有的API訪問生效

package com.tay.ratelimitersample.ratelimiter;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;

@Configuration
public class WebMvcConfigurer extends WebMvcConfigurerAdapter {
	@Autowired
	private RateLimitCheckInterceptor rateLimitCheckInterceptor;
	@Override
	public void addInterceptors(InterceptorRegistry registry) {
		registry.addInterceptor(rateLimitCheckInterceptor).addPathPatterns("/**");
		super.addInterceptors(registry);
	}
}

地址patterns指定為"/**",即對所有請求生效。

8. RestController

package com.tay.ratelimitersample.controller;

import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.tay.ratelimitersample.entity.User;
import com.tay.ratelimitersample.ratelimiter.RateLimiter;
import com.tay.ratelimitersample.repository.UserRepository;

@RestController
@RequestMapping("/api")
public class UserController {
  @Autowired
  private UserRepository userRepository;

  @GetMapping("/{id}")
  public User findById(@PathVariable Long id, @RequestHeader HttpHeaders headers) {
	  
    User findOne = this.userRepository.findOne(id);
  
    return findOne;
  }
  
  @RateLimiter(base = RateLimiter.Base.General, permits = 2, timeUnit = TimeUnit.MINUTES)
  @GetMapping("/test")
  public String test() {
	  return "test!";
  }
  
  @RateLimiter(base = RateLimiter.Base.IP, path="/testforid", permits = 4, timeUnit = TimeUnit.MINUTES)
  @GetMapping("/testforid/{id}")
  public String testforid(@PathVariable Long id) {
	  return "test! " + id;
  }

}

其中,/api/test限流是對所有請求限定為1分鐘內僅限2次。 對於/api/testforid/{id}, 是針對於每個請求ip,限定為每分鐘4次,注意,這裡跟上面一個配置有點小區別,就是指定了path, 因為這裡的url中帶了PathVariable, 但實際上我們限流時並不關心具體的PathVariable是什麼,所以要指定path為/testforid, 就是所有請求都會忽略{id}, 都歸結到/testforid請求。

8. 執行環境, 本例執行是需要redis服務,和一個Eureka服務。如果你不想啟動Eureka亦可,把ProviderUserApplication類的註解@EnableDiscoveryClient去掉,並把application.yml中的關於Eureka的配置去掉:

eureka:
  client:
    healthcheck:
      enabled: true
    serviceUrl:
      defaultZone: http://192.168.0.118:8761/eureka/
  instance:
    app-group-name: product
    prefer-ip-address: true
    instance-id: ${spring.application.name}:${server.port}
    lease-expiration-duration-in-seconds: 10
    lease-renewal-interval-in-seconds: 5

9.測試

測試工具firefox上的外掛RESTED Client和Redis Desktop Manager

a. 通過RESTED client 訪問  http://192.168.0.118:8080/api/testforid/1,訪問成功,則:


連續點選訪問,達到訪問頻率限制,則:


觀察redis中資料: