diff --git a/pyqtgraph/multiprocess/__init__.py b/pyqtgraph/multiprocess/__init__.py index 843b42a3..32a250cb 100644 --- a/pyqtgraph/multiprocess/__init__.py +++ b/pyqtgraph/multiprocess/__init__.py @@ -21,4 +21,4 @@ TODO: from .processes import * from .parallelizer import Parallelize, CanceledError -from .remoteproxy import proxy \ No newline at end of file +from .remoteproxy import proxy, ClosedError, NoResultError diff --git a/pyqtgraph/multiprocess/bootstrap.py b/pyqtgraph/multiprocess/bootstrap.py index bb71a703..f9cb0b0e 100644 --- a/pyqtgraph/multiprocess/bootstrap.py +++ b/pyqtgraph/multiprocess/bootstrap.py @@ -13,16 +13,25 @@ if __name__ == '__main__': #print "key:", ' '.join([str(ord(x)) for x in authkey]) path = opts.pop('path', None) if path is not None: - ## rewrite sys.path without assigning a new object--no idea who already has a reference to the existing list. - while len(sys.path) > 0: - sys.path.pop() - sys.path.extend(path) + if isinstance(path, str): + # if string, just insert this into the path + sys.path.insert(0, path) + else: + # if list, then replace the entire sys.path + ## modify sys.path in place--no idea who already has a reference to the existing list. + while len(sys.path) > 0: + sys.path.pop() + sys.path.extend(path) if opts.pop('pyside', False): import PySide targetStr = opts.pop('targetStr') - target = pickle.loads(targetStr) ## unpickling the target should import everything we need + try: + target = pickle.loads(targetStr) ## unpickling the target should import everything we need + except: + print("Current sys.path:", sys.path) + raise target(**opts) ## Send all other options to the target function sys.exit(0) diff --git a/pyqtgraph/multiprocess/processes.py b/pyqtgraph/multiprocess/processes.py index 02f259e5..7560ff70 100644 --- a/pyqtgraph/multiprocess/processes.py +++ b/pyqtgraph/multiprocess/processes.py @@ -1,4 +1,4 @@ -import subprocess, atexit, os, sys, time, random, socket, signal +import subprocess, atexit, os, sys, time, random, socket, signal, inspect import multiprocessing.connection try: import cPickle as pickle @@ -50,7 +50,9 @@ class Process(RemoteEventHandler): process to process requests from the parent process until it is asked to quit. If you wish to specify a different target, it must be picklable (bound methods are not). - copySysPath If True, copy the contents of sys.path to the remote process + copySysPath If True, copy the contents of sys.path to the remote process. + If False, then only the path required to import pyqtgraph is + added. debug If True, print detailed information about communication with the child process. wrapStdout If True (default on windows) then stdout and stderr from the @@ -82,7 +84,13 @@ class Process(RemoteEventHandler): port = l.address[1] ## start remote process, instruct it to run target function - sysPath = sys.path if copySysPath else None + if copySysPath: + sysPath = sys.path + else: + # what path do we need to make target importable? + mod = inspect.getmodule(target) + modroot = sys.modules[mod.__name__.split('.')[0]] + sysPath = os.path.abspath(os.path.join(os.path.dirname(modroot.__file__), '..')) bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py')) self.debugMsg('Starting child process (%s %s)' % (executable, bootstrap)) @@ -182,7 +190,8 @@ def startEventLoop(name, port, authkey, ppid, debug=False): HANDLER.processRequests() # exception raised when the loop should exit time.sleep(0.01) except ClosedError: - break + HANDLER.debugMsg('Exiting server loop.') + sys.exit(0) class ForkedProcess(RemoteEventHandler): @@ -462,21 +471,20 @@ class FileForwarder(threading.Thread): self.start() def run(self): - if self.output == 'stdout': + if self.output == 'stdout' and self.color is not False: while True: line = self.input.readline() with self.lock: cprint.cout(self.color, line, -1) - elif self.output == 'stderr': + elif self.output == 'stderr' and self.color is not False: while True: line = self.input.readline() with self.lock: cprint.cerr(self.color, line, -1) else: + if isinstance(self.output, str): + self.output = getattr(sys, self.output) while True: line = self.input.readline() with self.lock: self.output.write(line) - - - diff --git a/pyqtgraph/multiprocess/remoteproxy.py b/pyqtgraph/multiprocess/remoteproxy.py index 208e17f4..bc02da83 100644 --- a/pyqtgraph/multiprocess/remoteproxy.py +++ b/pyqtgraph/multiprocess/remoteproxy.py @@ -419,7 +419,7 @@ class RemoteEventHandler(object): if opts is None: opts = {} - assert callSync in ['off', 'sync', 'async'], 'callSync must be one of "off", "sync", or "async"' + assert callSync in ['off', 'sync', 'async'], 'callSync must be one of "off", "sync", or "async" (got %r)' % callSync if reqId is None: if callSync != 'off': ## requested return value; use the next available request ID reqId = self.nextRequestId @@ -466,10 +466,7 @@ class RemoteEventHandler(object): return req if callSync == 'sync': - try: - return req.result() - except NoResultError: - return req + return req.result() def close(self, callSync='off', noCleanup=False, **kwds): try: @@ -572,6 +569,10 @@ class RemoteEventHandler(object): self.proxies[ref] = proxy._proxyId def deleteProxy(self, ref): + if self.send is None: + # this can happen during shutdown + return + with self.proxyLock: proxyId = self.proxies.pop(ref)