Fixes for multiprocess / RemoteGraphicsView:
- Process now optionally wraps stdout/stderr from child process to circumvent a python bug - Added windows error number for port-in-use check - fixed segv caused by lost QImage input in pyside
This commit is contained in:
parent
1418358bfb
commit
5b156cd3d3
@ -20,6 +20,7 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
if opts.pop('pyside', False):
|
if opts.pop('pyside', False):
|
||||||
import PySide
|
import PySide
|
||||||
|
|
||||||
|
|
||||||
targetStr = opts.pop('targetStr')
|
targetStr = opts.pop('targetStr')
|
||||||
target = pickle.loads(targetStr) ## unpickling the target should import everything we need
|
target = pickle.loads(targetStr) ## unpickling the target should import everything we need
|
||||||
|
@ -35,7 +35,7 @@ class Process(RemoteEventHandler):
|
|||||||
ProxyObject for more information.
|
ProxyObject for more information.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, name=None, target=None, executable=None, copySysPath=True, debug=False, timeout=20):
|
def __init__(self, name=None, target=None, executable=None, copySysPath=True, debug=False, timeout=20, wrapStdout=None):
|
||||||
"""
|
"""
|
||||||
============ =============================================================
|
============ =============================================================
|
||||||
Arguments:
|
Arguments:
|
||||||
@ -48,9 +48,13 @@ class Process(RemoteEventHandler):
|
|||||||
it must be picklable (bound methods are not).
|
it must be picklable (bound methods are not).
|
||||||
copySysPath If True, copy the contents of sys.path to the remote process
|
copySysPath If True, copy the contents of sys.path to the remote process
|
||||||
debug If True, print detailed information about communication
|
debug If True, print detailed information about communication
|
||||||
with the child process. Note that this option may cause
|
with the child process.
|
||||||
strange behavior on some systems due to a python bug:
|
wrapStdout If True (default on windows) then stdout and stderr from the
|
||||||
http://bugs.python.org/issue3905
|
child process will be caught by the parent process and
|
||||||
|
forwarded to its stdout/stderr. This provides a workaround
|
||||||
|
for a python bug: http://bugs.python.org/issue3905
|
||||||
|
but has the side effect that child output is significantly
|
||||||
|
delayed relative to the parent output.
|
||||||
============ =============================================================
|
============ =============================================================
|
||||||
"""
|
"""
|
||||||
if target is None:
|
if target is None:
|
||||||
@ -76,25 +80,32 @@ class Process(RemoteEventHandler):
|
|||||||
l = multiprocessing.connection.Listener(('localhost', int(port)), authkey=authkey)
|
l = multiprocessing.connection.Listener(('localhost', int(port)), authkey=authkey)
|
||||||
break
|
break
|
||||||
except socket.error as ex:
|
except socket.error as ex:
|
||||||
if ex.errno != 98:
|
if ex.errno != 98 and ex.errno != 10048: # unix=98, win=10048
|
||||||
raise
|
raise
|
||||||
port += 1
|
port += 1
|
||||||
|
|
||||||
|
|
||||||
## start remote process, instruct it to run target function
|
## start remote process, instruct it to run target function
|
||||||
sysPath = sys.path if copySysPath else None
|
sysPath = sys.path if copySysPath else None
|
||||||
bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py'))
|
bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py'))
|
||||||
self.debugMsg('Starting child process (%s %s)' % (executable, bootstrap))
|
self.debugMsg('Starting child process (%s %s)' % (executable, bootstrap))
|
||||||
|
|
||||||
## note: we need all three streams to have their own PIPE due to this bug:
|
if wrapStdout is None:
|
||||||
## http://bugs.python.org/issue3905
|
wrapStdout = sys.platform.startswith('win')
|
||||||
if debug is True: # when debugging, we need to keep the usual stdout
|
|
||||||
stdout = sys.stdout
|
if wrapStdout:
|
||||||
stderr = sys.stderr
|
## note: we need all three streams to have their own PIPE due to this bug:
|
||||||
else:
|
## http://bugs.python.org/issue3905
|
||||||
stdout = subprocess.PIPE
|
stdout = subprocess.PIPE
|
||||||
stderr = subprocess.PIPE
|
stderr = subprocess.PIPE
|
||||||
self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE, stdout=stdout, stderr=stderr)
|
self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE, stdout=stdout, stderr=stderr)
|
||||||
|
## to circumvent the bug and still make the output visible, we use
|
||||||
|
## background threads to pass data from pipes to stdout/stderr
|
||||||
|
self._stdoutForwarder = FileForwarder(self.proc.stdout, "stdout")
|
||||||
|
self._stderrForwarder = FileForwarder(self.proc.stderr, "stderr")
|
||||||
|
else:
|
||||||
|
self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE)
|
||||||
|
|
||||||
targetStr = pickle.dumps(target) ## double-pickle target so that child has a chance to
|
targetStr = pickle.dumps(target) ## double-pickle target so that child has a chance to
|
||||||
## set its sys.path properly before unpickling the target
|
## set its sys.path properly before unpickling the target
|
||||||
pid = os.getpid() # we must send pid to child because windows does not have getppid
|
pid = os.getpid() # we must send pid to child because windows does not have getppid
|
||||||
@ -129,6 +140,7 @@ class Process(RemoteEventHandler):
|
|||||||
self.debugMsg('Connected to child process.')
|
self.debugMsg('Connected to child process.')
|
||||||
|
|
||||||
atexit.register(self.join)
|
atexit.register(self.join)
|
||||||
|
|
||||||
|
|
||||||
def join(self, timeout=10):
|
def join(self, timeout=10):
|
||||||
self.debugMsg('Joining child process..')
|
self.debugMsg('Joining child process..')
|
||||||
@ -140,7 +152,16 @@ class Process(RemoteEventHandler):
|
|||||||
raise Exception('Timed out waiting for remote process to end.')
|
raise Exception('Timed out waiting for remote process to end.')
|
||||||
time.sleep(0.05)
|
time.sleep(0.05)
|
||||||
self.debugMsg('Child process exited. (%d)' % self.proc.returncode)
|
self.debugMsg('Child process exited. (%d)' % self.proc.returncode)
|
||||||
|
|
||||||
|
def debugMsg(self, msg):
|
||||||
|
if hasattr(self, '_stdoutForwarder'):
|
||||||
|
## Lock output from subprocess to make sure we do not get line collisions
|
||||||
|
with self._stdoutForwarder.lock:
|
||||||
|
with self._stderrForwarder.lock:
|
||||||
|
RemoteEventHandler.debugMsg(self, msg)
|
||||||
|
else:
|
||||||
|
RemoteEventHandler.debugMsg(self, msg)
|
||||||
|
|
||||||
|
|
||||||
def startEventLoop(name, port, authkey, ppid, debug=False):
|
def startEventLoop(name, port, authkey, ppid, debug=False):
|
||||||
if debug:
|
if debug:
|
||||||
@ -409,4 +430,43 @@ def startQtEventLoop(name, port, authkey, ppid, debug=False):
|
|||||||
HANDLER.startEventTimer()
|
HANDLER.startEventTimer()
|
||||||
app.exec_()
|
app.exec_()
|
||||||
|
|
||||||
|
import threading
|
||||||
|
class FileForwarder(threading.Thread):
|
||||||
|
"""
|
||||||
|
Background thread that forwards data from one pipe to another.
|
||||||
|
This is used to catch data from stdout/stderr of the child process
|
||||||
|
and print it back out to stdout/stderr. We need this because this
|
||||||
|
bug: http://bugs.python.org/issue3905 _requires_ us to catch
|
||||||
|
stdout/stderr.
|
||||||
|
|
||||||
|
*output* may be a file or 'stdout' or 'stderr'. In the latter cases,
|
||||||
|
sys.stdout/stderr are retrieved once for every line that is output,
|
||||||
|
which ensures that the correct behavior is achieved even if
|
||||||
|
sys.stdout/stderr are replaced at runtime.
|
||||||
|
"""
|
||||||
|
def __init__(self, input, output):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.input = input
|
||||||
|
self.output = output
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
self.start()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
if self.output == 'stdout':
|
||||||
|
while True:
|
||||||
|
line = self.input.readline()
|
||||||
|
with self.lock:
|
||||||
|
sys.stdout.write(line)
|
||||||
|
elif self.output == 'stderr':
|
||||||
|
while True:
|
||||||
|
line = self.input.readline()
|
||||||
|
with self.lock:
|
||||||
|
sys.stderr.write(line)
|
||||||
|
else:
|
||||||
|
while True:
|
||||||
|
line = self.input.readline()
|
||||||
|
with self.lock:
|
||||||
|
self.output.write(line)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -19,18 +19,26 @@ class RemoteGraphicsView(QtGui.QWidget):
|
|||||||
"""
|
"""
|
||||||
def __init__(self, parent=None, *args, **kwds):
|
def __init__(self, parent=None, *args, **kwds):
|
||||||
"""
|
"""
|
||||||
The keyword arguments 'debug' and 'name', if specified, are passed to QtProcess.__init__().
|
The keyword arguments 'useOpenGL' and 'backgound', if specified, are passed to the remote
|
||||||
|
GraphicsView.__init__(). All other keyword arguments are passed to multiprocess.QtProcess.__init__().
|
||||||
"""
|
"""
|
||||||
self._img = None
|
self._img = None
|
||||||
self._imgReq = None
|
self._imgReq = None
|
||||||
self._sizeHint = (640,480) ## no clue why this is needed, but it seems to be the default sizeHint for GraphicsView.
|
self._sizeHint = (640,480) ## no clue why this is needed, but it seems to be the default sizeHint for GraphicsView.
|
||||||
## without it, the widget will not compete for space against another GraphicsView.
|
## without it, the widget will not compete for space against another GraphicsView.
|
||||||
QtGui.QWidget.__init__(self)
|
QtGui.QWidget.__init__(self)
|
||||||
self._proc = mp.QtProcess(debug=kwds.pop('debug', False), name=kwds.pop('name', None))
|
|
||||||
|
# separate local keyword arguments from remote.
|
||||||
|
remoteKwds = {}
|
||||||
|
for kwd in ['useOpenGL', 'background']:
|
||||||
|
if kwd in kwds:
|
||||||
|
remoteKwds[kwd] = kwds.pop(kwd)
|
||||||
|
|
||||||
|
self._proc = mp.QtProcess(**kwds)
|
||||||
self.pg = self._proc._import('pyqtgraph')
|
self.pg = self._proc._import('pyqtgraph')
|
||||||
self.pg.setConfigOptions(**self.pg.CONFIG_OPTIONS)
|
self.pg.setConfigOptions(**self.pg.CONFIG_OPTIONS)
|
||||||
rpgRemote = self._proc._import('pyqtgraph.widgets.RemoteGraphicsView')
|
rpgRemote = self._proc._import('pyqtgraph.widgets.RemoteGraphicsView')
|
||||||
self._view = rpgRemote.Renderer(*args, **kwds)
|
self._view = rpgRemote.Renderer(*args, **remoteKwds)
|
||||||
self._view._setProxyOptions(deferGetattr=True)
|
self._view._setProxyOptions(deferGetattr=True)
|
||||||
|
|
||||||
self.setFocusPolicy(QtCore.Qt.StrongFocus)
|
self.setFocusPolicy(QtCore.Qt.StrongFocus)
|
||||||
@ -72,7 +80,9 @@ class RemoteGraphicsView(QtGui.QWidget):
|
|||||||
else:
|
else:
|
||||||
self.shm = mmap.mmap(self.shmFile.fileno(), size, mmap.MAP_SHARED, mmap.PROT_READ)
|
self.shm = mmap.mmap(self.shmFile.fileno(), size, mmap.MAP_SHARED, mmap.PROT_READ)
|
||||||
self.shm.seek(0)
|
self.shm.seek(0)
|
||||||
self._img = QtGui.QImage(self.shm.read(w*h*4), w, h, QtGui.QImage.Format_ARGB32)
|
data = self.shm.read(w*h*4)
|
||||||
|
self._img = QtGui.QImage(data, w, h, QtGui.QImage.Format_ARGB32)
|
||||||
|
self._img.data = data # data must be kept alive or PySide 1.2.1 (and probably earlier) will crash.
|
||||||
self.update()
|
self.update()
|
||||||
|
|
||||||
def paintEvent(self, ev):
|
def paintEvent(self, ev):
|
||||||
@ -118,7 +128,12 @@ class RemoteGraphicsView(QtGui.QWidget):
|
|||||||
def remoteProcess(self):
|
def remoteProcess(self):
|
||||||
"""Return the remote process handle. (see multiprocess.remoteproxy.RemoteEventHandler)"""
|
"""Return the remote process handle. (see multiprocess.remoteproxy.RemoteEventHandler)"""
|
||||||
return self._proc
|
return self._proc
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Close the remote process. After this call, the widget will no longer be updated."""
|
||||||
|
self._proc.close()
|
||||||
|
|
||||||
|
|
||||||
class Renderer(GraphicsView):
|
class Renderer(GraphicsView):
|
||||||
## Created by the remote process to handle render requests
|
## Created by the remote process to handle render requests
|
||||||
|
|
||||||
@ -146,9 +161,9 @@ class Renderer(GraphicsView):
|
|||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.shm.close()
|
self.shm.close()
|
||||||
if sys.platform.startswith('win'):
|
if not sys.platform.startswith('win'):
|
||||||
self.shmFile.close()
|
self.shmFile.close()
|
||||||
|
|
||||||
def shmFileName(self):
|
def shmFileName(self):
|
||||||
if sys.platform.startswith('win'):
|
if sys.platform.startswith('win'):
|
||||||
return self.shmtag
|
return self.shmtag
|
||||||
|
Loading…
Reference in New Issue
Block a user