1. 程式人生 > >SpringBoot+ActiveMq實現點對點(Queue)訊息佇列

SpringBoot+ActiveMq實現點對點(Queue)訊息佇列

上篇博文主要分析了三種不同的請求方式,其中提到了基於訊息佇列的請求,當然只是從理論的角度去進行了分析,本篇博文就再次結合具體實現來說說訊息佇列。

一、什麼是訊息佇列?

作為中介軟體,訊息佇列是分散式應用間交換資訊的重要元件。訊息佇列可駐留在記憶體或磁碟上, 佇列可以儲存訊息直到它們被應用程式讀走。通過訊息佇列,應用程式可以在不知道彼此位置的情況下獨立處理訊息,或者在處理訊息前不需要等待接收此訊息。所以訊息佇列可以解決應用解耦、非同步訊息、流量削鋒等問題,是實現高效能、高可用、可伸縮和最終一致性架構中不可以或缺的一環。

簡單的來說,訊息佇列就是獨立於客戶端與服務端,將訊息(請求)以佇列的形式儲存起來,等待服務端進行讀取。

二、訊息佇列的原理分析

如下圖所示,使用者1、2、3同時向服務端系統傳送請求,三個請求會先被分配到佇列中儲存起來,服務端會監聽佇列中的訊息,一旦系統空閒,並且監聽到佇列中有訊息,系統就會從佇列中取出訊息,並進行處理。如此設計,系統可以按照自己的節奏去處理請求,從而減輕服務端的壓力,保證業務處理的流暢;即使系統由於某些原因停止執行,由於未處理的請求仍儲存在佇列中,這些請求也不會丟失。

三、訊息佇列的架構

訊息佇列主要分為點對點(Queue)模式和訂閱(Topic)模式兩種,點對點模式的整體流程如下圖所示:

主要角色有生產者、訊息、佇列、消費者

生產者將生產出來的訊息塞入到佇列中,消費者從佇列中取出訊息並消費,被消費完的訊息不會存在在佇列中,一條訊息只會被一個消費者消費一次;

訂閱模式與點對點模式整體流程相似,不同在於,由於是訂閱關係,生產者生產出來的訊息,會被所有的消費者接收到;(由於本文以主要分析點對點模式,這裡僅大致介紹一下訂閱模式,之後會對訂閱模式進行補充)

常用的訊息中介軟體有:ActivieMq、RabbitMq、RocketMq和kafka這幾種,本文是以ActivieMq作為訊息中介軟體的;

四、具體實現

1、安裝ActiveMq

本文使用的是5.15.5版本的,下載地址是http://activemq.apache.org/activemq-5155-release.html

下載解壓之後,進入到bin目錄下,如果是windows版本,可以直接點選activemq.bat啟動;如果是linux版本,則執行./activemq start;

2、在專案中匯入相應jar包

在pom中新增maven:

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.15.5</version>
</dependency>

3、建立生產者:

Message(訊息)有TextMessage(文字訊息)、MapMessage(鍵值對訊息)、StreamMessage(流訊息)、BytesMessage(位元組訊息)和ObjectMessage(物件訊息),由於MapMessage是一種結構比較靈活的訊息型別,因此本文以此為例;

/**
 *
 * @author yuyan
 * @create 2018-08-28 16:09
 **/
@Service
public class Producer {

    public void sendMessage(String msg){
        try {
            //建立連線工廠,三個引數分別是使用者名稱、密碼以及訊息佇列所在地址
            ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://localhost:61616");

            //連線到JMS提供者
            Connection conn = connFactory.createConnection();
            //開啟連線
            conn.start();

            //事務性會話,自動確認訊息
            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //訊息的目的地,建立佇列"queue"
            Destination destination = session.createQueue("queue");
            //訊息生產者
            MessageProducer producer = session.createProducer(destination);

//           //文字訊息
//          TextMessage textMessage = session.createTextMessage("這是文字訊息");
//          producer.send(textMessage);

            //鍵值對訊息
            MapMessage mapMessage = session.createMapMessage();
            //將訊息內容放入到訊息裡
            mapMessage.setString("reqDesc", msg);
            //生產者傳送訊息
            producer.send(mapMessage);
//
//            //流訊息
//            StreamMessage streamMessage = session.createStreamMessage();
//            streamMessage.writeString("這是流訊息");
//            producer.send(streamMessage);
//
//            //位元組訊息
//            String s = "BytesMessage位元組訊息";
//            BytesMessage bytesMessage = session.createBytesMessage();
//            bytesMessage.writeBytes(s.getBytes());
//            producer.send(bytesMessage);
//
//            //物件訊息
//            User user = new User("obj_info", "物件訊息"); //User物件必須實現Serializable介面
//            ObjectMessage objectMessage = session.createObjectMessage();
//            objectMessage.setObject(user);
//            producer.send(objectMessage);


            session.commit(); //提交會話,該條訊息會進入"queue"佇列,生產者也完成了歷史使命
            producer.close();
            session.close();
            conn.close();

        }catch (Exception e){
            e.printStackTrace();

        }

    }

}

