1. 程式人生 > 實用技巧 >php呼叫kafka訊息佇列

php呼叫kafka訊息佇列

2020年10月27日16:49:33

環境php 7.3 laravel 8

擴充套件https://github.com/arnaud-lb/php-rdkafka

其他的php擴充套件不是很久沒更新就是擴充套件關係亂七八糟,建議使用rdkafka

http://pecl.php.net/package/rdkafka 有編譯好的dll擴充套件

注意:將其中php_rdkafka.dll放入php目錄下的ext資料夾內,librdkafka.dll放入php根目錄下,然後修改php.ini,新增:

extension=php_rdkafka.dll

php -m和phpinfo()都需要驗證是否安裝成功

程式碼:

KafkaProducerService

<?php

namespace App\Service;

use RdKafka\Conf;
use RdKafka\Producer;

class KafkaProducerService {

    public static function doTask() {

        $conf = new Conf();
//        $conf->set('log_level', (string) LOG_DEBUG);
//        $conf->set('debug', 'all');
        $rk
= new Producer($conf); $rk->addBrokers('172.18.0.105'); $topic = $rk->newTopic('zx'); $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message 222222222222"); $rk->poll(1); } }
View Code

KafkaConsumerService

<?php

namespace App\Service;

use RdKafka\Conf;
use RdKafka\Consumer; class KafkaConsumerService { public static function doTask() { $conf = new Conf(); // $conf->set('log_level', (string) LOG_DEBUG); // $conf->set('debug', 'all'); $rk = new Consumer($conf); $rk->addBrokers('172.18.0.105'); $topic = $rk->newTopic('zx'); $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); while (true) { $msg = $topic->consume(0, 5000); if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) { // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead. continue; } elseif ($msg->err) { echo $msg->errstr(), "\n"; break; } else { echo $msg->payload, "\n"; } } } }
View Code

注意:

1,你測試伺服器上的kafka必須的先測試好,不然有些錯誤,在log裡面沒法只管體現

2,$rk->poll(1);必須,不然消費端接受不到資訊,因為是編譯擴充套件,所以不清楚底層是怎麼實現的

一些測試命令

啟動檢視啟動錯誤
bin/kafka-server-start.sh config/server.properties bin/zookeeper-server-start.sh config/zookeeper.properties
172.18.0.105 本地測試服務Ip
bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=3000
//刪除
bin/kafka-topics.sh --delete --topic testTopic --zookeeper localhost:2181
//列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
//建立
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zx
//生產者客戶端
bin/kafka-console-producer.sh --broker-list 172.18.0.105:9092 --topic zx
//消費者客戶端
bin/kafka-console-consumer.sh --bootstrap-server 172.18.0.105:9092 --topic zx --from-beginning

jps檢視是否啟動
9173 Kafka
9462 Jps
8589 QuorumPeerMain

現在伺服器啟動生產者客戶端,消費者客戶端測試確保kafka跑起來完全沒問題,這點一定要確保

個人的一點感悟,kafka配置和測試有一定難度,不然rabbitmq上手快,而且kafka找錯誤真的不好找