C# 基於消息發佈訂閱模型的示例(上)

  在我們的開發過程中,我們經常會遇到這樣的場景就是一個對象的其中的一些狀態依賴於另外的一個對象的狀態,而且這兩個對象之間彼此是沒有關聯的,及兩者之間的耦合性非常低,特別是在這種基於容器模型的開發中遇到的會非常多,比如Prism框架或者MEF這種框架中,而我們會發現在這樣的系統中我們經常使用一種Publish和Subscribe的模式來進行交互,這種交互有什麼好處呢?基於帶著這些問題的思考,我們來一步步來剖析!

  首先第一步就是定義一個叫做IEventAggregator的接口,裡面定義瞭一些重載的Subscribe和Publish方法,我們具體來看一看這個接口:

/// <summary>
   ///   Enables loosely-coupled publication of and subscription to events.
   /// </summary>
   public interface IEventAggregator
   {
       /// <summary>
       ///   Gets or sets the default publication thread marshaller.
       /// </summary>
       /// <value>
       ///   The default publication thread marshaller.
       /// </value>
       Action<System.Action> PublicationThreadMarshaller { get; set; }
 
       /// <summary>
       ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
       /// </summary>
       /// <param name = "instance">The instance to subscribe for event publication.</param>
       void Subscribe(object instance);
 
       /// <summary>
       ///   Unsubscribes the instance from all events.
       /// </summary>
       /// <param name = "instance">The instance to unsubscribe.</param>
       void Unsubscribe(object instance);
 
       /// <summary>
       ///   Publishes a message.
       /// </summary>
       /// <param name = "message">The message instance.</param>
       /// <remarks>
       ///   Uses the default thread marshaller during publication.
       /// </remarks>
       void Publish(object message);
 
       /// <summary>
       ///   Publishes a message.
       /// </summary>
       /// <param name = "message">The message instance.</param>
       /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
       void Publish(object message, Action<System.Action> marshal);
   }

  有瞭這個接口,接下來就是怎樣去實現這個接口中的各種方法,我們來看看具體的實現過程。