4、建立消費者:

消費者監聽訊息的方式有兩種,一種是通過Api提供的監聽器去實現監聽;另一種是通過迴圈的方式去主動接收訊息。這裡將消費者設計成一個元件,在伺服器啟動時,消費者就會被建立,並監聽佇列。

/**
 *
 * @author yuyan
 * @create 2018-08-28 18:39
 **/
@Component
public class Comsumer implements ApplicationRunner{

    @Override
    public void run(ApplicationArguments args) throws Exception {
        init();
    }
     
    public  void init() throws JMSException {
        //前期的初始化工作與生產者相同
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
        );

        Connection conn = factory.createConnection();
        conn.start();

        Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        //與生產者的訊息目的地相同
        Destination dest = session.createQueue("queue");

        MessageConsumer messConsumer = session.createConsumer(dest);
        //方式1設定訊息監聽
        messConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    MapMessage m = (MapMessage)message;
                    System.out.println("consumer接收到"+m.getString("reqDesc")+"的請求並開始處理,時間是"+new Date());
                    System.out.println("這裡會停頓5s,模擬系統處理請求,時間是"+new Date());
                    Thread.sleep(5000);
                    System.out.println("consumer接收到"+m.getString("reqDesc")+"的請求並處理完畢,時間是"+new Date());
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
      //方式2主動接收訊息
//        while(true){
//            try {
//                MapMessage m = (MapMessage) messConsumer.receive();
//              
//                Thread.sleep(1000);
//                System.out.println(m.getString("reqDesc"));
//            }catch (Exception e){
//                e.printStackTrace();
//            }
//
//        }

//        if(conn != null)conn.close();
    }


}

5、寫一個介面並進行測試

/**
 *
 * @author yuyan
 * @create 2018-08-28 16:32
 **/
@Controller
@RequestMapping(value = "MessageCenter")
public class MessageController {

    @Autowired
    //建立一個生產者,消費者在系統執行時已經建立
    Producer producer;

    @RequestMapping(value = "/SendMessageByQueue", method = RequestMethod.GET)
    @ResponseBody
    public void send(String msg) {
        try {
            System.out.println(msg+"開始發出一次請求,時間是"+new Date());
            producer.sendMessage(msg);
            System.out.println(msg+"請求傳送完成,時間是"+new Date());


        }catch (Exception e){
            e.printStackTrace();
        }
    }


}

測試結果如下:

通過觀察兩個紅框可以發現:1、2兩個請求是幾乎同時發出的,使用者2先進入佇列,隨後1進入,通過觀察兩個綠框可以得知,由於2先進入佇列,2先執行,5s執行完後,1才開始執行;

我們也可以通過http://localhost:8161/admin/queues.jsp,也就是ActiveMq的管理介面去觀察變化:

Name:佇列的名稱

Number Of Pending Messages:佇列中還未被處理的訊息數量

Numer Of Consumers:消費者的數量

Messages Enququed:入佇列的訊息數量(包括已出隊的)

Messages Deququed:出佇列的訊息數量

這是佇列的初始狀態:

當兩個請求入隊後:

此時佇列中有兩條入隊(未被處理)的訊息,

當一個請求並服務端(消費者)取走後:

此時佇列中還未被處理的訊息數量為1,出佇列的訊息數量為1;

 

當另一個請求也被服務端取走後:

佇列中再無其他訊息,兩條訊息均已出列;

那麼,一個訊息佇列的流程就已經全部走完了。

五、總結

訊息佇列並不能提高系統的執行速度(如果想提高速度,還是需要用到多執行緒等方式),訊息佇列作為中介軟體的作用是降低應用間的耦合,在高併發、高流量的情況下保證服務端的穩定,保證業務流程的順暢和資料的完整(請求不丟失)。