storm整合kafka舊版API(offset In Zk)示例
阿新 • • 發佈:2018-12-14
編寫主函式啟動類的Topo
package com.simon.storm.kafka;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import com.simon.storm.schema.MessageScheme;

import java.util.Arrays;
import java.util.UUID;

/**
 * Topology entry point demonstrating the legacy storm-kafka integration
 * (old consumer API, offsets stored in ZooKeeper).
 *
 * <p>Wires a {@link KafkaSpout} (configured via {@link SpoutConfig}) to a
 * simple printing bolt and runs everything in a {@link LocalCluster}.
 *
 * Created by Simon on 2018/11/1.
 */
public class OldKafkaSpout {

    /** Kafka topic to consume from. */
    public static String topic = "test";
    /** ZooKeeper connect string (host:port list) used to discover Kafka brokers. */
    public static String zkHosts = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181";
    /** ZooKeeper root path under which consumer offsets are persisted. */
    public static String zkRoot = "/simon/offsets";
    /** ZooKeeper server addresses (no ports) used for offset storage. */
    public static String zkAddress = "192.168.1.101,192.168.1.102,192.168.1.103";
    /** Port for the offset-storage ZooKeeper ensemble. */
    public static int zkPort = 2181;
    /** Max total message bytes requested per Kafka FetchRequest (1 MiB). */
    public static int fetchSizeBytes = 1048576;
    /** Socket timeout (ms) for connections to Kafka brokers. */
    public static int socketTimeoutMs = 30000;
    /** How long (ms) the consumer waits when the broker has no new messages. */
    public static int fetchMaxWait = 30000;
    /** Read-buffer size (bytes) of the SimpleConsumer's SocketChannel (1 MiB). */
    public static int bufferSizeBytes = 1048576;
    /** Fall back to startOffsetTime when a requested offset no longer exists in Kafka. */
    public static boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    /** Interval (seconds) between metrics reports. */
    public static int metricsTimeBucketSizeInSecs = 60;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // Broker discovery goes through ZooKeeper (legacy storm-kafka style).
        BrokerHosts brokerHosts = new ZkHosts(zkHosts);

        // Random consumer id: every run starts a logically new consumer group.
        SpoutConfig spoutConfig =
                new SpoutConfig(brokerHosts, topic, zkRoot, UUID.randomUUID().toString());

        // ZooKeeper ensemble used for offset persistence.
        spoutConfig.zkServers = Arrays.asList(zkAddress.split(","));
        spoutConfig.zkPort = zkPort;

        // Fetch / socket tuning.
        spoutConfig.fetchSizeBytes = fetchSizeBytes;
        spoutConfig.socketTimeoutMs = socketTimeoutMs;
        spoutConfig.fetchMaxWait = fetchMaxWait;
        spoutConfig.bufferSizeBytes = bufferSizeBytes;

        // Recover from out-of-range offsets instead of failing.
        spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = useStartOffsetTimeIfOffsetOutOfRange;
        spoutConfig.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;

        /*
         * Start-offset policy: honor offsets already stored in ZooKeeper and,
         * when none exist, begin at the latest message. To force a replay from
         * the earliest retained message instead, set ignoreZkOffsets = true and
         * startOffsetTime = kafka.api.OffsetRequest.EarliestTime().
         */
        spoutConfig.ignoreZkOffsets = false;
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

        // Other tunables left at their defaults:
        //   spoutConfig.maxOffsetBehind — max lag before in-between messages are dropped
        //   spoutConfig.retryLimit     — failed-tuple refetch attempts

        // Deserialize raw Kafka byte[] payloads into single-field string tuples.
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // Spout id must match the id the bolt subscribes to; the original code
        // registered "kafkaSpoutLCCont" but grouped on "kafkaSpout", which
        // makes topology validation fail at submit time.
        String spoutId = "kafkaSpout";
        builder.setSpout(spoutId, new KafkaSpout(spoutConfig));
        builder.setBolt("KafkaSpoutBolt", new KafkaSpoutBolt()).localOrShuffleGrouping(spoutId);

        Config config = new Config();
        // Supervisor/worker heartbeat timeout (seconds); supervisor restarts the
        // worker beyond this. NOTE(review): 600000 s is unusually large — confirm.
        config.put("supervisor.worker.timeout.secs", 600000);
        // Storm <-> ZooKeeper session timeout (ms). NOTE(review): also very large.
        config.put("storm.zookeeper.session.timeout", 1200000000);
        // Verbose debug logging — only sensible in LocalCluster mode.
        config.setDebug(true);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("OldKafkaSpout", config, builder.createTopology());
        Utils.sleep(Long.MAX_VALUE);
        localCluster.shutdown();
    }
}
自定義訊息轉碼類MessageScheme
package com.simon.storm.schema;

import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Deserialization scheme that turns a raw Kafka message payload into a
 * one-field tuple containing the payload decoded as a UTF-8 string.
 *
 * Created by Administrator on 2018/1/17.
 */
public class MessageScheme implements Scheme {

    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;

    /** Name of the single output field carrying the decoded string. */
    public static final String STRING_SCHEME_KEY = "str";

    /**
     * Decodes one Kafka message into a single-element tuple.
     *
     * @param byteBuffer raw message payload
     * @return a {@link Values} with the UTF-8 decoded string
     */
    @Override
    public List<Object> deserialize(ByteBuffer byteBuffer) {
        return new Values(deserializeString(byteBuffer));
    }

    /**
     * Decodes the buffer's remaining bytes as UTF-8 without consuming it.
     *
     * @param bytes buffer positioned at the message payload
     * @return the decoded string
     */
    public static String deserializeString(ByteBuffer bytes) {
        if (bytes.hasArray()) {
            int base = bytes.arrayOffset();
            // Fix: the original used the platform-default charset here while the
            // non-array branch decoded UTF-8 — decode UTF-8 in both branches.
            return new String(bytes.array(), base + bytes.position(), bytes.remaining(), UTF8_CHARSET);
        } else {
            return new String(Utils.toByteArray(bytes), UTF8_CHARSET);
        }
    }

    /** Declares the single output field emitted by this scheme. */
    @Override
    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY);
    }
}
編寫邏輯處理類Bolt
package com.simon.storm.kafka; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; /** * Created by Simon on 2018/10/23. */ public class KafkaSpoutBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { //只做一個輸出 String string = tuple.getString(0); System.out.println(string); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }