资讯专栏INFORMATION COLUMN

RxJS笔记

shinezejian / 2793人阅读

摘要:深入浅出读书笔记遗留问题的与对应的实际场景,以及在编码中的体现部分测试你对时间的感觉按住我一秒钟然后松手你的时间毫秒实现重置避免直接触发事件,例如在处点击然后在处实现获取间隔时间你超过了的用户的使用主要用来加载静态资源,所

RxJS
《深入浅出RxJS》读书笔记
遗留问题

Observable的HOT与COLD对应的实际场景,以及在编码中的体现

chapter1

html部分

  测试你对时间的感觉
  按住我一秒钟然后松手
  你的时间:毫秒
  

jquery实现

var time = new Date().getTime();
$("#hold-me")
    .mousedown(function(event) {
        time = new Date().getTime();
    })
    .mouseup(function(event) {
        if (time) {
            var elapse = new Date().getTime() - time;
            $("#hold-time").text(elapse);
            // 重置time 避免直接触发mouseup事件,例如在A处点击然后在B处up
            time = null;
        }
    });

RxJS实现

const holdMeButton = document.getElementById("hold-me");
            const mouseDown$ = Rx.Observable.fromEvent(holdMeButton,"mousedown");
            const mouseUp$ = Rx.Observable.fromEvent(holdMeButton,"mouseup");
            // 获取间隔时间
            const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(),(mouseUpEvent,mouseDownEvent)=>{
                return mouseUpEvent.timestamp - mouseDownEvent.timestamp
            });

            holdTime$.subscribe(ms=>{
                document.getElementById("hold-time").innerText = ms;
            })

            holdTime$.flatMap(ms=>{
                return Rx.Observable.ajax("https://timing-sense-score-board.herokuapp.com/score/"+ms)
            })
            .map(e=>e.response)
            .subscribe(res=>{
                document.getElementById("rank").innerText = `你超过了${res.rank}% 的用户`
            })

chapter2 Koa2的使用
主要用来加载静态资源,所以使用到了 koa,koa-static
const path = require("path");
const koa = require("koa");
const serve = require("koa-static");
const app = new koa();

app.use(async function (ctx,next) {
    console.log("收到请求...")
    await next()
    console.log(`"${ctx.path}"请求 已处理...`)
})

app.use(serve(path.resolve(__dirname, "../src"))).listen(3001,function(err){
    if(err) throw err;
    console.log("程序启动成功")
});
ObservableObserver
Observable 可被观察的对象,Observer观察者,Observer通过subscribe来观察Observable对象

RxJS的数据流就是Observable对象:

观察者模式

迭代器模式

举个栗子
// 使用 deep-link方式引入函数
const Observable = require("rxjs").Observable;

/*
 * 定义Observable对象的行为,会产生数据,调用订阅者的next方法
 * 1. 此处的Observer与订阅者行为 theObserver并不是同一个对象,而是对theObserver的包装
 * 2. 如果observer.error被调用,之后的complete或者next就不会被调用啦,同理,complete被调用之后,也不会
 *    再调用next或者error
 * 3. 如果error或者complete一直未调用,则observer就一直在内存中等待被调用
*/
const onSubscribe = observer =>{
    observer.next(1);
    observer.error(2);
    observer.complete(3);
}
// 产生一个Observable对象
const source$ = new Observable(onSubscribe);
// 定义观察者的行为 消费Observable对象产生的数据
const theObserver = {
    next:item => console.log(item),
    error:item => console.error(item),
    complete:item => console.log("已完成"),
}
// 建立Observable与Observer的关系
source$.subscribe(theObserver)
退订subscribe
在订阅一段事件之后observer不再响应吐出的信息了,这时可以退订,但是Observeable还会一直产生数据
const Observable = require("rxjs").Observable;

const onSubscribe = observer =>{
    let n = 1;
    const handle = setInterval(()=>{
        console.log(`in onSubscribe ${n}`)
        // if(n>3){
        //     observer.complete()
        // }
        observer.next(n++);
    },1000)
    return {
        unsubscribe(){
            // clearInterval(handle)
        }
    }
}

const source$ = new Observable(onSubscribe);

const theObserver = {
    next:item => console.log(item)
}

let subscription = source$.subscribe(theObserver)

setTimeout(()=>{
      // 此处的unsubscribe也是封装过的
    subscription.unsubscribe()
},3500)

node中执行,会一直打印 in onSubscribe *,但是source$不会再响应

