
Kafka_Kafka Consumer Offsets and Lag: the Query Script kafka-consumer-groups.sh

This article is based on Kafka kafka_2.11-0.10.0.1.

What the version string means:

Scala version: 2.11

Kafka version: 0.10.0.1

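Background:

   Kafka 0.9 introduced a major change.

The main differences are:

  1. kafka-clients no longer separates the high-level and low-level (simple) consumer APIs. Both are replaced by a single new consumer API.

  2. Consumer offsets are no longer stored only in ZooKeeper. Kafka now tracks consumption progress itself, in the internal __consumer_offsets topic.

     Some special cases remain. For example, in 0.10.0.1 kafka-console-consumer still stores its offsets in ZooKeeper, because by default it uses the old ZooKeeper-based consumer.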

[Figure: how the version changes affect the consumer (image not available)]

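Main text:

  The background above shows that offsets are checked differently in versions before 0.9 and in 0.9 and later.

Most production environments have already moved beyond 0.8, so this article covers versions >= 0.9.

How to query:

  The method described here uses the tool script that ships with Kafka.

  The tool script is bin/kafka-consumer-groups.sh.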

[root@master my_bin]# cd $KAFKA_HOME
[root@master kafka]# cd bin/
[root@master bin]# ll
total 116
-rwxr-xr-x. 1 root root 1052 Aug  4  2016 connect-distributed.sh
-rwxr-xr-x. 1 root root 1051 Aug  4  2016 connect-standalone.sh
-rwxr-xr-x. 1 root root  861 Aug  4  2016 kafka-acls.sh
-rwxr-xr-x. 1 root root  864 Aug  4  2016 kafka-configs.sh
-rwxr-xr-x. 1 root root  945 Aug  4  2016 kafka-console-consumer.sh
-rwxr-xr-x. 1 root root  944 Aug  4  2016 kafka-console-producer.sh
-rwxr-xr-x. 1 root root  871 Aug  4  2016 kafka-consumer-groups.sh
-rwxr-xr-x. 1 root root  872 Aug  4  2016 kafka-consumer-offset-checker.sh
-rwxr-xr-x. 1 root root  948 Aug  4  2016 kafka-consumer-perf-test.sh
-rwxr-xr-x. 1 root root  862 Aug  4  2016 kafka-mirror-maker.sh
-rwxr-xr-x. 1 root root  886 Aug  4  2016 kafka-preferred-replica-election.sh
-rwxr-xr-x. 1 root root  959 Aug  4  2016 kafka-producer-perf-test.sh
-rwxr-xr-x. 1 root root  874 Aug  4  2016 kafka-reassign-partitions.sh
-rwxr-xr-x. 1 root root  868 Aug  4  2016 kafka-replay-log-producer.sh
-rwxr-xr-x. 1 root root  874 Aug  4  2016 kafka-replica-verification.sh
-rwxr-xr-x. 1 root root 6358 Aug  4  2016 kafka-run-class.sh
-rwxr-xr-x. 1 root root 1364 Aug  4  2016 kafka-server-start.sh
-rwxr-xr-x. 1 root root  975 Aug  4  2016 kafka-server-stop.sh
-rwxr-xr-x. 1 root root  870 Aug  4  2016 kafka-simple-consumer-shell.sh
-rwxr-xr-x. 1 root root  945 Aug  4  2016 kafka-streams-application-reset.sh
-rwxr-xr-x. 1 root root  863 Aug  4  2016 kafka-topics.sh
-rwxr-xr-x. 1 root root  958 Aug  4  2016 kafka-verifiable-consumer.sh
-rwxr-xr-x. 1 root root  958 Aug  4  2016 kafka-verifiable-producer.sh
drwxr-xr-x. 2 root root 4096 Aug  4  2016 windows
-rwxr-xr-x. 1 root root  867 Aug  4  2016 zookeeper-security-migration.sh
-rwxr-xr-x. 1 root root 1381 Aug  4  2016 zookeeper-server-start.sh
-rwxr-xr-x. 1 root root  978 Aug  4  2016 zookeeper-server-stop.sh
-rwxr-xr-x. 1 root root  968 Aug  4  2016 zookeeper-shell.sh

First, run the script without arguments to print its help:

Option                                 Description                           
------                                 -----------                           
--bootstrap-server <server to connect  REQUIRED (only when using new-        
  to>                                    consumer): The server to connect to.
--command-config <command config       Property file containing configs to be
  property file>                         passed to Admin Client and Consumer.
--delete                               Pass in groups to delete topic        
                                         partition offsets and ownership     
                                         information over the entire consumer
                                         group. For instance --group g1 --   
                                         group g2                            
                                       Pass in groups with a single topic to 
                                         just delete the given topic's       
                                         partition offsets and ownership     
                                         information for the given consumer  
                                         groups. For instance --group g1 --  
                                         group g2 --topic t1                 
                                       Pass in just a topic to delete the    
                                         given topic's partition offsets and 
                                         ownership information for every     
                                         consumer group. For instance --topic
                                         t1                                  
                                       WARNING: Group deletion only works for
                                         old ZK-based consumer groups, and   
                                         one has to use it carefully to only 
                                         delete groups that are not active.  
--describe                             Describe consumer group and list      
                                         offset lag related to given group.  
--group <consumer group>               The consumer group we wish to act on. 
--list                                 List all consumer groups.             
--new-consumer                         Use new consumer.                     
--topic <topic>                        The topic whose consumer group        
                                         information should be deleted.      
--zookeeper <urls>                     REQUIRED (unless new-consumer is      
                                         used): The connection string for the
                                         zookeeper connection in the form    
                                         host:port. Multiple URLS can be     
                                         given to allow fail-over.

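Next, we write a producer and a consumer example. The full Java source code is at the end of this article.

Start the consumer first, then the producer. Then query the consumer offsets with bin/kafka-consumer-groups.sh.

Note:

   When you run the query script against consumers whose offsets are maintained by Kafka itself (the new consumer), the consumer must be running at the same time.

Otherwise the script reports that the group given by group.id cannot be found!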

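A Kafka consumer can record a group's offsets in one of two ways:

1) maintained by Kafka itself (new)

2) maintained by ZooKeeper (old), which is gradually being deprecated

Accordingly, the script can query offsets in two ways: Kafka-maintained (--new-consumer --bootstrap-server ...) or ZooKeeper-maintained (--zookeeper ...).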
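The sketch below shows where each storage mode keeps a group's committed offset. It assumes the layouts we understand Kafka 0.10.x to use:

- Old consumers write ZooKeeper nodes of the form /consumers/<group>/offsets/<topic>/<partition>.
- Kafka-maintained offsets go to one partition of the internal __consumer_offsets topic. That partition is chosen as abs(group.id.hashCode()) % offsets.topic.num.partitions, where the partition count defaults to 50.

OffsetLocations and its methods are hypothetical helpers, not part of any Kafka API.

```java
/**
 * Hypothetical helper (not a Kafka API) showing where a group's offsets live
 * in each storage mode, assuming Kafka 0.10.x layouts and defaults.
 */
final class OffsetLocations {

    private OffsetLocations() {
    }

    /** ZooKeeper node used by old (ZooKeeper-based) consumers for one partition. */
    static String zkOffsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    /**
     * Partition of the internal __consumer_offsets topic that holds a group's offsets
     * when Kafka maintains them: abs(groupId.hashCode()) % partition count,
     * where abs maps Integer.MIN_VALUE to 0 so the result is never negative.
     */
    static int offsetsTopicPartitionFor(String groupId, int offsetsTopicPartitions) {
        int h = groupId.hashCode();
        int abs = (h == Integer.MIN_VALUE) ? 0 : Math.abs(h);
        return abs % offsetsTopicPartitions;
    }
}
```

For example, the group "group" used in this article maps to partition 47 of __consumer_offsets when the topic has the default 50 partitions.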

Offsets maintained by Kafka:

1. List the group IDs that are consuming:

[root@master bin]# kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.75.128:9092 --list
group


Notes:

   No topic is specified here, so the command lists the group.id of consumers across all topics.

Note: a group.id shared by several consumers is listed only once.

2. Show the consumption status of a given group.id:

[root@master bin]# kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.75.128:9092 --group group --describe
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
group                          producer_consumer_java_test_20181009 0          2436            2437            1               consumer-1_/192.168.75.1
group                          test_find1                     0          303094          303094          0               consumer-1_/192.168.75.1
group                          test_find1                     1          303068          303068          0               consumer-1_/192.168.75.1
group                          test_find1                     2          303713          303713          0               consumer-1_/192.168.75.1

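LAG is LOG-END-OFFSET minus CURRENT-OFFSET. For the first row above, 2437 - 2436 = 1.

To total the lag from saved --describe output, a minimal sketch could look like the class below. It assumes two things:

- the 0.10.0.x column order shown above;
- a non-numeric offset (for example, a partition with no committed offset yet) should be skipped.

DescribeLag is a hypothetical helper.

```java
import java.util.List;

/**
 * Hypothetical helper that recomputes LAG from kafka-consumer-groups.sh --describe output.
 * Assumed column order (0.10.0.x):
 * GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
 */
final class DescribeLag {

    private DescribeLag() {
    }

    /** Returns LOG-END-OFFSET - CURRENT-OFFSET for one data row, or -1 if either offset is not a number. */
    static long lagOf(String row) {
        String[] cols = row.trim().split("\\s+");
        if (cols.length < 5) {
            throw new IllegalArgumentException("Unexpected row: " + row);
        }
        try {
            long current = Long.parseLong(cols[3]);
            long logEnd = Long.parseLong(cols[4]);
            return logEnd - current;
        } catch (NumberFormatException e) {
            // e.g. no committed offset yet for this partition
            return -1;
        }
    }

    /** Sums the lag of all data rows, skipping blank lines, the header row and rows without numeric offsets. */
    static long totalLag(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("GROUP ")) {
                continue;
            }
            long lag = lagOf(trimmed);
            if (lag > 0) {
                total += lag;
            }
        }
        return total;
    }
}
```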

Offsets maintained by ZooKeeper:

1. List the group IDs that are consuming:

[root@master bin]# kafka-consumer-groups.sh --zookeeper localhost:2181 --list
console-consumer-28542


2. Show the consumption status of a given group.id:

[root@master bin]# kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-28542 --describe
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
console-consumer-28542         test_find1                     0          303094          303094          0               console-consumer-28542_master-1539167387803-268319a0-0
console-consumer-28542         test_find1                     1          303068          303068          0               console-consumer-28542_master-1539167387803-268319a0-0
console-consumer-28542         test_find1                     2          303713          303713          0               console-consumer-28542_master-1539167387803-268319a0-0


Common problems:

[root@master bin]# kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group group --describe

java.lang.RuntimeException: Request GROUP_COORDINATOR failed on brokers List(localhost:9092 (id: -1 rack: null))
	at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:67)
	at kafka.admin.AdminClient.findCoordinator(AdminClient.scala:72)
	at kafka.admin.AdminClient.describeGroup(AdminClient.scala:125)
	at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:147)
	at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:315)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:86)
	at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:303)
	at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:65)
	at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)

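Cause:

host.name is set in Kafka's conf/server.properties (see the host.name line near the end of the excerpt below). When host.name is set, the broker binds only to that address, here 192.168.75.128, so a connection to localhost:9092 fails. Pass the host.name address instead, e.g. --bootstrap-server 192.168.75.128:9092, as in the successful examples above.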

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=127.0.0.1:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

host.name=192.168.75.128

# Actually delete topics (instead of only marking them for deletion)
delete.topic.enable=true
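Because host.name restricts which address the broker listens on, you can confirm the diagnosis by testing raw TCP connectivity to each candidate --bootstrap-server address before running the tool. With the configuration above, we would expect 192.168.75.128:9092 to accept a connection and localhost:9092 to be refused.

The sketch below assumes Java 7+ (try-with-resources). BrokerReachability and canConnect are hypothetical names. A successful connect only shows that something is listening on that address, not that it is a healthy broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Hypothetical diagnostic: checks whether a plain TCP connection to host:port succeeds.
 * It does not speak the Kafka protocol; it only tells you whether the address is reachable.
 */
final class BrokerReachability {

    private BrokerReachability() {
    }

    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout or unknown host
            return false;
        }
    }
}
```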

==============================================

Appendix:

A semi-multithreaded producer and consumer

Producer:

KafkaProducerSingleton.java
package test.kafka.vm.half_multi_thread;


import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

public final class KafkaProducerSingleton {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(KafkaProducerSingleton.class);

    private static KafkaProducer<String, String> kafkaProducer;

    private Random random = new Random();

    private String topic;

    private int retry;

    private KafkaProducerSingleton() {

    }


    /**
     * Static nested holder class: the singleton is created lazily on first access
     *
     * @author tanjie
     */
    private static class LazyHandler {

        private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
    }

    /**
     * Singleton: KafkaProducer is thread-safe, so one instance can be shared by multiple threads
     *
     * @return the shared KafkaProducerSingleton
     */
    public static final KafkaProducerSingleton getInstance() {
        return LazyHandler.instance;
    }

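    /**
     * Initializes the Kafka producer (only the first call creates it).
     * synchronized so that concurrent callers cannot create two producers.
     *
     * @param topic target topic
     * @param retry number of retries after a failed send
     */
    public synchronized void init(String topic, int retry) {
        this.topic = topic;
        this.retry = retry;
        if (null == kafkaProducer) {
            Properties props = new Properties();
            InputStream inStream = null;
            try {
                inStream = this.getClass().getClassLoader()
                        .getResourceAsStream("test/config/kafka/kafka.properties");
                if (null == inStream) {
                    throw new IOException("test/config/kafka/kafka.properties not found on the classpath");
                }
                props.load(inStream);
                // acks=1: the partition leader acknowledges once it has written the record,
                // without waiting for the other in-sync replicas (ISR)
                props.put(ProducerConfig.ACKS_CONFIG, "1");
                kafkaProducer = new KafkaProducer<String, String>(props);
            } catch (IOException e) {
                LOGGER.error("Failed to initialize kafkaProducer: " + e.getMessage(), e);
            } finally {
                if (null != inStream) {
                    try {
                        inStream.close();
                    } catch (IOException e) {
                        LOGGER.error("Failed to close the kafka.properties stream: " + e.getMessage(), e);
                    }
                }
            }
        }
    }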

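    /**
     * Sends a message through kafkaProducer to the topic set in init(),
     * on a random partition 0..2 (the topic is assumed to have 3 partitions).
     *
     * @param message the message value
     */
    public void sendKafkaMessage(final String message) {
        /*
         * Partition selection:
         * 1. If a partition is specified, the message goes only to that partition.
         * 2. If both a partition and a key are specified, the message still goes to the specified partition; the key has no effect.
         * 3. If neither a partition nor a key is specified, messages are spread across the topic's partitions (round-robin in the default partitioner).
         * 4. If only a key is specified, the partition is chosen by hashing the key.
         */
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", message);
        // send() is asynchronous: it appends the record to a buffer and returns immediately,
        // which lets the producer send messages in batches for efficiency.
        // KafkaProducer is thread-safe, so a single instance can be used for sending.
        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata,
                                     Exception exception) {
                if (null != exception) {
                    LOGGER.error("Failed to send kafka message: " + exception.getMessage(),
                            exception);
                    retryKakfaMessage(message);
                }
            }
        });
    }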

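    /**
     * Resends a failed message, trying at most retry times.
     * send() is asynchronous, so the catch block only sees errors thrown synchronously by send().
     * This method is called from the send callback, which runs on the producer's I/O thread,
     * so it must not block (e.g. no send(...).get()).
     *
     * @param retryMessage the message to resend
     */
    private void retryKakfaMessage(final String retryMessage) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", retryMessage);
        for (int i = 1; i <= retry; i++) {
            try {
                kafkaProducer.send(record);
                return;
            } catch (Exception e) {
                // Let the for-loop retry instead of recursing, so the number of attempts stays bounded
                LOGGER.error("Failed to send kafka message (attempt " + i + "/" + retry + "): "
                        + e.getMessage(), e);
            }
        }
    }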

    /**
     * Closes the Kafka producer instance
     */
    public void close() {
        if (null != kafkaProducer) {
            kafkaProducer.close();
        }
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getRetry() {
        return retry;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }

}
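The partition rules listed in the comment inside sendKafkaMessage can be modeled as in the sketch below. PartitionChooser is a hypothetical, simplified stand-in, not Kafka's DefaultPartitioner: the real partitioner hashes the serialized key with murmur2, while this sketch uses String.hashCode().

In the producer above, every record carries an explicit random.nextInt(3) partition. Rule 2 therefore applies, and the empty key "" never affects placement.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical, simplified model of the partition-selection rules.
 * Uses String.hashCode() for keys; Kafka's DefaultPartitioner uses murmur2 instead.
 */
final class PartitionChooser {

    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger(0);

    PartitionChooser(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /**
     * @param partition explicit partition, or null
     * @param key       record key, or null
     */
    int choose(Integer partition, String key) {
        if (partition != null) {
            // Rules 1 and 2: an explicit partition always wins, the key is ignored
            return partition;
        }
        if (key != null) {
            // Rule 4: same key -> same partition (sign bit masked to keep the result non-negative)
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
        // Rule 3: no partition and no key -> rotate through the partitions
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```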
ProducerHandler.java
package test.kafka.vm.half_multi_thread;


public class ProducerHandler implements Runnable {

    private String message;

    public ProducerHandler(String message) {
        this.message = message;
    }

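    @Override
    public void run() {
        KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton
                .getInstance();
        kafkaProducerSingleton.init("test_find1", 3);

        int i = 0;
        // Send one message every 100 ms until the thread is interrupted
        while (!Thread.currentThread().isInterrupted()) {
            try {
                System.out.println("Current thread: " + Thread.currentThread().getName()
                        + ", kafka instance obtained: " + kafkaProducerSingleton);
                kafkaProducerSingleton.sendKafkaMessage("Send message: " + message + " " + (++i));
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the loop condition ends the loop
                Thread.currentThread().interrupt();
            }
        }

    }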

}

Main entry point:

ProducerMain.java
package test.kafka.vm.half_multi_thread;

/**
 * Created by szh on 2018/10/10.
 */
public class ProducerMain {

    public static void main(String[] args){
        Thread thread = new Thread(new ProducerHandler("qqq"));
        thread.start();
    }

}

Consumer:

Kafka_ConsumerAuto.java

package test.kafka.vm.half_multi_thread;


import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class Kafka_ConsumerAuto {

    /**
     * KafkaConsumer is not thread-safe
     */
    private final KafkaConsumer<String, String> consumer;

    private ExecutorService executorService;

    public Kafka_ConsumerAuto() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "192.168.75.128:9092");
        props.put("group.id", "group");
        // Enable auto-commit: offsets are committed every auto.commit.interval.ms (100 ms here)
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("session.timeout.ms", "30000");

        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test_find1"));
    }

    public void execute() {
        executorService = Executors.newFixedThreadPool(3);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            // poll() returns an empty batch (never null) when nothing arrived
            if (!records.isEmpty()) {
                executorService.submit(new ConsumerThreadAuto(records, consumer));
            }
        }
    }

    public void shutdown() {
        // Call from the polling thread: KafkaConsumer is not thread-safe
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (executorService != null) {
                executorService.shutdown();
                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout");
                }
            }
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }

}
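With enable.auto.commit=true, the consumer commits its current position periodically from within poll(). Offsets for a batch already handed to the thread pool can therefore be committed before a worker has processed it, and a crash at that moment would skip those records.

If you switch to manual commits (for example commitSync with an offsets map), the value to commit for each partition is the last processed offset plus one. Kafka treats the committed offset as the next record to consume.

The sketch below computes that map with plain Java types:

- CommitOffsets and Processed are hypothetical names.
- Converting the result into Map<TopicPartition, OffsetAndMetadata> for commitSync is left out, so the example runs without the Kafka client.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper: offsets to commit after records have been fully processed.
 * The committed offset is the position of the next record to read, i.e. the
 * highest processed offset of each partition plus one.
 */
final class CommitOffsets {

    private CommitOffsets() {
    }

    /** One fully processed record, identified by partition and offset. */
    static final class Processed {
        final int partition;
        final long offset;

        Processed(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    static Map<Integer, Long> toCommit(List<Processed> processed) {
        Map<Integer, Long> next = new HashMap<Integer, Long>();
        for (Processed p : processed) {
            long candidate = p.offset + 1;
            Long current = next.get(p.partition);
            // Keep the largest "next offset" seen for each partition
            if (current == null || candidate > current) {
                next.put(p.partition, candidate);
            }
        }
        return next;
    }
}
```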
ConsumerThreadAuto.java
package test.kafka.vm.half_multi_thread;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Multiple worker threads process batches concurrently, so the order of messages
 * within a partition is hard to guarantee
 *
 * @author tanjie
 */
public final class ConsumerThreadAuto implements Runnable {

    private ConsumerRecords<String, String> records;

    private KafkaConsumer<String, String> consumer;

    public ConsumerThreadAuto(ConsumerRecords<String, String> records,
                              KafkaConsumer<String, String> consumer) {
        this.records = records;
        this.consumer = consumer;
    }

    @Override
    public void run() {

        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Current thread: " + Thread.currentThread() + ", "
                    + "offset: " + record.offset() + ", " + "topic: "
                    + record.topic() + ", " + "partition: " + record.partition()
                    + ", " + "message: " + record.value());
        }
    }
}
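The class comment above notes that handing whole batches to a shared pool cannot preserve per-partition order. One common alternative is to route every record of a partition to the same single-threaded executor, while records of different partitions still run in parallel.

The sketch below uses only java.util.concurrent. PartitionOrderedDispatcher is a hypothetical name, and offset committing is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical dispatcher: each worker is a single-threaded executor, and a given
 * partition always maps to the same worker, so its tasks run in submission order.
 */
final class PartitionOrderedDispatcher {

    private final List<ExecutorService> workers = new ArrayList<ExecutorService>();

    PartitionOrderedDispatcher(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Tasks for the same partition always land on the same single-threaded executor. */
    void dispatch(int partition, Runnable task) {
        int index = (partition & 0x7fffffff) % workers.size();
        workers.get(index).execute(task);
    }

    /** Stops accepting tasks and waits (up to the timeout per worker) for queued ones to finish. */
    boolean shutdownAndAwait(long timeout, TimeUnit unit) throws InterruptedException {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        boolean done = true;
        for (ExecutorService w : workers) {
            done &= w.awaitTermination(timeout, unit);
        }
        return done;
    }
}
```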
ConsumerAutoMain.java
package test.kafka.vm.half_multi_thread;

/**
 * Created by szh on 2018/10/10.
 */
public class ConsumerAutoMain {


    public static void main(String[] args) {
        Kafka_ConsumerAuto kafka_consumerAuto = new Kafka_ConsumerAuto();
        // execute() blocks and keeps polling until the process is killed. The group must stay
        // active for "kafka-consumer-groups.sh --describe" to find it.
        kafka_consumerAuto.execute();
    }


}