1. 程式人生 > >Flume 、Kafka 與SparkStreaming 整合程式設計

Flume 、Kafka 與SparkStreaming 整合程式設計

Flume 、Kafka 與SparkStreaming 整合程式設計

一、 Flume 與SparkStreaming 整合程式設計
1、程式
pull方式,可靠Receiver,工作常用
com.imooc.spark.FlumePullWordCount.scala

package com.imooc.spark
import
org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.flume.FlumeUtils
/**
 * Word count over a Flume source using the pull (polling) integration,
 * `FlumeUtils.createPollingStream`.
 *
 * The original note describes this as the reliable-receiver approach and the
 * one commonly used in practice.
 *
 * Usage: FlumePullWordCount <hostname> <port>
 */
object FlumePullWordCount {

  /**
   * Builds a streaming context with 5-second batches, reads Flume events from
   * `hostname:port`, and prints the word counts of every batch.
   *
   * @param args exactly two elements: the host name and the port (an integer)
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      // The usage text previously named FlumePushWordCount (copy-paste slip).
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    // Master and app name are left unset here so they can be supplied at
    // submit time; for a local run chain the following onto SparkConf():
    //   .setMaster("local[2]").setAppName("FlumePullWordCount")
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // port.toInt throws NumberFormatException when the port is not numeric.
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    // NOTE(review): the event body is decoded with the platform default
    // charset — confirm this matches what the Flume source sends.
    flumeStream
      .map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

push方式
com.imooc.spark.FlumePushWordCount.scala
package com.imooc.spark
import org.apache.spark.SparkConf import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Word count over a Flume source using the push integration,
 * `FlumeUtils.createStream`.
 *
 * Usage: FlumePushWordCount <hostname> <port>
 */
object FlumePushWordCount {

