大資料學習之路94-kafka叢集安裝
阿新 • • 發佈:2018-11-09
解壓 Kafka 安裝包
修改配置檔案 config/server.properties
vi server.properties broker.id=0 //為依次增長的:0、1、2、3、4,叢集中唯一id log.dirs=/kafkaData/logs // Kafka 的訊息資料儲存路徑zookeeper.connect=master:2181,slave1:2181,slave2:2181 //zookeeperServers 列表,各節點以逗號分開 Vi zookeeper.properties dataDir=/root/zkdata #指向你安裝的zk 的資料儲存目錄 # 將 Kafka server.properties zookeeper.properties 檔案拷貝到其他節點機器 KAFKA_HOME/config>scp server.properties zookeeper.properties xx:$PWD
在每臺節點上啟動:
bin/kafka-server-start.sh config/server.properties &
如果我們希望啟動在後臺,並且不把一堆日誌展現在頁面就可以這麼啟動:
bin/kafka-server-start.sh config/server.properties >> /var/kafka.log 2>&1 &
接下來我們檢視一下有哪些topic資訊,在預設情況下它沒有任何的topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
這裡的kafka-topics.sh相當於是一個客戶端,它如果想要看kafka裡面的資訊,就要連線到我們的叢集上。
所以客戶端就要先連線zookeeper才能連線到我們的叢集上。
我們如何使用kafka?其實使用kafka就是往kafka中寫資料和從kafka中讀取資料
我們在往kafka中寫資料之前,首先就要建立一個topic,就像我們在往資料庫中寫資料之前首先要建立一張表一樣。
這個topic其實就是一個分類,以後不同型別的資料寫到不同的topic
接下來我們來建立topic,其實我們在任何一臺機器上建立topic都可以,因為我們在一臺機器上建立,其他的機器會同步。
bin/kafka-topics.sh --create --zookeeper marshal:2181,marshal01:2181,marshal02:2181,
marshal03:2181,marshal04:2181,marshal05:2181 --replication-factor 3 --partitions 1
--topic test
replication-factor 就是副本因子儲存3份
partitions 就是分割槽
如何刪除topic:
bin/kafka-topics.sh --delete --zookeeper marshal:2181,marshal01:2181,marshal02:2181,
marshal03:2181,marshal04:2181,marshal05:2181 --topic cmcc
生產者向topic中寫入資料:
bin/kafka-console-producer.sh --broker-list marshal:9092,marshal01:9092,marshal02:9092,
marshal03:9092,marshal04:9092,marshal05:9092 --topic test
消費者進行消費:
bin/kafka-console-consumer.sh --zookeeper marshal:2181,marshal01:2181,marshal02:2181,
marshal03:2181,marshal04:2181,marshal05:2181 --topic test --from-beginning
這裡的 --from-beginning就是從最開始讀。如果不加這個,則在消費者程序啟動之前的資料不會被讀到。
我們上面是通過命令列的方式進行消費的,我們還可以通過寫程式的方式來消費。
package com.xiaoniu.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerDemo {
public static void main(String[] args)throws Exception{
//封裝配置引數
Properties props = new Properties();
//kafka的brokers列表
props.setProperty("bootstrap.servers", "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092");
//key和value的序列化方式,因為需要網路傳輸所以需要序列化
props.setProperty("key.serializer", StringSerializer.class.getName());
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/**
* 傳送資料的時候是否需要應答
* 取值範圍:
* [all, -1, 0, 1]
* 0:leader不做任何應答
* 1:leader會給producer做出應答
* all、-1:fllower->leader -> producer
* 預設值:
* 1
*/
//props.setProperty("acks", "1");
/**
* 自定義分割槽
* 預設值:org.apache.kafka.clients.producer.internals.DefaultPartitioner
*/
//props.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
//建立一個生產者的客戶端例項
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
int count = 0;
while (count < 1000) {
int partitionNum = count % 1;
//封裝一條訊息
ProducerRecord record = new ProducerRecord("test", partitionNum,"", count + "");
//傳送一條訊息
kafkaProducer.send(record);
count++;
Thread.sleep(1 * 1000);
}
//釋放
kafkaProducer.close();
System.out.println("send End...");
}
}
package com.xiaoniu.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
public class ConsumerDemo {
public static void main(String[] args) {
HashMap<String, Object> config = new HashMap<String, Object>();
config.put("bootstrap.servers", "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092");
config.put("key.deserializer", StringDeserializer.class.getName());
config.put("value.deserializer", StringDeserializer.class.getName());
config.put("group.id", "g000001");
/**
* 從哪個位置開始獲取資料
* 取值範圍:
* [latest, earliest, none]
* 預設值:
* latest
*/
config.put("auto.offset.reset", "earliest");
/**
* 是否要自動遞交偏移量(offset)這條資料在某個分割槽所在位置的編號
*/
config.put("enable.auto.commit", false);
//建立一個消費者客戶端例項
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(config);
//訂閱主題(告訴客戶端從哪個主題獲取資料)
kafkaConsumer.subscribe(Arrays.asList("test"));
while (true) {
//拉去資料, 會從kafka所有分割槽下拉取資料
ConsumerRecords<String, String> records = kafkaConsumer.poll(2000);
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
System.out.println("record = " + record);
}
}
//釋放連線
//kafkaConsumer.close();
}
}
我們還可以檢視叢集的狀態:
活躍的分割槽的職責負責讀寫資料,不活躍的分割槽負責同步資料。