1. 程式人生 > 實用技巧 >RabbitMQ與SpringBoot整合(個人技術部落格)

RabbitMQ與SpringBoot整合(個人技術部落格)

一、RabbitMQ的介紹

RabbitMQ是訊息中介軟體的一種,訊息中介軟體即分散式系統中完成訊息的傳送和接收的基礎軟體.這些軟體有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現已經轉讓給apache).

訊息中介軟體的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向訊息佇列傳送資訊,而消費者從訊息佇列中消費資訊.具體過程如下:

從上圖可看出,對於訊息佇列來說,生產者,訊息佇列,消費者是最重要的三個概念,生產者發訊息到訊息佇列中去,消費者監聽指定的訊息佇列,並且當訊息佇列收到訊息之後,接收訊息佇列傳來的訊息,並且給予相應的處理.訊息佇列常用於分散式系統之間互相資訊的傳遞.

對於RabbitMQ來說,除了這三個基本模組以外,還添加了一個模組,即交換機(Exchange).它使得生產者和訊息佇列之間產生了隔離,生產者將訊息傳送給交換機,而交換機則根據排程策略把相應的訊息轉發給對應的訊息佇列.

二、Direct模式

首先建立兩個maven工程,這是為了模擬分散式應用系統中,兩個應用之間互相交流的過程,一個傳送者(Sender),一個接收者(Receiver)

緊接著,配置pom.xml檔案,注意其中用到了springboot對於AMQP(高階訊息佇列協議,即面向訊息的中介軟體的設計)

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>
    <properties>
        <java.version>1.7</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
            <scope>true</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- 新增springboot對amqp的支援 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>

緊接著,我們編寫傳送者相關的程式碼.首先毫無疑問,要書寫啟動類:

@SpringBootApplication
public class App{
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

接著在application.properties中,去編輯和RabbitMQ相關的配置資訊,配置資訊的代表什麼內容根據鍵就能很直觀的看出了.這裡埠是5672,不是15672...15672是管理端的埠!

spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

隨後,配置Queue(訊息佇列).那注意由於採用的是Direct模式,需要在配置Queue的時候,指定一個鍵,使其和交換機繫結.

@Configuration
public class SenderConf {
     @Bean
     public Queue queue() {
          return new Queue("queue");
     }
}

接著就可以傳送訊息啦!在SpringBoot中,我們使用AmqpTemplate去傳送訊息!程式碼如下:

@Component
public class HelloSender {
    @Autowired
    private AmqpTemplate template;

    public void send() {
    template.convertAndSend("queue","hello,rabbit~");
    }
}

編寫測試類

@SpringBootTest(classes=App.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class TestRabbitMQ {

    @Autowired
    private HelloSender helloSender;

    @Test
    public void testRabbit() {
        helloSender.send();
    }
}

接著我們編寫接收端.接收端的pom檔案,application.properties(修改spring.application.name),Queue配置類,App啟動類都是一致的!這裡省略不計.主要在於我們需要配置監聽器去監聽繫結到的訊息佇列,當訊息佇列有訊息的時候,予以接收,程式碼如下:

@Component
public class HelloReceive {

    @RabbitListener(queues="queue")    //監聽器監聽指定的Queue
    public void processC(String str) {
        System.out.println("Receive:"+str);
    }

}

實際上RabbitMQ還可以支援傳送物件:當然由於涉及到序列化和反序列化,該物件要實現Serilizable介面.HelloSender做出如下改寫:

public void send() {

        User user=new User();    //實現Serializable介面
        user.setUsername("hlhdidi");
        user.setPassword("123");
        template.convertAndSend("queue",user);
}

HelloReceiver做出如下改寫:

@RabbitListener(queues="queue")    //監聽器監聽指定的Queue
    public void process1(User user) {    //用User作為引數
        System.out.println("Receive1:"+user);
    }