1. 程式人生 > >Flume(ng) 自定義sink實現和屬性注入

Flume(ng) 自定義sink實現和屬性注入

最近需要利用flume來做收集遠端日誌,所以學習一些flume最基本的用法。這裡僅作記錄。

遠端日誌收集的整體思路是遠端自定義實現log4j的appender把訊息傳送到flume端,flume端自定義實現一個sink來按照我們的規則儲存日誌。

自定義Sink程式碼:

public class LocalFileLogSink extends AbstractSink implements Configurable {
     private static final Logger logger = LoggerFactory
              . getLogger(LocalFileLogSink
.class ); private static final String PROP_KEY_ROOTPATH = "rootPath"; private String rootPath; @Override public void configure(Context context) { String rootPath = context.getString(PROP_KEY_ROOTPATH ); setRootPath(rootPath); } @Override
public Status process() throws EventDeliveryException { logger .debug("Do process" ); }

實現Configurable介面,即可在初始化時,通過configure方法從context中獲取配置的引數的值。這裡,我們是想從flume的配置檔案中獲取rootPath的值,也就是日誌儲存的根路徑。在flume-conf.properties中配置如下:

agent.sinks = loggerSink
agent.sinks.loggerSink.rootPath = ./logs

loggerSink是自定義sink的名稱,我們取值時的key,只需要loggerSink後面的部分即可,即這裡的rootPath。

實際業務邏輯的執行,是通過繼承複寫AbstractSink中的process方法實現。從基類的getChannel方法中獲取通道,從中取出Event處理即可。

 Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
          txn.begin();
           try {
               logger .debug("Get event." );
              Event event = ch.take();
              txn.commit();
              status = Status. READY ;
              return status;
                    finally {
              Log. info( "trx close.");
              txn.close();
          }