From 51505d9071918c7166627f4e1f71bd902d7f6a59 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sat, 25 Apr 2026 20:11:24 +0200 Subject: [PATCH 1/4] Simplify ActionQueue: extract _merge_actions, deduplicate _delayed_flush, fix shutdown lock - Extract _merge_actions() helper to deduplicate action-merging logic in add() - Replace hand-rolled snapshot in _delayed_flush with _prepare_flush() call - Move cancelled task await outside lock in shutdown() to prevent potential deadlock --- pyoverkiz/action_queue.py | 83 +++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 48 deletions(-) diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index f78be230..8077152b 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -107,13 +107,26 @@ def __init__( self._lock = asyncio.Lock() @staticmethod - def _copy_action(action: Action) -> Action: - """Return an `Action` copy with an independent commands list. - - The queue merges commands for duplicate devices, so caller-owned action - instances must be copied to avoid mutating user input while batching. - """ - return Action(device_url=action.device_url, commands=list(action.commands)) + def _merge_actions( + target: list[Action], + index: dict[str, Action], + source: list[Action], + *, + copy: bool = False, + ) -> None: + """Merge *source* actions into *target*, combining commands for duplicate devices.""" + for action in source: + existing = index.get(action.device_url) + if existing is None: + merged = ( + Action(device_url=action.device_url, commands=list(action.commands)) + if copy + else action + ) + target.append(merged) + index[action.device_url] = merged + else: + existing.commands.extend(action.commands) async def add( self, @@ -146,14 +159,7 @@ async def add( normalized_actions: list[Action] = [] normalized_index: dict[str, Action] = {} - for action in actions: - existing = normalized_index.get(action.device_url) - if existing is None: - action_copy = self._copy_action(action) - normalized_actions.append(action_copy) - normalized_index[action.device_url] = action_copy - else: - existing.commands.extend(action.commands) + self._merge_actions(normalized_actions, normalized_index, actions, copy=True) async with self._lock: # If mode or label changes, flush existing queue first @@ -162,18 +168,10 @@ async def add( ): batches_to_execute.append(self._prepare_flush()) - # Add actions to pending queue - pending_index = { - pending_action.device_url: pending_action - for pending_action in self._pending_actions - } - for action in normalized_actions: - pending = pending_index.get(action.device_url) - if pending is None: - self._pending_actions.append(action) - pending_index[action.device_url] = action - else: - pending.commands.extend(action.commands) + pending_index = {a.device_url: a for a in self._pending_actions} + self._merge_actions( + self._pending_actions, pending_index, normalized_actions + ) self._pending_mode = mode self._pending_label = label @@ -207,25 +205,13 @@ async def _delayed_flush(self) -> None: try: await asyncio.sleep(self._settings.delay) async with self._lock: - if not self._pending_actions: + batch = self._prepare_flush() + if not batch[0]: return + actions, mode, label, waiters = batch - # Take snapshot and clear state while holding lock - actions = self._pending_actions - mode = self._pending_mode - label = self._pending_label - waiters = self._pending_waiters - - self._pending_actions = [] - self._pending_mode = None - self._pending_label = None - self._pending_waiters = [] - self._flush_task = None - - # Execute outside the lock await self._execute_batch(actions, mode, label, waiters) except asyncio.CancelledError as exc: - # Ensure all waiters are notified if this task is cancelled for waiter in waiters: waiter.set_exception(exc) raise @@ -317,19 +303,20 @@ def get_pending_count(self) -> int: async def shutdown(self) -> None: """Shutdown the queue, flushing any pending actions.""" + cancelled_task: asyncio.Task[None] | None = None batch_to_execute = None async with self._lock: if self._flush_task and not self._flush_task.done(): - task = self._flush_task - task.cancel() + cancelled_task = self._flush_task + cancelled_task.cancel() self._flush_task = None - # Wait for cancellation to complete - with contextlib.suppress(asyncio.CancelledError): - await task if self._pending_actions: batch_to_execute = self._prepare_flush() - # Execute outside the lock + if cancelled_task: + with contextlib.suppress(asyncio.CancelledError): + await cancelled_task + if batch_to_execute: await self._execute_batch(*batch_to_execute) From d3c8925e060e71a4194908b06c223096d387a863 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sat, 25 Apr 2026 19:07:11 +0000 Subject: [PATCH 2/4] Refactor ActionQueue's _delayed_flush method for improved clarity and efficiency; add tests for ActionQueueSettings validation and handling of empty actions. --- pyoverkiz/action_queue.py | 23 ++++----- tests/test_action_queue.py | 99 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 14 deletions(-) diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index 8077152b..7c839344 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -201,20 +201,15 @@ async def add( async def _delayed_flush(self) -> None: """Wait for the delay period, then flush the queue.""" - waiters: list[QueuedExecution] = [] - try: - await asyncio.sleep(self._settings.delay) - async with self._lock: - batch = self._prepare_flush() - if not batch[0]: - return - actions, mode, label, waiters = batch - - await self._execute_batch(actions, mode, label, waiters) - except asyncio.CancelledError as exc: - for waiter in waiters: - waiter.set_exception(exc) - raise + await asyncio.sleep(self._settings.delay) + async with self._lock: + self._flush_task = None + # Another coroutine may have already flushed the queue before we acquired the lock. + actions, mode, label, waiters = self._prepare_flush() + if not actions: + return + + await self._execute_batch(actions, mode, label, waiters) def _prepare_flush( self, diff --git a/tests/test_action_queue.py b/tests/test_action_queue.py index 0a8c0219..06f5658b 100644 --- a/tests/test_action_queue.py +++ b/tests/test_action_queue.py @@ -350,3 +350,102 @@ async def set_result(): # Ensure background task has completed await task + + +@pytest.mark.asyncio +async def test_action_queue_settings_validate(): + """Test that validate raises on invalid settings.""" + with pytest.raises(ValueError, match="positive"): + ActionQueueSettings(delay=-1).validate() + + with pytest.raises(ValueError, match="at least 1"): + ActionQueueSettings(max_actions=0).validate() + + # Valid settings should not raise + ActionQueueSettings(delay=0.5, max_actions=10).validate() + + +@pytest.mark.asyncio +async def test_action_queue_add_empty_actions(mock_executor): + """Test that add raises ValueError for empty action list.""" + queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1)) + + with pytest.raises(ValueError, match="at least one Action"): + await queue.add([]) + + +@pytest.mark.asyncio +async def test_action_queue_executor_cancelled_propagates(): + """Test that CancelledError during execution propagates to waiters.""" + + async def cancelling_executor(actions, mode, label): + raise asyncio.CancelledError + + queue = ActionQueue( + executor=AsyncMock(side_effect=cancelling_executor), + settings=ActionQueueSettings(delay=0.05), + ) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued = await queue.add([action]) + + with pytest.raises(asyncio.CancelledError): + await queued + + +@pytest.mark.asyncio +async def test_action_queue_flush_empty(mock_executor): + """Test that flushing an empty queue is a no-op.""" + queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1)) + + await queue.flush() + mock_executor.assert_not_called() + + +@pytest.mark.asyncio +async def test_action_queue_shutdown_empty(mock_executor): + """Test that shutting down an empty queue is a no-op.""" + queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1)) + + await queue.shutdown() + mock_executor.assert_not_called() + + +@pytest.mark.asyncio +async def test_action_queue_no_self_cancel_during_delayed_flush(): + """Test that _delayed_flush does not cancel itself via _prepare_flush. + + When _delayed_flush fires and calls _prepare_flush, the flush task is still + the running coroutine. _prepare_flush must not cancel it, otherwise the batch + would fail with CancelledError when the executor performs I/O. + """ + cancel_detected = False + + async def slow_executor(actions, mode, label): + nonlocal cancel_detected + try: + await asyncio.sleep(0.05) + except asyncio.CancelledError: + cancel_detected = True + raise + return "exec-ok" + + queue = ActionQueue( + executor=AsyncMock(side_effect=slow_executor), + settings=ActionQueueSettings(delay=0.05), + ) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued = await queue.add([action]) + exec_id = await queued + + assert exec_id == "exec-ok" + assert not cancel_detected, "_delayed_flush cancelled itself via _prepare_flush" From e0842417dbefcfade8da247e6f09669e5e948dc7 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sat, 25 Apr 2026 19:23:07 +0000 Subject: [PATCH 3/4] Enhance ActionQueue documentation: clarify batching behavior and merging logic for device actions --- docs/action-queue.md | 65 +++++++++++++++++++++++++++++++++++---- pyoverkiz/action_queue.py | 44 ++++++++++++++++++++------ 2 files changed, 93 insertions(+), 16 deletions(-) diff --git a/docs/action-queue.md b/docs/action-queue.md index 8da02d90..79a3b738 100644 --- a/docs/action-queue.md +++ b/docs/action-queue.md @@ -2,12 +2,65 @@ The action queue automatically groups rapid, consecutive calls to `execute_action_group()` into a single ActionGroup execution. This minimizes the number of API calls and helps prevent rate limiting issues, such as `TooManyRequestsError`, `TooManyConcurrentRequestsError`, `TooManyExecutionsError`, or `ExecutionQueueFullError` which can occur if actions are sent individually in quick succession. -Important limitation: -- Gateways only allow a single action per device in each action group. The queue - merges commands for the same `device_url` into a single action to keep the - batch valid and preserve command order for that device. -- If you pass multiple actions for the same `device_url` in a single - `execute_action_group()` call, the queue will merge them for you. +## How batching and merging works + +The Overkiz API uses three levels of nesting: + +- **Command** — a single device instruction (e.g. `close`, `setClosure(50)`) +- **Action** — one device URL + one or more commands +- **ActionGroup** — a batch of actions submitted as a single API call + +The gateway enforces **one action per device** in each action group. The queue handles this automatically: when multiple actions target the same `device_url`, their commands are merged into a single action while preserving order. + +### Different devices — no merging needed + +Three commands for three different devices produce three actions in one action group: + +```python +# These three calls arrive within the delay window: +await client.execute_action_group([Action("io://1234-5678-1234/12345678", [Command(name=OverkizCommand.CLOSE)])]) +await client.execute_action_group([Action("io://1234-5678-1234/87654321", [Command(name=OverkizCommand.OPEN)])]) +await client.execute_action_group([Action("io://1234-5678-1234/11111111", [Command(name=OverkizCommand.STOP)])]) + +# Sent as one API call: +# ActionGroup(actions=[ +# Action("io://…/12345678", commands=[close]), +# Action("io://…/87654321", commands=[open]), +# Action("io://…/11111111", commands=[stop]), +# ]) +``` + +### Same device — commands are merged + +When two calls target the same device, the queue merges their commands into a single action: + +```python +await client.execute_action_group([Action("io://1234-5678-1234/12345678", [Command(name=OverkizCommand.CLOSE)])]) +await client.execute_action_group([Action("io://1234-5678-1234/12345678", [Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])])]) + +# Sent as one API call: +# ActionGroup(actions=[ +# Action("io://…/12345678", commands=[close, setClosure(50)]), +# ]) +``` + +### Mixed — both behaviors combined + +```python +await client.execute_action_group([Action("io://1234-5678-1234/12345678", [Command(name=OverkizCommand.CLOSE)])]) +await client.execute_action_group([ + Action("io://1234-5678-1234/87654321", [Command(name=OverkizCommand.OPEN)]), + Action("io://1234-5678-1234/12345678", [Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])]), +]) + +# Sent as one API call: +# ActionGroup(actions=[ +# Action("io://…/12345678", commands=[close, setClosure(50)]), # merged +# Action("io://…/87654321", commands=[open]), +# ]) +``` + +The original action objects passed to `execute_action_group()` are never mutated — the queue works on internal copies. ## Enable with defaults diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index 7c839344..d562f3d7 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -71,16 +71,32 @@ def __await__(self) -> Generator[Any, None, str]: class ActionQueue: - """Batches multiple action executions into single API calls. + """Batches device actions into single API calls (action groups). - When actions are added, they are held for a configurable delay period. - If more actions arrive during this window, they are batched together. - The batch is flushed when: + The Overkiz API executes commands via action groups. Each action group + contains one Action per device, and each Action holds one or more Commands. + The gateway enforces at most one Action per device per action group. + + Batching example — two add() calls arriving within the delay window:: + + add([Action("device/1", [close])]) + add([Action("device/2", [open]), Action("device/1", [setClosure(50)])]) + + Produces one action group with two actions:: + + ActionGroup(actions=[ + Action("device/1", [close, setClosure(50)]), # commands merged + Action("device/2", [open]), + ]) + + Three separate devices would remain three separate actions in the group. + Merging only happens when the same device_url appears more than once. + + The queue flushes when: - The delay timer expires - The max actions limit is reached - - The execution mode changes - - The label changes - - Manual flush is requested + - The execution mode or label changes + - flush() or shutdown() is called """ def __init__( @@ -114,7 +130,12 @@ def _merge_actions( *, copy: bool = False, ) -> None: - """Merge *source* actions into *target*, combining commands for duplicate devices.""" + """Merge *source* actions into *target*, combining commands for duplicate devices. + + New device_urls are appended to *target*; existing ones get their commands + extended. When *copy* is True, source actions are copied to avoid mutating + caller-owned objects. + """ for action in source: existing = index.get(action.device_url) if existing is None: @@ -194,8 +215,11 @@ async def add( # Execute batches outside the lock if we flushed for batch in batches_to_execute: - if batch[0]: - await self._execute_batch(*batch) + batch_actions, batch_mode, batch_label, batch_waiters = batch + if batch_actions: + await self._execute_batch( + batch_actions, batch_mode, batch_label, batch_waiters + ) return waiter From 86129caef9221bba2c53bff3a4468ff10ae7c5e1 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sat, 25 Apr 2026 19:35:43 +0000 Subject: [PATCH 4/4] Fix Action examples to use keyword arguments matching kw_only=True definition --- docs/action-queue.md | 28 ++++++++++++++-------------- pyoverkiz/action_queue.py | 11 +++++++---- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/docs/action-queue.md b/docs/action-queue.md index 79a3b738..8a55c6b2 100644 --- a/docs/action-queue.md +++ b/docs/action-queue.md @@ -18,15 +18,15 @@ Three commands for three different devices produce three actions in one action g ```python # These three calls arrive within the delay window: -await client.execute_action_group([Action("io://1234-5678-1234/12345678", [Command(name=OverkizCommand.CLOSE)])]) -await client.execute_action_group([Action("io://1234-5678-1234/87654321", [Command(name=OverkizCommand.OPEN)])]) -await client.execute_action_group([Action("io://1234-5678-1234/11111111", [Command(name=OverkizCommand.STOP)])]) +await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])]) +await client.execute_action_group([Action(device_url="io://1234-5678-1234/87654321", commands=[Command(name=OverkizCommand.OPEN)])]) +await client.execute_action_group([Action(device_url="io://1234-5678-1234/11111111", commands=[Command(name=OverkizCommand.STOP)])]) # Sent as one API call: # ActionGroup(actions=[ -# Action("io://…/12345678", commands=[close]), -# Action("io://…/87654321", commands=[open]), -# Action("io://…/11111111", commands=[stop]), +# Action(device_url="io://…/12345678", commands=[close]), +# Action(device_url="io://…/87654321", commands=[open]), +# Action(device_url="io://…/11111111", commands=[stop]), # ]) ``` @@ -35,28 +35,28 @@ await client.execute_action_group([Action("io://1234-5678-1234/11111111", [Comma When two calls target the same device, the queue merges their commands into a single action: ```python -await client.execute_action_group([Action("io://1234-5678-1234/12345678", [Command(name=OverkizCommand.CLOSE)])]) -await client.execute_action_group([Action("io://1234-5678-1234/12345678", [Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])])]) +await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])]) +await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])])]) # Sent as one API call: # ActionGroup(actions=[ -# Action("io://…/12345678", commands=[close, setClosure(50)]), +# Action(device_url="io://…/12345678", commands=[close, setClosure(50)]), # ]) ``` ### Mixed — both behaviors combined ```python -await client.execute_action_group([Action("io://1234-5678-1234/12345678", [Command(name=OverkizCommand.CLOSE)])]) +await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])]) await client.execute_action_group([ - Action("io://1234-5678-1234/87654321", [Command(name=OverkizCommand.OPEN)]), - Action("io://1234-5678-1234/12345678", [Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])]), + Action(device_url="io://1234-5678-1234/87654321", commands=[Command(name=OverkizCommand.OPEN)]), + Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])]), ]) # Sent as one API call: # ActionGroup(actions=[ -# Action("io://…/12345678", commands=[close, setClosure(50)]), # merged -# Action("io://…/87654321", commands=[open]), +# Action(device_url="io://…/12345678", commands=[close, setClosure(50)]), # merged +# Action(device_url="io://…/87654321", commands=[open]), # ]) ``` diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index d562f3d7..8a124686 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -79,14 +79,17 @@ class ActionQueue: Batching example — two add() calls arriving within the delay window:: - add([Action("device/1", [close])]) - add([Action("device/2", [open]), Action("device/1", [setClosure(50)])]) + add([Action(device_url="device/1", commands=[close])]) + add([ + Action(device_url="device/2", commands=[open]), + Action(device_url="device/1", commands=[setClosure(50)]), + ]) Produces one action group with two actions:: ActionGroup(actions=[ - Action("device/1", [close, setClosure(50)]), # commands merged - Action("device/2", [open]), + Action(device_url="device/1", commands=[close, setClosure(50)]), # merged + Action(device_url="device/2", commands=[open]), ]) Three separate devices would remain three separate actions in the group.