Java EventBus手把手帶你實現
一、說明
在Guava中,EventBus簡化瞭觀察者模式的實現。理解EventBus的原理來,自動動手實現一個簡單的EventBus。
二、Guava的EventBus
EventBus叫做“時間總線”,它提供瞭實現觀察者模式的骨架代碼。可以基於此框架,非常容易地在自己的業務場景中實現觀察者模式。它不僅支持異步非阻塞模式,同時支持同步阻塞模式。
基於EventBus,不需要定義Observer接口(觀察者接口),任意類型的對象都可以註冊到EventBus中。通過@Subscribe
註解來表明類中哪個函數可以接收觀察者發送的消息。
Guava EventBus中的幾個主要的類和函數:
EventBus、SyncEventBus
EventBus類中封裝瞭對外暴露的所有可調用接口。其中EventBus實現瞭同步阻塞的觀察者模式,SyncEventBus繼承EventBus提供瞭異步非阻塞的觀察者模式。
// 同步阻塞的方式 EventBus eventBus = new EventBus(); // 異步非阻塞的方式 final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20; // 異步非阻塞線程池大小 ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE); EventBus eventBus = new AsyncEventBus(executorService);
register()
函數
EventBus通過register()
函數來註冊觀察者。它可以接收任意類型(Object)的觀察者。具體的函數定義如下:
public void register(Object object) { //...... }
unregister()
函數
相對於register()
,unregister()
函數是從EventBus中刪除某個觀察者。
public void unregister(Object object) { //...... }
post
函數
EventBus提供post()
函數 ,用來給觀察者發消息。
public void post(Object event) { //...... }
post發送消息的時候,並不是把消息發送給所有的觀察者,而是發送給可匹配的觀察者。所謂可匹配指的是,能接收的消息類型是發送消息(post函數中定義的父類)。
比如,AObserver能接收的消息類型是XMsg,BObserver能接收的消息類型是YMsg,CObserver能接收的消息類型是ZMsg。其中,XMsg是YMsg的父類。
XMsg xMsg= new XMsg(); YMsg yMsg= new YMsg(); ZMsg zMsg= new ZMsg(); post(xMsg);// AObserver 接收消息 post(yMsg);// AObserver和BObserver接收到消息 post(zMsg);// CObserver接收到消息
Observer(觀察者)能接收到消息類型是通過@Subscribe
註解定義的。
@Subscribe
註解
EventBus通過@Subscribe
註解類標明,某個函數能接收哪種類型的消息。(類型不能是基本類型)
三、EventBus的原理
四、動手實現一個EventBus
@Beat
標註一個公共的API(公共的類、方法或字段) 在未來的發行版本中會發生不兼容的變化。帶有此註釋的 API 不受其包含庫所做的任何兼容性保證。請註意,此註釋的存在並不意味著所討論 API 的質量或性能,隻是它不是“API 凍結”的事實。
應用程序依賴 beta API 通常是安全的,但需要在升級期間進行一些額外的工作。然而,不建議在類庫(包含在用戶的CLASSPATH中,不受開發人員的控制)上這麼做。
4.1 定義Subscribe註解
定義Subscribe註解,用於標明哪個函數可以接收消息。
/** * 定義一個註解,表明觀察者中的哪個函數可以接收消息 */ @Retention(RetentionPolicy.RUNTIME) // 註解的聲明周期 @Target(ElementType.METHOD) // 註解作用的地方 @Beta // 標註API在未來發行的版本是可能有不兼容的變化 public @interface MySubscribe { }
4.2 ObserverAction
用來表示@MySubscribe
註解的方法。
/** * 用來表示 @MySubscribe 註解方法 */ public class MyObserverAction { private Object target; private Method method; public MyObserverAction(Object target, Method method) { this.target = checkNotNull(target); this.method = method; this.method.setAccessible(true); } /** * event是method方法的參數 * @param event */ public void execute(Object event) { try { method.invoke(target, event); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } } }
4.3 ObserverRegister
Observer 註冊表。
/** * Observer 註冊表 */ public class MyObserverRegister { // 註冊表, 消息類型: 觀察者方法 private ConcurrentMap<Class<?>, CopyOnWriteArraySet<MyObserverAction>> registry = new ConcurrentHashMap<>(); /** * 將觀察者註冊到 註冊表中 * @param observer 觀察者 */ public void register(Object observer) { Map<Class<?>, Collection<MyObserverAction>> observerActions = findAllObserverActions(observer); for (Map.Entry<Class<?>, Collection<MyObserverAction>> entry : observerActions.entrySet()) { Class<?> eventType = entry.getKey(); Collection<MyObserverAction> evenActions = entry.getValue(); CopyOnWriteArraySet<MyObserverAction> registryEvenActions = registry.getOrDefault(eventType, new CopyOnWriteArraySet<>()); registryEvenActions.addAll(evenActions); registry.put(eventType, registryEvenActions); } } /** * 獲取匹配的觀察者事件 * @param event * @return */ public List<MyObserverAction> getMatchedMyObserverActions(Object event) { List<MyObserverAction> result = new ArrayList<>(); Class<?> postedEventClass = event.getClass(); for (Map.Entry<Class<?>, CopyOnWriteArraySet<MyObserverAction>> entry : registry.entrySet()) { Class<?> eventClass = entry.getKey(); // 匹配相同類型或父類型 if (postedEventClass.isAssignableFrom(eventClass)) { result.addAll(entry.getValue()); } } return result; } // 消息類型(觀察者類型類型及其父類型) 觀察者方法 public Map<Class<?>, Collection<MyObserverAction>> findAllObserverActions(Object observer) { Map<Class<?>, Collection<MyObserverAction>> result = new HashMap<>(); // 觀察者類型 Class<?> observerClass = observer.getClass(); for (Method method : getAnnotatedMethods(observerClass)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; result.putIfAbsent(eventType, new ArrayList<>()); result.get(eventType).add(new MyObserverAction(observer, method)); } return result; } /** * 根據觀察者類型,查找方法列表 * @param clazz * @return */ public List<Method> getAnnotatedMethods(Class<?> clazz) { List<Method> result = new ArrayList<>(); for (Method method : clazz.getDeclaredMethods()) { if (method.isAnnotationPresent(MySubscribe.class)) { Class<?>[] parameterTypes = method.getParameterTypes(); checkArgument(parameterTypes.length==1, "方法%s 有一個註解@MySubscribe ,它有%s個參數,實際要求有且隻有一個參數", method, parameterTypes.length); result.add(method); } } return result; } }
4.4 EventBus
/** * 實現 同步阻塞的 EventBus */ public class MyEventBus { private Executor executor; private MyObserverRegister register = new MyObserverRegister(); public MyEventBus() { // MoreExecutors.directExecutor() 是 Google Guava 提供的工具類,看似是多線程,實際上是單線程。 // 之所以要這麼實現,主要還是為瞭跟 AsyncEventBus 統一代碼邏輯,做到代碼復用 this(MoreExecutors.directExecutor()); } // 註意這裡的修飾符 protected MyEventBus(Executor executor) { this.executor = executor; } public void register(Object observer) { register.register(observer); } public void post(Object event) { List<MyObserverAction> observerActions = register.getMatchedMyObserverActions(event); for (MyObserverAction observerAction : observerActions) { executor.execute(new Runnable() { @Override public void run() { observerAction.execute(event); } }); } } }
4.5 SyncEventBus
/** * 異步非阻塞的EventBus */ public class MySyncEventBus extends MyEventBus { public MySyncEventBus(Executor executor) { super(executor); } }
五、使用自定義的EventBus
public static void main(String[] args) { // 自定義的EventBus MyEventBus myEventBus = new MyEventBus(); // 註冊一個觀察者 myEventBus.register(new CurrentConditionsDisplayListener()); // 向觀察者發送消息 myEventBus.post(23.0f); }
六、擴展
Spring Event
到此這篇關於Java EventBus手把手帶你實現的文章就介紹到這瞭,更多相關Java EventBus內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Android實現消息總線的幾種方式詳解
- Java設計模式之java觀察者模式詳解
- java和Spring中觀察者模式的應用詳解
- Go語言設計模式之實現觀察者模式解決代碼臃腫
- vue中的eventBus會不會產生內存泄漏你知道嗎