defer debug message formatting to improve multiprocess communication performance

This commit is contained in:
Luke Campagnola 2015-09-04 15:53:08 -04:00
parent cc655197d1
commit d65008dd63
2 changed files with 20 additions and 16 deletions

View File

@ -156,14 +156,14 @@ class Process(RemoteEventHandler):
time.sleep(0.05)
self.debugMsg('Child process exited. (%d)' % self.proc.returncode)
def debugMsg(self, msg):
def debugMsg(self, msg, *args):
if hasattr(self, '_stdoutForwarder'):
## Lock output from subprocess to make sure we do not get line collisions
with self._stdoutForwarder.lock:
with self._stderrForwarder.lock:
RemoteEventHandler.debugMsg(self, msg)
RemoteEventHandler.debugMsg(self, msg, *args)
else:
RemoteEventHandler.debugMsg(self, msg)
RemoteEventHandler.debugMsg(self, msg, *args)
def startEventLoop(name, port, authkey, ppid, debug=False):

View File

@ -88,10 +88,10 @@ class RemoteEventHandler(object):
print(pid, cls.handlers)
raise
def debugMsg(self, msg):
def debugMsg(self, msg, *args):
if not self.debug:
return
cprint.cout(self.debug, "[%d] %s\n" % (os.getpid(), str(msg)), -1)
cprint.cout(self.debug, "[%d] %s\n" % (os.getpid(), str(msg)%args), -1)
def getProxyOption(self, opt):
with self.optsLock:
@ -145,7 +145,7 @@ class RemoteEventHandler(object):
sys.excepthook(*sys.exc_info())
if numProcessed > 0:
self.debugMsg('processRequests: finished %d requests' % numProcessed)
self.debugMsg('processRequests: finished %d requests', numProcessed)
return numProcessed
def handleRequest(self):
@ -166,15 +166,15 @@ class RemoteEventHandler(object):
self.debugMsg(' handleRequest: got IOError 4 from recv; try again.')
continue
else:
self.debugMsg(' handleRequest: got IOError %d from recv (%s); raise ClosedError.' % (err.errno, err.strerror))
self.debugMsg(' handleRequest: got IOError %d from recv (%s); raise ClosedError.', err.errno, err.strerror)
raise ClosedError()
self.debugMsg(" handleRequest: received %s %s" % (str(cmd), str(reqId)))
self.debugMsg(" handleRequest: received %s %s", cmd, reqId)
## read byte messages following the main request
byteData = []
if nByteMsgs > 0:
self.debugMsg(" handleRequest: reading %d byte messages" % nByteMsgs)
self.debugMsg(" handleRequest: reading %d byte messages", nByteMsgs)
for i in range(nByteMsgs):
while True:
try:
@ -199,7 +199,7 @@ class RemoteEventHandler(object):
## (this is already a return from a previous request)
opts = pickle.loads(optStr)
self.debugMsg(" handleRequest: id=%s opts=%s" % (str(reqId), str(opts)))
self.debugMsg(" handleRequest: id=%s opts=%s", reqId, opts)
#print os.getpid(), "received request:", cmd, reqId, opts
returnType = opts.get('returnType', 'auto')
@ -279,7 +279,7 @@ class RemoteEventHandler(object):
if reqId is not None:
if exc is None:
self.debugMsg(" handleRequest: sending return value for %d: %s" % (reqId, str(result)))
self.debugMsg(" handleRequest: sending return value for %d: %s", reqId, result)
#print "returnValue:", returnValue, result
if returnType == 'auto':
with self.optsLock:
@ -294,7 +294,7 @@ class RemoteEventHandler(object):
sys.excepthook(*sys.exc_info())
self.replyError(reqId, *sys.exc_info())
else:
self.debugMsg(" handleRequest: returning exception for %d" % reqId)
self.debugMsg(" handleRequest: returning exception for %d", reqId)
self.replyError(reqId, *exc)
elif exc is not None:
@ -443,16 +443,16 @@ class RemoteEventHandler(object):
## Send primary request
request = (request, reqId, nByteMsgs, optStr)
self.debugMsg('send request: cmd=%s nByteMsgs=%d id=%s opts=%s' % (str(request[0]), nByteMsgs, str(reqId), str(opts)))
self.debugMsg('send request: cmd=%s nByteMsgs=%d id=%s opts=%s', request[0], nByteMsgs, reqId, opts)
self.conn.send(request)
## follow up by sending byte messages
if byteData is not None:
for obj in byteData: ## Remote process _must_ be prepared to read the same number of byte messages!
self.conn.send_bytes(obj)
self.debugMsg(' sent %d byte messages' % len(byteData))
self.debugMsg(' sent %d byte messages', len(byteData))
self.debugMsg(' call sync: %s' % callSync)
self.debugMsg(' call sync: %s', callSync)
if callSync == 'off':
return
@ -572,7 +572,7 @@ class RemoteEventHandler(object):
try:
self.send(request='del', opts=dict(proxyId=proxyId), callSync='off')
except IOError: ## if remote process has closed down, there is no need to send delete requests anymore
except ClosedError: ## if remote process has closed down, there is no need to send delete requests anymore
pass
def transfer(self, obj, **kwds):
@ -786,6 +786,7 @@ class ObjectProxy(object):
'returnType': None, ## 'proxy', 'value', 'auto', None
'deferGetattr': None, ## True, False, None
'noProxyTypes': None, ## list of types to send by value instead of by proxy
'autoProxy': None,
}
self.__dict__['_handler'] = RemoteEventHandler.getHandler(processId)
@ -839,6 +840,9 @@ class ObjectProxy(object):
sent to the remote process.
============= =============================================================
"""
for k in kwds:
if k not in self._proxyOptions:
raise KeyError("Unrecognized proxy option '%s'" % k)
self._proxyOptions.update(kwds)
def _getValue(self):