Chapter3 操作符基础
const Observable = require("rxjs/Observable").Observable;
const of = require("rxjs/observable/of").of;
const map = require("rxjs/operator/map").map;
// 新建一个操作符
// 此处this是外部变量,导致此operator不再是纯函数
Observable.prototype.double = function(){
    // return this::map(x=>x*2)
    return map.call(this,x=>x*2)
}

const source$ = of(1,3,4);
const result$ = source$.double();

result$.subscribe(value=>console.log(value))
lettable/pipeable操作符
解决需要使用call或者bind改变this的操作,这样是依赖外部环境的,不属于纯函数,也会丧失TS的类型检查优势

lettableObservable对象传递给下文,避免使用this

const Observable = require("rxjs/Observable").Observable;
require("rxjs/add/observable/of").of;
require("rxjs/add/operator/map").map;
require("rxjs/add/operator/let").let;

const source$ = Observable.of(1,2,3);
const double$ = obs$ => obs$.map(v=>v*2);
// 接受上文,传递到下文
const result$ = source$.let(double$);

result$.subscribe(console.log)

  不引入`map`补丁,开发**lettable**写法的操作符

// ES5实现
function map(project){

return function(obj$){
  // 通过上面的Observable生成一个新Observable
  return new Observable(observer=>{
    return obj$.subscribe({
      next:value=>observer.next(project(value)),
      error:err=>observer.error(err),
      complete:()=>observer.complete()
    })
  })
}

}
// 添加操作符
var result$ = source$.let(map(x => x * 3));

// ES6实现
const map6 = fn => obj$ =>

  new Observable(observer =>
      obj$.subscribe({
          next: value => observer.next(fn(value)),
          error: err => observer.error(err),
          complete: () => observer.complete()
      })
  );

// 添加操作符
var result$ = source$.let(map6(x => x * 4));

`pipeable`是`lettable`的别称,方便对于`lattable`的理解,V6以上才支持

## Chapter4 创建数据流

> 大多数的操作符是静态操作符

### 基础操作符

1.  `create`简单的返回一个Observable对象
    
Observable.create = function(subscribe){
  return new Observable(subscribe)
}
```

of列举数据

import {Observable} from "rxjs/Observable";
import "rxjs/add/observable/of"
// 依次吐出数据,一次性emit
const source$ = Observable.of(1,2,3);
// 订阅
// 第一个参数是next,第二个参数是error回调,第三个参数是complete回调
source$.subscribe(console.log,null,()=>{console.log("Complete")})

range产生指定范围的数据

const sourc$ = Observable.range(/*初始值*/1,/*个数*/100);
// 每次只能步进 1

generate循环创建

相当于for循环
const source$ = Observable.generate(
  // 初始值
  2,
  // 判断条件
  value=> value < 10,
  // 步进
  value=> value+0.5,
  // 函数体,产生的结果
  value=> value*value
)

使用generate代替range

const range = function(min,count){
  const max = min + count;
  return Observable.generate(min,v=>vv+1,v=>v*v)
}

repeat重复数据的数据流

实例操作符,通过import "rxjs/add/operator/repeat"引入

V4版本中repeat是静态属性,这样在使用Observable.repeat(1,2)重复1两次,这样数据就够灵活

V5版本中改为实例属性之后,Observable.of(1,2,4).repeat(2),将产生的1,2,3重复两次,功能更加强大

const Observable = require("rxjs").Observable;
require("rxjs/add/operator/repeat");

const source$ = Observable.create(observer => {
    setTimeout(() => {
        observer.next(1);
    }, 1000);
    setTimeout(() => {
        observer.next(2);
    }, 2000);
    setTimeout(() => {
        observer.next(3);
    }, 3000);
    setTimeout(() => {
        observer.complete(1);
    }, 4000);
    return {
        unsubscribe(){
            console.log("on Unsubscribe")
        }
    }
});

const repeat$ = source$.repeat(2)

repeat$.subscribe(console.log,null,()=>{console.log("Complete")})

// 1
// 2
// 3
// on Unsubscribe
// 1
// 2
// 3
// Complete
// on Unsubscribe

如果没有observer.complete()repeat不会被调用

repeat以complete为契机会再次执行数据源,如果上游一直没有complete下游就不会执行

因为repeat的存在,第一次数据源执行完(以complete为契机)后并不会执行observer的complete回调

empty,throw,never

创建异步数据的Observable对象

intervaltimer

interval类似于setInterval

require("rxjs/add/observable/interval")
// 每隔1000ms产生一个数据,初始值为0,步进为1
Observable.interval(1000)"

timer 是setTimeout的超集

// 1000ms后开始产生数据,之后每隔1000ms产生一个数据,功能相当于interval
Observable.timer(1000,1000)
// 指定日期
Observable.time(new Date(new Date().getTime() + 12000))

from 把一切转化为Observable

将所有的Iterable的对象都转化为Observable对象

可以将Promise对象转化为Observable对象,功能与fromPromise相同

fromPromise异步处理的对接

const Observable = require("rxjs").Observable;
require("rxjs/add/observable/fromPromise");

const promise = Promise.resolve(123);
Observable.fromPromise(promise).subscribe(console.log, null, () =>
    console.log("Complete")
);
//123
//Complete
const promise1 = Promise.reject("error");
Observable.from(
    console.log,
    err => console.log("catch", err),
    () => console.log("Complete!")
);
// 未捕获的Promise错误
// (node:765) UnhandledPromiseRejectionWarning: error
// (node:765) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing
// inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (
// rejection id: 1)
// (node:765) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

fromEvent连接DOM与RxJS的桥梁

const event$ = Observable.fromEvent(document.getElementById("btn"),"click");
event$.subscribe(event=>{
  // Do something
})

在NodeJs中可以与EventEmitter交互

const Observable = require("rxjs").Observable;
const EventEmitter = require("events");
require("rxjs/add/observable/fromEvent")

const emitter = new EventEmitter();

const source$ = Observable.fromEvent(emitter,"msg");
source$.subscribe(console.log,null,()=>console.log("Complete"))

emitter.emit("msg",1)
// 1
emitter.emit("msg","haha")
// haha
emitter.emit("a-msg","haha")
//
emitter.emit("msg","nihao")
// nihao

fromEventHot Observable,也就是数据的产生和订阅无关,对于fromEvent来说,数据源是外部产生的,不受RxJS控制,这是Hot Observable对象的特点

fromEventPattern针对不规范的事件源

规范的事件源:DOM事件,EventEmitter事件

ajax见最上面的例子

repeatWhen

例如 在上游事件结束之后的一段时间再重新订阅
const Observable = require("rxjs").Observable;
require("rxjs/add/operator/repeatWhen")

const notifier = ()=>{
    return Observable.interval(1000);
}

const source$ = Observable.of(1,2,3);
// const source$ = Observable.create(observer=>{
//     observer.next(111);
//     return {
//         unsubscribe(){
//             console.log("on Unsubscribe")
//         }
//     }
// });
const repeat$ = source$.repeatWhen(notifier);

repeat$.subscribe(console.log,null,()=>console.log("Complete"))
// 每隔一秒产生一次
// 1
// 2
// 3
// 1
// 2
// 3
// 1
// 2
// 3
// 1
// 2
// 3
// 1
// 2
// 3
// ^C

defer延迟创建Observable

针对Observable占用内存比较大的情况,懒加载
const Observable = require("rxjs").Observable;
require("rxjs/add/observable/defer");
require("rxjs/add/observable/of");

const observableFactory = ()=>Observable.of(1,2,3);
const source$ = Observable.defer(observableFactory)

合并数据流
功能需求 操作符
把多个数据流以首尾相连的方式合并 concat,concatAll
把多个数据流以先到先得的方式合并 merge,mergeAll
把多个数据流中的数据以一一对应的方式合并 zipzipAll
持续合并多个数据流中最新产生的数据 combineLatest,combineAll,withLatestFrom
从多个数据流中选取第一个产生内容的数据流 race
在数据流前面添加一个指定数据 startWith
只获取多个数据流最后产生的数据 forkJoin
高阶数据流中切换数据源 switch,exhaust

concat

实例方法

静态方法,如果两个数据没有先后关系,推荐使用此方法

实例方法

const Observable = require("rxjs").Observable;
require("rxjs/add/operator/of")
require("rxjs/add/operator/concat")

const source$1 = Observable.of(1,2,3);
const source$2 = Observable.of(4,5,6);

source$1.concat(source$2).subscribe(console.log,null,()=>console.log("Complete"))

静态方法

const Observable = require("rxjs").Observable;
require("rxjs/add/operator/of")
require("rxjs/add/observable/concat")

const source$1 = Observable.of(1,2,3);
const source$2 = Observable.of(4,5,6);

Observable
  .concat(source$1,source$2)
  .subscribe(console.log,null,()=>console.log("Complete"))

`concat`在将上一个数据源传递下去的时候会调用上一个`Observable`的`unsubscribe`,如果上一个`Observable`一直为完结,后续的都不会被调用

```javascript
const source$1 = Observable.internal(1000);
const source$2 = Observable.of(1);
const concated$ = Observable.concat(source$1,source$2);
// 此时 source$2永远不会被调用
```

在此推测:`rxjs/add/operator/*`下的属性都是实例属性,`rxjs/add/observable/*`下的属性都是实例属性

merge先到先得

merge用在同步数据的情况下和concat表现只,不建议使用
const Observable = require("rxjs").Observable;
require("rxjs/add/operator/merge");
require("rxjs/add/operator/map");
require("rxjs/add/observable/timer");

const source$1 = Observable.timer(0, 1000).map(x => x + "A");
const source$2 = Observable.timer(500, 1000).map(x => x + "B");
const source$3 = Observable.timer(1000, 1000).map(x => x + "C");

// 此时 source$1与source$2永远不会停止,所以
source$1
    .merge(source$2, source$3, /*此参数限制了合并的Observable的个数*/ 2)
    .subscribe(console.log, null, () => console.log("Complete"));

