From b9e31d79fd08ef3caea4d339a5e0df5db42cc0f5 Mon Sep 17 00:00:00 2001 From: "J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F" Date: Tue, 4 May 2021 10:29:51 +0200 Subject: [PATCH] Added Monkeypatch to make multiprocessing work with a list of queues in the manager --- lasp/lasp_multiprocessingpatch.py | 57 +++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 lasp/lasp_multiprocessingpatch.py diff --git a/lasp/lasp_multiprocessingpatch.py b/lasp/lasp_multiprocessingpatch.py new file mode 100644 index 0000000..fcd8e79 --- /dev/null +++ b/lasp/lasp_multiprocessingpatch.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +""" +Author: J.A. de Jong + +Description: MonkeyPatch required to let the Multiprocessing library work properly. +Should be applied prior to running any other multiprocessing code. Comes from +Stackoverflow and is mainly used for managing a list of queues that can be +shared between processes. + +For more information, see: +https://stackoverflow.com/questions/46779860/multiprocessing-managers-and-custom-classes +""" +from multiprocessing import managers +import logging +from functools import wraps +from inspect import signature + +orig_AutoProxy = managers.AutoProxy + +__all__ = ['apply_patch'] + + +@wraps(managers.AutoProxy) +def AutoProxy(*args, incref=True, manager_owned=False, **kwargs): + # Create the autoproxy without the manager_owned flag, then + # update the flag on the generated instance. If the manager_owned flag + # is set, `incref` is disabled, so set it to False here for the same + # result. + autoproxy_incref = False if manager_owned else incref + proxy = orig_AutoProxy(*args, incref=autoproxy_incref, **kwargs) + proxy._owned_by_manager = manager_owned + return proxy + + +def apply_patch(): + if "manager_owned" in signature(managers.AutoProxy).parameters: + return + + logging.debug("Patching multiprocessing.managers.AutoProxy to add manager_owned") + managers.AutoProxy = AutoProxy + + # re-register any types already registered to SyncManager without a custom + # proxy type, as otherwise these would all be using the old unpatched AutoProxy + SyncManager = managers.SyncManager + registry = managers.SyncManager._registry + for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items(): + if proxytype is not orig_AutoProxy: + continue + create_method = hasattr(managers.SyncManager, typeid) + SyncManager.register( + typeid, + callable=callable, + exposed=exposed, + method_to_typeid=method_to_typeid, + create_method=create_method, + ) +