架構師之路-如何建立高可用消息中間件kafka
Kafka
一、熟悉kafka
l Server-1 broker其實就是kafka的server,因為producer和consumer都要去連它。Broker主要還是做存儲用。
l Server-2是zookeeper的server端,zookeeper的具體作用你可以去官網查,在這裏你可以先想象,它維持了一張表,記錄了各個節點的IP、端口等信息(以後還會講到,它裏面還存了kafka的相關信息)。
l Server-3、4、5他們的共同之處就是都配置了zkClient,更明確的說,就是運行前必須配置zookeeper的地址,道理也很簡單,這之間的連接都是需要zookeeper來進行分發的。
l Server-1和Server-2的關系,他們可以放在一臺機器上,也可以分開放,zookeeper也可以配集群。目的是防止某一臺掛了。
簡單說下整個系統運行的順序:
1. 啟動zookeeper的server
2. 啟動kafka的server
3. Producer如果生產了數據,會先通過zookeeper找到broker,然後將數據存放進broker
4. Consumer如果要消費數據,會先通過zookeeper找對應的broker,然後消費。
Kafka 分布式消息隊列 類似產品有JBoss、MQ
一、由Linkedln 開源,使用scala開發,有如下幾個特點:
(1)高吞吐
(2)分布式
(3)支持多語言客戶端 (C++、Java)
二、組成: 客戶端是 producer 和 consumer,提供一些API,服務器端是Broker,客戶端提供可以向Broker內發布消息、消費消息,服務器端提供消息的存儲等功能
Kafka 特點是支持分區、分布式、可拓展性強
三、Kafka 的消息分幾個層次
(1)Topic 一類主題
(2)Partition 默認每個消息有2個分區,創建Topic可以指定分區數,1天有 1億行可以分8個分區,如果每天幾十萬行就一個分區吧
(3)Message 是每個消息
四、數據處理流程
1.生產者 生產消息、將消息發布到指定的topic分區
2.kafka 集群接收到producer發過來的消息後,將其持久化到硬盤,可以指定時長,而不關註消息是否被消費
3.consumer從kafka集群pull或push方式,並控制獲取消息的offset偏移量,consumer重啟時需要根據offset開始再次消費數據,consumer自己維護offset
五、kafka如何實現高吞吐量
1.充分利用磁盤的順序讀寫
2.數據批量發送
3.數據壓縮
4.Topic劃分多個partition
六、kafka 如何實現load balance &HA
1)producer 根據用戶指定的算法,將消息發送到指定的partition
2)存在多個partition,每個partition存在多個副本replica,每個replica分布在不同的broker節點上
3)每個partition需要選取lead partition,leader partition負責讀寫,並由zookeeper負責fail over 快速失敗
4)通過zookeeper管理broker與consumer的動態加入與離開
七、擴容
當需要增加broker節點時,新增的broker會向zookeeper註冊,而producer及consumer會根據zookeeper上的watcher感知這些變化,並及時作出調整
副本分配邏輯規則如下:
-
在Kafka集群中,每個Broker都有均等分配Partition的Leader機會。
-
上述圖Broker Partition中,箭頭指向為副本,以Partition-0為例:broker1中parition-0為Leader,Broker2中Partition-0為副本。
-
上述圖種每個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker為副本,如此循環叠代分配,多副本都遵循此規則。
副本分配算法如下:
-
將所有N Broker和待分配的i個Partition排序.
-
將第i個Partition分配到第(i mod n)個Broker上.
-
將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上.
二、安裝zookeeper,並配置集群
準備三臺機器做集群
服務器 |
IP地址 |
端口 |
服務器1 |
172.16.0.41 |
2181/2881/3881 |
服務器2 |
172.16.0.42 |
2182/2882/3882 |
服務器3 |
172.16.0.43 |
2183/2883/3883 |
2.1配置java環境
將jdk-7u79-linux-x64上傳到三臺服務器安裝配置。
給三臺服務器分別創建java文件夾。
將jdk 放到java文件夾下並解壓,然後刪掉壓縮文件。
配置jdk全局變量。
#vi /etc/profile
export JAVA_HOME=/usr/local/java/jdk1.7.0_79
export JRE_HOME=/usr/local/java/jdk1.7.0_79/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$PATH
2.2 修改操作系統的/etc/hosts文件,添加IP與主機名映射:
# zookeeper cluster servers
172.16.0.41 edu-zk-01
172.16.0.42 edu-zk-02
172.16.0.43 edu-zk-03
2.3下載zookeeper-3.4.7.tar.gz 到/home/zy/zookeeper目錄
# mkdir -p /usr/local/zookeeper
# cd / usr/local/zookeeper/
# wget http://apache.fayea.com/zookeeper/zookeeper-3.4.7/zookeeper-3.4.7.tar.gz
2.4 解壓zookeeper安裝包,並對節點重民名
#tar -zxvf zookeeper-3.4.7.tar.gz
服務器1:
#mv zookeeper-3.4.7 node-01
服務器2:
#mv zookeeper-3.4.7 node-02
服務器3:
#mv zookeeper-3.4.7 node-03
2.5 在zookeeper的各個節點下 創建數據和日誌目錄
#cd /usr/local/zookeeper
#mkdir data
#mkdir logs
2.6 重命名配置文件
將zookeeper/node-0X/conf目錄下的zoo_sample.cfg文件拷貝一份,命名為zoo.cfg:
#cp zoo_sample.cfg zoo.cfg
2.7 修改zoo.cfg 配置文件
三臺服務器做同樣配置:zookeeper/node-01的配置(/usr/local/zookeeper/node-01/conf/zoo.cfg)如下:
參數說明:
tickTime=2000
tickTime這個時間是作為Zookeeper服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個tickTime時間就會發送一個心跳。
initLimit=10
initLimit這個配置項是用來配置Zookeeper接受客戶端(這裏所說的客戶端不是用戶連接Zookeeper服務器的客戶端,而是Zookeeper服務器集群中連接到Leader的Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過10個心跳的時間(也就是tickTime)長度後Zookeeper 服務器還沒有收到客戶端的返回信息,那麽表明這個客戶端連接失敗。總的時間長度就是10*2000=20 秒。
syncLimit=5
syncLimit這個配置項標識Leader與Follower之間發送消息,請求和應答時間長度,最長不能超過多少個tickTime的時間長度,總的時間長度就是5*2000=10秒。
dataDir=/usr/local/zookeeper/node-01/data
dataDir顧名思義就是Zookeeper保存數據的目錄,默認情況下Zookeeper將寫數據的日誌文件也保存在這個目錄裏。
clientPort=2181
clientPort這個端口就是客戶端(應用程序)連接Zookeeper服務器的端口,Zookeeper會監聽這個端口接受客戶端的訪問請求。
server.A=B:C:D
server.1=edu-zk-01:2881:3881
server.2=edu-zk-02:2882:3882
server.3=edu-zk-03:2883:3883
A是一個數字,表示這個是第幾號服務器;
B是這個服務器的IP地址(或者是與IP地址做了映射的主機名);
C第一個端口用來集群成員的信息交換,表示這個服務器與集群中的Leader服務器交換信息的端口;
D是在leader掛掉時專門用來進行選舉leader所用的端口。
註意:如果是偽集群的配置方式,不同的 Zookeeper 實例通信端口號不能一樣,所以要給它們分配不同的端口號。
2.8 創建myid文件
在dataDir=/usr/local/zookeeper/node-0X/data 下創建myid文件
編輯myid文件,並在對應的IP的機器上輸入對應的編號。如在node-01上,myid文件內容就是1,node-02上就是2,node-03上就是3:
#vi /usr/local/zookeeper/node-01/data/myid## 值為1
#vi /usr/local/zookeeper/node-02/data/myid## 值為2
#vi /usr/local/zookeeper/node-03/data/myid## 值為3
2.9 啟動測試zookeeper
(1)進入/usr/local/zookeeper/node-0X/bin目錄下執行:
#/usr/local/zookeeper/node-01/bin/zkServer.sh start
#/usr/local/zookeeper/node-02/bin/zkServer.sh start
#/usr/local/zookeeper/node-03/bin/zkServer.sh start
(2)輸入jps命令查看進程:
其中,QuorumPeerMain是zookeeper進程,說明啟動正常
(3)查看狀態:
# /usr/local/zookeeper/node-01/bin/zkServer.sh status
(4)查看zookeeper服務輸出信息:
由於服務信息輸出文件在/usr/local/zookeeper/node-0X/bin/zookeeper.out
$ tail -500f zookeeper.out
三、KAFKA集群配置
利用安裝zookeeper的三臺服務器做KAFKA集群,也可以新建三個虛擬機去操作。
服務器 |
IP地址 |
端口 |
服務器1 |
172.16.0.41 |
9092 |
服務器2 |
172.16.0.42 |
9092 |
服務器3 |
172.16.0.43 |
9092 |
4.1 下載 kafka_2.9.2-0.8.1
分別在三臺服務器創建kafka目錄並且下載kafka壓縮包
#mkdir /usr/local/kafka
#tar –zxvf kafka_2.9.2-0.8.1.tar.gz
4.2 創建log文件夾
#mkdir /usr/local/kafka/kafkalogs
4.3 配置kafka
#cd /usr/local/kafka/kafka_2.9.2-0.8.1/config
#vi server.properties 修改項如下:
broker.id=0 //當前機器在集群中的唯一標識
port=9092 //kafka對外提供服務的tcp端口
host.name=172.16.0.41 //主機IP地址
log.dirs=/usr/local/kafka/kafkalogs //log存放目錄
message.max.byte=5048576 //kafka一條消息容納的消息最大為多少
default.replication.factor=2 //每個分區默認副本數量
replica.fetch.max.bytes=5048576
zookeeper.connect=172.16.0.41:2181,172.16.0.42:2182,172.16.0.43:2183
4.4 啟動kafka
# ./kafka-server-start.sh -daemon ../config/server.properties //後臺啟動運行
4.5 問題解決
[[email protected] ~]# /export/kafka/bin/kafka-console-producer.sh --broker-list 10.14.2.201:9092,10.14.2.202:9092,10.14.2.203:9092,10.14.2.204:9092 --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
# /export/kafka/bin/kafka-console-consumer.sh --zookeeper 10.14.2.201:2181,10.14.2.202:2181,10.14.2.203:2181,10.14.2.204:2181 --topic test --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
解決方法:
下載slf4j-1.7.6.zip
http://www.slf4j.org/dist/slf4j-1.7.6.zip
解壓
unzip slf4j-1.7.6.zip
把slf4j-nop-1.7.6.jar 包復制到kafka libs目錄下面
cd slf4j-1.7.6
cp slf4j-nop-1.7.6.jar /export/kafka/libs/
四、KAFKA集群驗證
5.1 創建topic
#./kafka-topics.sh --create --zookeeper 172.16.0.42:2182 --replication-factor 1 --partitions 1 --topic test
5.2 查看topic
# ./kafka-topics.sh --list --zookeeper 172.16.0.42:2182
5.3 開啟發送者並發送消息
#./kafka-console-producer.sh --broker-list 172.16.0.41:9092 --topic test
5.4 開啟消費者並接收消息
#./kafka-console-consumer.sh --zookeeper 172.16.0.42:2182 --topic test --from-beginning
參考內容:從無到有搭建中小型互聯網公司後臺服務架構與運維架構
架構師之路-如何建立高可用消息中間件kafka