1. 程式人生 > >第九篇:Spark SQL 源碼分析之 In-Memory Columnar Storage源碼分析之 cache table

第九篇:Spark SQL 源碼分析之 In-Memory Columnar Storage源碼分析之 cache table

gravity base field eof 授權 葉子節點 command ref gist

/** Spark SQL源碼分析系列文章*/

Spark SQL 可以將數據緩存到內存中,我們可以見到的通過調用cache table tableName即可將一張表緩存到內存中,來極大的提高查詢效率。

這就涉及到內存中的數據的存儲形式,我們知道基於關系型的數據可以存儲為基於行存儲結構 或 者基於列存儲結構,或者基於行和列的混合存儲,即Row Based Storage、Column Based Storage、 PAX Storage。

Spark SQL 的內存數據是如何組織的?

Spark SQL 將數據加載到內存是以列的存儲結構。稱為In-Memory Columnar Storage。

若直接存儲Java Object 會產生很大的內存開銷,並且這樣是基於Row的存儲結構。查詢某些列速度略慢,雖然數據以及載入內存,查詢效率還是低於面向列的存儲結構。

基於Row的Java Object存儲:

內存開銷大,且容易FULL GC,按列查詢比較慢。

技術分享

基於Column的ByteBuffer存儲(Spark SQL):

內存開銷小,按列查詢速度較快。

技術分享

Spark SQL的In-Memory Columnar Storage是位於spark列下面org.apache.spark.sql.columnar包內:

核心的類有 ColumnBuilder, InMemoryColumnarTableScan, ColumnAccessor, ColumnType.

如果列有壓縮的情況:compression包下面有具體的build列和access列的類。

技術分享

一、引子

當我們調用spark sql 裏的cache table command時,會生成一CacheCommand,這個Command是一個物理計劃。

[java] view plain copy
  1. scala> val cached = sql("cache table src")
[java] view plain copy
  1. cached: org.apache.spark.sql.SchemaRDD =
  2. SchemaRDD[0] at RDD at SchemaRDD.scala:103
  3. == Query Plan ==
  4. == Physical Plan ==
  5. CacheCommand src, true

這裏打印出來tableName是src, 和一個是否要cache的boolean flag.

我們看下CacheCommand的構造:

CacheCommand支持2種操作,一種是把數據源加載帶內存中,一種是將數據源從內存中卸載。

對應於SQLContext下的cacheTable和uncacheTabele。

[java] view plain copy
  1. case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
  2. extends LeafNode with Command {
  3. override protected[sql] lazy val sideEffectResult = {
  4. if (doCache) {
  5. context.cacheTable(tableName) //緩存表到內存
  6. } else {
  7. context.uncacheTable(tableName)//從內存中移除該表的數據
  8. }
  9. Seq.empty[Any]
  10. }
  11. override def execute(): RDD[Row] = {
  12. sideEffectResult
  13. context.emptyResult
  14. }
  15. override def output: Seq[Attribute] = Seq.empty
  16. }

如果調用cached.collect(),則會根據Command命令來執行cache或者uncache操作,這裏我們執行cache操作。

cached.collect()將會調用SQLContext下的cacheTable函數:

首先通過catalog查詢關系,構造一個SchemaRDD。

[java] view plain copy
  1. /** Returns the specified table as a SchemaRDD */
  2. def table(tableName: String): SchemaRDD =
  3. new SchemaRDD(this, catalog.lookupRelation(None, tableName))


找到該Schema的analyzed計劃。匹配構造InMemoryRelation:

[java] view plain copy
  1. /** Caches the specified table in-memory. */
  2. def cacheTable(tableName: String): Unit = {
  3. val currentTable = table(tableName).queryExecution.analyzed //構造schemaRDD並將其執行analyze計劃操作
  4. val asInMemoryRelation = currentTable match {
  5. case _: InMemoryRelation => //如果已經是InMemoryRelation,則返回
  6. currentTable.logicalPlan
  7. case _ => //如果不是(默認剛剛cache的時候是空的)則構建一個內存關系InMemoryRelation
  8. InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
  9. }
  10. //將構建好的InMemoryRelation註冊到catalog裏。
  11. catalog.registerTable(None, tableName, asInMemoryRelation)
  12. }

二、InMemoryRelation

InMemoryRelation繼承自LogicalPlan,是Spark1.1 Spark SQL裏新添加的一種TreeNode,也是catalyst裏的一種plan. 現在TreeNode變成了4種:

1、BinaryNode 二元節點

2、LeafNode 葉子節點

3、UnaryNode 單孩子節點

4、InMemoryRelation 內存關系型節點

技術分享

類圖如下:

值得註意的是,_cachedColumnBuffers這個類型為RDD[Array[ByteBuffer]]的私有字段。

這個封裝就是面向列的存儲ByteBuffer。前面提到相較於plain java object存儲記錄,用ByteBuffer能顯著的提高存儲效率,減少內存占用。並且按列查詢的速度會非常快。

技術分享

InMemoryRelation具體實現如下:

構造一個InMemoryRelation需要該Relation的output Attributes,是否需要useCoompression來壓縮,默認為false,一次處理的多少行數據batchSize, child 即SparkPlan。

[java] view plain copy
  1. private[sql] case class InMemoryRelation(
  2. output: Seq[Attribute], //輸出屬性,比如src表裏就是[key,value]
  3. useCompression: Boolean, //操作時是否使用壓縮,默認false
  4. batchSize: Int, //批的大小量
  5. child: SparkPlan) //spark plan 具體child


可以通過設置:

spark.sql.inMemoryColumnarStorage.compressed 為true來設置內存中的列存儲是否需要壓縮。

spark.sql.inMemoryColumnarStorage.batchSize 來設置一次處理多少row

spark.sql.defaultSizeInBytes 來設置初始化的column的bufferbytes的默認大小,這裏只是其中一個參數。

這些參數都可以在源碼中設置,都在SQL Conf

