RxJava 觸發流基本原理源碼解析

正文

本節,我們從Rxjava使用代碼入手,去結合自己已有的知識體系,加查閱部分源碼驗證的方式,來一起探索一下Rxjava實現的基本原理。

為瞭本文原理分析環節,可以被更多的人理解、學習,所以小編從初學者的角度,從使用入手,一點點的分析瞭其中的源碼細節、思想,建議大傢隨著本文的章節步驟,一步一步的來閱讀,才能更快、更好的理解Rxjava的真正的思想精髓,也為我們之後的實踐課程留一個好的底子。

觸發流

到目前為止,我們講瞭構建流、訂閱流,但是依然沒有觸發真正的observer中的事件,例如:

@Override
   public void onSubscribe(@NonNull Disposable d) {
       Log.d(TAG, "onSubscribe");
   }
   @Override
   public void onNext(@NonNull String s) {
       Log.d(TAG, "onNext s = " + s);
   }
   @Override
   public void onError(@NonNull Throwable e) {
       Log.d(TAG, "onError");
   }
   @Override
   public void onComplete() {
       Log.d(TAG, "onComplete");
   }

各位看官,莫急莫急,且聽老衲娓娓道來。

還記得上面的訂閱流嗎?訂閱流從右往左執行的,執行到最後的observable,執行瞭它的subscribe方法。我們從使用代碼知道,最左端的observable是啥來著,大傢還記得嗎?當然是ObservableJust

private void test() {
	//第一步:just調用
    Observable.just("https://img-blog.csdn.net/20160903083319668")
    //第二步:map調用
            .map(new Function<String, Bitmap>() {
                @Override
                public Bitmap apply(String s) throws Exception {
                    //Bitmap bitmap = downloadImage(s);
                    return null;
                }
            })
            //第三步:subscribeOn、observeOn調用
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            //第四步:subscribe調用
            .subscribe(new Observer<Bitmap>() {
                @Override
                public void onSubscribe() {
                    Log.d(TAG, "onSubscribe");
                }
                @Override
                public void onNext(Bitmap s) {
                    Log.d(TAG, "onNext s = " + s);
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError ", e);
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

我們就順坡下驢,看一下ObservableJust的subscribe方法做啥瞭

public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }
    @Override
    public T get() {
        return value;
    }
}

仔細一看,這裡面沒有subscribe方法,那麼肯定就是調用父類observable的subscribe方法瞭

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //對象封裝,暫時不是重點,我們跳過
        observer = RxJavaPlugins.onSubscribe(this, observer);
        //判空
        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

大傢看到這裡,其實關鍵在於,最終調用瞭一個subscribeActual方法,所以我們繼續看子類ObservableJust的subscribeActual方法幹啥瞭?

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

接續根據ScalarDisposable的run方法

   public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {
        private static final long serialVersionUID = 3880992722410194083L;
        final Observer<? super T> observer;
        final T value;
		//...省略很多代碼
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
            	//可以看到這裡執行瞭onNext、onComplete方法
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
    }

小結

看到這裡,我們知道瞭,開始一層一層的從左往右去調用observer的相關方法瞭。 由訂閱流可知,每層的observable實際上擁有下一層的observer的代理類,所以自然而然,從最左邊開始調用observer的相關方法開始,觸發流,就是從左往右,一層一層的剝開之前包裹的observer,然後順序調用裡面的onNext、onComplete等方法。 不信,我們挑一個ObservableMap來驗證一下。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //此處調用瞭下遊的observer的onNext方法
            downstream.onNext(v);
        }
    }
}

可以看到裡面,的確調用瞭下遊的observer的onNext方法。

總結

整個過程,分為構建流、訂閱流、觸發流。

構建流:從左到右執行不同的操作符的過程,其實很簡單,就是根據不同的操作符,對原始的 observable進行逐層包裝,這裡可以看出,每層的節點 N* 就持有瞭上一層的observable。

訂閱流:從右到左的 subscribe 調用過程,這個過程中,每個observable內部的subscribeActual執行兩個關鍵操作,一個是對自己已有的observer進行一層重新包裝,另外一個就是使用前面節點的observable,訂閱包裝好的observer。

觸發流:在訂閱流執行完成之後,執行到最左端的observable,我們發現它內部的subscribeActual實現,實際上就是調用裡面擁有的observer的相關回調方法(onNext、onComplete、onError等),那麼這層回調流就簡單瞭,就是一層一層的調用裡面的observer,最終執行到最右端的observer。

篇幅所限,大傢也發現瞭,我們本節課,我們詳細講解Rxjava線程切換的實現原理,這個有兩個原因,一是篇幅所限,本節內容已經夠多瞭,大傢先吃透框架,另外一方面是,線程切換我相信我們後面實踐環節,待框架自我搭建實現之後,裡面的線程切換功能就是水到渠成的事情,相信憑借大傢已有的知識,都可以做到的。

所以建議大傢,先別看這塊Rxjava是如何實現線程切換的,而是想一下,它是怎麼實現的?到時我們自己的Rxjava框架搭建起來之後,填充實現一下。

提個醒兒,大傢還記得我們之前EventBus源碼分析、實踐環節嗎?其中也說到瞭線程切換。其實原理差不多,大傢先想一下。

以上就是RxJava 觸發流基本原理源碼解析的詳細內容,更多關於RxJava 觸發流原理的資料請關註WalkonNet其它相關文章!

推薦閱讀: