1. 程式人生 > >Spark (Python版) 零基礎學習筆記(二)—— Spark Transformations總結及舉例

Spark (Python版) 零基礎學習筆記(二)—— Spark Transformations總結及舉例

1. map(func)
func函式作用到資料集的每個元素,生成一個新的分散式的資料集並返回

>>> a = sc.parallelize(('a', 'b', 'c'))
>>> a.map(lambda x: x+'1').collect()
['a1', 'b1', 'c1']

2. filter(func)
選出所有func返回值為true的元素,作為一個新的資料集返回

>>> a = sc.parallelize(range(10))
>>> a.filter(lambda x: x%2==0).collect()  # 選出0-9的偶數
[0, 2, 4, 6, 8]

3. flatMap(func)
與map相似,但是每個輸入的item能夠被map到0個或者更多的items輸出,也就是說func的返回值應當是一個Sequence,而不是一個單獨的item

>>> l = ['I am Tom', 'She is Jenny', 'He is Ben']
>>> a = sc.parallelize(l,3)
>>> a.flatMap(lambda line: line.split()).collect()  # 將每個字串中的單詞劃分出來
['I', 'am', 'Tom'
, 'She', 'is', 'Jenny', 'He', 'is', 'Ben']

4. mapPartitions(func)
與map相似,但是mapPartitions的輸入函式單獨作用於RDD的每個分割槽(block)上,因此func的輸入和返回值都必須是迭代器iterator。
例如:假設RDD有十個元素0~9,分成三個區,使用mapPartitions返回每個元素的平方。如果使用map方法,map中的輸入函式會被呼叫10次,而使用mapPartitions方法,輸入函式只會被呼叫3次,每個分割槽被呼叫1次。

>>> def squareFunc(a):
. . .     for
i in a: . . . yield i*i . . . >>> a = sc.parallelize(range(10), 3) PythonRDD[1] at RDD at PythonRDD.scala:48 >>> a.mapPartitions(squareFunc).collect() [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

5. mapPartitionsWithIndex(func)
與mapPartitions相似,但是輸入函式func提供了一個正式的引數,可以用來表示分割槽的編號。

>>> def func(index, iterator):  # 返回每個分割槽的編號和數值
. . .     yield (‘index ‘ + str(index) + ’ is: ‘ + str(list(iterator)))
. . .
>>> a = sc.parallelize(range(10),3)
>>> a.mapPartitionsWithIndex(func).collect()
['index 0 is: [0, 1, 2]', 'index 1 is: [3, 4, 5]', 'index 2 is: [6, 7, 8, 9]']
>>> def squareIndex(index, iterator):  # 返回每個數值所屬分割槽的編號和數值的平方
...     for i in iterator:
...         yield ("The index is: " + str(index) + ", and the square is: " + str(i*i))
... 
>>> a.mapPartitionsWithIndex(squareIndex).collect()
['The index is: 0, and the square is: 0', 'The index is: 0, and the square is: 1', 'The index is: 1, and the square is: 4', 'The index is: 1, and the square is: 9', 'The index is: 1, and the square is: 16', 'The index is: 2, and the square is: 25', 'The index is: 2, and the square is: 36', 'The index is: 3, and the square is: 49', 'The index is: 3, and the square is: 64', 'The index is: 3, and the square is: 81']

6. sample(withReplacement, fraction, seed)
從資料中抽樣,withReplacement表示是否有放回,withReplacement=true表示有放回抽樣,fraction為抽樣的概率(0<=fraction<=1),seed為隨機種子。
例如:從1-100之間抽取樣本,被抽取為樣本的概率為0.2

>>> data = sc.parallelize(range(1,101),2)
>>> sample = data.sample(True, 0.2)
>>> sampleData.count()
19
>>> sampleData.collect()
[16, 19, 24, 29, 32, 33, 44, 45, 55, 56, 56, 57, 65, 65, 73, 83, 84, 92, 96]

!!!注意,Spark中的sample抽樣,當withReplacement=True時,相當於採用的是泊松抽樣;當withReplacement=False時,相當於採用伯努利抽樣,fraction並不是表示抽樣得到的樣本佔原來資料總量的百分比,而是一個元素被抽取為樣本的概率。fraction=0.2並不是說明要抽出100個數字中20%的資料作為樣本,而是每個數字被抽取為樣本的概率為0.2,這些數字被認為來自同一總體,樣本的大小並不是固定的,而是服從二項分佈。

7. union(otherDataset)
並集操作,將源資料集與union中的輸入資料集取並集,預設保留重複元素(如果不保留重複元素,可以利用distinct操作去除,下邊介紹distinct時會介紹)。

>>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.union(data2).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]

8. intersection(otherDataset)
交集操作,將源資料集與union中的輸入資料集取交集,並返回新的資料集。

>>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.intersection(data2).collect()
[8, 9, 6, 7]

9. distinct([numTasks])
去除資料集中的重複元素。

>>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.union(data2).distinct().collect()
[0, 8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]

下邊的一系列transactions會用的鍵(Key)這一概念,在進行下列有關Key操作時使用的資料集為記錄倫敦各個片區(英文稱為ward)中學校和學生人數相關資訊的表格,下載地址:
https://data.london.gov.uk/dataset/london-schools-atlas/resource/64f771ee-38b1-4eff-8cd2-e9ba31b90685#
下載後將其中命名為WardtoSecSchool_LDS_2015的sheet裡邊的資料儲存為csv格式,刪除第一行的表頭,並重新命名為school.csv
資料格式為:
(Ward_CODE, Ward_NAME, TotalWardPupils, Ward2Sec_Flow_No., Secondary_School_URN, Secondary_School_Name, Pupil_count)
首先對資料進行一些預處理:

>>> school = sc.textFile("file:///home/yang/下載/school.csv")  
Data = sc.textFile("file:///home/yang/下載/school.csv") 
>>> school.count()  # 共有16796行資料
16796
>>> import re  # 引入python的正則表示式包
>>> rows = school.map(lambda line: re.subn(',[\s]+',': ', line))

注意:1. 從本地讀取資料時,程式碼中要通過 “file://” 字首指定讀取本地檔案。Spark shell 預設是讀取 HDFS 中的檔案,需要先上傳檔案到 HDFS 中,否則會有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/school.csv”的錯誤。
2. 對資料集進行了一下預處理,利用正則匹配替換字串,由於一些學校的名字的字串中本身含有逗號,比如“The City Academy, Hackney”, 此時如果利用csv的分隔符’,’進行分割,並不能將名字分割為“The City Academy”和“Hackney”。我們注意到csv的分隔符逗號後邊是沒有空格的,而名字裡邊的逗號後邊都會有空格(英語書寫習慣),因此,先利用re.subn語句對逗號後邊含有至少一個空格(正則表示式為’,[\s]+’)的子字串進行替換,替換為’: ’,然後再進行後續操作。以上即為對這一資料集的預處理過程。

10. groupByKey([numTasks])
作用於由鍵值對(K, V)組成的資料集上,將Key相同的資料放在一起,返回一個由鍵值對(K, Iterable)組成的資料集。
注意:1. 如果這一操作是為了後續在每個鍵上進行聚集(aggregation),比如sum或者average,此時使用reduceByKey或者aggregateByKey的效率更高。2. 預設情況下,輸出的並行程度取決於RDD分割槽的數量,但也可以通過給可選引數numTasks賦值來調整併發任務的數量。

>>> newRows = rows.map(lambda r: r[0].split(','))  
>>> ward_schoolname = newRows .map(lambda r: (r[1], r[5])).groupByKey()  # r[1]為ward的名字,r[5]為學校的名字
>>> ward_schoolname.map(lambda x: {x[0]: list(x[1])}).collect()  # 列出每個ward區域內所有的學校的名字
[{'Stifford Clays': ['William Edwards School', 'Brentwood County High School', "The Coopers' Company and Coborn School", 'Becket Keys Church of England Free School', ...] # 輸出結果為在Stifford Clays這個ward裡的學校有William Edwards School,Brentwood County High School,The Coopers' Company and Coborn School等等...

11. reduceByKey(func, [numTasks])
作用於鍵值對(K, V)上,按Key分組,然後將Key相同的鍵值對的Value都執行func操作,得到一個值,注意func的型別必須滿足

>>> pupils = newRows.map(lambda r: (r[1], int(r[6])))  # r[1]為ward的名字,r[6]為每個學校的學生數
>>> ward_pupils = pupils.reduceByKey(lambda x, y: x+y)   # 計算各個ward中的學生數
>>> ward_pupils.collect()  # 輸出各個ward中的學生數
[('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526), ('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835), ('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585), ('Camberwell Green', 1374), ('Glyndon', 4633),...]

12. aggregateByKey(zeroValue, seqOp, comOp, [numTasks])
在於鍵值對(K, V)的RDD中,按key將value進行分組合並,合併時,將每個value和初始值作為seqOp函式的引數,進行計算,返回的結果作為一個新的鍵值對(K, V),然後再將結果按照key進行合併,最後將每個分組的value傳遞給comOp函式進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給comOp函式,以此類推),將key與計算結果作為一個新的鍵值對(K, V)輸出。
例子: 上述統計ward內學生人數的操作也可以通過aggregateByKey實現,此時,seqOpcomOp都是進行加法操作,程式碼如下:

>>> ward_pupils = pupils.aggregateByKey(0, lambda x, y: x+y, lambda x, y: x+y)
>>> ward_pupils.collect()  
[('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526), ('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835), ('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585), ('Camberwell Green', 1374), ('Glyndon', 4633),...]

13. sortByKey([ascending=True], [numTasks])
按照Key進行排序,ascending的值預設為True,True/False表示升序還是降序
例如:將上述ward按照ward名字降序排列,打印出前十個

>>> ward_pupils.sortByKey(False, 4).take(10)
[('Yiewsley', 2560), ('Wormholt and White City', 1455), ('Woodside', 1204), ('Woodhouse', 2930), ('Woodcote', 1214), ('Winchmore Hill', 1116), ('Wilmington', 2243), ('Willesden Green', 1896), ('Whitefoot', 676), ('Whalebone', 2294)]

14. join(otherDataset, [numTasks])
類似於SQL中的連線操作,即作用於鍵值對(K, V)和(K, W)上,返回元組 (K, (V, W)),spark也支援外連線,包括leftOuterJoin,rightOuterJoin和fullOuterJoin。例子:

>>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
>>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
>>> class1.join(class2).collect()
[('Tom', ('attended', 'attended'))]
>>> class1.leftOuterJoin(class2).collect()
[('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None))]
>>> class1.rightOuterJoin(class2).collect()
[('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]
>>> class1.fullOuterJoin(class2).collect()
[('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None)), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]

15. cogroup(otherDataset, [numTasks])
作用於鍵值對(K, V)和(K, W)上,返回元組 (K, (Iterable, Iterable))。這一操作可叫做groupWith

>>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
>>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
>>> group = class1.cogroup(class2)
>>> group.collect()
[('John', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808afd0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a1d0>)), ('Tom', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a7f0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a048>)), ('Jenny', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a9b0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a208>)), ('Bob', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808ae80>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b448d0>)), ('Amy', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44c88>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44588>)), ('Alice', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44748>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44f98>))]
>>> group.map(lambda x: {x[0]: [list(x[1][0]), list(x[1][1])]}).collect()
[{'John': [[], ['attended']]}, {'Tom': [['attended'], ['attended']]}, {'Jenny': [['attended'], []]}, {'Bob': [['attended'], []]}, {'Amy': [[], ['attended']]}, {'Alice': [[], ['attended']]}]

16. cartesian(otherDataset)
笛卡爾乘積,作用於資料集T和U上,返回(T, U),即資料集中每個元素的兩兩組合

>>> a = sc.parallelize(('a', 'b', 'c'))
>>> b = sc.parallelize(('d', 'e', 'f'))
>>> a.cartesian(b).collect()
[('a', 'd'), ('a', 'e'), ('a', 'f'), ('b', 'd'), ('b', 'e'), ('b', 'f'), ('c', 'd'), ('c', 'e'), ('c', 'f')]

17. pipe(command, [envVars])
將驅動程式中的RDD交給shell處理(外部程序),例如Perl或bash指令碼。RDD元素作為標準輸入傳給指令碼,指令碼處理之後的標準輸出會作為新的RDD返回給驅動程式。

18. coalesce(numPartitions)
將RDD的分割槽數減小到numPartitions個。當資料集通過過濾規模減小時,使用這個操作可以提升效能。

19. repartition(numPartitions)
重組資料,資料被重新隨機分割槽為numPartitions個,numPartitions可以比原來大,也可以比原來小,平衡各個分割槽。這一操作會將整個資料集在網路中重新洗牌。

20. repartitionAndSortWithinPartitions(partitioner)
根據給定的partitioner函式重新將RDD分割槽,並在分割槽內排序。這比先repartition然後在分割槽內sort高效,原因是這樣迫使排序操作被移到了shuffle階段。