RxJava實戰之訂閱流基本原理示例解析
正文
本節,我們從Rxjava使用代碼入手,去結合自己已有的知識體系,加查閱部分源碼驗證的方式,來一起探索一下Rxjava實現的基本原理。
為瞭本文原理分析環節,可以被更多的人理解、學習,所以小編從初學者的角度,從使用入手,一點點的分析瞭其中的源碼細節、思想,建議大傢隨著本文的章節步驟,一步一步的來閱讀,才能更快、更好的理解Rxjava的真正的思想精髓,也為我們之後的實踐課程留一個好的底子。
訂閱流
有人會問,小編,你到現在為止,隻是講瞭流程,而沒有講到具體每個中間操作符,在轉換的對象裡面的方法調用,這個問題,問的特別好!!!
還記得小編開篇說的那句話嗎?我們從Rxjava的使用代碼入手
private void test() { //第一步:just調用 Observable.just("https://img-blog.csdn.net/20160903083319668") //第二步:map調用 .map(new Function<String, Bitmap>() { @Override public Bitmap apply(String s) throws Exception { //Bitmap bitmap = downloadImage(s); return null; } }) //第三步:subscribeOn、observeOn調用 .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) //第四步:subscribe調用 .subscribe(new Observer<Bitmap>() { @Override public void onSubscribe() { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Bitmap s) { Log.d(TAG, "onNext s = " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError ", e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
從上面的樣例代碼分析、分解,我們明面上看到四個步驟,暫且列下來:
- 第一步:just調用
- 第二步:map調用
- 第三步:subscribeOn、observeOn調用
- 第四步:subscribe調用
之所以我們沒有講到ObservableObserveOn、ObservableMap、ObservableJust等對象裡面的具體方法調用,是因為到目前為止,從使用例子入手,根本就沒有調用到,所以我們也就無從分析到。,本節,接下來我們分析subscribe調用,大傢就發現,裡面的某些方法開始調用上瞭。
subscribe的解讀收下
我們知道,上面的just、map、subscribeOn、observeOn一系列調用下來,依然是一個Observable對象、
Observable是被觀察者的意思,subscribe是訂閱的意思,Observer是觀察者的意思。
大傢發現瞭沒有?這裡有個問題,這傢夥和我們標準的觀察者有很大的不同,標準觀察者模式,是一種一對多的行為型設計模式,其實就是若幹個觀察者,將自身的接口引用註冊到被觀察者內部,被觀察者狀態發生變更時,遍歷內部的list列表,一一通知觀察者,如下圖
Observable.just("https://img-blog.csdn.net/20160903083319668") .subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(@NonNull String s) { Log.d(TAG, "onNext s = " + s); } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
可是我們看上面Rxjava的代碼,與標準觀察者模式有兩點不同
- 一是被觀察者訂閱瞭觀察者
- 二是從使用上看觀察者和被觀察者的訂閱關系是一對一的
上面提出的兩點不同,我們一邊看源碼,一邊試著去理解一下。
- 一對一的通知:因為響應式編程思想的重點在於,一個變化,另外一個要能感知到,那麼通過這樣變形的觀察者模式,去實現一對一的通知,我覺得也沒啥問題。
- 被觀察者訂閱觀察者:這個從理論上講,就沒辦法去理解瞭,對吧,因為你再怎麼變形標準觀察者模式,那也肯定是觀察者訂閱被觀察者,所以這裡我們有必要簡單通過源碼去瞭解一下
我們從上面看到just是將傳入的T,再次封裝為瞭一個ObservableJust對象
@CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); }
我們看一下ObservableJust類代碼
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer<? super T> observer) { ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); observer.onSubscribe(sd); sd.run(); } @Override public T call() { return value; } }
從上面看到,ObservableJust僅僅是將傳入的T封裝瞭一層而已,它繼承與Observable抽象類,而Observable抽象類實現瞭ObservableSource接口
public abstract class Observable<T> implements ObservableSource<T> {
而ObservableSource接口,就是我們外界調用的subscribe訂閱方法的源頭
public interface ObservableSource<T> { /** * Subscribes the given Observer to this ObservableSource instance. * @param observer the Observer, not null * @throws NullPointerException if {@code observer} is null */ void subscribe(@NonNull Observer<? super T> observer); }
所以Observable肯定實現瞭subscribe方法,我們看一下Observable的subscribe方法幹什麼瞭
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { //對象封裝,暫時不是重點,我們跳過 observer = RxJavaPlugins.onSubscribe(this, observer); //判空 ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
大傢看到這裡,其實關鍵在於,最終調用瞭一個subscribeActual方法,而這個方法是個啥?在哪裡實現的?一看,這玩意原來是Observable類中的一個抽象方法
protected abstract void subscribeActual(Observer<? super T> observer);
所以這裡繞回到開頭,我們知道just,實際上是將傳入的參數T,轉換封裝為瞭ObservableJust對象,而ObservableJust繼承與 Observable,所以subscribeActual方法它肯定去瞭
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer<? super T> observer) { ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); //最終這裡還是調用瞭觀察者的相應方法 observer.onSubscribe(sd); sd.run(); } @Override public T call() { return value; } }
小結
大傢,發現瞭沒有,這裡繞瞭一圈,最終調用通過Observable的抽象方法subscribeActual的巧妙實現,最終還是觀察者訂閱瞭被觀察者,被觀察者內部最終調用瞭觀察者的具體方法。
這裡和標準觀察者模式不同的是,被觀察者立馬去通知瞭觀察者,說直接點,在調用被觀察者的訂閱方法時,其實就是直接調用瞭觀察者相應的方法,隻不過這裡通過模板方法模式,巧妙的封裝瞭,好瞭,Rxjava的觀察者模式源碼,我們簡單理解到這裡,我們試著自己去編寫實現一下。
也就是訂閱流的過程中,是以執行subscribe方法為開始,從右往左執行,這個執行過程中,每個節點,做兩件事情
- 對後面的observer節點,做一層包裝代理,變為代理的observerProxy
由於構建流的執行,每個節點實際上擁有上一個節點observable對象的引用,所以執行 source.subscribe(observerProxy)
訂閱流講到現在,大傢是否理解瞭?當然這裡沒有詳細講解其中ObservableSubscribeOn、ObservableObserveOn中的訂閱,如何進行的線程切換,這個並非是不去講,還是那句老話,飯要一點一點的吃,我們congoing使用方法入手,想要去瞭解的是Rxjava的整體框架原理。至於線程切換如何實現的?這個留個念想,大傢可以認真想一下,不建議大傢直接去看源碼。我們在Rxjava實踐環節,也會帶大傢一點一點的去實現這個核心功能。
以上就是RxJava實戰之訂閱流基本原理示例解析的詳細內容,更多關於RxJava訂閱流基本原理的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- RxJava 觸發流基本原理源碼解析
- RxJava構建流基本原理示例解析
- JavaScript中rxjs與 Observable 兩大類操作符解析
- 淺談Python響應式類庫RxPy
- Android實現消息總線的幾種方式詳解