diff --git a/multiprocess/parallelizer.py b/multiprocess/parallelizer.py
index 6447efae..96a766c1 100644
--- a/multiprocess/parallelizer.py
+++ b/multiprocess/parallelizer.py
@@ -1,4 +1,4 @@
-import os, sys, time, multiprocessing
+import os, sys, time, multiprocessing, re
 from processes import ForkedProcess
 from remoteproxy import ExitError
 
@@ -37,12 +37,13 @@ class Parallelize:
     since it is automatically sent via pipe back to the parent process.
     """
 
-    def __init__(self, tasks, workers=None, block=True, progressDialog=None, randomReseed=True, **kwds):
+    def __init__(self, tasks=None, workers=None, block=True, progressDialog=None, randomReseed=True, **kwds):
         """
         ===============  ===================================================================
         Arguments:
         tasks            list of objects to be processed (Parallelize will determine how to
-                         distribute the tasks)
+                         distribute the tasks). If unspecified, then each worker will receive
+                         a single task with a unique id number.
         workers          number of worker processes or None to use number of CPUs in the
                          system
         progressDialog   optional dict of arguments for ProgressDialog
@@ -70,6 +71,8 @@ class Parallelize:
             if not hasattr(os, 'fork'):
                 workers = 1
         self.workers = workers
+        if tasks is None:
+            tasks = range(workers)
         self.tasks = list(tasks)
         self.reseed = randomReseed
         self.kwds = kwds.copy()
@@ -103,7 +106,7 @@ class Parallelize:
             self.progressDlg.__enter__()
             self.progressDlg.setMaximum(len(self.tasks))
         self.progress = {os.getpid(): []}
-        return Tasker(None, self.tasks, self.kwds)
+        return Tasker(self, None, self.tasks, self.kwds)
         
 
     def runParallel(self):
@@ -121,7 +124,7 @@ class Parallelize:
             proc = ForkedProcess(target=None, preProxy=self.kwds, randomReseed=self.reseed)
             if not proc.isParent:
                 self.proc = proc
-                return Tasker(proc, chunks[i], proc.forkedProxies)
+                return Tasker(self, proc, chunks[i], proc.forkedProxies)
             else:
                 self.childs.append(proc)
         
@@ -197,7 +200,26 @@ class Parallelize:
     
     @staticmethod
     def suggestedWorkerCount():
-        return multiprocessing.cpu_count()  ## is this really the best option?
+        if 'linux' in sys.platform:
+            ## I think we can do a little better here..
+            ## cpu_count does not consider that there is little extra benefit to using hyperthreaded cores.
+            try:
+                cores = {}
+                pid = None
+                
+                for line in open('/proc/cpuinfo'):
+                    m = re.match(r'physical id\s+:\s+(\d+)', line)
+                    if m is not None:
+                        pid = m.groups()[0]
+                    m = re.match(r'cpu cores\s+:\s+(\d+)', line)
+                    if m is not None:
+                        cores[pid] = int(m.groups()[0])
+                return sum(cores.values())
+            except:
+                return multiprocessing.cpu_count()
+            
+        else:
+            return multiprocessing.cpu_count()
     
 
     def _taskStarted(self, pid, i, **kwds):
@@ -212,8 +234,9 @@ class Parallelize:
 
 
 class Tasker:
-    def __init__(self, proc, tasks, kwds):
-        self.proc = proc
+    def __init__(self, parallelizer, process, tasks, kwds):
+        self.proc = process
+        self.par = parallelizer
        self.tasks = tasks
         for k, v in kwds.iteritems():
             setattr(self, k, v)
@@ -229,7 +252,20 @@ class Tasker:
                 #print os.getpid(), 'no more tasks'
                 self.proc.close()
     
+    def process(self):
+        """
+        Process requests from parent.
+        Usually it is not necessary to call this unless you would like to
+        receive messages (such as exit requests) during an iteration.
+        """
+        if self.proc is not None:
+            self.proc.processRequests()
+    def numWorkers(self):
+        """
+        Return the number of parallel workers
+        """
+        return self.par.workers
 
 
 #class Parallelizer:
     #"""
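
Example (not part of the patch): a minimal sketch of how the new pieces compose from the caller's side, assuming this module's documented context-manager pattern in which keyword arguments such as `results` are proxied back to the parent. The import path, the `workerId` name, and the strided workload are hypothetical.

    from parallelizer import Parallelize   ## import path assumed

    results = []
    ## With tasks omitted, each worker iterates over exactly one task:
    ## its unique id number in range(workers).
    with Parallelize(workers=4, results=results) as tasker:
        for workerId in tasker:
            ## numWorkers() (new in this patch) lets each worker compute
            ## a strided share of a larger job (hypothetical workload).
            n = tasker.numWorkers()
            partial = sum(range(workerId, 1000, n))
            ## process() (new in this patch) polls the parent so that
            ## exit requests are noticed during long iterations.
            tasker.process()
            tasker.results.append(partial)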
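
For reference, here is what the new `/proc/cpuinfo` parsing computes, traced on a hypothetical, abridged excerpt. Hyperthreading repeats each package's `physical id`/`cpu cores` pair once per logical CPU, so duplicates collapse into one dict entry per package and the sum counts physical cores only.

    import re

    ## Hypothetical abridged excerpt: two quad-core packages with
    ## hyperthreading (the real file repeats one stanza per logical CPU;
    ## duplicates collapse either way).
    sample = [
        "physical id\t: 0", "cpu cores\t: 4",
        "physical id\t: 0", "cpu cores\t: 4",
        "physical id\t: 1", "cpu cores\t: 4",
        "physical id\t: 1", "cpu cores\t: 4",
    ]
    cores = {}
    pid = None
    for line in sample:  ## same parsing logic as the patch, minus the file read
        m = re.match(r'physical id\s+:\s+(\d+)', line)
        if m is not None:
            pid = m.groups()[0]
        m = re.match(r'cpu cores\s+:\s+(\d+)', line)
        if m is not None:
            cores[pid] = int(m.groups()[0])
    ## cores == {'0': 4, '1': 4}, so suggestedWorkerCount() returns 8
    ## where multiprocessing.cpu_count() would report 16 logical CPUs.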