Scrapy中的Pipeline组件

简介

在下图中可以看到items.py与pipeline.py,其中items是用来定义抓取内容的实体;pipeline则是用来处理抓取的item的管道

《Scrapy中的Pipeline组件》 2018-05-20_21-21-40.png

Item管道的主要责任是负责处理有蜘蛛从网页中抽取的Item,他的主要任务是清晰、验证和存储数据。当页面被蜘蛛解析后,将被发送到Item管道,并经过几个特定的次序处理数据。每个Item管道的组件都是有一个简单的方法组成的Python类。获取了Item并执行方法,同时还需要确定是否需要在Item管道中继续执行下一步或是直接丢弃掉不处理。简而言之,就是通过spider爬取的数据都会通过这个pipeline处理,可以在pipeline中不进行操作或者执行相关对数据的操作。

管道的功能

1.清理HTML数据
2.验证解析到的数据(检查Item是否包含必要的字段)
3.检查是否是重复数据(如果重复就删除)
4.将解析到的数据存储到数据库中

Pipeline中的操作

process_item(item, spider)
每一个item管道组件都会调用该方法,并且必须返回一个item对象实例或raise DropItem异常。被丢掉的item将不会在管道组件进行执行。此方法有两个参数,一个是item,即要处理的Item对象,另一个参数是spider,即爬虫。
此外,我们也可以在类中实现以下方法
open_spider(spider)当spider执行的时候将调用该方法
close_spider(spider)当spider关闭的时候将调用该方法

定制自己的Pipeline组件:

1.生成json数据

class JsonWithEncodingPipeline(object):
    def __init__(self):
        self.file=codecs.open('article.json', 'w', encoding="utf-8")
    def process_item(self, item, spider):
        lines=json.dumps(dict(item), ensure_ascii=False) + '\n'
        self.file.write(lines)
        return item
    def spider_closed(self, spider):
        self.file.close()

2.操作mysql关系数据库

class MysqlPipeline(object):
    def __init__(self):
        self.conn=MySQLdb.connect('localhost', 'root', '*****', 'article_spider', charset="utf8", use_unicode=True)
        self.cursor=self.conn.cursor()

    def process_item(self, item, spider):
        insert_sql="""
            insert into article_items(title, url, url_object_id , create_date)
            VALUES(%s, %s, %s, %s)
        """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))
        self.conn.commit()

3.异步操作mysql关系数据库

# 异步处理关系数据库
class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool=dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms=dict(
            host=settings["MYSQL_HOST"],    #这里要在settings中事先定义好
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset="utf8",
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool=adbapi.ConnectPool("MySQLdb", **dbparms)

        return cls(dbpool)

    def process_item(self, item, spider):
    # 使用twisted将mysql插入变成异步执行
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error)


    def handle_error(self, failure, item, spider):
       #处理异步插入的异常
        print(failure)

    def do_insert(self, cursor, item):
        #执行具体的插入
        insert_sql = """
                    insert into article_items(title, url, url_object_id , create_date)
                    VALUES(%s, %s, %s, %s)
                """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))

4.数据去重

from scrapy.exceptions import DropItem

class DuplicatesPipeline(object):

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item['id'] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        else:
            self.ids_seen.add(item['id'])
            return item

使用组件

# Configure item pipelines
# See https://doc.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
   # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
  # 'scrapy.pipelines.images.ImagesPipeline': 1,
   'ArticleSpider.pipelines.MysqlPipeline': 1,
   # 'ArticleSpider.pipelines.JsonExporterPipeline': 2,
   # 'ArticleSpider.pipelines.ArticleImagePipeline': 1
}

每个pipeline后面有一个数值,这个数组的范围是0-1000,这个数值是这些在pipeline中定义的类的优先级,越小越优先。

    原文作者:MrYun
    原文地址: https://www.jianshu.com/p/73920111ff08
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