From 72006fe05b5f5c595b611c1ab9cf038c8287276b Mon Sep 17 00:00:00 2001
From: Luke Campagnola <>
Date: Mon, 18 Jun 2012 15:20:35 -0400
Subject: [PATCH] Added custom multiprocessing module:

- allows starting new processes and controlling them remotely from the parent process
- remote processes can run their own GUI; Qt signals can be connected between processes
  (in general this is not possible with the built-in multiprocessing module due to its use of fork())
- control works through a system of proxy objects, such that controlling a remote process
  looks almost exactly like working with local objects
- uses sockets to communicate between processes (so in theory it could be made to work
  over a network), but also includes a mode that uses fork() to allow fast parallelization
- wicked-easy inline parallelization: add one line of code to break up work between
  processes (requires fork; sorry, Windows users)
---
 examples/multiprocess.py     |  84 ++++
 multiprocess/__init__.py     |  22 +
 multiprocess/parallelizer.py | 176 ++++++++
 multiprocess/processes.py    | 208 ++++++++++
 multiprocess/remoteproxy.py  | 759 +++++++++++++++++++++++++++++++++++
 5 files changed, 1249 insertions(+)
 create mode 100644 examples/multiprocess.py
 create mode 100644 multiprocess/__init__.py
 create mode 100644 multiprocess/parallelizer.py
 create mode 100644 multiprocess/processes.py
 create mode 100644 multiprocess/remoteproxy.py

diff --git a/examples/multiprocess.py b/examples/multiprocess.py
new file mode 100644
index 00000000..72337d20
--- /dev/null
+++ b/examples/multiprocess.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+import initExample   ## Add path to library (just for examples; you do not need this)
+
+import numpy as np
+import pyqtgraph.multiprocess as mp
+from pyqtgraph.multiprocess.parallelizer import Parallelize  #, Parallelizer
+import time
+
+print "\n=================\nParallelize"
+tasks = [1, 2, 4, 8]
+results = [None] * len(tasks)
+size = 2000000
+
+start = time.time()
+with Parallelize(enumerate(tasks), results=results, workers=1) as tasker:
+    for i, x in tasker:
+        print i, x
+        tot = 0
+        for j in xrange(size):
+            tot += j * x
+        results[i] = tot
+print results
+print "serial:", time.time() - start
+
+start = time.time()
+with Parallelize(enumerate(tasks), results=results) as tasker:
+    for i, x in tasker:
+        print i, x
+        tot = 0
+        for j in xrange(size):
+            tot += j * x
+        results[i] = tot
+print results
+print "parallel:", time.time() - start
+
+
+print "\n=================\nStart Process"
+proc = mp.Process()
+import os
+print "parent:", os.getpid(), "child:", proc.proc.pid
+print "started"
+rnp = proc._import('numpy')
+arr = rnp.array([1, 2, 3, 4])
+print repr(arr)
+print str(arr)
+print "return value:", repr(arr.mean(_returnType='value'))
+print "return proxy:", repr(arr.mean(_returnType='proxy'))
+print "return auto: ", repr(arr.mean(_returnType='auto'))
+proc.join()
+print "process finished"
+
+
+print "\n=================\nStart ForkedProcess"
+proc = mp.ForkedProcess()
+rnp = proc._import('numpy')
+arr = rnp.array([1, 2, 3, 4])
+print repr(arr)
+print str(arr)
+print repr(arr.mean())
+proc.join()
+print "process finished"
+
+
+import pyqtgraph as pg
+from pyqtgraph.Qt import QtCore, QtGui
+app = pg.QtGui.QApplication([])
+
+print "\n=================\nStart QtProcess"
+proc = mp.QtProcess()
+d1 = proc.transfer(np.random.normal(size=1000))
+d2 = proc.transfer(np.random.normal(size=1000))
+rpg = proc._import('pyqtgraph')
+plt = rpg.plot(d1 + d2)
+
+
+## Start Qt event loop unless running in interactive mode or using pyside.
+#import sys
+#if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
+    #QtGui.QApplication.instance().exec_()
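+
+## The commit message mentions connecting Qt signals between processes; here is
+## a hypothetical sketch of that idea ('remoteObj' and its signal 'sigSomething'
+## are placeholders, not part of this example). The callable is wrapped with
+## proxy() so the remote process can invoke it back in this process:
+#from pyqtgraph.multiprocess.remoteproxy import proxy
+#def onSignal():
+#    print "signal received from remote process"
+#remoteObj.sigSomething.connect(proxy(onSignal))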
diff --git a/multiprocess/__init__.py b/multiprocess/__init__.py
new file mode 100644
index 00000000..190c1b67
--- /dev/null
+++ b/multiprocess/__init__.py
@@ -0,0 +1,22 @@
+"""
+Multiprocessing utility library
+(parallelization done the way I like it)
+
+Luke Campagnola
+2012.06.10
+
+This library provides:
+
+  - a simple mechanism for starting a new python interpreter process that can be controlled from the original process
+    (this allows, for example, displaying and manipulating plots in a remote process
+    while the parent process is free to do other work)
+  - a proxy system that allows objects hosted in the remote process to be used as if they were local
+  - Qt signal connection between processes
+  - very simple in-line parallelization (fork only; does not work on windows) for number-crunching
+
+TODO:
+    allow remote processes to serve as rendering engines that pass pixmaps back to the parent process for display
+    (RemoteGraphicsView class)
+"""
+
+from processes import *
diff --git a/multiprocess/parallelizer.py b/multiprocess/parallelizer.py
new file mode 100644
index 00000000..19819cf2
--- /dev/null
+++ b/multiprocess/parallelizer.py
@@ -0,0 +1,176 @@
+import os, sys, time, multiprocessing
+from processes import ForkedProcess
+from remoteproxy import ExitError
+
+class Parallelize:
+    """
+    Class for ultra-simple inline parallelization on multi-core CPUs
+    
+    Example::
+    
+        ## Here is the serial (single-process) task:
+        
+        tasks = [1, 2, 4, 8]
+        results = []
+        for task in tasks:
+            result = processTask(task)
+            results.append(result)
+        print results
+        
+        ## Here is the parallelized version:
+        
+        tasks = [1, 2, 4, 8]
+        results = []
+        with Parallelize(tasks, workers=4, results=results) as tasker:
+            for task in tasker:
+                result = processTask(task)
+                tasker.results.append(result)
+        print results
+    
+    The only major caveat is that *result* in the example above must be picklable.
+    """
+
+    def __init__(self, tasks, workers=None, block=True, **kwds):
+        """
+        Args:
+            tasks   - list of objects to be processed (Parallelize will determine how to distribute the tasks)
+            workers - number of worker processes, or None to use one worker per CPU in the system
+            kwds    - objects to be shared by proxy with the child processes
+        """
+        self.block = block
+        if workers is None:
+            workers = multiprocessing.cpu_count()
+        if not hasattr(os, 'fork'):
+            workers = 1  ## no fork() available; fall back to serial processing
+        self.workers = workers
+        self.tasks = list(tasks)
+        self.kwds = kwds
+
+    def __enter__(self):
+        self.proc = None
+        workers = self.workers
+        if workers == 1:
+            return Tasker(None, self.tasks, self.kwds)
+
+        self.childs = []
+
+        ## break up tasks into one set per worker
+        chunks = [[] for i in xrange(workers)]
+        for i in range(len(self.tasks)):
+            chunks[i % workers].append(self.tasks[i])
+
+        ## fork and assign tasks to each worker
+        for i in range(workers):
+            proc = ForkedProcess(target=None, preProxy=self.kwds)
+            if not proc.isParent:
+                self.proc = proc
+                return Tasker(proc, chunks[i], proc.forkedProxies)
+            else:
+                self.childs.append(proc)
+
+        ## process events from workers until all have exited.
+        activeChilds = self.childs[:]
+        while len(activeChilds) > 0:
+            rem = []
+            for ch in activeChilds:
+                try:
+                    ch.processRequests()
+                except ExitError:
+                    rem.append(ch)
+            for ch in rem:  ## remove exited children after iterating, not during
+                activeChilds.remove(ch)
+            time.sleep(0.1)
+
+        return []  ## no tasks for the parent process.
+
+    def __exit__(self, *exc_info):
+        if exc_info[0] is not None:
+            sys.excepthook(*exc_info)
+        if self.proc is not None:  ## worker process; exit immediately.
+            os._exit(0)
+
+    def wait(self):
+        ## wait for all child processes to finish
+        pass
+
+class Tasker:
+    def __init__(self, proc, tasks, kwds):
+        self.proc = proc
+        self.tasks = tasks
+        for k, v in kwds.iteritems():
+            setattr(self, k, v)
+
+    def __iter__(self):
+        ## we could fix this up such that tasks are retrieved from the parent process one at a time..
+        for task in self.tasks:
+            yield task
+        if self.proc is not None:
+            self.proc.close()
+
+
+#class Parallelizer:
+    #"""
+    #Use::
+    
+        #p = Parallelizer()
+        #with p(4) as i:
+            #p.finish(do_work(i))
+        #print p.results()
+    
+    #"""
+    #def __init__(self):
+        #pass
+
+    #def __call__(self, n):
+        #self.replies = []
+        #self.conn = None  ## indicates this is the parent process
+        #return Session(self, n)
+
+    #def finish(self, data):
+        #if self.conn is None:
+            #self.replies.append((self.i, data))
+        #else:
+            ##print "send", self.i, data
+            #self.conn.send((self.i, data))
+            #os._exit(0)
+
+    #def result(self):
+        #print self.replies
+
+#class Session:
+    #def __init__(self, par, n):
+        #self.par = par
+        #self.n = n
+
+    #def __enter__(self):
+        #self.childs = []
+        #for i in range(1, self.n):
+            #c1, c2 = multiprocessing.Pipe()
+            #pid = os.fork()
+            #if pid == 0:  ## child
+                #self.par.i = i
+                #self.par.conn = c2
+                #self.childs = None
+                #c1.close()
+                #return i
+            #else:
+                #self.childs.append(c1)
+                #c2.close()
+        #self.par.i = 0
+        #return 0
+
+    #def __exit__(self, *exc_info):
+        #if exc_info[0] is not None:
+            #sys.excepthook(*exc_info)
+        #if self.childs is not None:
+            #self.par.replies.extend([conn.recv() for conn in self.childs])
+        #else:
+            #self.par.finish(None)
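+
+## How worker results reach the parent (descriptive note on the code above):
+## in a worker, the attributes passed via **kwds (e.g. tasker.results) are
+## ObjectProxy instances, so "tasker.results[i] = value" becomes a __setitem__
+## request sent over the pipe; the parent applies it to the real list while
+## looping in processRequests() inside __enter__().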
diff --git a/multiprocess/processes.py b/multiprocess/processes.py
new file mode 100644
index 00000000..1b4f8106
--- /dev/null
+++ b/multiprocess/processes.py
@@ -0,0 +1,208 @@
+from remoteproxy import RemoteEventHandler, ExitError, NoResultError, LocalObjectProxy, ObjectProxy
+import subprocess, atexit, os, sys, time, random, socket
+import cPickle as pickle
+import multiprocessing.connection
+
+class Process(RemoteEventHandler):
+    def __init__(self, name=None, target=None):
+        if target is None:
+            target = startEventLoop
+        if name is None:
+            name = str(self)
+        
+        ## random authentication key
+        authkey = ''.join([chr(random.getrandbits(7)) for i in range(20)])
+        
+        ## Listen for connection from remote process (and find a free port number)
+        port = 10000
+        while True:
+            try:
+                l = multiprocessing.connection.Listener(('localhost', int(port)), authkey=authkey)
+                break
+            except socket.error as ex:
+                if ex.errno != 98:  ## 98 == 'address already in use'
+                    raise
+                port += 1
+        
+        ## start remote process, instruct it to run target function
+        self.proc = subprocess.Popen((sys.executable, __file__, 'remote'), stdin=subprocess.PIPE)
+        pickle.dump((name+'_child', port, authkey, target), self.proc.stdin)
+        self.proc.stdin.close()
+        
+        ## open connection for remote process
+        conn = l.accept()
+        RemoteEventHandler.__init__(self, conn, name+'_parent', pid=self.proc.pid)
+        
+        atexit.register(self.join)
+        
+    def join(self, timeout=10):
+        if self.proc.poll() is None:
+            self.close()
+            start = time.time()
+            while self.proc.poll() is None:
+                if timeout is not None and time.time() - start > timeout:
+                    raise Exception('Timed out waiting for remote process to end.')
+                time.sleep(0.05)
+
+
+def startEventLoop(name, port, authkey):
+    conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
+    global HANDLER
+    HANDLER = RemoteEventHandler(conn, name, os.getppid())
+    while True:
+        try:
+            HANDLER.processRequests()  ## exception raised when the loop should exit
+            time.sleep(0.01)
+        except ExitError:
+            break
+
+
+class ForkedProcess(RemoteEventHandler):
+    """
+    ForkedProcess is a substitute for Process that uses os.fork() to generate a new process.
+    This is much faster than starting a completely new interpreter, but carries some caveats
+    and limitations:
+      - open file handles are shared with the parent process, which is potentially dangerous
+      - it is not possible to have a QApplication in both parent and child process
+        (unless both QApplications are created _after_ the call to fork())
+      - generally not thread-safe. Also, threads are not copied by fork(); the new process
+        will have only one thread that starts wherever fork() was called in the parent process.
+      - forked processes are unceremoniously terminated when join() is called; they are not
+        given any opportunity to clean up. (This prevents them calling any cleanup code that
+        was only intended to be used by the parent process.)
+    """
+    
+    def __init__(self, name=None, target=0, preProxy=None):
+        """
+        When initializing, an optional target may be given.
+        If no target is specified, self.eventLoop will be used.
+        If None is given, no target will be called (and it will be up
+        to the caller to properly shut down the forked process).
+        
+        preProxy may be a dict of values that will appear as ObjectProxy
+        in the remote process (but do not need to be sent explicitly, since
+        they are available immediately before the call to fork()).
+        Proxies will be available as self.forkedProxies[name].
+        """
+        self.hasJoined = False
+        if target == 0:
+            target = self.eventLoop
+        if name is None:
+            name = str(self)
+        
+        conn, remoteConn = multiprocessing.Pipe()
+        
+        proxyIDs = {}
+        if preProxy is not None:
+            for k, v in preProxy.iteritems():
+                proxyId = LocalObjectProxy.registerObject(v)
+                proxyIDs[k] = proxyId
+        
+        pid = os.fork()
+        if pid == 0:
+            self.isParent = False
+            conn.close()
+            sys.stdin.close()  ## otherwise we screw with interactive prompts.
+            RemoteEventHandler.__init__(self, remoteConn, name+'_child', pid=os.getppid())
+            
+            ## set up proxies to the preProxy objects before entering the target,
+            ## since an event-loop target will not return.
+            ppid = os.getppid()
+            self.forkedProxies = {}
+            for name, proxyId in proxyIDs.iteritems():
+                self.forkedProxies[name] = ObjectProxy(ppid, proxyId=proxyId, typeStr=repr(preProxy[name]))
+            
+            if target is not None:
+                target()
+        else:
+            self.isParent = True
+            self.childPid = pid
+            remoteConn.close()
+            RemoteEventHandler.handlers = {}  ## don't want to inherit any of this from the parent.
+            
+            RemoteEventHandler.__init__(self, conn, name+'_parent', pid=pid)
+            atexit.register(self.join)
+    
+    def eventLoop(self):
+        while True:
+            try:
+                self.processRequests()  ## exception raised when the loop should exit
+                time.sleep(0.01)
+            except ExitError:
+                sys.exit(0)
+            except:
+                print "Error occurred in forked event loop:"
+                sys.excepthook(*sys.exc_info())
+    
+    def join(self, timeout=10):
+        if self.hasJoined:
+            return
+        #os.kill(pid, 9)
+        try:
+            ## ask the child process to exit and require that it return a confirmation.
+            self.close(callSync='sync', timeout=timeout, noCleanup=True)
+        except IOError:  ## probably remote process has already quit
+            pass
+        self.hasJoined = True
+
+
+## Special set of subclasses that implement a Qt event loop instead.
+
+class RemoteQtEventHandler(RemoteEventHandler):
+    def __init__(self, *args, **kwds):
+        RemoteEventHandler.__init__(self, *args, **kwds)
+        
+    def startEventTimer(self):
+        from pyqtgraph.Qt import QtGui, QtCore
+        self.timer = QtCore.QTimer()
+        self.timer.timeout.connect(self.processRequests)
+        self.timer.start(10)
+    
+    def processRequests(self):
+        try:
+            RemoteEventHandler.processRequests(self)
+        except ExitError:
+            from pyqtgraph.Qt import QtGui, QtCore
+            QtGui.QApplication.instance().quit()
+            self.timer.stop()
+            #raise
+
+class QtProcess(Process):
+    def __init__(self, name=None):
+        Process.__init__(self, name, target=startQtEventLoop)
+        self.startEventTimer()
+        
+    def startEventTimer(self):
+        from pyqtgraph.Qt import QtGui, QtCore  ## avoid module-level import to keep bootstrap snappy.
+        self.timer = QtCore.QTimer()
+        app = QtGui.QApplication.instance()
+        if app is None:
+            raise Exception("Must create QApplication before starting QtProcess")
+        self.timer.timeout.connect(self.processRequests)
+        self.timer.start(10)
+    
+    def processRequests(self):
+        try:
+            Process.processRequests(self)
+        except ExitError:
+            self.timer.stop()
+
+def startQtEventLoop(name, port, authkey):
+    conn = multiprocessing.connection.Client(('localhost', int(port)), authkey=authkey)
+    from pyqtgraph.Qt import QtGui, QtCore
+    #from PyQt4 import QtGui, QtCore
+    app = QtGui.QApplication.instance()
+    #print app
+    if app is None:
+        app = QtGui.QApplication([])
+        app.setQuitOnLastWindowClosed(False)  ## generally we want the event loop to stay open
+                                              ## until it is explicitly closed by the parent process.
+    
+    global HANDLER
+    HANDLER = RemoteQtEventHandler(conn, name, os.getppid())
+    HANDLER.startEventTimer()
+    app.exec_()
+
+
+if __name__ == '__main__':
+    if len(sys.argv) == 2 and sys.argv[1] == 'remote':  ## module has been invoked as script in new python interpreter.
+        name, port, authkey, target = pickle.load(sys.stdin)
+        target(name, port, authkey)
+        sys.exit(0)
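+
+## Bootstrap summary (descriptive note): Process.__init__ starts
+## "python processes.py remote" with a pipe to its stdin, pickles
+## (name, port, authkey, target) into that pipe, and waits on a Listener;
+## the __main__ block above unpickles those values in the child and calls
+## target(name, port, authkey), which connects a Client back to the parent.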
diff --git a/multiprocess/remoteproxy.py b/multiprocess/remoteproxy.py
new file mode 100644
index 00000000..f70d88d7
--- /dev/null
+++ b/multiprocess/remoteproxy.py
@@ -0,0 +1,759 @@
+import os, __builtin__, time, sys, traceback, weakref
+import cPickle as pickle
+
+class ExitError(Exception):
+    pass
+
+class NoResultError(Exception):
+    pass
+
+
+class RemoteEventHandler(object):
+    
+    handlers = {}  ## maps {process ID: handler}. This allows the unpickler to
+                   ## determine which process an object proxy belongs to.
+    
+    def __init__(self, connection, name, pid):
+        self.conn = connection
+        self.name = name
+        self.results = {}  ## reqId: (status, result); cache of request results received from the remote process.
+                           ## status is either 'result' or 'error';
+                           ## if 'error', then result will be (exception, formatted exception),
+                           ## where exception may be None if it could not be passed through the Connection.
+        
+        self.proxies = {}  ## maps {weakref(proxy): proxyId}; used to inform the remote process
+                           ## when a proxy has been deleted.
+        
+        ## attributes that affect the behavior of the proxy.
+        ## See ObjectProxy._setProxyOptions for description.
+        self.proxyOptions = {
+            'callSync': 'sync',      ## 'sync', 'async', 'off'
+            'timeout': 10,           ## float
+            'returnType': 'auto',    ## 'proxy', 'value', 'auto'
+            'autoProxy': False,      ## bool
+            'deferGetattr': False,   ## True, False
+            'noProxyTypes': [type(None), str, int, float, tuple, list, dict, LocalObjectProxy, ObjectProxy],
+        }
+        
+        self.nextRequestId = 0
+        self.exited = False
+        
+        RemoteEventHandler.handlers[pid] = self  ## register this handler as the one communicating with pid
+    
+    @classmethod
+    def getHandler(cls, pid):
+        return cls.handlers[pid]
+    
+    def getProxyOption(self, opt):
+        return self.proxyOptions[opt]
+    
+    def setProxyOptions(self, **kwds):
+        """
+        Set the default behavior options for object proxies.
+        See ObjectProxy._setProxyOptions for more info.
+        """
+        self.proxyOptions.update(kwds)
+    
+    def processRequests(self):
+        """Process all pending requests from the pipe, return
+        after no more events are immediately available. (non-blocking)"""
+        if self.exited:
+            raise ExitError()
+        
+        while self.conn.poll():
+            try:
+                self.handleRequest()
+            except ExitError:
+                self.exited = True
+                raise
+            except:
+                print "Error in process %s" % self.name
+                sys.excepthook(*sys.exc_info())
+    
+    def handleRequest(self):
+        """Handle a single request from the remote process.
+        Blocks until a request is available."""
+        result = None
+        try:
+            cmd, reqId, optStr = self.conn.recv()  ## args, kwds are double-pickled to ensure this recv() call never fails
+        except EOFError:
+            ## remote process has shut down; end event loop
+            raise ExitError()
+        except IOError:
+            raise ExitError()
+        #print os.getpid(), "received request:", cmd, reqId
+        
+        try:
+            if cmd == 'result' or cmd == 'error':
+                resultId = reqId
+                reqId = None  ## prevents attempt to return information from this request
+                              ## (this is already a return from a previous request)
+            
+            opts = pickle.loads(optStr)
+            #print os.getpid(), "received request:", cmd, reqId, opts
+            returnType = opts.get('returnType', 'auto')
+            
+            if cmd == 'result':
+                self.results[resultId] = ('result', opts['result'])
+            elif cmd == 'error':
+                self.results[resultId] = ('error', (opts['exception'], opts['excString']))
+            elif cmd == 'getObjAttr':
+                result = getattr(opts['obj'], opts['attr'])
+            elif cmd == 'callObj':
+                obj = opts['obj']
+                fnargs = opts['args']
+                fnkwds = opts['kwds']
+                if len(fnkwds) == 0:  ## need to do this because some functions do not allow keyword arguments.
+                    #print obj, fnargs
+                    result = obj(*fnargs)
+                else:
+                    result = obj(*fnargs, **fnkwds)
+            elif cmd == 'getObjValue':
+                result = opts['obj']  ## has already been unpickled into its local value
+                returnType = 'value'
+            elif cmd == 'transfer':
+                result = opts['obj']
+                returnType = 'proxy'
+            elif cmd == 'import':
+                name = opts['module']
+                fromlist = opts.get('fromlist', [])
+                mod = __builtin__.__import__(name, fromlist=fromlist)
+                
+                if len(fromlist) == 0:
+                    parts = name.lstrip('.').split('.')
+                    result = mod
+                    for part in parts[1:]:
+                        result = getattr(result, part)
+                else:
+                    result = [getattr(mod, fromName) for fromName in fromlist]
+            
+            elif cmd == 'del':
+                LocalObjectProxy.releaseProxyId(opts['proxyId'])
+                #del self.proxiedObjects[opts['objId']]
+            
+            elif cmd == 'close':
+                if reqId is not None:
+                    result = True
+                    returnType = 'value'
+            
+            exc = None
+        except:
+            exc = sys.exc_info()
+        
+        if reqId is not None:
+            if exc is None:
+                #print "returnValue:", returnValue, result
+                if returnType == 'auto':
+                    result = self.autoProxy(result, self.proxyOptions['noProxyTypes'])
+                elif returnType == 'proxy':
+                    result = LocalObjectProxy(result)
+                
+                try:
+                    self.replyResult(reqId, result)
+                except:
+                    sys.excepthook(*sys.exc_info())
+                    self.replyError(reqId, *sys.exc_info())
+            else:
+                self.replyError(reqId, *exc)
+        
+        elif exc is not None:
+            sys.excepthook(*exc)
+        
+        if cmd == 'close':
+            if opts.get('noCleanup', False) is True:
+                os._exit(0)  ## exit immediately, do not pass GO, do not collect $200.
+                             ## (more importantly, do not call any code that would
+                             ## normally be invoked at exit)
+            else:
+                raise ExitError()
+    
+    def replyResult(self, reqId, result):
+        self.send(request='result', reqId=reqId, callSync='off', opts=dict(result=result))
+    
+    def replyError(self, reqId, *exc):
+        excStr = traceback.format_exception(*exc)
+        try:
+            self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=exc[1], excString=excStr))
+        except:
+            ## the exception itself may not be picklable; fall back to sending only the formatted string
+            self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=None, excString=excStr))
+    
+    def send(self, request, opts=None, reqId=None, callSync='sync', timeout=10, returnType=None, **kwds):
+        """Send a request or return packet to the remote process.
+        Generally it is not necessary to call this method directly; it is for internal use.
+        (The docstring nevertheless has information that is useful to the programmer,
+        as it describes the internal protocol used to communicate between processes.)
+        
+        ==========  ====================================================================
+        Arguments:
+        request     String describing the type of request being sent (see below)
+        reqId       Integer uniquely linking a result back to the request that generated
+                    it. (most requests leave this blank)
+        callSync    'sync':  return the actual result of the request
+                    'async': return a Request object which can be used to look up the
+                             result later
+                    'off':   return no result
+        timeout     Time in seconds to wait for a response when callSync == 'sync'
+        opts        Extra arguments sent to the remote process that determine the way
+                    the request will be handled (see below)
+        returnType  'proxy', 'value', or 'auto'
+        ==========  ====================================================================
+        
+        Description of request strings and options allowed for each:
+        
+        =============  =============  ========================================================
+        request        option         description
+        -------------  -------------  --------------------------------------------------------
+        getObjAttr                    Request the remote process return (a proxy to) an
+                                      attribute of an object.
+                       obj            reference to the object whose attribute should be
+                                      returned
+                       attr           string name of the attribute to return
+                       returnValue    bool or 'auto' indicating whether to return a proxy or
+                                      the actual value.
+        
+        callObj                       Request the remote process call a function or
+                                      method. If a request ID is given, then the call's
+                                      return value will be sent back (or information
+                                      about the error that occurred while running the
+                                      function).
+                       obj            the (reference to the) object to call
+                       args           tuple of arguments to pass to the callable
+                       kwds           dict of keyword arguments to pass to the callable
+                       returnValue    bool or 'auto' indicating whether to return a proxy or
+                                      the actual value.
+        
+        getObjValue                   Request the remote process return the value of
+                                      a proxied object (must be picklable).
+                       obj            reference to the object whose value should be returned
+        
+        transfer                      Copy an object to the remote process and request
+                                      it return a proxy for the new object.
+                       obj            the object to transfer.
+        
+        import                        Request the remote process import new symbols
+                                      and return proxy(ies) to the imported objects.
+                       module         the string name of the module to import
+                       fromlist       optional list of string names to import from module
+        
+        del                           Inform the remote process that a proxy has been
+                                      released (thus the remote process may be able to
+                                      release the original object).
+                       proxyId        id of the proxy which is no longer referenced by
+                                      the remote host
+        
+        close                         Instruct the remote process to stop its event loop
+                                      and exit. Optionally, this request may return a
+                                      confirmation.
+        
+        result                        Inform the remote process that its request has
+                                      been processed.
+                       result         return value of a request
+        
+        error                         Inform the remote process that its request failed.
+                       exception      the Exception that was raised (or None if the
+                                      exception could not be pickled)
+                       excString      string-formatted version of the exception and
+                                      traceback
+        =============  =============  ========================================================
+        """
+        #if len(kwds) > 0:
+            #print "Warning: send() ignored args:", kwds
+        
+        if opts is None:
+            opts = {}
+        
+        assert callSync in ['off', 'sync', 'async'], 'callSync must be one of "off", "sync", or "async"'
+        if reqId is None:
+            if callSync != 'off':  ## requested return value; use the next available request ID
+                reqId = self.nextRequestId
+                self.nextRequestId += 1
+        else:
+            ## If reqId is provided, this _must_ be a response to a previously received request.
+            assert request in ['result', 'error']
+        
+        if returnType is not None:
+            opts['returnType'] = returnType
+        #print "send", opts
+        
+        ## double-pickle the opts to ensure that at least the status and request ID get through
+        try:
+            optStr = pickle.dumps(opts)
+        except:
+            print "Error pickling:", opts
+            raise
+        
+        request = (request, reqId, optStr)
+        self.conn.send(request)
+        
+        if callSync == 'off':
+            return
+        
+        req = Request(self, reqId, description=str(request), timeout=timeout)
+        if callSync == 'async':
+            return req
+        
+        if callSync == 'sync':
+            try:
+                return req.result()
+            except NoResultError:
+                return req
+    
+    def close(self, callSync='off', noCleanup=False, **kwds):
+        self.send(request='close', opts=dict(noCleanup=noCleanup), callSync=callSync, **kwds)
+    
+    def getResult(self, reqId):
+        ## raises NoResultError if the result is not available yet
+        #print self.results.keys(), os.getpid()
+        if reqId not in self.results:
+            #self.readPipe()
+            try:
+                self.processRequests()
+            except ExitError:
+                pass
+        if reqId not in self.results:
+            raise NoResultError()
+        status, result = self.results.pop(reqId)
+        if status == 'result':
+            return result
+        elif status == 'error':
+            #print ''.join(result)
+            exc, excStr = result
+            if exc is not None:
+                print "===== Remote process raised exception on request: ====="
+                print ''.join(excStr)
+                print "===== Local traceback to request follows: ====="
+                raise exc
+            else:
+                print ''.join(excStr)
+                raise Exception("Error getting result. See above for exception from remote process.")
+        else:
+            raise Exception("Internal error.")
+    
+    def _import(self, mod, **kwds):
+        """
+        Request the remote process import a module (or symbols from a module)
+        and return the proxied results. Uses the built-in __import__() function,
+        but adds a bit more processing:
+        
+            _import('module')  =>  returns module
+            _import('module.submodule')  =>  returns submodule
+                (note this differs from the behavior of __import__)
+            _import('module', fromlist=[name1, name2, ...])  =>  returns [module.name1, module.name2, ...]
+                (this also differs from the behavior of __import__)
+        """
+        return self.send(request='import', callSync='sync', opts=dict(module=mod), **kwds)
+    
+    def getObjAttr(self, obj, attr, **kwds):
+        return self.send(request='getObjAttr', opts=dict(obj=obj, attr=attr), **kwds)
+    
+    def getObjValue(self, obj, **kwds):
+        return self.send(request='getObjValue', opts=dict(obj=obj), **kwds)
+    
+    def callObj(self, obj, args, kwds, **opts):
+        opts = opts.copy()
+        noProxyTypes = opts.pop('noProxyTypes', None)
+        if noProxyTypes is None:
+            noProxyTypes = self.proxyOptions['noProxyTypes']
+        autoProxy = opts.pop('autoProxy', self.proxyOptions['autoProxy'])
+        
+        if autoProxy is True:
+            args = tuple([self.autoProxy(v, noProxyTypes) for v in args])
+            for k, v in kwds.iteritems():
+                kwds[k] = self.autoProxy(v, noProxyTypes)
+        
+        return self.send(request='callObj', opts=dict(obj=obj, args=args, kwds=kwds), **opts)
+    
+    def registerProxy(self, proxy):
+        ref = weakref.ref(proxy, self.deleteProxy)
+        self.proxies[ref] = proxy._proxyId
+    
+    def deleteProxy(self, ref):
+        proxyId = self.proxies.pop(ref)
+        try:
+            self.send(request='del', opts=dict(proxyId=proxyId), callSync='off')
+        except IOError:  ## if the remote process has closed down, there is no need to send delete requests anymore
+            pass
+    
+    def transfer(self, obj, **kwds):
+        """
+        Transfer an object to the remote host (the object must be picklable) and return
+        a proxy for the new remote object.
+        """
+        return self.send(request='transfer', opts=dict(obj=obj), **kwds)
+    
+    def autoProxy(self, obj, noProxyTypes):
+        ## Return object wrapped in LocalObjectProxy _unless_ its type is in noProxyTypes.
+        for typ in noProxyTypes:
+            if isinstance(obj, typ):
+                return obj
+        return LocalObjectProxy(obj)
+
+
+class Request:
+    ## used internally for tracking asynchronous requests and returning results
+    def __init__(self, process, reqId, description=None, timeout=10):
+        self.proc = process
+        self.description = description
+        self.reqId = reqId
+        self.gotResult = False
+        self._result = None
+        self.timeout = timeout
+    
+    def result(self, block=True, timeout=None):
+        """Return the result for this request.
+        If block is True, wait until the result has arrived or *timeout* seconds pass.
+        If the timeout is reached, raise an exception. (use timeout=None to disable)
+        If block is False, raise an exception if the result has not arrived yet."""
+        
+        if self.gotResult:
+            return self._result
+        
+        if timeout is None:
+            timeout = self.timeout
+        
+        if block:
+            start = time.time()
+            while not self.hasResult():
+                time.sleep(0.005)
+                if timeout >= 0 and time.time() - start > timeout:
+                    print "Request timed out:", self.description
+                    import traceback
+                    traceback.print_stack()
+                    raise NoResultError()
+            return self._result
+        else:
+            self._result = self.proc.getResult(self.reqId)  ## raises NoResultError if result is not available yet
+            self.gotResult = True
+            return self._result
+    
+    def hasResult(self):
+        """Returns True if the result for this request has arrived."""
+        try:
+            #print "check result", self.description
+            self.result(block=False)
+        except NoResultError:
+            #print "  -> not yet"
+            pass
+        
+        return self.gotResult
+
+class LocalObjectProxy(object):
+    """Used for wrapping local objects to ensure that they are sent by proxy to a remote host."""
+    nextProxyId = 0
+    proxiedObjects = {}  ## maps {proxyId: object}
+    
+    @classmethod
+    def registerObject(cls, obj):
+        ## assign it a unique ID so we can keep a reference to the local object
+        pid = cls.nextProxyId
+        cls.nextProxyId += 1
+        cls.proxiedObjects[pid] = obj
+        #print "register:", cls.proxiedObjects
+        return pid
+    
+    @classmethod
+    def lookupProxyId(cls, pid):
+        return cls.proxiedObjects[pid]
+    
+    @classmethod
+    def releaseProxyId(cls, pid):
+        del cls.proxiedObjects[pid]
+        #print "release:", cls.proxiedObjects
+    
+    def __init__(self, obj):
+        self.processId = os.getpid()
+        #self.objectId = id(obj)
+        self.typeStr = repr(obj)
+        #self.handler = handler
+        self.obj = obj
+    
+    def __reduce__(self):
+        ## a proxy is being pickled and sent to a remote process.
+        ## every time this happens, a new proxy will be generated in the remote process,
+        ## so we keep a new ID so we can track when each is released.
+        pid = LocalObjectProxy.registerObject(self.obj)
+        return (unpickleObjectProxy, (self.processId, pid, self.typeStr))
+
+## alias
+proxy = LocalObjectProxy
+
+def unpickleObjectProxy(processId, proxyId, typeStr, attributes=None):
+    if processId == os.getpid():
+        obj = LocalObjectProxy.lookupProxyId(proxyId)
+        if attributes is not None:
+            for attr in attributes:
+                obj = getattr(obj, attr)
+        return obj
+    else:
+        return ObjectProxy(processId, proxyId=proxyId, typeStr=typeStr)
+
+class ObjectProxy(object):
+    """
+    Proxy to an object stored by the remote process. Proxies are created
+    by calling Process._import(), Process.transfer(), or by requesting/calling
+    attributes on existing proxy objects.
+    
+    For the most part, this object can be used exactly as if it
+    were a local object.
+    """
+    def __init__(self, processId, proxyId, typeStr='', parent=None):
+        object.__init__(self)
+        ## can't set attributes directly because setattr is overridden.
+        self.__dict__['_processId'] = processId
+        self.__dict__['_typeStr'] = typeStr
+        self.__dict__['_proxyId'] = proxyId
+        self.__dict__['_attributes'] = ()
+        ## attributes that affect the behavior of the proxy.
+        ## in all cases, a value of None causes the proxy to ask
+        ## its parent event handler to make the decision.
+        self.__dict__['_proxyOptions'] = {
+            'callSync': None,      ## 'sync', 'async', None
+            'timeout': None,       ## float, None
+            'returnType': None,    ## 'proxy', 'value', 'auto', None
+            'deferGetattr': None,  ## True, False, None
+            'noProxyTypes': None,  ## list of types to send by value instead of by proxy
+        }
+        
+        self.__dict__['_handler'] = RemoteEventHandler.getHandler(processId)
+        self.__dict__['_handler'].registerProxy(self)  ## handler will watch proxy; inform remote process when the proxy is deleted.
+    
+    def _setProxyOptions(self, **kwds):
+        """
+        Change the behavior of this proxy. For all options, a value of None
+        will cause the proxy to instead use the default behavior defined
+        by its parent Process.
+        
+        Options are:
+        
+        =============  =============================================================
+        callSync       'sync', 'async', 'off', or None.
+                       If 'async', then calling methods will return a Request object
+                       which can be used to inquire later about the result of the
+                       method call.
+                       If 'sync', then calling a method will block until the remote
+                       process has returned its result or the timeout has elapsed
+                       (in this case, a Request object is returned instead).
+                       If 'off', then the remote process is instructed _not_ to
+                       reply and the method call will return None immediately.
+        returnType     'auto', 'proxy', 'value', or None.
+                       If 'proxy', then the value returned when calling a method
+                       will be a proxy to the object on the remote process.
+                       If 'value', then attempt to pickle the returned object and
+                       send it back.
+                       If 'auto', then the decision is made by consulting the
+                       'noProxyTypes' option.
+        autoProxy      bool or None. If True, arguments to __call__ are
+                       automatically converted to proxy unless their type is
+                       listed in noProxyTypes (see below). If False, arguments
+                       are left untouched. Use proxy(obj) to manually convert
+                       arguments before sending.
+        timeout        float or None. Length of time to wait during synchronous
+                       requests before returning a Request object instead.
+        deferGetattr   True, False, or None.
+                       If False, all attribute requests will be sent to the remote
+                       process immediately and will block until a response is
+                       received (or the timeout has elapsed).
+                       If True, requesting an attribute from the proxy returns a
+                       new proxy immediately. The remote process is _not_ contacted
+                       to make this request. This is faster, but it is possible to
+                       request an attribute that does not exist on the proxied
+                       object. In this case, AttributeError will not be raised
+                       until an attempt is made to look up the attribute on the
+                       remote process.
+        noProxyTypes   List of object types that should _not_ be proxied when
+                       sent to the remote process.
+        =============  =============================================================
+        """
+        self._proxyOptions.update(kwds)
+    
+    def _getProxyOption(self, opt):
+        val = self._proxyOptions[opt]
+        if val is None:
+            return self._handler.getProxyOption(opt)
+        return val
+    
+    def _getProxyOptions(self):
+        return {k: self._getProxyOption(k) for k in self._proxyOptions}
+    
+    def __reduce__(self):
+        return (unpickleObjectProxy, (self._processId, self._proxyId, self._typeStr, self._attributes))
+    
+    def __repr__(self):
+        #objRepr = self.__getattr__('__repr__')(callSync='value')
+        return "<ObjectProxy for process %d, object 0x%x: %s >" % (self._processId, self._proxyId, self._typeStr)
+    
+    def __getattr__(self, attr):
+        #if '_processId' not in self.__dict__:
+            #raise Exception("ObjectProxy has no processId")
+        deferred = self._getProxyOption('deferGetattr')
+        if deferred is True:
+            return self._deferredAttr(attr)
+        else:
+            opts = self._getProxyOptions()
+            return self._handler.getObjAttr(self, attr, **opts)
+    
+    def _deferredAttr(self, attr):
+        return DeferredObjectProxy(self, attr)
+    
+    def __call__(self, *args, **kwds):
+        """
+        Attempts to call the proxied object from the remote process.
+        Accepts extra keyword arguments:
+        
+            _callSync    'off', 'sync', or 'async'
+            _returnType  'value', 'proxy', or 'auto'
+        """
+        opts = self._getProxyOptions()
+        for k in opts:
+            if '_'+k in kwds:
+                opts[k] = kwds.pop('_'+k)
+        #print "call", opts
+        return self._handler.callObj(obj=self, args=args, kwds=kwds, **opts)
+    
+    def _getValue(self):
+        ## this just gives us an easy way to change the behavior of the special methods
+        return self._handler.getObjValue(self)
+    
+    ## Explicitly proxy special methods. Is there a better way to do this??
+    ## (Special methods are looked up on the type rather than the instance,
+    ## so __getattr__ alone cannot intercept them.)
+    
+    def _getSpecialAttr(self, attr):
+        #return self.__getattr__(attr)
+        return self._deferredAttr(attr)
+    
+    def __getitem__(self, *args):
+        return self._getSpecialAttr('__getitem__')(*args)
+    
+    def __setitem__(self, *args):
+        return self._getSpecialAttr('__setitem__')(*args)
+    
+    def __setattr__(self, *args):
+        return self._getSpecialAttr('__setattr__')(*args)
+    
+    def __str__(self, *args):
+        return self._getSpecialAttr('__str__')(*args, _returnType='value')
+    
+    def __len__(self, *args):
+        return self._getSpecialAttr('__len__')(*args)
+    
+    def __add__(self, *args):
+        return self._getSpecialAttr('__add__')(*args)
+    
+    def __sub__(self, *args):
+        return self._getSpecialAttr('__sub__')(*args)
+    
+    def __div__(self, *args):
+        return self._getSpecialAttr('__div__')(*args)
+    
+    def __mul__(self, *args):
+        return self._getSpecialAttr('__mul__')(*args)
+    
+    def __pow__(self, *args):
+        return self._getSpecialAttr('__pow__')(*args)
+    
+    def __rshift__(self, *args):
+        return self._getSpecialAttr('__rshift__')(*args)
+    
+    def __lshift__(self, *args):
+        return self._getSpecialAttr('__lshift__')(*args)
+    
+    def __floordiv__(self, *args):
+        return self._getSpecialAttr('__floordiv__')(*args)
+    
+    def __eq__(self, *args):
+        return self._getSpecialAttr('__eq__')(*args)
+    
+    def __ne__(self, *args):
+        return self._getSpecialAttr('__ne__')(*args)
+    
+    def __lt__(self, *args):
+        return self._getSpecialAttr('__lt__')(*args)
+    
+    def __gt__(self, *args):
+        return self._getSpecialAttr('__gt__')(*args)
+    
+    def __le__(self, *args):
+        return self._getSpecialAttr('__le__')(*args)
+    
+    def __ge__(self, *args):
+        return self._getSpecialAttr('__ge__')(*args)
+    
+    def __and__(self, *args):
+        return self._getSpecialAttr('__and__')(*args)
+    
+    def __or__(self, *args):
+        return self._getSpecialAttr('__or__')(*args)
+    
+    def __xor__(self, *args):
+        return self._getSpecialAttr('__xor__')(*args)
+    
+    def __mod__(self, *args):
+        return self._getSpecialAttr('__mod__')(*args)
+    
+    def __radd__(self, *args):
+        return self._getSpecialAttr('__radd__')(*args)
+    
+    def __rsub__(self, *args):
+        return self._getSpecialAttr('__rsub__')(*args)
+    
+    def __rdiv__(self, *args):
+        return self._getSpecialAttr('__rdiv__')(*args)
+    
+    def __rmul__(self, *args):
+        return self._getSpecialAttr('__rmul__')(*args)
+    
+    def __rpow__(self, *args):
+        return self._getSpecialAttr('__rpow__')(*args)
+    
+    def __rrshift__(self, *args):
+        return self._getSpecialAttr('__rrshift__')(*args)
+    
+    def __rlshift__(self, *args):
+        return self._getSpecialAttr('__rlshift__')(*args)
+    
+    def __rfloordiv__(self, *args):
+        return self._getSpecialAttr('__rfloordiv__')(*args)
+    
+    def __rand__(self, *args):
+        return self._getSpecialAttr('__rand__')(*args)
+    
+    def __ror__(self, *args):
+        return self._getSpecialAttr('__ror__')(*args)
+    
+    def __rxor__(self, *args):
+        return self._getSpecialAttr('__rxor__')(*args)
+    
+    def __rmod__(self, *args):
+        return self._getSpecialAttr('__rmod__')(*args)
+
+class DeferredObjectProxy(ObjectProxy):
+    def __init__(self, parentProxy, attribute):
+        ## can't set attributes directly because setattr is overridden.
+        for k in ['_processId', '_typeStr', '_proxyId', '_handler']:
+            self.__dict__[k] = getattr(parentProxy, k)
+        self.__dict__['_parent'] = parentProxy  ## make sure parent stays alive
+        self.__dict__['_attributes'] = parentProxy._attributes + (attribute,)
+        self.__dict__['_proxyOptions'] = parentProxy._proxyOptions.copy()
+    
+    def __repr__(self):
+        return ObjectProxy.__repr__(self) + '.' + '.'.join(self._attributes)
\ No newline at end of file
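
A short usage sketch of the proxy options implemented above (based only on the
API added in this patch; not part of the diff itself):

    import time
    import pyqtgraph.multiprocess as mp

    proc = mp.Process()
    rnp = proc._import('numpy')
    arr = rnp.array([1, 2, 3, 4])

    ## asynchronous call: returns a Request instead of blocking
    req = arr.mean(_callSync='async')
    while not req.hasResult():
        time.sleep(0.01)          ## parent is free to do other work here
    print req.result()

    ## deferred attribute lookup: attribute access returns a DeferredObjectProxy
    ## immediately, with no round-trip to the remote process
    arr._setProxyOptions(deferGetattr=True)
    m = arr.mean                  ## no communication yet
    print m()                     ## the remote call happens here

    proc.join()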