资讯专栏INFORMATION COLUMN

RxJava系列番外篇:一个RxJava解决复杂业务逻辑的案例

EscapedDog / 1493人阅读

摘要:之前写过一系列的文章,也承诺过会尽快有的介绍。所以这次还是给大家分享一个使用解决问题的案例,希望对大家在使用的时候有一点点启发。上述这一套复杂的业务逻辑如果使用传统编码方式将是极其复杂的。

之前写过一系列RxJava1的文章,也承诺过会尽快有RxJava2的介绍。无奈实际项目中还未真正的使用RxJava2,不敢妄动笔墨。所以这次还是给大家分享一个使用RxJava1解决问题的案例,希望对大家在使用RxJava的时候有一点点启发。对RxJava还不了解的同学可以先去看看我之前的RxJava系列文章:

RxJava系列1(简介)

RxJava系列2(基本概念及使用介绍)

RxJava系列3(转换操作符)

RxJava系列4(过滤操作符)

RxJava系列5(组合操作符)

RxJava系列6(从微观角度解读RxJava源码)

RxJava系列7(最佳实践)

业务场景

拿MinimalistWeather这个开源的天气App来举例:

进入App首页后,首先我们需要从数据库中获取当前城市的天气数据,如果数据库中存在天气数据则在UI页面上展示天气数据;如果数据库中未存储当前城市的天气数据,或者已存储的天气数据的发布时间相比现在已经超过了一小时,并且网络属于连接状态则调用API从服务端获取天气数据。如果获取到到的天气数据发布时间和当前数据库中的天气数据发布时间一致则丢弃掉从服务端获取到的天气数据,如果不一致则更新数据库并且在页面上展示最新的天气信息。(同时天气数据源是可配置的,可选择是小米天气数据源还是Know天气数据源)

解决方案

首先我们需要创建一个从数据库获取天气数据的Observable observableForGetWeatherFromDB,同时我们也需要创建一个从API获取天气数据的Observable observableForGetWeatherFromNetWork;为了在无网络状态下免于创建observableForGetWeatherFromNetWork我们在这之前需要首先判断下网络状态。最后使用contact操作符将两个Observable合并,同时使用distincttakeUntil操作符来过滤筛选数据以符合业务需求,然后结合subscribeOnobserveOn做线程切换。上述这一套复杂的业务逻辑如果使用传统编码方式将是极其复杂的。下面我们来看看使用RxJava如何清晰简洁的来实现这个复杂的业务:

