Java Distributed Tracing with Zipkin (8): Zipkin Source Code Analysis - KafkaCollector
In the previous posts we used OkHttpSender to report trace data to Zipkin. In production, under heavy traffic, this can become a performance bottleneck. In this post we switch to KafkaSender, which first writes trace data to Kafka; Zipkin then uses KafkaCollector to consume the spans from Kafka.
In the Brave configuration, the Sender must be set to KafkaSender, and zipkin's collector component must be configured as KafkaCollector.
The related code is in Chapter8/zipkin-kafka.
Add the dependency to pom.xml:
<dependency>
  <groupId>io.zipkin.reporter2</groupId>
  <artifactId>zipkin-sender-kafka11</artifactId>
  <version>${zipkin-reporter2.version}</version>
</dependency>
In TracingConfiguration, we change the Sender to KafkaSender, specifying the Kafka bootstrap servers and the topic:
@Bean
Sender sender() {
  return KafkaSender.newBuilder()
      .bootstrapServers("localhost:9091,localhost:9092,localhost:9093")
      .topic("zipkin")
      .encoding(Encoding.JSON)
      .build();
}
We first start ZooKeeper (default port 2181), then bring up a local 3-broker Kafka cluster (ports 9091, 9092 and 9093), and finally start KafkaManager (default port 9000), a UI administration tool for Kafka.
For how to set up a local Kafka pseudo-cluster, please search online for a tutorial; this article uses Kafka version 0.10.0.0.
Once Kafka is up, we create a topic named zipkin. Since we have 3 brokers, I set replication-factor=3 here:
bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic zipkin
We start zipkin with the following command, passing Kafka's ZooKeeper address as a parameter, so that zipkin will consume the trace data we report from Kafka:
java -jar zipkin-server-2.2.1-exec.jar --KAFKA_ZOOKEEPER=localhost:2181
Then run the two services. Note that we change the backend's port to 9001 here, to avoid a conflict with KafkaManager's port:
mvn spring-boot:run -Drun.jvmArguments="-Dserver.port=9001 -Dzipkin.service=backend"
mvn spring-boot:run -Drun.jvmArguments="-Dserver.port=8081 -Dzipkin.service=frontend"
To see the content of these two messages, run the following command from the Kafka installation directory:
bin/windows/kafka-console-consumer.bat --zookeeper localhost:2181 --topic zipkin --from-beginning
The console prints the two most recent messages:
[{"traceId":"802bd09f480b5faa","parentId":"802bd09f480b5faa","id":"bb3c70909ea3ee3c","kind":"SERVER","name":"get","timestamp":1510891296426607,"duration":10681,"localEndpoint":{"serviceName":"backend","ipv4":"10.200.170.137"},"remoteEndpoint":{"ipv4":"127.0.0.1","port":64421},"tags":{"http.path":"/api"},"shared":true}]
[{"traceId":"802bd09f480b5faa","parentId":"802bd09f480b5faa","id":"bb3c70909ea3ee3c","kind":"CLIENT","name":"get","timestamp":1510891296399882,"duration":27542,"localEndpoint":{"serviceName":"frontend","ipv4":"10.200.170.137"},"tags":{"http.path":"/api"}},{"traceId":"802bd09f480b5faa","id":"802bd09f480b5faa","kind":"SERVER","name":"get","timestamp":1510891296393252,"duration":39514,"localEndpoint":{"serviceName":"frontend","ipv4":"10.200.170.137"},"remoteEndpoint":{"ipv6":"::1","port":64420},"tags":{"http.path":"/"}}]
This shows that our frontend and backend applications have successfully written trace data to Kafka!
The trace can also be queried in Zipkin's web UI.
In zipkin's console we can also see the Kafka-related class ConsumerFetcherThread start up; we will come back to this class in a later post dedicated to zipkin's source code:
2017-11-17 11:25:00.477 INFO 9292 --- [49-8e18eab0-0-1] kafka.consumer.ConsumerFetcherThread : [ConsumerFetcherThread-zipkin_LT290-1510889099649-8e18eab0-0-1], Starting
2017-11-17 11:25:00.482 INFO 9292 --- [r-finder-thread] kafka.consumer.ConsumerFetcherManager : [ConsumerFetcherManager-1510889099800] Added fetcher for partitions ArrayBuffer([[zipkin,0], initOffset 0 to broker id:1,host:10.200.170.137,port:9091] )
KafkaSender
public abstract class KafkaSender extends Sender {
  public static Builder newBuilder() {
    // Settings below correspond to "Producer Configs"
    // http://kafka.apache.org/0102/documentation.html#producerconfigs
    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        ByteArraySerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        ByteArraySerializer.class.getName());
    properties.put(ProducerConfig.ACKS_CONFIG, "0");
    return new zipkin2.reporter.kafka11.AutoValue_KafkaSender.Builder()
        .encoding(Encoding.JSON)
        .properties(properties)
        .topic("zipkin")
        .overrides(Collections.EMPTY_MAP)
        .messageMaxBytes(1000000);
  }

  @Override public zipkin2.Call<Void> sendSpans(List<byte[]> encodedSpans) {
    if (closeCalled) throw new IllegalStateException("closed");
    byte[] message = encoder().encode(encodedSpans);
    return new KafkaCall(message);
  }
}
KafkaSender sends messages to Kafka through the KafkaProducer client. The newBuilder method sets some defaults: the topic defaults to zipkin, the encoding to JSON, and the maximum message size to 1000000 bytes. You can also customize the KafkaProducer by passing overrides to replace the default configuration.
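The overrides hook is essentially a map merge into the default producer Properties, with later entries winning. A minimal stdlib sketch of that behavior (withOverrides is a hypothetical helper for illustration, not zipkin API):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class OverrideDemo {
    // Merge user-supplied overrides into default producer configs;
    // entries in `overrides` replace same-named defaults.
    static Properties withOverrides(Properties defaults, Map<String, ?> overrides) {
        Properties merged = new Properties();
        merged.putAll(defaults);
        merged.putAll(overrides); // later puts win, mirroring Builder.overrides
        return merged;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        defaults.put("acks", "0"); // zipkin's fire-and-forget default
        Map<String, String> overrides = new LinkedHashMap<>();
        overrides.put("acks", "all"); // e.g. wait for all replicas instead
        System.out.println(withOverrides(defaults, overrides).getProperty("acks"));
    }
}
```

This is also why zipkin can default to acks=0: losing the odd span batch is acceptable for tracing, and the override path is there if you need stronger delivery guarantees.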
The sendSpans method returns a KafkaCall, whose execute method is invoked from AsyncReporter's flush method:
void flush(BufferNextMessage bundler) {
  // ...
  sender.sendSpans(nextMessage).execute();
  // ...
}
KafkaCall's parent class BaseCall implements execute by delegating to doExecute, and doExecute uses an AwaitableCallback to turn KafkaProducer's asynchronous send into a synchronous one, which is handled quite elegantly here:
class KafkaCall extends BaseCall<Void> { // KafkaFuture is not cancelable
  private final byte[] message;

  KafkaCall(byte[] message) {
    this.message = message;
  }

  @Override protected Void doExecute() throws IOException {
    final AwaitableCallback callback = new AwaitableCallback();
    get().send(new ProducerRecord<>(topic(), message), (metadata, exception) -> {
      if (exception == null) {
        callback.onSuccess(null);
      } else {
        callback.onError(exception);
      }
    });
    callback.await();
    return null;
  }

  @Override protected void doEnqueue(Callback<Void> callback) {
    get().send(new ProducerRecord<>(topic(), message), (metadata, exception) -> {
      if (exception == null) {
        callback.onSuccess(null);
      } else {
        callback.onError(exception);
      }
    });
  }

  @Override public Call<Void> clone() {
    return new KafkaCall(message);
  }
}
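The async-to-sync trick reduces to a latch: the caller blocks until the producer's completion callback fires, and any asynchronous failure is rethrown on the calling thread. A simplified stdlib analog (a sketch, not zipkin's actual AwaitableCallback) might look like:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Blocks the caller until the async completion callback fires, turning an
// asynchronous send into a synchronous call.
public class AwaitableCallbackSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    public void onSuccess() { latch.countDown(); }

    public void onError(Throwable t) {
        error.set(t);
        latch.countDown();
    }

    public void await() throws InterruptedException {
        latch.await(); // block until the callback is invoked
        Throwable t = error.get();
        if (t != null) throw new RuntimeException(t); // surface the async failure
    }

    public static void main(String[] args) throws InterruptedException {
        AwaitableCallbackSketch cb = new AwaitableCallbackSketch();
        // Simulate KafkaProducer invoking the completion callback on an I/O thread
        new Thread(cb::onSuccess).start();
        cb.await(); // returns only after the "send" completed
        System.out.println("send completed");
    }
}
```

The design choice here is that the reporter's flush thread gets a simple blocking call with real exceptions, while doEnqueue keeps the fully asynchronous path for callers that want it.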
One more point worth noting: at first glance, get appears to return a new KafkaProducer on every call. When I first saw this code, I also wondered whether that was a performance problem.
It turns out this uses the @Memoized annotation from Google's AutoValue library. Combined with @AutoValue, it adds code to the generated class: as you can see below, get is wrapped in a caching layer, so the concern is unfounded.
@Memoized KafkaProducer<byte[], byte[]> get() {
  KafkaProducer<byte[], byte[]> result = new KafkaProducer<>(properties());
  provisioned = true;
  return result;
}
AutoValue_KafkaSender
final class AutoValue_KafkaSender extends $AutoValue_KafkaSender {
  private volatile KafkaProducer<byte[], byte[]> get;

  AutoValue_KafkaSender(Encoding encoding$, int messageMaxBytes$, BytesMessageEncoder encoder$,
      String topic$, Properties properties$) {
    super(encoding$, messageMaxBytes$, encoder$, topic$, properties$);
  }

  @Override
  KafkaProducer<byte[], byte[]> get() {
    if (get == null) {
      synchronized (this) {
        if (get == null) {
          get = super.get();
          if (get == null) {
            throw new NullPointerException("get() cannot return null");
          }
        }
      }
    }
    return get;
  }
}
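The effect of the generated code can be demonstrated with a small stdlib analog: a memoizing supplier (MemoizedSupplier is a hypothetical name mirroring what @Memoized emits) constructs its expensive value exactly once, no matter how often get() is called:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Double-checked lazy caching, like AutoValue's @Memoized: the delegate
// supplier runs at most once; later get() calls return the cached value.
public class MemoizedSupplier<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private volatile T value;

    public MemoizedSupplier(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override public T get() {
        T result = value;
        if (result == null) {
            synchronized (this) {
                result = value;
                if (result == null) {
                    value = result = delegate.get(); // computed exactly once
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        AtomicInteger constructions = new AtomicInteger();
        MemoizedSupplier<String> producer =
            new MemoizedSupplier<>(() -> "producer-" + constructions.incrementAndGet());
        producer.get();
        producer.get();
        System.out.println(constructions.get()); // the "producer" was built once
    }
}
```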
KafkaCollector
Now let's look at Zipkin's KafkaCollector. Opening the zipkin-server source, we find the Kafka configuration section in resources/zipkin-server-shared.yml.
Earlier in this article we started zipkin with --KAFKA_ZOOKEEPER, passing Kafka's ZooKeeper address to the zipkin server's main method; in other words, we set zipkin.collector.kafka.zookeeper to localhost:2181:
java -jar zipkin-server-2.2.1-exec.jar --KAFKA_ZOOKEEPER=localhost:2181
zipkin-server-shared.yml
zipkin:
  collector:
    kafka:
      # ZooKeeper host string, comma-separated host:port value.
      zookeeper: ${KAFKA_ZOOKEEPER:}
      # Name of topic to poll for spans
      topic: ${KAFKA_TOPIC:zipkin}
      # Consumer group this process is consuming on behalf of.
      group-id: ${KAFKA_GROUP_ID:zipkin}
      # Count of consumer threads consuming the topic
      streams: ${KAFKA_STREAMS:1}
      # Maximum size of a message containing spans in bytes
      max-message-size: ${KAFKA_MAX_MESSAGE_SIZE:1048576}
In pom.xml there is the following dependency:
<!-- Kafka Collector -->
<dependency>
  <groupId>${project.groupId}</groupId>
  <artifactId>zipkin-autoconfigure-collector-kafka</artifactId>
  <optional>true</optional>
</dependency>
ZipkinKafkaCollectorAutoConfiguration
We find the ZipkinKafkaCollectorAutoConfiguration class in zipkin-autoconfigure/collector-kafka. It uses the @Conditional annotation: when KafkaZooKeeperSetCondition is satisfied, Spring Boot loads ZipkinKafkaCollectorAutoConfiguration, which registers a KafkaCollector in the Spring container.
@Configuration
@EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class)
@Conditional(KafkaZooKeeperSetCondition.class)
public class ZipkinKafkaCollectorAutoConfiguration {
  /**
   * This launches a thread to run start. This prevents a several second hang, or worse crash if
   * zookeeper isn't running, yet.
   */
  @Bean KafkaCollector kafka(ZipkinKafkaCollectorProperties kafka, CollectorSampler sampler,
      CollectorMetrics metrics, StorageComponent storage) {
    final KafkaCollector result =
        kafka.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build();
    // don't use @Bean(initMethod = "start") as it can crash the process if zookeeper is down
    Thread start = new Thread("start " + result.getClass().getSimpleName()) {
      @Override public void run() {
        result.start();
      }
    };
    start.setDaemon(true);
    start.start();
    return result;
  }
}
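The daemon-thread launch in the kafka() bean can be sketched in isolation: start() runs in the background, and because the thread is a daemon, even a start() that hangs forever cannot keep the JVM alive or block startup. A minimal sketch (startInBackground is an illustrative helper, not zipkin code):

```java
// Run a possibly slow/blocking start() on a daemon thread so application
// startup continues immediately, mirroring ZipkinKafkaCollectorAutoConfiguration.
public class DaemonStartSketch {
    public static Thread startInBackground(Runnable start, String componentName) {
        Thread t = new Thread(start, "start " + componentName);
        t.setDaemon(true); // a hung start() won't prevent JVM exit
        t.start();
        return t;
    }

    public static void main(String[] args) {
        Thread t = startInBackground(() -> {
            // stands in for KafkaCollector.start(), which may block on ZooKeeper
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }, "KafkaCollector");
        // main continues immediately; the collector connects in the background
        System.out.println("startup continues, daemon=" + t.isDaemon());
    }
}
```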
KafkaZooKeeperSetCondition
KafkaZooKeeperSetCondition extends SpringBootCondition and implements getMatchOutcome: when zipkin.collector.kafka.zookeeper is set in the environment, the condition matches, and ZipkinKafkaCollectorAutoConfiguration is loaded:
final class KafkaZooKeeperSetCondition extends SpringBootCondition {
  static final String PROPERTY_NAME = "zipkin.collector.kafka.zookeeper";

  @Override
  public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
    String kafkaZookeeper = context.getEnvironment().getProperty(PROPERTY_NAME);
    return kafkaZookeeper == null || kafkaZookeeper.isEmpty() ?
        ConditionOutcome.noMatch(PROPERTY_NAME + " isn't set") :
        ConditionOutcome.match();
  }
}
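The condition boils down to a null-or-empty check on a single property. The same predicate can be sketched in plain Java over a map of environment properties (KafkaConditionSketch is illustrative, not the Spring API):

```java
import java.util.Map;

// The collector activates only when zipkin.collector.kafka.zookeeper is
// present and non-empty, mirroring KafkaZooKeeperSetCondition.
public class KafkaConditionSketch {
    static final String PROPERTY_NAME = "zipkin.collector.kafka.zookeeper";

    static boolean matches(Map<String, String> environment) {
        String value = environment.get(PROPERTY_NAME);
        return value != null && !value.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(matches(Map.of()));                               // unset: no match
        System.out.println(matches(Map.of(PROPERTY_NAME, "localhost:2181"))); // set: match
    }
}
```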
As shown above, ZipkinKafkaCollectorAutoConfiguration starts a daemon thread to run KafkaCollector's start method, so that an unreachable ZooKeeper cannot block zipkin's startup:
public final class KafkaCollector implements CollectorComponent {
  final LazyConnector connector;
  final LazyStreams streams;

  KafkaCollector(Builder builder) {
    connector = new LazyConnector(builder);
    streams = new LazyStreams(builder, connector);
  }

  @Override public KafkaCollector start() {
    connector.get();
    streams.get();
    return this;
  }
}
KafkaCollector initializes two objects, LazyConnector and LazyStreams, and its start method calls get on both.
LazyConnector
LazyConnector extends LazyCloseable, a subtype of Lazy; when its get method is called, compute is invoked:
static final class LazyConnector extends LazyCloseable<ZookeeperConsumerConnector> {
  final ConsumerConfig config;

  LazyConnector(Builder builder) {
    this.config = new ConsumerConfig(builder.properties);
  }

  @Override protected ZookeeperConsumerConnector compute() {
    return (ZookeeperConsumerConnector) createJavaConsumerConnector(config);
  }

  @Override
  public void close() {
    ZookeeperConsumerConnector maybeNull = maybeNull();
    if (maybeNull != null) maybeNull.shutdown();
  }
}
Lazy's get method is a classic lazy-initialization singleton with double-checked locking, preventing multiple threads from constructing multiple instances; the actual construction is delegated to compute:
public abstract class Lazy<T> {
  volatile T instance = null;

  /** Remembers the result, if the operation completed unexceptionally. */
  protected abstract T compute();

  /** Returns the same value, computing as necessary */
  public final T get() {
    T result = instance;
    if (result == null) {
      synchronized (this) {
        result = instance;
        if (result == null) {
          instance = result = tryCompute();
        }
      }
    }
    return result;
  }

  /**
   * This is called in a synchronized block when the value to memoize hasn't yet been computed.
   *
   * <p>Extracted only for LazyCloseable, hence package protection.
   */
  T tryCompute() {
    return compute();
  }
}
LazyConnector's compute builds a ZookeeperConsumerConnector from the ConsumerConfig. This is a key class in Kafka 0.8: the ZooKeeper-based ConsumerConnector.
LazyStreams
In LazyStreams.compute, a thread pool is created whose size comes from the streams parameter (i.e. zipkin.collector.kafka.streams), defaulting to a single-thread pool.
topicCountMap then sets the number of threads zipkin uses to consume the Kafka topic; ZookeeperConsumerConnector's createMessageStreams creates the KafkaStreams, and each stream is handled by a KafkaStreamProcessor submitted to the pool:
static final class LazyStreams extends LazyCloseable<ExecutorService> {
  final int streams;
  final String topic;
  final Collector collector;
  final CollectorMetrics metrics;
  final LazyCloseable<ZookeeperConsumerConnector> connector;
  final AtomicReference<CheckResult> failure = new AtomicReference<>();

  LazyStreams(Builder builder, LazyCloseable<ZookeeperConsumerConnector> connector) {
    this.streams = builder.streams;
    this.topic = builder.topic;
    this.collector = builder.delegate.build();
    this.metrics = builder.metrics;
    this.connector = connector;
  }

  @Override protected ExecutorService compute() {
    ExecutorService pool = streams == 1
        ? Executors.newSingleThreadExecutor()
        : Executors.newFixedThreadPool(streams);
    Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
    topicCountMap.put(topic, streams);
    for (KafkaStream<byte[], byte[]> stream : connector.get().createMessageStreams(topicCountMap)
        .get(topic)) {
      pool.execute(guardFailures(new KafkaStreamProcessor(stream, collector, metrics)));
    }
    return pool;
  }

  Runnable guardFailures(final Runnable delegate) {
    return () -> {
      try {
        delegate.run();
      } catch (RuntimeException e) {
        failure.set(CheckResult.failed(e));
      }
    };
  }

  @Override
  public void close() {
    ExecutorService maybeNull = maybeNull();
    if (maybeNull != null) maybeNull.shutdown();
  }
}
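The threading model of compute() can be simulated with the standard library alone: a fixed-size pool of `streams` workers, each draining its own message stream, here modeled as a BlockingQueue instead of a KafkaStream. Names and the queue model are illustrative assumptions, not zipkin code:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stdlib simulation of LazyStreams.compute(): N consumer tasks on a
// fixed-size pool, each processing messages from a stream.
public class StreamPoolSketch {
    static ExecutorService consume(int streams, BlockingQueue<String> messages,
                                   CountDownLatch processed) {
        ExecutorService pool = streams == 1
            ? Executors.newSingleThreadExecutor()
            : Executors.newFixedThreadPool(streams);
        for (int i = 0; i < streams; i++) {
            pool.execute(() -> {
                while (messages.poll() != null) {
                    processed.countDown(); // stands in for collector.acceptSpans(...)
                }
            });
        }
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> messages =
            new LinkedBlockingQueue<>(List.of("batch-1", "batch-2", "batch-3"));
        CountDownLatch processed = new CountDownLatch(3);
        ExecutorService pool = consume(2, messages, processed); // like KAFKA_STREAMS=2
        processed.await(2, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println("all batches processed: " + (processed.getCount() == 0));
    }
}
```

Note that, as in zipkin, raising the stream count only helps if the topic has at least that many partitions; our demo topic was created with a single partition.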
KafkaStreamProcessor
KafkaStreamProcessor's run method iterates over the stream, takes each message, and calls Collector's acceptSpans method, which uses the storage component to receive and store the span data:
final class KafkaStreamProcessor implements Runnable {
  final KafkaStream<byte[], byte[]> stream;
  final Collector collector;
  final CollectorMetrics metrics;

  KafkaStreamProcessor(
      KafkaStream<byte[], byte[]> stream, Collector collector, CollectorMetrics metrics) {
    this.stream = stream;
    this.collector = collector;
    this.metrics = metrics;
  }

  @Override
  public void run() {
    ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    while (messages.hasNext()) {
      byte[] bytes = messages.next().message();
      metrics.incrementMessages();
      if (bytes.length == 0) {
        metrics.incrementMessagesDropped();
        continue;
      }
      // If we received legacy single-span encoding, decode it into a singleton list
      if (bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) {
        try {
          metrics.incrementBytes(bytes.length);
          Span span = SpanDecoder.THRIFT_DECODER.readSpan(bytes);
          collector.accept(Collections.singletonList(span), NOOP);
        } catch (RuntimeException e) {
          metrics.incrementMessagesDropped();
        }
      } else {
        collector.acceptSpans(bytes, DETECTING_DECODER, NOOP);
      }
    }
  }
}
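The first-byte check in run() is how zipkin sniffs the payload encoding: JSON span lists start with '[' (0x5B, well above 16), while a legacy single-span thrift payload begins with a small TType byte; 12 is excluded because the code treats it as thrift list content rather than a lone span. A sketch of that sniffing logic (isLegacySingleSpan is an illustrative name):

```java
// Mirrors the branch in KafkaStreamProcessor.run(): a leading byte <= 16,
// other than 12, signals a legacy single-span thrift payload; anything else
// (e.g. JSON starting with '[') goes through the detecting decoder.
public class EncodingSniffSketch {
    static boolean isLegacySingleSpan(byte[] bytes) {
        return bytes.length > 0 && bytes[0] <= 16 && bytes[0] != 12;
    }

    public static void main(String[] args) {
        byte[] json = "[{\"traceId\":\"802bd09f480b5faa\"}]".getBytes();
        System.out.println(isLegacySingleSpan(json));             // '[' is 0x5B = 91
        System.out.println(isLegacySingleSpan(new byte[]{11, 0})); // small thrift TType byte
    }
}
```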
The consumption style here is still that of Kafka 0.8. If you want to use Kafka 0.10+, you can modify zipkin-server's pom and add collector-kafka10 to the dependencies; the principle is much the same as for 0.8, so we will not analyze it further here:
<!-- Kafka10 Collector -->
<dependency>
  <groupId>io.zipkin.java</groupId>
  <artifactId>zipkin-autoconfigure-collector-kafka10</artifactId>
  <optional>true</optional>
</dependency>
<dependency>
  <groupId>io.zipkin.java</groupId>
  <artifactId>zipkin-collector-kafka10</artifactId>
</dependency>
In production, switching zipkin's span collection to Kafka improves the system's throughput and also decouples the clients from the zipkin server: clients depend only on the Kafka cluster, not on the zipkin server itself.
Of course, the collector can also be replaced with RabbitMQ to improve collection efficiency, and zipkin supports Scribe as well; we will not go into those here.