1. 程式人生 > >Flume日誌收集系統架構詳解--轉

Flume日誌收集系統架構詳解--轉

with 指定 mwl 裏程碑 工程 生命 數據接收 dba -i

2017-09-06 朱潔 大數據和雲計算技術

任何一個生產系統在運行過程中都會產生大量的日誌,日誌往往隱藏了很多有價值的信息。在沒有分析方法之前,這些日誌存儲一段時間後就會被清理。隨著技術的發展和分析能力的提高,日誌的價值被重新重視起來。在分析這些日誌之前,需要將分散在各個生產系統中的日誌收集起來。本節介紹廣泛應用的Flume日誌收集系統。

一、概述

Flume是Cloudera公司的一款高性能、高可用的分布式日誌收集系統,現在已經是Apache的頂級項目。同Flume相似的日誌收集系統還有Facebook Scribe、Apache Chuwka。

二、Flume發展歷程

Flume 初始的發行版本目前被統稱為Flume OG(Original Generation),屬於Cloudera。但隨著 Flume 功能的擴展,Flume OG 代碼工程臃腫、核心組件設計不合理、核心配置不標準等缺點逐漸暴露出來,尤其是在 Flume OG 的最後一個發行版本0.94.0中,日誌傳輸不穩定現象尤為嚴重。為了解決這些問題,2011 年 10 月 22日,Cloudera 完成了 Flume-728,對Flume進行了裏程碑式的改動:重構核心組件、核心配置及代碼架構,重構後的版本統稱為 Flume NG(Next Generation);改動的另一原因是將 Flume 納入Apache 旗下,Cloudera Flume 更名為 Apache Flume。

三、Flume架構分析
1. 系統特點① 可靠性

當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次為:end-to-end(收到數據後,Agent首先將事件寫到磁盤上,當數據傳送成功後,再刪除;如果數據發送失敗,則重新發送)、Store on Failure(這也是Scribe采用的策略,當數據接收方崩潰時,將數據寫到本地,待恢復後繼續發送)、Best Effort(數據發送到接收方後,不會進行確認)。

② 可擴展性

Flume采用了三層架構,分別為Agent、Collector和Storage,每一層均可以水平擴展。其中,所有的Agent和Collector均由Master統一管理,這使得系統容易被監控和維護。並且Master允許有多個(使用ZooKeeper進行管理和負載均衡),這樣就避免了單點故障問題。

③ 可管理性

當有多個Master時,Flume利用ZooKeeper和Gossip保證動態配置數據的一致性。用戶可以在Master上查看各個數據源或者數據流執行情況,並且可以對各個數據源進行配置和動態加載。Flume提供了Web和Shell Script Command兩種形式對數據流進行管理。

④ 功能可擴展性

用戶可以根據需要添加自己的Agent、Collector或Storage。此外,Flume自帶了很多組件,包括各種Agent(如File、Syslog等)、Collector和Storage(如File、HDFS等)。

2. 系統架構

如圖所示是Flume OG的架構。

技術分享

Flume NG的架構如下圖所示。Flume采用了分層架構,分別為Agent、Collector和Storage。其中,Agent和Collector均由Source和Sink兩部分組成,Source是數據來源,Sink是數據去向。

Flume使用了兩個組件:Master和Node。Node根據在Master Shell或Web中的動態配置,決定其是作為Agent還是作為Collector。

技術分享

① Agent

Agent的作用是將數據源的數據發送給Collector。Flume自帶了很多直接可用的數據源(Source),如下。

text("filename"):將文件filename作為數據源,按行發送。

tail("filename"):探測filename新產生的數據,按行發送。

fsyslogTcp(5140):監聽TCP的5140端口,並將接收到的數據發送。

tailDir("dirname"[,fileregex=".*"[,startFromEnd=false[,recurseDepth=0]]]):監聽目錄中的文件末尾,使用正則表達式選定需要監聽的文件(不包含目錄),recurseDepth為遞歸監聽其下子目錄的深度,同時提供了很多Sink,如console[("format")],直接將數據顯示在console上。

