Spark(第二節)Spark角色介紹,Spark提交任務引數解釋,分別基於spark-shell、scala、java開發單詞計數方法
阿新 • • 發佈:2021-02-18
技術標籤:spark
目錄
Spark角色介紹
Spark架構圖如下:
Spark架構使用了分散式計算中master-slave模型,master是叢集中含有master程序的節點,slave是叢集中含有worker程序的節點。
- Driver Program :運⾏main函式並且新建SparkContext的程式。
- Application:基於Spark的應用程式,包含了driver程式和叢集上的executor。
- Cluster Manager:指的是在叢集上獲取資源的外部服務。目前有三種類型
(1)Standalone: spark原生的資源管理,由Master負責資源的分配
(2)Apache Mesos:與hadoop MR相容性良好的一種資源排程框架
(3)Hadoop Yarn: 主要是指Yarn中的ResourceManager - Worker Node: 叢集中任何可以執行Application程式碼的節點,在Standalone模式中指的是通過slaves檔案配置的Worker節點,在Spark on Yarn模式下就是NodeManager節點
- Executor:是在一個worker node上為某應⽤啟動的⼀個程序,該程序負責運⾏任務,並且負責將資料存在記憶體或者磁碟上。每個應⽤都有各自獨立的executor。
- Task :被送到某個executor上的工作單元。
Spark提交任務引數解釋
spark任務提交說明
一旦打包好,就可以使用bin/spark-submit指令碼啟動應用了. 這個指令碼負責設定spark使用的classpath和依賴,支援不同型別的叢集管理器和釋出模式:
bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode < deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
一些基本選項
- –class: 你的應用的啟動類 (如 org.apache.spark.examples.SparkPi)
- –master: 叢集的master URL (如 spark://node01:7077)
master引數 | 引數含義 |
---|---|
local | 本地以一個worker執行緒執行(例如非並行的情況). |
local[K] | 本地以K worker 執行緒 (理想情況下, K設定為你機器的CPU核數). |
local[*] | 本地以本機同樣核數的執行緒執行. |
spark://HOST:PORT | 連線到指定的Spark standalone cluster master. 埠是你的master叢集配置的埠,預設值為7077. |
mesos://HOST:PORT | 連線到指定的Mesos 叢集. Port是你配置的mesos埠, 預設是5050. 或者如果Mesos使用ZooKeeper,格式為 mesos://zk://… |
yarn-client | 以client模式連線到YARN cluster. 叢集的位置基於HADOOP_CONF_DIR 變數找到. |
yarn-cluster | 以cluster模式連線到YARN cluster. 叢集的位置基於HADOOP_CONF_DIR 變數找到. |
- –deploy-mode: 是否釋出你的驅動到worker節點(cluster) 或者作為一個本地客戶端 (client) (default: client)*
- –conf: 任意的Spark配置屬性, 格式key=value. 如果值包含空格,可以加引號“key=value”. 預設的Spark配置
- application-jar: 打包好的應用jar,包含依賴. 這個URL在叢集中全域性可見。 比如hdfs:// 共享儲存系統, 如果是 file:// path, 那麼所有的節點的path都包含同樣的jar.
- application-arguments: 傳給main()方法的引數
引數提交說明
引數 | 含義 |
---|---|
–master MASTER_URL | 可以是spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local |
–deploy-mode DEPLOY_MODE | Driver程式執行的地方,client或者cluster |
–class CLASS_NAME | 主類名稱,含包名 |
–name NAME | Application名稱 |
–jars JARS | Driver依賴的第三方jar包 |
–py-files PY_FILES | 用逗號隔開的放置在Python應用程式PYTHONPATH上的.zip, .egg, .py檔案列表 |
–files FILES | 用逗號隔開的要放置在每個executor工作目錄的檔案列表 |
–properties-file FILE | 設定應用程式屬性的檔案路徑,預設是conf/spark-defaults.conf |
–driver-memory MEM | Driver程式使用記憶體大小 |
–driver-library-path | Driver程式的庫路徑 |
–driver-class-path | Driver程式的類路徑 |
–executor-memory MEM | executor記憶體大小,預設1G |
–driver-cores NUM | Driver程式的使用CPU個數,僅限於Spark Alone模式 |
–supervise | 失敗後是否重啟Driver,僅限於Spark Alone模式 |
–total-executor-cores NUM | executor使用的總核數,僅限於Spark Alone、Spark on Mesos模式 |
–executor-cores NUM | 每個executor使用的核心數,預設為1,僅限於Spark on Yarn模式 |
–queue QUEUE_NAME | 提交應用程式給哪個YARN的佇列,預設是default佇列,僅限於Spark on Yarn模式 |
–num-executors NUM | 啟動的executor數量,預設是2個,僅限於Spark on Yarn模式 |
–archives ARCHIVES | 僅限於Spark on Yarn模式 |
基於spark-shell開發單詞計數方法
第一步:準備本地檔案
node01伺服器執行以下命令準備資料檔案
mkdir -p /export/servers/sparkdatas
cd /export/servers/sparkdatas/
vim wordcount.txt
hello me
hello you
hello her
第二步,進入spark-shell
由於是作測試,本地模式進入即可
cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
bin/spark-shell --master local[2]
第三步:開發scala單詞統計程式碼
普通寫法
sc.textFile("file:///export/servers/sparkdatas/wordcount.txt").flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((x,y) => x + y).collect
簡介寫法(用_代替匿名函式中只使用一次的引數)
sc.textFile("file:///export/servers/sparkdatas/wordcount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect
程式碼說明:
sc:Spark-Shell中已經預設將SparkContext類初始化為物件sc。使用者程式碼如果需要用到,則直接應用sc即可。
textFile:讀取資料檔案
flatMap:對檔案中的每一行資料進行壓平切分,這裡按照空格分隔。
map:對出現的每一個單詞記為1(word,1)
reduceByKey:對相同的單詞出現的次數進行累加
collect:觸發任務執行,收集結果資料。
基於scala開發spark的單詞計數方法
第一步:建立maven工程並匯入jar包
pom如下
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
第二步:建立scala的object並開發scala程式碼
計劃將程式碼打包發到spark叢集上執行,所以將讀取檔案的路徑設為args(0)
,將儲存結果檔案的路徑設為args(1)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//設定spark的配置檔案資訊
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount")
//構建sparkcontext上下文物件,它是程式的入口,所有計算的源頭
val sc: SparkContext = new SparkContext(sparkConf)
//讀取檔案
val file: RDD[String] = sc.textFile(args(0))
//對檔案中每一行單詞進行壓平切分
val words: RDD[String] = file.flatMap(_.split(" "))
//對每一個單詞計數為1 轉化為(單詞,1)
val wordAndOne: RDD[(String, Int)] = words.map(x=>(x,1))
//相同的單詞進行彙總 前一個下劃線表示累加資料,後一個下劃線表示新資料
val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
//儲存資料到HDFS
result.saveAsTextFile(args(1))
sc.stop()
}
}
第三步:準備hdfs上的檔案
將之前的wordcount.txt,傳到hdfs上
hdfs dfs -mkdir /sparkwordcount
hdfs dfs -put wordcount.txt /sparkwordcount
第四步:程式碼打包提交到spark叢集執行
在IDEA中打包專案,將不含依賴的jar包上傳到spark叢集的某個機器上。這次使用on-yarn模式來執行。
bin/spark-submit --class wordcount.WordCount \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-word-count.jar \
hdfs://node01:8020/sparkwordcount \
hdfs://node01:8020/sparkwordcount_out
基於java開發spark的單詞計數方法
在之前的scala專案中,繼續編寫java類即可,無需重新建立專案。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class WordCountJava
{
public static void main(String[] args)
{
SparkConf conf=new SparkConf().setAppName("word-count").setMaster("local");
JavaSparkContext context=new JavaSparkContext(conf);
//讀取檔案
JavaRDD<String> text=context.textFile("本地檔案路徑");
//對每一行單詞進行切分
JavaRDD<String> flatMapped=text.flatMap(new FlatMapFunction<String, String>()
{
@Override
public Iterator<String> call(String s) throws Exception
{
String[] split=s.split(" ");
Iterator<String> iterator=Arrays.asList(split).iterator();
return iterator;
}
});
//給每個單詞計為 1
JavaPairRDD<String,Integer> wordAndOne=flatMapped.mapToPair(new PairFunction<String, String, Integer>()
{
@Override
public Tuple2<String, Integer> call(String s) throws Exception
{
return Tuple2.apply(s,1);
}
});
//相同單詞出現的次數累加
JavaPairRDD<String,Integer> resultJavaPairRDD=wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>()
{
@Override
public Integer call(Integer integer, Integer integer2) throws Exception
{
return integer+integer2;
}
});
//獲取結果
List<Tuple2<String,Integer>> collect=resultJavaPairRDD.collect();
for (Tuple2<String,Integer> t:collect)
System.out.println(t.toString());
}
}
由於Java本身不擅長開發Spark程式碼,所以這裡直接用windows機器上的檔案作測試即可,不打包在Spark叢集執行,也不生成結果檔案了。