/// <summary>
    ///   Enables loosely-coupled publication of and subscription to events.
    /// </summary>
    public class EventAggregator : IEventAggregator
    {
        /// <summary>
        ///   The default thread marshaller used for publication;
        /// </summary>
        public static Action<System.Action> DefaultPublicationThreadMarshaller = action => action();
 
        readonly List<Handler> handlers = new List<Handler>();
 
        /// <summary>
        ///   Initializes a new instance of the <see cref = "EventAggregator" /> class.
        /// </summary>
        public EventAggregator()
        {
            PublicationThreadMarshaller = DefaultPublicationThreadMarshaller;
        }
 
        /// <summary>
        ///   Gets or sets the default publication thread marshaller.
        /// </summary>
        /// <value>
        ///   The default publication thread marshaller.
        /// </value>
        public Action<System.Action> PublicationThreadMarshaller { get; set; }
 
        /// <summary>
        ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
        /// </summary>
        /// <param name = "instance">The instance to subscribe for event publication.</param>
        public virtual void Subscribe(object instance)
        {
            lock(handlers)
            {
                if (handlers.Any(x => x.Matches(instance)))
                {
                    return;
                }                   
                handlers.Add(new Handler(instance));
            }
        }
 
        /// <summary>
        ///   Unsubscribes the instance from all events.
        /// </summary>
        /// <param name = "instance">The instance to unsubscribe.</param>
        public virtual void Unsubscribe(object instance)
        {
            lock(handlers)
            {
                var found = handlers.FirstOrDefault(x => x.Matches(instance));
                if (found != null)
                {
                   handlers.Remove(found);
                }                  
            }
        }
 
        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <remarks>
        ///   Does not marshall the the publication to any special thread by default.
        /// </remarks>
        public virtual void Publish(object message)
        {
            Publish(message, PublicationThreadMarshaller);
        }
 
        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
        public virtual void Publish(object message, Action<System.Action> marshal)
        {
            Handler[] toNotify;
            lock (handlers)
            {
                toNotify = handlers.ToArray();
            }
            marshal(() =>
            {
                var messageType = message.GetType();
                var dead = toNotify
                    .Where(handler => !handler.Handle(messageType, message))
                    .ToList();
 
                if(dead.Any())
                {
                    lock(handlers)
                    {
                        foreach(var handler in dead)
                        {
                            handlers.Remove(handler);
                        }
                    }
                }
            });
        }
 
        protected class Handler
        {
            readonly WeakReference reference;
            readonly Dictionary<Type, MethodInfo> supportedHandlers = new Dictionary<Type, MethodInfo>();
 
            public Handler(object handler)
            {
                reference = new WeakReference(handler);
 
                var interfaces = handler.GetType().GetInterfaces()
                    .Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType);
 
                foreach(var @interface in interfaces)
                {
                    var type = @interface.GetGenericArguments()[0];
                    var method = @interface.GetMethod("Handle");
                    supportedHandlers[type] = method;
                }
            }
 
            public bool Matches(object instance)
            {
                return reference.Target == instance;
            }
 
            public bool Handle(Type messageType, object message)
            {
                var target = reference.Target;
                if(target == null)
                    return false;
 
                foreach(var pair in supportedHandlers)
                {
                    if(pair.Key.IsAssignableFrom(messageType))
                    {
                        pair.Value.Invoke(target, new[] { message });
                        return true;
                    }
                }
                return true;
            }
        }
    }

  首先在EventAggregator的內部維護瞭一個LIst<Handler>的List對象,用來存放一系列的Handle,那麼這個嵌套類Handler到底起什麼作用呢?

  我們會發現在每一次當執行這個Subscribe的方法的時候,會將當前object類型的參數instance傳入到Handler這個對象中,在Handler這個類的構造函數中,首先將這個instance放入到一個弱引用中去,然後再獲取這個對象所有繼承的接口,並查看是否繼承瞭IHandle<TMessage>這個泛型的接口,如果能夠獲取到,那麼就通過反射獲取到當前instance中定義的Handle方法,並獲取到其中定義的表示泛型類型的類型實參或泛型類型定義的類型形參,並把這兩個對象放到內部定義的一個Dictionary<Type, MethodInfo>字典之中,這樣就把這樣一個活得具體的處理方法的Handler對象放到瞭一個List<Handler>集合中,這個就是訂閱消息的核心部分,所以當前的對象要想訂閱一個消息,那麼必須實現泛型接口IHandle<TMessage>,並且實現接口中的方法,同時最重要的就是在當前對象的構造函數函數中去訂閱消息(即執行Subscribe(this),我們來看一看這個泛型接口IHandle<TMessage> 

public interface IHandle {}
 
/// <summary>
///   Denotes a class which can handle a particular type of message.
/// </summary>
/// <typeparam name = "TMessage">The type of message to handle.</typeparam>
public interface IHandle<TMessage> : IHandle
{
    /// <summary>
    ///   Handles the message.
    /// </summary>
    /// <param name = "message">The message.</param>
    void Handle(TMessage message);
}

  在看完瞭Subscribe這個方法後,後面我們就來看看Unsubscribe方法吧,這個思路其實很簡單就是找到List<Handler>中的這個對象,並且移除當前的對象就可以瞭,那麼下面我們關註的重點就是Publish這個方法中到底實現瞭什麼?首先來看看代碼,然後再來做一步步分析。 

/// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
        public virtual void Publish(object message, Action<System.Action> marshal)
        {
            Handler[] toNotify;
            lock (handlers)
            {
                toNotify = handlers.ToArray();
            }
            marshal(() =>
            {
                var messageType = message.GetType();
                var dead = toNotify
                    .Where(handler => !handler.Handle(messageType, message))
                    .ToList();
 
                if(dead.Any())
                {
                    lock(handlers)
                    {
                        foreach(var handler in dead)
                        {
                            handlers.Remove(handler);
                        }
                    }
                }
            });
        }

  我們看到,在發佈一個object類型的message的時候,必然對應著另外的一個對象來處理這個消息,那麼怎樣找到這個消息的處理這呢?

  對,我們在Subscribe一個對象的時候不是已經通過反射將訂閱這個消息的對象及方法都存在瞭一個List<Handler>中去瞭嗎?那麼我們隻需要在這個List中找到對應的和message類型一致的那個對象並執行裡面的Handle方法不就可以瞭嗎?確實是一個很好的思路,這裡我們看代碼也是這樣實行的。

  這裡面還有一個要點就是,如果執行的方法返回瞭false,就是執行不成功,那麼就從當前的List<Handler>中移除掉這個對象,因為這樣的操作是沒有任何意義的,通過這樣的過程我們就能夠完沒地去實現兩個對象之間的消息傳遞瞭,另外我們通過總結以後就能夠發現,這個思路實現的重點包括以下方面:

  1 所有消息訂閱的對象必須實現統一的接口IHandle<TMessage>,並實現裡面的Handel方法。

  2 整個EventAggregator必須是單實例或者是靜態的,這樣才能夠在統一的集合中去實現上述的各種操作。

  最後還是按照之前的慣例,最後給出一個具體的實例來做相關的說明,請點擊此處進行下載,在下篇中我們將介紹一種簡單版的基於事件的發佈和訂閱模式的例子。

以上就是C# 基於消息發佈訂閱模型的示例(上)的詳細內容,更多關於c# 發佈訂閱模型的資料請關註WalkonNet其它相關文章!

推薦閱讀: