SpringBoot+ActiveMq實現點對點(Queue)訊息佇列
上篇博文主要分析了三種不同的請求方式,其中提到了基於訊息佇列的請求,當然只是從理論的角度去進行了分析,本篇博文就再次結合具體實現來說說訊息佇列。
一、什麼是訊息佇列?
作為中介軟體,訊息佇列是分散式應用間交換資訊的重要元件。訊息佇列可駐留在記憶體或磁碟上, 佇列可以儲存訊息直到它們被應用程式讀走。通過訊息佇列,應用程式可以在不知道彼此位置的情況下獨立處理訊息,或者在處理訊息前不需要等待接收此訊息。所以訊息佇列可以解決應用解耦、非同步訊息、流量削鋒等問題,是實現高效能、高可用、可伸縮和最終一致性架構中不可以或缺的一環。
簡單的來說,訊息佇列就是獨立於客戶端與服務端,將訊息(請求)以佇列的形式儲存起來,等待服務端進行讀取。
二、訊息佇列的原理分析
如下圖所示,使用者1、2、3同時向服務端系統傳送請求,三個請求會先被分配到佇列中儲存起來,服務端會監聽佇列中的訊息,一旦系統空閒,並且監聽到佇列中有訊息,系統就會從佇列中取出訊息,並進行處理。如此設計,系統可以按照自己的節奏去處理請求,從而減輕服務端的壓力,保證業務處理的流暢;即使系統由於某些原因停止執行,由於未處理的請求仍儲存在佇列中,這些請求也不會丟失。
三、訊息佇列的架構
訊息佇列主要分為點對點(Queue)模式和訂閱(Topic)模式兩種,點對點模式的整體流程如下圖所示:
主要角色有生產者、訊息、佇列、消費者
生產者將生產出來的訊息塞入到佇列中,消費者從佇列中取出訊息並消費,被消費完的訊息不會存在在佇列中,一條訊息只會被一個消費者消費一次;
訂閱模式與點對點模式整體流程相似,不同在於,由於是訂閱關係,生產者生產出來的訊息,會被所有的消費者接收到;(由於本文以主要分析點對點模式,這裡僅大致介紹一下訂閱模式,之後會對訂閱模式進行補充)
常用的訊息中介軟體有:ActivieMq、RabbitMq、RocketMq和kafka這幾種,本文是以ActivieMq作為訊息中介軟體的;
四、具體實現
1、安裝ActiveMq
本文使用的是5.15.5版本的,下載地址是http://activemq.apache.org/activemq-5155-release.html
下載解壓之後,進入到bin目錄下,如果是windows版本,可以直接點選activemq.bat啟動;如果是linux版本,則執行./activemq start;
2、在專案中匯入相應jar包
在pom中新增maven:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.5</version>
</dependency>
3、建立生產者:
Message(訊息)有TextMessage(文字訊息)、MapMessage(鍵值對訊息)、StreamMessage(流訊息)、BytesMessage(位元組訊息)和ObjectMessage(物件訊息),由於MapMessage是一種結構比較靈活的訊息型別,因此本文以此為例;
/**
*
* @author yuyan
* @create 2018-08-28 16:09
**/
@Service
public class Producer {
public void sendMessage(String msg){
try {
//建立連線工廠,三個引數分別是使用者名稱、密碼以及訊息佇列所在地址
ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
//連線到JMS提供者
Connection conn = connFactory.createConnection();
//開啟連線
conn.start();
//事務性會話,自動確認訊息
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//訊息的目的地,建立佇列"queue"
Destination destination = session.createQueue("queue");
//訊息生產者
MessageProducer producer = session.createProducer(destination);
// //文字訊息
// TextMessage textMessage = session.createTextMessage("這是文字訊息");
// producer.send(textMessage);
//鍵值對訊息
MapMessage mapMessage = session.createMapMessage();
//將訊息內容放入到訊息裡
mapMessage.setString("reqDesc", msg);
//生產者傳送訊息
producer.send(mapMessage);
//
// //流訊息
// StreamMessage streamMessage = session.createStreamMessage();
// streamMessage.writeString("這是流訊息");
// producer.send(streamMessage);
//
// //位元組訊息
// String s = "BytesMessage位元組訊息";
// BytesMessage bytesMessage = session.createBytesMessage();
// bytesMessage.writeBytes(s.getBytes());
// producer.send(bytesMessage);
//
// //物件訊息
// User user = new User("obj_info", "物件訊息"); //User物件必須實現Serializable介面
// ObjectMessage objectMessage = session.createObjectMessage();
// objectMessage.setObject(user);
// producer.send(objectMessage);
session.commit(); //提交會話,該條訊息會進入"queue"佇列,生產者也完成了歷史使命
producer.close();
session.close();
conn.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
4、建立消費者:
消費者監聽訊息的方式有兩種,一種是通過Api提供的監聽器去實現監聽;另一種是通過迴圈的方式去主動接收訊息。這裡將消費者設計成一個元件,在伺服器啟動時,消費者就會被建立,並監聽佇列。
/**
*
* @author yuyan
* @create 2018-08-28 18:39
**/
@Component
public class Comsumer implements ApplicationRunner{
@Override
public void run(ApplicationArguments args) throws Exception {
init();
}
public void init() throws JMSException {
//前期的初始化工作與生產者相同
ConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//與生產者的訊息目的地相同
Destination dest = session.createQueue("queue");
MessageConsumer messConsumer = session.createConsumer(dest);
//方式1設定訊息監聽
messConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
MapMessage m = (MapMessage)message;
System.out.println("consumer接收到"+m.getString("reqDesc")+"的請求並開始處理,時間是"+new Date());
System.out.println("這裡會停頓5s,模擬系統處理請求,時間是"+new Date());
Thread.sleep(5000);
System.out.println("consumer接收到"+m.getString("reqDesc")+"的請求並處理完畢,時間是"+new Date());
}catch (Exception e){
e.printStackTrace();
}
}
});
//方式2主動接收訊息
// while(true){
// try {
// MapMessage m = (MapMessage) messConsumer.receive();
//
// Thread.sleep(1000);
// System.out.println(m.getString("reqDesc"));
// }catch (Exception e){
// e.printStackTrace();
// }
//
// }
// if(conn != null)conn.close();
}
}
5、寫一個介面並進行測試
/**
*
* @author yuyan
* @create 2018-08-28 16:32
**/
@Controller
@RequestMapping(value = "MessageCenter")
public class MessageController {
@Autowired
//建立一個生產者,消費者在系統執行時已經建立
Producer producer;
@RequestMapping(value = "/SendMessageByQueue", method = RequestMethod.GET)
@ResponseBody
public void send(String msg) {
try {
System.out.println(msg+"開始發出一次請求,時間是"+new Date());
producer.sendMessage(msg);
System.out.println(msg+"請求傳送完成,時間是"+new Date());
}catch (Exception e){
e.printStackTrace();
}
}
}
測試結果如下:
通過觀察兩個紅框可以發現:1、2兩個請求是幾乎同時發出的,使用者2先進入佇列,隨後1進入,通過觀察兩個綠框可以得知,由於2先進入佇列,2先執行,5s執行完後,1才開始執行;
我們也可以通過http://localhost:8161/admin/queues.jsp,也就是ActiveMq的管理介面去觀察變化:
Name:佇列的名稱
Number Of Pending Messages:佇列中還未被處理的訊息數量
Numer Of Consumers:消費者的數量
Messages Enququed:入佇列的訊息數量(包括已出隊的)
Messages Deququed:出佇列的訊息數量
這是佇列的初始狀態:
當兩個請求入隊後:
此時佇列中有兩條入隊(未被處理)的訊息,
當一個請求並服務端(消費者)取走後:
此時佇列中還未被處理的訊息數量為1,出佇列的訊息數量為1;
當另一個請求也被服務端取走後:
佇列中再無其他訊息,兩條訊息均已出列;
那麼,一個訊息佇列的流程就已經全部走完了。
五、總結
訊息佇列並不能提高系統的執行速度(如果想提高速度,還是需要用到多執行緒等方式),訊息佇列作為中介軟體的作用是降低應用間的耦合,在高併發、高流量的情況下保證服務端的穩定,保證業務流程的順暢和資料的完整(請求不丟失)。