The previous article gave a brief introduction to how Scrapy starts up, and we learned that scrapy.crawler.CrawlerProcess is the class pulling the strings behind crawler startup. This article digs into CrawlerProcess and analyzes Scrapy's scheduling logic.
class CrawlerProcess(CrawlerRunner):
def __init__(self, settings=None, install_root_handler=True):
super(CrawlerProcess, self).__init__(settings)
install_shutdown_handlers(self._signal_shutdown)
configure_logging(self.settings, install_root_handler)
log_scrapy_info(self.settings)
def _signal_shutdown(self, signum, _):
install_shutdown_handlers(self._signal_kill)
signame = signal_names[signum]
logger.info("Received %(signame)s, shutting down gracefully. Send again to force ",
{'signame': signame})
reactor.callFromThread(self._graceful_stop_reactor)
def _signal_kill(self, signum, _):
install_shutdown_handlers(signal.SIG_IGN)
signame = signal_names[signum]
logger.info('Received %(signame)s twice, forcing unclean shutdown',
{'signame': signame})
reactor.callFromThread(self._stop_reactor)
def start(self, stop_after_crawl=True):
if stop_after_crawl:
d = self.join()
# Don't start the reactor if the deferreds are already fired
if d.called:
return
d.addBoth(self._stop_reactor)
reactor.installResolver(self._get_dns_resolver())
tp = reactor.getThreadPool()
tp.adjustPoolsize(maxthreads=self.settings.getint('REACTOR_THREADPOOL_MAXSIZE'))
reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
reactor.run(installSignalHandlers=False) # blocking call
def _get_dns_resolver(self):
if self.settings.getbool('DNSCACHE_ENABLED'):
cache_size = self.settings.getint('DNSCACHE_SIZE')
else:
cache_size = 0
return CachingThreadedResolver(
reactor=reactor,
cache_size=cache_size,
timeout=self.settings.getfloat('DNS_TIMEOUT')
)
def _graceful_stop_reactor(self):
d = self.stop()
d.addBoth(self._stop_reactor)
return d
def _stop_reactor(self, _=None):
try:
reactor.stop()
except RuntimeError: # raised if already stopped or in shutdown stage
pass
Let's take a look at the methods this class provides (a minimal usage sketch follows the list):
- __init__ calls the parent class initializer, installs the shutdown signal handlers and configures logging; the last line logs Scrapy's startup and settings information.
- _signal_shutdown together with _graceful_stop_reactor handles a signal by stopping the crawlers gracefully (i.e. letting every crawler finish what it is currently doing).
- The other pair, _signal_kill and _stop_reactor, forces an unclean shutdown.
- _get_dns_resolver builds a caching DNS resolver to speed up requests.
- The remaining start method mainly sets up and runs the reactor event loop.
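As promised, here is roughly how CrawlerProcess is driven when Scrapy is embedded in a script (a minimal sketch; the myproject import path and MySpider are hypothetical placeholders, not something defined in this article):

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

# MySpider is assumed to be defined elsewhere in your project.
from myproject.spiders.my_spider import MySpider

process = CrawlerProcess(get_project_settings())  # installs signal handlers and logging
process.crawl(MySpider)   # delegates to CrawlerRunner.crawl, shown below
process.start()           # starts the reactor; blocks until all crawls finish

This is essentially what the scrapy crawl command does for you behind the scenes.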
So CrawlerProcess mainly handles starting the crawl and tidying up at the end; the actual scheduling logic must live in its parent class. Let's look at scrapy.crawler.CrawlerRunner:
class CrawlerRunner(object):
"""
This is a convenient helper class that keeps track of, manages and runs
crawlers inside an already setup Twisted `reactor`_.
The CrawlerRunner object must be instantiated with a
:class:`~scrapy.settings.Settings` object.
This class shouldn't be needed (since Scrapy is responsible of using it
accordingly) unless writing scripts that manually handle the crawling
process. See :ref:`run-from-script` for an example.
"""
crawlers = property(
lambda self: self._crawlers,
doc="Set of :class:`crawlers <scrapy.crawler.Crawler>` started by "
":meth:`crawl` and managed by this class."
)
def __init__(self, settings=None):
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.settings = settings
self.spider_loader = _get_spider_loader(settings)
self._crawlers = set()
self._active = set()
@property
def spiders(self):
warnings.warn("CrawlerRunner.spiders attribute is renamed to "
"CrawlerRunner.spider_loader.",
category=ScrapyDeprecationWarning, stacklevel=2)
return self.spider_loader
def crawl(self, crawler_or_spidercls, *args, **kwargs):
crawler = self.create_crawler(crawler_or_spidercls)
return self._crawl(crawler, *args, **kwargs)
def _crawl(self, crawler, *args, **kwargs):
self.crawlers.add(crawler)
d = crawler.crawl(*args, **kwargs)
self._active.add(d)
def _done(result):
self.crawlers.discard(crawler)
self._active.discard(d)
return result
return d.addBoth(_done)
def create_crawler(self, crawler_or_spidercls):
if isinstance(crawler_or_spidercls, Crawler):
return crawler_or_spidercls
return self._create_crawler(crawler_or_spidercls)
def _create_crawler(self, spidercls):
if isinstance(spidercls, six.string_types):
spidercls = self.spider_loader.load(spidercls)
return Crawler(spidercls, self.settings)
def stop(self):
"""
Stops simultaneously all the crawling jobs taking place.
Returns a deferred that is fired when they all have ended.
"""
return defer.DeferredList([c.stop() for c in list(self.crawlers)])
@defer.inlineCallbacks
def join(self):
"""
join()
Returns a deferred that is fired when all managed :attr:`crawlers` have
completed their executions.
"""
while self._active:
yield defer.DeferredList(self._active)
The author's own docstring says it all: "This is a convenient helper class that keeps track of, manages and runs crawlers inside an already setup Twisted reactor", so we are in the right place. Start with __init__: it stores the settings, builds a spider_loader and creates two sets. The spider_loader, as its name suggests, loads spider classes; depending on the type of the argument we pass in, the spider name is eventually turned into a scrapy.crawler.Crawler instance. The _crawlers set holds the crawlers being managed, and _active holds the Deferreds of crawls that are still in progress.
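For contrast with CrawlerProcess, here is a minimal sketch of how CrawlerRunner is typically used from a script that manages the Twisted reactor itself (MySpider and the import path are again hypothetical placeholders):

from twisted.internet import reactor
from scrapy.crawler import CrawlerRunner
from scrapy.utils.log import configure_logging

from myproject.spiders.my_spider import MySpider  # placeholder spider

configure_logging()
runner = CrawlerRunner()             # note: no reactor management here
d = runner.crawl(MySpider)           # returns a Deferred (see _crawl below)
d.addBoth(lambda _: reactor.stop())  # stop the reactor when the crawl ends
reactor.run()                        # blocks until reactor.stop() is called

This matches the docstring above: CrawlerRunner assumes an "already setup Twisted reactor" and leaves starting and stopping it to the caller.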
Now the crawling actually starts. crawl(self, crawler_or_spidercls, *args, **kwargs) first creates a scrapy.crawler.Crawler instance (the create_crawler and _create_crawler methods exist to serve exactly this step), and then calls the following method:
def _crawl(self, crawler, *args, **kwargs):
self.crawlers.add(crawler)
d = crawler.crawl(*args, **kwargs)
self._active.add(d)
def _done(result):
self.crawlers.discard(crawler)
self._active.discard(d)
return result
return d.addBoth(_done)
Part of this code just updates the two sets from earlier to track crawler state. The rest is this one line: d = crawler.crawl(*args, **kwargs), which calls scrapy.crawler.Crawler.crawl and returns a Deferred. That is another entry point, so let's step inside:
@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
assert not self.crawling, "Crawling already taking place"
self.crawling = True
try:
self.spider = self._create_spider(*args, **kwargs)
self.engine = self._create_engine()
start_requests = iter(self.spider.start_requests())
yield self.engine.open_spider(self.spider, start_requests)
yield defer.maybeDeferred(self.engine.start)
except Exception:
# In Python 2 reraising an exception after yield discards
# the original traceback (see https://bugs.python.org/issue7563),
# so sys.exc_info() workaround is used.
# This workaround also works in Python 3, but it is not needed,
# and it is slower, so in Python 3 we use native `raise`.
if six.PY2:
exc_info = sys.exc_info()
self.crawling = False
if self.engine is not None:
yield self.engine.close()
if six.PY2:
six.reraise(*exc_info)
raise
At last we see the component names from the architecture overview. From the top: the method first asserts that this crawler is not already crawling and then flips the flag. Then the main act begins: it creates the spider and the engine, turns the requests generated from our start_urls (via the spider's start_requests) into an iterator, and passes them to engine.open_spider. Before analyzing engine.open_spider, here is a quick reminder of where those seed requests come from.
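A spider normally just lists start_urls, but it can also override start_requests to build the seed requests itself. A minimal, hypothetical sketch (ExampleSpider and the URL are made up for illustration):

import scrapy

class ExampleSpider(scrapy.Spider):
    name = 'example'
    start_urls = ['http://example.com/']

    # Optional: the default start_requests simply yields a Request for every
    # URL in start_urls, so an explicit override looks like this.
    def start_requests(self):
        for url in self.start_urls:
            yield scrapy.Request(url, callback=self.parse)

    def parse(self, response):
        pass

Whatever the spider yields here is exactly the start_requests iterator that crawl() hands to engine.open_spider, which we analyze next: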
@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):
assert self.has_capacity(), "No free spider slot when opening %r" % \
spider.name
logger.info("Spider opened", extra={'spider': spider})
nextcall = CallLaterOnce(self._next_request, spider)
scheduler = self.scheduler_cls.from_crawler(self.crawler)
start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
self.slot = slot
self.spider = spider
yield scheduler.open(spider)
yield self.scraper.open_spider(spider)
self.crawler.stats.open_spider(spider)
yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
slot.nextcall.schedule()
slot.heartbeat.start(5)
The first step builds a CallLaterOnce object. This object is crucial: it is the key to generating Requests in a loop. Let's look at the class:
class CallLaterOnce(object):
def __init__(self, func, *a, **kw):
self._func = func
self._a = a
self._kw = kw
self._call = None
def schedule(self, delay=0):
if self._call is None:
self._call = reactor.callLater(delay, self)
def cancel(self):
if self._call:
self._call.cancel()
def __call__(self):
self._call = None
return self._func(*self._a, **self._kw)
A function is passed in at construction time (here _next_request). schedule() registers a delayed call with the reactor whose callback is the object itself, so when the reactor fires it, __call__ runs and invokes the stored function (here _next_request). In other words, every call to schedule() registers one more delayed invocation of _next_request with the reactor; by calling schedule() again from later callbacks, Scrapy keeps _next_request running in a loop. A toy demonstration of the pattern is sketched below, and after that we look at _next_request itself.
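To make the pattern concrete, here is a tiny self-contained script (a sketch in the same spirit, not Scrapy code) in which a CallLaterOnce-style wrapper re-schedules itself three times and then stops the reactor:

from twisted.internet import reactor

class CallLaterOnce(object):
    """Schedule a call for the next reactor iteration, at most once at a time."""
    def __init__(self, func, *a, **kw):
        self._func, self._a, self._kw, self._call = func, a, kw, None

    def schedule(self, delay=0):
        if self._call is None:               # collapse repeated schedule() calls
            self._call = reactor.callLater(delay, self)

    def __call__(self):
        self._call = None
        return self._func(*self._a, **self._kw)

state = {'ticks': 0}

def next_request():
    state['ticks'] += 1
    print('tick %d' % state['ticks'])
    if state['ticks'] < 3:
        nextcall.schedule()                  # keep the loop going
    else:
        reactor.stop()

nextcall = CallLaterOnce(next_request)
nextcall.schedule()
reactor.run()

Now for the real _next_request: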
def _next_request(self, spider):
slot = self.slot
if not slot:
return
if self.paused:
return
    # Whether to back off depends on the following conditions:
    # 1. is the slot being closed?
    # 2. has the engine been stopped?
    # 3. does the scraper hold more Response objects than the configured limit?
    # 4. does the downloader hold more requests than the configured limit?
while not self._needs_backout(spider):
        # On the first pass the scheduler has no request yet, so the first
        # requests always come from start_urls; later iterations take this path.
if not self._next_request_from_scheduler(spider):
break
    # if there are still start_requests left and we do not need to back off
if slot.start_requests and not self._needs_backout(spider):
try:
            # fetch the next seed request
request = next(slot.start_requests)
except StopIteration:
slot.start_requests = None
except Exception:
slot.start_requests = None
logger.error('Error while obtaining start requests',
exc_info=True, extra={'spider': spider})
else:
            # no request is actually sent here; it is only pushed into the scheduler queue
self.crawl(request, spider)
if self.spider_is_idle(spider) and slot.close_if_idle:
self._spider_idle(spider)
Some comments have been added to the code above. The code that actually kicks off requests lives in _next_request_from_scheduler; here it is:
def _next_request_from_scheduler(self, spider):
slot = self.slot
    # get the next request from the scheduler
request = slot.scheduler.next_request()
if not request:
return
    # download it
d = self._download(request, spider)
    # register the callbacks
d.addBoth(self._handle_downloader_output, request, spider)
d.addErrback(lambda f: logger.info('Error while handling downloader output',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
d.addBoth(lambda _: slot.remove_request(request))
d.addErrback(lambda f: logger.info('Error while removing request from slot',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
d.addBoth(lambda _: slot.nextcall.schedule())
d.addErrback(lambda f: logger.info('Error while scheduling new request',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
return d
This code is clear enough, so let's go straight to the _download method:
def _download(self, request, spider):
slot = self.slot
slot.add_request(request)
def _on_success(response):
        # callback fired when the download succeeds
assert isinstance(response, (Response, Request))
if isinstance(response, Response):
            # if the download result is a Response, log it, fire the signal and return it
response.request = request
logkws = self.logformatter.crawled(request, response, spider)
logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
self.signals.send_catch_log(signal=signals.response_received, \
response=response, request=request, spider=spider)
return response
def _on_complete(_):
        # fired when the download finishes, success or not: always schedule the next call
slot.nextcall.schedule()
return _
    # hand the request to the Downloader for the actual download
dwld = self.downloader.fetch(request, spider)
    # register the callbacks
dwld.addCallbacks(_on_success)
dwld.addBoth(_on_complete)
return dwld
This is mostly a thin wrapper around the downloader; the comments above cover it, so no further explanation here. Let's go straight to downloader.fetch:
def fetch(self, request, spider):
def _deactivate(response):
self.active.remove(request)
return response
self.active.add(request)
    # run the request through the downloader middlewares
dfd = self.middleware.download(self._enqueue_request, request, spider)
    # register the cleanup callback
return dfd.addBoth(_deactivate)
The third line from the bottom calls the downloader middleware manager's download method:
def download(self, download_func, request, spider):
@defer.inlineCallbacks
    # process the request through every registered downloader middleware's process_request
def process_request(request):
        # call each middleware's process_request in turn; if it returns None, continue with the next one, otherwise return its result right away
for method in self.methods['process_request']:
response = yield method(request=request, spider=spider)
assert response is None or isinstance(response, (Response, Request)), \
'Middleware %s.process_request must return None, Response or Request, got %s' % \
(six.get_method_self(method).__class__.__name__, response.__class__.__name__)
if response:
defer.returnValue(response)
        # if no middleware returned anything, call the download handler (download_func) to actually download the page
defer.returnValue((yield download_func(request=request,spider=spider)))
    # process the response through every registered downloader middleware's process_response (same idea as above)
@defer.inlineCallbacks
def process_response(response):
assert response is not None, 'Received None in process_response'
if isinstance(response, Request):
defer.returnValue(response)
        # call each middleware's process_response in turn, if defined
for method in self.methods['process_response']:
response = yield method(request=request, response=response,
spider=spider)
assert isinstance(response, (Response, Request)), \
'Middleware %s.process_response must return Response or Request, got %s' % \
(six.get_method_self(method).__class__.__name__, type(response))
if isinstance(response, Request):
defer.returnValue(response)
defer.returnValue(response)
    # process failures through every registered downloader middleware's process_exception
@defer.inlineCallbacks
def process_exception(_failure):
exception = _failure.value
        # call each middleware's process_exception in turn, if defined
for method in self.methods['process_exception']:
response = yield method(request=request, exception=exception,
spider=spider)
assert response is None or isinstance(response, (Response, Request)), \
'Middleware %s.process_exception must return None, Response or Request, got %s' % \
(six.get_method_self(method).__class__.__name__, type(response))
if response:
defer.returnValue(response)
defer.returnValue(_failure)
    # wire up the request processing, error handling and response processing
deferred = mustbe_deferred(process_request, request)
deferred.addErrback(process_exception)
deferred.addCallback(process_response)
return deferred
The code is long but the logic is straightforward; see the comments. This method drives the chain of methods declared by the downloader middlewares. If everything goes smoothly (every middleware's process_request returns None), the request finally reaches the downloader itself at defer.returnValue((yield download_func(request=request, spider=spider))). A sketch of what such a middleware looks like follows.
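For reference, this is roughly what a custom downloader middleware hooking into that chain looks like (a minimal, hypothetical example; the class name and header are made up for illustration):

class ExampleHeaderMiddleware(object):
    """Hypothetical downloader middleware: stamp every outgoing request."""

    def process_request(self, request, spider):
        # Returning None lets the next middleware, and finally the
        # download handler, process the request.
        request.headers.setdefault(b'X-Example-Header', b'demo')
        return None

    def process_response(self, request, response, spider):
        # Must return a Response (pass it on) or a Request (re-schedule it).
        return response

    def process_exception(self, request, exception, spider):
        # Returning None lets other middlewares or the errback handle it.
        return None

It would be enabled through the DOWNLOADER_MIDDLEWARES setting, e.g. {'myproject.middlewares.ExampleHeaderMiddleware': 543} (the module path is hypothetical).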
The download_func we passed in earlier is self._enqueue_request, so let's look at that function:
def _enqueue_request(self, request, spider):
key, slot = self._get_slot(request, spider)
request.meta['download_slot'] = key
    # callback fired once this request has been fully processed
def _deactivate(response):
slot.active.remove(request)
return response
slot.active.add(request)
deferred = defer.Deferred().addBoth(_deactivate)
slot.queue.append((request, deferred))
self._process_queue(spider, slot)
return deferred
By now we expect to be at the part that downloads pages, and this function manages the download queue: every request that has passed through the chain of downloader middlewares is enqueued here and then picked up by _process_queue. Let's look at it:
def _process_queue(self, spider, slot):
if slot.latercall and slot.latercall.active():
return
    # if a download_delay is configured, postpone processing the queue
now = time()
delay = slot.download_delay()
if delay:
penalty = delay - now + slot.lastseen
if penalty > 0:
slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
return
# Process enqueued requests if there are free slots to transfer for this slot
    # download the queued requests
while slot.queue and slot.free_transfer_slots() > 0:
slot.lastseen = now
request, deferred = slot.queue.popleft()
dfd = self._download(slot, request, spider)
dfd.chainDeferred(deferred)
# prevent burst if inter-request delays were configured
if delay:
self._process_queue(spider, slot)
break
Before a request is dispatched, _process_queue honours the configured download delay (driven by the DOWNLOAD_DELAY and related settings), then downloads through the _download helper. The settings involved are sketched below, followed by _download itself.
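The knobs involved here are ordinary Scrapy settings; a minimal settings.py fragment might look like this (the values are just examples):

# settings.py -- example values only
DOWNLOAD_DELAY = 1.0                 # base per-slot delay reflected by slot.download_delay()
RANDOMIZE_DOWNLOAD_DELAY = True      # jitter the delay (0.5x to 1.5x) so it is less mechanical
CONCURRENT_REQUESTS_PER_DOMAIN = 8   # bounds the per-slot concurrency checked by free_transfer_slots()
DOWNLOAD_TIMEOUT = 180               # handed down to the download handlers

With the delay handled, here is the downloader's _download: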
def _download(self, slot, request, spider):
# The order is very important for the following deferreds. Do not change!
# 1. Create the download deferred
dfd = mustbe_deferred(self.handlers.download_request, request, spider)
# 2. Notify response_downloaded listeners about the recent download
# before querying queue for next request
def _downloaded(response):
self.signals.send_catch_log(signal=signals.response_downloaded,
response=response,
request=request,
spider=spider)
return response
dfd.addCallback(_downloaded)
# 3. After response arrives, remove the request from transferring
# state to free up the transferring slot so it can be used by the
# following requests (perhaps those which came from the downloader
# middleware itself)
slot.transferring.add(request)
def finish_transferring(_):
slot.transferring.remove(request)
self._process_queue(spider, slot)
return _
return dfd.addBoth(finish_transferring)
We finally reach the download handler. A few callbacks are attached here so that, once a download finishes, the next request in the download queue gets processed.
So once a request has been downloaded, how does it reach the Item Pipeline? After bubbling back up through the reactor's layers of callbacks, the response lands here again:
def _next_request_from_scheduler(self, spider):
slot = self.slot
request = slot.scheduler.next_request()
if not request:
return
d = self._download(request, spider)
    # add the _handle_downloader_output callback to process whatever the downloader returned
d.addBoth(self._handle_downloader_output, request, spider)
d.addErrback(lambda f: logger.info('Error while handling downloader output',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
d.addBoth(lambda _: slot.remove_request(request))
d.addErrback(lambda f: logger.info('Error while removing request from slot',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
d.addBoth(lambda _: slot.nextcall.schedule())
d.addErrback(lambda f: logger.info('Error while scheduling new request',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
return d
See the comment above? A callback is added here to handle the response, and it is inside that callback that the Item Pipelines' process_item eventually gets invoked:
def _handle_downloader_output(self, response, request, spider):
assert isinstance(response, (Request, Response, Failure)), response
    # if the downloader (or a downloader middleware) returned a Request, re-schedule it
if isinstance(response, Request):
self.crawl(response, spider)
return
# response is a Response or Failure
d = self.scraper.enqueue_scrape(response, request, spider)
d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
return d
A type check is done on the returned object (see the comment). If it really is a Response (or a Failure), it is handed over to the scraper. Below is the series of scrape-related functions:
def enqueue_scrape(self, response, request, spider):
slot = self.slot
dfd = slot.add_response_request(response, request)
def finish_scraping(_):
slot.finish_response(response, request)
self._check_if_closing(spider, slot)
self._scrape_next(spider, slot)
return _
dfd.addBoth(finish_scraping)
dfd.addErrback(
lambda f: logger.error('Scraper bug processing %(request)s',
{'request': request},
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
self._scrape_next(spider, slot)
return dfd
def _scrape_next(self, spider, slot):
while slot.queue:
response, request, deferred = slot.next_response_request_deferred()
self._scrape(response, request, spider).chainDeferred(deferred)
def _scrape(self, response, request, spider):
"""Handle the downloaded response or failure through the spider
callback/errback"""
assert isinstance(response, (Response, Failure))
dfd = self._scrape2(response, request, spider) # returns spiders processed output
dfd.addErrback(self.handle_spider_error, request, response, spider)
dfd.addCallback(self.handle_spider_output, request, response, spider)
return dfd
def _scrape2(self, request_result, request, spider):
"""Handle the different cases of request's result been a Response or a
Failure"""
if not isinstance(request_result, Failure):
return self.spidermw.scrape_response(
self.call_spider, request_result, request, spider)
else:
# FIXME: don't ignore errors in spider middleware
dfd = self.call_spider(request_result, request, spider)
return dfd.addErrback(
self._log_download_errors, request_result, request, spider)
def handle_spider_output(self, result, request, response, spider):
if not result:
return defer_succeed(None)
it = iter_errback(result, self.handle_spider_error, request, response, spider)
dfd = parallel(it, self.concurrent_items,
self._process_spidermw_output, request, response, spider)
return dfd
def _process_spidermw_output(self, output, request, response, spider):
"""Process each Request/Item (given in the output parameter) returned
from the given spider
"""
if isinstance(output, Request):
self.crawler.engine.crawl(request=output, spider=spider)
elif isinstance(output, (BaseItem, dict)):
self.slot.itemproc_size += 1
        # this is the key line: hand the item to the Item Pipelines' process_item
dfd = self.itemproc.process_item(output, spider)
dfd.addBoth(self._itemproc_finished, output, response, spider)
return dfd
elif output is None:
pass
else:
typename = type(output).__name__
logger.error('Spider must return Request, BaseItem, dict or None, '
'got %(typename)r in %(request)s',
{'request': request, 'typename': typename},
extra={'spider': spider})
This follows the same pattern as the downloader middlewares, except that iterators are used here, so I won't walk through it; the comments make it clear. One more important function is call_spider: it is the bridge to the spider module, invoking Spider.parse by default, or request.callback when one was set. A small end-to-end sketch from the spider's point of view follows.
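To tie the last steps together, here is a minimal, hypothetical spider plus item pipeline showing what call_spider invokes and what _process_spidermw_output eventually feeds into process_item (QuotesSpider, DropShortQuotesPipeline and the URL are illustrative, not part of Scrapy):

import scrapy
from scrapy.exceptions import DropItem

class QuotesSpider(scrapy.Spider):
    name = 'quotes_demo'
    start_urls = ['http://quotes.toscrape.com/']

    def parse(self, response):
        # Dicts (or Items) yielded here flow through handle_spider_output /
        # _process_spidermw_output into itemproc.process_item.
        for quote in response.css('div.quote'):
            yield {
                'text': quote.css('span.text::text').extract_first(),
                'author': quote.css('small.author::text').extract_first(),
            }
        # Requests yielded here go back to engine.crawl() and re-enter the
        # scheduler; call_spider will use the callback given on the Request.
        next_page = response.css('li.next a::attr(href)').extract_first()
        if next_page:
            yield scrapy.Request(response.urljoin(next_page), callback=self.parse)

class DropShortQuotesPipeline(object):
    """Hypothetical item pipeline whose process_item is what itemproc calls."""
    def process_item(self, item, spider):
        if not item.get('text'):
            raise DropItem('quote without text')
        return item

The pipeline would be enabled through the ITEM_PIPELINES setting, e.g. {'myproject.pipelines.DropShortQuotesPipeline': 300} (the module path is hypothetical).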
With that, the life cycle of a single Scrapy request is complete: it is loaded by the scheduler (from memory, disk or something like redis), handed to the downloader, and once downloaded it is passed to the spider and the item pipelines, with the engine orchestrating everything in between. Scrapy is of course far richer than this; this article only covered the simplest data flow, and more advanced features will be introduced in follow-up posts.