1. 程式人生 > >從原始碼看Spark讀取Hive表資料小檔案和分塊的問題

從原始碼看Spark讀取Hive表資料小檔案和分塊的問題

原文連結:https://mp.csdn.net/postedit/82423831 

使用Spark進行資料分析和計算早已成趨勢,你是否關注過讀取一張Hive表時Task數為什麼是那麼多呢?它跟什麼有關係呢? 最近剛好碰到這個問題,而之前對此有些模糊,所以做了些整理,希望大家拍磚探討

前言

有同事問到,Spark讀取一張Hive表的資料Task有一萬多個,看了Hive表分割槽下都是3MB~4MB的小檔案,每個Task只處理這麼小的檔案,實在浪費資源浪費時間。而我們都知道Spark的Task數由partitions決定,所以他想通過repartition(num)的方式來改變分割槽數,結果發現讀取檔案的時候Task數並沒有改變。遂問我有什麼引數可以設定,從而改變讀取Hive表時的Task數,將小檔案合併大檔案讀上來

> 本文涉及原始碼基於Spark2.0.0和Hadoop2.6.0,不同版本程式碼可能不一致,需自己對應。此外針對TextInputFormat格式的Hive表,其他格式的比如Parquet有Spark自己的高效實現,不在討論範圍之內

分析

Spark讀取Hive表是通過HadoopRDD掃描上來的,具體可見 org.apache.spark.sql.hive.TableReader類,構建HadoopRDD的程式碼如下

val rdd = new HadoopRDD(
sparkSession.sparkContext,
_broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
Some(initializeJobConfFunc),
inputFormatClass,
classOf[Writable],
classOf[Writable],
_minSplitsPerRDD)

 

這裡inputFormatClass是Hive建立時指定的,預設不指定為 org.apache.hadoop.mapred.TextInputFormat,由它就涉及到了HDFS檔案的FileSplit數,從而決定了上層Spark的partition數。在進入HadoopRDD類檢視之前,還有一個引數需要我們注意,就是 _minSplitsPerRDD,它在後面SplitSize的計算中是起了作用的。

我們看一下它的定義

private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
math.max(hadoopConf.getInt("mapred.map.tasks", 1),
sparkSession.sparkContext.defaultMinPartitions)
}

在我們指定以--master local模式跑的時候,它為0,而在其他模式下,則是求的一個最大值。這裡重點看 defaultMinPartitions,如下

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

// defaultParallelism 在yarn和standalone模式下的計算
def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

 

從這裡可以看到,defaultMinPartitions的值一般為2,而 mapred.map.tasks 或者 mapreduce.job.maps( 新版引數)是Hadoop的內建引數,其預設值也為2,一般很少去改變它。所以這裡_minSplitsPerRDD的值基本就是2了。

下面我們跟到HadoopRDD類裡,去看看它的partitions是如何來的

def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
// inputFormat就是上面引數inputFormatClass所配置的類的例項
val inputFormat = getInputFormat(jobConf)
// 此處獲取FileSplit數,minPartitions就是上面的_minSplitsPerRDD
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
// 從這裡可以看出FileSplit數決定了Spark掃描Hive表的partition數
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}

 

在 getPartitions 方法裡我們可以看到 FileSplit數最後決定了Spark讀取Hive表的Task數,下面我們再來看看 mapred.TextInputFormat 類裡 getSplits 的實現

分兩步來看,首先是掃描檔案,計算檔案大小的部分

FileStatus[] files = listStatus(job);

.....

long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}

// numSplits就是上面傳入的minPartitions,為2
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1), minSplitSize);

// minSplitSize 預設為1,唯一可通過 setMinSplitSize 方法設定
private long minSplitSize = 1;

 

針對Hive表的分割槽,Spark對每個分割槽都構建了一個HadoopRDD,每個分割槽目錄下就是實際的資料檔案,例如我們叢集的某一張表按天分割槽,每天下面有200個數據檔案,每個檔案大概3MB~4MB之間,這些實際上是reduce設定不合理導致的小檔案產生,如下圖

 

HiveFile

 

此處 listStatus 方法就是掃描的分割槽目錄,它返回的就是圖中顯示的具體 part-*****檔案的FileStatus物件,一共200個。從 totalSize 的計算可以看出,它是這200個檔案的總大小,為838MB,因此 goalSize 就為419MB。

引數 mapreduce.input.fileinputformat.split.minsize 在Spark程式沒有配的情況下,獲取的值為0,而 minSplitSize在Spark獲取FileSplits的時候並沒有被設定,所以為預設值1,那麼 minSize 就為1

其次,我們再來看從檔案劃分Split,部分程式碼如下(部分解釋見註釋)

