
Big Data Series: Spark Basics, Your First Spark Application Explained: WordCount

Task

Write a Spark application that counts the frequency of each word in a given file.

Note: in this article the Spark installation directory is named spark-1.6.3-bin-hadoop2.6.

#Preparation
cd /usr/local/spark-1.6.3-bin-hadoop2.6
mkdir mycode
cd mycode
mkdir wordcount
cd wordcount

#Create a text file word.txt containing a few lines of text
vi word.txt
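
#If you prefer not to use vi, the same file can be created non-interactively; a minimal
#sketch using the sample contents that appear later in this article:
cat > word.txt <<'EOF'
Hello,Spark
Hello,master
Hello,slave1
Hello,slave2
EOF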

Running the word count in spark-shell

【1】Start spark-shell

cd /usr/local/spark-1.6.3-bin-hadoop2.6
./bin/spark-shell
....
scala>

#Tip: configuring the relevant environment variables (e.g. adding Spark's bin directory to PATH) makes this much more convenient.
  • Loading a local file

Before getting to the word-count code itself, there is one question to settle: how do we load the file?

Note that the file may sit in the local file system or in the distributed file system HDFS, so below we cover loading a local file first and then loading a file from HDFS.
First, work in a second terminal window. Use the following commands to change into the "/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount" directory and check the contents of the word.txt file created above:

cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount
cat word.txt  ##cat prints the entire contents of word.txt to the screen.

Output:

[root@master ~]# cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount
[root@master wordcount]# cat word.txt 
Hello,Spark
Hello,master
Hello,slave1
Hello,slave2

Switch back to the first terminal:

scala> val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt")

The textFile after val is a variable, while the textFile in sc.textFile() is the name of a method on sc that loads file data. The two are different things, so do not confuse them. Since the one after val is just a variable, you can give it any name you like.

For example, val lines = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt"). The same name is used here deliberately, to emphasise the distinction between the two. Note that to load a local file, the path must start with "file:///". After running the command above, no result appears right away: Spark is lazily evaluated, and the whole chain of operations only runs once an "action" operation is encountered. So let's run an action statement to see a result:

scala> textFile.first()

first() is an "action" operation: it triggers the actual computation, loads the data from the file into the RDD referenced by textFile, and returns the first line of text. The screen will show a lot of log output, omitted here; among it you will find the first line of word.txt.
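
first() is just one of several actions; as a quick reference, a couple of other common ones behave the same way (assuming the same textFile variable as above):

scala> textFile.count()   // action: number of lines in word.txt
scala> textFile.take(3)   // action: the first three lines, as an Array[String]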
Because Spark is lazy, entering an incorrect transformation statement does not make spark-shell report an error immediately; the error only surfaces when an action statement triggers the real computation, and that is when the mistake in the transformation shows up:

val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word123.txt")
#Note: the file word123.txt above does not exist at all, yet spark-shell does not complain when this line runs, because the load is not actually executed until an action such as first() is reached. Now run the action first():

scala> textFile.first()
#Note: after running the line above you get an error message (in my run it prominently contained the Chinese words for "connection refused"), because word123.txt simply does not exist.

Next, let's practise writing the contents of the textFile variable back out, this time into a writeback directory:

val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt")
textFile.saveAsTextFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/writeback")

The argument to saveAsTextFile() is the path of a directory to save into, not a file name. saveAsTextFile() is an "action" operation, so the real computation runs immediately: the data is loaded from word.txt into textFile and then written out.
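
saveAsTextFile() writes one part file per RDD partition (here just part-00000). For a tiny dataset like this you could, if you wanted a single output file, merge the RDD into one partition before saving; a hedged sketch (the writeback_single path is only an example):

textFile.coalesce(1).saveAsTextFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/writeback_single")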

##In the other terminal, inspect what was written back
cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/writeback/
ls
cat part-00000 ##view the written contents

Output:

[root@master wordcount]# cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/writeback/
[root@master writeback]# ls
part-00000  _SUCCESS
[root@master writeback]# cat part-00000
Hello,Spark
Hello,master
Hello,slave1
Hello,slave2
  • Loading a file from HDFS

To read files from HDFS, first start the HDFS component of Hadoop. The earlier "Spark installation" chapter already covered installing Hadoop and Spark, so here we can start HDFS directly with the commands below (MapReduce is not used, so there is no need to start MapReduce or YARN). Go to the second terminal window, at the Linux shell prompt, and enter:

cd /usr/local/hadoop
./sbin/start-dfs.sh

#I have the environment variables set up, which makes this more convenient
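
#As a rough sketch, the environment variables could be set in ~/.bashrc like this
#(the Hadoop path is an assumption; adjust both paths to your own installation):
export SPARK_HOME=/usr/local/spark-1.6.3-bin-hadoop2.6
export HADOOP_HOME=/usr/local/hadoop    # assumed location; use your Hadoop directory
export PATH=$PATH:$SPARK_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
#Run "source ~/.bashrc" afterwards so the current shell picks the variables up.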


Once startup finishes, HDFS is ready to use. If you have not yet created a home directory in HDFS for the current Linux login user (here the user is root), create it with:

./bin/hdfs dfs -mkdir -p /user/root

In other words, the default HDFS directory for a Linux login user is "/user/<username>" (note: user, not usr). This walkthrough logs in as root, so the command above creates "/user/root". To stress it once more: this directory lives in the HDFS file system, not in the local file system. Once it exists, list the directories and files in HDFS with:

./bin/hdfs dfs -ls .

In the command above, the trailing dot "." means: show the files under the HDFS directory that corresponds to the current Linux login user root, i.e. the files under "/user/root/" in HDFS. The following two commands are therefore equivalent:

./bin/hdfs dfs -ls .
./bin/hdfs dfs -ls /user/root

To list the contents of the HDFS root directory instead, use:

./bin/hdfs dfs -ls /

Now upload the local file "/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt" to the distributed file system HDFS (into the root user's home directory):

./bin/hdfs dfs -put /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt .

Then check whether word.txt has appeared in the root user's HDFS directory by listing its contents:

./bin/hdfs dfs -ls .

You should see that word.txt is now there. Use the cat command to view the contents of word.txt in HDFS:

./bin/hdfs dfs -cat ./word.txt

After running this command, you will see the contents of the word.txt stored in HDFS.

 

Now switch back to the spark-shell window, load word.txt from HDFS, and display its first line:

scala> val textFile = sc.textFile("hdfs://localhost:9000/user/root/word.txt")
scala> textFile.first()

After running these statements you will see the first line of the word.txt stored in HDFS (not the local copy).

Note: in my environment connecting via master:9000 did not work (HDFS refused the connection), so I used the equivalent forms below instead of the expression that requires port 9000.

Note that in sc.textFile("hdfs://localhost:9000/user/root/word.txt"), the prefix "hdfs://localhost:9000/" uses the port 9000 that was configured during the Hadoop installation covered earlier. This prefix can in fact be omitted; the following three statements are all equivalent:

val textFile = sc.textFile("hdfs://localhost:9000/user/root/word.txt")
val textFile = sc.textFile("/user/root/word.txt")
val textFile = sc.textFile("word.txt")

Now let's write the contents of textFile back into HDFS (under the root user's directory):

scala> val textFile = sc.textFile("word.txt")
scala> textFile.saveAsTextFile("writeback")

After running these commands, the text is written to the "/user/root/writeback" directory in HDFS. Switch to the Linux shell prompt in the other terminal to take a look:

./bin/hdfs dfs -ls .
#After running the command above you should see a writeback directory in the listing; now look at what is inside it:
./bin/hdfs dfs -ls ./writeback

The listing shows two files: part-00000 and _SUCCESS. Print the contents of part-00000 with the command below (note: part-00000 contains five zeros):

./bin/hdfs dfs -cat ./writeback/part-00000

The output shows exactly the same text as word.txt:

[root@master ~]# hdfs dfs -ls .
Found 2 items
-rw-r--r--   2 root supergroup         51 2018-11-03 14:13 word.txt
drwxr-xr-x   - root supergroup          0 2018-11-03 14:30 writeback
[root@master ~]# hdfs dfs -ls ./writeback
Found 2 items
-rw-r--r--   2 root supergroup          0 2018-11-03 14:30 writeback/_SUCCESS
-rw-r--r--   2 root supergroup         51 2018-11-03 14:30 writeback/part-00000
[root@master ~]# hdfs dfs -cat ./writeback/part-00000
Hello,Spark
Hello,master
Hello,slave1
Hello,slave2
  • 【2】Word count

With that groundwork in place, we can now write the first real Spark application: WordCount.
Switch to the spark-shell window:

scala> val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt")
scala> val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
scala> wordCount.collect()

Only the code is shown above; the output produced while it runs is omitted because it is quite long.
A brief explanation of the statements:

textFile holds multiple lines of text. textFile.flatMap(line => line.split(" ")) iterates over every line in textFile; each line is bound to the variable line and the lambda expression line => line.split(" ") is applied to it. In that lambda, the left side is the input parameter and the right side is the processing logic: line.split(" ") splits the line into words using a space as the separator, producing a collection of words for that line. Applying the lambda to every line yields one word collection per line, and textFile.flatMap() then "flattens" all of those collections into one big collection of words.

Next, map(word => (word, 1)) is applied to this big word collection. The map operation visits each word in turn, binds it to the variable word, and evaluates the lambda expression word => (word, 1): for each input word it builds a tuple of the form (word, 1), where the key is the word and the value is 1 (meaning the word occurred once).

At this point we have an RDD whose elements are (key, value) tuples. Finally, reduceByKey((a, b) => a + b) groups the RDD elements by key and reduces the values that share a key using the given function (the lambda (a, b) => a + b), returning the reduced (key, value) pair. For example, ("hadoop", 1) and ("hadoop", 1) share the same key, so reducing them gives ("hadoop", 2), which is exactly that word's frequency.
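
If you want to see the intermediate results for yourself, the same chain can be run one step at a time in spark-shell (identical logic, just split into separate variables):

scala> val words = textFile.flatMap(line => line.split(" "))   // one flattened collection of words
scala> words.take(5)                                           // peek at a few words
scala> val pairs = words.map(word => (word, 1))                // (word, 1) tuples
scala> val counts = pairs.reduceByKey((a, b) => a + b)         // sum the 1s for each distinct word
scala> counts.collect()                                        // action: trigger the computation and fetch the result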

  • 【3】Writing a standalone application for the word count

Now let's write a standalone Scala application that performs the word count.
Log in to the Linux system (this walkthrough logs in as root), open a shell prompt, and run:

cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/
mkdir -p src/main/scala //the -p flag creates the src directory and its subdirectories in one go

Create a new file named test.scala under "/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/src/main/scala" with the following contents:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object WordCount {
    def main(args: Array[String]) {
        // Local input file; note the file:/// prefix for the local file system
        val inputFile = "file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt"
        // local[2] runs Spark locally with 2 worker threads; see the note below on dropping setMaster
        val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val textFile = sc.textFile(inputFile)
        val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
        wordCount.foreach(println)
    }
}

Note the line SparkConf().setAppName("WordCount").setMaster("local[2]"): you can also delete .setMaster("local[2]") and keep only val conf = new SparkConf().setAppName("WordCount"), as sketched below.
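
If .setMaster("local[2]") is removed from the code, the master can be supplied on the spark-submit command line instead; for example (using the jar that is built later in this section, and the same local[2] value):

/usr/local/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --master local[2] --class "WordCount" /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar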
If test.scala did not call the Spark API, compiling it with scalac and running it would be enough. But this program depends on the Spark API, so we need to compile and package it with sbt (how to do that was covered in "Testing spark-shell and writing and packaging standalone Scala applications with sbt"). Let's go through the build once more.

Run the following commands:

cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/
vi simple.sbt

The commands above create a new file simple.sbt; put the following into it.

These are my Spark and Scala versions:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.3"

Note that "org.apache.spark" is followed by two percent signs; do not drop one of them, otherwise the build will fail.
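
For reference, the double %% tells sbt to append the Scala binary version to the artifact name automatically; with scalaVersion 2.10.5 the dependency line above is equivalent to spelling the suffix out by hand:

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.3"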
Next we package the Scala program with sbt. To make sure sbt runs properly, first check the overall file layout of the application with:

cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/
find .

The file structure should look something like this:

.
./src
./src/main
./src/main/scala
./src/main/scala/test.scala
./simple.sbt
./word.txt

Next, we can package the whole application into a JAR with the commands below (the first run also needs to download the dependency packages):

cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/ //make sure this directory is the current directory
/usr/local/sbt/sbt package

This step takes a few minutes; the screen will show output along these lines:

[root@master wordcount]# cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/
[root@master wordcount]# /usr/local/sbt/sbt package
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0
[info] Set current project to Simple Project (in build file:/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/)
[info] Updating {file:/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/}wordcount...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] downloading https://repo1.maven.org/maven2/org/apache/avro/avro/1.7.7/avro-1.7.7.jar ...
[info] 	[SUCCESSFUL ] org.apache.avro#avro;1.7.7!avro.jar (32854ms)
[info] Done updating.
[info] Compiling 1 Scala source to /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/classes...
[info] Packaging /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 145 s, completed 2018-11-3 14:52:13

#Output like the above means the packaging succeeded

The generated jar is located at /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar.
Finally, run the program with spark-submit. We can submit the generated jar to Spark with the following command:

/usr/local/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class "WordCount" /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar

Below is my run of the word count on word.txt:

Problem:

 Input path does not exist: file:/usr/local/spark/mycode/wordcount/word.txt

Analysis: the input path was not found. Also note that after you modify test.scala you cannot simply rerun the old jar; you must recompile and repackage with sbt to produce a new jar and run that. Otherwise the previously built jar still contains the old, incorrect code, and the program will keep failing even though the test.scala source has been fixed.

Fix: rebuild the jar with sbt and run it again.
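
Concretely, after correcting the inputFile path in test.scala, the rebuild-and-rerun sequence is the same as before (a sketch using this article's paths):

cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/
/usr/local/sbt/sbt package
/usr/local/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class "WordCount" target/scala-2.10/simple-project_2.10-1.0.jar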

[root@master wordcount]# /usr/local/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class "WordCount" /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/spark-assembly-1.6.3-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.8.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/11/03 15:01:36 INFO spark.SparkContext: Running Spark version 1.6.3
18/11/03 15:01:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/11/03 15:01:36 INFO spark.SecurityManager: Changing view acls to: root
18/11/03 15:01:36 INFO spark.SecurityManager: Changing modify acls to: root
18/11/03 15:01:36 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
18/11/03 15:01:37 INFO util.Utils: Successfully started service 'sparkDriver' on port 34369.
18/11/03 15:01:37 INFO slf4j.Slf4jLogger: Slf4jLogger started
18/11/03 15:01:38 INFO Remoting: Starting remoting
18/11/03 15:01:38 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 42135.
18/11/03 15:01:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.10.251:42135]
18/11/03 15:01:38 INFO spark.SparkEnv: Registering MapOutputTracker
18/11/03 15:01:38 INFO spark.SparkEnv: Registering BlockManagerMaster
18/11/03 15:01:38 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-a907e9a7-0ec9-4a2c-84ca-4e97b450043c
18/11/03 15:01:38 INFO storage.MemoryStore: MemoryStore started with capacity 517.4 MB
18/11/03 15:01:38 INFO spark.SparkEnv: Registering OutputCommitCoordinator
18/11/03 15:01:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
18/11/03 15:01:38 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
18/11/03 15:01:38 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
18/11/03 15:01:38 INFO ui.SparkUI: Started SparkUI at http://192.168.10.251:4040
18/11/03 15:01:38 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-8dfe652a-0642-4289-9687-0ff0af307dea/httpd-bde01377-d78e-4cdc-9f6b-d95543c17cd0
18/11/03 15:01:38 INFO spark.HttpServer: Starting HTTP Server
18/11/03 15:01:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
18/11/03 15:01:38 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:33810
18/11/03 15:01:38 INFO util.Utils: Successfully started service 'HTTP file server' on port 33810.
18/11/03 15:01:38 INFO spark.SparkContext: Added JAR file:/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar at http://192.168.10.251:33810/jars/simple-project_2.10-1.0.jar with timestamp 1541228498915
18/11/03 15:01:38 INFO executor.Executor: Starting executor ID driver on host localhost
18/11/03 15:01:39 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45968.
18/11/03 15:01:39 INFO netty.NettyBlockTransferService: Server created on 45968
18/11/03 15:01:39 INFO storage.BlockManagerMaster: Trying to register BlockManager
18/11/03 15:01:39 INFO storage.BlockManagerMasterEndpoint: Registering block manager localhost:45968 with 517.4 MB RAM, BlockManagerId(driver, localhost, 45968)
18/11/03 15:01:39 INFO storage.BlockManagerMaster: Registered BlockManager
18/11/03 15:01:39 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.5 KB, free 517.3 MB)
18/11/03 15:01:39 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.2 KB, free 517.3 MB)
18/11/03 15:01:39 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:45968 (size: 14.2 KB, free: 517.4 MB)
18/11/03 15:01:39 INFO spark.SparkContext: Created broadcast 0 from textFile at test.scala:10
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/usr/local/spark/mycode/wordcount/word.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:330)
	at WordCount$.main(test.scala:11)
	at WordCount.main(test.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
18/11/03 15:01:40 INFO spark.SparkContext: Invoking stop() from shutdown hook
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
18/11/03 15:01:40 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.10.251:4040
18/11/03 15:01:40 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/11/03 15:01:40 INFO storage.MemoryStore: MemoryStore cleared
18/11/03 15:01:40 INFO storage.BlockManager: BlockManager stopped
18/11/03 15:01:40 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
18/11/03 15:01:40 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
18/11/03 15:01:40 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
18/11/03 15:01:40 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/11/03 15:01:41 INFO spark.SparkContext: Successfully stopped SparkContext
18/11/03 15:01:41 INFO util.ShutdownHookManager: Shutdown hook called
18/11/03 15:01:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-8dfe652a-0642-4289-9687-0ff0af307dea/httpd-bde01377-d78e-4cdc-9f6b-d95543c17cd0
18/11/03 15:01:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-8dfe652a-0642-4289-9687-0ff0af307dea