资讯专栏INFORMATION COLUMN

RxJava初级解析(一)

frank_fun / 1813人阅读

摘要:二实例解析最简单的例子可被观察者观察者订阅执行以上代码会打印出如下结果这样一个最简单的代码就完成了。所以通过你可以自定义一些实现方法的,这些通常是可以作为公用的操作。这些仅仅是最基础用法的一个解析。

扔物线大大的文章确实写的牛 扔物线,看了他的文章受益匪浅,文中很多会引用到他的一些分析,没有看过他的文章的建议先看一下。
一.概述 先简单介绍一下RxJava的思想

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

概念介绍 1.Observable(可观察者,即被观察者)

事件的触发者。

2.Observer/Subscriber(观察者)

事件的产生者

3.subscribe(订阅)

可被观察者和观察者之间的桥梁

4.事件

产生的事件

5.总结

举个例子:我是一名读者杂志会员,我想订阅读者期刊,当我订阅之后,读者工作室就会每个月给我发一本读者杂志。在这个事件中,我就是一名被观察者,读者工作室就是观察者,因为我一旦产生订阅这件事,就会触发读者工作室的一系列动作。

二.实例解析 1.最简单的例子

observable(可被观察者)

        Observable observable = Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext("test1");
                subscriber.onNext("test2");
                subscriber.onCompleted();
            }
        });

subscriber/observer(观察者)

private Subscriber subscribe = new Subscriber() {
    @Override
    public void onCompleted() {
        Log.e("HP", "onCompleted");
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.e("HP", s);
    }
};

subscribe(订阅)

observable.subscribe(subscribe);

执行以上代码会打印出如下结果

test1
test2
onCompleted

这样一个最简单的RxJava代码就完成了。

为什么在call方法中调用onNext(),onCompleted()会触发subscriber/observer中对应的方法呢?接下来一起看一下源码,看看是如何订阅成功的。

进入到Observable.create()方法

     public final static  Observable create(OnSubscribe f) {
            return new Observable(hook.onCreate(f));
        }
protected Observable(OnSubscribe f) {
    this.onSubscribe = f;
}

这里创建了一个Observable对象,同时通过构造函数将 f 赋值给Observalbe类中的onSubscribe

进入到subscribe方法

  

      public final Subscription subscribe(Subscriber subscriber) {
            return Observable.subscribe(subscriber, this);
        }
        
        private static  Subscription subscribe(Subscriber subscriber, Observable observable) {
         // validate and proceed
            if (subscriber == null) {
                throw new IllegalArgumentException("observer can not be null");
            }
            if (observable.onSubscribe == null) {
                throw new IllegalStateException("onSubscribe function can not be null.");
                /*
                 * the subscribe function can also be overridden but generally that"s not the appropriate approach
                 * so I won"t mention that in the exception
                 */
            }
            
            // new Subscriber so onStart it
            subscriber.onStart();
            
            /*
             * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
             * to user code from within an Observer"
             */
            // if not already wrapped
            if (!(subscriber instanceof SafeSubscriber)) {
                // assign to `observer` so we return the protected version
                subscriber = new SafeSubscriber(subscriber);
            }
    
            // The code below is exactly the same an unsafeSubscribe but not used because it would 
            // add a significant depth to already huge call stacks.
            try {
                // allow the hook to intercept and/or decorate
                hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
                return hook.onSubscribeReturn(subscriber);
            } catch (Throwable e) {
                // special handling for certain Throwable/Error/Exception types
                Exceptions.throwIfFatal(e);
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(hook.onSubscribeError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    hook.onSubscribeError(r);
                    // TODO why aren"t we throwing the hook"s return value.
                    throw r;
                }
                return Subscriptions.unsubscribed();
            }
        }

我们看到

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

其中hook.onSubscribeStart返回的是create()方法中创建的OnSubscribe对象,即上文中提到的 onSubscribe ,在这里调用OnSubscribe 中的call方法,将subscriber/observer传递到call方法中,所以我们在call中调用onNext(),onError(),onCompleted()会触发observer中对应的方法,从而达到了事件通知的效果。

2.订阅Action

观察者除了我们的subscriber/observer之外,还可以是Action

再看一个例子
定义一个action

private Action1 action1 = new Action1() {
    @Override
    public void call(String s) {
        Log.e("HP",s);
    }
};
observable.subscribe(action1);

再看输出结果

test1
test2

不出所料,输出结果与预期的相同。
进入subscribe方法看以看究竟

    public final Subscription subscribe(final Action1 onNext) {
            if (onNext == null) {
                throw new IllegalArgumentException("onNext can not be null");
            }
    
            return subscribe(new Subscriber() {
    
                @Override
                public final void onCompleted() {
                    // do nothing
                }
    
                @Override
                public final void onError(Throwable e) {
                    throw new OnErrorNotImplementedException(e);
                }
    
                @Override
                public final void onNext(T args) {
                    onNext.call(args);
                }
    
            });

原来在subscribe方法中将我们的action转换成了subscribe。
所以通过action你可以自定义一些实现onNext(),onError(),onComplete()方法的action,这些action通常是可以作为公用的操作。

这些仅仅是RxJava最基础用法的一个解析。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/12988.html

相关文章

  • Rxjava2.x源码解析): 订阅流程

    摘要:现在网上已经有大量的源码分析文章,各种技术的都有。然后立即调用的方法,表示订阅过程完成。好啦,本篇文章就写到这里,带大家完成了订阅事件的发送及处理的整个流程。毕竟,不谈线程切换,谈什么源码分析,哈哈。 现在网上已经有大量的源码分析文章,各种技术的都有。但我觉得很多文章对初学者并不友好,让人读起来云里雾里的,比源码还源码。究其原因,是根本没有从学习者的角度去分析。在自己完成了源码阅读之后...

    张红新 评论0 收藏0
  • RxJava2.x源码解析):订阅流程

    摘要:现在网上已经有大量的源码分析文章,各种技术的都有。你完全可以写成下面的链式风格方法会最先被执行同样,为了便于理解,我会借用流里面经常用到的水流进行类比。该子类的命名是有规律可言的。现在网上已经有大量的源码分析文章,各种技术的都有。但我觉得很多文章对初学者并不友好,让人读起来云里雾里的,比源码还源码。究其原因,是根本没有从学习者的角度去分析。在自己完成了源码阅读之后,却忘记了自己是如何一步步提...

    harryhappy 评论0 收藏0
  • RxJava 源码解析之观察者模式

    摘要:传统观察者模式观察者模式面向的需求是对象观察者对对象被观察者的某种变化高度敏感,需要在变化的一瞬间做出反应。如下图而作为一个工具库,使用的就是通用形式的观察者模式。这是为了方便以上就是最基本的一个通过观察者模式,来响应事件的原理。 了解 RxJava 的应该都知道是一个基于事务驱动的库,响应式编程的典范。提到事务驱动和响应就不得不说说,设计模式中观察者模式,已经了解的朋友,可以直接跳过...

    Steve_Wang_ 评论0 收藏0
  • 「码个蛋」2017年200篇精选干货集合

    摘要:让你收获满满码个蛋从年月日推送第篇文章一年过去了已累积推文近篇文章,本文为年度精选,共计篇,按照类别整理便于读者主题阅读。本篇文章是今年的最后一篇技术文章,为了让大家在家也能好好学习,特此花了几个小时整理了这些文章。 showImg(https://segmentfault.com/img/remote/1460000013241596); 让你收获满满! 码个蛋从2017年02月20...

    wangtdgoodluck 评论0 收藏0
  • 20+个很棒的Android开源项目

    摘要:一套完整有效的组件化方案,支持组件的组件完全隔离单独调试集成调试组件交互跳转动态加载卸载等功能项目地址非常棒的开源应用程序列表项目地址实现多插件化和动态加载,支持资源分包和热修复项目地址灵活的组件化路由框架项目地址用于显示 DDComponentForAndroid 一套完整有效的android组件化方案,支持组件的组件完全隔离、单独调试、集成调试、组件交互、UI跳转、动态加载卸载等功...

    olle 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<