SpringBoot中使用RabbitMQ

搭建初始环境

1. 引入依赖
<!-- Spring Boot starter for AMQP messaging (provides the RabbitTemplate and listener annotations used below). -->
<!-- NOTE(review): no <version> is given, so it is presumably managed by the Spring Boot parent/BOM; confirm in the pom. -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 编写配置文件
# Connection settings for the RabbitMQ broker used by the examples.
spring:
  application:
    name: springboot_rabbitmq
  rabbitmq:
    # Broker address and port; replace with the host of your own RabbitMQ server.
    host: 10.15.0.9
    port: 5672
    # Account used to connect. NOTE(review): sample credentials only; do not commit real passwords in plain text.
    username: ems
    password: 123
    # Virtual host the connection is opened against.
    virtual-host: /ems

RabbitTemplate 用来简化对 RabbitMQ 的操作,使用时直接在项目中注入即可。

第一种hello world模型使用

  1. 开发生产者
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * Publishes a single text message for the "hello world" model.
     *
     * <p>Uses the two-argument {@code convertAndSend}, passing "hello" as the first
     * argument and the message body as the second.
     */
    @Test
    public void testHello() {
        final String routingKey = "hello";
        final String payload = "hello world";
        rabbitTemplate.convertAndSend(routingKey, payload);
    }
    
  2. 开发消费者
    /**
     * Consumer for the "hello world" model.
     *
     * <p>The class-level {@code @RabbitListener} declares the queue "hello" and listens
     * on it; the {@code @RabbitHandler} method below handles String payloads.
     */
    @Component
    @RabbitListener(queuesToDeclare = {@Queue(value = "hello")})
    public class HelloCustomer {

        /**
         * Prints a received message to standard output.
         *
         * @param message text body of the delivered message
         */
        @RabbitHandler
        public void receive1(String message) {
            final String line = "message = " + message;
            System.out.println(line);
        }
    }
    

第二种work模型使用

  1. 开发生产者
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * Publishes ten identical messages for the work-queue model.
     *
     * <p>Each iteration calls the two-argument {@code convertAndSend} with "work" as the
     * first argument and the same text body.
     */
    @Test
    public void testWork() {
        final String routingKey = "work";
        final String payload = "hello work!";
        int remaining = 10;
        while (remaining > 0) {
            rabbitTemplate.convertAndSend(routingKey, payload);
            remaining--;
        }
    }
    
  2. 开发消费者
    /**
     * Consumers for the work-queue model.
     *
     * <p>Both listener methods declare and listen on the same queue, "work", so they
     * share the messages published to it.
     */
    @Component
    public class WorkCustomer {

        /**
         * First worker: prints the received message with a "message1" label.
         *
         * @param message text body of the delivered message
         */
        @RabbitListener(queuesToDeclare = {@Queue(value = "work")})
        public void receive1(String message) {
            final String line = "work message1 = " + message;
            System.out.println(line);
        }

        /**
         * Second worker: prints the received message with a "message2" label.
         *
         * @param message text body of the delivered message
         */
        @RabbitListener(queuesToDeclare = {@Queue(value = "work")})
        public void receive2(String message) {
            final String line = "work message2 = " + message;
            System.out.println(line);
        }
    }
    

说明:默认情况下,Spring AMQP 对 Work 模型采用公平调度(消息轮流平均分发给各个消费者);如果需要实现"能者多劳",需要额外配置(例如通过 spring.rabbitmq.listener.simple.prefetch 限制每个消费者的预取消息数量)。

