Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,6 @@ tests/integration/.binding-data/

# Crypto test material generated at test time (see tests/crypto_utils.py)
tests/integration/keys/

# Generated benchmark report (machine-specific, regenerated by bench_async_activities.py)
ext/dapr-ext-workflow/benchmarks/RESULTS.md
106 changes: 106 additions & 0 deletions examples/workflow/async_activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
# Copyright 2026 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Async activities running alongside a sync one in a fan-out/fan-in workflow.

Each async activity simulates an I/O-bound call: it takes a payload, awaits a fixed
delay (standing in for a network round-trip), and returns a result payload. The async
instances run concurrently on the worker's event loop; a final sync activity aggregates
the results. Fan-out width, input/output payload sizes, and the delay are configurable
via environment variables.

Run with:

dapr run --app-id async-activities --app-protocol grpc --dapr-grpc-port 50001 \\
-- python async_activities.py
"""

from __future__ import annotations

import asyncio
import os
import random
import string
from time import sleep

import dapr.ext.workflow as wf
from pydantic import BaseModel

# Tunables for this example; each one can be overridden through the environment.
# Number of async activity instances the workflow fans out to.
FAN_OUT = int(os.getenv('WORKFLOW_FAN_OUT', '5'))
# Length of the random-digit string sent to each async activity.
INPUT_BYTES = int(os.getenv('WORKFLOW_INPUT_BYTES', '2048'))
# Length of the random-digit string each async activity returns.
OUTPUT_BYTES = int(os.getenv('WORKFLOW_OUTPUT_BYTES', '1024'))
# Seconds each async activity awaits to simulate an I/O round-trip.
IO_SECONDS = float(os.getenv('WORKFLOW_IO_SECONDS', '1.0'))

# Module-level runtime; the @wfr.workflow / @wfr.activity decorators below register on it.
wfr = wf.WorkflowRuntime()


def _random_digits(n: int) -> str:
return ''.join(random.choices(string.digits, k=n))


class Payload(BaseModel):
    """Input handed to one fan-out activity: its position plus a data string."""

    # Position of this payload in the fan-out batch (``main`` assigns it from ``range(FAN_OUT)``).
    index: int
    # Payload body; ``main`` fills it with ``INPUT_BYTES`` random digits.
    data: str


@wfr.workflow(name='fan_out_fan_in_workflow')
def fan_out_fan_in_workflow(ctx: wf.DaprWorkflowContext, payloads: list[dict]):
    """Fan out one ``process_payload`` activity per payload, then fan in via ``summarize``.

    The function is a generator: it yields the combined fan-out task and then the
    aggregation task, and returns the aggregation result as the workflow output.
    """
    pending = []
    for item in payloads:
        pending.append(ctx.call_activity(process_payload, input=item))

    # Resume only once every fan-out activity has produced its result.
    outputs = yield wf.when_all(pending)

    return (yield ctx.call_activity(summarize, input=outputs))


@wfr.activity(name='process_payload')
async def process_payload(ctx: wf.WorkflowActivityContext, payload: Payload) -> str:
    """Async activity standing in for an I/O-bound call.

    Awaits ``IO_SECONDS``, then returns ``OUTPUT_BYTES`` random digits. Instances run
    concurrently on the loop. A one-line size report is printed for each invocation.
    """
    # Stand-in for the network round-trip; yields control to the event loop.
    await asyncio.sleep(IO_SECONDS)

    result = _random_digits(OUTPUT_BYTES)
    message = f'[async] payload {payload.index}: {len(payload.data)}B in -> {len(result)}B out'
    print(message, flush=True)
    return result


@wfr.activity(name='summarize')
def summarize(ctx: wf.WorkflowActivityContext, results: list[str]) -> str:
    """Sync activity: aggregate the fan-out results on the thread pool.

    Returns (and prints) a one-line summary with the number of results, their
    combined length, and how many ``'0'`` characters they contain.
    """
    total_bytes = 0
    total_zeros = 0
    # Single pass over the results, accumulating both totals together.
    for chunk in results:
        total_bytes += len(chunk)
        total_zeros += chunk.count('0')

    summary = f'{len(results)} results, {total_bytes} bytes, {total_zeros} zeros'
    print(f'[sync] {summary}', flush=True)
    return summary


def main() -> None:
    """Start the runtime, run one fan-out/fan-in workflow, and print its outcome.

    Builds ``FAN_OUT`` payloads of ``INPUT_BYTES`` random digits each, schedules the
    workflow, and waits up to 60 seconds for completion. The runtime is always shut
    down, even when scheduling or waiting fails.

    Raises:
        RuntimeError: if no state is returned for the scheduled workflow instance.
    """
    payloads = [
        Payload(index=i, data=_random_digits(INPUT_BYTES)).model_dump() for i in range(FAN_OUT)
    ]

    wfr.start()
    try:
        sleep(5)  # wait for workflow runtime to start

        wf_client = wf.DaprWorkflowClient()
        instance_id = wf_client.schedule_new_workflow(
            workflow=fan_out_fan_in_workflow, input=payloads
        )
        print(f'Workflow started. Instance ID: {instance_id}')

        state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
        # Explicit check rather than ``assert``: asserts are stripped under ``python -O``.
        if state is None:
            raise RuntimeError(f'No state returned for workflow instance {instance_id}')
        print(f'Workflow completed! Status: {state.runtime_status.name}')

        # The output may be absent (e.g. the workflow did not complete successfully).
        # chr(34) is the double quote wrapping the serialized string result.
        output = state.serialized_output or ''
        print(f'Workflow result: {output.strip(chr(34))}')
    finally:
        # Stop the worker even if scheduling or waiting raised (e.g. a timeout).
        wfr.shutdown()


# Run the example only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
21 changes: 20 additions & 1 deletion ext/dapr-ext-workflow/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,24 @@ The entry point for registration and lifecycle:

Internally wraps user functions: workflow functions get a `DaprWorkflowContext`, activity functions get a `WorkflowActivityContext`. Tracks registration state via `_workflow_registered` / `_activity_registered` attributes on functions to prevent double registration.

#### Sync and async activities

Activities can be either `def my_activity(ctx, inp)` or `async def my_activity(ctx, inp)`. At registration, `_make_activity_wrapper` calls `_is_async_callable(fn)` to detect async-ness. That helper unwraps `functools.partial`, `@functools.wraps` chains, and callable-class `__call__` so common decorator patterns route correctly. The wrapper is built `async def` or `def` to match, then stored in the registry.

At dispatch time (the gRPC stream loop in `_durabletask/worker.py`), `is_async_callable(activity_fn)` on the wrapper selects between two handlers.

- **Async activities** go through `_execute_activity_async`, then `_ActivityExecutor.execute_async`, which awaits `fn(...)` directly on the event loop. The gRPC response is delivered via `loop.run_in_executor(self._async_worker_manager.thread_pool, stub.CompleteActivityTask, ...)` — the same pool sync activities use, sized by `maximum_thread_pool_workers`.
- **Sync activities** go through `_execute_activity`, dispatched to the thread pool by `_AsyncWorkerManager._run_func`. The activity runs on a worker thread, and the response is delivered from the same thread.

Workflow (orchestrator) functions must remain generators (`def` with `yield`). They cannot be `async def` because durabletask's deterministic replay depends on synchronous generator semantics. Only activities support async.

**Decorator ordering gotcha.** Stacking `@wfr.activity` over `@alternate_name(...)` over `async def` works because `@alternate_name` now emits an `async def innerfn` when the wrapped function is async. A user-written decorator that wraps an async function in a sync `def` (without `@functools.wraps` exposing `__wrapped__`) defeats `_is_async_callable`, routes the activity to the sync path, and produces an un-awaited coroutine. Such decorators should use `@functools.wraps(fn)` so the unwrap walks through them.

**`maximum_thread_pool_workers` covers both paths.** This knob sizes the worker thread pool used for sync-activity bodies and for async-activity gRPC response sends. Mixed workloads with long-running sync activities can starve async response delivery (and vice versa) since they share the pool — size to the sum of peak sync activity concurrency and peak in-flight async response sends.

**Concurrency sizing and load characterization.** See `docs/concurrency.md` for sizing recommendations (`maximum_concurrent_activity_work_items`, `maximum_thread_pool_workers`) and an async-vs-sync decision tree. The `benchmarks/` directory ships `bench_async_activities.py`; re-run it locally before claiming a performance regression. The generated `RESULTS.md` is gitignored because numbers are machine-specific; see `docs/concurrency.md` for the regeneration command.


### DaprWorkflowClient (`dapr_workflow_client.py`)

Client for workflow lifecycle management:
Expand Down Expand Up @@ -163,7 +181,7 @@ Retry configuration for activities and child workflows:
1. **Registration**: User decorates functions with `@wfr.workflow` / `@wfr.activity`. The runtime wraps them and stores them in the durabletask worker's registry.
2. **Startup**: `wfr.start()` opens a gRPC stream to the Dapr sidecar. The worker polls for work items.
3. **Scheduling**: Client calls `schedule_new_workflow(fn, input=...)`. The function's name (or `_dapr_alternate_name`) is sent to the backend.
4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). The engine records history; on replay, yielded tasks return cached results without re-executing.
4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). Activity functions are either sync (dispatched to the worker's thread pool) or `async def` (awaited directly on the worker's event loop). The engine records history; on replay, yielded tasks return cached results without re-executing.
5. **Determinism**: Workflows must be deterministic — no random, no wall-clock time, no I/O. Use `ctx.current_utc_datetime` instead of `datetime.now()`. Use `ctx.is_replaying` to guard side effects like logging.
6. **Completion**: Client polls via `wait_for_workflow_completion()` or `get_workflow_state()`.

Expand Down Expand Up @@ -191,6 +209,7 @@ Two example directories exercise workflows:
- `cross-app1.py`, `cross-app2.py`, `cross-app3.py` — cross-app calls
- `versioning.py` — workflow versioning with `is_patched()`
- `simple_aio_client.py` — async client variant
- `async_activities.py` — `async def` activities (fan-out/fan-in with simulated I/O, configurable payload sizes)

## Testing

Expand Down
223 changes: 223 additions & 0 deletions ext/dapr-ext-workflow/benchmarks/_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
# -*- coding: utf-8 -*-
# Copyright 2026 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Run-environment capture and Markdown formatting for the benchmark report."""

