RxJS笔记

RxJS

《深入浅出RxJS》读书笔记

遗留问题

  1. Observable的HOT与COLD对应的现实场景,以及在编码中的表现

chapter1

html部份


  测试你对时刻的觉得
  按住我一秒钟然后放手
  你的时刻:毫秒
  
  1. jquery完成

    var time = new Date().getTime();
    $("#hold-me")
        .mousedown(function(event) {
            time = new Date().getTime();
        })
        .mouseup(function(event) {
            if (time) {
                var elapse = new Date().getTime() - time;
                $("#hold-time").text(elapse);
                // 重置time 防止直接触发mouseup事宜,比方在A处点击然后在B处up
                time = null;
            }
        });
  2. RxJS完成

    const holdMeButton = document.getElementById("hold-me");
                const mouseDown$ = Rx.Observable.fromEvent(holdMeButton,"mousedown");
                const mouseUp$ = Rx.Observable.fromEvent(holdMeButton,"mouseup");
                // 猎取间隔时刻
                const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(),(mouseUpEvent,mouseDownEvent)=>{
                    return mouseUpEvent.timestamp - mouseDownEvent.timestamp
                });
    
                holdTime$.subscribe(ms=>{
                    document.getElementById("hold-time").innerText = ms;
                })
    
                holdTime$.flatMap(ms=>{
                    return Rx.Observable.ajax('https://timing-sense-score-board.herokuapp.com/score/'+ms)
                })
                .map(e=>e.response)
                .subscribe(res=>{
                    document.getElementById("rank").innerText = `你超过了${res.rank}% 的用户`
                })

chapter2

Koa2的运用

重要用来加载静态资本,所以运用到了
koa,
koa-static

const path = require("path");
const koa = require("koa");
const serve = require("koa-static");
const app = new koa();

app.use(async function (ctx,next) {
    console.log("收到要求...")
    await next()
    console.log(`"${ctx.path}"要求 已处置惩罚...`)
})

app.use(serve(path.resolve(__dirname, "../src"))).listen(3001,function(err){
    if(err) throw err;
    console.log("顺序启动胜利")
});

ObservableObserver

Observable 可被视察的对象,Observer视察者,Observer经由过程subscribe来视察Observable对象

RxJS的数据流就是Observable对象:

  1. 视察者形式
  2. 迭代器形式

举个栗子

// 运用 deep-link体式格局引入函数
const Observable = require("rxjs").Observable;

/*
 * 定义Observable对象的行动,会发生数据,挪用定阅者的next要领
 * 1. 此处的Observer与定阅者行动 theObserver并非同一个对象,而是对theObserver的包装
 * 2. 假如observer.error被挪用,以后的complete或许next就不会被挪用啦,同理,complete被挪用以后,也不会
 *    再挪用next或许error
 * 3. 假如error或许complete一向未挪用,则observer就一向在内存中守候被挪用
*/
const onSubscribe = observer =>{
    observer.next(1);
    observer.error(2);
    observer.complete(3);
}
// 发生一个Observable对象
const source$ = new Observable(onSubscribe);
// 定义视察者的行动 消耗Observable对象发生的数据
const theObserver = {
    next:item => console.log(item),
    error:item => console.error(item),
    complete:item => console.log("已完成"),
}
// 竖立Observable与Observer的关联
source$.subscribe(theObserver)

退订subscribe

在定阅一段事宜以后observer不再相应吐出的信息了,这时刻能够退订,然则Observeable还会一向发生数据

const Observable = require("rxjs").Observable;

const onSubscribe = observer =>{
    let n = 1;
    const handle = setInterval(()=>{
        console.log(`in onSubscribe ${n}`)
        // if(n>3){
        //     observer.complete()
        // }
        observer.next(n++);
    },1000)
    return {
        unsubscribe(){
            // clearInterval(handle)
        }
    }
}

const source$ = new Observable(onSubscribe);

const theObserver = {
    next:item => console.log(item)
}

let subscription = source$.subscribe(theObserver)

setTimeout(()=>{
      // 此处的unsubscribe也是封装过的
    subscription.unsubscribe()
},3500)

node中实行,会一向打印 in onSubscribe *,然则source$不会再相应

Chapter3 操纵符基本

const Observable = require("rxjs/Observable").Observable;
const of = require("rxjs/observable/of").of;
const map = require("rxjs/operator/map").map;
// 新建一个操纵符
// 此处this是外部变量,致使此operator不再是纯函数
Observable.prototype.double = function(){
    // return this::map(x=>x*2)
    return map.call(this,x=>x*2)
}

const source$ = of(1,3,4);
const result$ = source$.double();

result$.subscribe(value=>console.log(value))

lettable/pipeable操纵符

处理须要运用call或许bind转变this的操纵,如许是依靠外部环境的,不属于纯函数,也会损失TS的范例搜检上风

  • lettableObservable对象通报给下文,防止运用this

    const Observable = require("rxjs/Observable").Observable;
    require("rxjs/add/observable/of").of;
    require("rxjs/add/operator/map").map;
    require("rxjs/add/operator/let").let;
    
    const source$ = Observable.of(1,2,3);
    const double$ = obs$ => obs$.map(v=>v*2);
    // 接收上文,通报到下文
    const result$ = source$.let(double$);
    
    result$.subscribe(console.log)
    

  不引入`map`补丁,开辟**lettable**写法的操纵符

