diff --git a/.gitignore b/.gitignore index 9c5befa4f..7472fd6ca 100644 --- a/.gitignore +++ b/.gitignore @@ -148,3 +148,6 @@ tests/integration/.binding-data/ # Crypto test material generated at test time (see tests/crypto_utils.py) tests/integration/keys/ + +# Generated benchmark report (machine-specific, regenerated by bench_async_activities.py) +ext/dapr-ext-workflow/benchmarks/RESULTS.md diff --git a/examples/workflow/async_activities.py b/examples/workflow/async_activities.py new file mode 100644 index 000000000..3ab028eb4 --- /dev/null +++ b/examples/workflow/async_activities.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Async activities running alongside a sync one in a fan-out/fan-in workflow. + +Each async activity simulates an I/O-bound call: it takes a payload, awaits a fixed +delay (standing in for a network round-trip), and returns a result payload. The async +instances run concurrently on the worker's event loop; a final sync activity aggregates +the results. Fan-out width, input/output payload sizes, and the delay are configurable +via environment variables. + +Run with: + + dapr run --app-id async-activities --app-protocol grpc --dapr-grpc-port 50001 \\ + -- python async_activities.py +""" + +from __future__ import annotations + +import asyncio +import os +import random +import string +from time import sleep + +import dapr.ext.workflow as wf +from pydantic import BaseModel + +FAN_OUT = int(os.environ.get('WORKFLOW_FAN_OUT', '5')) +INPUT_BYTES = int(os.environ.get('WORKFLOW_INPUT_BYTES', '2048')) +OUTPUT_BYTES = int(os.environ.get('WORKFLOW_OUTPUT_BYTES', '1024')) +IO_SECONDS = float(os.environ.get('WORKFLOW_IO_SECONDS', '1.0')) + +wfr = wf.WorkflowRuntime() + + +def _random_digits(n: int) -> str: + return ''.join(random.choices(string.digits, k=n)) + + +class Payload(BaseModel): + index: int + data: str + + +@wfr.workflow(name='fan_out_fan_in_workflow') +def fan_out_fan_in_workflow(ctx: wf.DaprWorkflowContext, payloads: list[dict]): + tasks = [ctx.call_activity(process_payload, input=p) for p in payloads] + results = yield wf.when_all(tasks) + summary = yield ctx.call_activity(summarize, input=results) + return summary + + +@wfr.activity(name='process_payload') +async def process_payload(ctx: wf.WorkflowActivityContext, payload: Payload) -> str: + """Async activity: simulate an I/O-bound call. Instances run concurrently on the loop.""" + await asyncio.sleep(IO_SECONDS) + result = _random_digits(OUTPUT_BYTES) + print( + f'[async] payload {payload.index}: {len(payload.data)}B in -> {len(result)}B out', + flush=True, + ) + return result + + +@wfr.activity(name='summarize') +def summarize(ctx: wf.WorkflowActivityContext, results: list[str]) -> str: + """Sync activity: aggregate the fan-out results on the thread pool.""" + total_bytes = sum(len(r) for r in results) + total_zeros = sum(r.count('0') for r in results) + summary = f'{len(results)} results, {total_bytes} bytes, {total_zeros} zeros' + print(f'[sync] {summary}', flush=True) + return summary + + +def main() -> None: + payloads = [ + Payload(index=i, data=_random_digits(INPUT_BYTES)).model_dump() for i in range(FAN_OUT) + ] + + wfr.start() + sleep(5) # wait for workflow runtime to start + + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow(workflow=fan_out_fan_in_workflow, input=payloads) + print(f'Workflow started. Instance ID: {instance_id}') + + state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60) + assert state is not None + print(f'Workflow completed! Status: {state.runtime_status.name}') + print(f'Workflow result: {state.serialized_output.strip(chr(34))}') + + wfr.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ext/dapr-ext-workflow/AGENTS.md b/ext/dapr-ext-workflow/AGENTS.md index 635cb8705..bbb7299c8 100644 --- a/ext/dapr-ext-workflow/AGENTS.md +++ b/ext/dapr-ext-workflow/AGENTS.md @@ -105,6 +105,24 @@ The entry point for registration and lifecycle: Internally wraps user functions: workflow functions get a `DaprWorkflowContext`, activity functions get a `WorkflowActivityContext`. Tracks registration state via `_workflow_registered` / `_activity_registered` attributes on functions to prevent double registration. +#### Sync and async activities + +Activities can be either `def my_activity(ctx, inp)` or `async def my_activity(ctx, inp)`. At registration, `_make_activity_wrapper` calls `_is_async_callable(fn)` to detect async-ness. That helper unwraps `functools.partial`, `@functools.wraps` chains, and callable-class `__call__` so common decorator patterns route correctly. The wrapper is built `async def` or `def` to match, then stored in the registry. + +At dispatch time (the gRPC stream loop in `_durabletask/worker.py`), `is_async_callable(activity_fn)` on the wrapper selects between two handlers. + +- **Async activities** go through `_execute_activity_async`, then `_ActivityExecutor.execute_async`, which awaits `fn(...)` directly on the event loop. The gRPC response is delivered via `loop.run_in_executor(self._async_worker_manager.thread_pool, stub.CompleteActivityTask, ...)` — the same pool sync activities use, sized by `maximum_thread_pool_workers`. +- **Sync activities** go through `_execute_activity`, dispatched to the thread pool by `_AsyncWorkerManager._run_func`. The activity runs on a worker thread, and the response is delivered from the same thread. + +Workflow (orchestrator) functions must remain generators (`def` with `yield`). They cannot be `async def` because durabletask's deterministic replay depends on synchronous generator semantics. Only activities support async. + +**Decorator ordering gotcha.** Stacking `@wfr.activity` over `@alternate_name(...)` over `async def` works because `@alternate_name` now emits an `async def innerfn` when the wrapped function is async. A user-written decorator that wraps an async function in a sync `def` (without `@functools.wraps` exposing `__wrapped__`) defeats `_is_async_callable`, routes the activity to the sync path, and produces an un-awaited coroutine. Such decorators should use `@functools.wraps(fn)` so the unwrap walks through them. + +**`maximum_thread_pool_workers` covers both paths.** This knob sizes the worker thread pool used for sync-activity bodies and for async-activity gRPC response sends. Mixed workloads with long-running sync activities can starve async response delivery (and vice versa) since they share the pool — size to the sum of peak sync activity concurrency and peak in-flight async response sends. + +**Concurrency sizing and load characterization.** See `docs/concurrency.md` for sizing recommendations (`maximum_concurrent_activity_work_items`, `maximum_thread_pool_workers`) and an async-vs-sync decision tree. The `benchmarks/` directory ships `bench_async_activities.py`; re-run it locally before claiming a perf regression. The generated `RESULTS.md` is gitignored because numbers are machine-specific; see `docs/concurrency.md` for the regen command. + + ### DaprWorkflowClient (`dapr_workflow_client.py`) Client for workflow lifecycle management: @@ -163,7 +181,7 @@ Retry configuration for activities and child workflows: 1. **Registration**: User decorates functions with `@wfr.workflow` / `@wfr.activity`. The runtime wraps them and stores them in the durabletask worker's registry. 2. **Startup**: `wfr.start()` opens a gRPC stream to the Dapr sidecar. The worker polls for work items. 3. **Scheduling**: Client calls `schedule_new_workflow(fn, input=...)`. The function's name (or `_dapr_alternate_name`) is sent to the backend. -4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). The engine records history; on replay, yielded tasks return cached results without re-executing. +4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). Activity functions are either sync (dispatched to the worker's thread pool) or `async def` (awaited directly on the worker's event loop). The engine records history; on replay, yielded tasks return cached results without re-executing. 5. **Determinism**: Workflows must be deterministic — no random, no wall-clock time, no I/O. Use `ctx.current_utc_datetime` instead of `datetime.now()`. Use `ctx.is_replaying` to guard side effects like logging. 6. **Completion**: Client polls via `wait_for_workflow_completion()` or `get_workflow_state()`. @@ -191,6 +209,7 @@ Two example directories exercise workflows: - `cross-app1.py`, `cross-app2.py`, `cross-app3.py` — cross-app calls - `versioning.py` — workflow versioning with `is_patched()` - `simple_aio_client.py` — async client variant + - `async_activities.py` — `async def` activities (fan-out/fan-in with simulated I/O, configurable payload sizes) ## Testing diff --git a/ext/dapr-ext-workflow/benchmarks/_report.py b/ext/dapr-ext-workflow/benchmarks/_report.py new file mode 100644 index 000000000..31f8c9440 --- /dev/null +++ b/ext/dapr-ext-workflow/benchmarks/_report.py @@ -0,0 +1,223 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Run-environment capture and Markdown formatting for the benchmark report.""" + +from __future__ import annotations + +import os +import platform +import shutil +import subprocess +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path + +from dapr.ext.workflow._bench_harness import IS_DARWIN, ScenarioMetrics, SustainedMetrics + + +def _read_text(path: str) -> str: + try: + return Path(path).read_text(encoding='utf-8', errors='ignore') + except OSError: + return '' + + +def _cpu_model() -> str: + """Best-effort CPU model name. Cross-platform; returns a placeholder on failure.""" + if IS_DARWIN: + sysctl = shutil.which('sysctl') + if sysctl is not None: + try: + out = subprocess.run( + [sysctl, '-n', 'machdep.cpu.brand_string'], + capture_output=True, + text=True, + timeout=2, + ) + if out.returncode == 0 and out.stdout.strip(): + return out.stdout.strip() + except (subprocess.SubprocessError, OSError): + pass + cpuinfo = _read_text('/proc/cpuinfo') + for line in cpuinfo.splitlines(): + if line.startswith('model name'): + return line.split(':', 1)[1].strip() + return platform.processor() or platform.machine() or 'unknown' + + +def _total_memory_gb() -> float: + """Best-effort total physical memory in GB. Returns 0 on failure.""" + if IS_DARWIN: + sysctl = shutil.which('sysctl') + if sysctl is not None: + try: + out = subprocess.run( + [sysctl, '-n', 'hw.memsize'], + capture_output=True, + text=True, + timeout=2, + ) + if out.returncode == 0 and out.stdout.strip().isdigit(): + return int(out.stdout.strip()) / (1024**3) + except (subprocess.SubprocessError, OSError): + pass + meminfo = _read_text('/proc/meminfo') + for line in meminfo.splitlines(): + if line.startswith('MemTotal:'): + parts = line.split() + if len(parts) >= 2 and parts[1].isdigit(): + return int(parts[1]) / (1024**2) + return 0.0 + + +def _git_commit() -> str: + """Short git commit hash, or 'unknown' if not in a git repo.""" + git = shutil.which('git') + if git is None: + return 'unknown' + try: + out = subprocess.run( + [git, 'rev-parse', '--short', 'HEAD'], + capture_output=True, + text=True, + timeout=2, + cwd=Path(__file__).parent, + ) + if out.returncode == 0: + commit = out.stdout.strip() + # Mark dirty if there are uncommitted changes. + status = subprocess.run( + [git, 'status', '--porcelain'], + capture_output=True, + text=True, + timeout=2, + cwd=Path(__file__).parent, + ) + if status.returncode == 0 and status.stdout.strip(): + return f'{commit}-dirty' + return commit + except (subprocess.SubprocessError, OSError): + pass + return 'unknown' + + +@dataclass(slots=True) +class RunEnvironment: + """Snapshot of the machine the benchmark ran on.""" + + timestamp_utc: str + git_commit: str + python_version: str + python_implementation: str + platform: str + os_release: str + cpu_model: str + cpu_logical_cores: int + cpu_physical_cores_hint: int + total_memory_gb: float + is_ci: bool + + @classmethod + def capture(cls) -> 'RunEnvironment': + return cls( + timestamp_utc=datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC'), + git_commit=_git_commit(), + python_version=platform.python_version(), + python_implementation=platform.python_implementation(), + platform=platform.platform(), + os_release=f'{platform.system()} {platform.release()} ({platform.machine()})', + cpu_model=_cpu_model(), + cpu_logical_cores=os.cpu_count() or 0, + cpu_physical_cores_hint=os.cpu_count() or 0, + total_memory_gb=_total_memory_gb(), + is_ci=any(os.environ.get(k) for k in ('CI', 'GITHUB_ACTIONS', 'TRAVIS', 'BUILDKITE')), + ) + + +def _format_environment_block(env: RunEnvironment) -> str: + mem_str = f'{env.total_memory_gb:.1f} GB' if env.total_memory_gb > 0 else 'unknown' + return ( + '## Run environment\n' + '\n' + f'- **Timestamp**: {env.timestamp_utc}\n' + f'- **Git commit**: `{env.git_commit}`\n' + f'- **Python**: {env.python_implementation} {env.python_version}\n' + f'- **OS**: {env.os_release}\n' + f'- **CPU**: {env.cpu_model} ({env.cpu_logical_cores} logical cores)\n' + f'- **Memory**: {mem_str}\n' + '\n' + 'Numbers are specific to this machine; the sync-vs-async gap is what transfers across' + ' hardware, not the absolute values.' + ) + + +def _speedup_cell(speedup: float) -> str: + if speedup > 1.2: + dot = '🟢' + elif speedup >= 0.8: + dot = '⚪' + else: + dot = '🔴' + return f'{dot} {speedup:.1f}x' + + +def _format_comparison_table( + rows: list[tuple[str, ScenarioMetrics, ScenarioMetrics]], + key_label: str = 'N', + show_async_rss: bool = False, +) -> str: + rss_header = ' Async RAM (MB) |' if show_async_rss else '' + rss_rule = ' ---: |' if show_async_rss else '' + header = ( + f'| {key_label} | Sync (s) | Async (s) | Speedup |{rss_header}\n' + f'| ---: | ---: | ---: | ---: |{rss_rule}\n' + ) + lines = [] + for key, sync_m, async_m in rows: + speedup = sync_m.wallclock_s / async_m.wallclock_s if async_m.wallclock_s > 0 else 0.0 + rss = f' {async_m.peak_rss_delta_mb:.0f} |' if show_async_rss else '' + lines.append( + f'| {key} | {sync_m.wallclock_s:.2f} | {async_m.wallclock_s:.2f} |' + f' {_speedup_cell(speedup)} |{rss}' + ) + return header + '\n'.join(lines) + + +def _format_sustained_table(sync_m: SustainedMetrics, async_m: SustainedMetrics) -> str: + def row(label: str, sync_val: str, async_val: str) -> str: + return f'| {label} | {sync_val} | {async_val} |' + + header = '| Metric | Sync | Async |\n| --- | ---: | ---: |\n' + rows = [ + row( + 'Effective throughput', + f'{sync_m.throughput_per_s:.0f}/s', + f'{async_m.throughput_per_s:.0f}/s', + ), + row( + 'p99 latency', + f'{sync_m.latency_overall.p99_ms:.0f} ms', + f'{async_m.latency_overall.p99_ms:.0f} ms', + ), + row( + 'p99 first quarter', + f'{sync_m.latency_first_quarter.p99_ms:.0f} ms', + f'{async_m.latency_first_quarter.p99_ms:.0f} ms', + ), + row( + 'p99 last quarter', + f'{sync_m.latency_last_quarter.p99_ms:.0f} ms', + f'{async_m.latency_last_quarter.p99_ms:.0f} ms', + ), + ] + return header + '\n'.join(rows) diff --git a/ext/dapr-ext-workflow/benchmarks/bench_async_activities.py b/ext/dapr-ext-workflow/benchmarks/bench_async_activities.py new file mode 100644 index 000000000..79afd8e47 --- /dev/null +++ b/ext/dapr-ext-workflow/benchmarks/bench_async_activities.py @@ -0,0 +1,353 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Sync-vs-async activity benchmarks for ``dapr-ext-workflow``. + +Runs the same I/O-bound activity workload as ``def`` and ``async def`` through the +production dispatch path against a mock sidecar stub. Scenarios: a fan-out burst, a +fan-out shaped as many small workflows, and a sustained open-loop run. + +Run: + + uv run python ext/dapr-ext-workflow/benchmarks/bench_async_activities.py + +``DAPR_BENCH_ACTIVITY_MS`` overrides the activity duration, ``DAPR_BENCH_SUSTAINED_SECONDS`` +the sustained run. Writes ``benchmarks/RESULTS.md`` and asserts pass-criteria budgets. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +from pathlib import Path + +from _report import ( + RunEnvironment, + _format_comparison_table, + _format_environment_block, + _format_sustained_table, +) +from dapr.ext.workflow._bench_harness import ( + ScenarioMetrics, + SustainedMetrics, + _async_payload_factory, + _async_sleep_factory, + _run_full, + _run_lite, + _run_sustained, + _sync_payload_factory, +) + +RESULTS_PATH = Path(__file__).parent / 'RESULTS.md' +SUSTAINED_DURATION_S = float(os.environ.get('DAPR_BENCH_SUSTAINED_SECONDS', '120')) +ACTIVITY_LATENCY_S = float(os.environ.get('DAPR_BENCH_ACTIVITY_MS', '200')) / 1000.0 + +ACTIVITY_GRID = (10, 100, 1000) +WF_ACTIVITY_GRID = (10, 100, 1000) +ACTIVITIES_PER_WORKFLOW = 3 +FIXED_PAYLOAD_BYTES = 10 * 1024 +PAYLOAD_WORKFLOWS = 100 +PAYLOAD_GRID = (('1KB', 1024), ('100KB', 100 * 1024), ('1MB', 1024 * 1024)) +WF_IO_S = 0.05 +_THREAD_POOL = 16 +_SEMAPHORE_CAP = 1200 + +Row = tuple[str, ScenarioMetrics, ScenarioMetrics] + + +def _sustained_rate() -> float: + """Arrival rate just above the sync ceiling (pool / latency).""" + return round((_THREAD_POOL / ACTIVITY_LATENCY_S) * 1.25) + + +async def _run(kind: str, n_items: int) -> ScenarioMetrics: + return await _run_full( + name=f'{kind} {n_items}', + n_items=n_items, + semaphore_cap=_SEMAPHORE_CAP, + thread_pool_workers=_THREAD_POOL, + server_latency_s=ACTIVITY_LATENCY_S, + activity_kind=kind, + ) + + +async def _fanout_pair(label: str, n_items: int) -> Row: + return label, await _run('sync', n_items), await _run('async', n_items) + + +async def run_fanout() -> list[Row]: + """One workflow fanning out a varying number of activities, sync vs async.""" + return [await _fanout_pair(str(a), a) for a in ACTIVITY_GRID] + + +async def _workflow_pair(label: str, n_items: int, payload_bytes: int) -> Row: + """Sync vs async: ``n_items`` sleep activities, each carrying a ``payload_bytes`` payload.""" + sync_m = await _run_full( + name=f'sync {label}', + n_items=n_items, + semaphore_cap=_SEMAPHORE_CAP, + thread_pool_workers=_THREAD_POOL, + server_latency_s=WF_IO_S, + activity_kind='sync', + activity_factory=lambda s, e: _sync_payload_factory(WF_IO_S, payload_bytes, s, e), + input_bytes=payload_bytes, + ) + async_m = await _run_full( + name=f'async {label}', + n_items=n_items, + semaphore_cap=_SEMAPHORE_CAP, + thread_pool_workers=_THREAD_POOL, + server_latency_s=WF_IO_S, + activity_kind='async', + activity_factory=lambda s, e: _async_payload_factory(WF_IO_S, payload_bytes, s, e), + input_bytes=payload_bytes, + ) + return label, sync_m, async_m + + +async def run_by_payload() -> list[Row]: + """Fixed workflows x activities, varying payload size.""" + n_items = PAYLOAD_WORKFLOWS * ACTIVITIES_PER_WORKFLOW + return [await _workflow_pair(label, n_items, size) for label, size in PAYLOAD_GRID] + + +async def run_by_scale() -> list[Row]: + """Varying workflows x activities, fixed payload size.""" + rows: list[Row] = [] + for w in WF_ACTIVITY_GRID: + label = f'{w} × {ACTIVITIES_PER_WORKFLOW}' + rows.append(await _workflow_pair(label, w * ACTIVITIES_PER_WORKFLOW, FIXED_PAYLOAD_BYTES)) + return rows + + +async def run_sustained() -> tuple[SustainedMetrics, SustainedMetrics]: + """Open-loop arrival above the sync ceiling, sync vs async.""" + rate = _sustained_rate() + sync_m = await _run_sustained( + duration_s=SUSTAINED_DURATION_S, + target_rate_per_s=rate, + semaphore_cap=_SEMAPHORE_CAP, + thread_pool_workers=_THREAD_POOL, + server_latency_s=ACTIVITY_LATENCY_S, + activity_kind='sync', + ) + async_m = await _run_sustained( + duration_s=SUSTAINED_DURATION_S, + target_rate_per_s=rate, + semaphore_cap=_SEMAPHORE_CAP, + thread_pool_workers=_THREAD_POOL, + server_latency_s=ACTIVITY_LATENCY_S, + activity_kind='async', + ) + return sync_m, async_m + + +async def _run_semaphore_gate() -> ScenarioMetrics: + """Async fan-out behind a small semaphore, to confirm the cap still gates.""" + return await _run_full( + name='semaphore gate', + n_items=1000, + semaphore_cap=50, + thread_pool_workers=_THREAD_POOL, + server_latency_s=ACTIVITY_LATENCY_S, + activity_kind='async', + ) + + +async def _run_oom_safety() -> ScenarioMetrics: + """10k async activities behind a 1k semaphore, checking parked Tasks don't blow RSS.""" + return await _run_lite( + name='oom safety', + activity=_async_sleep_factory(ACTIVITY_LATENCY_S, {}, {}), + n_items=10_000, + semaphore_cap=1000, + thread_pool_workers=_THREAD_POOL, + server_latency_s=ACTIVITY_LATENCY_S, + ) + + +def _write_results( + *, + env: RunEnvironment, + fanout: list[Row], + by_payload: list[Row], + by_scale: list[Row], + sustained: tuple[SustainedMetrics, SustainedMetrics], +) -> None: + sustained_sync, sustained_async = sustained + rate = _sustained_rate() + ceiling = _THREAD_POOL / ACTIVITY_LATENCY_S + latency_ms = ACTIVITY_LATENCY_S * 1000 + fixed_kb = FIXED_PAYLOAD_BYTES // 1024 + report = f""" +# Sync vs async activity benchmark + +Generated by `bench_async_activities.py`. Re-run with: + +```bash +uv run python ext/dapr-ext-workflow/benchmarks/bench_async_activities.py +``` + +{_format_environment_block(env)} + +Each I/O activity is a {latency_ms:.0f} ms wait. A `def` activity holds a worker thread for that wait, so the sync path runs at most {_THREAD_POOL} at a time (~{ceiling:.0f}/s); an `async def` activity holds only an event-loop slot, so the waits overlap. + +## 1. Fan-out (one workflow) + +One workflow fans out a batch of activities at once. Below the thread pool the paths match; above it the sync path serializes and the gap grows with the batch size. + +{_format_comparison_table(fanout, key_label='Activities')} + +## 2. Workflows: all sync vs all async + +Each workflow runs {ACTIVITIES_PER_WORKFLOW} activities (a fan-out and its fan-in aggregator), all the same kind, a sleep carrying a payload. The sync column runs them on the thread pool, the async column on the event loop. A row labeled `{PAYLOAD_WORKFLOWS} × {ACTIVITIES_PER_WORKFLOW}` is {PAYLOAD_WORKFLOWS} workflows of {ACTIVITIES_PER_WORKFLOW} activities each. + +### 2a. Varying workflows × activities (fixed {fixed_kb}KB payload) + +Scaling the workflow count. The async advantage widens with the count, since the sync path is still capped by the thread pool while the async path keeps overlapping. + +{_format_comparison_table(by_scale, key_label='Workflows × activities')} + +### 2b. Varying payload (fixed {PAYLOAD_WORKFLOWS} workflows × {ACTIVITIES_PER_WORKFLOW} activities) + +Each activity receives and returns a payload of the given size. Async still wins because the waits overlap, but the win narrows and RAM climbs as the payload grows. Every in-flight activity holds its input and output, and JSON ser/deser runs on the loop. RAM is the async run, which holds the most in flight at once. + +{_format_comparison_table(by_payload, key_label='Payload', show_async_rss=True)} + +## 3. Sustained arrival + +Open-loop submission at {rate:.0f}/s, above the ~{ceiling:.0f}/s sync ceiling. The sync path falls behind and its latency drifts upward across the run; the async path holds steady. Compare the first-quarter and last-quarter p99. + +{_format_sustained_table(sustained_sync, sustained_async)} + +See `ext/dapr-ext-workflow/docs/concurrency.md` for sizing guidance. +""" + RESULTS_PATH.write_text(report, encoding='utf-8') + + +def _assert_budgets( + *, + fanout: list[Row], + by_payload: list[Row], + by_scale: list[Row], + sustained: tuple[SustainedMetrics, SustainedMetrics], + semaphore_gate: ScenarioMetrics, + oom: ScenarioMetrics, +) -> None: + """Pass criteria. Generous bounds: catch order-of-magnitude regressions, not jitter.""" + _, sync_big, async_big = fanout[-1] + assert async_big.wallclock_s < ACTIVITY_LATENCY_S * 8, ( + f'Async fan-out N={async_big.n_items} took {async_big.wallclock_s:.2f}s for' + f' {ACTIVITY_LATENCY_S}s activities; async is not overlapping I/O.' + ) + assert async_big.wallclock_s * 3.0 < sync_big.wallclock_s, ( + f'Fan-out N={async_big.n_items}: async {async_big.wallclock_s:.2f}s not 3x faster' + f' than sync {sync_big.wallclock_s:.2f}s.' + ) + + # At the largest scale, async clears the workflows far faster than sync. + _, scale_sync, scale_async = by_scale[-1] + assert scale_async.wallclock_s * 3.0 < scale_sync.wallclock_s, ( + f'Scale sweep: async {scale_async.wallclock_s:.2f}s not 3x faster than sync' + f' {scale_sync.wallclock_s:.2f}s.' + ) + + # Async still beats sync at the largest payload, even as the gap narrows. + _, pay_sync, pay_async = by_payload[-1] + assert pay_async.wallclock_s < pay_sync.wallclock_s, ( + f'Payload sweep: async {pay_async.wallclock_s:.2f}s was not faster than sync' + f' {pay_sync.wallclock_s:.2f}s at the largest payload.' + ) + + sustained_sync, sustained_async = sustained + async_first = max(sustained_async.latency_first_quarter.p99_ms, 1.0) + async_last = sustained_async.latency_last_quarter.p99_ms + assert async_last <= async_first * 3.0, ( + f'Async sustained tail drifted: first-quarter p99 {async_first:.0f} ms,' + f' last-quarter p99 {async_last:.0f} ms.' + ) + sync_first = max(sustained_sync.latency_first_quarter.p99_ms, 1.0) + sync_last = sustained_sync.latency_last_quarter.p99_ms + assert sync_last > sync_first * 2.0, ( + f'Sync sustained tail did not drift: first-quarter p99 {sync_first:.0f} ms,' + f' last-quarter p99 {sync_last:.0f} ms.' + ) + assert sync_last > async_last * 3.0, ( + f'Sync last-quarter p99 ({sync_last:.0f} ms) not >3x async ({async_last:.0f} ms).' + ) + + assert semaphore_gate.wallclock_s > async_big.wallclock_s * 3.0, ( + f'Semaphore did not gate: capped async {semaphore_gate.wallclock_s:.2f}s not' + f' meaningfully slower than ungated {async_big.wallclock_s:.2f}s.' + ) + + assert oom.peak_tasks <= int(oom.n_items * 1.5), ( + f'Peak Tasks ({oom.peak_tasks}) exceeded 1.5x N={oom.n_items}.' + ) + assert oom.peak_rss_delta_mb < 500.0, ( + f'Peak RSS delta {oom.peak_rss_delta_mb:.1f} MB exceeded the 500 MB budget.' + ) + + +async def main() -> None: + logging.basicConfig(level=logging.WARNING) + env = RunEnvironment.capture() + print( + f'[env] {env.cpu_model} | {env.cpu_logical_cores} cores |' + f' {env.python_implementation} {env.python_version}', + flush=True, + ) + + print('[1/4] fan-out (one workflow) sync vs async...', flush=True) + fanout = await run_fanout() + + print('[2/4] workflows all sync vs all async (payload and scale sweeps)...', flush=True) + by_payload = await run_by_payload() + by_scale = await run_by_scale() + + print(f'[3/4] sustained sync vs async ({SUSTAINED_DURATION_S:.0f}s each)...', flush=True) + sustained = await run_sustained() + + print('[4/4] hidden regression checks...', flush=True) + semaphore_gate = await _run_semaphore_gate() + oom = await _run_oom_safety() + + _write_results( + env=env, + fanout=fanout, + by_payload=by_payload, + by_scale=by_scale, + sustained=sustained, + ) + print('\n=== fan-out (one workflow) ===') + print(_format_comparison_table(fanout, key_label='Activities')) + print('\n=== 2a. varying workflows × activities ===') + print(_format_comparison_table(by_scale, key_label='Workflows × activities')) + print('\n=== 2b. varying payload ===') + print(_format_comparison_table(by_payload, key_label='Payload', show_async_rss=True)) + print('\n=== sustained ===') + print(_format_sustained_table(sustained[0], sustained[1])) + print(f'\nWrote {RESULTS_PATH.relative_to(Path.cwd())}') + + _assert_budgets( + fanout=fanout, + by_payload=by_payload, + by_scale=by_scale, + sustained=sustained, + semaphore_gate=semaphore_gate, + oom=oom, + ) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_bench_harness.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_bench_harness.py new file mode 100644 index 000000000..79fcba437 --- /dev/null +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_bench_harness.py @@ -0,0 +1,541 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared dispatch harness for the workflow benchmarks and perf regression tests. + +Metrics, the mock sidecar stub, and the runners that drive ``_execute_activity`` and +``_execute_activity_async`` through ``_AsyncWorkerManager``. Imported by both +``benchmarks/bench_async_activities.py`` and ``tests/durabletask`` so they share one +dispatch harness. Internal: not part of the public API. +""" + +from __future__ import annotations + +import asyncio +import logging +import math +import os +import statistics +import sys +import time +from contextlib import asynccontextmanager +from dataclasses import dataclass, field +from typing import Awaitable, Callable + +import dapr.ext.workflow._durabletask.internal.protos as pb +from dapr.ext.workflow._durabletask import task +from dapr.ext.workflow._durabletask.internal import shared +from dapr.ext.workflow._durabletask.worker import ( + ConcurrencyOptions, + TaskHubGrpcWorker, + _AsyncWorkerManager, +) + +LOGGER = logging.getLogger('bench') +IS_DARWIN = sys.platform == 'darwin' + +# ============================================================================ +# Data classes +# ============================================================================ + + +@dataclass(slots=True) +class LatencyStats: + """Summary statistics for a population of end-to-end latency samples.""" + + count: int + mean_ms: float + p50_ms: float + p95_ms: float + p99_ms: float + max_ms: float + + @classmethod + def from_samples(cls, samples_s: list[float]) -> 'LatencyStats': + if not samples_s: + return cls(count=0, mean_ms=0.0, p50_ms=0.0, p95_ms=0.0, p99_ms=0.0, max_ms=0.0) + samples_ms = sorted(s * 1000.0 for s in samples_s) + return cls( + count=len(samples_ms), + mean_ms=statistics.fmean(samples_ms), + p50_ms=_percentile(samples_ms, 0.50), + p95_ms=_percentile(samples_ms, 0.95), + p99_ms=_percentile(samples_ms, 0.99), + max_ms=samples_ms[-1], + ) + + +@dataclass(slots=True) +class ScenarioMetrics: + """Per-scenario summary written to the results table.""" + + name: str + n_items: int + semaphore_cap: int + thread_pool_workers: int + server_latency_s: float + wallclock_s: float + throughput_per_s: float + latency: LatencyStats + peak_tasks: int + peak_queue_depth: int + peak_rss_delta_mb: float + notes: str = '' + + +@dataclass +class _Sampler: + """Background sampler for in-flight task count, queue depth, and RSS.""" + + interval_s: float = 0.05 + peak_tasks: int = 0 + peak_rss_kb: int = 0 + peak_queue_depth: int = 0 + _queues: list[asyncio.Queue] = field(default_factory=list) + _stop_event: asyncio.Event = field(default_factory=asyncio.Event) + + def watch_queue(self, q: asyncio.Queue | None) -> None: + if q is not None: + self._queues.append(q) + + async def run(self) -> None: + while not self._stop_event.is_set(): + self.peak_tasks = max(self.peak_tasks, len(asyncio.all_tasks())) + self.peak_rss_kb = max(self.peak_rss_kb, _current_rss_kb()) + for q in self._queues: + self.peak_queue_depth = max(self.peak_queue_depth, q.qsize()) + try: + await asyncio.wait_for(self._stop_event.wait(), timeout=self.interval_s) + except asyncio.TimeoutError: + continue + + def stop(self) -> None: + self._stop_event.set() + + +# ============================================================================ +# Helpers +# ============================================================================ + + +def _percentile(sorted_samples_ms: list[float], q: float) -> float: + if not sorted_samples_ms: + return 0.0 + if len(sorted_samples_ms) == 1: + return sorted_samples_ms[0] + pos = q * (len(sorted_samples_ms) - 1) + lo = math.floor(pos) + hi = math.ceil(pos) + if lo == hi: + return sorted_samples_ms[lo] + frac = pos - lo + return sorted_samples_ms[lo] + frac * (sorted_samples_ms[hi] - sorted_samples_ms[lo]) + + +try: + import resource as _resource # POSIX only +except ImportError: + _resource = None + + +def _current_rss_kb() -> int: + """Process RSS in KB. macOS returns bytes from getrusage; Linux returns KB. + Returns 0 on Windows since `resource` is unavailable there. + """ + if _resource is None: + return 0 + rss = _resource.getrusage(_resource.RUSAGE_SELF).ru_maxrss + if IS_DARWIN: + return rss // 1024 + return rss + + +# ============================================================================ +# Mock sidecar stub (production response path goes through here) +# ============================================================================ + + +class _MockSidecarStub: + """In-process stand-in for ``TaskHubSidecarServiceStub``. + + ``_execute_activity_async`` and ``_execute_activity`` deliver responses via + ``stub.CompleteActivityTask``. The mock records completion timestamps so the + harness can compute end-to-end latency (submit -> delivery). ``send_latency_s`` + simulates a slow sidecar. + """ + + def __init__(self, send_latency_s: float = 0.0): + self.send_latency_s = send_latency_s + self.completions: dict[int, float] = {} + self.calls = 0 + + def Hello(self, *_args, **_kwargs) -> None: # noqa: N802 + return None + + def CompleteActivityTask(self, response: pb.ActivityResponse) -> None: # noqa: N802 + if self.send_latency_s > 0: + time.sleep(self.send_latency_s) + self.completions[response.taskId] = time.perf_counter() + self.calls += 1 + + def CompleteOrchestratorTask(self, *_args, **_kwargs) -> None: # noqa: N802 + return None + + +def _random_payload(n: int) -> str: + """A JSON-safe random string of ~n characters (hex of random bytes).""" + return os.urandom(max(1, n // 2)).hex()[:n] + + +def _build_activity_request( + name: str, task_id: int, instance_id: str, encoded_input: str = '' +) -> pb.ActivityRequest: + req = pb.ActivityRequest( + name=name, + taskId=task_id, + workflowInstance=pb.WorkflowInstance(instanceId=instance_id), + parentTraceContext=pb.TraceContext(traceParent=''), + taskExecutionId='', + ) + if encoded_input: + req.input.value = encoded_input + return req + + +# ============================================================================ +# Activity factories. Record per-invocation timestamps so the harness can +# decompose end-to-end latency into queue-wait / work / delivery. +# ============================================================================ + + +def _async_sleep_factory( + latency_s: float, start_ts: dict[int, float], end_ts: dict[int, float] +) -> Callable[[task.ActivityContext, object], Awaitable[None]]: + """Build an async activity that sleeps. Records per-task start/end timestamps.""" + + async def sleep(ctx: task.ActivityContext, _inp: object) -> None: + start_ts[ctx.task_id] = time.perf_counter() + await asyncio.sleep(latency_s) + end_ts[ctx.task_id] = time.perf_counter() + + return sleep + + +def _sync_sleep_factory( + latency_s: float, start_ts: dict[int, float], end_ts: dict[int, float] +) -> Callable[[task.ActivityContext, object], None]: + """Build a sync activity that sleeps. Records per-task start/end timestamps.""" + + def sleep(ctx: task.ActivityContext, _inp: object) -> None: + start_ts[ctx.task_id] = time.perf_counter() + time.sleep(latency_s) + end_ts[ctx.task_id] = time.perf_counter() + + return sleep + + +def _async_payload_factory( + latency_s: float, out_bytes: int, start_ts: dict[int, float], end_ts: dict[int, float] +) -> Callable[[task.ActivityContext, object], Awaitable[str]]: + """Async activity that returns an ``out_bytes`` payload, exercising result serialization.""" + payload = _random_payload(out_bytes) + + async def run(ctx: task.ActivityContext, _inp: object) -> str: + start_ts[ctx.task_id] = time.perf_counter() + await asyncio.sleep(latency_s) + end_ts[ctx.task_id] = time.perf_counter() + return payload + + return run + + +def _sync_payload_factory( + latency_s: float, out_bytes: int, start_ts: dict[int, float], end_ts: dict[int, float] +) -> Callable[[task.ActivityContext, object], str]: + """Sync counterpart of ``_async_payload_factory``.""" + payload = _random_payload(out_bytes) + + def run(ctx: task.ActivityContext, _inp: object) -> str: + start_ts[ctx.task_id] = time.perf_counter() + time.sleep(latency_s) + end_ts[ctx.task_id] = time.perf_counter() + return payload + + return run + + +# ============================================================================ +# Full-path harness. Exercises _execute_activity_async / _execute_activity +# through _AsyncWorkerManager with a mock CompleteActivityTask stub. +# ============================================================================ + + +def _build_worker(options: ConcurrencyOptions) -> TaskHubGrpcWorker: + """Build a TaskHubGrpcWorker without calling start(). We only need its dispatch + code and registry; the gRPC stream is replaced by the mock stub. + """ + return TaskHubGrpcWorker( + host_address='in-process-mock', + concurrency_options=options, + ) + + +ActivityFactory = Callable[[dict[int, float], dict[int, float]], Callable[..., object]] + + +def _options(semaphore_cap: int, thread_pool_workers: int) -> ConcurrencyOptions: + return ConcurrencyOptions( + maximum_concurrent_activity_work_items=semaphore_cap, + maximum_concurrent_orchestration_work_items=semaphore_cap, + maximum_thread_pool_workers=thread_pool_workers, + ) + + +def _activity_and_handler( + worker: TaskHubGrpcWorker, + kind: str, + factory: ActivityFactory | None, + latency_s: float, + start_ts: dict[int, float], + end_ts: dict[int, float], +) -> tuple[Callable[..., object], Callable[..., object]]: + """Build the activity callable and pick the matching dispatch handler for ``kind``.""" + if kind == 'async': + fn = ( + factory(start_ts, end_ts) + if factory + else _async_sleep_factory(latency_s, start_ts, end_ts) + ) + return fn, worker._execute_activity_async + if kind == 'sync': + fn = ( + factory(start_ts, end_ts) + if factory + else _sync_sleep_factory(latency_s, start_ts, end_ts) + ) + return fn, worker._execute_activity + raise ValueError(f'unknown activity_kind: {kind}') + + +@asynccontextmanager +async def _running_manager(manager): + """Start the manager and an RSS/task sampler; on exit drain the queue and tear down. + + Yields ``(sampler, baseline_rss_kb)``. + """ + baseline_rss_kb = _current_rss_kb() + sampler = _Sampler() + sampler_task = asyncio.create_task(sampler.run()) + worker_task = asyncio.create_task(manager.run()) + while manager.activity_queue is None: + await asyncio.sleep(0) + sampler.watch_queue(manager.activity_queue) + try: + yield sampler, baseline_rss_kb + finally: + manager._shutdown = True + sampler.stop() + await asyncio.gather(worker_task, sampler_task, return_exceptions=True) + manager.shutdown() + + +def _metrics( + *, + name: str, + n_items: int, + semaphore_cap: int, + thread_pool_workers: int, + server_latency_s: float, + wallclock_s: float, + e2e_samples: list[float], + sampler: _Sampler, + baseline_rss_kb: int, +) -> ScenarioMetrics: + completed = len(e2e_samples) if e2e_samples else n_items + return ScenarioMetrics( + name=name, + n_items=n_items, + semaphore_cap=semaphore_cap, + thread_pool_workers=thread_pool_workers, + server_latency_s=server_latency_s, + wallclock_s=wallclock_s, + throughput_per_s=completed / wallclock_s if wallclock_s > 0 else 0.0, + latency=LatencyStats.from_samples(e2e_samples), + peak_tasks=sampler.peak_tasks, + peak_queue_depth=sampler.peak_queue_depth, + peak_rss_delta_mb=max(0.0, (sampler.peak_rss_kb - baseline_rss_kb) / 1024.0), + ) + + +def _make_activity_context(orchestration_id: str, task_id: int) -> task.ActivityContext: + return task.ActivityContext(orchestration_id, task_id, '', propagated_history=None) + + +@dataclass(slots=True) +class SustainedMetrics: + """Steady-state metrics for the sustained-load scenario.""" + + target_rate_per_s: float + duration_s: float + submitted: int + completed: int + wallclock_s: float + throughput_per_s: float + latency_overall: LatencyStats + latency_first_quarter: LatencyStats + latency_last_quarter: LatencyStats + peak_tasks: int + peak_queue_depth: int + peak_rss_delta_mb: float + + +async def _run_full( + *, + name: str, + n_items: int, + semaphore_cap: int, + thread_pool_workers: int, + server_latency_s: float, + activity_kind: str, + activity_factory: ActivityFactory | None = None, + input_bytes: int = 0, +) -> ScenarioMetrics: + """Submit ``n_items`` activities through the production dispatch path, timing the batch. + + ``input_bytes`` attaches a serialized input payload to each request so the executor's + input deserialization is part of the measurement. + """ + worker = _build_worker(_options(semaphore_cap, thread_pool_workers)) + manager = worker._async_worker_manager + stub = _MockSidecarStub() + start_ts: dict[int, float] = {} + end_ts: dict[int, float] = {} + activity_fn, handler = _activity_and_handler( + worker, activity_kind, activity_factory, server_latency_s, start_ts, end_ts + ) + activity_name = f'bench_{activity_kind}' + worker._registry.add_named_activity(activity_name, activity_fn) + encoded_input = shared.to_json(_random_payload(input_bytes)) if input_bytes else '' + + submit_ts: dict[int, float] = {} + async with _running_manager(manager) as (sampler, baseline_rss_kb): + submit_start = time.perf_counter() + for i in range(n_items): + req = _build_activity_request(activity_name, i, 'bench', encoded_input) + submit_ts[i] = time.perf_counter() + manager.submit_activity(handler, activity_fn, req, stub, '') + await manager.activity_queue.join() + wallclock_s = time.perf_counter() - submit_start + + e2e = [stub.completions[i] - t for i, t in submit_ts.items() if i in stub.completions] + return _metrics( + name=name, + n_items=n_items, + semaphore_cap=semaphore_cap, + thread_pool_workers=thread_pool_workers, + server_latency_s=server_latency_s, + wallclock_s=wallclock_s, + e2e_samples=e2e, + sampler=sampler, + baseline_rss_kb=baseline_rss_kb, + ) + + +async def _run_lite( + *, + name: str, + activity: Callable, + n_items: int, + semaphore_cap: int, + thread_pool_workers: int, + server_latency_s: float, +) -> ScenarioMetrics: + """Submit activities straight to a bare manager (no proto/stub), for the OOM check.""" + manager = _AsyncWorkerManager(_options(semaphore_cap, thread_pool_workers), logger=LOGGER) + async with _running_manager(manager) as (sampler, baseline_rss_kb): + start = time.perf_counter() + for i in range(n_items): + manager.submit_activity(activity, _make_activity_context('bench', i), None) + await manager.activity_queue.join() + wallclock_s = time.perf_counter() - start + + return _metrics( + name=name, + n_items=n_items, + semaphore_cap=semaphore_cap, + thread_pool_workers=thread_pool_workers, + server_latency_s=server_latency_s, + wallclock_s=wallclock_s, + e2e_samples=[], + sampler=sampler, + baseline_rss_kb=baseline_rss_kb, + ) + + +async def _run_sustained( + *, + duration_s: float, + target_rate_per_s: float, + semaphore_cap: int, + thread_pool_workers: int, + server_latency_s: float, + activity_kind: str = 'async', +) -> SustainedMetrics: + """Submit open-loop at ``target_rate_per_s`` for ``duration_s``, then drain.""" + worker = _build_worker(_options(semaphore_cap, thread_pool_workers)) + manager = worker._async_worker_manager + stub = _MockSidecarStub() + start_ts: dict[int, float] = {} + end_ts: dict[int, float] = {} + activity_fn, handler = _activity_and_handler( + worker, activity_kind, None, server_latency_s, start_ts, end_ts + ) + activity_name = f'bench_sustained_{activity_kind}' + worker._registry.add_named_activity(activity_name, activity_fn) + + submit_ts: dict[int, float] = {} + submit_interval = 1.0 / target_rate_per_s + submitted = 0 + async with _running_manager(manager) as (sampler, baseline_rss_kb): + bench_start = time.perf_counter() + next_submit = bench_start + while time.perf_counter() - bench_start < duration_s: + if time.perf_counter() >= next_submit: + req = _build_activity_request(activity_name, submitted, 'bench-sus') + submit_ts[submitted] = time.perf_counter() + manager.submit_activity(handler, activity_fn, req, stub, '') + submitted += 1 + next_submit += submit_interval + continue + await asyncio.sleep(max(0.0, next_submit - time.perf_counter())) + await manager.activity_queue.join() + wallclock_s = time.perf_counter() - bench_start + + samples = sorted( + (t, stub.completions[i] - t) for i, t in submit_ts.items() if i in stub.completions + ) + overall = [d for _, d in samples] + quarter = max(1, len(overall) // 4) + return SustainedMetrics( + target_rate_per_s=target_rate_per_s, + duration_s=duration_s, + submitted=submitted, + completed=len(overall), + wallclock_s=wallclock_s, + throughput_per_s=len(overall) / wallclock_s if wallclock_s > 0 else 0.0, + latency_overall=LatencyStats.from_samples(overall), + latency_first_quarter=LatencyStats.from_samples(overall[:quarter]), + latency_last_quarter=LatencyStats.from_samples(overall[-quarter:]), + peak_tasks=sampler.peak_tasks, + peak_queue_depth=sampler.peak_queue_depth, + peak_rss_delta_mb=max(0.0, (sampler.peak_rss_kb - baseline_rss_kb) / 1024.0), + ) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/shared.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/shared.py index 5c9bd9f9b..f2629775c 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/shared.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/shared.py @@ -10,6 +10,8 @@ # limitations under the License. import dataclasses +import functools +import inspect import json import logging import os @@ -19,6 +21,28 @@ import grpc from dapr.ext.workflow import _model_protocol + +def is_async_callable(fn: Any) -> bool: + """Return True if ``fn`` is async. Catches ``functools.partial`` of coroutines, + sync decorators that wrap async functions, and callable instances with ``async __call__``. + """ + candidate = fn + while isinstance(candidate, functools.partial): + candidate = candidate.func + if callable(candidate): + try: + candidate = inspect.unwrap(candidate) + except ValueError: + # Cyclic ``__wrapped__`` chain from a malformed decorator. Fall back to the + # outermost callable; misclassification is preferable to crashing dispatch. + pass + if inspect.iscoroutinefunction(candidate): + return True + if not inspect.isfunction(candidate) and hasattr(candidate, '__call__'): + return inspect.iscoroutinefunction(candidate.__call__) + return False + + ClientInterceptor = Union[ grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor, diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py index 84663064f..0316e357e 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py @@ -30,6 +30,7 @@ import grpc from dapr.ext.workflow._durabletask import deterministic, task from dapr.ext.workflow._durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl +from dapr.ext.workflow._durabletask.internal.shared import is_async_callable from dapr.ext.workflow.propagation import PropagatedHistory, PropagationScope from google.protobuf import empty_pb2, timestamp_pb2 @@ -65,10 +66,10 @@ def _log_all_threads(logger: logging.Logger, context: str = ''): class ConcurrencyOptions: - """Configuration options for controlling concurrency of different work item types and the thread pool size. + """Concurrency limits for the worker. - This class provides fine-grained control over concurrent processing limits for - activities, orchestrations and the thread pool size. + ``maximum_thread_pool_workers`` sizes the pool used to run sync activities and to + deliver async-activity responses to the sidecar. """ def __init__( @@ -80,11 +81,13 @@ def __init__( """Initialize concurrency options. Args: - maximum_concurrent_activity_work_items: Maximum number of activity work items - that can be processed concurrently. Defaults to 100 * processor_count. - maximum_concurrent_orchestration_work_items: Maximum number of orchestration work items - that can be processed concurrently. Defaults to 100 * processor_count. - maximum_thread_pool_workers: Maximum number of thread pool workers to use. + maximum_concurrent_activity_work_items: Cap on concurrent activity work items. + Defaults to ``100 * cpu_count``. + maximum_concurrent_orchestration_work_items: Cap on concurrent orchestration work + items. Defaults to ``100 * cpu_count``. + maximum_thread_pool_workers: Size of the worker thread pool. Sync activities run + on this pool, and async-activity gRPC response sends also borrow a thread + from it. Defaults to ``cpu_count + 4``. """ processor_count = os.cpu_count() or 1 default_concurrency = 100 * processor_count @@ -349,6 +352,7 @@ def __init__( self._interceptors = None self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options, self._logger) + self._activity_executor = _ActivityExecutor(self._logger) @property def concurrency_options(self) -> ConcurrencyOptions: @@ -658,8 +662,19 @@ def stream_reader(): work_item.completionToken, ) elif work_item.HasField('activityRequest'): + # Async user activities run on the event loop. Sync ones fall through + # to the thread pool via _execute_activity. + activity_fn = self._registry.get_activity( + work_item.activityRequest.name + ) + activity_handler = ( + self._execute_activity_async + if activity_fn is not None and is_async_callable(activity_fn) + else self._execute_activity + ) self._async_worker_manager.submit_activity( - self._execute_activity, + activity_handler, + activity_fn, work_item.activityRequest, stub, work_item.completionToken, @@ -964,98 +979,178 @@ def _execute_orchestrator( f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {ex}" ) + def _activity_span(self, req: pb.ActivityRequest, instance_id: str): + """Return an OTel span context manager, or a nullcontext if OTel is not installed.""" + if otel_tracer is None: + return contextlib.nullcontext() + return otel_tracer.start_as_current_span( + name=f'activity: {req.name}', + context=otel_propagator.extract( + carrier={'traceparent': req.parentTraceContext.traceParent} + ), + attributes={ + 'dapr.ext.workflow._durabletask.task.instance_id': instance_id, + 'dapr.ext.workflow._durabletask.task.id': req.taskId, + 'dapr.ext.workflow._durabletask.activity.name': req.name, + }, + ) + + def _propagated_history(self, req: pb.ActivityRequest) -> PropagatedHistory | None: + if req.HasField('propagatedHistory'): + return PropagatedHistory.from_proto(req.propagatedHistory) + return None + + def _build_activity_result_response( + self, + req: pb.ActivityRequest, + instance_id: str, + result: str | None, + completion_token, + ) -> pb.ActivityResponse: + return pb.ActivityResponse( + instanceId=instance_id, + taskId=req.taskId, + result=ph.get_string_value(result), + completionToken=completion_token, + ) + + def _build_activity_failure_response( + self, + req: pb.ActivityRequest, + instance_id: str, + ex: BaseException, + completion_token, + ) -> pb.ActivityResponse: + return pb.ActivityResponse( + instanceId=instance_id, + taskId=req.taskId, + failureDetails=ph.new_failure_details(ex), + completionToken=completion_token, + ) + + def _send_activity_response( + self, + req: pb.ActivityRequest, + stub: stubs.TaskHubSidecarServiceStub, + res: pb.ActivityResponse, + completion_token, + instance_id: str, + ): + """Send an activity response, falling back to a failure response when the + result is too large to deliver.""" + try: + stub.CompleteActivityTask(res) + except grpc.RpcError as rpc_error: # type: ignore + if _is_message_too_large(rpc_error): + # Result is too large to deliver - fail the activity immediately. + # This can only be fixed with infrastructure changes (increasing gRPC max message size). + self._logger.error( + f"Activity '{req.name}#{req.taskId}' result is too large to deliver " + f'(RESOURCE_EXHAUSTED). Failing the activity task: {rpc_error.details()}' + ) + oversize_error = RuntimeError( + f'Activity result exceeds gRPC max message size: {rpc_error.details()}' + ) + failure_res = self._build_activity_failure_response( + req, instance_id, oversize_error, completion_token + ) + try: + stub.CompleteActivityTask(failure_res) + except Exception as ex: + self._logger.exception( + f"Failed to deliver activity failure response for '{req.name}#{req.taskId}' " + f"of orchestration ID '{instance_id}': {ex}" + ) + else: + self._handle_grpc_execution_error(rpc_error, 'activity') + except ValueError: + # gRPC raises ValueError when the underlying channel has been closed (e.g. during reconnection). + self._logger.debug( + f"Could not deliver activity response for '{req.name}#{req.taskId}' of " + f"orchestration ID '{instance_id}': channel was closed (likely due to " + f'reconnection). The sidecar will re-dispatch this work item.' + ) + except Exception as ex: + self._logger.exception( + f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}" + ) + def _execute_activity( self, + fn: task.Activity | None, req: pb.ActivityRequest, stub: stubs.TaskHubSidecarServiceStub, completionToken, ): instance_id = req.workflowInstance.instanceId - - if otel_tracer is not None: - span_context = otel_tracer.start_as_current_span( - name=f'activity: {req.name}', - context=otel_propagator.extract( - carrier={'traceparent': req.parentTraceContext.traceParent} - ), - attributes={ - 'dapr.ext.workflow._durabletask.task.instance_id': instance_id, - 'dapr.ext.workflow._durabletask.task.id': req.taskId, - 'dapr.ext.workflow._durabletask.activity.name': req.name, - }, - ) - else: - span_context = contextlib.nullcontext() - - with span_context: + with self._activity_span(req, instance_id): try: - executor = _ActivityExecutor(self._registry, self._logger) - propagated = ( - PropagatedHistory.from_proto(req.propagatedHistory) - if req.HasField('propagatedHistory') - else None - ) - result = executor.execute( + result = self._activity_executor.execute( + fn, instance_id, req.name, req.taskId, req.input.value, req.taskExecutionId, - propagated_history=propagated, + propagated_history=self._propagated_history(req), ) - res = pb.ActivityResponse( - instanceId=instance_id, - taskId=req.taskId, - result=ph.get_string_value(result), - completionToken=completionToken, + res = self._build_activity_result_response( + req, instance_id, result, completionToken ) except Exception as ex: - res = pb.ActivityResponse( - instanceId=instance_id, - taskId=req.taskId, - failureDetails=ph.new_failure_details(ex), - completionToken=completionToken, - ) + res = self._build_activity_failure_response(req, instance_id, ex, completionToken) + self._send_activity_response(req, stub, res, completionToken, instance_id) + async def _execute_activity_async( + self, + fn: task.Activity, + req: pb.ActivityRequest, + stub: stubs.TaskHubSidecarServiceStub, + completionToken, + ): + """Run an async activity on the event loop and send its result to the sidecar. + The gRPC send runs on the worker thread pool to avoid blocking the loop. + """ + instance_id = req.workflowInstance.instanceId + with self._activity_span(req, instance_id): try: - stub.CompleteActivityTask(res) - except grpc.RpcError as rpc_error: # type: ignore - if _is_message_too_large(rpc_error): - # Result is too large to deliver - fail the activity immediately. - # This can only be fixed with infrastructure changes (increasing gRPC max message size). - self._logger.error( - f"Activity '{req.name}#{req.taskId}' result is too large to deliver " - f'(RESOURCE_EXHAUSTED). Failing the activity task: {rpc_error.details()}' - ) - failure_res = pb.ActivityResponse( - instanceId=instance_id, - taskId=req.taskId, - failureDetails=ph.new_failure_details( - RuntimeError( - f'Activity result exceeds gRPC max message size: {rpc_error.details()}' - ) - ), - completionToken=completionToken, - ) - try: - stub.CompleteActivityTask(failure_res) - except Exception as ex: - self._logger.exception( - f"Failed to deliver activity failure response for '{req.name}#{req.taskId}' " - f"of orchestration ID '{instance_id}': {ex}" - ) - else: - self._handle_grpc_execution_error(rpc_error, 'activity') - except ValueError: - # gRPC raises ValueError when the underlying channel has been closed (e.g. during reconnection). - self._logger.debug( - f"Could not deliver activity response for '{req.name}#{req.taskId}' of " - f"orchestration ID '{instance_id}': channel was closed (likely due to " - f'reconnection). The sidecar will re-dispatch this work item.' + result = await self._activity_executor.execute_async( + fn, + instance_id, + req.name, + req.taskId, + req.input.value, + req.taskExecutionId, + propagated_history=self._propagated_history(req), ) + res = self._build_activity_result_response( + req, instance_id, result, completionToken + ) + except asyncio.CancelledError: + raise except Exception as ex: - self._logger.exception( - f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}" + res = self._build_activity_failure_response(req, instance_id, ex, completionToken) + loop = asyncio.get_running_loop() + try: + await loop.run_in_executor( + self._async_worker_manager.thread_pool, + self._send_activity_response, + req, + stub, + res, + completionToken, + instance_id, + ) + except RuntimeError as exc: + # Swallow only when the thread pool itself is shut down (worker tearing down). + # Other RuntimeErrors are unexpected and propagate to the work-item processor. + # The sidecar will re-dispatch this work item once the worker reconnects. + pool = self._async_worker_manager.thread_pool + if not getattr(pool, '_shutdown', False): + raise + self._logger.warning( + f"Could not deliver activity response for '{req.name}#{req.taskId}': " + f'{exc}. The sidecar will re-dispatch this work item.' ) @@ -1998,27 +2093,25 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven class _ActivityExecutor: - def __init__(self, registry: _Registry, logger: logging.Logger): - self._registry = registry + def __init__(self, logger: logging.Logger): self._logger = logger - def execute( + def _resolve( self, + fn: task.Activity | None, orchestration_id: str, name: str, task_id: int, - encoded_input: Optional[str], - task_execution_id: str = '', - propagated_history: Optional[PropagatedHistory] = None, - ) -> Optional[str]: - """Executes an activity function and returns the serialized result, if any.""" + encoded_input: str | None, + task_execution_id: str, + propagated_history: PropagatedHistory | None, + ) -> tuple[task.Activity, task.ActivityContext, Any]: + """Validate ``fn`` and build its ``(fn, ctx, input)`` call args.""" self._logger.debug(f"{orchestration_id}/{task_id}: Executing activity '{name}'...") - fn = self._registry.get_activity(name) - if not fn: + if fn is None: raise ActivityNotRegisteredError( f"Activity function named '{name}' was not registered!" ) - activity_input = shared.from_json(encoded_input) if encoded_input else None ctx = task.ActivityContext( orchestration_id, @@ -2026,10 +2119,11 @@ def execute( task_execution_id, propagated_history=propagated_history, ) + return fn, ctx, activity_input - # Execute the activity function - activity_output = fn(ctx, activity_input) - + def _encode_output( + self, orchestration_id: str, name: str, task_id: int, activity_output: Any + ) -> str | None: encoded_output = shared.to_json(activity_output) if activity_output is not None else None chars = len(encoded_output) if encoded_output else 0 self._logger.debug( @@ -2037,6 +2131,64 @@ def execute( ) return encoded_output + def execute( + self, + fn: task.Activity | None, + orchestration_id: str, + name: str, + task_id: int, + encoded_input: str | None, + task_execution_id: str = '', + propagated_history: PropagatedHistory | None = None, + ) -> str | None: + """Run a sync activity function and return the serialized result, if any. + + Raises ``RuntimeError`` if the activity returns a coroutine, which happens when + ``is_async_callable`` fails to detect an async callable at registration. + """ + resolved_fn, ctx, activity_input = self._resolve( + fn, + orchestration_id, + name, + task_id, + encoded_input, + task_execution_id, + propagated_history, + ) + activity_output = resolved_fn(ctx, activity_input) + if inspect.iscoroutine(activity_output): + activity_output.close() + raise RuntimeError( + f"Activity '{name}' returned a coroutine on the sync path. " + f'Declare it with ``async def``, or if it already is, ensure any decorator ' + f'wrapping it uses ``@functools.wraps(fn)`` so the runtime can detect the ' + f'underlying async function.' + ) + return self._encode_output(orchestration_id, name, task_id, activity_output) + + async def execute_async( + self, + fn: task.Activity, + orchestration_id: str, + name: str, + task_id: int, + encoded_input: str | None, + task_execution_id: str = '', + propagated_history: PropagatedHistory | None = None, + ) -> str | None: + """Await a coroutine activity function and return the serialized result, if any.""" + resolved_fn, ctx, activity_input = self._resolve( + fn, + orchestration_id, + name, + task_id, + encoded_input, + task_execution_id, + propagated_history, + ) + activity_output = await resolved_fn(ctx, activity_input) + return self._encode_output(orchestration_id, name, task_id, activity_output) + def _get_non_determinism_error(task_id: int, action_name: str) -> task.NonDeterminismError: return task.NonDeterminismError( @@ -2274,7 +2426,7 @@ async def _process_work_item( queue.task_done() async def _run_func(self, func, *args, **kwargs): - if inspect.iscoroutinefunction(func): + if is_async_callable(func): return await func(*args, **kwargs) else: loop = asyncio.get_running_loop() diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index f33622a15..edb89c10d 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -16,10 +16,11 @@ import inspect import time from functools import wraps -from typing import Optional, Sequence, TypeVar, Union +from typing import Any, Awaitable, Callable, Optional, Sequence, TypeVar, Union import grpc from dapr.ext.workflow._durabletask import task, worker +from dapr.ext.workflow._durabletask.internal.shared import is_async_callable as _is_async_callable from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext from dapr.ext.workflow.logger import Logger, LoggerOptions from dapr.ext.workflow.util import getAddress @@ -45,6 +46,60 @@ grpc.StreamStreamClientInterceptor, ] +# Durabletask returns decoded JSON, so we type the input as ``object | None`` and let the +# wrapper narrow it via the activity's declared model. +SyncActivityWrapper = Callable[[task.ActivityContext, object | None], object] +AsyncActivityWrapper = Callable[[task.ActivityContext, object | None], Awaitable[object]] +ActivityWrapper = SyncActivityWrapper | AsyncActivityWrapper + + +def _coerce_activity_input(inp: object | None, input_model: type | None) -> object | None: + """Coerce the raw input to the activity's declared model, if it has one.""" + if inp is None or input_model is None or isinstance(inp, input_model): + return inp + return _model_protocol.coerce_to_model(inp, input_model) + + +def _make_activity_wrapper(fn: Activity, logger: Logger) -> ActivityWrapper: + """Wrap a user activity for the durabletask worker. + + Returns: + An ``async def`` wrapper for async activities, a plain ``def`` for sync. + """ + accepts_input, input_model = _model_protocol.resolve_input(fn) + + def _call_args(ctx: task.ActivityContext, inp: object | None) -> tuple: + wf_ctx = WorkflowActivityContext(ctx) + if not accepts_input: + return (wf_ctx,) + return (wf_ctx, _coerce_activity_input(inp, input_model)) + + def _log_failure(ctx: task.ActivityContext, exc: Exception) -> None: + activity_id = getattr(ctx, 'task_id', 'unknown') + logger.warning(f'Activity execution failed - task_id: {activity_id}, error: {exc}') + + if _is_async_callable(fn): + + async def async_activity_wrapper( + ctx: task.ActivityContext, inp: object | None = None + ) -> object: + try: + return await fn(*_call_args(ctx, inp)) + except Exception as exc: + _log_failure(ctx, exc) + raise + + return async_activity_wrapper + + def sync_activity_wrapper(ctx: task.ActivityContext, inp: object | None = None) -> object: + try: + return fn(*_call_args(ctx, inp)) + except Exception as exc: + _log_failure(ctx, exc) + raise + + return sync_activity_wrapper + class WorkflowRuntime: """WorkflowRuntime is the entry point for registering workflows and activities.""" @@ -180,36 +235,14 @@ def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = fn.__dict__['_workflow_registered'] = True def register_activity(self, fn: Activity, *, name: Optional[str] = None): - """Registers a workflow activity as a function that takes - a specified input type and returns a specified output type. + """Register a workflow activity. ``def`` and ``async def`` are both supported. + Async activities run on the worker's event loop. Sync activities run in the + thread pool sized by ``maximum_thread_pool_workers``. """ effective_name = name or fn.__name__ self._logger.info(f"Registering activity '{effective_name}' with runtime") - accepts_input, input_model = _model_protocol.resolve_input(fn) - - def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): - """Responsible to call Activity function in activityWrapper""" - activity_id = getattr(ctx, 'task_id', 'unknown') - - try: - wfActivityContext = WorkflowActivityContext(ctx) - if not accepts_input: - result = fn(wfActivityContext) - else: - if ( - (inp is not None) - and (input_model is not None) - and not isinstance(inp, input_model) - ): - inp = _model_protocol.coerce_to_model(inp, input_model) - result = fn(wfActivityContext, inp) - return result - except Exception as e: - self._logger.warning( - f'Activity execution failed - task_id: {activity_id}, error: {e}' - ) - raise + activity_wrapper = _make_activity_wrapper(fn, self._logger) if hasattr(fn, '_activity_registered'): # whenever an activity is registered, it has a _dapr_alternate_name attribute @@ -224,7 +257,7 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): fn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__ self.__worker._registry.add_named_activity( - fn.__dict__['_dapr_alternate_name'], activityWrapper + fn.__dict__['_dapr_alternate_name'], activity_wrapper ) fn.__dict__['_activity_registered'] = True @@ -446,16 +479,23 @@ def add(ctx, x: int, y: int) -> int: the workflow runtime. Defaults to None. """ - def wrapper(fn: any): + def wrapper(fn: Any): if hasattr(fn, '_dapr_alternate_name'): raise ValueError( f'Function {fn.__name__} already has an alternate name {fn._dapr_alternate_name}' ) fn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__ - @wraps(fn) - def innerfn(*args, **kwargs): - return fn(*args, **kwargs) + if _is_async_callable(fn): + + @wraps(fn) + async def innerfn(*args, **kwargs): + return await fn(*args, **kwargs) + else: + + @wraps(fn) + def innerfn(*args, **kwargs): + return fn(*args, **kwargs) innerfn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__ innerfn.__signature__ = inspect.signature(fn) diff --git a/ext/dapr-ext-workflow/docs/concurrency.md b/ext/dapr-ext-workflow/docs/concurrency.md new file mode 100644 index 000000000..c780dfd25 --- /dev/null +++ b/ext/dapr-ext-workflow/docs/concurrency.md @@ -0,0 +1,98 @@ +# Concurrency configuration for `dapr-ext-workflow` + +Sizing notes for the worker's concurrency knobs. Numbers come from +`benchmarks/bench_async_activities.py`. Re-run it on local hardware to validate. + +## Knobs + +| Setting | Default | Effect | +| --- | --- | --- | +| `maximum_concurrent_activity_work_items` | `100 × cpu_count` | Async semaphore cap on in-flight activity work items. | +| `maximum_concurrent_orchestration_work_items` | `100 × cpu_count` | Same, for orchestrations. | +| `maximum_thread_pool_workers` | `cpu_count + 4` | Worker thread pool size. Sync activities run on this pool, and async-activity gRPC response sends also borrow a thread from it. | + +A `def` activity consumes a semaphore slot **and** a thread pool worker. An +`async def` activity consumes only a semaphore slot. + +## Choosing sync vs async + +Sync (`def`) activities are fully supported and unchanged: they run on the thread +pool. Keep CPU-bound work sync. An `async def` that burns CPU blocks the event loop +and starves every other activity. + +For **I/O-bound** activities (HTTP calls, database queries, anything that waits), +prefer `async def`. A sync activity holds a thread for the whole wait, so concurrency +is capped at the pool size (`cpu_count + 4`); an async activity holds only a semaphore +slot, so in-flight concurrency scales to `maximum_concurrent_activity_work_items`. The +benchmark shows the gap widening with fan-out width. If your activities wait on I/O, +moving them to `async def` is the single biggest concurrency win available. + +Raising `maximum_thread_pool_workers` lifts the ceiling for a sync I/O activity you can't +convert yet, but threads scale worse than the loop. Each costs stack memory and contends +on the GIL, so the activity semaphore reaches `100 × cpu_count` in flight where a thread +pool that size would not. It buys headroom, not the async ceiling. + +Async helps concurrent activities, not sequential chains. A chain of dependent steps +costs the sum of its steps either way, sync or async. + +## Sizing the activity cap + +The cap is the lever for throughput and queue wait. Below the cap, in-flight work +runs concurrently; past it, submissions wait in the queue. Rule of thumb: set the +cap to ~2x the expected steady-state in-flight count to absorb bursts. + +If activities call a downstream with a hard concurrency limit (e.g. a database +with a 100-connection pool), set the cap below that limit so it doubles as +backpressure. + +## Sizing the thread pool + +The worker thread pool, sized by `maximum_thread_pool_workers`, has two uses. + +**Sync activity execution.** Each `def` activity holds one thread for its +duration. Size to peak concurrent sync-activity count. + +**Async response delivery.** Each async activity, on completion, schedules +`stub.CompleteActivityTask` on the same pool to avoid blocking the loop during +the gRPC send. If the sidecar takes >5 ms to acknowledge and the worker runs +many concurrent async activities, response delivery can serialize through the +pool and tail latency inflates. Raise `maximum_thread_pool_workers` to widen +response-delivery throughput. + +Mixed workloads with long-running sync activities can starve async response +delivery (and vice versa) since they share the pool. If that becomes an issue, +size `maximum_thread_pool_workers` to the sum of peak sync activity concurrency +and peak in-flight async response sends. + +This thread hop goes away when the worker migrates to `grpc.aio`. + +## Reusing clients in async activities + +When async activities call out over the network (HTTP, a database), a fresh client per +call bounds throughput by connection setup, not the I/O. A per-call `httpx.AsyncClient` +plateaus around a few hundred req/s. Reuse one client and size its pool to the activity +cap: + +```python +_shared_client: httpx.AsyncClient | None = None + +def _get_client() -> httpx.AsyncClient: + global _shared_client + if _shared_client is None: + _shared_client = httpx.AsyncClient(timeout=30.0) + return _shared_client +``` + +The caller owns closing it during worker shutdown. For activities that hit many +hosts or need per-call timeout isolation, stick with per-call clients. + +## Re-running the benchmark + +```bash +uv sync --all-packages --group dev +uv run python ext/dapr-ext-workflow/benchmarks/bench_async_activities.py +``` + +`DAPR_BENCH_SUSTAINED_SECONDS` overrides the 120 s sustained run and +`DAPR_BENCH_ACTIVITY_MS` the per-activity duration, for a faster local check. The +script creates `benchmarks/RESULTS.md`. diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_activity_dispatch_routing.py b/ext/dapr-ext-workflow/tests/durabletask/test_activity_dispatch_routing.py new file mode 100644 index 000000000..404088182 --- /dev/null +++ b/ext/dapr-ext-workflow/tests/durabletask/test_activity_dispatch_routing.py @@ -0,0 +1,92 @@ +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Contract tests for the activity dispatch handlers on ``TaskHubGrpcWorker``. + +The work-item dispatcher at the top of ``worker.py``'s gRPC loop selects between +``_execute_activity`` (sync, runs in the thread pool) and ``_execute_activity_async`` +(coroutine, awaited on the event loop) using ``is_async_callable(handler)`` via +``_AsyncWorkerManager._run_func``. These tests pin the async-ness of each handler so +the dispatch routing stays correct. +""" + +import asyncio +import inspect +import logging +import threading +from typing import Iterator + +import pytest +from dapr.ext.workflow._durabletask.worker import ( + ConcurrencyOptions, + TaskHubGrpcWorker, + _AsyncWorkerManager, +) + + +@pytest.fixture +def worker() -> Iterator[TaskHubGrpcWorker]: + instance = TaskHubGrpcWorker() + try: + yield instance + finally: + # The worker was never started, so ``stop()`` early-returns; shut the manager + # down directly so the test doesn't leak threads if any work was submitted. + instance.stop() + instance._async_worker_manager.shutdown() + + +@pytest.fixture +def manager() -> Iterator[_AsyncWorkerManager]: + instance = _AsyncWorkerManager(ConcurrencyOptions(), logger=logging.getLogger()) + try: + yield instance + finally: + instance.shutdown() + + +def test_sync_activity_handler_is_not_a_coroutine_function(worker: TaskHubGrpcWorker): + assert not inspect.iscoroutinefunction(worker._execute_activity) + + +def test_async_activity_handler_is_a_coroutine_function(worker: TaskHubGrpcWorker): + assert inspect.iscoroutinefunction(worker._execute_activity_async) + + +def test_run_func_awaits_coroutines_directly(manager: _AsyncWorkerManager): + """``_AsyncWorkerManager._run_func`` is the single point that branches on async-ness. + + A coroutine handler returns its value without going through the thread pool. + """ + + async def coroutine_handler(value: int) -> int: + return value + 1 + + async def driver() -> int: + return await manager._run_func(coroutine_handler, 41) + + assert asyncio.run(driver()) == 42 + + +def test_run_func_dispatches_sync_callables_to_thread_pool(manager: _AsyncWorkerManager): + main_thread_id = threading.get_ident() + captured: dict[str, int] = {} + + def sync_handler(value: int) -> int: + captured['thread_id'] = threading.get_ident() + return value + 1 + + async def driver() -> int: + return await manager._run_func(sync_handler, 41) + + result = asyncio.run(driver()) + assert result == 42 + assert captured['thread_id'] != main_thread_id diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_activity_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_activity_executor.py index f65aaf3f6..8a0b3fe63 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_activity_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_activity_executor.py @@ -34,7 +34,9 @@ def test_activity(ctx: task.ActivityContext, test_input: Any): activity_input = 'Hello, 世界!' executor, name = _get_activity_executor(test_activity) - result = executor.execute(TEST_INSTANCE_ID, name, TEST_TASK_ID, json.dumps(activity_input)) + result = executor.execute( + test_activity, TEST_INSTANCE_ID, name, TEST_TASK_ID, json.dumps(activity_input) + ) assert result is not None result_input, result_orchestration_id, result_task_id = json.loads(result) @@ -44,14 +46,14 @@ def test_activity(ctx: task.ActivityContext, test_input: Any): def test_activity_not_registered(): - def test_activity(ctx: task.ActivityContext, _): - pass # not used - - executor, _ = _get_activity_executor(test_activity) + """Dispatch site passes ``fn=None`` for unknown activity names. Executor surfaces + that as ``ActivityNotRegisteredError`` carrying the requested name. + """ + executor = worker._ActivityExecutor(TEST_LOGGER) caught_exception: Optional[Exception] = None try: - executor.execute(TEST_INSTANCE_ID, 'Bogus', TEST_TASK_ID, None) + executor.execute(None, TEST_INSTANCE_ID, 'Bogus', TEST_TASK_ID, None) except Exception as ex: caught_exception = ex @@ -59,8 +61,29 @@ def test_activity(ctx: task.ActivityContext, _): assert 'Bogus' in str(caught_exception) +def test_sync_execute_rejects_async_activity(): + """Sync ``execute`` must raise a clear RuntimeError when the activity returns a + coroutine. Guards against ``_is_async_callable`` missing an async callable at + registration; without this, JSON encoding would fail with a confusing TypeError. + """ + + async def async_activity(ctx: task.ActivityContext, _): + return 'never reached' + + executor, name = _get_activity_executor(async_activity) + + caught_exception: Optional[Exception] = None + try: + executor.execute(async_activity, TEST_INSTANCE_ID, name, TEST_TASK_ID, None) + except Exception as ex: + caught_exception = ex + + assert type(caught_exception) is RuntimeError + assert 'returned a coroutine' in str(caught_exception) + + def _get_activity_executor(fn: task.Activity) -> Tuple[worker._ActivityExecutor, str]: registry = worker._Registry() name = registry.add_activity(fn) - executor = worker._ActivityExecutor(registry, TEST_LOGGER) + executor = worker._ActivityExecutor(TEST_LOGGER) return executor, name diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_activity_executor_async.py b/ext/dapr-ext-workflow/tests/durabletask/test_activity_executor_async.py new file mode 100644 index 000000000..311e1533c --- /dev/null +++ b/ext/dapr-ext-workflow/tests/durabletask/test_activity_executor_async.py @@ -0,0 +1,98 @@ +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the async branch of ``_ActivityExecutor``. + +These mirror ``test_activity_executor.py`` but exercise the ``execute_async`` path used +when a registered activity is a coroutine function. +""" + +import asyncio +import inspect +import json +import logging +from typing import Any + +import pytest +from dapr.ext.workflow._durabletask import task, worker + +logging.basicConfig( + format='%(asctime)s.%(msecs)03d %(name)s %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.DEBUG, +) +TEST_LOGGER = logging.getLogger('tests') +TEST_INSTANCE_ID = 'abc123' +TEST_TASK_ID = 42 + + +def _get_activity_executor(fn: task.Activity) -> tuple[worker._ActivityExecutor, str]: + registry = worker._Registry() + name = registry.add_activity(fn) + executor = worker._ActivityExecutor(TEST_LOGGER) + return executor, name + + +def test_async_activity_inputs(): + """Validates that execute_async awaits the activity and returns the encoded result.""" + + async def test_async_activity(ctx: task.ActivityContext, test_input: Any): + await asyncio.sleep(0) + return test_input, ctx.orchestration_id, ctx.task_id + + activity_input = 'Hello, 世界!' + executor, name = _get_activity_executor(test_async_activity) + result = asyncio.run( + executor.execute_async( + test_async_activity, + TEST_INSTANCE_ID, + name, + TEST_TASK_ID, + json.dumps(activity_input), + ) + ) + assert result is not None + + result_input, result_orchestration_id, result_task_id = json.loads(result) + assert activity_input == result_input + assert TEST_INSTANCE_ID == result_orchestration_id + assert TEST_TASK_ID == result_task_id + + +def test_async_activity_exception_propagates(): + async def test_async_activity(ctx: task.ActivityContext, _): + raise RuntimeError('boom') + + executor, name = _get_activity_executor(test_async_activity) + + with pytest.raises(RuntimeError) as exc_info: + asyncio.run( + executor.execute_async(test_async_activity, TEST_INSTANCE_ID, name, TEST_TASK_ID, None) + ) + assert 'boom' in str(exc_info.value) + + +def test_async_activity_registry_preserves_coroutine_function(): + """The dispatcher relies on iscoroutinefunction(fn) at the registry lookup level. + + If the registry's add_activity ever wraps coroutine functions in a way that hides their + async-ness (e.g. functools.wraps with a sync decorator), the dispatcher would route + them to the thread pool and break I/O concurrency. This test pins that contract. + """ + + async def test_async_activity(ctx: task.ActivityContext, _): + return None + + registry = worker._Registry() + name = registry.add_activity(test_async_activity) + + retrieved = registry.get_activity(name) + assert inspect.iscoroutinefunction(retrieved) diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_async_dispatch_regression.py b/ext/dapr-ext-workflow/tests/durabletask/test_async_dispatch_regression.py new file mode 100644 index 000000000..5321ef822 --- /dev/null +++ b/ext/dapr-ext-workflow/tests/durabletask/test_async_dispatch_regression.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Perf regression tests for the async activity dispatch path. + +Reuses the benchmark harness (``dapr.ext.workflow._bench_harness``) with small, fast +parameters and asserts machine-independent ratios rather than absolute times, so the +checks stay deterministic in CI. Marked ``perf`` so constrained runners can skip them. +""" + +import asyncio + +import pytest +from dapr.ext.workflow._bench_harness import ( + _async_sleep_factory, + _run_full, + _run_lite, + _run_sustained, +) + +pytestmark = pytest.mark.perf + +ACTIVITY_S = 0.02 +POOL = 16 +SEM = 1000 +REPEAT = 2 + + +async def fastest(**kwargs): + """Fastest of REPEAT sequential runs, so spotty CI noise on one run can't flake the + wallclock comparisons. Noise only adds time, so the min is the least-disturbed sample. + """ + runs = [await _run_full(**kwargs) for _ in range(REPEAT)] + return min(runs, key=lambda m: m.wallclock_s) + + +def test_async_fan_out_overlaps_and_beats_sync(): + """Async clears a batch in ~one I/O window; sync serializes through the pool.""" + + async def run(): + kwargs = dict( + n_items=300, + semaphore_cap=SEM, + thread_pool_workers=POOL, + server_latency_s=ACTIVITY_S, + ) + sync_m = await fastest(name='sync', activity_kind='sync', **kwargs) + async_m = await fastest(name='async', activity_kind='async', **kwargs) + return sync_m, async_m + + sync_m, async_m = asyncio.run(run()) + assert async_m.wallclock_s < ACTIVITY_S * 20, 'async did not overlap I/O' + assert async_m.wallclock_s * 2 < sync_m.wallclock_s, 'async did not beat sync at scale' + + +def test_semaphore_caps_async_concurrency(): + """A small semaphore must gate the async path even though it never touches the pool.""" + + async def run(): + kwargs = dict( + n_items=1000, + thread_pool_workers=POOL, + server_latency_s=ACTIVITY_S, + activity_kind='async', + ) + gated = await fastest(name='gated', semaphore_cap=10, **kwargs) + ungated = await fastest(name='ungated', semaphore_cap=SEM, **kwargs) + return gated, ungated + + gated, ungated = asyncio.run(run()) + assert gated.wallclock_s > ungated.wallclock_s * 2, 'semaphore did not gate concurrency' + + +def test_sustained_async_holds_while_sync_drifts(): + """Above the sync ceiling, sync tail latency drifts upward and ends far worse than async.""" + + async def run(): + kwargs = dict( + duration_s=3.0, + target_rate_per_s=1000.0, + semaphore_cap=SEM, + thread_pool_workers=POOL, + server_latency_s=ACTIVITY_S, + ) + sync_m = await _run_sustained(activity_kind='sync', **kwargs) + async_m = await _run_sustained(activity_kind='async', **kwargs) + return sync_m, async_m + + sync_m, async_m = asyncio.run(run()) + sync_first = max(sync_m.latency_first_quarter.p99_ms, 1.0) + assert sync_m.latency_last_quarter.p99_ms > sync_first * 2, 'sync tail did not drift' + assert sync_m.latency_last_quarter.p99_ms > async_m.latency_last_quarter.p99_ms * 2 + + +def test_pending_tasks_stay_bounded(): + """Activities parked on the semaphore must not inflate task count or RSS.""" + + async def run(): + return await _run_lite( + name='oom', + activity=_async_sleep_factory(ACTIVITY_S, {}, {}), + n_items=2000, + semaphore_cap=500, + thread_pool_workers=POOL, + server_latency_s=ACTIVITY_S, + ) + + metrics = asyncio.run(run()) + assert metrics.peak_tasks <= int(metrics.n_items * 1.5), 'task accounting inflated' + assert metrics.peak_rss_delta_mb < 500.0, 'RSS exceeded budget' diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py b/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py index f74b8fb8e..010f54aae 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py @@ -191,12 +191,13 @@ def reading_activity(ctx: task.ActivityContext, _): registry = worker._Registry() activity_name = registry.add_activity(reading_activity) - executor = worker._ActivityExecutor(registry, TEST_LOGGER) + executor = worker._ActivityExecutor(TEST_LOGGER) propagated = PropagatedHistory.from_proto(_single_chunk_history('Caller')) assert propagated is not None encoded_output = executor.execute( + reading_activity, orchestration_id='wf-1', name=activity_name, task_id=1, @@ -221,8 +222,9 @@ def reading_activity(ctx: task.ActivityContext, _): registry = worker._Registry() activity_name = registry.add_activity(reading_activity) - executor = worker._ActivityExecutor(registry, TEST_LOGGER) + executor = worker._ActivityExecutor(TEST_LOGGER) executor.execute( + reading_activity, orchestration_id='wf-1', name=activity_name, task_id=1, diff --git a/ext/dapr-ext-workflow/tests/test_async_activity_registration.py b/ext/dapr-ext-workflow/tests/test_async_activity_registration.py new file mode 100644 index 000000000..e154aedb0 --- /dev/null +++ b/ext/dapr-ext-workflow/tests/test_async_activity_registration.py @@ -0,0 +1,259 @@ +# -*- coding: utf-8 -*- + +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for sync/async activity registration and the resulting wrappers. + +These tests exercise the helpers in workflow_runtime that decide whether an activity +runs in a thread pool (sync) or as a coroutine on the event loop (async). The +WorkflowRuntime is constructed against a fake registry so we don't need a sidecar. +""" + +import asyncio +import functools +import inspect +import unittest +from unittest import mock + +from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext +from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, _is_async_callable +from pydantic import BaseModel + + +class OrderInput(BaseModel): + order_id: str + amount: float + + +class FakeRegistry: + def __init__(self): + self.activities: dict[str, object] = {} + + def add_named_activity(self, name: str, fn) -> None: + self.activities[name] = fn + + +class _AsyncActivityRegistrationTestBase(unittest.TestCase): + def setUp(self) -> None: + self._registry_patch = mock.patch( + 'dapr.ext.workflow._durabletask.worker._Registry', return_value=FakeRegistry() + ) + self._registry_patch.start() + self.runtime = WorkflowRuntime() + # Reach into the runtime to grab its registry for assertions. + self.registry: FakeRegistry = self.runtime._WorkflowRuntime__worker._registry + + def tearDown(self) -> None: + # Tear down the worker's ThreadPoolExecutor so each test doesn't leak threads/fds. + # The runtime never started, so ``shutdown()`` -> ``stop()`` early-returns; + # shut the manager down directly to actually close the executor. + worker = self.runtime._WorkflowRuntime__worker + self.runtime.shutdown() + worker._async_worker_manager.shutdown() + self._registry_patch.stop() + + +class AsyncActivityRegistrationTest(_AsyncActivityRegistrationTestBase): + def test_async_activity_registers_coroutine_wrapper(self) -> None: + async def my_async_activity(ctx: WorkflowActivityContext, payload: str) -> str: + return payload.upper() + + self.runtime.register_activity(my_async_activity) + + wrapper = self.registry.activities['my_async_activity'] + self.assertTrue(inspect.iscoroutinefunction(wrapper)) + + def test_sync_activity_registers_plain_wrapper(self) -> None: + def my_sync_activity(ctx: WorkflowActivityContext, payload: str) -> str: + return payload.upper() + + self.runtime.register_activity(my_sync_activity) + + wrapper = self.registry.activities['my_sync_activity'] + self.assertFalse(inspect.iscoroutinefunction(wrapper)) + self.assertTrue(callable(wrapper)) + + def test_async_wrapper_awaits_user_function(self) -> None: + recorded: list[tuple[WorkflowActivityContext, str]] = [] + + async def my_async_activity(ctx: WorkflowActivityContext, payload: str) -> str: + await asyncio.sleep(0) + recorded.append((ctx, payload)) + return payload.upper() + + self.runtime.register_activity(my_async_activity) + wrapper = self.registry.activities['my_async_activity'] + + fake_ctx = mock.MagicMock(spec=['task_id']) + fake_ctx.task_id = 7 + result = asyncio.run(wrapper(fake_ctx, 'hello')) + + self.assertEqual(result, 'HELLO') + self.assertEqual(len(recorded), 1) + self.assertEqual(recorded[0][1], 'hello') + self.assertIsInstance(recorded[0][0], WorkflowActivityContext) + + def test_sync_wrapper_calls_user_function(self) -> None: + recorded: list[tuple[WorkflowActivityContext, str]] = [] + + def my_sync_activity(ctx: WorkflowActivityContext, payload: str) -> str: + recorded.append((ctx, payload)) + return payload.upper() + + self.runtime.register_activity(my_sync_activity) + wrapper = self.registry.activities['my_sync_activity'] + + fake_ctx = mock.MagicMock(spec=['task_id']) + fake_ctx.task_id = 3 + result = wrapper(fake_ctx, 'world') + + self.assertEqual(result, 'WORLD') + self.assertEqual(len(recorded), 1) + self.assertEqual(recorded[0][1], 'world') + self.assertIsInstance(recorded[0][0], WorkflowActivityContext) + + def test_async_wrapper_coerces_input_to_declared_model(self) -> None: + seen: list[OrderInput] = [] + + async def place_order(ctx: WorkflowActivityContext, order: OrderInput) -> str: + seen.append(order) + return order.order_id + + self.runtime.register_activity(place_order) + wrapper = self.registry.activities['place_order'] + + fake_ctx = mock.MagicMock(spec=['task_id']) + fake_ctx.task_id = 99 + raw_input = {'order_id': 'abc-1', 'amount': 9.5} + result = asyncio.run(wrapper(fake_ctx, raw_input)) + + self.assertEqual(result, 'abc-1') + self.assertEqual(len(seen), 1) + self.assertIsInstance(seen[0], OrderInput) + self.assertEqual(seen[0].amount, 9.5) + + def test_async_wrapper_propagates_exceptions(self) -> None: + async def failing(ctx: WorkflowActivityContext, payload: str) -> str: + raise RuntimeError('boom') + + self.runtime.register_activity(failing) + wrapper = self.registry.activities['failing'] + + fake_ctx = mock.MagicMock(spec=['task_id']) + fake_ctx.task_id = 1 + with self.assertRaises(RuntimeError) as caught: + asyncio.run(wrapper(fake_ctx, 'x')) + self.assertEqual(str(caught.exception), 'boom') + + def test_async_wrapper_supports_no_input_parameter(self) -> None: + async def heartbeat(ctx: WorkflowActivityContext) -> str: + return 'ok' + + self.runtime.register_activity(heartbeat) + wrapper = self.registry.activities['heartbeat'] + + fake_ctx = mock.MagicMock(spec=['task_id']) + fake_ctx.task_id = 0 + result = asyncio.run(wrapper(fake_ctx, None)) + self.assertEqual(result, 'ok') + + +class IsAsyncCallableTest(unittest.TestCase): + """Pin the contract of ``_is_async_callable`` against decorator shapes that a bare + ``inspect.iscoroutinefunction`` would miss. These are the patterns the fix for finding + #5 was meant to address. Without coverage, a future refactor can silently regress + async-activity routing for any of them. + """ + + def test_plain_async_function_is_async(self) -> None: + async def fn() -> None: ... + + self.assertTrue(_is_async_callable(fn)) + + def test_plain_sync_function_is_not_async(self) -> None: + def fn() -> None: ... + + self.assertFalse(_is_async_callable(fn)) + + def test_functools_partial_of_async_is_async(self) -> None: + async def fn(prefix: str, payload: str) -> str: + return prefix + payload + + partial_fn = functools.partial(fn, 'hello-') + self.assertTrue(_is_async_callable(partial_fn)) + + def test_functools_partial_of_sync_is_not_async(self) -> None: + def fn(prefix: str, payload: str) -> str: + return prefix + payload + + partial_fn = functools.partial(fn, 'hello-') + self.assertFalse(_is_async_callable(partial_fn)) + + def test_wraps_chain_over_async_is_async(self) -> None: + """A sync decorator that uses @functools.wraps exposes the inner via __wrapped__.""" + + async def inner(ctx: object, inp: object) -> None: ... + + @functools.wraps(inner) + def outer(ctx: object, inp: object) -> object: + return inner(ctx, inp) + + self.assertTrue(_is_async_callable(outer)) + + def test_nested_partial_and_wraps_chain_is_async(self) -> None: + """partial(@wraps over async). Exercises both unwrap stages in order.""" + + async def inner(prefix: str, payload: str) -> str: + return prefix + payload + + @functools.wraps(inner) + def wrapped(prefix: str, payload: str) -> str: + return inner(prefix, payload) + + partial_wrapped = functools.partial(wrapped, 'hi-') + self.assertTrue(_is_async_callable(partial_wrapped)) + + def test_callable_class_instance_with_async_call_is_async(self) -> None: + class AsyncCallable: + async def __call__(self, ctx: object, inp: object) -> str: + return 'ok' + + self.assertTrue(_is_async_callable(AsyncCallable())) + + def test_callable_class_instance_with_sync_call_is_not_async(self) -> None: + class SyncCallable: + def __call__(self, ctx: object, inp: object) -> str: + return 'ok' + + self.assertFalse(_is_async_callable(SyncCallable())) + + +class AsyncAndSyncCoexistTest(_AsyncActivityRegistrationTestBase): + def test_runtime_registers_mixed_sync_and_async_activities(self) -> None: + async def async_activity(ctx: WorkflowActivityContext, payload: int) -> int: + return payload + 1 + + def sync_activity(ctx: WorkflowActivityContext, payload: int) -> int: + return payload * 2 + + self.runtime.register_activity(async_activity) + self.runtime.register_activity(sync_activity) + + async_wrapper = self.registry.activities['async_activity'] + sync_wrapper = self.registry.activities['sync_activity'] + + self.assertTrue(inspect.iscoroutinefunction(async_wrapper)) + self.assertFalse(inspect.iscoroutinefunction(sync_wrapper)) + + +if __name__ == '__main__': + unittest.main() diff --git a/pyproject.toml b/pyproject.toml index 5ce25e6a6..01ffd53cf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -134,6 +134,7 @@ ignore_missing_imports = true [tool.pytest.ini_options] markers = [ 'example_dir(name): set the example directory for the dapr fixture', + 'perf: timing-sensitive dispatch regression tests (reuse the benchmark harness)', ] pythonpath = ["."] asyncio_mode = "auto" diff --git a/uv.lock b/uv.lock index c9df9766b..cc1f4c8cb 100644 --- a/uv.lock +++ b/uv.lock @@ -1467,7 +1467,7 @@ wheels = [ [[package]] name = "langsmith" -version = "0.8.6" +version = "0.7.17" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "httpx" }, @@ -1477,13 +1477,12 @@ dependencies = [ { name = "requests" }, { name = "requests-toolbelt" }, { name = "uuid-utils" }, - { name = "websockets" }, { name = "xxhash" }, { name = "zstandard" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/7a/61/d269b8bd3376031de7be6ac2de8ba94fafff67635195d97aa0e842027ac7/langsmith-0.8.6.tar.gz", hash = "sha256:a46fd3403c2de3a9c34f72ebb7b2e45872627671adcc67c6a4c571520b6931cc", size = 4463093, upload-time = "2026-05-27T22:51:52.928Z" } +sdist = { url = "https://files.pythonhosted.org/packages/71/79/81041dde07a974e728db7def23c1c7255950b8874102925cc77093bc847d/langsmith-0.7.17.tar.gz", hash = "sha256:6c1b0c2863cdd6636d2a58b8d5b1b80060703d98cac2593f4233e09ac25b5a9d", size = 1132228, upload-time = "2026-03-12T20:41:10.808Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/7e/c5/28f99eccd79ce89ec93de9a5039a74ddf4740f2d9671b0a06c5d2e200914/langsmith-0.8.6-py3-none-any.whl", hash = "sha256:b304888ea5ec5fe397db24f0bf474b0c8e472fb23ee36a2007e9837f6ff29cc1", size = 399954, upload-time = "2026-05-27T22:51:50.847Z" }, + { url = "https://files.pythonhosted.org/packages/34/31/62689d57f4d25792bd6a3c05c868771899481be2f3e31f9e71d31e1ac4ab/langsmith-0.7.17-py3-none-any.whl", hash = "sha256:cbec10460cb6c6ecc94c18c807be88a9984838144ae6c4693c9f859f378d7d02", size = 359147, upload-time = "2026-03-12T20:41:08.758Z" }, ] [[package]] @@ -2611,11 +2610,11 @@ wheels = [ [[package]] name = "python-multipart" -version = "0.0.29" +version = "0.0.22" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/4e/fe/70bd71a6738b09a0bdf6480ca6436b167469ca4578b2a0efbe390b4b0e70/python_multipart-0.0.29.tar.gz", hash = "sha256:643e93849196645e2dbdd81a0f8829a23123ad7f797a84a364c6fb3563f18904", size = 45678, upload-time = "2026-05-17T17:29:47.654Z" } +sdist = { url = "https://files.pythonhosted.org/packages/94/01/979e98d542a70714b0cb2b6728ed0b7c46792b695e3eaec3e20711271ca3/python_multipart-0.0.22.tar.gz", hash = "sha256:7340bef99a7e0032613f56dc36027b959fd3b30a787ed62d310e951f7c3a3a58", size = 37612, upload-time = "2026-01-25T10:15:56.219Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/8f/cb/769cfc37177252872a45a71f3fbdde9d51b471a3f3c14bfe95dde3407386/python_multipart-0.0.29-py3-none-any.whl", hash = "sha256:2ddcc971cef266225f54f552d8fa10bcfbb1f14446caec199060daac59ff2d69", size = 29640, upload-time = "2026-05-17T17:29:45.69Z" }, + { url = "https://files.pythonhosted.org/packages/1b/d0/397f9626e711ff749a95d96b7af99b9c566a9bb5129b8e4c10fc4d100304/python_multipart-0.0.22-py3-none-any.whl", hash = "sha256:2b2cd894c83d21bf49d702499531c7bafd057d730c201782048f7945d82de155", size = 24579, upload-time = "2026-01-25T10:15:54.811Z" }, ] [[package]] @@ -3127,11 +3126,11 @@ wheels = [ [[package]] name = "urllib3" -version = "2.7.0" +version = "2.6.3" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" }, + { url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" }, ] [[package]] @@ -3209,74 +3208,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/33/e8/e40370e6d74ddba47f002a32919d91310d6074130fe4e17dabcafc15cbf1/watchdog-6.0.0-py3-none-win_ia64.whl", hash = "sha256:a1914259fa9e1454315171103c6a30961236f508b9b623eae470268bbcc6a22f", size = 79067, upload-time = "2024-11-01T14:07:11.845Z" }, ] -[[package]] -name = "websockets" -version = "16.0" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/04/24/4b2031d72e840ce4c1ccb255f693b15c334757fc50023e4db9537080b8c4/websockets-16.0.tar.gz", hash = "sha256:5f6261a5e56e8d5c42a4497b364ea24d94d9563e8fbd44e78ac40879c60179b5", size = 179346, upload-time = "2026-01-10T09:23:47.181Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/20/74/221f58decd852f4b59cc3354cccaf87e8ef695fede361d03dc9a7396573b/websockets-16.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:04cdd5d2d1dacbad0a7bf36ccbcd3ccd5a30ee188f2560b7a62a30d14107b31a", size = 177343, upload-time = "2026-01-10T09:22:21.28Z" }, - { url = "https://files.pythonhosted.org/packages/19/0f/22ef6107ee52ab7f0b710d55d36f5a5d3ef19e8a205541a6d7ffa7994e5a/websockets-16.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:8ff32bb86522a9e5e31439a58addbb0166f0204d64066fb955265c4e214160f0", size = 175021, upload-time = "2026-01-10T09:22:22.696Z" }, - { url = "https://files.pythonhosted.org/packages/10/40/904a4cb30d9b61c0e278899bf36342e9b0208eb3c470324a9ecbaac2a30f/websockets-16.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:583b7c42688636f930688d712885cf1531326ee05effd982028212ccc13e5957", size = 175320, upload-time = "2026-01-10T09:22:23.94Z" }, - { url = "https://files.pythonhosted.org/packages/9d/2f/4b3ca7e106bc608744b1cdae041e005e446124bebb037b18799c2d356864/websockets-16.0-cp310-cp310-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:7d837379b647c0c4c2355c2499723f82f1635fd2c26510e1f587d89bc2199e72", size = 183815, upload-time = "2026-01-10T09:22:25.469Z" }, - { url = "https://files.pythonhosted.org/packages/86/26/d40eaa2a46d4302becec8d15b0fc5e45bdde05191e7628405a19cf491ccd/websockets-16.0-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:df57afc692e517a85e65b72e165356ed1df12386ecb879ad5693be08fac65dde", size = 185054, upload-time = "2026-01-10T09:22:27.101Z" }, - { url = "https://files.pythonhosted.org/packages/b0/ba/6500a0efc94f7373ee8fefa8c271acdfd4dca8bd49a90d4be7ccabfc397e/websockets-16.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:2b9f1e0d69bc60a4a87349d50c09a037a2607918746f07de04df9e43252c77a3", size = 184565, upload-time = "2026-01-10T09:22:28.293Z" }, - { url = "https://files.pythonhosted.org/packages/04/b4/96bf2cee7c8d8102389374a2616200574f5f01128d1082f44102140344cc/websockets-16.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:335c23addf3d5e6a8633f9f8eda77efad001671e80b95c491dd0924587ece0b3", size = 183848, upload-time = "2026-01-10T09:22:30.394Z" }, - { url = "https://files.pythonhosted.org/packages/02/8e/81f40fb00fd125357814e8c3025738fc4ffc3da4b6b4a4472a82ba304b41/websockets-16.0-cp310-cp310-win32.whl", hash = "sha256:37b31c1623c6605e4c00d466c9d633f9b812ea430c11c8a278774a1fde1acfa9", size = 178249, upload-time = "2026-01-10T09:22:32.083Z" }, - { url = "https://files.pythonhosted.org/packages/b4/5f/7e40efe8df57db9b91c88a43690ac66f7b7aa73a11aa6a66b927e44f26fa/websockets-16.0-cp310-cp310-win_amd64.whl", hash = "sha256:8e1dab317b6e77424356e11e99a432b7cb2f3ec8c5ab4dabbcee6add48f72b35", size = 178685, upload-time = "2026-01-10T09:22:33.345Z" }, - { url = "https://files.pythonhosted.org/packages/f2/db/de907251b4ff46ae804ad0409809504153b3f30984daf82a1d84a9875830/websockets-16.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:31a52addea25187bde0797a97d6fc3d2f92b6f72a9370792d65a6e84615ac8a8", size = 177340, upload-time = "2026-01-10T09:22:34.539Z" }, - { url = "https://files.pythonhosted.org/packages/f3/fa/abe89019d8d8815c8781e90d697dec52523fb8ebe308bf11664e8de1877e/websockets-16.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:417b28978cdccab24f46400586d128366313e8a96312e4b9362a4af504f3bbad", size = 175022, upload-time = "2026-01-10T09:22:36.332Z" }, - { url = "https://files.pythonhosted.org/packages/58/5d/88ea17ed1ded2079358b40d31d48abe90a73c9e5819dbcde1606e991e2ad/websockets-16.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:af80d74d4edfa3cb9ed973a0a5ba2b2a549371f8a741e0800cb07becdd20f23d", size = 175319, upload-time = "2026-01-10T09:22:37.602Z" }, - { url = "https://files.pythonhosted.org/packages/d2/ae/0ee92b33087a33632f37a635e11e1d99d429d3d323329675a6022312aac2/websockets-16.0-cp311-cp311-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:08d7af67b64d29823fed316505a89b86705f2b7981c07848fb5e3ea3020c1abe", size = 184631, upload-time = "2026-01-10T09:22:38.789Z" }, - { url = "https://files.pythonhosted.org/packages/c8/c5/27178df583b6c5b31b29f526ba2da5e2f864ecc79c99dae630a85d68c304/websockets-16.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:7be95cfb0a4dae143eaed2bcba8ac23f4892d8971311f1b06f3c6b78952ee70b", size = 185870, upload-time = "2026-01-10T09:22:39.893Z" }, - { url = "https://files.pythonhosted.org/packages/87/05/536652aa84ddc1c018dbb7e2c4cbcd0db884580bf8e95aece7593fde526f/websockets-16.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d6297ce39ce5c2e6feb13c1a996a2ded3b6832155fcfc920265c76f24c7cceb5", size = 185361, upload-time = "2026-01-10T09:22:41.016Z" }, - { url = "https://files.pythonhosted.org/packages/6d/e2/d5332c90da12b1e01f06fb1b85c50cfc489783076547415bf9f0a659ec19/websockets-16.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1c1b30e4f497b0b354057f3467f56244c603a79c0d1dafce1d16c283c25f6e64", size = 184615, upload-time = "2026-01-10T09:22:42.442Z" }, - { url = "https://files.pythonhosted.org/packages/77/fb/d3f9576691cae9253b51555f841bc6600bf0a983a461c79500ace5a5b364/websockets-16.0-cp311-cp311-win32.whl", hash = "sha256:5f451484aeb5cafee1ccf789b1b66f535409d038c56966d6101740c1614b86c6", size = 178246, upload-time = "2026-01-10T09:22:43.654Z" }, - { url = "https://files.pythonhosted.org/packages/54/67/eaff76b3dbaf18dcddabc3b8c1dba50b483761cccff67793897945b37408/websockets-16.0-cp311-cp311-win_amd64.whl", hash = "sha256:8d7f0659570eefb578dacde98e24fb60af35350193e4f56e11190787bee77dac", size = 178684, upload-time = "2026-01-10T09:22:44.941Z" }, - { url = "https://files.pythonhosted.org/packages/84/7b/bac442e6b96c9d25092695578dda82403c77936104b5682307bd4deb1ad4/websockets-16.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:71c989cbf3254fbd5e84d3bff31e4da39c43f884e64f2551d14bb3c186230f00", size = 177365, upload-time = "2026-01-10T09:22:46.787Z" }, - { url = "https://files.pythonhosted.org/packages/b0/fe/136ccece61bd690d9c1f715baaeefd953bb2360134de73519d5df19d29ca/websockets-16.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:8b6e209ffee39ff1b6d0fa7bfef6de950c60dfb91b8fcead17da4ee539121a79", size = 175038, upload-time = "2026-01-10T09:22:47.999Z" }, - { url = "https://files.pythonhosted.org/packages/40/1e/9771421ac2286eaab95b8575b0cb701ae3663abf8b5e1f64f1fd90d0a673/websockets-16.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:86890e837d61574c92a97496d590968b23c2ef0aeb8a9bc9421d174cd378ae39", size = 175328, upload-time = "2026-01-10T09:22:49.809Z" }, - { url = "https://files.pythonhosted.org/packages/18/29/71729b4671f21e1eaa5d6573031ab810ad2936c8175f03f97f3ff164c802/websockets-16.0-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:9b5aca38b67492ef518a8ab76851862488a478602229112c4b0d58d63a7a4d5c", size = 184915, upload-time = "2026-01-10T09:22:51.071Z" }, - { url = "https://files.pythonhosted.org/packages/97/bb/21c36b7dbbafc85d2d480cd65df02a1dc93bf76d97147605a8e27ff9409d/websockets-16.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e0334872c0a37b606418ac52f6ab9cfd17317ac26365f7f65e203e2d0d0d359f", size = 186152, upload-time = "2026-01-10T09:22:52.224Z" }, - { url = "https://files.pythonhosted.org/packages/4a/34/9bf8df0c0cf88fa7bfe36678dc7b02970c9a7d5e065a3099292db87b1be2/websockets-16.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:a0b31e0b424cc6b5a04b8838bbaec1688834b2383256688cf47eb97412531da1", size = 185583, upload-time = "2026-01-10T09:22:53.443Z" }, - { url = "https://files.pythonhosted.org/packages/47/88/4dd516068e1a3d6ab3c7c183288404cd424a9a02d585efbac226cb61ff2d/websockets-16.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:485c49116d0af10ac698623c513c1cc01c9446c058a4e61e3bf6c19dff7335a2", size = 184880, upload-time = "2026-01-10T09:22:55.033Z" }, - { url = "https://files.pythonhosted.org/packages/91/d6/7d4553ad4bf1c0421e1ebd4b18de5d9098383b5caa1d937b63df8d04b565/websockets-16.0-cp312-cp312-win32.whl", hash = "sha256:eaded469f5e5b7294e2bdca0ab06becb6756ea86894a47806456089298813c89", size = 178261, upload-time = "2026-01-10T09:22:56.251Z" }, - { url = "https://files.pythonhosted.org/packages/c3/f0/f3a17365441ed1c27f850a80b2bc680a0fa9505d733fe152fdf5e98c1c0b/websockets-16.0-cp312-cp312-win_amd64.whl", hash = "sha256:5569417dc80977fc8c2d43a86f78e0a5a22fee17565d78621b6bb264a115d4ea", size = 178693, upload-time = "2026-01-10T09:22:57.478Z" }, - { url = "https://files.pythonhosted.org/packages/cc/9c/baa8456050d1c1b08dd0ec7346026668cbc6f145ab4e314d707bb845bf0d/websockets-16.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:878b336ac47938b474c8f982ac2f7266a540adc3fa4ad74ae96fea9823a02cc9", size = 177364, upload-time = "2026-01-10T09:22:59.333Z" }, - { url = "https://files.pythonhosted.org/packages/7e/0c/8811fc53e9bcff68fe7de2bcbe75116a8d959ac699a3200f4847a8925210/websockets-16.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:52a0fec0e6c8d9a784c2c78276a48a2bdf099e4ccc2a4cad53b27718dbfd0230", size = 175039, upload-time = "2026-01-10T09:23:01.171Z" }, - { url = "https://files.pythonhosted.org/packages/aa/82/39a5f910cb99ec0b59e482971238c845af9220d3ab9fa76dd9162cda9d62/websockets-16.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:e6578ed5b6981005df1860a56e3617f14a6c307e6a71b4fff8c48fdc50f3ed2c", size = 175323, upload-time = "2026-01-10T09:23:02.341Z" }, - { url = "https://files.pythonhosted.org/packages/bd/28/0a25ee5342eb5d5f297d992a77e56892ecb65e7854c7898fb7d35e9b33bd/websockets-16.0-cp313-cp313-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:95724e638f0f9c350bb1c2b0a7ad0e83d9cc0c9259f3ea94e40d7b02a2179ae5", size = 184975, upload-time = "2026-01-10T09:23:03.756Z" }, - { url = "https://files.pythonhosted.org/packages/f9/66/27ea52741752f5107c2e41fda05e8395a682a1e11c4e592a809a90c6a506/websockets-16.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c0204dc62a89dc9d50d682412c10b3542d748260d743500a85c13cd1ee4bde82", size = 186203, upload-time = "2026-01-10T09:23:05.01Z" }, - { url = "https://files.pythonhosted.org/packages/37/e5/8e32857371406a757816a2b471939d51c463509be73fa538216ea52b792a/websockets-16.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:52ac480f44d32970d66763115edea932f1c5b1312de36df06d6b219f6741eed8", size = 185653, upload-time = "2026-01-10T09:23:06.301Z" }, - { url = "https://files.pythonhosted.org/packages/9b/67/f926bac29882894669368dc73f4da900fcdf47955d0a0185d60103df5737/websockets-16.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:6e5a82b677f8f6f59e8dfc34ec06ca6b5b48bc4fcda346acd093694cc2c24d8f", size = 184920, upload-time = "2026-01-10T09:23:07.492Z" }, - { url = "https://files.pythonhosted.org/packages/3c/a1/3d6ccdcd125b0a42a311bcd15a7f705d688f73b2a22d8cf1c0875d35d34a/websockets-16.0-cp313-cp313-win32.whl", hash = "sha256:abf050a199613f64c886ea10f38b47770a65154dc37181bfaff70c160f45315a", size = 178255, upload-time = "2026-01-10T09:23:09.245Z" }, - { url = "https://files.pythonhosted.org/packages/6b/ae/90366304d7c2ce80f9b826096a9e9048b4bb760e44d3b873bb272cba696b/websockets-16.0-cp313-cp313-win_amd64.whl", hash = "sha256:3425ac5cf448801335d6fdc7ae1eb22072055417a96cc6b31b3861f455fbc156", size = 178689, upload-time = "2026-01-10T09:23:10.483Z" }, - { url = "https://files.pythonhosted.org/packages/f3/1d/e88022630271f5bd349ed82417136281931e558d628dd52c4d8621b4a0b2/websockets-16.0-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:8cc451a50f2aee53042ac52d2d053d08bf89bcb31ae799cb4487587661c038a0", size = 177406, upload-time = "2026-01-10T09:23:12.178Z" }, - { url = "https://files.pythonhosted.org/packages/f2/78/e63be1bf0724eeb4616efb1ae1c9044f7c3953b7957799abb5915bffd38e/websockets-16.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:daa3b6ff70a9241cf6c7fc9e949d41232d9d7d26fd3522b1ad2b4d62487e9904", size = 175085, upload-time = "2026-01-10T09:23:13.511Z" }, - { url = "https://files.pythonhosted.org/packages/bb/f4/d3c9220d818ee955ae390cf319a7c7a467beceb24f05ee7aaaa2414345ba/websockets-16.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:fd3cb4adb94a2a6e2b7c0d8d05cb94e6f1c81a0cf9dc2694fb65c7e8d94c42e4", size = 175328, upload-time = "2026-01-10T09:23:14.727Z" }, - { url = "https://files.pythonhosted.org/packages/63/bc/d3e208028de777087e6fb2b122051a6ff7bbcca0d6df9d9c2bf1dd869ae9/websockets-16.0-cp314-cp314-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:781caf5e8eee67f663126490c2f96f40906594cb86b408a703630f95550a8c3e", size = 185044, upload-time = "2026-01-10T09:23:15.939Z" }, - { url = "https://files.pythonhosted.org/packages/ad/6e/9a0927ac24bd33a0a9af834d89e0abc7cfd8e13bed17a86407a66773cc0e/websockets-16.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:caab51a72c51973ca21fa8a18bd8165e1a0183f1ac7066a182ff27107b71e1a4", size = 186279, upload-time = "2026-01-10T09:23:17.148Z" }, - { url = "https://files.pythonhosted.org/packages/b9/ca/bf1c68440d7a868180e11be653c85959502efd3a709323230314fda6e0b3/websockets-16.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:19c4dc84098e523fd63711e563077d39e90ec6702aff4b5d9e344a60cb3c0cb1", size = 185711, upload-time = "2026-01-10T09:23:18.372Z" }, - { url = "https://files.pythonhosted.org/packages/c4/f8/fdc34643a989561f217bb477cbc47a3a07212cbda91c0e4389c43c296ebf/websockets-16.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:a5e18a238a2b2249c9a9235466b90e96ae4795672598a58772dd806edc7ac6d3", size = 184982, upload-time = "2026-01-10T09:23:19.652Z" }, - { url = "https://files.pythonhosted.org/packages/dd/d1/574fa27e233764dbac9c52730d63fcf2823b16f0856b3329fc6268d6ae4f/websockets-16.0-cp314-cp314-win32.whl", hash = "sha256:a069d734c4a043182729edd3e9f247c3b2a4035415a9172fd0f1b71658a320a8", size = 177915, upload-time = "2026-01-10T09:23:21.458Z" }, - { url = "https://files.pythonhosted.org/packages/8a/f1/ae6b937bf3126b5134ce1f482365fde31a357c784ac51852978768b5eff4/websockets-16.0-cp314-cp314-win_amd64.whl", hash = "sha256:c0ee0e63f23914732c6d7e0cce24915c48f3f1512ec1d079ed01fc629dab269d", size = 178381, upload-time = "2026-01-10T09:23:22.715Z" }, - { url = "https://files.pythonhosted.org/packages/06/9b/f791d1db48403e1f0a27577a6beb37afae94254a8c6f08be4a23e4930bc0/websockets-16.0-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:a35539cacc3febb22b8f4d4a99cc79b104226a756aa7400adc722e83b0d03244", size = 177737, upload-time = "2026-01-10T09:23:24.523Z" }, - { url = "https://files.pythonhosted.org/packages/bd/40/53ad02341fa33b3ce489023f635367a4ac98b73570102ad2cdd770dacc9a/websockets-16.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:b784ca5de850f4ce93ec85d3269d24d4c82f22b7212023c974c401d4980ebc5e", size = 175268, upload-time = "2026-01-10T09:23:25.781Z" }, - { url = "https://files.pythonhosted.org/packages/74/9b/6158d4e459b984f949dcbbb0c5d270154c7618e11c01029b9bbd1bb4c4f9/websockets-16.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:569d01a4e7fba956c5ae4fc988f0d4e187900f5497ce46339c996dbf24f17641", size = 175486, upload-time = "2026-01-10T09:23:27.033Z" }, - { url = "https://files.pythonhosted.org/packages/e5/2d/7583b30208b639c8090206f95073646c2c9ffd66f44df967981a64f849ad/websockets-16.0-cp314-cp314t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:50f23cdd8343b984957e4077839841146f67a3d31ab0d00e6b824e74c5b2f6e8", size = 185331, upload-time = "2026-01-10T09:23:28.259Z" }, - { url = "https://files.pythonhosted.org/packages/45/b0/cce3784eb519b7b5ad680d14b9673a31ab8dcb7aad8b64d81709d2430aa8/websockets-16.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:152284a83a00c59b759697b7f9e9cddf4e3c7861dd0d964b472b70f78f89e80e", size = 186501, upload-time = "2026-01-10T09:23:29.449Z" }, - { url = "https://files.pythonhosted.org/packages/19/60/b8ebe4c7e89fb5f6cdf080623c9d92789a53636950f7abacfc33fe2b3135/websockets-16.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:bc59589ab64b0022385f429b94697348a6a234e8ce22544e3681b2e9331b5944", size = 186062, upload-time = "2026-01-10T09:23:31.368Z" }, - { url = "https://files.pythonhosted.org/packages/88/a8/a080593f89b0138b6cba1b28f8df5673b5506f72879322288b031337c0b8/websockets-16.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:32da954ffa2814258030e5a57bc73a3635463238e797c7375dc8091327434206", size = 185356, upload-time = "2026-01-10T09:23:32.627Z" }, - { url = "https://files.pythonhosted.org/packages/c2/b6/b9afed2afadddaf5ebb2afa801abf4b0868f42f8539bfe4b071b5266c9fe/websockets-16.0-cp314-cp314t-win32.whl", hash = "sha256:5a4b4cc550cb665dd8a47f868c8d04c8230f857363ad3c9caf7a0c3bf8c61ca6", size = 178085, upload-time = "2026-01-10T09:23:33.816Z" }, - { url = "https://files.pythonhosted.org/packages/9f/3e/28135a24e384493fa804216b79a6a6759a38cc4ff59118787b9fb693df93/websockets-16.0-cp314-cp314t-win_amd64.whl", hash = "sha256:b14dc141ed6d2dde437cddb216004bcac6a1df0935d79656387bd41632ba0bbd", size = 178531, upload-time = "2026-01-10T09:23:35.016Z" }, - { url = "https://files.pythonhosted.org/packages/72/07/c98a68571dcf256e74f1f816b8cc5eae6eb2d3d5cfa44d37f801619d9166/websockets-16.0-pp311-pypy311_pp73-macosx_10_15_x86_64.whl", hash = "sha256:349f83cd6c9a415428ee1005cadb5c2c56f4389bc06a9af16103c3bc3dcc8b7d", size = 174947, upload-time = "2026-01-10T09:23:36.166Z" }, - { url = "https://files.pythonhosted.org/packages/7e/52/93e166a81e0305b33fe416338be92ae863563fe7bce446b0f687b9df5aea/websockets-16.0-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:4a1aba3340a8dca8db6eb5a7986157f52eb9e436b74813764241981ca4888f03", size = 175260, upload-time = "2026-01-10T09:23:37.409Z" }, - { url = "https://files.pythonhosted.org/packages/56/0c/2dbf513bafd24889d33de2ff0368190a0e69f37bcfa19009ef819fe4d507/websockets-16.0-pp311-pypy311_pp73-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:f4a32d1bd841d4bcbffdcb3d2ce50c09c3909fbead375ab28d0181af89fd04da", size = 176071, upload-time = "2026-01-10T09:23:39.158Z" }, - { url = "https://files.pythonhosted.org/packages/a5/8f/aea9c71cc92bf9b6cc0f7f70df8f0b420636b6c96ef4feee1e16f80f75dd/websockets-16.0-pp311-pypy311_pp73-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0298d07ee155e2e9fda5be8a9042200dd2e3bb0b8a38482156576f863a9d457c", size = 176968, upload-time = "2026-01-10T09:23:41.031Z" }, - { url = "https://files.pythonhosted.org/packages/9a/3f/f70e03f40ffc9a30d817eef7da1be72ee4956ba8d7255c399a01b135902a/websockets-16.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:a653aea902e0324b52f1613332ddf50b00c06fdaf7e92624fbf8c77c78fa5767", size = 178735, upload-time = "2026-01-10T09:23:42.259Z" }, - { url = "https://files.pythonhosted.org/packages/6f/28/258ebab549c2bf3e64d2b0217b973467394a9cea8c42f70418ca2c5d0d2e/websockets-16.0-py3-none-any.whl", hash = "sha256:1637db62fad1dc833276dded54215f2c7fa46912301a24bd94d45d46a011ceec", size = 171598, upload-time = "2026-01-10T09:23:45.395Z" }, -] - [[package]] name = "werkzeug" version = "3.1.6"