diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index fc00d2861260a8..dcd79781b400e4 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -470,6 +470,7 @@ class Listener(object): connections, or for a Windows named pipe. ''' def __init__(self, address=None, family=None, backlog=1, authkey=None): + family = family or (address and address_type(address)) \ or default_family address = address or arbitrary_address(family) @@ -485,6 +486,12 @@ def __init__(self, address=None, family=None, backlog=1, authkey=None): self._authkey = authkey + def settimeout(self, timeout): + ''' + Set timeout for the accept method. + ''' + self._listener.settimeout(timeout) + def accept(self): ''' Accept a connection on the bound socket or named pipe of `self`. @@ -639,6 +646,10 @@ def __init__(self, address, family, backlog=1): else: self._unlink = None + def settimeout(self, timeout): + if timeout: + self._socket.settimeout(timeout) + def accept(self): s, self._last_accepted = self._socket.accept() s.setblocking(True) @@ -684,6 +695,7 @@ def __init__(self, address, backlog=None): self, PipeListener._finalize_pipe_listener, args=(self._handle_queue, self._address), exitpriority=0 ) + self._timeout = _winapi.INFINITE def _new_handle(self, first=False): flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED @@ -697,6 +709,10 @@ def _new_handle(self, first=False): _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) + def settimeout(self, timeout): + if timeout: + self._timeout = int(timeout * 1000) + def accept(self): self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) @@ -710,7 +726,9 @@ def accept(self): else: try: res = _winapi.WaitForMultipleObjects( - [ov.event], False, INFINITE) + [ov.event], False, self._timeout) + if res == _winapi.WAIT_TIMEOUT: + raise TimeoutError("PipeListener timed out") except: ov.cancel() _winapi.CloseHandle(handle) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 91bcf243e78e5b..c53859feca8bfc 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -164,6 +164,7 @@ def __init__(self, registry, address, authkey, serializer): self.id_to_refcount = {} self.id_to_local_proxy_obj = {} self.mutex = threading.Lock() + self._handler_threads = [] def serve_forever(self): ''' @@ -171,6 +172,7 @@ def serve_forever(self): ''' self.stop_event = threading.Event() process.current_process()._manager_server = self + accepter = None try: accepter = threading.Thread(target=self.accepter) accepter.daemon = True @@ -185,15 +187,21 @@ def serve_forever(self): util.debug('resetting stdout, stderr') sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ + for t in self._handler_threads: + t.join() + accepter.join() + self.listener.close() sys.exit(0) def accepter(self): - while True: + while True and not self.stop_event.is_set(): try: + self.listener.settimeout(0.5) c = self.listener.accept() except OSError: continue t = threading.Thread(target=self.handle_request, args=(c,)) + self._handler_threads.append(t) t.daemon = True t.start() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 9f8412fe9394eb..553bf87f681787 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3399,7 +3399,7 @@ def test_mymanager(self): # bpo-30356: BaseManager._finalize_manager() sends SIGTERM # to the manager process if it takes longer than 1 second to stop, # which happens on slow buildbots. - self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) + self.assertEqual(manager._process.exitcode, 0) @warnings_helper.ignore_fork_in_thread_deprecation_warnings() def test_mymanager_context(self): @@ -3417,7 +3417,7 @@ def test_mymanager_context_prestarted(self): manager.start() with manager: self.common(manager) - self.assertEqual(manager._process.exitcode, 0) + self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) def common(self, manager): foo = manager.Foo() diff --git a/Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst b/Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst new file mode 100644 index 00000000000000..174e0ca22ac051 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst @@ -0,0 +1,2 @@ +Fixed thread leak in multiprocessing.Manager accepter thread by adding accepter.join() when before the serve_forever() method exits. +Fixed test_mymanager_context_prestarted test to expect either an exit code 0 or -signal.SIGTERM