资讯专栏INFORMATION COLUMN

谜之RxJava (三)update —— 线程切换(二)

fox_soyoung / 2039人阅读

摘要:在更新版本后,这个接口进行了一个重构,变换方式从一个比较难理解的递归嵌套的上做一个改成了从角度上进行了一层封装。从类型来说,脱离了的概念,变身成了。而在老接口中,变换前的是通过传进来的。

RxJava更新版本后,OperatorSubscribeOn这个接口进行了一个重构,变换方式从一个比较难理解的递归嵌套的Observable>上做一个Operator改成了从OnSubscribe角度上进行了一层封装。

从类型来说,OperatorSubscribeOn脱离了Operator的概念,变身成了OnSubscribe

我们来比对下吧~

老版本的核心实现:

@Override
public Subscriber> call(final Subscriber subscriber) {
    final Worker inner = scheduler.createWorker();
    subscriber.add(inner);
    return new Subscriber>(subscriber) {

        @Override
        public void onCompleted() {
            // ignore because this is a nested Observable and we expect only 1 Observable emitted to onNext
        }

        @Override
        public void onError(Throwable e) {
            subscriber.onError(e);
        }

        @Override
        public void onNext(final Observable o) {
            inner.schedule(new Action0() {

                @Override
                public void call() {
                    final Thread t = Thread.currentThread();
                    o.unsafeSubscribe(new Subscriber(subscriber) {

                        @Override
                        public void onCompleted() {
                            subscriber.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            subscriber.onError(e);
                        }

                        @Override
                        public void onNext(T t) {
                            subscriber.onNext(t);
                        }

                        @Override
                        public void setProducer(final Producer producer) {
                            subscriber.setProducer(new Producer() {

                                @Override
                                public void request(final long n) {
                                    if (Thread.currentThread() == t) {
                                        // don"t schedule if we"re already on the thread (primarily for first setProducer call)
                                        // see unit test "testSetProducerSynchronousRequest" for more context on this
                                        producer.request(n);
                                    } else {
                                        inner.schedule(new Action0() {

                                            @Override
                                            public void call() {
                                                producer.request(n);
                                            }
                                        });
                                    }
                                }

                            });
                        }

                    });
                }
            });
        }

    };
}

操作符,核心是把一个Subscriber转换成另外一个Subscriber

再看看新版实现

@Override
public void call(final Subscriber subscriber) {
    final Worker inner = scheduler.createWorker();
    subscriber.add(inner);
    
    inner.schedule(new Action0() {
        @Override
        public void call() {
            final Thread t = Thread.currentThread();
            
            Subscriber s = new Subscriber(subscriber) {
                @Override
                public void onNext(T t) {
                    subscriber.onNext(t);
                }
                
                @Override
                public void onError(Throwable e) {
                    try {
                        subscriber.onError(e);
                    } finally {
                        inner.unsubscribe();
                    }
                }
                
                @Override
                public void onCompleted() {
                    try {
                        subscriber.onCompleted();
                    } finally {
                        inner.unsubscribe();
                    }
                }
                
                @Override
                public void setProducer(final Producer p) {
                    subscriber.setProducer(new Producer() {
                        @Override
                        public void request(final long n) {
                            if (t == Thread.currentThread()) {
                                p.request(n);
                            } else {
                                inner.schedule(new Action0() {
                                    @Override
                                    public void call() {
                                        p.request(n);
                                    }
                                });
                            }
                        }
                    });
                }
            };
            
            source.unsafeSubscribe(s);
        }
    });
}

这里实现的是OnSubscribe接口,我们知道,OnSubscribeObservable真正执行的代码段。

在新的接口重构后,唯一的不同是,在它里面需要存一个指向原始Observable的source变量。 而在老接口中,变换前的Observable是通过Observable传进来的。

欢迎关注我的专栏,来从头到尾学习RxJava

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/13001.html

相关文章

  • 谜之RxJava)—— 线程切换

    摘要:谜之二一个异步库最迷人的是什么答案就是把异步序列写到一个工作流里和的如出一辙。综上所述,如果我们需要我们的在一个别的线程上运行的时候,只需要在后面跟一个即可。 【谜之RxJava (二) —— Magic Lift】 Rxjava -- 一个异步库 RxJava最迷人的是什么?答案就是把异步序列写到一个工作流里!和javascript的Promise/A如出一辙。OK,在java中做异...

    walterrwu 评论0 收藏0
  • 谜之RxJavaupdate 2 —— subscribeOn 和 observeOn 的区

    摘要:操作,操作是在线程上,因为在他们之前的切换了线程。特别注意那一段,对于操作和操作是无效的再简单点总结就是的调用切换之前的线程。 开头 之前我们分析过subscribeOn这个函数,现在我们来看下subscribeOn和observeOn这两个函数到底有什么异同。 用过rxjava的旁友都知道,subscribeOn和observeOn都是用来切换线程用的,可是我什么时候用subscri...

    justCoding 评论0 收藏0
  • 谜之RxJava(四)—— Retrofit和RxJava的基情

    摘要:我们可以看下,它把,也就是取消请求的情况处理的挺好。这段代码是给增加一个的事件。也就是请求完成的时候,会自动对进行一个终止,也就是的行为。 概述 前文回顾: 谜之RxJava (三)—— 线程切换 今天来介绍下和RxJava搭配使用的好基友,就是我们的Retrofit啦,Retrofit使用动态代理的机制,为我们提供了一个简要的使用方法来获取网络上的资料,现在更新到2.0.0-beta...

    Scliang 评论0 收藏0
  • 谜之RxJava) —— Magic Lift

    摘要:回顾上一篇文章讲了和之间的关系。我们知道,的具体工作都是在中完成的。这样就生成了一个代理的,最后我们最外层的对象对我们代理的进行了调用。。也就是此处的就是被包裹后的对象。迷之三线程切换欢迎关注我以及 回顾 上一篇文章 讲了Observable、OnSubscribe和Subscriber之间的关系。 我们知道,Observable的具体工作都是在OnSubscribe中完成的。从这个类...

    fobnn 评论0 收藏0
  • 谜之RxJava (一) —— 最基本的观察者模式

    摘要:最近在界,最火的大概就是了。先来上个最最简单的,经典的。这段代码产生的最终结果就是在里会出现。原来只是为了帮我们处理好异常,以及防止工作流的重复。这是最最基本的工作流,让我们认识到他是怎么工作的。谜之二欢迎关注我以及 最近在Android界,最火的framework大概就是RxJava了。扔物线大大之前写了一篇文章 《给 Android 开发者的 RxJava 详解》,在我学习RxJa...

    ckllj 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<