
[Big Data] Search log data collection system with a flume+hbase+kafka architecture (data from Sogou Lab)

1 Collection plan


Notes:

D1: log server 1 (bigdata02.com)

D2: log server 2 (bigdata03.com)

Flume agents on the two log servers collect the logs, an aggregation agent on bigdata01.com merges the two streams, and the merged stream is stored to both Kafka and HBase.

(Architecture diagram: log collection -> log aggregation -> store to Kafka / store to HBase)

2 Versions

  • kafka kafka_2.11-0.10.0.0
  • flume flume-1.7.0-bin
  • hbase hbase-0.98.6-cdh5.3.0

3 Installation

3.1 Kafka installation

vi config/server.properties

broker.id=1   ## change on each broker
listeners=PLAINTEXT://bigdata01.com:9092   ## change on each broker
port=9092
host.name=bigdata01.com   ## change on each broker
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/modules/kafka_2.11-0.10.0.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
message.max.bytes=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bigdata01.com:2181,bigdata02.com:2181,bigdata03.com:2181
zookeeper.connection.timeout.ms=60000

Copy server.properties to the other machines and change broker.id, listeners, and host.name accordingly.
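A minimal distribution sketch (the hostnames and install path follow the cluster above; the sed commands are an assumption, adjust to your setup):

 # copy the config to the other brokers, then adjust broker.id, listeners, and host.name on each
 for i in 2 3; do
   scp config/server.properties bigdata0${i}.com:/opt/modules/kafka_2.11-0.10.0.0/config/
   ssh bigdata0${i}.com "cd /opt/modules/kafka_2.11-0.10.0.0 && \
     sed -i 's/^broker.id=.*/broker.id=${i}/' config/server.properties && \
     sed -i 's/bigdata01.com:9092/bigdata0${i}.com:9092/' config/server.properties && \
     sed -i 's/^host.name=.*/host.name=bigdata0${i}.com/' config/server.properties"
 done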

3.2 flume1 installation (the aggregation agent on bigdata01.com)

1 vi conf/flume-env.sh

 export JAVA_HOME=/opt/modules/jdk1.7.0_67
 export HADOOP_HOME=/opt/modules/hadoop-2.5.0
 export HBASE_HOME=/opt/modules/hbase-0.98.6-cdh5.3.0
 export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

2 vi conf/flume-conf.properties

agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink

#********************flume + hbase**************************

agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = bigdata01.com
agent1.sources.r1.port = 55555
agent1.sources.r1.threads = 5

agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000 
agent1.channels.hbaseC.keep-alive = 20

agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn=datatime,userid,searchname,retorder,cliorder,cliurl

#********************flume + kafka*****************************
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 10

agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.brokerList = bigdata01.com:9092,bigdata02.com:9092,bigdata03.com:9092
agent1.sinks.kafkaSink.topic = weblogs
agent1.sinks.kafkaSink.zookeeperConnect= bigdata01.com:2181,bigdata02.com:2181,bigdata03.com:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
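
The asynchbase sink above writes to the HBase table weblogs with column family info, and that table must exist before the agent starts. A minimal sketch of creating it from the HBase shell (assuming the hbase command is on the PATH):

 echo "create 'weblogs', 'info'" | hbase shell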

3.3 flume2 installation

vi conf/flume-conf.properties

agent2.sources = s1
agent2.channels = c1
agent2.sinks = k1

agent2.sources.s1.inputCharset = GBK
agent2.sources.s1.type = exec
agent2.sources.s1.command = tail -F /opt/datas/flume.log
agent2.sources.s1.channels=c1

#channels configuration
agent2.channels.c1.type = memory
agent2.channels.c1.capacity = 10000
agent2.channels.c1.transactionCapacity = 10000
agent2.channels.c1.keep-alive = 3

#sinks configuration
agent2.sinks.k1.type = avro
agent2.sinks.k1.hostname = bigdata01.com
agent2.sinks.k1.port = 55555
agent2.sinks.k1.channel = c1

3.4 flume3 installation

vi conf/flume-conf.properties

agent3.sources = s1
agent3.channels = c1
agent3.sinks = k1

agent3.sources.s1.inputCharset = GBK
agent3.sources.s1.type = exec
agent3.sources.s1.command = tail -F /opt/datas/flume.log
agent3.sources.s1.channels=c1

#channels configuration
agent3.channels.c1.type = memory
agent3.channels.c1.capacity = 10000
agent3.channels.c1.transactionCapacity = 10000
agent3.channels.c1.keep-alive = 3

#sinks configuration
agent3.sinks.k1.type = avro
agent3.sinks.k1.hostname = bigdata01.com
agent3.sinks.k1.port = 55555
agent3.sinks.k1.channel = c1

3.5 Data download and preprocessing

Download address

Data preprocessing: the raw text contains two delimiters, '\t' and ' '; during preprocessing, replace both with ',' so that every field is uniformly comma-separated.

cat weblog.log |tr "\t" "," >weblog2.log
cat weblog2.log |tr " " "," >weblog3.log
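
The HBase serializer configured above expects exactly six comma-separated fields per record; queries that themselves contain spaces produce more fields after the substitution and are skipped by the serializer's length check. An optional sanity check (this awk one-liner is an assumption, not part of the original steps):

 awk -F',' 'NF != 6 {bad++} END {print bad+0, "malformed lines out of", NR}' weblog3.log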

3.6 Flume -> HBase sink source code modification

Reason for modifying the source: the stock serializer writes each Event into a single column, while here one event carries values for six columns, so the serializer source has to be modified.

  1. Download the Flume source code
  2. Import the flume-ng-hbase-sink project into IDEA
  3. Create org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer.java
 
package org.apache.flume.sink.hbase;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

 
public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] cf;
  private byte[] payload;
  private byte[] payloadColumn;
  private byte[] incrementColumn;
  private String rowPrefix;
  private byte[] incrementRow;
  private KeyType keyType;

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  // The modification is in getActions() below
  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      try {
        // payloadColumn holds the comma-separated column names configured on the sink;
        // payload holds one comma-separated log record
        String[] columns = new String(this.payloadColumn, Charsets.UTF_8).split(",");
        String[] columnValues = new String(this.payload, Charsets.UTF_8).split(",");
        if (columns.length == columnValues.length) {
          String datetime = columnValues[0];
          String userid = columnValues[1];
          // build the row key once so all columns of this event land in the same row
          byte[] rowKey = SimpleRowKeyGenerator.getKfkTimestampKey(datetime, userid);
          for (int i = 0; i < columns.length; i++) {
            byte[] colName = columns[i].getBytes(Charsets.UTF_8);
            byte[] colValue = columnValues[i].getBytes(Charsets.UTF_8);
            actions.add(new PutRequest(table, rowKey, cf, colName, colValue));
          }
        }
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
          incrementRow, cf, incrementColumn);
      actions.add(inc);
    }
    return actions;
  }

  @Override
  public void cleanUp() {
    // TODO Auto-generated method stub

  }

  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    rowPrefix = context.getString("rowPrefix", "default");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      if (suffix.equals("timestamp")) {
        keyType = KeyType.TS;
      } else if (suffix.equals("random")) {
        keyType = KeyType.RANDOM;
      } else if (suffix.equals("nano")) {
        keyType = KeyType.TSNANO;
      } else {
        keyType = KeyType.UUID;
      }
      payloadColumn = pCol.getBytes(Charsets.UTF_8);
    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  @Override
  public void setEvent(Event event) {
    this.payload = event.getBody();
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // TODO Auto-generated method stub
  }

}

  4. Re-export the jar and rename it flume-ng-hbase-sink-1.7.0.jar
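
If you prefer building from the command line instead of exporting from IDEA, a minimal build-and-deploy sketch (the module path inside the Flume source tree and the Flume install path are assumptions; adjust to your environment):

 cd apache-flume-1.7.0-src/flume-ng-sinks/flume-ng-hbase-sink
 mvn clean package -DskipTests
 # replace the stock sink jar on the collector node (bigdata01.com)
 cp target/flume-ng-hbase-sink-1.7.0.jar /opt/modules/flume-1.7.0-bin/lib/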

3.7 Simulated user log generator

package main.java;

import java.io.*;
public class ReadWrite {

      static String readFileName;
      static String writeFileName;


    public static void main(String[] args) {
        readFileName = args[0];   // source: the preprocessed Sogou log file
        writeFileName = args[1];  // target: the file tailed by the Flume exec source
        try {
            readFileByLines(readFileName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void readFileByLines(String fileName) {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String tempString = null;
        try {
            System.out.println("Reading the file line by line:");
            fis = new FileInputStream(fileName);
            // the Sogou logs are GBK-encoded, so decode them with GBK
            isr = new InputStreamReader(fis, "GBK");
            br = new BufferedReader(isr);
            int count = 0;
            while ((tempString = br.readLine()) != null) {
                count++;
                // throttle to roughly three records per second to simulate live traffic
                Thread.sleep(300);
                method1(writeFileName, tempString);
            }
            isr.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (isr != null) {
                try {
                    isr.close();
                } catch (IOException e1) {
                }
            }
        }
    }
    // append one record (preceded by a newline) to the target file
    public static void method1(String file, String content) {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream(file, true)));
            out.write("\n");
            out.write(content);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (out != null) {
                    out.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }




}
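
Before the startup script below can run, the class has to be packaged as /opt/jar/weblogs.jar. A minimal packaging sketch (the source layout and paths are assumptions; the class declares package main.java, so the entry point is main.java.ReadWrite):

 mkdir -p out /opt/jar
 javac -d out ReadWrite.java
 jar cfe /opt/jar/weblogs.jar main.java.ReadWrite -C out .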


Create a startup script:

#!/bin/bash
echo "start ..."

java -jar /opt/jar/weblogs.jar /opt/datas/weblog3.log /opt/datas/flume.log

3.8 Start the components

  1. Start Kafka (on each broker)

 bin/kafka-server-start.sh config/server.properties

Command to create the topic:

bin/kafka-topics.sh --create --zookeeper bigdata01.com:2181,bigdata02.com:2181,bigdata03.com:2181 --replication-factor 3 --partitions 1 --topic weblogs
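
An optional check (not in the original) to confirm the topic was created with the expected partitions and replicas:

 bin/kafka-topics.sh --describe --zookeeper bigdata01.com:2181,bigdata02.com:2181,bigdata03.com:2181 --topic weblogs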

Create a consumer script:

#!/bin/bash
echo "kfk-kafka-consumer.sh start ..."
bin/kafka-console-consumer.sh --zookeeper bigdata01.com:2181,bigdata02.com:2181,bigdata03.com:2181 --from-beginning --topic weblogs
  2. Start HBase
  3. Start Flume (create three startup scripts; start flume2 and flume3 first, then flume1; the flume1 script follows, with a sketch of the flume2/flume3 scripts after it)
#!/bin/bash

echo "flume-1 start......"

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,console
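
The flume2 and flume3 scripts are analogous; a sketch, run on the two log servers (the agent names follow the configs in sections 3.3 and 3.4):

 # flume2 start script
 #!/bin/bash
 echo "flume-2 start......"
 bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent2 -Dflume.root.logger=INFO,console

 # flume3 start script
 #!/bin/bash
 echo "flume-3 start......"
 bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent3 -Dflume.root.logger=INFO,console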

  4. Start weblogs.jar (run the startup script created in section 3.7)

4 Verification

1 Check the data in HBase


hbase(main):001:0> count 'weblogs'
Current count: 1000, row: 6062969462004942-00:00:07-1525921408064                                                                          
1559 row(s) in 1.2250 seconds

=> 1559
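
To inspect a few rows rather than just the count, a sketch using the HBase shell:

 echo "scan 'weblogs', {LIMIT => 2}" | hbase shell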

2 Check the data in Kafka (console consumer output)

00:00:32,269736677015411,[itfm],2,11,chanye.finance.sina.com.cn/fz/2007-09-10/334786.shtml
00:00:32,393693921083884,[奧運聖火河南路線],1,3,yanziha.pengpeng.com/bbs/thread/1148790.html
00:00:31,14386146687311085,[PSP遊戲《怪物獵人2G》中文版下載],3,2,bbs.2u.com.cn/archiver/tid-93698.html
00:00:31,6747965581699283,[韓國首都為什麼改名],1,1,ks.cn.yahoo.com/question/1406120803100.html
00:00:31,5540122643843461,[感恩的心+試聽],4,1,www.yymp3.com/Play/7326/92974.htm
00:00:31,9874717412370105,[小馬過河的部落格],5,5,gaoshanliuyun200.blog.163.com/blog/static/2448501200692303238515/
00:00:31,3978551963724469,[3.44x33.com/],1,1,3.44x33.com/
00:00:31,6345435406335671,[李成儒+離婚],1,1,ent.sina.com.cn/2004-12-28/0646612073.html
00:00:31,5275533831056154,[華國峰同志逝世],6,1,www.meizu.com/bbs/showerr.asp?BoardID=10&ErrCodes=29&action=%BB%AA%B9%FA%B7%E5%CD%AC%D6%BE%CA%C5%CA%C0
00:00:31,3949828035015059,[old+woman],3,21,www.xxxmovieforum.com/
00:00:31,19186326774082868,[張雅],5,5,tv.mofile.com/tags/???\xa8\xa6??:0,1,20,1,0,0,audittime,0,
00:00:31,6009454949181303,[緬甸第三特區],9,13,www.xzqh.org/bbs/read.php?tid=31074
00:00:31,9472812716405814,[軟體],6,12,www.onlinedown.net/
00:00:32,9311412621037496,[哄搶救災物資],2,1,pic.news.mop.com/gs/2008/0528/12985.shtml
00:00:32,3691729199504175,[哭泣的星空+MP3],2,2,yr0201.blog.sohu.com/22352924.html
00:00:32,40320548674212914,[楊丞琳辱華事件],1,1,you.video.sina.com.cn/b/1084004-1261359184.html
00:00:32,8561366108033201,[哄搶救災物資],1,3,news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml
00:00:32,141278734311103,[網站建設],1,1,www.qicaispace.com/
00:00:32,056513944508728375,[黎姿],2,1,news.baidu.com/f/17/lizi.html
00:00:32,269736677015411,[itfm],2,11,chanye.finance.sina.com.cn/fz/2007-09-10/334786.shtml
00:00:32,393693921083884,[奧運聖火河南路線],1,3,yanziha.pengpeng.com/bbs/thread/1148790.html
00:00:32,9994672352241558,[高階妓女],6,216,lady.anhuinews.com/system/2003/01/07/000213154.shtml
00:00:32,9994672352241558,[高階妓女],6,216,lady.anhuinews.com/system/2003/01/07/000213154.shtml
00:00:32,7954374672498488,[臺灣空軍叛逃大陸],6,4,www.hanhuncn.com/Html/Twsj/20060921074835205_2.html
00:00:32,2896977267956338,[荔枝核深加工],4,4,www.ilib.cn/A-spkj200603040.html
00:00:33,41800714861954374,[月見草油],7,13,www.hisuppliers.com/remen/list/yuejiancaoyou/yuejiancaoyou.html
00:00:33,2699849326058153,[見過這樣的另類嗎],9,1,bbs.vogue.com.cn/archiver/?tid-92752.html
00:00:33,12931893747701723,[美軍審訊越南女戰俘],15,59,xzd.2000y.net/mb/1/ReadNews.asp?NewsID=547170
00:00:33,4554795388682654,[寧王府],1,4,www.cdol.net/html/29/109929-15687.html
00:00:33,9921372392180088,[尺寸鏈],12,55,www.ilib.cn/A-kjqbkfyjj200307090.html
00:00:33,14386146687311085,[PSP遊戲《怪物獵人2G》中文版下載],1,3,games.qq.com/a/20080401/000413.htm
00:00:33,9700485503618976,[如何讓頭髮快速長長],2,12,zhidao.baidu.com/question/24246694.html
00:00:33,6242029922450475,[掃地車報價],19,40,liqixianjin.b2b.hc360.com/supply/27323118.html
00:00:33,8480586467887667,[科比81分視訊],1,1,www.tudou.com/programs/view/cZMRnhWcGtw/
00:00:33,9378259159932798,[隆武帝],120,46,www.openow.net/details/e2007.html
00:00:33,8933412496786006,[沈國放間諜事件],1,9,news.qq.com/a/20060425/
00:00:33,48530765688455246,[胡其美],27,4,bbs1.hxsd.com.cn/user/info?uid=182634
00:00:33,28250791446280643,[命名],10,7,www.namers.cn/
00:00:33,21071231987753036,[莎朗斯通],3,6,ent.qq.com/a/20060214/000136.htm
00:00:33,9586356230570776,[學有所教、勞有所得、老有所養、病有所醫、住有所居],1,5,cpc.people.com.cn/GB/67481/94156/105719/105723/106451/6738281.html
00:00:34,2199783436347869,[如何下載56視訊],1,1,wenwen.soso.com/z/q8527818.htm