1. 程式人生 > >Spring 整合Kafka(完整版)

Spring 整合Kafka(完整版)

前面的文章我們已經完成了Kafka基於Zookeeper的叢集的搭建了。Kafka叢集搭建請點我。記過幾天的研究已經實現Spring的集成了。本文重點

jar包準備

  • 整合是基於spring-integration-kafka完成的。我這裡用的專案是maven。該jar包在maven的位置
<dependency>    
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId
>
<version>1.3.0.RELEASE</version> </dependency>

友情提醒:自己在網上看的教程多引入了kafka_2.10jar包。我的專案報錯。建議搭建指引入和kafka相關的上面那個jar包

配置生產者(spring-kafka-producer.xml)

  • 有了jar包我們只需要在spring的配置檔案中配置就行了。這裡我單獨將生產者和消費者進行抽離配置

  • 首先我們配置生產訊息的頻道(工具類),這個頻道基於queue。最後我們在訊息傳送也是通過該類實現傳送訊息的

<int:channel id="kafkaProducerChannel"
> <int:queue /> </int:channel>
  • 有了頻道我們需要將頻道和訊息分類結合起來 , outbound-channel-adapter 。顧名思義傳送+頻道+分類。該類就是設定這三個的聯絡的。這裡我們主要看的是kafka-producer-context-ref。他是生產者訊息的來源地
<int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapterTopic" kafka-producer-context-ref="producerContextTopic"
auto-startup="true" channel="kafkaProducerChannel" order="3"> <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="1" task-executor="taskExecutor" /> </int-kafka:outbound-channel-adapter>
  • 生產者的類別設定。及訊息的編碼序列化等操作都是該類設定的
    首先就是這裡的topic。每個topic對應一個類。topic中的broker-list是kafka服務(叢集)。key-serializer和key-encoder分別設定序列化和編碼。兩者只需要設定一個就行。value-class-type是訊息的型別。value-serializer和value-encoder和key是一樣的解釋
<int-kafka:producer-context id="producerContextTopic"
        producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <!-- 多個topic配置  broker-list kafaka服務
            key_serializer  value-serializer 就是用了自己的編碼格式
            value-class-type 指定傳送訊息的型別-->
            <int-kafka:producer-configuration
                broker-list="192.168.1.130:9091" key-serializer="stringSerializer"
                value-class-type="java.lang.Object" value-serializer="stringSerializer"
                topic="testTopic" />
            <int-kafka:producer-configuration
                broker-list="192.168.1.130:9091" key-serializer="stringSerializer"
                value-class-type="java.lang.Object" value-serializer="stringSerializer"
                topic="myTopic" />
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
  • 上面消費者設定的序列化我們需要單獨設定一下。我們可以採用spring-integration-kafka提供的序列化類。但是用了那個序列只能傳遞字串。我們可以從定義該類實現傳遞物件(包括字串)
    這裡寫圖片描述
<bean id="stringSerializer" class="com.bshinfo.web.base.kafka.producer.MySerializer" />
  • 完整配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- 生產者生產資訊是鍵值對內容的格式。預設是 org.apache.kafka.common.serialization.StringSerializer
    這裡我們重寫方法。便於方法傳遞物件  具體看MySerializer-->
    <bean id="stringSerializer" class="com.bshinfo.web.base.kafka.producer.MySerializer" />
    <!-- 這裡的Encoder在下面沒有用到 刪掉也可以  Encoder和Serializer只用設定一個就行了。
    consumer.xml中的配置也是一樣 -->
    <!-- <bean id="kafkaEncoder"
        class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
        <constructor-arg value="com.kafka.demo.util.ObjectEncoder" />
    </bean> -->
    <!-- 生產者一些配置屬性。不配置按預設執行 -->
    <bean id="producerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
                <prop key="message.send.max.retries">5</prop>
                <!-- <prop key="serializer.class">com.kafka.demo.util.ObjectEncoder</prop> -->
                <prop key="request.required.acks">1</prop>
            </props>
        </property>
    </bean>

    <!-- 生產者通過這個頻道傳送訊息  基於queue-->
    <int:channel id="kafkaProducerChannel">
        <int:queue />
    </int:channel>

    <!-- 生產者傳送訊息設定  頻道+方法配置 -->
    <int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapterTopic" kafka-producer-context-ref="producerContextTopic"
        auto-startup="true" channel="kafkaProducerChannel" order="3">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS"
            receive-timeout="1" task-executor="taskExecutor" />
    </int-kafka:outbound-channel-adapter>
    <task:executor id="taskExecutor" pool-size="5"
        keep-alive="120" queue-capacity="500" />

    <!-- 訊息傳送的主題設定。必須設定了主題才能傳送相應主題訊息 -->
    <int-kafka:producer-context id="producerContextTopic"
        producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <!-- 多個topic配置  broker-list kafaka服務
            key_serializer  value-serializer 就是用了自己的編碼格式
            value-class-type 指定傳送訊息的型別-->
            <int-kafka:producer-configuration
                broker-list="192.168.1.130:9091" key-serializer="stringSerializer"
                value-class-type="java.lang.Object" value-serializer="stringSerializer"
                topic="testTopic" />
            <int-kafka:producer-configuration
                broker-list="192.168.1.130:9091" key-serializer="stringSerializer"
                value-class-type="java.lang.Object" value-serializer="stringSerializer"
                topic="myTopic" />
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
</beans>
  • 最後我們在生產訊息的地方注入我們配置檔案中的頻道就可以傳送訊息了
    這裡寫圖片描述
    這裡寫圖片描述

消費者配置(spring-kafka-consumer.xml)

  • 上面的配置就可以實現訊息的傳送了。我們專案中會繼續配置接收訊息(消費者)。配置和生產者的配置一樣。這裡就不詳細的解釋了。程式碼裡解釋的很詳細了。只不過裡面多了配置Zookeeper的叢集資訊。還有一點因為在生產者我配置的序列化。所以這裡為了配置全面這裡採用配置的編碼了

    • 完整配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka 
                        http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                        http://www.springframework.org/schema/integration 
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/task 
                        http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- 接收的頻道 也可以理解為接收的工具類 -->
    <int:channel id="inputFromKafka">
        <int:dispatcher task-executor="kafkaMessageExecutor" />
    </int:channel>
    <!-- zookeeper配置 可以配置多個 -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="192.168.1.130:2181,192.168.1.130:2182,192.168.1.130:2183" zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />
    <!-- channel配置 auto-startup="true" 否則接收不發資料 -->
    <int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
        auto-startup="true" channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>
    <task:executor id="kafkaMessageExecutor" pool-size="8" keep-alive="120" queue-capacity="500" />
    <!-- <bean id="kafkaDecoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder" /> -->

    <bean id="kafkaDecoder" class="com.bshinfo.web.base.kafka.consumer.MyDecoder" />
    <bean id="consumerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>
    <!-- 訊息接收的BEEN -->
    <bean id="kafkaConsumerService" class="com.bshinfo.web.base.kafka.consumer.ConsumerMessages" />
    <!-- 指定接收的方法 -->
    <int:outbound-channel-adapter channel="inputFromKafka"
        ref="kafkaConsumerService" method="processMessage" />

    <int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
        consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                group-id="default1" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
                max-messages="5000">
                <!-- 兩個TOPIC配置 -->
                <int-kafka:topic id="myTopic" streams="4" />
                <int-kafka:topic id="testTopic" streams="4" />
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
</beans>
  • 配置中消費者實現類
package com.bshinfo.web.base.kafka.consumer;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import net.sf.json.JSONArray;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ConsumerMessages
{

    private static final Logger logger = LoggerFactory.getLogger(ConsumerMessages.class);

    public void processMessage(Map<String, Map<Integer, Object>> msgs) 
    {
        logger.info("================================processMessage===============");
        for (Map.Entry<String, Map<Integer, Object>> entry : msgs.entrySet()) 
        {
            logger.info("============Topic:" + entry.getKey());
            System.err.println("============Topic:" + entry.getKey());
            Map<Integer, Object> messages = entry.getValue();
            Set<Integer> keys = messages.keySet();
            for (Integer i : keys)
            {
                 logger.info("======Partition:" + i);
                 System.err.println("======Partition:" + i);
            }
            Collection<Object> values = messages.values();
            for (Iterator<Object> iterator = values.iterator(); iterator.hasNext();) 
            {
                Object object = iterator.next();
                String message = "["+object.toString()+"]";
                logger.info("=====message:" + message);
                System.err.println("=====message:" + message);
                JSONArray jsonArray = JSONArray.fromObject(object);
                for (int i=0;i<jsonArray.size();i++)
                {
                    Object object2 = jsonArray.get(i);
                    System.out.println(object2.toString());
                    /*JSONObject object2 = (JSONObject) jsonArray.get(i);
                    UserInfo userInfo = (UserInfo) JSONObject.toBean(object2,UserInfo.class);
                    System.out.println(userInfo.getRealName()+"@@@"+userInfo.getUserSex());*/
                }

            }
        }
    }
}
  • 消費者中轉碼的工具類

這裡寫圖片描述

原始碼下載