SpringBoot整合RabbitMQ, 實現生產者與消費者的功能
自然,依賴是少不瞭的。除瞭spring-boot-starter-web依賴外。
就這個是最主要的依賴瞭,其他的看著辦就是瞭。我用的是gradle,用maven的看著弄也一樣的。無非就是包+包名+版本
//AMQP compile('org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE')
這裡有一個坑。導致我後來發送消息時一直連不上去。報錯: java.net.SocketException: socket closed。
我去網上尋找瞭許多方案。大致都是一個意思。沒有設置遠程連接權限。讓我添加一個用戶,並且設置最大權限。
下面是添加rabbitmq用戶的命令
#rabbitmqctl add_user 賬號 密碼 rabbitmqctl add_user admin 614 #分配用戶標簽(admin為要賦予administrator權限的剛創建的那個賬號的名字) rabbitmqctl set_user_tags admin administrator #設置權限,開啟遠程訪問 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
我用完之後去管控臺(http://ip:15672)看瞭一下用戶列表。確實已經添加上去瞭,也是最大權限。
然鵝並沒有什麼卵用
後來強行摸索出來瞭,原來是版本差異的原因。我SpringBoot本來是使用的是2.0.3版本,然後AMQP我使用的是2.0.4。可能有什麼不兼容的地方。
把Springboot和AMQP的版本給同步成一個就好瞭。別的版本差一點根本沒啥問題,就AMQP特殊,也是醉瞭。
使用SpriongBoot的yml配置:重點是rabbitmq那一欄
設置好登錄用戶、密碼、地址端口、虛擬地址、超時時間就可以瞭
server: port: 8080 servlet: context-path: / spring: http: encoding: charset: UTF-8 jackson: #前端頁面傳Date值時格式化 date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://192.168.194.128:3306/mysql?serverTimezone=Asia/Shanghai username: root password: 614 rabbitmq: port: 5672 host: 192.168.194.128 username: admin password: 614 virtual-host: / connection-timeout: 15s #Redis配置 redis: host: 192.168.194.128 port: 6379 #Redis連接池配置 jedis: pool: min-idle: 0 max-idle: 8 max-active: 8 max-wait: -1ms
這裡又有個小坑,這個rabbitmq的超時時間(connection-timeout)配的我真的是醉瞭,我看的教程裡寫的是15000,表示15秒,我一輸之後IDEA直接報紅線啊。
網上一找,全特麼用毫秒值配的,行吧,應該我們用的不是一個版本。
點開看下這參數接受一個java.time.Duration對象,百思不得其解。這玩意咋配?我不會啊。找瞭二十分鐘的攻略才知道是這樣子配的,使用數字+時間標志。比如1h、1M、1m、1d、1s、1ms這種格式就行瞭。
咳咳,配置文件弄好後也就差不多可以使用rabbitmq發消息瞭。
生產端發消息。隻需要使用 RabbitTemplate 類就夠瞭,看到這個名字,有沒有一種很熟悉的感覺?
Redis也有個這玩意 叫 RedisTemplate
關於發消息,在這兒最好還是先指定好exchange和routingKey,即交換機和路由鍵。
這樣發過去的消息才能被發到指定的交換機上,然後交換機在通過你的routingKey來發送給綁定瞭該routingKey的所有隊列。
所以首先登陸管控臺(http://ip:15672),到Exchanges和Queues菜單下,創建好交換機和隊列,還有他們之間的routingKey。這個步驟我就不詳細描述瞭。單靠語言不怎麼能夠描述清楚。估計得配很多圖,有需要的自行google把。
萬事俱備。正式開始發送消息。
先準備一個要發的玩意。根據業務需求自己創個model就行。我這隨便寫一個。
關於這個messageId,及消息唯一ID。他的作用是將該條消息數據和RabbitMQ發送的消息綁定起來。不要也不是不行。隻是最好還是設置一個這個參數。
package com.skypyb.rabbitmq.entity; import java.io.Serializable; public class User1 implements Serializable{ private Long id; private String name; private String messageId;//儲存消息發送的唯一標識 public User1() { } public User1(Long id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } }
要發送的數據模型已經準備好,接下來這個類是一個重點。即發送消息的類。
註入RabbbitTemplate,然後就可以通過他的 convertSendAndReceive() 方法進行消息的發送。
他有很多種重載,最好是選用我這種,比較可控。交換機、路由鍵、消息唯一ID全部指定好。
package com.skypyb.rabbitmq.producer; import com.skypyb.rabbitmq.entity.User1; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component("user1Sender") public class User1Sender { @Autowired private RabbitTemplate rabbitTemplate;//操作rabbitmq的模板 public void send(User1 user1){ CorrelationData correlationData= new CorrelationData(); correlationData.setId(user1.getMessageId()); rabbitTemplate.convertSendAndReceive( "user1-exchange",//exchange "user1.key1",//routingKey user1,//消息體內容 correlationData//消息唯一ID ); } }
emmmm,是不是感覺還是挺簡單的。一個方法調用,消息就過去瞭。就發送到指定的交換機瞭。交換機再通過你的routingKey轉發給綁定在上邊的隊列。生產端這邊就完事瞭。
寫個測試類測試一下。
package com.skypyb.test; import com.skypyb.rabbitmq.Application; import com.skypyb.rabbitmq.entity.User1; import com.skypyb.rabbitmq.producer.User1Sender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; import java.util.UUID; @RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class TestOne { @Autowired private User1Sender user1Sender; @Test public void testSend1(){ User1 user1 = new User1(); user1.setId(1L); user1.setName("測試用戶1"); user1.setMessageId("user1$"+System.currentTimeMillis()+"$"+ UUID.randomUUID().toString()); user1Sender.send(user1); } }
運行完畢後。登陸管控臺(http://ip:15672),進入Queues菜單。即可發現消息隊列中已接收到一條消息,會是一個等待消費的狀態。
至於到底是哪個消息隊列來處理嘛,那就得看你的exchange通過你的routingKey具體把消息轉發到哪兒瞭。這個都是在管控臺裡邊配置的。
生產端準備完畢。接下來是消費端。消費端也很簡單,yml需要添加消費端的配置。簽收模式最好選擇手動簽收。可控。
server: port: 8081 servlet: context-path: / spring: http: encoding: charset: UTF-8 jackson: #前端頁面傳Date值時格式化 date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://192.168.194.128:3306/mysql?serverTimezone=Asia/Shanghai username: root password: 614 #rabbitmq基本配置 rabbitmq: addresses: 192.168.194.128:5672 username: admin password: 614 virtual-host: / connection-timeout: 15s #rabbitmq消費端配置 listener: simple: #並發數 concurrency: 5 #最大並發數 max-concurrency: 10 #簽收模式:手工簽收、自動簽收 acknowledge-mode: manual #限流,在此消費端同一時間隻有一條消息消費 prefetch: 1 #Redis配置 redis: host: 192.168.194.128 port: 6379 #Redis連接池配置 jedis: pool: min-idle: 0 max-idle: 8 max-active: 8 max-wait: -1ms
具體的消費者,具體解釋都寫在註釋中瞭。
關於@Exchange註解中設置的交換機的type屬性,主要是用這些值:
- fanout:會把所有發到Exchange的消息路由到所有和它綁定的Queue
- direct:會把消息路由到routing key和binding key完全相同的Queue,不相同的丟棄
- topic:direct是嚴格匹配,那麼topic就算模糊匹配,routing key和binding key都用.來區分單詞串,比如A.B.C,*匹配任意單詞,#匹配任意多個或0個單詞,比如。A.B.*可以匹配到A.B.C
- headers:不依賴routing key和binding key,通過對比消息屬性中的headers屬性,對比Exchange和Queue綁定時指定的鍵值對,相同就路由過來
basicAck()方法可以確認消息消費。執行後,消息隊列中這條消息就沒瞭。multiple參數表示是否批量消費,一般都選false。
package com.skypyb.rabbitmq.controller; import com.rabbitmq.client.Channel; import com.skypyb.rabbitmq.entity.User1; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; @Component public class User1Receiver { /** * @param user1 消息體,使用 @Payload 註解 * @param headers 消息頭,使用 @Headers 註解 * @param channel */ /*@RabbitListener表示監聽的具體隊列. bindings屬性代表綁定。裡邊有幾個值填寫,填寫好綁定的隊列名字和交換機名字 指定好routingKey。若指定的這些參數不存在的話。則會自行給你創建好 durable代表是否持久化 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "user1-queue", durable = "true"), exchange = @Exchange(name = "user1-exchange", durable = "true", type = "topic"), key = "user1.#" ) ) @RabbitHandler//標識這個方法用於消費消息 public void onUser1Message(@Payload User1 user1, @Headers Map<String, Object> headers, Channel channel) throws IOException { //消費者操作 System.out.println("-------收到消息辣!-----"); System.out.println("發過來的用戶名為:" + user1.getName()); //basicAck()表示確認已經消費消息。通知一下mq,需要先得到 delivery tag //delivery tag可以從消息頭裡邊get出來 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } }
把消費端的服務打開後,就已經在監聽瞭。若監聽的隊列中已有消息,則會立即處理。直到隊列中沒消息為止。
若隊列為空,他就不會動,這個時候我啟動一下生產者那邊的測試,消息一發出去,立馬就被消費。非常完美。就是這個效果。
呼,偶爾也不想咸魚瞭啊,今天一天大概把RabbitMQ搞明白一些瞭,配置也會配瞭,消息也會發瞭。踩瞭一萬個坑,有不少是那種比較SB的采坑方式,一般人應該踩不到,我就不打出來瞭。
還是感覺有很多收獲的。就是累成麻瓜瞭。
以上就是SpringBoot整合RabbitMQ, 實現生產者與消費者的功能的詳細內容,更多關於SpringBoot整合RabbitMQ, 實現生產者與消費者的功能的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- rabbitmq中routingkey的作用說明
- RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合
- 聊聊RabbitMQ發佈確認高級問題
- springboot2.5.6集成RabbitMq實現Topic主題模式(推薦)
- Springboot整合Rabbitmq之Confirm和Return機制