ArrayList splits = new ArrayList(numSplits);
NetworkTopology clusterMap = new NetworkTopology();

// files是上面掃描的分割槽目錄下的part-*****檔案
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// 判斷檔案是否可切割
if (isSplitable(fs, path)) {
// 這裡獲取的不是檔案本身的大小,它的大小從上面的length就可以知道,這裡獲取的是HDFS檔案塊(跟檔案本身沒有關係)的大小
// HDFS檔案塊的大小由兩個引數決定,分別是 dfs.block.size 和 fs.local.block.size
// 在HDFS叢集模式下,由 dfs.block.size 決定,對於Hadoop2.0來說,預設值是128MB
// 在HDFS的local模式下,由 fs.local.block.size 決定,預設值是32MB
long blockSize = file.getBlockSize(); // 128MB

// 這裡計算splitSize,根據前面計算的goalSize=419MB,minSize為1
long splitSize = computeSplitSize(goalSize, minSize, blockSize);

long bytesRemaining = length;
// 如果檔案大小大於splitSize,就按照splitSize對它進行分塊
// 由此可以看出,這裡是為了並行化更好,所以按照splitSize會對檔案分的更細,因而split會更多
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}

 

從上面可以看到,splitSize是從 computeSplitSize(goalSize, minSize, blockSize) 計算來的,這三個引數我們都知道大小,那麼計算規則是怎麼樣的呢

規則:Math.max(minSize, Math.min(goalSize, blockSize)),從而我們可以知道 splitSize = 128MB,對於3MB~4MB的小檔案來說,就 決定了一個小檔案就是一個split了,從而對應了一個Spark的partition,所以我們一個分割槽下就有200個partition,當取兩個月的資料時,就是 200 * 30 * 2 = 12000,從而是12000個Task,跟同事所說的吻合!

> 而從TextInputFormat裡分Split的邏輯來看,它只會把一個檔案分得越來越小,而不會對小檔案採取合併,所以無論調整哪個引數,都沒法改變這種情況!而通過repartition強行分割槽,也是在拿到HDFS檔案之後對這12000個partition進行重分割槽,改變不了小檔案的問題,也無法改變讀取Hive表Task數多的情況

總結

1、Block是物理概念,而Split是邏輯概念,最後資料的分片是根據Split來的。一個檔案可能大於BlockSize也可能小於BlockSize,大於它就會被分成多個Block儲存到不同的機器上,SplitSize可能大於BlockSize也可能小於BlockSize,SplitSize如果大於BlockSize,那麼一個Split就可能要跨多個Block。對於資料分隔符而言,不用擔心一個完整的句子分在兩個Block裡,因為在Split構建RecordReader時,它會被補充完整

2、對於採用 org.apache.hadoop.mapred.TextInputFormat 作為InputFormat的Hive表,如果存在小檔案,Spark在讀取的時候單憑調引數和repartition是改變不了分割槽數的!對於小檔案的合併,目前除了Hadoop提供的Archive方式之外,也只能通過寫MR來手動合了,最好的方式還是寫資料的時候自己控制reduce的個數,把握檔案數

3、對於Spark直接通過SparkContext的 textFile(inputPath, numPartitions) 方法讀取HDFS檔案的,它底層也是通過HadoopRDD構建的,它的引數numPartitions就是上面計算goalSize的numSplits引數,這篇 文章 對原理描述的非常詳細,非常值得一讀

4、對於小檔案合併的InputFormat有 org.apache.hadoop.mapred.lib.CombineFileInputFormat,跟它相關的引數是 mapreduce.input.fileinputformat.split.maxsize,它用於設定一個Split的最大值

5、跟mapred.TextInputFormat 裡的Split劃分相關的引數

mapreduce.input.fileinputformat.split.minsize : 決定了計算Split劃分時的minSize
mapreduce.job.maps 或 mapred.map.tasks : 決定了getSplits(JobConf job, int numSplits)方法裡的numSplits,從而可以影響goalSize的大小
dfs.block.size 或 fs.local.block.size : 決定了HDFS的BlockSize

6、MapReduce新版API裡的 org.apache.hadoop.mapreduce.lib.input.TextInputFormat,它的SplitSize與上面說到的計算方式不一樣,getSplits方法的簽名為 getSplits(JobContext job),不再有numSplilts這個引數,splitSize的計算規則改為 Math.max(minSize, Math.min(maxSize, blockSize)),minSize和blockSize跟之前一樣,新的maxSize為conf.getLong("mapreduce.input.fileinputformat.split.maxsize", Long.MAX_VALUE)

7、在Spark2.0.0裡,設定Hadoop相關的引數(比如mapreduce開頭的)要通過 spark.sparkContext.hadoopConfiguration 來設定