  /**
   * Builds a streaming context with 5-second batches, receives Flume events on
   * `hostname:port`, and prints the word counts of every batch.
   *
   * @param args exactly two elements: the host name and the port (an integer)
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    // Master and app name are left unset here so they can be supplied at
    // submit time; for a local run chain the following onto SparkConf():
    //   .setMaster("local[2]").setAppName("FlumePushWordCount")
    // This hint must sit on its own line: as a trailing `//` comment on a
    // collapsed line it swallowed the `val ssc` definition that follows.
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // port.toInt throws NumberFormatException when the port is not numeric.
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

    // NOTE(review): the event body is decoded with the platform default
    // charset — confirm this matches what the Flume source sends.
    flumeStream
      .map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

pom.xml檔案
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">     <modelVersion>4.0.0</modelVersion>     <groupId>com.imooc.spark</groupId>     <artifactId>sparktrain</artifactId>     <version>1.0</version>     <inceptionYear>2008</inceptionYear>     <properties>         <scala.version>2.11.8</scala.version>         <kafka.version>0.9.0.0</kafka.version>         <spark.version>2.2.0</spark.version>         <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>         <hbase.version>1.2.0-cdh5.7.0</hbase.version>     </properties>
    <!--新增cloudera的repository-->     <repositories>         <repository>             <id>cloudera</id>             <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>         </repository>     </repositories>
    <dependencies>         <dependency>             <groupId>org.scala-lang</groupId>             <artifactId>scala-library</artifactId>             <version>${scala.version}</version>         </dependency>
        <!-- Kafka 依賴-->         <!--         <dependency>             <groupId>org.apache.kafka</groupId>             <artifactId>kafka_2.11</artifactId>             <version>${kafka.version}</version>         </dependency>         -->
        <!-- Hadoop 依賴-->         <dependency>             <groupId>org.apache.hadoop</groupId>             <artifactId>hadoop-client</artifactId>             <version>${hadoop.version}</version>         </dependency>
        <!-- HBase 依賴-->         <dependency>             <groupId>org.apache.hbase</groupId>             <artifactId>hbase-client</artifactId>             <version>${hbase.version}</version>         </dependency>
        <dependency>             <groupId>org.apache.hbase</groupId>             <artifactId>hbase-server</artifactId>             <version>${hbase.version}</version>         </dependency>
        <!-- Spark Streaming 依賴-->         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-streaming_2.11</artifactId>             <version>${spark.version}</version>         </dependency>

        <!-- Spark Streaming整合Flume 依賴-->         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-streaming-flume_2.11</artifactId>             <version>${spark.version}</version>         </dependency>
        <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-streaming-flume-sink_2.11</artifactId>             <version>${spark.version}</version>         </dependency>
        <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>             <version>${spark.version}</version>         </dependency>
        <dependency>             <groupId>org.apache.commons</groupId>             <artifactId>commons-lang3</artifactId>             <version>3.5</version>         </dependency>
        <!-- Spark SQL 依賴-->         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-sql_2.11</artifactId>             <version>${spark.version}</version>         </dependency>

        <dependency>             <groupId>com.fasterxml.jackson.module</groupId>             <artifactId>jackson-module-scala_2.11</artifactId>             <version>2.6.5</version>         </dependency>
        <dependency>             <groupId>net.jpountz.lz4</groupId>             <artifactId>lz4</artifactId>             <version>1.3.0</version>         </dependency>
        <dependency>              <groupId>mysql</groupId>             <artifactId>mysql-connector-java</artifactId>             <version>5.1.38</version>         </dependency>
        <dependency>             <groupId>org.apache.flume.flume-ng-clients</groupId>             <artifactId>flume-ng-log4jappender</artifactId>             <version>1.6.0</version>         </dependency>
    </dependencies>
    <build>         <sourceDirectory>src/main/scala</sourceDirectory>         <testSourceDirectory>src/test/scala</testSourceDirectory>         <plugins>             <plugin>                 <groupId>org.scala-tools</groupId>                 <artifactId>maven-scala-plugin</artifactId>                 <executions>                     <execution>                         <goals>                             <goal>compile</goal>                             <goal>testCompile</goal>                         </goals>                     </execution>                 </executions>                 <configuration>                     <scalaVersion>${scala.version}</scalaVersion>                     <args>                         <arg>-target:jvm-1.5</arg>                     </args>                 </configuration>             </plugin>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-eclipse-plugin</artifactId>                 <configuration>                     <downloadSources>true</downloadSources>                     <buildcommands>                         <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>                     </buildcommands>                     <additionalProjectnatures>                         <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>                     </additionalProjectnatures>                     <classpathContainers>                         <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>                         <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>                     </classpathContainers>                 </configuration>             </plugin>         </plugins>     </build>     <reporting>         <plugins>             <plugin>                 <groupId>org.scala-tools</groupId>                 <artifactId>maven-scala-plugin</artifactId>                 <configuration>  
                   <scalaVersion>${scala.version}</scalaVersion>                 </configuration>             </plugin>         </plugins>     </reporting> </project>

2、部署
1)、kafka部署   
 啟動kafka : kafka-server-start.sh $KAFKA_HOME/config/server.properties
 建立topic : kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test
 生產者 : kafka-console-producer.sh --broker-list hadoop000:9092 --topic test
2)、提交作業(非聯網環境,不用packages ,而是用jars)
spark-submit \ --class com.imooc.spark.KafkaDirectWordCount \ --master local[2] \ --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0    \ #  --jars  spark-streaming-kafka-0-8-assembly hadoop000:9092 test

/www/lib/sparktrain-1.0.jar \ hadoop000 414

二、 Kafka 與SparkStreaming 整合程式設計
1、程式
com.imooc.spark.KafkaDirectWordCount.scala
package com.imooc.spark

import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext}
//Spark Streaming對接Kafka的方式二
/**
 * Word count over Kafka topics using the direct stream integration,
 * `KafkaUtils.createDirectStream` (the second of the two ways of connecting
 * Spark Streaming to Kafka, per the note preceding this object).
 *
 * Usage: KafkaDirectWordCount <brokers> <topics>
 */
object KafkaDirectWordCount {

