RocketMQ普通消息實戰演練詳解

引言

之前研究瞭RocketMQ的源碼,在這裡將各種消息發送與消費的demo進行舉例,方便以後使用的時候CV。

相關的配置,安裝和啟動在這篇文章有相關講解  https://www.jb51.net/article/260237.htm

普通消息同步發送

同步消息是指發送出消息後,同步等待,直到接收到Broker發送成功的響應才會繼續發送下一個消息。這個方式可以確保消息發送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。

public static void main(String[] args) throws Exception {
    //實例化消息生產者對象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動Producer實例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //同步發送方式
        SendResult send = producer.send(msg);
        //確認返回
        System.out.println(send);
    }
    //關閉producer
    producer.shutdown();
}

普通消息異步發送

異步消息發送方在發送瞭一條消息後,不等接收方發回響應,接著進行第二條消息發送。發送方通過回調接口的方式接收服務器響應,並對響應結果進行處理。

public static void main(String[] args) throws Exception {
    //實例化消息生產者對象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動Producer實例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //SendCallback會接收異步返回結果的回調
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
    }
    //若是過早關閉producer,會拋出The producer service state not OK, SHUTDOWN_ALREADY的錯
    Thread.sleep(10000);
    //關閉producer
    producer.shutdown();
}

普通消息單向發送

單項發送不關心發送的結果,隻發送請求不等待應答。發送消息耗時極短。

public static void main(String[] args) throws Exception {
    //實例化消息生產者對象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動Producer實例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //同步發送方式
        producer.sendOneway(msg);
    }
    //關閉producer
    producer.shutdown();
}

集群消費模式

消費者采用負載均衡的方式消費消息,同一個Group下的多個Consumer共同消費Queue裡的Message,每個Consumer處理的消息不同。

一個Consumer Group中的各個Consumer實例分共同消費消息,即一條消息隻會投遞到一個Group下面的一個實例,並且隻消費一遍。

例如某個Topic有3個隊列,其中一個Consumer Group 有 3 個實例,那麼每個實例隻消費其中的1個隊列。集群消費模式是消費者默認的消費方式。

public static void main(String[] args) throws Exception {
    //實例化消息消費者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //訂閱topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 註冊回調實現類來處理從broker拉取回來的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 標記該消息已經被成功消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 啟動消費者實例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

廣播消費模式

廣播消費模式中把消息對一個Group下的各個Consumer實例都投遞一遍。也就是說消息也會被 Group 中的每個Consumer都消費一次。

實際上,是一個消費組下的每個消費者實例都獲取到瞭topic下面的每個Message Queue去拉取消費。所以消息會投遞到每個消費者實例。

public static void main(String[] args) throws Exception {
    //實例化消息消費者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //訂閱topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // 註冊回調實現類來處理從broker拉取回來的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 標記該消息已經被成功消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 啟動消費者實例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

以上就是RocketMQ普通消息實戰演練詳解的詳細內容,更多關於RocketMQ普通消息的資料請關註WalkonNet其它相關文章!

推薦閱讀: