资讯专栏INFORMATION COLUMN

Rxjava2.x源码解析(二): 线程切换

Xufc / 1100人阅读

摘要:即,默认情况下它会随着上游线程的切换而切换,二者始终在一个线程,除非它通过自行指定。然后就直接将这个交给线程池去执行了。即这行代码所在的线程。指定的是下游接收事件的线程即这些回调方法的执行线程。

上一篇文章Rxjava2.x源码解析(一): 订阅流程中我们讲了 RxJava2 的订阅部分的源码。但 RxJava2 最强大的部分其实是在异步。默认情况下,下游接收事件所在的线程和上游发送事件所在的线程是同一个线程。接下来我们在上一篇文章的示例代码中加入线程切换相关代码:

        // 上游 observable
        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        });
        // 下游 observer
        Observer observer = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                // onSubscribe 方法会最先被执行
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };

        // 在子线程中进行事件的发送
        observable.subscribeOn(Schedulers.newThread())
                // 切换到UI线程进行监听
                .observeOn(AndroidSchedulers.mainThread())
                // 将上游和下游进行关联
                .subscribe(observer);

我们通过subscribeOn(Schedulers.newThread())这行代码,就可以将我们上游的代码切换到子线程中去执行,通过observeOn(AndroidSchedulers.mainThread())又能指定下游监听的代码执行在主线程(这里的 AndroidSchedulers 并不是RxJava2 默认提供的,而是属于Android领域的,由RxAndroid这个库实现)。一行代码,就能自由切换上下游的代码执行的线程,这么骚的操作,到底是怎么实现的呢?

我们上面两个方法中传入的都是一个Scheduler实例,翻译过来就是“调度器”,负责线程相关的调度。

那接下来我们就先从上游相关的subscribeOn(Schedulers.newThread())开始分析。
先从参数入手,看看这个Schedulers.newThread()中执行了什么:

public final class Schedulers {
    static final Scheduler SINGLE;

    static final Scheduler COMPUTATION;

    static final Scheduler IO;

    static final Scheduler TRAMPOLINE;

    // 这里是 NEW_THREAD
    static final Scheduler NEW_THREAD;

    static final class SingleHolder {...}

    static final class ComputationHolder {...}

    static final class IoHolder {...}

    // 初始化一个默认的 NewThreadScheduler
    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }

    static {
        ...

        // 由一个新创建的 NewThreadTask 来初始化 NEW_THREAD
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }


    @NonNull
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }

    ...

    static final class IOTask implements Callable {...}

    // 这里是 NewThreadTask
    static final class NewThreadTask implements Callable {
        @Override
        public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }
    }

    static final class SingleTask implements Callable {...}

    static final class ComputationTask implements Callable {...}
}

可以看到,newThread(...)方法会返回一个Scheduler类型的静态变量NEW_THREAD,而该变量的初始化是在如下的静态代码块中:

    static {
        ...

        // 由一个新创建的 NewThreadTask 来初始化 NEW_THREAD,类型为 Scheduler
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }

这里面创建了一个NewThreadTask实例,该类也比较简单,就是在call()方法中返回了NewThreadHolder.DEFAULT

    static final class NewThreadTask implements Callable {
        @Override
        public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }
    }

NewThreadHolder.DEFAULT则是一个NewThreadScheduler对象:

    // 初始化一个默认的 NewThreadScheduler
    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }

那我们不禁好奇,这个call()方法又是什么时候调用的呢?我们继续回到RxJavaPlugins.initNewThreadScheduler(new NewThreadTask())这行代码,从名称来看是初始化NewThreadScheduler对象的,那我们进去看下是如何进行的:

    public static Scheduler initNewThreadScheduler(@NonNull Callable defaultScheduler) {
        ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can"t be null");
        Function, ? extends Scheduler> f = onInitNewThreadHandler;
        if (f == null) {
        // 直接看这里
            return callRequireNonNull(defaultScheduler);
        }
        return applyRequireNonNull(f, defaultScheduler);
    }

作为聪明人,我们直接看callRequireNonNull(defaultScheduler)这行代码:

    static Scheduler callRequireNonNull(@NonNull Callable s) {
        try {
            // 可以看到,这里调用了 s.call(),并将结果返回;若为空,则报异常
            return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can"t be null");
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }

可以看到,里面直接调用了传入的参数的call()方法,并返回。
到这里,就知道了,RxJavaPlugins.initNewThreadScheduler(new NewThreadTask())这行代码其实就是初始化一个NewThreadScheduler对象。

绕了这么远,其实Schedulers.newThread()这句就是创建了一个NewThreadScheduler对象,这里讲的比较细。

我们继续回来,看看subscribeOn(Schedulers.newThread())里面做了什么:

    public final Observable subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
    }

根据第一篇文章里的经验,我们知道,这里又是将上一步生成的 Observable 进一步封装成一个ObservableSubscribeOn并返回。其实,RxJava之所以能进行链式调用,无外乎就是在每次调用操作符方法的时候,返回一个 Observable 的引用,但是这个 Observable 所具体指向的对象,可能是不同的。中间可能就创建了新的对象,经过了一层层的包装。RxJava 里装饰器模式用的还是比较厉害的,所以说,千万别觉的实际模式都是虚无缥缈的东西。

这里返回的是一个ObservableSubscribeOn对象(注意看命名哦!规律之前讲过的)

经过上篇文章分析,我们知道,使用 Observable 的 subscribe 方法进行订阅的时候,最终会调用到 Observable 的subscribeActual(...)方法,这里的Observable具体就是ObservableSubscribeOn

    // ObservableSubscribeOn.java
    public void subscribeActual(final Observer observer) {
        final SubscribeOnObserver parent = new SubscribeOnObserver(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

可以看到,这里将 observer 也进行了包装,包装成SubscribeOnObserver对象。也相当于配套啦,haha。

然后又将这个封装后的对象传进了一个新建的 SubscribeTask 对象中。

???
这个SubscribeTask又是啥?
这个SubscribeTaskObservableSubscribeOn这个类的内部类,其实就是一个Runnable实现类:

public final class ObservableSubscribeOn extends AbstractObservableWithUpstream {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer observer) {
        // 创建一个新的 Observer
        final SubscribeOnObserver parent = new SubscribeOnObserver(observer);

        observer.onSubscribe(parent);

    // 进行线程任务的创建及分发
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    ... 
    // 是个 Runnable 实现类
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver parent;

        SubscribeTask(SubscribeOnObserver parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // 注意,此处是关键,正是从这里开始,上游(即:source)在新线程重新对下游进行订阅。
            // 从而达到上游发送事件的线程进行切换的目的
            // 这里提前提醒下,多次订阅,并不是只有第一次订阅指定的线程才有效,那只是普通使用场景下的“凑巧”
            source.subscribe(parent);
        }
    }
}

到这,我们总算看到了线程相关的东西了。Runnable 大家肯定都熟悉吧?在它的run()方法中,调用了source.subscribe(parent),这里的 parent 我们知道,是封装之后的SubscribeOnObserver,但source又是啥?其实就是我们在 ObservableSubscribeOn 的构造函数中传进来的this,即上游的 Observable :

    // Observable.java
    public final Observable subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        // 这里传进来的 this对象,就是上游 Observable 对象
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
    }

抽象类 Observable 实现了 ObservableSource 接口,这个接口就是我们进行订阅时候用到的subscribe(...):

public interface ObservableSource {

    void subscribe(@NonNull Observer observer);
}

继续看这个 run() 方法,它相当于是把之前的上游通过subscribe(...)订阅到了新的下游。也就是说:

