1. 程式人生 > 其它 >Spark(第二節)Spark角色介紹,Spark提交任務引數解釋,分別基於spark-shell、scala、java開發單詞計數方法

Spark(第二節)Spark角色介紹,Spark提交任務引數解釋,分別基於spark-shell、scala、java開發單詞計數方法

技術標籤:spark

目錄

Spark角色介紹

Spark架構圖如下:
在這裡插入圖片描述
Spark架構使用了分散式計算中master-slave模型,master是叢集中含有master程序的節點,slave是叢集中含有worker程序的節點。

  • Driver Program :運⾏main函式並且新建SparkContext的程式。
  • Application:基於Spark的應用程式,包含了driver程式和叢集上的executor。
  • Cluster Manager:指的是在叢集上獲取資源的外部服務。目前有三種類型
    (1)Standalone: spark原生的資源管理,由Master負責資源的分配
    (2)Apache Mesos:與hadoop MR相容性良好的一種資源排程框架
    (3)Hadoop Yarn: 主要是指Yarn中的ResourceManager
  • Worker Node: 叢集中任何可以執行Application程式碼的節點,在Standalone模式中指的是通過slaves檔案配置的Worker節點,在Spark on Yarn模式下就是NodeManager節點
  • Executor:是在一個worker node上為某應⽤啟動的⼀個程序,該程序負責運⾏任務,並且負責將資料存在記憶體或者磁碟上。每個應⽤都有各自獨立的executor。
  • Task :被送到某個executor上的工作單元。

Spark提交任務引數解釋

spark任務提交說明

一旦打包好,就可以使用bin/spark-submit指令碼啟動應用了. 這個指令碼負責設定spark使用的classpath和依賴,支援不同型別的叢集管理器和釋出模式:

bin/spark-submit \
  --class <main-class>
  --master <master-url> \
  --deploy-mode <
deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments]

