storm整合kafka,spout作為kafka的消費者

storm整合kafka,spout作為kafka的消費者

在之前的部落格中記錄了如何在 storm 專案中把每條記錄作為訊息傳送到 kafka 訊息佇列。這裡講述如何在 storm 中消費 kafka 佇列中的訊息。至於專案中「校驗」與「預處理」這兩個拓撲之間為何要用 kafka 訊息佇列進行資料的暫存,仍需要進一步確認。

專案中直接使用storm提供的kafkaSpout作為訊息佇列的消費者。實現spout從kafka訊息佇列獲取資料,作為拓撲的資料來源。

package com.lancy.topology;

import java.util.Arrays;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import com.lancy.common.ConfigCommon;
import com.lancy.common.pre.TopoStaticName;
import com.lancy.spout.GetDataFromKafkaSpoutBolt;

/**
 * Topology whose data source is a {@link KafkaSpout}: it consumes records from a
 * Kafka topic and hands them to the pre-processing bolts.
 *
 * <p>With no program arguments the topology runs in local mode (embedded
 * {@link LocalCluster}); with at least one argument it is submitted to a real cluster.
 */
public class LntPreHandleTopology implements Runnable {

    /** ZooKeeper host:port plus the Kafka chroot, e.g. {@code 127.0.0.1:2181/kafka}. */
    private static final String CONFIG_ZOOKEEPER_HOST =
            ConfigCommon.getInstance().ZOOKEEPER_HOST_PORT + "/kafka";

    /** Name of the Kafka topic to consume. */
    private static final String CONFIG_TOPIC =
            ConfigCommon.getInstance().KAFKA_LNT_VALID_DATA_TOPIC;

    /** ZooKeeper root path under which the spout stores its consumer offsets. */
    private static final String CONFIG_OFFSET_ZK_PATH =
            "/kafka/storm_offset" + "/" + CONFIG_TOPIC;

    /** Consumer group id used for offset bookkeeping. */
    private static final String CONFIG_OFFSET_ZK_CUSTOMER_GROUP_ID =
            ConfigCommon.getInstance().KAFKA_LNT_VALID_CUSTOMER_ID;

    @Override
    public void run() {
        exe(new String[] { "lnt" });
    }

    /**
     * Builds and launches the topology.
     *
     * @param args program arguments; empty or {@code null} selects local mode,
     *             anything else submits to the configured Storm cluster
     */
    public static void exe(String[] args) {
        // Register the ZooKeeper hosts (broker metadata lives under /brokers).
        BrokerHosts brokerHosts = new ZkHosts(CONFIG_ZOOKEEPER_HOST, "/brokers");

        // Configure the spout: topic, offset path and consumer group id.
        SpoutConfig spoutConfig = new SpoutConfig(
                brokerHosts, CONFIG_TOPIC, CONFIG_OFFSET_ZK_PATH, CONFIG_OFFSET_ZK_CUSTOMER_GROUP_ID);

        if (args == null || args.length == 0) {
            // Local mode. On initialization KafkaSpout reads spoutConfig.zkServers and
            // spoutConfig.zkPort; these are unset by default, so the spout falls back to
            // the zookeeper configured for the running Storm instance. A locally running
            // Storm uses a transient zookeeper that does not persist anything, so offsets
            // would be lost on every shutdown. In local mode we therefore configure the
            // offset zookeeper explicitly.
            String CONFIG_OFFSET_ZK_HOST = ConfigCommon.getInstance().ZOOKEEPER_HOST;
            int CONFIG_OFFSET_ZK_PORT = Integer.parseInt(ConfigCommon.getInstance().ZOOKEEPER_PORT);
            // ZooKeeper servers used for recording the Kafka offsets.
            spoutConfig.zkServers = Arrays.asList(CONFIG_OFFSET_ZK_HOST.split(","));
            // ZooKeeper port used for recording the Kafka offsets.
            spoutConfig.zkPort = CONFIG_OFFSET_ZK_PORT;
            // spoutConfig.ignoreZkOffsets = true;
        }
        // spoutConfig.ignoreZkOffsets = true;

        // Scheme (optional): StringScheme tells KafkaSpout how to decode the raw
        // message bytes into the tuples Storm passes downstream.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        TopologyBuilder builder = builderTopology(kafkaSpout);

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(8);
        config.setNumAckers(8);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10240);
        config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
        config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

        if (args != null && args.length > 0) {
            try {
                StormSubmitter.submitTopology("prehanlder-topology", config, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            // Test environment: run in local mode on an embedded cluster.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("prehanlder-topology-local-mode", config, builder.createTopology());
            try {
                Thread.sleep(12000 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            // BUG FIX: the kill must use the exact name the topology was submitted
            // under; the original passed "local-prehanlder-topology-local-mode",
            // which never matched and left the topology running.
            localCluster.killTopology("prehanlder-topology-local-mode");
            localCluster.shutdown();
        }
    }

    /**
     * Wires the spout and the first bolt of the pre-processing pipeline.
     *
     * @param kafkaSpout the configured Kafka spout acting as the data source
     * @return the builder holding the (partial) topology
     */
    public static TopologyBuilder builderTopology(KafkaSpout kafkaSpout) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(TopoStaticName.KafkaSpout, kafkaSpout, 10);
        builder.setBolt(TopoStaticName.DATAFROMKAFKASPOUT, new GetDataFromKafkaSpoutBolt(), 10)
               .shuffleGrouping(TopoStaticName.KafkaSpout);
        // Remaining bolts omitted.
        return builder;
    }
}

靜態的引數配置類

package com.lancy.common.pre;
/**
 * @ClassName: TopoStaticName
 * @Description: Static component ids for the pre-processing topology.
 */
public class TopoStaticName {
    // Id of the Kafka spout (data source) in the topology.
    public static final String  KafkaSpout                  = "01.KafkaSpout";
    // Id of the bolt that receives tuples from the Kafka spout.
    public static final String  DATAFROMKAFKASPOUT          = "02.DataFromKafkaSpout";

    // Constants holder — not meant to be instantiated (suppresses the
    // implicit public constructor).
    private TopoStaticName() {
    }
}

後續瞭解如何初始化zookeeper節點資訊以及如何整合kafka,storm和zookeeper的,加油