Skip to content

Commit 233ccc1

Browse files
committed
Executor.map: avoid temporarily exceeding the buffersize while collecting the next result
1 parent b8367e7 commit 233ccc1

2 files changed

Lines changed: 32 additions & 5 deletions

File tree

Lib/concurrent/futures/_base.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -628,17 +628,18 @@ def result_iterator():
628628
# reverse to keep finishing order
629629
fs.reverse()
630630
while fs:
631+
# Careful not to keep a reference to the popped future
632+
if timeout is None:
633+
result = _result_or_cancel(fs.pop())
634+
else:
635+
result = _result_or_cancel(fs.pop(), end_time - time.monotonic())
631636
if (
632637
buffersize
633638
and (executor := executor_weakref())
634639
and (args := next(zipped_iterables, None))
635640
):
636641
fs.appendleft(executor.submit(fn, *args))
637-
# Careful not to keep a reference to the popped future
638-
if timeout is None:
639-
yield _result_or_cancel(fs.pop())
640-
else:
641-
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
642+
yield result
642643
finally:
643644
for future in fs:
644645
future.cancel()

Lib/test/test_concurrent_futures/executor.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import itertools
2+
import operator
23
import threading
34
import time
45
import weakref
56
from concurrent import futures
67
from operator import add
8+
from functools import partial
9+
from contextlib import suppress
710
from test import support
811
from test.support import Py_GIL_DISABLED
912

@@ -143,6 +146,29 @@ def test_map_buffersize_when_buffer_is_full(self):
143146
msg="should have fetched only `buffersize` elements from `ints`.",
144147
)
145148

149+
def test_map_buffersize_when_error(self):
150+
ints = [1, 2, 3, 0, 4, 5, 6]
151+
index_of_zero = ints.index(0)
152+
ints_iter = iter(ints)
153+
buffersize = 2
154+
reciprocal = partial(operator.truediv, 1)
155+
results = []
156+
with suppress(ZeroDivisionError):
157+
for result in self.executor.map(
158+
reciprocal, ints_iter, buffersize=buffersize
159+
):
160+
results.append(result)
161+
self.assertEqual(
162+
len(results),
163+
index_of_zero,
164+
msg="should have mapped until reaching the zero.",
165+
)
166+
self.assertEqual(
167+
len(results) + buffersize + len(list(ints_iter)),
168+
len(ints),
169+
msg="ints should be either processed, or buffered, or not fetched.",
170+
)
171+
146172
def test_shutdown_race_issue12456(self):
147173
# Issue #12456: race condition at shutdown where trying to post a
148174
# sentinel in the call queue blocks (the queue is full while processes

0 commit comments

Comments
 (0)