第九篇:Spark SQL 源碼分析之 In-Memory Columnar Storage源碼分析之 cache table
/** Spark SQL源碼分析系列文章*/
Spark SQL 可以將數據緩存到內存中,我們可以見到的通過調用cache table tableName即可將一張表緩存到內存中,來極大的提高查詢效率。
這就涉及到內存中的數據的存儲形式,我們知道基於關系型的數據可以存儲為基於行存儲結構 或 者基於列存儲結構,或者基於行和列的混合存儲,即Row Based Storage、Column Based Storage、 PAX Storage。
Spark SQL 的內存數據是如何組織的?
Spark SQL 將數據加載到內存是以列的存儲結構。稱為In-Memory Columnar Storage。
若直接存儲Java Object 會產生很大的內存開銷,並且這樣是基於Row的存儲結構。查詢某些列速度略慢,雖然數據以及載入內存,查詢效率還是低於面向列的存儲結構。
基於Row的Java Object存儲:
內存開銷大,且容易FULL GC,按列查詢比較慢。
基於Column的ByteBuffer存儲(Spark SQL):
內存開銷小,按列查詢速度較快。
Spark SQL的In-Memory Columnar Storage是位於spark列下面org.apache.spark.sql.columnar包內:
核心的類有 ColumnBuilder, InMemoryColumnarTableScan, ColumnAccessor, ColumnType.
如果列有壓縮的情況:compression包下面有具體的build列和access列的類。
一、引子
當我們調用spark sql 裏的cache table command時,會生成一CacheCommand,這個Command是一個物理計劃。
[java] view plain copy
- scala> val cached = sql("cache table src")
- cached: org.apache.spark.sql.SchemaRDD =
- SchemaRDD[0] at RDD at SchemaRDD.scala:103
- == Query Plan ==
- == Physical Plan ==
- CacheCommand src, true
這裏打印出來tableName是src, 和一個是否要cache的boolean flag.
我們看下CacheCommand的構造:
CacheCommand支持2種操作,一種是把數據源加載帶內存中,一種是將數據源從內存中卸載。
對應於SQLContext下的cacheTable和uncacheTabele。
[java] view plain copy
- case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
- extends LeafNode with Command {
- override protected[sql] lazy val sideEffectResult = {
- if (doCache) {
- context.cacheTable(tableName) //緩存表到內存
- } else {
- context.uncacheTable(tableName)//從內存中移除該表的數據
- }
- Seq.empty[Any]
- }
- override def execute(): RDD[Row] = {
- sideEffectResult
- context.emptyResult
- }
- override def output: Seq[Attribute] = Seq.empty
- }
如果調用cached.collect(),則會根據Command命令來執行cache或者uncache操作,這裏我們執行cache操作。
cached.collect()將會調用SQLContext下的cacheTable函數:
首先通過catalog查詢關系,構造一個SchemaRDD。
[java] view plain copy- /** Returns the specified table as a SchemaRDD */
- def table(tableName: String): SchemaRDD =
- new SchemaRDD(this, catalog.lookupRelation(None, tableName))
找到該Schema的analyzed計劃。匹配構造InMemoryRelation:
[java] view plain copy
- /** Caches the specified table in-memory. */
- def cacheTable(tableName: String): Unit = {
- val currentTable = table(tableName).queryExecution.analyzed //構造schemaRDD並將其執行analyze計劃操作
- val asInMemoryRelation = currentTable match {
- case _: InMemoryRelation => //如果已經是InMemoryRelation,則返回
- currentTable.logicalPlan
- case _ => //如果不是(默認剛剛cache的時候是空的)則構建一個內存關系InMemoryRelation
- InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
- }
- //將構建好的InMemoryRelation註冊到catalog裏。
- catalog.registerTable(None, tableName, asInMemoryRelation)
- }
二、InMemoryRelation
InMemoryRelation繼承自LogicalPlan,是Spark1.1 Spark SQL裏新添加的一種TreeNode,也是catalyst裏的一種plan. 現在TreeNode變成了4種:
1、BinaryNode 二元節點
2、LeafNode 葉子節點
3、UnaryNode 單孩子節點
4、InMemoryRelation 內存關系型節點
類圖如下:
值得註意的是,_cachedColumnBuffers這個類型為RDD[Array[ByteBuffer]]的私有字段。
這個封裝就是面向列的存儲ByteBuffer。前面提到相較於plain java object存儲記錄,用ByteBuffer能顯著的提高存儲效率,減少內存占用。並且按列查詢的速度會非常快。
InMemoryRelation具體實現如下:
構造一個InMemoryRelation需要該Relation的output Attributes,是否需要useCoompression來壓縮,默認為false,一次處理的多少行數據batchSize, child 即SparkPlan。
[java] view plain copy
- private[sql] case class InMemoryRelation(
- output: Seq[Attribute], //輸出屬性,比如src表裏就是[key,value]
- useCompression: Boolean, //操作時是否使用壓縮,默認false
- batchSize: Int, //批的大小量
- child: SparkPlan) //spark plan 具體child
可以通過設置:
spark.sql.inMemoryColumnarStorage.compressed 為true來設置內存中的列存儲是否需要壓縮。
spark.sql.inMemoryColumnarStorage.batchSize 來設置一次處理多少row
spark.sql.defaultSizeInBytes 來設置初始化的column的bufferbytes的默認大小,這裏只是其中一個參數。
這些參數都可以在源碼中設置,都在SQL Conf
[java] view plain copy
- private[spark] object SQLConf {
- val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
- val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
- val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
再回到case class InMemoryRelation:
_cachedColumnBuffers就是我們最終將table放入內存的存儲句柄,是一個RDD[Array[ByteBuffer]。
緩存主流程:
1、判斷_cachedColumnBuffers是否為null,如果不是null,則已經Cache了當前table,重復cache不會觸發cache操作。
2、child是SparkPlan,即執行hive table scan,測試我拿sbt/sbt hive/console裏test裏的src table為例,操作是掃描這張表。這個表有2個字的key是int, value 是string
3、拿到child的output, 這裏的output就是 key, value2個列。
4、執行mapPartitions操作,對當前RDD的每個分區的數據進行操作。
5、對於每一個分區,叠代裏面的數據生成新的Iterator。每個Iterator裏面是Array[ByteBuffer]
6、對於child.output的每一列,都會生成一個ColumnBuilder,最後組合為一個columnBuilders是一個數組。
7、數組內每個CommandBuilder持有一個ByteBuffer
8、遍歷原始分區的記錄,將對於的行轉為列,並將數據存到ByteBuffer內。
9、最後將此RDD調用cache方法,將RDD緩存。
10、將cached賦給_cachedColumnBuffers。
此操作總結下來是:執行hive table scan操作,返回的MapPartitionsRDD對其重新定義mapPartition方法,將其行轉列,並且最終cache到內存中。
所有流程如下:
[java] view plain copy- // If the cached column buffers were not passed in, we calculate them in the constructor.
- // As in Spark, the actual work of caching is lazy.
- if (_cachedColumnBuffers == null) { //判斷是否已經cache了當前table
- val output = child.output
- /**
- * child.output
- res65: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#6, value#7)
- */
- val cached = child.execute().mapPartitions { baseIterator =>
- /**
- * child.execute()是Row的集合,叠代Row
- * res66: Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([238,val_238])
- *
- * val row1 = child.execute().take(1)
- * res67: Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([238,val_238])
- * */
- /*
- * 對每個Partition進行map,映射生成一個Iterator[Array[ByteBuffer],對應java的Iterator<List<ByteBuffer>>
- * */
- new Iterator[Array[ByteBuffer]] {
- def next() = {
- //遍歷每一列,首先attribute是key 為 IntegerType ,然後attribute是value是String
- //最後封裝成一個Array, index 0 是 IntColumnBuilder, 1 是StringColumnBuilder
- val columnBuilders = output.map { attribute =>
- val columnType = ColumnType(attribute.dataType)
- val initialBufferSize = columnType.defaultSize * batchSize
- ColumnBuilder(columnType.typeId, initialBufferSize, attribute.name, useCompression)
- }.toArray
- //src表裏Row是[238,val_238] 這行Row的length就是2
- var row: Row = null
- var rowCount = 0
- //batchSize默認1000
- while (baseIterator.hasNext && rowCount < batchSize) {
- //遍歷每一條記錄
- row = baseIterator.next()
- var i = 0
- //這裏row length是2,i的取值是0 和 1
- while (i < row.length) {
- //獲取columnBuilders, 0是IntColumnBuilder,
- //BasicColumnBuilder的appendFrom
- //Appends `row(ordinal)` to the column builder.
- columnBuilders(i).appendFrom(row, i)
- i += 1
- }
- //該行已經插入完畢
- rowCount += 1
- }
- //limit and rewind,Returns the final columnar byte buffer.
- columnBuilders.map(_.build())
- }
- def hasNext = baseIterator.hasNext
- }
- }.cache()
- cached.setName(child.toString)
- _cachedColumnBuffers = cached
- }
三、Columnar Storage
初始化ColumnBuilders:
[java] view plain copy
- val columnBuilders = output.map { attribute =>
- val columnType = ColumnType(attribute.dataType)
- val initialBufferSize = columnType.defaultSize * batchSize
- ColumnBuilder(columnType.typeId, initialBufferSize, attribute.name, useCompression)
- }.toArray
這裏會聲明一個數組,來對應每一列的存儲,如下圖:
然後初始化類型builder的時候會傳入的參數:
initialBufferSize:文章開頭的圖中會有ByteBuffer,ByteBuffer的初始化大小是如何計算的?
initialBufferSize = 列類型默認長度 × batchSize ,默認batchSize是1000
拿Int類型舉例,initialBufferSize of IntegerType = 4 * 1000
attribute.name即字段名age,name etc。。。
ColumnType:
ColumnType封裝了 該類型的 typeId 和 該類型的 defaultSize。並且提供了extract、append\getField方法,來向buffer裏追加和獲取數據。
如IntegerType typeId 為0, defaultSize 4 ......
詳細看下類圖,畫的不是非常嚴格的類圖,主要為了展示目前類型系統:
ColumnBuilder:
ColumnBuilder的主要職責是:管理ByteBuffer,包括初始化buffer,添加數據到buffer內,檢查剩余空間,和申請新的空間這幾項主要職責。
initialize負責初始化buffer。
appendFrom是負責添加數據。
ensureFreeSpace確保buffer的長度動態增加。
類圖如下:
ByteBuffer的初始化過程:
初始化大小initialSize:拿Int舉例,在前面builder初始化傳入的是4×batchSize=4*1000,initialSize也就是4KB,如果沒有傳入initialSize,則默認是1024×1024。
列名稱,是否需要壓縮,都是需要傳入的。
ByteBuffer聲明時預留了4個字節,為了放column type id,這個在ColumnType的構造裏有介紹過。
[java] view plain copy
- override def initialize(
- initialSize: Int,
- columnName: String = "",
- useCompression: Boolean = false) = {
- val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize //如果沒有默認1024×1024 byte
- this.columnName = columnName
- // Reserves 4 bytes for column type ID
- buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize) // buffer的初始化長度,需要加上4byte類型ID空間。
- buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)//根據nativeOrder排序,然後首先放入typeId
- }
存儲的方式如下:
Int的type id 是0, string的 type id 是 7. 後面就是實際存儲的數據了。
ByteBuffer寫入過程:
存儲結構都介紹完畢,最後開始對Table進行scan了,scan後對每一個分區的每個Row進行操作遍歷:
1、讀每個分區的每條Row
2、獲取每個列的值,從builders數組裏找到索引 i 對應的bytebuffer,追加至bytebuffer。
[java] view plain copy
- while (baseIterator.hasNext && rowCount < batchSize) {
- //遍歷每一條記錄
- row = baseIterator.next()
- var i = 0
- //這裏row length是2,i的取值是0 和 1 Ps:還是拿src table做測試,每一個Row只有2個字段,key, value所有長度為2
- while (i < row.length) {
- //獲取columnBuilders, 0是IntColumnBuilder,
- //BasicColumnBuilder的appendFrom
- //Appends `row(ordinal)` to the column builder.
- columnBuilders(i).appendFrom(row, i) //追加到對應的bytebuffer
- i += 1
- }
- //該行已經插入完畢
- rowCount += 1
- }
- //limit and rewind,Returns the final columnar byte buffer.
- columnBuilders.map(_.build())
追加過程:
根據當前builder的類型,從row的對應索引中取出值,最後追加到builder的bytebuffer內。
[java] view plain copy
- override def appendFrom(row: Row, ordinal: Int) {
- //ordinal是Row的index,0就是第一列值,1就是第二列值,獲取列的值為field
- //最後在將該列的值put到該buffer內
- val field = columnType.getField(row, ordinal)
- buffer = ensureFreeSpace(buffer, columnType.actualSize(field))//動態擴容
- columnType.append(field, buffer)
- }
ensureFreeSpace:
主要是操作buffer,如果要追加的數據大於剩余空間,就擴大buffer。
[java] view plain copy
- //確保剩余空間能容下,如果剩余空間小於 要放入的大小,則重新分配一看內存空間
- private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
- if (orig.remaining >= size) { //當前buffer剩余空間比要追加的數據大,則什麽都不做,返回自身
- orig
- } else { //否則擴容
- // grow in steps of initial size
- val capacity = orig.capacity()
- val newSize = capacity + size.max(capacity / 8 + 1)
- val pos = orig.position()
- orig.clear()
- ByteBuffer
- .allocate(newSize)
- .order(ByteOrder.nativeOrder())
- .put(orig.array(), 0, pos)
- }
- }
......
最後調用MapPartitionsRDD.cache(),將該RDD緩存並添加到spark cache管理中。
至此,我們將一張spark sql table緩存到了spark的jvm中。
四、總結
對於數據的存儲結構,我們常常關註持久化的存儲結構,並且在長久時間內有了很多種高效結構。
但是在實時性的要求下,內存數據庫越來越被關註,如何優化內存數據庫的存儲結構,是一個重點,也是一個難點。
對於Spark SQL 和 Shark 裏的列存儲 是一種優化方案,提高了關系查詢中列查詢的速度,和減少了內存占用。但是中存儲方式還是比較簡單的,沒有額外的元數據和索引來提高查詢效率,希望以後能了解到更多的In-Memory Storage。
——EOF——
創文章,轉載請註明:
轉載自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory
本文鏈接地址:http://blog.csdn.net/oopsoom/article/details/39525483
註:本文基於署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協議,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章鏈接。如若需要用於商業目的或者與授權方面的協商,請聯系我。
轉自:http://blog.csdn.net/oopsoom/article/details/39525483
第九篇:Spark SQL 源碼分析之 In-Memory Columnar Storage源碼分析之 cache table