Skip to content

Commit 5a6f763

Browse files
authored
[Drain] OnTimer - propagate caused by drain bit up to DoFnRunner (#37012)
* add causedByDrain to DoFnRunner.onTimer interface and all implementations. Mostly passthrough.
1 parent af81a0c commit 5a6f763

55 files changed

Lines changed: 353 additions & 97 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.beam.sdk.state.TimeDomain;
2121
import org.apache.beam.sdk.transforms.DoFn;
2222
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
23+
import org.apache.beam.sdk.values.CausedByDrain;
2324
import org.apache.beam.sdk.values.WindowedValue;
2425
import org.checkerframework.checker.nullness.qual.Nullable;
2526
import org.joda.time.Instant;
@@ -46,7 +47,8 @@ public interface DoFnRunner<InputT extends @Nullable Object, OutputT extends @Nu
4647
BoundedWindow window,
4748
Instant timestamp,
4849
Instant outputTimestamp,
49-
TimeDomain timeDomain);
50+
TimeDomain timeDomain,
51+
CausedByDrain causedByDrain);
5052

5153
/**
5254
* Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} method and performs

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.beam.sdk.transforms.DoFn;
2626
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2727
import org.apache.beam.sdk.util.WindowTracing;
28+
import org.apache.beam.sdk.values.CausedByDrain;
2829
import org.apache.beam.sdk.values.KV;
2930
import org.apache.beam.sdk.values.WindowedValue;
3031
import org.apache.beam.sdk.values.WindowedValues;
@@ -89,8 +90,10 @@ public <KeyT> void onTimer(
8990
BoundedWindow window,
9091
Instant timestamp,
9192
Instant outputTimestamp,
92-
TimeDomain timeDomain) {
93-
doFnRunner.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
93+
TimeDomain timeDomain,
94+
CausedByDrain causedByDrain) {
95+
doFnRunner.onTimer(
96+
timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain);
9497
}
9598

9699
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
4848
import org.apache.beam.sdk.util.OutputBuilderSuppliers;
4949
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
50+
import org.apache.beam.sdk.values.CausedByDrain;
5051
import org.apache.beam.sdk.values.KV;
5152
import org.apache.beam.sdk.values.PCollectionView;
5253
import org.apache.beam.sdk.values.Row;
@@ -396,6 +397,11 @@ public PaneInfo pane() {
396397
return element.getRecordOffset();
397398
}
398399

400+
@Override
401+
public CausedByDrain causedByDrain() {
402+
return element.causedByDrain();
403+
}
404+
399405
@Override
400406
public PipelineOptions getPipelineOptions() {
401407
return pipelineOptions;

runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.beam.sdk.transforms.DoFn;
2828
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2929
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
30+
import org.apache.beam.sdk.values.CausedByDrain;
3031
import org.apache.beam.sdk.values.KV;
3132
import org.apache.beam.sdk.values.PCollectionView;
3233
import org.apache.beam.sdk.values.WindowedValue;
@@ -90,7 +91,8 @@ public <KeyT> void onTimer(
9091
BoundedWindow window,
9192
Instant timestamp,
9293
Instant outputTimestamp,
93-
TimeDomain timeDomain) {
94+
TimeDomain timeDomain,
95+
CausedByDrain causedByDrain) {
9496
throw new UnsupportedOperationException("User timers unsupported in ProcessFn");
9597
}
9698

runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.beam.sdk.state.TimeDomain;
2121
import org.apache.beam.sdk.transforms.DoFn;
2222
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
23+
import org.apache.beam.sdk.values.CausedByDrain;
2324
import org.apache.beam.sdk.values.WindowedValue;
2425
import org.joda.time.Instant;
2526

@@ -50,7 +51,8 @@ <KeyT> void onTimer(
5051
BoundedWindow window,
5152
Instant timestamp,
5253
Instant outputTimestamp,
53-
TimeDomain timeDomain);
54+
TimeDomain timeDomain,
55+
CausedByDrain causedByDrain);
5456

5557
/** Calls the underlying {@link DoFn.OnWindowExpiration} method. */
5658
<KeyT> void onWindowExpiration(BoundedWindow window, Instant outputTimestamp, KeyT key);

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.beam.sdk.util.SystemDoFnInternal;
5858
import org.apache.beam.sdk.util.UserCodeException;
5959
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
60+
import org.apache.beam.sdk.values.CausedByDrain;
6061
import org.apache.beam.sdk.values.PCollectionView;
6162
import org.apache.beam.sdk.values.Row;
6263
import org.apache.beam.sdk.values.TupleTag;
@@ -200,11 +201,13 @@ public <KeyT> void onTimer(
200201
BoundedWindow window,
201202
Instant timestamp,
202203
Instant outputTimestamp,
203-
TimeDomain timeDomain) {
204+
TimeDomain timeDomain,
205+
CausedByDrain causedByDrain) {
204206
Preconditions.checkNotNull(outputTimestamp, "outputTimestamp");
205207

206208
OnTimerArgumentProvider<KeyT> argumentProvider =
207-
new OnTimerArgumentProvider<>(timerId, key, window, timestamp, outputTimestamp, timeDomain);
209+
new OnTimerArgumentProvider<>(
210+
timerId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain);
208211
invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider);
209212
}
210213

