diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 91bcf243e78e5b..cfb6be9a49048f 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -531,8 +531,7 @@ def connect(self): ''' Connect manager object to the server process ''' - Listener, Client = listener_client[self._serializer] - conn = Client(self._address, authkey=self._authkey) + conn = self._Client(self._address, authkey=self._authkey) dispatch(conn, None, 'dummy') self._state.value = State.STARTED @@ -786,8 +785,13 @@ def __init__(self, token, serializer, manager=None, self._token = token self._id = self._token.id self._manager = manager - self._serializer = serializer - self._Client = listener_client[serializer][1] + + if manager is not None: + self._serializer = manager._serializer + self._Client = manager._Client + else: + self._serializer = serializer + self._Client = listener_client[serializer][1] # Should be set to True only when a proxy object is being created # on the manager server; primary use case: nested proxy objects. @@ -989,7 +993,10 @@ def AutoProxy(token, serializer, manager=None, authkey=None, ''' Return an auto-proxy for `token` ''' - _Client = listener_client[serializer][1] + if manager is not None: + _Client = manager._Client + else: + _Client = listener_client[serializer][1] if exposed is None: conn = _Client(token.address, authkey=authkey) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index cc07062eee6f98..2f39f5b5ac3846 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3517,6 +3517,32 @@ def test_remote(self): # Make queue finalizer run before the server is stopped del queue + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + @support.skip_if_sanitizer('TSan: leaks threads', thread=True) + def test_client_propagation(self): + authkey = os.urandom(32) + + manager = QueueManager( + address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER, + shutdown_timeout=SHUTDOWN_TIMEOUT) + manager.start() + self.addCleanup(manager.shutdown) + + manager2 = QueueManager2( + address=manager.address, authkey=authkey, serializer=SERIALIZER, + shutdown_timeout=SHUTDOWN_TIMEOUT) + + def MyXmlClient(*args, **kwargs): + return manager._Client(*args, **kwargs) + + manager2._Client = MyXmlClient + manager2.connect() + queue = manager2.get_queue() + + self.assertIsNot(queue._Client, manager._Client) + self.assertIs(queue._Client, manager2._Client) + + del queue @hashlib_helper.requires_hashdigest('sha256') class _TestManagerRestart(BaseTestCase): diff --git a/Misc/NEWS.d/next/Library/2020-04-20-10-40-54.bpo-40307.b4YXXY.rst b/Misc/NEWS.d/next/Library/2020-04-20-10-40-54.bpo-40307.b4YXXY.rst new file mode 100644 index 00000000000000..a68f9434d03273 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-04-20-10-40-54.bpo-40307.b4YXXY.rst @@ -0,0 +1 @@ +:class:`multiprocessing.Manager` and its proxies now uses the client picked at the time the manager was created to guarantee consistent behaviour across the whole manager life span.