Pyspark學習筆記(四)---彈性分散式資料集 RDD [Resilient Distribute Data](上)
Pyspark學習筆記(四)---彈性分散式資料集 RDD [Resilient Distribute Data] (上)
1.RDD簡述
RDD
是Spark程式設計中最基本的資料物件,
無論是最初載入的資料集
,還是任何中間結果的資料集
,或是最終的結果資料集
,都是RDD
在Pyspark中,RDD是由分佈在各節點上的python物件組成,如列表,元組,字典等。
RDD主要是儲存在記憶體中(亦可持久化到硬碟上),這就是相對於Hadoop的MapReduce的優點,節省了重新讀取硬碟資料的時間。
彈性
:RDD是有彈性的,意思就是說如果Spark中一個執行任務的節點丟失了,資料集依然可以被重建出來;分散式
:RDD是分散式的,RDD中的資料被分到至少一個分割槽中,在叢集上跨工作節點分散式地作為物件集合儲存在記憶體中;資料集
:RDD是由記錄組成的資料集。
RDD的另一個關鍵特性是不可變,也即是在例項化出來匯入資料後,就無法更新了。
每次對已有RDD進行轉化操作(transformation)都會生成新的RDD;
2.載入資料到RDD
要開始一個Spark程式,需要從外部源的資料初始化出至少一個RDD。
然後才是經過一系列轉化操作
、行動操作
,得到中間的RDD和結果RDD。
初始RDD的建立方法:
- A 從檔案中讀取資料;
- B 從SQL或者NoSQL等資料來源讀取
- C 通過程式設計載入資料
- D 從流資料中讀取資料。
#建立一個SparkSession物件,方便下面使用
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName('exam1' )\
.enableHiveSupport()\
.getOrCreate()
sc = spark.sparkContext
A 從檔案中讀取資料
Ⅰ·從文字檔案建立RDD
sc.textFile(name, minPartitions=None, use_unicode=True)
#示例:
#①讀取整個目錄下的內容
Example=sc.textFile(“hdfs://exam_dir/running_logs/”)
#②讀取目錄下的單個檔案
Example=sc.textFile(“hdfs://exam_dir/running_logs/log_001.txt”)
#③使用萬用字元讀取檔案
Example=sc.textFile(“hdfs://exam_dir/running_logs/*_001.txt”)
#####################################################################
sc.wholeTextFiles(path, minPartitions=None, use_unicode=True)
#讀取包含多個檔案的整個目錄,每個檔案會作為一條記錄(鍵-值對);
#其中檔名是記錄的鍵,而檔案的全部內容是記錄的值。
#使用textFile()讀取目錄下的所有檔案時,每個檔案的每一行成為了一條單獨的記錄,
#而該行屬於哪個檔案是不記錄的。
Ⅱ·從物件檔案建立RDD
物件檔案
指序列化後的資料結構,有幾個方法可以讀取相應的物件檔案:
hadoopFile()
, sequenceFile()
, pickleFile()
B 從資料來源建立RDD
一般是使用SparkSession
中的函式,SparkSession
物件提供了read
method,返回一個DataFrameReader
物件。官網連結如下
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.read
用該物件將資料讀取到DataFrame
中,DataFrame
是一種特殊的RDD
,老版本中稱為SchemaRDD
。
比如說,spark
現在是一個已經被建立的SparkSession
物件,然後呼叫read
方法,spark.read
就是一個DataFrameReader
物件,然後就呼叫該物件(DataFrameReader
)的一系列方法,來讀取各種資料,參考如下連結:http://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html
官方github中給了很多例子:
https://github.com/apache/spark/blob/master/examples/src/main/python/sql/datasource.py
C.通過程式設計建立RDD
sc.parallelize(c, numSlices=None)
parallelize()
方法要求列表已經建立好,並作為c引數傳入。引數numSlices
指定了所需建立的分割槽數量。
http://spark.apache.org/docs/latest/api/python/_modules/pyspark/context.html#SparkContext.parallelize
3.RDD操作
轉化操作
:操作RDD並返回一個 新RDD 的函式;
行動操作
:操作RDD並返回 一個值 或者 進行輸出 的函式。
粗粒度轉化操作
:把函式作用於資料的每一個元素(無差別覆蓋),比如map,filter
細粒度轉化操作
:可以針對單條記錄或單元格進行操作。
惰性求值
在處理Spark程式時,Spark使用惰性求值(lazy evaluation),也叫做惰性執行(lazy execution)。惰性執行指的 是在呼叫行動操作時(也就是需要進行輸出時)再處理資料。這是因為每個語句僅僅解析了語法和引用物件, 在請求了行動操作之後,Spark會創建出DAG圖以及邏輯執行計劃和物理執行計劃,接下來驅動器程序就跨執行器協調並管理計劃的執行。
4.RDD持久化與重用
RDD主要建立和存在於執行器的記憶體中。預設情況下,RDD是易逝物件,僅在需要的時候存在。
在它們被轉化為新的RDD,並不被其他操作所依賴後,這些RDD就會被刪除。
若一RDD在多個行動操作中用到,就每次都會重新計算,則可呼叫cache()
或persist( )
方法快取或持久化RDD。
5.RDD譜系
Spark維護每個RDD的譜系,也就是獲取這個RDD所需要的一系列轉化操作的序列。
預設情況下,每個RDD都會重新計算整個譜系,除非呼叫了RDD持久化。
6.窄依賴(窄操作)- 寬依賴(寬操作):
- 窄操作:
- ①多個操作可以合併為一個階段,比如同時對一個數據集進行的map操作或者filter操作可以在資料集的各元 素的一輪遍歷中處理;
- ②子RDD只依賴於一個父RDD
- ③不需要進行節點間的資料混洗
- 寬操作:
- ①通常需要資料混洗
- ②RDD有多個依賴,比如在join或者union的時候
7.RDD容錯性
因為每個RDD的譜系都被記錄,所以一個節點崩潰時,任何RDD都可以將其全部分割槽重建為原始狀態。(當 然,如果存在一些非確定性函式,比如random,因為其隨機性,所以可能影響到RDD的重建。)
8.RDD型別
除了包含通用屬性和函式的基本型別BaseRDD外,RDD還有以下附加型別:
http://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/rdd/RDD.html