【2019春招準備:106.storm(1)】
0.簡介
a million tuples processed per second per node
單個節點美妙百萬數量級的實時計算
scalable 可擴充套件性
fault-tolerant 容錯性
生於Twitter,收購的BackType,並開源到apache
底層語言clojure,java混合體
api:javadoc
hadoop VS storm
hadoop: map reduce
storm:spout(產生資料來源) bolt(處理)
storm不同的是沒有結束程序,就算沒有資料進來,也不會結束(扶梯和電梯的區別)
使用場景不同(實時流處理-離線批處理)
storm VS sparkStreaming
並不是一個真正意義的實時流處理 微小批處理 可以設定,屬於spark生態圈
1. 核心概念
http://storm.apache.org/releases/1.2.2/Concepts.html
Streams 資料流
Spouts 產生資料流的東西(可能是多個)reliable(ack,fail), unreliable
Bolts 處理資料流的東西(可能是多個)filtering,functions,aggregations.joins,talking todatabases execute 多執行緒,非同步
Tuple 資料流裡的資料,nextTuple方法
Topologies 整個資料處理的生產線(類似於mapreduce)
2. storm程式設計
Idea & Maven 構建storm專案
ISpout介面
IComponent介面
IBolt介面
求和 + 詞頻統計案例
環境:jdk1.8 IDEA2018.2 Maven 3.5.4
【ISpout】:負責將資料傳送到topology中處理
storm會跟蹤每一個spout發出去的tuple的DAG(tuple中含一個messageID 任意型別)
storm在每一個執行緒裡面執行ack.fail.nextTuple方法,這意味著使用ISpout的時候不用擔心併發的問題,因為都是執行緒安全的。但是這些的前提是使用者必須保證,nextTuple是非阻塞的,不然ack和fail會被阻塞掉void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
一個task在worker上面執行的時候的初始化步驟void close();
spout 在shutdowm的時候呼叫的方法,但是並不保證close會被呼叫void nextTuple();傳送資料
storm 請求spout傳送tuple的時候呼叫這個方法,該方法不能阻塞void ack(Object msgId);
tuple處理成功,storm返回給spout成功的訊息
【IComponent】
給topology裡面的所有元件提供公用的方法void declareOutputFields(OutputFieldsDeclarer declarer);
用於宣告Spout/Bolt傳送的tuple的名稱
【IBolt】
一個ibolt代表一個元件(component),先接受拿出tuple並且處理它
能夠filtering joining functions和aggregation生命週期:
在客戶端上面建立,序列化,提交到主叢集上面(Nimbus)(有點像yarn中的resourcemanager)。Nimbus啟動一個worker去反序列化,啟動處理tuple的工作。void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
準備好一個outputCollectorvoid execute(Tuple input);
tuple含有自己的元資料:來源地,值(getValue可以訪問到)cleanup
資源清理操作,就算是寫了也不一定被執行
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/**
* 使用storm實現累計求和的操作
*/
public class LocalSumStormTopology {
/**
* Spout需要繼承BaseRichSpout
* 資料來源需要產生資料併發射
*/
public static class DataSourceSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
int number=0;
/**
* 只會被呼叫一次
* @param conf 配置引數 暫時不管
* @param context 上下文
* @param collector 資料的發射器
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector=collector;
}
/**
* 會產生資料,在生產上肯定是從訊息佇列中獲取資料
* 應該設計成一個死迴圈,一直髮送,因為是處理實時流資料
*/
@Override
public void nextTuple() {
this.collector.emit(new Values(++number));//Values是一個ArrayList陣列,所有的構造方法的引數都能加入到陣列中
System.out.println("Spout:"+number);
//不要一次性太多;防止資料產生太快
Utils.sleep(1000);
}
/**
* 宣告輸出欄位
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("number_"));//因為前面values裡面只有一個東西,因襲這裡對應上只給一個名稱
}
}
/**
* 資料的累積求和Bolt:接受資料並處理
*/
public static class SumBolt extends BaseRichBolt{
int sum=0;
/**
* 初始化方法,只會被執行一次
* @param stormConf
* @param context
* @param collector 因此這一次的業務邏輯很簡單,不需要繼續往下面一個bolt傳送
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
/**
* 也是一個死迴圈:獲取spout傳送給過來的資料
* @param input
*/
@Override
public void execute(Tuple input) {
//可以bolt中獲取值,可以根據index獲取,也可以根據field名稱獲取,建議使用名稱獲取
Integer value = input.getIntegerByField("number_");
sum+=value;
System.out.println("Bolt: sum=[ "+sum+"]");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main(String[] args) {
/**
* 如果不是本地,而是真正在storm叢集上面提交,需要用的不是LocalCluster而是StormSubmitter
*/
LocalCluster cluster=new LocalCluster();//建立一個模擬的本地storm叢集,本地模式執行不需要搭建storm叢集
//builder中可以設定spout和bolt的執行順序,其中id都是自定義的
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("DataSourceSpout_",new DataSourceSpout());
builder.setBolt("SumBolt_",new SumBolt()).shuffleGrouping("DataSourceSpout_");//shuffle指定執行順序
StormTopology topology = builder.createTopology();
/**
* 向叢集提交一個topology,引數在原始碼中並沒有,
* 第一個是類名稱
* 第二個是new Config
* 第三個是一StormTopology,使用TopologyBuilder建立
*/
cluster.submitTopology("LocalSumStormTopology",new Config(),topology);
}
}
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/**
* 使用storm實現累計求和的操作
*/
public class LocalSumStormTopology {
/**
* Spout需要繼承BaseRichSpout
* 資料來源需要產生資料併發射
*/
public static class DataSourceSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
int number=0;
/**
* 只會被呼叫一次
* @param conf 配置引數 暫時不管
* @param context 上下文
* @param collector 資料的發射器
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector=collector;
}
/**
* 會產生資料,在生產上肯定是從訊息佇列中獲取資料
* 應該設計成一個死迴圈,一直髮送,因為是處理實時流資料
*/
@Override
public void nextTuple() {
this.collector.emit(new Values(++number));//Values是一個ArrayList陣列,所有的構造方法的引數都能加入到陣列中
System.out.println("Spout:"+number);
//不要一次性太多;防止資料產生太快
Utils.sleep(1000);
}
/**
* 宣告輸出欄位
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("number_"));//因為前面values裡面只有一個東西,因襲這裡對應上只給一個名稱
}
}
/**
* 資料的累積求和Bolt:接受資料並處理
*/
public static class SumBolt extends BaseRichBolt{
int sum=0;
/**
* 初始化方法,只會被執行一次
* @param stormConf
* @param context
* @param collector 因此這一次的業務邏輯很簡單,不需要繼續往下面一個bolt傳送
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
/**
* 也是一個死迴圈:獲取spout傳送給過來的資料
* @param input
*/
@Override
public void execute(Tuple input) {
//可以bolt中獲取值,可以根據index獲取,也可以根據field名稱獲取,建議使用名稱獲取
Integer value = input.getIntegerByField("number_");
sum+=value;
System.out.println("Bolt: sum=[ "+sum+"]");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main(String[] args) {
/**
* 如果不是本地,而是真正在storm叢集上面提交,需要用的不是LocalCluster而是StormSubmitter
*/
LocalCluster cluster=new LocalCluster();//建立一個模擬的本地storm叢集,本地模式執行不需要搭建storm叢集
//builder中可以設定spout和bolt的執行順序,其中id都是自定義的
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("DataSourceSpout_",new DataSourceSpout());
builder.setBolt("SumBolt_",new SumBolt()).shuffleGrouping("DataSourceSpout_");//shuffle指定執行順序
StormTopology topology = builder.createTopology();
/**
* 向叢集提交一個topology,引數在原始碼中並沒有,
* 第一個是類名稱
* 第二個是new Config
* 第三個是一StormTopology,使用TopologyBuilder建立
*/
cluster.submitTopology("LocalSumStormTopology",new Config(),topology);
}
}
但是發現結果hin恐怖:
a=8425
b=16849
c=8425
d=12636
a=8425
b=16850
c=8425
d=12636
a=8425
b=16851
c=8425
d=12636
a=8425
b=16851
c=8425
d=12637
a=8425
b=16851
c=8425
d=12638
。。。
原因是:storm中,不管是Spout的nextTuple還是Bolt中的execute都是死迴圈(因為是處理實時流資料的),所以在讀取和傳送資料的時候是永動的!!!!
(解決方法,讀完檔案之後將檔名稱改掉,因為設定的時候讀取的是該資料夾下面的字尾txt檔案)
import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.io.File;
import java.io.IOException;
import java.util.*;
/**
* 使用storm完成詞頻統計功能
*/
public class LocalWordCountStormTopology {
public static class DataSourceSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
/**
* 處理的業務邏輯
* 1.讀取指定目錄的資料夾下的資料D:\ZZBfiles\StormFile\wordCount
* 2.每一行資料發射出去
*/
@Override
public void nextTuple() {
//採用的common.io裡面的工具類方便讀取操作,不要老是inputstream
//獲取所有檔案
Collection<File> files = FileUtils.listFiles(new File("D:\\ZZBfiles\\StormFile\\wordCount"),
new String[]{"txt"}, true);
for (File file : files) {//獲取一個檔案
try {
List<String> lines = FileUtils.readLines(file);
for (String line : lines) {//獲取檔案中的一行
this.collector.emit(new Values(line));//傳送這一行
}
//在這裡修改檔名稱就不會重複讀取了
FileUtils.moveFile(file,new File(file.getAbsolutePath()+System.currentTimeMillis()));
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line_"));
}
}
/**
* 對資料進行分割,併發送分隔好的單詞出去
*/
public static class SplitBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 業務邏輯:
* line對其分割,按照“,”
*
* @param input
*/
@Override
public void execute(Tuple input) {
String line = input.getStringByField("line_");
String[] words = line.split(",");
for(String word:words){
this.collector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word_"));
}
}
/**
* 詞頻彙總Bolt
*/
public static class CountBolt extends BaseRichBolt{
Map<String,Integer> map=new HashMap<>();
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
/**
* 業務邏輯:
* 1.獲取每一個單詞
* 2.對所有單詞進行彙總
* 3.輸出
* @param input
*/
@Override
public void execute(Tuple input) {
String word=input.getStringByField("word_");
Integer count=map.get(word);
if(count==null){
count=1;
}else{
count++;
}
map.put(word,count);//新增的時候hashmap會自動覆蓋相同的key的entry
System.out.println("======================");
Set<Map.Entry<String, Integer>> entrySet = map.entrySet();
for(Map.Entry<String, Integer> entry:entrySet){
System.out.println(entry);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main(String[] args) {
//topo建立
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout_",new DataSourceSpout());
builder.setBolt("SplitBolt_",new SplitBolt()).shuffleGrouping("DataSourceSpout_");
builder.setBolt("CountBolt_",new CountBolt()).shuffleGrouping("SplitBolt_");
//建立本地叢集
LocalCluster cluster=new LocalCluster();
cluster.submitTopology("LocalWordCountStormTopology",new Config(),builder.createTopology());
}
}
【注意事項】
- Spout還是Bolt的名稱不要重複
- 名稱不要以下劃線開頭
- topology名稱不能重複(就是說不能執行相同名稱的topology),但是local的時候似乎可以成功