Python处理海量数据的实战研究

2024-06-09 17:38

本文主要是介绍Python处理海量数据的实战研究,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

最近看了July的一些关于Java处理海量数据的问题研究,深有感触,链接:http://blog.csdn.net/v_july_v/article/details/6685962

感谢July ^_^

他用的是Java的Hash Map等方法做了处理,讲解的非常深刻入骨


我也一时兴起,想拿Python试试刀,看看Python对于海量数据的处理能力如何。无奈在百度和Google输入“Python 海量数据”都无果。可能是国内使用python的不多,用python处理海量数据的就更少了。不过这浇灭不了我的欲望,哈哈


打算拿July的其中一个问题来试验一下:

[plain]  view plain copy
  1. <strong>海量日志数据,提取出某日访问百度次数最多的那个IP。</strong>  

July给出的解决方案:
[plain]  view plain copy
  1. 方案1:首先是这一天,并且是访问百度的日志中的IP取出来,逐个写入到一个大文件中。注意到IP是32位的,最多有2^32个IP。同样可以采用映射的方法,比如模1000,把整个大文件映射为1000个小文件,再找出每个小文中出现频率最大的IP(可以采用hash_map进行频率统计,然后再找出频率最大的几个)及相应的频率。然后再在这1000个最大的IP中,找出那个频率最大的IP,即为所求。  
下手吧!


(一)生成数据

我首先构造1亿个IP,这些IP前两段都是“10.197”,后两段为0-255的随机数

把这1亿个IP存入一个文本文件中

Python代码如下:

[python]  view plain copy
  1. __author__ = "Wally Yu (dayu.ebay@gmail.com)"  
  2. __date__ = "$Date: 2012/04/09 $"  
  3.   
  4. def generateRandom(rangeFrom, rangeTo):  
  5.     import random  
  6.     return random.randint(rangeFrom,rangeTo)  
  7.   
  8. def generageMassiveIPAddr(fileLocation,numberOfLines):  
  9.     IP = []  
  10.     file_handler = open(fileLocation, 'a+')  
  11.     for i in range(numberOfLines):  
  12.         IP.append('10.197.' + str(generateRandom(0,255))+'.'+ str(generateRandom(0,255)) + '\n')  
  13.   
  14.     file_handler.writelines(IP)  
  15.     file_handler.close()  
  16.   
  17. if __name__ == '__main__':  
  18.     from time import ctime  
  19.     print ctime()  
  20.     for i in range(10):  
  21.         print '  ' + str(i) + ": " + ctime()  
  22.         generageMassiveIPAddr('d:\\massiveIP.txt'10000000)  
  23.     print ctime()  

这里插一下,我的软件硬件环境:

硬件:

 - ThinkPad T420(CPU: i7, 内存16G)

软件:

 -OS: WinXP32位 (只认出3.6G内存)

 - Python:2.7


从Python的print日志中基本可以看出,生成一千万条IP地址大概需要1分钟,生成1亿条记录需要10分钟

占据硬盘空间:1.4G

日志:

[python]  view plain copy
  1. Mon Apr 09 16:52:28 2012  
  2.   0: Mon Apr 09 16:52:28 2012  
  3.   1: Mon Apr 09 16:53:28 2012  
  4.   2: Mon Apr 09 16:54:29 2012  
  5.   3: Mon Apr 09 16:55:30 2012  
  6.   4: Mon Apr 09 16:56:32 2012  
  7.   5: Mon Apr 09 16:57:33 2012  
  8.   6: Mon Apr 09 16:58:36 2012  
  9.   7: Mon Apr 09 16:59:36 2012  
  10.   8: Mon Apr 09 17:00:36 2012  
  11.   9: Mon Apr 09 17:01:35 2012  
  12. Mon Apr 09 17:02:36 2012  

(二)处理思路

假设现在可用内存仅为128M,而每行IP经计算需要14Byte

因为数据太大,把1亿条数据载入内存,再做排序会导致内存溢出。July的思想:“以大化小,分而治之”非常合适,我转化后的操作思路:

1. 每行IP需要14B空间,那么128M内存最多可以处理 128M / 14B = 9142857个IP

    把每36571429个IP拆成一个小文件保存起来,每个小文件的大小小于等于128M,共将生成11个文件

2. 对每个小文件用Hash Table处理,Python有自己非常高效的Hash Table:Dictionary!

    具体处理如下:

    1). 构建名为“Result”的Dictionary,“key”为IP地址,“value”为该IP地址出现的次数,用来记录11个文件每一个的最多出现的IP

    2). 构建名为“IP”的Dictionary,“key”为IP地址,“value”为该IP地址出现的次数,用来记录每一个小文件的所有IP地址

    3). 读入每一条IP地址到“IP” Dictionary,如果该IP地址出现过,把相应的value的值加1;如果该IP地址没有出现过,则key=IP地址,value=1

    4). 对“IP” Dictionary进行内排序,返回最大的IP地址(如果有若干个最大值是一样的,就都返回)

    5). 将返回值存入“Result” Dictionary

    6). 对“Result”进行内排序,返回最大的IP地址(如果有若干个最大值是一样的,就都返回)


(三)实现

1)拆分成小文件

代码如下:

[python]  view plain copy
  1. __author__ = "Wally Yu (dayu.ebay@gmail.com)"  
  2. __date__ = "$Date: 2012/04/10 $"  
  3.   
  4. from time import ctime  
  5. def splitFile(fileLocation, targetFoler):  
  6.     file_handler = open(fileLocation, 'r')  
  7.     block_size = 9142857  
  8.     line = file_handler.readline()  
  9.     temp = []  
  10.     countFile = 1  
  11.     while line:  
  12.         for i in range(block_size):  
  13.             if i == (block_size-1):  
  14.                 # write block to small files  
  15.                 file_writer = open(targetFoler + "\\file_"+str(countFile)+".txt"'a+')  
  16.                 file_writer.writelines(temp)  
  17.                 file_writer.close()  
  18.                 temp = []  
  19.                 print "  file " + str(countFile) + " generated at: " + str(ctime())  
  20.                 countFile = countFile + 1  
  21.             else:  
  22.                 temp.append(file_handler.readline())  
  23.       
  24.     file_handler.close()  
  25.   
  26. if __name__ == '__main__':  
  27.     print "Start At: " + str(ctime())  
  28.     splitFile("d:\\massiveIP.txt""d:\\massiveData")  

运行后的log如下:
[plain]  view plain copy
  1. Start At: Tue Apr 10 10:56:25 2012  
  2.   file 1 generated at: Tue Apr 10 10:56:37 2012  
  3.   file 2 generated at: Tue Apr 10 10:56:47 2012  
  4.   file 3 generated at: Tue Apr 10 10:57:00 2012  
  5.   file 4 generated at: Tue Apr 10 10:57:14 2012  
  6.   file 5 generated at: Tue Apr 10 10:57:26 2012  
  7.   file 6 generated at: Tue Apr 10 10:57:42 2012  
  8.   file 7 generated at: Tue Apr 10 10:57:53 2012  
  9.   file 8 generated at: Tue Apr 10 10:58:04 2012  
  10.   file 9 generated at: Tue Apr 10 10:58:16 2012  
  11.   file 10 generated at: Tue Apr 10 10:58:27 2012  
  12.   file 11 generated at: Tue Apr 10 10:58:38 2012  

可见拆分一个文件需要费时10-15秒,拆分文件总共耗时2分14秒


2). 找出出现次数最大的IP:

代码如下:

[python]  view plain copy
  1. __author__ = "Wally Yu (dayu.ebay@gmail.com)"  
  2. __date__ = "$Date: 2012/04/10 $"  
  3.   
  4. import os  
  5. from time import ctime  
  6.   
  7. def findIP(targetFolder):  
  8.     Result = {}  
  9.     IP = {}  
  10.     for root, dirs, files in os.walk(targetFolder):  
  11.         for f in files:  
  12.             # read small files  
  13.             file_handler = open(os.path.join(targetFolder, f), 'r')  
  14.             lines = file_handler.readlines()  
  15.             file_handler.close()  
  16.             # get IP in file, store to IP Dictionary  
  17.             for line in lines:  
  18.                 if line in IP:  
  19.                     IP[line] = IP[line] + 1  
  20.                 else:  
  21.                     IP[line] = 1  
  22.             # sort Dictionary  
  23.             IP = sorted(IP.items(), key=lambda d: d[1])  
  24.             # get max item(s), store to Result Dictionary  
  25.             maxItem = IP[-1][1]  
  26.             print '  File ' + str(f) + ":"  
  27.             print "    maxItem: " + str(IP[-1])  
  28.             tempTuple = IP.pop()  
  29.             while tempTuple[1] == maxItem:  
  30.                 if tempTuple[0in Result:  
  31.                     Result[tempTuple[0]] = Result[tempTuple[0]] + 1  
  32.                 else:  
  33.                     Result[tempTuple[0]] = tempTuple[1]  
  34.                 tempTuple = IP.pop()  
  35.             IP = {}  
  36.             print '    Finished: ' + ctime()  
  37.                       
  38.     print sorted(Result.items(), key=lambda d: d[1])  
  39.   
  40. if __name__ == '__main__':  
  41.     print 'Start: ' + ctime()  
  42.     findIP("d:\\massiveData")  
  43.     print 'End: ' + ctime()  

运行后的log如下:
[plain]  view plain copy
  1. Start: Thu Apr 12 10:20:01 2012  
  2.   File file_1.txt:  
  3.     maxItem: ('10.197.223.85\n', 190)  
  4.     Finished: Thu Apr 12 10:20:23 2012  
  5.   File file_10.txt:  
  6.     maxItem: ('10.197.44.215\n', 194)  
  7.     Finished: Thu Apr 12 10:20:37 2012  
  8.   File file_11.txt:  
  9.     maxItem: ('10.197.251.171\n', 181)  
  10.     Finished: Thu Apr 12 10:20:48 2012  
  11.   File file_2.txt:  
  12.     maxItem: ('10.197.181.190\n', 191)  
  13.     Finished: Thu Apr 12 10:21:00 2012  
  14.   File file_3.txt:  
  15.     maxItem: ('10.197.135.27\n', 193)  
  16.     Finished: Thu Apr 12 10:21:14 2012  
  17.   File file_4.txt:  
  18.     maxItem: ('10.197.208.113\n', 192)  
  19.     Finished: Thu Apr 12 10:21:24 2012  
  20.   File file_5.txt:  
  21.     maxItem: ('10.197.120.160\n', 190)  
  22.     Finished: Thu Apr 12 10:21:34 2012  
  23.   File file_6.txt:  
  24.     maxItem: ('10.197.69.155\n', 193)  
  25.     Finished: Thu Apr 12 10:21:44 2012  
  26.   File file_7.txt:  
  27.     maxItem: ('10.197.88.144\n', 192)  
  28.     Finished: Thu Apr 12 10:21:55 2012  
  29.   File file_8.txt:  
  30.     maxItem: ('10.197.103.234\n', 193)  
  31.     Finished: Thu Apr 12 10:22:08 2012  
  32.   File file_9.txt:  
  33.     maxItem: ('10.197.117.46\n', 192)  
  34.     Finished: Thu Apr 12 10:22:20 2012  
  35. [('10.197.251.171\n', 181), ('10.197.120.160\n', 190), ('10.197.223.85\n', 190), ('10.197.181.190\n', 191), ('10.197.117.46\n', 192), ('10.197.208.113\n', 192), ('10.197.88.144\n', 192), ('10.197.147.29\n', 193), ('10.197.68.183\n', 193), ('10.197.69.155\n', 193), ('10.197.103.234\n', 193), ('10.197.135.27\n', 193), ('10.197.44.215\n', 194)]  
  36. End: Thu Apr 12 10:22:21 2012  

由此可见,最大的IP地址为:“10.197.44.215”,共出现194次!

而Python的计算时间为2分20秒,非常快大笑


(四)引申测试

以上是在假设内存仅为128M下的计算时间,为了测试Python真正的执行效率,打算再写一算法,将所有1.4G的数据一次性导入内存,并作内排序,看看它的执行效率

代码如下:

[python]  view plain copy
  1. __author__ = "Wally Yu (dayu.ebay@gmail.com)"  
  2. __date__ = "$Date: 2012/04/10 $"  
  3.   
  4. import os  
  5. from time import ctime  
  6.   
  7. def findIPAtOnce(targetFile):  
  8.     print "Started At: " + ctime()  
  9.     Result = {}  
  10.     file_handler = open(targetFile, 'r')  
  11.     lines = file_handler.readlines()  
  12.     file_handler.close()  
  13.     print "File Read Finished At: " + ctime()  
  14.   
  15.     for line in lines:  
  16.         if line in Result:  
  17.             Result[line] = Result[line] + 1  
  18.         else:  
  19.             Result[line] = 1  
  20.     print "Write to Dic Finished At: " + ctime()  
  21.   
  22.     Result = sorted(Result.items(), key=lambda d: d[1])  
  23.     print "Sorting Finished At: " + ctime()  
  24.   
  25.     print 'Result:'  
  26.     for i in range(10):  
  27.         print '  ' + str(Result.pop())  
  28.   
  29. if __name__ == '__main__':  
  30.     findIPAtOnce("d:\\massiveIP.txt")  

最后得到了Memory Error:
[plain]  view plain copy
  1. Traceback (most recent call last):  
  2.   File "C:/Documents and Settings/wally-yu/Desktop/findIPAtOnce.py", line 30, in <module>  
  3.     findIPAtOnce("d:\\massiveIP.txt")  
  4.   File "C:/Documents and Settings/wally-yu/Desktop/findIPAtOnce.py", line 11, in findIPAtOnce  
  5.     lines = file_handler.readlines()  
  6. MemoryError  

哈哈哈!

为了测试Python的处理速度,重新生成一个小一点的Txt文件,重新运行generageMassiveIPAddr函数,生成一千万个IP地址

[python]  view plain copy
  1. if __name__ == '__main__':  
  2.     from time import ctime  
  3.     print ctime()  
  4.     for i in range(1):  
  5.         print '  ' + str(i) + ": " + ctime()  
  6.         generageMassiveIPAddr('d:\\massiveIP_small.txt'10000000)  
  7.     print ctime()  

生成后的Txt占据144M的空间

再次运行

[plain]  view plain copy
  1. if __name__ == '__main__':  
  2.     findIPAtOnce("d:\\massiveIP_small.txt")  

得到Log如下:
[plain]  view plain copy
  1. Started At: Thu Apr 12 11:03:35 2012  
  2. File Read Finished At: Thu Apr 12 11:03:41 2012  
  3. Write to Dic Finished At: Thu Apr 12 11:03:44 2012  
  4. Sorting Finished At: Thu Apr 12 11:03:45 2012  
  5. Result:  
  6.   ('10.197.222.105\n', 215)  
  7.   ('10.197.68.118\n', 210)  
  8.   ('10.197.229.152\n', 206)  
  9.   ('10.197.22.46\n', 202)  
  10.   ('10.197.98.83\n', 201)  
  11.   ('10.197.53.43\n', 201)  
  12.   ('10.197.169.65\n', 200)  
  13.   ('10.197.225.22\n', 200)  
  14.   ('10.197.146.78\n', 200)  
  15.   ('10.197.57.101\n', 200)  

可见时间消耗如下:

文件数据读取:6秒

写入Dictionary:3秒

排序:1秒

总共耗时不过10秒,而且大多时间都是I/O的开销!!!


(五)小节

由以上种种可见Python对于海量数据处理的高效率,也为Python的同行处理海量数据提供了一些思路

有兴趣的朋友可以拿其他语言做同样的测试,共同进步


(六)修改

注:

1. 以上完成于2012年4月10日,本节及以下完成于2012年4月18日

2. 六、七节的增加是由于lidguan兄发现的一个大漏洞而做的修改,非常感谢!具体评论见下:

4楼 lidguan昨天 16:23发表[回复]
我有个问题,请不吝赐教:
将一个大文件分割成许多的小文件,但楼主分割方式好像是根据文件大小来分割,然后分别排序,得出一系列的最大值,最后在最大值中比较,得出一个最后结果....

但每个ip可能在不同文件中都有记录,也许这个倒霉的ip是第一个文件的第二名,在第二个文件也是第二名,你用最大值进行比较,就会把这个倒霉的ip忽略掉,但其实这个ip才是真正的最大值..

我不懂python,当伪代码看的....如有不对的地方,请多多原谅

我确实也是考虑不周,才导致了以上算法的巨大漏洞,今天做如下修改:

思路:

1. 不对大文件进行拆分,否则会产生lidguan兄提到的问题

2. 假设这个一亿个IP地址的重复率比较高,重复后的IP可以一次性记录入Python Dictionary (Java Hash Map),那么就直接从大文件中一条一条读取IP地址,记录入Dictionary

3. 对Dictionary进行排序并输出


代码:

[python]  view plain copy
  1. __author__ = "Wally Yu (dayu.ebay@gmail.com)"  
  2. __date__ = "$Date: 2012/04/18 $"  
  3.   
  4. import os  
  5. from time import ctime  
  6.   
  7. def findIPAtOnce(targetFile):  
  8.     print "Started At: " + ctime()  
  9.     Result = {}  
  10.     file_handler = open(targetFile, 'r')  
  11.   
  12.     for line in file_handler:  
  13.         if line in Result:  
  14.             Result[line] = Result[line] + 1  
  15.         else:  
  16.             Result[line] = 1  
  17.     print "Write to Dic Finished At: " + ctime()  
  18.   
  19.     file_handler.close()  
  20.   
  21.     Result = sorted(Result.items(), key=lambda d: d[1])  
  22.     print "Sorting Finished At: " + ctime()  
  23.   
  24.     print 'Result:'  
  25.     for i in range(10):  
  26.         print '  ' + str(Result.pop())  
  27.   
  28. if __name__ == '__main__':  
  29.     findIPAtOnce("d:\\massiveIP.txt")  

Log:

[plain]  view plain copy
  1. >>>   
  2. Started At: Wed Apr 18 13:20:34 2012  
  3. Write to Dic Finished At: Wed Apr 18 13:21:34 2012  
  4. Sorting Finished At: Wed Apr 18 13:21:34 2012  
  5. Result:  
  6.   ('10.197.200.159\n', 1713)  
  7.   ('10.197.143.163\n', 1707)  
  8.   ('10.197.68.193\n', 1693)  
  9.   ('10.197.136.119\n', 1692)  
  10.   ('10.197.71.24\n', 1692)  
  11.   ('10.197.132.242\n', 1690)  
  12.   ('10.197.4.219\n', 1688)  
  13.   ('10.197.113.84\n', 1684)  
  14.   ('10.197.204.142\n', 1681)  
  15.   ('10.197.78.110\n', 1675)  

由此可见,出现最多的IP为“10.197.200.159”,出现次数1713次!

执行时间:

  • 读取文件并写入Dictionary:60秒
  • 内排序:小于1秒!!!

经过这次的修改,运算结果相信是可靠的


(七)总结

1. 修改后的代码是在假设IP地址的重复性比较高,可以一次性导入内存中操作的前提下展开的;如果重复性低,或者更大量的数据,导致无法一次导入内存的话,就需要使用外排序来找出IP地址了

2. 希望大家多多探讨,也幸亏lidguan兄的指出,否则险些酿成大错大笑

3. 最近在讨论的纯粹的QA是否有必要存在,我也来多嘴几句,我相信这些代码如果是经过了QA的测试后多多少少会降低风险,尤其是此类明显的逻辑性的错误是肯定可以避免的,所以QA人员的重要性不言而喻。说要消灭QA,鄙人觉得是软件工程思维的一种倒退

这篇关于Python处理海量数据的实战研究的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1045852

相关文章

Python 字符串占位

在Python中,可以使用字符串的格式化方法来实现字符串的占位。常见的方法有百分号操作符 % 以及 str.format() 方法 百分号操作符 % name = "张三"age = 20message = "我叫%s,今年%d岁。" % (name, age)print(message) # 我叫张三,今年20岁。 str.format() 方法 name = "张三"age

【服务器运维】MySQL数据存储至数据盘

查看磁盘及分区 [root@MySQL tmp]# fdisk -lDisk /dev/sda: 21.5 GB, 21474836480 bytes255 heads, 63 sectors/track, 2610 cylindersUnits = cylinders of 16065 * 512 = 8225280 bytesSector size (logical/physical)

React+TS前台项目实战(十七)-- 全局常用组件Dropdown封装

文章目录 前言Dropdown组件1. 功能分析2. 代码+详细注释3. 使用方式4. 效果展示 总结 前言 今天这篇主要讲全局Dropdown组件封装,可根据UI设计师要求自定义修改。 Dropdown组件 1. 功能分析 (1)通过position属性,可以控制下拉选项的位置 (2)通过传入width属性, 可以自定义下拉选项的宽度 (3)通过传入classN

一道经典Python程序样例带你飞速掌握Python的字典和列表

Python中的列表(list)和字典(dict)是两种常用的数据结构,它们在数据组织和存储方面有很大的不同。 列表(List) 列表是Python中的一种有序集合,可以随时添加和删除其中的元素。列表中的元素可以是任何数据类型,包括数字、字符串、其他列表等。列表使用方括号[]表示,元素之间用逗号,分隔。 定义和使用 # 定义一个列表 fruits = ['apple', 'banana

Python应用开发——30天学习Streamlit Python包进行APP的构建(9)

st.area_chart 显示区域图。 这是围绕 st.altair_chart 的语法糖。主要区别在于该命令使用数据自身的列和指数来计算图表的 Altair 规格。因此,在许多 "只需绘制此图 "的情况下,该命令更易于使用,但可定制性较差。 如果 st.area_chart 无法正确猜测数据规格,请尝试使用 st.altair_chart 指定所需的图表。 Function signa

SQL Server中,查询数据库中有多少个表,以及数据库其余类型数据统计查询

sqlserver查询数据库中有多少个表 sql server 数表:select count(1) from sysobjects where xtype='U'数视图:select count(1) from sysobjects where xtype='V'数存储过程select count(1) from sysobjects where xtype='P' SE

python实现最简单循环神经网络(RNNs)

Recurrent Neural Networks(RNNs) 的模型: 上图中红色部分是输入向量。文本、单词、数据都是输入,在网络里都以向量的形式进行表示。 绿色部分是隐藏向量。是加工处理过程。 蓝色部分是输出向量。 python代码表示如下: rnn = RNN()y = rnn.step(x) # x为输入向量,y为输出向量 RNNs神经网络由神经元组成, python

python 喷泉码

因为要完成毕业设计,毕业设计做的是数据分发与传输的东西。在网络中数据容易丢失,所以我用fountain code做所发送数据包的数据恢复。fountain code属于有限域编码的一部分,有很广泛的应用。 我们日常生活中使用的二维码,就用到foutain code做数据恢复。你遮住二维码的四分之一,用手机的相机也照样能识别。你遮住的四分之一就相当于丢失的数据包。 为了实现并理解foutain

python 点滴学

1 python 里面tuple是无法改变的 tuple = (1,),计算tuple里面只有一个元素,也要加上逗号 2  1 毕业论文改 2 leetcode第一题做出来

Python爬虫-贝壳新房

前言 本文是该专栏的第32篇,后面会持续分享python爬虫干货知识,记得关注。 本文以某房网为例,如下图所示,采集对应城市的新房房源数据。具体实现思路和详细逻辑,笔者将在正文结合完整代码进行详细介绍。接下来,跟着笔者直接往下看正文详细内容。(附带完整代码) 正文 地址:aHR0cHM6Ly93aC5mYW5nLmtlLmNvbS9sb3VwYW4v 目标:采集对应城市的