资讯专栏INFORMATION COLUMN

RxJava2:Observable和Observer如何传递

malakashi / 2373人阅读

摘要:以为例先上代码传递这里每次调用一个操作符返回的都是的直接子类或者间接之类以为例这里重新了一个的子类对象结论如下每个操作符都会对应返回一个的子类对象类名格式然后去调用下一个操作符比如操作符返回的是的实例对象对于的创建型操作符返回的是其直接子类

以Observable为例,先上代码:

//①
ObservableJust observable = (ObservableJust) Observable.just("hello rxjava2");
//②
        ObservableSubscribeOn subscribe = (ObservableSubscribeOn) observable.subscribeOn(Schedulers.io());
//③
        ObservableObserveOn observerOn = (ObservableObserveOn) subscribe.observeOn(AndroidSchedulers.mainThread());
//④
        ObservableDoFinally doFinally = (ObservableDoFinally) observerOn.doFinally(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("doFinally");
            }
        });
//⑤
        ObservableDoOnLifecycle doOnSubscribe = (ObservableDoOnLifecycle) doFinally.doOnSubscribe(new Consumer() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                System.out.println("doOnSubscribe: " + disposable.hashCode());
            }
        });
//⑥
        doOnSubscribe.subscribe(new Observer() {
                    @Override
                    public void onSubscribe(Disposable d) {
                       System.out.println("onSubscribe: "+d.hashCode());
                      /*  if (!d.isDisposed()){
                           System.out.println("onSubscribe: dispose");
                           d.dispose();
                       }*/
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println("onNext: "+s);
                        Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError: "+e.getMessage());
                        Toast.makeText(MainActivity.this, e.getMessage(), Toast.LENGTH_SHORT).show();

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                        Toast.makeText(MainActivity.this, "onComplete", Toast.LENGTH_SHORT).show();
                    }
                });
Observable传递

这里每次调用一个操作符,返回的都是Observable的直接子类或者间接之类.以just为例:

 public static  Observable just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust(item));
    }

这里重新new了一个Observable的子类对象ObservableJust.

结论如下:

每个操作符都会对应返回一个Observable的子类对象,类名格式ObservableXXX然后去调用下一个操作符.比如interval操作符,返回的是ObservableInterval的实例对象.

对于Observable的创建型操作符,返回的是其直接子类,而其他操作符,返回的是AbstractObservableWithUpstream的子类对象.AbstractObservableWithUpstream的构造函数中,第一个参数就是Observable对象,这一点非常重要,这个参数是上一个操作符返回的Observable对象.这保证了整个调用流程的起始处的Observable对象能在整个流程中传递.

最后一步订阅subscribe(Observer).如果没有最下游的观察者对数据做接收,整个调用流程是不会执行的.
先从⑥开始看ObservableDoOnLifecycle的subscribe方法做了什么.

@Override
    protected void subscribeActual(Observer observer) {
        source.subscribe(new DisposableLambdaObserver(observer, onSubscribe, onDispose));
    }

source就是上游操作符返回的Observable的子类对象,通过AbstractObservableWithUpstream的构造函数传递给下游的.这里去调用了上一个Observable对象的subscribe方法.这个调用由下至上,直到整个流程的起始处.

Observable对象先从上游逐步通过下游的Observable对象的构造函数传递给下游,再通过下游的subscribe方法,逐步去调用上游的subscribe方法.

Observer传递

订阅发生在最后一步调用subscribe(Observer).从第⑤步ObservableDoOnLifecycle的subscribe方法开始看.

 @Override
    protected void subscribeActual(Observer observer) {
        source.subscribe(new DoFinallyObserver(observer, onFinally));
    }

重新创建一个DoFinallyObserver对象,并把第⑥步的Observer参数传入后,交给上游的Observable.这个调用流程会逐步传递到最上游的ObservableJust的subscribe方法.

