SpringBoot+RabbitMQ實現消息可靠傳輸詳解

環境配置

SpringBoot 整合 RabbitMQ 實現消息的發送。

1.添加 maven 依賴

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.添加 application.yml 配置文件

spring:
  rabbitmq:
    host: 192.168.3.19
    port: 5672
    username: admin
    password: xxxx

3.配置交換機、隊列以及綁定

    @Bean
    public DirectExchange myExchange() {
        DirectExchange directExchange = new DirectExchange("myExchange");
        return directExchange;
    }

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue");
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
    }

4.生產發送消息

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send(String message) {
        rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
        System.out.println("【發送消息】" + message)
        return "【send message】" + message;
    }

5.消費者接收消息

    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    public void process(String msg, Channel channel, Message message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();
        String time = sdf.format(date);
        System.out.println("【接收信息】" + msg + " 當前時間" + time);

6.調用生產端發送消息 hello,控制臺輸出:

【發送消息】hello
【接收信息】hello 當前時間2022-05-12 10:21:14

說明消息已經被成功接收。

消息丟失分析

一條消息的從生產到消費,消息丟失可能發生在以下幾個階段:

  • 生產端丟失: 生產者無法傳輸到 RabbitMQ
  • 存儲端丟失: RabbitMQ 存儲自身掛瞭
  • 消費端丟失:存儲由於網絡問題,無法發送到消費端,或者消費掛瞭,無法發送正常消費

RabbitMQ 從生產端、儲存端、消費端都對可靠性傳輸做很好的支持。

生產階段

生產階段通過請求確認機制,來確保消息的可靠傳輸。當發送消息到 RabbitMQ 服務器 之後,RabbitMQ 收到消息之後,給發送返回一個請求確認,表示RabbitMQ 服務器已成功的接收到瞭消息。

配置application.yml

spring:
  rabbitmq:
    # 消息確認機制 生產者 -> 交換機
    publisher-confirms: true
    # 消息返回機制  交換機 -> 隊列
    publisher-returns: true

配置

@Configuration
@Slf4j
public class RabbitConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("【correlationData】:" + correlationData);
                log.info("【ack】" + ack);
                log.info("【cause】" + cause);
                if (ack) {
                    log.info("【發送成功】");
                } else {
                    log.info("【發送失敗】correlationData:" + correlationData + " cause:" + cause);
                }
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.warn("【消息發送失敗】");
                log.info("【message】" + message);
                log.info("【replyCode】" + replyCode);
            }
        });

        return rabbitTemplate;
    }
}

消息從 生產者 到 交換機, 有confirmCallback 確認模式。發送消息成功後消息會調用方法confirm(CorrelationData correlationData, boolean ack, String cause),根據 ack 判斷消息是否發送成功。

消息從 交換機 到 隊列,有returnCallback 退回模式。

發送消息 product message 控制臺輸出如下:

【發送消息】product message
【接收信息】product message 當前時間2022-05-12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【發送成功】

生產端模擬消息丟失

這裡有兩個方案:

  • 發送消息後立馬關閉 broke,後者把網絡關閉,但是broker關閉之後控制臺一直就會報錯,發送消息也報500錯誤。
  • 發送不存在的交換機:
// myExchange 修改成 myExchangexxxxx
rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);

結果:

【correlationData】:null
【ack】false
【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND – no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
【發送失敗】

當發送失敗可以對消息進行重試

交換機正確,發送不存在的隊列:

交換機接收到消息,返回成功通知,控制臺輸出:

【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
【ack】true
【cause】null
【發送成功】

交換機沒有找到隊列,返回失敗信息:

【消息發送失敗】
【message】product message
【replyCode】312

RabbitMQ

開啟隊列持久化,創建的隊列和交換機默認配置是持久化的。首先把隊列和交換機設置正確,修改消費監聽的隊列,使得消息存放在隊列裡

修改隊列的持久化,修改成非持久化:

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue",false);
        return queue;
    }

發送消息之後,消息存放在隊列中,然後重啟 RabbitMQ,消息不存在瞭。
設置隊列持久化:

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue",true);
        return queue;
    }

重啟之後,隊列的消息還存在。

消費端

消費端默認開始 ack 自動確認模式,當隊列消息被消費者接收,不管有沒有被消費端消息,都自動刪除隊列中的消息。所以為瞭確保消費端能成功消費消息,將自動模式改成手動確認模式:

修改application.yml 文件

spring:
  rabbitmq:
    # 手動消息確認
    listener:
      simple:
        acknowledge-mode: manual

消費接收消息之後需要手動確認:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    public void process(String msg, Channel channel, Message message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();
        String time = sdf.format(date);
        System.out.println("【接收信息】" + msg + " 當前時間" + time);
        System.out.println(message.getMessageProperties().getDeliveryTag());
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

如果不添加:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

發送兩條消息

消息被接收後,沒有確認,重新放到隊列中:

重啟項目,之後,隊列的消息會發送到消費者,但是沒有 ack 確認,還是繼續會放回隊列中。

加上 channel.basicAck 之後,再重啟項目

隊列消息就被刪除瞭

basicAck 方法最後一個參數 multiple 表示是刪除之前的隊列。

multiple 設置成 true,把後面的隊列都清理掉瞭

到此這篇關於SpringBoot+RabbitMQ實現消息可靠傳輸詳解的文章就介紹到這瞭,更多相關SpringBoot RabbitMQ消息可靠傳輸內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: