你的设计有点混乱。您使用一个N个worker池,然后将M jobs工作分解为N个大小为M/N的任务。换句话说,如果这些都正确,那么您将在构建在worker进程之上的池之上模拟工作进程。为什么要这么麻烦果你想使用过程,就直接使用它们。或者,使用一个池作为池,将每个作业作为自己的任务发送,并使用批处理功能以某种适当的(可调整的)方式对其进行批处理。在
这意味着runMatch只需要一个针线和针线公司,它所做的就是调用findNeedle,然后做{}部分的任何操作。然后主程序就简单多了:if __name__ == ‘__main__’:
with Pool(processes=numProcesses) as pool:
results = pool.map_async(runMatch, needleCompanies.iteritems(),
chunkSize=NUMBER_TWEAKED_IN_TESTING).get()
或者,如果结果很小,而不是让所有进程(大概)为一些共享的结果存储内容而争吵,只需返回它们。那么您根本不需要runMatch,只需:
^{2}$
或者,如果您想精确地执行N个批次,只需为每个批次创建一个流程:if __name__ == ‘__main__’:
totalTargets = len(getTargets(‘all’))
targetsPerBatch = totalTargets / numProcesses
processes = [Process(target=runMatch,
args=(targetsPerBatch,
xrange(0,
totalTargets,
targetsPerBatch)))
for _ in range(numProcesses)]
for p in processes:
p.start()
for p in processes:
p.join()
另外,您似乎为每个任务调用一次getHayStack()(以及getNeedles)。我不确定在同一时间得到多个live副本有多容易,但考虑到这是迄今为止最大的数据结构,这将是我试图排除的第一件事。事实上,即使这不是内存使用问题,getHayStack也很容易对性能造成很大的影响,除非您已经在进行某种缓存(例如,第一次显式地将其存储在全局或可变的默认参数值中,然后才使用它),因此不管怎样,它都值得修复。在
同时修复这两个潜在问题的一种方法是在^{}构造函数中使用初始值设定项:def initPool():
global _haystack
_haystack = getHayStack()
def runMatch(args):
global _haystack
# …
hayCompanies = _haystack
# …
if __name__ == ‘__main__’:
pool = Pool(processes=numProcesses, initializer=initPool)
# …
接下来,我注意到您在多个实际上并不需要它们的地方显式地生成列表。例如:scores = list(results.values())
resultIDs = list(results.keys())
needleID = resultIDs[scores.index(max(scores))]
return [needleID, haystack[needleID], max(scores)]
如果结果多于少数,这是浪费;只需直接使用results.values()iterable。(事实上,看起来您使用的是python2.x,在这种情况下,keys和{}已经是列表了,所以您只是无缘无故地多做了一个副本。)
但在这种情况下,你可以进一步简化整个过程。你只是在寻找得分最高的key(resultID)和value(score),对吧以:needleID, score = max(results.items(), key=operator.itemgetter(1))
return [needleID, haystack[needleID], score]
这也消除了对score的所有重复搜索,这将节省一些CPU。在
这可能不能直接解决内存问题,但希望它能使调试和/或调整更容易。在
首先要尝试的是使用小得多的批处理,而不是input_size/cpu_count,尝试1。内存使用减少了吗果没有,我们已经排除了这一部分。在
下一步,尝试sys.getsizeof(_haystack),看看它是怎么写的。如果它是1.6GB,那么你可以很好地将所有内容压缩到0.4GB,所以这就是攻击它的方法——例如,使用^{}数据库而不是普通的dict。在
还可以尝试在初始值设定项函数的开始和结束处转储内存使用量(使用^{}模块,getrusage(RUSAGE_SELF))。如果最后的干草堆只有0.3GB,但是你又分配了1.3GB来构建它,那就是攻击的问题。例如,您可以分离一个子进程来构建和pickle dict,然后让池初始值设定项打开并取消pickle。或者在第一个子级中合并两个构建shelvedb,并在初始值设定项中以只读方式打开它。不管怎样,这也意味着你只需要做一次CSV解析/dict构建工作,而不是8次。在
另一方面,如果您的总VM使用率仍然很低(请注意,getrusage无法直接查看您的总VM大小-ru_maxrss通常是一个有用的近似值,特别是当{}为0时),则问题出在任务本身。在
首先,getsizeof任务函数的参数和您返回的值。如果它们很大,特别是如果它们在每个任务中不断变大,或者变幻莫测,那么数据占用了太多内存,这可能只是一个问题,最终8个数据足够大,足以达到极限。在
否则,问题很可能出在任务函数本身。要么是内存泄漏(通过使用有缺陷的C扩展模块或ctypes只能发生real泄漏,但如果在调用之间保留任何引用,例如在全局中,可能会永远不必要地保留某些内容),或者某些任务本身占用了太多内存。不管是哪种方式,这应该是你可以更容易地测试的东西,只需抽出多个处理程序,直接运行任务,这样更容易调试。在
声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!