Skip to content

Commit 8e18390

Browse files
authored
Improve action queue code quality (#2019)
1 parent 098b4ce commit 8e18390

3 files changed

Lines changed: 232 additions & 71 deletions

File tree

docs/action-queue.md

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,65 @@
22

33
The action queue automatically groups rapid, consecutive calls to `execute_action_group()` into a single ActionGroup execution. This minimizes the number of API calls and helps prevent rate limiting issues, such as `TooManyRequestsError`, `TooManyConcurrentRequestsError`, `TooManyExecutionsError`, or `ExecutionQueueFullError` which can occur if actions are sent individually in quick succession.
44

5-
Important limitation:
6-
- Gateways only allow a single action per device in each action group. The queue
7-
merges commands for the same `device_url` into a single action to keep the
8-
batch valid and preserve command order for that device.
9-
- If you pass multiple actions for the same `device_url` in a single
10-
`execute_action_group()` call, the queue will merge them for you.
5+
## How batching and merging works
6+
7+
The Overkiz API uses three levels of nesting:
8+
9+
- **Command** — a single device instruction (e.g. `close`, `setClosure(50)`)
10+
- **Action** — one device URL + one or more commands
11+
- **ActionGroup** — a batch of actions submitted as a single API call
12+
13+
The gateway enforces **one action per device** in each action group. The queue handles this automatically: when multiple actions target the same `device_url`, their commands are merged into a single action while preserving order.
14+
15+
### Different devices — no merging needed
16+
17+
Three commands for three different devices produce three actions in one action group:
18+
19+
```python
20+
# These three calls arrive within the delay window:
21+
await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])])
22+
await client.execute_action_group([Action(device_url="io://1234-5678-1234/87654321", commands=[Command(name=OverkizCommand.OPEN)])])
23+
await client.execute_action_group([Action(device_url="io://1234-5678-1234/11111111", commands=[Command(name=OverkizCommand.STOP)])])
24+
25+
# Sent as one API call:
26+
# ActionGroup(actions=[
27+
# Action(device_url="io://…/12345678", commands=[close]),
28+
# Action(device_url="io://…/87654321", commands=[open]),
29+
# Action(device_url="io://…/11111111", commands=[stop]),
30+
# ])
31+
```
32+
33+
### Same device — commands are merged
34+
35+
When two calls target the same device, the queue merges their commands into a single action:
36+
37+
```python
38+
await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])])
39+
await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])])])
40+
41+
# Sent as one API call:
42+
# ActionGroup(actions=[
43+
# Action(device_url="io://…/12345678", commands=[close, setClosure(50)]),
44+
# ])
45+
```
46+
47+
### Mixed — both behaviors combined
48+
49+
```python
50+
await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])])
51+
await client.execute_action_group([
52+
Action(device_url="io://1234-5678-1234/87654321", commands=[Command(name=OverkizCommand.OPEN)]),
53+
Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])]),
54+
])
55+
56+
# Sent as one API call:
57+
# ActionGroup(actions=[
58+
# Action(device_url="io://…/12345678", commands=[close, setClosure(50)]), # merged
59+
# Action(device_url="io://…/87654321", commands=[open]),
60+
# ])
61+
```
62+
63+
The original action objects passed to `execute_action_group()` are never mutated — the queue works on internal copies.
1164

1265
## Enable with defaults
1366

pyoverkiz/action_queue.py

Lines changed: 74 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,35 @@ def __await__(self) -> Generator[Any, None, str]:
7171

7272

7373
class ActionQueue:
74-
"""Batches multiple action executions into single API calls.
74+
"""Batches device actions into single API calls (action groups).
7575
76-
When actions are added, they are held for a configurable delay period.
77-
If more actions arrive during this window, they are batched together.
78-
The batch is flushed when:
76+
The Overkiz API executes commands via action groups. Each action group
77+
contains one Action per device, and each Action holds one or more Commands.
78+
The gateway enforces at most one Action per device per action group.
79+
80+
Batching example — two add() calls arriving within the delay window::
81+
82+
add([Action(device_url="device/1", commands=[close])])
83+
add([
84+
Action(device_url="device/2", commands=[open]),
85+
Action(device_url="device/1", commands=[setClosure(50)]),
86+
])
87+
88+
Produces one action group with two actions::
89+
90+
ActionGroup(actions=[
91+
Action(device_url="device/1", commands=[close, setClosure(50)]), # merged
92+
Action(device_url="device/2", commands=[open]),
93+
])
94+
95+
Three separate devices would remain three separate actions in the group.
96+
Merging only happens when the same device_url appears more than once.
97+
98+
The queue flushes when:
7999
- The delay timer expires
80100
- The max actions limit is reached
81-
- The execution mode changes
82-
- The label changes
83-
- Manual flush is requested
101+
- The execution mode or label changes
102+
- flush() or shutdown() is called
84103
"""
85104

86105
def __init__(
@@ -107,13 +126,31 @@ def __init__(
107126
self._lock = asyncio.Lock()
108127

109128
@staticmethod
110-
def _copy_action(action: Action) -> Action:
111-
"""Return an `Action` copy with an independent commands list.
129+
def _merge_actions(
130+
target: list[Action],
131+
index: dict[str, Action],
132+
source: list[Action],
133+
*,
134+
copy: bool = False,
135+
) -> None:
136+
"""Merge *source* actions into *target*, combining commands for duplicate devices.
112137
113-
The queue merges commands for duplicate devices, so caller-owned action
114-
instances must be copied to avoid mutating user input while batching.
138+
New device_urls are appended to *target*; existing ones get their commands
139+
extended. When *copy* is True, source actions are copied to avoid mutating
140+
caller-owned objects.
115141
"""
116-
return Action(device_url=action.device_url, commands=list(action.commands))
142+
for action in source:
143+
existing = index.get(action.device_url)
144+
if existing is None:
145+
merged = (
146+
Action(device_url=action.device_url, commands=list(action.commands))
147+
if copy
148+
else action
149+
)
150+
target.append(merged)
151+
index[action.device_url] = merged
152+
else:
153+
existing.commands.extend(action.commands)
117154

118155
async def add(
119156
self,
@@ -146,14 +183,7 @@ async def add(
146183

147184
normalized_actions: list[Action] = []
148185
normalized_index: dict[str, Action] = {}
149-
for action in actions:
150-
existing = normalized_index.get(action.device_url)
151-
if existing is None:
152-
action_copy = self._copy_action(action)
153-
normalized_actions.append(action_copy)
154-
normalized_index[action.device_url] = action_copy
155-
else:
156-
existing.commands.extend(action.commands)
186+
self._merge_actions(normalized_actions, normalized_index, actions, copy=True)
157187

158188
async with self._lock:
159189
# If mode or label changes, flush existing queue first
@@ -162,18 +192,10 @@ async def add(
162192
):
163193
batches_to_execute.append(self._prepare_flush())
164194

165-
# Add actions to pending queue
166-
pending_index = {
167-
pending_action.device_url: pending_action
168-
for pending_action in self._pending_actions
169-
}
170-
for action in normalized_actions:
171-
pending = pending_index.get(action.device_url)
172-
if pending is None:
173-
self._pending_actions.append(action)
174-
pending_index[action.device_url] = action
175-
else:
176-
pending.commands.extend(action.commands)
195+
pending_index = {a.device_url: a for a in self._pending_actions}
196+
self._merge_actions(
197+
self._pending_actions, pending_index, normalized_actions
198+
)
177199
self._pending_mode = mode
178200
self._pending_label = label
179201

@@ -196,39 +218,25 @@ async def add(
196218

197219
# Execute batches outside the lock if we flushed
198220
for batch in batches_to_execute:
199-
if batch[0]:
200-
await self._execute_batch(*batch)
221+
batch_actions, batch_mode, batch_label, batch_waiters = batch
222+
if batch_actions:
223+
await self._execute_batch(
224+
batch_actions, batch_mode, batch_label, batch_waiters
225+
)
201226

202227
return waiter
203228

204229
async def _delayed_flush(self) -> None:
205230
"""Wait for the delay period, then flush the queue."""
206-
waiters: list[QueuedExecution] = []
207-
try:
208-
await asyncio.sleep(self._settings.delay)
209-
async with self._lock:
210-
if not self._pending_actions:
211-
return
212-
213-
# Take snapshot and clear state while holding lock
214-
actions = self._pending_actions
215-
mode = self._pending_mode
216-
label = self._pending_label
217-
waiters = self._pending_waiters
218-
219-
self._pending_actions = []
220-
self._pending_mode = None
221-
self._pending_label = None
222-
self._pending_waiters = []
223-
self._flush_task = None
231+
await asyncio.sleep(self._settings.delay)
232+
async with self._lock:
233+
self._flush_task = None
234+
# Another coroutine may have already flushed the queue before we acquired the lock.
235+
actions, mode, label, waiters = self._prepare_flush()
236+
if not actions:
237+
return
224238

225-
# Execute outside the lock
226-
await self._execute_batch(actions, mode, label, waiters)
227-
except asyncio.CancelledError as exc:
228-
# Ensure all waiters are notified if this task is cancelled
229-
for waiter in waiters:
230-
waiter.set_exception(exc)
231-
raise
239+
await self._execute_batch(actions, mode, label, waiters)
232240

233241
def _prepare_flush(
234242
self,
@@ -317,19 +325,20 @@ def get_pending_count(self) -> int:
317325

318326
async def shutdown(self) -> None:
319327
"""Shutdown the queue, flushing any pending actions."""
328+
cancelled_task: asyncio.Task[None] | None = None
320329
batch_to_execute = None
321330
async with self._lock:
322331
if self._flush_task and not self._flush_task.done():
323-
task = self._flush_task
324-
task.cancel()
332+
cancelled_task = self._flush_task
333+
cancelled_task.cancel()
325334
self._flush_task = None
326-
# Wait for cancellation to complete
327-
with contextlib.suppress(asyncio.CancelledError):
328-
await task
329335

330336
if self._pending_actions:
331337
batch_to_execute = self._prepare_flush()
332338

333-
# Execute outside the lock
339+
if cancelled_task:
340+
with contextlib.suppress(asyncio.CancelledError):
341+
await cancelled_task
342+
334343
if batch_to_execute:
335344
await self._execute_batch(*batch_to_execute)

tests/test_action_queue.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,3 +350,102 @@ async def set_result():
350350

351351
# Ensure background task has completed
352352
await task
353+
354+
355+
@pytest.mark.asyncio
356+
async def test_action_queue_settings_validate():
357+
"""Test that validate raises on invalid settings."""
358+
with pytest.raises(ValueError, match="positive"):
359+
ActionQueueSettings(delay=-1).validate()
360+
361+
with pytest.raises(ValueError, match="at least 1"):
362+
ActionQueueSettings(max_actions=0).validate()
363+
364+
# Valid settings should not raise
365+
ActionQueueSettings(delay=0.5, max_actions=10).validate()
366+
367+
368+
@pytest.mark.asyncio
369+
async def test_action_queue_add_empty_actions(mock_executor):
370+
"""Test that add raises ValueError for empty action list."""
371+
queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1))
372+
373+
with pytest.raises(ValueError, match="at least one Action"):
374+
await queue.add([])
375+
376+
377+
@pytest.mark.asyncio
378+
async def test_action_queue_executor_cancelled_propagates():
379+
"""Test that CancelledError during execution propagates to waiters."""
380+
381+
async def cancelling_executor(actions, mode, label):
382+
raise asyncio.CancelledError
383+
384+
queue = ActionQueue(
385+
executor=AsyncMock(side_effect=cancelling_executor),
386+
settings=ActionQueueSettings(delay=0.05),
387+
)
388+
389+
action = Action(
390+
device_url="io://1234-5678-9012/1",
391+
commands=[Command(name=OverkizCommand.CLOSE)],
392+
)
393+
394+
queued = await queue.add([action])
395+
396+
with pytest.raises(asyncio.CancelledError):
397+
await queued
398+
399+
400+
@pytest.mark.asyncio
401+
async def test_action_queue_flush_empty(mock_executor):
402+
"""Test that flushing an empty queue is a no-op."""
403+
queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1))
404+
405+
await queue.flush()
406+
mock_executor.assert_not_called()
407+
408+
409+
@pytest.mark.asyncio
410+
async def test_action_queue_shutdown_empty(mock_executor):
411+
"""Test that shutting down an empty queue is a no-op."""
412+
queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1))
413+
414+
await queue.shutdown()
415+
mock_executor.assert_not_called()
416+
417+
418+
@pytest.mark.asyncio
419+
async def test_action_queue_no_self_cancel_during_delayed_flush():
420+
"""Test that _delayed_flush does not cancel itself via _prepare_flush.
421+
422+
When _delayed_flush fires and calls _prepare_flush, the flush task is still
423+
the running coroutine. _prepare_flush must not cancel it, otherwise the batch
424+
would fail with CancelledError when the executor performs I/O.
425+
"""
426+
cancel_detected = False
427+
428+
async def slow_executor(actions, mode, label):
429+
nonlocal cancel_detected
430+
try:
431+
await asyncio.sleep(0.05)
432+
except asyncio.CancelledError:
433+
cancel_detected = True
434+
raise
435+
return "exec-ok"
436+
437+
queue = ActionQueue(
438+
executor=AsyncMock(side_effect=slow_executor),
439+
settings=ActionQueueSettings(delay=0.05),
440+
)
441+
442+
action = Action(
443+
device_url="io://1234-5678-9012/1",
444+
commands=[Command(name=OverkizCommand.CLOSE)],
445+
)
446+
447+
queued = await queue.add([action])
448+
exec_id = await queued
449+
450+
assert exec_id == "exec-ok"
451+
assert not cancel_detected, "_delayed_flush cancelled itself via _prepare_flush"

0 commit comments

Comments
 (0)