Scrapy Data Flow Analysis (Part 3)

The previous article gave a brief overview of how Scrapy starts up, and we saw that scrapy.crawler.CrawlerProcess is the class pulling the strings behind spider startup. In this article we dive into CrawlerProcess and analyze Scrapy's scheduling logic.

class CrawlerProcess(CrawlerRunner):
    
    def __init__(self, settings=None, install_root_handler=True):
        super(CrawlerProcess, self).__init__(settings)
        install_shutdown_handlers(self._signal_shutdown)
        configure_logging(self.settings, install_root_handler)
        log_scrapy_info(self.settings)

    def _signal_shutdown(self, signum, _):
        install_shutdown_handlers(self._signal_kill)
        signame = signal_names[signum]
        logger.info("Received %(signame)s, shutting down gracefully. Send again to force ",
                    {'signame': signame})
        reactor.callFromThread(self._graceful_stop_reactor)

    def _signal_kill(self, signum, _):
        install_shutdown_handlers(signal.SIG_IGN)
        signame = signal_names[signum]
        logger.info('Received %(signame)s twice, forcing unclean shutdown',
                    {'signame': signame})
        reactor.callFromThread(self._stop_reactor)

    def start(self, stop_after_crawl=True):

        if stop_after_crawl:
            d = self.join()
            # Don't start the reactor if the deferreds are already fired
            if d.called:
                return
            d.addBoth(self._stop_reactor)

        reactor.installResolver(self._get_dns_resolver())
        tp = reactor.getThreadPool()
        tp.adjustPoolsize(maxthreads=self.settings.getint('REACTOR_THREADPOOL_MAXSIZE'))
        reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
        reactor.run(installSignalHandlers=False)  # blocking call

    def _get_dns_resolver(self):
        if self.settings.getbool('DNSCACHE_ENABLED'):
            cache_size = self.settings.getint('DNSCACHE_SIZE')
        else:
            cache_size = 0
        return CachingThreadedResolver(
            reactor=reactor,
            cache_size=cache_size,
            timeout=self.settings.getfloat('DNS_TIMEOUT')
        )

    def _graceful_stop_reactor(self):
        d = self.stop()
        d.addBoth(self._stop_reactor)
        return d

    def _stop_reactor(self, _=None):
        try:
            reactor.stop()
        except RuntimeError:  # raised if already stopped or in shutdown stage
            pass

Let's take a look at what this class offers:

  1. __init__ calls the parent class's initializer, installs the shutdown signal handlers, configures logging, and on its last line logs Scrapy's startup information (version and settings).
  2. _signal_shutdown together with _graceful_stop_reactor handles a shutdown signal and stops the crawlers gracefully (letting every crawler finish what it currently has in hand).
  3. The other pair, _signal_kill and _stop_reactor, forces an immediate, unclean shutdown.
  4. _get_dns_resolver builds a caching DNS resolver to speed up requests.
  5. The remaining start() method sets up the reactor event loop (DNS resolver, thread pool size, shutdown trigger) and runs it as a blocking call; a usage sketch tying these pieces together follows this list.
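
To see how these pieces are meant to be used, here is a minimal sketch of driving CrawlerProcess from a plain Python script (this mirrors the documented run-from-script pattern; the spider and its URL are placeholders invented for the example):

import scrapy
from scrapy.crawler import CrawlerProcess

class DemoSpider(scrapy.Spider):
    # A throwaway spider just for this sketch
    name = 'demo'
    start_urls = ['http://example.com/']

    def parse(self, response):
        self.logger.info('visited %s', response.url)

process = CrawlerProcess({'USER_AGENT': 'demo-bot'})   # a plain dict is wrapped into Settings
process.crawl(DemoSpider)   # schedule a crawl; inside a project a spider name string also works
process.start()             # blocks: this is the reactor.run() call we just read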

CrawlerProcess, then, mainly takes care of starting the crawl and wrapping it up. The scheduling logic must therefore live in its parent class, so let's look at that parent, scrapy.crawler.CrawlerRunner:

class CrawlerRunner(object):
    """
    This is a convenient helper class that keeps track of, manages and runs
    crawlers inside an already setup Twisted `reactor`_.

    The CrawlerRunner object must be instantiated with a
    :class:`~scrapy.settings.Settings` object.

    This class shouldn't be needed (since Scrapy is responsible of using it
    accordingly) unless writing scripts that manually handle the crawling
    process. See :ref:`run-from-script` for an example.
    """
    crawlers = property(
        lambda self: self._crawlers,
        doc="Set of :class:`crawlers <scrapy.crawler.Crawler>` started by "
            ":meth:`crawl` and managed by this class."
    )

    def __init__(self, settings=None):
        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)
        self.settings = settings
        self.spider_loader = _get_spider_loader(settings)
        self._crawlers = set()
        self._active = set()

    @property
    def spiders(self):
        warnings.warn("CrawlerRunner.spiders attribute is renamed to "
                      "CrawlerRunner.spider_loader.",
                      category=ScrapyDeprecationWarning, stacklevel=2)
        return self.spider_loader

    def crawl(self, crawler_or_spidercls, *args, **kwargs):
        
        crawler = self.create_crawler(crawler_or_spidercls)
        return self._crawl(crawler, *args, **kwargs)

    def _crawl(self, crawler, *args, **kwargs):
        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)

        def _done(result):
            self.crawlers.discard(crawler)
            self._active.discard(d)
            return result

        return d.addBoth(_done)

    def create_crawler(self, crawler_or_spidercls):

        if isinstance(crawler_or_spidercls, Crawler):
            return crawler_or_spidercls
        return self._create_crawler(crawler_or_spidercls)

    def _create_crawler(self, spidercls):
        if isinstance(spidercls, six.string_types):
            spidercls = self.spider_loader.load(spidercls)
        return Crawler(spidercls, self.settings)

    def stop(self):
        """
        Stops simultaneously all the crawling jobs taking place.

        Returns a deferred that is fired when they all have ended.
        """
        return defer.DeferredList([c.stop() for c in list(self.crawlers)])

    @defer.inlineCallbacks
    def join(self):
        """
        join()

        Returns a deferred that is fired when all managed :attr:`crawlers` have
        completed their executions.
        """
        while self._active:
            yield defer.DeferredList(self._active)

The author's docstring says: "This is a convenient helper class that keeps track of, manages and runs crawlers inside an already setup Twisted `reactor`" — so we are in the right place. Start with __init__: it normalizes the settings, builds a spider_loader, and creates two sets. The spider_loader, as its name suggests, loads spider classes by name; depending on what we pass in, create_crawler turns it into a scrapy.crawler.Crawler instance. The _crawlers set holds those Crawler objects, and _active holds the Deferreds of crawls that are still running.
Now the crawling finally begins. crawl(self, crawler_or_spidercls, *args, **kwargs) first creates a scrapy.crawler.Crawler instance (the create_crawler and _create_crawler methods exist purely to serve this step) and then hands it to _crawl. A quick usage sketch of CrawlerRunner follows, and then we return to _crawl.
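
For comparison, a minimal sketch of using CrawlerRunner directly, where we manage the reactor ourselves (again, the spider and URL are placeholders; this is the run-from-script pattern the docstring refers to):

from twisted.internet import reactor
import scrapy
from scrapy.crawler import CrawlerRunner
from scrapy.utils.log import configure_logging

class DemoSpider(scrapy.Spider):
    name = 'demo'
    start_urls = ['http://example.com/']

    def parse(self, response):
        self.logger.info('visited %s', response.url)

configure_logging()
runner = CrawlerRunner()               # unlike CrawlerProcess, no reactor handling here
d = runner.crawl(DemoSpider)           # this is the crawl()/_crawl path we are reading
d.addBoth(lambda _: reactor.stop())    # stop our own reactor once the crawl's Deferred fires
reactor.run()

With that context, back to _crawl, which crawl() delegates to: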

def _crawl(self, crawler, *args, **kwargs):
        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)

        def _done(result):
            self.crawlers.discard(crawler)
            self._active.discard(d)
            return result

        return d.addBoth(_done)

Part of this code just updates the two sets to track crawler state. The key line is d = crawler.crawl(*args, **kwargs): it calls scrapy.crawler.Crawler's crawl method and gets back a Deferred. That is our next entry point, so let's step inside:

    @defer.inlineCallbacks
    def crawl(self, *args, **kwargs):
        assert not self.crawling, "Crawling already taking place"
        self.crawling = True

        try:
            self.spider = self._create_spider(*args, **kwargs)
            self.engine = self._create_engine()
            start_requests = iter(self.spider.start_requests())
            yield self.engine.open_spider(self.spider, start_requests)
            yield defer.maybeDeferred(self.engine.start)
        except Exception:
            # In Python 2 reraising an exception after yield discards
            # the original traceback (see https://bugs.python.org/issue7563),
            # so sys.exc_info() workaround is used.
            # This workaround also works in Python 3, but it is not needed,
            # and it is slower, so in Python 3 we use native `raise`.
            if six.PY2:
                exc_info = sys.exc_info()

            self.crawling = False
            if self.engine is not None:
                yield self.engine.close()

            if six.PY2:
                six.reraise(*exc_info)
            raise

At last we see the component names from the architecture overview. From the top: the method asserts that the crawler is not already crawling and flips the crawling flag, then the real work begins. It creates the spider and the engine, wraps spider.start_requests() in an iterator (this is where the start_urls we configured become Request objects) and passes it to engine.open_spider. A quick reminder of what the default start_requests() yields is sketched below.
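
As a simplified sketch (not the exact Scrapy source), the default Spider.start_requests() is roughly what turns start_urls into the start_requests iterator consumed above:

import scrapy

class MySpider(scrapy.Spider):
    name = 'example'
    start_urls = ['http://example.com/']

    # Roughly what the base Spider class does if you don't override it
    def start_requests(self):
        for url in self.start_urls:
            # dont_filter=True so start URLs are never dropped by the dupefilter;
            # the callback defaults to parse() later on, in call_spider
            yield scrapy.Request(url, dont_filter=True)

Next, let's analyze the engine.open_spider function: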

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
        assert self.has_capacity(), "No free spider slot when opening %r" % \
            spider.name
        logger.info("Spider opened", extra={'spider': spider})
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        slot.nextcall.schedule()
        slot.heartbeat.start(5)

The first step builds a CallLaterOnce object. This object is crucial: it is what makes the request-generating loop possible. Let's look at the class:

class CallLaterOnce(object):

    def __init__(self, func, *a, **kw):
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        if self._call is None:
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        self._call = None
        return self._func(*self._a, **self._kw)

A function is passed in at construction time (here, _next_request). schedule() registers a delayed call with the reactor, and the callback is the CallLaterOnce instance itself, so when the reactor fires it, __call__ runs and invokes the stored function (_next_request). In other words, every call to schedule() registers one delayed reactor call, which means one more execution of _next_request; by calling schedule() again and again, the engine keeps _next_request looping. A toy sketch of this pattern follows, and then we look at _next_request itself.
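
Here is a toy, standalone use of this pattern (CallLaterOnce is imported from scrapy.utils.reactor, where Scrapy defines it; the tick function is made up for the example). Each schedule() buys exactly one more call on a later reactor iteration, and repeated schedule() calls before it fires collapse into one:

from twisted.internet import reactor
from scrapy.utils.reactor import CallLaterOnce

ticks = []

def next_request():
    ticks.append(len(ticks) + 1)
    print('tick', ticks[-1])
    if len(ticks) < 3:
        nextcall.schedule()      # like the engine: ask for one more pass
    else:
        reactor.stop()

nextcall = CallLaterOnce(next_request)
nextcall.schedule()              # kick off the loop
reactor.run()                    # prints tick 1, tick 2, tick 3, then stops

Here is the _next_request method: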

def _next_request(self, spider):

    slot = self.slot
    if not slot:
        return
    if self.paused:
        return
    # Whether to back out (wait) depends on the following conditions:
    # 1. whether the slot has been closed
    # 2. whether the engine has been stopped
    # 3. whether the responses held by the scraper exceed the configured limit
    # 4. whether the number of requests being downloaded exceeds the configured limit
    while not self._needs_backout(spider):
        # On the first pass the scheduler has nothing yet, so the first requests
        # always come from the start_urls; later iterations take this branch
        if not self._next_request_from_scheduler(spider):
            break
    # If start_requests still has items and we don't need to back out
    if slot.start_requests and not self._needs_backout(spider):
        try:
            # Fetch the next seed request
            request = next(slot.start_requests)
        except StopIteration:
            slot.start_requests = None
        except Exception:
            slot.start_requests = None
            logger.error('Error while obtaining start requests',
                         exc_info=True, extra={'spider': spider})
        else:
            # No request is actually sent here; it is only handed to the scheduler's queue
            self.crawl(request, spider)
    if self.spider_is_idle(spider) and slot.close_if_idle:
        self._spider_idle(spider)

The comments above cover the flow. The code that actually fires a request lives in _next_request_from_scheduler, shown next:

def _next_request_from_scheduler(self, spider):
    slot = self.slot
    # Get the next request from the scheduler
    request = slot.scheduler.next_request()
    if not request:
        return
    # Download it
    d = self._download(request, spider)
    # Register the callback chain
    d.addBoth(self._handle_downloader_output, request, spider)
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.remove_request(request))
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    return d

The code here is a standard Twisted callback chain: every addBoth runs whether the previous step succeeded or failed, while the addErrback right after it only logs a failure coming out of that step. A tiny illustration of this chaining follows, and then we go straight into _download.
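
If the addBoth/addErrback pairing looks odd, here is a small illustration of how such a chain behaves (plain Twisted, nothing Scrapy-specific; the names are made up):

from twisted.internet import defer

def show(result):
    # addBoth: called with either a normal result or a Failure
    print('addBoth got:', result)
    return result

def log_error(failure):
    # addErrback: only called if the step right before it failed
    print('errback got:', failure)
    return failure

d = defer.Deferred()
d.addBoth(show)
d.addErrback(log_error)
d.callback('fake response')   # fires show(); log_error() is skipped because show() succeeded

Now the engine's _download method: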

def _download(self, request, spider):
    slot = self.slot
    slot.add_request(request)
    def _on_success(response):
        # Callback for a successful download
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            # If the result is a Response, attach the request, log it, fire the signal and return it
            response.request = request
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            self.signals.send_catch_log(signal=signals.response_received, \
                response=response, request=request, spider=spider)
        return response
    def _on_complete(_):
        # Callback when the download finishes: schedule the next pass no matter what happened
        slot.nextcall.schedule()
        return _
    # Ask the Downloader to fetch the request
    dwld = self.downloader.fetch(request, spider)
    # Register the callbacks
    dwld.addCallbacks(_on_success)
    dwld.addBoth(_on_complete)
    return dwld

This is mostly a thin wrapper around the downloader; the comments cover it, so let's go straight to the downloader.fetch function:

def fetch(self, request, spider):
    def _deactivate(response):
        self.active.remove(request)
        return response
    self.active.add(request)
    # Run the request through the downloader middleware chain
    dfd = self.middleware.download(self._enqueue_request, request, spider)
    # Register the cleanup callback
    return dfd.addBoth(_deactivate)

The third-to-last line calls the downloader middleware manager's download method:

def download(self, download_func, request, spider):
    # Handle the process_request hook of every registered downloader middleware
    @defer.inlineCallbacks
    def process_request(request):
        # Call each middleware's process_request in turn; if it returns None,
        # continue with the next one, otherwise return its result immediately
        for method in self.methods['process_request']:
            response = yield method(request=request, spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
            if response:
                defer.returnValue(response)
        # If no middleware short-circuited, call the download handler (go and actually download)
        defer.returnValue((yield download_func(request=request, spider=spider)))
    # Handle the process_response hook of every registered downloader middleware
    # (the logic mirrors process_request above)
    @defer.inlineCallbacks
    def process_response(response):
        assert response is not None, 'Received None in process_response'
        if isinstance(response, Request):
            defer.returnValue(response)
        # Execute each middleware's process_response in turn, if any are defined
        for method in self.methods['process_response']:
            response = yield method(request=request, response=response,
                                    spider=spider)
            assert isinstance(response, (Response, Request)), \
                'Middleware %s.process_response must return Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if isinstance(response, Request):
                defer.returnValue(response)
        defer.returnValue(response)
    # Handle the process_exception hook of every registered downloader middleware
    @defer.inlineCallbacks
    def process_exception(_failure):
        exception = _failure.value
        # Execute each middleware's process_exception in turn, if any are defined
        for method in self.methods['process_exception']:
            response = yield method(request=request, exception=exception,
                                    spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if response:
                defer.returnValue(response)
        defer.returnValue(_failure)
    # Wire up the request processing, error handling and response handling
    deferred = mustbe_deferred(process_request, request)
    deferred.addErrback(process_exception)
    deferred.addCallback(process_response)
    return deferred

The code is long but the logic is clear enough — see the comments. It works through the hooks declared by the downloader middlewares; if everything goes smoothly (every middleware's process_request returns None), execution reaches defer.returnValue((yield download_func(request=request, spider=spider))), which is where the request finally goes off to be downloaded. The download_func we passed in earlier is self._enqueue_request; before we look at it, here is a sketch of a middleware that follows the contract those assertions enforce.
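
The sketch below is hypothetical (the class name and header are invented; it would be enabled via the DOWNLOADER_MIDDLEWARES setting) and only illustrates the return-value contract: process_request returns None to continue, a Response to short-circuit the download, or a Request to reschedule; process_response must return a Response or a Request.

class ExampleDownloaderMiddleware(object):
    # e.g. DOWNLOADER_MIDDLEWARES = {'myproject.middlewares.ExampleDownloaderMiddleware': 543}

    def process_request(self, request, spider):
        # Returning None lets the next middleware (and eventually the download
        # handler) run; a Response here would skip the download entirely,
        # a Request would be sent back through the engine instead.
        request.headers.setdefault('X-Example', 'demo')
        return None

    def process_response(self, request, response, spider):
        # Must return a Response or a Request; a Request would be rescheduled
        # rather than passed on to the spider.
        spider.logger.debug('got %s for %s', response.status, request.url)
        return response

    def process_exception(self, request, exception, spider):
        # Returning None lets other middlewares / default error handling run.
        return None

Now, the _enqueue_request function: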

    def _enqueue_request(self, request, spider):
        key, slot = self._get_slot(request, spider)
        request.meta['download_slot'] = key

        # Callback run once the request has been handled
        def _deactivate(response):
            slot.active.remove(request)
            return response

        slot.active.add(request)
        deferred = defer.Deferred().addBoth(_deactivate)
        slot.queue.append((request, deferred))
        self._process_queue(spider, slot)
        return deferred

As expected, we have reached the part that downloads pages: this function feeds the download queue. Every request that has made it through the downloader middlewares is enqueued here and later picked up by _process_queue. Let's look at that next:

    def _process_queue(self, spider, slot):
        if slot.latercall and slot.latercall.active():
            return

        # If a download_delay is configured, postpone processing the queue
        now = time()
        delay = slot.download_delay()
        if delay:
            penalty = delay - now + slot.lastseen
            if penalty > 0:
                slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
                return

        # Process enqueued requests if there are free slots to transfer for this slot
        # Download the queued requests
        while slot.queue and slot.free_transfer_slots() > 0:
            slot.lastseen = now
            request, deferred = slot.queue.popleft()
            dfd = self._download(slot, request, spider)
            dfd.chainDeferred(deferred)
            # prevent burst if inter-request delays were configured
            if delay:
                self._process_queue(spider, slot)
                break

Before a request is issued, _process_queue honours download_delay; the settings that feed this throttling are sketched below, and after that we look at the downloader's own _download method.
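
A sketch of the relevant project settings (the values are arbitrary examples; slot.download_delay() and free_transfer_slots() ultimately derive from them):

# settings.py
DOWNLOAD_DELAY = 2                   # feeds slot.download_delay() above
RANDOMIZE_DOWNLOAD_DELAY = True      # jitters the delay between 0.5x and 1.5x
CONCURRENT_REQUESTS_PER_DOMAIN = 8   # bounds free_transfer_slots() per download slot

And here is the downloader's own _download: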

    def _download(self, slot, request, spider):
        # The order is very important for the following deferreds. Do not change!

        # 1. Create the download deferred
        dfd = mustbe_deferred(self.handlers.download_request, request, spider)

        # 2. Notify response_downloaded listeners about the recent download
        # before querying queue for next request
        def _downloaded(response):
            self.signals.send_catch_log(signal=signals.response_downloaded,
                                        response=response,
                                        request=request,
                                        spider=spider)
            return response
        dfd.addCallback(_downloaded)

        # 3. After response arrives,  remove the request from transferring
        # state to free up the transferring slot so it can be used by the
        # following requests (perhaps those which came from the downloader
        # middleware itself)
        slot.transferring.add(request)

        def finish_transferring(_):
            slot.transferring.remove(request)
            self._process_queue(spider, slot)
            return _

        return dfd.addBoth(finish_transferring)

We have finally reached the download handler. A few callbacks are attached here so that as soon as a download finishes, the next request waiting in this slot's queue gets processed.
So once a request has been downloaded, how does the result reach the Item Pipeline? The response bubbles back up through the reactor's layers of callbacks and lands here again:

    def _next_request_from_scheduler(self, spider):
        slot = self.slot
        request = slot.scheduler.next_request()
        if not request:
            return
        d = self._download(request, spider)
        # Add the _handle_downloader_output callback to process the downloader's output
        d.addBoth(self._handle_downloader_output, request, spider)
        d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.remove_request(request))
        d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.nextcall.schedule())
        d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        return d

See the commented line? The callback added there handles the response, and it is inside that chain that the Item Pipelines' process_item method eventually gets called:

    def _handle_downloader_output(self, response, request, spider):
        assert isinstance(response, (Request, Response, Failure)), response
        # If the downloader middleware returned a Request, put it back through the scheduler
        if isinstance(response, Request):
            self.crawl(response, spider)
            return
        # response is a Response or Failure
        d = self.scraper.enqueue_scrape(response, request, spider)
        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d

The returned object is inspected, as the comment explains: a Request is rescheduled, while a genuine Response (or a Failure) is handed over to the scraper. Below is the chain of scrape-related functions:

    def enqueue_scrape(self, response, request, spider):
        slot = self.slot
        dfd = slot.add_response_request(response, request)
        def finish_scraping(_):
            slot.finish_response(response, request)
            self._check_if_closing(spider, slot)
            self._scrape_next(spider, slot)
            return _
        dfd.addBoth(finish_scraping)
        dfd.addErrback(
            lambda f: logger.error('Scraper bug processing %(request)s',
                                   {'request': request},
                                   exc_info=failure_to_exc_info(f),
                                   extra={'spider': spider}))
        self._scrape_next(spider, slot)
        return dfd

    def _scrape_next(self, spider, slot):
        while slot.queue:
            response, request, deferred = slot.next_response_request_deferred()
            self._scrape(response, request, spider).chainDeferred(deferred)

    def _scrape(self, response, request, spider):
        """Handle the downloaded response or failure through the spider
        callback/errback"""
        assert isinstance(response, (Response, Failure))

        dfd = self._scrape2(response, request, spider) # returns spiders processed output
        dfd.addErrback(self.handle_spider_error, request, response, spider)
        dfd.addCallback(self.handle_spider_output, request, response, spider)
        return dfd

    def _scrape2(self, request_result, request, spider):
        """Handle the different cases of request's result been a Response or a
        Failure"""
        if not isinstance(request_result, Failure):
            return self.spidermw.scrape_response(
                self.call_spider, request_result, request, spider)
        else:
            # FIXME: don't ignore errors in spider middleware
            dfd = self.call_spider(request_result, request, spider)
            return dfd.addErrback(
                self._log_download_errors, request_result, request, spider)

    def handle_spider_output(self, result, request, response, spider):
        if not result:
            return defer_succeed(None)
        it = iter_errback(result, self.handle_spider_error, request, response, spider)
        dfd = parallel(it, self.concurrent_items,
            self._process_spidermw_output, request, response, spider)
        return dfd

    def _process_spidermw_output(self, output, request, response, spider):
        """Process each Request/Item (given in the output parameter) returned
        from the given spider
        """
        if isinstance(output, Request):
            self.crawler.engine.crawl(request=output, spider=spider)
        elif isinstance(output, (BaseItem, dict)):
            self.slot.itemproc_size += 1
            # The key line: the item pipelines' process_item method is invoked here
            dfd = self.itemproc.process_item(output, spider)
            dfd.addBoth(self._itemproc_finished, output, response, spider)
            return dfd
        elif output is None:
            pass
        else:
            typename = type(output).__name__
            logger.error('Spider must return Request, BaseItem, dict or None, '
                         'got %(typename)r in %(request)s',
                         {'request': request, 'typename': typename},
                         extra={'spider': spider})

This follows the same pattern as the downloader middleware, except it works through an iterator of the spider's output, so the comments should make it clear. One more important function is call_spider: it is the bridge to the spider module and invokes Spider.parse by default, or request.callback if one was set on the request. A minimal sketch of the spider-plus-pipeline side of this handoff follows.
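
To tie the scraper side together, here is a minimal, hypothetical spider callback plus item pipeline showing the two kinds of output that _process_spidermw_output distinguishes: a Request is handed back to engine.crawl(), while a dict/Item goes through itemproc.process_item() (names and selectors are invented; the pipeline would need to be enabled via ITEM_PIPELINES):

import scrapy

class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    start_urls = ['http://quotes.toscrape.com/']

    def parse(self, response):
        for quote in response.css('div.quote'):
            # A dict (or Item): routed to the item pipelines
            yield {'text': quote.css('span.text::text').extract_first()}
        next_page = response.css('li.next a::attr(href)').extract_first()
        if next_page:
            # A Request: handed back to the engine and scheduled for download
            yield scrapy.Request(response.urljoin(next_page), callback=self.parse)

class SavePipeline(object):
    # e.g. ITEM_PIPELINES = {'myproject.pipelines.SavePipeline': 300}
    def process_item(self, item, spider):
        # This is the process_item call made by self.itemproc above
        spider.logger.info('got item: %r', item)
        return item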
With that, we have walked through the full lifecycle of a single Scrapy request: loaded by the scheduler (from memory, disk, or an external store such as Redis), handed to the downloader, and, once downloaded, passed on to the spider and the item pipelines, with the engine orchestrating everything in between. Scrapy is of course far richer than this; this article only covered the most basic data flow, and more advanced features will be covered in later posts.

    Original author: Len_8030
    Original article: https://www.jianshu.com/p/e34f9a7d484b
    This article is republished from the web to share knowledge; if it infringes any rights, please contact the blog owner to have it removed.