1. 程式人生 > >Spark多資料來源計算實踐及其在GrowingIO的實踐

Spark多資料來源計算實踐及其在GrowingIO的實踐

本文作者:田毅,目前在資料分析服務公司GrowingIO資料平臺部門工作,Spark社群的Contributor,北京Spark Meetup組織者,2010年開始在電信領域實踐應用hadoop,2013年開始關注Spark,從Shark開始向社群貢獻程式碼。目前主要的研究方向是使用Spark搭建企業級的資料計算分析平臺。
責任編輯:仲浩([email protected]
本文為《程式設計師》原創文章,未經允許不得轉載,更多精彩文章請訂閱2016年《程式設計師》

本文主要介紹如何使用Apache Spark中的DataSource API以實現多個數據源混合計算的實踐,那麼這麼做的意義何在,其主要歸結於3個方面:

  • 首先,我們身邊存在大量的資料,結構化、非結構化,各種各樣的資料結構、格局格式,這種資料的多樣性本身即是大資料的特性之一,從而也決定了一種儲存方式不可能通吃所有。因此,資料本身決定了多種資料來源存在的必然性。
  • 其次:從業務需求來看,因為每天會開發各種各樣的應用系統,應用系統中所遇到的業務場景是互不相同的,各種各樣的需求決定了目前市面上不可能有一種軟體架構同時能夠解決這麼多種業務場景,所以在資料儲存包括資料查詢、計算這一塊也不可能只有一種技術就能解決所有問題。
  • 最後,從軟體的發展來看,現在市面上出現了越來越多面對某一個細分領域的軟體技術,比如像資料儲存、查詢搜尋引擎,MPP資料庫,以及各種各樣的查詢引擎。這麼多不同的軟體中,每一個軟體都相對擅長處理某一個領域的業務場景,只是涉及的領域大小不相同。因此,越來越多軟體的產生也決定了我們所接受的資料會儲存到越來越多不同的資料來源。

Apache Spark的多資料來源方案

傳統方案中,實現多資料來源通常有兩種方案:冗餘儲存,一份業務資料有多個儲存,或者內部互相引用;集中的計算,不同的資料使用不同儲存,但是會在統一的地方集中計算,算的時候把這些資料從不同位置讀取出來。下面一起討論這兩種解決方案中存在的問題:

圖片描述

圖1 多資料來源方案

第一種方案中存在的一個問題是資料一致性,一樣的資料放在不同的儲存裡面或多或少會有格式上的不相容,或者查詢的差異,從而導致從不同位置查詢的資料可能出現不一致。比如有兩個報表相同的指標,但是因為是放在不同儲存裡查出來的結果對不上,這點非常致命。第二個問題是儲存的成本,隨著儲存成本越來越低,這點倒是容易解決。

第二種方案也存在兩個問題,其一是不同儲存出來的資料型別不同,從而在計算時需求相互轉換,因此如何轉換至關重要。第二個問題是讀取效率,需要高效能的資料抽取機制,儘量避免從遠端讀取不必要的資料,並且需要保證一定的併發性。

Spark在1.2.0版本首次釋出了一個新的DataSourceAPI,這個API提供了非常靈活的方案,讓Spark可以通過一個標準的介面訪問各種外部資料來源,目標是讓Spark各個元件以非常方便的通過SparkSQL訪問外部資料來源。很顯然,Spark的DataSourceAPI其採用的是方案二,那麼它是如何解決其中那個的問題的呢?

圖片描述

圖2 External Datasource API

首先,資料型別轉換,Spark中定義了一個統一的資料型別標準,不同的資料來源自己定義資料型別的轉換方法,這樣解決資料來源之間相互型別轉換的問題;
關於資料處理效率的問題,Spark定義了一個比較簡單的API的介面,主要有3個方式:

1./* 全量資料抽取 */
2.trait TableScan {
3.def buildScan(): RDD[Row]
4.}
5.
6./* 列剪枝資料抽取 */
7.trait PrunedScan {
8.def buildScan(requiredColumns: Array[String]): RDD[Row]
9.}
10.
11./* 列剪枝+行過濾資料抽取 */
12.trait PrunedFilteredScan {
13.def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
14.}

TableScan。這種方式需要將1TB的資料從資料抽取,再把這些資料傳到Spark中。在把這1TB的資料穿過網路IO傳給Spark端之後,Spark還要逐行的進行過濾,從而消耗大量的計算資源,這是目前最低效的方式。

PrunedScan。這個方式有一個好處是資料來源只需要從磁碟讀取1TB的資料,並只返回一些列的資料,Spark不需要計算就可以使用1GB的資料,這個過程中節省了大量的網路IO。

PrunedFilteredScan。它需要資料來源既支援列過濾也支援行過濾,其好處是在磁碟IO這一層進行資料過濾,因此如果需要1GB資料,可能只抽出2GB大小,經過列過濾的規則再抽出1GB的資料,隨後傳給Spark,因此這種資料來源介面最高效,這也是目前市面上實現的最高效的資料介面。

可直接使用的DataSource實現

目前市面上可以找到的Spark DataSource實現程式碼有三大類:Spark自帶;Spark Packages(http://Spark-packages.org/)網站中存放的第三方軟體包;跟隨其他專案一同釋出的內建的Spark的實現。這裡介紹其中幾個:

1.JDBCRelation

1.private[sql] case class JDBCRelation(
2.url: String,
3.table: String,
4.parts: Array[Partition],
5.properties: Properties = new Properties())(@transient val sqlContext: SQLContext)
6.extends BaseRelation
7.with PrunedFilteredScan
8.with InsertableRelation {
9….
10.}

以JDBC方式連線外部資料來源在國內十分流行,Spark也內建了最高效的PrunedFilteredScan介面,同時還實現了資料插入的介面,使用起來非常方便,可以方便地把資料庫中的表用到Spark。以Postgres為例:

1.sqlContext.read.jdbc(
2.“jdbc:postgresql://testhost:7531/testdb”,
3.“testTable”,
4.“idField”, ——-索引列
5.10000, ——-起始index
6.1000000, ——-結束index
7.10, ——-partition數量
8.new Properties
9.).registerTempTable(“testTable”)

實現機制:預設使用單個Task從遠端資料庫讀取資料,如果設定了partitionColumn、lowerBound、upperBound、numPartitions這4個引數,那麼還可以控制Spark把針對這個資料來源的訪問任務進行拆分,得到numPartitions個任務,每個Executor收到任務之後會併發的去連線資料庫的Server讀取資料。

具體型別:PostgreSQL, MySQL。

問題:在實際使用中需要注意一個問題,所有的Spark都會併發連線一個Server,併發過高時可能會對資料庫造成較大的衝擊(對於MPP等新型的關係型資料庫還好)。

建議:個人感覺,JDBC的資料來源適合從MPP等分散式資料庫中讀取資料,對於傳統意義上單機的資料庫建議只處理一些相對較小的資料。

2.HadoopFsRelation

第二個在Spark內建的資料來源實現,HadoopFs,也是實現中最高效的PrunedFilteredScan介面,使用起來相對來說比JDBC更方便。

1.sqlContext
2..read
3..parquet(“hdfs://testFS/testPath”)
4..registerTempTable(“test”)

實現機制:執行的時候Spark在Driver端會直接獲取列表,根據檔案的格式型別和壓縮方式生成多個TASK,再把這些TASK分配下去。Executor端會根據檔案列表訪問,這種方式訪問HDFS不會出現IO集中的地方,所以具備很好的擴充套件性,可以處理相當大規模的資料。

具體型別:ORC,Parquet,JSon。

問題:在實時場景下如果使用HDFS作為資料輸出的資料來源,在寫資料就會產生非常大量零散的資料,在HDFS上積累大量的零碎檔案,就會帶來很大的壓力,後續處理這些小檔案的時候也非常頭疼。

建議:這種方式適合離線資料處理程式輸入和輸出資料,還有一些資料處理Pipeline中的臨時資料,資料量比較大,可以臨時放在HDFS。實時場景下不推薦使用HDFS作為資料輸出。

3.ElasticSearch

越來越多的網際網路公司開始使用ELK(ElasticSearch+LogStash+Kibana)作為基礎資料分析查詢的工具,但是有多少人知道其實ElasticSearch也支援在Spark中掛載為一個DataSource進行查詢呢?

1.EsSparkSQL
2..esDF(hc,indexName,esQuery)
3..registerTempTable(”testTable”)

實現機制:ES DataSource的實現機制是通過對esQuery進行解析,將實際要發往多個ES Nodes的請求分為多個Task,在每個Executor上並行執行。

圖片描述

圖3 ElasticSearch架構

問題:原生程式使用HTTP方式進行資料載入,吞吐量很低,需要修改為Transport方式。

建議:儲存doc資料,隨機資料搜尋場景使用,做其他資料來源的Index。
Apache Phoenix(https://github.com/apache/phoenix
Phoenix提供了一個通過SQL方式訪問HBase資料的途徑,在不瞭解HBase實現細節的情況下很方便讀寫資料。其屬於阿帕奇官方釋出,位於Apache Phoenix專案裡面的子模組。

實現機制:Phoenix的實現機制是通過對SQL解析,將執行計劃中並行的部分轉換為多個Task在Executor上執行。

1.sqlContext
2..read
3..format(“org.apache.phoenix.Spark”)
4..options(Map(“table” -> table, “zkUrl” -> zookeeperUrl))
5..load.registerTempTable(“testTable”)

實現機制:ES DataSource的實現機制是通過對esQuery進行解析,將實際要發往多個ES Nodes的請求分為多個Task,在每個Executor上並行執行。

問題:需要對Phoenix表模型非常瞭解,需要使用Rowkey欄位進行查詢。
建議:實時處理輸出的資料,如果後面要進行資料查詢,也可以把這個資料直接插入到Apache Phoenix,這樣後面查詢的資料可以及時得到更新結果。

Phoenix提供了一個通過SQL方式訪問HBase資料的途徑,在不瞭解HBase實現細節的情況下很方便讀寫資料。其屬於阿帕奇官方釋出,位於Apache Phoenix專案裡面的子模組。

實現機制:Phoenix的實現機制是通過對SQL解析,將執行計劃中並行的部分轉換為多個Task在Executor上執行。

1.sqlContext
2..read
3..format(“org.apache.phoenix.Spark”)
4..options(Map(“table” -> table, “zkUrl” -> zookeeperUrl))
5..load.registerTempTable(“testTable”)
程式碼6
問題:需要對Phoenix表模型非常瞭解,需要使用Rowkey欄位進行查詢。
建議:實時處理輸出的資料,如果後面要進行資料查詢,也可以把這個資料直接插入到Apache Phoenix,這樣後面查詢的資料可以及時得到更新結果。

5.其他

GrowingIO實踐

GrowingIO的資料平臺主要分為兩部分應用。首先是實時應用,負責實時處理流入資料的ETL,將清洗後的資料錄入HBase與ES;然後是離線應用,負責定時執行離線模型計算,將實時資料的結果進行彙總分析。上層搭建了自己的QueryService,負責根據前端需要快速查詢返回需要的統計資料。

實時計算部分最主要的功能是資料的ETL,當實時資料從Kafka消費後,會利用Spark提供的JSon DataSource將資料轉化為一個Table,再通過JDBC將配置資料引入,通過Phoenix和ES的DataSource將最終儲存的目標位置對映成為多個Table,最終通過SparkSQL的load操作插入目標資料來源。

離線部分的主要工作是二次彙總分析,這裡將模型計算所需的內容從各個資料來源掛載到Spark,然後寫很多複雜的SQL進行計算,再將結果儲存到HBase供QueryService使用。

圖片描述

圖4 整體架構

圖片描述

圖5 實時計算架

圖片描述

圖6 離線計算架構

外部資料來源使用中遇到的問題和解決途徑

問題:Elastic Search查詢資料時,當Mapping資料的列大於Source中列時報Index Out of Bound Exception。
解決:修改RowValueReader的addToBuffer方法。
問題:Elastic Search資料載入預設通過HTTP的介面載入資料,效能極差。
解決:修改為Transport方式載入使得效能提升2-3倍。
問題:Elastic Search效能優化。
解決:需要詳細設計Index, 儘量減少每次查詢的資料量。
問題:Phoenix4.4與Spark 1.5相容性。
解決:Spark 1.5修改的DecimalType型別適配,GenericMutableRow 修改為 InternalRow。
問題:PHOENIX-2279 Limit與Union All相關的BUG。
解決:修改Phoenix程式碼。
問題與解決:Phoenix打包過程中解決與Hadoop版本相容性
問題與解決:Region Split導致快取中的Region資訊失效(暫時無解)。
問題:由於YARN資源控制導致Excutor端報錯:Phoenix JDBC Driver has been closed。
解決:配置額外的記憶體避免Executor被YARN殺掉。
問題:PhoenixRDD讀取資料時Partition數量過少導致讀取速度慢。
解決:通過Phoenix的BucketTable增加查詢的並行度(建議控制Bucket的數量,避免Table自動split,BucketTable在split後有BUG)。

圖片描述

圖7 JobServer架構

其他問題——Spark Job Server

由於大量使用了SparkSQL和DataSource,所以面臨到的一個新問題就是如何更加有效地去提升Spark平臺的資源利用率。 社群中提供了一個開源的Spark Job Server實現(https://github.com/Spark-jobserver/Spark-jobserver) ,相較之下,覺得這個實現對於GrowingIO來說有些複雜,於是自己設計實現了一個簡化版的JobServer 。

簡化版的JobServer允許使用者通過一個SDK提交自己的Job(需要提前部署對應的jar包到JobServer);JobServer使用FairScheduler平均分配每個Job使用的資源;允許設定最大的Job併發數量;允許在Job中設定優先級別。

總結

總體來說,GrowingIO通過使用SparkSQL加DataSourceAPI的方法在很短時間內搭建起一套完整的資料處理平臺,並且擴充套件性很好。對於大多數中小企業來說,是一個便捷有效的途徑。 由於SQL的易學,對於團隊中的新人上手也是比較容易。

但是大量使用SQL也帶來一些問題,比如:所有資料模型的管理,例如所有欄位的長度,型別的統一;需要根據效能監控對各個資料來源進行不斷的優化(例如ES的Index,HBase的Rowkey)。

相關推薦

Spark資料來源計算實踐及其GrowingIO實踐

本文作者:田毅,目前在資料分析服務公司GrowingIO資料平臺部門工作,Spark社群的Contributor,北京Spark Meetup組織者,2010年開始在電信領域實踐應用hadoop,2013年開始關注Spark,從Shark開始向社群貢獻程式

Spark介紹及Spark資料來源分析

本期分享專家:沐遠 —阿里多模型資料庫專家 本期分享主題:Spark介紹及Spark多資料來源分析 視訊地址:https://yunqivedio.alicdn.com/od/Fni7p1542851946894.mp4 PPT地址:https://yq.aliyun.com/download/31

SpringBoot+AOP構建資料來源的切換實踐

針對微服務架構中常用的設計模組,通常我們都會需要使用到druid作為我們的資料連線池,當架構發生擴充套件的時候 ,通常面對的資料儲存伺服器也會漸漸增加,從原本的單庫架構逐漸擴充套件為複雜的多庫架構。 當在業務層需要涉及到查詢多種同資料庫的場景下,我們通常需要在執行sql的時候動態指定對應的datasource

Spark Streaming調優引數及最佳實踐深入剖析-Spark商業調優實戰

本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何商業交流,可隨時聯絡。

攜程實時計算平臺架構與實踐丨DataPipeline

文 | 潘國慶 攜程大資料平臺實時計算平臺負責人 本文主要從攜程大資料平臺概況、架構設計及實現、在實現當中踩坑及填坑的過程、實時計算領域詳細的應用場景,以及未來規劃五個方面闡述攜程實時計算平臺架構與實踐,希望對需要構建實時資料平臺的公司和同學有所借鑑。 一、攜程大資料平臺之總體架構 攜程

郝振亞:工業網際網路領域邊緣計算與區塊鏈實踐

隨著智慧製造與工業網際網路的不斷髮展,工業網際網路領域與其它新興技術體系是否可以挖掘出更多的有效結合方式,區塊鏈在實體經濟建設過程中如何體現其特有價值,區塊鏈的BaaS服務形態,怎樣才能更靈活的服務多種專案需求,上海億喆網路公司試圖通過工業網際網路領域的實際案例瞭解,為以上問題找到一些有價值的思考方向和解答

Docker與自動化測試及其測試實踐

Docker 與自動化測試 對於重複枯燥的手動測試任務,可以考慮將其進行自動化改造。自動化的成本在於自動化程式的編寫和維護,而收益在於節省了手動執行用例的時間。簡而言之,如果收益大於成本,測試任務就有價值自動化,否則受益的只是測試人員的自動化技能得到了提升。利用 Docker 的快速部署、環境共享等特性,

遺傳演算法原理簡介及其MATLAB實踐

目錄 遺傳演算法簡介 遺傳演算法(Genetic Algorithm,GA)是一種進化演算法,其基本原理是仿效生物界中的“物競天擇、適者生存”的演化法則,它最初由美國Michigan大學的J. Holland教授於1967年提出。 遺傳演算法

《雲端計算架構技術與實踐 第2版》下載

2018年11月01日 12:52:33 qq_43553691 閱讀數:9 標籤: 程式設計 資料

MDC介紹 -- 一種執行緒下日誌管理實踐方式

一:MDC介紹   MDC(Mapped Diagnostic Context,對映除錯上下文)是 log4j 和 logback 提供的一種方便在多執行緒條件下記錄日誌的功能。某些應用程式採用多執行緒的方式來處理多個使用者的請求。在一個使用者的使用過程中,可能有多個不

深度學習在目標檢測中的應用及其tensorflowAPI實踐(二)

這系列文章的內容目錄如下: 目標檢測的任務 深度學習在目標檢測中的應用 RCNN fast RCNN faster RCNN RFCN yolo yolo V2 SSD tensorflow目標檢測API的使用 在第一篇裡說完了RCNN和fast RC

資料視覺化領域的6個著名實踐及其原始碼

1 計算宇宙的年齡【主要效果】他的計算結果與宇宙的接受年齡相比只有-0.187%。【資料來源】H

《雲端計算架構技術與實踐》連載23:2.4.8 電信NFV雲

版權所有,未經華為許可,請勿轉載或轉發   基於雲端計算總體架構下的電信NFV雲解決方案如圖2-36所示。 圖2-36 電信NFV雲解決方案架構子系統組合 NFV(Network Function Virsualization網路功能虛擬化)旨在通過研究標準IT虛擬化技術,使得電信網路裝置的功

Spark SQL下的Parquet使用最佳實踐和程式碼實戰

一、Spark SQL下的Parquet使用最佳實踐 1)過去整個業界對大資料的分析的技術棧的Pipeline一般分為以下兩種方式: a)Data Source -> HDFS -> MR/Hive/Spark(相當於ETL)-> HDFS Par

深度學習在目標檢測中的應用及其tensorflowAPI實踐(一)

近些年深度學習在影象領域大放光彩,這篇文章先對目標檢測領域深度學習的發展做一個總結,再結合一個例子對tensorflow model zoo中的目標檢測API使用做一個說明。 本文內容如下(會分幾次發出來): 目標檢測的任務 深度學習在目標檢測中的應用

SpringMVC接受個同類型物件最佳實踐

1.複雜的javabean (1)javaBean public class Spitter { private long id; @NotNull @Size(min

Sloth:網易流計算服務化平臺架構實踐

網易經歷了多年的發展,眾多業務線沉澱了豐富的資料。大資料平臺除了滿足各業務線的資料儲存、計算需求,還負責集團資料整合,提供全方位的大資料服務。 本文PPT重點分享網易大資料基於Flink研發的流計算服務化平臺Sloth,以SQL為主要開發方式,支援DDL,支援豐富的DML,如Join、Window,支援UD

Java執行緒程式設計學習與實踐

怎麼樣才算得上熟悉多執行緒程式設計?第一,明白程序和執行緒的基本概念第二,明白保護執行緒安全的基本方法有哪些第三,明白這些執行緒安全的方法,包括互斥鎖,自旋鎖,無鎖程式設計的適用的業務場景是什麼?從OS和硬體角度說說原理是怎麼樣的?開銷在哪裡?第四,能在現場藉助cas操作,風

G7在實時計算的探索與實踐

業務開發 調度 save 事故 code stat 並行度 耗時 調試 作者: 張皓 G7業務快覽 G7主要通過在貨車上的傳感器感知車輛的軌跡、油耗、點熄火、載重、溫度等數據,將車輛、司機、車隊、貨主連接到一起,優化貨物運輸的時效、安全、成本等痛點問題。 整個數據是通過車載

【朝花夕拾】Android自定義View篇之(九)點觸控(下)實踐出真知

前言        在上一篇文章中,已經總結了MotionEvent以及多點觸控相關的基礎理論知識和常用的函式。本篇將通過實現單指拖動圖片,多指拖動圖片的實際案例來進行練習並實現一些效果,來理解前面的理論知識。要理解本文的程式碼,需要先掌握上一篇的理論知識,事件處理基