This commit is contained in:
Luke Campagnola 2013-02-13 11:43:22 -05:00
parent 4dbc411d19
commit ccc81c6919
4 changed files with 114 additions and 34 deletions

View File

@ -5,20 +5,25 @@ if __name__ == '__main__':
if hasattr(os, 'setpgrp'): if hasattr(os, 'setpgrp'):
os.setpgrp() ## prevents signals (notably keyboard interrupt) being forwarded from parent to this process os.setpgrp() ## prevents signals (notably keyboard interrupt) being forwarded from parent to this process
if sys.version[0] == '3': if sys.version[0] == '3':
name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin.buffer) #name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin.buffer)
opts = pickle.load(sys.stdin.buffer)
else: else:
name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin) #name, port, authkey, ppid, targetStr, path, pyside = pickle.load(sys.stdin)
opts = pickle.load(sys.stdin)
#print "key:", ' '.join([str(ord(x)) for x in authkey]) #print "key:", ' '.join([str(ord(x)) for x in authkey])
path = opts.pop('path', None)
if path is not None: if path is not None:
## rewrite sys.path without assigning a new object--no idea who already has a reference to the existing list. ## rewrite sys.path without assigning a new object--no idea who already has a reference to the existing list.
while len(sys.path) > 0: while len(sys.path) > 0:
sys.path.pop() sys.path.pop()
sys.path.extend(path) sys.path.extend(path)
if pyside: if opts.pop('pyside', False):
import PySide import PySide
#import pyqtgraph #import pyqtgraph
#import pyqtgraph.multiprocess.processes #import pyqtgraph.multiprocess.processes
targetStr = opts.pop('targetStr')
target = pickle.loads(targetStr) ## unpickling the target should import everything we need target = pickle.loads(targetStr) ## unpickling the target should import everything we need
target(name, port, authkey, ppid) #target(name, port, authkey, ppid)
target(**opts) ## Send all other options to the target function
sys.exit(0) sys.exit(0)

View File

@ -35,7 +35,7 @@ class Process(RemoteEventHandler):
ProxyObject for more information. ProxyObject for more information.
""" """
def __init__(self, name=None, target=None, executable=None, copySysPath=True): def __init__(self, name=None, target=None, executable=None, copySysPath=True, debug=False):
""" """
============ ============================================================= ============ =============================================================
Arguments: Arguments:
@ -46,7 +46,9 @@ class Process(RemoteEventHandler):
process to process requests from the parent process until it process to process requests from the parent process until it
is asked to quit. If you wish to specify a different target, is asked to quit. If you wish to specify a different target,
it must be picklable (bound methods are not). it must be picklable (bound methods are not).
copySysPath If true, copy the contents of sys.path to the remote process copySysPath If True, copy the contents of sys.path to the remote process
debug If True, print detailed information about communication
with the child process.
============ ============================================================= ============ =============================================================
""" """
@ -56,6 +58,7 @@ class Process(RemoteEventHandler):
name = str(self) name = str(self)
if executable is None: if executable is None:
executable = sys.executable executable = sys.executable
self.debug = debug
## random authentication key ## random authentication key
authkey = os.urandom(20) authkey = os.urandom(20)
@ -75,23 +78,46 @@ class Process(RemoteEventHandler):
## start remote process, instruct it to run target function ## start remote process, instruct it to run target function
sysPath = sys.path if copySysPath else None sysPath = sys.path if copySysPath else None
bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py')) bootstrap = os.path.abspath(os.path.join(os.path.dirname(__file__), 'bootstrap.py'))
self.debugMsg('Starting child process (%s %s)' % (executable, bootstrap))
self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE) self.proc = subprocess.Popen((executable, bootstrap), stdin=subprocess.PIPE)
targetStr = pickle.dumps(target) ## double-pickle target so that child has a chance to targetStr = pickle.dumps(target) ## double-pickle target so that child has a chance to
## set its sys.path properly before unpickling the target ## set its sys.path properly before unpickling the target
pid = os.getpid() # we must sent pid to child because windows does not have getppid pid = os.getpid() # we must send pid to child because windows does not have getppid
pyside = USE_PYSIDE pyside = USE_PYSIDE
## Send everything the remote process needs to start correctly ## Send everything the remote process needs to start correctly
pickle.dump((name+'_child', port, authkey, pid, targetStr, sysPath, pyside), self.proc.stdin) data = dict(
name=name+'_child',
port=port,
authkey=authkey,
ppid=pid,
targetStr=targetStr,
path=sysPath,
pyside=pyside,
debug=debug
)
pickle.dump(data, self.proc.stdin)
self.proc.stdin.close() self.proc.stdin.close()
## open connection for remote process ## open connection for remote process
conn = l.accept() self.debugMsg('Listening for child process..')
RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid) while True:
try:
conn = l.accept()
break
except IOError as err:
if err.errno == 4: # interrupted; try again
continue
else:
raise
RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid, debug=debug)
self.debugMsg('Connected to child process.')
atexit.register(self.join) atexit.register(self.join)
def join(self, timeout=10): def join(self, timeout=10):
self.debugMsg('Joining child process..')
if self.proc.poll() is None: if self.proc.poll() is None:
self.close() self.close()
start = time.time() start = time.time()
@ -99,13 +125,14 @@ class Process(RemoteEventHandler):
if timeout is not None and time.time() - start > timeout: if timeout is not None and time.time() - start > timeout:
raise Exception('Timed out waiting for remote process to end.') raise Exception('Timed out waiting for remote process to end.')
time.sleep(0.05) time.sleep(0.05)
self.debugMsg('Child process exited. (%d)' % self.proc.returncode)
def startEventLoop(name, port, authkey, ppid): def startEventLoop(name, port, authkey, ppid, debug=False):
conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey) conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
global HANDLER global HANDLER
#ppid = 0 if not hasattr(os, 'getppid') else os.getppid() #ppid = 0 if not hasattr(os, 'getppid') else os.getppid()
HANDLER = RemoteEventHandler(conn, name, ppid) HANDLER = RemoteEventHandler(conn, name, ppid, debug=debug)
while True: while True:
try: try:
HANDLER.processRequests() # exception raised when the loop should exit HANDLER.processRequests() # exception raised when the loop should exit
@ -329,7 +356,7 @@ class QtProcess(Process):
except ClosedError: except ClosedError:
self.timer.stop() self.timer.stop()
def startQtEventLoop(name, port, authkey, ppid): def startQtEventLoop(name, port, authkey, ppid, debug=False):
conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey) conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
from pyqtgraph.Qt import QtGui, QtCore from pyqtgraph.Qt import QtGui, QtCore
#from PyQt4 import QtGui, QtCore #from PyQt4 import QtGui, QtCore
@ -342,7 +369,7 @@ def startQtEventLoop(name, port, authkey, ppid):
global HANDLER global HANDLER
#ppid = 0 if not hasattr(os, 'getppid') else os.getppid() #ppid = 0 if not hasattr(os, 'getppid') else os.getppid()
HANDLER = RemoteQtEventHandler(conn, name, ppid) HANDLER = RemoteQtEventHandler(conn, name, ppid, debug=debug)
HANDLER.startEventTimer() HANDLER.startEventTimer()
app.exec_() app.exec_()

View File

