1. 程式人生 > >flume+kafka+storm整合實現實時計算小案例

flume+kafka+storm整合實現實時計算小案例

    我們做資料分析的時候常常會遇到這樣兩個場景,一個是統計歷史資料,這個就是要分析歷史儲存的日誌。我們會使用hadoop,具體框架可以設計為:
1.flume收集日誌;
2.HDFS輸入路徑儲存日誌;
3.MapReduce計算,將結果輸出到HDFS輸出路徑;
4.hive+sqoop實現將結果轉儲到mysql
5.我們會使用crontab定時執行一個指令碼來做

具體這裡就不展開來說了,我會在另一個帖子講到。這裡我們詳細介紹第二個場景:實時計算。這個用的比較多的如天貓雙十一實時展示交易額,還有比如說銀行等。

    實現實時計算要用到storm或者spark等,這裡我介紹flume+kafka+storm方案。


使用flume採集日誌資料,flume是一個分散式、可靠和高可用的海量日誌採集、聚合和傳輸的系統。它的核心是一個agent,其中包含3個元件,source、channel和sink。Agent會監控日誌目錄,通過source元件將日誌蒐集到channel中快取起來,當sink處理完之後會將快取的記錄刪除,已經掃描過的檔案會新增.COMPLETED字尾,下次不會重新掃描該檔案。
因為日誌蒐集的速度和日誌處理的速度是不一樣的,所以加了一個kafka元件,其實也是作為一個緩衝的作用。Sink將日誌中一行資料作為訊息釋出到kafka中,storm通過kafkaSpout來消費kafka訊息,storm消費完一條訊息需要對kafkaSpout迴應(比如我們自定義bolt時如果是繼承BaseRichBolt,需要顯示呼叫collector.ack(tuple)和collector.fail(tuple)),這樣才不會重複消費訊息。 Storm從kafka中拿到一條資料,通過解析字串,對日誌做一定的日誌清洗工作,然後計算之後可以將資料存到mysql,然後就可以在前端展示了。我們需要自定義DailyStatisticsAnalysisTopology來處理任務程式碼如下:
pom.xml<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!-- kafka整合storm --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.3</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>0.9.3</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.30</version> </dependency> </dependencies> <build> <finalName>Storm_DailyStatisticsAnalysis</finalName> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.wyd.kafkastorm.DailyStatisticsAnalysisTopology</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> -----------------------------------------------------------------------------------------------public class DailyStatisticsAnalysisTopology { private static String topicName = "dailyStatisticsAnalysis"; private static String zkRoot = "/stormKafka/"+topicName; public static void main(String[] args) { BrokerHosts hosts = new ZkHosts("192.168.*.*:2181"); SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,UUID.randomUUID().toString()); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafkaSpout",kafkaSpout); builder.setBolt("dailyStatisticsAnalysisBolt", new DailyStatisticsAnalysisBolt(), 2).shuffleGrouping("kafkaSpout"); Config conf = new Config(); conf.setDebug(true); if(args != null && args.length > 0) { conf.setNumWorkers(1); try { StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); }catch (Exception e) { e.printStackTrace(); } } else { conf.setMaxSpoutPending(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("dailyAnalysis", conf, builder.createTopology()); } }}-----------------------------------------------------------------------------public class DailyStatisticsAnalysisBolt extends BaseRichBolt { /** * */ private static final long serialVersionUID = 2262767962772699286L; private OutputCollector _collector; LogInfoHandler loginfohandler = new LogInfoHandler(); @Override public void execute(Tuple tuple) { // 存入mysql try{ String value = tuple.getString(0); loginfohandler.splitHandl(value); DbUtil.insert(loginfohandler.getTarget(), loginfohandler.getTime(), loginfohandler.getDistrictServer(), loginfohandler.getChannel(), loginfohandler.getCounts());_collector.ack(tuple); }catch(Exception e){_collector.fail(tuple); e.printStackTrace(); } } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this._collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { }}-----------------------------------------------------------------------------進入maven專案的根目錄執行mvn assembly:assembly打包成功後會生成兩個jar包:
將他們上傳到storm目錄下,執行 nohup bin/storm jar Storm_DailyStatisticsAnalysis.jar com.wyd.kafkastorm.DailyStatisticsAnalysisTopology &