在script from this answer的基础上,我有以下场景:一个包含2500个大文本文件的文件夹(每个约55Mb),所有制表符分隔. Web日志,基本上.
我需要md5哈希每个文件的每一行中的第二个’列’,将修改后的文件保存在别处.源文件位于机械磁盘上,目标文件位于SSD上.
该脚本非常快速地处理前25个(左右)文件.然后它减慢了WAY.基于前25个文件,它应该在2分钟左右完成所有文件.但是,根据之后的表现,完成它们需要15分钟(左右).
它运行在具有32 Gb RAM的服务器上,任务管理器很少显示超过6 Gb的使用情况.我已经设置了启动6个进程,但核心上的cpu使用率很低,很少超过15%.
为什么这放慢了?读/写磁盘问题?垃圾收集器?坏代码?有关如何加快速度的任何想法?
这是脚本
import os
import multiprocessing
from multiprocessing import Process
import threading
import hashlib
class ThreadRunner(threading.Thread):
""" This class represents a single instance of a running thread"""
def __init__(self,fileset,filedirectory):
threading.Thread.__init__(self)
self.files_to_process = fileset
self.filedir = filedirectory
def run(self):
for current_file in self.files_to_process:
# Open the current file as read only
active_file_name = self.filedir + "/" + current_file
output_file_name = "D:/hashed_data/" + "hashed_" + current_file
active_file = open(active_file_name,"r")
output_file = open(output_file_name,"ab+")
for line in active_file:
# Load the line,hash the username,save the line
lineList = line.split("\t")
if not lineList[1] == "-":
lineList[1] = hashlib.md5(lineList[1]).hexdigest()
lineOut = '\t'.join(lineList)
output_file.write(lineOut)
# Always close files after you open them
active_file.close()
output_file.close()
print "\nCompleted " + current_file
class ProcessRunner:
""" This class represents a single instance of a running process """
def runp(self,pid,numThreads,filedirectory):
mythreads = []
for tid in range(numThreads):
th = ThreadRunner(fileset,filedirectory)
mythreads.append(th)
for i in mythreads:
i.start()
for i in mythreads:
i.join()
class ParallelExtractor:
def runInParallel(self,numProcesses,filedirectory):
myprocs = []
prunner = ProcessRunner()
# Store the file names from that directory in a list that we can iterate
file_names = os.listdir(filedirectory)
file_sets = []
for i in range(numProcesses):
file_sets.append([])
for index,name in enumerate(file_names):
num = index % numProcesses
file_sets[num].append(name)
for pid in range(numProcesses):
pr = Process(target=prunner.runp,args=(pid,file_sets[pid],filedirectory))
myprocs.append(pr)
for i in myprocs:
i.start()
for i in myprocs:
i.join()
if __name__ == '__main__':
file_directory = "E:/original_data"
processes = 6
threads = 1
extractor = ParallelExtractor()
extractor.runInParallel(numProcesses=processes,numThreads=threads,filedirectory=file_directory)
处理第一个文件时可能会提高性能,因为操作系统会将第一个文件缓存在内存中,因此不会发生磁盘I / O.这可以通过以下任一方式确认:
>重新启动服务器,从而刷新缓存
>通过从磁盘读取足够大的文件,用其他东西填充缓存
>在处理第一个文件时仔细监听磁盘搜索的缺失
现在,由于散列文件的性能瓶颈是磁盘,因此在多个进程或线程中执行散列是没用的,因为它们都使用相同的磁盘.正如@Max Noel所提到的,它实际上可以降低性能,因为您将并行读取多个文件,因此您的磁盘必须在文件之间进行搜索.正如他所提到的,性能也将根据您正在使用的操作系统的I / O调度程序而有所不同.
>使用更快的磁盘或SSD,如@Max Noel建议的那样.
>从多个磁盘读取 – 在不同的文件系统中或在RAID上的单个文件系统中读取
>在多台计算机上拆分任务(每台计算机有一个或多个磁盘)
但是,如果你想要做的就是散列这2500个文件并且你已经将它们放在一个磁盘上,那么这些解决方案就毫无用处.将它们从磁盘读取到其他磁盘然后执行散列更慢,因为您将读取文件两次,并且您可以尽可能快地读取它们.
最后,根据@yaccz的想法,如果安装了find,xargs和md5sum的cygwin二进制文件,我想你可以避免编写程序执行散列的麻烦.