</>复制代码

  1. Observable observableForGetWeatherData;
  2. //首先创建一个从数据库获取天气数据的Observable
  3. Observable observableForGetWeatherFromDB = Observable.create(new Observable.OnSubscribe() {
  4. @Override
  5. public void call(Subscriber subscriber) {
  6. try {
  7. Weather weather = weatherDao.queryWeather(cityId);
  8. subscriber.onNext(weather);
  9. subscriber.onCompleted();
  10. } catch (SQLException e) {
  11. throw Exceptions.propagate(e);
  12. }
  13. }
  14. });
  15. if (!NetworkUtils.isNetworkConnected(context)) {
  16. observableForGetWeatherData = observableForGetWeatherFromDB;
  17. } else {
  18. //接着创建一个从网络获取天气数据的Observable
  19. Observable observableForGetWeatherFromNetWork = null;
  20. switch (configuration.getDataSourceType()) {
  21. case ApiConstants.WEATHER_DATA_SOURCE_TYPE_KNOW:
  22. observableForGetWeatherFromNetWork = ApiClient.weatherService.getKnowWeather(cityId)
  23. .map(new Func1() {
  24. @Override
  25. public Weather call(KnowWeather knowWeather) {
  26. return new KnowWeatherAdapter(knowWeather).getWeather();
  27. }
  28. });
  29. break;
  30. case ApiConstants.WEATHER_DATA_SOURCE_TYPE_MI:
  31. observableForGetWeatherFromNetWork = ApiClient.weatherService.getMiWeather(cityId)
  32. .map(new Func1() {
  33. @Override
  34. public Weather call(MiWeather miWeather) {
  35. return new MiWeatherAdapter(miWeather).getWeather();
  36. }
  37. });
  38. break;
  39. }
  40. assert observableForGetWeatherFromNetWork != null;
  41. observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork
  42. .doOnNext(new Action1() {
  43. @Override
  44. public void call(Weather weather) {
  45. Schedulers.io().createWorker().schedule(() -> {
  46. try {
  47. weatherDao.insertOrUpdateWeather(weather);
  48. } catch (SQLException e) {
  49. throw Exceptions.propagate(e);
  50. }
  51. });
  52. }
  53. });
  54. //使用concat操作符将两个Observable合并
  55. observableForGetWeatherData = Observable.concat(observableForGetWeatherFromDB, observableForGetWeatherFromNetWork)
  56. .filter(new Func1() {
  57. @Override
  58. public Boolean call(Weather weather) {
  59. return weather != null && !TextUtils.isEmpty(weather.getCityId());
  60. }
  61. })
  62. .distinct(new Func1() {
  63. @Override
  64. public Long call(Weather weather) {
  65. return weather.getRealTime().getTime();//如果天气数据发布时间一致,我们再认为是相同的数据从丢弃掉
  66. }
  67. })
  68. .takeUntil(new Func1() {
  69. @Override
  70. public Boolean call(Weather weather) {
  71. return System.currentTimeMillis() - weather.getRealTime().getTime() <= 60 * 60 * 1000;//如果天气数据发布的时间和当前时间差在一小时以内则终止事件流
  72. }
  73. });
  74. }
  75. observableForGetWeatherData.subscribeOn(Schedulers.io())
  76. .observeOn(AndroidSchedulers.mainThread())
  77. .subscribe(new Action1() {
  78. @Override
  79. public void call(Weather weather) {
  80. displayWeatherInformation();
  81. }
  82. }, new Action1() {
  83. @Override
  84. public void call(Throwable throwable) {
  85. Toast.makeText(context, throwable.getMessage(), Toast.LENGTH_LONG).show();
  86. }
  87. });

上面的代码看起来比较复杂,我们采用Lambda表达式简化下代码:

</>复制代码

  1. Observable observableForGetWeatherData;
  2. //首先创建一个从数据库获取天气数据的Observable
  3. Observable observableForGetWeatherFromDB = Observable.create(new Observable.OnSubscribe() {
  4. @Override
  5. public void call(Subscriber subscriber) {
  6. try {
  7. Weather weather = weatherDao.queryWeather(cityId);
  8. subscriber.onNext(weather);
  9. subscriber.onCompleted();
  10. } catch (SQLException e) {
  11. throw Exceptions.propagate(e);
  12. }
  13. }
  14. });
  15. if (!NetworkUtils.isNetworkConnected(context)) {
  16. observableForGetWeatherData = observableForGetWeatherFromDB;
  17. } else {
  18. //接着创建一个从网络获取天气数据的Observable
  19. Observable observableForGetWeatherFromNetWork = null;
  20. switch (configuration.getDataSourceType()) {
  21. case ApiConstants.WEATHER_DATA_SOURCE_TYPE_KNOW:
  22. observableForGetWeatherFromNetWork = ApiClient.weatherService.getKnowWeather(cityId)
  23. .map(knowWeather -> new KnowWeatherAdapter(knowWeather).getWeather());
  24. break;
  25. case ApiConstants.WEATHER_DATA_SOURCE_TYPE_MI:
  26. observableForGetWeatherFromNetWork = ApiClient.weatherService.getMiWeather(cityId)
  27. .map(miWeather -> new MiWeatherAdapter(miWeather).getWeather());
  28. break;
  29. }
  30. assert observableForGetWeatherFromNetWork != null;
  31. observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork
  32. .doOnNext(weather -> Schedulers.io().createWorker().schedule(() -> {
  33. try {
  34. weatherDao.insertOrUpdateWeather(weather);
  35. } catch (SQLException e) {
  36. throw Exceptions.propagate(e);
  37. }
  38. }));
  39. //使用concat操作符将两个Observable合并
  40. observableForGetWeatherData = Observable.concat(observableForGetWeatherFromDB, observableForGetWeatherFromNetWork)
  41. .filter(weather -> weather != null && !TextUtils.isEmpty(weather.getCityId()))
  42. .distinct(weather -> weather.getRealTime().getTime())//如果天气数据发布时间一致,我们再认为是相同的数据从丢弃掉
  43. .takeUntil(weather -> System.currentTimeMillis() - weather.getRealTime().getTime() <= 60 * 60 * 1000);//如果天气数据发布的时间和当前时间差在一小时以内则终止事件流
  44. }
  45. observableForGetWeatherData.subscribeOn(Schedulers.io())
  46. .observeOn(AndroidSchedulers.mainThread())
  47. .subscribe(weather -> displayWeatherInformation(),
  48. throwable -> Toast.makeText(context, throwable.getMessage(), Toast.LENGTH_LONG).show());
