1. 程式人生 > 其它 >基於zookeeper的kafka中介軟體

基於zookeeper的kafka中介軟體

一、Zookeeper 概述

1、Zookeeper 定義

Zookeeper是一個開源的分散式的,為分散式框架提供協調服務的Apache專案。

2、Zookeeper 工作機制

Zookeeper從設計模式角度來理解:是一個基於觀察者模式設計的分散式服務管理框架,它負責儲存和管理大家都關心的資料,然後接受觀察者的註冊,一旦這些資料的狀態發生變化,Zookeeper就將負責通知已經在Zookeeper上註冊的那些觀察者做出相應的反應。也就是說 Zookeeper = 檔案系統 + 通知機制。

3、Zookeeper 特點

(1)Zookeeper:一個領導者(Leader),多個跟隨者(Follower)組成的叢集。

(2)Zookeepe叢集中只要有半數以上節點存活,Zookeeper叢集就能正常服務。所以Zookeeper適合安裝奇數臺伺服器。

(3)全域性資料一致:每個Server儲存一份相同的資料副本,Client無論連線到哪個Server,資料都是一致的。

(4)更新請求順序執行,來自同一個Client的更新請求按其傳送順序依次執行,即先進先出。

(5)資料更新原子性,一次資料更新要麼成功,要麼失敗。

(6)實時性,在一定時間範圍內,Client能讀到最新資料。

4、Zookeeper 資料結構

ZooKeeper資料模型的結構與Linux檔案系統很類似,整體上可以看作是一棵樹,每個節點稱做一個ZNode。每一個ZNode預設能夠儲存1MB的資料,每個ZNode都可以通過其路徑唯一標識。

5、Zookeeper 應用場景

提供的服務包括:統一命名服務、統一配置管理、統一叢集管理、伺服器節點動態上下線、軟負載均衡等。

●統一命名服務

在分散式環境下,經常需要對應用/服務進行統一命名,便於識別。例如:IP不容易記住,而域名容易記住。

●統一配置管理

(1)分散式環境下,配置檔案同步非常常見。一般要求一個叢集中,所有節點的配置資訊是一致的,比如Kafka叢集。對配置檔案修改後,希望能夠快速同步到各個節點上。

(2)配置管理可交由ZooKeeper實現。可將配置資訊寫入ZooKeeper上的一個Znode。各個客戶端伺服器監聽這個Znode。一旦 Znode中的資料被修改,ZooKeeper將通知各個客戶端伺服器。

●統一叢集管理

(1)分散式環境中,實時掌握每個節點的狀態是必要的。可根據節點實時狀態做出一些調整。

(2)ZooKeeper可以實現實時監控節點狀態變化。可將節點資訊寫入ZooKeeper上的一個ZNode。監聽這個ZNode可獲取它的實時狀態變化。

●伺服器動態上下線

客戶端能實時洞察到伺服器上下線的變化。

●軟負載均衡

在Zookeeper中記錄每臺伺服器的訪問數,讓訪問數最少的伺服器去處理最新的客戶端請求。

6、Zookeeper 選舉機制

●第一次啟動選舉機制

(1)伺服器1啟動,發起一次選舉。伺服器1投自己一票。此時伺服器1票數一票,不夠半數以上(3票),選舉無法完成,伺服器1狀態保持為LOOKING;

(2)伺服器2啟動,再發起一次選舉。伺服器1和2分別投自己一票並交換選票資訊:此時伺服器1發現伺服器2的myid比自己目前投票推舉的(伺服器1)大,更改選票為推舉伺服器2。此時伺服器1票數0票,伺服器2票數2票,沒有半數以上結果,選舉無法完成,伺服器1,2狀態保持LOOKING

(3)伺服器3啟動,發起一次選舉。此時伺服器1和2都會更改選票為伺服器3。此次投票結果:伺服器1為0票,伺服器2為0票,伺服器3為3票。此時伺服器3的票數已經超過半數,伺服器3當選Leader。伺服器1,2更改狀態為FOLLOWING,伺服器3更改狀態為LEADING;

