Skip to content

Commit d3c8925

Browse files
committed
Refactor ActionQueue's _delayed_flush method for improved clarity and efficiency; add tests for ActionQueueSettings validation and handling of empty actions.
1 parent d2b1cb4 commit d3c8925

2 files changed

Lines changed: 108 additions & 14 deletions

File tree

pyoverkiz/action_queue.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -201,20 +201,15 @@ async def add(
201201

202202
async def _delayed_flush(self) -> None:
203203
"""Wait for the delay period, then flush the queue."""
204-
waiters: list[QueuedExecution] = []
205-
try:
206-
await asyncio.sleep(self._settings.delay)
207-
async with self._lock:
208-
batch = self._prepare_flush()
209-
if not batch[0]:
210-
return
211-
actions, mode, label, waiters = batch
212-
213-
await self._execute_batch(actions, mode, label, waiters)
214-
except asyncio.CancelledError as exc:
215-
for waiter in waiters:
216-
waiter.set_exception(exc)
217-
raise
204+
await asyncio.sleep(self._settings.delay)
205+
async with self._lock:
206+
self._flush_task = None
207+
# Another coroutine may have already flushed the queue before we acquired the lock.
208+
actions, mode, label, waiters = self._prepare_flush()
209+
if not actions:
210+
return
211+
212+
await self._execute_batch(actions, mode, label, waiters)
218213

219214
def _prepare_flush(
220215
self,

tests/test_action_queue.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,3 +350,102 @@ async def set_result():
350350

351351
# Ensure background task has completed
352352
await task
353+
354+
355+
@pytest.mark.asyncio
356+
async def test_action_queue_settings_validate():
357+
"""Test that validate raises on invalid settings."""
358+
with pytest.raises(ValueError, match="positive"):
359+
ActionQueueSettings(delay=-1).validate()
360+
361+
with pytest.raises(ValueError, match="at least 1"):
362+
ActionQueueSettings(max_actions=0).validate()
363+
364+
# Valid settings should not raise
365+
ActionQueueSettings(delay=0.5, max_actions=10).validate()
366+
367+
368+
@pytest.mark.asyncio
369+
async def test_action_queue_add_empty_actions(mock_executor):
370+
"""Test that add raises ValueError for empty action list."""
371+
queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1))
372+
373+
with pytest.raises(ValueError, match="at least one Action"):
374+
await queue.add([])
375+
376+
377+
@pytest.mark.asyncio
378+
async def test_action_queue_executor_cancelled_propagates():
379+
"""Test that CancelledError during execution propagates to waiters."""
380+
381+
async def cancelling_executor(actions, mode, label):
382+
raise asyncio.CancelledError
383+
384+
queue = ActionQueue(
385+
executor=AsyncMock(side_effect=cancelling_executor),
386+
settings=ActionQueueSettings(delay=0.05),
387+
)
388+
389+
action = Action(
390+
device_url="io://1234-5678-9012/1",
391+
commands=[Command(name=OverkizCommand.CLOSE)],
392+
)
393+
394+
queued = await queue.add([action])
395+
396+
with pytest.raises(asyncio.CancelledError):
397+
await queued
398+
399+
400+
@pytest.mark.asyncio
401+
async def test_action_queue_flush_empty(mock_executor):
402+
"""Test that flushing an empty queue is a no-op."""
403+
queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1))
404+
405+
await queue.flush()
406+
mock_executor.assert_not_called()
407+
408+
409+
@pytest.mark.asyncio
410+
async def test_action_queue_shutdown_empty(mock_executor):
411+
"""Test that shutting down an empty queue is a no-op."""
412+
queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1))
413+
414+
await queue.shutdown()
415+
mock_executor.assert_not_called()
416+
417+
418+
@pytest.mark.asyncio
419+
async def test_action_queue_no_self_cancel_during_delayed_flush():
420+
"""Test that _delayed_flush does not cancel itself via _prepare_flush.
421+
422+
When _delayed_flush fires and calls _prepare_flush, the flush task is still
423+
the running coroutine. _prepare_flush must not cancel it, otherwise the batch
424+
would fail with CancelledError when the executor performs I/O.
425+
"""
426+
cancel_detected = False
427+
428+
async def slow_executor(actions, mode, label):
429+
nonlocal cancel_detected
430+
try:
431+
await asyncio.sleep(0.05)
432+
except asyncio.CancelledError:
433+
cancel_detected = True
434+
raise
435+
return "exec-ok"
436+
437+
queue = ActionQueue(
438+
executor=AsyncMock(side_effect=slow_executor),
439+
settings=ActionQueueSettings(delay=0.05),
440+
)
441+
442+
action = Action(
443+
device_url="io://1234-5678-9012/1",
444+
commands=[Command(name=OverkizCommand.CLOSE)],
445+
)
446+
447+
queued = await queue.add([action])
448+
exec_id = await queued
449+
450+
assert exec_id == "exec-ok"
451+
assert not cancel_detected, "_delayed_flush cancelled itself via _prepare_flush"

0 commit comments

Comments
 (0)