RxJS
《深入浅出RxJS》读书笔记
遗留问题
- Observable的HOT与COLD对应的实际场景,以及在编码中的体现
chapter1
html部分
测试你对时间的感觉
按住我一秒钟然后松手
你的时间:毫秒
jquery
实现var time = new Date().getTime(); $("#hold-me") .mousedown(function(event) { time = new Date().getTime(); }) .mouseup(function(event) { if (time) { var elapse = new Date().getTime() - time; $("#hold-time").text(elapse); // 重置time 避免直接触发mouseup事件,例如在A处点击然后在B处up time = null; } });
RxJS
实现const holdMeButton = document.getElementById("hold-me"); const mouseDown$ = Rx.Observable.fromEvent(holdMeButton,"mousedown"); const mouseUp$ = Rx.Observable.fromEvent(holdMeButton,"mouseup"); // 获取间隔时间 const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(),(mouseUpEvent,mouseDownEvent)=>{ return mouseUpEvent.timestamp - mouseDownEvent.timestamp }); holdTime$.subscribe(ms=>{ document.getElementById("hold-time").innerText = ms; }) holdTime$.flatMap(ms=>{ return Rx.Observable.ajax('https://timing-sense-score-board.herokuapp.com/score/'+ms) }) .map(e=>e.response) .subscribe(res=>{ document.getElementById("rank").innerText = `你超过了${res.rank}% 的用户` })
chapter2
Koa2的使用
主要用来加载静态资源,所以使用到了
koa
,
koa-static
const path = require("path");
const koa = require("koa");
const serve = require("koa-static");
const app = new koa();
app.use(async function (ctx,next) {
console.log("收到请求...")
await next()
console.log(`"${ctx.path}"请求 已处理...`)
})
app.use(serve(path.resolve(__dirname, "../src"))).listen(3001,function(err){
if(err) throw err;
console.log("程序启动成功")
});
Observable
和 Observer
Observable 可被观察的对象,Observer观察者,Observer通过subscribe来观察Observable对象
RxJS的数据流就是Observable对象:
- 观察者模式
- 迭代器模式
举个栗子
// 使用 deep-link方式引入函数
const Observable = require("rxjs").Observable;
/*
* 定义Observable对象的行为,会产生数据,调用订阅者的next方法
* 1. 此处的Observer与订阅者行为 theObserver并不是同一个对象,而是对theObserver的包装
* 2. 如果observer.error被调用,之后的complete或者next就不会被调用啦,同理,complete被调用之后,也不会
* 再调用next或者error
* 3. 如果error或者complete一直未调用,则observer就一直在内存中等待被调用
*/
const onSubscribe = observer =>{
observer.next(1);
observer.error(2);
observer.complete(3);
}
// 产生一个Observable对象
const source$ = new Observable(onSubscribe);
// 定义观察者的行为 消费Observable对象产生的数据
const theObserver = {
next:item => console.log(item),
error:item => console.error(item),
complete:item => console.log("已完成"),
}
// 建立Observable与Observer的关系
source$.subscribe(theObserver)
退订subscribe
在订阅一段事件之后observer不再响应吐出的信息了,这时可以退订,但是Observeable还会一直产生数据
const Observable = require("rxjs").Observable;
const onSubscribe = observer =>{
let n = 1;
const handle = setInterval(()=>{
console.log(`in onSubscribe ${n}`)
// if(n>3){
// observer.complete()
// }
observer.next(n++);
},1000)
return {
unsubscribe(){
// clearInterval(handle)
}
}
}
const source$ = new Observable(onSubscribe);
const theObserver = {
next:item => console.log(item)
}
let subscription = source$.subscribe(theObserver)
setTimeout(()=>{
// 此处的unsubscribe也是封装过的
subscription.unsubscribe()
},3500)
在node
中执行,会一直打印 in onSubscribe *
,但是source$不会再响应
Chapter3 操作符基础
const Observable = require("rxjs/Observable").Observable;
const of = require("rxjs/observable/of").of;
const map = require("rxjs/operator/map").map;
// 新建一个操作符
// 此处this是外部变量,导致此operator不再是纯函数
Observable.prototype.double = function(){
// return this::map(x=>x*2)
return map.call(this,x=>x*2)
}
const source$ = of(1,3,4);
const result$ = source$.double();
result$.subscribe(value=>console.log(value))
lettable/pipeable操作符
解决需要使用call或者bind改变this的操作,这样是依赖外部环境的,不属于纯函数,也会丧失TS的类型检查优势
lettable
将Observable
对象传递给下文,避免使用this
const Observable = require("rxjs/Observable").Observable; require("rxjs/add/observable/of").of; require("rxjs/add/operator/map").map; require("rxjs/add/operator/let").let; const source$ = Observable.of(1,2,3); const double$ = obs$ => obs$.map(v=>v*2); // 接受上文,传递到下文 const result$ = source$.let(double$); result$.subscribe(console.log)
不引入`map`补丁,开发**lettable**写法的操作符
// ES5实现
function map(project){
return function(obj$){
// 通过上面的Observable生成一个新Observable
return new Observable(observer=>{
return obj$.subscribe({
next:value=>observer.next(project(value)),
error:err=>observer.error(err),
complete:()=>observer.complete()
})
})
}
}
// 添加操作符
var result$ = source$.let(map(x => x * 3));
// ES6实现
const map6 = fn => obj$ =>
new Observable(observer =>
obj$.subscribe({
next: value => observer.next(fn(value)),
error: err => observer.error(err),
complete: () => observer.complete()
})
);
// 添加操作符
var result$ = source$.let(map6(x => x * 4));
`pipeable`是`lettable`的别称,方便对于`lattable`的理解,V6以上才支持
## Chapter4 创建数据流
> 大多数的操作符是静态操作符
### 基础操作符
1. `create`简单的返回一个Observable对象
Observable.create = function(subscribe){
return new Observable(subscribe)
}
```
of
列举数据import {Observable} from "rxjs/Observable"; import "rxjs/add/observable/of" // 依次吐出数据,一次性emit const source$ = Observable.of(1,2,3); // 订阅 // 第一个参数是next,第二个参数是error回调,第三个参数是complete回调 source$.subscribe(console.log,null,()=>{console.log("Complete")})
range
产生指定范围的数据const sourc$ = Observable.range(/*初始值*/1,/*个数*/100); // 每次只能步进 1
generate
循环创建相当于for循环
const source$ = Observable.generate( // 初始值 2, // 判断条件 value=> value < 10, // 步进 value=> value+0.5, // 函数体,产生的结果 value=> value*value )
使用
generate
代替rangeconst range = function(min,count){ const max = min + count; return Observable.generate(min,v=>vv+1,v=>v*v) }
repeat
重复数据的数据流实例操作符,通过
import 'rxjs/add/operator/repeat'
引入- V4版本中repeat是静态属性,这样在使用
Observable.repeat(1,2)
重复1两次,这样数据就够灵活 - V5版本中改为实例属性之后,Observable.of(1,2,4).repeat(2),将产生的1,2,3重复两次,功能更加强大
const Observable = require("rxjs").Observable; require("rxjs/add/operator/repeat"); const source$ = Observable.create(observer => { setTimeout(() => { observer.next(1); }, 1000); setTimeout(() => { observer.next(2); }, 2000); setTimeout(() => { observer.next(3); }, 3000); setTimeout(() => { observer.complete(1); }, 4000); return { unsubscribe(){ console.log("on Unsubscribe") } } }); const repeat$ = source$.repeat(2) repeat$.subscribe(console.log,null,()=>{console.log("Complete")}) // 1 // 2 // 3 // on Unsubscribe // 1 // 2 // 3 // Complete // on Unsubscribe
如果没有
observer.complete()
repeat不会被调用repeat以complete为契机会再次执行数据源,如果上游一直没有complete下游就不会执行
- 因为
repeat
的存在,第一次数据源执行完(以complete为契机)后并不会执行observer的complete回调
- V4版本中repeat是静态属性,这样在使用
-
empty
,throw
,never
创建异步数据的Observable对象
interval
和timer
interval
类似于setIntervalrequire('rxjs/add/observable/interval') // 每隔1000ms产生一个数据,初始值为0,步进为1 Observable.interval(1000)'
timer
是setTimeout的超集// 1000ms后开始产生数据,之后每隔1000ms产生一个数据,功能相当于interval Observable.timer(1000,1000) // 指定日期 Observable.time(new Date(new Date().getTime() + 12000))
from
把一切转化为Observable
- 将所有的Iterable的对象都转化为Observable对象
- 可以将Promise对象转化为Observable对象,功能与fromPromise相同
fromPromise
异步处理的对接const Observable = require("rxjs").Observable; require("rxjs/add/observable/fromPromise"); const promise = Promise.resolve(123); Observable.fromPromise(promise).subscribe(console.log, null, () => console.log("Complete") ); //123 //Complete const promise1 = Promise.reject("error"); Observable.from( console.log, err => console.log("catch", err), () => console.log("Complete!") ); // 未捕获的Promise错误 // (node:765) UnhandledPromiseRejectionWarning: error // (node:765) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing // inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). ( // rejection id: 1) // (node:765) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
fromEvent
连接DOM与RxJS的桥梁const event$ = Observable.fromEvent(document.getElementById("btn"),"click"); event$.subscribe(event=>{ // Do something })
在NodeJs中可以与
EventEmitter
交互const Observable = require("rxjs").Observable; const EventEmitter = require("events"); require("rxjs/add/observable/fromEvent") const emitter = new EventEmitter(); const source$ = Observable.fromEvent(emitter,"msg"); source$.subscribe(console.log,null,()=>console.log("Complete")) emitter.emit("msg",1) // 1 emitter.emit("msg","haha") // haha emitter.emit("a-msg","haha") // emitter.emit("msg",'nihao') // nihao
fromEvent
是Hot Observable,也就是数据的产生和订阅无关,对于fromEvent
来说,数据源是外部产生的,不受RxJS
控制,这是Hot Observable
对象的特点fromEventPattern
针对不规范的事件源规范的事件源:DOM事件,EventEmitter事件
-
ajax
见最上面的例子 repeatWhen
例如 在上游事件结束之后的一段时间再重新订阅
const Observable = require("rxjs").Observable; require("rxjs/add/operator/repeatWhen") const notifier = ()=>{ return Observable.interval(1000); } const source$ = Observable.of(1,2,3); // const source$ = Observable.create(observer=>{ // observer.next(111); // return { // unsubscribe(){ // console.log("on Unsubscribe") // } // } // }); const repeat$ = source$.repeatWhen(notifier); repeat$.subscribe(console.log,null,()=>console.log("Complete")) // 每隔一秒产生一次 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // ^C
defer
延迟创建Observable针对Observable占用内存比较大的情况,懒加载
const Observable = require("rxjs").Observable; require("rxjs/add/observable/defer"); require("rxjs/add/observable/of"); const observableFactory = ()=>Observable.of(1,2,3); const source$ = Observable.defer(observableFactory)
合并数据流
功能需求 操作符 把多个数据流以首尾相连的方式合并 concat
,concatAll
把多个数据流以先到先得的方式合并 merge
,mergeAll
把多个数据流中的数据以一一对应的方式合并 zip
和zipAll
持续合并多个数据流中最新产生的数据 combineLatest
,combineAll
,withLatestFrom
从多个数据流中选取第一个产生内容的数据流 race
在数据流前面添加一个指定数据 startWith
只获取多个数据流最后产生的数据 forkJoin
从高阶数据流中切换数据源 switch
,exhaust
concat
- 实例方法
- 静态方法,如果两个数据没有先后关系,推荐使用此方法
实例方法
const Observable = require("rxjs").Observable; require("rxjs/add/operator/of") require("rxjs/add/operator/concat") const source$1 = Observable.of(1,2,3); const source$2 = Observable.of(4,5,6); source$1.concat(source$2).subscribe(console.log,null,()=>console.log("Complete"))
静态方法
const Observable = require("rxjs").Observable; require("rxjs/add/operator/of") require("rxjs/add/observable/concat") const source$1 = Observable.of(1,2,3); const source$2 = Observable.of(4,5,6); Observable .concat(source$1,source$2) .subscribe(console.log,null,()=>console.log("Complete"))
`concat`在将上一个数据源传递下去的时候会调用上一个`Observable`的`unsubscribe`,如果上一个`Observable`一直为完结,后续的都不会被调用
```javascript
const source$1 = Observable.internal(1000);
const source$2 = Observable.of(1);
const concated$ = Observable.concat(source$1,source$2);
// 此时 source$2永远不会被调用
```
在此推测:`rxjs/add/operator/*`下的属性都是实例属性,`rxjs/add/observable/*`下的属性都是实例属性
merge
先到先得merge用在同步数据的情况下和concat表现只,不建议使用
const Observable = require("rxjs").Observable; require("rxjs/add/operator/merge"); require("rxjs/add/operator/map"); require("rxjs/add/observable/timer"); const source$1 = Observable.timer(0, 1000).map(x => x + "A"); const source$2 = Observable.timer(500, 1000).map(x => x + "B"); const source$3 = Observable.timer(1000, 1000).map(x => x + "C"); // 此时 source$1与source$2永远不会停止,所以 source$1 .merge(source$2, source$3, /*此参数限制了合并的Observable的个数*/ 2) .subscribe(console.log, null, () => console.log("Complete")); // 0A // 0B // 1A // 1B // 2A // 2B // 3A // 3B // 4A // 4B // ^C