springboot+RabbitMq之topic和fanout
阿新 • • 發佈:2018-11-24
1.建立一個springboot專案,匯入如下maven座標
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
2. application.properties 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3. topic 萬用字元交換器使用
3.1生產者
package com.czxy.controller; import com.czxy.config.TopicRabbitConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; //topic-exange交換機 ROUTING_KEY1 = "topic.message"; @GetMapping("/send1") public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend(TopicRabbitConfig.EXCHANGE_NAME, TopicRabbitConfig.ROUTING_KEY1, context); } //topic-exange交換機 ROUTING_KEYx = "topic.#"; @GetMapping("/send2") public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend(TopicRabbitConfig.EXCHANGE_NAME, TopicRabbitConfig.ROUTING_KEYx, context); } }
3.2 消費者
package com.czxy.controller;

import com.czxy.config.FanoutRabbitConfig;
import com.czxy.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener component for the two queues named in {@link TopicRabbitConfig}.
 * Each handler prints the received text followed by a fixed suffix.
 */
@Component
public class Consumer {

    /**
     * Handles messages from {@link TopicRabbitConfig#QUEUE_NAME1}.
     *
     * @param message the received message text
     */
    @RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME1)
    public void consumeMessage(String message) {
        // The suffix identifies which listener printed the line.
        String line = message + "topic1";
        System.out.println(line);
    }

    /**
     * Handles messages from {@link TopicRabbitConfig#QUEUE_NAME2}.
     *
     * @param message the received message text
     */
    @RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME2)
    public void consumeMessage1(String message) {
        // The suffix identifies which listener printed the line.
        String line = message + "topic2";
        System.out.println(line);
    }
}
3.3 配置類
package com.czxy.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queues, the topic exchange and the bindings for the topic
 * exchange example, and exposes their names as constants.
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the first queue. */
    public static final String QUEUE_NAME1 = "topic.message";
    /** Name of the second queue. */
    public static final String QUEUE_NAME2 = "topic.messages";
    /** Name of the topic exchange. */
    public static final String EXCHANGE_NAME = "TopicExchange";
    /** Exact routing key; also used as the binding key of the first queue. */
    public static final String ROUTING_KEY1 = "topic.message";
    /** Exact routing key. */
    public static final String ROUTING_KEY2 = "topic.messages";
    /** Wildcard binding pattern; used as the binding key of the second queue. */
    public static final String ROUTING_KEYx = "topic.#";
    /** Wildcard binding pattern (not used by the bindings in this class). */
    public static final String ROUTING_KEYy = "topic.*";

    /** Creates the queue named {@link #QUEUE_NAME1}. */
    @Bean
    public Queue queue1() {
        return new Queue(QUEUE_NAME1);
    }

    /** Creates the queue named {@link #QUEUE_NAME2}. */
    @Bean
    public Queue queue2() {
        return new Queue(QUEUE_NAME2);
    }

    /** Creates the topic exchange named {@link #EXCHANGE_NAME}. */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * Binds the first queue to the exchange with {@link #ROUTING_KEY1}.
     *
     * @param queue1   the queue to bind
     * @param exchange the topic exchange
     * @return the binding
     */
    @Bean
    public Binding binding1(Queue queue1, TopicExchange exchange) {
        return BindingBuilder
                .bind(queue1)
                .to(exchange)
                .with(ROUTING_KEY1);
    }

    /**
     * Binds the second queue to the exchange with the pattern
     * {@link #ROUTING_KEYx}.
     *
     * @param queue2   the queue to bind
     * @param exchange the topic exchange
     * @return the binding
     */
    @Bean
    public Binding binding2(Queue queue2, TopicExchange exchange) {
        return BindingBuilder
                .bind(queue2)
                .to(exchange)
                .with(ROUTING_KEYx);
    }
}
4. fanout 廣播模式交換器使用
4.1生產者
package com.czxy.controller;
import com.czxy.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that publishes a demo message to the exchange named
 * {@code "fanoutExchange"}.
 */
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a fixed text message to {@code "fanoutExchange"} with an
     * empty routing key.
     */
    @GetMapping("/send")
    public void send() {
        String payload = "hi, fanout msg ";
        System.out.println("Sender : " + payload);
        // An empty string is passed as the routing key.
        rabbitTemplate.convertAndSend("fanoutExchange", "", payload);
    }
}
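4.2 消費者(注意:原文此處貼出的程式碼實際上是與 4.3 相同的 FanoutRabbitConfig 配置類,並非消費者;消費者可仿照 3.2,以 @RabbitListener 分別監聽 fanout.a、fanout.b、fanout.c 三個佇列)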
package com.czxy.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares three queues, one fanout exchange and the bindings that attach
 * each queue to that exchange.
 */
@Configuration
public class FanoutRabbitConfig {

    /** Name of the first queue. */
    public static final String FINDOUT1 = "fanout.a";
    /** Name of the second queue. */
    public static final String FINDOUT2 = "fanout.b";
    /** Name of the third queue. */
    public static final String FINDOUT3 = "fanout.c";
    /** Name of the fanout exchange. */
    public static final String FindoutExange = "fanoutExchange";

    /** Creates the queue named {@link #FINDOUT1}. */
    @Bean
    public Queue AMessage() {
        return new Queue(FINDOUT1);
    }

    /** Creates the queue named {@link #FINDOUT2}. */
    @Bean
    public Queue BMessage() {
        return new Queue(FINDOUT2);
    }

    /** Creates the queue named {@link #FINDOUT3}. */
    @Bean
    public Queue CMessage() {
        return new Queue(FINDOUT3);
    }

    /** Creates the fanout exchange named {@link #FindoutExange}. */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FindoutExange);
    }

    /** Binds the first queue to the fanout exchange; no routing key is given. */
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(AMessage)
                .to(fanoutExchange);
    }

    /** Binds the second queue to the fanout exchange; no routing key is given. */
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(BMessage)
                .to(fanoutExchange);
    }

    /** Binds the third queue to the fanout exchange; no routing key is given. */
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(CMessage)
                .to(fanoutExchange);
    }
}
4.3配置類
package com.czxy.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Configuration for the fanout example: three queues, a single fanout
 * exchange, and one binding per queue.
 */
@Configuration
public class FanoutRabbitConfig {

    /** Queue name {@code fanout.a}. */
    public static final String FINDOUT1 = "fanout.a";
    /** Queue name {@code fanout.b}. */
    public static final String FINDOUT2 = "fanout.b";
    /** Queue name {@code fanout.c}. */
    public static final String FINDOUT3 = "fanout.c";
    /** Exchange name {@code fanoutExchange}. */
    public static final String FindoutExange = "fanoutExchange";

    // ---- Queues ----

    /** @return a queue named {@link #FINDOUT1} */
    @Bean
    public Queue AMessage() {
        return new Queue(FINDOUT1);
    }

    /** @return a queue named {@link #FINDOUT2} */
    @Bean
    public Queue BMessage() {
        return new Queue(FINDOUT2);
    }

    /** @return a queue named {@link #FINDOUT3} */
    @Bean
    public Queue CMessage() {
        return new Queue(FINDOUT3);
    }

    // ---- Exchange ----

    /** @return a fanout exchange named {@link #FindoutExange} */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FindoutExange);
    }

    // ---- Bindings (queue to fanout exchange, without a routing key) ----

    /** @return the binding of {@code AMessage} to {@code fanoutExchange} */
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(AMessage)
                .to(fanoutExchange);
    }

    /** @return the binding of {@code BMessage} to {@code fanoutExchange} */
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(BMessage)
                .to(fanoutExchange);
    }

    /** @return the binding of {@code CMessage} to {@code fanoutExchange} */
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(CMessage)
                .to(fanoutExchange);
    }
}