A Python Performance Tuning Experience

2024-06-16 13:58

This article walks through a Python performance tuning experience. I hope it offers some useful reference for developers facing similar problems.

I had been doing Java ever since graduation, but I recently moved to the data team and started learning Python.
A recent project required extracting the "celebrity relationship" blocks from Baidu Baike entry pages and storing them. We did not need to crawl Baidu Baike in real time: all of the entry pages were already in our MongoDB database, about 17 million documents in total, with a great deal of duplication among them.
Because the data volume is so large, the processing has to be parallelized. My first attempt used the ThreadPoolExecutor thread pool from Python's concurrent.futures module. The code is as follows:
import zlib
import simplejson
import pymongo
import codecs
import threading
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup

host = "10.10.0.0"
port = 20000
db = "baike_db"
username = "username"
passwd = "passwd"
crawlColName = "crawlCol"
browserColName = "brawlCol"

dbClient = pymongo.MongoClient(host, port)[db]
dbClient.authenticate(username, passwd)
crawlCol = dbClient[crawlColName]
browserCol = dbClient[browserColName]

lock = threading.Lock()
writeLock = threading.Lock()
handledUrlSet = set()

exceptionFile = codecs.open("exceptionFile.o", "a", "utf-8")
relationFile = codecs.open("relationFile.o", "a", "utf-8")
handledUrlFile = codecs.open("handledUrlFile.o", "a", "utf-8")


def handleDocument(doc):
    subviewUrl = ""
    try:
        subviewUrl = doc['subview_url']
        print(subviewUrl)
        # Deduplicate by URL: only the first thread to see a URL processes it
        lock.acquire()
        if subviewUrl not in handledUrlSet:
            handledUrlSet.add(subviewUrl)
            lock.release()
            text = zlib.decompress(doc['text'])
            htmlDoc = BeautifulSoup(text, 'html5lib')
            relations = htmlDoc.select(".star-info-block.relations")
            if relations:
                relation = relations[0]
                reList = relation.select("li")
                if reList:
                    realList = []
                    for real in reList:
                        aList = real.select("a")
                        if aList:
                            people = aList[0]
                            data = simplejson.loads("{}")
                            data['href'] = people['href']
                            data['src'] = people.img['src']
                            em = people.em.extract()
                            data['name'] = em.text.strip()
                            data['relation'] = people.text.strip()
                            realList.append(data)
                    if realList:
                        record = simplejson.loads("{}")
                        record['subviewUrl'] = realList
                        print(record)
                        writeLock.acquire()
                        relationFile.write(str(record) + "\n")
                        writeLock.release()
        else:
            lock.release()
    except BaseException as e:
        exceptionFile.write(subviewUrl + "\t" + str(e) + "\n")


# Create a thread pool with 15 workers
pool = ThreadPoolExecutor(max_workers=15)

# Process the newer "browser" documents first
docBrowserList = browserCol.find()
index = 0
for doc in docBrowserList:
    print("browser:" + str(index))
    index += 1
    pool.submit(handleDocument, doc)

# Then process the older "crawl" documents
index = 0
docCrawlList = crawlCol.find()
for doc in docCrawlList:
    print("crawl:" + str(index))
    index += 1
    pool.submit(handleDocument, doc)

The idea is to create a thread pool of size 15; the main thread fetches one document at a time from MongoDB and submits it to the pool. The pool keeps an internal queue: once more tasks have been submitted than the pool can run concurrently, newly submitted tasks are parked in that queue. With 17 million documents, though, either that queue blows up and throws an exception, or memory blows up. On top of this you could handle a full queue by catching the exception and backing off for a while, so in theory a thread pool could still be used (a sketch of this follows below). But I am lazy, and I generally pick the simplest option.
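The sketch below is not from the original code: it assumes concurrent.futures is available, and the submit_throttled helper and the queue_slack bound are made-up names for illustration. A semaphore is acquired before each submit() and released when the task completes, so the executor's internal queue can never grow past a fixed size:

import threading
from concurrent.futures import ThreadPoolExecutor

max_workers = 15
queue_slack = 100                      # hypothetical cap on queued-but-unstarted tasks
slots = threading.BoundedSemaphore(max_workers + queue_slack)
pool = ThreadPoolExecutor(max_workers=max_workers)

def submit_throttled(fn, *args):
    slots.acquire()                    # blocks the producer when too much work is pending
    future = pool.submit(fn, *args)
    future.add_done_callback(lambda _f: slots.release())
    return future

# usage: submit_throttled(handleDocument, doc) inside the cursor loops above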
So I switched to the threading module instead. The code is as follows:
# -*- coding: utf-8 -*-
import zlib
import Queue
import simplejson
import pymongo
import codecs
import threading
from bs4 import BeautifulSoup

host = "10.10.0.0"
port = 20000
db = "baike_db"
username = "username"
passwd = "passwd"
crawlColName = "crawlCol"
browserColName = "brawlCol"

dbClient = pymongo.MongoClient(host, port)[db]
dbClient.authenticate(username, passwd)
crawlCol = dbClient[crawlColName]
browserCol = dbClient[browserColName]

lock = threading.Lock()
writeLock = threading.Lock()
exceptLock = threading.Lock()
handledUrlSet = set()
# Bounded queue between the producer thread and the worker threads
docQue = Queue.Queue(maxsize=1000)

exceptionFile = codecs.open("exceptionFile.o", "a", "utf-8")
relationFile = codecs.open("relationFile.o", "a", "utf-8")
handledUrlFile = codecs.open("handledUrlFile.o", "a", "utf-8")


def handleDocument(doc):
    subviewUrl = ""
    try:
        subviewUrl = doc['subview_url']
        print(subviewUrl)
        lock.acquire()
        if subviewUrl not in handledUrlSet:
            handledUrlSet.add(subviewUrl)
            lock.release()
            text = zlib.decompress(doc['text'])
            htmlDoc = BeautifulSoup(text, 'html5lib')
            relations = htmlDoc.select(".star-info-block.relations")
            if relations:
                relation = relations[0]
                reList = relation.select("li")
                if reList:
                    realList = []
                    for real in reList:
                        aList = real.select("a")
                        if aList:
                            people = aList[0]
                            data = simplejson.loads("{}")
                            data['href'] = people['href']
                            data['src'] = people.img['src']
                            em = people.em.extract()
                            data['name'] = em.text.strip()
                            data['relation'] = people.text.strip()
                            realList.append(data)
                    if realList:
                        record = simplejson.loads("{}")
                        record['subviewUrl'] = realList
                        print(record)
                        writeLock.acquire()
                        relationFile.write(str(record) + "\n")
                        writeLock.release()
        else:
            lock.release()
    except BaseException as e:
        exceptLock.acquire()
        exceptionFile.write(subviewUrl + "\t" + str(e) + "\n")
        exceptLock.release()


class handleThread(threading.Thread):
    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId

    def run(self):
        global docQue
        while True:
            doc = docQue.get(block=True)
            if doc:
                handleDocument(doc)
            else:
                # A None marker means the producer is finished
                print("thread%d run over" % self.threadId)
                break


class getDocumentThread(threading.Thread):
    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId

    def run(self):
        global docQue
        global browserCol
        global crawlCol
        docBrowserList = browserCol.find()
        index = 0
        for doc in docBrowserList:
            print("browser:" + str(index) + " to queue")
            index += 1
            docQue.put(doc, block=True)
        index = 0
        docCrawlList = crawlCol.find()
        for doc in docCrawlList:
            print("crawl:" + str(index) + " to queue")
            index += 1
            docQue.put(doc, block=True)
        # Put one end-of-stream marker per worker thread
        for i in range(15):
            docQue.put(None, block=True)


getThread = getDocumentThread(1)
getThread.start()

for j in range(15):
    thread = handleThread(j)
    thread.start()

I ran it, and it was painfully slow: only a few dozen documents per second. At that rate, processing 17 million documents would take forever. So why was it so slow? Being new to Python, I asked colleagues who have used Python for a long time, and the answer was this:
Python does support multiple threads, but they are all constrained by the GIL, which behaves like a global lock: when several threads want to run Python bytecode, they effectively have to queue up one after another. For a detailed introduction to the GIL, see this article:
http://cenalulu.github.io/python/gil-in-python/
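A small illustration of the effect, not taken from the article above (the loop size is arbitrary): a pure-CPU function run on two threads takes roughly as long as running it twice sequentially, because only one thread can execute Python bytecode at a time.

import threading
import time

def burn(count=5 * 10 ** 6):
    # Pure-Python CPU work; never releases the GIL
    n = 0
    while n < count:
        n += 1

start = time.time()
burn()
burn()
print("sequential: %.2fs" % (time.time() - start))

start = time.time()
threads = [threading.Thread(target=burn) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("two threads: %.2fs" % (time.time() - start))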
Once I learned that this is how Python threading works, I was stunned: it is an obvious limitation. Fortunately Python offers multiprocessing to make up for it, so I rewrote the code to use multiple processes:
# -*- coding: utf-8 -*-
import zlib
import simplejson
import pymongo
import codecs
import time
import threading
import multiprocessing
from multiprocessing import queues
from multiprocessing import Process, Manager
from bs4 import BeautifulSoup

host = "10.10.0.0"
port = 20000
db = "baike_db"
username = "username"
passwd = "passwd"
crawlColName = "crawlCol"
browserColName = "brawlCol"

dbClient = pymongo.MongoClient(host, port)[db]
dbClient.authenticate(username, passwd)
crawlCol = dbClient[crawlColName]
browserCol = dbClient[browserColName]

m = Manager()
lock = multiprocessing.Lock()
writeLock = multiprocessing.Lock()
exceptLock = multiprocessing.Lock()
# Manager dict shared across processes, used for URL deduplication
handledUrlDict = m.dict()
docQue = queues.Queue(maxsize=1000)

exceptionFile = codecs.open("exceptionFile.o", "a", "utf-8")
relationFile = codecs.open("relationFile.o", "a", "utf-8")
# handledUrlFile = codecs.open("handledUrlFile.o", "a", "utf-8")
pCount = 15


def handleDocument(doc, threadId, lock, exceptLock, writeLock, handledUrlDict):
    subviewUrl = ""
    try:
        subviewUrl = doc['subview_url']
        print("process" + str(threadId) + " " + subviewUrl)
        lock.acquire()
        if subviewUrl not in handledUrlDict:
            handledUrlDict[subviewUrl] = 1
            lock.release()
            text = zlib.decompress(doc['text']).decode('utf-8')
            # Per-step timing probes (since removed) showed this parse dominated the
            # run time; 'lxml' replaced the original 'html5lib' parser (see below)
            htmlDoc = BeautifulSoup(text, 'lxml')
            relations = htmlDoc.select(".star-info-block.relations")
            if relations:
                relation = relations[0]
                reList = relation.select("li")
                if reList:
                    realList = []
                    for real in reList:
                        aList = real.select("a")
                        if aList:
                            people = aList[0]
                            data = simplejson.loads("{}")
                            data['href'] = people['href']
                            data['src'] = people.img['src']
                            em = people.em.extract()
                            data['name'] = em.text.strip()
                            data['relation'] = people.text.strip()
                            realList.append(data)
                    if realList:
                        record = simplejson.loads("{}")
                        record['subviewUrl'] = subviewUrl
                        record['realList'] = realList
                        print(record)
                        writeLock.acquire()
                        relationFile.write(str(record) + "\n")
                        writeLock.release()
        else:
            lock.release()
    except BaseException as e:
        exceptLock.acquire()
        exceptionFile.write(subviewUrl + "\t" + str(e) + "\n")
        exceptLock.release()


class handleProcess(multiprocessing.Process):
    def __init__(self, threadId, docQue, lock, exceptLock, writeLock, handledUrlDict):
        multiprocessing.Process.__init__(self)
        self.threadId = threadId
        self.docQue = docQue
        self.lock = lock
        self.exceptLock = exceptLock
        self.writeLock = writeLock
        self.handledUrlDict = handledUrlDict

    def run(self):
        while True:
            doc = self.docQue.get(block=True)
            if doc:
                handleDocument(doc, self.threadId, self.lock, self.exceptLock,
                               self.writeLock, self.handledUrlDict)
            else:
                print("process%d run over" % self.threadId)
                break


class getDocumentThread(threading.Thread):
    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId

    def run(self):
        global docQue
        global browserCol
        global crawlCol
        docBrowserList = browserCol.find()
        index = 0
        for doc in docBrowserList:
            print("browser:" + str(index) + " to queue")
            index += 1
            docQue.put(doc, block=True)
        index = 0
        docCrawlList = crawlCol.find()
        for doc in docCrawlList:
            print("crawl:" + str(index) + " to queue")
            index += 1
            docQue.put(doc, block=True)
        # Put one end-of-stream marker per worker process
        global pCount
        for i in range(pCount):
            docQue.put(None, block=True)


getThread = getDocumentThread(1)
getThread.start()

for j in range(pCount):
    process = handleProcess(j, docQue, lock, exceptLock, writeLock, handledUrlDict)
    process.start()
    # process.join()

Multiprocessing has one annoying problem: each process has its own memory, and processes do not share memory with each other. So how do they communicate, and how do they share variables? Python offers several solutions, all in the multiprocessing module, such as queues and Manager dictionaries; and if shared objects need to be accessed in a way that is not safe under concurrency, the module's locks can guard those accesses.
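As a minimal, self-contained sketch of that sharing pattern (the names here are illustrative, not from the project code): a Manager dict holds shared state, a multiprocessing Queue carries the work items, and a Lock guards the non-atomic check-then-set on the shared dict.

import multiprocessing
from multiprocessing import Manager, Process, Queue

def worker(q, seen, lock):
    while True:
        url = q.get()
        if url is None:              # end-of-stream marker
            break
        lock.acquire()
        first_time = url not in seen
        if first_time:
            seen[url] = 1            # the proxy forwards this to the Manager process
        lock.release()
        if first_time:
            print(url)               # each URL is handled by exactly one worker

if __name__ == '__main__':
    m = Manager()
    seen = m.dict()
    q = Queue(maxsize=100)
    lock = multiprocessing.Lock()
    workers = [Process(target=worker, args=(q, seen, lock)) for _ in range(4)]
    for p in workers:
        p.start()
    for url in ["a", "b", "a", "c"]:
        q.put(url)
    for _ in workers:
        q.put(None)
    for p in workers:
        p.join()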
When I ran this version, it was still very slow: by my estimate it would take about 45 hours to get through the 17 million documents, which was unacceptable. So I went looking for where the time was actually being spent. I added a lot of logging to record how long each step took, and the time turned out to be dominated by this line:
htmlDoc = BeautifulSoup(text, 'html5lib')  (this is the line as it was before the change)
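For reference, the per-step probes that pointed at this line were essentially of the following form (a sketch; the timed helper and the label strings are made up for illustration):

import time

def timed(label, fn, *args, **kwargs):
    # Run fn, print how long it took, and return its result
    start = time.time()
    result = fn(*args, **kwargs)
    print("%s took %.3fs" % (label, time.time() - start))
    return result

# e.g. htmlDoc = timed("parse", BeautifulSoup, text, 'html5lib')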
So I checked the BeautifulSoup documentation. It supports several parsers, including html.parser, lxml, and html5lib; you can refer to the official documentation directly:
https://www.crummy.com/software/BeautifulSoup/bs4/doc/index.zh.html
The documentation compares these parsers and makes it clear that html5lib is slow. I switched to lxml, and throughput immediately improved by about 4.5x, which brought the total time down to something acceptable.
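A rough way to check this on your own data (a sketch; the saved sample file and the repetition count are illustrative, and lxml and html5lib must be installed):

import time
from bs4 import BeautifulSoup

html = open("sample_page.html").read()     # a saved Baike entry page (hypothetical file)

for parser in ("html.parser", "lxml", "html5lib"):
    start = time.time()
    for _ in range(10):
        BeautifulSoup(html, parser)
    print("%s: %.2fs for 10 parses" % (parser, time.time() - start))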
To close, here is a question I still have not figured out: when exactly do child processes and the main thread exit?

import codecs
import time
import threading
import multiprocessing
from multiprocessing import Process, Manager
from multiprocessing import queues

f = codecs.open("exceptionFile.o", "a", "utf-8")


# Functions executed by the child processes. Among the arguments, d is a special
# Manager dict used to share data between processes; s is an ordinary set.
def func1(d, q, f, s):
    print(d)
    f.write("11\n")
    d['k3'] = 'k3'
    s.add('33')
    print("11:" + str('11' in s))
    print("k1:" + str("k1" in d))


def func2(d, q, f, s):
    print(d)
    f.write("11\n")
    print("33:" + str('33' in s))
    print("k3:" + str('k3' in d))


q = queues.Queue(1000)
for j in range(1000):
    q.put(j)


class testProc(Process):
    def __init__(self, d):
        multiprocessing.Process.__init__(self)
        self.d = d

    def run(self):
        print("testk3:" + str('k3' in self.d))


class testThread(threading.Thread):
    def run(self):
        # time.sleep(5)
        pass


# Create the special shared dict in the main process
m = Manager()
d = m.dict()
s = set()
s.add('11')
d['k1'] = 'k1'
d['k2'] = 3

t = testThread()
t.start()

p1 = Process(target=func1, args=(d, q, f, s))
p1.start()

p2 = Process(target=func2, args=(d, q, f, s))
p2.start()

Running the code above produces an error like the following (the two child processes print essentially the same traceback, interleaved; it is shown here de-interleaved):
<DictProxy object, typeid 'dict' at 0x7f7110976dd0; '__str__()' failed>
Process Process-2:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "test.py", line 15, in func1
    d['k3'] = 'k3'
  File "<string>", line 2, in __setitem__
  File "/usr/local/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethod
    self._connect()
  File "/usr/local/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/local/lib/python2.7/multiprocessing/connection.py", line 169, in Client
    c = SocketClient(address)
  File "/usr/local/lib/python2.7/multiprocessing/connection.py", line 308, in SocketClient
    s.connect(address)
  File "/usr/local/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 2] No such file or directory

I looked into the cause: the main process exits early, and that makes the child processes fail. In practice there are two fixes.
The first: at the end of the script, call:
p1.join()
p2.join()
That is, explicitly tell the main process that it must wait for both child processes to finish before continuing, so it cannot exit early.
The second: have testThread sleep after it starts (uncomment the time.sleep(5) in its run method); the code then also runs correctly. But why? If testThread's sleep ends while p1 and p2 are still running, will the same exception be thrown?
This example raises a concern about my Baidu Baike processing code: the main process starts one getDocumentThread that keeps pushing documents into the queue, and 15 worker processes that keep pulling documents off the queue and processing them. Early on everything works because getDocumentThread is still running. But when it reaches the last thousand or so records, that thread finishes and exits first, while the 15 processes still have those records left to process. Will the exit of getDocumentThread cause them to throw an exception at that point?
If you know the answer after reading this article, please leave me a reply.

That concludes this article on a Python performance tuning experience; I hope it is helpful to fellow programmers!