from __future__ import annotations

import os
import platform
import shutil
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

from dapr.ext.workflow._bench_harness import IS_DARWIN, ScenarioMetrics, SustainedMetrics


def _read_text(path: str) -> str:
try:
return Path(path).read_text(encoding='utf-8', errors='ignore')
except OSError:
return ''


def _cpu_model() -> str:
    """Best-effort CPU model name; never raises for a failed probe.

    Tries, in order: ``sysctl -n machdep.cpu.brand_string`` (only when ``IS_DARWIN``),
    the first ``model name`` line of ``/proc/cpuinfo``, and finally
    ``platform.processor()`` / ``platform.machine()``, ending with ``'unknown'``.
    """
    sysctl = shutil.which('sysctl') if IS_DARWIN else None
    if sysctl is not None:
        try:
            probe = subprocess.run(
                [sysctl, '-n', 'machdep.cpu.brand_string'],
                capture_output=True,
                text=True,
                timeout=2,
            )
            brand = probe.stdout.strip()
            if probe.returncode == 0 and brand:
                return brand
        except (subprocess.SubprocessError, OSError):
            # Probe failed or timed out; fall through to the next source.
            pass

    model_lines = (
        entry
        for entry in _read_text('/proc/cpuinfo').splitlines()
        if entry.startswith('model name')
    )
    first_match = next(model_lines, None)
    if first_match is not None:
        return first_match.split(':', 1)[1].strip()

    return platform.processor() or platform.machine() or 'unknown'


