SpringBoot整合RabbitMQ的5種模式實戰
一、環境準備
1、pom依賴
<!-- 父工程依賴 --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.6.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.6.0</version> </dependency> </dependencies>
2、配置文件
server: port: 8080 spring: rabbitmq: host: 192.168.131.171 port: 5672 username: jihu password: jihu virtual-host: /jihu
3、啟動類
@SpringBootApplication public class RabbitMQApplication { public static void main(String[] args) { SpringApplication.run(RabbitMQApplication.class); } }
5、Swagger2類
@Configuration @EnableSwagger2 public class Swagger2 { // http://127.0.0.1:8080/swagger-ui.html @Bean public Docket createRestApi() { return new Docket(DocumentationType.SWAGGER_2) .apiInfo(apiInfo()) .select() .apis(RequestHandlerSelectors.basePackage("com.jihu")) .paths(PathSelectors.any()) .build(); } private ApiInfo apiInfo() { return new ApiInfoBuilder() .title("極狐-Spring Boot中使用spring-boot-starter-amqp集成rabbitmq") .description("測試SpringBoot整合進行各種工作模式信息的發送") /* .termsOfServiceUrl("https://www.jianshu.com/p/c79f6a14f6c9") */ .contact("roykingw") .version("1.0") .build(); } }
6、ProducerController
@RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; //helloWorld 直連模式 @ApiOperation(value = "helloWorld發送接口", notes = "直接發送到隊列") @GetMapping(value = "/helloWorldSend") public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException { //設置部分請求參數 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //發消息 rabbitTemplate.send("helloWorldqueue", new Message(message.getBytes("UTF-8"), messageProperties)); return "message sended : " + message; } //工作隊列模式 @ApiOperation(value = "workqueue發送接口", notes = "發送到所有監聽該隊列的消費") @GetMapping(value = "/workqueueSend") public Object workqueueSend(String message) throws AmqpException, UnsupportedEncodingException { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //制造多個消息進行發送操作 for (int i = 0; i < 10; i++) { rabbitTemplate.send("work_sb_mq_q", new Message(message.getBytes("UTF-8"), messageProperties)); } return "message sended : " + message; } // pub/sub 發佈訂閱模式 交換機類型 fanout @ApiOperation(value = "fanout發送接口", notes = "發送到fanoutExchange。消息將往該exchange下的所有queue轉發") @GetMapping(value = "/fanoutSend") public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //fanout模式隻往exchange裡發送消息。分發到exchange下的所有queue rabbitTemplate.send("fanoutExchange", "", new Message(message.getBytes("UTF-8"), messageProperties)); return "message sended : " + message; } //routing路由工作模式 交換機類型 direct @ApiOperation(value = "direct發送接口", notes = "發送到directExchange。exchange轉發消息時,會往routingKey匹配的queue發送") @GetMapping(value = "/directSend") public Object routingSend(String routingKey, String message) throws AmqpException, UnsupportedEncodingException { if (null == routingKey) { routingKey = "china.changsha"; } MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //fanout模式隻往exchange裡發送消息。分發到exchange下的所有queue rabbitTemplate.send("directExchange", routingKey, new Message(message.getBytes("UTF-8"), messageProperties)); return "message sended : routingKey >" + routingKey + ";message > " + message; } //topic 工作模式 交換機類型 topic @ApiOperation(value = "topic發送接口", notes = "發送到topicExchange。exchange轉發消息時,會往routingKey匹配的queue發送,*代表一個單詞,#代表0個或多個單詞。") @GetMapping(value = "/topicSend") public Object topicSend(String routingKey, String message) throws AmqpException, UnsupportedEncodingException { if (null == routingKey) { routingKey = "changsha.kf"; } MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //fanout模式隻往exchange裡發送消息。分發到exchange下的所有queue rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"), messageProperties)); return "message sended : routingKey >" + routingKey + ";message > " + message; } }
7、ConcumerReceiver
@Component public class ConcumerReceiver { //直連模式的多個消費者,會分到其中一個消費者進行消費。類似task模式 //通過註入RabbitContainerFactory對象,來設置一些屬性,相當於task裡的channel.basicQos @RabbitListener(queues = "helloWorldqueue") public void helloWorldReceive(String message) { System.out.println("helloWorld模式 received message : " + message); } //工作隊列模式 @RabbitListener(queues = "work_sb_mq_q") public void wordQueueReceiveq1(String message) { System.out.println("工作隊列模式1 received message : " + message); } @RabbitListener(queues = "work_sb_mq_q") public void wordQueueReceiveq2(String message) { System.out.println("工作隊列模式2 received message : " + message); } //pub/sub模式進行消息監聽 @RabbitListener(queues = "fanout.q1") public void fanoutReceiveq1(String message) { System.out.println("發佈訂閱模式1received message : " + message); } @RabbitListener(queues = "fanout.q2") public void fanoutReceiveq2(String message) { System.out.println("發佈訂閱模式2 received message : " + message); } //Routing路由模式 @RabbitListener(queues = "direct_sb_mq_q1") public void routingReceiveq1(String message) { System.out.println("Routing路由模式routingReceiveq11111 received message : " + message); } @RabbitListener(queues = "direct_sb_mq_q2") public void routingReceiveq2(String message) { System.out.println("Routing路由模式routingReceiveq22222 received message : " + message); } //topic 模式 //註意這個模式會有優先匹配原則。例如發送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之後就不會再去匹配*.ITd @RabbitListener(queues = "topic_sb_mq_q1") public void topicReceiveq1(String message) { System.out.println("Topic模式 topic_sb_mq_q1 received message : " + message); } @RabbitListener(queues = "topic_sb_mq_q2") public void topicReceiveq2(String message) { System.out.println("Topic模式 topic_sb_mq_q2 received message : " + message); } }
二、簡單模式
隊列配置:
/** * HelloWorld rabbitmq第一個工作模式 * 直連模式隻需要聲明隊列,所有消息都通過隊列轉發。 * 無需設置交換機 */ @Configuration public class HelloWorldConfig { @Bean public Queue setQueue() { return new Queue("helloWorldqueue"); } }
三、工作隊列模式
@Configuration public class WorkConfig { //聲明隊列 @Bean public Queue workQ1() { return new Queue("work_sb_mq_q"); } }
四、廣播模式(Fanout)
/** * Fanout模式需要聲明exchange,並綁定queue,由exchange負責轉發到queue上。 * 廣播模式 交換機類型設置為:fanout */ @Configuration public class FanoutConfig { //聲明隊列 @Bean public Queue fanoutQ1() { return new Queue("fanout.q1"); } @Bean public Queue fanoutQ2() { return new Queue("fanout.q2"); } //聲明exchange @Bean public FanoutExchange setFanoutExchange() { return new FanoutExchange("fanoutExchange"); } //聲明Binding,exchange與queue的綁定關系 @Bean public Binding bindQ1() { return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange()); } @Bean public Binding bindQ2() { return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange()); } }
五、直連模式(Direct)
/* 路由模式|Routing模式 交換機類型:direct */ @Configuration public class DirectConfig { //聲明隊列 @Bean public Queue directQ1() { return new Queue("direct_sb_mq_q1"); } @Bean public Queue directQ2() { return new Queue("direct_sb_mq_q2"); } //聲明exchange @Bean public DirectExchange setDirectExchange() { return new DirectExchange("directExchange"); } //聲明binding,需要聲明一個routingKey @Bean public Binding bindDirectBind1() { return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with("china.changsha"); } @Bean public Binding bindDirectBind2() { return BindingBuilder.bind(directQ2()).to(setDirectExchange()).with("china.beijing"); } }
六、通配符模式(Topic)
/* Topics模式 交換機類型 topic * */ @Configuration public class TopicConfig { //聲明隊列 @Bean public Queue topicQ1() { return new Queue("topic_sb_mq_q1"); } @Bean public Queue topicQ2() { return new Queue("topic_sb_mq_q2"); } //聲明exchange @Bean public TopicExchange setTopicExchange() { return new TopicExchange("topicExchange"); } //聲明binding,需要聲明一個roytingKey @Bean public Binding bindTopicHebei1() { return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("changsha.*"); } @Bean public Binding bindTopicHebei2() { return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.beijing"); } }
測試
我們啟動上面的SpringBoot項目。
然後我們訪問swagger地址:http://127.0.0.1:8080/swagger-ui.html
然後我們就可以使用swagger測試接口瞭。
或者可以使用postman進行測試。
到此這篇關於SpringBoot整合RabbitMQ的5種模式實戰的文章就介紹到這瞭,更多相關SpringBoot整合RabbitMQ模式內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- SpringBoot整合RabbitMQ實現六種工作模式的示例
- springBoot整合rabbitmq測試常用模型小結
- Spring Boot+RabbitMQ 通過fanout模式實現消息接收功能(支持消費者多實例部署)
- springboot整合消息隊列RabbitMQ
- SpringBoot集成RabbitMQ和概念介紹