
SODBASE CEP Learning, Advanced Part 2: Log Collection with Flume

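In IT operations and performance monitoring, logs often have to be analyzed to find a system's points of failure or its performance bottlenecks. Off-the-shelf log analysis software usually concentrates on monitoring node and network status, and almost none of it meets what large application systems need for failure-point and bottleneck analysis.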

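Some examples:

(1) Trace the upstream and downstream call relationships around a failure, and locate the low-level interface behind an application-layer failure

(2) Analyze the time difference between parent and child calls to find performance bottlenecks

(3) Analyze the response time of specified system calls and services, and whether they time out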
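To make analyses (1) to (3) concrete, here is a plain-Java sketch of the logic, independent of SODBASE CEP and not how SODBASE CEP implements it. It assumes a hypothetical call-log record with a call id, a parent call id and start/end timestamps in milliseconds; real log formats and field names will differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical call-log record; the fields are assumptions for illustration only.
final class CallRecord {
    final String id;
    final String parentId; // null for a top-level call
    final long startMs;
    final long endMs;

    CallRecord(String id, String parentId, long startMs, long endMs) {
        this.id = id;
        this.parentId = parentId;
        this.startMs = startMs;
        this.endMs = endMs;
    }

    long durationMs() {
        return endMs - startMs;
    }
}

final class CallAnalysis {
    // Index records by id so a child can look up its parent.
    static Map<String, CallRecord> byId(List<CallRecord> records) {
        Map<String, CallRecord> index = new HashMap<>();
        for (CallRecord r : records) {
            index.put(r.id, r);
        }
        return index;
    }

    // (1) Walk from a failing call up to the top-level call, giving the chain
    // of callers. Assumes the parent links contain no cycles.
    static List<String> callChain(CallRecord call, Map<String, CallRecord> index) {
        List<String> chain = new ArrayList<>();
        for (CallRecord cur = call; cur != null;
             cur = cur.parentId == null ? null : index.get(cur.parentId)) {
            chain.add(cur.id);
        }
        return chain;
    }

    // (2) Parent duration minus child duration: time the parent spends outside
    // this child call. A large gap suggests the cost lies in the parent itself.
    static long parentChildGapMs(CallRecord parent, CallRecord child) {
        return parent.durationMs() - child.durationMs();
    }

    // (3) A call times out when its response time exceeds the limit.
    static boolean isTimeout(CallRecord call, long limitMs) {
        return call.durationMs() > limitMs;
    }

    public static void main(String[] args) {
        Map<String, CallRecord> index = byId(Arrays.asList(
                new CallRecord("web-1", null, 0, 900),
                new CallRecord("svc-1", "web-1", 50, 850),
                new CallRecord("db-1", "svc-1", 100, 800)));
        CallRecord db = index.get("db-1");
        System.out.println(callChain(db, index));                     // [db-1, svc-1, web-1]
        System.out.println(parentChildGapMs(index.get("svc-1"), db)); // 100
        System.out.println(isTimeout(index.get("web-1"), 500));       // true
    }
}
```

In a CEP setting the same rules would be evaluated continuously over the incoming event stream instead of over an in-memory list.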

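SODBASE CEP can handle all kinds of complex real-time log analysis and chart display. Users can define their own log service interface to collect log data, or use a log collection client such as Flume or Splunk to do the collecting.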

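1. Steps

(1) In a Windows environment (Linux is similar), install JDK 1.6 or later.

Unzip the package and click cepstudio.exe to open it.

In SODBASE Studio, choose File -> Import from the menu and select loganalysis.sod.

Right-click a blank area of the workspace panel and choose Test Run.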

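(3) Download apache-flume-1.5.2-bin.zip and unzip it to E:\software or another directory of your choice. Copy sodbase-cep-engine.jar, sodbase-studio.jar, sodbase-dataadaptor-socket.jar and sodbase-dataadaptor-flume.jar from the SODBASE Studio lib directory into Flume's lib directory.

Open Flume's bin/flume-win.bat in Notepad and edit two settings: FLUME_HOME, the directory Flume was unzipped to, and JAVA_HOME, the installation directory of your JDK.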

(4) Run flume-win.bat.

(5) Check the output: the received log records are shown in SODBASE Studio.


(6) SODBASE Studio is mainly used for modeling and testing. To deploy the log analysis to a server, see

That completes the example. To understand how it works, read on.

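2. How It Works

Compared with other log collection clients, Flume has the advantages of being written in Java, and therefore cross-platform, and of being lightweight. This article uses Flume as the log collection client that sends log information to the SODBASE CEP engine. In the example we implement a Sink that transmits data to the CEP engine over a socket; inside the CEP engine, a socket input adapter receives the data.

The required libraries are sodbase-cep-engine.jar, sodbase-studio.jar, sodbase-dataadaptor-socket.jar and sodbase-dataadaptor-flume.jar; at run time, simply place them in Flume's lib directory. Note: the version of sodbase-cep-engine.jar used by Flume must match the version used by the CEP server, so that the transmitted objects can be deserialized correctly.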
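The note about deserialization implies that the socket adapter ships Java-serialized objects. Assuming that is how SocketUtil works (its source is not shown here), the sketch below shows the same pattern with a hypothetical LogEvent class standing in for PrimitiveEvent: the sender writes the object with ObjectOutputStream, and the receiver can only rebuild it if it loads a compatible version of the same class. A mismatch typically surfaces as java.io.InvalidClassException, which is why the jar versions on both sides must agree.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for zstreamplus.eventbuffer.PrimitiveEvent. Sender and
// receiver must load compatible versions of this class (same serialVersionUID
// and fields), otherwise readObject fails.
class LogEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    final Map<String, String> attributes = new HashMap<>();
    long startTs;
    long endTs;
}

class SocketTransferDemo {
    // Sender side (the role of the Flume sink): connect and write one object.
    static void send(InetAddress host, int port, LogEvent event) throws IOException {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
            out.writeObject(event);
        }
    }

    // Receiver side (the role of the CEP socket input adapter): read one object.
    static LogEvent receiveOne(ServerSocket server) throws IOException, ClassNotFoundException {
        try (Socket socket = server.accept();
             ObjectInputStream in = new ObjectInputStream(socket.getInputStream())) {
            return (LogEvent) in.readObject();
        }
    }

    // Local round trip on an ephemeral loopback port.
    static LogEvent roundTrip(LogEvent event) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            server.setSoTimeout(5000); // do not block forever if the sender fails
            int port = server.getLocalPort();
            Thread sender = new Thread(() -> {
                try {
                    send(loopback, port, event);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.start();
            LogEvent received = receiveOne(server);
            sender.join();
            return received;
        }
    }

    public static void main(String[] args) throws Exception {
        LogEvent event = new LogEvent();
        event.attributes.put("flumeeventdata", "test");
        event.startTs = event.endTs = System.currentTimeMillis();
        System.out.println(roundTrip(event).attributes.get("flumeeventdata")); // test
    }
}
```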

package com.sodbase.dataadaptor.flume;

import java.io.IOException;
import java.util.Date;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import com.sodbase.outputadaptor.socket.SocketUtil;

import zstreamplus.eventbuffer.PrimitiveEvent;
import zstreamplus.eventbuffer.ValueType;

public class CEPServerSink extends AbstractSink implements Configurable
{
	private String CEPServerSocketIpPort;
	private String retrynum;
	private static final String DEFAULT_ENCODING = "UTF-8";
	private SocketUtil socketUtil=new SocketUtil();
	@Override
	public void configure(Context context)
	{
		/**
		 * Set in the Flume agent's properties file (flume-cep-conf.properties in this example)
		 */
		CEPServerSocketIpPort = context.getString("CEPServerSocketIpPort",
				"localhost:12345");
		retrynum = context.getString("CEPServerSocketRetryNum", "30");
	}

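	@Override
	public void start()
	{
		socketUtil.setRunning(true);
		// let AbstractSink record the lifecycle state change
		super.start();
	}

	@Override
	public void stop()
	{
		socketUtil.setRunning(false);
		if (socketUtil.getClient() != null)
		{
			try
			{
				socketUtil.getClient().close();
			} catch (IOException e)
			{
				e.printStackTrace();
			}
		}
		// let AbstractSink record the lifecycle state change
		super.stop();
	}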

	@Override
	public Status process() throws EventDeliveryException
	{
		Status status = null;

		// Start transaction
		Channel ch = getChannel();
		Transaction txn = ch.getTransaction();
		txn.begin();
		try
		{
			Event event = ch.take();
			if (event == null)
			{
				// channel is empty: end the empty transaction and let the runner back off
				txn.commit();
				return Status.BACKOFF;
			}
			//prepare the log data
			String eventData = new String(event.getBody(), DEFAULT_ENCODING);
			PrimitiveEvent primitiveEvent = new PrimitiveEvent();
			ValueType valueType = new ValueType(eventData, "string");
			primitiveEvent.getAttributeMap().put("flumeeventdata", valueType);
			Date date = new Date();
			long time = date.getTime();
			primitiveEvent.setStart_ts(time);
			primitiveEvent.setEnd_ts(time);
			
			//transfer data to cep server
			String[] address = CEPServerSocketIpPort.split(":");
			socketUtil.setIp(address[0]);
			socketUtil.setPort(address[1]);
			socketUtil.setRetrynum(retrynum);
			socketUtil.outputPrimitiveEvent(primitiveEvent);
			
			
			txn.commit();
			status = Status.READY;
		} catch (Throwable t)
		{
			txn.rollback();

			status = Status.BACKOFF;

			if (t instanceof Error)
			{
				throw (Error) t;
			}
		} finally
		{
			txn.close();
		}
		return status;
	}
}
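The sink hands retrynum straight to SocketUtil, whose code is not shown, so the exact meaning of CEPServerSocketRetryNum is not documented here. One plausible reading is "how many times to try connecting before giving up". The sketch below implements that reading with a hypothetical RetryingConnector class; it also parses the host:port string once, with a clear error, rather than splitting it on every event.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: an assumed meaning of CEPServerSocketRetryNum,
// not the actual SocketUtil behavior.
class RetryingConnector {
    // Parses a "host:port" value such as CEPServerSocketIpPort.
    static InetSocketAddress parse(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got " + hostPort);
        }
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return new InetSocketAddress(host, port);
    }

    // Tries to connect up to retryNum times, sleeping between failed attempts.
    static Socket connectWithRetry(String hostPort, int retryNum, long sleepMs, int timeoutMs)
            throws IOException, InterruptedException {
        if (retryNum < 1) {
            throw new IllegalArgumentException("retryNum must be at least 1");
        }
        InetSocketAddress address = parse(hostPort);
        IOException last = null;
        for (int attempt = 1; attempt <= retryNum; attempt++) {
            Socket socket = new Socket();
            try {
                socket.connect(address, timeoutMs);
                return socket;
            } catch (IOException e) {
                socket.close();
                last = e;
                if (attempt < retryNum) {
                    Thread.sleep(sleepMs);
                }
            }
        }
        throw new IOException("could not connect to " + hostPort
                + " after " + retryNum + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        try (Socket socket = connectWithRetry("localhost:12345", 2, 1000, 2000)) {
            System.out.println("connected to " + socket.getRemoteSocketAddress());
        }
    }
}
```

In a sink, parse() would naturally run once in configure(), and connectWithRetry() only when no open connection exists.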

On Linux, start Flume with its sh script; on Windows you can use the following script as a reference:

set FLUME_HOME=E:\software\apache-flume-1.5.2-bin\apache-flume-1.5.2-bin
set JAVA_HOME=D:\Program Files\Java\jdk1.7.0_51
set JAVA="%JAVA_HOME%\bin\java.exe"
set JAVA_OPTS=-Xmx256m
set CONF=%FLUME_HOME%\conf\flume-cep-conf.properties
set AGENT=agent
%JAVA% %JAVA_OPTS% -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 -Dlog4j.configuration=file:\\\%FLUME_HOME%\conf\log4j.properties -cp "%FLUME_HOME%\lib\*" org.apache.flume.node.Application -f "%CONF%" -n %AGENT%
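The script starts the agent with -Dflume.monitoring.type=http -Dflume.monitoring.port=34545, which makes Flume serve its counters as JSON at http://<host>:34545/metrics. A small Java sketch for checking that the agent is up and that events are moving could look like the following; FlumeMetrics is a hypothetical name. Because CEPServerSink as written registers no sink counters, expect mainly source and channel entries (such as the one for memoryChannel) rather than sink statistics.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that fetches Flume's HTTP monitoring JSON.
class FlumeMetrics {
    static String fetch(String host, int port) throws IOException {
        HttpURLConnection conn = (HttpURLConnection)
                URI.create("http://" + host + ":" + port + "/metrics").toURL().openConnection();
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(2000);
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 34545 matches -Dflume.monitoring.port in the script above.
        System.out.println(fetch("localhost", 34545));
    }
}
```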

flume-cep-conf.properties (in Flume's conf directory, where the script above looks for it):

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink


# Source: an exec source that runs a command and turns each line it prints into an event.
# With restart = true the command is rerun restartThrottle ms after it exits.
agent.sources.seqGenSrc.type = exec
agent.sources.seqGenSrc.command = cmd.exe /c echo test
agent.sources.seqGenSrc.restart = true
agent.sources.seqGenSrc.restartThrottle = 1000
agent.sources.seqGenSrc.batchSize = 100
agent.sources.seqGenSrc.channels = memoryChannel


# Sink: the custom CEPServerSink, which forwards each event to the CEP engine over a socket
agent.sinks.loggerSink.type = com.sodbase.dataadaptor.flume.CEPServerSink
agent.sinks.loggerSink.channel = memoryChannel
agent.sinks.loggerSink.CEPServerSocketIpPort = localhost:12345
agent.sinks.loggerSink.CEPServerSocketRetryNum = 2


# Channel: an in-memory channel holding at most 100 events
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
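The keys under agent.sinks.loggerSink. are what CEPServerSink.configure() reads: Flume gives each component a Context holding its own properties with the component prefix removed, so context.getString("CEPServerSocketIpPort", ...) sees localhost:12345. The sketch below imitates that lookup with java.util.Properties only; SinkSettings and subProperties are hypothetical names, and this is not Flume's own configuration code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of how a component's settings are scoped by prefix.
class SinkSettings {
    // Keeps only keys starting with prefix and strips the prefix from them.
    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String conf = "agent.sinks.loggerSink.type = com.sodbase.dataadaptor.flume.CEPServerSink\n"
                + "agent.sinks.loggerSink.CEPServerSocketIpPort = localhost:12345\n"
                + "agent.sinks.loggerSink.CEPServerSocketRetryNum = 2\n";
        Properties props = new Properties();
        props.load(new StringReader(conf));
        Map<String, String> ctx = subProperties(props, "agent.sinks.loggerSink.");
        // Same keys and defaults as CEPServerSink.configure()
        System.out.println(ctx.getOrDefault("CEPServerSocketIpPort", "localhost:12345")); // localhost:12345
        System.out.println(ctx.getOrDefault("CEPServerSocketRetryNum", "30"));            // 2
    }
}
```

If a key such as CEPServerSocketRetryNum is left out of the file, configure() falls back to its built-in default ("30").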

For the command property you can use tail (for example tail -F) to follow a log file; on Windows, a Python version of tail is available.
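If neither tail nor a Python port is at hand on Windows, a small Java program can play the same role, because the exec source only needs a command that keeps printing new lines to standard output. The sketch below is a minimal polling tail under these assumptions: JavaTail is a hypothetical class, it starts at the current end of the file, returns only complete lines (a half-written last line waits for the next poll), assumes UTF-8, and starts over if the file shrinks; it does not follow rename-based log rotation. It might be used as agent.sources.seqGenSrc.command = java -cp C:\tools JavaTail E:\logs\app.log, where both paths are placeholders.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal polling "tail -f" for use as an exec-source command.
class JavaTail {
    private final String path;
    private long offset; // byte position just after the last complete line returned

    JavaTail(String path, long startOffset) {
        this.path = path;
        this.offset = startOffset;
    }

    // Returns the complete lines appended since the previous call. A trailing
    // line without '\n' stays in the file and is picked up by a later call.
    List<String> poll() throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            long length = file.length();
            if (length < offset) {
                offset = 0; // file was truncated: start over from the beginning
            }
            byte[] buf = new byte[(int) (length - offset)]; // assumes the new chunk fits in memory
            file.seek(offset);
            file.readFully(buf);
            int lineStart = 0;
            for (int i = 0; i < buf.length; i++) {
                if (buf[i] == '\n') {
                    // drop the '\r' of a Windows CRLF line ending
                    int end = (i > lineStart && buf[i - 1] == '\r') ? i - 1 : i;
                    lines.add(new String(buf, lineStart, end - lineStart, StandardCharsets.UTF_8));
                    lineStart = i + 1;
                }
            }
            offset += lineStart;
        }
        return lines;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        JavaTail tail = new JavaTail(args[0], new File(args[0]).length());
        while (true) {
            for (String line : tail.poll()) {
                System.out.println(line);
            }
            System.out.flush();
            Thread.sleep(500);
        }
    }
}
```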

Once the log data has been collected, for analyzing and displaying it, see