RxJS
《深入浅出RxJS》读书笔记
遗留问题
- Observable的HOT与COLD对应的现实场景,以及在编码中的表现
chapter1
html部份
测试你对时刻的觉得
按住我一秒钟然后放手
你的时刻:毫秒
jquery
完成var time = new Date().getTime(); $("#hold-me") .mousedown(function(event) { time = new Date().getTime(); }) .mouseup(function(event) { if (time) { var elapse = new Date().getTime() - time; $("#hold-time").text(elapse); // 重置time 防止直接触发mouseup事宜,比方在A处点击然后在B处up time = null; } });
RxJS
完成const holdMeButton = document.getElementById("hold-me"); const mouseDown$ = Rx.Observable.fromEvent(holdMeButton,"mousedown"); const mouseUp$ = Rx.Observable.fromEvent(holdMeButton,"mouseup"); // 猎取间隔时刻 const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(),(mouseUpEvent,mouseDownEvent)=>{ return mouseUpEvent.timestamp - mouseDownEvent.timestamp }); holdTime$.subscribe(ms=>{ document.getElementById("hold-time").innerText = ms; }) holdTime$.flatMap(ms=>{ return Rx.Observable.ajax('https://timing-sense-score-board.herokuapp.com/score/'+ms) }) .map(e=>e.response) .subscribe(res=>{ document.getElementById("rank").innerText = `你超过了${res.rank}% 的用户` })
chapter2
Koa2的运用
重要用来加载静态资本,所以运用到了
koa
,
koa-static
const path = require("path");
const koa = require("koa");
const serve = require("koa-static");
const app = new koa();
app.use(async function (ctx,next) {
console.log("收到要求...")
await next()
console.log(`"${ctx.path}"要求 已处置惩罚...`)
})
app.use(serve(path.resolve(__dirname, "../src"))).listen(3001,function(err){
if(err) throw err;
console.log("顺序启动胜利")
});
Observable
和 Observer
Observable 可被视察的对象,Observer视察者,Observer经由过程subscribe来视察Observable对象
RxJS的数据流就是Observable对象:
- 视察者形式
- 迭代器形式
举个栗子
// 运用 deep-link体式格局引入函数
const Observable = require("rxjs").Observable;
/*
* 定义Observable对象的行动,会发生数据,挪用定阅者的next要领
* 1. 此处的Observer与定阅者行动 theObserver并非同一个对象,而是对theObserver的包装
* 2. 假如observer.error被挪用,以后的complete或许next就不会被挪用啦,同理,complete被挪用以后,也不会
* 再挪用next或许error
* 3. 假如error或许complete一向未挪用,则observer就一向在内存中守候被挪用
*/
const onSubscribe = observer =>{
observer.next(1);
observer.error(2);
observer.complete(3);
}
// 发生一个Observable对象
const source$ = new Observable(onSubscribe);
// 定义视察者的行动 消耗Observable对象发生的数据
const theObserver = {
next:item => console.log(item),
error:item => console.error(item),
complete:item => console.log("已完成"),
}
// 竖立Observable与Observer的关联
source$.subscribe(theObserver)
退订subscribe
在定阅一段事宜以后observer不再相应吐出的信息了,这时刻能够退订,然则Observeable还会一向发生数据
const Observable = require("rxjs").Observable;
const onSubscribe = observer =>{
let n = 1;
const handle = setInterval(()=>{
console.log(`in onSubscribe ${n}`)
// if(n>3){
// observer.complete()
// }
observer.next(n++);
},1000)
return {
unsubscribe(){
// clearInterval(handle)
}
}
}
const source$ = new Observable(onSubscribe);
const theObserver = {
next:item => console.log(item)
}
let subscription = source$.subscribe(theObserver)
setTimeout(()=>{
// 此处的unsubscribe也是封装过的
subscription.unsubscribe()
},3500)
在node
中实行,会一向打印 in onSubscribe *
,然则source$不会再相应
Chapter3 操纵符基本
const Observable = require("rxjs/Observable").Observable;
const of = require("rxjs/observable/of").of;
const map = require("rxjs/operator/map").map;
// 新建一个操纵符
// 此处this是外部变量,致使此operator不再是纯函数
Observable.prototype.double = function(){
// return this::map(x=>x*2)
return map.call(this,x=>x*2)
}
const source$ = of(1,3,4);
const result$ = source$.double();
result$.subscribe(value=>console.log(value))
lettable/pipeable操纵符
处理须要运用call或许bind转变this的操纵,如许是依靠外部环境的,不属于纯函数,也会损失TS的范例搜检上风
lettable
将Observable
对象通报给下文,防止运用this
const Observable = require("rxjs/Observable").Observable; require("rxjs/add/observable/of").of; require("rxjs/add/operator/map").map; require("rxjs/add/operator/let").let; const source$ = Observable.of(1,2,3); const double$ = obs$ => obs$.map(v=>v*2); // 接收上文,通报到下文 const result$ = source$.let(double$); result$.subscribe(console.log)
不引入`map`补丁,开辟**lettable**写法的操纵符
// ES5完成
function map(project){
return function(obj$){
// 经由过程上面的Observable天生一个新Observable
return new Observable(observer=>{
return obj$.subscribe({
next:value=>observer.next(project(value)),
error:err=>observer.error(err),
complete:()=>observer.complete()
})
})
}
}
// 增加操纵符
var result$ = source$.let(map(x => x * 3));
// ES6完成
const map6 = fn => obj$ =>
new Observable(observer =>
obj$.subscribe({
next: value => observer.next(fn(value)),
error: err => observer.error(err),
complete: () => observer.complete()
})
);
// 增加操纵符
var result$ = source$.let(map6(x => x * 4));
`pipeable`是`lettable`的别称,轻易关于`lattable`的明白,V6以上才支撑
## Chapter4 建立数据流
> 大多数的操纵符是静态操纵符
### 基本操纵符
1. `create`简朴的返回一个Observable对象
Observable.create = function(subscribe){
return new Observable(subscribe)
}
```
of
枚举数据import {Observable} from "rxjs/Observable"; import "rxjs/add/observable/of" // 顺次吐出数据,一次性emit const source$ = Observable.of(1,2,3); // 定阅 // 第一个参数是next,第二个参数是error回调,第三个参数是complete回调 source$.subscribe(console.log,null,()=>{console.log("Complete")})
range
发生指定局限的数据const sourc$ = Observable.range(/*初始值*/1,/*个数*/100); // 每次只能步进 1
generate
轮回建立相当于for轮回
const source$ = Observable.generate( // 初始值 2, // 推断前提 value=> value < 10, // 步进 value=> value+0.5, // 函数体,发生的效果 value=> value*value )
运用
generate
替代rangeconst range = function(min,count){ const max = min + count; return Observable.generate(min,v=>vv+1,v=>v*v) }
repeat
反复数据的数据流实例操纵符,经由过程
import 'rxjs/add/operator/repeat'
引入- V4版本中repeat是静态属性,如许在运用
Observable.repeat(1,2)
反复1两次,如许数据就够天真 - V5版本中改成实例属性以后,Observable.of(1,2,4).repeat(2),将发生的1,2,3反复两次,功用越发壮大
const Observable = require("rxjs").Observable; require("rxjs/add/operator/repeat"); const source$ = Observable.create(observer => { setTimeout(() => { observer.next(1); }, 1000); setTimeout(() => { observer.next(2); }, 2000); setTimeout(() => { observer.next(3); }, 3000); setTimeout(() => { observer.complete(1); }, 4000); return { unsubscribe(){ console.log("on Unsubscribe") } } }); const repeat$ = source$.repeat(2) repeat$.subscribe(console.log,null,()=>{console.log("Complete")}) // 1 // 2 // 3 // on Unsubscribe // 1 // 2 // 3 // Complete // on Unsubscribe
假如没有
observer.complete()
repeat不会被挪用repeat以complete为契时机再次实行数据源,假如上游一向没有complete下流就不会实行
- 由于
repeat
的存在,第一次数据源实行完(以complete为契机)后并不会实行observer的complete回调
- V4版本中repeat是静态属性,如许在运用
-
empty
,throw
,never
建立异步数据的Observable对象
interval
和timer
interval
类似于setIntervalrequire('rxjs/add/observable/interval') // 每隔1000ms发生一个数据,初始值为0,步进为1 Observable.interval(1000)'
timer
是setTimeout的超集// 1000ms后最先发生数据,以后每隔1000ms发生一个数据,功用相当于interval Observable.timer(1000,1000) // 指定日期 Observable.time(new Date(new Date().getTime() + 12000))
from
把统统转化为Observable
- 将一切的Iterable的对象都转化为Observable对象
- 能够将Promise对象转化为Observable对象,功用与fromPromise雷同
fromPromise
异步处置惩罚的对接const Observable = require("rxjs").Observable; require("rxjs/add/observable/fromPromise"); const promise = Promise.resolve(123); Observable.fromPromise(promise).subscribe(console.log, null, () => console.log("Complete") ); //123 //Complete const promise1 = Promise.reject("error"); Observable.from( console.log, err => console.log("catch", err), () => console.log("Complete!") ); // 未捕捉的Promise毛病 // (node:765) UnhandledPromiseRejectionWarning: error // (node:765) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing // inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). ( // rejection id: 1) // (node:765) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
fromEvent
衔接DOM与RxJS的桥梁const event$ = Observable.fromEvent(document.getElementById("btn"),"click"); event$.subscribe(event=>{ // Do something })
在NodeJs中能够与
EventEmitter
交互const Observable = require("rxjs").Observable; const EventEmitter = require("events"); require("rxjs/add/observable/fromEvent") const emitter = new EventEmitter(); const source$ = Observable.fromEvent(emitter,"msg"); source$.subscribe(console.log,null,()=>console.log("Complete")) emitter.emit("msg",1) // 1 emitter.emit("msg","haha") // haha emitter.emit("a-msg","haha") // emitter.emit("msg",'nihao') // nihao
fromEvent
是Hot Observable,也就是数据的发生和定阅无关,关于fromEvent
来讲,数据源是外部发生的,不受RxJS
掌握,这是Hot Observable
对象的特性fromEventPattern
针对不范例的事宜源范例的事宜源:DOM事宜,EventEmitter事宜
-
ajax
见最上面的例子 repeatWhen
比方 在上游事宜完毕以后的一段时刻再从新定阅
const Observable = require("rxjs").Observable; require("rxjs/add/operator/repeatWhen") const notifier = ()=>{ return Observable.interval(1000); } const source$ = Observable.of(1,2,3); // const source$ = Observable.create(observer=>{ // observer.next(111); // return { // unsubscribe(){ // console.log("on Unsubscribe") // } // } // }); const repeat$ = source$.repeatWhen(notifier); repeat$.subscribe(console.log,null,()=>console.log("Complete")) // 每隔一秒发生一次 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // ^C
defer
耽误建立Observable针对Observable占用内存比较大的状况,懒加载
const Observable = require("rxjs").Observable; require("rxjs/add/observable/defer"); require("rxjs/add/observable/of"); const observableFactory = ()=>Observable.of(1,2,3); const source$ = Observable.defer(observableFactory)
兼并数据流
功用需求 操纵符 把多个数据流以首尾相连的体式格局兼并 concat
,concatAll
把多个数据流以先到先得的体式格局兼并 merge
,mergeAll
把多个数据流中的数据以一一对应的体式格局兼并 zip
和zipAll
延续兼并多个数据流中最新发生的数据 combineLatest
,combineAll
,withLatestFrom
从多个数据流中拔取第一个发生内容的数据流 race
在数据流前面增加一个指定数据 startWith
只猎取多个数据流末了发生的数据 forkJoin
从高阶数据流中切换数据源 switch
,exhaust
concat
- 实例要领
- 静态要领,假如两个数据没有前后关联,引荐运用此要领
实例要领
const Observable = require("rxjs").Observable; require("rxjs/add/operator/of") require("rxjs/add/operator/concat") const source$1 = Observable.of(1,2,3); const source$2 = Observable.of(4,5,6); source$1.concat(source$2).subscribe(console.log,null,()=>console.log("Complete"))
静态要领
const Observable = require("rxjs").Observable; require("rxjs/add/operator/of") require("rxjs/add/observable/concat") const source$1 = Observable.of(1,2,3); const source$2 = Observable.of(4,5,6); Observable .concat(source$1,source$2) .subscribe(console.log,null,()=>console.log("Complete"))
`concat`在将上一个数据源通报下去的时刻会挪用上一个`Observable`的`unsubscribe`,假如上一个`Observable`一向为结束,后续的都不会被挪用
```javascript
const source$1 = Observable.internal(1000);
const source$2 = Observable.of(1);
const concated$ = Observable.concat(source$1,source$2);
// 此时 source$2永久不会被挪用
```
在此推想:`rxjs/add/operator/*`下的属性都是实例属性,`rxjs/add/observable/*`下的属性都是实例属性
merge
先到先得merge用在同步数据的状况下和concat表现只,不发起运用
const Observable = require("rxjs").Observable; require("rxjs/add/operator/merge"); require("rxjs/add/operator/map"); require("rxjs/add/observable/timer"); const source$1 = Observable.timer(0, 1000).map(x => x + "A"); const source$2 = Observable.timer(500, 1000).map(x => x + "B"); const source$3 = Observable.timer(1000, 1000).map(x => x + "C"); // 此时 source$1与source$2永久不会住手,所以 source$1 .merge(source$2, source$3, /*此参数限定了兼并的Observable的个数*/ 2) .subscribe(console.log, null, () => console.log("Complete")); // 0A // 0B // 1A // 1B // 2A // 2B // 3A // 3B // 4A // 4B // ^C