1. 程式人生 > >Flume NG原始碼分析(三)使用Event介面表示資料流

Flume NG原始碼分析(三)使用Event介面表示資料流

Flume NG有4個主要的元件:

Event表示在Flume各個Agent之間傳遞的資料流

Source表示從外部源接收Event資料流,然後傳遞給Channel

Channel表示對從Source傳遞的Event資料流的臨時儲存

Sink表示從Channel中接收儲存的Event資料流,並傳遞給下游的Source或者終點倉庫

這篇看一下Event介面表示的資料流。Source, Channel, Sink操作的資料流都是基於Event介面的封裝。


public interface Event {

  /**
   * Returns a map of name-value pairs describing the data stored in the body.
   */
  public Map<String, String> getHeaders();

  /**
   * Set the event headers
   * @param headers Map of headers to replace the current headers.
   */
  public void setHeaders(Map<String, String> headers);

  /**
   * Returns the raw byte array of the data contained in this event.
   */
  public byte[] getBody();

  /**
   * Sets the raw byte array of the data contained in this event.
   * @param body The data.
   */
  public void setBody(byte[] body);

}

Event介面非常簡單,資料流分為兩個部分:訊息頭和訊息體。訊息頭是一個Key-Value的,儲存字串的結構。訊息體是普通的位元組陣列。


Event的類層次結構如下


來看一下常用的SimpleEvent的具體實現,ExecSource等元件都是使用它來封裝本地日誌資料。它的實現非常簡單,就是設定了header和body兩部分資料。

public class SimpleEvent implements Event {

  private Map<String, String> headers;
  private byte[] body;

  public SimpleEvent() {
    headers = new HashMap<String, String>();
    body = new byte[0];
  }

  @Override
  public Map<String, String> getHeaders() {
    return headers;
  }

  @Override
  public void setHeaders(Map<String, String> headers) {
    this.headers = headers;
  }

  @Override
  public byte[] getBody() {
    return body;
  }

  @Override
  public void setBody(byte[] body) {
    if(body == null){
      body = new byte[0];
    }
    this.body = body;
  }

  @Override
  public String toString() {
    Integer bodyLen = null;
    if (body != null) bodyLen = body.length;
    return "[Event headers = " + headers + ", body.length = " + bodyLen + " ]";
  }

}

看一下如何建立Event物件例項

public class EventBuilder {

  /**
   * Instantiate an Event instance based on the provided body and headers.
   * If <code>headers</code> is <code>null</code>, then it is ignored.
   * @param body
   * @param headers
   * @return
   */
  public static Event withBody(byte[] body, Map<String, String> headers) {
    Event event = new SimpleEvent();

    if(body == null) {
      body = new byte[0];
    }
    event.setBody(body);

    if (headers != null) {
      event.setHeaders(new HashMap<String, String>(headers));
    }

    return event;
  }
}