2012-06-18 19:20:35 +00:00
|
|
|
import os, sys, time, multiprocessing
|
|
|
|
from processes import ForkedProcess
|
|
|
|
from remoteproxy import ExitError
|
|
|
|
|
2012-06-22 02:00:04 +00:00
|
|
|
class CanceledError(Exception):
|
|
|
|
"""Raised when the progress dialog is canceled during a processing operation."""
|
|
|
|
pass
|
|
|
|
|
2012-06-18 19:20:35 +00:00
|
|
|
class Parallelize:
|
|
|
|
"""
|
|
|
|
Class for ultra-simple inline parallelization on multi-core CPUs
|
|
|
|
|
|
|
|
Example::
|
|
|
|
|
|
|
|
## Here is the serial (single-process) task:
|
|
|
|
|
|
|
|
tasks = [1, 2, 4, 8]
|
|
|
|
results = []
|
|
|
|
for task in tasks:
|
|
|
|
result = processTask(task)
|
|
|
|
results.append(result)
|
|
|
|
print results
|
|
|
|
|
|
|
|
|
|
|
|
## Here is the parallelized version:
|
|
|
|
|
|
|
|
tasks = [1, 2, 4, 8]
|
|
|
|
results = []
|
|
|
|
with Parallelize(tasks, workers=4, results=results) as tasker:
|
|
|
|
for task in tasker:
|
|
|
|
result = processTask(task)
|
|
|
|
tasker.results.append(result)
|
|
|
|
print results
|
|
|
|
|
|
|
|
|
2012-06-22 02:00:04 +00:00
|
|
|
The only major caveat is that *result* in the example above must be picklable,
|
|
|
|
since it is automatically sent via pipe back to the parent process.
|
2012-06-18 19:20:35 +00:00
|
|
|
"""
|
|
|
|
|
2012-07-01 03:32:26 +00:00
|
|
|
def __init__(self, tasks, workers=None, block=True, progressDialog=None, randomReseed=True, **kwds):
|
2012-06-18 19:20:35 +00:00
|
|
|
"""
|
2012-06-22 02:00:04 +00:00
|
|
|
=============== ===================================================================
|
|
|
|
Arguments:
|
|
|
|
tasks list of objects to be processed (Parallelize will determine how to
|
|
|
|
distribute the tasks)
|
|
|
|
workers number of worker processes or None to use number of CPUs in the
|
|
|
|
system
|
|
|
|
progressDialog optional dict of arguments for ProgressDialog
|
|
|
|
to update while tasks are processed
|
2012-07-01 03:32:26 +00:00
|
|
|
randomReseed If True, each forked process will reseed its random number generator
|
|
|
|
to ensure independent results. Works with the built-in random
|
|
|
|
and numpy.random.
|
2012-06-22 02:00:04 +00:00
|
|
|
kwds objects to be shared by proxy with child processes (they will
|
|
|
|
appear as attributes of the tasker)
|
|
|
|
=============== ===================================================================
|
2012-06-18 19:20:35 +00:00
|
|
|
"""
|
|
|
|
|
2012-06-22 02:00:04 +00:00
|
|
|
## Generate progress dialog.
|
|
|
|
## Note that we want to avoid letting forked child processes play with progress dialogs..
|
|
|
|
self.showProgress = False
|
|
|
|
if progressDialog is not None:
|
|
|
|
self.showProgress = True
|
|
|
|
if isinstance(progressDialog, basestring):
|
|
|
|
progressDialog = {'labelText': progressDialog}
|
|
|
|
import pyqtgraph as pg
|
|
|
|
self.progressDlg = pg.ProgressDialog(**progressDialog)
|
|
|
|
|
2012-06-18 19:20:35 +00:00
|
|
|
if workers is None:
|
2012-06-22 02:00:04 +00:00
|
|
|
workers = self.suggestedWorkerCount()
|
2012-06-18 19:20:35 +00:00
|
|
|
if not hasattr(os, 'fork'):
|
|
|
|
workers = 1
|
|
|
|
self.workers = workers
|
|
|
|
self.tasks = list(tasks)
|
2012-07-01 03:32:26 +00:00
|
|
|
self.reseed = randomReseed
|
2012-06-22 02:00:04 +00:00
|
|
|
self.kwds = kwds.copy()
|
|
|
|
self.kwds['_taskStarted'] = self._taskStarted
|
2012-06-18 19:20:35 +00:00
|
|
|
|
|
|
|
def __enter__(self):
|
|
|
|
self.proc = None
|
2012-06-22 02:00:04 +00:00
|
|
|
if self.workers == 1:
|
|
|
|
return self.runSerial()
|
|
|
|
else:
|
|
|
|
return self.runParallel()
|
|
|
|
|
|
|
|
def __exit__(self, *exc_info):
|
|
|
|
|
|
|
|
if self.proc is not None: ## worker
|
|
|
|
try:
|
|
|
|
if exc_info[0] is not None:
|
|
|
|
sys.excepthook(*exc_info)
|
|
|
|
finally:
|
|
|
|
#print os.getpid(), 'exit'
|
|
|
|
os._exit(0)
|
|
|
|
|
|
|
|
else: ## parent
|
|
|
|
if self.showProgress:
|
|
|
|
self.progressDlg.__exit__(None, None, None)
|
|
|
|
|
|
|
|
def runSerial(self):
|
|
|
|
if self.showProgress:
|
|
|
|
self.progressDlg.__enter__()
|
|
|
|
self.progressDlg.setMaximum(len(self.tasks))
|
|
|
|
self.progress = {os.getpid(): []}
|
|
|
|
return Tasker(None, self.tasks, self.kwds)
|
|
|
|
|
|
|
|
|
|
|
|
def runParallel(self):
|
2012-06-18 19:20:35 +00:00
|
|
|
self.childs = []
|
|
|
|
|
|
|
|
## break up tasks into one set per worker
|
2012-06-22 02:00:04 +00:00
|
|
|
workers = self.workers
|
2012-06-18 19:20:35 +00:00
|
|
|
chunks = [[] for i in xrange(workers)]
|
|
|
|
i = 0
|
|
|
|
for i in range(len(self.tasks)):
|
|
|
|
chunks[i%workers].append(self.tasks[i])
|
|
|
|
|
|
|
|
## fork and assign tasks to each worker
|
|
|
|
for i in range(workers):
|
2012-07-01 03:32:26 +00:00
|
|
|
proc = ForkedProcess(target=None, preProxy=self.kwds, randomReseed=self.reseed)
|
2012-06-18 19:20:35 +00:00
|
|
|
if not proc.isParent:
|
|
|
|
self.proc = proc
|
|
|
|
return Tasker(proc, chunks[i], proc.forkedProxies)
|
|
|
|
else:
|
|
|
|
self.childs.append(proc)
|
|
|
|
|
2012-06-22 02:00:04 +00:00
|
|
|
## Keep track of the progress of each worker independently.
|
|
|
|
self.progress = {ch.childPid: [] for ch in self.childs}
|
|
|
|
## for each child process, self.progress[pid] is a list
|
|
|
|
## of task indexes. The last index is the task currently being
|
|
|
|
## processed; all others are finished.
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
if self.showProgress:
|
|
|
|
self.progressDlg.__enter__()
|
|
|
|
self.progressDlg.setMaximum(len(self.tasks))
|
|
|
|
## process events from workers until all have exited.
|
|
|
|
|
|
|
|
activeChilds = self.childs[:]
|
|
|
|
pollInterval = 0.01
|
|
|
|
while len(activeChilds) > 0:
|
|
|
|
waitingChildren = 0
|
2012-06-18 19:20:35 +00:00
|
|
|
rem = []
|
2012-06-22 02:00:04 +00:00
|
|
|
for ch in activeChilds:
|
|
|
|
try:
|
|
|
|
n = ch.processRequests()
|
|
|
|
if n > 0:
|
|
|
|
waitingChildren += 1
|
|
|
|
except ExitError:
|
|
|
|
#print ch.childPid, 'process finished'
|
|
|
|
rem.append(ch)
|
|
|
|
if self.showProgress:
|
|
|
|
self.progressDlg += 1
|
|
|
|
#print "remove:", [ch.childPid for ch in rem]
|
|
|
|
for ch in rem:
|
|
|
|
activeChilds.remove(ch)
|
2012-07-01 03:32:26 +00:00
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
os.waitpid(ch.childPid, 0)
|
|
|
|
break
|
|
|
|
except OSError as ex:
|
|
|
|
if ex.errno == 4: ## If we get this error, just try again
|
|
|
|
continue
|
|
|
|
#print "Ignored system call interruption"
|
|
|
|
else:
|
|
|
|
raise
|
|
|
|
|
2012-06-22 02:00:04 +00:00
|
|
|
#print [ch.childPid for ch in activeChilds]
|
|
|
|
|
|
|
|
if self.showProgress and self.progressDlg.wasCanceled():
|
|
|
|
for ch in activeChilds:
|
|
|
|
ch.kill()
|
|
|
|
raise CanceledError()
|
|
|
|
|
|
|
|
## adjust polling interval--prefer to get exactly 1 event per poll cycle.
|
|
|
|
if waitingChildren > 1:
|
|
|
|
pollInterval *= 0.7
|
|
|
|
elif waitingChildren == 0:
|
|
|
|
pollInterval /= 0.7
|
|
|
|
pollInterval = max(min(pollInterval, 0.5), 0.0005) ## but keep it within reasonable limits
|
|
|
|
|
|
|
|
time.sleep(pollInterval)
|
|
|
|
finally:
|
|
|
|
if self.showProgress:
|
|
|
|
self.progressDlg.__exit__(None, None, None)
|
2012-06-18 19:20:35 +00:00
|
|
|
return [] ## no tasks for parent process.
|
2012-06-22 02:00:04 +00:00
|
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def suggestedWorkerCount():
|
|
|
|
return multiprocessing.cpu_count() ## is this really the best option?
|
2012-06-18 19:20:35 +00:00
|
|
|
|
2012-06-22 02:00:04 +00:00
|
|
|
def _taskStarted(self, pid, i, **kwds):
|
|
|
|
## called remotely by tasker to indicate it has started working on task i
|
|
|
|
#print pid, 'reported starting task', i
|
|
|
|
if self.showProgress:
|
|
|
|
if len(self.progress[pid]) > 0:
|
|
|
|
self.progressDlg += 1
|
|
|
|
if pid == os.getpid(): ## single-worker process
|
|
|
|
if self.progressDlg.wasCanceled():
|
|
|
|
raise CanceledError()
|
|
|
|
self.progress[pid].append(i)
|
2012-06-18 19:20:35 +00:00
|
|
|
|
|
|
|
|
|
|
|
class Tasker:
|
|
|
|
def __init__(self, proc, tasks, kwds):
|
|
|
|
self.proc = proc
|
|
|
|
self.tasks = tasks
|
|
|
|
for k, v in kwds.iteritems():
|
|
|
|
setattr(self, k, v)
|
|
|
|
|
|
|
|
def __iter__(self):
|
|
|
|
## we could fix this up such that tasks are retrieved from the parent process one at a time..
|
2012-06-22 02:00:04 +00:00
|
|
|
for i, task in enumerate(self.tasks):
|
|
|
|
self.index = i
|
|
|
|
#print os.getpid(), 'starting task', i
|
|
|
|
self._taskStarted(os.getpid(), i, _callSync='off')
|
2012-06-18 19:20:35 +00:00
|
|
|
yield task
|
|
|
|
if self.proc is not None:
|
2012-06-22 02:00:04 +00:00
|
|
|
#print os.getpid(), 'no more tasks'
|
2012-06-18 19:20:35 +00:00
|
|
|
self.proc.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#class Parallelizer:
|
|
|
|
#"""
|
|
|
|
#Use::
|
|
|
|
|
|
|
|
#p = Parallelizer()
|
|
|
|
#with p(4) as i:
|
|
|
|
#p.finish(do_work(i))
|
|
|
|
#print p.results()
|
|
|
|
|
|
|
|
#"""
|
|
|
|
#def __init__(self):
|
|
|
|
#pass
|
|
|
|
|
|
|
|
#def __call__(self, n):
|
|
|
|
#self.replies = []
|
|
|
|
#self.conn = None ## indicates this is the parent process
|
|
|
|
#return Session(self, n)
|
|
|
|
|
|
|
|
#def finish(self, data):
|
|
|
|
#if self.conn is None:
|
|
|
|
#self.replies.append((self.i, data))
|
|
|
|
#else:
|
|
|
|
##print "send", self.i, data
|
|
|
|
#self.conn.send((self.i, data))
|
|
|
|
#os._exit(0)
|
|
|
|
|
|
|
|
#def result(self):
|
|
|
|
#print self.replies
|
|
|
|
|
|
|
|
#class Session:
|
|
|
|
#def __init__(self, par, n):
|
|
|
|
#self.par = par
|
|
|
|
#self.n = n
|
|
|
|
|
|
|
|
#def __enter__(self):
|
|
|
|
#self.childs = []
|
|
|
|
#for i in range(1, self.n):
|
|
|
|
#c1, c2 = multiprocessing.Pipe()
|
|
|
|
#pid = os.fork()
|
|
|
|
#if pid == 0: ## child
|
|
|
|
#self.par.i = i
|
|
|
|
#self.par.conn = c2
|
|
|
|
#self.childs = None
|
|
|
|
#c1.close()
|
|
|
|
#return i
|
|
|
|
#else:
|
|
|
|
#self.childs.append(c1)
|
|
|
|
#c2.close()
|
|
|
|
#self.par.i = 0
|
|
|
|
#return 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#def __exit__(self, *exc_info):
|
|
|
|
#if exc_info[0] is not None:
|
|
|
|
#sys.excepthook(*exc_info)
|
|
|
|
#if self.childs is not None:
|
|
|
|
#self.par.replies.extend([conn.recv() for conn in self.childs])
|
|
|
|
#else:
|
|
|
|
#self.par.finish(None)
|
|
|
|
|