结合promise与websocket的发布/订阅模式实践
本文初衷
最近恰好在公司做了一个聊天室系统,所以在系统中做了一下对websocket进行的promise化改造,所以想写篇文章总结一下,如果大家有什么更好的方法或者心得感悟,欢迎交流
技术栈
dva + protobuf
考虑到protobuf对websocket并没什么本质影响,所以本文就不涉及了
业务场景
基于websocket的聊天室系统
开发痛点
- 可能存在按顺序触发的请求
eg. 删除req—确认删除rsp—刷新页面req—刷新页面rsp
但由于并非所有的删除操作后都会刷新页面,所以考虑是不是可以使用发布订阅模式来模拟类似promise的流式操作 - 存在同一类型请求短时间内多次触发时,如何寻找每一条回复信息的发射源,考虑可以使用promise池+唯一识别码token来实现
- 由于dva的异步操作是基于redux-saga的,所以如果可以用promise完成与websocket的互动,那么对于effects中使用yield控制异步流程,也会是一个很好的体验
实现原理
首先,这一套系统的一切前提是请求的唯一标识符token,前端发送给服务器之后,服务器必须要把这个token跟数据放在一起发回来才行
本系统的实现原理是
对websocket的send方法进行封装
发送阶段
1. send执行时,先生成一个promise,及其唯一token
2. 将promise的resolve, reject,token,及其他需要的信息放入一个对象,然后推入一个promise池中
3. 执行websocket的send方法
4. return 这个promise
接收阶段
1. 收到回复消息时,先从promise池中对token进行匹配
2. 根据回复的状态决定执行resolve或reject
3. 其他需要的操作
实现代码
// 每一个实例都只能open一条socket线路,用锁机制防止重复open
// 本例中不使用心跳检测,为了方便,只要close是非主动触发且前端能捕捉到的(如浏览器主动断开,服务器主动断开),都会进行自动重连
export class MyWebSocket {
constructor(url) {
this.url = url;
// close来源判断及后续操作
this.closeConfig = {
resolve: null,
closing: false
}
// promise池
this.promisePool = [];
}
tokenCheck(req, rsp) {
// 此处根据自己的数据结构进行tokenCheck的判断,返回一个boolean
}
open() {
return new Promise((resolve, reject) => {
if (typeof this._websocket === 'undefined') {
this._websocket = new WebSocket(this.url);
this._websocket.open = (e) => {
resolve({e, ws: this});
};
this._websocket.onerror = (e) => {
reject(e);
}
}
this._websocket.onclose = (e) => {
// 非主动close
if (!this.closeConfig.closing) {
console.log('reconnect');
// 对应的重连操作
}
// 若手动close,恢复初始状态
this.closeConfig.closing = false;
}
this._websocket.onmessage = (e) => {
this.promisePool = this.promisePool.filter((item) => {
if (this.tokenCheck(req, rsp) {
req.resolve(rsp);
return false;
}
return true;
})
};
});
}
close() {
this.closeConfig.closing = true;
this._websocket.close();
}
// token包含在content中
send(name, content) {
return new Promise((resolve, reject) => {
this.promisePool.push({
content,
resolve,
reject,
name
});
this._websocket.send({name, content});
});
}
大概流程就是这样,具体的样例如下
*test () {
const ws = new MyWebSocket('www.mywebsocket.com');
yield ws.open();
yield ws.send(.....).then(()=>{...});
yield ws.send(.....).then(()=>{...});
}
本文呢大概就是这么多了,如果有什么错误或者遗漏的地方还请大家多多指教
v0.0.2
采取了评论大佬的建议,将promise池从数组改为对象,直接将token做为key,查询起来也非常方便
export class MyWebSocket {
constructor(url) {
this.url = url;
// close来源判断及后续操作
this.closeConfig = {
resolve: null,
closing: false
}
// promise池
this.promisePool = {};
}
tokenCheck(req, rsp) {
// 此处根据自己的数据结构进行tokenCheck的判断,返回一个boolean
}
open() {
return new Promise((resolve, reject) => {
if (typeof this._websocket === 'undefined') {
this._websocket = new WebSocket(this.url);
this._websocket.open = (e) => {
resolve({e, ws: this});
};
this._websocket.onerror = (e) => {
reject(e);
}
}
this._websocket.onclose = (e) => {
// 非主动close
if (!this.closeConfig.closing) {
console.log('reconnect');
// 对应的重连操作
}
// 若手动close,恢复初始状态
this.closeConfig.closing = false;
}
this._websocket.onmessage = (e) => {
const key = e.content.token;
const req = this.promisePool[key]
req.resolve(e);
delete this.promisePool[key];
};
});
}
close() {
this.closeConfig.closing = true;
this._websocket.close();
}
// token包含在content中
send(name, content) {
return new Promise((resolve, reject) => {
this.promisePool[content.token] = {
content,
resolve,
reject,
name
};
this._websocket.send({name, content});
});
}