Kafka 系列(三)—— Kafka 生產者詳解
一、生產者傳送訊息的過程
首先介紹一下 Kafka 生產者傳送訊息的過程:
- Kafka 會將傳送訊息包裝為 ProducerRecord 物件, ProducerRecord 物件包含了目標主題和要傳送的內容,同時還可以指定鍵和分割槽。在傳送 ProducerRecord 物件前,生產者會先把鍵和值物件序列化成位元組陣列,這樣它們才能夠在網路上傳輸。
- 接下來,資料被傳給分割槽器。如果之前已經在 ProducerRecord 物件裡指定了分割槽,那麼分割槽器就不會再做任何事情。如果沒有指定分割槽 ,那麼分割槽器會根據 ProducerRecord 物件的鍵來選擇一個分割槽,緊接著,這條記錄被新增到一個記錄批次裡,這個批次裡的所有訊息會被髮送到相同的主題和分割槽上。有一個獨立的執行緒負責把這些記錄批次傳送到相應的 broker 上。
- 伺服器在收到這些訊息時會返回一個響應。如果訊息成功寫入 Kafka,就返回一個 RecordMetaData 物件,它包含了主題和分割槽資訊,以及記錄在分割槽裡的偏移量。如果寫入失敗,則會返回一個錯誤。生產者在收到錯誤之後會嘗試重新傳送訊息,如果達到指定的重試次數後還沒有成功,則直接丟擲異常,不再重試。
二、建立生產者
2.1 專案依賴
本專案採用 Maven 構建,想要呼叫 Kafka 生產者 API,需要匯入 kafka-clients
依賴,如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
複製程式碼
2.2 建立生產者
建立 Kafka 生產者時,以下三個屬性是必須指定的:
- bootstrap.servers :指定 broker 的地址清單,清單裡不需要包含所有的 broker 地址,生產者會從給定的 broker 裡查詢 broker 的資訊。不過建議至少要提供兩個 broker 的資訊作為容錯;
- key.serializer :指定鍵的序列化器;
- value.serializer :指定值的序列化器。
建立的示例程式碼如下:
public class SimpleProducer {
public static void main(String[] args) {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers","hadoop001:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
/*建立生產者*/
Producer<String,String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String,String> record = new ProducerRecord<>(topicName,"hello" + i,"world" + i);
/* 傳送訊息*/
producer.send(record);
}
/*關閉生產者*/
producer.close();
}
}
複製程式碼
本篇文章的所有示例程式碼可以從 Github 上進行下載:kafka-basis
2.3 測試
1. 啟動Kakfa
Kafka 的執行依賴於 zookeeper,需要預先啟動,可以啟動 Kafka 內建的 zookeeper,也可以啟動自己安裝的:
# zookeeper啟動命令
bin/zkServer.sh start
# 內建zookeeper啟動命令
bin/zookeeper-server-start.sh config/zookeeper.properties
複製程式碼
啟動單節點 kafka 用於測試:
# bin/kafka-server-start.sh config/server.properties
複製程式碼
2. 建立topic
# 建立用於測試主題
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 --partitions 1 \
--topic Hello-Kafka
# 檢視所有主題
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
複製程式碼
3. 啟動消費者
啟動一個控制檯消費者用於觀察寫入情況,啟動命令如下:
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning
複製程式碼
4. 執行專案
此時可以看到消費者控制檯,輸出如下,這裡 kafka-console-consumer
只會打印出值資訊,不會打印出鍵資訊。
2.4 可能出現的問題
在這裡可能出現的一個問題是:生產者程式在啟動後,一直處於等待狀態。這通常出現在你使用預設配置啟動 Kafka 的情況下,此時需要對 server.properties
檔案中的 listeners
配置進行更改:
# hadoop001 為我啟動kafka服務的主機名,你可以換成自己的主機名或者ip地址
listeners=PLAINTEXT://hadoop001:9092
複製程式碼
二、傳送訊息
上面的示例程式呼叫了 send
方法傳送訊息後沒有做任何操作,在這種情況下,我們沒有辦法知道訊息傳送的結果。想要知道訊息傳送的結果,可以使用同步傳送或者非同步傳送來實現。
2.1 同步傳送
在呼叫 send
方法後可以接著呼叫 get()
方法,send
方法的返回值是一個 Future<RecordMetadata>物件,RecordMetadata 裡麵包含了傳送訊息的主題、分割槽、偏移量等資訊。改寫後的程式碼如下:
for (int i = 0; i < 10; i++) {
try {
ProducerRecord<String,"k" + i,"world" + i);
/*同步傳送訊息*/
RecordMetadata metadata = producer.send(record).get();
System.out.printf("topic=%s,partition=%d,offset=%s \n",metadata.topic(),metadata.partition(),metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
複製程式碼
此時得到的輸出如下:偏移量和呼叫次數有關,所有記錄都分配到了 0 分割槽,這是因為在建立 Hello-Kafka
主題時候,使用 --partitions
指定其分割槽數為 1,即只有一個分割槽。
topic=Hello-Kafka,partition=0,offset=40
topic=Hello-Kafka,offset=41
topic=Hello-Kafka,offset=42
topic=Hello-Kafka,offset=43
topic=Hello-Kafka,offset=44
topic=Hello-Kafka,offset=45
topic=Hello-Kafka,offset=46
topic=Hello-Kafka,offset=47
topic=Hello-Kafka,offset=48
topic=Hello-Kafka,offset=49
複製程式碼
2.2 非同步傳送
通常我們並不關心傳送成功的情況,更多關注的是失敗的情況,因此 Kafka 提供了非同步傳送和回撥函式。 程式碼如下:
for (int i = 0; i < 10; i++) {
ProducerRecord<String,"world" + i);
/*非同步傳送訊息,並監聽回撥*/
producer.send(record,new Callback() {
@Override
public void onCompletion(RecordMetadata metadata,Exception exception) {
if (exception != null) {
System.out.println("進行異常處理");
} else {
System.out.printf("topic=%s,metadata.offset());
}
}
});
}
複製程式碼
三、自定義分割槽器
Kafka 有著預設的分割槽機制:
- 如果鍵值為 null, 則使用輪詢 (Round Robin) 演演算法將訊息均衡地分佈到各個分割槽上;
- 如果鍵值不為 null,那麼 Kafka 會使用內建的雜湊演演算法對鍵進行雜湊,然後分佈到各個分割槽上。
某些情況下,你可能有著自己的分割槽需求,這時候可以採用自定義分割槽器實現。這裡給出一個自定義分割槽器的示例:
3.1 自定義分割槽器
/**
* 自定義分割槽器
*/
public class CustomPartitioner implements Partitioner {
private int passLine;
@Override
public void configure(Map<String,?> configs) {
/*從生產者配置中獲取分數線*/
passLine = (Integer) configs.get("pass.line");
}
@Override
public int partition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster) {
/*key 值為分數,當分數大於分數線時候,分配到 1 分割槽,否則分配到 0 分割槽*/
return (Integer) key >= passLine ? 1 : 0;
}
@Override
public void close() {
System.out.println("分割槽器關閉");
}
}
複製程式碼
需要在建立生產者時指定分割槽器,和分割槽器所需要的配置引數:
public class ProducerWithPartitioner {
public static void main(String[] args) {
String topicName = "Kafka-Partitioner-Test";
Properties props = new Properties();
props.put("bootstrap.servers","org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
/*傳遞自定義分割槽器*/
props.put("partitioner.class","com.heibaiying.producers.partitioners.CustomPartitioner");
/*傳遞分割槽器所需的引數*/
props.put("pass.line",6);
Producer<Integer,String> producer = new KafkaProducer<>(props);
for (int i = 0; i <= 10; i++) {
String score = "score:" + i;
ProducerRecord<Integer,i,score);
/*非同步傳送訊息*/
producer.send(record,(metadata,exception) ->
System.out.printf("%s,\n",score,metadata.partition()));
}
producer.close();
}
}
複製程式碼
3.2 測試
需要建立一個至少有兩個分割槽的主題:
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 --partitions 2 \
--topic Kafka-Partitioner-Test
複製程式碼
此時輸入如下,可以看到分數大於等於 6 分的都被分到 1 分割槽,而小於 6 分的都被分到了 0 分割槽。
score:6,partition=1,score:7,score:8,score:9,score:10,score:0,score:1,score:2,score:3,score:4,score:5,分割槽器關閉
複製程式碼
四、生產者其他屬性
上面生產者的建立都僅指定了服務地址,鍵序列化器、值序列化器,實際上 Kafka 的生產者還有很多可配置屬性,如下:
1. acks
acks 引數指定了必須要有多少個分割槽副本收到訊息,生產者才會認為訊息寫入是成功的:
- acks=0 : 訊息傳送出去就認為已經成功了,不會等待任何來自伺服器的響應;
- acks=1 : 只要叢集的首領節點收到訊息,生產者就會收到一個來自伺服器成功響應;
- acks=all :只有當所有參與複製的節點全部收到訊息時,生產者才會收到一個來自伺服器的成功響應。
2. buffer.memory
設定生產者記憶體緩衝區的大小。
3. compression.type
預設情況下,傳送的訊息不會被壓縮。如果想要進行壓縮,可以配置此引數,可選值有 snappy,gzip,lz4。
4. retries
發生錯誤後,訊息重發的次數。如果達到設定值,生產者就會放棄重試並返回錯誤。
5. batch.size
當有多個訊息需要被髮送到同一個分割槽時,生產者會把它們放在同一個批次裡。該引數指定了一個批次可以使用的記憶體大小,按照位元組數計算。
6. linger.ms
該引數制定了生產者在傳送批次之前等待更多訊息加入批次的時間。
7. clent.id
客戶端 id,伺服器用來識別訊息的來源。
8. max.in.flight.requests.per.connection
指定了生產者在收到伺服器響應之前可以傳送多少個訊息。它的值越高,就會佔用越多的記憶體,不過也會提升吞吐量,把它設定為 1 可以保證訊息是按照傳送的順序寫入伺服器,即使發生了重試。
9. timeout.ms,request.timeout.ms & metadata.fetch.timeout.ms
- timeout.ms 指定了 borker 等待同步副本返回訊息的確認時間;
- request.timeout.ms 指定了生產者在傳送資料時等待伺服器返回響應的時間;
- metadata.fetch.timeout.ms 指定了生產者在獲取元資料(比如分割槽首領是誰)時等待伺服器返回響應的時間。
10. max.block.ms
指定了在呼叫 send()
方法或使用 partitionsFor()
方法獲取元資料時生產者的阻塞時間。當生產者的傳送緩衝區已滿,或者沒有可用的元資料時,這些方法會阻塞。在阻塞時間達到 max.block.ms 時,生產者會丟擲超時異常。
11. max.request.size
該引數用於控制生產者傳送的請求大小。它可以指傳送的單個訊息的最大值,也可以指單個請求裡所有訊息總的大小。例如,假設這個值為 1000K ,那麼可以傳送的單個最大訊息為 1000K ,或者生產者可以在單個請求裡傳送一個批次,該批次包含了 1000 個訊息,每個訊息大小為 1K。
12. receive.buffer.bytes & send.buffer.byte
這兩個引數分別指定 TCP socket 接收和傳送資料包緩衝區的大小,-1 代表使用作業系統的預設值。
參考資料
- Neha Narkhede,Gwen Shapira,Todd Palino(著),薛命燈 (譯) . Kafka 權威指南 . 人民郵電出版社 . 2017-12-26
更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南