Parallelizer: smarter CPU-count detection (on Linux, counts only physical cores, ignoring hyperthread siblings)

This commit is contained in:
Luke Campagnola 2012-08-31 17:20:05 -04:00
parent 0402d08604
commit 92d11cee91

View File

@ -1,4 +1,4 @@
import os, sys, time, multiprocessing
import os, sys, time, multiprocessing, re
from processes import ForkedProcess
from remoteproxy import ExitError
@ -37,12 +37,13 @@ class Parallelize:
since it is automatically sent via pipe back to the parent process.
"""
def __init__(self, tasks, workers=None, block=True, progressDialog=None, randomReseed=True, **kwds):
def __init__(self, tasks=None, workers=None, block=True, progressDialog=None, randomReseed=True, **kwds):
"""
=============== ===================================================================
Arguments:
tasks list of objects to be processed (Parallelize will determine how to
distribute the tasks)
distribute the tasks). If unspecified, then each worker will receive
a single task with a unique id number.
workers number of worker processes or None to use number of CPUs in the
system
progressDialog optional dict of arguments for ProgressDialog
@ -70,6 +71,8 @@ class Parallelize:
if not hasattr(os, 'fork'):
workers = 1
self.workers = workers
if tasks is None:
tasks = range(workers)
self.tasks = list(tasks)
self.reseed = randomReseed
self.kwds = kwds.copy()
@ -103,7 +106,7 @@ class Parallelize:
self.progressDlg.__enter__()
self.progressDlg.setMaximum(len(self.tasks))
self.progress = {os.getpid(): []}
return Tasker(None, self.tasks, self.kwds)
return Tasker(self, None, self.tasks, self.kwds)
def runParallel(self):
@ -121,7 +124,7 @@ class Parallelize:
proc = ForkedProcess(target=None, preProxy=self.kwds, randomReseed=self.reseed)
if not proc.isParent:
self.proc = proc
return Tasker(proc, chunks[i], proc.forkedProxies)
return Tasker(self, proc, chunks[i], proc.forkedProxies)
else:
self.childs.append(proc)
@ -197,7 +200,26 @@ class Parallelize:
@staticmethod
def suggestedWorkerCount():
    """Return a suggested number of parallel worker processes.

    On Linux, parse /proc/cpuinfo and count *physical* cores (summing
    'cpu cores' per 'physical id'), since hyperthreaded siblings add
    little benefit for CPU-bound parallel work.  On other platforms,
    or if /proc/cpuinfo is missing or unparseable, fall back to
    multiprocessing.cpu_count().
    """
    if 'linux' in sys.platform:
        try:
            # Map physical package id -> number of real cores on that package.
            cores = {}
            pid = None
            with open('/proc/cpuinfo') as fh:
                for line in fh:
                    m = re.match(r'physical id\s+:\s+(\d+)', line)
                    if m is not None:
                        pid = m.group(1)
                    m = re.match(r'cpu cores\s+:\s+(\d+)', line)
                    if m is not None:
                        cores[pid] = int(m.group(1))
            n = sum(cores.values())
            # Some kernels/VMs omit these fields; don't return 0 workers.
            if n > 0:
                return n
            return multiprocessing.cpu_count()
        except Exception:
            # /proc/cpuinfo unreadable or in an unexpected format.
            return multiprocessing.cpu_count()
    else:
        return multiprocessing.cpu_count()
def _taskStarted(self, pid, i, **kwds):
## called remotely by tasker to indicate it has started working on task i
@ -212,8 +234,9 @@ class Parallelize:
class Tasker:
def __init__(self, proc, tasks, kwds):
self.proc = proc
def __init__(self, parallelizer, process, tasks, kwds):
    """Hold one worker's share of tasks plus a handle back to the coordinator.

    parallelizer -- the owning Parallelize instance (queried by numWorkers()).
    process      -- the worker's ForkedProcess, or None when running serially
                    in the parent process.
    tasks        -- the subset of tasks assigned to this worker.
    kwds         -- extra named objects; each is exposed as an attribute
                    on this Tasker.
    """
    self.proc = process
    self.par = parallelizer
    self.tasks = tasks
    # Expose every keyword argument as an attribute (Python 2 dict.iteritems()).
    for k, v in kwds.iteritems():
        setattr(self, k, v)
@ -229,7 +252,20 @@ class Tasker:
#print os.getpid(), 'no more tasks'
self.proc.close()
def process(self):
    """Handle any pending requests sent by the parent process.

    Calling this during an iteration lets the worker receive messages
    (such as exit requests) promptly; otherwise it is normally not
    necessary to call it at all.
    """
    worker = self.proc
    if worker is None:
        return
    worker.processRequests()
def numWorkers(self):
    """Return how many parallel worker processes are in use."""
    parallelizer = self.par
    return parallelizer.workers
#class Parallelizer:
#"""