  /**
   * Builds a streaming context with 5-second batches, subscribes to the given
   * topics, and prints the word counts of every batch.
   *
   * @param args exactly two elements: the broker list (passed through as
   *             `metadata.broker.list`) and a comma-separated list of topics
   */
  def main(args: Array[String]): Unit = {
    // StringDecoder is used as a type argument below but was never imported;
    // it is imported locally so this object compiles on its own.
    import kafka.serializer.StringDecoder

    if (args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Master and app name are left unset here so they can be supplied at
    // submit time; for a local run chain the following onto SparkConf():
    //   .setAppName("KafkaDirectWordCount").setMaster("local[2]")
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // This is where Spark Streaming is connected to Kafka.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )

    // Each record is a pair and only its second element is counted. Per the
    // Spark Kafka 0.8 integration docs the pair is (message key, message
    // value); the original note leaves verifying this as an exercise.
    messages
      .map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
pom.xml檔案
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">     <modelVersion>4.0.0</modelVersion>     <groupId>com.imooc.spark</groupId>     <artifactId>sparktrain</artifactId>     <version>1.0</version>     <inceptionYear>2008</inceptionYear>     <properties>         <scala.version>2.11.8</scala.version>         <kafka.version>0.9.0.0</kafka.version>         <spark.version>2.2.0</spark.version>         <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>         <hbase.version>1.2.0-cdh5.7.0</hbase.version>     </properties>
    <!--新增cloudera的repository-->     <repositories>         <repository>             <id>cloudera</id>             <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>         </repository>     </repositories>
    <dependencies>         <dependency>             <groupId>org.scala-lang</groupId>             <artifactId>scala-library</artifactId>             <version>${scala.version}</version>         </dependency>
        <!-- Kafka 依賴-->         <!--         <dependency>             <groupId>org.apache.kafka</groupId>             <artifactId>kafka_2.11</artifactId>             <version>${kafka.version}</version>         </dependency>         -->
        <!-- Hadoop 依賴-->         <dependency>             <groupId>org.apache.hadoop</groupId>             <artifactId>hadoop-client</artifactId>             <version>${hadoop.version}</version>         </dependency>
        <!-- HBase 依賴-->         <dependency>             <groupId>org.apache.hbase</groupId>             <artifactId>hbase-client</artifactId>             <version>${hbase.version}</version>         </dependency>
        <dependency>             <groupId>org.apache.hbase</groupId>             <artifactId>hbase-server</artifactId>             <version>${hbase.version}</version>         </dependency>
        <!-- Spark Streaming 依賴-->         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-streaming_2.11</artifactId>             <version>${spark.version}</version>         </dependency>

        <!-- Spark Streaming整合Flume 依賴-->         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-streaming-flume_2.11</artifactId>             <version>${spark.version}</version>         </dependency>
        <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-streaming-flume-sink_2.11</artifactId>             <version>${spark.version}</version>         </dependency>
        <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>             <version>${spark.version}</version>         </dependency>
        <dependency>             <groupId>org.apache.commons</groupId>             <artifactId>commons-lang3</artifactId>             <version>3.5</version>         </dependency>
        <!-- Spark SQL 依賴-->         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-sql_2.11</artifactId>             <version>${spark.version}</version>         </dependency>

        <dependency>             <groupId>com.fasterxml.jackson.module</groupId>             <artifactId>jackson-module-scala_2.11</artifactId>             <version>2.6.5</version>         </dependency>
        <dependency>             <groupId>net.jpountz.lz4</groupId>             <artifactId>lz4</artifactId>             <version>1.3.0</version>         </dependency>
        <dependency>              <groupId>mysql</groupId>             <artifactId>mysql-connector-java</artifactId>             <version>5.1.38</version>         </dependency>
        <dependency>             <groupId>org.apache.flume.flume-ng-clients</groupId>             <artifactId>flume-ng-log4jappender</artifactId>             <version>1.6.0</version>         </dependency>
    </dependencies>
    <build>         <sourceDirectory>src/main/scala</sourceDirectory>         <testSourceDirectory>src/test/scala</testSourceDirectory>         <plugins>             <plugin>                 <groupId>org.scala-tools</groupId>                 <artifactId>maven-scala-plugin</artifactId>                 <executions>                     <execution>                         <goals>                             <goal>compile</goal>                             <goal>testCompile</goal>                         </goals>                     </execution>                 </executions>                 <configuration>                     <scalaVersion>${scala.version}</scalaVersion>                     <args>                         <arg>-target:jvm-1.5</arg>                     </args>                 </configuration>             </plugin>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-eclipse-plugin</artifactId>                 <configuration>                     <downloadSources>true</downloadSources>                     <buildcommands>                         <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>                     </buildcommands>                     <additionalProjectnatures>                         <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>                     </additionalProjectnatures>                     <classpathContainers>                         <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>                         <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>                     </classpathContainers>                 </configuration>             </plugin>         </plugins>     </build>