Skip to content
20 changes: 19 additions & 1 deletion Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ class Listener(object):
connections, or for a Windows named pipe.
'''
def __init__(self, address=None, family=None, backlog=1, authkey=None):

family = family or (address and address_type(address)) \
or default_family
address = address or arbitrary_address(family)
Expand All @@ -485,6 +486,12 @@ def __init__(self, address=None, family=None, backlog=1, authkey=None):

self._authkey = authkey

def settimeout(self, timeout):
'''
Set timeout for the accept method.
'''
self._listener.settimeout(timeout)

def accept(self):
'''
Accept a connection on the bound socket or named pipe of `self`.
Expand Down Expand Up @@ -639,6 +646,10 @@ def __init__(self, address, family, backlog=1):
else:
self._unlink = None

def settimeout(self, timeout):
if timeout:
self._socket.settimeout(timeout)

def accept(self):
s, self._last_accepted = self._socket.accept()
s.setblocking(True)
Expand Down Expand Up @@ -684,6 +695,7 @@ def __init__(self, address, backlog=None):
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
self._timeout = _winapi.INFINITE

def _new_handle(self, first=False):
flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
Expand All @@ -697,6 +709,10 @@ def _new_handle(self, first=False):
_winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
)

def settimeout(self, timeout):
if timeout:
self._timeout = int(timeout * 1000)

def accept(self):
self._handle_queue.append(self._new_handle())
handle = self._handle_queue.pop(0)
Expand All @@ -710,7 +726,9 @@ def accept(self):
else:
try:
res = _winapi.WaitForMultipleObjects(
[ov.event], False, INFINITE)
[ov.event], False, self._timeout)
if res == _winapi.WAIT_TIMEOUT:
raise TimeoutError("PipeListener timed out")
except:
ov.cancel()
_winapi.CloseHandle(handle)
Expand Down
10 changes: 9 additions & 1 deletion Lib/multiprocessing/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,15 @@ def __init__(self, registry, address, authkey, serializer):
self.id_to_refcount = {}
self.id_to_local_proxy_obj = {}
self.mutex = threading.Lock()
self._handler_threads = []

def serve_forever(self):
'''
Run the server forever
'''
self.stop_event = threading.Event()
process.current_process()._manager_server = self
accepter = None
try:
accepter = threading.Thread(target=self.accepter)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move these 2 lines outside of the try: to replace your = None initialization.

accepter.daemon = True
Expand All @@ -185,15 +187,21 @@ def serve_forever(self):
util.debug('resetting stdout, stderr')
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
for t in self._handler_threads:
t.join()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what prevents the handle_request threads from blocking indefinitely?

accepter.join()
self.listener.close()
sys.exit(0)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fundamentally, the accepter and handle_request threads were all being set .daemon = True as they were never intended to be controlled and cleaned up. they'll all die as soon as the server process exits. attempting to join all of the threads is perhaps ideal but only works if they can be guaranteed not to be blocked on anything. Handler threads do blocking IO to a socket connection which is not guaranteed to have been closed because it could come from an uncooperative peer. without rewriting them to do non-blocking IO and check a stop_event themselves, joining them can't be reliable and could lead to anything making a connection to the socket being able to block cleanup.

This is likely why you see test failures for leftover env files with pymp- prefixed tmp directories or Windows \\pipe paths still laying around such as Warning -- test.test_concurrent_futures.test_process_pool leaked temporary files (1): pymp-cdk1jdmh with this change. Those are created (by Server for its self.listener via the Listener use of default_family and call to arbitrary_address(family)) to hold the socket that the manager process uses. If the manager process is blocked instead of exiting cleanly it may still exist.


def accepter(self):
while True:
while True and not self.stop_event.is_set():
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the leftover True and is a no-op and should be removed.

try:
self.listener.settimeout(0.5)
c = self.listener.accept()
except OSError:
continue
t = threading.Thread(target=self.handle_request, args=(c,))
self._handler_threads.append(t)
t.daemon = True
t.start()

Expand Down
4 changes: 2 additions & 2 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3399,7 +3399,7 @@ def test_mymanager(self):
# bpo-30356: BaseManager._finalize_manager() sends SIGTERM
# to the manager process if it takes longer than 1 second to stop,
# which happens on slow buildbots.
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
self.assertEqual(manager._process.exitcode, 0)

@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_mymanager_context(self):
Expand All @@ -3417,7 +3417,7 @@ def test_mymanager_context_prestarted(self):
manager.start()
with manager:
self.common(manager)
self.assertEqual(manager._process.exitcode, 0)
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

def common(self, manager):
foo = manager.Foo()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixed thread leak in multiprocessing.Manager accepter thread by adding accepter.join() when before the serve_forever() method exits.
Fixed test_mymanager_context_prestarted test to expect either an exit code 0 or -signal.SIGTERM
Loading