RxJS基础教程

RxJS是一个基于可视察数据流在异步编程运用中的库。

ReactiveX is a combination of the best ideas from

the
Observer pattern, the
Iterator pattern, and
functional programming

正如官网所说,RxJS是基于视察者形式,迭代器形式和函数式编程。因而,起首要对这几个形式有所明白

视察者形式

window.addEventListener('click', function(){
  console.log('click!');
})

JS的事宜监听就是天生的视察者形式。给window的click事宜(被视察者)绑定了一个listener(视察者),当事宜发作,回调函数就会被触发

<!– more –>

迭代器形式

迭代器形式,供应一种要领递次接见一个聚合对象中的种种元素,而又不暴露该对象的内部示意。

ES6里的Iterator即可完成:

let arr = ['a', 'b', 'c'];
let iter = arr[Symbol.iterator]();

iter.next() // { value: 'a', done: false }
iter.next() // { value: 'b', done: false }
iter.next() // { value: 'c', done: false }
iter.next() // { value: undefined, done: true }

重复挪用迭代对象的next要领,即可递次接见

函数式编程

提到函数式编程,就要提到声明式编程和敕令式编程
函数式编程是声明式编程的表现

题目:将数组[1, 2, 3]的每一个元素乘以2,然后盘算总和。

敕令式编程

const arr = [1, 2, 3];
let total = 0;

for(let i = 0; i < arr.length; i++) {
  total += arr[i] * 2;
}

声明式编程

const arr = [1, 2, 3];
let total = arr.map(x => x * 2).reduce((total, value) => total + value)

声明式的特点是专注于形貌结果自身,不关注究竟怎样抵达结果。而敕令式就是真正完成结果的步骤

声明式编程把原始数据经由一系列转换(map, reduce),末了获得想要的数据

如今前端盛行的MVC框架(Vue,React,Angular),也都是首倡:编写UI组织时运用声明式编程,在编写营业逻辑时运用敕令式编程

RxJS

RxJS里有两个主要的观点须要我们明白:
Observable (可视察对象)
Observer (视察者)

var btn = document.getElementById('btn');
var handler = function() {
  console.log('click');
}
btn.addEventListener('click', handler)

上面这个例子里:
btn这个DOM元素的click事宜就是一个Observable
handler这个函数就是一个Observer,当btn的click事宜被触发,就会挪用该函数

改用RxJS编写;

Rx.Observable.fromEvent(btn, 'click')
.subscribe(() => console.log('click'));

fromEvent把一个event转成了一个Observable,然后它就能够被定阅subscribe

流stream

Observable实在就是数据流stream
是在时刻流逝的历程当中发生的一系列事宜。它具有时刻与事宜响应的观点。

我们能够把统统输入都当作数据流来处置惩罚,比方说:

  • 用户操纵
  • 收集响应
  • 定时器
  • Worker

发生新流

当发生了一个流后,我们能够经由历程操纵符(Operator)对这个流举行一系列加工操纵,然后发生一个新的流

Rx.Observable.fromEvent(window, 'click')
  .map(e => 1)
  .scan((total, now) => total + now)
  .subscribe(value => {
    console.log(value)
  })

map把流转换成了一个每次发生1的新流,然后scan相似reduce,也会发生一个新流,末了这个流被定阅。终究完成了:每次点击累加1的结果

能够用一个结果图来示意该历程:
《RxJS基础教程》

兼并流

也能够对若干个数据流举行组合:

例子:我们要完成下面这个结果:

《RxJS基础教程》

Rx.Observable.fromEvent(document.querySelector('input[name=plus]'), 'click')
  .mapTo(1)
  .merge(
    Rx.Observable.fromEvent(document.querySelector('input[name=minus]'), 'click')
      .mapTo(-1)
  )
  .scan((total, now) => total + now)
  .subscribe(value => {
    document.querySelector('#counter').innerText = value;
  })

merge能够把两个数据流全部在一起,结果能够参考以下:

《RxJS基础教程》

适才谁人例子的数据流以下:

《RxJS基础教程》

以RxJS的写法,就是把按下加1当做一个数据流,把按下减1当做一个数据流,再经由历程merge把两个数据流兼并,末了经由历程scan操纵符,把新流上的数据累加,这就是我们想要的计数器结果

扁平化流

有时刻,我们的Observable送出的是一个新的Observable:

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.of(1, 2, 3));
source.subscribe(value => {
  console.log(value)
});

这里,console打印出来的是对象,而不是我们想要的1,2,3,这是由于map返回的Rx.Observable.of(1, 2, 3)自身也是个Observable

用图示意以下:

click  : ------c------------c--------

        map(e => Rx.Observable.of(1,2,3))

source : ------o------------o--------
                \            \
                 (123)|       (123)|

因而,我们定阅到的value值就是一个Observable对象,而不是一般数据1,2,3

我想要的实在不是Observable自身,而是属于这个Observable内里的那些东西,如今这个情况就是Observable内里又有Observable,有两层,然则我想要让它变成一层就好,该怎样办呢?

这就须要把Observable扁平化

const arr = [1, [2, 3], 4];

// 扁平化后:
const flatArr = [1, 2, 3, 4];

concatAll这个操纵符就能够把Observable扁平化

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.of(1, 2, 3));
var example = source.concatAll();

example.subscribe(value => {
  console.log(value)
})
click  : ------c------------c--------

        map(e => Rx.Observable.of(1,2,3))

source : ------o------------o--------
                \            \
                 (123)|       (123)|

                   concatAll()

example: ------(123)--------(123)------------

flatMap操纵符也能够完成一样的作用,就是写法有些差别:

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.flatMap(e => Rx.Observable.of(1, 2, 3));
source.subscribe(value => {
  console.log(value)
})
click  : ------c------------c--------

        flatMap(e => Rx.Observable.of(1,2,3))

source: ------(123)--------(123)------------

简朴拖拽实例

学完前面几个操纵符,我们就能够写一个简朴的实例了

拖拽的道理是:

  • 监听拖拽元素的mousedown
  • 监听body的mousemove
  • 监听body的mouseup
<style type="text/css">
html, body {
  height: 100%;
  background-color: tomato;
  position: relative;
}

#drag {
  position: absolute;
  display: inline-block;
  width: 100px;
  height: 100px;
  background-color: #fff;
  cursor: all-scroll;
}
</style>
<div id="drag"></div>
const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown');
const mouseUp = Rx.Observable.fromEvent(body, 'mouseup');
const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');

起首给出3个Observable,离别代表3种事宜,我们愿望mousedown的时刻监听mousemove,然后mouseup时住手监听,因而RxJS能够这么写:

const source = mouseDown
.map(event => mouseMove.takeUntil(mouseUp))

takeUntil操纵符能够在某个前提符应时,发送complete事宜

source: -------e--------------e-----
                \              \
                  --m-m-m-m|     -m--m-m--m-m|

从图上能够看出,我们还须要把source扁平化,才猎取所需数据。

完全代码:

const dragDOM = document.getElementById('drag');
const body = document.body;

const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown');
const mouseUp = Rx.Observable.fromEvent(body, 'mouseup');
const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');

mouseDown
    .flatMap(event => mouseMove.takeUntil(mouseUp))
    .map(event => ({ x: event.clientX, y: event.clientY }))
    .subscribe(pos => {
        dragDOM.style.left = pos.x + 'px';
        dragDOM.style.top = pos.y + 'px';
    })

Observable Observer

前面的例子,我们都在议论fromEvent转换的Observable,实在另有很多种要领发生一个Observable,个中create也是一种罕见的要领,能够用来竖立自定义的Observable

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

掌握台实行的结果:

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

Observable 实行能够通报三种范例的值:

