分佈式消息隊列RocketMQ概念詳解
1.MQ概述
1.1 RocketMQ簡介
RocketMQ 是阿裡開源的分佈式消息中間件,跟其它中間件相比,RocketMQ 的特點是純JAVA實現,是一套提供瞭消息生產,存儲,消費全過程API的軟件系統。
1.2 MQ用途
限流削峰
MQ可以將系統的超量請求暫存其中,以便系統後期可以慢慢進行處理,從而避免瞭請求的丟失或系統被壓垮。
異步解耦
上遊系統對下遊系統的調用若為同步調用,則會大大降低系統的吞吐量與並發度,且系統耦合度太高、而異步調用則會解決這些問題。所以兩層之間若要實現由同步到異步的轉化,一般性做法就是,在這兩層間添加一個MQ層。
數據收集
分佈式系統會產生海量級數據流,如:業務日志、監控數據、用戶行為等。針對這些數據流進行實時或批量采集匯總,然後對這些數據流進行大數據分析,這是當前互聯網平臺的必備技術。通過MQ完成此類數據收集是最好的選擇。
1.3 常見MQ產品
RabbitMQ
RabbitMQ是使用ErLang語言開發的一款MQ產品。其吞吐量較Kafka與RocketMQ要低,且由於其不是Java語言開發,所以公司內部對其實現定制化開發難度較大。
Kafka
Kafka是使用Scala/Java語言開發的一款MQ產品。其最大的特點就是高吞吐量,常用於大數據領域的實時計算、日志采集等場景。其沒有遵循任何常見的MQ協議,而是使用自研協議。
RocketMQ
RocketMQ是使用Java語言開發的一款MQ產品。經過數年阿裡雙11的考驗,性能與穩定性非常高。其沒有遵循任何常見的MQ協議,而是使用自研協議。
對比
2.RocketMQ 基本概念
2.1 消息
消息是指,消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。單個消息所占空間不會很大。
RocketMQ中每個消息擁有唯一的MessageId,且可以攜帶具有業務標識的Key,以方便對消息的查詢。不過需要註意的是,MessageId有兩個:在生產者send()消息時會自動生成一個MessageId(msgId),當消息到達Broker後,Broker也會自動生成一個MessageId(offsetMsgId)。msgId、offsetMsgId與key都稱為消息標識。
msgId:由producer端生成,其生成規則為: producerIp + 進程pid + MessageClientIDSetter類的ClassLoader的hashCode + 當前時間 + AutomicInteger自增計數器
offsetMsgId:由broker端生成,其生成規則為:brokerIp + 物理分區的offset(Queue中的偏移量)
key:由用戶指定的業務相關的唯一標識
2.2 主題
Topic表示一類消息的集合,每個主題包含若幹條消息,每條消息隻能屬於一個主題,是RocketMQ進行消息訂閱的基本單位。 一個生產者可以同時發送多種Topic的消息;而一個消費者隻對某種特定的Topic感興趣,即隻可以訂閱和消費一種Topic的消息。
2.3 標簽
標簽為消息設置的標簽,用於同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。 標簽能夠有效地保持代碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。 Topic是消息的一級分類,Tag是消息的二級分類。Topic相當於貨物,Tag相當於上海山東等地區。
2.4 隊列
存儲消息的物理實體。 一個Topic中可以包含多個Queue,每個Queue中存放的就是該Topic的消息。 一個Topic的Queue也被稱為一個Topic中消息的分區(Partition)。 一個Topic的Queue中的消息隻能被一個消費者組中的一個消費者消費。 一個Queue中的消息不允許同一個消費者組中的多個消費者同時消費。
分片不同於分區。在RocketMQ中,分片指的是存放相應Topic的Broker。每個分片中會創建出相應數量的分區,即Queue,每個Queue的大小都是相同的。
2.5 Producer
消息生產者,負責生產消息。Producer通過MQ的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗並且低延遲。 例如:用戶提交的請求寫入到MQ的過程,就是消息生產的過程,在這裡用戶就是生產者 。
RocketMQ中的消息生產者都是以生產者組(Producer Group)的形式出現的。生產者組是同一類生產者的集合,這類Producer發送相同Topic類型的消息。一個生產者組可以同時發送多個主題的消息。如果主題中有多個隊列,生產者組隻有一個生產者,生產者會采取輪詢的方式進行發送消息。
生產者代碼如下:
導入依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency>
生產者代碼
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer order = new DefaultMQProducer("order"); order.setNamesrvAddr("localhost:9876"); order.start(); Message message = new Message("myTopic", "myTag", ("test").getBytes()); SendResult result = order.send(message); System.out.println(result); order.shutdown(); }
2.6 Consumer
消息消費者,負責消費消息。一個消息消費者會從Broker服務器中獲取到消息,並對消息進行相關業務處理。 例如:系統從MQ中讀取到請求,並對請求進行處理的過程就是消息消費的過程,在這裡系統就是消費者。
RocketMQ中的消息消費者都是以消費者組(Consumer Group)的形式出現的。消費者組是同一類消費者的集合,這類Consumer消費的是同一個Topic類型的消息。 消費者組使得在消息消費方面,實現負載均衡(將一個Topic中的不同的Queue平均分配給同一個Consumer Group的不同的Consumer,註意,並不是將消息負載均衡)和容錯(一個Consmer掛瞭,該Consumer Group中的其它Consumer可以接著消費原Consumer消費的Queue)的目標變得非常容易。
消費者代碼
public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("myTopic","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println("收到的消息"+list); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
負載均衡策略
queue 個數大於 Consumer個數, 那麼 Consumer 會平均分配 queue,不夠平均,會根據clientId排序來拿取餘數
queue個數小於Consumer個數,那麼會有Consumer閑置,就是浪費掉瞭,其餘Consumer平均分配到queue
消費者組中Consumer的數量應該小於等於訂閱Topic的Queue數量。如果超出Queue數量,則多出的Consumer將不能消費消息。
2.7 NameServer
NameServer是一個Broker與Topic路由的註冊中心,支持Broker的動態註冊與發現。
主要包括兩個功能:
Broker管理:接受Broker集群的註冊信息並且保存下來作為路由信息的基本數據;提供心跳檢測機制,檢查Broker是否還存活。
路由信息管理:每個NameServer中都保存著Broker集群的整個路由信息和用於客戶端查詢的隊列信息。Producer和Conumser通過NameServer可以獲取整個Broker集群的路由信息,從而進行消息的投遞和消費。NameServer可以獲取整個Broker集群的路由信息,從而進行消息的投遞和消費。
路由註冊
Name Server既然是註冊中心,那麼是如何完成註冊的呢? NameServer通常也是以集群的方式部署,不過,NameServer是無狀態的,即NameServer集群中的各個節點間是無差異的,各節點間相互不進行信息通訊。 那各節點中的數據是如何進行數據同步的呢?在Broker節點啟動時,輪詢NameServer列表,與每個NameServer節點建立長連接,發起註冊請求。在NameServer內部維護著⼀個Broker列表,用來動態存儲Broker的信息。
Broker節點為瞭證明自己是活著的,為瞭維護與NameServer間的長連接,會將最新的信息以心跳包的方式上報給NameServer,每30秒發送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包後,會更新心跳時間戳,記錄這個Broker的最新存活時間。
路由剔除
由於Broker關機、宕機或網絡抖動等原因,NameServer沒有收到Broker的心跳,NameServer可能會將其從Broker列表中剔除。 NameServer中有⼀個定時任務,每隔10秒就會掃描⼀次Broker表,查看每一個Broker的最新心跳時間戳距離當前時間是否超過120秒,如果超過,則會判定Broker失效,然後將其從Broker列表中剔除。
路由發現
RocketMQ的路由發現采用的是Pull模型。當Topic路由信息出現變化時,NameServer不會主動推送給客戶端,而是客戶端定時拉取Topic最新的路由。 默認客戶端每30秒會拉取一次最新的路由。
2.8 Broker
Broker充當著消息中轉角色,負責存儲消息、轉發消息。
Broker在RocketMQ系統中負責接收並存儲從生產者發送來的消息,同時為消費者的拉取請求作準備。Broker同時也存儲著消息相關的元數據,包括消費者組消費進度偏移offset、主題、隊列等。
模塊如下圖:
Remoting Module:整個Broker的實體,負責處理來自clients端的請求。而這個Broker實體則由以下模塊構成。
Client Manager:客戶端管理器。負責接收、解析客戶端(Producer/Consumer)請求,管理客戶端。例如,維護Consumer的Topic訂閱信息
Store Service:存儲服務。提供方便簡單的API接口,處理消息存儲到物理硬盤和消息查詢功能。
HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能。
Index Service:索引服務。根據特定的Message key,對投遞到Broker的消息進行索引服務,同時也提供根據Message Key對消息進行快速查詢的功能。
2.9 RocketMQ 工作流程
工作流程如下圖:
1)啟動NameServer,NameServer啟動後開始監聽端口,等待Broker、Producer、Consumer連接。
2)啟動Broker時,Broker會與所有的NameServer建立並保持長連接,然後每50秒向NameServer定時發送心跳包。
3)發送消息前,可以先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,當然,在創建Topic時也會將Topic與Broker的關系寫入到NameServer中。不過,這步是可選的,也可以在發送消息時自動創建Topic。
4) Producer發送消息,啟動時先跟NameServer集群中的其中一臺建立長連接,並從NameServer中獲取路由信息,即當前發送的Topic消息的Queue與Broker的地址(IP+Port)的映射關系。然後根據算法策略從隊選擇一個Queue,與隊列所在的Broker建立長連接從而向Broker發消息。當然,在獲取到路由信息後,Producer會首先將路由信息緩存到本地,再每30秒從NameServer更新一次路由信息。
5)Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取其所訂閱Topic的路由信息,然後根據算法策略從路由信息中獲取到其所要消費的Queue,然後直接跟Broker建立長連接,開始消費其中的消息。Consumer在獲取到路由信息後,同樣也會每30秒從NameServer更新一次路由信息。不過不同於Producer的是,Consumer還會向Broker發送心跳,以確保Broker的存活狀態。
以上就是分佈式消息隊列RocketMQ概念詳解的詳細內容,更多關於RocketMQ概念的資料請關註WalkonNet其它相關文章!