Flume基礎(十一):自定義 Interceptor
阿新 • • 發佈:2020-07-27
1)案例需求
使用 Flume 採集伺服器本地日誌,需要按照日誌型別的不同,將不同種類的日誌發往不同的分析系統。
2)需求分析
在實際的開發中,一臺伺服器產生的日誌型別可能有很多種,不同型別的日誌可能需要傳送到不同的分析系統。此時會用到 Flume 拓撲結構中的 Multiplexing 結構,Multiplexing的原理是,根據 event 中 Header 的某個 key 的值,將不同的 event 傳送到不同的 Channel中,所以我們需要自定義一個 Interceptor,為不同型別的 event 的 Header 中的 key 賦予不同的值。
在該案例中,我們以埠資料模擬日誌,以數字(單個)和字母(單個)模擬不同型別的日誌,我們需要自定義 interceptor 區分數字和字母,將其分別發往不同的分析系統(Channel)。
3)實現步驟
1.建立一個 maven 專案,並引入以下依賴。
<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency>2.定義 CustomInterceptor 類並實現 Interceptor 介面。
package com.atguigu.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.List;3.編輯 flume 配置檔案 為 hadoop102 上的 Flume1 配置 1 個 netcat source,1 個 sink group(2 個 avro sink),並配置相應的 ChannelSelector 和 interceptor。public class CustomInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { byte[] body = event.getBody(); if (body[0] < 'z' && body[0] > 'a') { event.getHeaders().put("type", "letter"); } else if (body[0] > '0'&& body[0] < '9') { event.getHeaders().put("type", "number"); } return event; } @Override public List<Event> intercept(List<Event> events) { for (Event event : events) { intercept(event); } return events; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Overridepublic Interceptor build() { return new CustomInterceptor(); } @Override public void configure(Context context) { } } }
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.CustomInterceptor$Builder a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = type a1.sources.r1.selector.mapping.letter = c1 a1.sources.r1.selector.mapping.number = c2 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop103 a1.sinks.k1.port = 4141 a1.sinks.k2.type=avro a1.sinks.k2.hostname = hadoop104 a1.sinks.k2.port = 4242 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Use a channel which buffers events in memory a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2為 hadoop103 上的 Flume2 配置一個 avro source 和一個 logger sink。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop103 a1.sources.r1.port = 4141 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1為 hadoop104 上的 Flume3 配置一個 avro source 和一個 logger sink。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop104 a1.sources.r1.port = 4242 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c14.分別在 hadoop102,hadoop103,hadoop104 上啟動 flume 程序,注意先後順序。 5.在 hadoop102 使用 netcat 向 localhost:44444 傳送字母和數字。 6.觀察 hadoop103 和 hadoop104 列印的日誌。