1. 程式人生 > >通過flume把日誌檔案內容寫入kafka主題

通過flume把日誌檔案內容寫入kafka主題

首先自行安裝flume和 kafka當然還要jdk,我flume版本是1.6的kafka版本2.11,jdk1.8。

首先在路徑flume下的conf裡面建立一個logtokafka.conf檔案進行配置配置內容如下。

agent.sources=r1
agent.sinks=k1
agent.channels=c1

agent.sources.r1.type=exec
agent.sources.r1.command=tail -F /opt/log.log
agent.sources.r1.restart=true
agent.sources.r1.batchSize=1000
agent.sources.r1.batchTimeout=3000
agent.sources.r1.channels=c1

agent.channels.c1.type=memory
agent.channels.c1.capacity=102400
agent.channels.c1.transactionCapacity=1000

agent.channels.c1.byteCapacity=134217728
agent.channels.c1.byteCapacityBufferPercentage=80

agent.sinks.k1.channel=c1
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.topic=logtokafka123
agent.sinks.k1.kafka.bootstrap.servers=192.168.72.129:9092,192.168.72.130:9092,192.168.72.131:9092
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.flumeBatchSize=1000
agent.sinks.k1.useFlumeEventFormat=true

配置conf 中的flume-env.sh 檔案,沒有的cp 一下flume-env.sh .template 。

export JAVA_HOME=/opt/jdk1.8.0_161/

啟動flume-agent 

./bin/flume-ng agent -c conf -f conf/logtokafka.conf -n agent -Dflume.root.logger=INFO,console 

最後可以通過以下命令在kafka下進行檢視主題中是否我們日誌裡面的內容:

 ./kafka-console-consumer.sh  --zookeeper localhost--topic logtokafka --from-beginning