(4)伺服器4啟動,發起一次選舉。此時伺服器1,2,3已經不是LOOKING狀態,不會更改選票資訊。交換選票資訊結果:伺服器3為3票,伺服器4為1票。此時伺服器4服從多數,更改選票資訊為伺服器3,並更改狀態為FOLLOWING;

(5)伺服器5啟動,同4一樣當小弟。

●非第一次啟動選舉機制

(1)當ZooKeeper 叢集中的一臺伺服器出現以下兩種情況之一時,就會開始進入Leader選舉:

1)伺服器初始化啟動。

2)伺服器執行期間無法和Leader保持連線。

(2)而當一臺機器進入Leader選舉流程時,當前叢集也可能會處於以下兩種狀態:

1)叢集中本來就已經存在一個Leader。

對於已經存在Leader的情況,機器試圖去選舉Leader時,會被告知當前伺服器的Leader資訊,對於該機器來說,僅僅需要和 Leader機器建立連線,並進行狀態同步即可。

2)叢集中確實不存在Leader。

假設ZooKeeper由5臺伺服器組成,SID分別為1、2、3、4、5,ZXID分別為8、8、8、7、7,並且此時SID為3的伺服器是Leader。某一時刻,3和5伺服器出現故障,因此開始進行Leader選舉。

選舉Leader規則:

1.EPOCH大的直接勝出

2.EPOCH相同,事務id大的勝出

3.事務id相同,伺服器id大的勝出

-------------------------------------------------------------------------------------
SID:伺服器ID。用來唯一標識一臺ZooKeeper叢集中的機器,每臺機器不能重複,和myid一致。

ZXID:事務ID。ZXID是一個事務ID,用來標識一次伺服器狀態的變更。在某一時刻,叢集中的每臺機器的ZXID值不一定完全一致,這和ZooKeeper伺服器對於客戶端“更新請求”的處理邏輯速度有關。

Epoch:每個Leader任期的代號。沒有Leader時同一輪投票過程中的邏輯時鐘值是相同的。每投完一次票這個資料就會增加

二、部署 Zookeeper 叢集

1、部署Zookeeper叢集的操作步驟

準備 3 臺伺服器做 Zookeeper 叢集

192.168.229.33
192.168.229.77
192.168.229.99

1.1 安裝前準備

//關閉防火牆

systemctl stop firewalld
systemctl disable firewalld
setenforce 0  

//安裝 JDK

yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version  

//下載安裝包
官方下載地址:https://archive.apache.org/dist/zookeeper/

cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz  

1.2、安裝 Zookeeper

cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7  

1.3 修改配置檔案

cd /usr/local/zookeeper-3.5.7/conf/
cp zoo_sample.cfg zoo.cfg
 
vim zoo.cfg
tickTime=2000 #通訊心跳時間,Zookeeper伺服器與客戶端心跳時間,單位毫秒
initLimit=10 #Leader和Follower初始連線時能容忍的最多心跳數(tickTime的數量),這裡表示為10*2s
syncLimit=5 #Leader和Follower之間同步通訊的超時時間,這裡表示如果超過5*2s,Leader認為Follwer死掉,並從伺服器列表中刪除Follwer
dataDir=/usr/local/zookeeper-3.5.7/data ●修改,指定儲存Zookeeper中的資料的目錄,目錄需要單獨建立
dataLogDir=/usr/local/zookeeper-3.5.7/logs ●新增,指定存放日誌的目錄,目錄需要單獨建立
clientPort=2181 #客戶端連線埠
#新增叢集資訊
server.1=192.168.2.33:3188:3288
server.2=192.168.2.77:3188:3288
server.3=192.168.2.99:3188:3288 

server.A=B:C:D
●A是一個數字,表示這個是第幾號伺服器。叢集模式下需要在zoo.cfg中dataDir指定的目錄下建立一個檔案myid,這個檔案裡面有一個數據就是A的值,Zookeeper啟動時讀取此檔案,拿到裡面的資料與zoo.cfg裡面的配置資訊比較從而判斷到底是哪個server。
●B是這個伺服器的地址。
●C是這個伺服器Follower與叢集中的Leader伺服器交換資訊的埠。
●D是萬一叢集中的Leader伺服器掛了,需要一個埠來重新進行選舉,選出一個新的Leader,而這個埠就是用來執行選舉時伺服器相互通訊的埠。
-------------------------------------------------------------------------------------

