1. 程式人生 > >Kafka-kafka 重置偏移量 :通過 kafka-consumer-groups.sh 針對 >= kafka 0.11

Kafka-kafka 重置偏移量 :通過 kafka-consumer-groups.sh 針對 >= kafka 0.11

參考文章

1. Kafka consumer group位移0ffset重設

本文書寫環境:

kafka_2.12-2.0.0.jar

scala 2.12 編譯下的 kafka 2.0.0 版本。

在資料開發中,有時候可能會遇到 kafka 中的資料需要重算的情況。如果是 2-3 年前,技術方案絕大部分是,

方案一:

      重新將篩選的資料再次推送到 kafka 中。

方案二:

      編寫程式,手動重新設定各個partition 的 offset 值。

現在的方案:

    隨著技術的不斷演進,現在我們可以直接通過  kafka-consumer-groups.sh  就能重新設定 offset 值了。

需要特別強調的是, 這是0.11.0.0版本提供的新功能且只適用於新版本consumer。

0.11.0.0+ 版本豐富了kafka-consumer-groups指令碼的功能,使用者可以直接使用該指令碼很方便地為已有的consumer group重新設定位移,但前提必須是consumer group必須是inactive的,即不能是處於正在工作中的狀態。

Tips : 如何確定 consumer group 是不是 inactive .

活躍中:

非活躍:

先務虛一下。總體來說,重設位移的流程由3步組成,如下圖所示:

看下 執行指令碼給出的提示  :

Option                                  Description                            
------                                  -----------                            
--all-topics                            Consider all topics assigned to a      
                                          group in the `reset-offsets` process.
--bootstrap-server <String: server to   REQUIRED: The server(s) to connect to. 
  connect to>                                                                  
--by-duration <String: duration>        Reset offsets to offset by duration    
                                          from current timestamp. Format:      
                                          'PnDTnHnMnS'                         
--command-config <String: command       Property file containing configs to be 
  config property file>                   passed to Admin Client and Consumer. 
--delete                                Pass in groups to delete topic         
                                          partition offsets and ownership      
                                          information over the entire consumer 
                                          group. For instance --group g1 --    
                                          group g2                             
--describe                              Describe consumer group and list       
                                          offset lag (number of messages not   
                                          yet processed) related to given      
                                          group.                               
--dry-run                               Only show results without executing    
                                          changes on Consumer Groups.          
                                          Supported operations: reset-offsets. 
--execute                               Execute operation. Supported           
                                          operations: reset-offsets.           
--export                                Export operation execution to a CSV    
                                          file. Supported operations: reset-   
                                          offsets.                             
--from-file <String: path to CSV file>  Reset offsets to values defined in CSV 
                                          file.                                
--group <String: consumer group>        The consumer group we wish to act on.  
--list                                  List all consumer groups.              
--members                               Describe members of the group. This    
                                          option may be used with '--describe' 
                                          and '--bootstrap-server' options     
                                          only.                                
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          members                              
--offsets                               Describe the group and list all topic  
                                          partitions in the group along with   
                                          their offset lag. This is the        
                                          default sub-action of and may be     
                                          used with '--describe' and '--       
                                          bootstrap-server' options only.      
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          offsets                              
--reset-offsets                         Reset offsets of consumer group.       
                                          Supports one consumer group at the   
                                          time, and instances should be        
                                          inactive                             
                                        Has 2 execution options: --dry-run     
                                          (the default) to plan which offsets  
                                          to reset, and --execute to update    
                                          the offsets. Additionally, the --    
                                          export option is used to export the  
                                          results to a CSV format.             
                                        You must choose one of the following   
                                          reset specifications: --to-datetime, 
                                          --by-period, --to-earliest, --to-    
                                          latest, --shift-by, --from-file, --  
                                          to-current.                          
                                        To define the scope use --all-topics   
                                          or --topic. One scope must be        
                                          specified unless you use '--from-    
                                          file'.                               
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset  
                                          by 'n', where 'n' can be positive or 
                                          negative.                            
