redis cluster模式key的模糊刪除-java操作
阿新 • • 發佈:2018-12-25
不管是 Redis 單節點還是 Redis Cluster 模式,都不支援依模式(模糊)批次刪除 key。當然你也可以自己寫 shell 指令碼去刪除。這裡是我自己想的一種方式:用多執行緒分別對叢集中的每個 master 節點執行 SCAN(需要時再用 pipeline 執行 DEL),並以 CountDownLatch 等待所有執行緒完成後彙總處理的數量。
上程式碼:
執行緒池:
/**
 * Holder for a single, shared, fixed-size {@link ExecutorService}.
 *
 * <p>The pool lives in a private nested class, so it is only created when that
 * nested class is first initialised, i.e. on the first call to
 * {@link #getSinleton()} or {@link #getExecutorService()}. It is sized to the
 * number of processors reported by the runtime at that moment and is never
 * shut down by this class.
 */
public class ThreadSinleton {

    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadSinleton.class);

    /** Not instantiable from outside; use {@link #getSinleton()}. */
    private ThreadSinleton() {
    }

    /**
     * @return the one {@code ThreadSinleton} instance
     */
    public static final ThreadSinleton getSinleton() {
        return SingletonHolder.sinleton;
    }

    /**
     * @return the shared thread pool
     */
    public static final ExecutorService getExecutorService() {
        return SingletonHolder.es;
    }

    /** Nested holder whose static initialisation builds the pool and the instance. */
    private static class SingletonHolder {

        // Field order matters: the pool is built (and logged) before the instance.
        static ExecutorService es = createPool();

        private static final ThreadSinleton sinleton = new ThreadSinleton();

        /** Builds the fixed-size pool, one thread per available processor. */
        private static ExecutorService createPool() {
            int workers = Runtime.getRuntime().availableProcessors();
            ExecutorService pool = Executors.newFixedThreadPool(workers);
            // NOTE(review): a success message logged at ERROR level; confirm this is intentional.
            LOGGER.error("執行緒池初始化完成!!!!");
            return pool;
        }
    }
}
執行呼叫:
public long getKeys(String key,boolean isDel) throws Exception { Long startTime = System.currentTimeMillis(); long resSum=0; List<Jedis> jedisList = new ArrayList<Jedis>();// jedis 操作集合 for (HostAndPort hostAndport : jedisCluster.getHaps()) { jedisList.add(new Jedis(hostAndport.getHost(), hostAndport .getPort())); } int size =jedisList.size(); if (null != jedisList && size > 0) { ScanParams params = new ScanParams(); params.match(key+"*");/**模糊匹配**/ //首選計算主node數量 List<Jedis> masterList = new ArrayList<Jedis>(); for (int i = 0; i < size; i++) { Jedis jedis = null; jedis = jedisList.get(i); if (JedisUtils.isMaster(jedis)) { masterList.add(jedis); }else{ /**關掉slave連線 **/ jedis.close(); } } int masterSize =masterList.size(); LOGGER.info("jedis操作例項建立完畢,master數量:" +masterSize); if(null!=masterList&&masterSize>0){ CountDownLatch countDownLatch = new CountDownLatch(masterSize); @SuppressWarnings("rawtypes") Future[] future_Arr = new Future[masterSize] ; ExecutorService es =ThreadSinleton.getExecutorService(); for (int j =0; j < masterSize; j++) { Jedis jedis = null; Pipeline pipeline=null; jedis = masterList.get(j); future_Arr[j] = es.submit(new CacheBodyThread( "子執行緒"+j, jedis, pipeline, isDel, params, countDownLatch)); } try { // LOGGER.info("*******主執行緒正在彙總************"); countDownLatch.await(); try { if(null!=future_Arr&&future_Arr.length>0){ for (int i = 0; i < future_Arr.length; i++) { resSum+=(long)future_Arr[i].get(); } } } catch (InterruptedException e) { e.printStackTrace(); LOGGER.error(e.getMessage()); } catch (ExecutionException e) { e.printStackTrace(); LOGGER.error(e.getMessage()); } //es.shutdown(); 執行緒池不需關閉 LOGGER.info("*******執行緒池關閉,主執行緒正在彙總完畢==========,"+resSum); } catch (InterruptedException e) { e.printStackTrace(); LOGGER.error(e.getMessage()); } } } Long endTime = System.currentTimeMillis(); LOGGER.error((true==isDel?"清理":"統計")+"快取,[執行模糊查詢所有鍵]end,待處理集合資料長度:"+resSum+",using time is<耗時>:" + (endTime - startTime)+"ms"); return resSum; }
執行緒(因為這裡我們不僅僅要刪除,還要知道刪除的數量,所以用 Callable):
public class CacheBodyThread implements Callable<Long> { private final static Logger LOGGER = LoggerFactory .getLogger(CacheBodyThread.class); String threadName; Jedis jedis; Pipeline pipeline; boolean isDel;// isDel 清除 TRUE 統計FALSE ScanParams params; CountDownLatch countDownLatch; public CacheBodyThread(String threadName, Jedis jedis, Pipeline pipeline, boolean isDel, ScanParams params, CountDownLatch countDownLatch) { super(); this.threadName = threadName; this.jedis = jedis; this.pipeline = pipeline; this.isDel = isDel; this.params = params; this.countDownLatch = countDownLatch; } @SuppressWarnings("deprecation") @Override public Long call() throws Exception { long sum =0; LOGGER.info("【" + threadName + "】執行緒,統計快取thread正在執行中========"); // List<String> keys = null; // keys = new ArrayList<String>(); // 待處理key集合 String scanRet = "0";// 起始遊標 ScanResult<String> scanResult = null; do { scanResult = jedis.scan(scanRet, params); scanRet = scanResult.getStringCursor(); sum += scanResult.getResult().size(); if (isDel) { pipeline = jedis.pipelined();// 批量操作 for (String key_i : scanResult.getResult()) { pipeline.del(key_i); } pipeline.sync();// 執行 } } while (0 != scanResult.getCursor());/** 未來版本此方法會刪除 **/ if (null != pipeline) { try { pipeline.close(); LOGGER.info("pipeline管道連線已關閉==============="); } catch (IOException e) { e.printStackTrace(); } } if (null != jedis) { jedis.close();// 關閉連線 LOGGER.info("jedis連線已關閉==============="); } countDownLatch.countDown(); LOGGER.info("【" + threadName + "】執行緒執行完畢========"); return sum; } }
Thread 配合 CountDownLatch 有點 fork/join 的意思,只是這裡的 fork 直接對應到單一 node 上;也可以直接使用 JDK 1.7 之後的 ForkJoinTask 配合 ForkJoinPool 處理,原理差不多。