“Next” 关照: 发送一个值,比方数字、字符串、对象,等等。
“Error” 关照: 发送一个 JavaScript 毛病 或 异常。
“Complete” 关照: 不再发送任何值。
“Next” 关照是最主要,也是最罕见的范例:它们示意通报给视察者的现实数据。”Error” 和 “Complete” 关照能够只会在 Observable 实行时期发作一次,而且只会实行个中的一个。

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // 假如捕获到异常会发送一个毛病
  }
});

Observer视察者只是一组回调函数的鸠合,每一个回调函数对应一种 Observable 发送的关照范例:next、error 和 complete 。

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

Observer和Observable是经由历程subscribe要领竖立联络的

observable.subscribe(observer);

unsubscribe

observer定阅了Observable以后,还能够作废定阅

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
subscription.unsubscribe();

unsubscribe圈套:

let stream$ = new Rx.Observable.create((observer) => {
  let i = 0;
  let id = setInterval(() => {
    console.log('setInterval');
    observer.next(i++);
  },1000)
})

let subscription = stream$.subscribe((value) => {
  console.log('Value', value)
});

setTimeout(() => {
  subscription.unsubscribe();
}, 3000)

3秒后虽然作废了定阅,然则开启的setInterval定时器并不会自动清算,我们须要本身返回一个清算函数

let stream$ = new Rx.Observable.create((observer) => {
  let i = 0;
  let id = setInterval(() => {
    observer.next(i++);
  },1000)

  // 返回了一个清算函数
  return function(){
    clearInterval( id );
  }
})

let subscription = stream$.subscribe((value) => {
  console.log('Value', value)
});

setTimeout(() => {
  subscription.unsubscribe() // 在这我们挪用了清算函数

}, 3000)

Ajax异步操纵

<input type="text">
function sendRequest(search) {
  return Rx.Observable.ajax.getJSON(`http://deepred5.com/cors.php?search=${search}`)
    .map(response => response)
}

Rx.Observable.fromEvent(document.querySelector('input'), 'keyup')
  .map(e => e.target.value)
  .flatMap(search => sendRequest(search))
  .subscribe(value => {
    console.log(value)
  })

用户每次在input框每次举行输入,均会触发ajax请求,而且每一个ajax返回的值都邑被打印一遍

如今须要完成如许一个功用:
愿望用户在300ms之内住手输入,才发送请求(防抖),而且console打印出来的值只需近来的一个ajax返回的

Rx.Observable.fromEvent(document.querySelector('input'), 'keyup')
  .debounceTime(300)
  .map(e => e.target.value)
  .switchMap(search => sendRequest(search))
  .subscribe(value => {
    console.log(value)
  })

debounceTime示意经由n毫秒后,没有流入新值,那末才将值转入下一个环节
switchMap能作废上一个已无用的请求,只保存末了的请求结果流,如许就确保处置惩罚展现的是末了的搜刮的结果

能够看到,RxJS对异步的处置惩罚是异常优异的,对异步的结果能举行种种庞杂的处置惩罚和挑选。

React + Redux 的异步解决计划:redux-observable

Redux的action都是同步的,所以默许情况下也只能处置惩罚同步数据流。

为了天生异步action,处置惩罚异步数据流,有很多差别的解决计划,比方 redux-thunk、redux-promise、redux-saga 等等。

redux-thunk举例:

挪用一个异步API,起首要先定义三个同步action组织函数,离别示意

  • 请求最先
  • 请求胜利
  • 请求失利

然后再定义一个异步action组织函数,该函数不再是返回一般的对象,而是返回一个函数,在这个函数里,举行ajax异步操纵,然后依据返回的胜利和失利,离别挪用前面定义的同步action

actions.js


export const FETCH_STARTED = 'WEATHER/FETCH_STARTED';
export const FETCH_SUCCESS = 'WEATHER/FETCH_SUCCESS';
export const FETCH_FAILURE = 'WEATHER/FETCH_FAILURE';

// 一般action组织函数,返回一般对象
export const fetchWeatherStarted = () => ({
  type: FETCH_STARTED
});

export const fetchWeatherSuccess = (result) => ({
  type: FETCH_SUCCESS,
  result
})

