activemq釋出訂閱
阿新 • • 發佈:2018-12-18
一:加入jar
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
二:建立廣播模式生產者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicProducer { public static void main(String[] args) throws JMSException { //1.建立連線工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); //2.獲取連線 Connection connection = connectionFactory.createConnection(); //3.啟動連線 connection.start(); //4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主題物件 Topic topic = session.createTopic("test-topic"); //6.建立訊息生產者 MessageProducer producer = session.createProducer(topic); //7.建立訊息 TextMessage textMessage = session.createTextMessage("歡迎來到activemq世界"); //8.傳送訊息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close(); } }
三:建立廣播模式消費者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; public class TopicConsumer { public static void main(String[] args) throws JMSException, IOException { //1.建立連線工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); //2.獲取連線 Connection connection = connectionFactory.createConnection(); //3.啟動連線 connection.start(); //4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立主題物件 Topic topic = session.createTopic("test-topic"); //6.建立訊息消費 MessageConsumer consumer = session.createConsumer(topic); //7.監聽訊息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到訊息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.等待鍵盤輸入,等待監聽程式 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close(); } }