Building a Real-Time Log Collection and Analysis System with Flume + Kafka
阿新 • Published: 2019-01-05
- Kafka broker
1. Add flume-collecters.properties

#flume collecters
agent.sources = s1Flume
agent.channels = c1
agent.sinks = sinkKafka
# For each one of the sources, the type is defined
agent.sources.s1Flume.channels = c1
agent.sources.s1Flume.type = avro
agent.sources.s1Flume.bind = 172.16.137.205
agent.sources.s1Flume.port = 6333
# The channel can be defined as follows.
agent.sources.s1Flume.channels = c1
# Each sink's type must be defined
agent.sinks.sinkKafka.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.sinkKafka.topic = topic-pear
agent.sinks.sinkKafka.brokerList = 172.16.137.196:10985,172.16.137.197:10985,172.16.137.198:10985,172.16.137.199:10985
agent.sinks.sinkKafka.requiredAcks = 1
agent.sinks.sinkKafka.batchSize = 20
agent.sinks.sinkKafka.channel = c1
#Specify the channel the sink should use
#agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.c1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.c1.capacity = 100

In this configuration, the source's bind and port must be changed to the local host's settings; the sink's brokerList is the address list of the Kafka broker cluster and needs no changes.

2. Modify log4j.properties

flume.log.dir=/data/flume/flume-collecters/logs

3. Start Flume

nohup bin/flume-ng agent -c conf -f conf/flume-collecters.properties -n agent -Dflume.root.logger=INFO,console,LOGFILE > nohup-collecters.out &

flume-order: responsible for collecting the order logs produced on each order-server machine and sending them to flume-collecters for centralized processing.
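Every line in the config above follows Flume's `agent.<componentType>.<componentName>.<property>` naming scheme, which is what wires the Avro source to the memory channel and the Kafka sink. As a small illustration (not part of the original article), here is a hypothetical Python sketch that parses a fragment of the collecter config into nested dicts and shows the source -> channel -> sink wiring:

```python
# Toy sketch: parse Flume-style "key = value" properties into nested dicts
# to visualize how s1Flume, c1, and sinkKafka are wired together.
# The fragment below is copied from the flume-collecters config above.

CONF = """
agent.sources = s1Flume
agent.channels = c1
agent.sinks = sinkKafka
agent.sources.s1Flume.type = avro
agent.sources.s1Flume.bind = 172.16.137.205
agent.sources.s1Flume.port = 6333
agent.sources.s1Flume.channels = c1
agent.sinks.sinkKafka.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.sinkKafka.topic = topic-pear
agent.sinks.sinkKafka.channel = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100
"""

def parse_flume_conf(text):
    """Build {component_type: {name: {prop: value}}} from key = value lines."""
    tree = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = (p.strip() for p in line.partition("="))
        parts = key.split(".")
        if len(parts) < 4:   # top-level lists like "agent.sources = s1Flume"
            continue
        _, ctype, name = parts[:3]
        prop = ".".join(parts[3:])
        tree.setdefault(ctype, {}).setdefault(name, {})[prop] = value
    return tree

tree = parse_flume_conf(CONF)
print(tree["sources"]["s1Flume"]["channels"])  # c1: the source feeds channel c1
print(tree["sinks"]["sinkKafka"]["channel"])   # c1: the sink drains channel c1
```

A source's channel property is plural (`channels`, a source may fan out to several) while a sink's is singular (`channel`, a sink drains exactly one); the parse makes that asymmetry easy to see.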
- Add flume-order-collect.properties
#flume agent that collects order logs on each order-server
agent.sources = fileSource
agent.channels = memoryChannel
agent.sinks = collecter1 collecter2
agent.sinkgroups = gCollecters
agent.sinkgroups.gCollecters.sinks = collecter1 collecter2
#sink processor type: load_balance or failover
agent.sinkgroups.gCollecters.processor.type = load_balance
#load-balancing selector: random or round_robin
agent.sinkgroups.gCollecters.processor.selector=round_robin
#back off sinks that fail
agent.sinkgroups.gCollecters.processor.backoff=true
#cap the backoff period at 30 seconds
agent.sinkgroups.gCollecters.processor.selector.maxTimeOut=30000
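The sink group above sends each batch to the two collecters in turn; when one fails, backoff temporarily blacklists it for a growing interval capped by the max timeout, so traffic keeps flowing to the healthy collecter. The following is a toy Python model of that behaviour (not Flume's actual implementation; the exponential 1s/2s/4s schedule is an assumption for illustration):

```python
import itertools
import time

class RoundRobinSelector:
    """Toy model of a round_robin load-balancing selector with backoff,
    as configured in the gCollecters sink group above."""

    def __init__(self, sinks, max_timeout=30.0):
        self.sinks = sinks
        self.max_timeout = max_timeout           # cap on backoff, cf. 30000 ms
        self.blacklisted_until = {s: 0.0 for s in sinks}
        self.failures = {s: 0 for s in sinks}
        self._rr = itertools.cycle(sinks)

    def next_sink(self, now=None):
        """Return the next sink in rotation that is not backed off."""
        now = time.monotonic() if now is None else now
        for _ in range(len(self.sinks)):
            sink = next(self._rr)
            if self.blacklisted_until[sink] <= now:
                return sink
        return None                              # every sink is backed off

    def mark_failure(self, sink, now=None):
        """Blacklist a failed sink for 1s, 2s, 4s, ... up to max_timeout."""
        now = time.monotonic() if now is None else now
        self.failures[sink] += 1
        backoff = min(2.0 ** (self.failures[sink] - 1), self.max_timeout)
        self.blacklisted_until[sink] = now + backoff

    def mark_success(self, sink):
        self.failures[sink] = 0
        self.blacklisted_until[sink] = 0.0

sel = RoundRobinSelector(["collecter1", "collecter2"])
print(sel.next_sink(now=0))          # collecter1
print(sel.next_sink(now=0))          # collecter2
sel.mark_failure("collecter1", now=0)
print(sel.next_sink(now=0))          # collecter2 (collecter1 is backed off)
```

With `processor.type = failover` the group would instead always prefer the highest-priority live sink rather than rotating.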
agent.sources.fileSource.type = exec
agent.sources.fileSource.command = tail -F /data/pear/v1/orderdb-server/COLLECT/DATA_COLLECT.log
#agent.sources.fileSource.charset=utf-8
agent.sources.fileSource.channels = memoryChannel
agent.sources.fileSource.restartThrottle = 10000
agent.sources.fileSource.restart = true
agent.sources.fileSource.logStdErr = true
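The exec source runs the `tail -F` command, turns each stdout line into a Flume event, and, because restart=true, relaunches the command after restartThrottle (10000 ms) if it ever exits; logStdErr forwards the command's stderr into the agent's log. A toy Python sketch of that restart loop (not Flume's implementation; `max_runs` and the `sink` callback are added here only so the sketch terminates and is observable):

```python
import subprocess
import time

def run_exec_source(command, restart=True, restart_throttle_ms=10000,
                    max_runs=2, sink=print):
    """Toy model of Flume's exec source: run the command, hand each stdout
    line to the sink as one event, and if the process exits while
    restart=True, wait restartThrottle ms and start it again."""
    runs = 0
    while runs < max_runs:
        proc = subprocess.Popen(command, shell=True, text=True,
                                stdout=subprocess.PIPE)
        for line in proc.stdout:      # each line becomes one Flume event
            sink(line.rstrip("\n"))
        proc.wait()
        runs += 1
        if not restart:
            break
        time.sleep(restart_throttle_ms / 1000.0)

# `tail -F` never exits on its own, so the restart path rarely fires for it;
# a short-lived command makes the restart loop visible:
events = []
run_exec_source("echo DATA_COLLECT-line", restart=True,
                restart_throttle_ms=10, max_runs=2, sink=events.append)
print(events)    # ['DATA_COLLECT-line', 'DATA_COLLECT-line']
```

Note that an exec source offers no delivery guarantee: lines emitted while the process is down or the channel is full are simply lost, which is why later Flume versions recommend the Taildir source for files.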
# Each sink's type must be defined
agent.sinks.collecter1.channel = memoryChannel
agent.sinks.collecter1.type = avro
agent.sinks.collecter1.hostname = 172.16.137.205
agent.sinks.collecter1.port = 6333
agent.sinks.collecter1.batch-size = 10
agent.sinks.collecter2.channel = memoryChannel
agent.sinks.collecter2.type = avro
agent.sinks.collecter2.hostname = 172.16.137.206
agent.sinks.collecter2.port = 6333
agent.sinks.collecter2.batch-size = 10
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
#The max number of events stored in the channel
agent.channels.memoryChannel.capacity = 100
#The max number of events stored in the channel per transaction
agent.channels.memoryChannel.transactionCapacity = 100
#Timeout in seconds for adding or removing an event
agent.channels.memoryChannel.keep-alive=30
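The memory channel behaves like a bounded in-memory queue: capacity bounds the events it can hold, and keep-alive is how many seconds a put or take blocks before giving up. A toy Python sketch of those semantics, using a `queue.Queue` (an analogy, not Flume's implementation; the exception name in the comment mirrors Flume's ChannelFullException but the class here is hypothetical):

```python
import queue

class MemoryChannel:
    """Toy model of Flume's memory channel: a bounded FIFO queue where put
    and take wait at most keep_alive seconds before failing (cf.
    capacity = 100, keep-alive = 30 in the config above)."""

    def __init__(self, capacity=100, keep_alive=30.0):
        self._q = queue.Queue(maxsize=capacity)
        self.keep_alive = keep_alive

    def put(self, event):
        try:
            self._q.put(event, timeout=self.keep_alive)
        except queue.Full:
            # Flume raises ChannelFullException in this situation
            raise RuntimeError("ChannelFullException: channel at capacity")

    def take(self):
        try:
            return self._q.get(timeout=self.keep_alive)
        except queue.Empty:
            return None

ch = MemoryChannel(capacity=2, keep_alive=0.01)
ch.put("event-1")
ch.put("event-2")
try:
    ch.put("event-3")            # channel full -> fails after keep-alive
except RuntimeError as e:
    print(e)
print(ch.take())                 # event-1 (FIFO)
```

This is also why a memory channel trades durability for speed: if the agent process dies, everything still queued in memory is lost, unlike a file channel.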
The ip and port in this configuration are those of the flume-collecters and need no changes. The tail -F command must be updated with the correct log file path.

- Modify log4j.properties
- Start Flume (as for flume-collecters above, substituting conf/flume-order-collect.properties)