1.4 拷貝配置好的 Zookeeper 配置檔案到其他機器上

scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.229.50:/usr/local/zookeeper-3.5.7/conf/
scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.229.40:/usr/local/zookeeper-3.5.7/conf/

1.5 在每個節點上建立資料目錄和日誌目錄

mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs  

1.6 在每個節點的dataDir指定的目錄下建立一個 myid 的檔案

echo 1 > /usr/local/zookeeper-3.5.7/data/myid
echo 2 > /usr/local/zookeeper-3.5.7/data/myid
echo 3 > /usr/local/zookeeper-3.5.7/data/myid  

1.7 配置 Zookeeper 啟動指令碼

vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)
echo "---------- zookeeper 啟動 ------------"
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo "---------- zookeeper 停止 ------------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo "---------- zookeeper 重啟 ------------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo "---------- zookeeper 狀態 ------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac   

1.8 設定開機自啟

chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper

1.9 分別啟動 Zookeeper

service zookeeper start   

1.10 檢視當前狀態

service zookeeper status

2、例項操作:部署Zookeeper叢集

2.1 安裝前準備

2.2、安裝 Zookeeper

2.3 修改配置檔案

1.4 拷貝配置好的 Zookeeper 配置檔案到其他機器上

1.5 在每個節點上建立資料目錄和日誌目錄

1.6 在每個節點的dataDir指定的目錄下建立一個 myid 的檔案

1.7 配置 Zookeeper 啟動指令碼

1.8 設定開機自啟並開啟服務

1.9 檢視當前狀態

一個leader,兩個follower

三、Kafka 概述

1、為什麼需要訊息佇列(MQ)

主要原因是由於在高併發環境下,同步請求來不及處理,請求往往會發生阻塞。比如大量的請求併發訪問資料庫,導致行鎖表鎖,最後請求執行緒會堆積過多,從而觸發 too many connection 錯誤,引發雪崩效應。

我們使用訊息佇列,通過非同步處理請求,從而緩解系統的壓力。訊息佇列常應用於非同步處理,流量削峰,應用解耦,訊息通訊等場景。

當前比較常見的 MQ 中介軟體有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。

2、使用訊息佇列的好處

(1)解耦

允許你獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。

(2)可恢復性

系統的一部分元件失效時,不會影響到整個系統。訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。

(3)緩衝

有助於控制和優化資料流經過系統的速度,解決生產訊息和消費訊息的處理速度不一致的情況。

(4)靈活性 & 峰值處理能力

在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

(5)非同步通訊

很多時候,使用者不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的時候再去處理它們。

3、訊息佇列的兩種模式

(1)點對點模式(一對一,消費者主動拉取資料,訊息收到後訊息清除)

訊息生產者生產訊息傳送到訊息佇列中,然後訊息消費者從訊息佇列中取出並且消費訊息。訊息被消費以後,訊息佇列中不再有儲存,所以訊息消費者不可能消費到已經被消費的訊息。訊息佇列支援存在多個消費者,但是對一個訊息而言,只會有一個消費者可以消費。

(2)釋出/訂閱模式(一對多,又叫觀察者模式,消費者消費資料之後不會清除訊息)

訊息生產者(釋出)將訊息釋出到 topic 中,同時有多個訊息消費者(訂閱)消費該訊息。和點對點方式不同,釋出到 topic 的訊息會被所有訂閱者消費。

釋出/訂閱模式是定義物件間一種一對多的依賴關係,使得每當一個物件(目標物件)的狀態發生改變,則所有依賴於它的物件(觀察者物件)都會得到通知並自動更新。

4、Kafka 定義

Kafka 是一個分散式的基於釋出/訂閱模式的訊息佇列(MQ,Message Queue),主要應用於大資料實時處理領域。

5、Kafka 簡介

