Scrapy Source Code Analysis 4: Extension Middlewares Explained

2024-05-07 10:08

This post walks through Scrapy's extension middlewares: what each built-in extension does and how it hooks into the crawler. Hopefully it serves as a useful reference for developers working with these components.

1 Introduction

Scrapy registers nine extension middlewares by default, in EXTENSIONS_BASE (a sketch of overriding them follows the list below):

EXTENSIONS = {}
EXTENSIONS_BASE = {
    'scrapy.extensions.corestats.CoreStats': 0,
    'scrapy.extensions.telnet.TelnetConsole': 0,
    'scrapy.extensions.memusage.MemoryUsage': 0,
    'scrapy.extensions.memdebug.MemoryDebugger': 0,
    'scrapy.extensions.closespider.CloseSpider': 0,
    'scrapy.extensions.feedexport.FeedExporter': 0,
    'scrapy.extensions.logstats.LogStats': 0,
    'scrapy.extensions.spiderstate.SpiderState': 0,
    'scrapy.extensions.throttle.AutoThrottle': 0,
}
  1. scrapy.extensions.corestats.CoreStats: collects Scrapy's core stats (start/finish time, item and response counts)
  2. scrapy.extensions.telnet.TelnetConsole: opens a TCP service at runtime so you can telnet in and inspect the crawler's live state
  3. scrapy.extensions.memusage.MemoryUsage: memory-usage monitoring and alerts; not usable on Windows
  4. scrapy.extensions.memdebug.MemoryDebugger: runs garbage collection and records the resulting stats
  5. scrapy.extensions.closespider.CloseSpider: closes the spider on a timeout or when page/item/error counts reach a threshold
  6. scrapy.extensions.feedexport.FeedExporter: exports scraped items to the configured feeds (see section 7)
  7. scrapy.extensions.logstats.LogStats: periodically logs page/item counts and derives crawl rates
  8. scrapy.extensions.spiderstate.SpiderState: persists spider state across runs
  9. scrapy.extensions.throttle.AutoThrottle: adaptively adjusts the download delay
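
Each entry maps an extension's import path to its loading order. A project enables, reorders, or disables them through the EXTENSIONS setting; a minimal sketch (the custom extension path is hypothetical):

# settings.py (sketch): None disables a base extension,
# an integer registers one with that loading order.
EXTENSIONS = {
    'scrapy.extensions.telnet.TelnetConsole': None,
    'myproject.extensions.ResponseCounter': 500,  # hypothetical; a sketch of such a class appears in section 2
}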

2 scrapy.extensions.corestats.CoreStats core stats

class CoreStats:

    def __init__(self, stats):
        self.stats = stats
        self.start_time = None

    @classmethod
    def from_crawler(cls, crawler):
        o = cls(crawler.stats)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(o.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(o.item_dropped, signal=signals.item_dropped)
        crawler.signals.connect(o.response_received, signal=signals.response_received)
        return o

    def spider_opened(self, spider):
        self.start_time = datetime.utcnow()
        self.stats.set_value('start_time', self.start_time, spider=spider)

    def spider_closed(self, spider, reason):
        finish_time = datetime.utcnow()
        elapsed_time = finish_time - self.start_time
        elapsed_time_seconds = elapsed_time.total_seconds()
        self.stats.set_value('elapsed_time_seconds', elapsed_time_seconds, spider=spider)
        self.stats.set_value('finish_time', finish_time, spider=spider)
        self.stats.set_value('finish_reason', reason, spider=spider)

    def item_scraped(self, item, spider):
        self.stats.inc_value('item_scraped_count', spider=spider)

    def response_received(self, spider):
        self.stats.inc_value('response_received_count', spider=spider)

    def item_dropped(self, item, spider, exception):
        reason = exception.__class__.__name__
        self.stats.inc_value('item_dropped_count', spider=spider)
        self.stats.inc_value(f'item_dropped_reasons_count/{reason}', spider=spider)

It listens for the spider_opened, spider_closed, item_scraped, item_dropped, and response_received signals and updates the corresponding stats.
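
User extensions follow exactly this pattern: build the object in from_crawler, connect signal handlers, and write to crawler.stats. A minimal, hypothetical sketch (the class and stat key are invented for illustration):

from scrapy import signals

class ResponseCounter:
    """Hypothetical extension: count responses per HTTP status."""

    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler.stats)
        crawler.signals.connect(ext.response_received,
                                signal=signals.response_received)
        return ext

    def response_received(self, response, request, spider):
        # e.g. custom/status_count/200
        self.stats.inc_value(f'custom/status_count/{response.status}',
                             spider=spider)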

3 scrapy.extensions.telnet.TelnetConsole

class TelnetConsole(protocol.ServerFactory):

    def __init__(self, crawler):
        if not crawler.settings.getbool('TELNETCONSOLE_ENABLED'):
            raise NotConfigured
        if not TWISTED_CONCH_AVAILABLE:
            raise NotConfigured(
                'TELNETCONSOLE_ENABLED setting is True but required twisted '
                'modules failed to import:\n' + _TWISTED_CONCH_TRACEBACK)
        self.crawler = crawler
        self.noisy = False
        self.portrange = [int(x) for x in crawler.settings.getlist('TELNETCONSOLE_PORT')]
        self.host = crawler.settings['TELNETCONSOLE_HOST']
        self.username = crawler.settings['TELNETCONSOLE_USERNAME']
        self.password = crawler.settings['TELNETCONSOLE_PASSWORD']
        if not self.password:
            self.password = binascii.hexlify(os.urandom(8)).decode('utf8')
            logger.info('Telnet Password: %s', self.password)
        self.crawler.signals.connect(self.start_listening, signals.engine_started)
        self.crawler.signals.connect(self.stop_listening, signals.engine_stopped)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def start_listening(self):
        self.port = listen_tcp(self.portrange, self.host, self)
        h = self.port.getHost()
        logger.info("Telnet console listening on %(host)s:%(port)d",
                    {'host': h.host, 'port': h.port},
                    extra={'crawler': self.crawler})

    def stop_listening(self):
        self.port.stopListening()

    def protocol(self):
        class Portal:
            """An implementation of IPortal"""
            @defers
            def login(self_, credentials, mind, *interfaces):
                if not (credentials.username == self.username.encode('utf8')
                        and credentials.checkPassword(self.password.encode('utf8'))):
                    raise ValueError("Invalid credentials")

                protocol = telnet.TelnetBootstrapProtocol(
                    insults.ServerProtocol,
                    manhole.Manhole,
                    self._get_telnet_vars()
                )
                return (interfaces[0], protocol, lambda: None)

        return telnet.TelnetTransport(
            telnet.AuthenticatingTelnetProtocol,
            Portal()
        )

    def _get_telnet_vars(self):
        # Note: if you add entries here also update topics/telnetconsole.rst
        telnet_vars = {
            'engine': self.crawler.engine,
            'spider': self.crawler.engine.spider,
            'slot': self.crawler.engine.slot,
            'crawler': self.crawler,
            'extensions': self.crawler.extensions,
            'stats': self.crawler.stats,
            'settings': self.crawler.settings,
            'est': lambda: print_engine_status(self.crawler.engine),
            'p': pprint.pprint,
            'prefs': print_live_refs,
            'help': "This is Scrapy telnet console. For more info see: "
                    "https://docs.scrapy.org/en/latest/topics/telnetconsole.html",
        }
        self.crawler.signals.send_catch_log(update_telnet_vars, telnet_vars=telnet_vars)
        return telnet_vars

The local variables available in the telnet console are engine, spider, slot, crawler, extensions, stats, settings, est, p, prefs, and help.
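
A typical session against a running crawl, assuming the default port 6023 and default username scrapy (the password is printed in the startup log unless TELNETCONSOLE_PASSWORD is set):

$ telnet localhost 6023
Username: scrapy
Password: <from the startup log>
>>> est()                  # print the engine status report
>>> stats.get_stats()      # all collected stats as a dict
>>> p(settings['LOG_LEVEL'])
>>> prefs()                # live object references (trackref)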

4 scrapy.extensions.memusage.MemoryUsage memory usage

class MemoryUsage:

    def __init__(self, crawler):
        if not crawler.settings.getbool('MEMUSAGE_ENABLED'):
            raise NotConfigured
        try:
            # stdlib's resource module is only available on unix platforms.
            self.resource = import_module('resource')
        except ImportError:
            raise NotConfigured

        self.crawler = crawler
        self.warned = False
        self.notify_mails = crawler.settings.getlist('MEMUSAGE_NOTIFY_MAIL')
        self.limit = crawler.settings.getint('MEMUSAGE_LIMIT_MB')*1024*1024
        self.warning = crawler.settings.getint('MEMUSAGE_WARNING_MB')*1024*1024
        self.check_interval = crawler.settings.getfloat('MEMUSAGE_CHECK_INTERVAL_SECONDS')
        self.mail = MailSender.from_settings(crawler.settings)
        crawler.signals.connect(self.engine_started, signal=signals.engine_started)
        crawler.signals.connect(self.engine_stopped, signal=signals.engine_stopped)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def get_virtual_size(self):
        size = self.resource.getrusage(self.resource.RUSAGE_SELF).ru_maxrss
        if sys.platform != 'darwin':
            # on macOS ru_maxrss is in bytes, on Linux it is in KB
            size *= 1024
        return size

    def engine_started(self):
        self.crawler.stats.set_value('memusage/startup', self.get_virtual_size())
        self.tasks = []
        tsk = task.LoopingCall(self.update)
        self.tasks.append(tsk)
        tsk.start(self.check_interval, now=True)
        if self.limit:
            tsk = task.LoopingCall(self._check_limit)
            self.tasks.append(tsk)
            tsk.start(self.check_interval, now=True)
        if self.warning:
            tsk = task.LoopingCall(self._check_warning)
            self.tasks.append(tsk)
            tsk.start(self.check_interval, now=True)

    def engine_stopped(self):
        for tsk in self.tasks:
            if tsk.running:
                tsk.stop()

    def update(self):
        self.crawler.stats.max_value('memusage/max', self.get_virtual_size())

    def _check_limit(self):
        if self.get_virtual_size() > self.limit:
            self.crawler.stats.set_value('memusage/limit_reached', 1)
            mem = self.limit/1024/1024
            logger.error("Memory usage exceeded %(memusage)dM. Shutting down Scrapy...",
                         {'memusage': mem}, extra={'crawler': self.crawler})
            if self.notify_mails:
                subj = (
                    f"{self.crawler.settings['BOT_NAME']} terminated: "
                    f"memory usage exceeded {mem}M at {socket.gethostname()}"
                )
                self._send_report(self.notify_mails, subj)
                self.crawler.stats.set_value('memusage/limit_notified', 1)

            if self.crawler.engine.spider is not None:
                self.crawler.engine.close_spider(self.crawler.engine.spider, 'memusage_exceeded')
            else:
                self.crawler.stop()

    def _check_warning(self):
        if self.warned:  # warn only once
            return
        if self.get_virtual_size() > self.warning:
            self.crawler.stats.set_value('memusage/warning_reached', 1)
            mem = self.warning/1024/1024
            logger.warning("Memory usage reached %(memusage)dM",
                           {'memusage': mem}, extra={'crawler': self.crawler})
            if self.notify_mails:
                subj = (
                    f"{self.crawler.settings['BOT_NAME']} warning: "
                    f"memory usage reached {mem}M at {socket.gethostname()}"
                )
                self._send_report(self.notify_mails, subj)
                self.crawler.stats.set_value('memusage/warning_notified', 1)
            self.warned = True

    def _send_report(self, rcpts, subject):
        """send notification mail with some additional useful info"""
        stats = self.crawler.stats
        s = f"Memory usage at engine startup : {stats.get_value('memusage/startup')/1024/1024}M\r\n"
        s += f"Maximum memory usage          : {stats.get_value('memusage/max')/1024/1024}M\r\n"
        s += f"Current memory usage          : {self.get_virtual_size()/1024/1024}M\r\n"
        s += "ENGINE STATUS ------------------------------------------------------- \r\n"
        s += "\r\n"
        s += pformat(get_engine_status(self.crawler.engine))
        s += "\r\n"
        self.mail.send(rcpts, subject, s)

This extension requires a Unix-like platform (it imports the stdlib resource module, so it does not work on Windows). It supports memory monitoring with warning thresholds and email notifications.

Mail settings for the alert emails:

MAIL_HOST = 'localhost'  # SMTP host
MAIL_PORT = 25  # SMTP port
MAIL_FROM = 'scrapy@localhost'  # sender address
MAIL_PASS = None  # SMTP password
MAIL_USER = None  # SMTP user; if unset, no SMTP authentication is performed

And the memory-monitoring settings:

MEMUSAGE_CHECK_INTERVAL_SECONDS = 60.0  # check memory usage every 60s
MEMUSAGE_ENABLED = True  # enable the extension
MEMUSAGE_LIMIT_MB = 0  # hard memory limit in MB; 0 disables the limit check
MEMUSAGE_NOTIFY_MAIL = []  # addresses to notify
MEMUSAGE_WARNING_MB = 0  # warning threshold in MB; 0 disables the warning check

When memory usage exceeds the warning or limit thresholds, the corresponding notification email is sent; exceeding the limit additionally closes the spider (reason memusage_exceeded) or stops the crawler.
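
A hedged configuration sketch (the thresholds and address are illustrative): warn at 1.5 GB, shut down at 2 GB, checking every 30 seconds:

# settings.py (sketch)
MEMUSAGE_ENABLED = True
MEMUSAGE_WARNING_MB = 1536
MEMUSAGE_LIMIT_MB = 2048
MEMUSAGE_CHECK_INTERVAL_SECONDS = 30.0
MEMUSAGE_NOTIFY_MAIL = ['ops@example.com']  # illustrative address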

5 scrapy.extensions.memdebug.MemoryDebugger 

class MemoryDebugger:

    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        if not crawler.settings.getbool('MEMDEBUG_ENABLED'):
            raise NotConfigured
        o = cls(crawler.stats)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_closed(self, spider, reason):
        gc.collect()
        self.stats.set_value('memdebug/gc_garbage_count', len(gc.garbage), spider=spider)
        for cls, wdict in live_refs.items():
            if not wdict:
                continue
            self.stats.set_value(f'memdebug/live_refs/{cls.__name__}', len(wdict), spider=spider)

Settings

MEMDEBUG_ENABLED = False  # enable memory debugging
MEMDEBUG_NOTIFY = []  # send memory debugging report by mail at engine shutdown

Note that MEMDEBUG_NOTIFY is currently unused in the codebase.

Its main job is to force a garbage collection when the spider closes and record the resulting statistics.
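
live_refs comes from scrapy.utils.trackref: any class subclassing object_ref (Spider, Request, Response, ...) has its live instances weakly tracked, which is what feeds the memdebug/live_refs/* stats. A small sketch (TrackedThing is invented for illustration):

from scrapy.utils.trackref import object_ref, live_refs, print_live_refs

class TrackedThing(object_ref):
    """Hypothetical class; instances are registered in live_refs."""

things = [TrackedThing() for _ in range(3)]
print(len(live_refs[TrackedThing]))  # 3 live instances
print_live_refs()  # the same report the telnet prefs() helper prints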

6 scrapy.extensions.closespider.CloseSpider

class CloseSpider:

    def __init__(self, crawler):
        self.crawler = crawler

        self.close_on = {
            'timeout': crawler.settings.getfloat('CLOSESPIDER_TIMEOUT'),
            'itemcount': crawler.settings.getint('CLOSESPIDER_ITEMCOUNT'),
            'pagecount': crawler.settings.getint('CLOSESPIDER_PAGECOUNT'),
            'errorcount': crawler.settings.getint('CLOSESPIDER_ERRORCOUNT'),
        }

        if not any(self.close_on.values()):
            raise NotConfigured

        self.counter = defaultdict(int)

        if self.close_on.get('errorcount'):
            crawler.signals.connect(self.error_count, signal=signals.spider_error)
        if self.close_on.get('pagecount'):
            crawler.signals.connect(self.page_count, signal=signals.response_received)
        if self.close_on.get('timeout'):
            crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
        if self.close_on.get('itemcount'):
            crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def error_count(self, failure, response, spider):
        self.counter['errorcount'] += 1
        if self.counter['errorcount'] == self.close_on['errorcount']:
            self.crawler.engine.close_spider(spider, 'closespider_errorcount')

    def page_count(self, response, request, spider):
        self.counter['pagecount'] += 1
        if self.counter['pagecount'] == self.close_on['pagecount']:
            self.crawler.engine.close_spider(spider, 'closespider_pagecount')

    def spider_opened(self, spider):
        from twisted.internet import reactor
        self.task = reactor.callLater(
            self.close_on['timeout'],
            self.crawler.engine.close_spider,
            spider,
            reason='closespider_timeout'
        )

    def item_scraped(self, item, spider):
        self.counter['itemcount'] += 1
        if self.counter['itemcount'] == self.close_on['itemcount']:
            self.crawler.engine.close_spider(spider, 'closespider_itemcount')

    def spider_closed(self, spider):
        task = getattr(self, 'task', False)
        if task and task.active():
            task.cancel()

Settings

CLOSESPIDER_TIMEOUT = 0  # close the spider after this many seconds (0 disables)
CLOSESPIDER_PAGECOUNT = 0  # close the spider after this many downloaded responses (0 disables)
CLOSESPIDER_ITEMCOUNT = 0  # close the spider after this many scraped items (0 disables)
CLOSESPIDER_ERRORCOUNT = 0  # close the spider after this many spider errors (0 disables)

Its job is to close the spider automatically once a time, page, item, or error threshold is reached.
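
These thresholds can also be set per spider via custom_settings; a hypothetical sketch:

import scrapy

class BoundedSpider(scrapy.Spider):
    """Hypothetical spider: closes after an hour or 1000 items,
    whichever threshold is hit first."""
    name = 'bounded'
    start_urls = ['https://example.com']
    custom_settings = {
        'CLOSESPIDER_TIMEOUT': 3600,    # seconds
        'CLOSESPIDER_ITEMCOUNT': 1000,  # items
    }

    def parse(self, response):
        yield {'url': response.url}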

7 scrapy.extensions.feedexport.FeedExporter
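
FeedExporter is by far the largest of the nine, so its source is not reproduced here. In short, it hooks the spider_opened, spider_closed, and item_scraped signals and serializes every scraped item into the configured feeds (local files, FTP, S3, stdout, ...). A minimal configuration sketch, assuming Scrapy >= 2.1 where the FEEDS setting is available (paths are illustrative):

# settings.py (sketch): export items as JSON Lines and CSV at once.
FEEDS = {
    'output/items.jsonl': {'format': 'jsonlines', 'encoding': 'utf8'},
    'output/items.csv': {'format': 'csv'},
}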

8 scrapy.extensions.logstats.LogStats

class LogStats:
    """Log basic scraping stats periodically"""

    def __init__(self, stats, interval=60.0):
        self.stats = stats
        self.interval = interval
        self.multiplier = 60.0 / self.interval
        self.task = None

    @classmethod
    def from_crawler(cls, crawler):
        interval = crawler.settings.getfloat('LOGSTATS_INTERVAL')
        if not interval:
            raise NotConfigured
        o = cls(crawler.stats, interval)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_opened(self, spider):
        self.pagesprev = 0
        self.itemsprev = 0

        self.task = task.LoopingCall(self.log, spider)
        self.task.start(self.interval)

    def log(self, spider):
        items = self.stats.get_value('item_scraped_count', 0)
        pages = self.stats.get_value('response_received_count', 0)
        irate = (items - self.itemsprev) * self.multiplier
        prate = (pages - self.pagesprev) * self.multiplier
        self.pagesprev, self.itemsprev = pages, items

        msg = ("Crawled %(pages)d pages (at %(pagerate)d pages/min), "
               "scraped %(items)d items (at %(itemrate)d items/min)")
        log_args = {'pages': pages, 'pagerate': prate,
                    'items': items, 'itemrate': irate}
        logger.info(msg, log_args, extra={'spider': spider})

    def spider_closed(self, spider, reason):
        if self.task and self.task.running:
            self.task.stop()

Settings

LOGSTATS_INTERVAL = 60.0  # log the stats every 60s; 0 disables the extension

It mainly tracks the page and item counts and derives per-minute crawl rates from them, as the worked example below shows.
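
With the default 60 s interval the multiplier is 1; with a 30 s interval each per-tick delta is doubled to get a per-minute rate. The arithmetic, worked with illustrative numbers:

interval = 30.0                     # LOGSTATS_INTERVAL
multiplier = 60.0 / interval        # 2.0: converts per-tick deltas to per-minute rates
pages_prev, pages_now = 120, 165
prate = (pages_now - pages_prev) * multiplier  # 90 pages/min
print(f"Crawled {pages_now} pages (at {prate:.0f} pages/min)")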

9 scrapy.extensions.spiderstate.SpiderState

class SpiderState:
    """Store and load spider state during a scraping job"""

    def __init__(self, jobdir=None):
        self.jobdir = jobdir

    @classmethod
    def from_crawler(cls, crawler):
        jobdir = job_dir(crawler.settings)
        if not jobdir:
            raise NotConfigured

        obj = cls(jobdir)
        crawler.signals.connect(obj.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(obj.spider_opened, signal=signals.spider_opened)
        return obj

    def spider_closed(self, spider):
        if self.jobdir:
            with open(self.statefn, 'wb') as f:
                pickle.dump(spider.state, f, protocol=4)

    def spider_opened(self, spider):
        if self.jobdir and os.path.exists(self.statefn):
            with open(self.statefn, 'rb') as f:
                spider.state = pickle.load(f)
        else:
            spider.state = {}

    @property
    def statefn(self):
        return os.path.join(self.jobdir, 'spider.state')

Settings

JOBDIR = ''  # directory where the spider state (and other job data) is persisted

When JOBDIR is configured, the directory is created automatically; spider.state is pickled to <JOBDIR>/spider.state on close and restored on the next run. It is unset by default.
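
The dict being pickled is exposed on the spider as self.state, so persisting counters across pauses/restarts is just dict access. A hypothetical sketch, run with scrapy crawl persistent -s JOBDIR=crawls/persistent-1:

import scrapy

class PersistentSpider(scrapy.Spider):
    """Hypothetical spider: self.state is restored on open and
    pickled to <JOBDIR>/spider.state on close."""
    name = 'persistent'
    start_urls = ['https://example.com']

    def parse(self, response):
        self.state['pages_seen'] = self.state.get('pages_seen', 0) + 1
        self.logger.info('pages seen so far: %d', self.state['pages_seen'])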

10 scrapy.extensions.throttle.AutoThrottle

class AutoThrottle:

    def __init__(self, crawler):
        self.crawler = crawler
        if not crawler.settings.getbool('AUTOTHROTTLE_ENABLED'):
            raise NotConfigured

        self.debug = crawler.settings.getbool("AUTOTHROTTLE_DEBUG")
        self.target_concurrency = crawler.settings.getfloat("AUTOTHROTTLE_TARGET_CONCURRENCY")
        crawler.signals.connect(self._spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self._response_downloaded, signal=signals.response_downloaded)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def _spider_opened(self, spider):
        self.mindelay = self._min_delay(spider)
        self.maxdelay = self._max_delay(spider)
        spider.download_delay = self._start_delay(spider)

    def _min_delay(self, spider):
        s = self.crawler.settings
        return getattr(spider, 'download_delay', s.getfloat('DOWNLOAD_DELAY'))

    def _max_delay(self, spider):
        return self.crawler.settings.getfloat('AUTOTHROTTLE_MAX_DELAY')

    def _start_delay(self, spider):
        return max(self.mindelay, self.crawler.settings.getfloat('AUTOTHROTTLE_START_DELAY'))

    def _response_downloaded(self, response, request, spider):
        key, slot = self._get_slot(request, spider)
        latency = request.meta.get('download_latency')
        if latency is None or slot is None:
            return

        olddelay = slot.delay
        self._adjust_delay(slot, latency, response)
        if self.debug:
            diff = slot.delay - olddelay
            size = len(response.body)
            conc = len(slot.transferring)
            logger.info(
                "slot: %(slot)s | conc:%(concurrency)2d | "
                "delay:%(delay)5d ms (%(delaydiff)+d) | "
                "latency:%(latency)5d ms | size:%(size)6d bytes",
                {'slot': key, 'concurrency': conc,
                 'delay': slot.delay * 1000, 'delaydiff': diff * 1000,
                 'latency': latency * 1000, 'size': size},
                extra={'spider': spider}
            )

    def _get_slot(self, request, spider):
        key = request.meta.get('download_slot')
        return key, self.crawler.engine.downloader.slots.get(key)

    def _adjust_delay(self, slot, latency, response):
        """Define delay adjustment policy"""

        # If a server needs `latency` seconds to respond then
        # we should send a request each `latency/N` seconds
        # to have N requests processed in parallel
        target_delay = latency / self.target_concurrency

        # Adjust the delay to make it closer to target_delay
        new_delay = (slot.delay + target_delay) / 2.0

        # If target delay is bigger than old delay, then use it instead of mean.
        # It works better with problematic sites.
        new_delay = max(target_delay, new_delay)

        # Make sure self.mindelay <= new_delay <= self.max_delay
        new_delay = min(max(self.mindelay, new_delay), self.maxdelay)

        # Dont adjust delay if response status != 200 and new delay is smaller
        # than old one, as error pages (and redirections) are usually small and
        # so tend to reduce latency, thus provoking a positive feedback by
        # reducing delay instead of increase.
        if response.status != 200 and new_delay <= slot.delay:
            return

        slot.delay = new_delay

Settings

AUTOTHROTTLE_ENABLED = False  # enable adaptive download delays
AUTOTHROTTLE_DEBUG = False  # log every delay adjustment
AUTOTHROTTLE_MAX_DELAY = 60.0  # upper bound for the delay, in seconds
AUTOTHROTTLE_START_DELAY = 5.0  # initial delay, in seconds
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0  # average number of requests to run in parallel per remote server

This extension is disabled by default; a worked example of the delay adjustment follows.
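
Working _adjust_delay through with illustrative numbers makes the policy concrete: with AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0, a current slot delay of 5.0 s, and a measured latency of 0.6 s:

target_concurrency = 2.0
slot_delay, latency = 5.0, 0.6     # current delay, download_latency (seconds)
min_delay, max_delay = 0.0, 60.0   # DOWNLOAD_DELAY, AUTOTHROTTLE_MAX_DELAY

target_delay = latency / target_concurrency           # 0.3 s
new_delay = (slot_delay + target_delay) / 2.0          # 2.65 s: move halfway toward target
new_delay = max(target_delay, new_delay)               # still 2.65 s
new_delay = min(max(min_delay, new_delay), max_delay)  # clamp to [0, 60]
print(new_delay)  # each 200 response moves the delay geometrically toward 0.3 s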

That concludes this walkthrough of Scrapy's extension middlewares; hopefully it serves as a useful reference.


