Kafka Beginner Tutorial, Part 2: Connecting to Kafka from Java (Producer)
阿新 • Published: 2019-01-09
1. Check the server configuration file
Set the parameter advertised.listeners=PLAINTEXT://tjtestrac1:9092
Make sure the firewall is not blocking the port.
$ vi server.properties
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
#listeners=PLAINTEXT://tjtestrac1:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://tjtestrac1:9092
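A blocked port is a common reason a client cannot reach the broker, so a quick reachability check from the client machine can save time. The following is a minimal sketch and not part of the original setup: the class name PortCheck and the 3-second timeout are assumptions made for illustration, while the host and port are the ones from advertised.listeners above. It only shows that a TCP connection can be opened, not that Kafka itself is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not from the original article).
final class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host name could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port taken from the advertised.listeners value above.
        System.out.println(isReachable("tjtestrac1", 9092, 3000));
    }
}
```

If this prints false from the client machine while the broker is running, the firewall or host name resolution is the first thing to check.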
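2. Restart the Kafka service
Here the processes are stopped in the crudest way possible, with kill -9. Never do this in production; the kafka-server-stop.sh and zookeeper-server-stop.sh scripts shipped with Kafka shut the services down gracefully.
$ jps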
12048 Jps
30323 QuorumPeerMain
4739 Kafka
$ kill -9 30323
$ kill -9 4739
$ jps
12727 Jps
After confirming that both processes are gone, start the services again.
$ zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties &
[1] 16757
$ jps
17093 Jps
16757 QuorumPeerMain
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties &
$ jps
19858 Kafka
16757 QuorumPeerMain
20600 Jps
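3. Kafka producer architecture and basic concepts
Producer component
The producer API offers options for very different scenarios:
1. A traditional credit card transaction system cannot tolerate lost data or erroneous duplicate records.
2. When collecting data such as user behavior tracking on the internet, the system can tolerate some lost or erroneous records.
4. Build the Kafka project in Eclipse (with Maven)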
a) Create a project named kafkaClient
b) Configure the pom.xml file
https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.12/2.1.0
Configure the SLF4J logging dependencies
https://mvnrepository.com/artifact/org.slf4j/slf4j-api/1.8.0-beta2
https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12/1.8.0-beta2
https://mvnrepository.com/artifact/org.slf4j/slf4j-nop/1.8.0-beta2
https://mvnrepository.com/artifact/commons-logging/commons-logging
https://mvnrepository.com/artifact/org.slf4j/slf4j-simple/1.8.0-beta2
https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14/1.8.0-beta2
Add the following to the pom file:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<modelVersion>4.0.0</modelVersion>
<groupId>KafkaClient</groupId>
<artifactId>KafkaClient</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.8.0-beta2</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.8.0-beta2</version>
    <scope>test</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
  <dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.2</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-nop -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.8.0-beta2</version>
    <scope>test</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.8.0-beta2</version>
    <scope>test</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-jdk14</artifactId>
    <version>1.8.0-beta2</version>
    <scope>test</scope>
  </dependency>
</dependencies>
log4j.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  <appender name="console" class="org.apache.log4j.ConsoleAppender">
    <param name="Target" value="System.out"/>
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%-5p %c{1} - %m%n"/>
    </layout>
  </appender>
  <root>
    <priority value="debug"/>
    <appender-ref ref="console"/>
  </root>
</log4j:configuration>
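c) Create a producer class, MsgSender
The producer requires the following configuration settings:
Parameter | Description |
---|---|
bootstrap.servers | Connection string (host:port list) used to connect to the Kafka servers |
key.serializer | Class for the key: an implementation of the interface org.apache.kafka.common.serialization.Serializer |
value.serializer | Class for the value: the serializer the producer uses to convert the objects it sends to the broker |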
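Because a producer configured purely through Properties cannot be constructed when one of these settings is missing, it can be convenient to check them up front. The sketch below is only an illustration: the class RequiredProducerSettings and its missing method are hypothetical names, not part of the Kafka client library, and the check looks only at whether the three keys from the table are present.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of the Kafka client library).
final class RequiredProducerSettings {
    // The three settings listed in the table above.
    static final List<String> REQUIRED = Arrays.asList(
            "bootstrap.servers", "key.serializer", "value.serializer");

    /** Returns the required keys that are absent or blank in props. */
    static List<String> missing(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "tjtestrac1:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // value.serializer is deliberately left out to show the check.
        System.out.println("Missing settings: " + missing(props));
        // prints "Missing settings: [value.serializer]"
    }
}
```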
Java code:
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MsgSender {
    // Topic the message is sent to.
    public static final String TOPIC = "TestMsg";

    /**
     * Sends one message to TOPIC and waits for the send to complete.
     *
     * @param args not used
     */
    public static void main(String[] args) throws Exception {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "tjtestrac1:9092");
        kafkaProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(kafkaProps);
        // Record with the key "123123123" and a string value.
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC, "123123123", "Welcome to my home!!! ");
        try {
            System.out.println("sending start..............");
            // send() is asynchronous; get() blocks until the send completes or fails.
            Future<RecordMetadata> future = producer.send(record);
            future.get();
            System.out.println("sending end..............");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
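MsgSender calls future.get() with no arguments, so the calling thread waits until the send has either succeeded or failed. Where the caller wants its own upper bound on that wait, the standard Future.get(long, TimeUnit) overload can be used instead. The sketch below illustrates the idea under stated assumptions: BoundedWait and awaitOrNull are hypothetical names, and a CompletableFuture stands in for the Future returned by producer.send(record) so that the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not from the original article).
final class BoundedWait {
    /**
     * Waits at most timeoutMs for the future and returns its result,
     * or null if the deadline passes first.
     */
    static <T> T awaitOrNull(Future<T> future, long timeoutMs)
            throws InterruptedException, ExecutionException {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-ins for the Future returned by producer.send(record).
        Future<String> done = CompletableFuture.completedFuture("ack");
        Future<String> pending = new CompletableFuture<>();
        System.out.println(awaitOrNull(done, 100));    // prints "ack"
        System.out.println(awaitOrNull(pending, 100)); // prints "null"
    }
}
```

A null result here only means the caller stopped waiting; the producer may still deliver the record afterwards.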
Console output:
INFO ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [tjtestrac1:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
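The logged values include acks = 1 and enable.idempotence = false, which are the defaults for this client version. For the two scenarios described earlier (credit card transactions versus user behavior tracking), these are the settings one would most likely adjust. The following is a rough sketch rather than a recommended configuration: the class DeliveryProfiles and its method names are invented for illustration, and only the property keys and values (acks, enable.idempotence) are real producer settings.

```java
import java.util.Properties;

// Hypothetical helper (not from the original article).
final class DeliveryProfiles {
    /** Settings shared by both profiles, copied from MsgSender. */
    private static Properties base() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "tjtestrac1:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /** For data that must not be lost or duplicated (the credit card case). */
    static Properties strict() {
        Properties props = base();
        // Wait until all in-sync replicas have the record.
        props.setProperty("acks", "all");
        // Let the broker discard duplicates caused by producer retries.
        props.setProperty("enable.idempotence", "true");
        return props;
    }

    /** For data where occasional loss is acceptable (the tracking case). */
    static Properties lossTolerant() {
        Properties props = base();
        // Do not wait for any acknowledgement from the broker.
        props.setProperty("acks", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("strict acks = " + strict().getProperty("acks"));
        System.out.println("lossTolerant acks = " + lossTolerant().getProperty("acks"));
    }
}
```

Either Properties object can be passed to new KafkaProducer<>(props) in the same way kafkaProps is in MsgSender. Idempotence only guards against duplicates introduced by the producer's own retries; duplicates created by the application sending the same record twice are a separate concern.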
Log in to the server and verify that the message was sent successfully:
$ kafka-topics.sh --list --zookeeper localhost:2181
TestMsg
__consumer_offsets
test
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestMsg --from-beginning
Welcome to my home!!!
Welcome to my home!!!
Welcome to my home!!!