RxJava構建流基本原理示例解析
正文
本節,我們從Rxjava使用代碼入手,去結合自己已有的知識體系,加查閱部分源碼驗證的方式,來一起探索一下Rxjava實現的基本原理。
為瞭本文原理分析環節,可以被更多的人理解、學習,所以小編從初學者的角度,從使用入手,一點點的分析瞭其中的源碼細節、思想,建議大傢隨著本文的章節步驟,一步一步的來閱讀,才能更快、更好的理解Rxjava的真正的思想精髓,也為我們之後的實踐課程留一個好的底子。
1.構建流
大傢一定鬱悶,怎麼突然跑出來構建流的詞匯,小編你之前也沒講到過呀?大傢先別急,我們一步一步來。 首先從一個最簡單的,RxJava的樣例使用代碼,我們去分解,打上步驟
private void test() { //第一步:just調用 Observable.just("https://img-blog.csdn.net/20160903083319668") //第二步:map調用 .map(new Function<String, Bitmap>() { @Override public Bitmap apply(String s) throws Exception { //Bitmap bitmap = downloadImage(s); return null; } }) //第三步:subscribeOn、observeOn調用 .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) //第四步:subscribe調用 .subscribe(new Observer<Bitmap>() { @Override public void onSubscribe() { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Bitmap s) { Log.d(TAG, "onNext s = " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError ", e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
從上面的樣例代碼分析、分解,我們明面上看到四個步驟,暫且列下來:
- 第一步:just調用
- 第二步:map調用
- 第三步:subscribeOn、observeOn調用
- 第四步:subscribe調用
這裡運行一下,我們看到日志打印
大傢看一下這裡幾個關鍵的函數對象
Observable 被觀察者,subscribe訂閱的意思,Observer觀察者的意思,just是僅僅輸入url,subscribeOn和observeOn是線程切換功能,map是中間的一個操作符。接下來,我們就這些看到的操作符、關鍵字,進行解讀。
1.1 just的解讀
從整體鏈式調用的代碼形式上,我們大概可以猜測到,just、subscribeOn、observeOn、map等中間操作符,必定都是基於同一個對象的變形的builder模式,也就是說,他們應該都是同一個類內部的方法。
我們看一下的Observable的just源碼
@CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); }
可以看到這裡,僅僅是將傳入的T 也就是我們上面樣例中的urlString,包裝為瞭ObservableJust對象。而just的調用,返回的依然是一個Observable被觀察者對象。 我們看一下ObservableJust類代碼
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer<? super T> observer) { ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); observer.onSubscribe(sd); sd.run(); } @Override public T call() { return value; } }
從上面看到,ObservableJust僅僅是將傳入的T封裝瞭一層而已,它繼承與Observable抽象類,而Observable抽象類實現瞭ObservableSource接口
public abstract class Observable<T> implements ObservableSource<T> {
而ObservableSource接口,就是我們外界調用的subscribe訂閱方法的源頭
public interface ObservableSource<T> { /** * Subscribes the given Observer to this ObservableSource instance. * @param observer the Observer, not null * @throws NullPointerException if {@code observer} is null */ void subscribe(@NonNull Observer<? super T> observer); }
好瞭,just講到這裡,我們繼續map,到目前為止,ObservableJust的subscribe方法僅僅是一個方法而已,並未調用到,所以我們暫時不予理睬,後面調用到,我們再去分析。
1.2 map的解讀
private void test() { //第一步:just調用 Observable.just("https://img-blog.csdn.net/20160903083319668") //第二步:map調用 .map(new Function<String, Bitmap>() { @Override public Bitmap apply(String s) throws Exception { //Bitmap bitmap = downloadImage(s); return null; } }) //第三步:subscribeOn、observeOn調用 .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) //第四步:subscribe調用 .subscribe(new Observer<Bitmap>() { @Override public void onSubscribe() { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Bitmap s) { Log.d(TAG, "onNext s = " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError ", e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
從這個使用代碼來看,map是just與subscribe之前的一個操作符,起到承上啟下的作用。
我們看到首先是just返回的對象,可以直接調用map方法,我們之前分析、實現過just,知道just實際上返回的就是一個Observable,所以map方法就是Observable裡面的一個方法。
再往下看,map返回後的對象,直接調用瞭subscribe方法,我們之前設計實現響應式功能代碼的時候知道,subscribe是Observable裡面的一個方法,那麼這裡也就知道瞭,map方法肯定也是返回一個Observable對象。
從上面分析,我們看到一個流向圖
大傢發現,這個流。隻做瞭一件事情,每個操作符,都是對上一層的Observable進行瞭一層包裝代理而已。這時我們提出一個概念,這個流,我們命名為構建流。
同理,subscribeOn、observeOn是否也是這樣的呢?我們通過源碼驗證一下。
1.3 subscribeOn、observeOn
有瞭上面map的經驗,我們猜測,subscribeOn、observeOn順序調用的時候,應該也僅僅是對上一層的Observable進行瞭一層包裝代理而已。我們看一下subscribeOn、observeOn的源碼
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) @NonNull public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) { Objects.requireNonNull(scheduler, "scheduler is null"); //果然這裡包裝為瞭ObservableSubscribeOn對象 return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler)); } public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) { Objects.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize)); }
我們繼續看一下ObservableSubscribeOn、ObservableObserveOn類的源碼
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } //...省略若幹 } public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } }
而AbstractObservableWithUpstream本身就是繼承與Observable
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {}
1.4 小結
從上面可以看到,也就是到目前為止,just、map、subscribeOn、observeOn一系列調用下來,依然都是執行一個操作而已,即每個操作符,都是對上一層的Observable進行瞭一層包裝代理而已。
這就是構建流,構建流的作用就是,從開始操作符到訂閱為止,從左往右執行,期間每個操作符,都是對上一層的Observable進行瞭一層包裝代理而已。如圖:
以上就是RxJava構建流基本原理示例解析的詳細內容,更多關於RxJava構建流基本原理的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- RxJava實戰之訂閱流基本原理示例解析
- RxJava 觸發流基本原理源碼解析
- 淺談Python響應式類庫RxPy
- JavaScript中rxjs與 Observable 兩大類操作符解析
- Android使用Retrofit上傳文件功能