text("txtfile"):將數據寫到文件txtfile中。

dfs("dfsfile"):將數據寫到HDFS上的dfsfile文件中。

syslogTcp("host",port):將數據通過TCP傳遞給host節點。

agentSink[("machine"[,port])]:等價於agentE2ESink,如果省略machine參數,則默認使用flume.collector.event.host與flume.collector.event.port作為默認collectro。

agentDFOSink[("machine"[,port])]:本地熱備Agent。Agent發現Collector節點故障後,不斷檢查Collector的存活狀態以便重新發送Event,在此期間產生的數據將緩存到本地磁盤中。

agentBESink[("machine"[,port])]:不負責的Agent。如果Collector出現故障,將不作任何處理,它發送的數據也將被直接丟棄。

agentE2EChain:指定多個Collector,以提高可用性。當向主Collector發送Event失效後,將轉向第二個Collector發送;當所有的Collector都失效後,它還會再發送一遍。

② Collector

Collector的作用是將多個Agent的數據匯總後,加載到Storage中。它的Source和Sink與Agent類似。

Source如下。

collectorSource[(port)]:Collector Source,監聽端口匯聚數據。

autoCollectorSource:通過Master協調物理節點自動匯聚數據。

logicalSource:邏輯Source,由Master分配端口並監聽rpcSink。

Sink如下。

collectorSink("fsdir","fsfileprefix",rollmillis):collectorSink,數據通過Collector匯聚之後發送到HDFS,fsdir是HDFS目錄,fsfileprefix為文件前綴碼。

customdfs("hdfspath"[,"format"]):自定義格式DFS。

③ Storage

Storage是存儲系統,可以是一個普通File,也可以是HDFS、Hive、HBase、分布式存儲等。

④ Master

Master負責管理、協調Agent和Collector的配置信息,是Flume集群的控制器。

在Flume中,最重要的抽象是Data Flow(數據流)。Data Flow描述了數據從產生、傳輸、處理到最終寫入目標的一條路徑,如下圖所示。

技術分享

對於Agent數據流配置,就是從哪裏得到數據,就把數據發送到哪個Collector。

對於Collector,就是接收Agent發送過來的數據,然後把數據發送到指定的目標機器上。

註:Flume框架對Hadoop和ZooKeeper的依賴只存在於JAR包上,並不要求Flume啟動時必須將Hadoop和ZooKeeper服務同時啟動。

3. 組件介紹

本文所說的Flume基於1.4.0版本。

① Client

路徑:apache-flume-1.4.0-src\flume-ng-clients。

操作最初的數據,把數據發送給Agent。在Client與Agent之間建立數據溝通的方式有兩種。

第一種方式:創建一個iclient繼承Flume已經存在的Source,如AvroSource或者SyslogTcpSource,但是必須保證所傳輸的數據Source可以理解。

第二種方式:寫一個Flume Source通過IPC或者RPC協議直接與已經存在的應用通信,需要轉換成Flume可以識別的事件。

Client SDK:是一個基於RPC協議的SDK庫,可以通過RPC協議使應用與Flume直接建立連接。可以直接調用SDK的api函數而不用關註底層數據是如何交互的,提供append和appendBatch兩個接口,具體的可以看看代碼apache-flume-1.4.0-src\flume-ng-sdk\src\main\java\org\apache\ flume\api\RpcClient.java。

② NettyAvroRpcClient

Avro是默認的RPC協議。NettyAvroRpcClient和ThriftRpcClient分別對RpcClient接口進行了實現,具體實現可以看下代碼apache-flume-1.4.0-src\flume-ng-sdk\src\main\java\org\apache\flume\api\ NettyAvroRpcClient.java和apache-flume-1.4.0-src\flume-ng-sdk\src\main\java\org\apache\flume\api\ ThriftRpcClient.java。

下面給出一個使用SDK與Flume建立連接的樣例如下,實際使用中可以參考實現:

import org.apache.flume.Event;

import org.apache.flume.EventDeliveryException;

