Flume NG原始碼分析(三)使用Event介面表示資料流
阿新 • • 發佈:2018-11-01
Flume NG有4個主要的元件:
Event表示在Flume各個Agent之間傳遞的資料流
Source表示從外部源接收Event資料流,然後傳遞給Channel
Channel表示對從Source傳遞的Event資料流的臨時儲存
Sink表示從Channel中接收儲存的Event資料流,並傳遞給下游的Source或者終點倉庫
這篇看一下Event介面表示的資料流。Source, Channel, Sink操作的資料流都是基於Event介面的封裝。
public interface Event { /** * Returns a map of name-value pairs describing the data stored in the body. */ public Map<String, String> getHeaders(); /** * Set the event headers * @param headers Map of headers to replace the current headers. */ public void setHeaders(Map<String, String> headers); /** * Returns the raw byte array of the data contained in this event. */ public byte[] getBody(); /** * Sets the raw byte array of the data contained in this event. * @param body The data. */ public void setBody(byte[] body); }
Event介面非常簡單,資料流分為兩個部分:訊息頭和訊息體。訊息頭是一個Key-Value的,儲存字串的結構。訊息體是普通的位元組陣列。
Event的類層次結構如下
來看一下常用的SimpleEvent的具體實現,ExecSource等元件都是使用它來封裝本地日誌資料。它的實現非常簡單,就是設定了header和body兩部分資料。
public class SimpleEvent implements Event { private Map<String, String> headers; private byte[] body; public SimpleEvent() { headers = new HashMap<String, String>(); body = new byte[0]; } @Override public Map<String, String> getHeaders() { return headers; } @Override public void setHeaders(Map<String, String> headers) { this.headers = headers; } @Override public byte[] getBody() { return body; } @Override public void setBody(byte[] body) { if(body == null){ body = new byte[0]; } this.body = body; } @Override public String toString() { Integer bodyLen = null; if (body != null) bodyLen = body.length; return "[Event headers = " + headers + ", body.length = " + bodyLen + " ]"; } }
看一下如何建立Event物件例項
public class EventBuilder { /** * Instantiate an Event instance based on the provided body and headers. * If <code>headers</code> is <code>null</code>, then it is ignored. * @param body * @param headers * @return */ public static Event withBody(byte[] body, Map<String, String> headers) { Event event = new SimpleEvent(); if(body == null) { body = new byte[0]; } event.setBody(body); if (headers != null) { event.setHeaders(new HashMap<String, String>(headers)); } return event; } }