kafka重置到最新offset偏移量
阿新 • • 發佈:2018-05-09
ray IT 每次 lis 設置 通過 默認 cname 解決問題
小弟近日用kafka測試傳輸數據設置的單消費者,不料消費者頭天晚上就掛掉了 ,重啟消費者,因為auto.offset.reset 默認為latest,所以消費者從昨天晚上的數據接著消費,因為差了一晚上了,消費者一時半會追不上生產者的步伐,而我又需要實時展示數據,且又不能每次重啟消費者重新賦予group.id。所以需要手動修改偏移量到最新。
最後通過以下代碼解決問題
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
…………
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition partition0 = new TopicPartition("topicName", 0);
consumer.assign(Arrays.asList(partition0,partition1,partition2));
// consumer.seek(partition0, 220);
consumer.seekToEnd(Arrays.asList(partition0));
}
解決!
參考了三度微涼的博客:https://blog.csdn.net/yu280265067/article/details/69390094
kafka重置到最新offset偏移量