def _total_memory_gb() -> float:
    """Best-effort total physical memory in GB; ``0.0`` when it cannot be determined.

    When ``IS_DARWIN`` is set, ``sysctl -n hw.memsize`` is tried first and its value
    divided by ``1024**3``. Otherwise (or on failure) the ``MemTotal:`` entry of
    ``/proc/meminfo`` is used, divided by ``1024**2``.
    """
    sysctl = shutil.which('sysctl') if IS_DARWIN else None
    if sysctl is not None:
        try:
            probe = subprocess.run(
                [sysctl, '-n', 'hw.memsize'],
                capture_output=True,
                text=True,
                timeout=2,
            )
            raw = probe.stdout.strip()
            if probe.returncode == 0 and raw.isdigit():
                return int(raw) / (1024**3)
        except (subprocess.SubprocessError, OSError):
            # Probe failed or timed out; fall through to /proc/meminfo.
            pass

    for entry in _read_text('/proc/meminfo').splitlines():
        if not entry.startswith('MemTotal:'):
            continue
        fields = entry.split()
        # Expect "MemTotal: <number> ..."; skip anything that does not fit.
        if len(fields) >= 2 and fields[1].isdigit():
            return int(fields[1]) / (1024**2)

    return 0.0


def _git_commit() -> str:
"""Short git commit hash, or 'unknown' if not in a git repo."""
git = shutil.which('git')
if git is None:
return 'unknown'
try:
out = subprocess.run(
[git, 'rev-parse', '--short', 'HEAD'],
capture_output=True,
text=True,
timeout=2,
cwd=Path(__file__).parent,
)
if out.returncode == 0:
commit = out.stdout.strip()
# Mark dirty if there are uncommitted changes.
status = subprocess.run(
[git, 'status', '--porcelain'],
capture_output=True,
text=True,
timeout=2,
cwd=Path(__file__).parent,
)
if status.returncode == 0 and status.stdout.strip():
return f'{commit}-dirty'
return commit
except (subprocess.SubprocessError, OSError):
pass
return 'unknown'


