Storm入門之第三章拓撲
在這一章,你將學到如何在同一個Storm拓撲結構內的不同元件之間傳遞元組,以及如何向一個執行中的Storm叢集釋出一個拓撲。
資料流組
設計一個拓撲時,你要做的最重要的事情之一就是定義如何在各元件之間交換資料(資料流是如何被bolts消費的)。一個資料流組指定了每個bolt會消費哪些資料流,以及如何消費它們。
NOTE:一個節點能夠釋出一個以上的資料流,一個數據流組允許我們選擇接收哪個。
資料流組在定義拓撲時設定,就像我們在第二章看到的:
··· builder.setBolt("word-normalizer", new WordNormalizer()) .shuffleGrouping("word-reader"); ···
在前面的程式碼塊裡,一個bolt由TopologyBuilder物件設定, 然後使用隨機資料流組指定資料來源。資料流組通常將資料來源元件的ID作為引數,取決於資料流組的型別不同還有其它可選引數。
NOTE:每個InputDeclarer可以有一個以上的資料來源,而且每個資料來源可以分到不同的組。
隨機資料流組
隨機流組是最常用的資料流組。它只有一個引數(資料來源元件),並且資料來源會向隨機選擇的bolt傳送元組,保證每個消費者收到近似數量的元組。
隨機資料流組用於數學計算這樣的原子操作。然而,如果操作不能被隨機分配,就像第二章為單詞計數的例子,你就要考慮其它分組方式了。
域資料流組
域資料流組允許你基於元組的一個或多個域控制如何把元組傳送給bolts
··· builder.setBolt("word-counter", new WordCounter(),2) .fieldsGrouping("word-normalizer", new Fields("word")); ···
NOTE: 在域資料流組中的所有域集合必須存在於資料來源的域宣告中。
全部資料流組
全部資料流組,為每個接收資料的例項複製一份元組副本。這種分組方式用於向bolts
public void execute(Tuple input) { String str = null; try{ if(input.getSourceStreamId().equals("signals")){ str = input.getStringByField("action"); if("refreshCache".equals(str)) counters.clear(); } }catch (IllegalArgumentException e){ //什麼也不做 } ··· }
我們添加了一個if分支,用來檢查源資料流。Storm允許我們宣告具名資料流(如果你不把元組傳送到一個具名資料流,預設傳送到名為”default“的資料流)。這是一個識別元組的極好的方式,就像這個例子中,我們想識別signals一樣。 在拓撲定義中,你要向word-counter bolt新增第二個資料流,用來接收從signals-spout資料流傳送到所有bolt例項的每一個元組。
builder.setBolt("word-counter", new WordCounter(),2) .fieldsGroupint("word-normalizer",new Fields("word")) .allGrouping("signals-spout","signals");
signals-spout的實現請參考git倉庫。
自定義資料流組
你可以通過實現backtype.storm.grouping.CustormStreamGrouping介面建立自定義資料流組,讓你自己決定哪些bolt接收哪些元組。
讓我們修改單詞計數器示例,使首字母相同的單詞由同一個bolt接收。
public class ModuleGrouping mplents CustormStreamGrouping, Serializable{ int numTasks = 0; @Override public List<Integer> chooseTasks(List<Object> values) { List<Integer> boltIds = new ArrayList<Integer>(); if(values.size()>0){ String str = values.get(0).toString(); if(str.isEmpty()){ boltIds.add(0); }else{ boltIds.add(str.charAt(0) % numTasks); } } return boltIds; } @Override public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) { numTasks = targetTasks.size(); } }
這是一個CustomStreamGrouping的簡單實現,在這裡我們採用單詞首字母字元的整數值與任務數的餘數,決定接收元組的bolt。
按下述方式word-normalizer修改即可使用這個自定義資料流組。
builder.setBolt("word-normalizer", new WordNormalizer()) .customGrouping("word-reader", new ModuleGrouping());
直接資料流組
這是一個特殊的資料流組,資料來源可以用它決定哪個元件接收元組。與前面的例子類似,資料來源將根據單詞首字母決定由哪個bolt接收元組。要使用直接資料流組,在WordNormalizer bolt中,使用emitDirect方法代替emit。
public void execute(Tuple input) { ... for(String word : words){ if(!word.isEmpty()){ ... collector.emitDirect(getWordCountIndex(word),new Values(word)); } } //對元組做出應答 collector.ack(input); } public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase(); if(word.isEmpty()){ return 0; }else{ return word.charAt(0) % numCounterTasks; } }
在prepare方法中計算任務數
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.numCounterTasks = context.getComponentTasks("word-counter"); }
在拓撲定義中指定資料流將被直接分組:
builder.setBolt("word-counter", new WordCounter(),2) .directGrouping("word-normalizer");
全域性資料流組
全域性資料流組把所有資料來源建立的元組傳送給單一目標例項(即擁有最低ID的任務)。
不分組
寫作本書時(Stom0.7.1版),這個資料流組相當於隨機資料流組。也就是說,使用這個資料流組時,並不關心資料流是如何分組的。
LocalCluster VS StormSubmitter
到目前為止,你已經用一個叫做LocalCluster的工具在你的本地機器上運行了一個拓撲。Storm的基礎工具,使你能夠在自己的計算機上方便的執行和除錯不同的拓撲。但是你怎麼把自己的拓撲提交給執行中的Storm叢集呢?Storm有一個有趣的功能,在一個真實的叢集上執行自己的拓撲是很容易的事情。要實現這一點,你需要把LocalCluster換成StormSubmitter並實現submitTopology方法, 它負責把拓撲傳送給叢集。
下面是修改後的程式碼:
//LocalCluster cluster = new LocalCluster(); //cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf, //builder.createTopology()); StormSubmitter.submitTopology("Count-Word-Topology-With_Refresh-Cache", conf, builder.createTopology()); //Thread.sleep(1000); //cluster.shutdown();
NOTE: 當你使用StormSubmitter時,你就不能像使用LocalCluster時一樣通過程式碼控制叢集了。
接下來,把原始碼壓縮成一個jar包,執行Storm客戶端命令,把拓撲提交給叢集。如果你已經使用了Maven, 你只需要在命令列進入原始碼目錄執行:mvn package。
現在你生成了一個jar包,使用storm jar命令提交拓撲(關於如何安裝Storm客戶端請參考附錄A)。命令格式:storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3。
對於這個例子,在拓撲工程目錄下面執行:
storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt
通過這些命令,你就把拓撲釋出叢集上了。
如果想停止或殺死它,執行:
storm kill Count-Word-Topology-With-Refresh-Cache
NOTE:拓撲名稱必須保證惟一性。
NOTE:如何安裝Storm客戶端,參考附錄A
DRPC拓撲
有一種特殊的拓撲型別叫做分散式遠端過程呼叫(DRPC),它利用Storm的分散式特性執行遠端過程呼叫(RPC)(見下圖)。Storm提供了一些用來實現DRPC的工具。第一個是DRPC伺服器,它就像是客戶端和Storm拓撲之間的聯結器,作為拓撲的spout的資料來源。它接收一個待執行的函式和函式引數,然後對於函式操作的每一個數據塊,這個伺服器都會通過拓撲分配一個請求ID用來識別RPC請求。拓撲執行最後的bolt時,它必須分配RPC請求ID和結果,使DRPC伺服器把結果返回正確的客戶端。
NOTE:單例項DRPC伺服器能夠執行許多函式。每個函式由一個惟一的名稱標識。
Storm提供的第二個工具(已在例子中用過)是LineDRPCTopologyBuilder,一個輔助構建DRPC拓撲的抽象概念。生成的拓撲建立DRPCSpouts——它連線到DRPC伺服器並向拓撲的其它部分分發資料——幷包裝bolts,使結果從最後一個bolt返回。依次執行所有新增到LinearDRPCTopologyBuilder物件的bolts。
作為這種型別的拓撲的一個例子,我們建立了一個執行加法運算的程序。雖然這是一個簡單的例子,但是這個概念可以擴充套件到複雜的分散式計算。
bolt按下面的方式宣告輸出:
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","result")); }
因為這是拓撲中惟一的bolt,它必須釋出RPC ID和結果。execute方法負責執行加法運算。
public void execute(Tuple input) { String[] numbers = input.getString(1).split("\\+"); Integer added = 0; if(numbers.length<2){ throw new InvalidParameterException("Should be at least 2 numbers"); } for(String num : numbers){ added += Integer.parseInt(num); } collector.emit(new Values(input.getValue(0),added)); }
包含加法bolt的拓撲定義如下:
public static void main(String[] args) { LocalDRPC drpc = new LocalDRPC(); LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add"); builder.addBolt(AdderBolt(),2); Config conf = new Config(); conf.setDebug(true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpcder-topology", conf, builder.createLocalTopology(drpc)); String result = drpc.execute("add", "1+-1"); checkResult(result,0); result = drpc.execute("add", "1+1+5+10"); checkResult(result,17); cluster.shutdown(); drpc.shutdown(); }
建立一個LocalDRPC物件在本地執行DRPC伺服器。接下來,建立一個拓撲構建器(譯者注:LineDRpctopologyBuilder物件),把bolt新增到拓撲。執行DRPC物件(LocalDRPC物件)的execute方法測試拓撲。
NOTE:使用DRPCClient類連線遠端DRPC伺服器。DRPC伺服器暴露了Thrift API,因此可以跨語言程式設計;並且不論是在本地還是在遠端執行DRPC伺服器,它們的API都是相同的。 對於採用Storm配置的DRPC配置引數的Storm叢集,呼叫構建器物件的createRemoteTopology向Storm叢集提交一個拓撲,而不是呼叫createLocalTopology。