From d65008dd63152d9da709211b42ffdbf020a0d1e5 Mon Sep 17 00:00:00 2001 From: Luke Campagnola Date: Fri, 4 Sep 2015 15:53:08 -0400 Subject: [PATCH] defer debug message formatting to improve multiprocess communication performance --- pyqtgraph/multiprocess/processes.py | 6 +++--- pyqtgraph/multiprocess/remoteproxy.py | 30 +++++++++++++++------------ 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/pyqtgraph/multiprocess/processes.py b/pyqtgraph/multiprocess/processes.py index a121487b..c7e4a80c 100644 --- a/pyqtgraph/multiprocess/processes.py +++ b/pyqtgraph/multiprocess/processes.py @@ -156,14 +156,14 @@ class Process(RemoteEventHandler): time.sleep(0.05) self.debugMsg('Child process exited. (%d)' % self.proc.returncode) - def debugMsg(self, msg): + def debugMsg(self, msg, *args): if hasattr(self, '_stdoutForwarder'): ## Lock output from subprocess to make sure we do not get line collisions with self._stdoutForwarder.lock: with self._stderrForwarder.lock: - RemoteEventHandler.debugMsg(self, msg) + RemoteEventHandler.debugMsg(self, msg, *args) else: - RemoteEventHandler.debugMsg(self, msg) + RemoteEventHandler.debugMsg(self, msg, *args) def startEventLoop(name, port, authkey, ppid, debug=False): diff --git a/pyqtgraph/multiprocess/remoteproxy.py b/pyqtgraph/multiprocess/remoteproxy.py index 4f484b74..66db1221 100644 --- a/pyqtgraph/multiprocess/remoteproxy.py +++ b/pyqtgraph/multiprocess/remoteproxy.py @@ -88,10 +88,10 @@ class RemoteEventHandler(object): print(pid, cls.handlers) raise - def debugMsg(self, msg): + def debugMsg(self, msg, *args): if not self.debug: return - cprint.cout(self.debug, "[%d] %s\n" % (os.getpid(), str(msg)), -1) + cprint.cout(self.debug, "[%d] %s\n" % (os.getpid(), str(msg)%args), -1) def getProxyOption(self, opt): with self.optsLock: @@ -145,7 +145,7 @@ class RemoteEventHandler(object): sys.excepthook(*sys.exc_info()) if numProcessed > 0: - self.debugMsg('processRequests: finished %d requests' % numProcessed) + self.debugMsg('processRequests: finished %d requests', numProcessed) return numProcessed def handleRequest(self): @@ -166,15 +166,15 @@ class RemoteEventHandler(object): self.debugMsg(' handleRequest: got IOError 4 from recv; try again.') continue else: - self.debugMsg(' handleRequest: got IOError %d from recv (%s); raise ClosedError.' % (err.errno, err.strerror)) + self.debugMsg(' handleRequest: got IOError %d from recv (%s); raise ClosedError.', err.errno, err.strerror) raise ClosedError() - self.debugMsg(" handleRequest: received %s %s" % (str(cmd), str(reqId))) + self.debugMsg(" handleRequest: received %s %s", cmd, reqId) ## read byte messages following the main request byteData = [] if nByteMsgs > 0: - self.debugMsg(" handleRequest: reading %d byte messages" % nByteMsgs) + self.debugMsg(" handleRequest: reading %d byte messages", nByteMsgs) for i in range(nByteMsgs): while True: try: @@ -199,7 +199,7 @@ class RemoteEventHandler(object): ## (this is already a return from a previous request) opts = pickle.loads(optStr) - self.debugMsg(" handleRequest: id=%s opts=%s" % (str(reqId), str(opts))) + self.debugMsg(" handleRequest: id=%s opts=%s", reqId, opts) #print os.getpid(), "received request:", cmd, reqId, opts returnType = opts.get('returnType', 'auto') @@ -279,7 +279,7 @@ class RemoteEventHandler(object): if reqId is not None: if exc is None: - self.debugMsg(" handleRequest: sending return value for %d: %s" % (reqId, str(result))) + self.debugMsg(" handleRequest: sending return value for %d: %s", reqId, result) #print "returnValue:", returnValue, result if returnType == 'auto': with self.optsLock: @@ -294,7 +294,7 @@ class RemoteEventHandler(object): sys.excepthook(*sys.exc_info()) self.replyError(reqId, *sys.exc_info()) else: - self.debugMsg(" handleRequest: returning exception for %d" % reqId) + self.debugMsg(" handleRequest: returning exception for %d", reqId) self.replyError(reqId, *exc) elif exc is not None: @@ -443,16 +443,16 @@ class RemoteEventHandler(object): ## Send primary request request = (request, reqId, nByteMsgs, optStr) - self.debugMsg('send request: cmd=%s nByteMsgs=%d id=%s opts=%s' % (str(request[0]), nByteMsgs, str(reqId), str(opts))) + self.debugMsg('send request: cmd=%s nByteMsgs=%d id=%s opts=%s', request[0], nByteMsgs, reqId, opts) self.conn.send(request) ## follow up by sending byte messages if byteData is not None: for obj in byteData: ## Remote process _must_ be prepared to read the same number of byte messages! self.conn.send_bytes(obj) - self.debugMsg(' sent %d byte messages' % len(byteData)) + self.debugMsg(' sent %d byte messages', len(byteData)) - self.debugMsg(' call sync: %s' % callSync) + self.debugMsg(' call sync: %s', callSync) if callSync == 'off': return @@ -572,7 +572,7 @@ class RemoteEventHandler(object): try: self.send(request='del', opts=dict(proxyId=proxyId), callSync='off') - except IOError: ## if remote process has closed down, there is no need to send delete requests anymore + except ClosedError: ## if remote process has closed down, there is no need to send delete requests anymore pass def transfer(self, obj, **kwds): @@ -786,6 +786,7 @@ class ObjectProxy(object): 'returnType': None, ## 'proxy', 'value', 'auto', None 'deferGetattr': None, ## True, False, None 'noProxyTypes': None, ## list of types to send by value instead of by proxy + 'autoProxy': None, } self.__dict__['_handler'] = RemoteEventHandler.getHandler(processId) @@ -839,6 +840,9 @@ class ObjectProxy(object): sent to the remote process. ============= ============================================================= """ + for k in kwds: + if k not in self._proxyOptions: + raise KeyError("Unrecognized proxy option '%s'" % k) self._proxyOptions.update(kwds) def _getValue(self):