1. 程式人生 > 其它 >【Kafka學習筆記】一、Kafka單機搭建(基於kafka_2.11-1.0.0)

【Kafka學習筆記】一、Kafka單機搭建(基於kafka_2.11-1.0.0)

技術標籤:Javaspringbootkafkajavakafka

PS:歡迎轉載,但請註明出處,謝謝配合。

Kafka單機搭建(基於kafka_2.11-1.0.0)

一、基礎環境準備(CentOS,docker)

本人作業系統Windows,不想用VMware搭建Linux系統,正好學過Docker,所以用Docker容器實現Linux環境(CentOS)。
如何在Windows上安裝Docker環境,可參考之前的文章 Redis單機搭建(基於redis-6.0.6)

1、搜尋 centos映象

docker search centos

2、拉取 centos映象

docker pull centos

3、啟動 centos 容器

docker run -it -p 2181:2181 -p 9092:9092 --name kafka-single -v D:\DockerContainerSharedFile:/DockerContainerSharedFile centos
exit 退出
docker start kafka-single
docker exec -it kafka-single /bin/bash

二、環境準備(JDK)

1、下載JDK包


登入Oracle官網,選擇對應版本的JDK,這裡選擇 jdk-8u261-linux-x64.tar.gz (jdk8,linux,64位)
(linux檢視系統位數,可以通過命令:uname -a,結果有x86_64,表示是64位)

2、解壓縮JDK包

mkdir -p /usr/java
cp /DockerContainerSharedFile/jdk-8u261-linux-x64.tar.gz /usr/java
cd /usr/java
tar -zxvf jdk-8u261-linux-x64.tar.gz

3、配置環境變數

vi /etc/profile

然後在最下面進行如下設定,其中jdk需要改成自己對應的版本
#set java enviroment
JAVA_HOME=/usr/java/jdk1.8.0_261
JRE_HOME=/usr/java/jdk1.8.0_261/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH

設定完成後,退出vim,然後繼續輸入如下指令,重新整理環境變數
source /etc/profile

4、測試是否安裝成功

java -version

在這裡插入圖片描述

三、安裝 Kafka

1、獲取Kafka安裝包(離線方式)

將下載好的Kafka包,先放到共享目錄下,然後執行如下命令:    
cp /DockerContainerSharedFile/kafka_2.11-1.0.0.tgz /usr/local/    
cd /usr/local/    
補充說明:線上安裝,可參考官網命令:wget http://download.redis.io/releases/redis-6.0.6.tar.gz

2、解壓Redis原始碼包

tar -zxf kafka_2.11-1.0.0.tgz          
cd kafka_2.11-1.0.0

3、修改配置檔案

config/server.properties
	listeners=PLAINTEXT://:9092     【#監聽埠】
	advertised.listeners=PLAINTEXT://127.0.0.1:9092 【監聽器釋出到Zookeeper供客戶端使用。若不配置,則當專案執行的伺服器和kafka執行的不是同一臺伺服器,會連線不上】

4、啟動Zookeeper伺服器(Kafka內建)

命令1:
bin/zookeeper-server-start.sh config/zookeeper.properties    【退出,則終止】
命令2:(推薦)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties >>zk-temp.out &    【後臺啟動,退出,不終止】

啟動日誌:
[[email protected] kafka_2.11-1.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties
[2020-09-22 10:00:18,476] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
......
[2020-09-22 10:00:18,551] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

5、啟動Kafka伺服器

nohup bin/kafka-server-start.sh config/server.properties >>kafka-temp.out &    【後臺啟動,退出,不終止】

啟動日誌:
[[email protected] kafka_2.11-1.0.0]# bin/kafka-server-start.sh config/server.properties
[2020-09-22 10:08:16,763] INFO KafkaConfig values:
......
[2020-09-22 10:08:17,716] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

四、使用 Kafka

1、建立一個 topic

建立一個名為“test”的topic,它有一個分割槽和一個副本:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

檢視topic列表命令:

 bin/kafka-topics.sh --list --zookeeper localhost:2181

刪除topic命令:

 bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

執行效果:

[[email protected] kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[[email protected] kafka_2.11-1.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test

2、使用producer(生產者)傳送訊息
Kafka自帶一個命令列客戶端,它從檔案或標準輸入中獲取輸入,並將其作為message(訊息)傳送到Kafka叢集。預設情況下,每行將作為單獨的message傳送。
執行 producer,然後在控制檯輸入一些訊息以傳送到伺服器。命令如下:

# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>message A
>message B

3、使用consumer(消費者)獲取訊息
Kafka 還有一個命令列consumer(消費者),將訊息轉儲到標準輸出。

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
message A
message B

4、通過"describe topic"命令檢視叢集情況

# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test 【後面的 --topic test 不加也行,就是輸出的內容會比較多】

在這裡插入圖片描述

以下是對輸出資訊的解釋。第一行給出了所有分割槽的摘要,下面的每行都給出了一個分割槽的資訊。因為我們只有一個分割槽,所以只有一行。

“leader”是負責給定分割槽所有讀寫操作的節點。每個節點都是隨機選擇的部分分割槽的領導者。
“replicas”是複製分割槽日誌的節點列表,不管這些節點是leader還是僅僅活著。
“isr”是一組“同步”replicas,是replicas列表的子集,它活著並被指到leader。

五、補充說明

SpringBoot 整合 Kafka,啟動報錯:Error connecting to node f345f75a2b57:9092

SpringBoot 整合 Kafka ,則 Kakfa 需要包含如下配置內容:

config/server.properties
	advertised.listeners=PLAINTEXT://127.0.0.1:9092   【監聽器釋出到Zookeeper供客戶端使用。若不配置,則當專案執行得伺服器和kafka執行的不是同一臺伺服器,會連線不上】 

否則,SpringBoot啟動時,會報類似如下錯誤:

16:55:45.260 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-8, groupId=topic.group2] Error connecting to node f345f75a2b57:9092 (id: 0 rack: null)
	java.io.IOException: Can't resolve address: f345f75a2b57:9092

六、參考

1、Kafka官網