1. 程式人生 > 實用技巧 >Spark(二)【sc.textfile的分割槽策略原始碼分析】

Spark(二)【sc.textfile的分割槽策略原始碼分析】

sparkcontext.textFile()返回的是HadoopRDD!

關於HadoopRDD的官方介紹,使用的是舊版的hadoop api

ctrl+F12搜尋 HadoopRDD的getPartitions方法,這裡進行了分割槽計算

讀取的是txt檔案,用的是TextInputFormat的切片規則

當前spark3.0的HadoopRDD依賴於hadoop的切片規則。其中HadoopRDD用的是舊版hadoop API,還有個NewHadoopRDD用的是新版hadoop API

進去TextInputFromat的檢視split方法

 public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
        
    // 獲取要操作的所有檔案的屬性資訊
    FileStatus[] files = listStatus(job);
    
   
    // 所有檔案的總大小
    long totalSize = 0;  // compute total size
        
   
    // 目標切片大小  numSplits=defaultMinPartitions
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
        //預設為1
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
        
        // 切片是以檔案為單位切
    for (FileStatus file: files) {
        
      //獲取檔案大小
      long length = file.getLen();
        
        //檔案不為空
      if (length != 0) {
       // 檔案是否可切,一般普通檔案都可切,如果是壓縮格式,只有lzo,Bzip2可切
        if (isSplitable(fs, path)) {
            // 獲取檔案的塊大小  預設128M
          long blockSize = file.getBlockSize();
            // 計算片大小
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
            // 迴圈切片,以splitSize為基礎進行切片 , 切的片大小,最後一片有可能小於片大小的1.1倍
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
              // makeSplit()切片
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

            //剩餘部分,不夠一片,全部作為1片
          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
          // 檔案為空,建立一個空的切片
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }

計算片大小:片大小的計算以所有檔案的總大小計算,切片時以檔案為單位進行切片。

protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    // minSize預設為1
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

總結:在大資料的計算領域,一般情況下,塊大小就是片大小!

​ 分割槽數過多,會導致切片大小 < 塊大小。

​ 分割槽數過少,task個數也會少,資料處理效率低,合理設定分割槽數。