1. 程式人生 > 前端設計 >大資料實踐解析(上):聊一聊spark的檔案組織方式

大資料實踐解析(上):聊一聊spark的檔案組織方式

摘要:

在大資料/資料庫領域,資料的儲存格式直接影響著系統的讀寫效能。Spark針對不同的使用者/開發者,支援了多種資料檔案儲存方式。本文的內容主要來自於Spark AI Summit 2019中的一個talk【1】,我們將整個talk分為上下兩個部分,上文會以概念為主介紹spark的檔案/資料組織方式,下文中則通過例子講解spark中的讀寫流程。本文是上半部分,首先會對spark中幾種流行的檔案源(File Sources)進行特性介紹,這裡會涉及行列儲存的比較。然後會介紹兩種不同的資料佈置(Data layout),分別是partitioning以及bucketing,它們是spark中兩種重要的查詢優化手段。

檔案格式

在介紹檔案格式之前,不得不提一下在儲存過程中的行(Row-oriented)、列(Column-oriented)儲存這兩個重要的資料組織方式,它們分別適用於資料庫中OLTP和OLAP不同的場景。spark對這兩類檔案格式都有支援,列存的有parquet, ORC;行存的則有Avro,JSON,CSV,Text,Binary。

下面用一個簡單的例子說明行列兩種儲存格式的適用場景:

在上圖的music表中,如果用列存和行存儲存會得到下面兩種不同的組織方式。在左邊的列存中,同一列的資料被組織在一起,當一列資料儲存完畢時,接著下一列的資料存放,直到資料全部存好;而在行存中,資料按照行的順序依次放置,同一行中包括了不同列的一個數據,在圖中通過不同的顏色標識了資料的排列方法。

如果使用列存去處理下面的查詢,可以發現它只涉及到了兩列資料(album和artist),而列存中同一列的資料放在一起,那麼我們就可以快速定位到所需要的列的位置,然後只讀取查詢中所需要的列,有效減少了無用的資料IO(year 以及 genre)。同樣的,如果使用行存處理該查詢就無法起到 “列裁剪“” 的作用,因為一列中的資料被分散在檔案中的各個位置,每次IO不可避免地需要讀取到其他的資料,所以需要讀取表中幾乎所有的資料才能滿足查詢的條件。

通過這個例子可以發現,列存適合處理在幾個列上作分析的查詢,因為可以避免讀取到不需要的列資料,同時,同一列中的資料放置在一起也十分適合壓縮。但是,如果需要對列存進行INSET INTO操作呢?它需要挪動幾乎所有資料,效率十分低下。行存則只需要在檔案末尾append一行資料即可。在學術界,有人為了中和這兩種“極端”的儲存方式,提出了行列混存來設計HTAP(Hybrid transactional/analytical processing)資料庫,感興趣的讀者可以參考【2】。

所以簡單總結就是:列存適合讀密集的workload,特別是那些僅僅需要部分列的分析型查詢;行存適合寫密集的workload,或者是要求所有列的查詢。

檔案結構介紹

  • Parquet

在Parquet中,首尾都是parquet的magic number,用於檢驗該檔案是否是一個parquet檔案。Footer放在檔案的末尾,存放了元資料資訊,這裡包括schema資訊,以及每個row group的meta data。每個row group是一系列行資料的組成,row group中的每個column是一個列。

parquet格式能有效應用查詢優化中的優化規則,比如說謂詞下推(Predicate Push),將filter的條件推到掃描(Scan)資料時進行,減少了上層操作節點不必要的計算。又比如通過設定元資料中的min/max,在查詢時可以拿著條件和元資料進行對比,如果查詢條件完全不符合min/max,則可以直接跳過元資料所指的資料塊,減少了無用的資料IO。

  • ORC

ORC全稱是Optimized Row Columnar,它的組織方式如下圖,其中

Postsctipt儲存了該表的行數,壓縮引數,壓縮大小,列等資訊;

File Footer中是各個stripe的位置資訊,以及該表的統計結果;

資料分成一個個stripe,對應於parquet中的row group;

Stripe Footer主要是記錄每個stripe的統計資訊,包括min,max,count等;

