diff --git a/pyqtgraph/multiprocess/bootstrap.py b/pyqtgraph/multiprocess/bootstrap.py index 28818135..4ecfb7da 100644 --- a/pyqtgraph/multiprocess/bootstrap.py +++ b/pyqtgraph/multiprocess/bootstrap.py @@ -5,20 +5,25 @@ if __name__ == '__main__': if hasattr(os, 'setpgrp'): os.setpgrp() ## prevents signals (notably keyboard interrupt) being forwarded from parent to this process if sys.version[0] == '3': - name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin.buffer) + #name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin.buffer) + opts = pickle.load(sys.stdin.buffer) else: - name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin) + #name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin) + opts = pickle.load(sys.stdin) #print "key:", ' '.join([str(ord(x)) for x in authkey]) + path = opts.pop('path', None) if path is not None: ## rewrite sys.path without assigning a new object--no idea who already has a reference to the existing list. while len(sys.path) > 0: sys.path.pop() sys.path.extend(path) - if pyside: + if opts.pop('pyside', False): import PySide #import pyqtgraph #import pyqtgraph.multiprocess.processes + targetStr = opts.pop('targetStr') target = pickle.loads(targetStr) ## unpickling the target should import everything we need - target(name, port, authkey, ppid) + #target(name, port, authkey, ppid) + target(**opts) ## Send all other options to the target function sys.exit(0) diff --git a/pyqtgraph/multiprocess/processes.py b/pyqtgraph/multiprocess/processes.py index 4c3be4e9..93a109ed 100644 --- a/pyqtgraph/multiprocess/processes.py +++ b/pyqtgraph/multiprocess/processes.py @@ -35,7 +35,7 @@ class Process(RemoteEventHandler): ProxyObject for more information. """ - def __init__(self, name=None, target=None, executable=None, copySysPath=True): + def __init__(self, name=None, target=None, executable=None, copySysPath=True, debug=False): """ ============ ============================================================= Arguments: @@ -46,7 +46,9 @@ class Process(RemoteEventHandler): process to process requests from the parent process until it is asked to quit. If you wish to specify a different target, it must be picklable (bound methods are not). - copySysPath If true, copy the contents of sys.path to the remote process + copySysPath If True, copy the contents of sys.path to the remote process + debug If True, print detailed information about communication + with the child process. ============ ============================================================= """ @@ -56,6 +58,7 @@ class Process(RemoteEventHandler): name = str(self) if executable is None: executable = sys.executable + self.debug = debug ## random authentication key authkey = os.urandom(20) @@ -75,23 +78,46 @@ class Process(RemoteEventHandler): ## start remote process, instruct it to run target function sysPath = sys.path if copySysPath else None bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py')) + self.debugMsg('Starting child process (%s %s)' % (executable, bootstrap)) self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE) targetStr = pickle.dumps(target) ## double-pickle target so that child has a chance to ## set its sys.path properly before unpickling the target - pid = os.getpid() # we must sent pid to child because windows does not have getppid + pid = os.getpid() # we must send pid to child because windows does not have getppid pyside = USE_PYSIDE ## Send everything the remote process needs to start correctly - pickle.dump((name+'_child', port, authkey, pid, targetStr, sysPath, pyside), self.proc.stdin) + data = dict( + name=name+'_child', + port=port, + authkey=authkey, + ppid=pid, + targetStr=targetStr, + path=sysPath, + pyside=pyside, + debug=debug + ) + pickle.dump(data, self.proc.stdin) self.proc.stdin.close() ## open connection for remote process - conn = l.accept() - RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid) + self.debugMsg('Listening for child process..') + while True: + try: + conn = l.accept() + break + except IOError as err: + if err.errno == 4: # interrupted; try again + continue + else: + raise + + RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid, debug=debug) + self.debugMsg('Connected to child process.') atexit.register(self.join) def join(self, timeout=10): + self.debugMsg('Joining child process..') if self.proc.poll() is None: self.close() start = time.time() @@ -99,13 +125,14 @@ class Process(RemoteEventHandler): if timeout is not None and time.time() - start > timeout: raise Exception('Timed out waiting for remote process to end.') time.sleep(0.05) + self.debugMsg('Child process exited. (%d)' % self.proc.returncode) -def startEventLoop(name, port, authkey, ppid): +def startEventLoop(name, port, authkey, ppid, debug=False): conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey) global HANDLER #ppid = 0 if not hasattr(os, 'getppid') else os.getppid() - HANDLER = RemoteEventHandler(conn, name, ppid) + HANDLER = RemoteEventHandler(conn, name, ppid, debug=debug) while True: try: HANDLER.processRequests() # exception raised when the loop should exit @@ -329,7 +356,7 @@ class QtProcess(Process): except ClosedError: self.timer.stop() -def startQtEventLoop(name, port, authkey, ppid): +def startQtEventLoop(name, port, authkey, ppid, debug=False): conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey) from pyqtgraph.Qt import QtGui, QtCore #from PyQt4 import QtGui, QtCore @@ -342,7 +369,7 @@ def startQtEventLoop(name, port, authkey, ppid): global HANDLER #ppid = 0 if not hasattr(os, 'getppid') else os.getppid() - HANDLER = RemoteQtEventHandler(conn, name, ppid) + HANDLER = RemoteQtEventHandler(conn, name, ppid, debug=debug) HANDLER.startEventTimer() app.exec_() diff --git a/pyqtgraph/multiprocess/remoteproxy.py b/pyqtgraph/multiprocess/remoteproxy.py index 096f2006..6cd65f6e 100644 --- a/pyqtgraph/multiprocess/remoteproxy.py +++ b/pyqtgraph/multiprocess/remoteproxy.py @@ -42,7 +42,8 @@ class RemoteEventHandler(object): handlers = {} ## maps {process ID : handler}. This allows unpickler to determine which process ## an object proxy belongs to - def __init__(self, connection, name, pid): + def __init__(self, connection, name, pid, debug=False): + self.debug = debug self.conn = connection self.name = name self.results = {} ## reqId: (status, result); cache of request results received from the remote process @@ -76,6 +77,11 @@ class RemoteEventHandler(object): print(pid, cls.handlers) raise + def debugMsg(self, msg): + if not self.debug: + return + print("[%d] %s" % (os.getpid(), str(msg))) + def getProxyOption(self, opt): return self.proxyOptions[opt] @@ -91,7 +97,9 @@ class RemoteEventHandler(object): after no more events are immediately available. (non-blocking) Returns the number of events processed. """ + self.debugMsg('processRequests:') if self.exited: + self.debugMsg(' processRequests: exited already; raise ClosedError.') raise ClosedError() numProcessed = 0 @@ -100,37 +108,64 @@ class RemoteEventHandler(object): self.handleRequest() numProcessed += 1 except ClosedError: + self.debugMsg(' processRequests: got ClosedError from handleRequest; setting exited=True.') self.exited = True raise - except IOError as err: - if err.errno == 4: ## interrupted system call; try again - continue - else: - raise + #except IOError as err: ## let handleRequest take care of this. + #self.debugMsg(' got IOError from handleRequest; try again.') + #if err.errno == 4: ## interrupted system call; try again + #continue + #else: + #raise except: print("Error in process %s" % self.name) sys.excepthook(*sys.exc_info()) + self.debugMsg(' processRequests: finished %d requests' % numProcessed) return numProcessed def handleRequest(self): """Handle a single request from the remote process. Blocks until a request is available.""" result = None - try: - cmd, reqId, nByteMsgs, optStr = self.conn.recv() ## args, kwds are double-pickled to ensure this recv() call never fails - except (EOFError, IOError): - ## remote process has shut down; end event loop - raise ClosedError() - #print os.getpid(), "received request:", cmd, reqId + while True: + try: + ## args, kwds are double-pickled to ensure this recv() call never fails + cmd, reqId, nByteMsgs, optStr = self.conn.recv() + break + except EOFError: + self.debugMsg(' handleRequest: got EOFError from recv; raise ClosedError.') + ## remote process has shut down; end event loop + raise ClosedError() + except IOError as err: + if err.errno == 4: ## interrupted system call; try again + self.debugMsg(' handleRequest: got IOError 4 from recv; try again.') + continue + else: + self.debugMsg(' handleRequest: got IOError %d from recv (%s); raise ClosedError.' % (err.errno, err.strerror)) + raise ClosedError() + + self.debugMsg(" handleRequest: received %s %s" % (str(cmd), str(reqId))) ## read byte messages following the main request byteData = [] + if nByteMsgs > 0: + self.debugMsg(" handleRequest: reading %d byte messages" % nByteMsgs) for i in range(nByteMsgs): - try: - byteData.append(self.conn.recv_bytes()) - except (EOFError, IOError): - raise ClosedError() + while True: + try: + byteData.append(self.conn.recv_bytes()) + break + except EOFError: + self.debugMsg(" handleRequest: got EOF while reading byte messages; raise ClosedError.") + raise ClosedError() + except IOError as err: + if err.errno == 4: + self.debugMsg(" handleRequest: got IOError 4 while reading byte messages; try again.") + continue + else: + self.debugMsg(" handleRequest: got IOError while reading byte messages; raise ClosedError.") + raise ClosedError() try: @@ -140,6 +175,7 @@ class RemoteEventHandler(object): ## (this is already a return from a previous request) opts = pickle.loads(optStr) + self.debugMsg(" handleRequest: id=%s opts=%s" % (str(reqId), str(opts))) #print os.getpid(), "received request:", cmd, reqId, opts returnType = opts.get('returnType', 'auto') @@ -213,6 +249,7 @@ class RemoteEventHandler(object): if reqId is not None: if exc is None: + self.debugMsg(" handleRequest: sending return value for %d: %s" % (reqId, str(result))) #print "returnValue:", returnValue, result if returnType == 'auto': result = self.autoProxy(result, self.proxyOptions['noProxyTypes']) @@ -225,6 +262,7 @@ class RemoteEventHandler(object): sys.excepthook(*sys.exc_info()) self.replyError(reqId, *sys.exc_info()) else: + self.debugMsg(" handleRequest: returning exception for %d" % reqId) self.replyError(reqId, *exc) elif exc is not None: @@ -368,13 +406,16 @@ class RemoteEventHandler(object): ## Send primary request request = (request, reqId, nByteMsgs, optStr) + self.debugMsg('send request: cmd=%s nByteMsgs=%d id=%s opts=%s' % (str(request[0]), nByteMsgs, str(reqId), str(opts))) self.conn.send(request) ## follow up by sending byte messages if byteData is not None: for obj in byteData: ## Remote process _must_ be prepared to read the same number of byte messages! self.conn.send_bytes(obj) + self.debugMsg(' sent %d byte messages' % len(byteData)) + self.debugMsg(' call sync: %s' % callSync) if callSync == 'off': return diff --git a/pyqtgraph/widgets/RemoteGraphicsView.py b/pyqtgraph/widgets/RemoteGraphicsView.py index 2dd1fe9b..cb36ba62 100644 --- a/pyqtgraph/widgets/RemoteGraphicsView.py +++ b/pyqtgraph/widgets/RemoteGraphicsView.py @@ -1,4 +1,4 @@ -from pyqtgraph.Qt import QtGui, QtCore +from pyqtgraph.Qt import QtGui, QtCore, USE_PYSIDE import pyqtgraph.multiprocess as mp import pyqtgraph as pg from .GraphicsView import GraphicsView @@ -21,13 +21,14 @@ class RemoteGraphicsView(QtGui.QWidget): self._sizeHint = (640,480) ## no clue why this is needed, but it seems to be the default sizeHint for GraphicsView. ## without it, the widget will not compete for space against another GraphicsView. QtGui.QWidget.__init__(self) - self._proc = mp.QtProcess() + self._proc = mp.QtProcess(debug=False) self.pg = self._proc._import('pyqtgraph') self.pg.setConfigOptions(**self.pg.CONFIG_OPTIONS) rpgRemote = self._proc._import('pyqtgraph.widgets.RemoteGraphicsView') self._view = rpgRemote.Renderer(*args, **kwds) self._view._setProxyOptions(deferGetattr=True) - self.setFocusPolicy(QtCore.Qt.FocusPolicy(self._view.focusPolicy())) + + self.setFocusPolicy(QtCore.Qt.StrongFocus) self.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Expanding) self.setMouseTracking(True) self.shm = None @@ -114,6 +115,7 @@ class RemoteGraphicsView(QtGui.QWidget): return self._proc class Renderer(GraphicsView): + ## Created by the remote process to handle render requests sceneRendered = QtCore.Signal(object) @@ -175,7 +177,12 @@ class Renderer(GraphicsView): address = ctypes.addressof(ctypes.c_char.from_buffer(self.shm, 0)) ## render the scene directly to shared memory - self.img = QtGui.QImage(address, self.width(), self.height(), QtGui.QImage.Format_ARGB32) + if USE_PYSIDE: + ch = ctypes.c_char.from_buffer(self.shm, 0) + #ch = ctypes.c_char_p(address) + self.img = QtGui.QImage(ch, self.width(), self.height(), QtGui.QImage.Format_ARGB32) + else: + self.img = QtGui.QImage(address, self.width(), self.height(), QtGui.QImage.Format_ARGB32) self.img.fill(0xffffffff) p = QtGui.QPainter(self.img) self.render(p, self.viewRect(), self.rect())