Skip to content

Commit a224c6f

Browse files
Added timeout and joining daemon thread
1 parent d7df812 commit a224c6f

3 files changed

Lines changed: 20 additions & 10 deletions

File tree

Lib/multiprocessing/connection.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -487,8 +487,10 @@ def __init__(self, address=None, family=None, backlog=1, authkey=None):
487487
self._authkey = authkey
488488

489489
def settimeout(self, timeout):
490-
if timeout:
491-
self._listener.settimeout(timeout)
490+
'''
491+
Set timeout for the accept method.
492+
'''
493+
self._listener.settimeout(timeout)
492494

493495
def accept(self):
494496
'''
@@ -645,7 +647,8 @@ def __init__(self, address, family, backlog=1):
645647
self._unlink = None
646648

647649
def settimeout(self, timeout):
648-
self._socket.settimeout(timeout)
650+
if timeout:
651+
self._socket.settimeout(timeout)
649652

650653
def accept(self):
651654
s, self._last_accepted = self._socket.accept()
@@ -692,6 +695,7 @@ def __init__(self, address, backlog=None):
692695
self, PipeListener._finalize_pipe_listener,
693696
args=(self._handle_queue, self._address), exitpriority=0
694697
)
698+
self._timeout = _winapi.INFINITE
695699

696700
def _new_handle(self, first=False):
697701
flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
@@ -706,7 +710,8 @@ def _new_handle(self, first=False):
706710
)
707711

708712
def settimeout(self, timeout):
709-
pass
713+
if timeout:
714+
self._timeout = int(timeout * 1000)
710715

711716
def accept(self):
712717
self._handle_queue.append(self._new_handle())
@@ -721,7 +726,9 @@ def accept(self):
721726
else:
722727
try:
723728
res = _winapi.WaitForMultipleObjects(
724-
[ov.event], False, INFINITE)
729+
[ov.event], False, self._timeout)
730+
if res == _winapi.WAIT_TIMEOUT:
731+
raise TimeoutError("PipeListener timed out")
725732
except:
726733
ov.cancel()
727734
_winapi.CloseHandle(handle)

Lib/multiprocessing/managers.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ def serve_forever(self):
171171
'''
172172
self.stop_event = threading.Event()
173173
process.current_process()._manager_server = self
174+
accepter = None
174175
try:
175176
accepter = threading.Thread(target=self.accepter)
176177
accepter.daemon = True
@@ -189,17 +190,19 @@ def serve_forever(self):
189190
sys.exit(0)
190191

191192
def accepter(self):
192-
while True:
193+
handler_threads = []
194+
while True and not self.stop_event.is_set():
193195
try:
194-
self.listener.settimeout(20)
196+
self.listener.settimeout(3)
195197
c = self.listener.accept()
196198
except OSError:
197199
continue
198200
t = threading.Thread(target=self.handle_request, args=(c,))
201+
handler_threads.append(t)
199202
t.daemon = True
200203
t.start()
201-
if self.stop_event.is_set():
202-
break
204+
for t in handler_threads:
205+
t.join()
203206

204207
def _handle_request(self, c):
205208
request = None

Lib/test/_test_multiprocessing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3399,7 +3399,7 @@ def test_mymanager(self):
33993399
# bpo-30356: BaseManager._finalize_manager() sends SIGTERM
34003400
# to the manager process if it takes longer than 1 second to stop,
34013401
# which happens on slow buildbots.
3402-
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
3402+
self.assertEqual(manager._process.exitcode, 0)
34033403

34043404
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
34053405
def test_mymanager_context(self):

0 commit comments

Comments
 (0)