export const fetchWeatherFailure = (error) => ({
  type: FETCH_FAILURE,
  error
})

// 异步action组织函数,返回一个函数
export const fetchWeather = (cityCode) => {
  return (dispatch) => {
    const apiUrl = `/data/cityinfo/${cityCode}.html`;

    dispatch(fetchWeatherStarted())

    return fetch(apiUrl).then((response) => {
      if (response.status !== 200) {
        throw new Error('Fail to get response with status ' + response.status);
      }

      response.json().then((responseJson) => {
        dispatch(fetchWeatherSuccess(responseJson.weatherinfo));
      }).catch((error) => {
        dispatch(fetchWeatherFailure(error));
      });
    }).catch((error) => {
      dispatch(fetchWeatherFailure(error));
    })
  };
}

如今假如想要异步请求,只需:

// fetchWeather是个异步action组织函数
dispatch(fetchWeather('23333'));

我们再来看看redux-observable:

挪用一个异步API,不再须要定义一个异步action组织函数,一切的action组织函数都只是返回一般的对象

那末ajax请求在那里发送?

答案是在Epic举行异步操纵

Epic是redux-observable的中心原语。

它是一个函数,吸收 actions 流作为参数而且返回 actions 流。 Actions 入, actions 出.

export const FETCH_STARTED = 'WEATHER/FETCH_STARTED';
export const FETCH_SUCCESS = 'WEATHER/FETCH_SUCCESS';
export const FETCH_FAILURE = 'WEATHER/FETCH_FAILURE';

export const fetchWeather = cityCode => ({ type: FETCH_STARTED, cityCode });
export const fetchWeatherSuccess = result => (
  { type: FETCH_SUCCESS, result };
);
export const fetchWeatherFailure = (error) => (
  {
    type: FETCH_FAILURE,
    error
  }
)

export const fetchWeatherEpic = action$ =>
  action$.ofType(FETCH_STARTED)
    .mergeMap(action =>
      ajax.getJSON(`/data/cityinfo/${action.cityCode}.html`)
        .map(response => fetchWeatherSuccess(response.weatherinfo))
        // 这个处置惩罚异常的action必需运用Observable.of要领转为一个observable
        .catch(error => Observable.of(fetchWeatherFailure(error)))
    );

如今假如想要异步请求,只需:

// fetchWeather只是个一般的action组织函数
dispatch(fetchWeather('23333'));

相较于thunk中间件,运用redux-observable来处置惩罚异步action,有以下长处:

  • 不须要修正action组织函数,返回的仍然是一般对象
  • epics中间件会将action封装成Observable对象,能够运用RxJs的响应api来掌握异步流程,它就像一个具有很多高等功用的Promise,如今我们在Redux中也能够获得它的优点。

总结

原生JS传统处理异步的体式格局:callback、Generator、Promise、async/await

RxJS处理的是数据流的题目,它能够让批量数据处置惩罚起来更轻易

能够设想的一些运用场景:

  • 多个服务端及时音讯流,经由历程RxJS举行高阶处置惩罚,末了到 view 层就是很清楚的一个Observable,然则view层自身处置惩罚用户事宜依旧能够相沿原有的范式。
  • 爬虫抓取,每次对一个网站的前5页做平行请求,每一个请求假如失利就重试,重试3次以后再摒弃。

能够看出,这类须要对流举行庞杂操纵的场景越发合适RxJS

公司内部现在的大部分体系,前端就能够不太合适用RxJS,由于大部分是背景CRUD体系,整体性、及时性的请求都不高,而且也没有迥殊庞杂的数据流操纵

我们推荐在合适RxJS的处所用RxJS,然则不强求RxJS for everything。RxJS给了我们另一种思索和处理题目的体式格局,但这不一定是必要的

参考

构建流式运用—RxJS详解

愿望是最浅显易懂的RxJS教学

RxJS入门指引和开端运用

30天通晓RxJS系列

    原文作者:深红
    原文地址: https://segmentfault.com/a/1190000013829356
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