Merge pull request #555 from acq4/multiprocess-updates

Multiprocess updates
This commit is contained in:
Luke Campagnola 2017-09-13 20:46:52 -07:00 committed by GitHub
commit 1f77433755
4 changed files with 38 additions and 20 deletions

View File

@ -21,4 +21,4 @@ TODO:
from .processes import * from .processes import *
from .parallelizer import Parallelize, CanceledError from .parallelizer import Parallelize, CanceledError
from .remoteproxy import proxy from .remoteproxy import proxy, ClosedError, NoResultError

View File

@ -13,16 +13,25 @@ if __name__ == '__main__':
#print "key:", ' '.join([str(ord(x)) for x in authkey]) #print "key:", ' '.join([str(ord(x)) for x in authkey])
path = opts.pop('path', None) path = opts.pop('path', None)
if path is not None: if path is not None:
## rewrite sys.path without assigning a new object--no idea who already has a reference to the existing list. if isinstance(path, str):
while len(sys.path) > 0: # if string, just insert this into the path
sys.path.pop() sys.path.insert(0, path)
sys.path.extend(path) else:
# if list, then replace the entire sys.path
## modify sys.path in place--no idea who already has a reference to the existing list.
while len(sys.path) > 0:
sys.path.pop()
sys.path.extend(path)
if opts.pop('pyside', False): if opts.pop('pyside', False):
import PySide import PySide
targetStr = opts.pop('targetStr') targetStr = opts.pop('targetStr')
target = pickle.loads(targetStr) ## unpickling the target should import everything we need try:
target = pickle.loads(targetStr) ## unpickling the target should import everything we need
except:
print("Current sys.path:", sys.path)
raise
target(**opts) ## Send all other options to the target function target(**opts) ## Send all other options to the target function
sys.exit(0) sys.exit(0)

View File

@ -1,4 +1,4 @@
import subprocess, atexit, os, sys, time, random, socket, signal import subprocess, atexit, os, sys, time, random, socket, signal, inspect
import multiprocessing.connection import multiprocessing.connection
try: try:
import cPickle as pickle import cPickle as pickle
@ -50,7 +50,9 @@ class Process(RemoteEventHandler):
process to process requests from the parent process until it process to process requests from the parent process until it
is asked to quit. If you wish to specify a different target, is asked to quit. If you wish to specify a different target,
it must be picklable (bound methods are not). it must be picklable (bound methods are not).
copySysPath If True, copy the contents of sys.path to the remote process copySysPath If True, copy the contents of sys.path to the remote process.
If False, then only the path required to import pyqtgraph is
added.
debug If True, print detailed information about communication debug If True, print detailed information about communication
with the child process. with the child process.
wrapStdout If True (default on windows) then stdout and stderr from the wrapStdout If True (default on windows) then stdout and stderr from the
@ -82,7 +84,13 @@ class Process(RemoteEventHandler):
port = l.address[1] port = l.address[1]
## start remote process, instruct it to run target function ## start remote process, instruct it to run target function
sysPath = sys.path if copySysPath else None if copySysPath:
sysPath = sys.path
else:
# what path do we need to make target importable?
mod = inspect.getmodule(target)
modroot = sys.modules[mod.__name__.split('.')[0]]
sysPath = os.path.abspath(os.path.join(os.path.dirname(modroot.__file__), '..'))
bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py')) bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py'))
self.debugMsg('Starting child process (%s %s)' % (executable, bootstrap)) self.debugMsg('Starting child process (%s %s)' % (executable, bootstrap))
@ -182,7 +190,8 @@ def startEventLoop(name, port, authkey, ppid, debug=False):
HANDLER.processRequests() # exception raised when the loop should exit HANDLER.processRequests() # exception raised when the loop should exit
time.sleep(0.01) time.sleep(0.01)
except ClosedError: except ClosedError:
break HANDLER.debugMsg('Exiting server loop.')
sys.exit(0)
class ForkedProcess(RemoteEventHandler): class ForkedProcess(RemoteEventHandler):
@ -462,21 +471,20 @@ class FileForwarder(threading.Thread):
self.start() self.start()
def run(self): def run(self):
if self.output == 'stdout': if self.output == 'stdout' and self.color is not False:
while True: while True:
line = self.input.readline() line = self.input.readline()
with self.lock: with self.lock:
cprint.cout(self.color, line, -1) cprint.cout(self.color, line, -1)
elif self.output == 'stderr': elif self.output == 'stderr' and self.color is not False:
while True: while True:
line = self.input.readline() line = self.input.readline()
with self.lock: with self.lock:
cprint.cerr(self.color, line, -1) cprint.cerr(self.color, line, -1)
else: else:
if isinstance(self.output, str):
self.output = getattr(sys, self.output)
while True: while True:
line = self.input.readline() line = self.input.readline()
with self.lock: with self.lock:
self.output.write(line) self.output.write(line)

View File

@ -419,7 +419,7 @@ class RemoteEventHandler(object):
if opts is None: if opts is None:
opts = {} opts = {}
assert callSync in ['off', 'sync', 'async'], 'callSync must be one of "off", "sync", or "async"' assert callSync in ['off', 'sync', 'async'], 'callSync must be one of "off", "sync", or "async" (got %r)' % callSync
if reqId is None: if reqId is None:
if callSync != 'off': ## requested return value; use the next available request ID if callSync != 'off': ## requested return value; use the next available request ID
reqId = self.nextRequestId reqId = self.nextRequestId
@ -466,10 +466,7 @@ class RemoteEventHandler(object):
return req return req
if callSync == 'sync': if callSync == 'sync':
try: return req.result()
return req.result()
except NoResultError:
return req
def close(self, callSync='off', noCleanup=False, **kwds): def close(self, callSync='off', noCleanup=False, **kwds):
try: try:
@ -572,6 +569,10 @@ class RemoteEventHandler(object):
self.proxies[ref] = proxy._proxyId self.proxies[ref] = proxy._proxyId
def deleteProxy(self, ref): def deleteProxy(self, ref):
if self.send is None:
# this can happen during shutdown
return
with self.proxyLock: with self.proxyLock:
proxyId = self.proxies.pop(ref) proxyId = self.proxies.pop(ref)