初识爬虫-scrapy-redis组件

摘要

使用scrapy爬虫框架编写爬虫过程中,如果碰到大量的爬虫工作,将中间数据写入到数据库是不明智的,因为是频繁操作磁盘io导致效率下降,这时候应该把数据写入到redis中,提高效率的同时可以实现分布式爬虫。

scrapy-redis是一个基于redis的scrapy组件,通过它可以快速实现简单分布式爬虫程序,该组件本质上提供了三大功能:

  • scheduler - 调度器
  • dupefilter - URL去重规则(被调度器使用)
  • pipeline - 数据持久化

安装

1
pip install scrapy-redis

python操作redis

集合的基本操作

1
2
3
4
5
6
7
8
9
10
import redis
conn = redis.Redis(host='127.0.0.1',port=6379)
# 1 表示添加成功
# 0 表示已经存在
v = conn.sadd('xxxx','x1')
print(v)
values = conn.smembers('xxxx')
print(values)

对列表的基本操作

  • 左边插入,右边拿:队列
  • 左边插入,左边拿:栈
  • 没有数据拿的时候会夯住
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 在列表的左边插入
conn.lpush('user_list','aaa')
conn.lpush('user_list','bbb')
conn.lpush('user_list','ccc')
# 在列表的右边插入
conn.rpush('user_list','ccc')
r = conn.lrange('user_list',0,10)
print(r)
# 在列表的左边移除
v = conn.lpop('user_list')
# 在列表的右边移除
v = conn.rpop('user_list')
# 如果列表没有数据就夯住,直到有数据
v = conn.blpop('user_list')
v = conn.brpop('user_list')

有序集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# conn.zadd('score','a',60,'b',30,'c',90)
# 根据分数从大到小排序,并获取最大的分值对应的数据
val = conn.zrange('score',0,0,desc=True)
print(val)
val = conn.zrange('score',0,0)
print(val)
# 第一个删掉
conn.zremrangebyrank('score',0,0)
# 基于事务拿到第一个并删掉,相当于pop
# 根据分数从小到大排序,获取分值最小对应的数据并从redis中移除
pipe = conn.pipeline()
pipe.multi()
pipe.zrange('score',0,0).zremrangebyrank('score',0,0)
results, count = pipe.excute()

scrapy-redis的三种队列

  • 队列 SpiderQueue = FifoQueue
  • 栈 SpiderStack = LifoQueue
  • 优先级队列 SpiderPriorityQueue = PriorityQueue
  • 默认使用优先级队列(默认广度优先),
  • 其他:PriorityQueue(有序集合),
  • FifoQueue(列表)(没有优先级,先进先出,广度优先)、
  • LifoQueue(列表)(深度优先)
1
2
3
4
5
6
7
8
9
# response.request.priority 优先级
print(response, response.request.priority)
pages = response.xpath("//div[@id='page-area']//a[@class='ct_pagepa']/@href").extract()
for page in pages:
page_url = "https://dig.chouti.com" + page
obj = Request(url = page_url, callback=self.parse)
obj.priority = response.request.priority + 1
yield obj

梳理

scrapy中的去重规则

scrapy.dupefilter.RFPDupeFilter

scrapy-redis中的去重规则

scrapy_redis.dupefilter.RFPDupeFilter

scrapy中的调度器

scrapy.scheduler.Scheduler

  • 将request对象全部放到内部维护的队列:self.q = deque()
  • 将request对象全部放到硬盘维护的队列:文件操作
  • 将request对象全部放到内部维护的队列:self.q = deque()

scrapy-redis中的调度器

scrapy_redis.scheduler.Scheduler

  • 将请求通过pickle进行序列化,然后添加到redis的列表或者有序集合中
    • SCHEDULER_QUEUE_CLASS = ‘scrapy_redis.queue.FifoQueue’
  • 爬虫爬取数据时存在层级和优先级:爬虫中间件实现

使用scrpy-redis组件

  • 只使用去重规则
    • 连接redis
    • DUPEFILTER_CLASS = ‘scrapy_redis.dupefilter.RFPDupeFilter’
  • 只用调度器(一般不这么用)
    • 去重规则使用默认的
  • 去重+调度器(常用)
  • 使用pipline做持久化,将item对象保存在redis的列表中
  • 起始url不从start_urls里面取,从redis中取

URL去重

  • 如果要扩展,自己写一个然后继承
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
定义去重规则(被调度器调用并应用)
a. 内部会使用以下配置进行连接Redis
# REDIS_HOST = 'localhost' # 主机名
# REDIS_PORT = 6379 # 端口
# REDIS_URL = 'redis://user:pass@hostname:9001' # 连接URL(优先于以上配置)
# REDIS_PARAMS = {} # Redis连接参数 默认:REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING,})
# REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # 指定连接Redis的Python模块 默认:redis.StrictRedis
# REDIS_ENCODING = "utf-8" # redis编码类型 默认:'utf-8'
b. 去重规则通过redis的集合完成,集合的Key为:
key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
默认配置:
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'
c. 去重规则中将url转换成唯一标示,然后在redis中检查是否已经在集合中存在
from scrapy.utils import request
from scrapy.http import Request
req = Request(url='http://www.cnblogs.com/wupeiqi.html')
result = request.request_fingerprint(req)
print(result) # 8ea4fd67887449313ccc12e5b6b92510cc53675c
PS:
- URL参数位置不同时,计算结果一致;
- 默认请求头不在计算范围,include_headers可以设置指定请求头
示例:
from scrapy.utils import request
from scrapy.http import Request
req = Request(url='http://www.baidu.com?name=8&id=1',callback=lambda x:print(x),cookies={'k1':'vvvvv'})
result = request.request_fingerprint(req,include_headers=['cookies',])
print(result)
req = Request(url='http://www.baidu.com?id=1&name=8',callback=lambda x:print(x),cookies={'k1':666})
result = request.request_fingerprint(req,include_headers=['cookies',])
print(result)
"""
# Ensure all spiders share same duplicates filter through redis.
# DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
"""
调度器,调度器使用PriorityQueue(有序集合)、FifoQueue(列表)、LifoQueue(列表)进行保存请求,并且使用RFPDupeFilter对URL去重
a. 调度器
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' # 默认使用优先级队列(默认),其他:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表)
SCHEDULER_QUEUE_KEY = '%(spider)s:requests' # 调度器中请求存放在redis中的key
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat" # 对保存到redis中的数据进行序列化,默认使用pickle
SCHEDULER_PERSIST = True # 是否在关闭时候保留原来的调度器和去重记录,True=保留,False=清空
SCHEDULER_FLUSH_ON_START = True # 是否在开始之前清空 调度器和去重记录,True=清空,False=不清空
SCHEDULER_IDLE_BEFORE_CLOSE = 10 # 去调度器中获取数据时,如果为空,最多等待时间(最后没数据,未获取到)。
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter' # 去重规则,在redis中保存时对应的key
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'# 去重规则对应处理的类
"""
# Enables scheduling storing requests queue in redis.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# Default requests serializer is pickle, but it can be changed to any module
# with loads and dumps functions. Note that pickle is not compatible between
# python versions.
# Caveat: In python 3.x, the serializer must return strings keys and support
# bytes as values. Because of this reason the json or msgpack module will not
# work by default. In python 2.x there is no such issue and you can use
# 'json' or 'msgpack' as serializers.
# SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"
# Don't cleanup redis queues, allows to pause/resume crawls.
# SCHEDULER_PERSIST = True
# Schedule requests using a priority queue. (default)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
# Alternative queues.
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'
# Max idle time to prevent the spider from being closed when distributed crawling.
# This only works if queue class is SpiderQueue or SpiderStack,
# and may also block the same time when your spider start at the first time (because the queue is empty).
# SCHEDULER_IDLE_BEFORE_CLOSE = 10

数据持久化

定义持久化,爬虫yield Item对象时执行RedisPipeline

1
2
3
4
5
6
* 将item持久化到redis时,指定key和序列化函数
REDIS_ITEMS_KEY = '%(spider)s:items'
REDIS_ITEMS_SERIALIZER = 'json.dumps'
* 使用列表保存item数据
scrapy_redis.pipelines.RedisPipeline

起始URL相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
"""
起始URL相关
a. 获取起始URL时,去集合中获取还是去列表中获取?True,集合;False,列表
REDIS_START_URLS_AS_SET = False
# 获取起始URL时,如果为True,则使用self.server.spop;如果为False,则使用self.server.lpop
b. 编写爬虫时,起始URL从redis的Key中获取
REDIS_START_URLS_KEY = '%(name)s:start_urls'
# 起始的并发数
REDIS_START_URLS_BATCH_SIZE = 1
"""
# If True, it uses redis' ``spop`` operation. This could be useful if you
# want to avoid duplicates in your start urls list. In this cases, urls must
# be added via ``sadd`` command or you will get a type error from redis.
# REDIS_START_URLS_AS_SET = False
# Default start urls key for RedisSpider and RedisCrawlSpider.
# REDIS_START_URLS_KEY = '%(name)s:start_urls'

scrapy-redis示例

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
#
#
# from scrapy_redis.scheduler import Scheduler
# from scrapy_redis.queue import PriorityQueue
# SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' # 默认使用优先级队列(默认),其他:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表)
# SCHEDULER_QUEUE_KEY = '%(spider)s:requests' # 调度器中请求存放在redis中的key
# SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat" # 对保存到redis中的数据进行序列化,默认使用pickle
# SCHEDULER_PERSIST = True # 是否在关闭时候保留原来的调度器和去重记录,True=保留,False=清空
# SCHEDULER_FLUSH_ON_START = False # 是否在开始之前清空 调度器和去重记录,True=清空,False=不清空
# SCHEDULER_IDLE_BEFORE_CLOSE = 10 # 去调度器中获取数据时,如果为空,最多等待时间(最后没数据,未获取到)。
# SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter' # 去重规则,在redis中保存时对应的key
# SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'# 去重规则对应处理的类
#
#
#
# REDIS_HOST = '10.211.55.13' # 主机名
# REDIS_PORT = 6379 # 端口
# # REDIS_URL = 'redis://user:pass@hostname:9001' # 连接URL(优先于以上配置)
# # REDIS_PARAMS = {} # Redis连接参数 默认:REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING,})
# # REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # 指定连接Redis的Python模块 默认:redis.StrictRedis
# REDIS_ENCODING = "utf-8" # redis编码类型 默认:'utf-8'