1. 程式人生 > 其它 >Pyspark學習筆記(四)---彈性分散式資料集 RDD [Resilient Distribute Data](上)

Pyspark學習筆記(四)---彈性分散式資料集 RDD [Resilient Distribute Data](上)

在這裡插入圖片描述

Pyspark學習筆記(四)---彈性分散式資料集 RDD [Resilient Distribute Data] (上)

1.RDD簡述

RDD是Spark程式設計中最基本的資料物件,
無論是最初載入的資料集,還是任何中間結果的資料集,或是最終的結果資料集,都是RDD


在Pyspark中,RDD是由分佈在各節點上的python物件組成,如列表,元組,字典等。
RDD主要是儲存在記憶體中(亦可持久化到硬碟上),這就是相對於Hadoop的MapReduce的優點,節省了重新讀取硬碟資料的時間。

  • 彈性:RDD是有彈性的,意思就是說如果Spark中一個執行任務的節點丟失了,資料集依然可以被重建出來;
  • 分散式:RDD是分散式的,RDD中的資料被分到至少一個分割槽中,在叢集上跨工作節點分散式地作為物件集合儲存在記憶體中;
  • 資料集:RDD是由記錄組成的資料集。

RDD的另一個關鍵特性是不可變,也即是在例項化出來匯入資料後,就無法更新了。
每次對已有RDD進行轉化操作(transformation)都會生成新的RDD;

2.載入資料到RDD

要開始一個Spark程式,需要從外部源的資料初始化出至少一個RDD。
然後才是經過一系列轉化操作行動操作,得到中間的RDD和結果RDD。
初始RDD的建立方法:

  • A 從檔案中讀取資料;
  • B 從SQL或者NoSQL等資料來源讀取
  • C 通過程式設計載入資料
  • D 從流資料中讀取資料。
#建立一個SparkSession物件,方便下面使用
from pyspark.sql import SparkSession
spark = SparkSession\
                    .builder\
                    .appName('exam1'
)\ .enableHiveSupport()\ .getOrCreate() sc = spark.sparkContext

A 從檔案中讀取資料

Ⅰ·從文字檔案建立RDD

sc.textFile(name, minPartitions=None, use_unicode=True)
#示例:
#①讀取整個目錄下的內容
Example=sc.textFile(“hdfs://exam_dir/running_logs/)

#②讀取目錄下的單個檔案
Example=sc.textFile(“hdfs://exam_dir/running_logs/log_001.txt”)

#③使用萬用字元讀取檔案
Example=sc.textFile(“hdfs://exam_dir/running_logs/*_001.txt”)
#####################################################################
sc.wholeTextFiles(path, minPartitions=None, use_unicode=True)
#讀取包含多個檔案的整個目錄,每個檔案會作為一條記錄(鍵-值對);
#其中檔名是記錄的鍵,而檔案的全部內容是記錄的值。
#使用textFile()讀取目錄下的所有檔案時,每個檔案的每一行成為了一條單獨的記錄,
#而該行屬於哪個檔案是不記錄的。

Ⅱ·從物件檔案建立RDD

物件檔案指序列化後的資料結構,有幾個方法可以讀取相應的物件檔案:
hadoopFile(), sequenceFile(), pickleFile()

B 從資料來源建立RDD

一般是使用SparkSession中的函式,SparkSession物件提供了read method,返回一個DataFrameReader物件。官網連結如下
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.read
用該物件將資料讀取到DataFrame中,DataFrame是一種特殊的RDD,老版本中稱為SchemaRDD
比如說,spark現在是一個已經被建立的SparkSession物件,然後呼叫read方法,spark.read就是一個DataFrameReader物件,然後就呼叫該物件(DataFrameReader)的一系列方法,來讀取各種資料,參考如下連結:http://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html

官方github中給了很多例子:
https://github.com/apache/spark/blob/master/examples/src/main/python/sql/datasource.py

C.通過程式設計建立RDD

sc.parallelize(c, numSlices=None)

parallelize()方法要求列表已經建立好,並作為c引數傳入。引數numSlices指定了所需建立的分割槽數量。
http://spark.apache.org/docs/latest/api/python/_modules/pyspark/context.html#SparkContext.parallelize

3.RDD操作

轉化操作:操作RDD並返回一個 新RDD 的函式;
行動操作:操作RDD並返回 一個值 或者 進行輸出 的函式。

粗粒度轉化操作:把函式作用於資料的每一個元素(無差別覆蓋),比如map,filter
細粒度轉化操作:可以針對單條記錄或單元格進行操作。

惰性求值
在處理Spark程式時,Spark使用惰性求值(lazy evaluation),也叫做惰性執行(lazy execution)。惰性執行指的 是在呼叫行動操作時(也就是需要進行輸出時)再處理資料。這是因為每個語句僅僅解析了語法和引用物件, 在請求了行動操作之後,Spark會創建出DAG圖以及邏輯執行計劃和物理執行計劃,接下來驅動器程序就跨執行器協調並管理計劃的執行。

4.RDD持久化與重用

RDD主要建立和存在於執行器的記憶體中。預設情況下,RDD是易逝物件,僅在需要的時候存在。
在它們被轉化為新的RDD,並不被其他操作所依賴後,這些RDD就會被刪除。
若一RDD在多個行動操作中用到,就每次都會重新計算,則可呼叫cache()persist( )方法快取或持久化RDD

5.RDD譜系

Spark維護每個RDD的譜系,也就是獲取這個RDD所需要的一系列轉化操作的序列。
預設情況下,每個RDD都會重新計算整個譜系,除非呼叫了RDD持久化

6.窄依賴(窄操作)- 寬依賴(寬操作):

  • 窄操作:
  • ①多個操作可以合併為一個階段,比如同時對一個數據集進行的map操作或者filter操作可以在資料集的各元 素的一輪遍歷中處理;
  • ②子RDD只依賴於一個父RDD
  • ③不需要進行節點間的資料混洗
  • 寬操作:
  • ①通常需要資料混洗
  • ②RDD有多個依賴,比如在join或者union的時候

7.RDD容錯性

因為每個RDD的譜系都被記錄,所以一個節點崩潰時,任何RDD都可以將其全部分割槽重建為原始狀態。(當 然,如果存在一些非確定性函式,比如random,因為其隨機性,所以可能影響到RDD的重建。)

8.RDD型別

除了包含通用屬性和函式的基本型別BaseRDD外,RDD還有以下附加型別:
http://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/rdd/RDD.html
在這裡插入圖片描述

9.基本的RDD操作

Pyspark學習筆記(四)—彈性分散式資料集 RDD 【Resilient Distribute Data】(下)