學習筆記--Java消息中間件
#### 消息中間件
消息中間件:關註於數據的發送和接受,利用高效可靠的異步消息傳遞機制集成分布式系統
JMS:Java消息服務,Java平臺中關於面向消息中間件的API
AMQP:提供統一消息服務的應用層標準協議
常見消息中間件
ActiveMQ
RabbitMQ
Kafka
JMS規範
提供者:實現JMS規範的消息中間件服務器
客戶端:發送或接受消息的應用程序
生產者/發布者:創建並發送消息的客戶端
消費者/訂閱者:接收並處理消息的客戶端
消息:應用程序之間傳遞的數據內容
消息模式:在客戶端之間傳遞消息的方式,JMS中定義了主題和隊列兩種模式
JMS消息模式
隊列模型:
- 客戶端包括生產者和消費者
- 消息只能被一個消費者消費
- 隨時消費
主題模型:
- 客戶端包括發布者和訂閱者
- 消息能被所有訂閱者消費
- 消費者不能消費訂閱之前就發送到主題中的消息
JMS編碼接口:
- ConnectionFactory:用於創建連接到消息中間件的連接工廠
- Connection:代表了應用程序和消息服務器之間的通信鏈路
- Destination:消息發布和接收的地點,包括隊列和主題
- Session:表示一個單線程的上下文,用於發送和接收消息
- MessageConsumer:由會話創建,用於接收發送到目標的消息
- MessageProducer:由會話創建,用於發送消息到目標
- Message:在消費者和生產者之間傳送的對象,包括消息頭,一組消息屬性,一個消息體
使用ActiveMQ
隊列模型
producer
//1. 創建ConnectionFactory ConnectionFactory factory = new ActiveMQConnectionFactory(url); //2. 創建Connection Connection connection = factory.createConnection(); //3. 啟動Connection connection.start(); //4. 創建Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5. 創建Destination Destination destination = session.createQueue(queueName); //6. 創建MessageProducer MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { //7. 創建消息 TextMessage message = session.createTextMessage("test" + i); //8. 發布消息 producer.send(message); System.out.println("發送消息: " + message.getText()); } //9. 關閉連接 connection.close();
consumer
//1. 創建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. 創建Connection
Connection connection = factory.createConnection();
//3. 啟動Connection
connection.start();
//4. 創建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. 創建Destination
Destination destination = session.createQueue(queueName);
//6. 創建MessageConsumer
MessageConsumer consumer = session.createConsumer(destination);
//7. 創建消息監聽器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//9. 關閉連接(消息監聽異步執行,需程序全部運行結束才能關閉連接)
// connection.close();
主題模型
producer
//1. 創建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. 創建Connection
Connection connection = factory.createConnection();
//3. 啟動Connection
connection.start();
//4. 創建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. 創建Destination
Destination destination = session.createTopic(topicName);
//6. 創建MessageProducer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
//7. 創建消息
TextMessage message = session.createTextMessage("test" + i);
//8. 發布消息
producer.send(message);
System.out.println("發送消息: " + message.getText());
}
//9. 關閉連接
connection.close();
consumer
//1. 創建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. 創建Connection
Connection connection = factory.createConnection();
//3. 啟動Connection
connection.start();
//4. 創建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. 創建Destination
Destination destination = session.createTopic(topicName);
//6. 創建MessageConsumer
MessageConsumer consumer = session.createConsumer(destination);
//7. 創建消息監聽器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//9. 關閉連接(消息監聽異步執行,需程序全部運行結束才能關閉連接)
// connection.close();
spring jms
- ConnectionFactory 用於管理連接的連接工廠
- 由spring提供
- SingleConnectionFactory和CachingConnectionFactory
- JmsTemplate 用於發送和接收消息的模板類
- 由spring提供,在容器中註冊就可以使用
- 線程安全
- MessageListener 消息監聽器
- 實現一個onMessage方法,只接收一個Message參數
spring使用jms示例
common.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config />
<!-- ActiveMQ為我們提供的ConnectionFactory -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!-- spring jms為我們提供連接池 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 一個隊列目的地,點對點的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue" />
</bean>
<!-- 一個主題目的地,發布訂閱消息 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic"/>
</bean>
</beans>
producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="common.xml" />
<!-- 配置JmsTemplate,用於發送消息-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<bean class="com.qyluo.jms.spring.producer.ProducerServiceImpl" />
</beans>
cosumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 導入公共配置 -->
<import resource="common.xml" />
<!-- 配置消息監聽器 -->
<bean id="consumerMessageListener" class="com.qyluo.jms.spring.consumer.ConsumerMessageListener" />
<!-- 配置消息監聽容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination"/>
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
</beans>
ProducerServiceImpl
public class ProducerServiceImpl implements ProducerService {
@Autowired
JmsTemplate jmsTemplate;
@Resource(name = "topicDestination")
Destination destination;
@Override
public void sendMessage(final String message) {
//使用JmsTemplate發送消息
jmsTemplate.send(destination, new MessageCreator() {
//創建一個消息
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
return textMessage;
}
});
System.out.println("發送消息: " + message);
}
}
AppProducer
public class AppProducer {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
ProducerService service = context.getBean(ProducerService.class);
for (int i = 0; i < 100; i++) {
service.sendMessage("text" + i);
}
context.close();
}
}
ConsumerMessageListener
public class ConsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息: " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
AppConsumer
public class AppConsumer {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
}
}
ActiveMQ集群
集群方式
- 客戶端集群:多個消費者消費同一個隊列
- Broker clusters:多個Broker之間同步消息
- Master Slave:實現高可用
ActiveMQ失效轉移(failover)
允許當其中一臺消息服務器宕機時,客戶端在傳輸層上重新連接到其它消息服務器
語法:failover:(uri1,...,uriN)?transportOptions
transportOptions參數說明
- randomize 默認為true,表示在URI列表中選擇URI連接時是否采用隨機策略
- initialReconnectDelay 默認為10,單位毫秒,表示第一次嘗試重新連接之間等待的時間
- maxReconnectDelay 默認為30000,單位毫秒,最長重連的時間間隔
Broker Cluster集群配置
NetworkConnector(網絡連接器):ActiveMQ服務器之間的網絡通訊方式
分為靜態連接器和動態連接器
靜態連接器:
<networkConnectors>
<networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>
動態連接器:
<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:0" discoverUri="multicast://default"/>
</transportConnectors>
Master/Slave集群配置
ActiveMQ Master Slave集群方案
- Share nothing storage master/slave (已過時,5.8+後移除)
- Shared storage master/slave 共享存儲
- Replicated LevelDB Store 基於復制的LevelDB Store
兩種集群方式對比
方式 | 高可用 | 負載均衡 |
--|----------|--------------|
Master/Slave | 是 | 否 |
Broker Cluster | 否 | 是 |
三臺服務器的完美集群方案
Node A和Node B做消息同步,Node A和Node C做消息同步,Node B和Node C做Master / Slave對資源進行持久化
服務器 | 服務端口 | 管理端口 | 存儲 | 網絡連接器 | 用途 |
---|---|---|---|---|---|
Node-A | 61616 | 8161 | - | Node-B、Node-C | 消費者 |
Node-B | 61617 | 8162 | /share_file/kahadb | Node-A | 生產者,消費者 |
Node-C | 61618 | 8163 | /share_file/kahadb | Node-A | 生產者,消費者 |
企業系統中的最佳實踐
實際業務場景特點
- 子業務系統都有集群的可能性
- 同一個消息會廣播給關註該類消息的所有子業務系統
- 同一類消息在集群中被負載消費
- 業務的發生和消息的發布最終一致性
使用ActiveMQ的虛擬主題解決方案
- 發布者:將消息發布到一個主題中,主題名以VirtualTopic開頭,如VirtualTopic.TEST
- 消費者:從隊列中獲取消息,在隊列名中表明自己身份,如Consumer.A.VirtualTopic.TEST
使用JMS中XA系列接口保證強一致性
- 引入分布式事務
- 要求業務操作必須支持XA協議
使用消息表的本地事務解決方案
使用內存日誌的解決方案
基於消息機制的事件總線
事件驅動架構
RabbitMQ
RabbitMQ:使用交換器綁定到隊列
- 創建ConnectionFactory
- 創建Connection
- 創建Channel
- 定義Exchange,類型I必須為fanout
- 定義Queue並且綁定隊列
Kafka
Kafka使用group.id分組消費者
- 配置消息者參數group.id相同時對消息進行負載處理
- 配置服務器partitions參數,控制同一個group.id下的consumer數量小於partitions
- kafka只保證同一個group.id下的消息是有序的
學習筆記--Java消息中間件