Java EventBus手把手帶你實現

一、說明

在Guava中,EventBus簡化瞭觀察者模式的實現。理解EventBus的原理來,自動動手實現一個簡單的EventBus。

二、Guava的EventBus

EventBus叫做“時間總線”,它提供瞭實現觀察者模式的骨架代碼。可以基於此框架,非常容易地在自己的業務場景中實現觀察者模式。它不僅支持異步非阻塞模式,同時支持同步阻塞模式。

基於EventBus,不需要定義Observer接口(觀察者接口),任意類型的對象都可以註冊到EventBus中。通過@Subscribe註解來表明類中哪個函數可以接收觀察者發送的消息。

Guava EventBus中的幾個主要的類和函數:

EventBus、SyncEventBus

EventBus類中封裝瞭對外暴露的所有可調用接口。其中EventBus實現瞭同步阻塞的觀察者模式,SyncEventBus繼承EventBus提供瞭異步非阻塞的觀察者模式。

// 同步阻塞的方式
EventBus eventBus = new EventBus(); 
// 異步非阻塞的方式
final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20; // 異步非阻塞線程池大小
ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE);
EventBus eventBus = new AsyncEventBus(executorService);

register() 函數

EventBus通過register()函數來註冊觀察者。它可以接收任意類型(Object)的觀察者。具體的函數定義如下:

public void register(Object object) {
  //......
}

unregister() 函數

相對於register()unregister()函數是從EventBus中刪除某個觀察者。

public void unregister(Object object) {
  //......
}

post函數

EventBus提供post()函數 ,用來給觀察者發消息。

public void post(Object event) {
  //......
}

post發送消息的時候,並不是把消息發送給所有的觀察者,而是發送給可匹配的觀察者。所謂可匹配指的是,能接收的消息類型是發送消息(post函數中定義的父類)。

比如,AObserver能接收的消息類型是XMsg,BObserver能接收的消息類型是YMsg,CObserver能接收的消息類型是ZMsg。其中,XMsg是YMsg的父類。

XMsg xMsg= new XMsg();
YMsg yMsg= new YMsg();
ZMsg zMsg= new ZMsg();
post(xMsg);// AObserver  接收消息
post(yMsg);// AObserver和BObserver接收到消息
post(zMsg);// CObserver接收到消息

Observer(觀察者)能接收到消息類型是通過@Subscribe註解定義的。

@Subscribe 註解

EventBus通過@Subscribe註解類標明,某個函數能接收哪種類型的消息。(類型不能是基本類型)

三、EventBus的原理

四、動手實現一個EventBus

@Beat 標註一個公共的API(公共的類、方法或字段) 在未來的發行版本中會發生不兼容的變化。帶有此註釋的 API 不受其包含庫所做的任何兼容性保證。請註意,此註釋的存在並不意味著所討論 API 的質量或性能,隻是它不是“API 凍結”的事實。

應用程序依賴 beta API 通常是安全的,但需要在升級期間進行一些額外的工作。然而,不建議在類庫(包含在用戶的CLASSPATH中,不受開發人員的控制)上這麼做。

4.1 定義Subscribe註解

定義Subscribe註解,用於標明哪個函數可以接收消息。

/**
 * 定義一個註解,表明觀察者中的哪個函數可以接收消息
 */
@Retention(RetentionPolicy.RUNTIME)  // 註解的聲明周期
@Target(ElementType.METHOD)  // 註解作用的地方
@Beta  // 標註API在未來發行的版本是可能有不兼容的變化
public @interface MySubscribe {
}

4.2 ObserverAction

用來表示@MySubscribe註解的方法。

/**
 * 用來表示 @MySubscribe 註解方法
 */
public class MyObserverAction {
    private Object target;
    private Method method;
    public MyObserverAction(Object target, Method method) {
        this.target = checkNotNull(target);
        this.method = method;
        this.method.setAccessible(true);
    }
    /**
     * event是method方法的參數
     * @param event
     */
    public void execute(Object event) {
        try {
            method.invoke(target, event);
        } catch (IllegalAccessException | InvocationTargetException  e) {
            throw new RuntimeException(e);
        }
    }
}

4.3 ObserverRegister

Observer 註冊表。

/**
 * Observer 註冊表
 */
