Spark(二)【sc.textfile的分割槽策略原始碼分析】
阿新 • • 發佈:2020-08-01
sparkcontext.textFile()返回的是HadoopRDD!
關於HadoopRDD的官方介紹,使用的是舊版的hadoop api
ctrl+F12搜尋 HadoopRDD的getPartitions方法,這裡進行了分割槽計算
讀取的是txt檔案,用的是TextInputFormat的切片規則
當前spark3.0的HadoopRDD依賴於hadoop的切片規則。其中HadoopRDD用的是舊版hadoop API,還有個NewHadoopRDD用的是新版hadoop API
進去TextInputFromat的檢視split方法
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { // 獲取要操作的所有檔案的屬性資訊 FileStatus[] files = listStatus(job); // 所有檔案的總大小 long totalSize = 0; // compute total size // 目標切片大小 numSplits=defaultMinPartitions long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); //預設為1 long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); // generate splits ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); NetworkTopology clusterMap = new NetworkTopology(); // 切片是以檔案為單位切 for (FileStatus file: files) { //獲取檔案大小 long length = file.getLen(); //檔案不為空 if (length != 0) { // 檔案是否可切,一般普通檔案都可切,如果是壓縮格式,只有lzo,Bzip2可切 if (isSplitable(fs, path)) { // 獲取檔案的塊大小 預設128M long blockSize = file.getBlockSize(); // 計算片大小 long splitSize = computeSplitSize(goalSize, minSize, blockSize); long bytesRemaining = length; // 迴圈切片,以splitSize為基礎進行切片 , 切的片大小,最後一片有可能小於片大小的1.1倍 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); // makeSplit()切片 splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } //剩餘部分,不夠一片,全部作為1片 if (bytesRemaining != 0) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } } else { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { // 檔案為空,建立一個空的切片 //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits.toArray(new FileSplit[splits.size()]); }
計算片大小:片大小的計算以所有檔案的總大小計算,切片時以檔案為單位進行切片。
protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
// minSize預設為1
return Math.max(minSize, Math.min(goalSize, blockSize));
}
總結:在大資料的計算領域,一般情況下,塊大小就是片大小!
分割槽數過多,會導致切片大小 < 塊大小。
分割槽數過少,task個數也會少,資料處理效率低,合理設定分割槽數。