Row data是資料的具體儲存;

Index Data儲存該stripe資料的具體位置,總行數等。

它們之間的關係在上圖中用虛實線做了很好的補充。

行存檔案格式

行存相較於列存會比較簡單,在實際開發中可能也接觸會相對較多,所以這裡簡單介紹其優缺點。

  • Avro:Avro的特點就是快速以及可壓縮,並且支援schema的操作,比如增加/刪除/重新命名一個欄位,更改預設值等。
  • JSON:在Spark中,通常被當做是一個結構體,在使用過程中需要注意key的數目(容易觸發OOM錯誤),它對schema的支援並不是很好。優點是輕量,容易部署以及便於debug。
  • CSV:通常用於資料的收集,比如說日誌資訊等,寫效能比讀效能好,它的缺點是檔案規範的不夠標準(分隔符,轉義符,引號),對巢狀資料型別的支援不足等。它和JSON都屬於半結構化的文字結構。
  • Raw text file:基於行的文字檔案,在spark中可通過 spark.read.text()直接讀入並按行切分,但是需要保持行的size在一個合理的值,支援有限的schema。
  • Binary:二進位制檔案,是Spark 3.0 的新特性。Spark會讀取每個binary檔案並轉化成一條記錄(record),該記錄(record)會儲存原始的二進位制資料以及檔案的matedata。這裡記錄(record)是一個schema,包括檔案的路徑(StringType),檔案被修改的時間(TimestampType),檔案的長度(LongType)以及內容(BinaryType)。

例如,如果我們需要遞迴讀取某目錄下所有的JPG檔案則可以通過下面的API來完成:

spark.read.format("binaryFile")
.option("pathGlobFilter","*.jpg")
.option("recursiveFileLookup","true")
.load("/path/to/dir")複製程式碼

資料佈置(Data layout)

partitioning

分割槽(Partition)是指當資料量很大時,可以按照某種方式對資料進行粗粒度切分的方式,比如在上圖中按year欄位進行了切分,在year欄位內部,又將genre欄位進行了切分。這樣帶來的好處也是顯而易見的,當處理“year = 2019 and genre = ‘folk’”的查詢時,就可以過濾掉不需要掃描的資料,直接定位到相應的切片中去做查詢,提高了查詢效率。

在Spark SQL和DataFrame API分別提供了相應的建立partition的方式。

同時,越多的分割槽並不意味著越好的效能。當分割槽越多時,分割槽的檔案數也隨著增多,這給metastore獲取分割槽的資料以及檔案系統list files帶來了很大的壓力,這也降低了查詢的效能。所以建議就是,選取合適的欄位做分割槽,該欄位不應出現過多的distinct values,使分割槽數處於一個合適的數目。如果distinct values很多怎麼辦?可以嘗試將欄位hash到合適的桶中,或是可以使用欄位中的一小部分作為分割槽欄位,比如name中的第一個字母。

bucketing

在Spark的join操作中,如果兩邊的表都比較大,會需要資料的shuffle,shuffle資料會佔據查詢過程中大量的時間,當某個耗時的Join的欄位被頻繁使用時,我們可以通過使用分桶(bucketing)的手段來優化該類查詢。通過分桶,我們將資料按照joinkey預先shuffle及排序,每次處理sort merge join時,只需要各自將自己本地的資料處理完畢即可,減少了shuffle的耗時。這裡要注意,分桶表的效能和分桶的個數密切相關,過多的分桶會導致小檔案問題,而過少的分桶會導致併發度太小從而影響效能。

分桶前的Sort merge join:

分桶後:

在Spark SQL和DataFrame API分別提供了相應的建立分桶的方式。通過排序,我們也可以記錄好min/max,從而避免讀取無用的資料。

參考

【1】Databricks. 2020. Apache Spark's Built-In File Sources In Depth - Databricks. [online] Available at: <databricks.com/session_eu1…>.

【2】 Bridging the Archipelago betweenRow-Stores and Column-Stores for Hybrid Workloads (SIGMOD'16)


點選關注,第一時間瞭解華為雲新鮮技術~