使用bloomfilter修改scrapy-redis去重

首先我们先了解一下为什么要使用bloomfilter去修改scrapy的去重机制。scrapy采用指纹方式进行编码去重,在scrapy/utils/request.py文件中request_fingerprint函数是执行指纹编码的

fp = hashlib.sha1()
fp.update(to_bytes(request.method))
fp.update(to_bytes(canonicalize_url(request.url)))
fp.update(request.body or b'')
if include_headers:
    for hdr in include_headers:
       if hdr in request.headers:
            fp.update(hdr)
            for v in request.headers.getlist(hdr):
                fp.update(v)
cache[include_headers] = fp.hexdigest()

可以看到,它是将请求方法、url、内容、请求头进行sha1加密,生成长度为40位的16进制数指纹信息。sha1加密生成的160bit的散列值,1byte=8bit,因此每一个指纹占用空间为20byte。一万个值占用空间为200KB,一亿个指纹占用2GB。如果我们爬取上亿条数据的话,仅仅指纹信息占用的内存就会达到GB级别,这还不算我们提取的数据和爬取队列。在这种情况下,我们要么通过增加内存来提高爬取上限,要么就改变去重算法来减少内存占用。增加内存对于我们来说不太合适(qiong),那么就要改变去重算法了。bloomfilter就是这样一个算法。

Bloomfilter算法简介

Bloom Filter是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合。Bloom Filter的这种高效是有一定代价的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。

集合表示和元素查询

下面我们具体来看Bloom Filter是如何用位数组表示集合的。初始状态时,Bloom Filter是一个包含m位的位数组,每一位都置为0。
为了表达S={x1, x2,…,xn}这样一个n个元素的集合,Bloom Filter使用k个相互独立的哈希函数,它们分别将集合中的每个元素映射到{1,…,m}的范围中。对任意一个元素x,第i个哈希函数映射的位置hi(x)就会被置为1(1≤i≤k)。注意,如果一个位置多次被置为1,那么只有第一次会起作用,后面几次将没有任何效果。

例如有一组字符 arr:”哈哈“,”呵呵“……..

字符串:“哈哈”
哈希算法1处理后:8
哈希算法2处理后:1
哈希算法3处理后:3

映射到数组后:

《使用bloomfilter修改scrapy-redis去重》

再处理字符串:“呵呵”
哈希算法1处理后:2
哈希算法2处理后:1
哈希算法3处理后:9

继续映射到位数组:

《使用bloomfilter修改scrapy-redis去重》

每一个字符都使用这些哈希算法进行处理后,映射到位数组。

判断在这些字符串是否包含”嘻嘻“
哈希算法1处理后:0
哈希算法2处理后:1
哈希算法3处理后:7

只要判断下标分别为 0,1,7位置的值是否都为1,如下图因为位置0跟位置7的值不为1

所以”嘻嘻“不包含在arr中,反之如果都为1就说明已经存在

《使用bloomfilter修改scrapy-redis去重》

因为这种算法存在一定的错误率,我在网上找到以下错误率的表格:

《使用bloomfilter修改scrapy-redis去重》 错误率表

表中第一列为m/n的值,第二列为最优k值,其后列为不同k值的误判概率。当k值确定时,随着m/n的增大,误判概率逐渐变小。当m/n的值确定时,当k越靠近最优K值,误判概率越小。误判概率总体来看都是极小的,在容忍此误判概率的情况下,大幅减小存储空间和判定速度是完全值得的。

redis的setbit和getbit

前面说的BloomFilter算法是单机的。但是拥有大数据量的系统绝不是一台服务器,所以需要多台服务器共享。结合Redis的BitMap就能够完美的实现这一需求。利用redis的高性能以及通过pipeline将多条bit操作命令批量提交,实现了多机BloomFilter的bit数据共享。唯一需要注意的是redis的bitmap只支持2的32次方大小,对应到内存也就是512MB,数组的下标最大只能是2的32次方-1。不过这个限制我们可以通过构建多个redis的bitmap通过hash取模的方式分散一下即可。万分之一的误判率,512MB可以放下2亿条数据。

SETBIT key offset value
对 key 所储存的字符串值,设置或清除指定偏移量上的位(bit)。
在redis中,存储的字符串都是以二级制的进行存在的。

举例:
设置一个key-value ,键的名字叫“andy” 值为字符’a’

《使用bloomfilter修改scrapy-redis去重》

我们知道 ‘a’ 的ASCII码是 97。转换为二进制是:01100001。offset的学名叫做“偏移” 。二进制中的每一位就是offset值啦,比如在这里 offset 0 等于 ‘0’ ,offset 1等于’1′ ,offset2等于’1′,offset 6 等于’1′ ,没错,offset是从左往右计数的,也就是从高位往低位。

我们通过SETBIT 命令将 andy中的 ‘a’ 变成 ‘b’ 应该怎么变呢?
也就是将 01100001 变成 01100010 (b的ASCII码是98),这个很简单啦,也就是将’a’中的offset 6从0变成1,将offset 7 从1变成0 。

《使用bloomfilter修改scrapy-redis去重》

果然,就从’a’ 变成 ‘b’了。

GETBIT KEY_NAME OFFSET
getbit很简单就是获取偏移位置上的二进制值。

scrapy-redis修改去重算法

我们先看一下scrapy-redis本身提供的去重方式

def request_seen(self, request):
    """Returns True if request was already seen.
    Parameters
    ----------
    request : scrapy.http.Request
    Returns
    -------
    bool
    """
    fp = self.request_fingerprint(request)
    # This returns the number of values added, zero if already exists.
    added = self.server.sadd(self.key, fp)
    return added == 0

通过request_fingerprint函数生成指纹信息,self.server是redis的set集合,使用sadd添加到集合中。sadd添加数据时,如果不存在,就返回1,如果已存在就返回0。

首先我们需要提供哈希函数。对于哈希函数应该必须具备高随机性、低碰撞率。也就是要保证我们的哈希函数所计算出来的值能够平均分散在内存区域的任意位置。

class HashMap(object):
    def __init__(self, m, seed):
        self.m = m
        self.seed = seed
    
    def hash(self, value):
        """
        Hash Algorithm
        :param value: Value
        :return: Hash Value
        """
        ret = 0
        for i in range(len(value)):
            ret += self.seed * ret + ord(value[i])        
        return (self.m - 1) & ret

这是网上通常用到的哈希算法,这种算法我没法确定它的随机率。但是我看到了另外一个库:mmh3

murmurhash简介 – mmh3

MurmurHash 是一种非加密型哈希函数,适用于一般的哈希检索操作。 由Austin Appleby在2008年发明, 并出现了多个变种,都已经发布到了公有领域(public domain)。与其它流行的哈希函数相比,对于规律性较强的key,MurmurHash的随机分布特征表现更良好。—摘自wiki
以下是使用mmh3算法实现的Bloomfilter算法

import mmh3
from hashlib import md5

# 在redis中初始化一个大字符串,也可以认为是在redis中开辟了一块内存空间
# 需要指定数据库名, 比如这儿用到的就是db1
# 指定使用数据块个数,也就是开辟几个这样的大字符串。
# 当数据达到非常大时,512M肯定是不够用的,可能每个位都被置为1了,所以需要开辟多个大字符串
# 大字符串名name = (key + int)
class BloomFilter(object):
    def __init__(self, server, key, debug=False, **kwargs):
        """
        :param host: the host of Redis
        :param port: the port of Redis
        :param db: witch db in Redis
        :param blockNum: one blockNum for about 90,000,000; if you have more strings for filtering, increase it.
        :param key: the key's name in Redis
        """
        self.server = server
        # 2^32 = 512M
        # 这是一个限制值,最大为512M,因为在redis中,字符串值可以进行伸展,伸展时,空白位置以0填充。
        self.seeds = kwargs.get("seeds", defaults.BLOOMFILTER_SEED)
        self.key = key
        self.blockNum = kwargs.get("blockNum", defaults.BLOOMFILTER_BLOCK) #需要多少个内存块
        self.bit_size = 1<<kwargs.get("bit_size", defaults.BLOOMFILTER_SIZE)

    # 判断元素是否在集合中
    def isContains(self, str_input):
        if not str_input:
            return False
        
        name = self.key + str(sum(map(ord, str_input)) % self.blockNum)
        for seed in self.seeds:
            loc = mmh3.hash(str_input, seed, signed=False)
            # 判断是否在集合中,要求所有的哈希函数得到的偏移值都是1
            # 如果getbit为0,则说明此元素不在集合中,跳出判断
            if self.server.getbit(name, loc % self.bit_size) == 0:
                break
        else:
            # for中所有条件均未跳出,说明所有的偏移值都是1,元素在集合中
            return True
        return False

    # 将str_input映射的结果,写入到大字符串中,也就是置上相关的标志位
    def insert(self, str_input):
        name = self.key + str(sum(map(ord, str_input)) % self.blockNum)
        for seed in self.seeds:
            loc = mmh3.hash(str_input, seed, signed=False)
            self.server.setbit(name, loc % self.bit_size, 1)

具体代码太多,这里就不全贴了,只说几个scrapy-redis中需要改的文件部分:

《使用bloomfilter修改scrapy-redis去重》

defaults.py:

BLOOMFILTER_BLOCK = 1  # 内存区块个数
BLOOMFILTER_SIZE = 31  # 内存占用大小
BLOOMFILTER_SEED = 6  # 种子个数

scheduler.py:

    def open(self, spider):
        self.spider = spider
        ......
        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
                blockNum=spider.settings.getint('BLOOMFILTER_BLOCK', defaults.BLOOMFILTER_BLOCK),
                bit_size = spider.settings.getint('BLOOMFILTER_SIZE', defaults.BLOOMFILTER_SIZE),
                seeds = spider.settings.getint('BLOOMFILTER_SEED', defaults.BLOOMFILTER_SEED)
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

dupefilter.py:

class RFPDupeFilter(BaseDupeFilter):
    logger = logger
    def __init__(self, server, key, debug=False, **kwargs):
        ......

使用**kwargs参数是为了保持一致,在scheduler调度中保持参数的一致性,这样我们在settings中就可以切换配置两种去重方式:
settings:

# 确保所有的爬虫通过Redis去重
# DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
DUPEFILTER_CLASS = "scrapy_redis.bloomfilter.Bloomfilter"

# 内存区块数量,主要是每一块最大512M,如果需要的更多的话,就可以分为多个区块去存储
BLOOMFILTER_BLOCK = 1

# 内存大小,表示2的30次方
BLOOMFILTER_SIZE = 30

# 种子个数,合适的数量可以参照上面的错误率表来确定
BLOOMFILTER_SEED = 6

大概思路已经说完了,具体的请看我GitHub上的源码:https://github.com/xingxingzaixian/python-spiders/tree/master/scrapy_redis

如果你觉得我的文章还可以,可以关注我的微信公众号,查看更多实战文章:Python爬虫实战之路
也可以扫描下面二维码,添加我的微信公众号

《使用bloomfilter修改scrapy-redis去重》 公众号

    原文作者:星星在线
    原文地址: https://www.jianshu.com/p/a2b6eb35ed65
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