1. 程式人生 > >kafka無法收到flume採集的資料的解決辦法

kafka無法收到flume採集的資料的解決辦法

問題重現

在寫黑名單那篇博文的時候,我是通過直接copy log日誌檔案到監控目錄下的方式來模擬資料的,在前幾次模擬訪問日誌檔案的時候挺正常的,copy進去基本都是秒採集(檔案顯示直接加了.COMPLETED字尾)。

但到後來再往採集目錄下copy log日誌檔案的時候,待採集目錄下的檔案並不會顯示被採集(檔案沒有.COMPLETED字尾),kafka也一直收不到flume採集來的資料。但重啟flume客戶端後又會正常採集。經過多方排查後,最終定位問題在flume配置檔案有錯,之前flume配置檔案如下:

flume2kafka.sources=r3
flume2kafka.sinks=k3
flume2kafka.channels=c3

# 配置source
flume2kafka.sources.r3.type = spooldir
flume2kafka.sources.r3.channels = c3
flume2kafka.sources.r3.spoolDir = /home/jbw/log/
flume2kafka.sources.r3.fileHeader = true

# 配置channel,將buffer事件放在記憶體中
flume2kafka.channels.c3.type = memory
flume2kafka.channels.c3.capacity = 100000
flume2kafka.channels.c3.transactionCapacity = 1000

# 配置sink
flume2kafka.sinks.k3.type=org.apache.flume.sink.kafka.KafkaSink
flume2kafka.sinks.k3.brokerList=172.18.9.119:9092,172.18.9.120:9092,172.18.9.121:9092
flume2kafka.sinks.k3.topic=peopleVisitTopic
flume2kafka.sinks.k3.requiredAcks = 0
flume2kafka.sinks.k3.batchSize = 20
#flume2kafka.sinks.k3.channel = memcnl

# 把source和sink繫結在channel上
flume2kafka.sources.r3.channels=c3

解決方案

檢查source和channel部分配置都沒問題,後來發現是sinks.k3.requiredAcks這一引數導致的問題,之前是0,現在改為1後

配置檔案如下,重新執行flume客戶端,發現kafka消費端會收到資料了,問題解決。

flume2kafka.sources=r3
flume2kafka.sinks=k3
flume2kafka.channels=c3

# 配置source
flume2kafka.sources.r3.type = spooldir
flume2kafka.sources.r3.spoolDir = /home/jbw/log/
flume2kafka.sources.r3.fileHeader = true

# 配置channel,將buffer事件放在記憶體中
flume2kafka.channels.c3.type = memory
flume2kafka.channels.c3.capacity = 10000
flume2kafka.channels.c3.transactionCapacity = 1000

# 配置sink
flume2kafka.sinks.k3.type=org.apache.flume.sink.kafka.KafkaSink
flume2kafka.sinks.k3.brokerList=172.18.9.119:9092,172.18.9.120:9092,172.18.9.121:9092
flume2kafka.sinks.k3.topic=peopleVisitTopic
flume2kafka.sinks.k3.serializer.class=kafka.serializer.StringEncoder
flume2kafka.sinks.k3.requiredAcks = 1
flume2kafka.sinks.k3.batchSize = 20

# 把source和sink繫結在channel上
flume2kafka.sources.r3.channels=c3
flume2kafka.sinks.k3.channel=c3

原因分析  

我們先看下 request.required.acks 這個引數到底是什麼玩意。

它其實是配置 kafka 中的 ack確認機制的引數,kafka 中有三種確認機制,如下:

  •  機制一:producer 端不等待來自 broker 的確認訊息,直接傳送下一條訊息。
  •  機制二:producer 端會在得到 leader broker 確認收到資料訊息後,才傳送下一條訊息。
  •  機制三:producer 端會在得到所有 follower broker 副本確認收到資料訊息後,才會傳送下一條資料。

現在大家應該就明白 requiredAcks 引數的意義了,它就是制定需要多少副本確認收到訊息才會被認定為成功寫入。

不難發現,三種機制,kafka 效能依次遞減 (producer吞吐量降低),但資料安全性依次遞增。

request.required.acks 意義
0 代表不需要等待任何 broker 的確認,對應於機制一。
1 代表僅需要角色為 leader 的 broker 確認收到訊息即可,對應於機制二。
-1 代表需要所有 follower broker 副本都需要確認收到訊息才行,對應於機制三。

我們之前遇到的問題,可能是 kafka 的 leader broker 掛掉了,但由於 request.required.acks 引數指定的是0,說明 producer 端不需要經過broker端的確認就可以發下一條資料,然而此時我們 broker 已經掛了,你再寫就沒用了。故在保持效能的情況下,將此引數改為1即可。