@dataclass(slots=True)
class RunEnvironment:
"""Snapshot of the machine the benchmark ran on."""

timestamp_utc: str
git_commit: str
python_version: str
python_implementation: str
platform: str
os_release: str
cpu_model: str
cpu_logical_cores: int
cpu_physical_cores_hint: int
total_memory_gb: float
is_ci: bool

@classmethod
def capture(cls) -> 'RunEnvironment':
return cls(
timestamp_utc=datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC'),
git_commit=_git_commit(),
python_version=platform.python_version(),
python_implementation=platform.python_implementation(),
platform=platform.platform(),
os_release=f'{platform.system()} {platform.release()} ({platform.machine()})',
cpu_model=_cpu_model(),
cpu_logical_cores=os.cpu_count() or 0,
cpu_physical_cores_hint=os.cpu_count() or 0,
total_memory_gb=_total_memory_gb(),
is_ci=any(os.environ.get(k) for k in ('CI', 'GITHUB_ACTIONS', 'TRAVIS', 'BUILDKITE')),
)


def _format_environment_block(env: RunEnvironment) -> str:
mem_str = f'{env.total_memory_gb:.1f} GB' if env.total_memory_gb > 0 else 'unknown'
return (
'## Run environment\n'
'\n'
f'- **Timestamp**: {env.timestamp_utc}\n'
f'- **Git commit**: `{env.git_commit}`\n'
f'- **Python**: {env.python_implementation} {env.python_version}\n'
f'- **OS**: {env.os_release}\n'
f'- **CPU**: {env.cpu_model} ({env.cpu_logical_cores} logical cores)\n'
f'- **Memory**: {mem_str}\n'
'\n'
'Numbers are specific to this machine; the sync-vs-async gap is what transfers across'
' hardware, not the absolute values.'
)


def _speedup_cell(speedup: float) -> str:
if speedup > 1.2:
dot = '🟢'
elif speedup >= 0.8:
dot = '⚪'
else:
dot = '🔴'
return f'{dot} {speedup:.1f}x'


def _format_comparison_table(
rows: list[tuple[str, ScenarioMetrics, ScenarioMetrics]],
key_label: str = 'N',
show_async_rss: bool = False,
) -> str:
rss_header = ' Async RAM (MB) |' if show_async_rss else ''
rss_rule = ' ---: |' if show_async_rss else ''
header = (
f'| {key_label} | Sync (s) | Async (s) | Speedup |{rss_header}\n'
f'| ---: | ---: | ---: | ---: |{rss_rule}\n'
)
lines = []
for key, sync_m, async_m in rows:
speedup = sync_m.wallclock_s / async_m.wallclock_s if async_m.wallclock_s > 0 else 0.0
rss = f' {async_m.peak_rss_delta_mb:.0f} |' if show_async_rss else ''
lines.append(
f'| {key} | {sync_m.wallclock_s:.2f} | {async_m.wallclock_s:.2f} |'
f' {_speedup_cell(speedup)} |{rss}'
)
return header + '\n'.join(lines)


def _format_sustained_table(sync_m: SustainedMetrics, async_m: SustainedMetrics) -> str:
def row(label: str, sync_val: str, async_val: str) -> str:
return f'| {label} | {sync_val} | {async_val} |'

header = '| Metric | Sync | Async |\n| --- | ---: | ---: |\n'
rows = [
row(
'Effective throughput',
f'{sync_m.throughput_per_s:.0f}/s',
f'{async_m.throughput_per_s:.0f}/s',
),
row(
'p99 latency',
f'{sync_m.latency_overall.p99_ms:.0f} ms',
f'{async_m.latency_overall.p99_ms:.0f} ms',
),
row(
'p99 first quarter',
f'{sync_m.latency_first_quarter.p99_ms:.0f} ms',
f'{async_m.latency_first_quarter.p99_ms:.0f} ms',
),
row(
'p99 last quarter',
f'{sync_m.latency_last_quarter.p99_ms:.0f} ms',
f'{async_m.latency_last_quarter.p99_ms:.0f} ms',
),
]
return header + '\n'.join(rows)
Loading
Loading