redis叢集批量匯入
阿新 • • 發佈:2018-12-25
把一個長key縮短
package com.org.util; import java.security.MessageDigest; import java.util.Scanner; import org.apache.commons.lang.StringUtils; public class StringUtil { /** * 1個key下面掛480個field,80億資料 len=4,64*64*64*64=1677萬key * 具體怎麼控制見方法體 */ private static final int LEN = 4; private static final char CHARS[] = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '$', '_'}; /** * 根據field獲取key 80億 len=4 * @param field * @param bit 2的byte次方=key數量 * @return */ public static String getBucketId(String field){ byte[] md; try { MessageDigest mdInst = MessageDigest.getInstance("MD5"); md = mdInst.digest(field.getBytes()); } catch (Exception e) { if(StringUtils.isBlank(field)||field.length()<LEN){ return "@@@@"; }else{ md = field.getBytes(); } } StringBuffer sb = new StringBuffer(); //前4位取所有值 byte b; for(int i=0; i<LEN; i++){ b = md[i]; if(b<0){ b&=127; } sb.append(CHARS[b>>1]); } return sb.toString(); } /** * 通過field獲取key * @param args */ public static void main(String[] args) { Scanner scan = new Scanner(System.in); //通過field獲取key System.out.println("通過field獲取key"); System.out.println("請輸入field"); String field = scan.next(); scan.close(); System.out.println(getBucketId(field)); } }
匯入檔案類
package com.cpic.ffsw; import java.io.BufferedReader; import java.io.Closeable; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Scanner; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import com.org.util.DateUtil; import com.org.util.StringUtil; import com.org.util.Util; import redis.clients.jedis.BinaryJedisCluster; import redis.clients.jedis.Client; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisClusterConnectionHandler; import redis.clients.jedis.JedisClusterInfoCache; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisSlotBasedConnectionHandler; import redis.clients.jedis.PipelineBase; import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.jedis.exceptions.JedisRedirectionException; import redis.clients.util.JedisClusterCRC16; import redis.clients.util.SafeEncoder; /** * 指定檔案匯入 * @author 1 * */ public class JedisClusterPipeline extends PipelineBase implements Closeable { //常量定義 private static final int CONNECTION_TIMEOUT = 10000;//連線超時時間 private static final int SO_TIMEOUT = 10000;//返回值的超時時間 private static final int MAX_ATTEMPTS = 5;//出現異常最大重試次數 // 部分欄位沒有對應的獲取方法,只能採用反射來做 // 你也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問介面 private static final Field FIELD_CONNECTION_HANDLER; private static final Field FIELD_CACHE; static { FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler"); FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache"); } private JedisSlotBasedConnectionHandler connectionHandler; private JedisClusterInfoCache clusterInfoCache; private Queue<Client> clients = new LinkedList<Client>(); // 根據順序儲存每個命令對應的Client private Map<JedisPool, Map<Long,Jedis>> jedisMap = new HashMap<>(); // 用於快取連線 private boolean hasDataInBuf = false; // 是否有資料在快取區 /** * 根據jedisCluster例項生成對應的JedisClusterPipeline * * @param * @return */ public JedisClusterPipeline(JedisCluster jedisCluster){ setJedisCluster(jedisCluster); } private void setJedisCluster(JedisCluster jedis) { connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER); clusterInfoCache = getValue(connectionHandler, FIELD_CACHE); } /** * 重新整理叢集資訊,當叢集資訊發生變更時呼叫 * * @param * @return */ private void refreshCluster() { connectionHandler.renewSlotCache(); } /** * 同步讀取所有資料. 與syncAndReturnAll()相比,sync()只是沒有對資料做反序列化 */ private void sync() { innerSync(null); } /** * 同步讀取所有資料 並按命令順序返回一個列表 * * @return 按照命令的順序返回所有的資料 */ private List<Object> syncAndReturnAll() { List<Object> responseList = new ArrayList<Object>(); innerSync(responseList); return responseList; } private void innerSync(List<Object> formatted) { HashSet<Client> clientSet = new HashSet<Client>(); try { for (Client client : clients) { // 在sync()呼叫時其實是不需要解析結果資料的,但是如果不呼叫get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要呼叫get()來觸發錯誤。 // 其實如果Response的data屬性可以直接獲取,可以省掉解析資料的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了 Object data = generateResponse(client.getOne()).get(); if (null != formatted) { formatted.add(data); } // size相同說明所有的client都已經新增,就不用再呼叫add方法了 if (clientSet.size() != jedisMap.size()) { clientSet.add(client); } } } catch (JedisRedirectionException jre) { if (jre instanceof JedisMovedDataException) { // if MOVED redirection occurred, rebuilds cluster's slot cache, // recommended by Redis cluster specification refreshCluster(); } jre.printStackTrace(); throw jre; } finally { if (clientSet.size() != jedisMap.size()) { // 所有還沒有執行過的client要保證執行(flush),防止放回連線池後後面的命令被汙染 for(Map.Entry<JedisPool, Map<Long,Jedis>> poolEntry:jedisMap.entrySet()){ for (Map.Entry<Long, Jedis> jedisEntry : poolEntry.getValue().entrySet()) { if (clientSet.contains(jedisEntry.getValue().getClient())) { continue; } flushCachedData(jedisEntry.getValue()); } } } hasDataInBuf = false; close(); } } @Override public void close() { clean(); clients.clear(); for (Map.Entry<JedisPool, Map<Long, Jedis>> poolEntry:jedisMap.entrySet()) { for(Map.Entry<Long, Jedis> jedisEntry:poolEntry.getValue().entrySet()){ if (hasDataInBuf) { flushCachedData(jedisEntry.getValue()); } jedisEntry.getValue().close(); } } jedisMap.clear(); hasDataInBuf = false; } private void flushCachedData(Jedis jedis) { try { jedis.getClient().getAll(); } catch (RuntimeException ex) { ex.printStackTrace(); } } @Override protected Client getClient(String key) { byte[] bKey = SafeEncoder.encode(key); return getClient(bKey); } @Override protected Client getClient(byte[] key) { Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key)); Client client = jedis.getClient(); clients.add(client); return client; } private Jedis getJedis(int slot) { //根據執行緒id從快取中獲取Jedis Jedis jedis = null; Map<Long,Jedis> tmpMap = null; //獲取執行緒id long id = Thread.currentThread().getId(); //獲取jedispool JedisPool pool = clusterInfoCache.getSlotPool(slot); if (jedisMap.containsKey(pool)) { tmpMap = jedisMap.get(pool); if(tmpMap.containsKey(id)){ jedis = tmpMap.get(id); }else{ jedis = pool.getResource(); tmpMap.put(id, jedis); } }else{ tmpMap = new HashMap<>(); jedis = pool.getResource(); tmpMap.put(id, jedis); jedisMap.put(pool, tmpMap); } hasDataInBuf = true; return jedis; } private static Field getField(Class<?> cls, String fieldName) { try { Field field = cls.getDeclaredField(fieldName); field.setAccessible(true); return field; } catch (NoSuchFieldException | SecurityException e) { e.printStackTrace(); throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e); } } @SuppressWarnings({ "unchecked" }) private static <T> T getValue(Object obj, Field field) { try { return (T) field.get(obj); } catch (IllegalArgumentException | IllegalAccessException e) { e.printStackTrace(); throw new RuntimeException(e); } } public static long write(String host,int port,String pwd,String fileName){ long s = System.currentTimeMillis(); File file = new File(fileName); String theadName = file.getName(); System.out.println(theadName+"-->當前時間:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())); BufferedReader br = null; JedisCluster jc = getJedisCluster(host, port, pwd); JedisClusterPipeline jcp = new JedisClusterPipeline(jc); jcp.refreshCluster(); List<Object> batchResult = null; int totalCount = 0; try { // batch write br = new BufferedReader(new FileReader(file)); int count = 0; String current; while (null!=(current=br.readLine())) { count++; totalCount++; String[] keyValue = current.split(",",2); jcp.hset(StringUtil.getBucketId(keyValue[0]),keyValue[0],keyValue[1]); if(count==1500000){ jcp.sync(); batchResult = jcp.syncAndReturnAll(); count = 0; System.out.println(theadName+"-->當前時間:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗時:"+(System.currentTimeMillis() - s)/60000+"min,batchResult.size():"+batchResult.size()); } } if(count>0){ jcp.sync(); batchResult = jcp.syncAndReturnAll(); System.out.println(theadName+"-->當前時間:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗時:"+(System.currentTimeMillis() - s)/60000+"min,batchResult.size():"+batchResult.size()); } }catch(Exception e){ e.printStackTrace(); } finally { try { jcp.close(); } catch (Exception e) { e.printStackTrace(); } try { jc.close(); } catch (Exception e) { e.printStackTrace(); } try { br.close(); } catch (Exception e) { e.printStackTrace(); } } // output time System.out.println(theadName+" over-->當前時間:"+DateUtil.YYYY_MM_DD_HH_MM_SS.format(new Date())+",totalCount:"+Util.mathFormat(totalCount)+",耗時:"+(System.currentTimeMillis() - s)/60000+"min"); return totalCount; } /** * 通過fields批量獲取values * @param fields * @param host * @param port * @param pwd * @return */ public static List<String> mhget(List<String> fields,String host,int port,String pwd){ List<String> results = new ArrayList<String>(); JedisCluster jc = getJedisCluster(host, port, pwd); JedisClusterPipeline jcp = new JedisClusterPipeline(jc); jcp.refreshCluster(); try { for (String field:fields) { jcp.hget(StringUtil.getBucketId(field),field); } List<Object> list=jcp.syncAndReturnAll(); for(Object o:list){ if(o!=null){ results.add(o.toString()); }else{ results.add(null); } } }catch(Exception e){ e.printStackTrace(); } finally { try { jcp.close(); } catch (Exception e) { e.printStackTrace(); } } return results; } /** * 通過field獲取value * @param field * @param host * @param port * @param pwd * @return */ public static String hget(String field,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.hget(StringUtil.getBucketId(field),field); } /** * 通過field賦值 * @param field * @param value * @param host * @param port * @param pwd * @return */ public static long hset(String field,String value,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.hset(StringUtil.getBucketId(field),field,value); } /** * 通過field判斷是否存在 * @param field * @param value * @param host * @param port * @param pwd * @return */ public static boolean hexists(String field,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.hexists(StringUtil.getBucketId(field),field); } /** * 通過field刪除值 * @param field * @param host * @param port * @param pwd * @return */ public static long hdel(String field,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.hdel(StringUtil.getBucketId(field),field); } /** * 通過key獲取value * @param key * @param host * @param port * @param pwd * @return */ public static String get(String key,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.get(key); } /** * 通過key刪除 * @param key * @param host * @param port * @param pwd * @return */ public static long del(String key,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.del(key); } /** * 通過key查詢剩餘有效期 * @param key * @param host * @param port * @param pwd * @return */ public static long ttl(String key,String host,int port,String pwd){ JedisCluster jc = getJedisCluster(host, port, pwd); return jc.ttl(key); } private static JedisCluster getJedisCluster(String host,int port,String pwd){ HostAndPort node = new HostAndPort(host,port); return new JedisCluster(node, CONNECTION_TIMEOUT, SO_TIMEOUT, MAX_ATTEMPTS, pwd, new GenericObjectPoolConfig()); } public static void main(String[] args) throws IOException { //控制檯輸入檔案地址 Scanner scanner= new Scanner(System.in); System.out.println("請輸入redis主機/埠/密碼認證,例如:10.186.1.1/7380/2wsx3edc"); String hostPortPwdStr;//redis主機/埠/密碼認證 String host;//redis主機 int port;//redis埠 String pwd;//redis密碼認證 while(true){ try { hostPortPwdStr = scanner.next(); String hostPortPwd[] = hostPortPwdStr.split("/"); if(hostPortPwd.length == 3){ host = hostPortPwd[0]; port = Integer.parseInt(hostPortPwd[1]); pwd = hostPortPwd[2]; break; }else{ System.out.println("redis主機/埠/密碼認證格式錯誤,請重新輸入"); } } catch(NumberFormatException e1){ System.out.println("redis密碼認證錯誤,請重新輸入"); }catch (Exception e) { System.out.println("redis主機/埠/密碼認證錯誤,請重新輸入"); } } System.out.println("請輸入檔案地址,例如:/app/redis/pipe/CUSTNO_TO_BRANCHID/0_CUSTNO_TO_BRANCHID.txt"); String filePath = scanner.next(); while(!new File(filePath).isFile()){ System.out.println("檔案地址錯誤,請重新輸入"); filePath = scanner.next(); } scanner.close(); write(host, port, pwd, filePath); } }