This article is part 4 of a Scrapy source-code analysis series and walks through the built-in extension middlewares; hopefully it serves as a useful reference when you work with or debug them.
1 Introduction
Scrapy registers 9 extension middlewares by default in EXTENSIONS_BASE; a project adds or disables extensions through the EXTENSIONS setting (see the example after the list below).
EXTENSIONS = {}

EXTENSIONS_BASE = {
    'scrapy.extensions.corestats.CoreStats': 0,
    'scrapy.extensions.telnet.TelnetConsole': 0,
    'scrapy.extensions.memusage.MemoryUsage': 0,
    'scrapy.extensions.memdebug.MemoryDebugger': 0,
    'scrapy.extensions.closespider.CloseSpider': 0,
    'scrapy.extensions.feedexport.FeedExporter': 0,
    'scrapy.extensions.logstats.LogStats': 0,
    'scrapy.extensions.spiderstate.SpiderState': 0,
    'scrapy.extensions.throttle.AutoThrottle': 0,
}
- scrapy.extensions.corestats.CoreStats: collects Scrapy's core crawl statistics
- scrapy.extensions.telnet.TelnetConsole: opens a TCP service while Scrapy runs, so you can telnet in and inspect the crawler's live state
- scrapy.extensions.memusage.MemoryUsage: memory-usage monitoring and alerting; not available on Windows
- scrapy.extensions.memdebug.MemoryDebugger: forces garbage collection (gc) and records the resulting statistics
- scrapy.extensions.closespider.CloseSpider: closes the spider when a time limit, page count, item count or error count threshold is reached
- scrapy.extensions.feedexport.FeedExporter: exports scraped items to the configured feeds
- scrapy.extensions.logstats.LogStats: periodically logs page/item counts and the corresponding crawl rates
- scrapy.extensions.spiderstate.SpiderState: persists spider state between runs of a job
- scrapy.extensions.throttle.AutoThrottle: adaptively adjusts the download delay
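User projects merge their own entries into EXTENSIONS, while EXTENSIONS_BASE holds the defaults above; setting an extension's order value to None disables it. A minimal settings.py sketch (the myproject.extensions.MyStatsExtension path is a hypothetical example, not part of Scrapy):

# settings.py -- sketch of overriding the EXTENSIONS setting
EXTENSIONS = {
    # disable the built-in telnet console
    'scrapy.extensions.telnet.TelnetConsole': None,
    # enable a project-local extension (hypothetical module path)
    'myproject.extensions.MyStatsExtension': 500,
}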
2 scrapy.extensions.corestats.CoreStats (core stats collection)
class CoreStats:
    def __init__(self, stats):
        self.stats = stats
        self.start_time = None

    @classmethod
    def from_crawler(cls, crawler):
        o = cls(crawler.stats)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(o.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(o.item_dropped, signal=signals.item_dropped)
        crawler.signals.connect(o.response_received, signal=signals.response_received)
        return o

    def spider_opened(self, spider):
        self.start_time = datetime.utcnow()
        self.stats.set_value('start_time', self.start_time, spider=spider)

    def spider_closed(self, spider, reason):
        finish_time = datetime.utcnow()
        elapsed_time = finish_time - self.start_time
        elapsed_time_seconds = elapsed_time.total_seconds()
        self.stats.set_value('elapsed_time_seconds', elapsed_time_seconds, spider=spider)
        self.stats.set_value('finish_time', finish_time, spider=spider)
        self.stats.set_value('finish_reason', reason, spider=spider)

    def item_scraped(self, item, spider):
        self.stats.inc_value('item_scraped_count', spider=spider)

    def response_received(self, spider):
        self.stats.inc_value('response_received_count', spider=spider)

    def item_dropped(self, item, spider, exception):
        reason = exception.__class__.__name__
        self.stats.inc_value('item_dropped_count', spider=spider)
        self.stats.inc_value(f'item_dropped_reasons_count/{reason}', spider=spider)
The extension connects to the spider_opened, spider_closed, item_scraped, item_dropped and response_received signals and records the corresponding statistics (start/finish time, elapsed time, item and response counts).
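The from_crawler + signals.connect pattern used by CoreStats is the template for any extension. Below is a minimal sketch of a custom extension written in the same style; the class name, setting name and stat key are hypothetical, not part of Scrapy:

from scrapy import signals
from scrapy.exceptions import NotConfigured


class ResponseCounter:
    """Hypothetical extension: counts received responses, mirroring CoreStats."""

    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        # opt-in via a custom (hypothetical) boolean setting
        if not crawler.settings.getbool('RESPONSE_COUNTER_ENABLED'):
            raise NotConfigured
        ext = cls(crawler.stats)
        crawler.signals.connect(ext.response_received, signal=signals.response_received)
        return ext

    def response_received(self, response, request, spider):
        self.stats.inc_value('response_counter/count', spider=spider)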
3 scrapy.extensions.telnet.TelnetConsole
class TelnetConsole(protocol.ServerFactory):
    def __init__(self, crawler):
        if not crawler.settings.getbool('TELNETCONSOLE_ENABLED'):
            raise NotConfigured
        if not TWISTED_CONCH_AVAILABLE:
            raise NotConfigured(
                'TELNETCONSOLE_ENABLED setting is True but required twisted '
                'modules failed to import:\n' + _TWISTED_CONCH_TRACEBACK)
        self.crawler = crawler
        self.noisy = False
        self.portrange = [int(x) for x in crawler.settings.getlist('TELNETCONSOLE_PORT')]
        self.host = crawler.settings['TELNETCONSOLE_HOST']
        self.username = crawler.settings['TELNETCONSOLE_USERNAME']
        self.password = crawler.settings['TELNETCONSOLE_PASSWORD']
        if not self.password:
            self.password = binascii.hexlify(os.urandom(8)).decode('utf8')
            logger.info('Telnet Password: %s', self.password)
        self.crawler.signals.connect(self.start_listening, signals.engine_started)
        self.crawler.signals.connect(self.stop_listening, signals.engine_stopped)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def start_listening(self):
        self.port = listen_tcp(self.portrange, self.host, self)
        h = self.port.getHost()
        logger.info("Telnet console listening on %(host)s:%(port)d",
                    {'host': h.host, 'port': h.port},
                    extra={'crawler': self.crawler})

    def stop_listening(self):
        self.port.stopListening()

    def protocol(self):
        class Portal:
            """An implementation of IPortal"""
            @defers
            def login(self_, credentials, mind, *interfaces):
                if not (credentials.username == self.username.encode('utf8')
                        and credentials.checkPassword(self.password.encode('utf8'))):
                    raise ValueError("Invalid credentials")
                protocol = telnet.TelnetBootstrapProtocol(
                    insults.ServerProtocol,
                    manhole.Manhole,
                    self._get_telnet_vars())
                return (interfaces[0], protocol, lambda: None)

        return telnet.TelnetTransport(telnet.AuthenticatingTelnetProtocol, Portal())

    def _get_telnet_vars(self):
        # Note: if you add entries here also update topics/telnetconsole.rst
        telnet_vars = {
            'engine': self.crawler.engine,
            'spider': self.crawler.engine.spider,
            'slot': self.crawler.engine.slot,
            'crawler': self.crawler,
            'extensions': self.crawler.extensions,
            'stats': self.crawler.stats,
            'settings': self.crawler.settings,
            'est': lambda: print_engine_status(self.crawler.engine),
            'p': pprint.pprint,
            'prefs': print_live_refs,
            'help': "This is Scrapy telnet console. For more info see: "
                    "https://docs.scrapy.org/en/latest/topics/telnetconsole.html",
        }
        self.crawler.signals.send_catch_log(update_telnet_vars, telnet_vars=telnet_vars)
        return telnet_vars
The local variables exposed inside the telnet session are engine, spider, slot, crawler, extensions, stats, settings, est, p, prefs and help.
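For reference, a session might look roughly like the sketch below, assuming the default port range starting at 6023 and the default username scrapy; the password is the one printed in the crawl log by __init__ above:

$ telnet localhost 6023
Username: scrapy
Password: <password printed in the crawl log>
>>> est()               # print the engine status report
>>> stats.get_stats()   # dump the current stats dictionary
>>> engine.pause()      # pause the crawl; engine.unpause() resumes it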
4 scrapy.extensions.memusage.MemoryUsage (memory usage)
class MemoryUsage:
    def __init__(self, crawler):
        if not crawler.settings.getbool('MEMUSAGE_ENABLED'):
            raise NotConfigured
        try:
            # stdlib's resource module is only available on unix platforms.
            self.resource = import_module('resource')
        except ImportError:
            raise NotConfigured

        self.crawler = crawler
        self.warned = False
        self.notify_mails = crawler.settings.getlist('MEMUSAGE_NOTIFY_MAIL')
        self.limit = crawler.settings.getint('MEMUSAGE_LIMIT_MB') * 1024 * 1024
        self.warning = crawler.settings.getint('MEMUSAGE_WARNING_MB') * 1024 * 1024
        self.check_interval = crawler.settings.getfloat('MEMUSAGE_CHECK_INTERVAL_SECONDS')
        self.mail = MailSender.from_settings(crawler.settings)
        crawler.signals.connect(self.engine_started, signal=signals.engine_started)
        crawler.signals.connect(self.engine_stopped, signal=signals.engine_stopped)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def get_virtual_size(self):
        size = self.resource.getrusage(self.resource.RUSAGE_SELF).ru_maxrss
        if sys.platform != 'darwin':
            # on macOS ru_maxrss is in bytes, on Linux it is in KB
            size *= 1024
        return size

    def engine_started(self):
        self.crawler.stats.set_value('memusage/startup', self.get_virtual_size())
        self.tasks = []
        tsk = task.LoopingCall(self.update)
        self.tasks.append(tsk)
        tsk.start(self.check_interval, now=True)
        if self.limit:
            tsk = task.LoopingCall(self._check_limit)
            self.tasks.append(tsk)
            tsk.start(self.check_interval, now=True)
        if self.warning:
            tsk = task.LoopingCall(self._check_warning)
            self.tasks.append(tsk)
            tsk.start(self.check_interval, now=True)

    def engine_stopped(self):
        for tsk in self.tasks:
            if tsk.running:
                tsk.stop()

    def update(self):
        self.crawler.stats.max_value('memusage/max', self.get_virtual_size())

    def _check_limit(self):
        if self.get_virtual_size() > self.limit:
            self.crawler.stats.set_value('memusage/limit_reached', 1)
            mem = self.limit / 1024 / 1024
            logger.error("Memory usage exceeded %(memusage)dM. Shutting down Scrapy...",
                         {'memusage': mem}, extra={'crawler': self.crawler})
            if self.notify_mails:
                subj = (
                    f"{self.crawler.settings['BOT_NAME']} terminated: "
                    f"memory usage exceeded {mem}M at {socket.gethostname()}"
                )
                self._send_report(self.notify_mails, subj)
                self.crawler.stats.set_value('memusage/limit_notified', 1)

            if self.crawler.engine.spider is not None:
                self.crawler.engine.close_spider(self.crawler.engine.spider, 'memusage_exceeded')
            else:
                self.crawler.stop()

    def _check_warning(self):
        if self.warned:  # warn only once
            return
        if self.get_virtual_size() > self.warning:
            self.crawler.stats.set_value('memusage/warning_reached', 1)
            mem = self.warning / 1024 / 1024
            logger.warning("Memory usage reached %(memusage)dM",
                           {'memusage': mem}, extra={'crawler': self.crawler})
            if self.notify_mails:
                subj = (
                    f"{self.crawler.settings['BOT_NAME']} warning: "
                    f"memory usage reached {mem}M at {socket.gethostname()}"
                )
                self._send_report(self.notify_mails, subj)
                self.crawler.stats.set_value('memusage/warning_notified', 1)
            self.warned = True

    def _send_report(self, rcpts, subject):
        """send notification mail with some additional useful info"""
        stats = self.crawler.stats
        s = f"Memory usage at engine startup : {stats.get_value('memusage/startup')/1024/1024}M\r\n"
        s += f"Maximum memory usage : {stats.get_value('memusage/max')/1024/1024}M\r\n"
        s += f"Current memory usage : {self.get_virtual_size()/1024/1024}M\r\n"
        s += "ENGINE STATUS ------------------------------------------------------- \r\n"
        s += "\r\n"
        s += pformat(get_engine_status(self.crawler.engine))
        s += "\r\n"
        self.mail.send(rcpts, subject, s)
This extension only works on Unix-like platforms (it relies on the stdlib resource module). It monitors memory usage, raises warnings, and can send notification e-mails.
Mail settings for the notifications:
MAIL_HOST = 'localhost'     # SMTP server host
MAIL_PORT = 25              # SMTP server port
MAIL_FROM = 'scrapy@localhost'  # sender address
MAIL_PASS = None            # SMTP password
MAIL_USER = None            # SMTP user; if empty, no SMTP authentication is performed
The memory-monitoring settings are:
MEMUSAGE_CHECK_INTERVAL_SECONDS = 60.0  # check memory usage every 60 s
MEMUSAGE_ENABLED = True                 # enable memory monitoring
MEMUSAGE_LIMIT_MB = 0                   # hard limit in MB; 0 disables the limit check
MEMUSAGE_NOTIFY_MAIL = []               # list of e-mail addresses to notify
MEMUSAGE_WARNING_MB = 0                 # warning threshold in MB; 0 disables the warning check
When memory usage exceeds the warning or limit threshold, the corresponding notification e-mail is sent; when the hard limit is exceeded the spider is also closed with the reason memusage_exceeded.
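As a concrete illustration, a settings.py sketch with example values (the thresholds and the recipient address are assumptions, pick your own; the MAIL_* settings above must also point at a working SMTP server for the notifications to be delivered):

# settings.py -- example MemoryUsage configuration
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048                  # close the spider above ~2 GB
MEMUSAGE_WARNING_MB = 1536                # send a warning mail above ~1.5 GB
MEMUSAGE_CHECK_INTERVAL_SECONDS = 30.0    # poll twice a minute
MEMUSAGE_NOTIFY_MAIL = ['ops@example.com']  # hypothetical recipient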
5 scrapy.extensions.memdebug.MemoryDebugger
class MemoryDebugger:
    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        if not crawler.settings.getbool('MEMDEBUG_ENABLED'):
            raise NotConfigured
        o = cls(crawler.stats)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_closed(self, spider, reason):
        gc.collect()
        self.stats.set_value('memdebug/gc_garbage_count', len(gc.garbage), spider=spider)
        for cls, wdict in live_refs.items():
            if not wdict:
                continue
            self.stats.set_value(f'memdebug/live_refs/{cls.__name__}', len(wdict), spider=spider)
Settings
MEMDEBUG_ENABLED = False # enable memory debugging
MEMDEBUG_NOTIFY = [] # send memory debugging report by mail at engine shutdown
The MEMDEBUG_NOTIFY setting is currently unused in the code.
Its main job is to force a garbage collection when the spider closes and record the resulting statistics (garbage count and live object references).
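The live_refs table it reads comes from scrapy.utils.trackref: Requests, Responses, Items and Spiders inherit from object_ref and are tracked automatically, and the same mechanism can be applied to your own classes. A minimal sketch (the PageSnapshot class is a hypothetical example):

from scrapy.utils.trackref import object_ref, print_live_refs


class PageSnapshot(object_ref):
    """Hypothetical class: its instances will appear in live_refs, in
    MemoryDebugger's memdebug/live_refs/* stats, and in prefs() over telnet."""

    def __init__(self, url, body):
        self.url = url
        self.body = body


# print_live_refs() prints the same live-reference table that the
# telnet console exposes as prefs()
print_live_refs()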
6 scrapy.extensions.closespider.CloseSpider
class CloseSpider:
    def __init__(self, crawler):
        self.crawler = crawler

        self.close_on = {
            'timeout': crawler.settings.getfloat('CLOSESPIDER_TIMEOUT'),
            'itemcount': crawler.settings.getint('CLOSESPIDER_ITEMCOUNT'),
            'pagecount': crawler.settings.getint('CLOSESPIDER_PAGECOUNT'),
            'errorcount': crawler.settings.getint('CLOSESPIDER_ERRORCOUNT'),
        }

        if not any(self.close_on.values()):
            raise NotConfigured

        self.counter = defaultdict(int)

        if self.close_on.get('errorcount'):
            crawler.signals.connect(self.error_count, signal=signals.spider_error)
        if self.close_on.get('pagecount'):
            crawler.signals.connect(self.page_count, signal=signals.response_received)
        if self.close_on.get('timeout'):
            crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
        if self.close_on.get('itemcount'):
            crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def error_count(self, failure, response, spider):
        self.counter['errorcount'] += 1
        if self.counter['errorcount'] == self.close_on['errorcount']:
            self.crawler.engine.close_spider(spider, 'closespider_errorcount')

    def page_count(self, response, request, spider):
        self.counter['pagecount'] += 1
        if self.counter['pagecount'] == self.close_on['pagecount']:
            self.crawler.engine.close_spider(spider, 'closespider_pagecount')

    def spider_opened(self, spider):
        from twisted.internet import reactor
        self.task = reactor.callLater(self.close_on['timeout'],
                                      self.crawler.engine.close_spider, spider,
                                      reason='closespider_timeout')

    def item_scraped(self, item, spider):
        self.counter['itemcount'] += 1
        if self.counter['itemcount'] == self.close_on['itemcount']:
            self.crawler.engine.close_spider(spider, 'closespider_itemcount')

    def spider_closed(self, spider):
        task = getattr(self, 'task', False)
        if task and task.active():
            task.cancel()
Settings
CLOSESPIDER_TIMEOUT = 0     # close the spider after it has been open for this many seconds (0 disables the check)
CLOSESPIDER_PAGECOUNT = 0   # close the spider after this many responses have been received (0 disables the check)
CLOSESPIDER_ITEMCOUNT = 0   # close the spider after this many items have been scraped (0 disables the check)
CLOSESPIDER_ERRORCOUNT = 0  # close the spider after this many spider errors (0 disables the check)
Its job is to close the spider once a run-time, page-count, item-count or error-count threshold is reached.
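These thresholds are typically set per run; the values below are only examples. They can also be passed on the command line, e.g. scrapy crawl myspider -s CLOSESPIDER_PAGECOUNT=1000:

# settings.py -- example CloseSpider thresholds (example values)
CLOSESPIDER_TIMEOUT = 3600      # stop after the spider has run for one hour
CLOSESPIDER_PAGECOUNT = 1000    # ... or after 1000 responses
CLOSESPIDER_ITEMCOUNT = 500     # ... or after 500 scraped items
CLOSESPIDER_ERRORCOUNT = 10     # ... or after 10 spider errors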
7 scrapy.extensions.feedexport.FeedExporter
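FeedExporter listens for item and spider signals and writes scraped items to the configured feeds (local files, FTP, S3, etc.). A minimal configuration sketch, assuming a Scrapy version recent enough to support the FEEDS setting (Scrapy >= 2.1; older versions use FEED_URI and FEED_FORMAT instead); the output paths are examples:

# settings.py -- example feed export configuration
FEEDS = {
    'output/items.json': {'format': 'json', 'encoding': 'utf8'},
    'output/items.csv': {'format': 'csv'},
}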
8 scrapy.extensions.logstats.LogStats
class LogStats:
    """Log basic scraping stats periodically"""

    def __init__(self, stats, interval=60.0):
        self.stats = stats
        self.interval = interval
        self.multiplier = 60.0 / self.interval
        self.task = None

    @classmethod
    def from_crawler(cls, crawler):
        interval = crawler.settings.getfloat('LOGSTATS_INTERVAL')
        if not interval:
            raise NotConfigured
        o = cls(crawler.stats, interval)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_opened(self, spider):
        self.pagesprev = 0
        self.itemsprev = 0

        self.task = task.LoopingCall(self.log, spider)
        self.task.start(self.interval)

    def log(self, spider):
        items = self.stats.get_value('item_scraped_count', 0)
        pages = self.stats.get_value('response_received_count', 0)
        irate = (items - self.itemsprev) * self.multiplier
        prate = (pages - self.pagesprev) * self.multiplier
        self.pagesprev, self.itemsprev = pages, items

        msg = ("Crawled %(pages)d pages (at %(pagerate)d pages/min), "
               "scraped %(items)d items (at %(itemrate)d items/min)")
        log_args = {'pages': pages, 'pagerate': prate,
                    'items': items, 'itemrate': irate}
        logger.info(msg, log_args, extra={'spider': spider})

    def spider_closed(self, spider, reason):
        if self.task and self.task.running:
            self.task.stop()
Settings
LOGSTATS_INTERVAL = 60.0  # log the stats every 60 s; 0 disables the extension
It periodically reads the page and item counters from the stats collector and logs them together with the per-minute rates.
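The per-minute rates come from multiplier = 60.0 / interval. A quick standalone reproduction of the arithmetic in log(), with hypothetical counter values:

# reproduction of LogStats' rate arithmetic for one logging tick
interval = 60.0                 # LOGSTATS_INTERVAL
multiplier = 60.0 / interval    # = 1.0
pages, pagesprev = 150, 90      # hypothetical response_received_count values
prate = (pages - pagesprev) * multiplier
print(prate)                    # 60.0 pages/min since the previous tick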
9 scrapy.extensions.spiderstate.SpiderState
class SpiderState:
    """Store and load spider state during a scraping job"""

    def __init__(self, jobdir=None):
        self.jobdir = jobdir

    @classmethod
    def from_crawler(cls, crawler):
        jobdir = job_dir(crawler.settings)
        if not jobdir:
            raise NotConfigured

        obj = cls(jobdir)
        crawler.signals.connect(obj.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(obj.spider_opened, signal=signals.spider_opened)
        return obj

    def spider_closed(self, spider):
        if self.jobdir:
            with open(self.statefn, 'wb') as f:
                pickle.dump(spider.state, f, protocol=4)

    def spider_opened(self, spider):
        if self.jobdir and os.path.exists(self.statefn):
            with open(self.statefn, 'rb') as f:
                spider.state = pickle.load(f)
        else:
            spider.state = {}

    @property
    def statefn(self):
        return os.path.join(self.jobdir, 'spider.state')
Settings
JOBDIR = ''  # directory where the job state (including spider state) is persisted
When JOBDIR is configured, the directory is created automatically and spider.state is pickled into it when the spider closes; by default the setting is empty and the extension is disabled.
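Because the state dict is pickled to <JOBDIR>/spider.state, a spider can stash data in self.state and get it back when the same job is resumed, typically by running scrapy crawl example -s JOBDIR=crawls/example-1 again. A sketch (the spider name, URL and state key are hypothetical):

import scrapy


class ExampleSpider(scrapy.Spider):
    name = 'example'
    start_urls = ['https://example.com']

    def parse(self, response):
        # spider.state is injected by the SpiderState extension and is
        # persisted under JOBDIR between runs of the same job
        self.state['pages_seen'] = self.state.get('pages_seen', 0) + 1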
10 scrapy.extensions.throttle.AutoThrottle
class AutoThrottle:
    def __init__(self, crawler):
        self.crawler = crawler
        if not crawler.settings.getbool('AUTOTHROTTLE_ENABLED'):
            raise NotConfigured
        self.debug = crawler.settings.getbool("AUTOTHROTTLE_DEBUG")
        self.target_concurrency = crawler.settings.getfloat("AUTOTHROTTLE_TARGET_CONCURRENCY")
        crawler.signals.connect(self._spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self._response_downloaded, signal=signals.response_downloaded)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def _spider_opened(self, spider):
        self.mindelay = self._min_delay(spider)
        self.maxdelay = self._max_delay(spider)
        spider.download_delay = self._start_delay(spider)

    def _min_delay(self, spider):
        s = self.crawler.settings
        return getattr(spider, 'download_delay', s.getfloat('DOWNLOAD_DELAY'))

    def _max_delay(self, spider):
        return self.crawler.settings.getfloat('AUTOTHROTTLE_MAX_DELAY')

    def _start_delay(self, spider):
        return max(self.mindelay, self.crawler.settings.getfloat('AUTOTHROTTLE_START_DELAY'))

    def _response_downloaded(self, response, request, spider):
        key, slot = self._get_slot(request, spider)
        latency = request.meta.get('download_latency')
        if latency is None or slot is None:
            return

        olddelay = slot.delay
        self._adjust_delay(slot, latency, response)
        if self.debug:
            diff = slot.delay - olddelay
            size = len(response.body)
            conc = len(slot.transferring)
            logger.info(
                "slot: %(slot)s | conc:%(concurrency)2d | "
                "delay:%(delay)5d ms (%(delaydiff)+d) | "
                "latency:%(latency)5d ms | size:%(size)6d bytes",
                {'slot': key, 'concurrency': conc,
                 'delay': slot.delay * 1000, 'delaydiff': diff * 1000,
                 'latency': latency * 1000, 'size': size},
                extra={'spider': spider}
            )

    def _get_slot(self, request, spider):
        key = request.meta.get('download_slot')
        return key, self.crawler.engine.downloader.slots.get(key)

    def _adjust_delay(self, slot, latency, response):
        """Define delay adjustment policy"""

        # If a server needs `latency` seconds to respond then
        # we should send a request each `latency/N` seconds
        # to have N requests processed in parallel
        target_delay = latency / self.target_concurrency

        # Adjust the delay to make it closer to target_delay
        new_delay = (slot.delay + target_delay) / 2.0

        # If target delay is bigger than old delay, then use it instead of mean.
        # It works better with problematic sites.
        new_delay = max(target_delay, new_delay)

        # Make sure self.mindelay <= new_delay <= self.max_delay
        new_delay = min(max(self.mindelay, new_delay), self.maxdelay)

        # Dont adjust delay if response status != 200 and new delay is smaller
        # than old one, as error pages (and redirections) are usually small and
        # so tend to reduce latency, thus provoking a positive feedback by
        # reducing delay instead of increase.
        if response.status != 200 and new_delay <= slot.delay:
            return

        slot.delay = new_delay
Settings
AUTOTHROTTLE_ENABLED = False            # enable adaptive download delays
AUTOTHROTTLE_DEBUG = False              # log per-response throttling debug info
AUTOTHROTTLE_MAX_DELAY = 60.0           # maximum download delay, in seconds
AUTOTHROTTLE_START_DELAY = 5.0          # initial download delay, in seconds
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0   # target average number of parallel requests per remote site
This extension is disabled by default.
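To see how _adjust_delay moves the per-slot delay toward latency / target_concurrency, here is a small standalone reproduction of the same arithmetic with hypothetical numbers:

# standalone reproduction of the _adjust_delay policy (example values)
target_concurrency = 1.0          # AUTOTHROTTLE_TARGET_CONCURRENCY
mindelay, maxdelay = 0.0, 60.0    # DOWNLOAD_DELAY, AUTOTHROTTLE_MAX_DELAY
slot_delay = 5.0                  # current delay (AUTOTHROTTLE_START_DELAY on the first response)
latency = 2.0                     # seconds the last response took to download

target_delay = latency / target_concurrency           # 2.0
new_delay = (slot_delay + target_delay) / 2.0          # 3.5 (average toward the target)
new_delay = max(target_delay, new_delay)               # 3.5 (never below the target)
new_delay = min(max(mindelay, new_delay), maxdelay)    # 3.5 (clamped to [0, 60])
print(new_delay)  # the slot's delay drifts from 5.0 s toward the 2.0 s target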
That concludes this walkthrough of Scrapy's extension middlewares; hopefully it is a helpful reference.