Kafka 是最初由 Linkedin 公司開發,是一個分散式、支援分割槽的(partition)、多副本的(replica),基於 Zookeeper 協調的分散式訊息中介軟體系統,它的最大的特性就是可以實時的處理大量資料以滿足各種需求場景,比如基於 hadoop 的批處理系統、低延遲的實時系統、Spark/Flink 流式處理引擎,nginx 訪問日誌,訊息服務等等,用 scala 語言編寫,

Linkedin 於 2010 年貢獻給了 Apache 基金會併成為頂級開源專案。

6、Kafka 的特性

●高吞吐量、低延遲

Kafka 每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒。每個 topic 可以分多個 Partition,Consumer Group 對 Partition 進行消費操作,提高負載均衡能力和消費能力。

●可擴充套件性

kafka 叢集支援熱擴充套件

●永續性、可靠性

訊息被持久化到本地磁碟,並且支援資料備份防止資料丟失

●容錯性

允許叢集中節點失敗(多副本情況下,若副本數量為 n,則允許 n-1 個節點失敗)

●高併發

支援數千個客戶端同時讀寫

7、Kafka 系統架構

(1)Broker

一臺 kafka 伺服器就是一個 broker。一個叢集由多個 broker 組成。一個 broker 可以容納多個 topic。

(2)Topic

可以理解為一個佇列,生產者和消費者面向的都是一個 topic。

類似於資料庫的表名或者 ES 的 index

物理上不同 topic 的訊息分開儲存

(3)Partition

為了實現擴充套件性,一個非常大的 topic 可以分佈到多個 broker(即伺服器)上,一個 topic 可以分割為一個或多個 partition,每個 partition 是一個有序的佇列。Kafka 只保證 partition 內的記錄是有序的,而不保證 topic 中不同 partition 的順序。

每個 topic 至少有一個 partition,當生產者產生資料的時候,會根據分配策略選擇分割槽,然後將訊息追加到指定的分割槽的佇列末尾。

Partation 資料路由規則:

  • 指定了 patition,則直接使用;
  • 未指定 patition 但指定 key(相當於訊息中某個屬性),通過對 key 的 value 進行 hash 取模,選出一個 patition;
  • patition 和 key 都未指定,使用輪詢選出一個 patition。

每條訊息都會有一個自增的編號,用於標識訊息的偏移量,標識順序從 0 開始。

每個 partition 中的資料使用多個 segment 檔案儲存。

如果 topic 有多個 partition,消費資料時就不能保證資料的順序。嚴格保證訊息的消費順序的場景下(例如商品秒殺、 搶紅包),需要將 partition 數目設為 1。

●broker 儲存 topic 的資料。如果某 topic 有 N 個 partition,叢集有 N 個 broker,那麼每個 broker 儲存該 topic 的一個 partition。
●如果某 topic 有 N 個 partition,叢集有 (N+M) 個 broker,那麼其中有 N 個 broker 儲存 topic 的一個 partition, 剩下的 M 個 broker 不儲存該 topic 的 partition 資料。
●如果某 topic 有 N 個 partition,叢集中 broker 數目少於 N 個,那麼一個 broker 儲存該 topic 的一個或多個 partition。在實際生產環境中,儘量避免這種情況的發生,這種情況容易導致 Kafka 叢集資料不均衡。

分割槽的原因

●方便在叢集中擴充套件,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個叢集就可以適應任意大小的資料了;

●可以提高併發,因為可以以Partition為單位讀寫了。

(4)Replica

副本,為保證叢集中的某個節點發生故障時,該節點上的 partition 資料不丟失,且 kafka 仍然能夠繼續工作,kafka 提供了副本機制,一個 topic 的每個分割槽都有若干個副本,一個 leader 和若干個 follower。

(5)Leader

每個 partition 有多個副本,其中有且僅有一個作為 Leader,Leader 是當前負責資料的讀寫的 partition。

(6)Follower

Follower 跟隨 Leader,所有寫請求都通過 Leader 路由,資料變更會廣播給所有 Follower,Follower 與 Leader 保持資料同步。Follower 只負責備份,不負責資料的讀寫。
如果 Leader 故障,則從 Follower 中選舉出一個新的 Leader。
當 Follower 掛掉、卡住或者同步太慢,Leader 會把這個 Follower 從 ISR(Leader 維護的一個和 Leader 保持同步的 Follower 集合) 列表中刪除,重新建立一個 Follower。

(7)Producer

生產者即資料的釋出者,該角色將訊息釋出到 Kafka 的 topic 中。

broker 接收到生產者傳送的訊息後,broker 將該訊息追加到當前用於追加資料的 segment 檔案中。

生產者傳送的訊息,儲存到一個 partition 中,生產者也可以指定資料儲存的 partition。

(8)Consumer

消費者可以從 broker 中讀取資料。消費者可以消費多個 topic 中的資料。

(9)Consumer Group(CG)

消費者組,由多個 consumer 組成。

所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。可為每個消費者指定組名,若不指定組名則屬於預設的組。

將多個消費者集中到一起去處理某一個 Topic 的資料,可以更快的提高資料的消費能力。

消費者組內每個消費者負責消費不同分割槽的資料,一個分割槽只能由一個組內消費者消費,防止資料被重複讀取。

消費者組之間互不影響。

(10)offset 偏移量

可以唯一的標識一條訊息。

偏移量決定讀取資料的位置,不會有執行緒安全的問題,消費者通過偏移量來決定下次讀取的訊息(即消費位置)。

訊息被消費之後,並不被馬上刪除,這樣多個業務就可以重複使用 Kafka 的訊息。

某一個業務也可以通過修改偏移量達到重新讀取訊息的目的,偏移量由使用者控制。

訊息最終還是會被刪除的,預設生命週期為 1 周(7*24小時)。

(11)Zookeeper

Kafka 通過 Zookeeper 來儲存叢集的 meta 資訊。

由於 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復後,需要從故障前的位置的繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復後繼續消費。

Kafka 0.9 版本之前,consumer 預設將 offset 儲存在 Zookeeper 中;從 0.9 版本開始,consumer 預設將 offset 儲存在 Kafka 一個內建的 topic 中,該 topic 為 __consumer_offsets。

四、部署zookeeper + kafka 叢集

1、部署zookeeper + kafka 叢集

1.1 下載安裝包

官方下載地址:http://kafka.apache.org/downloads.html

cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz 

1.2.安裝 Kafka

cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka 

1.3 修改配置檔案

cd /usr/local/kafka/config/
cp server.properties{,.bak}
 
vim server.properties
broker.id=0                                    #21行,broker的全域性唯一編號,每個broker不能重複,因此要在其他機器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.2.33:9092       #31行,指定監聽的IP和埠,如果修改每個broker的IP需區分開來,也可保持預設配置不用修改
num.network.threads=3                          #42行,broker 處理網路請求的執行緒數量,一般情況下不需要去修改
num.io.threads=8                               #45行,用來處理磁碟IO的執行緒數量,數值應該大於硬碟數
socket.send.buffer.bytes=102400                #48行,傳送套接字的緩衝區大小
socket.receive.buffer.bytes=102400             #51行,接收套接字的緩衝區大小
socket.request.max.bytes=104857600             #54行,請求套接字的緩衝區大小
log.dirs=/usr/local/kafka/logs                 #60行,kafka執行日誌存放的路徑,也是資料存放的路徑
num.partitions=1                               #65行,topic在當前broker上的預設分割槽個數,會被topic建立時的指定引數覆蓋
num.recovery.threads.per.data.dir=1            #69行,用來恢復和清理data下資料的執行緒數量
log.retention.hours=168                        #103行,segment檔案(資料檔案)保留的最長時間,單位為小時,預設為7天,超時將被刪除
log.segment.bytes=1073741824                   #110行,一個segment檔案最大的大小,預設為 1G,超出將新建一個新的segment檔案
zookeeper.connect=192.168.2.33:2181,192.168.2.77:2181,192.168.2.99:2181                                   #123行,配置連線Zookeeper叢集地址

1.4 修改環境變數

vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
 
source /etc/profile 

1.5 配置 Zookeeper 啟動指令碼

vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
echo "---------- Kafka 啟動 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 狀態 ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac  

