I finally figured out why, in pyspider, overriding on_result lets you insert data into the database while debugging, while leaving it alone does not.
This post mainly walks through the processor and result parts; afterwards I'll use what's here to rework the database layer.
def run(self):
    '''Run loop'''
    logger.info("processor starting...")

    while not self._quit:
        try:
            task, response = self.inqueue.get(timeout=1)
            self.on_task(task, response)
            self._exceptions = 0
        except Queue.Empty as e:
            continue
        except KeyboardInterrupt:
            break
        except Exception as e:
            logger.exception(e)
            self._exceptions += 1
            if self._exceptions > self.EXCEPTION_LIMIT:
                break
            continue

    logger.info("processor exiting...")
The processor starts from run(). This code loops, polling the inqueue queue, and hands each (task, response) pair to on_task. The inqueue here is the fetcher2processor queue (processor2result is the output side); if you run pyspider on a message-queue backend, you can see a queue by that name in it.
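Stripped of the pyspider specifics, the run loop is just the standard poll-with-timeout consumer pattern; here is a standalone sketch using only the stdlib (all names are made up for illustration):

import queue
import threading
import time

inqueue = queue.Queue()
quit_event = threading.Event()

def run_loop():
    # Poll with a short timeout so the loop can notice the quit flag
    # instead of blocking forever on an empty queue.
    while not quit_event.is_set():
        try:
            task, response = inqueue.get(timeout=1)
        except queue.Empty:
            continue
        print('on_task', task, response)

worker = threading.Thread(target=run_loop)
worker.start()
inqueue.put(({'taskid': 'abc'}, {'status_code': 200}))
time.sleep(2)   # let the consumer drain the queue before quitting
quit_event.set()
worker.join()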
def on_task(self, task, response):
    '''Deal one task'''
    start_time = time.time()
    response = rebuild_response(response)

    try:
        assert 'taskid' in task, 'need taskid in task'
        project = task['project']
        updatetime = task.get('project_updatetime', None)
        md5sum = task.get('project_md5sum', None)
        project_data = self.project_manager.get(project, updatetime, md5sum)
        assert project_data, "no such project!"
        if project_data.get('exception'):
            ret = ProcessorResult(logs=(project_data.get('exception_log'), ),
                                  exception=project_data['exception'])
        else:
            # note: this is where the spider instance actually runs the task;
            # results are pushed onto the queue inside, and a ProcessorResult
            # object comes back
            ret = project_data['instance'].run_task(
                project_data['module'], task, response)
    except Exception as e:
        logstr = traceback.format_exc()
        ret = ProcessorResult(logs=(logstr, ), exception=e)
    process_time = time.time() - start_time

    if not ret.extinfo.get('not_send_status', False):
        if ret.exception:
            track_headers = dict(response.headers)
        else:
            track_headers = {}
            for name in ('etag', 'last-modified'):
                if name not in response.headers:
                    continue
                track_headers[name] = response.headers[name]

        status_pack = {
            'taskid': task['taskid'],
            'project': task['project'],
            'url': task.get('url'),
            'track': {
                'fetch': {
                    'ok': response.isok(),
                    'redirect_url': response.url if response.url != response.orig_url else None,
                    'time': response.time,
                    'error': response.error,
                    'status_code': response.status_code,
                    'encoding': getattr(response, '_encoding', None),
                    'headers': track_headers,
                    'content': response.text[:500] if ret.exception else None,
                },
                'process': {
                    'ok': not ret.exception,
                    'time': process_time,
                    'follows': len(ret.follows),
                    'result': (
                        None if ret.result is None
                        else utils.text(ret.result)[:self.RESULT_RESULT_LIMIT]
                    ),
                    'logs': ret.logstr()[-self.RESULT_LOGS_LIMIT:],
                    'exception': ret.exception,
                },
                'save': ret.save,
            },
        }
        if 'schedule' in task:
            status_pack['schedule'] = task['schedule']

        # FIXME: unicode_obj should used in scheduler before store to database
        # it's used here for performance.
        # push the status info onto status_queue
        self.status_queue.put(utils.unicode_obj(status_pack))

    # FIXME: unicode_obj should used in scheduler before store to database
    # it's used here for performance.
    # new follow-up URLs go onto this queue, in batches
    if ret.follows:
        for each in (ret.follows[x:x + 1000] for x in range(0, len(ret.follows), 1000)):
            self.newtask_queue.put([utils.unicode_obj(newtask) for newtask in each])
on_task is fairly long.
First look at project_manager.get(), which fetches the project's data. The important piece is instance, a handler (BaseHandler) instance of the spider; it's what run_task() gets called on. Where that instance comes from I'll save for later, no time now.
project_data = self.project_manager.get(project, updatetime, md5sum)
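Judging from how on_task uses it above, project_data is a dict carrying at least the following keys (the real dict built by ProjectManager has a few more bookkeeping fields):

# Only the keys of project_data that on_task touches; values elided:
project_data = {
    'module': ...,        # the project script loaded as a module object
    'instance': ...,      # the spider's handler instance; run_task() is called on it
    'exception': None,    # set when the project script failed to load
    'exception_log': '',  # traceback text of that load failure, if any
}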
Then things get pushed onto queues: status_queue carries status_pack back to the scheduler, newtask_queue takes the newly discovered URLs, and ret is a ProcessorResult object, covered below.
self.status_queue.put(utils.unicode_obj(status_pack))
self.newtask_queue.put([utils.unicode_obj(newtask) for newtask in each])
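Back in on_task, note that new follows get cut into chunks of 1000 before going onto newtask_queue, so a huge follow list never lands in a single put. The same slicing pattern demonstrated on plain data:

follows = list(range(2500))
batches = (follows[x:x + 1000] for x in range(0, len(follows), 1000))
print([len(batch) for batch in batches])  # [1000, 1000, 500]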
The run_task method:
def run_task(self, module, task, response):
    """
    Processing the task, catching exceptions and logs, return a `ProcessorResult` object
    """
    self.logger = logger = module.logger
    result = None
    exception = None
    stdout = sys.stdout
    self.task = task
    if isinstance(response, dict):
        response = rebuild_response(response)
    self.response = response
    self.save = (task.get('track') or {}).get('save', {})

    try:
        if self.__env__.get('enable_stdout_capture', True):
            sys.stdout = ListO(module.log_buffer)
        self._reset()
        result = self._run_task(task, response)
        if inspect.isgenerator(result):
            for r in result:
                self._run_func(self.on_result, r, response, task)
        else:
            self._run_func(self.on_result, result, response, task)
    except Exception as e:
        logger.exception(e)
        exception = e
    finally:
        follows = self._follows
        messages = self._messages
        logs = list(module.log_buffer)
        extinfo = self._extinfo
        save = self.save

        sys.stdout = stdout
        self.task = None
        self.response = None
        self.save = None

        module.log_buffer[:] = []
    return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)
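One detail worth flagging here: run_task swaps sys.stdout for ListO(module.log_buffer), so anything a callback print()s gets captured as per-task logs, and the finally block restores stdout. A standalone sketch of the same trick (this minimal ListO is mine, not pyspider's actual implementation):

import sys

class ListO(object):
    '''A write-only file-like object that appends every write to a list.'''
    def __init__(self, buffer):
        self._buffer = buffer
    def write(self, text):
        self._buffer.append(text)

log_buffer = []
old_stdout, sys.stdout = sys.stdout, ListO(log_buffer)
print('hello from a callback')
sys.stdout = old_stdout
print(log_buffer)  # ['hello from a callback', '\n']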
Inside run_task, _run_func is what actually invokes the callback you registered; this is where the scraping gets executed:
result = self._run_task(task, response)
if inspect.isgenerator(result):
    for r in result:
        self._run_func(self.on_result, r, response, task)
else:
    self._run_func(self.on_result, result, response, task)
The callback runs here and produces the result; whether the data is returned or yielded from a generator, on_result gets called on each result.
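For your own handler this means a callback may either return one result or yield many, and on_result fires for each one. A minimal sketch (the selectors and fields are made up):

from pyspider.libs.base_handler import BaseHandler

class Handler(BaseHandler):
    def detail_page(self, response):
        # return -> run_task calls on_result once with this dict
        return {'url': response.url, 'title': response.doc('title').text()}

    def list_page(self, response):
        # yield -> run_task iterates the generator, calling on_result per item
        for each in response.doc('a').items():
            yield {'url': each.attr.href}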
def on_result(self, result):
    """Receiving returns from other callback, override me."""
    if not result:
        return
    assert self.task, "on_result can't outside a callback."
    if self.is_debugger():
        pprint(result)
    if self.__env__.get('result_queue'):
        self.__env__['result_queue'].put((self.task, result))
on_result puts the result onto result_queue.
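And that, as far as I can tell from this code, answers the question at the top: in the WebUI debugger there is no result_queue in __env__ and no result_worker attached, so the default on_result only pprints the result and nothing reaches the database. Override on_result in your handler and the write happens right inside the processor, debug mode included. A minimal sketch, assuming a hypothetical save_to_mydb() helper:

from pyspider.libs.base_handler import BaseHandler

class Handler(BaseHandler):
    def on_result(self, result):
        if not result:
            return
        save_to_mydb(result)  # hypothetical helper doing your own INSERT
        # keep the default behaviour (pprint when debugging, queue otherwise)
        super(Handler, self).on_result(result)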
That's roughly all the processor does.
def run(self):
    '''Run loop'''
    logger.info("result_worker starting...")

    while not self._quit:
        try:
            task, result = self.inqueue.get(timeout=1)
            self.on_result(task, result)
        except Queue.Empty as e:
            continue
        except KeyboardInterrupt:
            break
        except AssertionError as e:
            logger.error(e)
            continue
        except Exception as e:
            logger.exception(e)
            continue

    logger.info("result_worker exiting...")
The result worker likewise listens on its queue and dispatches each item to on_result:
def on_result(self, task, result):
    '''Called every result'''
    if not result:
        return
    if 'taskid' in task and 'project' in task and 'url' in task:
        logger.info('result %s:%s %s -> %.30r' % (
            task['project'], task['taskid'], task['url'], result))
        return self.resultdb.save(
            project=task['project'],
            taskid=task['taskid'],
            url=task['url'],
            result=result
        )
    else:
        logger.warning('result UNKNOW -> %.30r' % result)
        return
It stores into the database, using a table keyed by project, taskid, and url, i.e. the default resultdb one.
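If that default table doesn't fit, the usual route on this side is a ResultWorker subclass with its own on_result; a minimal sketch, assuming a hypothetical mydb client in place of resultdb:

from pyspider.result import ResultWorker

class MyResultWorker(ResultWorker):
    def on_result(self, task, result):
        '''Store each result in a custom backend instead of resultdb.'''
        if not result:
            return
        mydb.insert(  # hypothetical DB client
            project=task['project'],
            taskid=task['taskid'],
            url=task['url'],
            result=result,
        )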