程式人生 > redis叢集批量匯入

redis叢集批量匯入

把一個長key縮短:將field雜湊為4個字元的桶key(StringUtil.getBucketId)

package com.org.util;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Scanner;

import org.apache.commons.lang.StringUtils;

public class StringUtil {
	/**
	 * 1個key下面掛480個field,80億資料 len=4,64*64*64*64=1677萬key
	 * 具體怎麼控制見方法體
	 */
	private static final int LEN = 4;
	private static final char CHARS[] = {
			'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
			'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
			'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
			'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N',
			'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
			'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7',
			'8', '9', '$', '_'};
	
	/**
	 * 根據field獲取key 80億 len=4
	 * @param field
	 * @param bit 2的byte次方=key數量
	 * @return
	 */
	public static String getBucketId(String field){
		byte[] md;
		try {
			MessageDigest mdInst = MessageDigest.getInstance("MD5");
			md = mdInst.digest(field.getBytes());
		} catch (Exception e) {
			if(StringUtils.isBlank(field)||field.length()<LEN){
				return "@@@@";
			}else{
				md = field.getBytes();
			}
		}
		
		StringBuffer sb = new StringBuffer();
		
		//前4位取所有值
		byte b;
		for(int i=0; i<LEN; i++){
			b = md[i];
			if(b<0){
				b&=127;
			}
			sb.append(CHARS[b>>1]);
		}
				
		return sb.toString();
	}
	
	/**
	 * 通過field獲取key
	 * @param args 
	 */
	public static void main(String[] args) {
		Scanner scan = new Scanner(System.in);
		//通過field獲取key
		System.out.println("通過field獲取key");
		System.out.println("請輸入field");
		String field = scan.next();
		scan.close();
		System.out.println(getBucketId(field));
	}
}

匯入檔案類

package com.cpic.ffsw;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Scanner;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import com.org.util.DateUtil;
import com.org.util.StringUtil;
import com.org.util.Util;

import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.Client;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;

/**
 * 指定檔案匯入
 * @author 1
 *
 */
public class JedisClusterPipeline extends PipelineBase implements Closeable {
	
	//常量定義
	private static final int CONNECTION_TIMEOUT = 10000;//連線超時時間
	private static final int SO_TIMEOUT = 10000;//返回值的超時時間
	private static final int MAX_ATTEMPTS = 5;//出現異常最大重試次數
	
