资讯专栏INFORMATION COLUMN

谜之RxJava (二) —— Magic Lift

fobnn / 1429人阅读

摘要:回顾上一篇文章讲了和之间的关系。我们知道,的具体工作都是在中完成的。这样就生成了一个代理的,最后我们最外层的对象对我们代理的进行了调用。。也就是此处的就是被包裹后的对象。迷之三线程切换欢迎关注我以及

回顾

上一篇文章 讲了ObservableOnSubscribeSubscriber之间的关系。 我们知道,Observable的具体工作都是在OnSubscribe中完成的。从这个类名我们也知道,如果生成了一个Observable对象,而不进行subscribe,那么什么都不会发生!

OK,RxJava最让人兴奋的就是它有各种各样的操作符,什么map呀,flatMap呀各种,我们今天要知其然知其所以然,那么他们是如何实现功能的呢?

例子
Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext("hello");
    }
})
.map(new Func1() {
    @Override
    public String call(String s) {
        return s + "word";
    }
})
.subscribe(new Subscriber() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.d("rx", s);
    }
});
lift

我们先看下进行链式调用map之后,发生了什么。

public final  Observable map(Func1 func) {
    return lift(new OperatorMap(func));
}

对,就是调用了lift函数!,然后把我们的转换器(Transfomer,我好想翻译成变形金刚)传入进去,看下它做了什么事。

public final  Observable lift(final Operator operator) {
    return new Observable(new OnSubscribe() {
        @Override
        public void call(Subscriber o) {
            try {
                Subscriber st = hook.onLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so "onStart" it
                    st.onStart();
                    onSubscribe.call(st);
                } catch (Throwable e) {
                    // localized capture of errors rather than it skipping all operators 
                    // and ending up in the try/catch of the subscribe method which then
                    // prevents onErrorResumeNext and other similar approaches to error handling
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    st.onError(e);
                }
            } catch (Throwable e) {
                if (e instanceof OnErrorNotImplementedException) {
                    throw (OnErrorNotImplementedException) e;
                }
                // if the lift function failed all we can do is pass the error to the final Subscriber
                // as we don"t have the operator available to us
                o.onError(e);
            }
        }
    });
}

来,我来简化一下

public final  Observable lift(final Operator operator) {
    return new Observable(...);
}

返回了一个新的Observable对象,这才是重点! 这种链式调用看起来特别熟悉?有没有像javascript中的Promise/A,在then中返回一个Promise对象进行链式调用?

OK,那么我们要看下它是如何工作的啦。

map()调用之后,我们操作的就是新的Observable对象,我们可以把它取名为Observable$2,OK,我们这里调用subscribe,完整的就是Observable$2.subscribe,继续看到subscribe里,重要的几个调用:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);

注意注意 ! 这里的observableObservable$2!!也就是说,这里的onSubscribe是,lift中定义的!!

OK,我们追踪下去,回到lift的定义中。

return new Observable(new OnSubscribe() {
    @Override
    public void call(Subscriber o) {
        try {
            Subscriber st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so "onStart" it
                st.onStart();
                onSubscribe.call(st); //请注意我!! 这个onSubscribe是原始的OnSubScribe对象!!
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                if (e instanceof OnErrorNotImplementedException) {
                    throw (OnErrorNotImplementedException) e;
                }
                st.onError(e);
            }
        } catch (Throwable e) {
            if (e instanceof OnErrorNotImplementedException) {
                throw (OnErrorNotImplementedException) e;
            }
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don"t have the operator available to us
            o.onError(e);
        }
    }
});

一定一定要注意这段函数执行的上下文!,这段函数中的onSubscribe对象指向的是外部类,也就是第一个ObservableonSubScribe!而不是Observable$2中的onSubscribe,OK,谨记这一点之后,看看

Subscriber st = hook.onLift(operator).call(o);

这行代码,就是定义operator,生成一个经过operator操作过的Subscriber,看下OperatorMap这个类中的call方法

@Override
public Subscriber call(final Subscriber o) {
    return new Subscriber(o) {

        @Override
        public void onCompleted() {
            o.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            o.onError(e);
        }

        @Override
        public void onNext(T t) {
            try {
                o.onNext(transformer.call(t));
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(OnErrorThrowable.addValueAsLastCause(e, t));
            }
        }

    };
}

没错,对传入的Subscriber做了一个代理,把转换后的值传入。
这样就生成了一个代理的Subscriber

最后我们最外层的OnSubscribe对象对我们代理的Subscriber进行了调用。。
也就是

 @Override
public void call(Subscriber subscriber) {
    //此处的subscriber就是被map包裹(wrapper)后的对象。
    subscriber.onNext("hello");
}

然后这个subscriber传入到内部,链式的通知,最后通知到我们在subscribe函数中定义的对象。

这时候要盗下扔物线大大文章的图

还不明白的各位,可以自己写一个Demo试一下。

下一章讲下RxJava中很重要的线程切换。

【迷之RxJava(三)—— 线程切换】

欢迎关注我Github 以及 weibo、@Gemini

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/12439.html

相关文章

  • 谜之RxJava (三)—— 线程切换

    摘要:谜之二一个异步库最迷人的是什么答案就是把异步序列写到一个工作流里和的如出一辙。综上所述,如果我们需要我们的在一个别的线程上运行的时候,只需要在后面跟一个即可。 【谜之RxJava (二) —— Magic Lift】 Rxjava -- 一个异步库 RxJava最迷人的是什么?答案就是把异步序列写到一个工作流里!和javascript的Promise/A如出一辙。OK,在java中做异...

    walterrwu 评论0 收藏0
  • 谜之RxJava (三)update —— 线程切换(

    摘要:在更新版本后,这个接口进行了一个重构,变换方式从一个比较难理解的递归嵌套的上做一个改成了从角度上进行了一层封装。从类型来说,脱离了的概念,变身成了。而在老接口中,变换前的是通过传进来的。 在RxJava更新版本后,OperatorSubscribeOn这个接口进行了一个重构,变换方式从一个比较难理解的递归嵌套的Observable上做一个Operator改成了从OnSubscribe角...

    fox_soyoung 评论0 收藏0
  • 谜之RxJava(四)—— Retrofit和RxJava的基情

    摘要:我们可以看下,它把,也就是取消请求的情况处理的挺好。这段代码是给增加一个的事件。也就是请求完成的时候,会自动对进行一个终止,也就是的行为。 概述 前文回顾: 谜之RxJava (三)—— 线程切换 今天来介绍下和RxJava搭配使用的好基友,就是我们的Retrofit啦,Retrofit使用动态代理的机制,为我们提供了一个简要的使用方法来获取网络上的资料,现在更新到2.0.0-beta...

    Scliang 评论0 收藏0
  • 谜之RxJava (一) —— 最基本的观察者模式

    摘要:最近在界,最火的大概就是了。先来上个最最简单的,经典的。这段代码产生的最终结果就是在里会出现。原来只是为了帮我们处理好异常,以及防止工作流的重复。这是最最基本的工作流,让我们认识到他是怎么工作的。谜之二欢迎关注我以及 最近在Android界,最火的framework大概就是RxJava了。扔物线大大之前写了一篇文章 《给 Android 开发者的 RxJava 详解》,在我学习RxJa...

    ckllj 评论0 收藏0
  • 谜之RxJava (三)update 2 —— subscribeOn 和 observeOn 的区

    摘要:操作,操作是在线程上,因为在他们之前的切换了线程。特别注意那一段,对于操作和操作是无效的再简单点总结就是的调用切换之前的线程。 开头 之前我们分析过subscribeOn这个函数,现在我们来看下subscribeOn和observeOn这两个函数到底有什么异同。 用过rxjava的旁友都知道,subscribeOn和observeOn都是用来切换线程用的,可是我什么时候用subscri...

    justCoding 评论0 收藏0

发表评论

0条评论

fobnn

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<