【redis】spring boot利用redis的Keyspace Notifications實現消息通知
前言
需求:當redis中的某個key失效的時候,把失效時的value寫入數據庫。
github: https://github.com/vergilyn/RedisSamples
1、修改redis.conf
安裝的redis服務默認是: notify-keyspace-events "",修改成 notify-keyspace-events Ex;
位置:redis安裝目下的redis.windows-service.conf 或 redis.windows.conf。(具體看redis服務加載的哪個配置, 貌似要redis 2.8+才支持)
可以在redis.conf中找到對應的描述
# K 鍵空間通知,以__keyspace@<db>__為前綴 # E 鍵事件通知,以__keysevent@<db>__為前綴 # g del , expipre , rename 等類型無關的通用命令的通知, ... # $ String命令 # l List命令 # s Set命令 # h Hash命令 # z 有序集合命令 # x 過期事件(每次key過期時生成) # e 驅逐事件(當key在內存滿了被清除時生成) # A g$lshzxe的別名,因此”AKE”意味著所有的事件
2、通過JedisPubSub實現
省略spring boot配置,完整代碼見github。
/** * key過期事件推送到topic中只有key,無value,因為一旦過期,value就不存在了。 */ @Component public class JedisExpiredListener extends JedisPubSub { /** 參考redis目錄下redis.conf中的"EVENT NOTIFICATION", redis默認的db{0, 15}一共16個數據庫 * K Keyspace events, published with __keyspace@<db>__ prefix. * E Keyevent events, published with __keyevent@<db>__ prefix. * */public final static String LISTENER_PATTERN = "__keyevent@*__:expired"; /*** 雖然能註入,但貌似在listener-class中jedis無法使用(無法建立連接到redis),exception message:* "only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context" */ @Autowired private Jedis jedis; /** * 初始化按表達式的方式訂閱時候的處理 */ @Override public void onPSubscribe(String pattern, int subscribedChannels) { System.out.print("onPSubscribe >> "); System.out.println(String.format("pattern: %s, subscribedChannels: %d", pattern, subscribedChannels)); } /** * 取得按表達式的方式訂閱的消息後的處理 */ @Override public void onPMessage(String pattern, String channel, String message) { System.out.print("onPMessage >> "); System.out.println(String.format("key: %s, pattern: %s, channel: %s", message, pattern, channel)); } /** * 取得訂閱的消息後的處理 */ @Override public void onMessage(String channel, String message) { super.onMessage(channel, message); } /** * 初始化訂閱時候的處理 */ @Override public void onSubscribe(String channel, int subscribedChannels) { super.onSubscribe(channel, subscribedChannels); } /** * 取消訂閱時候的處理 */ @Override public void onUnsubscribe(String channel, int subscribedChannels) { super.onUnsubscribe(channel, subscribedChannels); } /** * 取消按表達式的方式訂閱時候的處理 */ @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { super.onPUnsubscribe(pattern, subscribedChannels); } }
@RunWith(SpringRunner.class) @SpringBootTest(classes=JedisExpiredApplication.class) public class JedisExpiredApplicationTest { @Autowired private Jedis jedis; @Autowired private JedisExpiredListener expiredListener; @Before public void before() throws Exception { jedis.flushAll(); jedis.set(JedisConfig.DEFAULE_KEY,"123321"); System.out.println(JedisConfig.DEFAULE_KEY + " = " + jedis.get(JedisConfig.DEFAULE_KEY)); System.out.println("set expired 5s"); jedis.expire(JedisConfig.DEFAULE_KEY,5); } @Test public void testPSubscribe(){ /* psubscribe是一個阻塞的方法,在取消訂閱該頻道前,會一直阻塞在這,只有當取消了訂閱才會執行下面的other code * 可以onMessage/onPMessage裏面收到消息後,調用了unsubscribe()/onPUnsubscribe(); 來取消訂閱,這樣才會執行後面的other code */ jedis.psubscribe(expiredListener,JedisExpiredListener.LISTENER_PATTERN); // other code } }
輸出結果:
vkey = 123321
set expired 5s
onPSubscribe >> pattern: __keyevent@*__:expired, subscribedChannels: 1
onPMessage >> key: vkey, pattern: __keyevent@*__:expired, channel: [email protected]__:expired
3、通過實現添加MessageListener
省略spring boot的redis配置。
@SpringBootApplication public class RedisExpiredApplication implements CommandLineRunner{ @Autowired private RedisTemplate redisTemplate; @Autowired private RedisExpiredListener expiredListener; /** * 解決redisTemplate的key/value亂碼問題: * <br/> <a href="http://www.zhimengzhe.com/shujuku/other/192111.html">http://www.zhimengzhe.com/shujuku/other/192111.html</a> * <br/> <a href="http://blog.csdn.net/tianyaleixiaowu/article/details/70595073">http://blog.csdn.net/tianyaleixiaowu/article/details/70595073</a> * @return */ @Bean("redis") @Primary public RedisTemplate redisTemplate(){ RedisSerializer<String> stringSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringSerializer); redisTemplate.setValueSerializer(stringSerializer); redisTemplate.setHashKeySerializer(stringSerializer); redisTemplate.setHashValueSerializer(stringSerializer); return redisTemplate; } @Bean public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection, Executor executor){ RedisMessageListenerContainer container = new RedisMessageListenerContainer(); // 設置Redis的連接工廠 container.setConnectionFactory(redisConnection); // 設置監聽使用的線程池 // container.setTaskExecutor(executor); // 設置監聽的Topic: PatternTopic/ChannelTopic Topic topic = new PatternTopic(RedisExpiredListener.LISTENER_PATTERN); // 設置監聽器 container.addMessageListener(new RedisExpiredListener(), topic); return container; } @Bean public Executor executor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(100); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("V-Thread"); // rejection-policy:當pool已經達到max size的時候,如何處理新任務 // CALLER_RUNS:不在新線程中執行任務,而是由調用者所在的線程來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } public static void main(String[] args) { SpringApplication.run(RedisExpiredApplication.class,args); } @Override public void run(String... strings) throws Exception { redisTemplate.opsForValue().set("vkey", "vergilyn", 5, TimeUnit.SECONDS); System.out.println("init : set vkey vergilyn ex 5"); System.out.println("thread sleep: 10s"); Thread.sleep(10 * 1000); System.out.println("thread recover: get vkey = " + redisTemplate.opsForValue().get("vkey")); } }
@Component public class RedisExpiredListener implements MessageListener { public final static String LISTENER_PATTERN = "__key*__:*"; /** * 客戶端監聽訂閱的topic,當有消息的時候,會觸發該方法; * 並不能得到value, 只能得到key。 * 姑且理解為: redis服務在key失效時(或失效後)通知到java服務某個key失效了, 那麽在java中不可能得到這個redis-key對應的redis-value。 *
* 解決方案: * 創建copy/shadow key, 例如 set vkey "vergilyn"; 對應copykey: set copykey:vkey "" ex 10; * 真正的key是"vkey"(業務中使用), 失效觸發key是"copykey:vkey"(其value為空字符為了減少內存空間消耗)。 * 當"copykey:vkey"觸發失效時, 從"vkey"得到失效時的值, 並在邏輯處理完後"del vkey" * * 缺陷: * 1: 存在多余的key; (copykey/shadowkey) * 2: 不嚴謹, 假設copykey在 12:00:00失效, 通知在12:10:00收到, 這間隔的10min內程序修改了key, 得到的並不是 失效時的value. * (第1點影響不大; 第2點貌似redis本身的Pub/Sub就不是嚴謹的, 失效後還存在value的修改, 應該在設計/邏輯上杜絕) * 當"copykey:vkey"觸發失效時, 從"vkey"得到失效時的值, 並在邏輯處理完後"del vkey" * */ @Override public void onMessage(Message message, byte[] bytes) { byte[] body = message.getBody();// 建議使用: valueSerializer byte[] channel = message.getChannel(); System.out.print("onMessage >> " ); System.out.println(String.format("channel: %s, body: %s, bytes: %s" ,new String(channel), new String(body), new String(bytes))); } }
輸出結果:
init : set vkey vergilyn ex 5
thread sleep: 10s
onMessage >> channel: [email protected]__:expired, body: vkey, bytes: __key*__:*
thread recover: get vkey = null
4、問題
1) 不管是JedisPubSub,還是MessageListener都不可能得到value。
個人理解:在12:00:00,java推送給redis一條命令”set vkey vergilyn ex 10”。此時redis服務已經完整的知道了這個key的失效時間,在12:00:10時redis服務把”vkey”失效。
然後通知到java(即回調到JedisPubSub/MessageListener),此時不可能在java中通過”vkey”得到其value。
(最簡單的測試,在Listener中打斷點,然後通過redis-cli.exe命令查看,“vkey”已經不存在了,但Listener才進入到message()方法)
2) redis的expire不是嚴格的即時執行
摘自 http://redisdoc.com/topic/notification.html
Redis 使用以下兩種方式刪除過期的鍵:
- 當一個鍵被訪問時,程序會對這個鍵進行檢查,如果鍵已經過期,那麽該鍵將被刪除。
- 底層系統會在後臺漸進地查找並刪除那些過期的鍵,從而處理那些已經過期、但是不會被訪問到的鍵。
當過期鍵被以上兩個程序的任意一個發現、 並且將鍵從數據庫中刪除時, Redis 會產生一個
expired
通知。Redis 並不保證生存時間(TTL)變為
0
的鍵會立即被刪除: 如果程序沒有訪問這個過期鍵, 或者帶有生存時間的鍵非常多的話, 那麽在鍵的生存時間變為0
, 直到鍵真正被刪除這中間, 可能會有一段比較顯著的時間間隔。因此, Redis 產生
expired
通知的時間為過期鍵被刪除的時候, 而不是鍵的生存時間變為0
的時候。如果業務無法容忍從過期到刪除中間的時間間隔,那麽就只有用其他的方式了。
3) 如何在expire回調中得到expire key的value
參考:https://stackoverflow.com/questions/26406303/redis-key-expire-notification-with-jedis
set vkey somevalue set shadowkey:vkey "" ex 10
相當於每個key都有對應的一個shadowkey,”shadowkey”只是用來設置expire時間,”key”才保存value及參與業務邏輯。
所以當”shadowkey”失效通知到listener時,程序中可以通過”key”得到其value,並在邏輯處理完時”del key”。
(“shadowkey”的value為null或空字符串,目的是為了節約內存空間。)
缺陷:
1. 多余了很多無效的 shadowkey;
2. 數據不嚴謹。假設copykey在 12:00:00失效, 通知在12:10:00收到, 這間隔的10min內程序修改了key, 得到的並不是 失效時的value.
相對來說,第1點無關緊要,只是暫時多了一些輔助用的key,但會被程序自己清理掉,不用再去維護,或一直存在於redis緩存中。
第2點,更多的是設計邏輯有缺陷,可以把失效時間定的更長,保證在”那個間隔”內不可能出現失效key的修改。
4) 特別
摘自 http://blog.csdn.net/gqtcgq/article/details/50808729
Redis的發布/訂閱目前是即發即棄(fire and forget)模式的,因此無法實現事件的可靠通知。也就是說,如果發布/訂閱的客戶端斷鏈之後又重連,則在客戶端斷鏈期間的所有事件都丟失了。
未來計劃支持事件的可靠通知,但是這可能會通過讓訂閱與發布功能本身變得更可靠來實現,也可能會在Lua腳本中對消息的訂閱與發布進行監聽,從而實現類似將事件推入到列表這樣的操作。
參考:
redis設置鍵的生存時間或過期時間
Redis Key expire notification with Jedis
(以下的文章都講的差不多)
JAVA實現redis超時失效key 的監聽觸發
spring boot-使用redis的Keyspace Notifications實現定時任務隊列
redis 超時失效key 的監聽觸發
Redis鍵空間通知(keyspace notifications)
【redis】spring boot利用redis的Keyspace Notifications實現消息通知