From cb8e4220a00d987f6681a3efffdd26ff68a43512 Mon Sep 17 00:00:00 2001 From: Prithviraj Chaudhuri Date: Wed, 19 Nov 2025 00:38:56 -0500 Subject: [PATCH 1/8] Waiting for the accepter thread to finish when the serve_forever method has the stop_event.is_set() == True. --- Lib/multiprocessing/managers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 91bcf243e78e5b..433395579355a7 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -185,6 +185,7 @@ def serve_forever(self): util.debug('resetting stdout, stderr') sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ + accepter.join() sys.exit(0) def accepter(self): From 4862fb87a1ad06f873fb25a825144bb70bcc847e Mon Sep 17 00:00:00 2001 From: Prithviraj Chaudhuri Date: Wed, 19 Nov 2025 00:39:33 -0500 Subject: [PATCH 2/8] Updated test_mymanager_context_prestarted to expect 0 or -signal.SIGTERM exit codes --- Lib/test/_test_multiprocessing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 0f9c5c222250ae..f1295c622c50bb 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3417,7 +3417,7 @@ def test_mymanager_context_prestarted(self): manager.start() with manager: self.common(manager) - self.assertEqual(manager._process.exitcode, 0) + self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) def common(self, manager): foo = manager.Foo() From 83014b5e8989b819b27dea25ed2064f25827ccd4 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Wed, 19 Nov 2025 05:43:59 +0000 Subject: [PATCH 3/8] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst diff --git a/Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst b/Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst new file mode 100644 index 00000000000000..174e0ca22ac051 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst @@ -0,0 +1,2 @@ +Fixed thread leak in multiprocessing.Manager accepter thread by adding accepter.join() when before the serve_forever() method exits. +Fixed test_mymanager_context_prestarted test to expect either an exit code 0 or -signal.SIGTERM From d7df8129e38a8acdbf3772657bdb46ad4fa12dab Mon Sep 17 00:00:00 2001 From: Prithviraj Chaudhuri Date: Fri, 21 Nov 2025 19:07:50 -0500 Subject: [PATCH 4/8] Added timeout loop --- Lib/multiprocessing/connection.py | 11 +++++++++++ Lib/multiprocessing/managers.py | 3 +++ 2 files changed, 14 insertions(+) diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index fc00d2861260a8..72cdf7fbc2c44d 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -470,6 +470,7 @@ class Listener(object): connections, or for a Windows named pipe. ''' def __init__(self, address=None, family=None, backlog=1, authkey=None): + family = family or (address and address_type(address)) \ or default_family address = address or arbitrary_address(family) @@ -485,6 +486,10 @@ def __init__(self, address=None, family=None, backlog=1, authkey=None): self._authkey = authkey + def settimeout(self, timeout): + if timeout: + self._listener.settimeout(timeout) + def accept(self): ''' Accept a connection on the bound socket or named pipe of `self`. @@ -639,6 +644,9 @@ def __init__(self, address, family, backlog=1): else: self._unlink = None + def settimeout(self, timeout): + self._socket.settimeout(timeout) + def accept(self): s, self._last_accepted = self._socket.accept() s.setblocking(True) @@ -697,6 +705,9 @@ def _new_handle(self, first=False): _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) + def settimeout(self, timeout): + pass + def accept(self): self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 433395579355a7..27a97a750bf141 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -191,12 +191,15 @@ def serve_forever(self): def accepter(self): while True: try: + self.listener.settimeout(20) c = self.listener.accept() except OSError: continue t = threading.Thread(target=self.handle_request, args=(c,)) t.daemon = True t.start() + if self.stop_event.is_set(): + break def _handle_request(self, c): request = None From a224c6fba5142aed18f318fa037e6a19a3206e5c Mon Sep 17 00:00:00 2001 From: Prithviraj Chaudhuri Date: Sun, 23 Nov 2025 11:31:42 -0500 Subject: [PATCH 5/8] Added timeout and joining daemon thread --- Lib/multiprocessing/connection.py | 17 ++++++++++++----- Lib/multiprocessing/managers.py | 11 +++++++---- Lib/test/_test_multiprocessing.py | 2 +- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 72cdf7fbc2c44d..dcd79781b400e4 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -487,8 +487,10 @@ def __init__(self, address=None, family=None, backlog=1, authkey=None): self._authkey = authkey def settimeout(self, timeout): - if timeout: - self._listener.settimeout(timeout) + ''' + Set timeout for the accept method. + ''' + self._listener.settimeout(timeout) def accept(self): ''' @@ -645,7 +647,8 @@ def __init__(self, address, family, backlog=1): self._unlink = None def settimeout(self, timeout): - self._socket.settimeout(timeout) + if timeout: + self._socket.settimeout(timeout) def accept(self): s, self._last_accepted = self._socket.accept() @@ -692,6 +695,7 @@ def __init__(self, address, backlog=None): self, PipeListener._finalize_pipe_listener, args=(self._handle_queue, self._address), exitpriority=0 ) + self._timeout = _winapi.INFINITE def _new_handle(self, first=False): flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED @@ -706,7 +710,8 @@ def _new_handle(self, first=False): ) def settimeout(self, timeout): - pass + if timeout: + self._timeout = int(timeout * 1000) def accept(self): self._handle_queue.append(self._new_handle()) @@ -721,7 +726,9 @@ def accept(self): else: try: res = _winapi.WaitForMultipleObjects( - [ov.event], False, INFINITE) + [ov.event], False, self._timeout) + if res == _winapi.WAIT_TIMEOUT: + raise TimeoutError("PipeListener timed out") except: ov.cancel() _winapi.CloseHandle(handle) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 27a97a750bf141..2378716fba7a8c 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -171,6 +171,7 @@ def serve_forever(self): ''' self.stop_event = threading.Event() process.current_process()._manager_server = self + accepter = None try: accepter = threading.Thread(target=self.accepter) accepter.daemon = True @@ -189,17 +190,19 @@ def serve_forever(self): sys.exit(0) def accepter(self): - while True: + handler_threads = [] + while True and not self.stop_event.is_set(): try: - self.listener.settimeout(20) + self.listener.settimeout(3) c = self.listener.accept() except OSError: continue t = threading.Thread(target=self.handle_request, args=(c,)) + handler_threads.append(t) t.daemon = True t.start() - if self.stop_event.is_set(): - break + for t in handler_threads: + t.join() def _handle_request(self, c): request = None diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index f1295c622c50bb..954cb5e214a786 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3399,7 +3399,7 @@ def test_mymanager(self): # bpo-30356: BaseManager._finalize_manager() sends SIGTERM # to the manager process if it takes longer than 1 second to stop, # which happens on slow buildbots. - self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) + self.assertEqual(manager._process.exitcode, 0) @warnings_helper.ignore_fork_in_thread_deprecation_warnings() def test_mymanager_context(self): From 7a64e6812d5935e2f529f3e1b0f37cf70053ff43 Mon Sep 17 00:00:00 2001 From: Prithviraj Chaudhuri Date: Wed, 26 Nov 2025 18:41:00 -0500 Subject: [PATCH 6/8] Changed timeout to 0.5 --- Lib/multiprocessing/managers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 2378716fba7a8c..8d47d8bff1be4c 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -193,7 +193,7 @@ def accepter(self): handler_threads = [] while True and not self.stop_event.is_set(): try: - self.listener.settimeout(3) + self.listener.settimeout(0.5) c = self.listener.accept() except OSError: continue From 22fb4b626e395982676f13fa00a413e563b0ef2f Mon Sep 17 00:00:00 2001 From: Prithviraj Chaudhuri Date: Wed, 26 Nov 2025 19:37:59 -0500 Subject: [PATCH 7/8] Closing handler threads before accepter and after socket close --- Lib/multiprocessing/managers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 8d47d8bff1be4c..b4bb03e819fbee 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -164,6 +164,7 @@ def __init__(self, registry, address, authkey, serializer): self.id_to_refcount = {} self.id_to_local_proxy_obj = {} self.mutex = threading.Lock() + self._handler_threads = [] def serve_forever(self): ''' @@ -186,11 +187,12 @@ def serve_forever(self): util.debug('resetting stdout, stderr') sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ + for t in self._handler_threads: + t.join() accepter.join() sys.exit(0) def accepter(self): - handler_threads = [] while True and not self.stop_event.is_set(): try: self.listener.settimeout(0.5) @@ -198,11 +200,9 @@ def accepter(self): except OSError: continue t = threading.Thread(target=self.handle_request, args=(c,)) - handler_threads.append(t) + self._handler_threads.append(t) t.daemon = True t.start() - for t in handler_threads: - t.join() def _handle_request(self, c): request = None From 49cdc22a653b605118f677086160f03702e390ac Mon Sep 17 00:00:00 2001 From: Prithviraj Chaudhuri Date: Thu, 27 Nov 2025 21:00:29 -0500 Subject: [PATCH 8/8] Closing listener --- Lib/multiprocessing/managers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index b4bb03e819fbee..c53859feca8bfc 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -190,6 +190,7 @@ def serve_forever(self): for t in self._handler_threads: t.join() accepter.join() + self.listener.close() sys.exit(0) def accepter(self):