
SparkStreaming (14): Integrating log4j logs with Flume, Kafka, and Spark Streaming

I. Functionality

Simulate log4j log generation and send the log output to a Flume server. Flume then forwards the log records to Kafka, and Spark Streaming consumes them from Kafka and performs simple processing.

II. Steps

1. Goal:

Use log4j to emit logs in a fixed format and deliver them to a specific port on the Flume server; have Kafka receive the data from Flume, and process it with Spark Streaming.
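The end-to-end data flow built in the steps below is:

LoggerGenerator (log4j) -> Flume avro source on port 41414 -> memory channel -> Kafka sink -> Kafka topic streamingtopic -> Spark Streaming (KafkaUtils.createStream) -> per-batch count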

2. Generating the log4j logs:


(1) In IDEA, create a java folder under the test directory and mark it as test sources.
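For reference, with a standard Maven layout the relevant folders look roughly like this (placing the Scala code under src/main/scala is an assumption):

src/
  main/
    scala/Spark/KafkaStreamingApp_product.scala
  test/
    java/LoggerGenerator.java
    resources/log4j.properties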

(2) Define the log4j output format and wire it to Flume

-> Add a resources folder under test and create log4j.properties in it:

log4j.rootCategory=INFO,stdout,flume
# console appender: log4j output format
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss}  [%t] [%C] [%p] - %m%n


# flume appender: the Flume server host/port that receives the log events
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = bigdata.ibeifeng.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

This accomplishes two things: (a) it defines the log output format, and (b) it directs the output to a specific Flume server port, i.e. it links log4j to Flume.

[Reference: the official guide at http://flume.apache.org/FlumeUserGuide.html, search for "Log4J Appender"]

Sample log output:
2018-09-23 12:13:52  [main] [LoggerGenerator] [INFO] - current value is :0
2018-09-23 12:13:54  [main] [LoggerGenerator] [INFO] - current value is :1
2018-09-23 12:13:55  [main] [LoggerGenerator] [INFO] - current value is :2

(3) Add the Flume log4j appender dependency

    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.6.0</version>
    </dependency>
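The Spark Streaming job in step 5 also needs the Spark Streaming core and Kafka integration artifacts. The versions below assume a Spark 1.6 / Scala 2.10 build (matching the receiver-based KafkaUtils.createStream API used later); adjust them to your environment:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>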

(4) Re-run the Java program LoggerGenerator (now that the Flume appender dependency is on the classpath):

import org.apache.log4j.Logger;

public class LoggerGenerator {
    // Logger configured via log4j.properties (console + Flume appenders)
    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {
        int index = 0;
        while (true) {
            // emit one INFO record every 100 ms
            Thread.sleep(100);
            logger.info("current value is :" + index++);
        }
    }
}

 

3. Flume configuration for receiving the logs

(1) Flume agent configuration file streaming2.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = streamingtopic
agent1.sinks.kafka-sink.brokerList = bigdata.ibeifeng.com:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

(2) Start Flume [do not start it yet, because Kafka is not running: the agent itself starts without errors, but it will throw errors as soon as data arrives!]

bin/flume-ng agent --conf conf --conf-file conf/streaming2.conf --name agent1 -Dflume.root.logger=INFO,console

 

4. Kafka receives the data forwarded by Flume

(1) Start ZooKeeper
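For a standalone ZooKeeper installation this is typically done with the following command, run from the ZooKeeper home directory (adjust to your setup):

bin/zkServer.sh start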

(2) Start the Kafka server

bin/kafka-server-start.sh -daemon config/server.properties 

(3) Create the topic

bin/kafka-topics.sh --create --topic streamingtopic --zookeeper bigdata.ibeifeng.com:2181/kafka08 --partitions 1 --replication-factor 1
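Optionally confirm that the topic exists, using the same ZooKeeper chroot:

bin/kafka-topics.sh --describe --topic streamingtopic --zookeeper bigdata.ibeifeng.com:2181/kafka08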

(4) Run a quick test to verify the flow from log generation to Kafka

-> Start Flume

bin/flume-ng agent --conf conf --conf-file conf/streaming2.conf --name agent1 -Dflume.root.logger=INFO,console

-> Start a Kafka console consumer

bin/kafka-console-consumer.sh --topic streamingtopic --zookeeper bigdata.ibeifeng.com:2181/kafka08

(Verified: the test succeeded!)

 

5. Spark Streaming code to process the messages received from Kafka

(1) Code

package Spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
  * Spark Streaming consuming from Kafka (receiver-based API)
  */
object KafkaStreamingApp_product {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaStreamingApp_product <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    // For a production run, comment out setMaster and pass the master via spark-submit;
    // local[2] is hard-coded here only for local testing.
    val sparkConf = new SparkConf().setAppName("KafkaStreamingApp_product")
      .setMaster("local[2]")

    val ssc=new StreamingContext(sparkConf,Seconds(5))

    // map of topic -> number of receiver threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Receiver-based Kafka integration; see the createStream source for details
    val messages: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    // each record is a (key, value) tuple; take the second element, the message body
    // word-count variant:
    // messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
    messages.map(_._2).count().print()


    ssc.start()
    ssc.awaitTermination()
  }
}

(2) Run configuration: add the program arguments

bigdata.ibeifeng.com:2181/kafka08 test streamingtopic 1
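To run the job on a cluster instead of inside IDEA (in which case setMaster("local[2]") should be removed from the code), a spark-submit call along the following lines can be used. The jar path, master URL, and artifact version are assumptions; adjust them to your build and cluster:

bin/spark-submit \
  --class Spark.KafkaStreamingApp_product \
  --master yarn \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
  /path/to/your-streaming-app.jar \
  bigdata.ibeifeng.com:2181/kafka08 test streamingtopic 1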

III. Testing

1. Start ZooKeeper

2. Start Flume

bin/flume-ng agent --conf conf --conf-file conf/streaming2.conf --name agent1 -Dflume.root.logger=INFO,console

3. Start the Kafka server

bin/kafka-server-start.sh -daemon config/server.properties 

4. Run the log-generator class LoggerGenerator

5. Run the Spark Streaming class KafkaStreamingApp_product

 

(Verified: the test succeeded!)