kafka-0.10.0.1版本在java客戶端傳送資料接收不到
阿新 • • 發佈:2018-12-12
錯誤日誌:
org.apache.kafka.common.errors.TimeoutException: Batch containing 100 record
kafka版本:0.10.0.1
zookeeper版本: 3.4.5
產生的現象:
編寫好java客戶端後,給kafka傳送資料,可是資料會一直髮送不過去,而且顯示上面那種TimeoutException錯誤.
生產者配置:
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaProducerTest{ public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.72.18:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for(int i = 0; i < 100; i++){ System.out.println(Integer.toString(i)); producer.send(new ProducerRecord<String, String>("test1001", Integer.toString(i), Integer.toString(i))); } producer.close(); } }
消費者程式碼:
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class KafkaConsumerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.72.18:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("test1001")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } } }
解決:
最終是傳送配置應該不正確,原來的配置:
# The id of the broker. This must be set to a unique integer for each broker. broker.id=0 ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = security_protocol://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://name1:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 advertised.listeners=PLAINTEXT://name1:9092 # The number of threads handling network requests num.network.threads=3
修改後的配置:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.72.18:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://192.168.72.18:9092
# The number of threads handling network requests
num.network.threads=3
這樣把ip寫好,不使用名字對映,就正常了