Skip to content

Commit 822c010

Browse files
committed
Refactor metadata propagation in ReduceFnRunner to support extensible PipelineMetadata
1 parent 1f83bf0 commit 822c010

File tree

7 files changed

+311
-16
lines changed

7 files changed

+311
-16
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.core;
19+
20+
/**
 * Strategy for folding pipeline metadata values of type {@code T} into a single value.
 *
 * <p>Combining starts from {@link #createAccumulator()} and folds each further value in with
 * {@link #addInput}.
 *
 * @param <T> the metadata type; it serves as both the accumulator and the input type
 */
interface MetadataCombiner<T> {
  /** Returns the accumulator value to start combining from, before any input has been added. */
  T createAccumulator();

  /**
   * Folds {@code input} into {@code accumulator}.
   *
   * @param accumulator the metadata combined so far
   * @param input the metadata to add
   * @return the combined metadata
   */
  T addInput(T accumulator, T input);
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.core;
19+
20+
import org.apache.beam.sdk.values.CausedByDrain;
21+
import org.apache.beam.sdk.values.PipelineMetadata;
22+
23+
/** Combiner for PipelineMetadata. */
24+
class PipelineMetadataCombiner implements MetadataCombiner<PipelineMetadata> {
25+
private static final PipelineMetadataCombiner INSTANCE = new PipelineMetadataCombiner();
26+
27+
public static PipelineMetadataCombiner of() {
28+
return INSTANCE;
29+
}
30+
31+
private final CausedByDrainCombiner causedByDrainCombiner = CausedByDrainCombiner.of();
32+
33+
@Override
34+
public PipelineMetadata createAccumulator() {
35+
return PipelineMetadata.create(causedByDrainCombiner.createAccumulator());
36+
}
37+
38+
@Override
39+
public PipelineMetadata addInput(PipelineMetadata accumulator, PipelineMetadata input) {
40+
return PipelineMetadata.create(
41+
causedByDrainCombiner.addInput(accumulator.causedByDrain(), input.causedByDrain()));
42+
}
43+
44+
/** Combiner for CausedByDrain metadata. */
45+
static class CausedByDrainCombiner implements MetadataCombiner<CausedByDrain> {
46+
private static final CausedByDrainCombiner INSTANCE = new CausedByDrainCombiner();
47+
48+
public static CausedByDrainCombiner of() {
49+
return INSTANCE;
50+
}
51+
52+
@Override
53+
public CausedByDrain createAccumulator() {
54+
return CausedByDrain.NORMAL;
55+
}
56+
57+
@Override
58+
public CausedByDrain addInput(CausedByDrain current, CausedByDrain input) {
59+
if (current == CausedByDrain.CAUSED_BY_DRAIN || input == CausedByDrain.CAUSED_BY_DRAIN) {
60+
return CausedByDrain.CAUSED_BY_DRAIN;
61+
}
62+
return CausedByDrain.NORMAL;
63+
}
64+
}
65+
}

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.beam.sdk.metrics.Metrics;
4040
import org.apache.beam.sdk.options.PipelineOptions;
4141
import org.apache.beam.sdk.state.TimeDomain;
42+
import org.apache.beam.sdk.state.ValueState;
4243
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
4344
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
4445
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -51,6 +52,7 @@
5152
import org.apache.beam.sdk.values.CausedByDrain;
5253
import org.apache.beam.sdk.values.KV;
5354
import org.apache.beam.sdk.values.PCollection;
55+
import org.apache.beam.sdk.values.PipelineMetadata;
5456
import org.apache.beam.sdk.values.WindowedValue;
5557
import org.apache.beam.sdk.values.WindowedValues;
5658
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -107,6 +109,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
107109
* <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.
108110
* </ul>
109111
*/
112+
static final StateTag<ValueState<PipelineMetadata>> METADATA_TAG =
113+
StateTags.makeSystemTagInternal(
114+
StateTags.value("pipelineMetadata", PipelineMetadata.Coder.of()));
115+
110116
private final WindowingStrategy<Object, W> windowingStrategy;
111117

112118
private final WindowedValueReceiver<KV<K, OutputT>> outputter;
@@ -376,7 +382,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
376382
emit(
377383
contextFactory.base(window, StateStyle.DIRECT),
378384
contextFactory.base(window, StateStyle.RENAMED),
379-
CausedByDrain.NORMAL);
385+
PipelineMetadata.createDefault());
380386
}
381387

382388
// We're all done with merging and emitting elements so can compress the activeWindow state.
@@ -590,6 +596,17 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
590596
value.getTimestamp(),
591597
StateStyle.DIRECT,
592598
value.causedByDrain());
599+
600+
ValueState<PipelineMetadata> metadataState = directContext.state().access(METADATA_TAG);
601+
PipelineMetadata currentMetadata = metadataState.read();
602+
if (currentMetadata == null) {
603+
currentMetadata = PipelineMetadata.createDefault();
604+
}
605+
PipelineMetadata inputMetadata = PipelineMetadata.create(value.causedByDrain());
606+
PipelineMetadata newMetadata =
607+
PipelineMetadataCombiner.of().addInput(currentMetadata, inputMetadata);
608+
metadataState.write(newMetadata);
609+
593610
if (triggerRunner.isClosed(directContext.state())) {
594611
// This window has already been closed.
595612
droppedDueToClosedWindow.inc();
@@ -792,7 +809,7 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
792809
renamedContext,
793810
true /* isFinished */,
794811
windowActivation.isEndOfWindow,
795-
windowActivation.causedByDrain);
812+
PipelineMetadata.create(windowActivation.causedByDrain));
796813
checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
797814
}
798815

@@ -810,7 +827,10 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
810827
if (windowActivation.windowIsActiveAndOpen()
811828
&& triggerRunner.shouldFire(
812829
directContext.window(), directContext.timers(), directContext.state())) {
813-
emit(directContext, renamedContext, windowActivation.causedByDrain);
830+
emit(
831+
directContext,
832+
renamedContext,
833+
PipelineMetadata.create(windowActivation.causedByDrain));
814834
}
815835

816836
if (windowActivation.isEndOfWindow) {
@@ -874,6 +894,7 @@ private void clearAllState(
874894
triggerRunner.clearState(
875895
directContext.window(), directContext.timers(), directContext.state());
876896
paneInfoTracker.clear(directContext.state());
897+
directContext.state().access(METADATA_TAG).clear();
877898
} else {
878899
// If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
879900
// For (1), if !activeWindows.isActive then the window must be merging and has been
@@ -934,8 +955,9 @@ private void prefetchEmit(
934955
private void emit(
935956
ReduceFn<K, InputT, OutputT, W>.Context directContext,
936957
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
937-
CausedByDrain causedByDrain)
958+
PipelineMetadata metadata)
938959
throws Exception {
960+
939961
checkState(
940962
triggerRunner.shouldFire(
941963
directContext.window(), directContext.timers(), directContext.state()));
@@ -950,13 +972,14 @@ private void emit(
950972
// Run onTrigger to produce the actual pane contents.
951973
// As a side effect it will clear all element holds, but not necessarily any
952974
// end-of-window or garbage collection holds.
953-
onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/, causedByDrain);
975+
onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/, metadata);
954976

955977
// Now that we've triggered, the pane is empty.
956978
nonEmptyPanes.clearPane(renamedContext.state());
957979

958980
// Cleanup buffered data if appropriate
959981
if (shouldDiscard) {
982+
directContext.state().access(METADATA_TAG).clear();
960983
// Cleanup flavor C: The user does not want any buffered data to persist between panes.
961984
reduceFn.clearState(renamedContext);
962985
}
@@ -1009,8 +1032,19 @@ private void prefetchOnTrigger(
10091032
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
10101033
final boolean isFinished,
10111034
boolean isEndOfWindow,
1012-
CausedByDrain causedByDrain)
1035+
PipelineMetadata metadata)
10131036
throws Exception {
1037+
ValueState<PipelineMetadata> metadataState = directContext.state().access(METADATA_TAG);
1038+
PipelineMetadata aggregatedMetadata = metadataState.read();
1039+
if (aggregatedMetadata == null) {
1040+
aggregatedMetadata = PipelineMetadata.createDefault();
1041+
}
1042+
PipelineMetadata fullyAggregatedMetadata =
1043+
PipelineMetadataCombiner.of().addInput(aggregatedMetadata, metadata);
1044+
final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();
1045+
if (isFinished) {
1046+
metadataState.clear();
1047+
}
10141048
// Extract the window hold, and as a side effect clear it.
10151049
final WatermarkHold.OldAndNewHolds pair =
10161050
watermarkHold.extractAndRelease(renamedContext, isFinished).read();
@@ -1081,12 +1115,12 @@ private void prefetchOnTrigger(
10811115
.setValue(KV.of(key, toOutput))
10821116
.setTimestamp(outputTimestamp)
10831117
.setWindows(windows)
1084-
.setCausedByDrain(causedByDrain)
1118+
.setCausedByDrain(aggregatedCausedByDrain)
10851119
.setPaneInfo(paneInfo)
10861120
.setReceiver(outputter)
10871121
.output();
10881122
},
1089-
causedByDrain);
1123+
aggregatedCausedByDrain);
10901124

