【Kafka學習筆記】一、Kafka單機搭建(基於kafka_2.11-1.0.0)
阿新 • • 發佈:2020-12-14
技術標籤:Javaspringbootkafkajavakafka
PS:歡迎轉載,但請註明出處,謝謝配合。
Kafka單機搭建(基於kafka_2.11-1.0.0)
一、基礎環境準備(CentOS,docker)
本人作業系統Windows,不想用VMware搭建Linux系統,正好學過Docker,所以用Docker容器實現Linux環境(CentOS)。
如何在Windows上安裝Docker環境,可參考之前的文章 Redis單機搭建(基於redis-6.0.6)
1、搜尋 centos映象
docker search centos
2、拉取 centos映象
docker pull centos
3、啟動 centos 容器
docker run -it -p 2181:2181 -p 9092:9092 --name kafka-single -v D:\DockerContainerSharedFile:/DockerContainerSharedFile centos
exit 退出
docker start kafka-single
docker exec -it kafka-single /bin/bash
二、環境準備(JDK)
1、下載JDK包
登入Oracle官網,選擇對應版本的JDK,這裡選擇 jdk-8u261-linux-x64.tar.gz (jdk8,linux,64位)
(linux檢視系統位數,可以通過命令:uname -a,結果有x86_64,表示是64位)
2、解壓縮JDK包
mkdir -p /usr/java
cp /DockerContainerSharedFile/jdk-8u261-linux-x64.tar.gz /usr/java
cd /usr/java
tar -zxvf jdk-8u261-linux-x64.tar.gz
3、配置環境變數
vi /etc/profile 然後在最下面進行如下設定,其中jdk需要改成自己對應的版本 #set java enviroment JAVA_HOME=/usr/java/jdk1.8.0_261 JRE_HOME=/usr/java/jdk1.8.0_261/jre CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin export JAVA_HOME JRE_HOME CLASS_PATH PATH 設定完成後,退出vim,然後繼續輸入如下指令,重新整理環境變數 source /etc/profile
4、測試是否安裝成功
java -version
三、安裝 Kafka
1、獲取Kafka安裝包(離線方式)
將下載好的Kafka包,先放到共享目錄下,然後執行如下命令:
cp /DockerContainerSharedFile/kafka_2.11-1.0.0.tgz /usr/local/
cd /usr/local/
補充說明:線上安裝,可參考官網命令:wget http://download.redis.io/releases/redis-6.0.6.tar.gz
2、解壓Redis原始碼包
tar -zxf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0
3、修改配置檔案
config/server.properties
listeners=PLAINTEXT://:9092 【#監聽埠】
advertised.listeners=PLAINTEXT://127.0.0.1:9092 【監聽器釋出到Zookeeper供客戶端使用。若不配置,則當專案執行的伺服器和kafka執行的不是同一臺伺服器,會連線不上】
4、啟動Zookeeper伺服器(Kafka內建)
命令1:
bin/zookeeper-server-start.sh config/zookeeper.properties 【退出,則終止】
命令2:(推薦)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties >>zk-temp.out & 【後臺啟動,退出,不終止】
啟動日誌:
[[email protected] kafka_2.11-1.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties
[2020-09-22 10:00:18,476] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
......
[2020-09-22 10:00:18,551] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
5、啟動Kafka伺服器
nohup bin/kafka-server-start.sh config/server.properties >>kafka-temp.out & 【後臺啟動,退出,不終止】
啟動日誌:
[[email protected] kafka_2.11-1.0.0]# bin/kafka-server-start.sh config/server.properties
[2020-09-22 10:08:16,763] INFO KafkaConfig values:
......
[2020-09-22 10:08:17,716] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
四、使用 Kafka
1、建立一個 topic
建立一個名為“test”的topic,它有一個分割槽和一個副本:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
檢視topic列表命令:
bin/kafka-topics.sh --list --zookeeper localhost:2181
刪除topic命令:
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
執行效果:
[[email protected] kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[[email protected] kafka_2.11-1.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test
2、使用producer(生產者)傳送訊息
Kafka自帶一個命令列客戶端,它從檔案或標準輸入中獲取輸入,並將其作為message(訊息)傳送到Kafka叢集。預設情況下,每行將作為單獨的message傳送。
執行 producer,然後在控制檯輸入一些訊息以傳送到伺服器。命令如下:
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>message A
>message B
3、使用consumer(消費者)獲取訊息
Kafka 還有一個命令列consumer(消費者),將訊息轉儲到標準輸出。
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
message A
message B
4、通過"describe topic"命令檢視叢集情況
# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test 【後面的 --topic test 不加也行,就是輸出的內容會比較多】
以下是對輸出資訊的解釋。第一行給出了所有分割槽的摘要,下面的每行都給出了一個分割槽的資訊。因為我們只有一個分割槽,所以只有一行。
“leader”是負責給定分割槽所有讀寫操作的節點。每個節點都是隨機選擇的部分分割槽的領導者。
“replicas”是複製分割槽日誌的節點列表,不管這些節點是leader還是僅僅活著。
“isr”是一組“同步”replicas,是replicas列表的子集,它活著並被指到leader。
五、補充說明
SpringBoot 整合 Kafka,啟動報錯:Error connecting to node f345f75a2b57:9092
SpringBoot 整合 Kafka ,則 Kakfa 需要包含如下配置內容:
config/server.properties
advertised.listeners=PLAINTEXT://127.0.0.1:9092 【監聽器釋出到Zookeeper供客戶端使用。若不配置,則當專案執行得伺服器和kafka執行的不是同一臺伺服器,會連線不上】
否則,SpringBoot啟動時,會報類似如下錯誤:
16:55:45.260 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-8, groupId=topic.group2] Error connecting to node f345f75a2b57:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: f345f75a2b57:9092
六、參考
1、Kafka官網