[java] view plain copy
  1. private[spark] object SQLConf {
  2. val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
  3. val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
  4. val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"

再回到case class InMemoryRelation:

_cachedColumnBuffers就是我們最終將table放入內存的存儲句柄,是一個RDD[Array[ByteBuffer]。

緩存主流程:

1、判斷_cachedColumnBuffers是否為null,如果不是null,則已經Cache了當前table,重復cache不會觸發cache操作。

2、child是SparkPlan,即執行hive table scan,測試我拿sbt/sbt hive/console裏test裏的src table為例,操作是掃描這張表。這個表有2個字的key是int, value 是string

3、拿到child的output, 這裏的output就是 key, value2個列。

4、執行mapPartitions操作,對當前RDD的每個分區的數據進行操作。

5、對於每一個分區,叠代裏面的數據生成新的Iterator。每個Iterator裏面是Array[ByteBuffer]

6、對於child.output的每一列,都會生成一個ColumnBuilder,最後組合為一個columnBuilders是一個數組。

7、數組內每個CommandBuilder持有一個ByteBuffer

8、遍歷原始分區的記錄,將對於的行轉為列,並將數據存到ByteBuffer內。

9、最後將此RDD調用cache方法,將RDD緩存。

10、將cached賦給_cachedColumnBuffers。

此操作總結下來是:執行hive table scan操作,返回的MapPartitionsRDD對其重新定義mapPartition方法,將其行轉列,並且最終cache到內存中。

所有流程如下:

[java] view plain copy
  1. // If the cached column buffers were not passed in, we calculate them in the constructor.
  2. // As in Spark, the actual work of caching is lazy.
  3. if (_cachedColumnBuffers == null) { //判斷是否已經cache了當前table
  4. val output = child.output
  5. /**
  6. * child.output
  7. res65: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#6, value#7)
  8. */
  9. val cached = child.execute().mapPartitions { baseIterator =>
  10. /**
  11. * child.execute()是Row的集合,叠代Row
  12. * res66: Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([238,val_238])
  13. *
  14. * val row1 = child.execute().take(1)
  15. * res67: Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([238,val_238])
  16. * */
  17. /*
  18. * 對每個Partition進行map,映射生成一個Iterator[Array[ByteBuffer],對應java的Iterator<List<ByteBuffer>>
  19. * */
  20. new Iterator[Array[ByteBuffer]] {
  21. def next() = {
  22. //遍歷每一列,首先attribute是key 為 IntegerType ,然後attribute是value是String
  23. //最後封裝成一個Array, index 0 是 IntColumnBuilder, 1 是StringColumnBuilder
  24. val columnBuilders = output.map { attribute =>
  25. val columnType = ColumnType(attribute.dataType)
  26. val initialBufferSize = columnType.defaultSize * batchSize
  27. ColumnBuilder(columnType.typeId, initialBufferSize, attribute.name, useCompression)
  28. }.toArray
  29. //src表裏Row是[238,val_238] 這行Row的length就是2
  30. var row: Row = null
  31. var rowCount = 0
  32. //batchSize默認1000
  33. while (baseIterator.hasNext && rowCount < batchSize) {
  34. //遍歷每一條記錄
  35. row = baseIterator.next()
  36. var i = 0
  37. //這裏row length是2,i的取值是0 和 1
  38. while (i < row.length) {
  39. //獲取columnBuilders, 0是IntColumnBuilder,
  40. //BasicColumnBuilder的appendFrom
  41. //Appends `row(ordinal)` to the column builder.
  42. columnBuilders(i).appendFrom(row, i)
  43. i += 1
  44. }
  45. //該行已經插入完畢
  46. rowCount += 1
  47. }
  48. //limit and rewind,Returns the final columnar byte buffer.
  49. columnBuilders.map(_.build())
  50. }
  51. def hasNext = baseIterator.hasNext
  52. }
  53. }.cache()
  54. cached.setName(child.toString)
  55. _cachedColumnBuffers = cached
  56. }

三、Columnar Storage

初始化ColumnBuilders:

[java] view plain copy
  1. val columnBuilders = output.map { attribute =>
  2. val columnType = ColumnType(attribute.dataType)
  3. val initialBufferSize = columnType.defaultSize * batchSize
  4. ColumnBuilder(columnType.typeId, initialBufferSize, attribute.name, useCompression)
  5. }.toArray

這裏會聲明一個數組,來對應每一列的存儲,如下圖:

技術分享

然後初始化類型builder的時候會傳入的參數:

initialBufferSize:文章開頭的圖中會有ByteBuffer,ByteBuffer的初始化大小是如何計算的?

initialBufferSize = 列類型默認長度 × batchSize ,默認batchSize是1000

拿Int類型舉例,initialBufferSize of IntegerType = 4 * 1000

attribute.name即字段名age,name etc。。。

ColumnType:

ColumnType封裝了 該類型的 typeId 和 該類型的 defaultSize。並且提供了extract、append\getField方法,來向buffer裏追加和獲取數據。

如IntegerType typeId 為0, defaultSize 4 ......

詳細看下類圖,畫的不是非常嚴格的類圖,主要為了展示目前類型系統:

技術分享

ColumnBuilder:

ColumnBuilder的主要職責是:管理ByteBuffer,包括初始化buffer,添加數據到buffer內,檢查剩余空間,和申請新的空間這幾項主要職責。

initialize負責初始化buffer。

appendFrom是負責添加數據。

ensureFreeSpace確保buffer的長度動態增加。

類圖如下:

技術分享

ByteBuffer的初始化過程:

初始化大小initialSize:拿Int舉例,在前面builder初始化傳入的是4×batchSize=4*1000,initialSize也就是4KB,如果沒有傳入initialSize,則默認是1024×1024。

列名稱,是否需要壓縮,都是需要傳入的。

ByteBuffer聲明時預留了4個字節,為了放column type id,這個在ColumnType的構造裏有介紹過。

[java] view plain copy
  1. override def initialize(
  2. initialSize: Int,
  3. columnName: String = "",
  4. useCompression: Boolean = false) = {
  5. val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize //如果沒有默認1024×1024 byte
  6. this.columnName = columnName
  7. // Reserves 4 bytes for column type ID
  8. buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize) // buffer的初始化長度,需要加上4byte類型ID空間。
  9. buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)//根據nativeOrder排序,然後首先放入typeId
  10. }

存儲的方式如下:

Int的type id 是0, string的 type id 是 7. 後面就是實際存儲的數據了。

技術分享

ByteBuffer寫入過程:

存儲結構都介紹完畢,最後開始對Table進行scan了,scan後對每一個分區的每個Row進行操作遍歷:

1、讀每個分區的每條Row

2、獲取每個列的值,從builders數組裏找到索引 i 對應的bytebuffer,追加至bytebuffer。

[java] view plain copy
  1. while (baseIterator.hasNext && rowCount < batchSize) {
  2. //遍歷每一條記錄
  3. row = baseIterator.next()
  4. var i = 0
  5. //這裏row length是2,i的取值是0 和 1 Ps:還是拿src table做測試,每一個Row只有2個字段,key, value所有長度為2
  6. while (i < row.length) {
  7. //獲取columnBuilders, 0是IntColumnBuilder,
  8. //BasicColumnBuilder的appendFrom
  9. //Appends `row(ordinal)` to the column builder.
  10. columnBuilders(i).appendFrom(row, i) //追加到對應的bytebuffer
  11. i += 1
  12. }
  13. //該行已經插入完畢
  14. rowCount += 1
  15. }
  16. //limit and rewind,Returns the final columnar byte buffer.
  17. columnBuilders.map(_.build())


追加過程:

根據當前builder的類型,從row的對應索引中取出值,最後追加到builder的bytebuffer內。

[java] view plain copy
  1. override def appendFrom(row: Row, ordinal: Int) {
  2. //ordinal是Row的index,0就是第一列值,1就是第二列值,獲取列的值為field
  3. //最後在將該列的值put到該buffer內
  4. val field = columnType.getField(row, ordinal)
  5. buffer = ensureFreeSpace(buffer, columnType.actualSize(field))//動態擴容
  6. columnType.append(field, buffer)
  7. }


ensureFreeSpace:

主要是操作buffer,如果要追加的數據大於剩余空間,就擴大buffer。

[java] view plain copy
  1. //確保剩余空間能容下,如果剩余空間小於 要放入的大小,則重新分配一看內存空間
  2. private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
  3. if (orig.remaining >= size) { //當前buffer剩余空間比要追加的數據大,則什麽都不做,返回自身
  4. orig
  5. } else { //否則擴容
  6. // grow in steps of initial size
  7. val capacity = orig.capacity()
  8. val newSize = capacity + size.max(capacity / 8 + 1)
  9. val pos = orig.position()
  10. orig.clear()
  11. ByteBuffer
  12. .allocate(newSize)
  13. .order(ByteOrder.nativeOrder())
  14. .put(orig.array(), 0, pos)
  15. }
  16. }


......

最後調用MapPartitionsRDD.cache(),將該RDD緩存並添加到spark cache管理中。

至此,我們將一張spark sql table緩存到了spark的jvm中。

四、總結

對於數據的存儲結構,我們常常關註持久化的存儲結構,並且在長久時間內有了很多種高效結構。

但是在實時性的要求下,內存數據庫越來越被關註,如何優化內存數據庫的存儲結構,是一個重點,也是一個難點。

對於Spark SQL 和 Shark 裏的列存儲 是一種優化方案,提高了關系查詢中列查詢的速度,和減少了內存占用。但是中存儲方式還是比較簡單的,沒有額外的元數據和索引來提高查詢效率,希望以後能了解到更多的In-Memory Storage。

——EOF——

創文章,轉載請註明:

轉載自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文鏈接地址:http://blog.csdn.net/oopsoom/article/details/39525483

註:本文基於署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協議,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章鏈接。如若需要用於商業目的或者與授權方面的協商,請聯系我。

技術分享

轉自:http://blog.csdn.net/oopsoom/article/details/39525483

第九篇:Spark SQL 源碼分析之 In-Memory Columnar Storage源碼分析之 cache table