@@ -399,6 +402,11 @@ public InputT element() {
399402
return elem.getValue();
400403
}
401404

405+
@Override
406+
public CausedByDrain causedByDrain() {
407+
return elem.causedByDrain();
408+
}
409+
402410
@Override
403411
public <T> T sideInput(PCollectionView<T> view) {
404412
checkNotNull(view, "View passed to sideInput cannot be null");
@@ -702,6 +710,7 @@ private class OnTimerArgumentProvider<KeyT> extends DoFn<InputT, OutputT>.OnTime
702710
private final TimeDomain timeDomain;
703711
private final String timerId;
704712
private final KeyT key;
713+
private final CausedByDrain causedByDrain;
705714
private final OutputBuilderSupplier builderSupplier;
706715

707716
/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
@@ -727,28 +736,36 @@ private OnTimerArgumentProvider(
727736
BoundedWindow window,
728737
Instant fireTimestamp,
729738
Instant timestamp,
730-
TimeDomain timeDomain) {
739+
TimeDomain timeDomain,
740+
CausedByDrain causedByDrain) {
731741
fn.super();
732742
this.timerId = timerId;
733743
this.window = window;
734744
this.fireTimestamp = fireTimestamp;
735745
this.timestamp = timestamp;
736746
this.timeDomain = timeDomain;
737747
this.key = key;
748+
this.causedByDrain = causedByDrain;
738749
this.builderSupplier =
739750
OutputBuilderSuppliers.supplierForElement(
740751
WindowedValues.builder()
741752
.setValue(null)
742753
.setTimestamp(timestamp)
743754
.setWindow(window)
744-
.setPaneInfo(PaneInfo.NO_FIRING));
755+
.setPaneInfo(PaneInfo.NO_FIRING)
756+
.setCausedByDrain(causedByDrain));
745757
}
746758

747759
@Override
748760
public Instant timestamp() {
749761
return timestamp;
750762
}
751763

764+
@Override
765+
public CausedByDrain causedByDrain() {
766+
return causedByDrain;
767+
}
768+
752769
@Override
753770
public Instant fireTimestamp() {
754771
return fireTimestamp;

runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.beam.sdk.state.TimeDomain;
2525
import org.apache.beam.sdk.transforms.DoFn;
2626
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
27+
import org.apache.beam.sdk.values.CausedByDrain;
2728
import org.apache.beam.sdk.values.PCollectionView;
2829
import org.apache.beam.sdk.values.WindowedValue;
2930
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -115,8 +116,10 @@ public <KeyT> void onTimer(
115116
BoundedWindow window,
116117
Instant timestamp,
117118
Instant outputTimestamp,
118-
TimeDomain timeDomain) {
119-
underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
119+
TimeDomain timeDomain,
120+
CausedByDrain causedByDrain) {
121+
underlying.onTimer(
122+
timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain);
120123
}
121124

122125
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ public String getErrorContext() {
605605
wakeupTime,
606606
wakeupTime,
607607
TimeDomain.PROCESSING_TIME,
608-
CausedByDrain.NORMAL));
608+
timer == null ? CausedByDrain.NORMAL : timer.causedByDrain()));
609609
}
610610