import org.apache.flume.api.RpcClient;

import org.apache.flume.api.RpcClientFactory;

import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;

public class MyApp {

public static void main(String[] args) {

MyRpcClientFacade client = new MyRpcClientFacade();

// Initialize client with the remote Flume agent‘s host and port

client.init("host.example.org",41414);

// Send 10 events to the remote Flume agent. That agent should be

// configured to listen with an AvroSource.

String sampleData = "Hello Flume!";

for (int i = 0; i < 10; i++) {

client.sendDataToFlume(sampleData);

}

client.cleanUp();

}

}

class MyRpcClientFacade {

private RpcClient client;

private String hostname;

private int port;

public void init(String hostname,int port) {

// Setup the RPC connection

this.hostname = hostname;

this.port = port;

this.client = RpcClientFactory.getDefaultInstance(hostname,port);

// Use the following method to create a thrift client (instead of the above line):

// this.client = RpcClientFactory.getThriftInstance(hostname,port);

}

public void sendDataToFlume(String data) {

// Create a Flume Event object that encapsulates the sample data

Event event = EventBuilder.withBody(data,Charset.forName("UTF-8"));

// Send the event

try {

client.append(event);

} catch (EventDeliveryException e) {

// clean up and recreate the client

client.close();

client = null;

client = RpcClientFactory.getDefaultInstance(hostname,port);

// Use the following method to create a thrift client (instead of the above line):

// this.client = RpcClientFactory.getThriftInstance(hostname,port);

}

}

public void cleanUp() {

// Close the RPC connection

client.close();

}

}

為了能夠監聽到關聯端口,需要在配置文件中增加端口和Host配置信息(配置文件apache-flume- 1.4.0-src\conf\flume-conf.properties.template)。

client.type = default (for avro) or thrift (for thrift)

hosts = h1 # default client accepts only 1 host

# (additional hosts will be ignored)

hosts.h1 = host1.example.org:41414 # host and port must both be specified

# (neither has a default)

batch-size = 100 # Must be >=1 (default:100)

connect-timeout = 20000 # Must be >=1000 (default:20000)

request-timeout = 20000 # Must be >=1000 (default:20000)

除了以上兩類實現外,FailoverRpcClient.java和LoadBalancingRpcClient.java也分別對RpcClient接口進行了實現。

③ FailoverRpcClient

該接口主要實現了主備切換,采用<host>:<port>的形式,一旦當前連接失敗,就會自動尋找下一個連接。

④ LoadBalancingRpcClient

該接口在有多個Host的時候起到負載均衡的作用。

⑤ Embeded Agent

Flume允許用戶在自己的Application裏內嵌一個Agent。這個內嵌的Agent是一個輕量級的Agent,不支持所有的Source Sink Channel。

⑥ Transaction

Flume的三個主要組件——Source、Sink、Channel必須使用Transaction來進行消息收發。在Channel的類中會實現Transaction的接口,不管是Source還是Sink,只要連接上Channel,就必須先獲取Transaction對象,如下圖所示。

技術分享

具體使用實例如下,可以供生成環境中參考:

Channel ch = new MemoryChannel();

Transaction txn = ch.getTransaction();

txn.begin();

try {

Event eventToStage = EventBuilder.withBody("Hello Flume!",Charset.forName ("UTF-8"));

ch.put(eventToStage);

txn.commit();

} catch (Throwable t) {

txn.rollback();

if (t instanceof Error) {

throw (Error)t;

}

} finally {

txn.close();

}

⑦ Sink

Sink的一個重要作用就是從Channel裏獲取事件,然後把事件發送給下一個Agent,或者把事件存儲到另外的倉庫內。一個Sink會關聯一個Channel,這是配置在Flume的配置文件裏的。SinkRunner.start()函數被調用後,會創建一個線程,該線程負責管理Sink的整個生命周期。Sink需要實現LifecycleAware接口的start()和stop()方法。

Sink.start():初始化Sink,設置Sink的狀態,可以進行事件收發。

Sink.stop():進行必要的cleanup動作。