	// 部分欄位沒有對應的獲取方法,只能採用反射來做
	// 你也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問介面
	private static final Field FIELD_CONNECTION_HANDLER;
	private static final Field FIELD_CACHE;
	static {
		FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class,
				"connectionHandler");
		FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
	}

	private JedisSlotBasedConnectionHandler connectionHandler;
	private JedisClusterInfoCache clusterInfoCache;
	private Queue<Client> clients = new LinkedList<Client>(); // 根據順序儲存每個命令對應的Client
	private Map<JedisPool, Map<Long,Jedis>> jedisMap = new HashMap<>(); // 用於快取連線
	private boolean hasDataInBuf = false; // 是否有資料在快取區

	/**
	 * 根據jedisCluster例項生成對應的JedisClusterPipeline
	 * 
	 * @param
	 * @return
	 */
	public JedisClusterPipeline(JedisCluster jedisCluster){
		setJedisCluster(jedisCluster);
	}

	private void setJedisCluster(JedisCluster jedis) {
		connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
		clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
	}

	/**
	 * 重新整理叢集資訊,當叢集資訊發生變更時呼叫
	 * 
	 * @param
	 * @return
	 */
	private void refreshCluster() {
		connectionHandler.renewSlotCache();
	}

	/**
	 * 同步讀取所有資料. 與syncAndReturnAll()相比,sync()只是沒有對資料做反序列化
	 */
	private void sync() {
		innerSync(null);
	}

	/**
	 * 同步讀取所有資料 並按命令順序返回一個列表
	 * 
	 * @return 按照命令的順序返回所有的資料
	 */
	private List<Object> syncAndReturnAll() {
		List<Object> responseList = new ArrayList<Object>();

		innerSync(responseList);

		return responseList;
	}

	private void innerSync(List<Object> formatted) {
		HashSet<Client> clientSet = new HashSet<Client>();

		try {
			for (Client client : clients) {
				// 在sync()呼叫時其實是不需要解析結果資料的,但是如果不呼叫get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要呼叫get()來觸發錯誤。
				// 其實如果Response的data屬性可以直接獲取,可以省掉解析資料的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了
				Object data = generateResponse(client.getOne()).get();
				if (null != formatted) {
					formatted.add(data);
				}

				// size相同說明所有的client都已經新增,就不用再呼叫add方法了
				if (clientSet.size() != jedisMap.size()) {
					clientSet.add(client);
				}
			}
		} catch (JedisRedirectionException jre) {
			if (jre instanceof JedisMovedDataException) {
				// if MOVED redirection occurred, rebuilds cluster's slot cache,
				// recommended by Redis cluster specification
				refreshCluster();
			}
			jre.printStackTrace();
			throw jre;
		} finally {
			if (clientSet.size() != jedisMap.size()) {
				// 所有還沒有執行過的client要保證執行(flush),防止放回連線池後後面的命令被汙染
				for(Map.Entry<JedisPool, Map<Long,Jedis>> poolEntry:jedisMap.entrySet()){
					for (Map.Entry<Long, Jedis> jedisEntry : poolEntry.getValue().entrySet()) {
						if (clientSet.contains(jedisEntry.getValue().getClient())) {
							continue;
						}

						flushCachedData(jedisEntry.getValue());
					}
				}
			}

			hasDataInBuf = false;
			close();
		}
	}

	@Override
	public void close() {
		clean();

		clients.clear();

		for (Map.Entry<JedisPool, Map<Long, Jedis>> poolEntry:jedisMap.entrySet()) {
			for(Map.Entry<Long, Jedis> jedisEntry:poolEntry.getValue().entrySet()){
				if (hasDataInBuf) {
					flushCachedData(jedisEntry.getValue());
				}
				jedisEntry.getValue().close();
			}
		}

		jedisMap.clear();

		hasDataInBuf = false;
	}

	private void flushCachedData(Jedis jedis) {
		try {
			jedis.getClient().getAll();
		} catch (RuntimeException ex) {
			ex.printStackTrace();
		}
	}

	@Override
	protected Client getClient(String key) {
		byte[] bKey = SafeEncoder.encode(key);

		return getClient(bKey);
	}

	@Override
	protected Client getClient(byte[] key) {
		Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));

		Client client = jedis.getClient();
		clients.add(client);

		return client;
	}

	private Jedis getJedis(int slot) {
		
		//根據執行緒id從快取中獲取Jedis
		Jedis jedis = null;
		Map<Long,Jedis> tmpMap = null;
		
		//獲取執行緒id
		long id = Thread.currentThread().getId();
		
		//獲取jedispool
		JedisPool pool = clusterInfoCache.getSlotPool(slot);

		if (jedisMap.containsKey(pool)) {
			tmpMap = jedisMap.get(pool);
			if(tmpMap.containsKey(id)){
				jedis = tmpMap.get(id);
			}else{
				jedis = pool.getResource();
				tmpMap.put(id, jedis);
			}
		}else{
			tmpMap = new HashMap<>();
			jedis = pool.getResource();
			tmpMap.put(id, jedis);
			jedisMap.put(pool, tmpMap);
		}

		hasDataInBuf = true;
		return jedis;
	}

	private static Field getField(Class<?> cls, String fieldName) {
		try {
			Field field = cls.getDeclaredField(fieldName);
			field.setAccessible(true);

			return field;
		} catch (NoSuchFieldException | SecurityException e) {
			e.printStackTrace();
			throw new RuntimeException("cannot find or access field '"
					+ fieldName + "' from " + cls.getName(), e);
		}
	}

	@SuppressWarnings({ "unchecked" })
	private static <T> T getValue(Object obj, Field field) {
		try {
			return (T) field.get(obj);
		} catch (IllegalArgumentException | IllegalAccessException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}

	public static long write(String host,int port,String pwd,String fileName){
		long s = System.currentTimeMillis();
		File file = new File(fileName);
		String theadName = file.getName();
		System.out.println(theadName+"-->當前時間:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date()));
		BufferedReader br = null;
		JedisCluster jc = getJedisCluster(host, port, pwd);
		JedisClusterPipeline jcp = new JedisClusterPipeline(jc);
		jcp.refreshCluster();
		List<Object> batchResult = null;
		int totalCount = 0;
		try {
			// batch write
			
			br = new BufferedReader(new FileReader(file));
			int count = 0;
			String current;
			while (null!=(current=br.readLine())) {
				count++;
				totalCount++;
				String[] keyValue = current.split(",",2);
				jcp.hset(StringUtil.getBucketId(keyValue[0]),keyValue[0],keyValue[1]);
				if(count==1500000){
					jcp.sync();
					batchResult = jcp.syncAndReturnAll();
					count = 0;
					System.out.println(theadName+"-->當前時間:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗時:"+(System.currentTimeMillis() - s)/60000+"min,batchResult.size():"+batchResult.size());
				}
			}
			if(count>0){
				jcp.sync();
				batchResult = jcp.syncAndReturnAll();
				System.out.println(theadName+"-->當前時間:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗時:"+(System.currentTimeMillis() - s)/60000+"min,batchResult.size():"+batchResult.size());
			}

		}catch(Exception e){
			e.printStackTrace();
		} finally {
			try {
				jcp.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
			try {
				jc.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
			try {
				br.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		// output time
		System.out.println(theadName+" over-->當前時間:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗時:"+(System.currentTimeMillis() - s)/60000+"min");
		return totalCount;
	}
	
	/**
	 * 通過fields批量獲取values
	 * @param fields
	 * @param host
	 * @param port
	 * @param pwd
	 * @return
	 */
	public static List<String> mhget(List<String> fields,String host,int port,String pwd){
		List<String> results = new ArrayList<String>(); 
		JedisCluster jc = getJedisCluster(host, port, pwd);
		JedisClusterPipeline jcp = new JedisClusterPipeline(jc);
		jcp.refreshCluster();
		try {
			for (String field:fields)     
	        {
				jcp.hget(StringUtil.getBucketId(field),field);
	        }
	        List<Object> list=jcp.syncAndReturnAll();
	        for(Object o:list){
	        	if(o!=null){
	        		results.add(o.toString());
	        	}else{
	        		results.add(null);
	        	}
	        }
		}catch(Exception e){
			e.printStackTrace();
		} finally {
			try {
				jcp.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		return results;

	}
	
	/**
	 * 通過field獲取value
	 * @param field
	 * @param host
	 * @param port
	 * @param pwd
	 * @return
	 */
	public static String hget(String field,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		return jc.hget(StringUtil.getBucketId(field),field);
	}
	
	/**
	 * 通過field賦值
	 * @param field
	 * @param value
	 * @param host
	 * @param port
	 * @param pwd
	 * @return
	 */
	public static long hset(String field,String value,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		return jc.hset(StringUtil.getBucketId(field),field,value);
	}
	
	/**
	 * 通過field判斷是否存在
	 * @param field
	 * @param value
	 * @param host
	 * @param port
	 * @param pwd
	 * @return
	 */
	public static boolean hexists(String field,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		return jc.hexists(StringUtil.getBucketId(field),field);
	}
	
	/**
	 * 通過field刪除值
	 * @param field
	 * @param host
	 * @param port
	 * @param pwd
	 * @return
	 */
	public static long hdel(String field,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		return jc.hdel(StringUtil.getBucketId(field),field);
	}
	
	/**
	 * 通過key獲取value
	 * @param key
	 * @param host
	 * @param port
	 * @param pwd
	 * @return
	 */
	public static String get(String key,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		return jc.get(key);
	}
	
	/**
	 * 通過key刪除
	 * @param key
	 * @param host
	 * @param port
	 * @param pwd
	 * @return
	 */
	public static long del(String key,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		return jc.del(key);
	}
	
	/**
	 * 通過key查詢剩餘有效期
	 * @param key
	 * @param host
	 * @param port
	 * @param pwd
	 * @return
	 */
	public static long ttl(String key,String host,int port,String pwd){
		JedisCluster jc = getJedisCluster(host, port, pwd);
		return jc.ttl(key);
	}
	
	private static JedisCluster getJedisCluster(String host,int port,String pwd){
		HostAndPort node = new HostAndPort(host,port);
		return new JedisCluster(node, CONNECTION_TIMEOUT, SO_TIMEOUT, MAX_ATTEMPTS, pwd, new GenericObjectPoolConfig());
	}
	
	public static void main(String[] args) throws IOException {
		//控制檯輸入檔案地址
		Scanner scanner= new Scanner(System.in);
		System.out.println("請輸入redis主機/埠/密碼認證,例如:10.186.1.1/7380/2wsx3edc");
		String hostPortPwdStr;//redis主機/埠/密碼認證
		String host;//redis主機
		int port;//redis埠
		String pwd;//redis密碼認證
		while(true){
			try {
				hostPortPwdStr = scanner.next();
				String hostPortPwd[] = hostPortPwdStr.split("/");
				if(hostPortPwd.length == 3){
					host = hostPortPwd[0];
					port = Integer.parseInt(hostPortPwd[1]);
					pwd = hostPortPwd[2];
					break;
				}else{
					System.out.println("redis主機/埠/密碼認證格式錯誤,請重新輸入");
				}
			} catch(NumberFormatException e1){
				System.out.println("redis密碼認證錯誤,請重新輸入");
			}catch (Exception e) {
				System.out.println("redis主機/埠/密碼認證錯誤,請重新輸入");
			}
		}
		System.out.println("請輸入檔案地址,例如:/app/redis/pipe/CUSTNO_TO_BRANCHID/0_CUSTNO_TO_BRANCHID.txt");
		String filePath = scanner.next();
		while(!new File(filePath).isFile()){
			System.out.println("檔案地址錯誤,請重新輸入");
			filePath = scanner.next();
		}
		scanner.close();
		write(host, port, pwd, filePath);
	}
}