// ES5完成
function map(project){

return function(obj$){
  // 经由过程上面的Observable天生一个新Observable
  return new Observable(observer=>{
    return obj$.subscribe({
      next:value=>observer.next(project(value)),
      error:err=>observer.error(err),
      complete:()=>observer.complete()
    })
  })
}

}
// 增加操纵符
var result$ = source$.let(map(x => x * 3));

// ES6完成
const map6 = fn => obj$ =>

  new Observable(observer =>
      obj$.subscribe({
          next: value => observer.next(fn(value)),
          error: err => observer.error(err),
          complete: () => observer.complete()
      })
  );

// 增加操纵符
var result$ = source$.let(map6(x => x * 4));


`pipeable`是`lettable`的别称,轻易关于`lattable`的明白,V6以上才支撑

## Chapter4 建立数据流

> 大多数的操纵符是静态操纵符

### 基本操纵符

1.  `create`简朴的返回一个Observable对象
    
Observable.create = function(subscribe){
  return new Observable(subscribe)
}
```
  1. of枚举数据

    import {Observable} from "rxjs/Observable";
    import "rxjs/add/observable/of"
    // 顺次吐出数据,一次性emit
    const source$ = Observable.of(1,2,3);
    // 定阅
    // 第一个参数是next,第二个参数是error回调,第三个参数是complete回调
    source$.subscribe(console.log,null,()=>{console.log("Complete")})
  2. range发生指定局限的数据

    const sourc$ = Observable.range(/*初始值*/1,/*个数*/100);
    // 每次只能步进 1
  3. generate轮回建立

    相当于for轮回

    const source$ = Observable.generate(
      // 初始值
      2,
      // 推断前提
      value=> value < 10,
      // 步进
      value=> value+0.5,
      // 函数体,发生的效果
      value=> value*value
    )

    运用generate替代range

    const range = function(min,count){
      const max = min + count;
      return Observable.generate(min,v=>vv+1,v=>v*v)
    }
  4. repeat反复数据的数据流

    实例操纵符,经由过程import 'rxjs/add/operator/repeat'引入

    1. V4版本中repeat是静态属性,如许在运用Observable.repeat(1,2)反复1两次,如许数据就够天真
    2. V5版本中改成实例属性以后,Observable.of(1,2,4).repeat(2),将发生的1,2,3反复两次,功用越发壮大
    const Observable = require("rxjs").Observable;
    require("rxjs/add/operator/repeat");
    
    const source$ = Observable.create(observer => {
        setTimeout(() => {
            observer.next(1);
        }, 1000);
        setTimeout(() => {
            observer.next(2);
        }, 2000);
        setTimeout(() => {
            observer.next(3);
        }, 3000);
        setTimeout(() => {
            observer.complete(1);
        }, 4000);
        return {
            unsubscribe(){
                console.log("on Unsubscribe")
            }
        }
    });
    
    const repeat$ = source$.repeat(2)
    
    repeat$.subscribe(console.log,null,()=>{console.log("Complete")})
    
    // 1
    // 2
    // 3
    // on Unsubscribe
    // 1
    // 2
    // 3
    // Complete
    // on Unsubscribe
    • 假如没有observer.complete()repeat不会被挪用

      repeat以complete为契时机再次实行数据源,假如上游一向没有complete下流就不会实行

    • 由于repeat的存在,第一次数据源实行完(以complete为契机)后并不会实行observer的complete回调
  5. empty,throw,never

建立异步数据的Observable对象

  1. intervaltimer

    interval类似于setInterval

    require('rxjs/add/observable/interval')
    // 每隔1000ms发生一个数据,初始值为0,步进为1
    Observable.interval(1000)'

    timer 是setTimeout的超集

    // 1000ms后最先发生数据,以后每隔1000ms发生一个数据,功用相当于interval
    Observable.timer(1000,1000)
    // 指定日期
    Observable.time(new Date(new Date().getTime() + 12000))
  2. from 把统统转化为Observable

    1. 将一切的Iterable的对象都转化为Observable对象
    2. 能够将Promise对象转化为Observable对象,功用与fromPromise雷同
  3. fromPromise异步处置惩罚的对接

    const Observable = require("rxjs").Observable;
    require("rxjs/add/observable/fromPromise");
    
    const promise = Promise.resolve(123);
    Observable.fromPromise(promise).subscribe(console.log, null, () =>
        console.log("Complete")
    );
    //123
    //Complete
    const promise1 = Promise.reject("error");
    Observable.from(
        console.log,
        err => console.log("catch", err),
        () => console.log("Complete!")
    );
    // 未捕捉的Promise毛病
    // (node:765) UnhandledPromiseRejectionWarning: error
    // (node:765) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing
    // inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (
    // rejection id: 1)
    // (node:765) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
  4. fromEvent衔接DOM与RxJS的桥梁

    const event$ = Observable.fromEvent(document.getElementById("btn"),"click");
    event$.subscribe(event=>{
      // Do something
    })

    在NodeJs中能够与EventEmitter交互

    const Observable = require("rxjs").Observable;
    const EventEmitter = require("events");
    require("rxjs/add/observable/fromEvent")
    
    const emitter = new EventEmitter();
    
    const source$ = Observable.fromEvent(emitter,"msg");
    source$.subscribe(console.log,null,()=>console.log("Complete"))
    
    emitter.emit("msg",1)
    // 1
    emitter.emit("msg","haha")
    // haha
    emitter.emit("a-msg","haha")
    //
    emitter.emit("msg",'nihao')
    // nihao

    fromEventHot Observable,也就是数据的发生和定阅无关,关于fromEvent来讲,数据源是外部发生的,不受RxJS掌握,这是Hot Observable对象的特性

  5. fromEventPattern针对不范例的事宜源

    范例的事宜源:DOM事宜,EventEmitter事宜

  6. ajax见最上面的例子
  7. repeatWhen

    比方 在上游事宜完毕以后的一段时刻再从新定阅

    const Observable = require("rxjs").Observable;
    require("rxjs/add/operator/repeatWhen")
    
    const notifier = ()=>{
        return Observable.interval(1000);
    }
    
    const source$ = Observable.of(1,2,3);
    // const source$ = Observable.create(observer=>{
    //     observer.next(111);
    //     return {
    //         unsubscribe(){
    //             console.log("on Unsubscribe")
    //         }
    //     }
    // });
    const repeat$ = source$.repeatWhen(notifier);
    
    repeat$.subscribe(console.log,null,()=>console.log("Complete"))
    // 每隔一秒发生一次
    // 1
    // 2
    // 3
    // 1
    // 2
    // 3
    // 1
    // 2
    // 3
    // 1
    // 2
    // 3
    // 1
    // 2
    // 3
    // ^C
  8. defer耽误建立Observable

    针对Observable占用内存比较大的状况,懒加载

    const Observable = require("rxjs").Observable;
    require("rxjs/add/observable/defer");
    require("rxjs/add/observable/of");
    
    const observableFactory = ()=>Observable.of(1,2,3);
    const source$ = Observable.defer(observableFactory)

兼并数据流

功用需求 操纵符
把多个数据流以首尾相连的体式格局兼并 concat,concatAll
把多个数据流以先到先得的体式格局兼并 merge,mergeAll
把多个数据流中的数据以一一对应的体式格局兼并 zipzipAll
延续兼并多个数据流中最新发生的数据 combineLatest,combineAll,withLatestFrom
从多个数据流中拔取第一个发生内容的数据流 race
在数据流前面增加一个指定数据 startWith
只猎取多个数据流末了发生的数据 forkJoin
高阶数据流中切换数据源 switch,exhaust
  1. concat

    1. 实例要领
    2. 静态要领,假如两个数据没有前后关联,引荐运用此要领
    • 实例要领

      const Observable = require("rxjs").Observable;
      require("rxjs/add/operator/of")
      require("rxjs/add/operator/concat")
      
      const source$1 = Observable.of(1,2,3);
      const source$2 = Observable.of(4,5,6);
      
      source$1.concat(source$2).subscribe(console.log,null,()=>console.log("Complete"))
    • 静态要领

      const Observable = require("rxjs").Observable;
      require("rxjs/add/operator/of")
      require("rxjs/add/observable/concat")
      
      const source$1 = Observable.of(1,2,3);
      const source$2 = Observable.of(4,5,6);
      
      Observable
        .concat(source$1,source$2)
        .subscribe(console.log,null,()=>console.log("Complete"))
`concat`在将上一个数据源通报下去的时刻会挪用上一个`Observable`的`unsubscribe`,假如上一个`Observable`一向为结束,后续的都不会被挪用

```javascript
const source$1 = Observable.internal(1000);
const source$2 = Observable.of(1);
const concated$ = Observable.concat(source$1,source$2);
// 此时 source$2永久不会被挪用
```

在此推想:`rxjs/add/operator/*`下的属性都是实例属性,`rxjs/add/observable/*`下的属性都是实例属性
  1. merge先到先得

    merge用在同步数据的状况下和concat表现只,不发起运用

    const Observable = require("rxjs").Observable;
    require("rxjs/add/operator/merge");
    require("rxjs/add/operator/map");
    require("rxjs/add/observable/timer");
    
    const source$1 = Observable.timer(0, 1000).map(x => x + "A");
    const source$2 = Observable.timer(500, 1000).map(x => x + "B");
    const source$3 = Observable.timer(1000, 1000).map(x => x + "C");
    
    // 此时 source$1与source$2永久不会住手,所以
    source$1
        .merge(source$2, source$3, /*此参数限定了兼并的Observable的个数*/ 2)
        .subscribe(console.log, null, () => console.log("Complete"));
    
    // 0A
    // 0B
    // 1A
    // 1B
    // 2A
    // 2B
    // 3A
    // 3B
    // 4A
    // 4B
    // ^C
    
    原文作者:Feliks
    原文地址: https://segmentfault.com/a/1190000015654736
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