-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy pathaction_queue.py
More file actions
339 lines (280 loc) · 12.6 KB
/
action_queue.py
File metadata and controls
339 lines (280 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""Action queue for batching multiple action executions into single API calls."""
from __future__ import annotations
import asyncio
import contextlib
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from pyoverkiz.models import Action
if TYPE_CHECKING:
from collections.abc import Callable, Coroutine, Generator
from pyoverkiz.enums import ExecutionMode
@dataclass(frozen=True, slots=True)
class ActionQueueSettings:
"""Settings for configuring the action queue behavior."""
delay: float = 0.5
max_actions: int = 20
def validate(self) -> None:
"""Validate configuration values for the action queue."""
if self.delay <= 0:
raise ValueError(f"action_queue_delay must be positive, got {self.delay!r}")
if self.max_actions < 1:
raise ValueError(
f"action_queue_max_actions must be at least 1, got {self.max_actions!r}"
)
class QueuedExecution:
    """Awaitable handle for one queued action execution.

    Awaiting the handle yields the ``exec_id`` string once the batch it
    belongs to has executed, or re-raises the batch's failure.
    """

    def __init__(self) -> None:
        """Create the handle without a backing future.

        The future is deliberately deferred: it must be bound to the running
        event loop, and no loop may be running when the handle is constructed.
        """
        self._future: asyncio.Future[str] | None = None

    def _ensure_future(self) -> asyncio.Future[str]:
        """Return the backing future, creating it on the running loop if absent.

        All future creation funnels through here so every caller — awaiter or
        result-setter — sees a future bound to the same running loop.
        """
        future = self._future
        if future is None:
            future = asyncio.get_running_loop().create_future()
            self._future = future
        return future

    def set_result(self, exec_id: str) -> None:
        """Resolve the handle with *exec_id*; a no-op if already settled."""
        target = self._ensure_future()
        if target.done():
            return
        target.set_result(exec_id)

    def set_exception(self, exception: BaseException) -> None:
        """Fail the handle with *exception*; a no-op if already settled."""
        target = self._ensure_future()
        if target.done():
            return
        target.set_exception(exception)

    def is_done(self) -> bool:
        """Return True once a result or exception has been recorded."""
        future = self._future
        return future is not None and future.done()

    def __await__(self) -> Generator[Any, None, str]:
        """Delegate awaiting to the lazily created backing future."""
        return self._ensure_future().__await__()
class ActionQueue:
    """Batches multiple action executions into single API calls.

    When actions are added, they are held for a configurable delay period.
    If more actions arrive during this window, they are batched together.
    The batch is flushed when:
    - The delay timer expires
    - The max actions limit is reached
    - The execution mode changes
    - The label changes
    - Manual flush is requested

    Concurrency model: all pending-batch state is guarded by ``self._lock``;
    a batch is always snapshotted and cleared while holding the lock, and the
    executor coroutine is always awaited AFTER releasing the lock so other
    producers are never blocked behind an API call.
    """
    def __init__(
        self,
        executor: Callable[
            [list[Action], ExecutionMode | None, str | None], Coroutine[None, None, str]
        ],
        delay: float = 0.5,
        max_actions: int = 20,
    ) -> None:
        """Initialize the action queue.

        :param executor: Async function to execute batched actions
        :param delay: Seconds to wait before auto-flushing (default 0.5)
        :param max_actions: Maximum actions per batch before forced flush (default 20)
        """
        self._executor = executor
        self._delay = delay
        self._max_actions = max_actions
        # Batch currently being built; these four fields are guarded by _lock.
        self._pending_actions: list[Action] = []
        self._pending_mode: ExecutionMode | None = None
        self._pending_label: str | None = None
        # Waiters belonging to the current batch; each resolves to the exec_id
        # when the batch executes, or gets the batch's exception.
        self._pending_waiters: list[QueuedExecution] = []
        # Timer task driving the delayed auto-flush; None when none is scheduled.
        self._flush_task: asyncio.Task[None] | None = None
        self._lock = asyncio.Lock()
    @staticmethod
    def _copy_action(action: Action) -> Action:
        """Return an `Action` copy with an independent commands list.

        The queue merges commands for duplicate devices, so caller-owned action
        instances must be copied to avoid mutating user input while batching.
        """
        return Action(device_url=action.device_url, commands=list(action.commands))
    async def add(
        self,
        actions: list[Action],
        mode: ExecutionMode | None = None,
        label: str | None = None,
    ) -> QueuedExecution:
        """Add actions to the queue.

        When multiple actions target the same device, their commands are merged
        into a single action to respect the gateway limitation of one action per
        device in each action group.

        Args:
            actions: Actions to queue.
            mode: Execution mode, which triggers a flush if it differs from the
                pending mode.
            label: Label for the action group.

        Returns:
            A `QueuedExecution` that resolves to the `exec_id` when the batch
            executes.

        Raises:
            ValueError: If ``actions`` is empty.
        """
        # Batches are snapshotted under the lock but executed only after it is
        # released; up to two batches may accumulate here (mode/label change
        # flush plus a max-actions flush).
        batches_to_execute: list[
            tuple[list[Action], ExecutionMode | None, str | None, list[QueuedExecution]]
        ] = []
        if not actions:
            raise ValueError("actions must contain at least one Action")
        # Normalize the caller's list so each device_url appears at most once,
        # merging commands for duplicates into a copied Action.
        normalized_actions: list[Action] = []
        normalized_index: dict[str, Action] = {}
        for action in actions:
            existing = normalized_index.get(action.device_url)
            if existing is None:
                action_copy = self._copy_action(action)
                normalized_actions.append(action_copy)
                normalized_index[action.device_url] = action_copy
            else:
                existing.commands.extend(action.commands)
        async with self._lock:
            # If mode or label changes, flush existing queue first
            if self._pending_actions and (
                mode != self._pending_mode or label != self._pending_label
            ):
                batches_to_execute.append(self._prepare_flush())
            # Add actions to pending queue, merging per-device with whatever is
            # already pending for the same device_url.
            pending_index = {
                pending_action.device_url: pending_action
                for pending_action in self._pending_actions
            }
            for action in normalized_actions:
                pending = pending_index.get(action.device_url)
                if pending is None:
                    self._pending_actions.append(action)
                    pending_index[action.device_url] = action
                else:
                    pending.commands.extend(action.commands)
            self._pending_mode = mode
            self._pending_label = label
            # Create waiter for this caller. This waiter is added to the current
            # batch being built, even if we flushed a previous batch above due to
            # a mode/label change. This ensures the waiter belongs to the batch
            # containing the actions we just added.
            waiter = QueuedExecution()
            self._pending_waiters.append(waiter)
            # If we hit max actions, flush immediately
            if len(self._pending_actions) >= self._max_actions:
                # Prepare the current batch for flushing (which includes the actions
                # we just added). If we already flushed due to mode change, this is
                # a second batch.
                batches_to_execute.append(self._prepare_flush())
            elif self._flush_task is None or self._flush_task.done():
                # Schedule delayed flush if not already scheduled
                self._flush_task = asyncio.create_task(self._delayed_flush())
        # Execute batches outside the lock if we flushed
        for batch in batches_to_execute:
            if batch[0]:
                await self._execute_batch(*batch)
        return waiter
    async def _delayed_flush(self) -> None:
        """Wait for the delay period, then flush the queue."""
        # Declared outside the try so the CancelledError handler below can see
        # any waiters snapshotted before cancellation struck.
        waiters: list[QueuedExecution] = []
        try:
            await asyncio.sleep(self._delay)
            async with self._lock:
                # Another path (max-actions or manual flush) may have emptied
                # the queue while we were sleeping.
                if not self._pending_actions:
                    return
                # Take snapshot and clear state while holding lock
                actions = self._pending_actions
                mode = self._pending_mode
                label = self._pending_label
                waiters = self._pending_waiters
                self._pending_actions = []
                self._pending_mode = None
                self._pending_label = None
                self._pending_waiters = []
                self._flush_task = None
            # Execute outside the lock
            await self._execute_batch(actions, mode, label, waiters)
        except asyncio.CancelledError as exc:
            # Ensure all waiters are notified if this task is cancelled.
            # set_exception is a no-op on already-settled waiters, so double
            # notification (e.g. cancellation inside _execute_batch) is safe.
            for waiter in waiters:
                waiter.set_exception(exc)
            raise
    def _prepare_flush(
        self,
    ) -> tuple[list[Action], ExecutionMode | None, str | None, list[QueuedExecution]]:
        """Prepare a flush by taking snapshot and clearing state (must be called with lock held).

        Returns a tuple of (actions, mode, label, waiters) that should be executed
        outside the lock using _execute_batch().
        """
        if not self._pending_actions:
            return ([], None, None, [])
        # Cancel any pending flush task; the batch it would have flushed is
        # being taken over by this caller.
        if self._flush_task and not self._flush_task.done():
            self._flush_task.cancel()
            self._flush_task = None
        # Take snapshot of current batch
        actions = self._pending_actions
        mode = self._pending_mode
        label = self._pending_label
        waiters = self._pending_waiters
        # Clear pending state
        self._pending_actions = []
        self._pending_mode = None
        self._pending_label = None
        self._pending_waiters = []
        return (actions, mode, label, waiters)
    async def _execute_batch(
        self,
        actions: list[Action],
        mode: ExecutionMode | None,
        label: str | None,
        waiters: list[QueuedExecution],
    ) -> None:
        """Execute a batch of actions and notify waiters (must be called without lock)."""
        if not actions:
            return
        try:
            exec_id = await self._executor(actions, mode, label)
            # Notify all waiters
            for waiter in waiters:
                waiter.set_result(exec_id)
        except asyncio.CancelledError as exc:
            # Propagate cancellation to all waiters, then re-raise.
            for waiter in waiters:
                waiter.set_exception(exc)
            raise
        except Exception as exc:  # noqa: BLE001
            # Propagate exceptions to all waiters without swallowing system-level exits.
            for waiter in waiters:
                waiter.set_exception(exc)
    async def flush(self) -> None:
        """Force flush all pending actions immediately.

        This method forces the queue to execute any pending batched actions
        without waiting for the delay timer. The execution results are delivered
        to the corresponding QueuedExecution objects returned by add().
        This method is useful for forcing immediate execution without having to
        wait for the delay timer to expire.
        """
        batch_to_execute = None
        async with self._lock:
            if self._pending_actions:
                batch_to_execute = self._prepare_flush()
        # Execute outside the lock
        if batch_to_execute:
            await self._execute_batch(*batch_to_execute)
    def get_pending_count(self) -> int:
        """Get the (approximate) number of actions currently waiting in the queue.

        This method does not acquire the internal lock and therefore returns a
        best-effort snapshot that may be slightly out of date if the queue is
        being modified concurrently by other coroutines. Do not rely on this
        value for critical control flow or for making flush decisions.
        """
        return len(self._pending_actions)
    async def shutdown(self) -> None:
        """Shutdown the queue, flushing any pending actions."""
        batch_to_execute = None
        async with self._lock:
            if self._flush_task and not self._flush_task.done():
                task = self._flush_task
                task.cancel()
                self._flush_task = None
                # Wait for cancellation to complete. Safe to await while holding
                # the lock: the cancelled task raises in its sleep/lock-acquire
                # wait and never re-acquires self._lock on the cancellation path.
                with contextlib.suppress(asyncio.CancelledError):
                    await task
            if self._pending_actions:
                batch_to_execute = self._prepare_flush()
        # Execute outside the lock
        if batch_to_execute:
            await self._execute_batch(*batch_to_execute)