WQhuanm
RabbitMQ学习笔记

RabbitMQ学习笔记

基础知识

  1. 基本架构

  • Broker:即RabbitMQ的实体服务器。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。
  • exchange:交换机,负责消息路由。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
  • Channel:建立在tcp上的虚拟消息通道,也称信道。每个tcp连接可建立多个Channel(类似http2的流),每个Channel代表一个会话任务
  1. 交换机类型
  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过多个RoutingKey用.隔开,且可以使用通配符(#匹配0或多个key,*匹配一个key,eg:china.new.#.haha,#表示中间可以有其他key)
  • Headers:头匹配,基于MQ的消息头匹配,用的较少
  1. MQ可靠性保证

  2. 生产者确认机制:Publisher Confirm机制(消息入队成功则返回ack,否则返回nack)

  3. MQ可靠性
    + 默认开启数据持久化(ps:消息持久化后才会返回给生产者ack)
    + 默认开启LazyQueue:接收到消息直接存入磁盘,消费时才从磁盘读取到内存(懒加载)

  4. 消费者保证
    1. 消费者确认机制

    • manual:手动模式。需要调用api发送ack或reject
    • auto:自动模式。基于aop,当业务正常执行时则自动返回ack;业务异常自动返回nack;消息处理或校验异常自动返回reject;
      1. 消费者重试机制:开启后,失败会再本地重试,重试达到最大次数后执行失败处理策略
  5. 失败处理策略
    + ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
    + RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(死信队列)

  6. 延迟队列(需要使用插件,即声明交换机为延迟交换机)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name="delay.queue"),
    exchange = @Exchange(name = "delay.ec",type = ExchangeTypes.DIRECT,delayed = "true"), //设置为delay队列
    key="delay"
    ))
    public void listenDelay(User user){
    System.out.println("消费者4接收到direct.queue的消息:"+ user );
    }

    rabbitTemplate.convertAndSend(delayExchange,key,user,message -> {
    message.getMessageProperties().setHeader("x-delay",5000);//消息头设置延迟时间
    return message;
    });

SpringAMQP的编写方式

  1. 注解配置消费者对应的队列和交换机
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
      @RabbitListener(bindings = @QueueBinding( //配置该消费者对应的队列和交换机以及队列订阅的key
    value = @Queue(name = "direct.queue"),
    exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue"),
    exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
    key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue的消息:【" + msg + "】");
    }


    //发布者发布格式一般就是指定[交换机],路由key(没有交换机时则队列名字),消息Object(要定义序列化器,且类要实现了序列化接口Serializable)
    rabbitTemplate.convertAndSend(exchange,key,message);
  2. 定义消息转换器
    1
    2
    3
    4
    5
    6
    7
    8
    @Bean
    public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter=new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
    }

附:RabbitMQ的Docker部署与整合配置

Docker部署

  1. 基本部署
    1
    2
    3
    4
    5
    6
    7
    8
    9
    docker pull rabbitmq:management
    docker run -d `
    --name rabbitmq `
    -p 5672:5672 `
    -p 15672:15672 `
    -v rabbitmq_data:/var/lib/rabbitmq `
    -e RABBITMQ_DEFAULT_USER=admin `
    -e RABBITMQ_DEFAULT_PASS=1234 `
    rabbitmq:management
  2. 添加DelayExchange延迟消息插件
    • 先下载插件
      1
      2
      docker cp rabbitmq_delayed_message_exchange-v4.0.7.ez rabbitmq:/plugins
      docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

整合rabbitmq

  1. 导入AMQP依赖

    1
    2
    3
    4
    5
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. application.yml配置

本文作者:WQhuanm
本文链接:https://wqhuanm.github.io/Issue_Blog/2025/04/09/27_RabbitMQ学习笔记/
版权声明:本文采用 CC BY-NC-SA 3.0 CN 协议进行许可