资讯专栏INFORMATION COLUMN

Android RxJava系列二: 常用拓展操作符

gghyoo / 1901人阅读

摘要:这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。功能操作符延迟一段事件发送事件。在执行上述操作显示在线程执行上述准备操作在线程执行下面操作开始了指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效。

前言

本篇文章主要介绍Rxjava 2.x的一些常用的操作符,对Rxjava不熟悉的朋友可以先去看下我之前的两篇介绍

Android RxJava:基础介绍与使用

Android RxJava系列一: 基础常用详解

创建操作符

create() 创建一个被观察者

public static  Observable create(ObservableOnSubscribe source)
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(ObservableEmitter e) throws Exception {
        e.onNext("This is Observer"); //通过 ObservableEmitter 发射器向观察者发送事件。
        e.onComplete();
    }
});

just() 创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。

public static  Observable just(T item) 
......
public static  Observable just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "-------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "-------onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "-------onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "-------onComplete ");
    }
});

使用just()方法创建Observable对象,Observable会将事件逐个发送

From 操作符

fromArray() 这个方法和 just() 类似,只不过 fromArray 可以传入一个数组

fromCallable() Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值

fromIterable() 直接发送一个 List 集合数据给观察者

public static  Observable fromArray(T... items)

Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "--------------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "--------------onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "--------------onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "--------------onComplete ");
    }
});
public static  Observable fromCallable(Callable supplier)

Observable.fromCallable(new Callable < Integer > () {

    @Override
    public Integer call() throws Exception {
        return 1;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "--------------accept " + integer);
    }
});

public static  Observable fromIterable(Iterable source)


List list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "--------------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "--------------onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "--------------onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "--------------onComplete ");
    }
});

empty() 直接发送 onComplete() 事件

Observable.empty()
.subscribe(new Observer < Object > () {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "---------------------onSubscribe");
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "---------------------onNext");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "---------------------onError " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete");
    }
});
转换操作符

map() map 可以将被观察者发送的数据类型转变成其他的类型,是一对一的转换

public final  Observable map(Function mapper)


//将 Integer 类型的数据转换成 String。
Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
    @Override
    public String apply(Integer integer) throws Exception {
        return  integer+"rxjava";
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "----------------------onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.e(TAG, "----------------------onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
      Log.d(TAG, "---------------------onError " + e);
    }

    @Override
    public void onComplete() {
 Log.d(TAG, "---------------------onComplete" );
    }
});


flatMap() 这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。

public final  Observable flatMap(Function> mapper)

flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable。现在用一个map()的例子和flatMap()的例子来对比说明 flatMap() 的用法。

需求:我们现在需要通过学校拿到院系列表,然后在每个院系中拿到学生的信息.
传统的实现方式有很多种,我就不举例了,直接使用Rxjava实现:

//学校
 class School{
        private String name;
        private List departments;

        public School(){}
        public School(String name, List departments) {
            this.name = name;
            this.departments = departments;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public List getDepartments() {
            return departments;
        }

        public void setDepartments(List departments) {
            this.departments = departments;
        }
    }
//院系
 class Department{
        private String name;
        private List students;

        public Department(){}
        public Department(String name, List students) {
            this.name = name;
            this.students = students;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public List getStudents() {
            return students;
        }

        public void setStudents(List students) {
            this.students = students;
        }
    }

//学生
class Student {
        private String name;
        private String school;

        public Student(){}
        public Student(String name, String school) {
            this.name = name;
            this.school = school;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getSchool() {
            return school;
        }

        public void setSchool(String school) {
            this.school = school;
        }
    }

使用map()方法实现:

 //使用map()实现方式
        Observable.fromIterable(departments)
                .map(new Function>() {
                    @Override
                    public List apply(Department department) throws Exception {
                        return department.getStudents();
                    }
                })
                .subscribe(new Observer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List students) {
                       for (Student student : students){
                           Log.d("----------", student.getName()+student.getSchool() );
                        //如果还需要获取学生所有课程以及成绩
                       ......................
                       }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
  //使用flatMap()实现
        Observable.fromIterable(departments)
                .flatMap(new Function>() {
                    @Override
                    public ObservableSource apply(Department department) throws Exception {
                        return Observable.fromIterable(department.getStudents());
                    }
                })
                .flatMap() //如果还需要获取学生所有课程以及成绩操作
                .subscribe(new Observer() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        
                    }

                    @Override
                    public void onNext(Student student) {
                       Log.d("---------",student.getName()+student.getSchool());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

以上代码中map()方法实现中,可以看到我们在onNext()方法中使用了for循环.如果代码逻辑在复杂一些,就可能需要嵌套for循环来实现,那就真的迷之缩进了,而使用flatMap()方法实现,只需要实现一个flatMap()转换一下就好了,随着代码逻辑增加,代码依然清晰,这就是flatMap()的强大之处,也是很多人喜欢使用Rxjava的原因所在.

concatMap()
concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。

public final  Observable concatMap(Function> mapper)
public final  Observable concatMap(Function> mapper, int prefetch)


  Observable.fromIterable(departments)
                .concatMap(new Function>() {
                    @Override
                    public ObservableSource apply(Department department) throws Exception {
                        return Observable.fromIterable(department.getStudents());
                    }
                })

功能操作符

delay() 延迟一段事件发送事件。

相当于handler的延迟发送事件

handler.sendEmptyMessageDelayed(0,2000);
public final Observable delay(long delay, TimeUnit unit)

Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS) //延迟两秒再发送事件
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d("------------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d("------------"+integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d(TAG, "----------------onComplete");
    }
});

doOnSubscribe() Observable 每发送 onSubscribe() 之前都会回调这个方法。
此方法通常用来做准备工作,例如弹一个ProgressDialog提示用户, But,这里有一个小坑,特别拿出来说明一下:

前方有坑,请集中注意力

Observable.doOnSubscribe()方法是在subscribe() 调用后而且在事件发送前执行。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

 Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();
            }
        })
          .subscribeOn(Schedulers.io()) //在io执行上述操作
          .doOnSubscribe(new Consumer() {
              @Override
              public void accept(Disposable disposable) throws Exception {
                  dialog.show(); //显示dialog
              }
          })
          .subscribeOn(AndroidSchedulers.mainThread()) //在UI线程执行上述准备操作
          .observeOn(AndroidSchedulers.mainThread())//在UI线程执行下面操作
          .subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("----","开始了");
            }

            @Override
            public void onNext(String s) {
                Log.d("----", s);
                dialog.dismiss();
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.d("----", "complete");
            }
        });

subscribeOn() 指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效。

public final Observable subscribeOn(Scheduler scheduler)

observeOn() 指定观察者的线程,每指定一次就会生效一次。

public final Observable observeOn(Scheduler scheduler)

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定
以上就是Rxjava常用的一些操作符介绍和使用方法实例了

RxJava2 只看这一篇文章就够了这是玉刚说的一篇关于Rxjava常用API的介绍,基本囊括了Rxjava所用到的所有API,还有代码举例,也是强烈建议观看收藏

关于Rxjava系列二就到此结束啦,后面有时间我还会写写与retrofit2的结合使用,欢迎关注订阅!

欢迎关注作者darryrzhong,更多干货等你来拿哟.

请赏个小红心!因为你的鼓励是我写作的最大动力!
更多精彩文章请关注

个人博客:darryrzhong

掘金

简书

SegmentFault

慕课网手记

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/15012.html

相关文章

  • Android RxJava系列一: 基础常用详解

    摘要:行为模式和差不多,区别在于的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下比更有效率。这个使用的固定的线程池,大小为核数。指定所发生的线程,即被激活时所处的线程。或者叫做事件产生的线程。 前言 本篇主要介绍Rxjava在 Android 项目中的基础使用和常用方法,旨在给对 RxJava 感兴趣的人一些入门的指引.对Rxjava不熟悉的朋友可以去看我之前写的一...

    anquan 评论0 收藏0
  • Android框架 - 收藏集 - 掘金

    摘要:说好的分类集合来啦,采用标签云的方式来展示掘金阅读提示点击下文中返回到顶部有分类不合理的地方请提。反编译这个后发现其使用个优质的开源项目掘金是由的开源的一个库,用于构建可预期的和声明式的用户界面。 想不想通过一线互联网公司面试? - Android - 掘金国内一线互联网公司内部面试题库 以下面试题来自于百度、小米、乐视、美团、58、猎豹、360、新浪、搜狐内部题库 熟悉本文中列出的知...

    zengdongbao 评论0 收藏0
  • Android框架 - 收藏集 - 掘金

    摘要:说好的分类集合来啦,采用标签云的方式来展示掘金阅读提示点击下文中返回到顶部有分类不合理的地方请提。反编译这个后发现其使用个优质的开源项目掘金是由的开源的一个库,用于构建可预期的和声明式的用户界面。 想不想通过一线互联网公司面试? - Android - 掘金国内一线互联网公司内部面试题库 以下面试题来自于百度、小米、乐视、美团、58、猎豹、360、新浪、搜狐内部题库 熟悉本文中列出的知...

    awesome23 评论0 收藏0
  • Rx系列学习 - 收藏集 - 掘金

    摘要:最近开始转移到掘金。要解决的问题支持切换网络请求地址,例如实现掘金前言以前在项目中使用,来进行事件通知与订阅。因此精华翻译掘金本人学习完了后,将值得重点注意的变化进行了翻译和归纳,适合正在使用的同学快速了解如果错误欢迎批评指正。 收藏安卓开发中非常实用优秀的库! 有图有真相! - Android - 掘金本来是打算收藏工具类的,但转念一想,已经有这么多优秀的库了,就没必要再去重复造轮子...

    kidsamong 评论0 收藏0
  • 「码个蛋」2017年200篇精选干货集合

    摘要:让你收获满满码个蛋从年月日推送第篇文章一年过去了已累积推文近篇文章,本文为年度精选,共计篇,按照类别整理便于读者主题阅读。本篇文章是今年的最后一篇技术文章,为了让大家在家也能好好学习,特此花了几个小时整理了这些文章。 showImg(https://segmentfault.com/img/remote/1460000013241596); 让你收获满满! 码个蛋从2017年02月20...

    wangtdgoodluck 评论0 收藏0

发表评论

0条评论

gghyoo

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<