import os, sys, time, multiprocessing
import errno
from .processes import ForkedProcess
from .remoteproxy import ExitError


class CanceledError(Exception):
    """Raised when the progress dialog is canceled during a processing operation."""
    pass


class Parallelize:
    """
    Class for ultra-simple inline parallelization on multi-core CPUs
    
    Example::
    
        ## Here is the serial (single-process) task:
        
        tasks = [1, 2, 4, 8]
        results = []
        for task in tasks:
            result = processTask(task)
            results.append(result)
        print(results)
        
        
        ## Here is the parallelized version:
        
        tasks = [1, 2, 4, 8]
        results = []
        with Parallelize(tasks, workers=4, results=results) as tasker:
            for task in tasker:
                result = processTask(task)
                tasker.results.append(result)
        print(results)
        
        
    The only major caveat is that *result* in the example above must be picklable,
    since it is automatically sent via pipe back to the parent process.
    """

    def __init__(self, tasks, workers=None, block=True, progressDialog=None, randomReseed=True, **kwds):
        """
        ===============  ===================================================================
        Arguments:
        tasks            list of objects to be processed (Parallelize will determine how to
                         distribute the tasks)
        workers          number of worker processes, or None to use the number of CPUs in
                         the system
        progressDialog   optional dict of arguments for ProgressDialog to update while
                         tasks are processed
        randomReseed     If True, each forked process will reseed its random number
                         generator to ensure independent results. Works with the built-in
                         random and numpy.random.
        kwds             objects to be shared by proxy with child processes (they will
                         appear as attributes of the tasker)
        ===============  ===================================================================
        """
        
        ## Generate progress dialog.
        ## Note that we want to avoid letting forked child processes play with progress dialogs..
        self.showProgress = False
        if progressDialog is not None:
            self.showProgress = True
            if isinstance(progressDialog, str):
                progressDialog = {'labelText': progressDialog}
            import pyqtgraph as pg
            self.progressDlg = pg.ProgressDialog(**progressDialog)
        
        if workers is None:
            workers = self.suggestedWorkerCount()
        if not hasattr(os, 'fork'):
            workers = 1  ## no fork() available (e.g. on Windows); fall back to serial processing
        self.workers = workers
        self.tasks = list(tasks)
        self.reseed = randomReseed
        self.kwds = kwds.copy()
        self.kwds['_taskStarted'] = self._taskStarted
        
    def __enter__(self):
        self.proc = None
        if self.workers == 1:
            return self.runSerial()
        else:
            return self.runParallel()
    
    def __exit__(self, *exc_info):
        if self.proc is not None:  ## worker
            try:
                if exc_info[0] is not None:
                    sys.excepthook(*exc_info)
            finally:
                os._exit(0)
        else:  ## parent
            if self.showProgress:
                self.progressDlg.__exit__(None, None, None)

    def runSerial(self):
        if self.showProgress:
            self.progressDlg.__enter__()
            self.progressDlg.setMaximum(len(self.tasks))
        self.progress = {os.getpid(): []}
        return Tasker(None, self.tasks, self.kwds)
    
    def runParallel(self):
        self.childs = []
        
        ## break up tasks into one set per worker
        workers = self.workers
        chunks = [[] for _ in range(workers)]
        for i, task in enumerate(self.tasks):
            chunks[i % workers].append(task)
        
        ## fork and assign tasks to each worker
        for i in range(workers):
            proc = ForkedProcess(target=None, preProxy=self.kwds, randomReseed=self.reseed)
            if not proc.isParent:
                self.proc = proc
                return Tasker(proc, chunks[i], proc.forkedProxies)
            else:
                self.childs.append(proc)
        
        ## Keep track of the progress of each worker independently.
        self.progress = {ch.childPid: [] for ch in self.childs}
        ## For each child process, self.progress[pid] is a list of task indexes.
        ## The last index is the task currently being processed; all others
        ## are finished.
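        ## (Illustrative example with hypothetical pids: self.progress might
        ## read {1234: [0, 2, 4], 1235: [1, 3]}, meaning child 1234 is on
        ## task 4, child 1235 is on task 3, and tasks 0, 1, and 2 are done.)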
        
        try:
            if self.showProgress:
                self.progressDlg.__enter__()
                self.progressDlg.setMaximum(len(self.tasks))
            
            ## process events from workers until all have exited.
            activeChilds = self.childs[:]
            pollInterval = 0.01
            while len(activeChilds) > 0:
                waitingChildren = 0
                rem = []
                for ch in activeChilds:
                    try:
                        n = ch.processRequests()
                        if n > 0:
                            waitingChildren += 1
                    except ExitError:
                        rem.append(ch)
                        if self.showProgress:
                            self.progressDlg += 1
                
                for ch in rem:
                    activeChilds.remove(ch)
                    ## reap the child; retry if the wait is interrupted by a signal
                    while True:
                        try:
                            os.waitpid(ch.childPid, 0)
                            break
                        except OSError as ex:
                            if ex.errno == errno.EINTR:
                                continue  ## interrupted system call; just try again
                            else:
                                raise
                
                if self.showProgress and self.progressDlg.wasCanceled():
                    for ch in activeChilds:
                        ch.kill()
                    raise CanceledError()
                
                ## adjust polling interval--prefer to get exactly 1 event per poll cycle.
                if waitingChildren > 1:
                    pollInterval *= 0.7
                elif waitingChildren == 0:
                    pollInterval /= 0.7
                pollInterval = max(min(pollInterval, 0.5), 0.0005)  ## but keep it within reasonable limits
                
                time.sleep(pollInterval)
        finally:
            if self.showProgress:
                self.progressDlg.__exit__(None, None, None)
        return []  ## no tasks for the parent process.
    
    @staticmethod
    def suggestedWorkerCount():
        return multiprocessing.cpu_count()  ## is this really the best option?
    
    def _taskStarted(self, pid, i, **kwds):
        ## called remotely by the tasker to indicate that it has started working on task i
        if self.showProgress:
            if len(self.progress[pid]) > 0:
                self.progressDlg += 1
            if pid == os.getpid():  ## single-worker process
                if self.progressDlg.wasCanceled():
                    raise CanceledError()
        self.progress[pid].append(i)


class Tasker:
    def __init__(self, proc, tasks, kwds):
        self.proc = proc
        self.tasks = tasks
        for k, v in kwds.items():
            setattr(self, k, v)
    
    def __iter__(self):
        ## we could fix this up such that tasks are retrieved from the parent process one at a time..
        for i, task in enumerate(self.tasks):
            self.index = i
            self._taskStarted(os.getpid(), i, _callSync='off')
            yield task
        if self.proc is not None:
            self.proc.close()
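

if __name__ == '__main__':
    ## Minimal usage sketch, added for illustration and not part of the
    ## library API: squares a few numbers in parallel, mirroring the example
    ## in the Parallelize docstring. 'processTask' is a hypothetical stand-in
    ## for real per-task work. Results travel back to the parent through the
    ## proxied 'results' list, so each result must be picklable, and results
    ## may arrive out of task order since workers run independently.
    ## (Because this module uses package-relative imports, run it as a
    ## module, e.g. 'python -m <package>.parallelizer'.)
    def processTask(task):
        return task * task
    
    tasks = [1, 2, 4, 8]
    results = []
    with Parallelize(tasks, workers=4, results=results) as tasker:
        for task in tasker:
            tasker.results.append(processTask(task))
    print(results)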