Skip to content

Commit 587d472

Browse files
authored
Restart workers when worker-ttl expires (#8538)
1 parent 6be418d commit 587d472

4 files changed

Lines changed: 109 additions & 25 deletions

File tree

distributed/nanny.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -390,17 +390,14 @@ async def start_unsafe(self):
390390

391391
return self
392392

393-
async def kill(self, timeout: float = 2, reason: str = "nanny-kill") -> None:
393+
async def kill(self, timeout: float = 5, reason: str = "nanny-kill") -> None:
394394
"""Kill the local worker process
395395
396396
Blocks until both the process is down and the scheduler is properly
397397
informed
398398
"""
399-
if self.process is None:
400-
return
401-
402-
deadline = time() + timeout
403-
await self.process.kill(reason=reason, timeout=0.8 * (deadline - time()))
399+
if self.process is not None:
400+
await self.process.kill(reason=reason, timeout=timeout)
404401

405402
async def instantiate(self) -> Status:
406403
"""Start a local worker process
@@ -822,7 +819,7 @@ def mark_stopped(self):
822819

823820
async def kill(
824821
self,
825-
timeout: float = 2,
822+
timeout: float = 5,
826823
executor_wait: bool = True,
827824
reason: str = "workerprocess-kill",
828825
) -> None:
@@ -876,7 +873,7 @@ async def kill(
876873
pass
877874

878875
logger.warning(
879-
f"Worker process still alive after {wait_timeout} seconds, killing"
876+
f"Worker process still alive after {wait_timeout:.1f} seconds, killing"
880877
)
881878
await process.kill()
882879
await process.join(max(0, deadline - time()))

distributed/scheduler.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ def __init__(
547547
self._memory_unmanaged_old = 0
548548
self._memory_unmanaged_history = deque()
549549
self.metrics = {}
550-
self.last_seen = 0
550+
self.last_seen = time()
551551
self.time_delay = 0
552552
self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
553553
self.actors = set()
@@ -6335,7 +6335,10 @@ async def restart_workers(
63356335
# FIXME does not raise if the process fails to shut down,
63366336
# see https://github.com/dask/distributed/pull/6427/files#r894917424
63376337
# NOTE: Nanny will automatically restart worker process when it's killed
6338-
nanny.kill(reason=stimulus_id, timeout=timeout),
6338+
# NOTE: Don't propagate timeout to kill(): we don't want to
6339+
# spend (.8*.8)=64% of our end-to-end timeout waiting for a hung
6340+
# process to restart.
6341+
nanny.kill(reason=stimulus_id),
63396342
timeout,
63406343
)
63416344
for nanny in nannies
@@ -8406,19 +8409,29 @@ async def get_worker_monitor_info(self, recent=False, starts=None):
84068409
# Cleanup #
84078410
###########
84088411

8409-
async def check_worker_ttl(self):
8412+
@log_errors
8413+
async def check_worker_ttl(self) -> None:
84108414
now = time()
84118415
stimulus_id = f"check-worker-ttl-{now}"
8416+
assert self.worker_ttl
8417+
ttl = max(self.worker_ttl, 10 * heartbeat_interval(len(self.workers)))
8418+
to_restart = []
8419+
84128420
for ws in self.workers.values():
8413-
if (ws.last_seen < now - self.worker_ttl) and (
8414-
ws.last_seen < now - 10 * heartbeat_interval(len(self.workers))
8415-
):
8421+
last_seen = now - ws.last_seen
8422+
if last_seen > ttl:
8423+
to_restart.append(ws.address)
84168424
logger.warning(
8417-
"Worker failed to heartbeat within %s seconds. Closing: %s",
8418-
self.worker_ttl,
8419-
ws,
8425+
f"Worker failed to heartbeat for {last_seen:.0f}s; "
8426+
f"{'attempting restart' if ws.nanny else 'removing'}: {ws}"
84208427
)
8421-
await self.remove_worker(address=ws.address, stimulus_id=stimulus_id)
8428+
8429+
if to_restart:
8430+
await self.restart_workers(
8431+
to_restart,
8432+
wait_for_workers=False,
8433+
stimulus_id=stimulus_id,
8434+
)
84228435

84238436
def check_idle(self) -> float | None:
84248437
if self.status in (Status.closing, Status.closed):

distributed/tests/test_active_memory_manager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1084,7 +1084,6 @@ async def test_RetireWorker_new_keys_arrive_after_all_keys_moved_away(c, s, a, b
10841084
@gen_cluster(
10851085
client=True,
10861086
config={
1087-
"distributed.scheduler.worker-ttl": "500ms",
10881087
"distributed.scheduler.active-memory-manager.start": True,
10891088
"distributed.scheduler.active-memory-manager.interval": 0.05,
10901089
"distributed.scheduler.active-memory-manager.measure": "managed",

distributed/tests/test_failed_workers.py

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414
from dask import delayed
1515
from dask.utils import parse_bytes
1616

17-
from distributed import Client, Nanny, profile, wait
17+
from distributed import Client, KilledWorker, Nanny, get_worker, profile, wait
1818
from distributed.comm import CommClosedError
1919
from distributed.compatibility import MACOS
20+
from distributed.core import Status
2021
from distributed.metrics import time
2122
from distributed.utils import CancelledError, sync
2223
from distributed.utils_test import (
@@ -450,10 +451,10 @@ async def test_restart_timeout_on_long_running_task(c, s, a):
450451

451452

452453
@pytest.mark.slow
453-
@gen_cluster(client=True, scheduler_kwargs={"worker_ttl": "500ms"})
454+
@gen_cluster(client=True, config={"distributed.scheduler.worker-ttl": "500ms"})
454455
async def test_worker_time_to_live(c, s, a, b):
455-
from distributed.scheduler import heartbeat_interval
456-
456+
# Note that this value is ignored because it is less than 10x heartbeat_interval
457+
assert s.worker_ttl == 0.5
457458
assert set(s.workers) == {a.address, b.address}
458459

459460
a.periodic_callbacks["heartbeat"].stop()
@@ -465,10 +466,84 @@ async def test_worker_time_to_live(c, s, a, b):
465466

466467
# Worker removal is triggered after 10 * heartbeat
467468
# This is 10 * 0.5s at the moment of writing.
468-
interval = 10 * heartbeat_interval(len(s.workers))
469469
# Currently observing an extra 0.3~0.6s on top of the interval.
470470
# Adding some padding to prevent flakiness.
471-
assert time() - start < interval + 2.0
471+
assert time() - start < 7
472+
473+
474+
@pytest.mark.slow
475+
@pytest.mark.parametrize("block_evloop", [False, True])
476+
@gen_cluster(
477+
client=True,
478+
Worker=Nanny,
479+
nthreads=[("", 1)],
480+
scheduler_kwargs={"worker_ttl": "500ms", "allowed_failures": 0},
481+
)
482+
async def test_worker_ttl_restarts_worker(c, s, a, block_evloop):
483+
"""If the event loop of a worker becomes completely unresponsive, the scheduler will
484+
restart it through the nanny.
485+
"""
486+
ws = s.workers[a.worker_address]
487+
488+
async def f():
489+
w = get_worker()
490+
w.periodic_callbacks["heartbeat"].stop()
491+
if block_evloop:
492+
sleep(9999) # Block event loop indefinitely
493+
else:
494+
await asyncio.sleep(9999)
495+
496+
fut = c.submit(f, key="x")
497+
498+
while not s.workers or (
499+
(new_ws := next(iter(s.workers.values()))) is ws
500+
or new_ws.status != Status.running
501+
):
502+
await asyncio.sleep(0.01)
503+
504+
if block_evloop:
505+
# The nanny killed the worker with SIGKILL.
506+
# The restart has increased the suspicious count.
507+
with pytest.raises(KilledWorker):
508+
await fut
509+
assert s.tasks["x"].state == "erred"
510+
assert s.tasks["x"].suspicious == 1
511+
else:
512+
# The nanny sent a {op: stop} message to the WorkerProcess through IPC, which in turn
513+
# successfully invoked Worker.close(nanny=False).
514+
# This behaviour makes sense as the worker-ttl timeout was most likely caused
515+
# by a failure in networking, rather than a hung process.
516+
assert s.tasks["x"].state == "processing"
517+
assert s.tasks["x"].suspicious == 0
518+
519+
520+
@pytest.mark.slow
521+
@gen_cluster(
522+
client=True,
523+
Worker=Nanny,
524+
nthreads=[("", 2)],
525+
scheduler_kwargs={"allowed_failures": 0},
526+
)
527+
async def test_restart_hung_worker(c, s, a):
528+
"""Test restart_workers() to restart a worker whose event loop has become completely
529+
unresponsive.
530+
"""
531+
ws = s.workers[a.worker_address]
532+
533+
async def f():
534+
w = get_worker()
535+
w.periodic_callbacks["heartbeat"].stop()
536+
sleep(9999) # Block event loop indefinitely
537+
538+
fut = c.submit(f)
539+
# Wait for worker to hang
540+
with pytest.raises(asyncio.TimeoutError):
541+
while True:
542+
await wait(c.submit(inc, 1, pure=False), timeout=0.2)
543+
544+
await c.restart_workers([a.worker_address])
545+
assert len(s.workers) == 1
546+
assert next(iter(s.workers.values())) is not ws
472547

473548

474549
@gen_cluster(client=True, nthreads=[("", 1)])

0 commit comments

Comments
 (0)