Sink.process():負責具體的事件操作。

Sink使用參考代碼實例如下:

public class MySink extends AbstractSink implements Configurable {

private String myProp;

@Override

public void configure(Context context) {

String myProp = context.getString("myProp","defaultValue");

// Process the myProp value (e.g. validation)

// Store myProp for later retrieval by process() method

this.myProp = myProp;

}

@Override

public void start() {

// Initialize the connection to the external repository (e.g. HDFS) that

// this Sink will forward Events to ..

}

@Override

public void stop () {

// Disconnect from the external respository and do any

// additional cleanup (e.g. releasing resources or nulling-out

// field values) ..

}

@Override

public Status process() throws EventDeliveryException {

Status status = null;

// Start transaction

Channel ch = getChannel();

Transaction txn = ch.getTransaction();

txn.begin();

try {

// This try clause includes whatever Channel operations you want to do

Event event = ch.take();

// Send the Event to the external repository.

// storeSomeData(e);

txn.commit();

status = Status.READY;

} catch (Throwable t) {

txn.rollback();

// Log exception,handle individual exceptions as needed

status = Status.BACKOFF;

// re-throw all Errors

if (t instanceof Error) {

throw (Error)t;

}

} finally {

txn.close();

}

return status;

}

}

⑧ Source

Source的作用是從Client端接收事件,然後把事件存儲到Channel中。PollableSourceRunner.start()用於創建一個線程,管理PollableSource的生命周期。同樣也需要實現start()和stop()兩種方法。需要註意的是,還有一類Source,被稱為EventDrivenSource。區別是EventDrivenSource有自己的回調函數用於捕捉事件,並不是每個線程都會驅動一個EventDrivenSource。

以下是一個PollableSource的例子:

public class MySource extends AbstractSource implements Configurable, PollableSource {

private String myProp;

@Override

public void configure(Context context) {

String myProp = context.getString("myProp","defaultValue");

// Process the myProp value (e.g. validation,convert to another type,...)

// Store myProp for later retrieval by process() method

this.myProp = myProp;

}

@Override

public void start() {

// Initialize the connection to the external client

}

@Override

public void stop () {

// Disconnect from external client and do any additional cleanup

// (e.g. releasing resources or nulling-out field values) ..

}

@Override

public Status process() throws EventDeliveryException {

Status status = null;

// Start transaction

Channel ch = getChannel();

Transaction txn = ch.getTransaction();

txn.begin();

try {

// This try clause includes whatever Channel operations you want to do

// Receive new data

Event e = getSomeData();

// Store the Event into this Source‘s associated Channel(s)

getChannelProcessor().processEvent(e)

txn.commit();

status = Status.READY;

} catch (Throwable t) {

txn.rollback();

// Log exception,handle individual exceptions as needed

status = Status.BACKOFF;

// re-throw all Errors

if (t instanceof Error) {

throw (Error)t;

}

} finally {

txn.close();

}

return status;

}

}

4. Flume使用模式

Flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)並且攜帶有頭信息,這些Event由Agent外部的Source,比如上圖中的Web Server生成。當Source捕獲事件後會進行特定的格式化,然後Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另一個Source。

很直白的設計,其中值得註意的是,Flume提供了大量內置的Source、Channel和Sink類型。不同類型的Source,Channel和Sink可以自由組合。多Agent串聯,如下圖所示。

技術分享

或者多Agent合並,如下圖所示。

技術分享

如果你以為Flume就這些能耐那就大錯特錯了。Flume支持用戶建立多級流,也就是說,多個agent可以協同工作,並且支持Fan-in、Fan-out、Contextual Routing、Backup Routes。如下圖所示。

技術分享

參考文獻

  • 參考http://www.aboutyun.com/thread-7848-1-1.html官網用戶手冊,http://flume.apache.org/FlumeUserGuide.html。

  • Github地址https://github.com/apache/flume。

  • 參考http://flume.apache.org/FlumeUserGuide.html。

Flume日誌收集系統架構詳解--轉