@ -42,7 +42,8 @@ class RemoteEventHandler(object):
handlers = {} ## maps {process ID : handler}. This allows unpickler to determine which process handlers = {} ## maps {process ID : handler}. This allows unpickler to determine which process
## an object proxy belongs to ## an object proxy belongs to
def __init__(self, connection, name, pid): def __init__(self, connection, name, pid, debug=False):
self.debug = debug
self.conn = connection self.conn = connection
self.name = name self.name = name
self.results = {} ## reqId: (status, result); cache of request results received from the remote process self.results = {} ## reqId: (status, result); cache of request results received from the remote process
@ -76,6 +77,11 @@ class RemoteEventHandler(object):
print(pid, cls.handlers) print(pid, cls.handlers)
raise raise
def debugMsg(self, msg):
if not self.debug:
return
print("[%d] %s" % (os.getpid(), str(msg)))
def getProxyOption(self, opt): def getProxyOption(self, opt):
return self.proxyOptions[opt] return self.proxyOptions[opt]
@ -91,7 +97,9 @@ class RemoteEventHandler(object):
after no more events are immediately available. (non-blocking) after no more events are immediately available. (non-blocking)
Returns the number of events processed. Returns the number of events processed.
""" """
self.debugMsg('processRequests:')
if self.exited: if self.exited:
self.debugMsg(' processRequests: exited already; raise ClosedError.')
raise ClosedError() raise ClosedError()
numProcessed = 0 numProcessed = 0
@ -100,37 +108,64 @@ class RemoteEventHandler(object):
self.handleRequest() self.handleRequest()
numProcessed += 1 numProcessed += 1
except ClosedError: except ClosedError:
self.debugMsg(' processRequests: got ClosedError from handleRequest; setting exited=True.')
self.exited = True self.exited = True
raise raise
except IOError as err: #except IOError as err: ## let handleRequest take care of this.
if err.errno == 4: ## interrupted system call; try again #self.debugMsg(' got IOError from handleRequest; try again.')
continue #if err.errno == 4: ## interrupted system call; try again
else: #continue
raise #else:
#raise
except: except:
print("Error in process %s" % self.name) print("Error in process %s" % self.name)
sys.excepthook(*sys.exc_info()) sys.excepthook(*sys.exc_info())
self.debugMsg(' processRequests: finished %d requests' % numProcessed)
return numProcessed return numProcessed
def handleRequest(self): def handleRequest(self):
"""Handle a single request from the remote process. """Handle a single request from the remote process.
Blocks until a request is available.""" Blocks until a request is available."""
result = None result = None
try: while True:
cmd, reqId, nByteMsgs, optStr = self.conn.recv() ## args, kwds are double-pickled to ensure this recv() call never fails try:
except (EOFError, IOError): ## args, kwds are double-pickled to ensure this recv() call never fails
## remote process has shut down; end event loop cmd, reqId, nByteMsgs, optStr = self.conn.recv()
raise ClosedError() break
#print os.getpid(), "received request:", cmd, reqId except EOFError:
self.debugMsg(' handleRequest: got EOFError from recv; raise ClosedError.')
## remote process has shut down; end event loop
raise ClosedError()
except IOError as err:
if err.errno == 4: ## interrupted system call; try again
self.debugMsg(' handleRequest: got IOError 4 from recv; try again.')
continue
else:
self.debugMsg(' handleRequest: got IOError %d from recv (%s); raise ClosedError.' % (err.errno, err.strerror))
raise ClosedError()
self.debugMsg(" handleRequest: received %s %s" % (str(cmd), str(reqId)))
## read byte messages following the main request ## read byte messages following the main request
byteData = [] byteData = []
if nByteMsgs > 0:
self.debugMsg(" handleRequest: reading %d byte messages" % nByteMsgs)
for i in range(nByteMsgs): for i in range(nByteMsgs):
try: while True:
byteData.append(self.conn.recv_bytes()) try:
except (EOFError, IOError): byteData.append(self.conn.recv_bytes())
raise ClosedError() break
except EOFError:
self.debugMsg(" handleRequest: got EOF while reading byte messages; raise ClosedError.")
raise ClosedError()
except IOError as err:
if err.errno == 4:
self.debugMsg(" handleRequest: got IOError 4 while reading byte messages; try again.")
continue
else:
self.debugMsg(" handleRequest: got IOError while reading byte messages; raise ClosedError.")
raise ClosedError()
try: try:
@ -140,6 +175,7 @@ class RemoteEventHandler(object):
## (this is already a return from a previous request) ## (this is already a return from a previous request)
opts = pickle.loads(optStr) opts = pickle.loads(optStr)
self.debugMsg(" handleRequest: id=%s opts=%s" % (str(reqId), str(opts)))
#print os.getpid(), "received request:", cmd, reqId, opts #print os.getpid(), "received request:", cmd, reqId, opts
returnType = opts.get('returnType', 'auto') returnType = opts.get('returnType', 'auto')
@ -213,6 +249,7 @@ class RemoteEventHandler(object):
if reqId is not None: if reqId is not None:
if exc is None: if exc is None:
self.debugMsg(" handleRequest: sending return value for %d: %s" % (reqId, str(result)))
#print "returnValue:", returnValue, result #print "returnValue:", returnValue, result
if returnType == 'auto': if returnType == 'auto':
result = self.autoProxy(result, self.proxyOptions['noProxyTypes']) result = self.autoProxy(result, self.proxyOptions['noProxyTypes'])
@ -225,6 +262,7 @@ class RemoteEventHandler(object):
sys.excepthook(*sys.exc_info()) sys.excepthook(*sys.exc_info())
self.replyError(reqId, *sys.exc_info()) self.replyError(reqId, *sys.exc_info())
else: else:
self.debugMsg(" handleRequest: returning exception for %d" % reqId)
self.replyError(reqId, *exc) self.replyError(reqId, *exc)
elif exc is not None: elif exc is not None:
@ -368,13 +406,16 @@ class RemoteEventHandler(object):
## Send primary request ## Send primary request
request = (request, reqId, nByteMsgs, optStr) request = (request, reqId, nByteMsgs, optStr)
self.debugMsg('send request: cmd=%s nByteMsgs=%d id=%s opts=%s' % (str(request[0]), nByteMsgs, str(reqId), str(opts)))
self.conn.send(request) self.conn.send(request)
## follow up by sending byte messages ## follow up by sending byte messages
if byteData is not None: if byteData is not None:
for obj in byteData: ## Remote process _must_ be prepared to read the same number of byte messages! for obj in byteData: ## Remote process _must_ be prepared to read the same number of byte messages!
self.conn.send_bytes(obj) self.conn.send_bytes(obj)
self.debugMsg(' sent %d byte messages' % len(byteData))
self.debugMsg(' call sync: %s' % callSync)
if callSync == 'off': if callSync == 'off':
return return

