1. 程式人生 > 資料庫 >在Redis叢集中使用pipeline批量插入的實現方法

在Redis叢集中使用pipeline批量插入的實現方法

由於專案中需要使用批量插入功能,所以在網上查詢到了Redis 批量插入可以使用pipeline來高效的插入,示例程式碼如下:

String key = "key";
Jedis jedis = new Jedis("xx.xx.xx.xx");
Pipeline p = jedis.pipelined();
List<String> myData = .... //要插入的資料列表
for(String data: myData){
  p.hset(key,data);
}
p.sync();
jedis.close();

但實際上遇到的問題是,專案上所用到的Redis是叢集,初始化的時候使用的類是JedisCluster而不是Jedis. 去查了JedisCluster的文件,並沒有發現提供有像Jedis一樣的獲取Pipeline物件的 pipelined()方法.

Google了一下,發現瞭解決方案.

Redis叢集規範有說: Redis 叢集的鍵空間被分割為 16384 個槽(slot), 叢集的最大節點數量也是 16384 個。每個主節點都負責處理 16384 個雜湊槽的其中一部分。當我們說一個叢集處於“穩定”(stable)狀態時, 指的是叢集沒有在執行重配置(reconfiguration)操作, 每個雜湊槽都只由一個節點進行處理。

所以我們可以根據要插入的key知道這個key所對應的槽的號碼,再通過這個槽的號碼從叢集中找到對應Jedis. 具體實現如下

//初始化得到了jedis cluster,如何獲取HostAndPort集合程式碼就不寫了

Set<HostAndPort> nodes = .....

JedisCluster jedisCluster = new JedisCluster(nodes);



Map<String,JedisPool> nodeMap = jedisCluster.getClusterNodes();

String anyHost = nodeMap.keySet().iterator().next();

//getSlotHostMap方法在下面有

TreeMap<Long,String> slotHostMap = getSlotHostMap(anyHost); 

  private static TreeMap<Long,String> getSlotHostMap(String anyHostAndPortStr) {
    TreeMap<Long,String> tree = new TreeMap<Long,String>();
    String parts[] = anyHostAndPortStr.split(":");
    HostAndPort anyHostAndPort = new HostAndPort(parts[0],Integer.parseInt(parts[1]));
    try{
      Jedis jedis = new Jedis(anyHostAndPort.getHost(),anyHostAndPort.getPort());
      List<Object> list = jedis.clusterSlots();
      for (Object object : list) {
        List<Object> list1 = (List<Object>) object;
        List<Object> master = (List<Object>) list1.get(2);
        String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1);
        tree.put((Long) list1.get(0),hostAndPort);
        tree.put((Long) list1.get(1),hostAndPort);
      }
      jedis.close();
    }catch(Exception e){
      
    }
    return tree;
  }

上面這幾步可以在初始化的時候就完成. 不需要每次都呼叫,把nodeMap和slotHostMap都定義為靜態變數.

//獲取槽號

int slot = JedisClusterCRC16.getSlot(key); 

//獲取到對應的Jedis物件

Map.Entry<Long,String> entry = slotHostMap.lowerEntry(Long.valueOf(slot));

Jedis jedis = nodeMap.get(entry.getValue()).getResource();

建議上面這步操作可以封裝成一個靜態方法,比如命名為public static Jedis getJedisByKey(String key) 之類的. 意思就是在叢集中,通過key獲取到這個key所對應的Jedis物件.

這樣再通過上面的jedis.pipelined();來就可以進行批量插入了.

注:這個方法是從Google上搜來的,直到目前我使用起來還沒發現什麼問題. 如果哪位大神發現有什麼不對的地方歡迎提出來.

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。