Kafka + Storm + HBase

This post is based on the following software:

CentOS 7.3 (1611)
kafka_2.10-0.10.2.1.tgz
zookeeper-3.4.10.tar.gz
hbase-1.3.1-bin.tar.gz
apache-storm-1.1.0.tar.gz
hadoop-2.8.0.tar.gz
jdk-8u131-linux-x64.tar.gz
IntelliJ IDEA 2017.1.3 x64

IP            Roles
172.17.11.85  NameNode, SecondaryNameNode, DataNode, HMaster, HRegionServer
172.17.11.86  DataNode, HRegionServer
172.17.11.87  DataNode, HRegionServer

1. First, let's lay out the flow: Kafka -> Storm

I use a producer to write data to a fixed topic.


import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    private final KafkaProducer<String, String> producer;
    private final String topic;

    public Producer(String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.17.11.85:9092,172.17.11.86:9092,172.17.11.87:9092");
        props.put("client.id", "DemoProducer");
        props.put("batch.size", 16384);       // 16 KB per batch
        props.put("linger.ms", 1000);
        props.put("buffer.memory", 33554432); // 32 MB send buffer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    public void producerMsg() throws InterruptedException {
        String data = "Apache Storm is a free and open source distributed realtime computation system Storm makes it easy to reliably process unbounded streams of data doing for realtime processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!\n"
                + "Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.\n"
                + "Storm integrates with the queueing and database technologies you already use. A Storm topology consumes streams of data and processes those streams in arbitrarily complex ways, repartitioning the streams between each stage of the computation however needed. Read more in the tutorial.";
        // strip punctuation, then split the text into single words
        data = data.replaceAll("[\\pP‘’“”]", "");
        String[] words = data.split(" ");
        Random _rand = new Random();
        Random rnd = new Random();
        int events = 10;
        for (long nEvents = 0; nEvents < events; nEvents++) {
            int lastIPnum = rnd.nextInt(255);
            String ip = "192.168.2." + lastIPnum;            // random IP used as the message key
            String msg = words[_rand.nextInt(words.length)]; // random word used as the message value
            try {
                producer.send(new ProducerRecord<>(topic, ip, msg));
                System.out.println("Sent message: (" + ip + ", " + msg + ")");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        Thread.sleep(10000);
    }

    public static void main(String[] args) throws InterruptedException {
        Producer producer = new Producer(Constants.TOPIC);
        producer.producerMsg();
    }
}

The producer strips the punctuation from a few English sentences, splits them into individual words, and then produces them to the specified topic.
That part should be unproblematic. Next comes the consumer, which is at the same time Storm's spout: the KafkaSpout.

// args[0] = bootstrap servers, args[1] = topic, args[2] = consumer group id
KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
        .builder(args[0], args[1])
        .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
        .setProp(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000)
        .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000)
        .setOffsetCommitPeriodMs(10000)
        .setGroupId(args[2])
        .setMaxUncommittedOffsets(250)
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
        .build();

KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);

The consumer (i.e. the spout) consumes data from the specified topic and emits it to the next bolt.
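One detail worth knowing: as far as I can tell from storm-kafka-client 1.1.0, the default RecordTranslator declares the spout's output fields as "topic", "partition", "offset", "key", "value", which is why the bolt below reads the message body with getStringByField("value").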


public class WordCountBolt extends BaseBasicBolt {
    // per-executor, in-memory word counts
    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("value");
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);
        System.out.println("WordCountBolt Receive : " + word + "   " + count);
        collector.emit(new Values(word, count.toString()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
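Note that BaseBasicBolt acks each tuple automatically, and the counts map lives only in each executor's memory; the fieldsGrouping used when wiring the topology below guarantees that the same word is always routed to the same executor, so the per-word counts stay consistent.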

2. Storm -> HBase

First, copy the hbase-site.xml configuration file down from the cluster (the HBase client reads it from the classpath).

Next comes the API call:

// row key = word; column result:count = count
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("count"))
        .withColumnFamily("result");
// args[3] = target HBase table name; "hbase" is the config key used when wiring the topology
HBaseBolt hbaseBolt = new HBaseBolt(args[3], mapper)
        .withConfigKey("hbase");
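One assumption to be aware of: the target table (passed in as args[3]) with the column family "result" has to exist before the topology writes to it. A minimal one-off setup sketch using the HBase 1.x client API (the table name "wordcount" is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTable {
    public static void main(String[] args) throws Exception {
        Configuration hbaseConf = HBaseConfiguration.create(); // picks up hbase-site.xml from the classpath
        try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
             Admin admin = conn.getAdmin()) {
            TableName table = TableName.valueOf("wordcount"); // hypothetical table name (args[3])
            if (!admin.tableExists(table)) {
                HTableDescriptor desc = new HTableDescriptor(table);
                desc.addFamily(new HColumnDescriptor("result")); // must match withColumnFamily("result")
                admin.createTable(desc);
            }
        }
    }
}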

3. Building the whole topology

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", kafkaSpout, 1);
// builder.setBolt("wordSplitBolt", new WordSplitBolt(), 2)
//         .shuffleGrouping("kafkaSpout");
builder.setBolt("countBolt", new WordCountBolt(), 2)
        .fieldsGrouping("kafkaSpout", new Fields("value"));
builder.setBolt("HbaseBolt", hbaseBolt, 1)
        .addConfiguration("hbase", new HashMap<String, Object>())
        .shuffleGrouping("countBolt");
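For completeness, here is a minimal sketch of how this topology could be submitted (the topology name is my own placeholder; on a real cluster you would use StormSubmitter.submitTopology instead):

Config conf = new Config();
// local-mode test run; the "hbase" config map was already attached to the bolt
// via addConfiguration above, so nothing extra is needed here
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka-storm-hbase", conf, builder.createTopology()); // hypothetical name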

Now we finally get to the parts that really matter. Key points! Key points! Key points!

Key point 1: version information in the pom file

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

That's right: I import hadoop-client 2.7.3. Why? If I set it to 2.8.0, the following exception is thrown:

java.lang.NoSuchMethodError: org.apache.hadoop.security.authentication.util.KerberosUtil.hasKerberosTicket(Ljavax/security/auth/Subject;)Z
    at org.apache.hadoop.security.UserGroupInformation.<init>(UserGroupInformation.java:652) ~[hadoop-common-2.8.0.jar:?]
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:843) ~[hadoop-common-2.8.0.jar:?]
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:802) ~[hadoop-common-2.8.0.jar:?]
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:675) ~[hadoop-common-2.8.0.jar:?]
    at org.apache.hadoop.hbase.security.User$SecureHadoopUser.<init>(User.java:285) ~[hbase-common-1.1.0.jar:1.1.0]
    at org.apache.hadoop.hbase.security.User$SecureHadoopUser.<init>(User.java:281) ~[hbase-common-1.1.0.jar:1.1.0]
    at org.apache.hadoop.hbase.security.User.getCurrent(User.java:185) ~[hbase-common-1.1.0.jar:1.1.0]
    at org.apache.hadoop.hbase.security.UserProvider.getCurrent(UserProvider.java:88) ~[hbase-common-1.1.0.jar:1.1.0]
    at org.apache.storm.hbase.common.HBaseClient.<init>(HBaseClient.java:43) ~[storm-hbase-1.1.0.jar:1.1.0]
    at org.apache.storm.hbase.bolt.AbstractHBaseBolt.prepare(AbstractHBaseBolt.java:75) ~[storm-hbase-1.1.0.jar:1.1.0]
    at org.apache.storm.hbase.bolt.HBaseBolt.prepare(HBaseBolt.java:109) ~[storm-hbase-1.1.0.jar:1.1.0]
    at org.apache.storm.daemon.executor$fn__5044$fn__5057.invoke(executor.clj:791) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.1.0.jar:1.1.0]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
72750 [Thread-22-HbaseBolt-executor[1 1]] ERROR o.a.s.d.executor - 
java.lang.NoSuchMethodError: org.apache.hadoop.security.authentication.util.KerberosUtil.hasKerberosTicket(Ljavax/security/auth/Subject;)Z
    ... (same stack trace as above)
72787 [Thread-22-HbaseBolt-executor[1 1]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.1.0.jar:1.1.0]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__5642$fn__5643.invoke(worker.clj:759) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__4863$fn__4864.invoke(executor.clj:274) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:494) [storm-core-1.1.0.jar:1.1.0]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

This looks like a version incompatibility: hadoop-common 2.8.0 calls KerberosUtil.hasKerberosTicket, which presumably only exists in the matching hadoop-auth 2.8.0, so if an older hadoop-auth wins on the classpath (e.g., pulled in transitively by the HBase dependencies), the method is missing at runtime.
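If you genuinely need Hadoop 2.8.0, one alternative worth trying (a sketch I have not verified here) is to pin hadoop-auth to the same version so hadoop-common and hadoop-auth cannot diverge; running mvn dependency:tree shows which versions actually end up on the classpath:

<dependencyManagement>
    <dependencies>
        <!-- Hypothetical fix: force hadoop-auth to match hadoop-common 2.8.0 so that
             KerberosUtil.hasKerberosTicket is present at runtime -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>
</dependencyManagement>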

Key point 2: conflict between log4j-over-slf4j.jar and slf4j-log4j12.jar

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/geekp/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.8/log4j-slf4j-impl-2.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/geekp/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]


....

SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. 
SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.

[Thread-22-HbaseBolt-executor[1 1]] ERROR o.a.s.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.hadoop.security.authentication.util.KerberosUtil.hasKerberosTicket(Ljavax/security/auth/Subject;)Z
    ... (same stack trace as in key point 1)
71976 [Thread-22-HbaseBolt-executor[1 1]] ERROR o.a.s.d.executor - 
java.lang.NoSuchMethodError: org.apache.hadoop.security.authentication.util.KerberosUtil.hasKerberosTicket(Ljavax/security/auth/Subject;)Z
    ... (same stack trace as above)
71976 [Thread-26-kafkaSpout-executor[4 4]] INFO  o.a.s.k.s.KafkaSpout - Initialization complete
71992 [Thread-22-HbaseBolt-executor[1 1]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    ... (same "Worker died" stack trace as in key point 1)

Process finished with exit code 1

The fix is simply to keep slf4j-log4j12 off the classpath; tweak the pom file:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
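If the warning persists after this, slf4j-log4j12 is probably also coming in through another dependency; mvn dependency:tree makes it easy to see who pulls it in, and the same exclusion can be added there.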

Key point 3: the HBase configuration file hbase-site.xml on the server
This issue is really, really important; it kept me stuck for an entire day.

My configuration file on the server cluster looks like this:

<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://master:9000/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>master,slave1,slave2</value>
    </property>
    <property>
        <name>hbase.master.info.bindAddress</name>
        <value>0.0.0.0</value>
    </property>
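Note that hbase.zookeeper.quorum lists bare hostnames (master, slave1, slave2); as far as I know, the machine running the topology must be able to resolve those hostnames (e.g., via /etc/hosts), or the HBase client will fail to reach ZooKeeper.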