小技巧

在上述的实现中有几点是我们需要注意的:

为什么我需要在判断网络那块整个if else?这样看起来很不优雅,我们通过RxJava符完全可以实现同样的操作啊!之所以这样做是为了在无网络状况下去创建不必要的Observable observableForGetWeatherFromNetWork;

更新数据库的操作不应该阻塞更新UI,因此我们在observableForGetWeatherFromNetWorkdoOnNext中需要通过Schedulers.io().createWorker()去另起一条线程,以此保证更新数据库不会阻塞更新UI的操作。

</>复制代码

  1. 有同学可能会问为什么不在doOnNext之后再调用一次observeOn把更新数据库的操作切换到一条新的子线程去操作呢?其实一开始我也是这样做的,后来想想不对。整个Observable的事件传递处理就像是在一条流水线上完成的,虽然我们可以通过observeOn来指定子线程去处理更新数据库的操作,但是只有等这条子线程完成了更新数据库的任务后事件才会继续往后传递,这样就阻塞了更新UI的操作。对此有疑问的同学可以去看看我之前关于RxJava源码分析的文章或者自己动手debug看看。

问题

最后给大家留个两个问题:

上述代码是最佳实现方案吗?还有什么更加合理的做法?

我们在observableForGetWeatherData中使用distincttakeUntil过滤筛选天气数据的时候网络请求会不会已经发出去了?这样做还有意义吗?

欢迎大家留言讨论。

</>复制代码

  1. 本文中的代码在MinimalistWeather中的WeatherDataRepository类中有同样的实现,文章中为了更完整的将整个实现过程呈现出来,对代码做了部分改动。

  2. 如果大家喜欢这一系列的文章,欢迎关注我的知乎专栏、Github以及简书。

  3. 知乎专栏:https://zhuanlan.zhihu.com/baron

  4. GitHub:https://github.com/BaronZ88

  5. 简书:http://www.jianshu.com/users/cfdc52ea3399

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66436.html

相关文章

  • RxJava系列一(简介)

    摘要:响应式编程在介绍前,我们先聊聊响应式编程。响应式编程的一个关键概念是事件。今天,响应式编程最通用的一个场景是我们的移动必须做出对网络调用用户触摸输入和系统弹框的响应。并于年二月份正式向外展示了。 转载请注明出处:https://zhuanlan.zhihu.com/p/20687178 RxJava系列1(简介) RxJava系列2(基本概念及使用介绍) RxJava系列3(转换操作...

    Gu_Yan 评论0 收藏0

发表评论

0条评论

EscapedDog

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<