1. 程式人生 > >spring 整合kafka監聽消費

spring 整合kafka監聽消費

前言

最近專案裡有個需求,要消費kafka裡的資料。之前也手動寫過程式碼去消費kafka資料。但是轉念一想。既然spring提供了消費kafka的方法。就沒必要再去重複造輪子。於是嘗試使用spring的API。

專案技術背景,使用springMVC,XML配置和註解相互使用。kafka的配置都是使用XML方式。

整合過程

  1. 引入spring-kafka的依賴包

      <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>

2. 在spring的xml檔案裡增加配置項,也可以單獨建立一個spring-context-XX.xml檔案。

 <!-- consumer configuration  該配置項可以根據自己業務的實際需求做增加或刪除-->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                <entry key="group.id" value="group" />
                <entry key="enable.auto.commit" value="true" />
                <entry key="auto.commit.interval.ms" value="3000" />
                <entry key="session.timeout.ms" value="10000" />
                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- create factory  該類是spring jar包裡提供,就這麼配置-->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!-- 自定義的消費類,需要實現spring的介面 -->
    <bean id="payPalConsumer"
          class="com.chao.service.consumer.PayPalConsumer" />

    <!-- 該類也是jar包裡提供的,注入的監聽類是自己定義的,topic名稱是配置檔案引入的-->
    <bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
        <constructor-arg name="topics" value="${kafka.paypal.topic.name}"/>
        <property name="messageListener" ref="payPalConsumer" />
    </bean>

    <!-- 改類也是jar裡提供的,把這個containerProperties和consumerfactory 注入 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>

 2. 自定義消費者類,消費者類依然可以使用註解。

/**
 * get msg from kafka
 */
@Component 
public class PayPalConsumer implements MessageListener<String, String> {

    private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class);
    @Autowired
    private XXService XXService;
    @Override
    public void onMessage(ConsumerRecord<String, String> authorizeRecord) {
        String value = authorizeRecord.value();
        if (StringUtils.isEmpty(value)){
            logger.warn("receive message from kafka is null");
            return;
        }
        logger.info("receive message from kafka is {}",value);
    }
}

使用這個步驟配置,一次性過。非常順