Added custom multiprocessing module:

- Allows starting new processes and controlling them remotely from the parent process:
  - Remote processes can run their own GUI, and Qt signals can be connected between
    processes. (In general this is not possible with the built-in multiprocessing
    module because of its use of fork().)
  - Control works through a system of proxy objects, such that controlling a remote
    process looks almost exactly like working with local objects.
  - Uses sockets to communicate between processes (so in theory it could be made to
    work over a network), but also includes a mode that uses fork() to allow fast
    parallelization.
  - Wicked-easy inline parallelization: adding only one line of code breaks up work
    between processes (requires fork; sorry, Windows users). See the sketch below.
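For instance, the one-line parallelization mentioned above reduces to a single with-statement. A minimal sketch, condensed from examples/multiprocess.py below (results must be written through the tasker proxy so they reach the parent process):

    from pyqtgraph.multiprocess.parallelizer import Parallelize

    tasks = [1, 2, 4, 8]
    results = [None] * len(tasks)
    with Parallelize(enumerate(tasks), results=results) as tasker:
        for i, x in tasker:               ## each forked worker receives a share of the tasks
            tasker.results[i] = x ** 2    ## proxied assignment; lands in the parent's list
    print results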
Luke Campagnola, 2012-06-18 15:20:35 -04:00
commit 72006fe05b (parent c7a78642fd)
5 changed files with 1249 additions and 0 deletions

examples/multiprocess.py (new file, +84)

@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
import initExample ## Add path to library (just for examples; you do not need this)

import numpy as np
import pyqtgraph.multiprocess as mp
from pyqtgraph.multiprocess.parallelizer import Parallelize #, Parallelizer
import time

print "\n=================\nParallelize"
tasks = [1, 2, 4, 8]
results = [None] * len(tasks)
size = 2000000

## serial version: workers=1 forces all tasks to run in the parent process
start = time.time()
with Parallelize(enumerate(tasks), results=results, workers=1) as tasker:
    for i, x in tasker:
        print i, x
        tot = 0
        for j in xrange(size):
            tot += j * x
        tasker.results[i] = tot
print results
print "serial:", time.time() - start

## parallel version: one worker per CPU. Results must be written through the
## tasker proxy so they reach the parent process; a forked child's own copy
## of *results* is discarded when the child exits.
start = time.time()
with Parallelize(enumerate(tasks), results=results) as tasker:
    for i, x in tasker:
        print i, x
        tot = 0
        for j in xrange(size):
            tot += j * x
        tasker.results[i] = tot
print results
print "parallel:", time.time() - start

print "\n=================\nStart Process"
proc = mp.Process()
import os
print "parent:", os.getpid(), "child:", proc.proc.pid
print "started"
rnp = proc._import('numpy')
arr = rnp.array([1,2,3,4])
print repr(arr)
print str(arr)
print "return value:", repr(arr.mean(_returnType='value'))
print "return proxy:", repr(arr.mean(_returnType='proxy'))
print "return auto: ", repr(arr.mean(_returnType='auto'))
proc.join()
print "process finished"

print "\n=================\nStart ForkedProcess"
proc = mp.ForkedProcess()
rnp = proc._import('numpy')
arr = rnp.array([1,2,3,4])
print repr(arr)
print str(arr)
print repr(arr.mean())
proc.join()
print "process finished"

import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtGui
app = pg.QtGui.QApplication([])

print "\n=================\nStart QtProcess"
proc = mp.QtProcess()
d1 = proc.transfer(np.random.normal(size=1000))
d2 = proc.transfer(np.random.normal(size=1000))
rpg = proc._import('pyqtgraph')
plt = rpg.plot(d1 + d2)

## Start Qt event loop unless running in interactive mode or using pyside.
#import sys
#if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
#    QtGui.QApplication.instance().exec_()

multiprocess/__init__.py (new file, +22)

@@ -0,0 +1,22 @@
"""
Multiprocessing utility library
(parallelization done the way I like it)
Luke Campagnola
2012.06.10
This library provides:
- simple mechanism for starting a new python interpreter process that can be controlled from the original process
(this allows, for example, displaying and manipulating plots in a remote process
while the parent process is free to do other work)
- proxy system that allows objects hosted in the remote process to be used as if they were local
- Qt signal connection between processes
- very simple in-line parallelization (fork only; does not work on windows) for number-crunching
TODO:
allow remote processes to serve as rendering engines that pass pixmaps back to the parent process for display
(RemoteGraphicsView class)
"""
from processes import *
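As a quick illustration of the remote-control and proxy mechanisms listed above, a minimal sketch (it mirrors examples/multiprocess.py at the top of this commit; `_returnType` is one of the special keywords handled by ObjectProxy.__call__ in remoteproxy.py below):

    import pyqtgraph.multiprocess as mp

    proc = mp.Process()                    ## start a second python interpreter
    rnp = proc._import('numpy')            ## ObjectProxy wrapping the remote numpy module
    arr = rnp.array([1, 2, 3, 4])          ## created in the remote process; returns another proxy
    print arr.mean(_returnType='value')    ## force the result to be pickled and sent back
    proc.join()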

multiprocess/parallelizer.py (new file, +176)

@@ -0,0 +1,176 @@
import os, sys, time, multiprocessing
from processes import ForkedProcess
from remoteproxy import ExitError


class Parallelize:
    """
    Class for ultra-simple inline parallelization on multi-core CPUs

    Example::

        ## Here is the serial (single-process) task:

        tasks = [1, 2, 4, 8]
        results = []
        for task in tasks:
            result = processTask(task)
            results.append(result)
        print results

        ## Here is the parallelized version:

        tasks = [1, 2, 4, 8]
        results = []
        with Parallelize(tasks, workers=4, results=results) as tasker:
            for task in tasker:
                result = processTask(task)
                tasker.results.append(result)
        print results

    The only major caveat is that *result* in the example above must be picklable.
    """

    def __init__(self, tasks, workers=None, block=True, **kwds):
        """
        Args:
            tasks   - list of objects to be processed (Parallelize will determine
                      how to distribute the tasks)
            workers - number of worker processes, or None to use one per CPU in
                      the system
            kwds    - objects to be shared by proxy with child processes
        """
        self.block = block
        if workers is None:
            workers = multiprocessing.cpu_count()
        if not hasattr(os, 'fork'):
            workers = 1  ## no fork() available (e.g. on Windows); run tasks serially
        self.workers = workers
        self.tasks = list(tasks)
        self.kwds = kwds

    def __enter__(self):
        self.proc = None
        workers = self.workers
        if workers == 1:
            return Tasker(None, self.tasks, self.kwds)

        self.childs = []

        ## break up tasks into one set per worker
        chunks = [[] for i in xrange(workers)]
        i = 0
        for i in range(len(self.tasks)):
            chunks[i % workers].append(self.tasks[i])

        ## fork and assign tasks to each worker
        for i in range(workers):
            proc = ForkedProcess(target=None, preProxy=self.kwds)
            if not proc.isParent:
                self.proc = proc
                return Tasker(proc, chunks[i], proc.forkedProxies)
            else:
                self.childs.append(proc)

        ## process events from workers until all have exited.
        activeChilds = self.childs[:]
        while len(activeChilds) > 0:
            ## collect exited children first, then remove them, so that we never
            ## mutate the list while iterating over it
            rem = []
            for ch in activeChilds:
                try:
                    ch.processRequests()
                except ExitError:
                    rem.append(ch)
            for ch in rem:
                activeChilds.remove(ch)
            time.sleep(0.1)

        return []  ## no tasks for parent process.

    def __exit__(self, *exc_info):
        if exc_info[0] is not None:
            sys.excepthook(*exc_info)
        if self.proc is not None:  ## this is a forked child; exit here so it never
            os._exit(0)            ## runs any code beyond the with-statement.

    def wait(self):
        ## wait for all child processes to finish
        pass


class Tasker:
    def __init__(self, proc, tasks, kwds):
        self.proc = proc
        self.tasks = tasks
        for k, v in kwds.iteritems():
            setattr(self, k, v)

    def __iter__(self):
        ## we could fix this up such that tasks are retrieved from the parent process one at a time..
        for task in self.tasks:
            yield task
        if self.proc is not None:
            self.proc.close()
#class Parallelizer:
    #"""
    #Use::

        #p = Parallelizer()
        #with p(4) as i:
            #p.finish(do_work(i))
        #print p.results()
    #"""
    #def __init__(self):
        #pass

    #def __call__(self, n):
        #self.replies = []
        #self.conn = None  ## indicates this is the parent process
        #return Session(self, n)

    #def finish(self, data):
        #if self.conn is None:
            #self.replies.append((self.i, data))
        #else:
            ##print "send", self.i, data
            #self.conn.send((self.i, data))
            #os._exit(0)

    #def result(self):
        #print self.replies

#class Session:
    #def __init__(self, par, n):
        #self.par = par
        #self.n = n

    #def __enter__(self):
        #self.childs = []
        #for i in range(1, self.n):
            #c1, c2 = multiprocessing.Pipe()
            #pid = os.fork()
            #if pid == 0:  ## child
                #self.par.i = i
                #self.par.conn = c2
                #self.childs = None
                #c1.close()
                #return i
            #else:
                #self.childs.append(c1)
                #c2.close()
        #self.par.i = 0
        #return 0

    #def __exit__(self, *exc_info):
        #if exc_info[0] is not None:
            #sys.excepthook(*exc_info)
        #if self.childs is not None:
            #self.par.replies.extend([conn.recv() for conn in self.childs])
        #else:
            #self.par.finish(None)
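In isolation, the round-robin split performed in __enter__ above distributes tasks to workers like this (plain-python illustration, not part of the module):

    workers = 3
    tasks = range(8)
    chunks = [[] for i in xrange(workers)]
    for i in range(len(tasks)):
        chunks[i % workers].append(tasks[i])
    ## chunks == [[0, 3, 6], [1, 4, 7], [2, 5]]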

multiprocess/processes.py (new file, +208)

@@ -0,0 +1,208 @@
from remoteproxy import RemoteEventHandler, ExitError, NoResultError, LocalObjectProxy, ObjectProxy
import subprocess, atexit, os, sys, time, random, socket
import cPickle as pickle
import multiprocessing.connection


class Process(RemoteEventHandler):
    """Starts a new python interpreter as a child process and returns a handler
    for controlling it remotely. The remote process runs *target* (by default,
    a simple request-processing event loop)."""
    def __init__(self, name=None, target=None):
        if target is None:
            target = startEventLoop
        if name is None:
            name = str(self)

        ## random authentication key
        authkey = ''.join([chr(random.getrandbits(7)) for i in range(20)])

        ## Listen for connection from remote process (and find free port number)
        port = 10000
        while True:
            try:
                l = multiprocessing.connection.Listener(('localhost', int(port)), authkey=authkey)
                break
            except socket.error as ex:
                if ex.errno != 98:  ## 98 == address already in use; try the next port
                    raise
                port += 1

        ## start remote process, instruct it to run target function
        self.proc = subprocess.Popen((sys.executable, __file__, 'remote'), stdin=subprocess.PIPE)
        pickle.dump((name+'_child', port, authkey, target), self.proc.stdin)
        self.proc.stdin.close()

        ## open connection for remote process
        conn = l.accept()
        RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid)

        atexit.register(self.join)

    def join(self, timeout=10):
        if self.proc.poll() is None:
            self.close()
            start = time.time()
            while self.proc.poll() is None:
                if timeout is not None and time.time() - start > timeout:
                    raise Exception('Timed out waiting for remote process to end.')
                time.sleep(0.05)


def startEventLoop(name, port, authkey):
    conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
    global HANDLER
    HANDLER = RemoteEventHandler(conn, name, os.getppid())
    while True:
        try:
            HANDLER.processRequests()  # exception raised when the loop should exit
            time.sleep(0.01)
        except ExitError:
            break


class ForkedProcess(RemoteEventHandler):
    """
    ForkedProcess is a substitute for Process that uses os.fork() to generate a new process.
    This is much faster than starting a completely new interpreter, but carries some caveats
    and limitations:

    - open file handles are shared with the parent process, which is potentially dangerous
    - it is not possible to have a QApplication in both parent and child processes
      (unless both QApplications are created _after_ the call to fork())
    - generally not thread-safe. Also, threads are not copied by fork(); the new process
      will have only one thread that starts wherever fork() was called in the parent process.
    - forked processes are unceremoniously terminated when join() is called; they are not
      given any opportunity to clean up. (This prevents them calling any cleanup code that
      was only intended to be used by the parent process.)
    """
    def __init__(self, name=None, target=0, preProxy=None):
        """
        When initializing, an optional target may be given.
        If no target is specified, self.eventLoop will be used.
        If None is given, no target will be called (and it will be up
        to the caller to properly shut down the forked process).

        preProxy may be a dict of values that will appear as ObjectProxy
        in the remote process (but do not need to be sent explicitly, since
        they are available immediately before the call to fork()).
        Proxies will be available as self.forkedProxies[name].
        """
        self.hasJoined = False
        if target == 0:
            target = self.eventLoop
        if name is None:
            name = str(self)

        conn, remoteConn = multiprocessing.Pipe()

        ## register preProxy objects now; the IDs survive the fork, so the child can
        ## construct proxies to them without any communication.
        proxyIDs = {}
        if preProxy is not None:
            for k, v in preProxy.iteritems():
                proxyId = LocalObjectProxy.registerObject(v)
                proxyIDs[k] = proxyId

        pid = os.fork()
        if pid == 0:
            self.isParent = False
            conn.close()
            sys.stdin.close()  ## otherwise we screw with interactive prompts.
            RemoteEventHandler.__init__(self, remoteConn, name+'_child', pid=os.getppid())
            if target is not None:
                target()

            ppid = os.getppid()
            self.forkedProxies = {}
            for name, proxyId in proxyIDs.iteritems():
                self.forkedProxies[name] = ObjectProxy(ppid, proxyId=proxyId, typeStr=repr(preProxy[name]))
        else:
            self.isParent = True
            self.childPid = pid
            remoteConn.close()
            RemoteEventHandler.handlers = {}  ## don't want to inherit any of this from the parent.
            RemoteEventHandler.__init__(self, conn, name+'_parent', pid=pid)
            atexit.register(self.join)

    def eventLoop(self):
        while True:
            try:
                self.processRequests()  # exception raised when the loop should exit
                time.sleep(0.01)
            except ExitError:
                sys.exit(0)
            except:
                print "Error occurred in forked event loop:"
                sys.excepthook(*sys.exc_info())

    def join(self, timeout=10):
        if self.hasJoined:
            return
        #os.kill(pid, 9)
        try:
            self.close(callSync='sync', timeout=timeout, noCleanup=True)  ## ask the child process to exit and require that it return a confirmation.
        except IOError:  ## probably remote process has already quit
            pass
        self.hasJoined = True


## Special set of subclasses that implement a Qt event loop instead.

class RemoteQtEventHandler(RemoteEventHandler):
    def __init__(self, *args, **kwds):
        RemoteEventHandler.__init__(self, *args, **kwds)

    def startEventTimer(self):
        from pyqtgraph.Qt import QtGui, QtCore
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.processRequests)
        self.timer.start(10)

    def processRequests(self):
        try:
            RemoteEventHandler.processRequests(self)
        except ExitError:
            from pyqtgraph.Qt import QtGui, QtCore
            QtGui.QApplication.instance().quit()
            self.timer.stop()
            #raise


class QtProcess(Process):
    def __init__(self, name=None):
        Process.__init__(self, name, target=startQtEventLoop)
        self.startEventTimer()

    def startEventTimer(self):
        from pyqtgraph.Qt import QtGui, QtCore  ## avoid module-level import to keep bootstrap snappy.
        self.timer = QtCore.QTimer()
        app = QtGui.QApplication.instance()
        if app is None:
            raise Exception("Must create QApplication before starting QtProcess")
        self.timer.timeout.connect(self.processRequests)
        self.timer.start(10)

    def processRequests(self):
        try:
            Process.processRequests(self)
        except ExitError:
            self.timer.stop()


def startQtEventLoop(name, port, authkey):
    conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
    from pyqtgraph.Qt import QtGui, QtCore
    #from PyQt4 import QtGui, QtCore
    app = QtGui.QApplication.instance()
    #print app
    if app is None:
        app = QtGui.QApplication([])
        app.setQuitOnLastWindowClosed(False)  ## generally we want the event loop to stay open
                                              ## until it is explicitly closed by the parent process.

    global HANDLER
    HANDLER = RemoteQtEventHandler(conn, name, os.getppid())
    HANDLER.startEventTimer()
    app.exec_()


if __name__ == '__main__':
    if len(sys.argv) == 2 and sys.argv[1] == 'remote':  ## module has been invoked as script in new python interpreter.
        name, port, authkey, target = pickle.load(sys.stdin)
        target(name, port, authkey)
        sys.exit(0)
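The fork/preProxy machinery above can also be driven directly, without Parallelize. A sketch of the pattern that parallelizer.py uses (the shared list name `log` is hypothetical, chosen for this illustration):

    import os, time
    from pyqtgraph.multiprocess.processes import ForkedProcess
    from pyqtgraph.multiprocess.remoteproxy import ExitError

    log = []
    proc = ForkedProcess(target=None, preProxy={'log': log})
    if proc.isParent:
        try:
            while True:
                proc.processRequests()    ## service proxy calls arriving from the child
                time.sleep(0.01)
        except ExitError:                 ## raised once the child has sent 'close'
            pass
        print log                         ## the child's append arrived by proxy
    else:
        proc.forkedProxies['log'].append(os.getpid())  ## proxied call into the parent's list
        proc.close()                      ## ask the parent's loop to exit
        os._exit(0)                       ## leave without running parent-only cleanup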

multiprocess/remoteproxy.py (new file, +759)

@@ -0,0 +1,759 @@
import os, __builtin__, time, sys, traceback, weakref
import cPickle as pickle


class ExitError(Exception):
    pass

class NoResultError(Exception):
    pass
class RemoteEventHandler(object):
    handlers = {}  ## maps {process ID: handler}; this allows the unpickler to determine
                   ## which process an object proxy belongs to.

    def __init__(self, connection, name, pid):
        self.conn = connection
        self.name = name
        self.results = {}  ## reqId: (status, result); cache of request results received from the remote process
                           ## status is either 'result' or 'error'
                           ## if 'error', then result will be (exception, formatted exception)
                           ## where exception may be None if it could not be passed through the Connection.
        self.proxies = {}  ## maps {weakref(proxy): proxyId}; used to inform the remote process when a proxy has been deleted.

        ## attributes that affect the behavior of the proxy.
        ## See ObjectProxy._setProxyOptions for description
        self.proxyOptions = {
            'callSync': 'sync',      ## 'sync', 'async', 'off'
            'timeout': 10,           ## float
            'returnType': 'auto',    ## 'proxy', 'value', 'auto'
            'autoProxy': False,      ## bool
            'deferGetattr': False,   ## True, False
            'noProxyTypes': [type(None), str, int, float, tuple, list, dict, LocalObjectProxy, ObjectProxy],
        }

        self.nextRequestId = 0
        self.exited = False

        RemoteEventHandler.handlers[pid] = self  ## register this handler as the one communicating with pid

    @classmethod
    def getHandler(cls, pid):
        return cls.handlers[pid]

    def getProxyOption(self, opt):
        return self.proxyOptions[opt]

    def setProxyOptions(self, **kwds):
        """
        Set the default behavior options for object proxies.
        See ObjectProxy._setProxyOptions for more info.
        """
        self.proxyOptions.update(kwds)

    def processRequests(self):
        """Process all pending requests from the pipe; return
        after no more events are immediately available. (non-blocking)"""
        if self.exited:
            raise ExitError()
        while self.conn.poll():
            try:
                self.handleRequest()
            except ExitError:
                self.exited = True
                raise
            except:
                print "Error in process %s" % self.name
                sys.excepthook(*sys.exc_info())
    def handleRequest(self):
        """Handle a single request from the remote process.
        Blocks until a request is available."""
        result = None
        try:
            cmd, reqId, optStr = self.conn.recv()  ## args, kwds are double-pickled to ensure this recv() call never fails
        except EOFError:
            ## remote process has shut down; end event loop
            raise ExitError()
        except IOError:
            raise ExitError()
        #print os.getpid(), "received request:", cmd, reqId

        try:
            if cmd == 'result' or cmd == 'error':
                resultId = reqId
                reqId = None  ## prevents attempt to return information from this request
                              ## (this is already a return from a previous request)

            opts = pickle.loads(optStr)
            #print os.getpid(), "received request:", cmd, reqId, opts
            returnType = opts.get('returnType', 'auto')

            if cmd == 'result':
                self.results[resultId] = ('result', opts['result'])
            elif cmd == 'error':
                self.results[resultId] = ('error', (opts['exception'], opts['excString']))
            elif cmd == 'getObjAttr':
                result = getattr(opts['obj'], opts['attr'])
            elif cmd == 'callObj':
                obj = opts['obj']
                fnargs = opts['args']
                fnkwds = opts['kwds']
                if len(fnkwds) == 0:  ## need to do this because some functions do not allow keyword arguments.
                    #print obj, fnargs
                    result = obj(*fnargs)
                else:
                    result = obj(*fnargs, **fnkwds)
            elif cmd == 'getObjValue':
                result = opts['obj']  ## has already been unpickled into its local value
                returnType = 'value'
            elif cmd == 'transfer':
                result = opts['obj']
                returnType = 'proxy'
            elif cmd == 'import':
                name = opts['module']
                fromlist = opts.get('fromlist', [])
                mod = __builtin__.__import__(name, fromlist=fromlist)

                if len(fromlist) == 0:
                    parts = name.lstrip('.').split('.')
                    result = mod
                    for part in parts[1:]:
                        result = getattr(result, part)
                else:
                    ## look up each requested name on the imported module
                    result = [getattr(mod, attr) for attr in fromlist]
            elif cmd == 'del':
                LocalObjectProxy.releaseProxyId(opts['proxyId'])
                #del self.proxiedObjects[opts['objId']]
            elif cmd == 'close':
                if reqId is not None:
                    result = True
                    returnType = 'value'

            exc = None
        except:
            exc = sys.exc_info()

        if reqId is not None:
            if exc is None:
                #print "returnValue:", returnValue, result
                if returnType == 'auto':
                    result = self.autoProxy(result, self.proxyOptions['noProxyTypes'])
                elif returnType == 'proxy':
                    result = LocalObjectProxy(result)

                try:
                    self.replyResult(reqId, result)
                except:
                    sys.excepthook(*sys.exc_info())
                    self.replyError(reqId, *sys.exc_info())
            else:
                self.replyError(reqId, *exc)
        elif exc is not None:
            sys.excepthook(*exc)

        if cmd == 'close':
            if opts.get('noCleanup', False) is True:
                os._exit(0)  ## exit immediately, do not pass GO, do not collect $200.
                             ## (more importantly, do not call any code that would
                             ## normally be invoked at exit)
            else:
                raise ExitError()
    def replyResult(self, reqId, result):
        self.send(request='result', reqId=reqId, callSync='off', opts=dict(result=result))

    def replyError(self, reqId, *exc):
        excStr = traceback.format_exception(*exc)
        try:
            self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=exc[1], excString=excStr))
        except:
            ## the exception itself may not be picklable; fall back to sending only the formatted text
            self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=None, excString=excStr))

    def send(self, request, opts=None, reqId=None, callSync='sync', timeout=10, returnType=None, **kwds):
        """Send a request or return packet to the remote process.

        Generally it is not necessary to call this method directly; it is for internal use.
        (The docstring has information that is nevertheless useful to the programmer,
        as it describes the internal protocol used to communicate between processes.)

        ==========  ====================================================================
        Arguments:
        request     String describing the type of request being sent (see below)
        reqId       Integer uniquely linking a result back to the request that generated
                    it. (most requests leave this blank)
        callSync    'sync':  return the actual result of the request
                    'async': return a Request object which can be used to look up the
                             result later
                    'off':   return no result
        timeout     Time in seconds to wait for a response when callSync=='sync'
        opts        Extra arguments sent to the remote process that determine the way
                    the request will be handled (see below)
        returnType  'proxy', 'value', or 'auto'
        ==========  ====================================================================

        Description of request strings and options allowed for each:

        ===========  ===========  ====================================================
        request      option       description
        -----------  -----------  ----------------------------------------------------
        getObjAttr                Request the remote process return (proxy to) an
                                  attribute of an object.
                     obj          reference to object whose attribute should be
                                  returned
                     attr         string name of attribute to return
                     returnValue  bool or 'auto' indicating whether to return a proxy
                                  or the actual value.
        callObj                   Request the remote process call a function or
                                  method. If a request ID is given, then the call's
                                  return value will be sent back (or information
                                  about the error that occurred while running the
                                  function).
                     obj          the (reference to) object to call
                     args         tuple of arguments to pass to callable
                     kwds         dict of keyword arguments to pass to callable
                     returnValue  bool or 'auto' indicating whether to return a proxy
                                  or the actual value.
        getObjValue               Request the remote process return the value of
                                  a proxied object (must be picklable).
                     obj          reference to object whose value should be returned
        transfer                  Copy an object to the remote process and request
                                  it return a proxy for the new object.
                     obj          the object to transfer
        import                    Request the remote process import new symbols
                                  and return proxy(ies) to the imported objects.
                     module       the string name of the module to import
                     fromlist     optional list of string names to import from module
        del                       Inform the remote process that a proxy has been
                                  released (thus the remote process may be able to
                                  release the original object).
                     proxyId      id of proxy which is no longer referenced by
                                  remote host
        close                     Instruct the remote process to stop its event loop
                                  and exit. Optionally, this request may return a
                                  confirmation.
        result                    Inform the remote process that its request has
                                  been processed.
                     result       return value of a request
        error                     Inform the remote process that its request failed.
                     exception    the Exception that was raised (or None if the
                                  exception could not be pickled)
                     excString    string-formatted version of the exception and
                                  traceback
        ===========  ===========  ====================================================
        """
        #if len(kwds) > 0:
            #print "Warning: send() ignored args:", kwds

        if opts is None:
            opts = {}

        assert callSync in ['off', 'sync', 'async'], 'callSync must be one of "off", "sync", or "async"'
        if reqId is None:
            if callSync != 'off':  ## requested return value; use the next available request ID
                reqId = self.nextRequestId
                self.nextRequestId += 1
        else:
            ## If requestId is provided, this _must_ be a response to a previously received request.
            assert request in ['result', 'error']

        if returnType is not None:
            opts['returnType'] = returnType
        #print "send", opts

        ## double-pickle args to ensure that at least status and request ID get through
        try:
            optStr = pickle.dumps(opts)
        except:
            print "Error pickling:", opts
            raise

        request = (request, reqId, optStr)
        self.conn.send(request)

        if callSync == 'off':
            return

        req = Request(self, reqId, description=str(request), timeout=timeout)
        if callSync == 'async':
            return req

        if callSync == 'sync':
            try:
                return req.result()
            except NoResultError:
                return req
    def close(self, callSync='off', noCleanup=False, **kwds):
        self.send(request='close', opts=dict(noCleanup=noCleanup), callSync=callSync, **kwds)

    def getResult(self, reqId):
        ## raises NoResultError if the result is not available yet
        #print self.results.keys(), os.getpid()
        if reqId not in self.results:
            #self.readPipe()
            try:
                self.processRequests()
            except ExitError:
                pass
        if reqId not in self.results:
            raise NoResultError()
        status, result = self.results.pop(reqId)
        if status == 'result':
            return result
        elif status == 'error':
            #print ''.join(result)
            exc, excStr = result
            if exc is not None:
                print "===== Remote process raised exception on request: ====="
                print ''.join(excStr)
                print "===== Local Traceback to request follows: ====="
                raise exc
            else:
                print ''.join(excStr)
                raise Exception("Error getting result. See above for exception from remote process.")
        else:
            raise Exception("Internal error.")
    def _import(self, mod, **kwds):
        """
        Request the remote process import a module (or symbols from a module)
        and return the proxied results. Uses the built-in __import__() function,
        but adds a bit more processing:

            _import('module')  =>  returns module
            _import('module.submodule')  =>  returns submodule
                (note this differs from behavior of __import__)
            _import('module', fromlist=[name1, name2, ...])  =>  returns [module.name1, module.name2, ...]
                (this also differs from behavior of __import__)
        """
        return self.send(request='import', callSync='sync', opts=dict(module=mod), **kwds)

    def getObjAttr(self, obj, attr, **kwds):
        return self.send(request='getObjAttr', opts=dict(obj=obj, attr=attr), **kwds)

    def getObjValue(self, obj, **kwds):
        return self.send(request='getObjValue', opts=dict(obj=obj), **kwds)

    def callObj(self, obj, args, kwds, **opts):
        opts = opts.copy()
        noProxyTypes = opts.pop('noProxyTypes', None)
        if noProxyTypes is None:
            noProxyTypes = self.proxyOptions['noProxyTypes']
        autoProxy = opts.pop('autoProxy', self.proxyOptions['autoProxy'])

        if autoProxy is True:
            args = tuple([self.autoProxy(v, noProxyTypes) for v in args])
            for k, v in kwds.iteritems():
                ## proxy the keyword argument values as well (these are delivered to
                ## the callable, not used as send() options)
                kwds[k] = self.autoProxy(v, noProxyTypes)

        return self.send(request='callObj', opts=dict(obj=obj, args=args, kwds=kwds), **opts)

    def registerProxy(self, proxy):
        ref = weakref.ref(proxy, self.deleteProxy)
        self.proxies[ref] = proxy._proxyId

    def deleteProxy(self, ref):
        proxyId = self.proxies.pop(ref)
        try:
            self.send(request='del', opts=dict(proxyId=proxyId), callSync='off')
        except IOError:  ## if remote process has closed down, there is no need to send delete requests anymore
            pass

    def transfer(self, obj, **kwds):
        """
        Transfer an object to the remote host (the object must be picklable) and return
        a proxy for the new remote object.
        """
        return self.send(request='transfer', opts=dict(obj=obj), **kwds)

    def autoProxy(self, obj, noProxyTypes):
        ## Return object wrapped in LocalObjectProxy _unless_ its type is in noProxyTypes.
        for typ in noProxyTypes:
            if isinstance(obj, typ):
                return obj
        return LocalObjectProxy(obj)
class Request:
    ## used internally for tracking asynchronous requests and returning results
    def __init__(self, process, reqId, description=None, timeout=10):
        self.proc = process
        self.description = description
        self.reqId = reqId
        self.gotResult = False
        self._result = None
        self.timeout = timeout

    def result(self, block=True, timeout=None):
        """Return the result for this request.

        If block is True, wait until the result has arrived or *timeout* seconds passes.
        If the timeout is reached, raise an exception. (Use a negative timeout to wait
        forever; passing timeout=None simply falls back to the default given at
        construction.)
        If block is False, raise an exception if the result has not arrived yet."""
        if self.gotResult:
            return self._result

        if timeout is None:
            timeout = self.timeout

        if block:
            start = time.time()
            while not self.hasResult():
                time.sleep(0.005)
                if timeout >= 0 and time.time() - start > timeout:
                    print "Request timed out:", self.description
                    import traceback
                    traceback.print_stack()
                    raise NoResultError()
            return self._result
        else:
            self._result = self.proc.getResult(self.reqId)  ## raises NoResultError if result is not available yet
            self.gotResult = True
            return self._result

    def hasResult(self):
        """Returns True if the result for this request has arrived."""
        try:
            #print "check result", self.description
            self.result(block=False)
        except NoResultError:
            #print " -> not yet"
            pass
        return self.gotResult
class LocalObjectProxy(object):
    """Used for wrapping local objects to ensure that they are sent by proxy to a remote host."""
    nextProxyId = 0
    proxiedObjects = {}  ## maps {proxyId: object}

    @classmethod
    def registerObject(cls, obj):
        ## assign it a unique ID so we can keep a reference to the local object
        pid = cls.nextProxyId
        cls.nextProxyId += 1
        cls.proxiedObjects[pid] = obj
        #print "register:", cls.proxiedObjects
        return pid

    @classmethod
    def lookupProxyId(cls, pid):
        return cls.proxiedObjects[pid]

    @classmethod
    def releaseProxyId(cls, pid):
        del cls.proxiedObjects[pid]
        #print "release:", cls.proxiedObjects

    def __init__(self, obj):
        self.processId = os.getpid()
        #self.objectId = id(obj)
        self.typeStr = repr(obj)
        #self.handler = handler
        self.obj = obj

    def __reduce__(self):
        ## a proxy is being pickled and sent to a remote process.
        ## every time this happens, a new proxy will be generated in the remote process,
        ## so we keep a new ID so we can track when each is released.
        pid = LocalObjectProxy.registerObject(self.obj)
        return (unpickleObjectProxy, (self.processId, pid, self.typeStr))

## alias
proxy = LocalObjectProxy


def unpickleObjectProxy(processId, proxyId, typeStr, attributes=None):
    if processId == os.getpid():
        obj = LocalObjectProxy.lookupProxyId(proxyId)
        if attributes is not None:
            for attr in attributes:
                obj = getattr(obj, attr)
        return obj
    else:
        return ObjectProxy(processId, proxyId=proxyId, typeStr=typeStr)
class ObjectProxy(object):
    """
    Proxy to an object stored by the remote process. Proxies are created
    by calling Process._import(), Process.transfer(), or by requesting/calling
    attributes on existing proxy objects.

    For the most part, this object can be used exactly as if it
    were a local object.
    """
    def __init__(self, processId, proxyId, typeStr='', parent=None):
        object.__init__(self)
        ## can't set attributes directly because setattr is overridden.
        self.__dict__['_processId'] = processId
        self.__dict__['_typeStr'] = typeStr
        self.__dict__['_proxyId'] = proxyId
        self.__dict__['_attributes'] = ()
        ## attributes that affect the behavior of the proxy.
        ## in all cases, a value of None causes the proxy to ask
        ## its parent event handler to make the decision
        self.__dict__['_proxyOptions'] = {
            'callSync': None,      ## 'sync', 'async', None
            'timeout': None,       ## float, None
            'returnType': None,    ## 'proxy', 'value', 'auto', None
            'deferGetattr': None,  ## True, False, None
            'noProxyTypes': None,  ## list of types to send by value instead of by proxy
        }

        self.__dict__['_handler'] = RemoteEventHandler.getHandler(processId)
        self.__dict__['_handler'].registerProxy(self)  ## handler will watch proxy; inform remote process when the proxy is deleted.

    def _setProxyOptions(self, **kwds):
        """
        Change the behavior of this proxy. For all options, a value of None
        will cause the proxy to instead use the default behavior defined
        by its parent Process.

        Options are:

        =============  =============================================================
        callSync       'sync', 'async', 'off', or None.
                       If 'async', then calling methods will return a Request object
                       which can be used to inquire later about the result of the
                       method call.
                       If 'sync', then calling a method will block until the remote
                       process has returned its result or the timeout has elapsed
                       (in this case, a Request object is returned instead).
                       If 'off', then the remote process is instructed _not_ to
                       reply and the method call will return None immediately.
        returnType     'auto', 'proxy', 'value', or None.
                       If 'proxy', then the value returned when calling a method
                       will be a proxy to the object on the remote process.
                       If 'value', then attempt to pickle the returned object and
                       send it back.
                       If 'auto', then the decision is made by consulting the
                       'noProxyTypes' option.
        autoProxy      bool or None. If True, arguments to __call__ are
                       automatically converted to proxy unless their type is
                       listed in noProxyTypes (see below). If False, arguments
                       are left untouched. Use proxy(obj) to manually convert
                       arguments before sending.
        timeout        float or None. Length of time to wait during synchronous
                       requests before returning a Request object instead.
        deferGetattr   True, False, or None.
                       If False, all attribute requests will be sent to the remote
                       process immediately and will block until a response is
                       received (or timeout has elapsed).
                       If True, requesting an attribute from the proxy returns a
                       new proxy immediately. The remote process is _not_ contacted
                       to make this request. This is faster, but it is possible to
                       request an attribute that does not exist on the proxied
                       object. In this case, AttributeError will not be raised
                       until an attempt is made to look up the attribute on the
                       remote process.
        noProxyTypes   List of object types that should _not_ be proxied when
                       sent to the remote process.
        =============  =============================================================
        """
        self._proxyOptions.update(kwds)

    def _getProxyOption(self, opt):
        val = self._proxyOptions[opt]
        if val is None:
            return self._handler.getProxyOption(opt)
        return val

    def _getProxyOptions(self):
        return {k: self._getProxyOption(k) for k in self._proxyOptions}

    def __reduce__(self):
        return (unpickleObjectProxy, (self._processId, self._proxyId, self._typeStr, self._attributes))

    def __repr__(self):
        #objRepr = self.__getattr__('__repr__')(callSync='value')
        return "<ObjectProxy for process %d, object 0x%x: %s >" % (self._processId, self._proxyId, self._typeStr)

    def __getattr__(self, attr):
        #if '_processId' not in self.__dict__:
            #raise Exception("ObjectProxy has no processId")
        #proc = Process._processes[self._processId]
        deferred = self._getProxyOption('deferGetattr')
        if deferred is True:
            return self._deferredAttr(attr)
        else:
            opts = self._getProxyOptions()
            return self._handler.getObjAttr(self, attr, **opts)

    def _deferredAttr(self, attr):
        return DeferredObjectProxy(self, attr)
    def __call__(self, *args, **kwds):
        """
        Attempts to call the proxied object from the remote process.
        Accepts extra keyword arguments:

            _callSync    'off', 'sync', or 'async'
            _returnType  'value', 'proxy', or 'auto'
        """
        #opts = {}
        #callSync = kwds.pop('_callSync', self.)
        #if callSync is not None:
            #opts['callSync'] = callSync
        #returnType = kwds.pop('_returnType', self._defaultReturnValue)
        #if returnType is not None:
            #opts['returnType'] = returnType
        opts = self._getProxyOptions()
        for k in opts:
            if '_' + k in kwds:
                opts[k] = kwds.pop('_' + k)
        #print "call", opts
        return self._handler.callObj(obj=self, args=args, kwds=kwds, **opts)

    def _getValue(self):
        ## this just gives us an easy way to change the behavior of the special methods
        #proc = Process._processes[self._processId]
        return self._handler.getObjValue(self)

    ## Explicitly proxy special methods. Is there a better way to do this??

    def _getSpecialAttr(self, attr):
        #return self.__getattr__(attr)
        return self._deferredAttr(attr)

    def __getitem__(self, *args):
        return self._getSpecialAttr('__getitem__')(*args)

    def __setitem__(self, *args):
        return self._getSpecialAttr('__setitem__')(*args)

    def __setattr__(self, *args):
        return self._getSpecialAttr('__setattr__')(*args)

    def __str__(self, *args):
        ## str() must return the actual string, so force the value back
        return self._getSpecialAttr('__str__')(*args, _returnType='value')

    def __len__(self, *args):
        return self._getSpecialAttr('__len__')(*args)

    def __add__(self, *args):
        return self._getSpecialAttr('__add__')(*args)

    def __sub__(self, *args):
        return self._getSpecialAttr('__sub__')(*args)

    def __div__(self, *args):
        return self._getSpecialAttr('__div__')(*args)

    def __mul__(self, *args):
        return self._getSpecialAttr('__mul__')(*args)

    def __pow__(self, *args):
        return self._getSpecialAttr('__pow__')(*args)

    def __rshift__(self, *args):
        return self._getSpecialAttr('__rshift__')(*args)

    def __lshift__(self, *args):
        return self._getSpecialAttr('__lshift__')(*args)

    def __floordiv__(self, *args):
        return self._getSpecialAttr('__floordiv__')(*args)

    def __eq__(self, *args):
        return self._getSpecialAttr('__eq__')(*args)

    def __ne__(self, *args):
        return self._getSpecialAttr('__ne__')(*args)

    def __lt__(self, *args):
        return self._getSpecialAttr('__lt__')(*args)

    def __gt__(self, *args):
        return self._getSpecialAttr('__gt__')(*args)

    def __le__(self, *args):
        return self._getSpecialAttr('__le__')(*args)

    def __ge__(self, *args):
        return self._getSpecialAttr('__ge__')(*args)

    def __and__(self, *args):
        return self._getSpecialAttr('__and__')(*args)

    def __or__(self, *args):
        return self._getSpecialAttr('__or__')(*args)

    def __xor__(self, *args):
        return self._getSpecialAttr('__xor__')(*args)

    def __mod__(self, *args):
        return self._getSpecialAttr('__mod__')(*args)

    def __radd__(self, *args):
        return self._getSpecialAttr('__radd__')(*args)

    def __rsub__(self, *args):
        return self._getSpecialAttr('__rsub__')(*args)

    def __rdiv__(self, *args):
        return self._getSpecialAttr('__rdiv__')(*args)

    def __rmul__(self, *args):
        return self._getSpecialAttr('__rmul__')(*args)

    def __rpow__(self, *args):
        return self._getSpecialAttr('__rpow__')(*args)

    def __rrshift__(self, *args):
        return self._getSpecialAttr('__rrshift__')(*args)

    def __rlshift__(self, *args):
        return self._getSpecialAttr('__rlshift__')(*args)

    def __rfloordiv__(self, *args):
        return self._getSpecialAttr('__rfloordiv__')(*args)

    def __rand__(self, *args):
        return self._getSpecialAttr('__rand__')(*args)

    def __ror__(self, *args):
        return self._getSpecialAttr('__ror__')(*args)

    def __rxor__(self, *args):
        return self._getSpecialAttr('__rxor__')(*args)

    def __rmod__(self, *args):
        return self._getSpecialAttr('__rmod__')(*args)
class DeferredObjectProxy(ObjectProxy):
    def __init__(self, parentProxy, attribute):
        ## can't set attributes directly because setattr is overridden.
        for k in ['_processId', '_typeStr', '_proxyId', '_handler']:
            self.__dict__[k] = getattr(parentProxy, k)
        self.__dict__['_parent'] = parentProxy  ## make sure parent stays alive
        self.__dict__['_attributes'] = parentProxy._attributes + (attribute,)
        self.__dict__['_proxyOptions'] = parentProxy._proxyOptions.copy()

    def __repr__(self):
        return ObjectProxy.__repr__(self) + '.' + '.'.join(self._attributes)
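Finally, a short sketch of how the proxy options documented in _setProxyOptions change call behavior (built on the same Process/numpy names used in the example file at the top; illustration only):

    import time
    import pyqtgraph.multiprocess as mp

    proc = mp.Process()
    rnp = proc._import('numpy')

    ## callSync='async': the call returns a Request instead of blocking
    req = rnp.arange(1000).sum(_callSync='async')
    while not req.hasResult():
        time.sleep(0.01)                  ## free to do other work here
    print req.result()

    ## deferGetattr=True: attribute access returns a DeferredObjectProxy
    ## without contacting the remote process
    rnp._setProxyOptions(deferGetattr=True)
    fn = rnp.random.normal                ## no round-trip yet; attribute names accumulate
    arr = fn(size=100)                    ## the attribute chain is resolved remotely at call time
    print arr.mean(_returnType='value')
    proc.join()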