SpringBoot整合RabbitMQ解耦合二
阿新 • • 發佈:2018-11-19
如何讓工具類更加通用、與業務模組解耦合,是架構師和產品研發人員一直研究的課題。不管是MVC分層設計、SOA面向服務設計,還是為解決高併發而採用的讀寫分離、前後臺分離、叢集計算,都是解決問題的統籌方法。以下實現Spring+RabbitMQ整合工具,讓使用Spring開發的專案更容易整合RabbitMQ,無論是傳送RPC同步呼叫還是非同步呼叫。
使用步驟:
一、準備Maven POM配置
二、準備SpringBoot Application.properties配置檔案
三、匯入介面IMsgProcess
package com.test.util; public interface IMsgProcess { public Boolean process(Object obj); }
四、匯入訊息監聽器
package com.test.util;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Listener for messages arriving on the {@code test_rpc} queue.
 *
 * <p>Each message body is deserialized and handed to the injected
 * {@link IMsgProcess}. When the incoming message carries a non-empty
 * {@code replyTo} property it is treated as a synchronous (RPC) call and the
 * processing result is sent back to that reply queue; otherwise it is treated
 * as an asynchronous message and no reply is sent.
 *
 * @author java
 */
@Component
public class RabbitmqListener {

    @Autowired
    private IMsgProcess serv;

    @Autowired
    private AmqpTemplate template;

