位元組跳動在Spark SQL上的核心優化實踐 | 位元組跳動技術沙龍
以下是位元組跳動資料倉庫架構負責人-郭俊的分享主題沉澱:《位元組跳動在Spark SQL上的核心優化實踐》。
團隊介紹
資料倉庫架構團隊負責資料倉庫領域架構設計,支援位元組跳動幾乎所有產品線(包含但不限於抖音、今日頭條、西瓜視訊、火山視訊)資料倉庫方向的需求,如 Spark SQL / Druid 的二次開發和優化。概括
今天的分享分為三個部分,第一個部分是 SparkSQL 的架構簡介,第二部分介紹位元組跳動在 SparkSQL 引擎上的優化實踐,第三部分是位元組跳動在 Spark Shuffle 穩定性提升和效能優化上的實踐與探索。Spark SQL 架構簡介
上述資訊存於 Catalog 內。在生產環境中,一般由 Hive Metastore 提供 Catalog 服務。Analyzer 會結合 Catalog 將 Unresolved Logical Plan 轉換為 Resolved Logical Plan。
到這裡還不夠。不同的人寫出來的 SQL 不一樣,生成的 Resolved Logical Plan 也就不一樣,執行效率也不一樣。為了保證無論使用者如何寫 SQL 都可以高效的執行,Spark SQL 需要對 Resolved Logical Plan 進行優化,這個優化由 Optimizer 完成。Optimizer 包含了一系列規則,對 Resolved Logical Plan 進行等價轉換,最終生成 Optimized Logical Plan。該 Optimized Logical Plan 不能保證是全域性最優的,但至少是接近最優的。
上述過程只與 SQL 有關,與查詢有關,但是與 Spark 無關,因此無法直接提交給 Spark 執行。Query Planner 負責將 Optimized Logical Plan 轉換為 Physical Plan,進而可以直接由 Spark 執行。由於同一種邏輯運算元可以有多種物理實現。如 Join 有多種實現,ShuffledHashJoin、BroadcastHashJoin、BroadcastNestedLoopJoin、SortMergeJoin 等。因此 Optimized Logical Plan 可被 Query Planner 轉換為多個 Physical Plan。如何選擇最優的 Physical Plan 成為一件非常影響最終執行效能的事情。一種比較好的方式是,構建一個 Cost Model,並對所有候選的 Physical Plan 應用該 Model 並挑選 Cost 最小的 Physical Plan 作為最終的 Selected Physical Plan。
後面介紹位元組跳動在 Spark SQL 上做的一些優化,主要圍繞這一節介紹的邏輯計劃優化與物理計劃優化展開。
Spark SQL引擎優化
Bucket Join改進
在 Spark 裡,實際並沒有 Bucket Join 運算元。這裡說的 Bucket Join 泛指不需要 Shuffle 的 SortMergeJoin。下圖展示了 SortMergeJoin 的基本原理。用虛線框代表的 Table 1 和 Table 2 是兩張需要按某欄位進行 Join 的表。虛線框內的 partition 0 到 partition m 是該錶轉換成 RDD 後的 Partition,而非表的分割槽。假設 Table 1 與 Table 2 轉換為 RDD 後分別包含 m 和 k 個 Partition。為了進行 Join,需要通過 Shuffle 保證相同 Join Key 的資料在同一個 Partition 內且 Partition 內按 Key 排序,同時保證 Table 1 與 Table 2 經過 Shuffle 後的 RDD 的 Partition 數相同。
如下圖所示,經過 Shuffle 後只需要啟動 n 個 Task,每個 Task 處理 Table 1 與 Table 2 中對應 Partition 的資料進行 Join 即可。如 Task 0 只需要順序掃描 Shuffle 後的左右兩邊的 partition 0 即可完成 Join。對於大資料的場景來講,資料一般是一次寫入多次查詢。如果經常對兩張表按相同或類似的方式進行 Join,每次都需要付出 Shuffle 的代價。與其這樣,不如讓資料在寫的時候,就讓資料按照利於 Join 的方式分佈,從而使得 Join 時無需進行 Shuffle。如下圖所示,Table 1 與 Table 2 內的資料按照相同的 Key 進行分桶且桶數都為 n,同時桶內按該 Key 排序。對這兩張表進行 Join 時,可以避免 Shuffle,直接啟動 n 個 Task 進行 Join。
改進一:支援與 Hive 相容
在過去一段時間,位元組跳動把大量的 Hive 作業遷移到了 SparkSQL。而 Hive 與 Spark SQL 的 Bucket 表不相容。對於使用 Bucket 表的場景,如果直接更新計算引擎,會造成 Spark SQL 寫入 Hive Bucket 表的資料無法被下游的 Hive 作業當成 Bucket 表進行 Bucket Join,從而造成作業執行時間變長,可能影響 SLA。為瞭解決這個問題,我們讓 Spark SQL 支援 Hive 相容模式,從而保證 Spark SQL 寫入的 Bucket 表與 Hive 寫入的 Bucket 表效果一致,並且這種表可以被 Hive 和 Spark SQL 當成 Bucket 表進行 Bucket Join 而不需要 Shuffle。通過這種方式保證 Hive 向 Spark SQL 的透明遷移。
第一個需要解決的問題是,Hive 的一個 Bucket 一般只包含一個檔案,而 Spark SQL 的一個 Bucket 可能包含多個檔案。解決辦法是動態增加一次以 Bucket Key 為 Key 並且並行度與 Bucket 個數相同的 Shuffle。改進二:支援倍數關係Bucket Join
Spark SQL 要求只有 Bucket 相同的表才能(必要非充分條件)進行 Bucket Join。對於兩張大小相差很大的表,比如幾百 GB 的維度表與幾十 TB (單分割槽)的事實表,它們的 Bucket 個數往往不同,並且個數相差很多,預設無法進行 Bucket Join。因此我們通過兩種方式支援了倍數關係的 Bucket Join,即當兩張 Bucket 表的 Bucket 數是倍數關係時支援 Bucket Join。第一種方式,Task 個數與小表 Bucket 個數相同。如下圖所示,Table A 包含 3 個 Bucket,Table B 包含 6 個 Bucket。此時 Table B 的 bucket 0 與 bucket 3 的資料合集應該與 Table A 的 bucket 0 進行 Join。這種情況下,可以啟動 3 個 Task。其中 Task 0 對 Table A 的 bucket 0 與 Table B 的 bucket 0 + bucket 3 進行 Join。在這裡,需要對 Table B 的 bucket 0 與 bucket 3 的資料再做一次 merge sort 從而保證合集有序。
如果 Table A 與 Table B 的 Bucket 個數相差不大,可以使用上述方式。如果 Table B 的 Bucket 個數是 Bucket A Bucket 個數的 10 倍,那上述方式雖然避免了 Shuffle,但可能因為並行度不夠反而比包含 Shuffle 的 SortMergeJoin 速度慢。此時可以使用另外一種方式,即 Task 個數與大表 Bucket 個數相等,如下圖所示。
改進三:支援BucketJoin 降級
公司內部過去使用 Bucket 的表較少,在我們對 Bucket 做了一系列改進後,大量使用者希望將錶轉換為 Bucket 表。轉換後,表的元資訊顯示該表為 Bucket 表,而歷史分割槽內的資料並未按 Bucket 表要求分佈,在查詢歷史資料時會出現無法識別 Bucket 的問題。同時,由於資料量上漲快,平均 Bucket 大小也快速增長。這會造成單 Task 需要處理的資料量過大進而引起使用 Bucket 後的效果可能不如直接使用基於 Shuffle 的 Join。
為瞭解決上述問題,我們實現了支援降級的 Bucket 表。基本原理是,每次修改 Bucket 資訊(包含上述兩種情況——將非 Bucket 錶轉為 Bucket 表,以及修改 Bucket 個數)時,記錄修改日期。並且在決定使用哪種 Join 方式時,對於 Bucket 表先檢查所查詢的資料是否只包含該日期之後的分割槽。如果是,則當成 Bucket 表處理,支援 Bucket Join;否則當成普通無 Bucket 的表。改進四:支援超集
對於一張常用表,可能會與另外一張表按 User 欄位做 Join,也可能會與另外一張表按 User 和 App 欄位做 Join,與其它表按 User 與 Item 欄位進行 Join。而 Spark SQL 原生的 Bucket Join 要求 Join Key Set 與表的 Bucket Key Set 完全相同才能進行 Bucket Join。在該場景中,不同 Join 的 Key Set 不同,因此無法同時使用 Bucket Join。這極大的限制了 Bucket Join 的適用場景。 針對此問題,我們支援了超集場景下的 Bucket Join。只要 Join Key Set 包含了 Bucket Key Set,即可進行 Bucket Join。如下圖所示,Table X 與 Table Y,都按欄位 A 分 Bucket。而查詢需要對 Table X 與 Table Y 進行 Join,且 Join Key Set 為 A 與 B。此時,由於 A 相等的資料,在兩表中的 Bucket ID 相同,那 A 與 B 各自相等的資料在兩表中的 Bucket ID 肯定也相同,所以資料分佈是滿足 Join 要求的,不需要 Shuffle。同時,Bucket Join 還需要保證兩表按 Join Key Set 即 A 和 B 排序,此時只需要對 Table X 與 Table Y 進行分割槽內排序即可。由於兩邊已經按欄位 A 排序了,此時再按 A 與 B 排序,代價相對較低。
物化列
Spark SQL 處理巢狀型別資料時,存在以下問題:
- 讀取大量不必要的資料:對於 Parquet / ORC 等列式儲存格式,可只讀取需要的欄位,而直接跳過其它欄位,從而極大節省 IO。而對於巢狀資料型別的欄位,如下圖中的 Map 型別的 people 欄位,往往只需要讀取其中的子欄位,如 people.age。卻需要將整個 Map 型別的 people 欄位全部讀取出來然後抽取出 people.age 欄位。這會引入大量的無意義的 IO 開銷。在我們的場景中,存在不少 Map 型別的欄位,而且很多包含幾十至幾百個 Key,這也就意味著 IO 被放大了幾十至幾百倍。
- 無法進行向量化讀取:而向量化讀能極大的提升效能。但截止到目前(2019年10月26日),Spark 不支援包含巢狀資料型別的向量化讀取。這極大的影響了包含巢狀資料型別的查詢效能
不支援 Filter 下推:目前(2019年10月26日)的 Spark 不支援巢狀型別欄位上的 Filter 的下推
重複計算:JSON 欄位,在 Spark SQL 中以 String 型別存在,嚴格來說不算巢狀資料型別。不過實踐中也常用於儲存不固定的多個欄位,在查詢時通過 JSON Path 抽取目標子欄位,而大型 JSON 字串的欄位抽取非常消耗 CPU。對於熱點表,頻繁重複抽取相同子欄位非常浪費資源。
對於這個問題,做數倉的同學也想了一些解決方案。如下圖所示,在名為 base_table 的表之外建立了一張名為 sub_table 的表,並且將高頻使用的子欄位 people.age 設定為一個額外的 Integer 型別的欄位。下游不再通過 base_table 查詢 people.age,而是使用 sub_table 上的 age 欄位代替。通過這種方式,將巢狀型別欄位上的查詢轉為了 Primitive 型別欄位的查詢,同時解決了上述問題。
- 額外維護了一張表,引入了大量的額外儲存/計算開銷。
- 無法在新表上查詢新增欄位的歷史資料(如要支援對歷史資料的查詢,需要重跑歷史作業,開銷過大,無法接受)。
- 表的維護方需要在修改表結構後修改插入資料的作業。
- 需要下游查詢方修改查詢語句,推廣成本較大。
- 運營成本高:如果高頻子欄位變化,需要刪除不再需要的獨立子欄位,並新增新子欄位為獨立欄位。刪除前,需要確保下游無業務使用該欄位。而新增欄位需要通知並推進下游業務方使用新欄位。
- 新增一個 Primitive 型別欄位,比如 Integer 型別的 age 欄位,並且指定它是 people.age 的物化欄位。
- 插入資料時,為物化欄位自動生成資料,並在 Partition Parameter 內儲存物化關係。因此對插入資料的作業完全透明,表的維護方不需要修改已有作業。
- 查詢時,檢查所需查詢的所有 Partition,如果都包含物化資訊(people.age 到 age 的對映),直接將 select people.age 自動重寫為 select age,從而實現對下游查詢方的完全透明優化。同時相容歷史資料。
下圖展示了在某張核心表上使用物化列的收益:
在 OLAP 領域,經常會對相同表的某些固定欄位進行 Group By 和 Aggregate / Join 等耗時操作,造成大量重複性計算,浪費資源,且影響查詢效能,不利於提升使用者體驗。
我們實現了基於物化檢視的優化功能:如上圖所示,查詢歷史顯示大量查詢根據 user 進行 group by,然後對 num 進行 sum 或 count 計算。此時可建立一張物化檢視,且對 user 進行 gorup by,對 num 進行 avg(avg 會自動轉換為 count 和 sum)。使用者對原始表進行 select user,sum(num) 查詢時,Spark SQL 自動將查詢重寫為對物化檢視的 select user,sum_num 查詢。
Spark SQL 引擎上的其它優化
下圖展示了我們在 Spark SQL 上進行的其它部分優化工作:Spark Shuffle穩定性提升與效能優化
Spark Shuffle 存在的問題
Shuffle的原理,很多同學應該已經很熟悉了。鑑於時間關係,這裡不介紹過多細節,只簡單介紹下基本模型。如上圖所示,我們將 Shuffle 上游 Stage 稱為 Mapper Stage,其中的 Task 稱為 Mapper。Shuffle 下游 Stage 稱為 Reducer Stage,其中的 Task 稱為 Reducer。
每個 Mapper 會將自己的資料分為最多 N 個部分,N 為 Reducer 個數。每個 Reducer 需要去最多 M (Mapper 個數)個 Mapper 獲取屬於自己的那部分資料。
這個架構存在兩個問題:- 穩定性問題:Mapper 的 Shuffle Write 資料存於 Mapper 本地磁碟,只有一個副本。當該機器出現磁碟故障,或者 IO 滿載,CPU 滿載時,Reducer 無法讀取該資料,從而引起 FetchFailedException,進而導致 Stage Retry。Stage Retry 會造成作業執行時間增長,直接影響 SLA。同時,執行時間越長,出現 Shuffle 資料無法讀取的可能性越大,反過來又會造成更多 Stage Retry。如此迴圈,可能導致大型作業無法成功執行。
- 效能問題:每個 Mapper 的資料會被大量 Reducer 讀取,並且是隨機讀取不同部分。假設 Mapper 的 Shuffle 輸出為 512MB,Reducer 有 10 萬個,那平均每個 Reducer 讀取資料 512MB / 100000 = 5.24KB。並且,不同 Reducer 並行讀取資料。對於 Mapper 輸出檔案而言,存在大量的隨機讀取。而 HDD 的隨機 IO 效能遠低於順序 IO。最終的現象是,Reducer 讀取 Shuffle 資料非常慢,反映到 Metrics 上就是 Reducer Shuffle Read Blocked Time 較長,甚至佔整個 Reducer 執行時間的一大半,如下圖所示。
基於HDFS的Shuffle穩定性提升
經觀察,引起 Shuffle 失敗的最大因素不是磁碟故障等硬體問題,而是 CPU 滿載和磁碟 IO 滿載。如上圖所示,機器的 CPU 使用率接近 100%,使得 Mapper 側的 Node Manager 內的 Spark External Shuffle Service 無法及時提供 Shuffle 服務。
無論是何種原因,問題的癥結都是 Mapper 側的 Shuffle Write 資料只儲存在本地,一旦該節點出現問題,會造成該節點上所有 Shuffle Write 資料無法被 Reducer 讀取。解決這個問題的一個通用方法是,通過多副本保證可用性。
最初始的一個簡單方案是,Mapper 側最終資料檔案與索引檔案不寫在本地磁碟,而是直接寫到 HDFS。Reducer 不再通過 Mapper 側的 External Shuffle Service 讀取 Shuffle 資料,而是直接從 HDFS 上獲取資料,如下圖所示。
快速實現這個方案後,我們做了幾組簡單的測試。結果表明:
- Mapper 與 Reducer 不多時,Shuffle 讀寫效能與原始方案相比無差異。
- Mapper 與 Reducer 較多時,Shuffle 讀變得非常慢。
原因在於,總共 10000 Reducer,需要從 10000 個 Mapper 處讀取資料檔案和索引檔案,總共需要讀取 HDFS 10000 * 1000 * 2 = 2 億次。
如果只是 Name Node 的單點效能問題,還可以通過一些簡單的方法解決。例如在 Spark Driver 側儲存所有 Mapper 的 Block Location,然後 Driver 將該資訊廣播至所有 Executor,每個 Reducer 可以直接從 Executor 處獲取 Block Location,然後無須連線 Name Node,而是直接從 Data Node 讀取資料。但鑑於 Data Node 的執行緒模型,這種方案會對 Data Node 造成較大沖擊。
最後我們選擇了一種比較簡單可行的方案,如下圖所示。Mapper 的 Shuffle 輸出資料仍然按原方案寫本地磁碟,寫完後上傳到 HDFS。Reducer 仍然按原始方案通過 Mapper 側的 External Shuffle Service 讀取 Shuffle 資料。如果失敗了,則從 HDFS 讀取。這種方案極大減少了對 HDFS 的訪問頻率。
該方案上線近一年:
- 覆蓋 57% 以上的 Spark Shuffle 資料。
- 使得 Spark 作業整體效能提升 14%。
- 天級大作業效能提升 18%。
小時級作業效能提升 12%。
該方案旨在提升 Spark Shuffle 穩定性從而提升作業穩定性,但最終沒有使用方差等指標來衡量穩定性的提升。原因在於每天叢集負載不一樣,整體方差較大。Shuffle 穩定性提升後,Stage Retry 大幅減少,整體作業執行時間減少,也即效能提升。最終通過對比使用該方案前後的總的作業執行時間來對比效能的提升,用於衡量該方案的效果。
Shuffle效能優化實踐與探索如上文所分析,Shuffle 效能問題的原因在於,Shuffle Write 由 Mapper 完成,然後 Reducer 需要從所有 Mapper 處讀取資料。這種模型,我們稱之為以 Mapper 為中心的 Shuffle。它的問題在於:
- Mapper 側會有 M 次順序寫 IO。
- Mapper 側會有 M * N * 2 次隨機讀 IO(這是最大的效能瓶頸)。
- Mapper 側的 External Shuffle Service 必須與 Mapper 位於同一臺機器,無法做到有效的儲存計算分離,Shuffle 服務無法獨立擴充套件。
- 將 M * N * 2 次隨機 IO 變為 N 次順序 IO。
- Shuffle Service 可以獨立於 Mapper 或者 Reducer 部署,從而做到獨立擴充套件,做到儲存計算分離。
- Shuffle Service 可將資料直接存於 HDFS 等高可用儲存,因此可同時解決 Shuffle 穩定性問題。
QA集錦
- 提問:物化列新增一列,是否需要修改歷史資料?回答:歷史資料太多,不適合修改歷史資料。
- 提問:如果使用者的請求同時包含新資料和歷史資料,如何處理?回答:一般而言,使用者修改資料都是以 Partition 為單位。所以我們在 Partition Parameter 上儲存了物化列相關資訊。如果使用者的查詢同時包含了新 Partition 與歷史 Partition,我們會在新 Partition 上針對物化列進行 SQL Rewrite,歷史 Partition 不 Rewrite,然後將新老 Partition 進行 Union,從而在保證資料正確性的前提下儘可能充分利用物化列的優勢。
- 提問:你好,你們針對使用者的場景,做了很多挺有價值的優化。像物化列、物化檢視,都需要根據使用者的查詢 Pattern 進行設定。目前你們是人工分析這些查詢,還是有某種機制自動去分析並優化?回答:目前我們主要是通過一些審計資訊輔助人工分析。同時我們也正在做物化列與物化檢視的推薦服務,最終做到智慧建設物化列與物化檢視。
- 提問:剛剛介紹的基於 HDFS 的 Spark Shuffle 穩定性提升方案,是否可以非同步上傳 Shuffle 資料至 HDFS?回答:這個想法挺好,我們之前也考慮過,但基於幾點考慮,最終沒有這樣做。第一,單 Mapper 的 Shuffle 輸出資料量一般很小,上傳到 HDFS 耗時在 2 秒以內,這個時間開銷可以忽略;第二,我們廣泛使用 External Shuffle Service 和 Dynamic Allocation,Mapper 執行完成後可能 Executor 就回收了,如果要非同步上傳,就必須依賴其它元件,這會提升複雜度,ROI 較低。
更多精彩分享
上海沙龍回顧 | Apache Kylin 原理介紹與新架構分享(Kylin On Parquet)
上海沙龍回顧 | Redis 快取記憶體在大資料場景中的應用
位元組跳動技術沙龍
位元組跳動技術沙龍是由位元組跳動技術學院發起,位元組跳動技術學院、掘金技術社群聯合主辦的技術交流活動。位元組跳動技術沙龍邀請來自位元組跳動及業內網際網路公司的技術專家,分享熱門技術話題與一線實踐經驗,內容覆蓋架構、大資料、前端、測試、運維、演演算法、系統等技術領域。
位元組跳動技術沙龍旨在為技術領域人才提供一個開放、自由的交流學習平臺,幫助技術人學習成長,不斷進階。
歡迎關注「位元組跳動技術團隊」