RocketMQ消息隊列實現隨機消息發送當做七夕禮物

正文

都在過情人節,前端的小哥哥們給女朋友畫個頁面,美美的,寫個chrome插件,好看的,俺們後端程序員咋辦。

我給媳婦寫首詩,哈哈

我決定,把想對媳婦說的,今天發送到一個MQ裡邊,然後在七夕當天,打開消費者,將這一段話給俺媳婦看。你看,這就是我好久前對你說的話,這就是我們後端程序員的浪漫。當然也可以多發送幾個,到時候跟根據topic控制到底發什麼,哈哈。

這裡首先得用順序消息,當然,消息過期時間得設置的長一點。

1 下載並啟動RocketMQ

點擊下載,這是個windows版本的。

下載完成解壓後長這樣:

然後後還需要配置環境變量

這個時候就可以進入到RocketMQ的bin目錄啟動MQ瞭

1.1 首先啟動name server

start mqnamesrv.cmd

1.2 然後啟動Broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

這個時候就啟動成功瞭

2 生產者

需要註意的是,消息必須是順序消息,不然發送的消息順序就亂瞭。一首情詩順序亂瞭,讀不下去,豈不是很尷尬。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class RocketMQOrderProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String[] tags = new String[]{"TagA", "TagC", "TagD"};
        //讀取文件
        List<String> messages = getMessages();
        for (int i = 0; i < messages.size(); i++) {
            String body = messages.get(i);
            Message msg = new Message("topic_luke", tags[i % tags.length], "KEY" + i, body.getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = ((Integer)arg).longValue();
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, i);
        }
        producer.shutdown();
    }
    static List<String> getMessages() throws IOException {
        String temp = null;
        File f = new File("E:\Code\online-taxi-three\demo\luke.txt");
        InputStreamReader read = new InputStreamReader(new FileInputStream(f));
        ArrayList readList = new ArrayList();
        BufferedReader reader = new BufferedReader(read);
        while ((temp = reader.readLine()) != null && !"".equals(temp)) {
            readList.add(temp);
        }
        return readList;
    }
}

3 消費者

這裡需要註意的是setConsumeThreadMaxsetConsumeThreadMin都需要設置成1,單線程取消息這樣就可以通過sleep控制消息的顯示速度,不然並發取消息就很快顯示完瞭。不夠唯美。

import lombok.SneakyThrows;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class RockerMQConsumer {
    public static void main(String[] args) throws Exception {
        //實例化消息消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
        //指定nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);
        consumer.setPullBatchSize(1);
        //訂閱topic
        consumer.subscribe("topic_luke","*");
        // 註冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @SneakyThrows
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                    TimeUnit.SECONDS.sleep(3);
                }
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

發送的內容在這裡自由編寫哈,路徑和文件名能對上就行,謝謝觀看,最近突發奇想,把技術編成故事講出來,會不會比較受歡迎呢。

以上就是RocketMQ消息隊列實現隨機消息發送當做七夕禮物的詳細內容,更多關於RocketMQ消息隊列隨機消息的資料請關註WalkonNet其它相關文章!

推薦閱讀: