Improved performance for remote plotting:

- reduced cost of transferring arrays between processes (pickle is too slow)
  - avoid unnecessary synchronous calls

Added RemoteSpeedTest example
This commit is contained in:
Luke Campagnola 2013-01-10 16:10:27 -05:00
parent 01b8968a0a
commit 513e904a59
8 changed files with 277 additions and 30 deletions

View File

@ -0,0 +1,63 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
## Add path to library (just for examples; you do not need this)
import initExample
from pyqtgraph.Qt import QtGui, QtCore
import numpy as np
import pyqtgraph as pg
from pyqtgraph.ptime import time
#QtGui.QApplication.setGraphicsSystem('raster')
app = QtGui.QApplication([])
#mw = QtGui.QMainWindow()
#mw.resize(800,800)
p = pg.plot()
#p.setRange(QtCore.QRectF(0, -10, 5000, 20))
p.setLabel('bottom', 'Index', units='B')
nPlots = 10
#curves = [p.plot(pen=(i,nPlots*1.3)) for i in range(nPlots)]
curves = [pg.PlotCurveItem(pen=(i,nPlots*1.3)) for i in range(nPlots)]
for c in curves:
p.addItem(c)
rgn = pg.LinearRegionItem([1,100])
p.addItem(rgn)
data = np.random.normal(size=(53,5000/nPlots))
ptr = 0
lastTime = time()
fps = None
count = 0
def update():
global curve, data, ptr, p, lastTime, fps, nPlots, count
count += 1
#print "---------", count
for i in range(nPlots):
curves[i].setData(i+data[(ptr+i)%data.shape[0]])
#print " setData done."
ptr += nPlots
now = time()
dt = now - lastTime
lastTime = now
if fps is None:
fps = 1.0/dt
else:
s = np.clip(dt*3., 0, 1)
fps = fps * (1-s) + (1.0/dt) * s
p.setTitle('%0.2f fps' % fps)
#app.processEvents() ## force complete redraw for every plot
timer = QtCore.QTimer()
timer.timeout.connect(update)
timer.start(0)
## Start Qt event loop unless running in interactive mode.
if __name__ == '__main__':
import sys
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()

View File

@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
"""
This example demonstrates the use of RemoteGraphicsView to improve performance in
applications with heavy load. It works by starting a second process to handle
all graphics rendering, thus freeing up the main process to do its work.
In this example, the update() function is very expensive and is called frequently.
After update() generates a new set of data, it can either plot directly to a local
plot (bottom) or remotely via a RemoteGraphicsView (top), allowing speed comparison
between the two cases. IF you have a multi-core CPU, it should be obvious that the
remote case is much faster.
"""
import initExample ## Add path to library (just for examples; you do not need this)
from pyqtgraph.Qt import QtGui, QtCore
import pyqtgraph as pg
import pyqtgraph.widgets.RemoteGraphicsView
import numpy as np
app = pg.mkQApp()
view = pg.widgets.RemoteGraphicsView.RemoteGraphicsView()
pg.setConfigOptions(antialias=True) ## this will be expensive for the local plot
view.pg.setConfigOptions(antialias=True) ## prettier plots at no cost to the main process!
label = QtGui.QLabel()
rcheck = QtGui.QCheckBox('plot remote')
rcheck.setChecked(True)
lcheck = QtGui.QCheckBox('plot local')
lplt = pg.PlotWidget()
layout = pg.LayoutWidget()
layout.addWidget(rcheck)
layout.addWidget(lcheck)
layout.addWidget(label)
layout.addWidget(view, row=1, col=0, colspan=3)
layout.addWidget(lplt, row=2, col=0, colspan=3)
layout.resize(800,800)
layout.show()
## Create a PlotItem in the remote process that will be displayed locally
rplt = view.pg.PlotItem()
rplt._setProxyOptions(deferGetattr=True) ## speeds up access to rplt.plot
view.setCentralItem(rplt)
lastUpdate = pg.ptime.time()
avgFps = 0.0
def update():
global check, label, plt, lastUpdate, avgFps, rpltfunc
data = np.random.normal(size=(10000,50)).sum(axis=1)
data += 5 * np.sin(np.linspace(0, 10, data.shape[0]))
if rcheck.isChecked():
rplt.plot(data, clear=True, _callSync='off') ## We do not expect a return value.
## By turning off callSync, we tell
## the proxy that it does not need to
## wait for a reply from the remote
## process.
if lcheck.isChecked():
lplt.plot(data, clear=True)
now = pg.ptime.time()
fps = 1.0 / (now - lastUpdate)
lastUpdate = now
avgFps = avgFps * 0.8 + fps * 0.2
label.setText("Generating %0.2f fps" % avgFps)
timer = QtCore.QTimer()
timer.timeout.connect(update)
timer.start(0)
## Start Qt event loop unless running in interactive mode or using pyside.
if __name__ == '__main__':
import sys
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()

View File

@ -22,6 +22,7 @@ examples = OrderedDict([
('Dock widgets', 'dockarea.py'),
('Console', 'ConsoleWidget.py'),
('Histograms', 'histogram.py'),
('Remote Plotting', 'RemoteSpeedTest.py'),
('GraphicsItems', OrderedDict([
('Scatter Plot', 'ScatterPlot.py'),
#('PlotItem', 'PlotItem.py'),

View File

@ -43,6 +43,9 @@ from pyqtgraph.Qt import QtCore, QtGui
app = pg.QtGui.QApplication([])
print "\n=================\nStart QtProcess"
import sys
if (sys.flags.interactive != 1):
print " (not interactive; remote process will exit immediately.)"
proc = mp.QtProcess()
d1 = proc.transfer(np.random.normal(size=1000))
d2 = proc.transfer(np.random.normal(size=1000))

View File

@ -1,6 +1,6 @@
import os, sys, time, multiprocessing, re
from processes import ForkedProcess
from remoteproxy import ExitError
from remoteproxy import ClosedError
class CanceledError(Exception):
"""Raised when the progress dialog is canceled during a processing operation."""
@ -152,7 +152,7 @@ class Parallelize(object):
n = ch.processRequests()
if n > 0:
waitingChildren += 1
except ExitError:
except ClosedError:
#print ch.childPid, 'process finished'
rem.append(ch)
if self.showProgress:

View File

@ -1,9 +1,9 @@
from remoteproxy import RemoteEventHandler, ExitError, NoResultError, LocalObjectProxy, ObjectProxy
from remoteproxy import RemoteEventHandler, ClosedError, NoResultError, LocalObjectProxy, ObjectProxy
import subprocess, atexit, os, sys, time, random, socket, signal
import cPickle as pickle
import multiprocessing.connection
__all__ = ['Process', 'QtProcess', 'ForkedProcess', 'ExitError', 'NoResultError']
__all__ = ['Process', 'QtProcess', 'ForkedProcess', 'ClosedError', 'NoResultError']
class Process(RemoteEventHandler):
"""
@ -100,7 +100,7 @@ def startEventLoop(name, port, authkey):
try:
HANDLER.processRequests() # exception raised when the loop should exit
time.sleep(0.01)
except ExitError:
except ClosedError:
break
@ -225,7 +225,7 @@ class ForkedProcess(RemoteEventHandler):
try:
self.processRequests() # exception raised when the loop should exit
time.sleep(0.01)
except ExitError:
except ClosedError:
break
except:
print "Error occurred in forked event loop:"
@ -267,11 +267,11 @@ class RemoteQtEventHandler(RemoteEventHandler):
def processRequests(self):
try:
RemoteEventHandler.processRequests(self)
except ExitError:
except ClosedError:
from pyqtgraph.Qt import QtGui, QtCore
QtGui.QApplication.instance().quit()
self.timer.stop()
#raise
#raise SystemExit
class QtProcess(Process):
"""
@ -315,7 +315,7 @@ class QtProcess(Process):
def processRequests(self):
try:
Process.processRequests(self)
except ExitError:
except ClosedError:
self.timer.stop()
def startQtEventLoop(name, port, authkey):

View File

@ -1,10 +1,15 @@
import os, __builtin__, time, sys, traceback, weakref
import cPickle as pickle
import numpy as np
class ExitError(Exception):
class ClosedError(Exception):
"""Raised when an event handler receives a request to close the connection
or discovers that the connection has been closed."""
pass
class NoResultError(Exception):
"""Raised when a request for the return value of a remote call fails
because the call has not yet returned."""
pass
@ -82,14 +87,14 @@ class RemoteEventHandler(object):
Returns the number of events processed.
"""
if self.exited:
raise ExitError()
raise ClosedError()
numProcessed = 0
while self.conn.poll():
try:
self.handleRequest()
numProcessed += 1
except ExitError:
except ClosedError:
self.exited = True
raise
except IOError as err:
@ -108,14 +113,20 @@ class RemoteEventHandler(object):
Blocks until a request is available."""
result = None
try:
cmd, reqId, optStr = self.conn.recv() ## args, kwds are double-pickled to ensure this recv() call never fails
except EOFError:
cmd, reqId, nByteMsgs, optStr = self.conn.recv() ## args, kwds are double-pickled to ensure this recv() call never fails
except EOFError, IOError:
## remote process has shut down; end event loop
raise ExitError()
except IOError:
raise ExitError()
raise ClosedError()
#print os.getpid(), "received request:", cmd, reqId
## read byte messages following the main request
byteData = []
for i in range(nByteMsgs):
try:
byteData.append(self.conn.recv_bytes())
except EOFError, IOError:
raise ClosedError()
try:
if cmd == 'result' or cmd == 'error':
@ -137,17 +148,36 @@ class RemoteEventHandler(object):
obj = opts['obj']
fnargs = opts['args']
fnkwds = opts['kwds']
## If arrays were sent as byte messages, they must be re-inserted into the
## arguments
if len(byteData) > 0:
for i,arg in enumerate(fnargs):
if isinstance(arg, tuple) and len(arg) > 0 and arg[0] == '__byte_message__':
ind = arg[1]
dtype, shape = arg[2]
fnargs[i] = np.fromstring(byteData[ind], dtype=dtype).reshape(shape)
for k,arg in fnkwds.items():
if isinstance(arg, tuple) and len(arg) > 0 and arg[0] == '__byte_message__':
ind = arg[1]
dtype, shape = arg[2]
fnkwds[k] = np.fromstring(byteData[ind], dtype=dtype).reshape(shape)
if len(fnkwds) == 0: ## need to do this because some functions do not allow keyword arguments.
#print obj, fnargs
result = obj(*fnargs)
else:
result = obj(*fnargs, **fnkwds)
elif cmd == 'getObjValue':
result = opts['obj'] ## has already been unpickled into its local value
returnType = 'value'
elif cmd == 'transfer':
result = opts['obj']
returnType = 'proxy'
elif cmd == 'transferArray':
## read array data from next message:
result = np.fromstring(byteData[0], dtype=opts['dtype']).reshape(opts['shape'])
returnType = 'proxy'
elif cmd == 'import':
name = opts['module']
fromlist = opts.get('fromlist', [])
@ -201,7 +231,7 @@ class RemoteEventHandler(object):
## (more importantly, do not call any code that would
## normally be invoked at exit)
else:
raise ExitError()
raise ClosedError()
@ -216,7 +246,7 @@ class RemoteEventHandler(object):
except:
self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=None, excString=excStr))
def send(self, request, opts=None, reqId=None, callSync='sync', timeout=10, returnType=None, **kwds):
def send(self, request, opts=None, reqId=None, callSync='sync', timeout=10, returnType=None, byteData=None, **kwds):
"""Send a request or return packet to the remote process.
Generally it is not necessary to call this method directly; it is for internal use.
(The docstring has information that is nevertheless useful to the programmer
@ -235,6 +265,9 @@ class RemoteEventHandler(object):
opts Extra arguments sent to the remote process that determine the way
the request will be handled (see below)
returnType 'proxy', 'value', or 'auto'
byteData If specified, this is a list of objects to be sent as byte messages
to the remote process.
This is used to send large arrays without the cost of pickling.
========== ====================================================================
Description of request strings and options allowed for each:
@ -312,7 +345,9 @@ class RemoteEventHandler(object):
if returnType is not None:
opts['returnType'] = returnType
#print "send", opts
#print os.getpid(), "send request:", request, reqId, opts
## double-pickle args to ensure that at least status and request ID get through
try:
optStr = pickle.dumps(opts)
@ -322,9 +357,19 @@ class RemoteEventHandler(object):
print "======================================="
raise
request = (request, reqId, optStr)
nByteMsgs = 0
if byteData is not None:
nByteMsgs = len(byteData)
## Send primary request
request = (request, reqId, nByteMsgs, optStr)
self.conn.send(request)
## follow up by sending byte messages
if byteData is not None:
for obj in byteData: ## Remote process _must_ be prepared to read the same number of byte messages!
self.conn.send_bytes(obj)
if callSync == 'off':
return
@ -345,10 +390,10 @@ class RemoteEventHandler(object):
## raises NoResultError if the result is not available yet
#print self.results.keys(), os.getpid()
if reqId not in self.results:
#self.readPipe()
try:
self.processRequests()
except ExitError:
except ClosedError: ## even if remote connection has closed, we may have
## received new data during this call to processRequests()
pass
if reqId not in self.results:
raise NoResultError()
@ -393,17 +438,33 @@ class RemoteEventHandler(object):
def callObj(self, obj, args, kwds, **opts):
opts = opts.copy()
args = list(args)
## Decide whether to send arguments by value or by proxy
noProxyTypes = opts.pop('noProxyTypes', None)
if noProxyTypes is None:
noProxyTypes = self.proxyOptions['noProxyTypes']
autoProxy = opts.pop('autoProxy', self.proxyOptions['autoProxy'])
autoProxy = opts.pop('autoProxy', self.proxyOptions['autoProxy'])
if autoProxy is True:
args = tuple([self.autoProxy(v, noProxyTypes) for v in args])
args = [self.autoProxy(v, noProxyTypes) for v in args]
for k, v in kwds.iteritems():
opts[k] = self.autoProxy(v, noProxyTypes)
return self.send(request='callObj', opts=dict(obj=obj, args=args, kwds=kwds), **opts)
byteMsgs = []
## If there are arrays in the arguments, send those as byte messages.
## We do this because pickling arrays is too expensive.
for i,arg in enumerate(args):
if arg.__class__ == np.ndarray:
args[i] = ("__byte_message__", len(byteMsgs), (arg.dtype, arg.shape))
byteMsgs.append(arg)
for k,v in kwds.items():
if v.__class__ == np.ndarray:
kwds[k] = ("__byte_message__", len(byteMsgs), (v.dtype, v.shape))
byteMsgs.append(v)
return self.send(request='callObj', opts=dict(obj=obj, args=args, kwds=kwds), byteData=byteMsgs, **opts)
def registerProxy(self, proxy):
ref = weakref.ref(proxy, self.deleteProxy)
@ -421,7 +482,11 @@ class RemoteEventHandler(object):
Transfer an object by value to the remote host (the object must be picklable)
and return a proxy for the new remote object.
"""
return self.send(request='transfer', opts=dict(obj=obj), **kwds)
if obj.__class__ is np.ndarray:
opts = {'dtype': obj.dtype, 'shape': obj.shape}
return self.send(request='transferArray', opts=opts, byteData=[obj], **kwds)
else:
return self.send(request='transfer', opts=dict(obj=obj), **kwds)
def autoProxy(self, obj, noProxyTypes):
## Return object wrapped in LocalObjectProxy _unless_ its type is in noProxyTypes.
@ -453,6 +518,8 @@ class Request(object):
If block is True, wait until the result has arrived or *timeout* seconds passes.
If the timeout is reached, raise NoResultError. (use timeout=None to disable)
If block is False, raise NoResultError immediately if the result has not arrived yet.
If the process's connection has closed before the result arrives, raise ClosedError.
"""
if self.gotResult:
@ -464,6 +531,8 @@ class Request(object):
if block:
start = time.time()
while not self.hasResult():
if self.proc.exited:
raise ClosedError()
time.sleep(0.005)
if timeout >= 0 and time.time() - start > timeout:
print "Request timed out:", self.description

View File

@ -18,6 +18,8 @@ class RemoteGraphicsView(QtGui.QWidget):
def __init__(self, parent=None, *args, **kwds):
self._img = None
self._imgReq = None
self._sizeHint = (640,480) ## no clue why this is needed, but it seems to be the default sizeHint for GraphicsView.
## without it, the widget will not compete for space against another GraphicsView.
QtGui.QWidget.__init__(self)
self._proc = mp.QtProcess()
self.pg = self._proc._import('pyqtgraph')
@ -26,12 +28,18 @@ class RemoteGraphicsView(QtGui.QWidget):
self._view = rpgRemote.Renderer(*args, **kwds)
self._view._setProxyOptions(deferGetattr=True)
self.setFocusPolicy(self._view.focusPolicy())
self.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Expanding)
self.setMouseTracking(True)
shmFileName = self._view.shmFileName()
self.shmFile = open(shmFileName, 'r')
self.shm = mmap.mmap(self.shmFile.fileno(), mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_READ)
self._view.sceneRendered.connect(mp.proxy(self.remoteSceneChanged))
self._view.sceneRendered.connect(mp.proxy(self.remoteSceneChanged)) #, callSync='off'))
## Note: we need synchronous signals
## even though there is no return value--
## this informs the renderer that it is
## safe to begin rendering again.
for method in ['scene', 'setCentralItem']:
setattr(self, method, getattr(self._view, method))
@ -41,8 +49,12 @@ class RemoteGraphicsView(QtGui.QWidget):
self._view.resize(self.size(), _callSync='off')
return ret
def sizeHint(self):
return QtCore.QSize(*self._sizeHint)
def remoteSceneChanged(self, data):
w, h, size = data
#self._sizeHint = (whint, hhint)
if self.shm.size != size:
self.shm.close()
self.shm = mmap.mmap(self.shmFile.fileno(), size, mmap.MAP_SHARED, mmap.PROT_READ)
@ -82,7 +94,17 @@ class RemoteGraphicsView(QtGui.QWidget):
ev.accept()
return QtGui.QWidget.keyEvent(self, ev)
def enterEvent(self, ev):
self._view.enterEvent(ev.type(), _callSync='off')
return QtGui.QWidget.enterEvent(self, ev)
def leaveEvent(self, ev):
self._view.leaveEvent(ev.type(), _callSync='off')
return QtGui.QWidget.leaveEvent(self, ev)
def remoteProcess(self):
"""Return the remote process handle. (see multiprocess.remoteproxy.RemoteEventHandler)"""
return self._proc
class Renderer(GraphicsView):
@ -126,6 +148,8 @@ class Renderer(GraphicsView):
def renderView(self):
if self.img is None:
## make sure shm is large enough and get its address
if self.width() == 0 or self.height() == 0:
return
size = self.width() * self.height() * 4
if size > self.shm.size():
self.shm.resize(size)
@ -168,5 +192,14 @@ class Renderer(GraphicsView):
GraphicsView.keyEvent(self, QtGui.QKeyEvent(typ, mods, text, autorep, count))
return ev.accepted()
def enterEvent(self, typ):
ev = QtCore.QEvent(QtCore.QEvent.Type(typ))
return GraphicsView.enterEvent(self, ev)
def leaveEvent(self, typ):
ev = QtCore.QEvent(QtCore.QEvent.Type(typ))
return GraphicsView.leaveEvent(self, ev)