1.6 設定開機自啟

chmod +x /etc/init.d/kafka
chkconfig --add kafka 

1.7 分別啟動 Kafka

service kafka start  

1.8 Kafka 命令列操作

//建立topic

kafka-topics.sh --create --zookeeper 192.168.2.33:2181,192.168.23.77:2181,192.168.2.99:2181 --replication-factor 2 --partitions 3 --topic test
 
-------------------------------------------------------------------------------------
--zookeeper:定義 zookeeper 叢集伺服器地址,如果有多個 IP 地址使用逗號分割,一般使用一個 IP 即可
--replication-factor:定義分割槽副本數,1 代表單副本,建議為 2
--partitions:定義分割槽數
--topic:定義 topic 名稱
------------------------------------ 

//檢視當前伺服器中的所有 topic

kafka-topics.sh --list --zookeeper 192.168.2.33:2181,192.168.2.77:2181,192.168.2.99:2181 

//檢視某個 topic 的詳情

kafka-topics.sh --describe --zookeeper 192.168.2.33:2181,192.168.2.77:2181,192.168.2.99:2181  

//釋出訊息

kafka-console-producer.sh --broker-list 192.168.2.33:9092,192.168.2.77:9092,192.168.2.99:9092 --topic test  

//消費訊息

kafka-console-consumer.sh --bootstrap-server 192.168.2.33:9092,192.168.2.77:9092,192.168.2.99:9092 --topic test --from-beginning
 
-------------------------------------------------------------------------------------
--from-beginning:會把主題中以往所有的資料都讀取出來
-------------------------------------------------------------------------------------  

//修改分割槽數

kafka-topics.sh --zookeeper 192.168.2.33:2181,192.168.2.77:2181,192.168.2.99:2181 --alter --topic test  --partitions 6  

//刪除 topic

kafka-topics.sh --delete --zookeeper 192.168.2.33:2181,192.168.2.77:2181,192.168.2.99:2181 --topic  test

2、例項操作:部署zookeeper + kafka 叢集

1.1 安裝zookeeper叢集

詳見本篇部落格上文,接著上面的實驗繼續做(在所有叢集伺服器操作)

1.2 下載安裝包並安裝kafka

1.3 修改配置檔案

1.4 修改環境變數

1.5 配置 kafka 啟動指令碼並設定開機自啟,啟動kafka

1.6 Kafka 命令列操作

建立topic並檢視

釋出訊息並讀取訊息

修改分割槽數


刪除 topic

五、部署 Filebeat+Kafka+ELK

1、部署 Filebeat+Kafka+ELK的操作步驟

1.1.部署 Zookeeper+Kafka 叢集

見上文,接著上面的實驗做的

1.2、部署 Filebeat

要搭建ELK,詳見之前的部落格

cd /usr/local/filebeat
 
vim filebeat.yml
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/messages
- /var/log/*.log
......
#新增輸出到 Kafka 的配置
output.kafka:
enabled: true
hosts: ["192.168.2。33:9092","192.168.2.77:9092","192.168.2.99:9092"] #指定 Kafka 叢集配置
topic: "filebeat_test" #指定 Kafka 的 topic
 
#啟動 filebeat
./filebeat -e -c filebeat.yml  

1.3.部署 ELK,在 Logstash 元件所在節點上新建一個 Logstash 配置檔案

cd /etc/logstash/conf.d/
 
vim filebeat.conf
input {
kafka {
bootstrap_servers => "192.168.229.60:9092,192.168.229.50:9092,192.168.229.40:9092"
topics => "filebeat_test"
group_id => "test123"
auto_offset_reset => "earliest"
}
}
 
output {
elasticsearch {
hosts => ["192.168.229.70:9200"]
index => "filebeat_test-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}  

1.4 啟動 logstash

logstash -f filebeat.conf  

1.5 瀏覽器訪問測試

瀏覽器訪問 http://192.168.2.22:5601 登入 Kibana,單擊“Create Index Pattern”按鈕新增索引“filebeat_test-*”,單擊 “create” 按鈕建立,單擊 “Discover” 按鈕可檢視圖表資訊及日誌資訊。