1. 程式人生 > >大資料學習之路94-kafka叢集安裝

大資料學習之路94-kafka叢集安裝

解壓 Kafka 安裝包

修改配置檔案 config/server.properties

vi  server.properties
broker.id=0	//為依次增長的:0、1、2、3、4,叢集中唯一id
log.dirs=/kafkaData/logs // Kafka 的訊息資料儲存路徑zookeeper.connect=master:2181,slave1:2181,slave2:2181   //zookeeperServers   列表,各節點以逗號分開

Vi  zookeeper.properties
dataDir=/root/zkdata #指向你安裝的zk 的資料儲存目錄

#  將 Kafka server.properties	zookeeper.properties    檔案拷貝到其他節點機器
KAFKA_HOME/config>scp server.properties	zookeeper.properties xx:$PWD

在每臺節點上啟動:

bin/kafka-server-start.sh     config/server.properties  &

如果我們希望啟動在後臺,並且不把一堆日誌展現在頁面就可以這麼啟動:

bin/kafka-server-start.sh     config/server.properties  >> /var/kafka.log 2>&1 &

接下來我們檢視一下有哪些topic資訊,在預設情況下它沒有任何的topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

這裡的kafka-topics.sh相當於是一個客戶端,它如果想要看kafka裡面的資訊,就要連線到我們的叢集上。

所以客戶端就要先連線zookeeper才能連線到我們的叢集上。

我們如何使用kafka?其實使用kafka就是往kafka中寫資料和從kafka中讀取資料

我們在往kafka中寫資料之前,首先就要建立一個topic,就像我們在往資料庫中寫資料之前首先要建立一張表一樣。

這個topic其實就是一個分類,以後不同型別的資料寫到不同的topic

接下來我們來建立topic,其實我們在任何一臺機器上建立topic都可以,因為我們在一臺機器上建立,其他的機器會同步。

bin/kafka-topics.sh --create --zookeeper marshal:2181,marshal01:2181,marshal02:2181,
marshal03:2181,marshal04:2181,marshal05:2181 --replication-factor 3 --partitions 1 
--topic test

replication-factor 就是副本因子儲存3份

partitions  就是分割槽

如何刪除topic:

bin/kafka-topics.sh --delete --zookeeper marshal:2181,marshal01:2181,marshal02:2181,
marshal03:2181,marshal04:2181,marshal05:2181 --topic cmcc

生產者向topic中寫入資料:

bin/kafka-console-producer.sh --broker-list marshal:9092,marshal01:9092,marshal02:9092,
marshal03:9092,marshal04:9092,marshal05:9092 --topic test

消費者進行消費:

bin/kafka-console-consumer.sh --zookeeper marshal:2181,marshal01:2181,marshal02:2181,
marshal03:2181,marshal04:2181,marshal05:2181 --topic test --from-beginning

這裡的 --from-beginning就是從最開始讀。如果不加這個,則在消費者程序啟動之前的資料不會被讀到。

我們上面是通過命令列的方式進行消費的,我們還可以通過寫程式的方式來消費。

package com.xiaoniu.kafka;



import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;


import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args)throws Exception{
        //封裝配置引數
        Properties props = new Properties();
        //kafka的brokers列表
        props.setProperty("bootstrap.servers", "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092");
        //key和value的序列化方式,因為需要網路傳輸所以需要序列化
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        /**
         * 傳送資料的時候是否需要應答
         * 取值範圍:
         *  [all, -1, 0, 1]
         *  0:leader不做任何應答
         *  1:leader會給producer做出應答
         *  all、-1:fllower->leader -> producer
         * 預設值:
         *  1
         */
        //props.setProperty("acks", "1");

        /**
         * 自定義分割槽
         * 預設值:org.apache.kafka.clients.producer.internals.DefaultPartitioner
         */
        //props.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");

        //建立一個生產者的客戶端例項
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

        int count = 0;
        while (count < 1000) {
            int partitionNum = count % 1;

            //封裝一條訊息
            ProducerRecord record = new ProducerRecord("test", partitionNum,"", count + "");
            //傳送一條訊息
            kafkaProducer.send(record);

            count++;
            Thread.sleep(1 * 1000);
        }
        //釋放
        kafkaProducer.close();
        System.out.println("send End...");

    }
}

package com.xiaoniu.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;

public class ConsumerDemo {
    public static void main(String[] args) {

        HashMap<String, Object> config  = new HashMap<String, Object>();
        config.put("bootstrap.servers", "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092");
        config.put("key.deserializer", StringDeserializer.class.getName());
        config.put("value.deserializer", StringDeserializer.class.getName());
        config.put("group.id", "g000001");

        /**
         * 從哪個位置開始獲取資料
         * 取值範圍:
         *  [latest, earliest, none]
         * 預設值:
         *  latest
         */
        config.put("auto.offset.reset", "earliest");
        /**
         * 是否要自動遞交偏移量(offset)這條資料在某個分割槽所在位置的編號
         */
        config.put("enable.auto.commit", false);

        //建立一個消費者客戶端例項
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(config);
        //訂閱主題(告訴客戶端從哪個主題獲取資料)
        kafkaConsumer.subscribe(Arrays.asList("test"));

        while (true) {
            //拉去資料, 會從kafka所有分割槽下拉取資料
            ConsumerRecords<String, String> records = kafkaConsumer.poll(2000);
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            while (iterator.hasNext()) {
                ConsumerRecord<String, String> record = iterator.next();
                System.out.println("record = " + record);
            }
        }

        //釋放連線
        //kafkaConsumer.close();
    }
}

我們還可以檢視叢集的狀態:

活躍的分割槽的職責負責讀寫資料,不活躍的分割槽負責同步資料。