// 0A
// 0B
// 1A
// 1B
// 2A
// 2B
// 3A
// 3B
// 4A
// 4B
// ^C

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/96173.html

相关文章

  • 学习实践 - 收藏集 - 掘金

    摘要:官网地址聊天机器人插件开发实例教程一创建插件在系统技巧使你的更加专业前端掘金一个帮你提升技巧的收藏集。我会简单基于的简洁视频播放器组件前端掘金使用和实现购物车场景前端掘金本文是上篇文章的序章,一直想有机会再次实践下。 2道面试题:输入URL按回车&HTTP2 - 掘金通过几轮面试,我发现真正那种问答的技术面,写一堆项目真不如去刷技术文章作用大,因此刷了一段时间的博客和掘金,整理下曾经被...

    mikyou 评论0 收藏0
  • 【响应式编程的思维艺术】 (5)Angular中Rxjs的应用示例

    摘要:本文是响应式编程第四章构建完整的应用程序这篇文章的学习笔记。涉及的运算符每隔指定时间将流中的数据以数组形式推送出去。中提供了一种叫做异步管道的模板语法,可以直接在的微语法中使用可观测对象示例五一点建议一定要好好读官方文档。 本文是【Rxjs 响应式编程-第四章 构建完整的Web应用程序】这篇文章的学习笔记。示例代码托管在:http://www.github.com/dashnoword...

    shenhualong 评论0 收藏0
  • 原理解释 - 收藏集 - 掘金

    摘要:巧前端基础进阶全方位解读前端掘金我们在学习的过程中,由于对一些概念理解得不是很清楚,但是又想要通过一些方式把它记下来,于是就很容易草率的给这些概念定下一些方便自己记忆的有偏差的结论。 计算机程序的思维逻辑 (83) - 并发总结 - 掘金从65节到82节,我们用了18篇文章讨论并发,本节进行简要总结。 多线程开发有两个核心问题,一个是竞争,另一个是协作。竞争会出现线程安全问题,所以,本...

    AlphaGooo 评论0 收藏0
  • 原理解释 - 收藏集 - 掘金

    摘要:巧前端基础进阶全方位解读前端掘金我们在学习的过程中,由于对一些概念理解得不是很清楚,但是又想要通过一些方式把它记下来,于是就很容易草率的给这些概念定下一些方便自己记忆的有偏差的结论。 计算机程序的思维逻辑 (83) - 并发总结 - 掘金从65节到82节,我们用了18篇文章讨论并发,本节进行简要总结。 多线程开发有两个核心问题,一个是竞争,另一个是协作。竞争会出现线程安全问题,所以,本...

    forrest23 评论0 收藏0
  • Redux 进阶 - react 全家桶学习笔记(二)

    摘要:在函数式编程中,异步操作修改全局变量等与函数外部环境发生的交互叫做副作用通常认为这些操作是邪恶肮脏的,并且也是导致的源头。 注:这篇是17年1月的文章,搬运自本人 blog... https://github.com/BuptStEve/... 零、前言 在上一篇中介绍了 Redux 的各项基础 api。接着一步一步地介绍如何与 React 进行结合,并从引入过程中遇到的各个痛点引出 ...

    Godtoy 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<