springBoot整合rabbitmq測試常用模型小結
之前我們記錄瞭原生java代碼使用rabbitmq的方法,很簡單,類似於原生jdbc代碼一樣,將連接對象抽離出來作為工具類,生產者和消費者通過工具類獲取連接對象,進而獲取通道對象,再註冊交換機或者是隊列等,發送消息與接收消息。
在企業開發中,我們更多的是使用spring框架來整合其它技術,springboot更是方便的提供瞭各種starter來快速添加依賴,完成整合,開箱即用。
1.添加依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.編寫配置
配置信息包括ip,端口,虛擬主機,用戶名和密碼,和原生java代碼所需的配置信息一致。
spring: application: name: spirngboot-rabbitmq rabbitmq: host: 192.168.20.128 port: 5672 virtual-host: /vh username: wuwl password: 123456
3.編寫並測試
本文主要針對前五種常用模型,在spirngboot框架的基礎上整合rabbitmq並進行測試使用。
(1) Hello World模型
這是一種簡單的直連模型,生產者將消息直接發送至消息隊列,消費者綁定消息隊列後直接獲取,一對一。spring-boot-starter-amqp
為我們提供瞭一個org.springframework.amqp.rabbit.core.RabbitTemplate
類來方便我們使用rabbitmq
,自動註入即可。
生產者測試類:
@SpringBootTest(classes = RabbitmqDemoApplication.class) @RunWith(SpringRunner.class) public class RabbitmqDemoApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testHelloQueues(){ rabbitTemplate.convertAndSend("hello","hello world"); } }
生產者向名為hello的隊列發送消息,但是,在沒有消費者的情況下,生產者沒有任何意義。另外,convertAndSend
方法的第一個參數並不是消息隊列的意思,而是routingKey
,我們根據源碼找到最初定義的接口可以看到以下內容:
/** * Convert a Java object to an Amqp {@link Message} and send it to a default exchange * with a specific routing key. * * @param routingKey the routing key * @param message a message to send * @throws AmqpException if there is a problem */ void convertAndSend(String routingKey, Object message) throws AmqpException;
第二個參數為Object
類型,也就是說可以傳遞任意類型的對象,該方法將對象轉換成一個Amqp
消息並發送到一個默認的交換機,並且routingKey
為第一個參數的內容,沒有提到消息隊列的信息,但我們可以分析到,這裡的routingKey
與queues
應該是同名的。
消費者類:
@Component @RabbitListener(queuesToDeclare = @Queue("hello")) public class HelloQueuesConsumer { @RabbitHandler public void consume(String msg){ System.out.println("消費消息:" + msg + " " + System.currentTimeMillis()); } }
上面的代碼等同於:
@Component public class HelloQueuesConsumer { @RabbitListener(queuesToDeclare = @Queue("hello")) public void consume(String msg){ System.out.println("消費消息:" + msg + " " + System.currentTimeMillis()); } }
@RabbitListener 可以標註在類上面,需配合 @RabbitHandler 註解一起使用
@RabbitListener 標註在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,根據 MessageConverter 轉換後的參數類型
直接啟動測試方法,也就是生產者,可以看到:
消費者有接收到消息隊列中的信息並打印。
(2) work queues模型
生產者測試方法,類與第一個模型一致
@Test public void testWorkQueues(){ for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("work","work index " + i); } }
消費者類:
@Component public class WorkQueuesConsumer { @RabbitListener(queuesToDeclare = @Queue("work")) public void consume1(String msg){ System.out.println("consumer1消費消息:" + msg); } @RabbitListener(queuesToDeclare = @Queue("work")) public void consume2(String msg){ System.out.println("consumer2消費消息:" + msg); } }
啟動生產者測試方法:
消費者一與消費者二均勻分配瞭隊列中的消息任務,即使兩者執行效率不一致,也同樣是均勻分配。
(3) Publish/Subscribe模型
生產者測試方法:
for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("amq.fanout","","fanout msg " + i); }
消費者類:
@Component public class FanoutQueuesConsumer { @RabbitListener(bindings = { @QueueBinding(value = @Queue, exchange = @Exchange( value = "amq.fanout", type = "fanout"))}) public void consume1(String msg) { System.out.println("consumer1消費消息:" + msg); } @RabbitListener(bindings = { @QueueBinding(value = @Queue, exchange = @Exchange( value = "amq.fanout", type = "fanout"))}) public void consume2(String msg) { System.out.println("consumer2消費消息:" + msg); } }
註意此處的交換機信息
啟動生產者測試方法:
此處隻粘貼瞭部分打印信息,兩個消費者獲得瞭相同的消息,生產者將消息發送至交換機,由交換機發送至已註冊到交換機的所有臨時消息隊列,進而消費者獲取隊列中的消息。
(4) Routing模型
生產者測試方法:
@Test public void testDirectQueues(){ rabbitTemplate.convertAndSend("amq.direct","info","routingKey is info"); rabbitTemplate.convertAndSend("amq.direct","warn","routingKey is warn"); rabbitTemplate.convertAndSend("amq.direct","error","routingKey is error"); }
routing也成為fanout模型,對應的交換機類型為direct
消費者類:
@Component public class DirectQueuesConsumer { @RabbitListener(bindings = { @QueueBinding(value = @Queue, exchange = @Exchange( value = "amq.direct", type = "direct"), key = {"info", "warn", "error"})}) public void consume1(String msg) { System.out.println("consumer1消費消息:" + msg); } @RabbitListener(bindings = { @QueueBinding(value = @Queue, exchange = @Exchange( value = "amq.direct", type = "direct"), key = "error")}) public void consume2(String msg) { System.out.println("consumer2消費消息:" + msg); } }
啟動生產者測試類:
消費者一配置瞭三種類型的routingKey,所以三種類型的消息都能夠接收到,消費者二隻能接受到error類型的消息。
(5) Topic模型
生產者測試方法:
@Test public void testTopicQueues(){ rabbitTemplate.convertAndSend("amq.topic","file.info","routingKey is info"); rabbitTemplate.convertAndSend("amq.topic","file.warn","routingKey is warn"); rabbitTemplate.convertAndSend("amq.topic","file.error","routingKey is error"); }
消費者類:
@Component public class TopicQueuesConsumer { @RabbitListener(bindings = { @QueueBinding(value = @Queue, exchange = @Exchange( value = "amq.topic", type = "topic"), key = {"#"})}) public void consume1(String msg) { System.out.println("consumer1消費消息:" + msg); } @RabbitListener(bindings = { @QueueBinding(value = @Queue, exchange = @Exchange( value = "amq.topic", type = "topic"), key = "*.error")}) public void consume2(String msg) { System.out.println("consumer2消費消息:" + msg); } }
啟動生產者測試方法:
消費者一配置的routingKey
為#
,可以接受任意類型的消息,*
好代表一個單詞,消費者二可以接受任意單詞加上.error
為routingKey
的消息。
到此這篇關於springBoot整合rabbitmq並測試五種常用模型的文章就介紹到這瞭,更多相關springBoot整合rabbitmq內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- rabbitmq中routingkey的作用說明
- SpringBoot整合RabbitMQ實現六種工作模式的示例
- SpringBoot集成RabbitMQ和概念介紹
- 消息交換模式RabbitMQ簡介
- Java RabbitMQ高級特性詳細分析