1. 程式人生 > 程式設計 >Kafka 系列(三)—— Kafka 生產者詳解

Kafka 系列(三)—— Kafka 生產者詳解

一、生產者傳送訊息的過程

首先介紹一下 Kafka 生產者傳送訊息的過程:

  • Kafka 會將傳送訊息包裝為 ProducerRecord 物件, ProducerRecord 物件包含了目標主題和要傳送的內容,同時還可以指定鍵和分割槽。在傳送 ProducerRecord 物件前,生產者會先把鍵和值物件序列化成位元組陣列,這樣它們才能夠在網路上傳輸。
  • 接下來,資料被傳給分割槽器。如果之前已經在 ProducerRecord 物件裡指定了分割槽,那麼分割槽器就不會再做任何事情。如果沒有指定分割槽 ,那麼分割槽器會根據 ProducerRecord 物件的鍵來選擇一個分割槽,緊接著,這條記錄被新增到一個記錄批次裡,這個批次裡的所有訊息會被髮送到相同的主題和分割槽上。有一個獨立的執行緒負責把這些記錄批次傳送到相應的 broker 上。
  • 伺服器在收到這些訊息時會返回一個響應。如果訊息成功寫入 Kafka,就返回一個 RecordMetaData 物件,它包含了主題和分割槽資訊,以及記錄在分割槽裡的偏移量。如果寫入失敗,則會返回一個錯誤。生產者在收到錯誤之後會嘗試重新傳送訊息,如果達到指定的重試次數後還沒有成功,則直接丟擲異常,不再重試。

https://github.com/heibaiying

二、建立生產者

2.1 專案依賴

本專案採用 Maven 構建,想要呼叫 Kafka 生產者 API,需要匯入 kafka-clients 依賴,如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency> 複製程式碼

2.2 建立生產者

建立 Kafka 生產者時,以下三個屬性是必須指定的:

  • bootstrap.servers :指定 broker 的地址清單,清單裡不需要包含所有的 broker 地址,生產者會從給定的 broker 裡查詢 broker 的資訊。不過建議至少要提供兩個 broker 的資訊作為容錯;
  • key.serializer :指定鍵的序列化器;
  • value.serializer :指定值的序列化器。

建立的示例程式碼如下:

public class SimpleProducer {

    public static void main(String[] args) {

        String topicName = "Hello-Kafka";

        Properties props = new Properties();
        props.put("bootstrap.servers","hadoop001:9092");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        /*建立生產者*/
        Producer<String,String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String,String> record = new ProducerRecord<>(topicName,"hello" + i,"world" + i);
            /* 傳送訊息*/
            producer.send(record);
        }
        /*關閉生產者*/
        producer.close();
    }
}
複製程式碼

本篇文章的所有示例程式碼可以從 Github 上進行下載:kafka-basis

2.3 測試

1. 啟動Kakfa

Kafka 的執行依賴於 zookeeper,需要預先啟動,可以啟動 Kafka 內建的 zookeeper,也可以啟動自己安裝的:

# zookeeper啟動命令
bin/zkServer.sh start

# 內建zookeeper啟動命令
bin/zookeeper-server-start.sh config/zookeeper.properties
複製程式碼

啟動單節點 kafka 用於測試:

# bin/kafka-server-start.sh config/server.properties
複製程式碼

2. 建立topic

# 建立用於測試主題
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                     --replication-factor 1 --partitions 1 \
                     --topic Hello-Kafka

# 檢視所有主題
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
複製程式碼

3. 啟動消費者

啟動一個控制檯消費者用於觀察寫入情況,啟動命令如下:

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning
複製程式碼

4. 執行專案

此時可以看到消費者控制檯,輸出如下,這裡 kafka-console-consumer 只會打印出值資訊,不會打印出鍵資訊。

https://github.com/heibaiying

2.4 可能出現的問題

在這裡可能出現的一個問題是:生產者程式在啟動後,一直處於等待狀態。這通常出現在你使用預設配置啟動 Kafka 的情況下,此時需要對 server.properties 檔案中的 listeners 配置進行更改:

# hadoop001 為我啟動kafka服務的主機名,你可以換成自己的主機名或者ip地址
listeners=PLAINTEXT://hadoop001:9092
複製程式碼

二、傳送訊息

上面的示例程式呼叫了 send 方法傳送訊息後沒有做任何操作,在這種情況下,我們沒有辦法知道訊息傳送的結果。想要知道訊息傳送的結果,可以使用同步傳送或者非同步傳送來實現。

2.1 同步傳送

在呼叫 send 方法後可以接著呼叫 get() 方法,send 方法的返回值是一個 Future<RecordMetadata>物件,RecordMetadata 裡麵包含了傳送訊息的主題、分割槽、偏移量等資訊。改寫後的程式碼如下:

for (int i = 0; i < 10; i++) {
    try {
        ProducerRecord<String,"k" + i,"world" + i);
        /*同步傳送訊息*/
        RecordMetadata metadata = producer.send(record).get();
        System.out.printf("topic=%s,partition=%d,offset=%s \n",metadata.topic(),metadata.partition(),metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
複製程式碼

此時得到的輸出如下:偏移量和呼叫次數有關,所有記錄都分配到了 0 分割槽,這是因為在建立 Hello-Kafka 主題時候,使用 --partitions 指定其分割槽數為 1,即只有一個分割槽。

topic=Hello-Kafka,partition=0,offset=40 
topic=Hello-Kafka,offset=41 
topic=Hello-Kafka,offset=42 
topic=Hello-Kafka,offset=43 
topic=Hello-Kafka,offset=44 
topic=Hello-Kafka,offset=45 
topic=Hello-Kafka,offset=46 
topic=Hello-Kafka,offset=47 
topic=Hello-Kafka,offset=48 
topic=Hello-Kafka,offset=49 
複製程式碼

2.2 非同步傳送

通常我們並不關心傳送成功的情況,更多關注的是失敗的情況,因此 Kafka 提供了非同步傳送和回撥函式。 程式碼如下:

for (int i = 0; i < 10; i++) {
    ProducerRecord<String,"world" + i);
    /*非同步傳送訊息,並監聽回撥*/
    producer.send(record,new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata,Exception exception) {
            if (exception != null) {
                System.out.println("進行異常處理");
            } else {
                System.out.printf("topic=%s,metadata.offset());
            }
        }
    });
}
複製程式碼

三、自定義分割槽器

Kafka 有著預設的分割槽機制:

  • 如果鍵值為 null, 則使用輪詢 (Round Robin) 演演算法將訊息均衡地分佈到各個分割槽上;
  • 如果鍵值不為 null,那麼 Kafka 會使用內建的雜湊演演算法對鍵進行雜湊,然後分佈到各個分割槽上。

某些情況下,你可能有著自己的分割槽需求,這時候可以採用自定義分割槽器實現。這裡給出一個自定義分割槽器的示例:

3.1 自定義分割槽器

/**
 * 自定義分割槽器
 */
public class CustomPartitioner implements Partitioner {

    private int passLine;

    @Override
    public void configure(Map<String,?> configs) {
        /*從生產者配置中獲取分數線*/
        passLine = (Integer) configs.get("pass.line");
    }

    @Override
    public int partition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster) {
        /*key 值為分數,當分數大於分數線時候,分配到 1 分割槽,否則分配到 0 分割槽*/
        return (Integer) key >= passLine ? 1 : 0;
    }

    @Override
    public void close() {
        System.out.println("分割槽器關閉");
    }
}
複製程式碼

需要在建立生產者時指定分割槽器,和分割槽器所需要的配置引數:

public class ProducerWithPartitioner {

    public static void main(String[] args) {

        String topicName = "Kafka-Partitioner-Test";

        Properties props = new Properties();
        props.put("bootstrap.servers","org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        /*傳遞自定義分割槽器*/
        props.put("partitioner.class","com.heibaiying.producers.partitioners.CustomPartitioner");
        /*傳遞分割槽器所需的引數*/
        props.put("pass.line",6);

        Producer<Integer,String> producer = new KafkaProducer<>(props);

        for (int i = 0; i <= 10; i++) {
            String score = "score:" + i;
            ProducerRecord<Integer,i,score);
            /*非同步傳送訊息*/
            producer.send(record,(metadata,exception) ->
                    System.out.printf("%s,\n",score,metadata.partition()));
        }

        producer.close();
    }
}
複製程式碼

3.2 測試

需要建立一個至少有兩個分割槽的主題:

 bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                     --replication-factor 1 --partitions 2 \
                     --topic Kafka-Partitioner-Test
複製程式碼

此時輸入如下,可以看到分數大於等於 6 分的都被分到 1 分割槽,而小於 6 分的都被分到了 0 分割槽。

score:6,partition=1,score:7,score:8,score:9,score:10,score:0,score:1,score:2,score:3,score:4,score:5,分割槽器關閉
複製程式碼

四、生產者其他屬性

上面生產者的建立都僅指定了服務地址,鍵序列化器、值序列化器,實際上 Kafka 的生產者還有很多可配置屬性,如下:

1. acks

acks 引數指定了必須要有多少個分割槽副本收到訊息,生產者才會認為訊息寫入是成功的:

  • acks=0 : 訊息傳送出去就認為已經成功了,不會等待任何來自伺服器的響應;
  • acks=1 : 只要叢集的首領節點收到訊息,生產者就會收到一個來自伺服器成功響應;
  • acks=all :只有當所有參與複製的節點全部收到訊息時,生產者才會收到一個來自伺服器的成功響應。

2. buffer.memory

設定生產者記憶體緩衝區的大小。

3. compression.type

預設情況下,傳送的訊息不會被壓縮。如果想要進行壓縮,可以配置此引數,可選值有 snappy,gzip,lz4。

4. retries

發生錯誤後,訊息重發的次數。如果達到設定值,生產者就會放棄重試並返回錯誤。

5. batch.size

當有多個訊息需要被髮送到同一個分割槽時,生產者會把它們放在同一個批次裡。該引數指定了一個批次可以使用的記憶體大小,按照位元組數計算。

6. linger.ms

該引數制定了生產者在傳送批次之前等待更多訊息加入批次的時間。

7. clent.id

客戶端 id,伺服器用來識別訊息的來源。

8. max.in.flight.requests.per.connection

指定了生產者在收到伺服器響應之前可以傳送多少個訊息。它的值越高,就會佔用越多的記憶體,不過也會提升吞吐量,把它設定為 1 可以保證訊息是按照傳送的順序寫入伺服器,即使發生了重試。

9. timeout.ms,request.timeout.ms & metadata.fetch.timeout.ms

  • timeout.ms 指定了 borker 等待同步副本返回訊息的確認時間;
  • request.timeout.ms 指定了生產者在傳送資料時等待伺服器返回響應的時間;
  • metadata.fetch.timeout.ms 指定了生產者在獲取元資料(比如分割槽首領是誰)時等待伺服器返回響應的時間。

10. max.block.ms

指定了在呼叫 send() 方法或使用 partitionsFor() 方法獲取元資料時生產者的阻塞時間。當生產者的傳送緩衝區已滿,或者沒有可用的元資料時,這些方法會阻塞。在阻塞時間達到 max.block.ms 時,生產者會丟擲超時異常。

11. max.request.size

該引數用於控制生產者傳送的請求大小。它可以指傳送的單個訊息的最大值,也可以指單個請求裡所有訊息總的大小。例如,假設這個值為 1000K ,那麼可以傳送的單個最大訊息為 1000K ,或者生產者可以在單個請求裡傳送一個批次,該批次包含了 1000 個訊息,每個訊息大小為 1K。

12. receive.buffer.bytes & send.buffer.byte

這兩個引數分別指定 TCP socket 接收和傳送資料包緩衝區的大小,-1 代表使用作業系統的預設值。

參考資料

  1. Neha Narkhede,Gwen Shapira,Todd Palino(著),薛命燈 (譯) . Kafka 權威指南 . 人民郵電出版社 . 2017-12-26

更多大資料系列文章可以參見 GitHub 開源專案大資料入門指南