public class MyObserverRegister {
    // 註冊表, 消息類型: 觀察者方法
    private ConcurrentMap<Class<?>, CopyOnWriteArraySet<MyObserverAction>> registry = new ConcurrentHashMap<>();
    /**
     * 將觀察者註冊到 註冊表中
     * @param observer 觀察者
     */
    public void register(Object observer) {
        Map<Class<?>, Collection<MyObserverAction>> observerActions = findAllObserverActions(observer);
        for (Map.Entry<Class<?>, Collection<MyObserverAction>> entry : observerActions.entrySet()) {
            Class<?> eventType = entry.getKey();
            Collection<MyObserverAction> evenActions = entry.getValue();
            CopyOnWriteArraySet<MyObserverAction> registryEvenActions =
                    registry.getOrDefault(eventType, new CopyOnWriteArraySet<>());
            registryEvenActions.addAll(evenActions);
            registry.put(eventType, registryEvenActions);
        }
    }
    /**
     * 獲取匹配的觀察者事件
     * @param event
     * @return
     */
    public List<MyObserverAction> getMatchedMyObserverActions(Object event) {
        List<MyObserverAction> result = new ArrayList<>();
        Class<?> postedEventClass = event.getClass();
        for (Map.Entry<Class<?>, CopyOnWriteArraySet<MyObserverAction>> entry : registry.entrySet()) {
            Class<?> eventClass = entry.getKey();
            // 匹配相同類型或父類型
            if (postedEventClass.isAssignableFrom(eventClass)) {
                result.addAll(entry.getValue());
            }
        }
        return result;
    }
    // 消息類型(觀察者類型類型及其父類型) 觀察者方法
    public Map<Class<?>, Collection<MyObserverAction>> findAllObserverActions(Object observer) {
        Map<Class<?>, Collection<MyObserverAction>> result = new HashMap<>();
        // 觀察者類型
        Class<?> observerClass = observer.getClass();
        for (Method method : getAnnotatedMethods(observerClass)) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            Class<?> eventType = parameterTypes[0];
            result.putIfAbsent(eventType, new ArrayList<>());
            result.get(eventType).add(new MyObserverAction(observer, method));
        }
        return result;
    }
    /**
     * 根據觀察者類型,查找方法列表
     * @param clazz
     * @return
     */
    public List<Method> getAnnotatedMethods(Class<?> clazz) {
        List<Method> result = new ArrayList<>();
        for (Method method : clazz.getDeclaredMethods()) {
            if (method.isAnnotationPresent(MySubscribe.class)) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                checkArgument(parameterTypes.length==1,
                        "方法%s 有一個註解@MySubscribe ,它有%s個參數,實際要求有且隻有一個參數",
                        method, parameterTypes.length);
                result.add(method);
            }
        }
        return result;
    }
}

4.4 EventBus

/**
 * 實現 同步阻塞的 EventBus
 */
public class MyEventBus {
    private Executor executor;
    private MyObserverRegister register = new MyObserverRegister();
    public MyEventBus() {
        // MoreExecutors.directExecutor() 是 Google Guava 提供的工具類,看似是多線程,實際上是單線程。
        // 之所以要這麼實現,主要還是為瞭跟 AsyncEventBus 統一代碼邏輯,做到代碼復用
        this(MoreExecutors.directExecutor());
    }
    // 註意這裡的修飾符
    protected MyEventBus(Executor executor) {
        this.executor = executor;
    }
    public void register(Object observer) {
        register.register(observer);
    }
    public void post(Object event) {
        List<MyObserverAction> observerActions = register.getMatchedMyObserverActions(event);
        for (MyObserverAction observerAction : observerActions) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    observerAction.execute(event);
                }
            });
        }
    }
}

4.5 SyncEventBus

/**
 * 異步非阻塞的EventBus
 */
public class MySyncEventBus extends MyEventBus {
    public MySyncEventBus(Executor executor) {
        super(executor);
    }
}

五、使用自定義的EventBus

    public static void main(String[] args) {
        // 自定義的EventBus
        MyEventBus myEventBus = new MyEventBus();
        // 註冊一個觀察者
        myEventBus.register(new CurrentConditionsDisplayListener());
        // 向觀察者發送消息
        myEventBus.post(23.0f);
    }

六、擴展

Spring Event

到此這篇關於Java EventBus手把手帶你實現的文章就介紹到這瞭,更多相關Java EventBus內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: