资讯专栏INFORMATION COLUMN

谜之RxJava (一) —— 最基本的观察者模式

ckllj / 308人阅读

摘要:最近在界,最火的大概就是了。先来上个最最简单的,经典的。这段代码产生的最终结果就是在里会出现。原来只是为了帮我们处理好异常,以及防止工作流的重复。这是最最基本的工作流,让我们认识到他是怎么工作的。谜之二欢迎关注我以及

最近在Android界,最火的framework大概就是RxJava了。
扔物线大大之前写了一篇文章 《给 Android 开发者的 RxJava 详解》,在我学习RxJava的过程中受益匪浅。经过阅读这篇文章后,我们来看下RxJava的源码,揭开它神秘的面纱。

这里准备分几篇文章写,为了能让自己有个喘口气的机会。

先来上个最最简单的,经典的Demo。

Demo
Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext("hello");
    }
}).subscribe(new Subscriber() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.d("rx", s);
    }
});

这段代码产生的最终结果就是在Log里会出现hello。

看下这段代码的具体流程吧。
这里有2个函数createsubscribe,我们看看create里面干了啥。

OnSubscribe对象
public final static  Observable create(OnSubscribe f) {
    return new Observable(hook.onCreate(f));
}
// constructor
protected Observable(OnSubscribe f) {
    this.onSubscribe = f;
}

这里的hook是一个默认实现,里面不做任何事,就是返回f。我们看见create只是给ObservableonSubscribe赋值了我们定义的OnSubscribe

Subscriber对象

来看下subscribe这个函数做了什么事

public final Subscription subscribe(Subscriber subscriber) {
    return Observable.subscribe(subscriber, this);
}

private static  Subscription subscribe(Subscriber subscriber, Observable observable) {
 // validate and proceed
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that"s not the appropriate approach
         * so I won"t mention that in the exception
         */
    }
    
    // new Subscriber so onStart it
    subscriber.onStart();
    
    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber(subscriber);
    }

    // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(hook.onSubscribeError(e));
        } catch (OnErrorNotImplementedException e2) {
            // special handling when onError is not implemented ... we just rethrow
            throw e2;
        } catch (Throwable e2) {
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            hook.onSubscribeError(r);
            // TODO why aren"t we throwing the hook"s return value.
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}

我们看到,这里我们的subscriberSafeSubscriber包裹了一层。

if (!(subscriber instanceof SafeSubscriber)) {
    // assign to `observer` so we return the protected version
    subscriber = new SafeSubscriber(subscriber);
}

然后开始执行工作流

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);

默认的hook只是返回我们之前定义的onSubscribe,这里调用的call方法就是我们在外面定义的

new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext("hello");
    }
})

我们调用传入的subscriber对象的onNext方法,这里的subscriberSafeSubscriber
SafeScriber

public void onNext(T args) {
    try {
        if (!done) {
            actual.onNext(args);
        }
    } catch (Throwable e) {
        // we handle here instead of another method so we don"t add stacks to the frame
        // which can prevent it from being able to handle StackOverflow
        Exceptions.throwIfFatal(e);
        // handle errors if the onNext implementation fails, not just if the Observable fails
        onError(e);
    }
}

actual就是我们自己定义的subscriber。 原来SafeSubscriber只是为了帮我们处理好异常,以及防止工作流的重复。

这是RxJava最最基本的工作流,让我们认识到他是怎么工作的。之后我们来讲讲其中的细节和其他神奇的内容。

【谜之RxJava (二) —— Magic Lift】

欢迎关注我Github 以及 weibo、@Gemini

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/12440.html

相关文章

  • 谜之RxJava (三)—— 线程切换

    摘要:谜之二一个异步库最迷人的是什么答案就是把异步序列写到一个工作流里和的如出一辙。综上所述,如果我们需要我们的在一个别的线程上运行的时候,只需要在后面跟一个即可。 【谜之RxJava (二) —— Magic Lift】 Rxjava -- 一个异步库 RxJava最迷人的是什么?答案就是把异步序列写到一个工作流里!和javascript的Promise/A如出一辙。OK,在java中做异...

    walterrwu 评论0 收藏0
  • 谜之RxJava(四)—— Retrofit和RxJava基情

    摘要:我们可以看下,它把,也就是取消请求的情况处理的挺好。这段代码是给增加一个的事件。也就是请求完成的时候,会自动对进行一个终止,也就是的行为。 概述 前文回顾: 谜之RxJava (三)—— 线程切换 今天来介绍下和RxJava搭配使用的好基友,就是我们的Retrofit啦,Retrofit使用动态代理的机制,为我们提供了一个简要的使用方法来获取网络上的资料,现在更新到2.0.0-beta...

    Scliang 评论0 收藏0
  • 谜之RxJava (二) —— Magic Lift

    摘要:回顾上一篇文章讲了和之间的关系。我们知道,的具体工作都是在中完成的。这样就生成了一个代理的,最后我们最外层的对象对我们代理的进行了调用。。也就是此处的就是被包裹后的对象。迷之三线程切换欢迎关注我以及 回顾 上一篇文章 讲了Observable、OnSubscribe和Subscriber之间的关系。 我们知道,Observable的具体工作都是在OnSubscribe中完成的。从这个类...

    fobnn 评论0 收藏0
  • 谜之RxJava (三)update —— 线程切换(二)

    摘要:在更新版本后,这个接口进行了一个重构,变换方式从一个比较难理解的递归嵌套的上做一个改成了从角度上进行了一层封装。从类型来说,脱离了的概念,变身成了。而在老接口中,变换前的是通过传进来的。 在RxJava更新版本后,OperatorSubscribeOn这个接口进行了一个重构,变换方式从一个比较难理解的递归嵌套的Observable上做一个Operator改成了从OnSubscribe角...

    fox_soyoung 评论0 收藏0
  • 谜之RxJava (三)update 2 —— subscribeOn 和 observeOn

    摘要:操作,操作是在线程上,因为在他们之前的切换了线程。特别注意那一段,对于操作和操作是无效的再简单点总结就是的调用切换之前的线程。 开头 之前我们分析过subscribeOn这个函数,现在我们来看下subscribeOn和observeOn这两个函数到底有什么异同。 用过rxjava的旁友都知道,subscribeOn和observeOn都是用来切换线程用的,可是我什么时候用subscri...

    justCoding 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<