611611
private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(

runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
3939
import org.apache.beam.sdk.transforms.windowing.WindowFn;
4040
import org.apache.beam.sdk.util.WindowTracing;
41+
import org.apache.beam.sdk.values.CausedByDrain;
4142
import org.apache.beam.sdk.values.WindowedValue;
4243
import org.apache.beam.sdk.values.WindowedValues;
4344
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -48,7 +49,8 @@
4849
/**
4950
* A customized {@link DoFnRunner} that handles late data dropping and garbage collection for
5051
* stateful {@link DoFn DoFns}. It registers a GC timer in {@link #processElement(WindowedValue)}
51-
* and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant, Instant, TimeDomain)}
52+
* and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant, Instant, TimeDomain,
53+
* boolean)}
5254
*
5355
* @param <InputT> the type of the {@link DoFn} (main) input elements
5456
* @param <OutputT> the type of the {@link DoFn} (main) output elements
@@ -208,7 +210,8 @@ public <KeyT> void onTimer(
208210
BoundedWindow window,
209211
Instant timestamp,
210212
Instant outputTimestamp,
211-
TimeDomain timeDomain) {
213+
TimeDomain timeDomain,
214+
CausedByDrain causedByDrain) {
212215

213216
if (timerId.equals(SORT_FLUSH_TIMER)) {
214217
onSortFlushTimer(window, stepContext.timerInternals().currentInputWatermarkTime());
@@ -232,7 +235,14 @@ public <KeyT> void onTimer(
232235
stepContext.timerInternals().currentInputWatermarkTime());
233236
} else {
234237
doFnRunner.onTimer(
235-
timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
238+
timerId,
239+
timerFamilyId,
240+
key,
241+
window,
242+
timestamp,
243+
outputTimestamp,
244+
timeDomain,
245+
causedByDrain);
236246
}
237247
}
238248
}

runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ public void testOnTimerExceptionsWrappedAsUserCodeException() {
141141
GlobalWindow.INSTANCE,
142142
new Instant(0),
143143
new Instant(0),
144-
TimeDomain.EVENT_TIME);
144+
TimeDomain.EVENT_TIME,
145+
CausedByDrain.NORMAL);
145146
}
146147

147148
/**
@@ -266,7 +267,8 @@ public void testOnTimerCalled() {
266267
GlobalWindow.INSTANCE,
267268
currentTime.plus(offset),
268269
currentTime.plus(offset),
269-
TimeDomain.EVENT_TIME);
270+
TimeDomain.EVENT_TIME,
271+
CausedByDrain.CAUSED_BY_DRAIN);
270272

271273
assertThat(
272274
fn.onTimerInvocations,
@@ -277,7 +279,8 @@ public void testOnTimerCalled() {
277279
StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
278280
currentTime.plus(offset),
279281
currentTime.plus(offset),
280-
TimeDomain.EVENT_TIME)));
282+
TimeDomain.EVENT_TIME,
283+
CausedByDrain.CAUSED_BY_DRAIN)));
281284
}
282285

283286
/**
@@ -593,7 +596,8 @@ public void testOnTimerAllowedSkew() {
593596
GlobalWindow.INSTANCE,
594597
new Instant(0),
595598
new Instant(0),
596-
TimeDomain.EVENT_TIME);
599+
TimeDomain.EVENT_TIME,
600+
CausedByDrain.NORMAL);
597601
}
598602

599603
@Test
@@ -625,7 +629,8 @@ public void testOnTimerNoSkew() {
625629
GlobalWindow.INSTANCE,
626630
new Instant(0),
627631
new Instant(0),
628-
TimeDomain.EVENT_TIME);
632+
TimeDomain.EVENT_TIME,
633+
CausedByDrain.NORMAL);
629634
});
630635

631636
assertThat(exception.getCause(), isA(IllegalArgumentException.class));
@@ -703,7 +708,7 @@ public void onTimer(OnTimerContext context) {
703708
context.fireTimestamp(),
704709
context.timestamp(),
705710
context.timeDomain(),
706-
CausedByDrain.NORMAL));
711+
context.causedByDrain()));
707712
}
708713
}
709714

0 commit comments

Comments
 (0)