C#利用RabbitMQ實現點對點消息傳輸

消息隊列模型

所有 MQ 產品從模型抽象上來說都是一樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然後發佈到隊列(queue)中,最後將消息發送到監聽的消費者。

RabbitMQ設置

RabbitMQ是通過交換機將消息轉發到對應隊列,所以隊列需要和交換機進行綁定。本例將隊列綁定到默認的amq.direct交換機,並設置Routing key,如下圖所示:

RabbitMQ動態庫安裝

通過NuGet包管理器進行安裝RabbitMQ.Client,如下所示:

RabbitMQ.Client相關知識點

  • ConnectionFactory:構造一個實例,主要創建連接。
  • IConnection:表示一個基於AMQP協議的連接。
  • IModel:表示一個RabbitMQ通道,可用於聲明一個隊列,然後開始消費。
  • EventingBasicConsumer:基於獨立事件監聽的基礎消費者,可以監聽並接收消息。
  • 生產者基本步驟:1. 創建連接 2. 基於連接創建通道 3. 基於通道聲明隊列,4. 開始生產並發佈消息
  • 消費者基本步驟:1. 創建連接 2. 基於連接創建通道 3. 基於通道聲明隊列,4. 創建消費者,5. 綁定通道和消費者,並開始消費

示例效果圖

本例主要有一個生產者,一個消費者,通過消息隊列進行消息轉發和接收。

生產者負責消息發送,如下圖所示:

消費者負責消息接收,如下圖所示:

核心代碼

代碼結構:主要包括生產者,消費者,公共基礎代碼,如下所示:

RabbitMqHelper主要創建連接,如下所示:

public class RabbitMqHelper
    {

        /// <summary>
        /// 創建連接
        /// </summary>
        /// <returns></returns>
        public IConnection GetConnection()
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    Port = 5672,
                    UserName = "guest",
                    Password = "guest",
                    VirtualHost = "/ShortMsgHost"
                };
                var conn = factory.CreateConnection();
                return conn;
            }
            catch (Exception ex) {
                throw ex;
            }
        }



    }

RabbmitMqSendHelper用於發送消息,如下所示:

public class RabbmitMqSendHelper : RabbitMqHelper
    {
        /// <summary>
        /// 發送消息
        /// </summary>
        /// <param name="msg"></param>
        /// <returns></returns>
        public bool SendMsg(string msg)
        {
            try
            {
                using (var conn = GetConnection())
                {
                    using (var channel = conn.CreateModel())
                    {
                        channel.QueueDeclare(queue: "ShortMsgQueue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                        var body = Encoding.UTF8.GetBytes(msg);

                        channel.BasicPublish(exchange: "amq.direct",
                                             routingKey: "ShortMsgKey",
                                             basicProperties: null,
                                             body: body);

                        //Console.WriteLine(" [x] Sent {0}", message);
                    };
                };
                return true;
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
    }

RabbitMqReceiveHelper主要用於接收信息,如下所示:

public class RabbitMqReceiveHelper : RabbitMqHelper
    {
        public RabbitMqReceiveEventHandler OnReceiveEvent;

        private IConnection conn;

        private IModel channel;

        private EventingBasicConsumer consumer;

        public bool StartReceiveMsg()
        {
            try
            {
                conn = GetConnection();

                channel = conn.CreateModel();

                channel.QueueDeclare(queue: "ShortMsgQueue",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);

                consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    //Console.WriteLine(" [x] Received {0}", message);
                    if (OnReceiveEvent != null)
                    {
                        OnReceiveEvent(message);
                    }
                };
                channel.BasicConsume(queue: "ShortMsgQueue",
                                        autoAck: true,
                                        consumer: consumer);
                return true;
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
    }

作者:Alan.hsiang
出處:http://www.cnblogs.com/hsiang/

以上就是C#利用RabbitMQ實現點對點消息傳輸的實現示例的詳細內容,更多關於c# 用RabbitMQ實現點對點消息傳輸的資料請關註WalkonNet其它相關文章!

推薦閱讀: