Commit e434793

Refresh documentation for annotations and spans (#8593)
1 parent 30d5df7 commit e434793

4 files changed

Lines changed: 104 additions & 21 deletions

File tree

distributed/tests/test_client.py

Lines changed: 14 additions & 0 deletions
@@ -7275,6 +7275,20 @@ async def test_annotations_submit_map(c, s, a, b):
     assert not b.state.tasks


+@gen_cluster(client=True)
+async def test_annotations_global_vs_local(c, s, a, b):
+    """Test that local annotations take precedence over global annotations"""
+    with dask.annotate(foo=1):
+        x = delayed(inc)(1, dask_key_name="x")
+    y = delayed(inc)(2, dask_key_name="y")
+    with dask.annotate(foo=2):
+        xf, yf = c.compute([x, y])
+
+    await c.gather([xf, yf])
+    assert s.tasks["x"].annotations == {"foo": 1}
+    assert s.tasks["y"].annotations == {"foo": 2}
+
+
 @gen_cluster(client=True)
 async def test_workers_collection_restriction(c, s, a, b):
     da = pytest.importorskip("dask.array")
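The precedence rule this new test asserts can be sketched without a cluster. The following is a toy stand-in, not dask's implementation: `annotate`, `Task`, `current_annotations`, and `compute` here are hypothetical minimal versions illustrating how annotations captured at graph-construction time win over annotations active at compute time.

```python
from contextlib import contextmanager

# Stack of active annotation contexts (toy model, not dask's code).
_stack: list = []


@contextmanager
def annotate(**annotations):
    _stack.append(annotations)
    try:
        yield
    finally:
        _stack.pop()


def current_annotations() -> dict:
    # Inner contexts override outer ones for the same key.
    merged: dict = {}
    for layer in _stack:
        merged.update(layer)
    return merged


class Task:
    def __init__(self, key: str):
        self.key = key
        # Snapshot the annotations active when the task is *built* ("local").
        self.annotations = current_annotations()


def compute(*tasks) -> dict:
    # Annotations active at compute time ("global") only fill in keys
    # that a task did not already set for itself.
    fallback = current_annotations()
    return {t.key: {**fallback, **t.annotations} for t in tasks}


with annotate(foo=1):
    x = Task("x")
y = Task("y")
with annotate(foo=2):
    resolved = compute(x, y)

print(resolved)  # {'x': {'foo': 1}, 'y': {'foo': 2}}
```

`x` keeps its construction-time `foo=1`, while `y`, built with no annotation of its own, picks up the compute-time `foo=2` — the same outcome the test checks on `s.tasks`.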

distributed/tests/test_spans.py

Lines changed: 18 additions & 0 deletions
@@ -841,3 +841,21 @@ def f():

     # No annotation is created for the default span
     assert await c.submit(dask.get_annotations) == {}
+
+
+@gen_cluster(client=True)
+async def test_span_on_persist(c, s, a, b):
+    """As a workaround to lack of annotations support in dask-expr and loss of
+    annotations due to low-level optimization in dask.array, you can use span() to wrap
+    calls to persist() and compute()
+    """
+    x = delayed(inc)(1, dask_key_name="x")
+    with span("x") as x_id:
+        x = c.persist(x)
+    y = delayed(inc)(x, dask_key_name="y")
+    with span("y") as y_id:
+        y = c.compute(y)
+    assert await y == 3
+
+    assert s.tasks["x"].group.span_id == x_id
+    assert s.tasks["y"].group.span_id == y_id
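The mechanism this test relies on — whatever span is active when work is submitted gets stamped onto the resulting tasks — can be sketched with a context variable. This is a hypothetical toy model, not distributed's implementation; `span`, `persist`, and `scheduler_tasks` below are illustrative stand-ins.

```python
import uuid
from contextlib import contextmanager
from contextvars import ContextVar

# Currently active span id, if any (toy model, not distributed's code).
_current_span = ContextVar("span", default=None)

# Stand-in for the scheduler's task table: key -> span id.
scheduler_tasks: dict = {}


@contextmanager
def span(name: str):
    # Each span gets a unique id, yielded to the caller like distributed's span().
    span_id = uuid.uuid4().hex
    token = _current_span.set(span_id)
    try:
        yield span_id
    finally:
        _current_span.reset(token)


def persist(key: str) -> None:
    # Whatever span is active at submission time is recorded on the task.
    scheduler_tasks[key] = _current_span.get()


with span("x") as x_id:
    persist("x")
with span("y") as y_id:
    persist("y")

assert scheduler_tasks["x"] == x_id
assert scheduler_tasks["y"] == y_id
```

Because the span is captured at submission, wrapping `persist()`/`compute()` in `span()` survives even when annotations attached earlier in the graph would be dropped by optimization.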

docs/source/resources.rst

Lines changed: 40 additions & 10 deletions
@@ -143,19 +143,49 @@ memory as actual resources and uses these in normal scheduling operation.
 Resources with collections
 --------------------------

-You can also use resources with Dask collections, like arrays, dataframes, and
-delayed objects. You can annotate operations on collections with specific resources
-that should be required perform the computation using the dask annotations machinery.
+You can also use resources with Dask collections, like arrays and delayed objects. You
+can annotate operations on collections with specific resources that should be required
+to perform the computation using the dask annotations machinery.

 .. code-block:: python

-    x = dd.read_csv(...)
+    # Read note below!
+    dask.config.set({"optimization.fuse.active": False})
+    x = da.read_zarr(...)
     with dask.annotate(resources={'GPU': 1}):
-        y = x.map_partitions(func1)
-    z = y.map_partitions(func2)
+        y = x.map_blocks(func1)
+    z = y.map_blocks(func2)
+    z.compute()

-    z.compute(optimize_graph=False)
+.. note::

-   In most cases (such as the case above) the annotations for ``y`` may be lost during
-   graph optimization before execution. You can avoid that by passing the
-   ``optimize_graph=False`` keyword.
+   This feature is currently supported for dataframes only when
+   ``with dask.annotate(...):`` wraps the `compute()` or `persist()` call; in that
+   case, the annotation applies to the whole graph, starting from and excluding
+   any previously persisted collections.
+
+   For other collections, like arrays and delayed objects, annotations can get lost
+   during the optimization phase. To prevent this issue, you must set:
+
+   >>> dask.config.set({"optimization.fuse.active": False})
+
+   Or in dask.yaml:
+
+   .. code-block:: yaml
+
+       optimization:
+         fuse:
+           active: false
+
+   A possible workaround, which also works for dataframes, is to perform
+   intermediate calls to `persist()`. Note however that this can significantly
+   impact optimizations and reduce overall performance.
+
+   .. code-block:: python
+
+       x = dd.read_parquet(...)
+       with dask.annotate(resources={'GPU': 1}):
+           y = x.map_partitions(func1).persist()
+       z = y.map_partitions(func2)
+       del y  # Release distributed memory for y as soon as possible
+       z.compute()
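The scheduling effect of `resources={'GPU': 1}` can be illustrated with a small toy model of worker eligibility: a task may only run on workers whose declared resources cover every requested amount. The worker names and quantities below are hypothetical, and this is a sketch of the idea, not the scheduler's actual code.

```python
# Hypothetical worker pool: declared resources per worker (toy model).
workers = {
    "gpu-worker": {"GPU": 1, "MEMORY": 4e9},
    "cpu-worker": {"MEMORY": 16e9},
}


def eligible_workers(required: dict) -> list:
    """Workers whose declared resources satisfy every requested amount."""
    return [
        name
        for name, available in workers.items()
        if all(available.get(res, 0) >= amount for res, amount in required.items())
    ]


print(eligible_workers({"GPU": 1}))       # ['gpu-worker']
print(eligible_workers({"MEMORY": 8e9}))  # ['cpu-worker']
print(eligible_workers({}))               # ['gpu-worker', 'cpu-worker']
```

This is why lost annotations matter: if optimization fuses an annotated task into an unannotated one, the resource request disappears and the task becomes eligible everywhere, defeating the restriction.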

docs/source/spans.rst

Lines changed: 32 additions & 11 deletions
@@ -26,23 +26,21 @@ For example:
 .. code-block:: python

     import dask.config
-    import dask.dataframe as dd
+    import dask.array as da
     from distributed import Client, span

+    # Read important note below
     dask.config.set({"optimization.fuse.active": False})
     client = Client()

     with span("Alice's workflow"):
         with span("data load"):
-            df = dd.read_parquet(...)
-
+            a = da.read_zarr(...)
         with span("ML preprocessing"):
-            df = preprocess(df)
-
+            a = preprocess(a)
         with span("Model training"):
-            model = train(df)
-
-    model = model.compute()
+            model = train(a)
+        model = model.compute()

 Note how the :func:`span` context manager can be nested.
 The example will create the following spans on the scheduler:
@@ -95,10 +93,16 @@ Additionally, spans can be queried using scheduler extensions or

 User API
 --------
-.. warning::
+.. important::

-   Spans are based on annotations, and just like annotations they can be lost during
-   optimization. To prevent this issue, you must set
+   Dataframes have a minimum granularity of a single call to `compute()` or `persist()`
+   and can't break it down further into groups of operations; if the example above
+   used dataframes, everything would have been uniformly tagged as "Alice's Workflow",
+   as it is the span that's active during `compute()`.
+
+   In other collections, such as arrays and delayed objects, spans that don't wrap
+   around a call to `compute()` or `persist()` can get lost during the optimization
+   phase. To prevent this issue, you must set

    >>> dask.config.set({"optimization.fuse.active": False})

@@ -110,6 +114,23 @@ User API
       fuse:
         active: false

+   A possible workaround, which also works for dataframes, is to perform
+   intermediate calls to `persist()`. Note however that this can significantly
+   impact optimizations and reduce overall performance.
+
+   .. code-block:: python
+
+      with span("Alice's workflow"):
+          with span("data load"):
+              a = dd.read_parquet(...).persist()
+          with span("ML preprocessing"):
+              b = preprocess(a).persist()
+              del a  # Release distributed memory for a as soon as possible
+          with span("Model training"):
+              model = train(b).persist()
+              del b  # Release distributed memory for b as soon as possible
+          model = model.compute()
+
 .. autofunction:: span

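The nested-span bookkeeping this documentation describes — each span identified by the chain of enclosing span names — can be sketched with a context variable. This is a hypothetical toy model, not distributed's implementation; `span` and `recorded` below are illustrative stand-ins.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Tuple of enclosing span names for the current context (toy model).
_stack: ContextVar = ContextVar("spans", default=())

# Stand-in for the spans registered on the scheduler, in creation order.
recorded: list = []


@contextmanager
def span(name: str):
    # A span's full name is the chain of enclosing span names plus its own.
    full = _stack.get() + (name,)
    token = _stack.set(full)
    recorded.append(full)
    try:
        yield full
    finally:
        _stack.reset(token)


with span("Alice's workflow"):
    with span("data load"):
        pass
    with span("ML preprocessing"):
        pass
    with span("Model training"):
        pass

for name in recorded:
    print(name)
```

Running this records `("Alice's workflow",)` followed by its three children, mirroring the span tree the scheduler builds for the documented example.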