Fanout 广播模型

  1. 开发生产者
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * Publishes one message to the "logs" exchange for the fanout (broadcast) model.
     *
     * <p>The routing key is an empty string; the consumers in this example bind their
     * queues to "logs" without specifying a key.
     *
     * <p>No checked exception is declared: nothing in the body throws
     * {@code InterruptedException}, and {@code convertAndSend} reports failures through
     * the unchecked {@code AmqpException}.
     */
    @Test
    public void testFanout() {
        rabbitTemplate.convertAndSend("logs", "", "这是日志广播");
    }
    
  2. 开发消费者
    /**
     * Consumers for the fanout (broadcast) model.
     *
     * <p>Each listener binds its own unnamed, temporary queue to the "logs" exchange of
     * type "fanout".
     */
    @Component
    public class FanoutCustomer {

        /**
         * First subscriber: prints the received message with a "message1" label.
         *
         * @param message text body of the delivered message
         */
        @RabbitListener(bindings = {
                @QueueBinding(
                        exchange = @Exchange(type = "fanout", name = "logs"),
                        value = @Queue() // unnamed => temporary queue
                )
        })
        public void receive1(String message) {
            final String line = "message1 = " + message;
            System.out.println(line);
        }

        /**
         * Second subscriber: prints the received message with a "message2" label.
         *
         * @param message text body of the delivered message
         */
        @RabbitListener(bindings = {
                @QueueBinding(
                        exchange = @Exchange(type = "fanout", name = "logs"), // exchange to bind to, and its type
                        value = @Queue() // create a temporary queue
                )
        })
        public void receive2(String message) {
            final String line = "message2 = " + message;
            System.out.println(line);
        }
    }
    

Route 路由模型

  1. 开发生产者
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * Publishes one message to the "directs" exchange with routing key "error"
     * for the routing (direct) model.
     */
    @Test
    public void testDirect() {
        final String exchange = "directs";
        final String routingKey = "error";
        final String payload = "error 的日志信息";
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    }
    
  2. 开发消费者
    /**
     * Consumers for the routing (direct) model.
     *
     * <p>Each listener binds its own unnamed queue to the "directs" exchange of type
     * "direct", using the routing keys listed in its binding.
     */
    @Component
    public class DirectCustomer {

        /**
         * Bound with keys "info" and "error"; prints the message with a "message1" label.
         *
         * @param message text body of the delivered message
         */
        @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange(name = "directs", type = "direct"),
                key = {"info", "error"},
                value = @Queue
        ))
        public void receive1(String message) {
            final String line = "message1 = " + message;
            System.out.println(line);
        }

        /**
         * Bound with key "error" only; prints the message with a "message2" label.
         *
         * @param message text body of the delivered message
         */
        @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange(name = "directs", type = "direct"),
                key = "error",
                value = @Queue
        ))
        public void receive2(String message) {
            final String line = "message2 = " + message;
            System.out.println(line);
        }
    }
    
    

Topic 订阅模型(动态路由模型)

  1. 开发生产者
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * Publishes one message to the "topics" exchange with the three-word routing key
     * "user.save.findAll" for the topic (dynamic routing) model.
     */
    @Test
    public void testTopic() {
        final String exchange = "topics";
        final String routingKey = "user.save.findAll";
        final String payload = "user.save.findAll 的消息";
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    }
    
  2. 开发消费者
    /**
     * Consumers for the topic (dynamic routing) model.
     *
     * <p>Each listener binds its own unnamed queue to the "topics" exchange of type
     * "topic" with a wildcard pattern. In RabbitMQ topic bindings {@code *} stands for
     * exactly one word and {@code #} for zero or more words, so a key such as
     * "user.save.findAll" matches "user.#" but not "user.*".
     */
    @Component
    public class TopCustomer {

        /**
         * Bound with pattern "user.*"; prints the message with a "message1" label.
         *
         * @param message text body of the delivered message
         */
        @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange(name = "topics", type = "topic"),
                key = "user.*",
                value = @Queue()
        ))
        public void receive1(String message) {
            final String line = "message1 = " + message;
            System.out.println(line);
        }

        /**
         * Bound with pattern "user.#"; prints the message with a "message2" label.
         *
         * @param message text body of the delivered message
         */
        @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange(name = "topics", type = "topic"),
                key = "user.#",
                value = @Queue()
        ))
        public void receive2(String message) {
            final String line = "message2 = " + message;
            System.out.println(line);
        }
    }