puppeteer_node爬虫分布式进阶

前面的文章将puppeteer做爬虫的基本一直到布置都梳理了一遍,如今来看一下分布式的处置惩罚

1) 为何须要分布式

   1. 须要抓取的差别数据有许多,会同时开启无头浏览器去抓取,然后获取到数据后又无厘头的一股脑挤进数据库
   2. 没法保证统一时候须要的数据只要一个操纵在举行

2) 分布式挑选

由于运用的是node,所以尽量的寻觅node支撑的分布式框架

ZooKeeper 和 RabbitMQ 的头脑百度上有很多申明,读者能够自行搜刮作更细致的相识

node版的
zookeeper

node版的
RabbitMQ

3) 衔接之前的 puppeteer进阶版_爬取书旗小说 文章内容,文章只是放了一些重要的代码,末端会附上项目地点,人人能够去撸一撸

发布者,给书旗起一个标识为 37 (channel_id),然后是要抓取书的书本id(channel_book_id)

// 我们以接口的情势吸收爬取的参数    
// 简易版要求(除了吸收参数不做任何处置惩罚) -> 发布者
app.get('/v1.0/grasp_book', (req, res, next) => {
  // 抓取时须要的参数
  if (!req.query.channel_id && !req.query.channel_book_id) {
    res.send({
      code: 403,
      msg: 'params error'
    })
    return null;
  }
  // 发布者
  // 衔接rabbitmq
  amqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn) {
    return conn.createChannel().then(function(ch) {
      // 竖立 hello 的音讯行列
      var q = 'hello';
      // 剖析为json字符串花样作为通报的数据花样
      var msg = JSON.stringify({
        "channel_id": req.query.channel_id,
        "book_id": req.query.book_id
      })
      // 衔接并坚持
      var ok = ch.assertQueue(q, {durable: true});
      return ok.then(function(_qok) {
        // 发送数据到消费者
        ch.sendToQueue(q, Buffer.from(msg));
        console.log(" [x] Sent '%s'", msg);
        return ch.close();
      });
    }).finally(function() { conn.close(); });
  }).catch(console.warn);
  res.send({
    code: 200,
    msg: 'success'
  });
}); 

命令行启动app.js,发出要求并检察效果,{} 内里的就是我们发送出去的数据

《puppeteer_node爬虫分布式进阶》
《puppeteer_node爬虫分布式进阶》

消费者以及ZooKeeper

消费者,吸收到发布者通报过来的数据竖立音讯行列,然后用Zookeeper竖立暂时节点以坚持行列顺次实行

var zookeeper = require('node-zookeeper-client');

// 依据标识动态引入js文件
function moduleCustomize(channel_id) {
    return require(`../${channel_id}.js`)
}

 
var client = zookeeper.createClient('127.0.0.1:2181');

async function sleep(second) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('sleep')
    }, second)
  })
}

// 衔接_zookeeper
client.once('connected', function () {
  console.log('Connected to the server.');
  // 竖立衔接 rabbitmp
  amqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn) {
    return conn.createChannel().then(function(ch) {
      // 与名为 hello(由发布者竖立) 的音讯行列竖立衔接
      var ok = ch.assertQueue('hello', {durable: true});
      // 预存为1
      ok = ok.then(function() { ch.prefetch(1); });
      ok = ok.then(function() {
        // doWork 回调函数 -> 实行吸收到数据后的操纵
        ch.consume('hello', doWork, {noAck: false});
      });
      return ok;
      // rabbitmq 处置惩罚
      function doWork(msg) {
          // 吸收到数据
          var body = msg.content.toString();
          console.log("[x] Received '%s'", body);
          let _body = JSON.parse(body)
          let channel_book_id = _body['channel_book_id'];
          let channel_id = _body['channel_id'];
          // zookeeper 节点
          let path = `/${channel_id + "_" + channel_book_id}`
          // 衔接_zookeeper 推断是不是存在
          client.exists(path, function (error, stat) {
              if (error) {
                  console.log(error.stack);
                  return;
              }
              if (stat) {
                console.log('Node exists.');
                // 存在则不实行,但须要将数据通报下去
                // ack 能够参考 https://www.jianshu.com/p/a5f7fce67803
                ch.ack(msg);
              } else {
                  console.log('Node does not exist.');
                  // 操纵完成后则开释当前竖立的暂时节点
                  client.create(path, null, zookeeper.CreateMode.EPHEMERAL, function (error) {
                    if (error) {
                        console.log('Failed to create node: %s due to: %s.', path, error);
                    } else {
                        console.log('Node: %s is successfully created.', path);
                          
                        // 依据传入标识(如书旗就是37)动态引入js文件(抓书的操纵)
                        moduleCustomize(channel_id).init(channel_book_id)

                        // 通报数据
                        ch.ack(msg);
                        // 开释当前竖立的暂时节点
                        client.remove(path, -1, function (error) {
                            if (error) {
                                console.log(error.stack);
                                return;
                            }
                            console.log('Node is deleted.');
                        });

                    }
                  });
                }
              console.log('使命实行终了')
            });
      }
    });
  }).catch(console.warn);


});
 
client.connect();

命令行启动rab_consumer.js,{} 内里就是我们吸收到音讯行列内里的数据了

《puppeteer_node爬虫分布式进阶》

4) 检察mongo效果数据

《puppeteer_node爬虫分布式进阶》

5) 项目地点(update分支)

    原文作者:Taste
    原文地址: https://segmentfault.com/a/1190000013611070
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