WQhuanm
RabbitMQ学习笔记

RabbitMQ学习笔记

基础知识

  1. 基本架构
  • Broker:即RabbitMQ的实体服务器。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。
  • exchange:交换机,负责消息路由。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
  • Channel:建立在tcp上的虚拟消息通道,也称信道。每个tcp连接可建立多个Channel(类似http2的流),每个Channel代表一个会话任务
  1. 交换机类型
  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过多个RoutingKey用.隔开,且可以使用通配符(#匹配0或多个key,*匹配一个key,eg:china.new.#.haha,#表示中间可以有其他key)
  • Headers:头匹配,基于MQ的消息头匹配,用的较少
  1. MQ可靠性保证

    1. 生产者确认机制:Publisher Confirm机制(消息入队成功则返回ack,否则返回nack)
    2. MQ可靠性
      • 默认开启数据持久化(ps:消息持久化后才会返回给生产者ack)
      • 默认开启LazyQueue:接收到消息直接存入磁盘,消费时才从磁盘读取到内存(懒加载)
    3. 消费者保证
    4. 消费者确认机制
      • manual:手动模式。需要调用api发送ack或reject
      • auto:自动模式。基于aop,当业务正常执行时则自动返回ack;业务异常自动返回nack;消息处理或校验异常自动返回reject;
    5. 消费者重试机制:开启后,失败会再本地重试,重试达到最大次数后执行失败处理策略
    6. 失败处理策略
    • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
    • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(死信队列)
  2. 延迟队列(需要使用插件,即声明交换机为延迟交换机)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name="delay.queue"),
    exchange = @Exchange(name = "delay.ec",type = ExchangeTypes.DIRECT,delayed = "true"), //设置为delay队列
    key="delay"
    ))
    public void listenDelay(User user){
    System.out.println("消费者4接收到direct.queue的消息:"+ user );
    }

    rabbitTemplate.convertAndSend(delayExchange,key,user,message -> {
    message.getMessageProperties().setHeader("x-delay",5000);//消息头设置延迟时间
    return message;
    });

SpringAMQP的编写方式

  1. 注解配置消费者对应的队列和交换机

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @RabbitListener(bindings = @QueueBinding( //配置该消费者对应的队列和交换机以及队列订阅的key
    value = @Queue(name = "direct.queue"),
    exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue"),
    exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
    key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue的消息:【" + msg + "】");
    }


    //发布者发布格式一般就是指定[交换机],路由key(没有交换机时则队列名字),消息Object(要定义序列化器,且类要实现了序列化接口Serializable)
    rabbitTemplate.convertAndSend(exchange,key,message);

  2. 定义消息转换器

    1
    2
    3
    4
    5
    6
    7
    8
    @Bean
    public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter=new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
    }

附:RabbitMQ的Docker部署与整合配置

Docker部署

  1. 基本部署

    1
    2
    3
    4
    5
    6
    7
    8
    9
    docker pull rabbitmq:management
    docker run -d `
    --name rabbitmq `
    -p 5672:5672 `
    -p 15672:15672 `
    -v rabbitmq_data:/var/lib/rabbitmq `
    -e RABBITMQ_DEFAULT_USER=admin `
    -e RABBITMQ_DEFAULT_PASS=1234 `
    rabbitmq:management

  2. 添加DelayExchange延迟消息插件

    • 先下载插件

    • 然后执行如下命令

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
              docker cp rabbitmq_delayed_message_exchange-v4.0.7.ez rabbitmq:/plugins
      docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
      ```

      #### 整合rabbitmq

      1. 导入AMQP依赖

      ~~~xml
      <!--AMQP依赖,包含RabbitMQ-->
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      ~~~

      1. application.yml配置

      ```yml
      spring:
      rabbitmq:
      host: localhost
      port: 5672
      virtual-host: / #rabbitmq的虚拟主机
      username: admin
      password: 1234
      connection-timeout: 1s # 设置MQ的连接超时时间
      publisher-confirm-type: correlated # 开启publisher confirm机制,类型为异步回调返回
      listener:
      simple:
      prefetch: 5 # 每次只能获取5条消息,处理完成才能获取下一个消息
      acknowledge-mode: auto # 开启消费者确认机制,自动模式
      retry:
      enabled: true # 开启消费者失败重试
      initial-interval: 1000ms # 初始的失败等待时长为1秒
      multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
      max-attempts: 3 # 最大重试次数
      stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

本文作者:WQhuanm
本文链接:https://wqhuanm.github.io/Issue_Blog/2025/04/09/27_RabbitMQ学习笔记/
版权声明:本文采用 CC BY-NC-SA 3.0 CN 协议进行许可