Kafka學習筆記4--Kafka生產者的客戶端(PHP)開發
阿新 • • 發佈:2020-01-01
一、準備工作
雖然 Kafka 是用 Java/Scala 語言編寫的,但這不妨礙它對多語言的支援。可以在 Kafka 官網的 CLIENTS 檢視 Kafka 支援的語言,其中包括 C/C++、Python、Go 等語言。
PHP 操作 Kafka 需要安裝 librdkafka 庫和 kafka 的 PHP 擴充套件。
1.安裝 librdkafka 庫
git clone https://github.com/edenhill/librdkafka.git
./configure
make
sudo make install
2.安裝 php-kafka 擴充套件
$ git clone https://github.com/arnaud-lb/php-rdkafka.git $ cd librdkafka/ $ phpize $ ./configure $ make $ sudo make install #在php.ini 檔案中配置 rdkafka擴充套件 extension=rdkafka.so #檢視擴充套件是否生效 php -m | grep kafka
二、程式碼實現
demo 來源於 https://github.com/arnaud-lb/php-rdkafka#examples
正常的生產邏輯如下:
1.配置生產者客戶端引數及建立相應的生產者例項;
/** * Create a producer */ $conf = new RdKafka\Conf(); $conf->set('log_level', LOG_DEBUG); //$conf->set('debug', 'all'); $rk = new RdKafka\Producer($conf); $rk->addBrokers("127.0.0.1");
2.構建主題;
/**
* Create a topic instance from the producer
*/
$topic = $rk->newTopic("test");
3.傳送訊息;
/** * Producing messages * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition. * 第一個引數是分割槽,RD_KAFKA_PARTITION_UA 表示未分配,並且由 librdkafka 選擇分割槽。 * The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue. * 第二個引數是訊息標誌,為 0 或 RD_KAFKA_MSG_F_BLOCK,當佇列滿了時阻止生產訊息。 * The message payload can be anything. * 訊息可以是任何內容。 */ $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
4.關閉生產者例項。
/**
* Proper shutdown
* This should be done prior to destroying a producer instance
* to make sure all queued and in-flight produce requests are completed before terminating.
* 關閉生產者例項前需確保所有在佇列中和正在生產的生產請求都已完成。
* Not calling flush can lead to message loss!
* 不呼叫flush會導致訊息丟失!
*/
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);
檢驗訊息是否傳送成功
終端開啟一個消費者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
在另一個視窗執行 php producer.php,
可看到消費者終端接收到訊息。
完整程式碼如下:
<?php
/**
* Created by PhpStorm.
* User: liulu
* Date: 2020/1/1
* Time: 18:38
*/
/**
* Create a producer
*/
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");
/**
* Create a topic instance from the producer
*/
$topic = $rk->newTopic("test");
/**
* Producing messages
* The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
* 第一個引數是分割槽,RD_KAFKA_PARTITION_UA 表示未分配,並且由 librdkafka 選擇分割槽。
* The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
* 第二個引數是訊息標誌,為 0 或 RD_KAFKA_MSG_F_BLOCK,當佇列滿了時阻止生產訊息。
* The message payload can be anything.
* 訊息可以是任何內容。
*/
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
/**
* Proper shutdown
* This should be done prior to destroying a producer instance
* to make sure all queued and in-flight produce requests are completed before terminating.
* 關閉生產者例項前需確保所有在佇列中和正在生產的生產請求都已完成。
* Not calling flush can lead to message loss!
* 不呼叫flush會導致訊息丟失!
*/
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);
echo 'finished';exit;