SpringBoot中使用RabbitMQ
搭建初始环境
1. 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置配置文件
spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: 124.71.207.241
port: 5672
username: ems
password: 123
virtual-host: ems
模板对象RabbitTemplate
用来简化操作 使用时候直接在项目中注入即可使用
第一种hello world模型使用
开发生产者
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testHello(){ rabbitTemplate.convertAndSend("hello","hello world"); }
开发消费者
@Component @RabbitListener(queuesToDeclare = @Queue("hello")) //@RabbitListener(queuesToDeclare = @Queue(value = "work",declare = "true")) // ----@Queue的参数----- //name: 队列的名称; //durable: 是否持久化; //exclusive: 是否独享、排外的; //autoDelete: 是否自动删除; //arguments:队列的其他属性参数 public class HelloCustomer { @RabbitHandler public void receive1(String message){ System.out.println("message = " + message); } }
在springboot中RabbitMQ是以消费者为主体的,如果只通过rabbitTemplate开发了生产者,而没有对应的消费者,是不会产生对应的队列和消息的。
第二种work模型使用
开发生产者
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testWork(){ for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work","hello work!"); } }
开发消费者
@Component public class WorkCustomer { @RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message){ System.out.println("work message1 = " + message); } @RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String message){ System.out.println("work message2 = " + message); } }
说明:默认在Spring AMQP实现中Work这种方式就是公平调度,如果需要实现能者多劳需要额外配置
Fanout 广播模型
开发生产者
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testFanout() throws InterruptedException { rabbitTemplate.convertAndSend("logs","","这是日志广播"); }
开发消费者
@Component public class FanoutCustomer { @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(name="logs",type = "fanout") )) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener(bindings = @QueueBinding( value = @Queue, //不指定队列名,创建临时队列 exchange = @Exchange(name="logs",type = "fanout") //绑定交换机类型 )) public void receive2(String message){ System.out.println("message2 = " + message); } }
Route 路由模型
开发生产者
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testDirect(){ rabbitTemplate.convertAndSend("directs","error","error 的日志信息"); }
开发消费者
@Component public class DirectCustomer { @RabbitListener(bindings ={ @QueueBinding( value = @Queue(),//不指定队列名,创建临时队列 key={"info","error"}, exchange = @Exchange(type = "direct",name="directs") )}) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener(bindings ={ @QueueBinding( value = @Queue(), key={"error"}, exchange = @Exchange(type = "direct",name="directs") )}) public void receive2(String message){ System.out.println("message2 = " + message); } }
Topic 订阅模型(动态路由模型)
开发生产者
@Autowired private RabbitTemplate rabbitTemplate; //topic @Test public void testTopic(){ rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll 的消息"); }
开发消费者
@Component public class TopCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, key = {"user.*"}, exchange = @Exchange(type = "topic",name = "topics") ) }) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, key = {"user.#"}, exchange = @Exchange(type = "topic",name = "topics") ) }) public void receive2(String message){ System.out.println("message2 = " + message); } }
给个饭钱?
- Post link: http://sovzn.github.io/2023/08/12/RabbitMQ%E6%95%B4%E5%90%88SpringBoot/
- Copyright Notice: All articles in this blog are licensed under unless otherwise stated.
若没有本文 Issue,您可以使用 Comment 模版新建。
GitHub Issues