//ObservableJust.java
 @Override
    protected void subscribeActual(Observer observer) {
    //参数observer是下游传上来的
        ScalarDisposable sd = new ScalarDisposable(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

首先调用了 observer.onSubscribe(sd);可以得到结论:

Observer的onSubscribe在主线程执行,无论上下游怎么切换线程.在请求网络时,可以在这个地方弹出进度提示或者做一些初始化操作.

ScalarDisposable.run()方法调用了下游的Observer传递数据,这个调用会逐步往下传递,直到最下游的Observer,如果没遇到错误或者异常情况.

Observer对象先从最下游的订阅处开始往上传递到最上游,再携带数据逐步往下游传递.
数据传递
从上面可以知道,数据是被Observer携带,逐步往下游传递
Observable.subscribe(Consumer,Consumer,Action)

有多个重载的方法

 //方法一
 @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe() {
        return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
     //方法二
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
     //方法三
     @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer onNext, Consumer onError) {
        return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
     //方法四
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer onNext, Consumer onError,
            Action onComplete) {
        return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
    }
     //方法五
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer onNext, Consumer onError,
            Action onComplete, Consumer onSubscribe) {
       //创建LambdaObserver对象
        LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }
     //方法六
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer observer) {
       
            observer = RxJavaPlugins.onSubscribe(this, observer);
// 省略
            subscribeActual(observer);
        //省略
    }

前五个方法最终在第五个方法内部重新创建了一个Observer类型对象LambdaObserver,然后调用了第六个方法.

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/14719.html

相关文章

  • Rxjs 核心概念

    摘要:仿宋可以把想像成一个可以发射事件的库。在中用来处理异步事件的核心概念包括代表了未来可能会产生的一系列的值或事件的集合回调函数的集合,它知道如何去处理上产生的值或者事件,当然也包括异常。 又一年要过去了,回顾2017,rxjs始终是我在项目里使用最频繁的库,在我看来,它是一个非常优秀的数据处理工具。年初的时候就计划写点什么,碍于目前公司的项目实在抽不出时间,这一拖就到了年底。临近新年,总...

    Youngdze 评论0 收藏0
  • 介绍RxJS在Angular中的应用

    摘要:是一种针对异步数据流编程工具,或者叫响应式扩展编程可不管如何解释其目标就是异步编程,引入为了就是让异步可控更简单。最合理的方式在调用它。当组件需要向组件传递数据时,我们可以在组件中使用。的作用是使指令或组件能自定义事件。 RxJS是一种针对异步数据流编程工具,或者叫响应式扩展编程;可不管如何解释RxJS其目标就是异步编程,Angular引入RxJS为了就是让异步可控、更简单。 而今就是...

    joyqi 评论0 收藏0
  • Android架构组件官方文档02——LiveData

    摘要:此关系允许在相应生命周期对象的状态更改为时删除观察者。在这种情况下,观察者被认为始终处于活动状态,因此总是通知修改。确保或具有一旦它变为活动状态即可显示的数据。字段定义为的转换,这意味着发生更改时将调用方法。将其状态正确传播到源对象。 LiveData概述 LiveData是一个可观察的数据持有者类。与常规可观察性不同,LiveData具有生命周期感知能力,这意味着它尊重其他应用程序组...

    jindong 评论0 收藏0
  • RxJs 核心概念之Observable

    摘要:函数调用后同步计算并返回单一值生成器函数遍历器遍历过程中同步计算并返回个到无穷多个值异步执行中返回或者不返回单一值同步或者异步计算并返回个到无穷多个值是函数概念的拓展既不像,也不像是。如果不调用函数,就不会执行如果如果不订阅,也不会执行。 Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。下方表格对Observable进行了定位(为解决基于推送的...

    forrest23 评论0 收藏0
  • Rxjs 响应式编程-第一章:响应式

    摘要:响应式编程具有很强的表现力,举个例子来说,限制鼠标重复点击的例子。在响应式编程中,我把鼠标点击事件作为一个我们可以查询和操作的持续的流事件。这在响应式编程中尤其重要,因为我们随着时间变换会产生很多状态片段。迭代器模式的另一主要部分来自模式。 Rxjs 响应式编程-第一章:响应式Rxjs 响应式编程-第二章:序列的深入研究Rxjs 响应式编程-第三章: 构建并发程序Rxjs 响应式编程-...

    songze 评论0 收藏0

发表评论

0条评论

malakashi

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<