--state                                 Describe the group state. This option  
                                          may be used with '--describe' and '--
                                          bootstrap-server' options only.      
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          state                                
--timeout <Long: timeout (ms)>          The timeout that can be set for some   
                                          use cases. For example, it can be    
                                          used when describing the group to    
                                          specify the maximum amount of time   
                                          in milliseconds to wait before the   
                                          group stabilizes (when the group is  
                                          just created, or is going through    
                                          some changes). (default: 5000)       
--to-current                            Reset offsets to current offset.       
--to-datetime <String: datetime>        Reset offsets to offset from datetime. 
                                          Format: 'YYYY-MM-DDTHH:mm:SS.sss'    
--to-earliest                           Reset offsets to earliest offset.      
--to-latest                             Reset offsets to latest offset.        
--to-offset <Long: offset>              Reset offsets to a specific offset.    
--topic <String: topic>                 The topic whose consumer group         
                                          information should be deleted or     
                                          topic whose should be included in    
                                          the reset offset process. In `reset- 
                                          offsets` case, partitions can be     
                                          specified using this format: `topic1:
                                          0,1,2`, where 0,1,2 are the          
                                          partition to be included in the      
                                          process. Reset-offsets also supports 
                                          multiple topic inputs.               
--verbose                               Provide additional information, if     
                                          any, when describing the group. This 
                                          option may be used with '--          
                                          offsets'/'--members'/'--state' and   
                                          '--bootstrap-server' options only.   
                                        Example: --bootstrap-server localhost: 
                                          9092 --describe --group group1 --    
                                          members --verbose           

我們進行進一步的整理 : 

確定topic作用域——當前有3種作用域指定方式:


    --all-topics(為consumer group下所有topic的所有分割槽調整位移),


    --topic t1 --topic t2(為指定的若干個topic的所有分割槽調整位移),


    --topic t1:0,1,2(為指定的topic分割槽調整位移)

確定位移重設策略——當前支援8種設定規則:

    --to-earliest:把位移調整到分割槽當前最小位移

    --to-latest:把位移調整到分割槽當前最新位移

    --to-current:把位移調整到分割槽當前位移

    --to-offset <offset>: 把位移調整到指定位移處

    --shift-by N: 把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動

    --to-datetime <datetime>:把位移調整到大於給定時間的最早位移處,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000

    --by-duration <duration>:把位移調整到距離當前時間指定間隔的位移處,duration格式是PnDTnHnMnS,比如PT0H5M0S

   --from-file <file>:從CSV檔案中讀取調整策略

確定執行方案——當前支援3種方案:

    什麼引數都不加:只是打印出位移調整方案,不具體執行

    --execute:執行真正的位移調整

    --export:把位移調整方案按照CSV格式列印,方便使用者成csv檔案,供後續直接使用

     針對上面的8種策略,本文重點演示前面7種策略。首先,我們建立一個測試topic,5個分割槽,併發送5,000,000條測試訊息:

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 5 --replication-factor 1 --topic test

Created topic "test".

> bin/kafka-producer-perf-test.sh --topic test --num-records 5000000 --throughput -1 --record-size 100 --producer-props bootstrap.servers=localhost:9092 acks=-1



1439666 records sent, 287760.5 records/sec (27.44 MB/sec), 75.7 ms avg latency, 317.0 max latency.
1541123 records sent, 308163.0 records/sec (29.39 MB/sec), 136.4 ms avg latency, 480.0 max latency.
1878025 records sent, 375529.9 records/sec (35.81 MB/sec), 58.2 ms avg latency, 600.0 max latency.
5000000 records sent, 319529.652352 records/sec (30.47 MB/sec), 86.33 ms avg latency, 600.00 ms max latency, 38 ms 50th, 319 ms 95th, 516 ms 99th, 591 ms 99.9th.

     然後,啟動一個console consumer程式,組名設定為test-group:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer-property group.id=test-group

