RocketMQ普通消息實戰演練詳解
引言
之前研究瞭RocketMQ的源碼,在這裡將各種消息發送與消費的demo進行舉例,方便以後使用的時候CV。
相關的配置,安裝和啟動在這篇文章有相關講解 https://www.jb51.net/article/260237.htm
普通消息同步發送
同步消息是指發送出消息後,同步等待,直到接收到Broker發送成功的響應才會繼續發送下一個消息。這個方式可以確保消息發送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。
public static void main(String[] args) throws Exception { //實例化消息生產者對象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動Producer實例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //同步發送方式 SendResult send = producer.send(msg); //確認返回 System.out.println(send); } //關閉producer producer.shutdown(); }
普通消息異步發送
異步消息發送方在發送瞭一條消息後,不等接收方發回響應,接著進行第二條消息發送。發送方通過回調接口的方式接收服務器響應,並對響應結果進行處理。
public static void main(String[] args) throws Exception { //實例化消息生產者對象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動Producer實例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //SendCallback會接收異步返回結果的回調 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { throwable.printStackTrace(); } }); } //若是過早關閉producer,會拋出The producer service state not OK, SHUTDOWN_ALREADY的錯 Thread.sleep(10000); //關閉producer producer.shutdown(); }
普通消息單向發送
單項發送不關心發送的結果,隻發送請求不等待應答。發送消息耗時極短。
public static void main(String[] args) throws Exception { //實例化消息生產者對象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動Producer實例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //同步發送方式 producer.sendOneway(msg); } //關閉producer producer.shutdown(); }
集群消費模式
消費者采用負載均衡的方式消費消息,同一個Group下的多個Consumer共同消費Queue裡的Message,每個Consumer處理的消息不同。
一個Consumer Group中的各個Consumer實例分共同消費消息,即一條消息隻會投遞到一個Group下面的一個實例,並且隻消費一遍。
例如某個Topic有3個隊列,其中一個Consumer Group 有 3 個實例,那麼每個實例隻消費其中的1個隊列。集群消費模式是消費者默認的消費方式。
public static void main(String[] args) throws Exception { //實例化消息消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.CLUSTERING); // 註冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 consumer.start(); System.out.printf("Consumer Started.%n"); }
廣播消費模式
廣播消費模式中把消息對一個Group下的各個Consumer實例都投遞一遍。也就是說消息也會被 Group 中的每個Consumer都消費一次。
實際上,是一個消費組下的每個消費者實例都獲取到瞭topic下面的每個Message Queue去拉取消費。所以消息會投遞到每個消費者實例。
public static void main(String[] args) throws Exception { //實例化消息消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.BROADCASTING); // 註冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 consumer.start(); System.out.printf("Consumer Started.%n"); }
以上就是RocketMQ普通消息實戰演練詳解的詳細內容,更多關於RocketMQ普通消息的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- RocketMQ實現隨緣分BUG小功能示例詳解
- RocketMQ消息隊列實現隨機消息發送當做七夕禮物
- 分佈式消息隊列RocketMQ概念詳解
- docker安裝RocketMQ的實現步驟
- RocketMQ消息發送流程源碼剖析