摘要:操作,操作是在线程上,因为在他们之前的切换了线程。特别注意那一段,对于操作和操作是无效的再简单点总结就是的调用切换之前的线程。
开头
之前我们分析过subscribeOn这个函数,
现在我们来看下subscribeOn和observeOn这两个函数到底有什么异同。
用过rxjava的旁友都知道,subscribeOn和observeOn都是用来切换线程用的,可是我什么时候用subscribeOn,什么时候用observeOn呢,我们很少知道这两个区别是啥。
subscribeOn友情提示,如果不想看分析过程的,可以直接跳到下面的总结部分。
先看下OperatorSubscribeOn的核心代码:
public final class OperatorSubscribeOnimplements OnSubscribe { final Scheduler scheduler; final Observable source; public OperatorSubscribeOn(Observable source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } @Override public void call(final Subscriber super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { Subscriber s = new Subscriber (subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } .... }; source.unsafeSubscribe(s); } }); } }
这里注意两点:
因为OperatorSubscribeOn是个OnSubscribe对象,所以在call参数中传入的subscriber就是我们在外面使用Observable.subscribe(a)传入的对象a。
这里source对象指向的是调用subscribeOn之前的那个Observable序列。
明确了这两点,我们就很好的知道了subscribeOn是如何工作,产生神奇的效果了。
其实最最主要的就是一行函数
source.unsafeSubscribe(s);
并且要注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:
observeOnsubscribeOn的调用,改变了调用前序列所运行的线程。
同样看下OperatorObserveOn这个类的主要代码:
public final class OperatorObserveOnimplements Operator { private final Scheduler scheduler; private final boolean delayError; /** * @param scheduler the scheduler to use * @param delayError delay errors until all normal events are emitted in the other thread? */ public OperatorObserveOn(Scheduler scheduler, boolean delayError) { this.scheduler = scheduler; this.delayError = delayError; } @Override public Subscriber super T> call(Subscriber super T> child) { .... ObserveOnSubscriber parent = new ObserveOnSubscriber (scheduler, child, delayError); parent.init(); return parent; } /** Observe through individual queue per observer. */ private static final class ObserveOnSubscriber extends Subscriber implements Action0 { final Subscriber super T> child; final Scheduler.Worker recursiveScheduler; final NotificationLite on; final boolean delayError; final Queue
这里的代码有点长,我们先注意到它是一个Operator,它没有对上层Observable做任何的控制或者包装。
既然是Operator,那么它的职责就是把一个Subscriber转换成另外一个Subscriber, 我们来关注下转换后的Subscriber对转换前的Subscriber做了些什么事。
首先它是一个ObserveOnSubscriber类, 既然是Subscriber那么肯定有onNext, onComplete 和onError 看最主要的onNext
@Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); }
好了,这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker
我们这里需要注意下:
在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。
protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } }
recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入AndroidScheluders.mainThread()对吧、
接下去,我们看下在scheduler中调用的call方法,这里只列出主要带代码
@Override public void call() { ... final Subscriber super T> localChild = this.child; for (;;) { ... boolean done = finished; Object v = q.poll(); boolean empty = v == null; if (checkTerminated(done, empty, localChild, q)) { return; } if (empty) { break; } localChild.onNext(localOn.getValue(v)); ... } if (emitted != 0L) { request(emitted); } }
OK,在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child, 我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。
那么总结起来的结论就是:
复杂情况observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber
我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOn和subscribeOn达到自由切换的目的呢?
组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?
因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。
经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。
总结如果我们有一段这样的序列
Observable .map // 操作1 .flatMap // 操作2 .subscribeOn(io) .map //操作3 .flatMap //操作4 .observeOn(main) .map //操作5 .flatMap //操作6 .subscribeOn(io) //!!特别注意 .subscribe(handleData)
假设这里我们是在主线程上调用这段代码,
那么
操作1,操作2是在io线程上,因为之后subscribeOn切换了线程
操作3,操作4也是在io线程上,因为在subscribeOn切换了线程之后,并没有发生改变。
操作5,操作6是在main线程上,因为在他们之前的observeOn切换了线程。
特别注意那一段,对于操作5和操作6是无效的
再简单点总结就是
subscribeOn的调用切换之前的线程。
observeOn的调用切换之后的线程。
observeOn之后,不可再调用subscribeOn 切换线程
=========
续 特别感谢@扔物线给的额外的总结
下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件
只有第一subscribeOn() 起作用(所以多个 subscribeOn() 毛意义)
这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn()
observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作
不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/13000.html
摘要:谜之二一个异步库最迷人的是什么答案就是把异步序列写到一个工作流里和的如出一辙。综上所述,如果我们需要我们的在一个别的线程上运行的时候,只需要在后面跟一个即可。 【谜之RxJava (二) —— Magic Lift】 Rxjava -- 一个异步库 RxJava最迷人的是什么?答案就是把异步序列写到一个工作流里!和javascript的Promise/A如出一辙。OK,在java中做异...
摘要:指定所发生的线程,即被激活时所处的线程。它指定的操作将在主线程运行。也就是说,的处理和最后的消费事件都会在线程中执行。 showImg(https://segmentfault.com/img/remote/1460000018252960); 前言 经过前几篇的介绍,对RxJava对模式有了一定的理解:由Observable发起事件,经过中间的处理后由Observer消费。(对RxJ...
摘要:指定所发生的线程,即被激活时所处的线程。它指定的操作将在主线程运行。也就是说,的处理和最后的消费事件都会在线程中执行。 showImg(https://segmentfault.com/img/remote/1460000018252960); 前言 经过前几篇的介绍,对RxJava对模式有了一定的理解:由Observable发起事件,经过中间的处理后由Observer消费。(对RxJ...
摘要:前面说过的功能很强大,不仅仅是实现链式的异步操作,它的功能很强大还可以通过实现的消息事件传递功能,我们来看看使用实现继承了类又实现了接口,可以同时担当订阅者和被订阅者的角色一个类产生多个对象,用一存储起来,以进行批量的取消订阅。前面说过Rxjava的功能很强大,不仅仅是实现链式的异步操作,它的功能很强大还可以通过RxBus实现EventBus的消息/事件传递功 能,我们来看看 RxBus ...
摘要:行为模式和差不多,区别在于的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下比更有效率。这个使用的固定的线程池,大小为核数。指定所发生的线程,即被激活时所处的线程。或者叫做事件产生的线程。 前言 本篇主要介绍Rxjava在 Android 项目中的基础使用和常用方法,旨在给对 RxJava 感兴趣的人一些入门的指引.对Rxjava不熟悉的朋友可以去看我之前写的一...
阅读 2457·2021-10-11 10:57
阅读 1362·2021-09-26 09:55
阅读 1119·2021-09-06 15:11
阅读 3162·2021-08-26 14:16
阅读 423·2019-08-30 15:54
阅读 361·2019-08-30 12:43
阅读 3081·2019-08-29 16:18
阅读 2410·2019-08-23 16:14