..............

      待執行一段時間後關閉consumer程式將group設定為inactive。現在執行kafka-consumer-groups.sh指令碼首先確定當前group的消費進度:

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --describe
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 1 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 2 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 3 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 4 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1

      由上面輸出可知,當前5個分割槽LAG列的值都是0,表示全部消費完畢。現在我們演示下如何重設位移。

1. --to-earliest

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC PARTITION NEW-OFFSET 
test 0 0 
test 1 0 
test 4 0 
test 3 0 
test 2 0

上面輸出表明,所有分割槽的位移都已經被重設為0

2. --to-latest

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-latest --execute
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC PARTITION NEW-OFFSET 
test 0 1000000 
test 1 1000000 
test 4 1000000 
test 3 1000000 
test 2 1000000

上面輸出表明,所有分割槽的位移都已經被重設為最新位移,即1,000,000

3.  --to-offset <offset>

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC PARTITION NEW-OFFSET 
test 0 500000 
test 1 500000 
test 4 500000 
test 3 500000 
test 2 500000

上面輸出表明,所有分割槽的位移都已經調整為給定的500000

4.  --to-current

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC PARTITION NEW-OFFSET 
test 0 500000 
test 1 500000 
test 4 500000 
test 3 500000 
test 2 500000

輸出表明所有分割槽的位移都已經被移動到當前位移(這個有點傻,因為位移距上一步沒有變動)

5. --shift-by N

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC PARTITION NEW-OFFSET 
test 0 400000 
test 1 400000 
test 4 400000 
test 3 400000 
test 2 400000

輸出表明所有分割槽的位移被移動到(500000 - 100000) = 400000處

6. --to-datetime

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC PARTITION NEW-OFFSET 
test 0 1000000 
test 1 1000000 
test 4 1000000 
test 3 1000000 
test 2 1000000

將所有分割槽的位移調整為2017年8月4日14:30之後的最早位移

注意:這裡需要根據時區設定時間,如以東8時區進行設定例子:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group --reset-offsets --all-topics --to-datetime 2018-10-23T18:50:00.000+08:00 --execute

時間 :  2018-10-23T18:50:00.000+08:00

[[email protected] ~]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group --reset-offsets --topic test_find --to-datetime 2018-10-23T18:50:00.000+08:00 --execute

TOPIC                          PARTITION  NEW-OFFSET     
test_find                      0          2              
test_find                      4          2              
test_find                      3          2              
test_find                      1          2              
test_find                      2          2    

7. --by-duration

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --by-duration PT0H30M0S
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC PARTITION NEW-OFFSET 
test 0 0 
test 1 0 
test 4 0 
test 3 0 
test 2 0

將所有分割槽位移調整為30分鐘之前的最早位移

相關推薦

Kafka-kafka 偏移 通過 kafka-consumer-groups.sh 針對 >= kafka 0.11

參考文章 1. Kafka consumer group位移0ffset重設 本文書寫環境: kafka_2.12-2.0.0.jar scala 2.12 編譯下的 kafka 2.0.0 版本。 在資料開發中,有時候可能會遇到 kafka 中的資料需

Linux C ftruncate 函式清空檔案注意事項(要使用 lseek 偏移

DESCRIPTION        The truncate() and ftruncate() functions cause the regular file named by path or referenced by fd to be truncated to a size of precisely

分散式訊息系統Kafka(五)偏移

5、偏移量提交 5.1 偏移量 (1)新舊版本偏移量的變化   在Kafka0.9版本之前消費者儲存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消費者不在儲

Kafka_Kafka 消費者 偏移 與 積壓 查詢指令碼 kafka-consumer-groups.sh

本文章對應的 kafka 版本是  kafka_2.11-0.10.0.1 版本號的含義 scala 2.11 kafka 0.10.0.1 背景:    kafka 0.9 及以上 有了一個大版本變化, 主要有以下幾個方面:   1.kafka-cli

zookeeper上修改kafka消費組的偏移

[[email protected] bin]$ zookeeper-shell.sh 192.168.0.1:2181 Connecting to 192.168.0.1:2181 Wel

kafka手動修改消費者偏移

1.建立一個測試主題: [[email protected] bin]#./kafka-topics.sh --zookeeper snn:2181 --topic offset-test --partitions 2 --replication-factor 2

python sys.defaultencoding

ng- tracking open 變量 sys print ext str order <span style="font-family: Arial, Helvetica, sans-serif;">比如重置變量defaulten

kafka到最新offset偏移

ray IT 每次 lis 設置 通過 默認 cname 解決問題 小弟近日用kafka測試傳輸數據設置的單消費者,不料消費者頭天晚上就掛掉了 ,重啟消費者,因為auto.offset.reset 默認為latest,所以消費者從昨天晚上的數據接著消費,因為差了一晚上了,消

Kafka(七)消費者偏移

sof () 取模 失敗 data 兩種方法 保存 庫存 num 在Kafka0.9版本之前消費者保存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消費者不在保存偏移量到zooke

SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移的兩種方式

Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka

Spark Streaming管理Kafka偏移

前言 為了讓Spark Streaming消費kafka的資料不丟資料,可以建立Kafka Direct DStream,由Spark Streaming自己管理offset,並不是存到zookeeper。啟用S​​park Streaming的 checkpoints是儲存偏移量的最簡單方法,因為它可以

如何手動更新Kafka中某個Topic的偏移

轉載: 我們在使用consumer消費資料時,有些情況下我們需要對已經消費過的資料進行重新消費,這裡介紹kafka中兩種重新消費資料的方法。 1. 修改offset 我們在使用consumer消費的時候,每個topic會產生一個偏移量,這個偏移量保證我們消費的訊息順

kafkaconsumer的offset 資料重複消費

[[email protected] ~]/opt/cloudera/parcels/KAFKA-2.0.1-1.2.0.1.p0.5/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --br

Spark Streaming 之 Kafka 偏移管理

本文主要介紹 Spark Streaming 應用開發中消費 Kafka 訊息的相關內容,文章著重突出了開發環境的配置以及手動管理 Kafka 偏移量的實現。 一、開發環境 1、元件版本 CDH 叢集版本:6.0.1 Spark 版本:2.2.0 Kafka 版本:1.0.1 2、M

使用redis儲存kafka偏移

使用redis儲存kafka的偏移量 轉自:Lu_Xiao_Yue 使用Redis來記錄偏移量,以前用receive方式時,使用zookeeper儲存偏移量,不用自己儲存偏移量,使用直連方式可以自己儲存偏移量,更加靈活。在直連方式中,儲存偏移量可以使用zookeeper,也可以使用mys

docker通過docker-compose部署kafka

通過github安裝1.14.0版本的docker-compose curl -L https://github.com/docker/compose/releases/download/1.14.0/docker-compose-`uname -s`-`uname -m` >

kafka同步非同步消費和訊息的偏移(四)

1. 消費者位置(consumer position) 因為kafka服務端不儲存訊息的狀態,所以消費端需要自己去做很多事情。我們每次呼叫poll()方法他總是返回已經儲存在生產者佇列中還未被消費者消費的訊息。訊息在每一個分割槽中都是順序的,那麼必然可以通過一

如何管理Spark Streaming消費Kafka偏移(二)

上篇文章,討論了在spark streaming中管理消費kafka的偏移量的方式,本篇就接著聊聊上次說升級失敗的案例。 事情發生一個月前,由於當時我們想提高spark streaming程式的並行處理效能,於是需要增加kafka分割槽個數,,這裡需要說下,在新版本sp

如何管理Spark Streaming消費Kafka偏移(三)

前面的文章已經介紹了在spark streaming整合kafka時,如何處理其偏移量的問題,由於spark streaming自帶的checkpoint弊端非常明顯,所以一些對資料一致性要求比較高的專案裡面,不建議採用其自帶的checkpoint來做故障恢復。 在sp

Spark+Kafka的Direct方式將偏移傳送到Zookeeper的實現

Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Receiver的方法要高效。但是因為是Spar