diff --git a/examples/MultiPlotSpeedTest.py b/examples/MultiPlotSpeedTest.py new file mode 100644 index 00000000..e25de42e --- /dev/null +++ b/examples/MultiPlotSpeedTest.py @@ -0,0 +1,63 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +## Add path to library (just for examples; you do not need this) +import initExample + + +from pyqtgraph.Qt import QtGui, QtCore +import numpy as np +import pyqtgraph as pg +from pyqtgraph.ptime import time +#QtGui.QApplication.setGraphicsSystem('raster') +app = QtGui.QApplication([]) +#mw = QtGui.QMainWindow() +#mw.resize(800,800) + +p = pg.plot() +#p.setRange(QtCore.QRectF(0, -10, 5000, 20)) +p.setLabel('bottom', 'Index', units='B') + +nPlots = 10 +#curves = [p.plot(pen=(i,nPlots*1.3)) for i in range(nPlots)] +curves = [pg.PlotCurveItem(pen=(i,nPlots*1.3)) for i in range(nPlots)] +for c in curves: + p.addItem(c) + +rgn = pg.LinearRegionItem([1,100]) +p.addItem(rgn) + + +data = np.random.normal(size=(53,5000/nPlots)) +ptr = 0 +lastTime = time() +fps = None +count = 0 +def update(): + global curve, data, ptr, p, lastTime, fps, nPlots, count + count += 1 + #print "---------", count + for i in range(nPlots): + curves[i].setData(i+data[(ptr+i)%data.shape[0]]) + #print " setData done." + ptr += nPlots + now = time() + dt = now - lastTime + lastTime = now + if fps is None: + fps = 1.0/dt + else: + s = np.clip(dt*3., 0, 1) + fps = fps * (1-s) + (1.0/dt) * s + p.setTitle('%0.2f fps' % fps) + #app.processEvents() ## force complete redraw for every plot +timer = QtCore.QTimer() +timer.timeout.connect(update) +timer.start(0) + + + +## Start Qt event loop unless running in interactive mode. +if __name__ == '__main__': + import sys + if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'): + QtGui.QApplication.instance().exec_() diff --git a/examples/RemoteSpeedTest.py b/examples/RemoteSpeedTest.py new file mode 100644 index 00000000..b3415a9d --- /dev/null +++ b/examples/RemoteSpeedTest.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +""" +This example demonstrates the use of RemoteGraphicsView to improve performance in +applications with heavy load. It works by starting a second process to handle +all graphics rendering, thus freeing up the main process to do its work. + +In this example, the update() function is very expensive and is called frequently. +After update() generates a new set of data, it can either plot directly to a local +plot (bottom) or remotely via a RemoteGraphicsView (top), allowing speed comparison +between the two cases. IF you have a multi-core CPU, it should be obvious that the +remote case is much faster. +""" + +import initExample ## Add path to library (just for examples; you do not need this) +from pyqtgraph.Qt import QtGui, QtCore +import pyqtgraph as pg +import pyqtgraph.widgets.RemoteGraphicsView +import numpy as np + +app = pg.mkQApp() + +view = pg.widgets.RemoteGraphicsView.RemoteGraphicsView() +pg.setConfigOptions(antialias=True) ## this will be expensive for the local plot +view.pg.setConfigOptions(antialias=True) ## prettier plots at no cost to the main process! + +label = QtGui.QLabel() +rcheck = QtGui.QCheckBox('plot remote') +rcheck.setChecked(True) +lcheck = QtGui.QCheckBox('plot local') +lplt = pg.PlotWidget() +layout = pg.LayoutWidget() +layout.addWidget(rcheck) +layout.addWidget(lcheck) +layout.addWidget(label) +layout.addWidget(view, row=1, col=0, colspan=3) +layout.addWidget(lplt, row=2, col=0, colspan=3) +layout.resize(800,800) +layout.show() + +## Create a PlotItem in the remote process that will be displayed locally +rplt = view.pg.PlotItem() +rplt._setProxyOptions(deferGetattr=True) ## speeds up access to rplt.plot +view.setCentralItem(rplt) + +lastUpdate = pg.ptime.time() +avgFps = 0.0 + +def update(): + global check, label, plt, lastUpdate, avgFps, rpltfunc + data = np.random.normal(size=(10000,50)).sum(axis=1) + data += 5 * np.sin(np.linspace(0, 10, data.shape[0])) + + if rcheck.isChecked(): + rplt.plot(data, clear=True, _callSync='off') ## We do not expect a return value. + ## By turning off callSync, we tell + ## the proxy that it does not need to + ## wait for a reply from the remote + ## process. + if lcheck.isChecked(): + lplt.plot(data, clear=True) + + now = pg.ptime.time() + fps = 1.0 / (now - lastUpdate) + lastUpdate = now + avgFps = avgFps * 0.8 + fps * 0.2 + label.setText("Generating %0.2f fps" % avgFps) + +timer = QtCore.QTimer() +timer.timeout.connect(update) +timer.start(0) + + + +## Start Qt event loop unless running in interactive mode or using pyside. +if __name__ == '__main__': + import sys + if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'): + QtGui.QApplication.instance().exec_() diff --git a/examples/__main__.py b/examples/__main__.py index e234a9da..40fbb5e9 100644 --- a/examples/__main__.py +++ b/examples/__main__.py @@ -22,6 +22,7 @@ examples = OrderedDict([ ('Dock widgets', 'dockarea.py'), ('Console', 'ConsoleWidget.py'), ('Histograms', 'histogram.py'), + ('Remote Plotting', 'RemoteSpeedTest.py'), ('GraphicsItems', OrderedDict([ ('Scatter Plot', 'ScatterPlot.py'), #('PlotItem', 'PlotItem.py'), diff --git a/examples/multiprocess.py b/examples/multiprocess.py index 0b2d7ed8..f6756345 100644 --- a/examples/multiprocess.py +++ b/examples/multiprocess.py @@ -43,6 +43,9 @@ from pyqtgraph.Qt import QtCore, QtGui app = pg.QtGui.QApplication([]) print "\n=================\nStart QtProcess" +import sys +if (sys.flags.interactive != 1): + print " (not interactive; remote process will exit immediately.)" proc = mp.QtProcess() d1 = proc.transfer(np.random.normal(size=1000)) d2 = proc.transfer(np.random.normal(size=1000)) diff --git a/pyqtgraph/multiprocess/parallelizer.py b/pyqtgraph/multiprocess/parallelizer.py index 0d5d6f5c..2d03c000 100644 --- a/pyqtgraph/multiprocess/parallelizer.py +++ b/pyqtgraph/multiprocess/parallelizer.py @@ -1,6 +1,6 @@ import os, sys, time, multiprocessing, re from processes import ForkedProcess -from remoteproxy import ExitError +from remoteproxy import ClosedError class CanceledError(Exception): """Raised when the progress dialog is canceled during a processing operation.""" @@ -152,7 +152,7 @@ class Parallelize(object): n = ch.processRequests() if n > 0: waitingChildren += 1 - except ExitError: + except ClosedError: #print ch.childPid, 'process finished' rem.append(ch) if self.showProgress: diff --git a/pyqtgraph/multiprocess/processes.py b/pyqtgraph/multiprocess/processes.py index a4769679..f95a3ec4 100644 --- a/pyqtgraph/multiprocess/processes.py +++ b/pyqtgraph/multiprocess/processes.py @@ -1,9 +1,9 @@ -from remoteproxy import RemoteEventHandler, ExitError, NoResultError, LocalObjectProxy, ObjectProxy +from remoteproxy import RemoteEventHandler, ClosedError, NoResultError, LocalObjectProxy, ObjectProxy import subprocess, atexit, os, sys, time, random, socket, signal import cPickle as pickle import multiprocessing.connection -__all__ = ['Process', 'QtProcess', 'ForkedProcess', 'ExitError', 'NoResultError'] +__all__ = ['Process', 'QtProcess', 'ForkedProcess', 'ClosedError', 'NoResultError'] class Process(RemoteEventHandler): """ @@ -100,7 +100,7 @@ def startEventLoop(name, port, authkey): try: HANDLER.processRequests() # exception raised when the loop should exit time.sleep(0.01) - except ExitError: + except ClosedError: break @@ -225,7 +225,7 @@ class ForkedProcess(RemoteEventHandler): try: self.processRequests() # exception raised when the loop should exit time.sleep(0.01) - except ExitError: + except ClosedError: break except: print "Error occurred in forked event loop:" @@ -267,11 +267,11 @@ class RemoteQtEventHandler(RemoteEventHandler): def processRequests(self): try: RemoteEventHandler.processRequests(self) - except ExitError: + except ClosedError: from pyqtgraph.Qt import QtGui, QtCore QtGui.QApplication.instance().quit() self.timer.stop() - #raise + #raise SystemExit class QtProcess(Process): """ @@ -315,7 +315,7 @@ class QtProcess(Process): def processRequests(self): try: Process.processRequests(self) - except ExitError: + except ClosedError: self.timer.stop() def startQtEventLoop(name, port, authkey): diff --git a/pyqtgraph/multiprocess/remoteproxy.py b/pyqtgraph/multiprocess/remoteproxy.py index f8da1bd7..ee5a0d6c 100644 --- a/pyqtgraph/multiprocess/remoteproxy.py +++ b/pyqtgraph/multiprocess/remoteproxy.py @@ -1,10 +1,15 @@ import os, __builtin__, time, sys, traceback, weakref import cPickle as pickle +import numpy as np -class ExitError(Exception): +class ClosedError(Exception): + """Raised when an event handler receives a request to close the connection + or discovers that the connection has been closed.""" pass class NoResultError(Exception): + """Raised when a request for the return value of a remote call fails + because the call has not yet returned.""" pass @@ -82,14 +87,14 @@ class RemoteEventHandler(object): Returns the number of events processed. """ if self.exited: - raise ExitError() + raise ClosedError() numProcessed = 0 while self.conn.poll(): try: self.handleRequest() numProcessed += 1 - except ExitError: + except ClosedError: self.exited = True raise except IOError as err: @@ -108,14 +113,20 @@ class RemoteEventHandler(object): Blocks until a request is available.""" result = None try: - cmd, reqId, optStr = self.conn.recv() ## args, kwds are double-pickled to ensure this recv() call never fails - except EOFError: + cmd, reqId, nByteMsgs, optStr = self.conn.recv() ## args, kwds are double-pickled to ensure this recv() call never fails + except EOFError, IOError: ## remote process has shut down; end event loop - raise ExitError() - except IOError: - raise ExitError() + raise ClosedError() #print os.getpid(), "received request:", cmd, reqId + ## read byte messages following the main request + byteData = [] + for i in range(nByteMsgs): + try: + byteData.append(self.conn.recv_bytes()) + except EOFError, IOError: + raise ClosedError() + try: if cmd == 'result' or cmd == 'error': @@ -137,17 +148,36 @@ class RemoteEventHandler(object): obj = opts['obj'] fnargs = opts['args'] fnkwds = opts['kwds'] + + ## If arrays were sent as byte messages, they must be re-inserted into the + ## arguments + if len(byteData) > 0: + for i,arg in enumerate(fnargs): + if isinstance(arg, tuple) and len(arg) > 0 and arg[0] == '__byte_message__': + ind = arg[1] + dtype, shape = arg[2] + fnargs[i] = np.fromstring(byteData[ind], dtype=dtype).reshape(shape) + for k,arg in fnkwds.items(): + if isinstance(arg, tuple) and len(arg) > 0 and arg[0] == '__byte_message__': + ind = arg[1] + dtype, shape = arg[2] + fnkwds[k] = np.fromstring(byteData[ind], dtype=dtype).reshape(shape) + if len(fnkwds) == 0: ## need to do this because some functions do not allow keyword arguments. - #print obj, fnargs result = obj(*fnargs) else: result = obj(*fnargs, **fnkwds) + elif cmd == 'getObjValue': result = opts['obj'] ## has already been unpickled into its local value returnType = 'value' elif cmd == 'transfer': result = opts['obj'] returnType = 'proxy' + elif cmd == 'transferArray': + ## read array data from next message: + result = np.fromstring(byteData[0], dtype=opts['dtype']).reshape(opts['shape']) + returnType = 'proxy' elif cmd == 'import': name = opts['module'] fromlist = opts.get('fromlist', []) @@ -201,7 +231,7 @@ class RemoteEventHandler(object): ## (more importantly, do not call any code that would ## normally be invoked at exit) else: - raise ExitError() + raise ClosedError() @@ -216,7 +246,7 @@ class RemoteEventHandler(object): except: self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=None, excString=excStr)) - def send(self, request, opts=None, reqId=None, callSync='sync', timeout=10, returnType=None, **kwds): + def send(self, request, opts=None, reqId=None, callSync='sync', timeout=10, returnType=None, byteData=None, **kwds): """Send a request or return packet to the remote process. Generally it is not necessary to call this method directly; it is for internal use. (The docstring has information that is nevertheless useful to the programmer @@ -235,6 +265,9 @@ class RemoteEventHandler(object): opts Extra arguments sent to the remote process that determine the way the request will be handled (see below) returnType 'proxy', 'value', or 'auto' + byteData If specified, this is a list of objects to be sent as byte messages + to the remote process. + This is used to send large arrays without the cost of pickling. ========== ==================================================================== Description of request strings and options allowed for each: @@ -312,7 +345,9 @@ class RemoteEventHandler(object): if returnType is not None: opts['returnType'] = returnType - #print "send", opts + + #print os.getpid(), "send request:", request, reqId, opts + ## double-pickle args to ensure that at least status and request ID get through try: optStr = pickle.dumps(opts) @@ -322,9 +357,19 @@ class RemoteEventHandler(object): print "=======================================" raise - request = (request, reqId, optStr) + nByteMsgs = 0 + if byteData is not None: + nByteMsgs = len(byteData) + + ## Send primary request + request = (request, reqId, nByteMsgs, optStr) self.conn.send(request) + ## follow up by sending byte messages + if byteData is not None: + for obj in byteData: ## Remote process _must_ be prepared to read the same number of byte messages! + self.conn.send_bytes(obj) + if callSync == 'off': return @@ -345,10 +390,10 @@ class RemoteEventHandler(object): ## raises NoResultError if the result is not available yet #print self.results.keys(), os.getpid() if reqId not in self.results: - #self.readPipe() try: self.processRequests() - except ExitError: + except ClosedError: ## even if remote connection has closed, we may have + ## received new data during this call to processRequests() pass if reqId not in self.results: raise NoResultError() @@ -393,17 +438,33 @@ class RemoteEventHandler(object): def callObj(self, obj, args, kwds, **opts): opts = opts.copy() + args = list(args) + + ## Decide whether to send arguments by value or by proxy noProxyTypes = opts.pop('noProxyTypes', None) if noProxyTypes is None: noProxyTypes = self.proxyOptions['noProxyTypes'] - autoProxy = opts.pop('autoProxy', self.proxyOptions['autoProxy']) + autoProxy = opts.pop('autoProxy', self.proxyOptions['autoProxy']) if autoProxy is True: - args = tuple([self.autoProxy(v, noProxyTypes) for v in args]) + args = [self.autoProxy(v, noProxyTypes) for v in args] for k, v in kwds.iteritems(): opts[k] = self.autoProxy(v, noProxyTypes) - return self.send(request='callObj', opts=dict(obj=obj, args=args, kwds=kwds), **opts) + byteMsgs = [] + + ## If there are arrays in the arguments, send those as byte messages. + ## We do this because pickling arrays is too expensive. + for i,arg in enumerate(args): + if arg.__class__ == np.ndarray: + args[i] = ("__byte_message__", len(byteMsgs), (arg.dtype, arg.shape)) + byteMsgs.append(arg) + for k,v in kwds.items(): + if v.__class__ == np.ndarray: + kwds[k] = ("__byte_message__", len(byteMsgs), (v.dtype, v.shape)) + byteMsgs.append(v) + + return self.send(request='callObj', opts=dict(obj=obj, args=args, kwds=kwds), byteData=byteMsgs, **opts) def registerProxy(self, proxy): ref = weakref.ref(proxy, self.deleteProxy) @@ -421,7 +482,11 @@ class RemoteEventHandler(object): Transfer an object by value to the remote host (the object must be picklable) and return a proxy for the new remote object. """ - return self.send(request='transfer', opts=dict(obj=obj), **kwds) + if obj.__class__ is np.ndarray: + opts = {'dtype': obj.dtype, 'shape': obj.shape} + return self.send(request='transferArray', opts=opts, byteData=[obj], **kwds) + else: + return self.send(request='transfer', opts=dict(obj=obj), **kwds) def autoProxy(self, obj, noProxyTypes): ## Return object wrapped in LocalObjectProxy _unless_ its type is in noProxyTypes. @@ -453,6 +518,8 @@ class Request(object): If block is True, wait until the result has arrived or *timeout* seconds passes. If the timeout is reached, raise NoResultError. (use timeout=None to disable) If block is False, raise NoResultError immediately if the result has not arrived yet. + + If the process's connection has closed before the result arrives, raise ClosedError. """ if self.gotResult: @@ -464,6 +531,8 @@ class Request(object): if block: start = time.time() while not self.hasResult(): + if self.proc.exited: + raise ClosedError() time.sleep(0.005) if timeout >= 0 and time.time() - start > timeout: print "Request timed out:", self.description diff --git a/pyqtgraph/widgets/RemoteGraphicsView.py b/pyqtgraph/widgets/RemoteGraphicsView.py index 3752a6bb..d8e720b5 100644 --- a/pyqtgraph/widgets/RemoteGraphicsView.py +++ b/pyqtgraph/widgets/RemoteGraphicsView.py @@ -18,6 +18,8 @@ class RemoteGraphicsView(QtGui.QWidget): def __init__(self, parent=None, *args, **kwds): self._img = None self._imgReq = None + self._sizeHint = (640,480) ## no clue why this is needed, but it seems to be the default sizeHint for GraphicsView. + ## without it, the widget will not compete for space against another GraphicsView. QtGui.QWidget.__init__(self) self._proc = mp.QtProcess() self.pg = self._proc._import('pyqtgraph') @@ -26,12 +28,18 @@ class RemoteGraphicsView(QtGui.QWidget): self._view = rpgRemote.Renderer(*args, **kwds) self._view._setProxyOptions(deferGetattr=True) self.setFocusPolicy(self._view.focusPolicy()) + self.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Expanding) + self.setMouseTracking(True) shmFileName = self._view.shmFileName() self.shmFile = open(shmFileName, 'r') self.shm = mmap.mmap(self.shmFile.fileno(), mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_READ) - self._view.sceneRendered.connect(mp.proxy(self.remoteSceneChanged)) + self._view.sceneRendered.connect(mp.proxy(self.remoteSceneChanged)) #, callSync='off')) + ## Note: we need synchronous signals + ## even though there is no return value-- + ## this informs the renderer that it is + ## safe to begin rendering again. for method in ['scene', 'setCentralItem']: setattr(self, method, getattr(self._view, method)) @@ -41,8 +49,12 @@ class RemoteGraphicsView(QtGui.QWidget): self._view.resize(self.size(), _callSync='off') return ret + def sizeHint(self): + return QtCore.QSize(*self._sizeHint) + def remoteSceneChanged(self, data): w, h, size = data + #self._sizeHint = (whint, hhint) if self.shm.size != size: self.shm.close() self.shm = mmap.mmap(self.shmFile.fileno(), size, mmap.MAP_SHARED, mmap.PROT_READ) @@ -82,7 +94,17 @@ class RemoteGraphicsView(QtGui.QWidget): ev.accept() return QtGui.QWidget.keyEvent(self, ev) - + def enterEvent(self, ev): + self._view.enterEvent(ev.type(), _callSync='off') + return QtGui.QWidget.enterEvent(self, ev) + + def leaveEvent(self, ev): + self._view.leaveEvent(ev.type(), _callSync='off') + return QtGui.QWidget.leaveEvent(self, ev) + + def remoteProcess(self): + """Return the remote process handle. (see multiprocess.remoteproxy.RemoteEventHandler)""" + return self._proc class Renderer(GraphicsView): @@ -126,6 +148,8 @@ class Renderer(GraphicsView): def renderView(self): if self.img is None: ## make sure shm is large enough and get its address + if self.width() == 0 or self.height() == 0: + return size = self.width() * self.height() * 4 if size > self.shm.size(): self.shm.resize(size) @@ -168,5 +192,14 @@ class Renderer(GraphicsView): GraphicsView.keyEvent(self, QtGui.QKeyEvent(typ, mods, text, autorep, count)) return ev.accepted() + def enterEvent(self, typ): + ev = QtCore.QEvent(QtCore.QEvent.Type(typ)) + return GraphicsView.enterEvent(self, ev) + + def leaveEvent(self, typ): + ev = QtCore.QEvent(QtCore.QEvent.Type(typ)) + return GraphicsView.leaveEvent(self, ev) + + \ No newline at end of file