RabbitMQ 基础教程(2) - Work Queue

RabbitMQ 基础教程(2) – Work Queue

注:本文是对浩瀚博客的进修和总结,能够存在明白毛病。请带着疑心的眼力,同时如果有毛病希望能指出。

如果你喜欢我的文章,能够关注我的私家博客:http://blog-qeesung.rhcloud.com/

在上一篇文章 RabbitMQ 基础教程(1) – Hello World 中,我们已简朴的引见了RabbitMQ以及怎样发送和吸收一个音讯。接下来我们将继承深切RabbitMQ,研究一下音讯行列(Work Queue)

音讯行列

音讯的宣布者宣布一个音讯到音讯行列中,然后信息的消耗者掏出音讯举行消耗。

                                 queue
 +-------------+      +--+--+--+--+--+--+     +-------------+
 |   producer  |----->|m1|m2| ... |  |  |---->|   consumer  |
 +-------------+      +--+--+--+--+--+--+     +-------------+

然则实际情况每每比这个要庞杂,如果我们有多个信息的宣布者和多个信息的消耗者,那RabbitMQ又将会是怎样事变呢?


+--------------+                              +--------------+
|   producer1  +-                           / |  consumer1   |
+--------------+ \-          queue         /- +--------------+
+--------------+   \- +---+---+---+----+ /-   +--------------+
|   producer2  +---->X|m1 |m2 |m3 |... |\---->|  consumer2   |
+--------------+   /- +---+---+---+----+ \-   +--------------+
+--------------+ /-                        \- +--------------+
|      ...     |/                            \|      ...     |
+--------------+                              +--------------+

Round-robin 分发算法

RabbitMQ中,如果有多个消耗者同时消耗同一个音讯行列,那末就经由历程Round-robin算法将音讯行列中的音讯匀称的分派给每个消耗者。

这个算法实在很简朴,每收到一个新的音讯,就将这个音讯分发给高低一个消耗者。比方上一个消耗者是consumer-n,那末有新音讯来的时刻就将这个新的音讯宣布到consumer-n+1,以此类推,如果到了末了一个消耗者,那末就又从第一个最先。即:consumer-index = (consumer-index + 1) mod consumer-number

为了演示,首先来做几项准备事变。

定义使命 task.js

/**
 * 竖立一个使命 
 * @param taskName 使命名字
 * @param costTime 使命话费的时候
 * @param callback 使命完毕今后的回调函数
 * @constructor
 */
function Task(taskName ,costTime , callback){
    if(typeof(costTime) !== 'number')
        costTime = 0; // no delay there 
    setTimeout(function () {
        console.log(taskName+" finished");
        if(callback && typeof (callback) === 'function')
            callback();
    } , 1000*costTime);
};

串行化的音讯使命构造

使命宣布者担任将该构造宣布到行列中,然后消耗者掏出音讯,新建使命最先实行。

{
    taskName : 'taskname',
    costTime : 1
}

竖立使命音讯 task-producer.js

var amqp = require('amqplib/callback_api');

// 衔接上RabbitMQ服务器
amqp.connect('amqp://localhost', function(err, conn) {
    conn.createChannel(function(err, ch) {
        var q = 'tasks';

        // 获得发送音讯的数量,默许发送4个
        var name;
        var cost;
        
        (function () {
            if(process.argv.length < 4 )
            {
                console.error('ERROR : usage - node rabbit-producer <taskname> <costtime>');
                process.exit(-1);
            }
            
            name = process.argv[2];
            cost = +process.argv[3];
        })();

        // 新建行列,然后将行列中的音讯耐久化作废
        ch.assertQueue(q, {durable: true});
        // 将使命串行化存入Buffer中,并推入行列
        ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});
        console.log(" [x] Sent "+name);
        setTimeout(function () {
            process.exit(0);
        },500);
    });
});

消耗使命音讯 task-consumer.js

var amqp = require('amqplib/callback_api');
var Task = require('./task.js');

amqp.connect('amqp://localhost', function(err, conn) {
    conn.createChannel(function(err, ch) {
        var q = 'tasks';

        ch.assertQueue(q, {durable: true});
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        // 监听行列上面的音讯
        ch.consume(q, function(msg) {
            var obj = JSON.parse(msg.content.toString('utf8'));
            console.log('Get the task '+obj.taskName);
            // 定义新的使命
            new Task(obj.taskName,obj.costTime);
        }, {noAck: true});
    });
});

如今开启两个消耗者历程来守候消耗tasks行列中的音讯

# shell1
node task-consumer.js

# shell2
node task-consumer.js

然后向行列中推入三个音讯

# shell3
node task-producer.js task1 0
node task-producer.js task2 0
node task-producer.js task3 0

运转效果

# shell1
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task1
task1 finished
Get the task task3
task3 finished

# shell2
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task2
task2 finished

# 已经由历程Round-robin算法将音讯行列中的音讯分派到衔接的消耗者中了.

音讯,行列耐久化

仔细的读者能够已发现了我们在声明行列发送音讯的代码块中改动了一小部分的代码,那就是

// 声明行列
ch.assertQueue(q, {durable: true});

// 发送信息
ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});

经由历程将行列的durable设置参数生命为true能够保证在RabbitMQ服务器退出或许异常停止的情况下不会丧失音讯行列,注重这里只是不会丧失音讯行列,并非音讯行列中没有被消耗的音讯不会丧失。

为了保证音讯行列中的音讯不会丧失,就须要在发送音讯时指定persistent选项,这里并不能百分之百的保证音讯不会丧失,由于从行列中有新的音讯,到将行列中音讯耐久化到磁盘这一段时候以内是没法保证的。

音讯的应对

