queue-fun —— nodejs下基于Promise的队列控制模块。

工作告一段落,闲来无事,写了一个在nodejs实现“半阻塞”的控制程序。

一直以来,nodejs以单线程非阻塞,高并发的特性而闻名。搞这个“半阻塞”是东西,有什么用呢?

场景一:

现在的web应用可有都是一个这样的结构:

http服务(node) > 接口(业务逻辑) > 数据库

很多时候,瓶颈一般出现在业务层,或者数据层。更多的可能是某一个业务的处理,拉下整个系统的性能。

当用户或一些不怀好意的人,故意大量调用这些处理逻辑,好吧,你nodejs是非阻塞的,这一大波处理请求就一窝蜂冲到到业务层,很可能导致整个系统性能下降,或者瘫痪。

如果这个时候,node层就把这些耗资源的请求,排好队,控制好并发,甚至分用户排成队,配置好不同身份的用户并发量。是不是很好。

场景二:

应用中,可能会有一些无需即时处理的同一类业务,处理前都需要收集一次资源,处理业务,处理后再清理一次。高消耗工作主要在收集或是清理上。

这个时候,我们可将要处理的业务暂存到队列中,当队列数量到达一个值或是某个时间点时,我们一次性处理完队列中的任务。在消耗上,只做一次“收集”、“清理”的操作。

可见,如果在小型的项目中如果node方便实现类似线程池的功能,对整个项目的稳定性及工作效率是有很大贡献的。

现在说一说我写的这一个“半阻塞”的东西,为什么要基于Promise;

如果你用过q.js或了解EC6,应该对Promise有所了解,不然还请先了解下。

我的想法是,并不需要把整个业务的后续处理全都放到队列中去,而只是将高消耗的那一部分放入队列,利用Promise的异部处理机制来处理后续的操作。

在编写代码的时候你几乎可以忘记队列的存在,但是他就在那里默默的工作着,代码可读性和灵活性没有丝毫影响。

 

queue-fun

queue-fun 是基于Promise的 运行队列控制类。

使用场景

  • 巨量同逻辑业务平稳处理
  • 间歇性高并发系统
  • 控制单用户占用资源过高

队列

queue-fun.Queue(q)

初始化队列控制 参数q可传

  • 无参数 队列使用内置的实现的Promise;
  • q/ES6原生Promise 插入队列方法: push unshift go jump返回对应的promise
实例化队列 new queue-fun.Queue()(runMax, con)
  • runMax 并行运行队列方法的最大个数
  • con 配置队列 开始 结束 事件,运行单元的 成功,失败 事件及配置执行单元出错的 重试 机制。
var queue = new queue-fun.Queue()(100,{ "event_succ":function(){} //成功 ,"event_err":function(){} //失败 ,"event_begin":function(){} //队列开始 ,"event_end":function(){} //队列完成 ,"event_add":function(){} //有执行项添加进执行单元后执行,注意go及jump不会触发 ,"retryON":0 //队列单元出错重试次数 ,"retryType":0 //重试模式true/false(优先/搁置)执行 })`

API

queue.push(promisefun, args[]con)

向队列中尾部添加运行单元 fun: promise function args: 传入的参数 con 默认值

{
    'event_succ':null //一般没用,某些情况下,可能比then要方便 ,'event_err':null //一般没用,某些情况下,可能比then要方便 ,'Queue_event':true //默认会执行队列定义的回调 }
queue.unshift(promisefun, args[]con) 同push 向队列中头部添加运行单元
queue.go(promisefun, args[]con) 同push,添加后会启动队列.
queue.jump(promisefun, args[]con) 同unshift,添加后启动队列.
setMax(newMax)

修改并行数

queue.start()

启动队列

queue.stop()

暂停队列

queue.clear()

清空队列

demo

var queuefun = require('queue-fun'); //引入 //初始化Promise异步队列类 var Queue = queuefun.Queue(); //实列化最大并发为2的运行队列 var queue1 = new Queue(2,{ "event_succ":function(data){console.log('queue-succ:',data)} //成功 ,"event_err":function(err){console.log('queue-succ:',data)} //失败 }); var q = queuefun.Q; //模块中简单实现了Q的基本功能,可以一试, //定义一个Promise风格的异步方法 function testfun(i){ var deferred = q.defer(); setTimeout(function(){ if(i\ && i % 3 == 0){ deferred.reject(new Error("err " + i)) }else{ deferred.resolve(i) } },(Math.random() * 2000)>>0) return deferred.promise; } //向队列添加运行单元 queue1.push(testfun,[1]) //添加运行项 queue1.go(testfun,[2]) //添加并自动启动队列 queue1.go(testfun,[3],{Queue_event:0}) //添加不会触发队列 回调的运行项. queue1.go(testfun,[4]).then( function(data){console.log('done-succ:',data)}, function(err){console.log('done-err:',err)} ) queue1.go(testfun,[5],{ event_succ:function(data){console.log('conf-succ:',data)}, event_err:function(err){console.log('conf-err:',err)} })

关于内置Promise实现类queuefun.Q

实现了Promises/A+规范及done,spread,fail;
API模仿Q;
模拟实现了 q.defer,q.Promise,q.all,q.any,q.nfcall,q.nfapply,q.denodeify 等函数.

.toPromis(obj).then()

如果你习惯了.then风格写代码,你可以尝试用toPromis将普通函数/语句包装一下,让他可以获得then方法,及捕获错误。

var add = function(a,b){return a+b;} q.toPromis(function(){return add(a+b)}) .then(console.log,console.error)

待完善

  • 集群支持
  • 内存溢出隐患处理
  • 其它Promise实现类的支持

安装:npm install quque-fun

github: https://github.com/cnwhy/queue-fun

 

    原文作者:算法小白
    原文地址: https://www.cnblogs.com/whyoop/p/4538502.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