一些基本選項

  1. –class: 你的應用的啟動類 (如 org.apache.spark.examples.SparkPi)
  1. –master: 叢集的master URL (如 spark://node01:7077)
master引數引數含義
local本地以一個worker執行緒執行(例如非並行的情況).
local[K]本地以K worker 執行緒 (理想情況下, K設定為你機器的CPU核數).
local[*]本地以本機同樣核數的執行緒執行.
spark://HOST:PORT連線到指定的Spark standalone cluster master. 埠是你的master叢集配置的埠,預設值為7077.
mesos://HOST:PORT連線到指定的Mesos 叢集. Port是你配置的mesos埠, 預設是5050. 或者如果Mesos使用ZooKeeper,格式為 mesos://zk://…
yarn-client以client模式連線到YARN cluster. 叢集的位置基於HADOOP_CONF_DIR 變數找到.
yarn-cluster以cluster模式連線到YARN cluster. 叢集的位置基於HADOOP_CONF_DIR 變數找到.
  1. –deploy-mode: 是否釋出你的驅動到worker節點(cluster) 或者作為一個本地客戶端 (client) (default: client)*
  2. –conf: 任意的Spark配置屬性, 格式key=value. 如果值包含空格,可以加引號“key=value”. 預設的Spark配置
  3. application-jar: 打包好的應用jar,包含依賴. 這個URL在叢集中全域性可見。 比如hdfs:// 共享儲存系統, 如果是 file:// path, 那麼所有的節點的path都包含同樣的jar.
  4. application-arguments: 傳給main()方法的引數

引數提交說明

引數含義
–master MASTER_URL可以是spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local
–deploy-mode DEPLOY_MODEDriver程式執行的地方,client或者cluster
–class CLASS_NAME主類名稱,含包名
–name NAMEApplication名稱
–jars JARSDriver依賴的第三方jar包
–py-files PY_FILES用逗號隔開的放置在Python應用程式PYTHONPATH上的.zip, .egg, .py檔案列表
–files FILES用逗號隔開的要放置在每個executor工作目錄的檔案列表
–properties-file FILE設定應用程式屬性的檔案路徑,預設是conf/spark-defaults.conf
–driver-memory MEMDriver程式使用記憶體大小
–driver-library-pathDriver程式的庫路徑
–driver-class-pathDriver程式的類路徑
–executor-memory MEMexecutor記憶體大小,預設1G
–driver-cores NUMDriver程式的使用CPU個數,僅限於Spark Alone模式
–supervise失敗後是否重啟Driver,僅限於Spark Alone模式
–total-executor-cores NUMexecutor使用的總核數,僅限於Spark Alone、Spark on Mesos模式
–executor-cores NUM每個executor使用的核心數,預設為1,僅限於Spark on Yarn模式
–queue QUEUE_NAME提交應用程式給哪個YARN的佇列,預設是default佇列,僅限於Spark on Yarn模式
–num-executors NUM啟動的executor數量,預設是2個,僅限於Spark on Yarn模式
–archives ARCHIVES僅限於Spark on Yarn模式

基於spark-shell開發單詞計數方法

第一步:準備本地檔案

node01伺服器執行以下命令準備資料檔案

mkdir -p /export/servers/sparkdatas
cd /export/servers/sparkdatas/
vim wordcount.txt

hello me
hello you
hello her

第二步,進入spark-shell

由於是作測試,本地模式進入即可

cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
bin/spark-shell --master local[2]

第三步:開發scala單詞統計程式碼

普通寫法

sc.textFile("file:///export/servers/sparkdatas/wordcount.txt").flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((x,y) => x + y).collect

簡介寫法(用_代替匿名函式中只使用一次的引數)

sc.textFile("file:///export/servers/sparkdatas/wordcount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect

程式碼說明:
sc:Spark-Shell中已經預設將SparkContext類初始化為物件sc。使用者程式碼如果需要用到,則直接應用sc即可。
textFile:讀取資料檔案
flatMap:對檔案中的每一行資料進行壓平切分,這裡按照空格分隔。
map:對出現的每一個單詞記為1(word,1)
reduceByKey:對相同的單詞出現的次數進行累加
collect:觸發任務執行,收集結果資料。

基於scala開發spark的單詞計數方法

第一步:建立maven工程並匯入jar包

pom如下

<properties>
   <scala.version>2.11.8</scala.version>
   <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
   <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>${scala.version}</version>
   </dependency>
   <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.11</artifactId>
       <version>${spark.version}</version>
   </dependency>
   <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>2.7.5</version>
   </dependency>
</dependencies>
<build>
   <sourceDirectory>src/main/scala</sourceDirectory>
   <testSourceDirectory>src/test/scala</testSourceDirectory>
   <plugins>
       <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
           <version>3.0</version>
           <configuration>
               <source>1.8</source>
               <target>1.8</target>
               <encoding>UTF-8</encoding>
               <!--    <verbal>true</verbal>-->
           </configuration>
       </plugin>
       <plugin>
           <groupId>net.alchim31.maven</groupId>
           <artifactId>scala-maven-plugin</artifactId>
           <version>3.2.0</version>
           <executions>
               <execution>
                   <goals>
                       <goal>compile</goal>
                       <goal>testCompile</goal>
                   </goals>
                   <configuration>
                       <args>
                           <arg>-dependencyfile</arg>
                           <arg>${project.build.directory}/.scala_dependencies</arg>
                       </args>
                   </configuration>
               </execution>
           </executions>
       </plugin>
       <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-shade-plugin</artifactId>
           <version>3.1.1</version>
           <executions>
               <execution>
                   <phase>package</phase>
                   <goals>
                       <goal>shade</goal>
                   </goals>
                   <configuration>
                       <filters>
                           <filter>
                               <artifact>*:*</artifact>
                               <excludes>
                                   <exclude>META-INF/*.SF</exclude>
                                   <exclude>META-INF/*.DSA</exclude>
                                   <exclude>META-INF/*.RSA</exclude>
                               </excludes>
                           </filter>
                       </filters>
                       <transformers>
                           <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                               <mainClass></mainClass>
                           </transformer>
                       </transformers>
                   </configuration>
               </execution>
           </executions>
       </plugin>
   </plugins>
</build>

第二步:建立scala的object並開發scala程式碼

計劃將程式碼打包發到spark叢集上執行,所以將讀取檔案的路徑設為args(0),將儲存結果檔案的路徑設為args(1)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //設定spark的配置檔案資訊
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount")

    //構建sparkcontext上下文物件,它是程式的入口,所有計算的源頭
    val sc: SparkContext = new SparkContext(sparkConf)
    //讀取檔案
    val file: RDD[String] = sc.textFile(args(0))

    //對檔案中每一行單詞進行壓平切分
    val words: RDD[String] = file.flatMap(_.split(" "))
    //對每一個單詞計數為1 轉化為(單詞,1)
    val wordAndOne: RDD[(String, Int)] = words.map(x=>(x,1))
    //相同的單詞進行彙總 前一個下劃線表示累加資料,後一個下劃線表示新資料
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    //儲存資料到HDFS
    result.saveAsTextFile(args(1))
    sc.stop()
  }
}

第三步:準備hdfs上的檔案

將之前的wordcount.txt,傳到hdfs上

hdfs dfs -mkdir /sparkwordcount
hdfs dfs -put wordcount.txt /sparkwordcount

第四步:程式碼打包提交到spark叢集執行

在IDEA中打包專案,將不含依賴的jar包上傳到spark叢集的某個機器上。這次使用on-yarn模式來執行。

bin/spark-submit --class wordcount.WordCount \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-word-count.jar \
hdfs://node01:8020/sparkwordcount \
hdfs://node01:8020/sparkwordcount_out

基於java開發spark的單詞計數方法

在之前的scala專案中,繼續編寫java類即可,無需重新建立專案。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class WordCountJava
{
    public static void main(String[] args)
    {
        SparkConf conf=new SparkConf().setAppName("word-count").setMaster("local");
        JavaSparkContext context=new JavaSparkContext(conf);
        //讀取檔案
        JavaRDD<String> text=context.textFile("本地檔案路徑");
        //對每一行單詞進行切分
        JavaRDD<String> flatMapped=text.flatMap(new FlatMapFunction<String, String>()
        {
            @Override
            public Iterator<String> call(String s) throws Exception
            {
                String[] split=s.split(" ");
                Iterator<String> iterator=Arrays.asList(split).iterator();
                return iterator;
            }
        });
        //給每個單詞計為 1
        JavaPairRDD<String,Integer> wordAndOne=flatMapped.mapToPair(new PairFunction<String, String, Integer>()
        {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception
            {
                return Tuple2.apply(s,1);
            }
        });
        //相同單詞出現的次數累加
        JavaPairRDD<String,Integer> resultJavaPairRDD=wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>()
        {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception
            {
                return integer+integer2;
            }
        });
        //獲取結果
        List<Tuple2<String,Integer>> collect=resultJavaPairRDD.collect();
        for (Tuple2<String,Integer> t:collect)
            System.out.println(t.toString());
    }
}

由於Java本身不擅長開發Spark程式碼,所以這裡直接用windows機器上的檔案作測試即可,不打包在Spark叢集執行,也不生成結果檔案了。