subscribeOn(...)方法的本质是,在指定的线程中将上游和下游进行订阅`。

这和我们链式调用中最后一步的订阅本质上是一样的。

明白了这点,也就能知道,这个线程一旦启动,新的 observer 接收和处理事件,也是在这个子线程里。即,默认情况下它会随着上游线程的切换而切换,二者始终在一个线程,除非它通过observeOn(...)自行指定。

我们现在明白了上游是如何通过一行代码就能运行在子线程里,但还没看到这个线程是什么时候、如何启动起来的。

那我们就回到之前的位置,继续看scheduler.scheduleDirect(new SubscribeTask(parent))这行代码,scheduler 具体指NewThreadScheduler,但scheduleDirect(...)这个方法是在父类中实现的,它没有进行重写(其他类型的 scheduler 有进行重写,比如 ComputationScheduler 等),那就进父类看看:

    // Scheduler.java
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // createWorker()为抽象方法,由子类实现
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

这个方法的参数中有个 Runnable 对象,那我们直接启动个线程不就好了?当然是可以的。但是作为一个成熟的库,它一定要考虑更多的场景。需要考虑到线程安全问题,以及对线程的控制,比如,通过 Dispose 来截断上下游之间事件的事件流。

我们先看final Worker w = createWorker();这行代码,它创建了一个 Worker,具体点就是NewThreadWorker,这里贴下NewThreadScheduler.java的源码:

/**
 * Schedules work on a new thread.
 */
public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

继续回到scheduleDirect(...)方法的第 8 行:

 DisposeTask task = new DisposeTask(decoratedRun, w);

它将我们要执行的 runnable 和 Worker,又封装进了一个DisposeTask中,便于对流进行控制。DisposeTask是 Scheduler 的静态内部类,实现了Disposable, Runnable, SchedulerRunnableIntrospection这三个接口:

public abstract class Scheduler {

        ...
        
    static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {

        @NonNull
        final Runnable decoratedRun;

        @NonNull
        final Worker w;

        @Nullable
        Thread runner;

        DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }

        @Override
        public Runnable getWrappedRunnable() {
            return this.decoratedRun;
        }
    }
}

创建了 DisposeTask 之后,就将它传递给了worker执行:

w.schedule(task, delay, unit);

这行代码就是开始执行指定任务,我们可以进入NewThreadWorker.java源码中查看详细细节:

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        // 最终会调用到 scheduleActual(...)方法
        return scheduleActual(action, delayTime, unit, null);
    }

    public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
        try {
            Future f;
            if (delayTime <= 0L) {
                f = executor.submit(task);
            } else {
                f = executor.schedule(task, delayTime, unit);
            }
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }

    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {...}

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        /**********************************************
         *** 将我们的runnable对象,又经过了一层封装   *****
         *********************************************/
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        /*********************************************************************************
         *** 最终会通过 executor 线程池去执行相应的任务,通过Future,来获取线程执行后的返回值  *****
         ********************************************************************************/
        Future f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable)sr);
            } else {
                f = executor.schedule((Callable)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

    @Override
    public void dispose() {
        if (!disposed) {
            disposed = true;
            executor.shutdownNow();
        }
    }

    /**
     * Shuts down the underlying executor in a non-interrupting fashion.
     */
    public void shutdown() {
        if (!disposed) {
            disposed = true;
            executor.shutdown();
        }
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}

w.schedule(task, delay, unit)最终会调用到第 46 行的scheduleActual(...)方法。在该方法中,又将新传进来的runnable对象封装进 ScheduledRunnable ,封装了这么多层...~~(>_<)~~。然后就直接将这个 ScheduledRunnable交给线程池去执行了。为了能在线程执行完之后,接收返回值,使用了Future。再往下,就完全是线程池相关的知识点了,此处不再赘述。

到这,我们就完全分析完了 RxJava2 是如何通过一行subscribeOn(...)代码切换上游发送事件所在线程的。接下来我们就来分析observeOn(...)是如何切换下游处理事件的线程的。

线程的创建,这里跟之前是相同的。该方法最终会调用到如下重载方法:

    public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ...
        // 创建了一个 ObservableObserveOn 并返回
        return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
    }

直接进ObservableObserveOnsubscribeActual(...)方法:

    protected void subscribeActual(Observer observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
        }
    }

这个方法就比较简单了,直接将上游和新创建的ObserveOnObserver进行绑定。并且在创建的ObserveOnObserver的同时,也将 worker 传进去,进行线程任务的相关处理。到这里,我们可以猜想下,封装之后的新的 ObserveOnObserver 是如何做到使原observer中的任务在指定的线程中执行的。其实就是重写对应的方法,将之前的逻辑通过worker来指定执行线程。边追源码边猜想,才能更好的理解。

接下来就来看ObservableObserveOn.java#ObserveOnObserver的源码:

static final class ObserveOnObserver extends BasicIntQueueDisposable
    implements Observer, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
 
        ...

        ObserveOnObserver(Observer actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable qd = (QueueDisposable) d;

                    // 注意,这里调用了 requestFusion  来获取 mode,之后会用到   
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        // 如果是sync,会立即调用 schedule()
                        // 执行线程任务,查看run方法
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue(bufferSize);

                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // 执行线程任务,查看run方法
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            // 执行线程任务,查看run方法
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            // 执行线程任务,查看run方法
            schedule();
        }

        @Override
        public void dispose() {... }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

        void drainNormal() {
            int missed = 1;

            final SimpleQueue q = queue;
            final Observer a = downstream;

            for (;;) {
                // checkTerminated 方法会检查任务是否执行结束。
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    // checkTerminated 方法会检查任务是否执行结束。
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        void drainFused() {...}

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                // outputFused 是跟背压及操作符相关,这里直接分析 drainNormal()
                drainNormal();
            }
        }

        boolean checkTerminated(boolean d, boolean empty, Observer a) {
            if (disposed) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                // 是否设置了超时错误,是在 observeOn(scheduler, delayError, bufferSize()) 的第二个参数传入的,
                // 默认传了false
                if (delayError) {
                    if (empty) {
                        disposed = true;
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    // 根据是否报了异常,来决定是执行 onError 还是 onComplete
                    if (e != null) {
                        disposed = true;
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        disposed = true;
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }

    ...
    }

为了验证我们的猜想,我们看看在onSubscribe/onNext/onError/onComplete这些函数中都调用了什么。

我们发现,在这些函数中,差不多都调用了schedule();(调用 requestFusion(...)相关逻辑暂时忽略)。查看该函数的调用出,在第93行:

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

这里直接将this传递给了 worker 进行线程任务的执行,这里的this指的就是ObserveOnObserver,上面说道,它实现了 runnable 接口。而onSubscribe/onNext/onError/onComplete这些函数中都调用了同一个函数schedule();,有理由猜想,对各个函数的区分处理,肯定就在重写的run()方法里了,查看第150行:

        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

outputFused 涉及背压及操作符的相关处理,这里我们直接看drainNormal();

       void drainNormal() {
            int missed = 1;

            final SimpleQueue q = queue;
            final Observer a = downstream;

            for (;;) {
                // checkTerminated 方法会检查任务是否执行结束。
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    // checkTerminated 方法会检查任务是否执行结束。
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }
                    // 如果没结束,就调用新的Observer的 onNext方法
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

在该方法中,首先通过checkTerminated(...)判断线程任务是否执行结束(complete或者error),如果没有,就去执行新的下游Observer的onNext()方法。如果执行完了,就直接返回。

那啥时候调用了新的下游Observer的onComplete/onError方法呢?当然是在checkTerminated(...)方法中啦:

 boolean checkTerminated(boolean d, boolean empty, Observer a) {
            if (disposed) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                // 是否设置了超时错误,是在 observeOn(scheduler, delayError, bufferSize()) 的第二个参数传入的,
                // 默认传了false
                if (delayError) {
                    if (empty) {
                        disposed = true;
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    // 根据是否报了异常,来决定是执行 onError 还是 onComplete
                    if (e != null) {
                        disposed = true;
                        queue.clear();
                        // 执行 onError
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        disposed = true;
                        // 执行 onComplete
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }

在该方法里,我们就看到了对onComplete()/onError方法的调用了。

好了,到这里,我们就把rxjava2 中线程切换的知识讲完了,里面还有很多细节需要大家自己细细研究。

总结

下游observeronSubscribe(...)方法一直是在它所在的线程调用的。即observable.subscribe(observer)这行代码所在的线程。

subscribeOn(...)指定的是上游发送事件的线程, 比如ObservableOnSubscribesubscribe(ObservableEmitter emitter){...}方法执行的线程,在该方法里我们往往会调用emitter.onNext(...)/onComplete()/onError(...)来发送事件。

observeOn(...) 指定的是下游接收事件的线程,即onSubscribe(...)/ onNext(...)/onError(...)/onComplete()这些回调方法的执行线程。

默认情况下,下游接收事件的线程和上游发送事件的线程,是同一个线程,下游与上游保持一致。上游通过subscribeOn(...)切换线程的时候,下游仍会自动与其保持一致。除非下游多带带通过observeOn(...)来指定下游自己的线程。

此外,还需要特别指出的一点就是,多次指定上游的线程只有第一次指定的有效这个结论是:
错误的 错误的 错误的

很多文章中也都是这么说的,但是很遗憾,是错误的,因为很多人都只是从表象出发,连续调用两次subscribeOn,然后在下游Observer的onSubscribe回调里打印线程名称,发现一直是第一次指定的那个线程,就开始想当然的总结结论了,他们的代码应该是下面这样的:

        // 上游 observable
        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
                Log.d(TAG, "subscribe: 当前线程为: " + Thread.currentThread().getName());
            }
        });

        // 下游 observer
        Observer observer = new Observer() {...}
        
        observable
                // 第一次指定
                .subscribeOn(AndroidSchedulers.mainThread())
                // 第二次指定
                .subscribeOn(Schedulers.newThread())
                // 切换到UI线程进行监听
                .observeOn(AndroidSchedulers.mainThread())
                // 将上游和下游进行关联
                .subscribe(observer);

打印结果为:

你不断调整两个的位置,发现仍然是指定的第一个有效,似乎你是对的。不防试试下面的例子:

        // 上游 observable
        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
                Log.d(TAG, "subscribe: 当前线程为: " + Thread.currentThread().getName());
            }
        });
        
        // 下游 observer
        Observer observer = new Observer() {...}
        
        observable
                // 第一次指定
                .subscribeOn(AndroidSchedulers.mainThread())
                // 创建第一个 onSubscribe
                .doOnSubscribe(new Consumer() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d(TAG, "accept1: 当前线程为:" + Thread.currentThread().getName());
                    }
                })
                // 第二次指定
                .subscribeOn(Schedulers.newThread())
                // 创建第二个 onSubscribe
                .doOnSubscribe(new Consumer() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d(TAG, "accept2: 当前线程为:" + Thread.currentThread().getName());
                    }
                })
                // 切换到UI线程进行监听
                .observeOn(AndroidSchedulers.mainThread())
                // 将上游和下游进行关联
                .subscribe(observer);

结果如下:


可以看到,每个doOnSubscribe(...)内的代码,运行在它上面离它最近的subscribeOn()指定的线程。也就是说,多次切换都生效了。这点也可以参考我们上面的总结里的第一条:

下游observer的onSubscribe(...)方法一直是在它所在的线程调用的。即observable.subscribe(observer)这行代码所在的线程。

doOnSubscribe操作符就不展开讲了。

再仔细看上面的截图,发现我们在第二个doOnSubscribe(...)方法中的代码反而要比第一个先执行。Why?这其实是在向上回溯。希望你还能记得,我们前面说:

subscribeOn(...)方法的本质是,在指定的线程中将上游和下游进行订阅`。

这个“上游”是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。

虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。

好了,到这,本篇文章就结束了。文章较长,可以耐心点,反复看看。

通过对 RxJava2 的研究,发现里面涉及到很多知识,我也是一边读一遍补其他知识。比如里面涉及很多并发编程的知识,而并发编程又需要你对计算机组成原理、操作系统、编译原理这些有一定的了解,还好大学考软考的时候看过这些方面的书,拾起来相对容易点。

欠的技术债总是要还的,正面刚吧。

欢迎关注公众号来获取其他最新消息,有趣的灵魂在等你。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/14952.html

相关文章

  • Rxjava2.x 源码解析): 线程切换

    摘要:这个上游是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。文章较长,可以耐心点,反复看看。 这个上游是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。 虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。 好...

    LinkedME2016 评论0 收藏0
  • Rxjava2.x 源码解析): 线程切换

    摘要:这个上游是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。文章较长,可以耐心点,反复看看。 这个上游是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。 虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。 好...

    MadPecker 评论0 收藏0
  • Rxjava2.x 源码解析): 线程切换

    摘要:这个上游是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。文章较长,可以耐心点,反复看看。 这个上游是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。 虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。 好...

    lowett 评论0 收藏0
  • 「码个蛋」2017年200篇精选干货集合

    摘要:让你收获满满码个蛋从年月日推送第篇文章一年过去了已累积推文近篇文章,本文为年度精选,共计篇,按照类别整理便于读者主题阅读。本篇文章是今年的最后一篇技术文章,为了让大家在家也能好好学习,特此花了几个小时整理了这些文章。 showImg(https://segmentfault.com/img/remote/1460000013241596); 让你收获满满! 码个蛋从2017年02月20...

    wangtdgoodluck 评论0 收藏0
  • Rxjava2.x源码解析(一): 订阅流程

    摘要:现在网上已经有大量的源码分析文章,各种技术的都有。然后立即调用的方法,表示订阅过程完成。好啦,本篇文章就写到这里,带大家完成了订阅事件的发送及处理的整个流程。毕竟,不谈线程切换,谈什么源码分析,哈哈。 现在网上已经有大量的源码分析文章,各种技术的都有。但我觉得很多文章对初学者并不友好,让人读起来云里雾里的,比源码还源码。究其原因,是根本没有从学习者的角度去分析。在自己完成了源码阅读之后...

    张红新 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<