RxJava構建流基本原理示例解析

正文

本節,我們從Rxjava使用代碼入手,去結合自己已有的知識體系,加查閱部分源碼驗證的方式,來一起探索一下Rxjava實現的基本原理。

為瞭本文原理分析環節,可以被更多的人理解、學習,所以小編從初學者的角度,從使用入手,一點點的分析瞭其中的源碼細節、思想,建議大傢隨著本文的章節步驟,一步一步的來閱讀,才能更快、更好的理解Rxjava的真正的思想精髓,也為我們之後的實踐課程留一個好的底子。

1.構建流

大傢一定鬱悶,怎麼突然跑出來構建流的詞匯,小編你之前也沒講到過呀?大傢先別急,我們一步一步來。 首先從一個最簡單的,RxJava的樣例使用代碼,我們去分解,打上步驟

private void test() {
	//第一步:just調用
    Observable.just("https://img-blog.csdn.net/20160903083319668")
    //第二步:map調用
            .map(new Function<String, Bitmap>() {
                @Override
                public Bitmap apply(String s) throws Exception {
                    //Bitmap bitmap = downloadImage(s);
                    return null;
                }
            })
            //第三步:subscribeOn、observeOn調用
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            //第四步:subscribe調用
            .subscribe(new Observer<Bitmap>() {
                @Override
                public void onSubscribe() {
                    Log.d(TAG, "onSubscribe");
                }
                @Override
                public void onNext(Bitmap s) {
                    Log.d(TAG, "onNext s = " + s);
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError ", e);
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

從上面的樣例代碼分析、分解,我們明面上看到四個步驟,暫且列下來:

  • 第一步:just調用
  • 第二步:map調用
  • 第三步:subscribeOn、observeOn調用
  • 第四步:subscribe調用

這裡運行一下,我們看到日志打印

大傢看一下這裡幾個關鍵的函數對象

Observable 被觀察者,subscribe訂閱的意思,Observer觀察者的意思,just是僅僅輸入url,subscribeOn和observeOn是線程切換功能,map是中間的一個操作符。接下來,我們就這些看到的操作符、關鍵字,進行解讀。

1.1 just的解讀

從整體鏈式調用的代碼形式上,我們大概可以猜測到,just、subscribeOn、observeOn、map等中間操作符,必定都是基於同一個對象的變形的builder模式,也就是說,他們應該都是同一個類內部的方法。

我們看一下的Observable的just源碼

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

可以看到這裡,僅僅是將傳入的T 也就是我們上面樣例中的urlString,包裝為瞭ObservableJust對象。而just的調用,返回的依然是一個Observable被觀察者對象。 我們看一下ObservableJust類代碼

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }
    @Override
    public T call() {
        return value;
    }
}

從上面看到,ObservableJust僅僅是將傳入的T封裝瞭一層而已,它繼承與Observable抽象類,而Observable抽象類實現瞭ObservableSource接口

public abstract class Observable<T> implements ObservableSource<T> {

而ObservableSource接口,就是我們外界調用的subscribe訂閱方法的源頭

public interface ObservableSource<T> {
    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

好瞭,just講到這裡,我們繼續map,到目前為止,ObservableJust的subscribe方法僅僅是一個方法而已,並未調用到,所以我們暫時不予理睬,後面調用到,我們再去分析。

1.2 map的解讀

private void test() {
	//第一步:just調用
    Observable.just("https://img-blog.csdn.net/20160903083319668")
    //第二步:map調用
            .map(new Function<String, Bitmap>() {
                @Override
                public Bitmap apply(String s) throws Exception {
                    //Bitmap bitmap = downloadImage(s);
                    return null;
                }
            })
            //第三步:subscribeOn、observeOn調用
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            //第四步:subscribe調用
            .subscribe(new Observer<Bitmap>() {
                @Override
                public void onSubscribe() {
                    Log.d(TAG, "onSubscribe");
                }
                @Override
                public void onNext(Bitmap s) {
                    Log.d(TAG, "onNext s = " + s);
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError ", e);
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

從這個使用代碼來看,map是just與subscribe之前的一個操作符,起到承上啟下的作用。

我們看到首先是just返回的對象,可以直接調用map方法,我們之前分析、實現過just,知道just實際上返回的就是一個Observable,所以map方法就是Observable裡面的一個方法。

再往下看,map返回後的對象,直接調用瞭subscribe方法,我們之前設計實現響應式功能代碼的時候知道,subscribe是Observable裡面的一個方法,那麼這裡也就知道瞭,map方法肯定也是返回一個Observable對象。

從上面分析,我們看到一個流向圖

大傢發現,這個流。隻做瞭一件事情,每個操作符,都是對上一層的Observable進行瞭一層包裝代理而已。這時我們提出一個概念,這個流,我們命名為構建流。

同理,subscribeOn、observeOn是否也是這樣的呢?我們通過源碼驗證一下。

1.3 subscribeOn、observeOn

有瞭上面map的經驗,我們猜測,subscribeOn、observeOn順序調用的時候,應該也僅僅是對上一層的Observable進行瞭一層包裝代理而已。我們看一下subscribeOn、observeOn的源碼

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable&lt;T&gt; subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        //果然這裡包裝為瞭ObservableSubscribeOn對象
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn&lt;&gt;(this, scheduler));
    }
    public final Observable&lt;T&gt; observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn&lt;&gt;(this, scheduler, delayError, bufferSize));
    }

我們繼續看一下ObservableSubscribeOn、ObservableObserveOn類的源碼

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    //...省略若幹
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
}

而AbstractObservableWithUpstream本身就是繼承與Observable

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {}

1.4 小結

從上面可以看到,也就是到目前為止,just、map、subscribeOn、observeOn一系列調用下來,依然都是執行一個操作而已,即每個操作符,都是對上一層的Observable進行瞭一層包裝代理而已。

這就是構建流,構建流的作用就是,從開始操作符到訂閱為止,從左往右執行,期間每個操作符,都是對上一層的Observable進行瞭一層包裝代理而已。如圖:

以上就是RxJava構建流基本原理示例解析的詳細內容,更多關於RxJava構建流基本原理的資料請關註WalkonNet其它相關文章!

推薦閱讀: