From e21480855fd66fdb9161c8b77e49340a3713a9ba Mon Sep 17 00:00:00 2001 From: Luke Campagnola <> Date: Fri, 17 Aug 2012 16:15:13 -0400 Subject: [PATCH] multiprocess updates: - avoid sending keyboard interrupt signals to child processes - parallelizer keeps better track of processes that die unexpectedly - added ability to specify a different executable when starting new processes --- multiprocess/bootstrap.py | 3 ++- multiprocess/parallelizer.py | 15 ++++++++++++--- multiprocess/processes.py | 9 ++++++--- multiprocess/remoteproxy.py | 11 ++++++++++- 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/multiprocess/bootstrap.py b/multiprocess/bootstrap.py index 196d34c8..e0d1c02c 100644 --- a/multiprocess/bootstrap.py +++ b/multiprocess/bootstrap.py @@ -1,7 +1,8 @@ """For starting up remote processes""" -import sys, pickle +import sys, pickle, os if __name__ == '__main__': + os.setpgrp() ## prevents signals (notably keyboard interrupt) being forwarded from parent to this process name, port, authkey, targetStr, path = pickle.load(sys.stdin) if path is not None: ## rewrite sys.path without assigning a new object--no idea who already has a reference to the existing list. diff --git a/multiprocess/parallelizer.py b/multiprocess/parallelizer.py index eda82d31..6447efae 100644 --- a/multiprocess/parallelizer.py +++ b/multiprocess/parallelizer.py @@ -85,12 +85,14 @@ class Parallelize: def __exit__(self, *exc_info): if self.proc is not None: ## worker + exceptOccurred = exc_info[0] is not None ## hit an exception during processing. + try: - if exc_info[0] is not None: + if exceptOccurred: sys.excepthook(*exc_info) finally: #print os.getpid(), 'exit' - os._exit(0) + os._exit(1 if exceptOccurred else 0) else: ## parent if self.showProgress: @@ -137,6 +139,7 @@ class Parallelize: ## process events from workers until all have exited. activeChilds = self.childs[:] + self.exitCodes = [] pollInterval = 0.01 while len(activeChilds) > 0: waitingChildren = 0 @@ -156,7 +159,8 @@ class Parallelize: activeChilds.remove(ch) while True: try: - os.waitpid(ch.childPid, 0) + pid, exitcode = os.waitpid(ch.childPid, 0) + self.exitCodes.append(exitcode) break except OSError as ex: if ex.errno == 4: ## If we get this error, just try again @@ -183,6 +187,11 @@ class Parallelize: finally: if self.showProgress: self.progressDlg.__exit__(None, None, None) + if len(self.exitCodes) < len(self.childs): + raise Exception("Parallelizer started %d processes but only received exit codes from %d." % (len(self.childs), len(self.exitCodes))) + for code in self.exitCodes: + if code != 0: + raise Exception("Error occurred in parallel-executed subprocess (console output may have more information).") return [] ## no tasks for parent process. diff --git a/multiprocess/processes.py b/multiprocess/processes.py index c14b8b0a..a4769679 100644 --- a/multiprocess/processes.py +++ b/multiprocess/processes.py @@ -31,7 +31,7 @@ class Process(RemoteEventHandler): ProxyObject for more information. """ - def __init__(self, name=None, target=None, copySysPath=True): + def __init__(self, name=None, target=None, executable=None, copySysPath=True): """ ============ ============================================================= Arguments: @@ -50,6 +50,8 @@ class Process(RemoteEventHandler): target = startEventLoop if name is None: name = str(self) + if executable is None: + executable = sys.executable ## random authentication key authkey = ''.join([chr(random.getrandbits(7)) for i in range(20)]) @@ -68,7 +70,7 @@ class Process(RemoteEventHandler): ## start remote process, instruct it to run target function sysPath = sys.path if copySysPath else None bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py')) - self.proc = subprocess.Popen((sys.executable, bootstrap), stdin=subprocess.PIPE) + self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE) targetStr = pickle.dumps(target) ## double-pickle target so that child has a chance to ## set its sys.path properly before unpickling the target pickle.dump((name+'_child', port, authkey, targetStr, sysPath), self.proc.stdin) @@ -166,7 +168,8 @@ class ForkedProcess(RemoteEventHandler): ## - no reading/writing file handles/sockets owned by parent process (stdout is ok) ## - don't touch QtGui or QApplication at all; these are landmines. ## - don't let the process call exit handlers - ## - + + os.setpgrp() ## prevents signals (notably keyboard interrupt) being forwarded from parent to this process ## close all file handles we do not want shared with parent conn.close() diff --git a/multiprocess/remoteproxy.py b/multiprocess/remoteproxy.py index c15c5b9d..4f13b5c6 100644 --- a/multiprocess/remoteproxy.py +++ b/multiprocess/remoteproxy.py @@ -60,7 +60,11 @@ class RemoteEventHandler(object): @classmethod def getHandler(cls, pid): - return cls.handlers[pid] + try: + return cls.handlers[pid] + except: + print pid, cls.handlers + raise def getProxyOption(self, opt): return self.proxyOptions[opt] @@ -88,6 +92,11 @@ class RemoteEventHandler(object): except ExitError: self.exited = True raise + except IOError as err: + if err.errno == 4: ## interrupted system call; try again + continue + else: + raise except: print "Error in process %s" % self.name sys.excepthook(*sys.exc_info())