multiprocess updates:
- avoid sending keyboard interrupt signals to child processes - parallelizer keeps better track of processes that die unexpectedly - added ability to specify a different executable when starting new processes
This commit is contained in:
parent
03d618e1b8
commit
e21480855f
@ -1,7 +1,8 @@
|
|||||||
"""For starting up remote processes"""
|
"""For starting up remote processes"""
|
||||||
import sys, pickle
|
import sys, pickle, os
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
os.setpgrp() ## prevents signals (notably keyboard interrupt) being forwarded from parent to this process
|
||||||
name, port, authkey, targetStr, path = pickle.load(sys.stdin)
|
name, port, authkey, targetStr, path = pickle.load(sys.stdin)
|
||||||
if path is not None:
|
if path is not None:
|
||||||
## rewrite sys.path without assigning a new object--no idea who already has a reference to the existing list.
|
## rewrite sys.path without assigning a new object--no idea who already has a reference to the existing list.
|
||||||
|
@ -85,12 +85,14 @@ class Parallelize:
|
|||||||
def __exit__(self, *exc_info):
|
def __exit__(self, *exc_info):
|
||||||
|
|
||||||
if self.proc is not None: ## worker
|
if self.proc is not None: ## worker
|
||||||
|
exceptOccurred = exc_info[0] is not None ## hit an exception during processing.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if exc_info[0] is not None:
|
if exceptOccurred:
|
||||||
sys.excepthook(*exc_info)
|
sys.excepthook(*exc_info)
|
||||||
finally:
|
finally:
|
||||||
#print os.getpid(), 'exit'
|
#print os.getpid(), 'exit'
|
||||||
os._exit(0)
|
os._exit(1 if exceptOccurred else 0)
|
||||||
|
|
||||||
else: ## parent
|
else: ## parent
|
||||||
if self.showProgress:
|
if self.showProgress:
|
||||||
@ -137,6 +139,7 @@ class Parallelize:
|
|||||||
## process events from workers until all have exited.
|
## process events from workers until all have exited.
|
||||||
|
|
||||||
activeChilds = self.childs[:]
|
activeChilds = self.childs[:]
|
||||||
|
self.exitCodes = []
|
||||||
pollInterval = 0.01
|
pollInterval = 0.01
|
||||||
while len(activeChilds) > 0:
|
while len(activeChilds) > 0:
|
||||||
waitingChildren = 0
|
waitingChildren = 0
|
||||||
@ -156,7 +159,8 @@ class Parallelize:
|
|||||||
activeChilds.remove(ch)
|
activeChilds.remove(ch)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
os.waitpid(ch.childPid, 0)
|
pid, exitcode = os.waitpid(ch.childPid, 0)
|
||||||
|
self.exitCodes.append(exitcode)
|
||||||
break
|
break
|
||||||
except OSError as ex:
|
except OSError as ex:
|
||||||
if ex.errno == 4: ## If we get this error, just try again
|
if ex.errno == 4: ## If we get this error, just try again
|
||||||
@ -183,6 +187,11 @@ class Parallelize:
|
|||||||
finally:
|
finally:
|
||||||
if self.showProgress:
|
if self.showProgress:
|
||||||
self.progressDlg.__exit__(None, None, None)
|
self.progressDlg.__exit__(None, None, None)
|
||||||
|
if len(self.exitCodes) < len(self.childs):
|
||||||
|
raise Exception("Parallelizer started %d processes but only received exit codes from %d." % (len(self.childs), len(self.exitCodes)))
|
||||||
|
for code in self.exitCodes:
|
||||||
|
if code != 0:
|
||||||
|
raise Exception("Error occurred in parallel-executed subprocess (console output may have more information).")
|
||||||
return [] ## no tasks for parent process.
|
return [] ## no tasks for parent process.
|
||||||
|
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ class Process(RemoteEventHandler):
|
|||||||
ProxyObject for more information.
|
ProxyObject for more information.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, name=None, target=None, copySysPath=True):
|
def __init__(self, name=None, target=None, executable=None, copySysPath=True):
|
||||||
"""
|
"""
|
||||||
============ =============================================================
|
============ =============================================================
|
||||||
Arguments:
|
Arguments:
|
||||||
@ -50,6 +50,8 @@ class Process(RemoteEventHandler):
|
|||||||
target = startEventLoop
|
target = startEventLoop
|
||||||
if name is None:
|
if name is None:
|
||||||
name = str(self)
|
name = str(self)
|
||||||
|
if executable is None:
|
||||||
|
executable = sys.executable
|
||||||
|
|
||||||
## random authentication key
|
## random authentication key
|
||||||
authkey = ''.join([chr(random.getrandbits(7)) for i in range(20)])
|
authkey = ''.join([chr(random.getrandbits(7)) for i in range(20)])
|
||||||
@ -68,7 +70,7 @@ class Process(RemoteEventHandler):
|
|||||||
## start remote process, instruct it to run target function
|
## start remote process, instruct it to run target function
|
||||||
sysPath = sys.path if copySysPath else None
|
sysPath = sys.path if copySysPath else None
|
||||||
bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py'))
|
bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py'))
|
||||||
self.proc = subprocess.Popen((sys.executable, bootstrap), stdin=subprocess.PIPE)
|
self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE)
|
||||||
targetStr = pickle.dumps(target) ## double-pickle target so that child has a chance to
|
targetStr = pickle.dumps(target) ## double-pickle target so that child has a chance to
|
||||||
## set its sys.path properly before unpickling the target
|
## set its sys.path properly before unpickling the target
|
||||||
pickle.dump((name+'_child', port, authkey, targetStr, sysPath), self.proc.stdin)
|
pickle.dump((name+'_child', port, authkey, targetStr, sysPath), self.proc.stdin)
|
||||||
@ -166,7 +168,8 @@ class ForkedProcess(RemoteEventHandler):
|
|||||||
## - no reading/writing file handles/sockets owned by parent process (stdout is ok)
|
## - no reading/writing file handles/sockets owned by parent process (stdout is ok)
|
||||||
## - don't touch QtGui or QApplication at all; these are landmines.
|
## - don't touch QtGui or QApplication at all; these are landmines.
|
||||||
## - don't let the process call exit handlers
|
## - don't let the process call exit handlers
|
||||||
## -
|
|
||||||
|
os.setpgrp() ## prevents signals (notably keyboard interrupt) being forwarded from parent to this process
|
||||||
|
|
||||||
## close all file handles we do not want shared with parent
|
## close all file handles we do not want shared with parent
|
||||||
conn.close()
|
conn.close()
|
||||||
|
@ -60,7 +60,11 @@ class RemoteEventHandler(object):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def getHandler(cls, pid):
|
def getHandler(cls, pid):
|
||||||
|
try:
|
||||||
return cls.handlers[pid]
|
return cls.handlers[pid]
|
||||||
|
except:
|
||||||
|
print pid, cls.handlers
|
||||||
|
raise
|
||||||
|
|
||||||
def getProxyOption(self, opt):
|
def getProxyOption(self, opt):
|
||||||
return self.proxyOptions[opt]
|
return self.proxyOptions[opt]
|
||||||
@ -88,6 +92,11 @@ class RemoteEventHandler(object):
|
|||||||
except ExitError:
|
except ExitError:
|
||||||
self.exited = True
|
self.exited = True
|
||||||
raise
|
raise
|
||||||
|
except IOError as err:
|
||||||
|
if err.errno == 4: ## interrupted system call; try again
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise
|
||||||
except:
|
except:
|
||||||
print "Error in process %s" % self.name
|
print "Error in process %s" % self.name
|
||||||
sys.excepthook(*sys.exc_info())
|
sys.excepthook(*sys.exc_info())
|
||||||
|
Loading…
Reference in New Issue
Block a user