Improved performance for remote plotting:
- reduced cost of transferring arrays between processes (pickle is too slow) - avoid unnecessary synchronous calls Added RemoteSpeedTest example
This commit is contained in:
parent
8355e6148d
commit
9a1d7d74cb
@ -1,6 +1,6 @@
|
||||
import os, sys, time, multiprocessing, re
|
||||
from processes import ForkedProcess
|
||||
from remoteproxy import ExitError
|
||||
from remoteproxy import ClosedError
|
||||
|
||||
class CanceledError(Exception):
|
||||
"""Raised when the progress dialog is canceled during a processing operation."""
|
||||
@ -152,7 +152,7 @@ class Parallelize(object):
|
||||
n = ch.processRequests()
|
||||
if n > 0:
|
||||
waitingChildren += 1
|
||||
except ExitError:
|
||||
except ClosedError:
|
||||
#print ch.childPid, 'process finished'
|
||||
rem.append(ch)
|
||||
if self.showProgress:
|
||||
|
@ -1,9 +1,9 @@
|
||||
from remoteproxy import RemoteEventHandler, ExitError, NoResultError, LocalObjectProxy, ObjectProxy
|
||||
from remoteproxy import RemoteEventHandler, ClosedError, NoResultError, LocalObjectProxy, ObjectProxy
|
||||
import subprocess, atexit, os, sys, time, random, socket, signal
|
||||
import cPickle as pickle
|
||||
import multiprocessing.connection
|
||||
|
||||
__all__ = ['Process', 'QtProcess', 'ForkedProcess', 'ExitError', 'NoResultError']
|
||||
__all__ = ['Process', 'QtProcess', 'ForkedProcess', 'ClosedError', 'NoResultError']
|
||||
|
||||
class Process(RemoteEventHandler):
|
||||
"""
|
||||
@ -100,7 +100,7 @@ def startEventLoop(name, port, authkey):
|
||||
try:
|
||||
HANDLER.processRequests() # exception raised when the loop should exit
|
||||
time.sleep(0.01)
|
||||
except ExitError:
|
||||
except ClosedError:
|
||||
break
|
||||
|
||||
|
||||
@ -225,7 +225,7 @@ class ForkedProcess(RemoteEventHandler):
|
||||
try:
|
||||
self.processRequests() # exception raised when the loop should exit
|
||||
time.sleep(0.01)
|
||||
except ExitError:
|
||||
except ClosedError:
|
||||
break
|
||||
except:
|
||||
print "Error occurred in forked event loop:"
|
||||
@ -267,11 +267,11 @@ class RemoteQtEventHandler(RemoteEventHandler):
|
||||
def processRequests(self):
|
||||
try:
|
||||
RemoteEventHandler.processRequests(self)
|
||||
except ExitError:
|
||||
except ClosedError:
|
||||
from pyqtgraph.Qt import QtGui, QtCore
|
||||
QtGui.QApplication.instance().quit()
|
||||
self.timer.stop()
|
||||
#raise
|
||||
#raise SystemExit
|
||||
|
||||
class QtProcess(Process):
|
||||
"""
|
||||
@ -315,7 +315,7 @@ class QtProcess(Process):
|
||||
def processRequests(self):
|
||||
try:
|
||||
Process.processRequests(self)
|
||||
except ExitError:
|
||||
except ClosedError:
|
||||
self.timer.stop()
|
||||
|
||||
def startQtEventLoop(name, port, authkey):
|
||||
|
@ -1,10 +1,15 @@
|
||||
import os, __builtin__, time, sys, traceback, weakref
|
||||
import cPickle as pickle
|
||||
import numpy as np
|
||||
|
||||
class ExitError(Exception):
|
||||
class ClosedError(Exception):
|
||||
"""Raised when an event handler receives a request to close the connection
|
||||
or discovers that the connection has been closed."""
|
||||
pass
|
||||
|
||||
class NoResultError(Exception):
|
||||
"""Raised when a request for the return value of a remote call fails
|
||||
because the call has not yet returned."""
|
||||
pass
|
||||
|
||||
|
||||
@ -82,14 +87,14 @@ class RemoteEventHandler(object):
|
||||
Returns the number of events processed.
|
||||
"""
|
||||
if self.exited:
|
||||
raise ExitError()
|
||||
raise ClosedError()
|
||||
|
||||
numProcessed = 0
|
||||
while self.conn.poll():
|
||||
try:
|
||||
self.handleRequest()
|
||||
numProcessed += 1
|
||||
except ExitError:
|
||||
except ClosedError:
|
||||
self.exited = True
|
||||
raise
|
||||
except IOError as err:
|
||||
@ -108,14 +113,20 @@ class RemoteEventHandler(object):
|
||||
Blocks until a request is available."""
|
||||
result = None
|
||||
try:
|
||||
cmd, reqId, optStr = self.conn.recv() ## args, kwds are double-pickled to ensure this recv() call never fails
|
||||
except EOFError:
|
||||
cmd, reqId, nByteMsgs, optStr = self.conn.recv() ## args, kwds are double-pickled to ensure this recv() call never fails
|
||||
except EOFError, IOError:
|
||||
## remote process has shut down; end event loop
|
||||
raise ExitError()
|
||||
except IOError:
|
||||
raise ExitError()
|
||||
raise ClosedError()
|
||||
#print os.getpid(), "received request:", cmd, reqId
|
||||
|
||||
## read byte messages following the main request
|
||||
byteData = []
|
||||
for i in range(nByteMsgs):
|
||||
try:
|
||||
byteData.append(self.conn.recv_bytes())
|
||||
except EOFError, IOError:
|
||||
raise ClosedError()
|
||||
|
||||
|
||||
try:
|
||||
if cmd == 'result' or cmd == 'error':
|
||||
@ -137,17 +148,36 @@ class RemoteEventHandler(object):
|
||||
obj = opts['obj']
|
||||
fnargs = opts['args']
|
||||
fnkwds = opts['kwds']
|
||||
|
||||
## If arrays were sent as byte messages, they must be re-inserted into the
|
||||
## arguments
|
||||
if len(byteData) > 0:
|
||||
for i,arg in enumerate(fnargs):
|
||||
if isinstance(arg, tuple) and len(arg) > 0 and arg[0] == '__byte_message__':
|
||||
ind = arg[1]
|
||||
dtype, shape = arg[2]
|
||||
fnargs[i] = np.fromstring(byteData[ind], dtype=dtype).reshape(shape)
|
||||
for k,arg in fnkwds.items():
|
||||
if isinstance(arg, tuple) and len(arg) > 0 and arg[0] == '__byte_message__':
|
||||
ind = arg[1]
|
||||
dtype, shape = arg[2]
|
||||
fnkwds[k] = np.fromstring(byteData[ind], dtype=dtype).reshape(shape)
|
||||
|
||||
if len(fnkwds) == 0: ## need to do this because some functions do not allow keyword arguments.
|
||||
#print obj, fnargs
|
||||
result = obj(*fnargs)
|
||||
else:
|
||||
result = obj(*fnargs, **fnkwds)
|
||||
|
||||
elif cmd == 'getObjValue':
|
||||
result = opts['obj'] ## has already been unpickled into its local value
|
||||
returnType = 'value'
|
||||
elif cmd == 'transfer':
|
||||
result = opts['obj']
|
||||
returnType = 'proxy'
|
||||
elif cmd == 'transferArray':
|
||||
## read array data from next message:
|
||||
result = np.fromstring(byteData[0], dtype=opts['dtype']).reshape(opts['shape'])
|
||||
returnType = 'proxy'
|
||||
elif cmd == 'import':
|
||||
name = opts['module']
|
||||
fromlist = opts.get('fromlist', [])
|
||||
@ -201,7 +231,7 @@ class RemoteEventHandler(object):
|
||||
## (more importantly, do not call any code that would
|
||||
## normally be invoked at exit)
|
||||
else:
|
||||
raise ExitError()
|
||||
raise ClosedError()
|
||||
|
||||
|
||||
|
||||
@ -216,7 +246,7 @@ class RemoteEventHandler(object):
|
||||
except:
|
||||
self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=None, excString=excStr))
|
||||
|
||||
def send(self, request, opts=None, reqId=None, callSync='sync', timeout=10, returnType=None, **kwds):
|
||||
def send(self, request, opts=None, reqId=None, callSync='sync', timeout=10, returnType=None, byteData=None, **kwds):
|
||||
"""Send a request or return packet to the remote process.
|
||||
Generally it is not necessary to call this method directly; it is for internal use.
|
||||
(The docstring has information that is nevertheless useful to the programmer
|
||||
@ -235,6 +265,9 @@ class RemoteEventHandler(object):
|
||||
opts Extra arguments sent to the remote process that determine the way
|
||||
the request will be handled (see below)
|
||||
returnType 'proxy', 'value', or 'auto'
|
||||
byteData If specified, this is a list of objects to be sent as byte messages
|
||||
to the remote process.
|
||||
This is used to send large arrays without the cost of pickling.
|
||||
========== ====================================================================
|
||||
|
||||
Description of request strings and options allowed for each:
|
||||
@ -312,7 +345,9 @@ class RemoteEventHandler(object):
|
||||
|
||||
if returnType is not None:
|
||||
opts['returnType'] = returnType
|
||||
#print "send", opts
|
||||
|
||||
#print os.getpid(), "send request:", request, reqId, opts
|
||||
|
||||
## double-pickle args to ensure that at least status and request ID get through
|
||||
try:
|
||||
optStr = pickle.dumps(opts)
|
||||
@ -322,9 +357,19 @@ class RemoteEventHandler(object):
|
||||
print "======================================="
|
||||
raise
|
||||
|
||||
request = (request, reqId, optStr)
|
||||
nByteMsgs = 0
|
||||
if byteData is not None:
|
||||
nByteMsgs = len(byteData)
|
||||
|
||||
## Send primary request
|
||||
request = (request, reqId, nByteMsgs, optStr)
|
||||
self.conn.send(request)
|
||||
|
||||
## follow up by sending byte messages
|
||||
if byteData is not None:
|
||||
for obj in byteData: ## Remote process _must_ be prepared to read the same number of byte messages!
|
||||
self.conn.send_bytes(obj)
|
||||
|
||||
if callSync == 'off':
|
||||
return
|
||||
|
||||
@ -345,10 +390,10 @@ class RemoteEventHandler(object):
|
||||
## raises NoResultError if the result is not available yet
|
||||
#print self.results.keys(), os.getpid()
|
||||
if reqId not in self.results:
|
||||
#self.readPipe()
|
||||
try:
|
||||
self.processRequests()
|
||||
except ExitError:
|
||||
except ClosedError: ## even if remote connection has closed, we may have
|
||||
## received new data during this call to processRequests()
|
||||
pass
|
||||
if reqId not in self.results:
|
||||
raise NoResultError()
|
||||
@ -393,17 +438,33 @@ class RemoteEventHandler(object):
|
||||
|
||||
def callObj(self, obj, args, kwds, **opts):
|
||||
opts = opts.copy()
|
||||
args = list(args)
|
||||
|
||||
## Decide whether to send arguments by value or by proxy
|
||||
noProxyTypes = opts.pop('noProxyTypes', None)
|
||||
if noProxyTypes is None:
|
||||
noProxyTypes = self.proxyOptions['noProxyTypes']
|
||||
autoProxy = opts.pop('autoProxy', self.proxyOptions['autoProxy'])
|
||||
|
||||
autoProxy = opts.pop('autoProxy', self.proxyOptions['autoProxy'])
|
||||
if autoProxy is True:
|
||||
args = tuple([self.autoProxy(v, noProxyTypes) for v in args])
|
||||
args = [self.autoProxy(v, noProxyTypes) for v in args]
|
||||
for k, v in kwds.iteritems():
|
||||
opts[k] = self.autoProxy(v, noProxyTypes)
|
||||
|
||||
return self.send(request='callObj', opts=dict(obj=obj, args=args, kwds=kwds), **opts)
|
||||
byteMsgs = []
|
||||
|
||||
## If there are arrays in the arguments, send those as byte messages.
|
||||
## We do this because pickling arrays is too expensive.
|
||||
for i,arg in enumerate(args):
|
||||
if arg.__class__ == np.ndarray:
|
||||
args[i] = ("__byte_message__", len(byteMsgs), (arg.dtype, arg.shape))
|
||||
byteMsgs.append(arg)
|
||||
for k,v in kwds.items():
|
||||
if v.__class__ == np.ndarray:
|
||||
kwds[k] = ("__byte_message__", len(byteMsgs), (v.dtype, v.shape))
|
||||
byteMsgs.append(v)
|
||||
|
||||
return self.send(request='callObj', opts=dict(obj=obj, args=args, kwds=kwds), byteData=byteMsgs, **opts)
|
||||
|
||||
def registerProxy(self, proxy):
|
||||
ref = weakref.ref(proxy, self.deleteProxy)
|
||||
@ -421,7 +482,11 @@ class RemoteEventHandler(object):
|
||||
Transfer an object by value to the remote host (the object must be picklable)
|
||||
and return a proxy for the new remote object.
|
||||
"""
|
||||
return self.send(request='transfer', opts=dict(obj=obj), **kwds)
|
||||
if obj.__class__ is np.ndarray:
|
||||
opts = {'dtype': obj.dtype, 'shape': obj.shape}
|
||||
return self.send(request='transferArray', opts=opts, byteData=[obj], **kwds)
|
||||
else:
|
||||
return self.send(request='transfer', opts=dict(obj=obj), **kwds)
|
||||
|
||||
def autoProxy(self, obj, noProxyTypes):
|
||||
## Return object wrapped in LocalObjectProxy _unless_ its type is in noProxyTypes.
|
||||
@ -453,6 +518,8 @@ class Request(object):
|
||||
If block is True, wait until the result has arrived or *timeout* seconds passes.
|
||||
If the timeout is reached, raise NoResultError. (use timeout=None to disable)
|
||||
If block is False, raise NoResultError immediately if the result has not arrived yet.
|
||||
|
||||
If the process's connection has closed before the result arrives, raise ClosedError.
|
||||
"""
|
||||
|
||||
if self.gotResult:
|
||||
@ -464,6 +531,8 @@ class Request(object):
|
||||
if block:
|
||||
start = time.time()
|
||||
while not self.hasResult():
|
||||
if self.proc.exited:
|
||||
raise ClosedError()
|
||||
time.sleep(0.005)
|
||||
if timeout >= 0 and time.time() - start > timeout:
|
||||
print "Request timed out:", self.description
|
||||
|
@ -18,6 +18,8 @@ class RemoteGraphicsView(QtGui.QWidget):
|
||||
def __init__(self, parent=None, *args, **kwds):
|
||||
self._img = None
|
||||
self._imgReq = None
|
||||
self._sizeHint = (640,480) ## no clue why this is needed, but it seems to be the default sizeHint for GraphicsView.
|
||||
## without it, the widget will not compete for space against another GraphicsView.
|
||||
QtGui.QWidget.__init__(self)
|
||||
self._proc = mp.QtProcess()
|
||||
self.pg = self._proc._import('pyqtgraph')
|
||||
@ -26,12 +28,18 @@ class RemoteGraphicsView(QtGui.QWidget):
|
||||
self._view = rpgRemote.Renderer(*args, **kwds)
|
||||
self._view._setProxyOptions(deferGetattr=True)
|
||||
self.setFocusPolicy(self._view.focusPolicy())
|
||||
self.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Expanding)
|
||||
self.setMouseTracking(True)
|
||||
|
||||
shmFileName = self._view.shmFileName()
|
||||
self.shmFile = open(shmFileName, 'r')
|
||||
self.shm = mmap.mmap(self.shmFile.fileno(), mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_READ)
|
||||
|
||||
self._view.sceneRendered.connect(mp.proxy(self.remoteSceneChanged))
|
||||
self._view.sceneRendered.connect(mp.proxy(self.remoteSceneChanged)) #, callSync='off'))
|
||||
## Note: we need synchronous signals
|
||||
## even though there is no return value--
|
||||
## this informs the renderer that it is
|
||||
## safe to begin rendering again.
|
||||
|
||||
for method in ['scene', 'setCentralItem']:
|
||||
setattr(self, method, getattr(self._view, method))
|
||||
@ -41,8 +49,12 @@ class RemoteGraphicsView(QtGui.QWidget):
|
||||
self._view.resize(self.size(), _callSync='off')
|
||||
return ret
|
||||
|
||||
def sizeHint(self):
|
||||
return QtCore.QSize(*self._sizeHint)
|
||||
|
||||
def remoteSceneChanged(self, data):
|
||||
w, h, size = data
|
||||
#self._sizeHint = (whint, hhint)
|
||||
if self.shm.size != size:
|
||||
self.shm.close()
|
||||
self.shm = mmap.mmap(self.shmFile.fileno(), size, mmap.MAP_SHARED, mmap.PROT_READ)
|
||||
@ -82,7 +94,17 @@ class RemoteGraphicsView(QtGui.QWidget):
|
||||
ev.accept()
|
||||
return QtGui.QWidget.keyEvent(self, ev)
|
||||
|
||||
|
||||
def enterEvent(self, ev):
|
||||
self._view.enterEvent(ev.type(), _callSync='off')
|
||||
return QtGui.QWidget.enterEvent(self, ev)
|
||||
|
||||
def leaveEvent(self, ev):
|
||||
self._view.leaveEvent(ev.type(), _callSync='off')
|
||||
return QtGui.QWidget.leaveEvent(self, ev)
|
||||
|
||||
def remoteProcess(self):
|
||||
"""Return the remote process handle. (see multiprocess.remoteproxy.RemoteEventHandler)"""
|
||||
return self._proc
|
||||
|
||||
class Renderer(GraphicsView):
|
||||
|
||||
@ -126,6 +148,8 @@ class Renderer(GraphicsView):
|
||||
def renderView(self):
|
||||
if self.img is None:
|
||||
## make sure shm is large enough and get its address
|
||||
if self.width() == 0 or self.height() == 0:
|
||||
return
|
||||
size = self.width() * self.height() * 4
|
||||
if size > self.shm.size():
|
||||
self.shm.resize(size)
|
||||
@ -168,5 +192,14 @@ class Renderer(GraphicsView):
|
||||
GraphicsView.keyEvent(self, QtGui.QKeyEvent(typ, mods, text, autorep, count))
|
||||
return ev.accepted()
|
||||
|
||||
def enterEvent(self, typ):
|
||||
ev = QtCore.QEvent(QtCore.QEvent.Type(typ))
|
||||
return GraphicsView.enterEvent(self, ev)
|
||||
|
||||
def leaveEvent(self, typ):
|
||||
ev = QtCore.QEvent(QtCore.QEvent.Type(typ))
|
||||
return GraphicsView.leaveEvent(self, ev)
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user