10911125
reduceFn.onTrigger(renamedTriggerContext);
10921126
}

runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public boolean hasNoActiveWindows() {
318318
public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
319319
assertHasOnlyGlobalAndAllowedTags(
320320
ImmutableSet.copyOf(expectedWindows),
321-
ImmutableSet.of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
321+
ImmutableSet.of(TriggerStateMachineRunner.FINISHED_BITS_TAG, ReduceFnRunner.METADATA_TAG));
322322
}
323323

324324
@SafeVarargs
@@ -331,7 +331,8 @@ public final void assertHasOnlyGlobalAndStateFor(W... expectedWindows) {
331331
PaneInfoTracker.PANE_INFO_TAG,
332332
WatermarkHold.watermarkHoldTagForTimestampCombiner(
333333
objectStrategy.getTimestampCombiner()),
334-
WatermarkHold.EXTRA_HOLD_TAG));
334+
WatermarkHold.EXTRA_HOLD_TAG,
335+
ReduceFnRunner.METADATA_TAG));
335336
}
336337

337338
@SafeVarargs
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.values;
19+
20+
import com.google.auto.value.AutoValue;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.OutputStream;
24+
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
25+
import org.apache.beam.sdk.coders.AtomicCoder;
26+
import org.apache.beam.sdk.coders.ByteArrayCoder;
27+
28+
/**
29+
* Encapsulates metadata that propagates with elements in the pipeline.
30+
*
31+
* <p>This metadata is sent along with {@link WindowedValue} elements. It currently includes fields
32+
* like {@link CausedByDrain}, and is designed to be extensible to support future metadata fields
33+
* such as OpenTelemetry context or CDC (Change Data Capture) kind.
34+
*
35+
* <p>The purpose of this class is to group targeted metadata fields together. This makes it easier
36+
* to define combination strategies (e.g., when accumulating state in {@code ReduceFnRunner}) when
37+
* multiple elements are merged or grouped, without having to extend method signatures or state
38+
* handling for every new metadata field.
39+
*/
40+
@AutoValue
41+
public abstract class PipelineMetadata {
42+
public abstract CausedByDrain causedByDrain();
43+
44+
public static PipelineMetadata create(CausedByDrain causedByDrain) {
45+
return new AutoValue_PipelineMetadata(causedByDrain);
46+
}
47+
48+
public static PipelineMetadata createDefault() {
49+
return create(CausedByDrain.NORMAL);
50+
}
51+
52+
public static class Coder extends AtomicCoder<PipelineMetadata> {
53+
private static final Coder INSTANCE = new Coder();
54+
55+
public static Coder of() {
56+
return INSTANCE;
57+
}
58+
59+
@Override
60+
public void encode(PipelineMetadata value, OutputStream outStream) throws IOException {
61+
BeamFnApi.Elements.ElementMetadata.Builder builder =
62+
BeamFnApi.Elements.ElementMetadata.newBuilder();
63+
builder.setDrain(
64+
value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
65+
? BeamFnApi.Elements.DrainMode.Enum.DRAINING
66+
: BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);
67+
68+
ByteArrayCoder.of().encode(builder.build().toByteArray(), outStream);
69+
}
70+
71+
@Override
72+
public PipelineMetadata decode(InputStream inStream) throws IOException {
73+
byte[] bytes = ByteArrayCoder.of().decode(inStream);
74+
BeamFnApi.Elements.ElementMetadata proto =
75+
BeamFnApi.Elements.ElementMetadata.parseFrom(bytes);
76+
77+
CausedByDrain causedByDrain =
78+
proto.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
79+
? CausedByDrain.CAUSED_BY_DRAIN
80+
: CausedByDrain.NORMAL;
81+
82+
return PipelineMetadata.create(causedByDrain);
83+
}
84+
}
85+
}

sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,10 @@ public static void setMetadataSupported() {
864864
metadataSupported = true;
865865
}
866866

867+
public static void setMetadataNotSupported() {
868+
metadataSupported = false;
869+
}
870+
867871
public static boolean isMetadataSupported() {
868872
return metadataSupported;
869873
}

0 commit comments

Comments
 (0)