Flume NG原始碼分析(四)使用ExecSource從本地日誌檔案中收集日誌
阿新 • • 發佈:2018-11-01
常見的日誌收集方式有兩種,一種是經由本地日誌檔案做媒介,非同步地傳送到遠端日誌倉庫,一種是基於RPC方式的同步日誌收集,直接傳送到遠端日誌倉庫。這篇講講Flume NG如何從本地日誌檔案中收集日誌。
ExecSource是用來執行本地shell命令,並把本地日誌檔案中的資料封裝成Event事件流在Flume NG中流動。它的典型配置如下,指定source型別是exec,指定Source下游的Channel是哪個,指定要執行的shell命令。最常用的命令就是tail -F命令,可以從本地日誌檔案中獲取新追加的日誌。
producer.sources.s1.type = exec producer.sources.s1.channels = channel producer.sources.s1.command = tail -F /data/logs/test.log
看一下ExecSource的實現流程
1. ExecSource維護了一個單執行緒的執行緒池executor,以及配置的shell命令,計數器等屬性
2. ExecRunnable物件實現了Runnable介面,被executor執行緒池執行。 ExecRunnable實現了獲取本地日誌的主要流程
3. ExecRunnable維護了一個定時執行的執行緒池timedFlushService,定時去檢查Event列表,如果符合批量輸出的要求,就批量flush event
4. ExecRunnable使用Runtime.getRuntime().exec以及java.lang.ProcessBuilder來使用Java平臺執行作業系統的Shell命令,並把這個Shell命令建立的程序的輸出流重定向到Java平臺的流,從而在Java平臺可以獲取到本地日誌檔案的資料。這裡的Shell命令是tail -F
這裡最主要的是步驟是在Java平臺中使用Shell命令來獲取本地日誌檔案的資料,主要的程式碼如下
// ExecRuannable.run() try { if(shell != null) { String[] commandArgs = formulateShellCommand(shell, command); process = Runtime.getRuntime().exec(commandArgs); } else { String[] commandArgs = command.split("\\s+"); process = new ProcessBuilder(commandArgs).start(); } reader = new BufferedReader( new InputStreamReader(process.getInputStream(), charset)); // 當tail -F沒有資料時,reader.readLine會阻塞,直到有資料到達 while ((line = reader.readLine()) != null) { synchronized (eventList) { sourceCounter.incrementEventReceivedCount(); eventList.add(EventBuilder.withBody(line.getBytes(charset))); if(eventList.size() >= bufferCount || timeout()) { flushEventBatch(eventList); } } }
將java.lang.Process代表的本地程序的輸出流重定向到Java的輸入流中,當tail -F沒有資料時,Java輸入流的reader.readLine會阻塞,直到有新資料到達。獲取到新資料後,首先是將資料封裝成Event,如果超過了批量限制,就flushEventBatch
flushEventBatch會將Event列表交給ChannelProcessor批量處理。
// EventBuilder.withBdoy
public static Event withBody(byte[] body, Map<String, String> headers) {
Event event = new SimpleEvent();
if(body == null) {
body = new byte[0];
}
event.setBody(body);
if (headers != null) {
event.setHeaders(new HashMap<String, String>(headers));
}
return event;
}
// ExecSource.flushEventBatch
private void flushEventBatch(List<Event> eventList){
channelProcessor.processEventBatch(eventList);
sourceCounter.addToEventAcceptedCount(eventList.size());
eventList.clear();
lastPushToChannel = systemClock.currentTimeMillis();
}
ExecSource是非同步收集本地日誌的實現,它不保證可靠性,比如Java平臺建立的tail -F程序出問題了,那麼目標日誌檔案的收集會收到影響。ExecSource的好處是效能比RPC方式要好,減少了網路的流量,同時避免了對應用程式的傾入性,可以無縫地接入。