-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy pathaction_queue.py
More file actions
341 lines (278 loc) · 12.6 KB
/
action_queue.py
File metadata and controls
341 lines (278 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
"""Action queue for batching multiple action executions into single API calls."""
from __future__ import annotations
import asyncio
import contextlib
from collections.abc import Callable, Coroutine, Generator
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from pyoverkiz.models import Action
if TYPE_CHECKING:
from pyoverkiz.enums import ExecutionMode
@dataclass(frozen=True, slots=True)
class ActionQueueSettings:
"""Settings for configuring the action queue behavior."""
delay: float = 0.5
max_actions: int = 20
def validate(self) -> None:
"""Validate configuration values for the action queue."""
if self.delay <= 0:
raise ValueError(f"action_queue_delay must be positive, got {self.delay!r}")
if self.max_actions < 1:
raise ValueError(
f"action_queue_max_actions must be at least 1, got {self.max_actions!r}"
)
class QueuedExecution:
    """Awaitable handle for a queued execution; awaiting it yields the batch's exec_id."""

    def __init__(self) -> None:
        """Start without a future; one is created on first use."""
        # No future yet: it has to be created from the running event loop,
        # and there may be no running loop at construction time.
        self._future: asyncio.Future[str] | None = None

    def _ensure_future(self) -> asyncio.Future[str]:
        """Return the backing future, creating it on the running loop if needed."""
        # Sole place where the future is created, so setters and awaiters
        # always share the same future on the same loop.
        future = self._future
        if future is None:
            future = asyncio.get_running_loop().create_future()
            self._future = future
        return future

    def set_result(self, exec_id: str) -> None:
        """Resolve the execution with *exec_id*; ignored if already completed."""
        future = self._ensure_future()
        if future.done():
            return
        future.set_result(exec_id)

    def set_exception(self, exception: BaseException) -> None:
        """Fail the execution with *exception*; ignored if already completed."""
        future = self._ensure_future()
        if future.done():
            return
        future.set_exception(exception)

    def is_done(self) -> bool:
        """Return True once a result or an exception has been recorded."""
        future = self._future
        return future is not None and future.done()

    def __await__(self) -> Generator[Any, None, str]:
        """Delegate awaiting to the backing future."""
        future = self._ensure_future()
        return future.__await__()
class ActionQueue:
    """Collect device actions and submit them to the API as combined action groups.

    Commands are executed by the Overkiz API through action groups. An action
    group holds one Action per device, and every Action carries one or more
    Commands; the gateway accepts at most one Action per device in a group.

    Batching example — two add() calls that land inside the same delay window::

        add([Action("device/1", [close])])
        add([Action("device/2", [open]), Action("device/1", [setClosure(50)])])

    result in a single action group holding two actions::

        ActionGroup(actions=[
            Action("device/1", [close, setClosure(50)]),  # commands merged
            Action("device/2", [open]),
        ])

    Actions for distinct devices stay separate inside the group; commands are
    merged only when the same device_url shows up more than once.

    A flush is triggered when:
    - the delay timer runs out
    - the number of pending actions reaches the max actions limit
    - add() is called with a different execution mode or label
    - flush() or shutdown() is called
    """

    def __init__(
        self,
        executor: Callable[
            [list[Action], ExecutionMode | None, str | None], Coroutine[None, None, str]
        ],
        settings: ActionQueueSettings | None = None,
    ) -> None:
        """Create an empty queue.

        :param executor: Async callable invoked as executor(actions, mode, label)
            for each batch; its return value is delivered to the batch's waiters
        :param settings: Queue configuration (defaults are used when None)
        """
        self._executor = executor
        self._settings = settings or ActionQueueSettings()

        # The batch currently being collected.
        self._pending_actions: list[Action] = []
        self._pending_waiters: list[QueuedExecution] = []
        self._pending_mode: ExecutionMode | None = None
        self._pending_label: str | None = None

        # Timer task that flushes the pending batch once the delay has elapsed.
        self._flush_task: asyncio.Task[None] | None = None
        self._lock = asyncio.Lock()

    @staticmethod
    def _merge_actions(
        target: list[Action],
        index: dict[str, Action],
        source: list[Action],
        *,
        copy: bool = False,
    ) -> None:
        """Fold *source* into *target*, keeping a single action per device_url.

        *index* maps device_url to the action already present in *target* and
        is updated alongside it. An action for an unseen device is appended;
        an action for a known device only contributes its commands. With
        *copy* set, appended actions are fresh Action objects with their own
        command list so caller-owned objects are never mutated later on.
        """
        for incoming in source:
            url = incoming.device_url
            known = index.get(url)
            if known is not None:
                known.commands.extend(incoming.commands)
                continue

            entry = incoming
            if copy:
                entry = Action(device_url=url, commands=list(incoming.commands))
            target.append(entry)
            index[url] = entry

    @staticmethod
    def _fail_waiters(waiters: list[QueuedExecution], error: BaseException) -> None:
        """Hand *error* to every waiter of a batch."""
        for waiter in waiters:
            waiter.set_exception(error)

    async def add(
        self,
        actions: list[Action],
        mode: ExecutionMode | None = None,
        label: str | None = None,
    ) -> QueuedExecution:
        """Queue actions for batched execution.

        Actions that target the same device have their commands merged into
        one action, because the gateway allows only one action per device in
        an action group.

        Args:
            actions: Actions to queue; must not be empty.
            mode: Execution mode. A value different from the pending batch's
                mode causes that batch to be flushed first.
            label: Label for the action group. A value different from the
                pending batch's label also causes a flush first.

        Returns:
            A `QueuedExecution` that resolves to the `exec_id` of the batch
            these actions end up in.

        Raises:
            ValueError: If *actions* is empty.
        """
        if not actions:
            raise ValueError("actions must contain at least one Action")

        # Collapse duplicate devices within this call into private copies.
        own_actions: list[Action] = []
        self._merge_actions(own_actions, {}, actions, copy=True)

        ready: list[
            tuple[list[Action], ExecutionMode | None, str | None, list[QueuedExecution]]
        ] = []

        async with self._lock:
            # A batch is bound to one mode/label pair: close the current one
            # before starting a batch with different values.
            if self._pending_actions:
                if mode != self._pending_mode or label != self._pending_label:
                    ready.append(self._prepare_flush())

            by_url = {queued.device_url: queued for queued in self._pending_actions}
            self._merge_actions(self._pending_actions, by_url, own_actions)
            self._pending_mode, self._pending_label = mode, label

            # The waiter is registered after any mode/label flush above, so it
            # belongs to the batch that actually contains the new actions.
            waiter = QueuedExecution()
            self._pending_waiters.append(waiter)

            if len(self._pending_actions) >= self._settings.max_actions:
                # Limit reached: this batch (with the actions just added) goes
                # out now; it is the second batch if one was closed above.
                ready.append(self._prepare_flush())
            elif self._flush_task is None or self._flush_task.done():
                # No timer is running yet, start one.
                self._flush_task = asyncio.create_task(self._delayed_flush())

        # The executor is awaited only after the lock has been released.
        for batch_actions, batch_mode, batch_label, batch_waiters in ready:
            if batch_actions:
                await self._execute_batch(
                    batch_actions, batch_mode, batch_label, batch_waiters
                )

        return waiter

    async def _delayed_flush(self) -> None:
        """Sleep for the configured delay, then flush whatever is pending."""
        await asyncio.sleep(self._settings.delay)

        async with self._lock:
            # Clear the timer reference first so _prepare_flush() does not
            # cancel the task that is currently running this method.
            self._flush_task = None
            # May come back empty if the queue was already flushed elsewhere.
            batch = self._prepare_flush()

        if batch[0]:
            await self._execute_batch(*batch)

    def _prepare_flush(
        self,
    ) -> tuple[list[Action], ExecutionMode | None, str | None, list[QueuedExecution]]:
        """Detach the pending batch and reset the queue (call with the lock held).

        Returns:
            An (actions, mode, label, waiters) tuple to pass to
            _execute_batch() once the lock has been released. The tuple is
            ([], None, None, []) when nothing is pending.
        """
        if not self._pending_actions:
            return ([], None, None, [])

        # The batch is leaving the queue, so a still-running timer is obsolete.
        timer = self._flush_task
        if timer and not timer.done():
            timer.cancel()
        self._flush_task = None

        snapshot = (
            self._pending_actions,
            self._pending_mode,
            self._pending_label,
            self._pending_waiters,
        )

        # Fresh containers: the snapshot keeps sole ownership of the old lists.
        self._pending_actions = []
        self._pending_mode = None
        self._pending_label = None
        self._pending_waiters = []

        return snapshot

    async def _execute_batch(
        self,
        actions: list[Action],
        mode: ExecutionMode | None,
        label: str | None,
        waiters: list[QueuedExecution],
    ) -> None:
        """Run one batch through the executor and settle its waiters (call without the lock).

        Does nothing for an empty batch. On success every waiter receives the
        exec_id. A cancellation is passed to the waiters and re-raised. Any
        other Exception is passed to the waiters and not re-raised.
        """
        if not actions:
            return

        try:
            exec_id = await self._executor(actions, mode, label)
            for waiter in waiters:
                waiter.set_result(exec_id)
        except asyncio.CancelledError as exc:
            # Let the waiters see the cancellation, then keep propagating it.
            self._fail_waiters(waiters, exc)
            raise
        except Exception as exc:  # noqa: BLE001
            # Failures reach callers through their waiters; Exception (not
            # BaseException) so system-level exits are never swallowed here.
            self._fail_waiters(waiters, exc)

    async def flush(self) -> None:
        """Execute all pending actions right away.

        The pending batch is executed without waiting for the delay timer.
        Results are delivered through the QueuedExecution objects that add()
        returned for the flushed actions. Nothing happens when the queue is
        empty.
        """
        async with self._lock:
            batch = self._prepare_flush() if self._pending_actions else None

        # The executor is awaited only after the lock has been released.
        if batch is not None:
            await self._execute_batch(*batch)

    def get_pending_count(self) -> int:
        """Return the (approximate) number of actions waiting in the queue.

        The internal lock is not taken, so the value is a best-effort snapshot
        that may already be stale when other coroutines are modifying the
        queue. Do not base critical control flow or flush decisions on it.
        """
        pending = self._pending_actions
        return len(pending)

    async def shutdown(self) -> None:
        """Stop the delay timer and execute whatever is still pending."""
        timer: asyncio.Task[None] | None = None
        final_batch = None

        async with self._lock:
            if self._flush_task and not self._flush_task.done():
                timer = self._flush_task
                timer.cancel()
                self._flush_task = None

            if self._pending_actions:
                final_batch = self._prepare_flush()

        if timer is not None:
            # Wait for the cancelled timer task to actually finish.
            with contextlib.suppress(asyncio.CancelledError):
                await timer

        if final_batch is not None:
            await self._execute_batch(*final_batch)