Flume(ng) 自定義sink實現和屬性注入
阿新 • • 發佈:2018-12-23
最近需要利用flume來做收集遠端日誌,所以學習一些flume最基本的用法。這裡僅作記錄。
遠端日誌收集的整體思路是遠端自定義實現log4j的appender把訊息傳送到flume端,flume端自定義實現一個sink來按照我們的規則儲存日誌。
自定義Sink程式碼:
public class LocalFileLogSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory
. getLogger(LocalFileLogSink .class );
private static final String PROP_KEY_ROOTPATH = "rootPath";
private String rootPath;
@Override
public void configure(Context context) {
String rootPath = context.getString(PROP_KEY_ROOTPATH );
setRootPath(rootPath);
}
@Override
public Status process() throws EventDeliveryException {
logger .debug("Do process" );
}
}
實現Configurable介面,即可在初始化時,通過configure方法從context中獲取配置的引數的值。這裡,我們是想從flume的配置檔案中獲取rootPath的值,也就是日誌儲存的根路徑。在flume-conf.properties中配置如下:
agent.sinks = loggerSink agent.sinks.loggerSink.rootPath = ./logs
loggerSink是自定義sink的名稱,我們取值時的key,只需要loggerSink後面的部分即可,即這裡的rootPath。
實際業務邏輯的執行,是通過繼承複寫AbstractSink中的process方法實現。從基類的getChannel方法中獲取通道,從中取出Event處理即可。
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
logger .debug("Get event." );
Event event = ch.take();
txn.commit();
status = Status. READY ;
return status;
}finally {
Log. info( "trx close.");
txn.close();
}