如今存在如许一种场景,消耗者取到音讯,然后竖立使命最先实行。然则使命实行到一半就抛出异常,那末这个使命算是没有被胜利实行的。

在我们之前的代码完成中,都是音讯行列中有新的音讯,立时就这个音讯分派给消耗者消耗,不论消耗者对音讯处置惩罚效果怎样,音讯行列会立时将已分派的音讯从音讯行列中删除。如果这个使命异常重要,或许一定要实行胜利,那末一旦使命在实行历程当中抛出异常,那末这个使命就再也找不回来了,这是异常恐怖的事变。

还幸亏RabbitMQ中我们能够为已分派的音讯和音讯行列之间竖立一个应对关联:

  • 如果音讯处置惩罚胜利,那末就发送一个回复给音讯行列,通知它:我已胜利处置惩罚音讯,不再须要这条音讯了,你能够删除了,因而音讯行列就将已应对的音讯从音讯行列中删除。

  • 如果处置惩罚失利,也就是没有收到应对,那末就将这条音讯从新发送给该行列的其他消耗者。

要在消耗者和音讯行列之间竖立这类应对关联我们只须要将channelconsume函数的noAck参数设成false就能够了。

 ch.consume(q, function(msg) {
            var obj = JSON.parse(msg.content.toString('utf8'));
            console.log('Get the task '+obj.taskName);
            // 定义新的使命
            new Task(obj.taskName,obj.costTime);
        }, {noAck: false}); // 这里设置成false

下面我们就模仿一下音讯处置惩罚失利的场景:

var amqp = require('amqplib/callback_api');
var Task = require('./task.js');

amqp.connect('amqp://localhost', function(err, conn) {
    conn.createChannel(function(err, ch) {
        var q = 'tasks';

        ch.assertQueue(q, {durable: true});
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        // 监听行列上面的音讯
        ch.consume(q, function(msg) {
            var obj = JSON.parse(msg.content.toString('utf8'));
            console.log('Get the task '+obj.taskName);
            // 定义新的使命
            new Task(obj.taskName,obj.costTime,function(){
                if(obj.taskName === 'task2')
                    throw new Error("Test error");
                else
                    ch.ack(msg);
            }); // 如果是使命二,那末就抛出异常。
        }, {noAck: false});
    });
});

根据上面的剧本实行递次,我们在实行一遍剧本: consumer2获得实行task2音讯,然后立时抛出异常退出举行,然后音讯行列再将这个音讯分派给cosumer1,接着也实行失利了,退出历程,终究音讯行列中将只会有一个task2的音讯存在。

启动消耗者守候音讯

# shell1 开启消耗者1 
node rabbit-consumer.js

# shell2 开启消耗者2
node rabbit-consumer.js

竖立音讯

node rabbit-producer.js task1 0
node rabbit-producer.js task2 10
node rabbit-producer.js task3 0

我们能来看一下效果:

# shell2 消耗者2
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task2
task2 finished # 消耗者2实行使命2的时刻抛出异常,task2将会从新发送给消耗者1
... throw  new Error('Error test');


# shell1 消耗者1
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task1
task1 finished
Get the task task3
task3 finished
Get the task task2 # 消耗者1吸收到任何2
task2 finished
... throw  new Error('Error test'); # 也抛出异常了

终究会在音讯行列中剩下一条未消耗的信息。

这里有一点须要注重,如果你将noAck选项设置成了false,那末如果音讯处置惩罚胜利,一定要举行应对,担任音讯行列中的音讯会越来越多,直到撑爆内存。

越发平衡的负载

在上文中我们听到过音讯行列经由历程Round-robin算法来将音讯分派给消耗者,然则这个分派历程是自觉的。比方如今有两个消耗者,consumer1consumer2,根据Round-robin算法就会将奇数编号的使命发配给consumer1,将偶数编号的使命分派给consumer2,然则这些使命正好有一个特征,奇数编号的使命比较沉重,而偶数编号的使命就比较简朴。

那末这就会形成一个题目,那就是consumer1会被累死,而consumer2会被闲死。形成了负载不平衡。如果每个音讯都被胜利消耗今后通知音讯行列,然后音讯行列再将新的音讯分派给余暇下来的消耗者不就好了。

RabbitMQ中的确有如许的一个设置选项。那就是ch.prefetch(1);

我们如今就来模仿一下

var amqp = require('amqplib/callback_api');
var Task = require('./task.js');

amqp.connect('amqp://localhost', function(err, conn) {
    conn.createChannel(function(err, ch) {
        var q = 'tasks';

        ch.assertQueue(q, {durable: true});
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        // 监听行列上面的音讯
        ch.prefetch(1); // 增加这一行
        ch.consume(q, function(msg) {
            var obj = JSON.parse(msg.content.toString('utf8'));
            console.log('Get the task '+obj.taskName);
            new Task(obj.taskName,obj.costTime ,function () {
                ch.ack(msg);
            });
        }, {noAck: false});
    });
});

启动消耗者守候音讯

# shell1 开启消耗者1 
node rabbit-consumer.js

# shell2 开启消耗者2
node rabbit-consumer.js

竖立音讯

node rabbit-producer.js task1 0
node rabbit-producer.js task2 20
node rabbit-producer.js task3 0
node rabbit-producer.js task4 20
# shell1 开启消耗者1
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task1 # 使命立时完毕
task1 finished
Get the task task3 # 使命立时完毕
task3 finished
Get the task task4 # 使命四被分派到consumer1中了 
task4 finished

# shell2 开启消耗者2
[*] Waiting for messages in tasks. To exit press CTRL+C
Get the task task2 
task2 finished
    原文作者:qeesung
    原文地址: https://segmentfault.com/a/1190000005646464
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