Flume日誌收集系統架構詳解--轉
任何一個生產系統在運行過程中都會產生大量的日誌,日誌往往隱藏了很多有價值的信息。在沒有分析方法之前,這些日誌存儲一段時間後就會被清理。隨著技術的發展和分析能力的提高,日誌的價值被重新重視起來。在分析這些日誌之前,需要將分散在各個生產系統中的日誌收集起來。本節介紹廣泛應用的Flume日誌收集系統。
一、概述Flume是Cloudera公司的一款高性能、高可用的分布式日誌收集系統,現在已經是Apache的頂級項目。同Flume相似的日誌收集系統還有Facebook Scribe、Apache Chuwka。
二、Flume發展歷程Flume 初始的發行版本目前被統稱為Flume OG(Original Generation),屬於Cloudera。但隨著 Flume 功能的擴展,Flume OG 代碼工程臃腫、核心組件設計不合理、核心配置不標準等缺點逐漸暴露出來,尤其是在 Flume OG 的最後一個發行版本0.94.0中,日誌傳輸不穩定現象尤為嚴重。為了解決這些問題,2011 年 10 月 22日,Cloudera 完成了 Flume-728,對Flume進行了裏程碑式的改動:重構核心組件、核心配置及代碼架構,重構後的版本統稱為 Flume NG(Next Generation);改動的另一原因是將 Flume 納入Apache 旗下,Cloudera Flume 更名為 Apache Flume。
1. 系統特點① 可靠性
當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次為:end-to-end(收到數據後,Agent首先將事件寫到磁盤上,當數據傳送成功後,再刪除;如果數據發送失敗,則重新發送)、Store on Failure(這也是Scribe采用的策略,當數據接收方崩潰時,將數據寫到本地,待恢復後繼續發送)、Best Effort(數據發送到接收方後,不會進行確認)。
② 可擴展性Flume采用了三層架構,分別為Agent、Collector和Storage,每一層均可以水平擴展。其中,所有的Agent和Collector均由Master統一管理,這使得系統容易被監控和維護。並且Master允許有多個(使用ZooKeeper進行管理和負載均衡),這樣就避免了單點故障問題。
當有多個Master時,Flume利用ZooKeeper和Gossip保證動態配置數據的一致性。用戶可以在Master上查看各個數據源或者數據流執行情況,並且可以對各個數據源進行配置和動態加載。Flume提供了Web和Shell Script Command兩種形式對數據流進行管理。
④ 功能可擴展性用戶可以根據需要添加自己的Agent、Collector或Storage。此外,Flume自帶了很多組件,包括各種Agent(如File、Syslog等)、Collector和Storage(如File、HDFS等)。
2. 系統架構如圖所示是Flume OG的架構。
Flume NG的架構如下圖所示。Flume采用了分層架構,分別為Agent、Collector和Storage。其中,Agent和Collector均由Source和Sink兩部分組成,Source是數據來源,Sink是數據去向。
Flume使用了兩個組件:Master和Node。Node根據在Master Shell或Web中的動態配置,決定其是作為Agent還是作為Collector。
Agent的作用是將數據源的數據發送給Collector。Flume自帶了很多直接可用的數據源(Source),如下。
text("filename"):將文件filename作為數據源,按行發送。
tail("filename"):探測filename新產生的數據,按行發送。
fsyslogTcp(5140):監聽TCP的5140端口,並將接收到的數據發送。
tailDir("dirname"[,fileregex=".*"[,startFromEnd=false[,recurseDepth=0]]]):監聽目錄中的文件末尾,使用正則表達式選定需要監聽的文件(不包含目錄),recurseDepth為遞歸監聽其下子目錄的深度,同時提供了很多Sink,如console[("format")],直接將數據顯示在console上。
text("txtfile"):將數據寫到文件txtfile中。
dfs("dfsfile"):將數據寫到HDFS上的dfsfile文件中。
syslogTcp("host",port):將數據通過TCP傳遞給host節點。
agentSink[("machine"[,port])]:等價於agentE2ESink,如果省略machine參數,則默認使用flume.collector.event.host與flume.collector.event.port作為默認collectro。
agentDFOSink[("machine"[,port])]:本地熱備Agent。Agent發現Collector節點故障後,不斷檢查Collector的存活狀態以便重新發送Event,在此期間產生的數據將緩存到本地磁盤中。
agentBESink[("machine"[,port])]:不負責的Agent。如果Collector出現故障,將不作任何處理,它發送的數據也將被直接丟棄。
agentE2EChain:指定多個Collector,以提高可用性。當向主Collector發送Event失效後,將轉向第二個Collector發送;當所有的Collector都失效後,它還會再發送一遍。
② CollectorCollector的作用是將多個Agent的數據匯總後,加載到Storage中。它的Source和Sink與Agent類似。
Source如下。
collectorSource[(port)]:Collector Source,監聽端口匯聚數據。
autoCollectorSource:通過Master協調物理節點自動匯聚數據。
logicalSource:邏輯Source,由Master分配端口並監聽rpcSink。
Sink如下。
collectorSink("fsdir","fsfileprefix",rollmillis):collectorSink,數據通過Collector匯聚之後發送到HDFS,fsdir是HDFS目錄,fsfileprefix為文件前綴碼。
customdfs("hdfspath"[,"format"]):自定義格式DFS。
③ StorageStorage是存儲系統,可以是一個普通File,也可以是HDFS、Hive、HBase、分布式存儲等。
④ MasterMaster負責管理、協調Agent和Collector的配置信息,是Flume集群的控制器。
在Flume中,最重要的抽象是Data Flow(數據流)。Data Flow描述了數據從產生、傳輸、處理到最終寫入目標的一條路徑,如下圖所示。
對於Agent數據流配置,就是從哪裏得到數據,就把數據發送到哪個Collector。
對於Collector,就是接收Agent發送過來的數據,然後把數據發送到指定的目標機器上。
註:Flume框架對Hadoop和ZooKeeper的依賴只存在於JAR包上,並不要求Flume啟動時必須將Hadoop和ZooKeeper服務同時啟動。
3. 組件介紹本文所說的Flume基於1.4.0版本。
① Client路徑:apache-flume-1.4.0-src\flume-ng-clients。
操作最初的數據,把數據發送給Agent。在Client與Agent之間建立數據溝通的方式有兩種。
第一種方式:創建一個iclient繼承Flume已經存在的Source,如AvroSource或者SyslogTcpSource,但是必須保證所傳輸的數據Source可以理解。
第二種方式:寫一個Flume Source通過IPC或者RPC協議直接與已經存在的應用通信,需要轉換成Flume可以識別的事件。
Client SDK:是一個基於RPC協議的SDK庫,可以通過RPC協議使應用與Flume直接建立連接。可以直接調用SDK的api函數而不用關註底層數據是如何交互的,提供append和appendBatch兩個接口,具體的可以看看代碼apache-flume-1.4.0-src\flume-ng-sdk\src\main\java\org\apache\ flume\api\RpcClient.java。
② NettyAvroRpcClientAvro是默認的RPC協議。NettyAvroRpcClient和ThriftRpcClient分別對RpcClient接口進行了實現,具體實現可以看下代碼apache-flume-1.4.0-src\flume-ng-sdk\src\main\java\org\apache\flume\api\ NettyAvroRpcClient.java和apache-flume-1.4.0-src\flume-ng-sdk\src\main\java\org\apache\flume\api\ ThriftRpcClient.java。
下面給出一個使用SDK與Flume建立連接的樣例如下,實際使用中可以參考實現:
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
public class MyApp {
public static void main(String[] args) {
MyRpcClientFacade client = new MyRpcClientFacade();
// Initialize client with the remote Flume agent‘s host and port
client.init("host.example.org",41414);
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
String sampleData = "Hello Flume!";
for (int i = 0; i < 10; i++) {
client.sendDataToFlume(sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade {
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname,int port) {
// Setup the RPC connection
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname,port);
// Use the following method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname,port);
}
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data,Charset.forName("UTF-8"));
// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname,port);
// Use the following method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname,port);
}
}
public void cleanUp() {
// Close the RPC connection
client.close();
}
}
為了能夠監聽到關聯端口,需要在配置文件中增加端口和Host配置信息(配置文件apache-flume- 1.4.0-src\conf\flume-conf.properties.template)。
client.type = default (for avro) or thrift (for thrift)
hosts = h1 # default client accepts only 1 host
# (additional hosts will be ignored)
hosts.h1 = host1.example.org:41414 # host and port must both be specified
# (neither has a default)
batch-size = 100 # Must be >=1 (default:100)
connect-timeout = 20000 # Must be >=1000 (default:20000)
request-timeout = 20000 # Must be >=1000 (default:20000)
除了以上兩類實現外,FailoverRpcClient.java和LoadBalancingRpcClient.java也分別對RpcClient接口進行了實現。
③ FailoverRpcClient該接口主要實現了主備切換,采用<host>:<port>的形式,一旦當前連接失敗,就會自動尋找下一個連接。
④ LoadBalancingRpcClient該接口在有多個Host的時候起到負載均衡的作用。
⑤ Embeded Agent
Flume允許用戶在自己的Application裏內嵌一個Agent。這個內嵌的Agent是一個輕量級的Agent,不支持所有的Source Sink Channel。
⑥ Transaction
Flume的三個主要組件——Source、Sink、Channel必須使用Transaction來進行消息收發。在Channel的類中會實現Transaction的接口,不管是Source還是Sink,只要連接上Channel,就必須先獲取Transaction對象,如下圖所示。
具體使用實例如下,可以供生成環境中參考:
Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
Event eventToStage = EventBuilder.withBody("Hello Flume!",Charset.forName ("UTF-8"));
ch.put(eventToStage);
txn.commit();
} catch (Throwable t) {
txn.rollback();
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
⑦ Sink
Sink的一個重要作用就是從Channel裏獲取事件,然後把事件發送給下一個Agent,或者把事件存儲到另外的倉庫內。一個Sink會關聯一個Channel,這是配置在Flume的配置文件裏的。SinkRunner.start()函數被調用後,會創建一個線程,該線程負責管理Sink的整個生命周期。Sink需要實現LifecycleAware接口的start()和stop()方法。
Sink.start():初始化Sink,設置Sink的狀態,可以進行事件收發。
Sink.stop():進行必要的cleanup動作。
Sink.process():負責具體的事件操作。
Sink使用參考代碼實例如下:
public class MySink extends AbstractSink implements Configurable {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp","defaultValue");
// Process the myProp value (e.g. validation)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external repository (e.g. HDFS) that
// this Sink will forward Events to ..
}
@Override
public void stop () {
// Disconnect from the external respository and do any
// additional cleanup (e.g. releasing resources or nulling-out
// field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
// Send the Event to the external repository.
// storeSomeData(e);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// Log exception,handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
return status;
}
}
⑧ Source
Source的作用是從Client端接收事件,然後把事件存儲到Channel中。PollableSourceRunner.start()用於創建一個線程,管理PollableSource的生命周期。同樣也需要實現start()和stop()兩種方法。需要註意的是,還有一類Source,被稱為EventDrivenSource。區別是EventDrivenSource有自己的回調函數用於捕捉事件,並不是每個線程都會驅動一個EventDrivenSource。
以下是一個PollableSource的例子:
public class MySource extends AbstractSource implements Configurable, PollableSource {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp","defaultValue");
// Process the myProp value (e.g. validation,convert to another type,...)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external client
}
@Override
public void stop () {
// Disconnect from external client and do any additional cleanup
// (e.g. releasing resources or nulling-out field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
// Receive new data
Event e = getSomeData();
// Store the Event into this Source‘s associated Channel(s)
getChannelProcessor().processEvent(e)
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// Log exception,handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
return status;
}
}
4. Flume使用模式
Flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)並且攜帶有頭信息,這些Event由Agent外部的Source,比如上圖中的Web Server生成。當Source捕獲事件後會進行特定的格式化,然後Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另一個Source。
很直白的設計,其中值得註意的是,Flume提供了大量內置的Source、Channel和Sink類型。不同類型的Source,Channel和Sink可以自由組合。多Agent串聯,如下圖所示。
或者多Agent合並,如下圖所示。
如果你以為Flume就這些能耐那就大錯特錯了。Flume支持用戶建立多級流,也就是說,多個agent可以協同工作,並且支持Fan-in、Fan-out、Contextual Routing、Backup Routes。如下圖所示。
參考文獻
-
參考http://www.aboutyun.com/thread-7848-1-1.html官網用戶手冊,http://flume.apache.org/FlumeUserGuide.html。
-
Github地址https://github.com/apache/flume。
-
參考http://flume.apache.org/FlumeUserGuide.html。
Flume日誌收集系統架構詳解--轉