From cb8e4220a00d987f6681a3efffdd26ff68a43512 Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Wed, 19 Nov 2025 00:38:56 -0500
Subject: [PATCH 1/8] Waiting for the accepter thread to finish when the
serve_forever method has the stop_event.is_set() == True.
---
Lib/multiprocessing/managers.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 91bcf243e78e5b..433395579355a7 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -185,6 +185,7 @@ def serve_forever(self):
util.debug('resetting stdout, stderr')
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
+ accepter.join()
sys.exit(0)
def accepter(self):
From 4862fb87a1ad06f873fb25a825144bb70bcc847e Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Wed, 19 Nov 2025 00:39:33 -0500
Subject: [PATCH 2/8] Updated test_mymanager_context_prestarted to expect 0 or
-signal.SIGTERM exit codes
---
Lib/test/_test_multiprocessing.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 0f9c5c222250ae..f1295c622c50bb 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -3417,7 +3417,7 @@ def test_mymanager_context_prestarted(self):
manager.start()
with manager:
self.common(manager)
- self.assertEqual(manager._process.exitcode, 0)
+ self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
def common(self, manager):
foo = manager.Foo()
From 83014b5e8989b819b27dea25ed2064f25827ccd4 Mon Sep 17 00:00:00 2001
From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com>
Date: Wed, 19 Nov 2025 05:43:59 +0000
Subject: [PATCH 3/8] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?=
=?UTF-8?q?rb=5Fit.?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst | 2 ++
1 file changed, 2 insertions(+)
create mode 100644 Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst
diff --git a/Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst b/Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst
new file mode 100644
index 00000000000000..174e0ca22ac051
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-11-19-05-43-58.gh-issue-140267.DTFG1E.rst
@@ -0,0 +1,2 @@
+Fixed thread leak in multiprocessing.Manager accepter thread by adding accepter.join() when before the serve_forever() method exits.
+Fixed test_mymanager_context_prestarted test to expect either an exit code 0 or -signal.SIGTERM
From d7df8129e38a8acdbf3772657bdb46ad4fa12dab Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Fri, 21 Nov 2025 19:07:50 -0500
Subject: [PATCH 4/8] Added timeout loop
---
Lib/multiprocessing/connection.py | 11 +++++++++++
Lib/multiprocessing/managers.py | 3 +++
2 files changed, 14 insertions(+)
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index fc00d2861260a8..72cdf7fbc2c44d 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -470,6 +470,7 @@ class Listener(object):
connections, or for a Windows named pipe.
'''
def __init__(self, address=None, family=None, backlog=1, authkey=None):
+
family = family or (address and address_type(address)) \
or default_family
address = address or arbitrary_address(family)
@@ -485,6 +486,10 @@ def __init__(self, address=None, family=None, backlog=1, authkey=None):
self._authkey = authkey
+ def settimeout(self, timeout):
+ if timeout:
+ self._listener.settimeout(timeout)
+
def accept(self):
'''
Accept a connection on the bound socket or named pipe of `self`.
@@ -639,6 +644,9 @@ def __init__(self, address, family, backlog=1):
else:
self._unlink = None
+ def settimeout(self, timeout):
+ self._socket.settimeout(timeout)
+
def accept(self):
s, self._last_accepted = self._socket.accept()
s.setblocking(True)
@@ -697,6 +705,9 @@ def _new_handle(self, first=False):
_winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
)
+ def settimeout(self, timeout):
+ pass
+
def accept(self):
self._handle_queue.append(self._new_handle())
handle = self._handle_queue.pop(0)
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 433395579355a7..27a97a750bf141 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -191,12 +191,15 @@ def serve_forever(self):
def accepter(self):
while True:
try:
+ self.listener.settimeout(20)
c = self.listener.accept()
except OSError:
continue
t = threading.Thread(target=self.handle_request, args=(c,))
t.daemon = True
t.start()
+ if self.stop_event.is_set():
+ break
def _handle_request(self, c):
request = None
From a224c6fba5142aed18f318fa037e6a19a3206e5c Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Sun, 23 Nov 2025 11:31:42 -0500
Subject: [PATCH 5/8] Added timeout and joining daemon thread
---
Lib/multiprocessing/connection.py | 17 ++++++++++++-----
Lib/multiprocessing/managers.py | 11 +++++++----
Lib/test/_test_multiprocessing.py | 2 +-
3 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 72cdf7fbc2c44d..dcd79781b400e4 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -487,8 +487,10 @@ def __init__(self, address=None, family=None, backlog=1, authkey=None):
self._authkey = authkey
def settimeout(self, timeout):
- if timeout:
- self._listener.settimeout(timeout)
+ '''
+ Set timeout for the accept method.
+ '''
+ self._listener.settimeout(timeout)
def accept(self):
'''
@@ -645,7 +647,8 @@ def __init__(self, address, family, backlog=1):
self._unlink = None
def settimeout(self, timeout):
- self._socket.settimeout(timeout)
+ if timeout:
+ self._socket.settimeout(timeout)
def accept(self):
s, self._last_accepted = self._socket.accept()
@@ -692,6 +695,7 @@ def __init__(self, address, backlog=None):
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
+ self._timeout = _winapi.INFINITE
def _new_handle(self, first=False):
flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
@@ -706,7 +710,8 @@ def _new_handle(self, first=False):
)
def settimeout(self, timeout):
- pass
+ if timeout:
+ self._timeout = int(timeout * 1000)
def accept(self):
self._handle_queue.append(self._new_handle())
@@ -721,7 +726,9 @@ def accept(self):
else:
try:
res = _winapi.WaitForMultipleObjects(
- [ov.event], False, INFINITE)
+ [ov.event], False, self._timeout)
+ if res == _winapi.WAIT_TIMEOUT:
+ raise TimeoutError("PipeListener timed out")
except:
ov.cancel()
_winapi.CloseHandle(handle)
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 27a97a750bf141..2378716fba7a8c 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -171,6 +171,7 @@ def serve_forever(self):
'''
self.stop_event = threading.Event()
process.current_process()._manager_server = self
+ accepter = None
try:
accepter = threading.Thread(target=self.accepter)
accepter.daemon = True
@@ -189,17 +190,19 @@ def serve_forever(self):
sys.exit(0)
def accepter(self):
- while True:
+ handler_threads = []
+ while True and not self.stop_event.is_set():
try:
- self.listener.settimeout(20)
+ self.listener.settimeout(3)
c = self.listener.accept()
except OSError:
continue
t = threading.Thread(target=self.handle_request, args=(c,))
+ handler_threads.append(t)
t.daemon = True
t.start()
- if self.stop_event.is_set():
- break
+ for t in handler_threads:
+ t.join()
def _handle_request(self, c):
request = None
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index f1295c622c50bb..954cb5e214a786 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -3399,7 +3399,7 @@ def test_mymanager(self):
# bpo-30356: BaseManager._finalize_manager() sends SIGTERM
# to the manager process if it takes longer than 1 second to stop,
# which happens on slow buildbots.
- self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
+ self.assertEqual(manager._process.exitcode, 0)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_mymanager_context(self):
From 7a64e6812d5935e2f529f3e1b0f37cf70053ff43 Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Wed, 26 Nov 2025 18:41:00 -0500
Subject: [PATCH 6/8] Changed timeout to 0.5
---
Lib/multiprocessing/managers.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 2378716fba7a8c..8d47d8bff1be4c 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -193,7 +193,7 @@ def accepter(self):
handler_threads = []
while True and not self.stop_event.is_set():
try:
- self.listener.settimeout(3)
+ self.listener.settimeout(0.5)
c = self.listener.accept()
except OSError:
continue
From 22fb4b626e395982676f13fa00a413e563b0ef2f Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Wed, 26 Nov 2025 19:37:59 -0500
Subject: [PATCH 7/8] Closing handler threads before accepter and after socket
close
---
Lib/multiprocessing/managers.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 8d47d8bff1be4c..b4bb03e819fbee 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -164,6 +164,7 @@ def __init__(self, registry, address, authkey, serializer):
self.id_to_refcount = {}
self.id_to_local_proxy_obj = {}
self.mutex = threading.Lock()
+ self._handler_threads = []
def serve_forever(self):
'''
@@ -186,11 +187,12 @@ def serve_forever(self):
util.debug('resetting stdout, stderr')
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
+ for t in self._handler_threads:
+ t.join()
accepter.join()
sys.exit(0)
def accepter(self):
- handler_threads = []
while True and not self.stop_event.is_set():
try:
self.listener.settimeout(0.5)
@@ -198,11 +200,9 @@ def accepter(self):
except OSError:
continue
t = threading.Thread(target=self.handle_request, args=(c,))
- handler_threads.append(t)
+ self._handler_threads.append(t)
t.daemon = True
t.start()
- for t in handler_threads:
- t.join()
def _handle_request(self, c):
request = None
From 49cdc22a653b605118f677086160f03702e390ac Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Thu, 27 Nov 2025 21:00:29 -0500
Subject: [PATCH 8/8] Closing listener
---
Lib/multiprocessing/managers.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index b4bb03e819fbee..c53859feca8bfc 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -190,6 +190,7 @@ def serve_forever(self):
for t in self._handler_threads:
t.join()
accepter.join()
+ self.listener.close()
sys.exit(0)
def accepter(self):