springboot-rabbitmq-reply 消息直接回復模式詳情

一、使用場景

MQ的作用包括瞭解耦、異步等。

通常生產者隻負責生產消息,而不關心消息誰去獲取,或者消費結果如何;消費者隻負責接收指定的消息進行業務處理而不關心消息從哪裡來一級回復業務處理情況。但我們項目中有特殊的業務存在,我們作為消息生產者在生產消息後需要接收消費者的響應結果(說白瞭就是類似同步調用 請求響應的MQ使用),經過研究,MQ的Reply模式(直接回復模式)就是為此種業務模式而產生。

二、Reply實戰

(1)依賴與YML配置

依賴:

我這裡隻列出最核心的rabbitMq所需依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置:

無其餘特殊配置,因為reply就是rabbitmq的一種交互方式而已

spring:
  rabbitmq:
    host: 10.50.40.116
    port: 5673
    username: admin
    password: admin

(2)RabbitMq bean配置

package com.leilei.demo;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author lei
 * @create 2022-09-19 21:44
 * @desc mq配置
 **/
@Configuration
public class RabbitMqConfig {
    @Bean
    public Queue bizQueue() {
        return new Queue("bizQueue");
    }
    @Bean
    public Queue replyQueue() {
        return new Queue("replyQueue");
    }
    @Bean
    FanoutExchange bizExchange() {
        return new FanoutExchange("bizExchange");
    }
}

業務類:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Vehicle implements Serializable {
    private Integer id;
    private String name;
}

(3)消息生產端

消息生產端需要做的事情:有生產消息、接受消息消費響應

(1)生產消息

  • 1、生產消息,看業務場景選擇是否生成全局唯一自定義的消息ID
  • 2、指定消息消費後響應的隊列(Reply)
    /**
     * 生產消息
     *
     * @param
     * @return void
     * @author lei
     * @date 2022-09-19 21:59:18
     */
    public void replySend() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setReplyTo("replyQueue");
        //todo 根據業務,做一個嚴謹的全局唯一ID,我這裡暫時用UUID
        String correlationId = UUID.randomUUID().toString();
        // 我這裡指定瞭唯一消息ID,看業務場景,消費者消費響應後,生產者端可根據消息ID做業務處理
        messageProperties.setCorrelationId(correlationId);
        Vehicle vehicle = new Vehicle(1, "川A0001");
        Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("bizExchange","",message);
        System.out.println("生產者發送消息,自定義消息ID為:" + correlationId);
    }

(2)接受Reply響應

消費者消費消息後會將處理結果進行發送到一個隊列,我們讀取這裡隊列就可以拿到對應消息的響應結果進行業務處理瞭

    /**
     * 接收消息響應
     *
     * @param message
     * @return void
     * @author lei
     * @date 2022-09-19 21:59:27
     */
    @RabbitListener(queues = "replyQueue")
    public void replyResponse(Message message) {
        String s = new String(message.getBody());
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("收到客戶端響應消息ID:" + correlationId);
        //todo 根據消息ID可判斷這是哪一個消息的響應,我們就可做業務操作
        System.out.println("收到客戶端響應消息:" + s);
    }

(4)消息消費端

消息消費端需要做的事有:接受消息然後進行業務處理、響應消息

(1)方法一:sendTo註解+方法返回值

一般來說,我們mq消費者監聽方法不需要返回值,我們這裡使用sendTo註解,則需要將要響應的消息定義為返回值,sendTo註解中指定要響應到哪個隊列

重點:

  • 1、sendTo註解指定要相應的隊列(註意和生產端保持一致)
  • 2、方法定義的返回值內容就是要響應的消息,最終會發送到sendTo註解指定要相應的隊列
  • 3、這種方法的缺點是消費端的主關性很高,因為sendTo指定的目標隊列可以自己瞎寫,導致生產者端無法正確收到消息響應,但我相信一般項目中也不會這麼幹
    /**
     * 方式1   SendTo指定響應隊列
     *
     * @param message
     * @return String
     * @author lei
     * @date 2022-09-19 16:17:52
     */
    @RabbitListener(queues ="bizQueue")
    @SendTo("replyQueue")
    public String handleEmailMessage(Message message) {
        try {
            String msg=new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("---consumer接收到消息----{}",msg);
            return "客戶端響應消息:"+msg+"處理完成!";
        } catch (Exception e) {
            log.error("處理業務消息失敗",e);
        }
        return null;
    }

(2)方法二:讀取生產端的消息使用模板發送

與普通的消費者方法一樣,隻需要RabbitListener註解監聽業務隊列;但還需要根據消息獲取出ReplyTo地址,然後自己消費者方法內部手動發送消息

  • 1、優點,更強烈的感受到消息請求 響應的交互性,流程看起來更清晰
  • 2、缺點,代碼不雅
    /**
     * 方式2  message消息獲取內部reply rabbitmq手動發送
     *
     * @param message
     * @return String
     * @author lei
     * @date 2022-09-19 16:17:52
     */
    @RabbitListener(queues = "bizQueue")
    public void handleEmailMessage2(Message message) {
        try {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("---consumer接收到消息----{}", msg);
            String replyTo = message.getMessageProperties().getReplyTo();
            System.out.println("接收到的reply:" + replyTo);
            rabbitTemplate.convertAndSend(replyTo, "客戶端響應消息:" + msg + "處理完成!", x -> {
                x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());
                return x;
            });
        } catch (Exception e) {
            log.error("處理業務消息失敗",e);
        }
    }

(3)方法三:方法返回值

這種方式與1其實是一致的,但我經過測試,因為生產者消息指定瞭ReplyTo的地址,消費者端無需自己再次手動指定,即生產消息到哪裡,是否響應以及響應消息發送到哪裡全由生產端自己空,消費者隻需要處理自身業務以及返回結果

   /**
     * 方式三  方法有返回值,返回要響應的數據 (reply 由生產者發送消息時指定,消費者不做任何處理)
     *
     * @param message
     * @return String
     * @author lei
     * @date 2022-09-19 23:17:47
     */
    @RabbitListener(queues ="bizQueue")
    public String handleEmailMessage3(Message message) {
        try {
            String msg=new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("---consumer接收到消息----{}",msg);
            return "客戶端響應消息:"+msg+"處理完成!";
        }
        catch (Exception e) {
            log.error("處理業務消息失敗",e);
        }
        return null;
    }

(4)測試

生產消息:

消費消息與響應:

收到的響應:

鏈路:

如此,MQ版本的請求響應模式就完成瞭,其實很多大佬使用MQ來實現RPC就是用的ReplyTo啦!

到此這篇關於springboot-rabbitmq-reply 消息直接回復模式詳情的文章就介紹到這瞭,更多相關springboot-rabbitmq-reply 內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: