Python笔记:分布式爬虫原理与Scrapy分布式应用

2024-03-16 13:50

本文主要是介绍Python笔记:分布式爬虫原理与Scrapy分布式应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、分布式爬虫原理

  • Scrapy框架虽然爬虫是异步多线程的,但是我们只能在一台主机上运行,爬取效率还是有限。
  • 分布式爬虫则是将多台主机组合起来,共同完成一个爬取任务,将大大提高爬取的效率。

分布式爬虫架构

1 ) Scrapy单机架构回顾

  • Scrapy单机爬虫中有一个本地爬取队列Queue,这个队列是利用deque模块实现的。
  • 如果有新的Request产生,就会放到队列里面,随后Request被Scheduler调度。
  • 之后Request交给Downloader执行爬取,这就是简单的调度架构。

2 ) 分布式爬虫架构

2.1 在多台主机上同时运行爬虫任务, 架构图如下:

2.2 维护爬取队列

  • 爬取队列是基于内存存储的Redis, 它支持多种数据结构,如:列表、集合、有序集合等, 存取的操作也非常简单。
  • Redis支持的这几种数据结构,在存储中都有各自优点:
    • 列表(list)有lpush()、lpop()、rpush()、rpop()方法,可以实现先进先出的队列和先进后出的栈式爬虫队列。
    • 集合(set)的元素是无序且不重复的,这样我们可以非常方便的实现随机且不重复的爬取队列。
    • 有序集合有分数表示,而Scrapy的Request也有优先级的控制,我们可以用它来实现带优先级调度的队列。

2.3 数据去重

  • Scrapy有自动去重,它的去重使用了Python中的集合实现。用它记录了Scrapy中每个Request的指纹(Request的散列值)。
  • 对于分布式爬虫来说,我们肯定不能再用每个爬虫各自的集合来去重了,因为不能共享,各主机之间就无法做到去重了。
  • 可以使用Redis的集合来存储指纹集合,那么这样去重集合也是利用Redis共享的。
  • 每台主机只要将新生成Request的指纹与集合比对,判断是否重复并选择添加入到其中。即实例了分布式Request的去重。

2.4 防止中断

  • 在Scrapy中,爬虫运行时的Request队列放在内存中。爬虫运行中断后,这个队列的空间就会被释放,导致爬取不能继续。
  • 要做到中断后继续爬取,我们可以将队列中的Request保存起来,下次爬取直接读取保存的数据既可继续上一次爬取的队列。
  • 在Scrapy中制定一个爬取队列的存储路径即可,这个路径使用JOB_DIR变量来标识,命令如下:
    • $scrapy crawl spider -s JOB_DIR=crawls/spider
  • 官方文档:http://doc.scrapy.org/en/latest/topics/jobs.html
  • 在Scrapy中,把爬取队列保存到本地,第二次爬取直接读取并恢复队列既可。
  • 在分布式框架中就不用担心这个问题了,因为爬取队列本身就是用数据库存储的,中断后再启动就会接着上次中断的地方继续爬取。
  • 当Redis的队列为空时,爬虫会重新爬取;当队列不为空时,爬虫便会接着上次中断处继续爬取。

2.5 架构实现

  • 首先实现一个共享的爬取队列, 还要实现去重的功能。
  • 重写一个Scheduer的实现, 使之可以从共享的爬取队列存取Request。
  • Scrapy-Redis 分布式爬虫的开源包, 直接使用就可以很方便实现分布式爬虫。

二、Scrapy分布式爬虫

关于Scrapy-Redis

  • Scrapy-Redis则是一个基于Redis的Scrapy分布式组件。
  • 它利用Redis对用于爬取的请求(Requests)进行存储和调度(Schedule),并对爬取产生的项目(items)存储以供后续处理使用。
  • Scrapy-redis重写了Scrapy一些比较关键的代码,将Scrapy变成一个可以在多个主机上同时运行的分布式爬虫。
  • Scrapy-Redis的官方网址:https://github.com/rmax/scrapy-redis
  • 架构图

准备工作

  • scrapy
  • scrapy-redis
  • redis
  • mysql
  • python的mysqldb模块
  • python的redis模块

模块安装

  • $ pip3 install scrapy-redis
  • $ pip3 install redis

关于Scrapy-redis的各个组件

1 ) connection.py

  • 负责根据setting中配置实例化redis连接。
  • 被dupefilter和scheduler调用。
  • 总之涉及到redis存取的都要使用到这个模块。

2 ) dupefilter.py

  • 负责执行requst的去重,实现的很有技巧性,使用redis的set数据结构。
  • 但是注意scheduler并不使用其中用于在这个模块中实现的dupefilter键做request的调度,而是使用queue.py模块中实现的queue。
  • 当request不重复时,将其存入到queue中,调度时将其弹出。

3 ) queue.py

  • FIFO的SpiderQueue,SpiderPriorityQueue,以及LIFI的SpiderStack。
  • 默认使用的是第二中,这也就是出现之前文章中所分析情况的原因(链接)。

4 ) pipelines.py

  • 用来实现分布式处理,它将Item存储在redis中以实现分布式处理。
  • 在这里需要读取配置,所以就用到了from_crawler()函数。

5 ) scheduler.py

  • 此扩展是对scrapy中自带的scheduler的替代(在settings的SCHEDULER变量中指出),正是利用此扩展实现crawler的分布式调度。其利用的数据结构来自于queue中实现的数据结构。
  • scrapy-redis所实现的两种分布式:爬虫分布式以及item处理分布式就是由模块scheduler和模块pipelines实现。上述其它模块作为为二者辅助的功能模块。

6 ) spider.py

  • 设计的这个spider从redis中读取要爬的url, 然后执行爬取, 若爬取过程中返回更多的url, 那么继续进行直至所有的request完成。
  • 之后继续从redis中读取url, 循环这个过程。

具体的配置和使用(对Scrapy改造)

1 ) 首先在settings.py中配置redis

在scrapy-redis 自带的例子中已经配置好

# 指定使用scrapy-redis的去重
DUPEFILTER_CLASS = 'scrapy_redis.dupefilters.RFPDupeFilter'# 指定使用scrapy-redis的调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"# 在redis中保持scrapy-redis用到的各个队列,从而允许暂停和暂停后恢复,也就是不清理redis queues
SCHEDULER_PERSIST = True# 指定排序爬取地址时使用的队列,
# 默认的 按优先级排序(Scrapy默认),由sorted set实现的一种非FIFO、LIFO方式。
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'REDIS_URL = None # 一般情况可以省去
REDIS_HOST = '127.0.0.1' # 也可以根据情况改成 localhost
REDIS_PORT = 6379

2 ) item.py的改造

from scrapy.item import Item, Field
from scrapy.loader import ItemLoader
from scrapy.loader.processors import MapCompose, TakeFirst, Joinclass ExampleItem(Item):name = Field()description = Field()link = Field()crawled = Field()spider = Field()url = Field()class ExampleLoader(ItemLoader):default_item_class = ExampleItemdefault_input_processor = MapCompose(lambda s: s.strip())default_output_processor = TakeFirst()description_out = Join()

3 ) spider的改造

  • star_urls变成了redis_key从redis中获得request,继承的scrapy.spider变成RedisSpider。
from scrapy_redis.spiders import RedisSpiderclass MySpider(RedisSpider):"""Spider that reads urls from redis queue (myspider:start_urls)."""name = 'myspider_redis'redis_key = 'myspider:start_urls'def __init__(self, *args, **kwargs):# Dynamically define the allowed domains list.domain = kwargs.pop('domain', '')self.allowed_domains = filter(None, domain.split(','))super(MySpider, self).__init__(*args, **kwargs)def parse(self, response):return {'name': response.css('title::text').extract_first(),'url': response.url,}
  • 启动爬虫 $scrapy runspider my.py

    • 可以输入多个来观察多进程的效果。
    • 打开了爬虫之后你会发现爬虫处于等待爬取的状态,是因为list此时为空。
    • 所以需要在redis控制台中添加启动地址,这样就可以愉快的看到所有的爬虫都动起来啦。
    • $lpush mycrawler:start_urls http://www.***.com
  • 关于 settings.py 的配置

# 指定使用scrapy-redis的调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"# 指定使用scrapy-redis的去重
DUPEFILTER_CLASS = 'scrapy_redis.dupefilters.RFPDupeFilter'# 指定排序爬取地址时使用的队列,
# 默认的 按优先级排序(Scrapy默认),由sorted set实现的一种非FIFO、LIFO方式。
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'
# 可选的 按先进先出排序(FIFO)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderQueue'
# 可选的 按后进先出排序(LIFO)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderStack'# 在redis中保持scrapy-redis用到的各个队列,从而允许暂停和暂停后恢复,也就是不清理redis queues
SCHEDULER_PERSIST = True# 只在使用SpiderQueue或者SpiderStack是有效的参数,指定爬虫关闭的最大间隔时间
# SCHEDULER_IDLE_BEFORE_CLOSE = 10# 通过配置RedisPipeline将item写入key为 spider.name : items 的redis的list中,供后面的分布式处理item
# 这个已经由 scrapy-redis 实现,不需要我们写代码
ITEM_PIPELINES = {'scrapy_redis.pipelines.RedisPipeline': 400
}# 指定redis数据库的连接参数
# REDIS_PASS是我自己加上的redis连接密码(默认不做)
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
#REDIS_PASS = 'redisP@ssw0rd'# LOG等级
LOG_LEVEL = 'DEBUG'#默认情况下,RFPDupeFilter只记录第一个重复请求。将DUPEFILTER_DEBUG设置为True会记录所有重复的请求。
DUPEFILTER_DEBUG =True# 覆盖默认请求头,可以自己编写Downloader Middlewares设置代理和UserAgent
DEFAULT_REQUEST_HEADERS = {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8','Accept-Language': 'zh-CN,zh;q=0.8','Connection': 'keep-alive','Accept-Encoding': 'gzip, deflate, sdch'
}

三、Scrapy分布式应用案例

实现目标

  • 实现主从分布式爬虫,爬取5i5j的楼盘信息
  • URL地址:https://fang.5i5j.com/bj/loupan/
  • 准备工作:开启redis数据库服务

应用案例代码仓库和架构

1 )仓库

  • 点此查看项目地址

  • 注意:此项目没有做反爬处理,可以使用第三方ip代理,具体请看前面博文,没有从radis中存储到mongodb中,具体实现方式参考最后的代码

2 )架构

3 ) 编写master(主)项目代码

  • 编辑爬虫文件:fang.py
# -*- coding: utf-8 -*-
# python3.7.1 + scrapy1.5.1
from scrapy.spiders import CrawlSpider, Rule  
from scrapy.linkextractors import LinkExtractor  
from fang_5i5j.items import MasterItem  class FangSpider(CrawlSpider):  name = 'master'  allowed_domains = ['fang.5i5j.com']start_urls = ['https://fang.5i5j.com/bj/loupan/']item = MasterItem()# Rule是在定义抽取链接的规则rules = (Rule(LinkExtractor(allow=('https://fang.5i5j.com/bj/loupan/n[0-9]+/',)), callback='parse_item',  follow=True),)def parse_item(self, response):item = self.itemitem['url'] = response.url  return item
  • 编辑items.py 储存url地址
# -*- coding: utf-8 -*-# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.htmlimport scrapyclass MasterItem(scrapy.Item):# define the fields for your item here like:url = scrapy.Field()  # pass
  • 编辑pipelines.py负责存储爬取的url地址到redis中
import redis,reclass Fang5I5JPipeline(object):def process_item(self, item, spider):return itemclass MasterPipeline(object):  def __init__(self,host,port):#连接redis数据库self.r = redis.Redis(host=host, port=port, decode_responses=True)#self.redis_url = 'redis://password:@localhost:6379/'  #self.r = redis.Redis.from_url(self.redis_url,decode_responses=True)  @classmethoddef from_crawler(cls, crawler):'''注入实例化对象(传入参数)'''return cls(host = crawler.settings.get("REDIS_HOST"),port = crawler.settings.get("REDIS_PORT"),)def process_item(self, item, spider):  #使用正则判断url地址是否有效,并写入redis。if re.search('/bj/loupan/', item['url']):self.r.lpush('fangspider:start_urls', item['url'])else:self.r.lpush('fangspider:no_urls', item['url'])
  • 编辑配置文件:settings.py配置文件
ITEM_PIPELINES = {'fang_5i5j.pipelines.MasterPipeline': 300,
}# 指定使用scrapy-redis的去重
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'# 指定使用scrapy-redis的调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"# 在redis中保持scrapy-redis用到的各个队列,从而允许暂停和暂停后恢复,也就是不清理redis queues
SCHEDULER_PERSIST = True# 指定排序爬取地址时使用的队列,
# 默认的 按优先级排序(Scrapy默认),由sorted set实现的一种非FIFO、LIFO方式。
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'# REDIS_URL = 'redis:password//127.0.0.1:6379' # 一般情况可以省去
REDIS_HOST = '127.0.0.1' # 也可以根据情况改成 localhost
REDIS_PORT = 6379
  • 测试:爬取url $scrapy runspider fang.py

4 ) 编写slave(从)项目代码

  • 编辑爬虫文件:fang.py
# -*- coding: utf-8 -*-
import scrapy
from fang_5i5j.items import FangItem
from scrapy_redis.spiders import RedisSpiderclass FangSpider(RedisSpider):name = 'fang'#allowed_domains = ['fang.5i5j.com']#start_urls = ['https://fang.5i5j.com/bj/loupan/']redis_key = 'fangspider:start_urls'def __init__(self, *args, **kwargs):# Dynamically define the allowed domains list.domain = kwargs.pop('domain', '')self.allowed_domains = filter(None, domain.split(','))super(FangSpider, self).__init__(*args, **kwargs)def parse(self, response):#print(response.status)hlist = response.css("li.houst_ctn")for vo in hlist:item = FangItem()item['title'] =  vo.css("span.house_name::text").extract_first()# print(item)yield item#pass
  • 编辑 items.py
import scrapyclass FangItem(scrapy.Item):# 此处做一个字段处理,作为演示title = scrapy.Field()
  • 查看pipelines.py (按默认来即可,不用操作)
class Fang5I5JPipeline(object):def process_item(self, item, spider):return item
  • 编辑配置文件:settings.py配置文件
ITEM_PIPELINES = {# 'fang_5i5j.pipelines.Fang5I5JPipeline': 300,'scrapy_redis.pipelines.RedisPipeline': 400,
}# 指定使用scrapy-redis的去重
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'# 指定使用scrapy-redis的调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"# 在redis中保持scrapy-redis用到的各个队列,从而允许暂停和暂停后恢复,也就是不清理redis queues
SCHEDULER_PERSIST = True# 指定排序爬取地址时使用的队列,
# 默认的 按优先级排序(Scrapy默认),由sorted set实现的一种非FIFO、LIFO方式。
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'# REDIS_URL = 'redis://localhost:6379' # 一般情况可以省去
REDIS_HOST = '127.0.0.1' # 也可以根据情况改成 localhost
REDIS_PORT = 6379
  • 测试:爬取具体房屋信息 $scrapy runspider fang.py

5 ) 测试radis

  • 在slave项目中另启一个终端,并连接redis数据库
$ redis_cli -p 6379
6379 >lpush fangspider:start_urls https://fang.5i5j.com/bj/loupan/
  • 查看终端输出及rdm中的数据信息

6 ) 将爬取到的数据存入mongodb中

  • 网站的数据爬回来了,但是放在Redis里没有处理。之前我们配置文件里面没有定制自己的ITEM_PIPELINES,而是使用了RedisPipeline,所以现在这些数据都被保存在redis中,所以我们需要另外做处理。

  • 写一个process_5i5j_profile.py文件,然后保持后台运行就可以不停地将爬回来的数据入库了。

import json
import redis
import pymongodef main():# 指定Redis数据库信息rediscli = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)# 指定MongoDB数据库信息mongocli = pymongo.MongoClient(host='localhost', port=27017)# 创建数据库名db = mongocli['demodb']# 创建空间sheet = db['fang']while True:# FIFO模式为 blpop,LIFO模式为 brpop,获取键值source, data = rediscli.blpop(["demo:items"])item = json.loads(data)sheet.insert(item)try:print u"Processing: %(name)s <%(link)s>" % itemexcept KeyError:print u"Error procesing: %r" % itemif __name__ == '__main__':main()
  • MongoDB数据库:$sudo mongod
  • 执行 python process_5i5j_mongodb.py 文件

7 ) 最终部署

  • 准备好1台主机,多台从机,将master项目部署到主机上,将slave项目部署到从机上
  • 将各个机器的ip地址、数据库地址替换成真实的地址
  • 启动从机项目,启动主机项目并在主机上启动process_5i5j_mongodb.py程序
  • 最终验证程序并对项目做相关调整

这篇关于Python笔记:分布式爬虫原理与Scrapy分布式应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/815683

相关文章

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

nudepy,一个有趣的 Python 库!

更多资料获取 📚 个人网站:ipengtao.com 大家好,今天为大家分享一个有趣的 Python 库 - nudepy。 Github地址:https://github.com/hhatto/nude.py 在图像处理和计算机视觉应用中,检测图像中的不适当内容(例如裸露图像)是一个重要的任务。nudepy 是一个基于 Python 的库,专门用于检测图像中的不适当内容。该

pip-tools:打造可重复、可控的 Python 开发环境,解决依赖关系,让代码更稳定

在 Python 开发中,管理依赖关系是一项繁琐且容易出错的任务。手动更新依赖版本、处理冲突、确保一致性等等,都可能让开发者感到头疼。而 pip-tools 为开发者提供了一套稳定可靠的解决方案。 什么是 pip-tools? pip-tools 是一组命令行工具,旨在简化 Python 依赖关系的管理,确保项目环境的稳定性和可重复性。它主要包含两个核心工具:pip-compile 和 pip

hdu4407容斥原理

题意: 有一个元素为 1~n 的数列{An},有2种操作(1000次): 1、求某段区间 [a,b] 中与 p 互质的数的和。 2、将数列中某个位置元素的值改变。 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.Inpu