RocketMQ消息隊列實現隨機消息發送當做七夕禮物
正文
都在過情人節,前端的小哥哥們給女朋友畫個頁面,美美的,寫個chrome插件,好看的,俺們後端程序員咋辦。
我給媳婦寫首詩,哈哈
我決定,把想對媳婦說的,今天發送到一個MQ裡邊,然後在七夕當天,打開消費者,將這一段話給俺媳婦看。你看,這就是我好久前對你說的話,這就是我們後端程序員的浪漫。當然也可以多發送幾個,到時候跟根據topic
控制到底發什麼,哈哈。
這裡首先得用順序消息,當然,消息過期時間得設置的長一點。
1 下載並啟動RocketMQ
點擊下載,這是個windows版本的。
下載完成解壓後長這樣:
然後後還需要配置環境變量
這個時候就可以進入到RocketMQ的bin目錄啟動MQ瞭
1.1 首先啟動name server
start mqnamesrv.cmd
1.2 然後啟動Broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
這個時候就啟動成功瞭
2 生產者
需要註意的是,消息必須是順序消息,不然發送的消息順序就亂瞭。一首情詩順序亂瞭,讀不下去,豈不是很尷尬。
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import java.util.List; public class RocketMQOrderProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String[] tags = new String[]{"TagA", "TagC", "TagD"}; //讀取文件 List<String> messages = getMessages(); for (int i = 0; i < messages.size(); i++) { String body = messages.get(i); Message msg = new Message("topic_luke", tags[i % tags.length], "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = ((Integer)arg).longValue(); long index = id % mqs.size(); return mqs.get((int) index); } }, i); } producer.shutdown(); } static List<String> getMessages() throws IOException { String temp = null; File f = new File("E:\Code\online-taxi-three\demo\luke.txt"); InputStreamReader read = new InputStreamReader(new FileInputStream(f)); ArrayList readList = new ArrayList(); BufferedReader reader = new BufferedReader(read); while ((temp = reader.readLine()) != null && !"".equals(temp)) { readList.add(temp); } return readList; } }
3 消費者
這裡需要註意的是setConsumeThreadMax
和setConsumeThreadMin
都需要設置成1,單線程取消息這樣就可以通過sleep控制消息的顯示速度,不然並發取消息就很快顯示完瞭。不夠唯美。
import lombok.SneakyThrows; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; import java.util.concurrent.TimeUnit; public class RockerMQConsumer { public static void main(String[] args) throws Exception { //實例化消息消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeThreadMax(1); consumer.setConsumeThreadMin(1); consumer.setPullBatchSize(1); //訂閱topic consumer.subscribe("topic_luke","*"); // 註冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); TimeUnit.SECONDS.sleep(3); } // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
發送的內容在這裡自由編寫哈,路徑和文件名能對上就行,謝謝觀看,最近突發奇想,把技術編成故事講出來,會不會比較受歡迎呢。
以上就是RocketMQ消息隊列實現隨機消息發送當做七夕禮物的詳細內容,更多關於RocketMQ消息隊列隨機消息的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- RocketMQ實現隨緣分BUG小功能示例詳解
- 分佈式消息隊列RocketMQ概念詳解
- RocketMQ普通消息實戰演練詳解
- docker安裝RocketMQ的實現步驟
- 一文徹底掌握RocketMQ 的存儲模型