1. 程式人生 > 實用技巧 >Flume基礎(十一):自定義 Interceptor

Flume基礎(十一):自定義 Interceptor

1)案例需求 使用 Flume 採集伺服器本地日誌,需要按照日誌型別的不同,將不同種類的日誌發往不同的分析系統。 2)需求分析   在實際的開發中,一臺伺服器產生的日誌型別可能有很多種,不同型別的日誌可能需要傳送到不同的分析系統。此時會用到 Flume 拓撲結構中的 Multiplexing 結構,Multiplexing的原理是,根據 event 中 Header 的某個 key 的值,將不同的 event 傳送到不同的 Channel中,所以我們需要自定義一個 Interceptor,為不同型別的 event 的 Header 中的 key 賦予不同的值。   在該案例中,我們以埠資料模擬日誌,以數字(單個)和字母(單個)模擬不同型別的日誌,我們需要自定義 interceptor 區分數字和字母,將其分別發往不同的分析系統(Channel)。 3)實現步驟 1.建立一個 maven 專案,並引入以下依賴。
<dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>1.7.0</version>
</dependency>
2.定義 CustomInterceptor 類並實現 Interceptor 介面。
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
public class CustomInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { byte[] body = event.getBody(); if (body[0] < 'z' && body[0] > 'a') { event.getHeaders().put("type", "letter"); } else if (body[0] > '0'
&& body[0] < '9') { event.getHeaders().put("type", "number"); } return event; } @Override public List<Event> intercept(List<Event> events) { for (Event event : events) { intercept(event); } return events; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override
public Interceptor build() { return new CustomInterceptor(); } @Override public void configure(Context context) { } } }
3.編輯 flume 配置檔案 為 hadoop102 上的 Flume1 配置 1 個 netcat source,1 個 sink group(2 個 avro sink),並配置相應的 ChannelSelector 和 interceptor。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = 
com.atguigu.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
為 hadoop103 上的 Flume2 配置一個 avro source 和一個 logger sink。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
為 hadoop104 上的 Flume3 配置一個 avro source 和一個 logger sink。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
4.分別在 hadoop102,hadoop103,hadoop104 上啟動 flume 程序,注意先後順序。 5.在 hadoop102 使用 netcat 向 localhost:44444 傳送字母和數字。 6.觀察 hadoop103 和 hadoop104 列印的日誌。