    /**
     * Handles one message from the {@code test_rpc} queue.
     *
     * <p>Any exception raised while handling is printed and swallowed, so a
     * failure never propagates to the listener container.
     *
     * @param msg the received message
     */
    @RabbitListener(queues = "test_rpc")
    public void receive(Message msg) {
        try {
            byte[] data = msg.getBody();
            Object obj = toObject(data);
            // process() returns a boxed Boolean; a null result is treated as
            // false rather than throwing on auto-unboxing, which previously
            // left a synchronous sender without any reply.
            boolean rtn = Boolean.TRUE.equals(serv.process(obj));

            // Decide whether this is a synchronous (RPC) message: it is when
            // the sender supplied a non-empty replyTo.
            MessageProperties inProps = msg.getMessageProperties();
            String replyTo = (inProps == null) ? null : inProps.getReplyTo();

            if (replyTo != null && !replyTo.isEmpty()) {
                String corrId = inProps.getCorrelationIdString();
                MessageProperties mprop = new MessageProperties();
                mprop.setCorrelationIdString(corrId);
                mprop.setReplyTo(replyTo);
                System.out.println("訊息處理結果="+rtn+",回覆佇列replyTo="+replyTo+",關聯ID corrId="+corrId);
                // The reply body is the literal text "true" or "false".
                byte[] body = String.valueOf(rtn).getBytes(StandardCharsets.UTF_8);
                Message replyMsg = new Message(body, mprop);
                // Exchange "" with the reply queue name as the routing key.
                template.send("", replyTo, replyMsg);
                System.out.println("接收到同步訊息="+new String(data));
            } else {
                System.out.println("接收到非同步訊息="+new String(data));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Deserializes a Java-serialized byte array into an object.
     *
     * <p>NOTE(security): this uses native Java deserialization on bytes taken
     * from the queue. If untrusted parties can publish to the queue, consider
     * an {@code ObjectInputFilter} or a data format such as JSON.
     *
     * @param data the serialized bytes
     * @return the deserialized object, or {@code null} if deserialization
     *         fails for any reason (the exception is printed)
     */
    public Object toObject(byte[] data) {
        // try-with-resources closes the ObjectInputStream (and the wrapped
        // byte stream) on every path.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
五、匯入訊息傳送工具類
可以傳送非同步訊息,也可以傳送同步訊息
package com.test.util;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Utility for publishing Java-serialized objects to RabbitMQ, either
 * fire-and-forget ({@link #sendObject}) or request/reply
 * ({@link #sendSyncObject}).
 *
 * <p>Every call opens its own connection and channel, declares the
 * {@code test_rpc_exg} direct exchange and the durable {@code test_rpc} queue,
 * binds them with the routing key {@code java.io.File}, publishes, and closes
 * the connection again.
 */
@Component
public class SendMsg {

    private static final String EXCHANGE_NAME = "test_rpc_exg";
    private static final String QUEUE_NAME = "test_rpc";
    private static final String ROUTING_KEY = "java.io.File";

    @Value("${spring.rabbitmq.host}")
    private String host = "localhost";

    @Value("${spring.rabbitmq.port}")
    private String port = "5672";

    @Value("${spring.rabbitmq.username}")
    private String userName = "admin2";

    @Value("${spring.rabbitmq.password}")
    private String pwd = "admin2";

    /**
     * Serializes an object with Java serialization.
     *
     * @param obj the object to serialize
     * @return the serialized bytes, or {@code null} if serialization fails
     *         (the exception is printed)
     */
    public byte[] toBytes(Object obj) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            // Close (and thereby flush) the object stream before reading the
            // buffer so no bytes can be left in the stream's internal buffer.
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(obj);
            }
            return baos.toByteArray();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Publishes an object asynchronously and then sleeps.
     *
     * <p>Failures are printed and swallowed; when publishing fails the method
     * returns without sleeping.
     *
     * @param obj        the object to publish
     * @param waitSecond seconds to sleep after publishing; {@code null} means 1
     */
    public void sendObject(Object obj, Integer waitSecond) {
        Connection con = null;
        Channel channel = null;
        try {
            System.out.println("host="+host+",port="+port);
            con = openConnection();
            channel = con.createChannel();
            declareTopology(channel);
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, toBytes(obj));
        } catch (Exception e) {
            e.printStackTrace();
            return;
        } finally {
            // Release the channel and connection even when publishing failed.
            closeQuietly(channel, con);
        }
        pause(waitSecond);
    }

    /**
     * Sends a synchronous message and waits for the reply.
     *
     * <p>The request carries a fresh correlation ID and the name of a
     * server-named reply queue. The calling thread blocks without a timeout
     * until one message arrives on that reply queue.
     *
     * @param obj the object to publish
     * @return {@code true} only if the reply body is the text {@code "true"};
     *         {@code false} for any other reply or on any failure
     */
    public boolean sendSyncObject(Object obj) {
        Connection con = null;
        Channel channel = null;
        try {
            System.out.println("host="+host+",port="+port);
            con = openConnection();
            channel = con.createChannel();
            declareTopology(channel);

            String replyTo = channel.queueDeclare().getQueue();
            final String corrId = java.util.UUID.randomUUID().toString();

            AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
                    .correlationId(corrId)
                    .replyTo(replyTo)
                    .build();
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, prop, toBytes(obj));

            final BlockingQueue<byte[]> response = new ArrayBlockingQueue<byte[]>(1);
            Consumer csum = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) {
                    // NOTE(review): the correlation-ID comparison was disabled
                    // in the original code and is left disabled here; any
                    // delivery on the reply queue is accepted. Confirm that
                    // the replying side echoes the ID before enabling a check.
                    response.offer(body);
                }
            };
            channel.basicConsume(replyTo, true, csum);

            // Take the reply from the in-JVM blocking queue; the current
            // thread blocks until one has been received.
            byte[] result = response.take();
            String str = new String(result, StandardCharsets.UTF_8);
            System.out.println("result="+str);
            return "true".equals(str);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(channel, con);
        }
        return false;
    }

    /** Opens a new connection using the configured host, port and credentials. */
    private Connection openConnection() throws IOException, TimeoutException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost(host);
        cf.setPort(Integer.parseInt(port));
        cf.setUsername(userName);
        cf.setPassword(pwd);
        return cf.newConnection();
    }

    /** Declares the direct exchange and the durable queue, then binds them. */
    private static void declareTopology(Channel channel) throws IOException {
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        boolean durable = true;
        boolean exclusive = false;
        boolean autoDelete = false;
        channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    }

    /** Closes the channel and then the connection, printing any close failure. */
    private static void closeQuietly(Channel channel, Connection con) {
        if (channel != null) {
            try {
                if (channel.isOpen()) {
                    channel.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (con != null) {
            try {
                if (con.isOpen()) {
                    con.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Sleeps for the given number of seconds; {@code null} means one second. */
    private static void pause(Integer waitSecond) {
        int seconds = (waitSecond == null) ? 1 : waitSecond;
        try {
            // Multiply as long so large values cannot overflow int.
            Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (IllegalArgumentException e) {
            // A negative wait is rejected by Thread.sleep; report it as before.
            e.printStackTrace();
        }
    }
}
六、實現訊息處理類
必須實現IMsgProcess介面