C#利用RabbitMQ實現點對點消息傳輸
消息隊列模型
所有 MQ 產品從模型抽象上來說都是一樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然後發佈到隊列(queue)中,最後將消息發送到監聽的消費者。
RabbitMQ設置
RabbitMQ是通過交換機將消息轉發到對應隊列,所以隊列需要和交換機進行綁定。本例將隊列綁定到默認的amq.direct交換機,並設置Routing key,如下圖所示:
RabbitMQ動態庫安裝
通過NuGet包管理器進行安裝RabbitMQ.Client,如下所示:
RabbitMQ.Client相關知識點
- ConnectionFactory:構造一個實例,主要創建連接。
- IConnection:表示一個基於AMQP協議的連接。
- IModel:表示一個RabbitMQ通道,可用於聲明一個隊列,然後開始消費。
- EventingBasicConsumer:基於獨立事件監聽的基礎消費者,可以監聽並接收消息。
- 生產者基本步驟:1. 創建連接 2. 基於連接創建通道 3. 基於通道聲明隊列,4. 開始生產並發佈消息
- 消費者基本步驟:1. 創建連接 2. 基於連接創建通道 3. 基於通道聲明隊列,4. 創建消費者,5. 綁定通道和消費者,並開始消費
示例效果圖
本例主要有一個生產者,一個消費者,通過消息隊列進行消息轉發和接收。
生產者負責消息發送,如下圖所示:
消費者負責消息接收,如下圖所示:
核心代碼
代碼結構:主要包括生產者,消費者,公共基礎代碼,如下所示:
RabbitMqHelper主要創建連接,如下所示:
public class RabbitMqHelper { /// <summary> /// 創建連接 /// </summary> /// <returns></returns> public IConnection GetConnection() { try { var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/ShortMsgHost" }; var conn = factory.CreateConnection(); return conn; } catch (Exception ex) { throw ex; } } }
RabbmitMqSendHelper用於發送消息,如下所示:
public class RabbmitMqSendHelper : RabbitMqHelper { /// <summary> /// 發送消息 /// </summary> /// <param name="msg"></param> /// <returns></returns> public bool SendMsg(string msg) { try { using (var conn = GetConnection()) { using (var channel = conn.CreateModel()) { channel.QueueDeclare(queue: "ShortMsgQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "amq.direct", routingKey: "ShortMsgKey", basicProperties: null, body: body); //Console.WriteLine(" [x] Sent {0}", message); }; }; return true; } catch (Exception ex) { throw ex; } } }
RabbitMqReceiveHelper主要用於接收信息,如下所示:
public class RabbitMqReceiveHelper : RabbitMqHelper { public RabbitMqReceiveEventHandler OnReceiveEvent; private IConnection conn; private IModel channel; private EventingBasicConsumer consumer; public bool StartReceiveMsg() { try { conn = GetConnection(); channel = conn.CreateModel(); channel.QueueDeclare(queue: "ShortMsgQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); //Console.WriteLine(" [x] Received {0}", message); if (OnReceiveEvent != null) { OnReceiveEvent(message); } }; channel.BasicConsume(queue: "ShortMsgQueue", autoAck: true, consumer: consumer); return true; } catch (Exception ex) { throw ex; } } }
作者:Alan.hsiang
出處:http://www.cnblogs.com/hsiang/
以上就是C#利用RabbitMQ實現點對點消息傳輸的實現示例的詳細內容,更多關於c# 用RabbitMQ實現點對點消息傳輸的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- .net平臺的rabbitmq使用封裝demo詳解
- .Net Core和RabbitMQ限制循環消費的方法
- C#通過rabbitmq實現定時任務(延時隊列)
- rabbitmq中routingkey的作用說明
- SpringBoot+RabbitMQ實現消息可靠傳輸詳解