1. 程式人生 > >redis cluster模式key的模糊刪除-java操作

redis cluster模式key的模糊刪除-java操作

不管是redis單節點還是redis cluster模式都是不支援模糊刪除的。當然你也可以自己去寫shell指令碼去刪除。這裡我自己想的一種方式,採用多執行緒操作叢集下redis單節點,countdownlatch統計彙總刪除。

上程式碼:

執行緒池:

public class ThreadSinleton {
	private final static Logger LOGGER = LoggerFactory
			.getLogger(ThreadSinleton.class);

	private static class SingletonHolder {
		static ExecutorService es = null;
		static {
			es = Executors.newFixedThreadPool(Runtime.getRuntime()
					.availableProcessors());
			LOGGER.error("執行緒池初始化完成!!!!");

		}
		private static final ThreadSinleton sinleton = new ThreadSinleton();
	}

	private ThreadSinleton() {
	}

	public static final ThreadSinleton getSinleton() {
		return SingletonHolder.sinleton;
	}

	public static final ExecutorService getExecutorService() {
		return SingletonHolder.es;
	}
}

執行呼叫:
public long getKeys(String key,boolean isDel) throws Exception {
		Long startTime = System.currentTimeMillis();
		long resSum=0;
		List<Jedis> jedisList = new ArrayList<Jedis>();// jedis 操作集合
		for (HostAndPort hostAndport : jedisCluster.getHaps()) {
			jedisList.add(new Jedis(hostAndport.getHost(), hostAndport
					.getPort()));
		}
		int size =jedisList.size();
		
		if (null != jedisList && size > 0) {
			ScanParams  params = new ScanParams();
			params.match(key+"*");/**模糊匹配**/
			//首選計算主node數量
			List<Jedis> masterList = new  ArrayList<Jedis>();
			for (int i = 0; i < size; i++) {
				Jedis jedis = null;
				jedis = jedisList.get(i);
				if (JedisUtils.isMaster(jedis)) {
					masterList.add(jedis);
				}else{
					/**關掉slave連線 **/
					jedis.close();
				}
			}
			int masterSize =masterList.size();
			LOGGER.info("jedis操作例項建立完畢,master數量:" +masterSize);
			
			if(null!=masterList&&masterSize>0){
				CountDownLatch countDownLatch = new CountDownLatch(masterSize);
				@SuppressWarnings("rawtypes")
				Future[] future_Arr = new Future[masterSize] ;
				ExecutorService es =ThreadSinleton.getExecutorService();
				for (int j =0; j < masterSize; j++) {
					Jedis jedis = null;
					Pipeline pipeline=null;
					jedis = masterList.get(j);
					future_Arr[j] = es.submit(new CacheBodyThread(
							"子執行緒"+j, jedis, pipeline, isDel, params, countDownLatch));
				}
				try {
//					LOGGER.info("*******主執行緒正在彙總************");
					countDownLatch.await();
					try {
						if(null!=future_Arr&&future_Arr.length>0){
							for (int i = 0; i < future_Arr.length; i++) {
								resSum+=(long)future_Arr[i].get();
							}
						}
						
					} catch (InterruptedException e) {
						e.printStackTrace();
						LOGGER.error(e.getMessage());
					} catch (ExecutionException e) {
						e.printStackTrace();
						LOGGER.error(e.getMessage());
					}     
					//es.shutdown(); 執行緒池不需關閉
					LOGGER.info("*******執行緒池關閉,主執行緒正在彙總完畢==========,"+resSum);
				} catch (InterruptedException e) {
					e.printStackTrace();
					LOGGER.error(e.getMessage());
				}
			}
		}
		Long endTime = System.currentTimeMillis();
		LOGGER.error((true==isDel?"清理":"統計")+"快取,[執行模糊查詢所有鍵]end,待處理集合資料長度:"+resSum+",using time is<耗時>:" + (endTime - startTime)+"ms");
		return resSum;
	}

執行緒(因為這裡我們不僅僅要刪除,還要知道刪除的量,所有用callable):
public class CacheBodyThread implements Callable<Long> {
	private final static Logger LOGGER = LoggerFactory
			.getLogger(CacheBodyThread.class);

	String threadName;
	Jedis jedis;
	Pipeline pipeline;
	boolean isDel;// isDel 清除 TRUE 統計FALSE
	ScanParams params;
	CountDownLatch countDownLatch;

	public CacheBodyThread(String threadName, Jedis jedis, Pipeline pipeline,
			boolean isDel, ScanParams params, CountDownLatch countDownLatch) {
		super();
		this.threadName = threadName;
		this.jedis = jedis;
		this.pipeline = pipeline;
		this.isDel = isDel;
		this.params = params;
		this.countDownLatch = countDownLatch;
	}

	@SuppressWarnings("deprecation")
	@Override
	public Long call() throws Exception {
		long sum =0;
		LOGGER.info("【" + threadName + "】執行緒,統計快取thread正在執行中========");

//		List<String> keys = null;
//		keys = new ArrayList<String>(); // 待處理key集合

		String scanRet = "0";// 起始遊標
		ScanResult<String> scanResult = null;
		
		do {
			scanResult = jedis.scan(scanRet, params);
			scanRet = scanResult.getStringCursor();
			sum += scanResult.getResult().size();
			if (isDel) { 
				pipeline = jedis.pipelined();// 批量操作
				for (String key_i : scanResult.getResult()) {
					pipeline.del(key_i);
				}
				pipeline.sync();// 執行
			}
		} 
		while (0 != scanResult.getCursor());/** 未來版本此方法會刪除 **/
		if (null != pipeline) {
			try {
				pipeline.close();
				LOGGER.info("pipeline管道連線已關閉===============");
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		if (null != jedis) {
			jedis.close();// 關閉連線
			LOGGER.info("jedis連線已關閉===============");
		}
		countDownLatch.countDown();
		LOGGER.info("【" + threadName + "】執行緒執行完畢========");
		return sum;
	}

}

thread 配合countdownlatch 有點fork/join的意思,只是這裡的fork,我們直接定位到單node上了,也可以直接使用jdk1.7之後的forkjointask配合forkjoinpool處理,原理差不多。