hadoop(十三)storm流式計算(實時處理)
storm介紹
Storm是一個開源的分散式實時計算系統,可以簡單、可靠的處理大量的資料流。被稱作“實時的hadoop”。Storm有很多使用場景:如實時分析,線上機器學習,持續計算, 分散式RPC,ETL等等。Storm支援水平擴充套件,具有高容錯性,保證每個訊息都會得到處理,而且處理速度很快(在一個小叢集中,每個結點每秒可以處理 數以百萬計的訊息)。Storm的部署和運維都很便捷,而且更為重要的是可以使用任意程式語言來開發應用。
高可靠性
高容錯性
Storm叢集和Hadoop叢集表面上看很類似。Hadoop上執行的是MapReduce jobs,而在Storm上執行的是拓撲(topology);
Hadoop擅長於分散式離線批處理,而Storm設計為支援分散式實時計算;
Hadoop新的spark元件提供了在hadoop平臺上執行storm的可能性
在深入理解Storm之前,需要了解一些概念:
Topologies : 拓撲,也俗稱一個任務
Spouts : 拓撲的訊息源
Bolts : 拓撲的處理邏輯單元
tuple:訊息元組
Streams : 流
Stream groupings :流的分組策略
Tasks : 任務處理單元
Executor :工作執行緒
Workers :工作程序
Configuration : topology的配置
Topology 與 Mapreduce
一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會執行(除非你手動kill掉)
Nimbus 與 ResourManager
在Storm的叢集裡面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面執行一個叫
Supervisor (worker程序)與NodeManager(YarnChild)
每一個工作節點上面執行一個叫做Supervisor的節點。Supervisor會監聽分配給它那臺機器的工作,根據需要啟動/關閉工作程序。每一個工作程序執行一個topology的一個子集;一個執行的topology由執行在很多機器上的很多工作程序組成。
storm安裝
1、安裝一個zookeeper叢集
2、上傳storm的安裝包,解壓
3、修改配置檔案storm.yaml
#所使用的zookeeper叢集主機
storm.zookeeper.servers:
- "weekend05"
- "weekend06"
- "weekend07"
#nimbus所在的主機名
nimbus.host: "weekend05"
supervisor.slots.ports
-6701
-6702
-6703
-6704
-6705
啟動storm
在nimbus主機上
nohup ./storm nimbus 1>/dev/null 2>&1 &
nohup ./storm ui 1>/dev/null 2>&1 &
在supervisor主機上
nohup ./storm supervisor 1>/dev/null 2>&1 &
storm的深入學習:
分散式共享鎖的實現
事務topology的實現機制及開發模式
在具體場景中的跟其他框架的整合(flume/activeMQ/kafka(分散式的訊息佇列系統) /redis/hbase/mysql cluster)
demo
完成
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class RandomWordSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
//模擬一些資料
String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
//不斷地往下一個元件傳送tuple訊息
//這裡面是該spout元件的核心邏輯
@Override
public void nextTuple() {
//可以從kafka訊息佇列中拿到資料,簡便起見,我們從words陣列中隨機挑選一個商品名傳送出去
Random random = new Random();
int index = random.nextInt(words.length);
//通過隨機數拿到一個商品名
String godName = words[index];
//將商品名封裝成tuple,傳送訊息給下一個元件
collector.emit(new Values(godName));
//每傳送一個訊息,休眠500ms
Utils.sleep(500);
}
//初始化方法,在spout元件例項化時呼叫一次
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
//宣告本spout元件傳送出去的tuple中的資料的欄位名
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("orignname"));
}
}
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class UpperBolt extends BaseBasicBolt{
//業務處理邏輯
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//先獲取到上一個元件傳遞過來的資料,資料在tuple裡面
String godName = tuple.getString(0);
//將商品名轉換成大寫
String godName_upper = godName.toUpperCase();
//將轉換完成的商品名傳送出去
collector.emit(new Values(godName_upper));
}
//宣告該bolt元件要發出去的tuple的欄位
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("uppername"));
}
}
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class SuffixBolt extends BaseBasicBolt{
FileWriter fileWriter = null;
//在bolt元件執行過程中只會被呼叫一次
@Override
public void prepare(Map stormConf, TopologyContext context) {
try {
fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//該bolt元件的核心處理邏輯
//每收到一個tuple訊息,就會被呼叫一次
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//先拿到上一個元件傳送過來的商品名稱
String upper_name = tuple.getString(0);
String suffix_name = upper_name + "_itisok";
//為上一個元件傳送過來的商品名稱新增字尾
try {
fileWriter.write(suffix_name);
fileWriter.write("\n");
fileWriter.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//本bolt已經不需要傳送tuple訊息到下一個元件,所以不需要再宣告tuple的欄位
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
}
}
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
/**
* 組織各個處理元件形成一個完整的處理流程,就是所謂的topology(類似於mapreduce程式中的job)
* 並且將該topology提交給storm叢集去執行,topology提交到集群后就將永無休止地執行,除非人為或者異常退出
* @author [email protected]
*
*/
public class TopoMain {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
//將我們的spout元件設定到topology中去
//parallelism_hint :4 表示用4個excutor來執行這個元件
//setNumTasks(8) 設定的是該元件執行時的併發task數量,也就意味著1個excutor會執行2個task
builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
//將大寫轉換bolt元件設定到topology,並且指定它接收randomspout元件的訊息
//.shuffleGrouping("randomspout")包含兩層含義:
//1、upperbolt元件接收的tuple訊息一定來自於randomspout元件
//2、randomspout元件和upperbolt元件的大量併發task例項之間收發訊息時採用的分組策略是隨機分組shuffleGrouping
builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
//將新增字尾的bolt元件設定到topology,並且指定它接收upperbolt元件的訊息
builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
//用builder來建立一個topology
StormTopology demotop = builder.createTopology();
//配置一些topology在叢集中執行時的引數
Config conf = new Config();
//這裡設定的是整個demotop所佔用的槽位數,也就是worker的數量
conf.setNumWorkers(4);
conf.setDebug(true);
conf.setNumAckers(0);
//將這個topology提交給storm叢集執行
StormSubmitter.submitTopology("demotopo", conf, demotop);
}
}