View File

@ -1,4 +1,4 @@
from pyqtgraph.Qt import QtGui, QtCore from pyqtgraph.Qt import QtGui, QtCore, USE_PYSIDE
import pyqtgraph.multiprocess as mp import pyqtgraph.multiprocess as mp
import pyqtgraph as pg import pyqtgraph as pg
from .GraphicsView import GraphicsView from .GraphicsView import GraphicsView
@ -21,13 +21,14 @@ class RemoteGraphicsView(QtGui.QWidget):
self._sizeHint = (640,480) ## no clue why this is needed, but it seems to be the default sizeHint for GraphicsView. self._sizeHint = (640,480) ## no clue why this is needed, but it seems to be the default sizeHint for GraphicsView.
## without it, the widget will not compete for space against another GraphicsView. ## without it, the widget will not compete for space against another GraphicsView.
QtGui.QWidget.__init__(self) QtGui.QWidget.__init__(self)
self._proc = mp.QtProcess() self._proc = mp.QtProcess(debug=False)
self.pg = self._proc._import('pyqtgraph') self.pg = self._proc._import('pyqtgraph')
self.pg.setConfigOptions(**self.pg.CONFIG_OPTIONS) self.pg.setConfigOptions(**self.pg.CONFIG_OPTIONS)
rpgRemote = self._proc._import('pyqtgraph.widgets.RemoteGraphicsView') rpgRemote = self._proc._import('pyqtgraph.widgets.RemoteGraphicsView')
self._view = rpgRemote.Renderer(*args, **kwds) self._view = rpgRemote.Renderer(*args, **kwds)
self._view._setProxyOptions(deferGetattr=True) self._view._setProxyOptions(deferGetattr=True)
self.setFocusPolicy(QtCore.Qt.FocusPolicy(self._view.focusPolicy()))
self.setFocusPolicy(QtCore.Qt.StrongFocus)
self.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Expanding) self.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Expanding)
self.setMouseTracking(True) self.setMouseTracking(True)
self.shm = None self.shm = None
@ -114,6 +115,7 @@ class RemoteGraphicsView(QtGui.QWidget):
return self._proc return self._proc
class Renderer(GraphicsView): class Renderer(GraphicsView):
## Created by the remote process to handle render requests
sceneRendered = QtCore.Signal(object) sceneRendered = QtCore.Signal(object)
@ -175,7 +177,12 @@ class Renderer(GraphicsView):
address = ctypes.addressof(ctypes.c_char.from_buffer(self.shm, 0)) address = ctypes.addressof(ctypes.c_char.from_buffer(self.shm, 0))
## render the scene directly to shared memory ## render the scene directly to shared memory
self.img = QtGui.QImage(address, self.width(), self.height(), QtGui.QImage.Format_ARGB32) if USE_PYSIDE:
ch = ctypes.c_char.from_buffer(self.shm, 0)
#ch = ctypes.c_char_p(address)
self.img = QtGui.QImage(ch, self.width(), self.height(), QtGui.QImage.Format_ARGB32)
else:
self.img = QtGui.QImage(address, self.width(), self.height(), QtGui.QImage.Format_ARGB32)
self.img.fill(0xffffffff) self.img.fill(0xffffffff)
p = QtGui.QPainter(self.img) p = QtGui.QPainter(self.img)
self.render(p, self.viewRect(), self.rect()) self.render(p, self.viewRect(), self.rect())