Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions Lib/multiprocessing/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,7 @@ def connect(self):
'''
Connect manager object to the server process
'''
Listener, Client = listener_client[self._serializer]
conn = Client(self._address, authkey=self._authkey)
conn = self._Client(self._address, authkey=self._authkey)
dispatch(conn, None, 'dummy')
self._state.value = State.STARTED

Expand Down Expand Up @@ -786,8 +785,13 @@ def __init__(self, token, serializer, manager=None,
self._token = token
self._id = self._token.id
self._manager = manager
self._serializer = serializer
self._Client = listener_client[serializer][1]

if manager is not None:
self._serializer = manager._serializer
self._Client = manager._Client
else:
self._serializer = serializer
self._Client = listener_client[serializer][1]

# Should be set to True only when a proxy object is being created
# on the manager server; primary use case: nested proxy objects.
Expand Down Expand Up @@ -989,7 +993,10 @@ def AutoProxy(token, serializer, manager=None, authkey=None,
'''
Return an auto-proxy for `token`
'''
_Client = listener_client[serializer][1]
if manager is not None:
_Client = manager._Client
else:
_Client = listener_client[serializer][1]

if exposed is None:
conn = _Client(token.address, authkey=authkey)
Expand Down
26 changes: 26 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3517,6 +3517,32 @@ def test_remote(self):
# Make queue finalizer run before the server is stopped
del queue

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.skip_if_sanitizer('TSan: leaks threads', thread=True)
def test_client_propagation(self):
authkey = os.urandom(32)

manager = QueueManager(
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
self.addCleanup(manager.shutdown)

manager2 = QueueManager2(
address=manager.address, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)

def MyXmlClient(*args, **kwargs):
return manager._Client(*args, **kwargs)

manager2._Client = MyXmlClient
manager2.connect()
queue = manager2.get_queue()

self.assertIsNot(queue._Client, manager._Client)
self.assertIs(queue._Client, manager2._Client)

del queue

@hashlib_helper.requires_hashdigest('sha256')
class _TestManagerRestart(BaseTestCase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:class:`multiprocessing.Manager` and its proxies now uses the client picked at the time the manager was created to guarantee consistent behaviour across the whole manager life span.
Loading