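Integrating a RabbitMQ 3.9.7 Mirrored-Mode Cluster with Spring Boot 2.5.5
1. Overview
As the old saying goes: be flexible and think things through; sometimes a small change of direction is all it takes to solve a problem.
Back to the topic. The previous article covered setting up a RabbitMQ 3.9.7 cluster in mirrored mode; this one covers integrating that cluster with Spring Boot 2.5.5.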
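2. Scenario
Server A IP: 192.168.1.22
Server B IP: 192.168.1.8
Server C IP: 192.168.1.144
A RabbitMQ mirrored-mode cluster is already running on these three servers. For the cluster setup itself, see my previous article.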
3. Integration with Spring Boot
3.1 Adding the dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 Producer service configuration
spring:
  rabbitmq:
    addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 16000
    # Enable publisher confirms, correlated with each sent message
    publisher-confirm-type: correlated
    # Enable publisher returns (for messages that cannot be routed)
    publisher-returns: true
    template:
      mandatory: true
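The `addresses` property above is a comma-separated list of `host:port` entries, one per cluster node. As a rough illustration of how such a string breaks down, here is a minimal plain-Java sketch. The class name `BrokerAddresses` is hypothetical, this is not Spring AMQP's own parser, and falling back to port 5672 (the standard AMQP port) when no port is given is an assumption of this sketch. IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits an "addresses" string into host/port pairs. */
final class BrokerAddresses {

    /** Standard AMQP port, used here when an entry has no explicit port (assumption). */
    static final int DEFAULT_AMQP_PORT = 5672;

    private BrokerAddresses() {}

    static List<InetSocketAddress> parse(String addresses) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0
                    ? DEFAULT_AMQP_PORT
                    : Integer.parseInt(trimmed.substring(colon + 1));
            // Unresolved: no DNS lookup is performed while parsing
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        String configured = "192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672";
        for (InetSocketAddress address : parse(configured)) {
            System.out.println(address.getHostString() + " -> port " + address.getPort());
        }
    }
}
```

Listing all three nodes means the application is not tied to a single broker being up when it opens a connection.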
3.3 Producer service code
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Confirm callback, invoked when the broker acknowledges (or rejects) a published message.
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // correlationData: unique identifier of the message (null if none was supplied when sending)
            // ack: whether the broker received the message
            // cause: reason for the failure, if any
            String id = correlationData != null ? correlationData.getId() : null;
            System.out.println("correlationData:" + id);
            System.out.println("ack:" + ack);
            System.out.println("cause:" + cause);
        }
    };

    /**
     * Sends a message.
     * @param messageBody message body
     * @param headers additional message headers
     * @param id unique identifier used to correlate the publisher confirm
     * @throws Exception if sending fails
     */
    public void sendMessage(String messageBody, Map<String, Object> headers, String id) throws Exception {
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> message = MessageBuilder.createMessage(messageBody, messageHeaders);

        // Register the confirm callback (the same instance on every call)
        rabbitTemplate.setConfirmCallback(confirmCallback);

        String exchangeName = "exchange-hello";
        String routingKey = "test.123";
        CorrelationData correlationData = new CorrelationData(id);

        rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new MessagePostProcessor() {
            /**
             * Hook applied to the converted message before it is sent.
             * @param message the converted AMQP message
             * @return the message to send (unchanged here)
             * @throws AmqpException if post-processing fails
             */
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                return message;
            }
        }, correlationData);
    }
}
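The confirm callback above only prints what it receives. One common follow-up is to remember each message by its correlation id until the broker confirms it, so that a nacked message can be resent. The following is a minimal plain-Java sketch of that bookkeeping, with no Spring types involved. The class `PendingConfirms` and its methods are hypothetical names introduced for illustration, and the resend-on-nack policy is an assumption, not something the original code does.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker for messages awaiting a publisher confirm. */
final class PendingConfirms {

    // correlation id -> message body still waiting for a confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call just before sending, with the same id that goes into CorrelationData. */
    void register(String id, String body) {
        pending.put(id, body);
    }

    /**
     * Call from the confirm callback with the correlation id, the ack flag and the cause.
     * Returns the body to resend when the broker nacked a known message,
     * otherwise an empty Optional.
     */
    Optional<String> onConfirm(String id, boolean ack, String cause) {
        if (id == null) {
            return Optional.empty(); // message was sent without correlation data
        }
        String body = pending.remove(id);
        if (ack || body == null) {
            return Optional.empty(); // confirmed, or an id we never registered
        }
        System.err.println("nack for " + id + ", cause: " + cause);
        return Optional.of(body);
    }

    /** Number of messages still waiting for a confirm. */
    int outstanding() {
        return pending.size();
    }
}
```

In the producer, `register` would be called in `sendMessage` before `convertAndSend`, and `onConfirm` from inside `confirm(...)`. A `ConcurrentHashMap` is used in this sketch because confirms typically arrive on a different thread than the one that sends.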
3.4 Consumer service configuration
spring:
  rabbitmq:
    addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 16000
    listener:
      simple:
        # Use manual acknowledgement
        acknowledge-mode: manual
        concurrency: 5
        prefetch: 1
        max-concurrency: 10
3.5 Consumer service code
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    // Declares the queue, the topic exchange and the binding between them
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-hello", durable = "true"),
            exchange = @Exchange(value = "exchange-hello", durable = "true", type = "topic"),
            key = "test.*"
    ))
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("Message received: " + message.getPayload());

        // Manual ACK: acknowledge only this delivery (multiple = false)
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}
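The listener binds `queue-hello` to a topic exchange with the key `test.*`, and the producer publishes with the routing key `test.123`. On a topic exchange, routing keys are dot-separated words: `*` in a binding key stands for exactly one word and `#` for zero or more words. The sketch below illustrates those rules in plain Java so it is easy to see which keys would reach the queue. `TopicPatternSketch` is a hypothetical name, and this is an illustration of the matching rules, not the broker's actual implementation.

```java
/** Illustrative matcher for topic-exchange binding keys (not the broker's code). */
final class TopicPatternSketch {

    private TopicPatternSketch() {}

    /** Returns true if the routing key is matched by the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible amount
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1); // '*' or a literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("test.*", "test.123"));     // true
        System.out.println(matches("test.*", "test.123.abc")); // false
        System.out.println(matches("test.#", "test.123.abc")); // true
    }
}
```

With the binding used in this article, a message sent with `test.123` is delivered to `queue-hello`, while one sent with `test.123.abc` would not be, unless the binding key were changed to something like `test.#`.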
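3.6 REST test code
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/mq")
public class RabbitmqController {

    @Autowired
    private Producer producer;

    // Example: GET /mq/sendMessage?messageBody=hello&id=1
    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam String messageBody, @RequestParam String id) throws Exception {
        Map<String, Object> headers = new HashMap<>();
        producer.sendMessage(messageBody, headers, id);
        return "success";
    }
}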
4. Summary
That concludes this walkthrough of integrating a RabbitMQ 3.9.7 mirrored-mode cluster with Spring Boot 2.5.5.