Skip to content

Commit 4739405

Browse files
authored
Merge pull request #37727: [Drain] Reduce Fn Runner changes
2 parents fcbb034 + e761581 commit 4739405

14 files changed

Lines changed: 280 additions & 53 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 19 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -438,7 +438,15 @@ public <T> void output(TupleTag<T> tag, T value) {
438438
@Override
439439
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
440440
outputReceiver.output(
441-
tag, WindowedValues.of(value, timestamp, element.getWindows(), element.getPaneInfo()));
441+
tag,
442+
WindowedValues.of(
443+
value,
444+
timestamp,
445+
element.getWindows(),
446+
element.getPaneInfo(),
447+
element.getRecordId(),
448+
element.getRecordOffset(),
449+
element.causedByDrain()));
442450
}
443451

444452
@Override
@@ -452,7 +460,16 @@ public <T> void outputWindowedValue(
452460
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
453461
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
454462
}
455-
outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo));
463+
outputReceiver.output(
464+
tag,
465+
WindowedValues.of(
466+
value,
467+
timestamp,
468+
windows,
469+
paneInfo,
470+
element.getRecordId(),
471+
element.getRecordOffset(),
472+
element.causedByDrain()));
456473
}
457474

458475
private void noteOutput() {

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
import org.apache.beam.sdk.state.Timers;
2323
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2424
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
25+
import org.apache.beam.sdk.values.CausedByDrain;
2526
import org.apache.beam.sdk.values.WindowingStrategy;
2627
import org.joda.time.Instant;
2728

@@ -59,6 +60,7 @@ public abstract class ProcessValueContext extends Context {
5960
/** Return the actual value being processed. */
6061
public abstract InputT value();
6162

63+
public abstract CausedByDrain causedByDrain();
6264
/** Return the timestamp associated with the value. */
6365
public abstract Instant timestamp();
6466
}
@@ -75,6 +77,7 @@ public abstract class OnTriggerContext extends Context {
7577
/** Returns the {@link PaneInfo} for the trigger firing being processed. */
7678
public abstract PaneInfo paneInfo();
7779

80+
public abstract CausedByDrain causedByDrain();
7881
/** Output the given value in the current window. */
7982
public abstract void output(OutputT value);
8083
}

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java

Lines changed: 32 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -99,13 +99,19 @@ public ReduceFn<K, InputT, OutputT, W>.Context base(W window, StateStyle style)
9999
}
100100

101101
public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
102-
W window, InputT value, Instant timestamp, StateStyle style) {
103-
return new ProcessValueContextImpl(stateAccessor(window, style), value, timestamp);
102+
W window, InputT value, Instant timestamp, StateStyle style, CausedByDrain causedByDrain) {
103+
return new ProcessValueContextImpl(
104+
stateAccessor(window, style), value, timestamp, causedByDrain);
104105
}
105106

106107
public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(
107-
W window, PaneInfo paneInfo, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
108-
return new OnTriggerContextImpl(stateAccessor(window, style), paneInfo, callbacks);
108+
W window,
109+
PaneInfo paneInfo,
110+
StateStyle style,
111+
OnTriggerCallbacks<OutputT> callbacks,
112+
CausedByDrain causedByDrain) {
113+
return new OnTriggerContextImpl(
114+
stateAccessor(window, style), paneInfo, callbacks, causedByDrain);
109115
}
110116

111117
public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge(
@@ -358,14 +364,19 @@ private class ProcessValueContextImpl
358364
private final Instant timestamp;
359365
private final StateAccessorImpl<K, W> state;
360366
private final TimersImpl timers;
367+
private final CausedByDrain causedByDrain;
361368

362369
private ProcessValueContextImpl(
363-
StateAccessorImpl<K, W> state, InputT value, Instant timestamp) {
370+
StateAccessorImpl<K, W> state,
371+
InputT value,
372+
Instant timestamp,
373+
CausedByDrain causedByDrain) {
364374
reduceFn.super();
365375
this.state = state;
366376
this.value = value;
367377
this.timestamp = timestamp;
368378
this.timers = new TimersImpl(state.namespace());
379+
this.causedByDrain = causedByDrain;
369380
}
370381

371382
@Override
@@ -393,6 +404,11 @@ public InputT value() {
393404
return value;
394405
}
395406

407+
@Override
408+
public CausedByDrain causedByDrain() {
409+
return causedByDrain;
410+
}
411+
396412
@Override
397413
public Instant timestamp() {
398414
return timestamp;
@@ -409,14 +425,19 @@ private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTri
409425
private final PaneInfo paneInfo;
410426
private final OnTriggerCallbacks<OutputT> callbacks;
411427
private final TimersImpl timers;
428+
private final CausedByDrain causedByDrain;
412429

413430
private OnTriggerContextImpl(
414-
StateAccessorImpl<K, W> state, PaneInfo paneInfo, OnTriggerCallbacks<OutputT> callbacks) {
431+
StateAccessorImpl<K, W> state,
432+
PaneInfo paneInfo,
433+
OnTriggerCallbacks<OutputT> callbacks,
434+
CausedByDrain causedByDrain) {
415435
reduceFn.super();
416436
this.state = state;
417437
this.paneInfo = paneInfo;
418438
this.callbacks = callbacks;
419439
this.timers = new TimersImpl(state.namespace());
440+
this.causedByDrain = causedByDrain;
420441
}
421442

422443
@Override
@@ -444,6 +465,11 @@ public PaneInfo paneInfo() {
444465
return paneInfo;
445466
}
446467

468+
@Override
469+
public CausedByDrain causedByDrain() {
470+
return causedByDrain;
471+
}
472+
447473
@Override
448474
public void output(OutputT value) {
449475
callbacks.output(value);

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 37 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@
4848
import org.apache.beam.sdk.transforms.windowing.WindowFn;
4949
import org.apache.beam.sdk.util.WindowTracing;
5050
import org.apache.beam.sdk.util.WindowedValueReceiver;
51+
import org.apache.beam.sdk.values.CausedByDrain;
5152
import org.apache.beam.sdk.values.KV;
5253
import org.apache.beam.sdk.values.PCollection;
5354
import org.apache.beam.sdk.values.WindowedValue;
@@ -374,7 +375,8 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
374375
for (W window : windowsToFire) {
375376
emit(
376377
contextFactory.base(window, StateStyle.DIRECT),
377-
contextFactory.base(window, StateStyle.RENAMED));
378+
contextFactory.base(window, StateStyle.RENAMED),
379+
null);
378380
}
379381

380382
// We're all done with merging and emitting elements so can compress the activeWindow state.
@@ -583,7 +585,11 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
583585
for (W window : windows) {
584586
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext =
585587
contextFactory.forValue(
586-
window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
588+
window,
589+
value.getValue(),
590+
value.getTimestamp(),
591+
StateStyle.DIRECT,
592+
value.causedByDrain());
587593
if (triggerRunner.isClosed(directContext.state())) {
588594
// This window has already been closed.
589595
droppedDueToClosedWindow.inc();
@@ -601,7 +607,11 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
601607
activeWindows.ensureWindowIsActive(window);
602608
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext =
603609
contextFactory.forValue(
604-
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
610+
window,
611+
value.getValue(),
612+
value.getTimestamp(),
613+
StateStyle.RENAMED,
614+
value.causedByDrain());
605615

606616
nonEmptyPanes.recordContent(renamedContext.state());
607617
scheduleGarbageCollectionTimer(directContext);
@@ -639,12 +649,15 @@ private class WindowActivation {
639649
// garbage collect the window. We'll consider any timer at or after the
640650
// end-of-window time to be a signal to garbage collect.
641651
public final boolean isGarbageCollection;
652+
public final CausedByDrain causedByDrain;
642653

643654
WindowActivation(
644655
ReduceFn<K, InputT, OutputT, W>.Context directContext,
645-
ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
656+
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
657+
CausedByDrain causedByDrain) {
646658
this.directContext = directContext;
647659
this.renamedContext = renamedContext;
660+
this.causedByDrain = causedByDrain;
648661
W window = directContext.window();
649662

650663
// The output watermark is before the end of the window if it is either unknown
@@ -701,12 +714,13 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
701714

702715
WindowTracing.debug(
703716
"ReduceFnRunner: Received timer key:{}; window:{}; data:{} with "
704-
+ "inputWatermark:{}; outputWatermark:{}",
717+
+ "inputWatermark:{}; outputWatermark:{}; draining:{}",
705718
key,
706719
window,
707720
timer,
708721
timerInternals.currentInputWatermarkTime(),
709-
timerInternals.currentOutputWatermarkTime());
722+
timerInternals.currentOutputWatermarkTime(),
723+
timer.causedByDrain());
710724

711725
// Processing time timers for an expired window are ignored, just like elements
712726
// that show up too late. Window GC is managed by an event time timer
@@ -722,11 +736,13 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
722736
continue;
723737
}
724738

739+
// Pass the timer's draining bit through to the window activation.
725740
ReduceFn<K, InputT, OutputT, W>.Context directContext =
726741
contextFactory.base(window, StateStyle.DIRECT);
727742
ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
728743
contextFactory.base(window, StateStyle.RENAMED);
729-
WindowActivation windowActivation = new WindowActivation(directContext, renamedContext);
744+
WindowActivation windowActivation =
745+
new WindowActivation(directContext, renamedContext, timer.causedByDrain());
730746
windowActivations.put(window, windowActivation);
731747

732748
// Perform prefetching of state to determine if the trigger should fire.
@@ -757,11 +773,12 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
757773

758774
if (windowActivation.isGarbageCollection) {
759775
WindowTracing.debug(
760-
"ReduceFnRunner: Cleaning up for key:{}; window:{} with inputWatermark:{}; outputWatermark:{}",
776+
"ReduceFnRunner: Cleaning up for key:{}; window:{} with inputWatermark:{}; outputWatermark:{}; draining:{}",
761777
key,
762778
directContext.window(),
763779
timerInternals.currentInputWatermarkTime(),
764-
timerInternals.currentOutputWatermarkTime());
780+
timerInternals.currentOutputWatermarkTime(),
781+
windowActivation.causedByDrain);
765782

766783
boolean windowIsActiveAndOpen = windowActivation.windowIsActiveAndOpen();
767784
if (windowIsActiveAndOpen) {
@@ -774,7 +791,8 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
774791
directContext,
775792
renamedContext,
776793
true /* isFinished */,
777-
windowActivation.isEndOfWindow);
794+
windowActivation.isEndOfWindow,
795+
windowActivation.causedByDrain);
778796
checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
779797
}
780798

@@ -792,7 +810,7 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
792810
if (windowActivation.windowIsActiveAndOpen()
793811
&& triggerRunner.shouldFire(
794812
directContext.window(), directContext.timers(), directContext.state())) {
795-
emit(directContext, renamedContext);
813+
emit(directContext, renamedContext, windowActivation.causedByDrain);
796814
}
797815

798816
if (windowActivation.isEndOfWindow) {
@@ -915,7 +933,8 @@ private void prefetchEmit(
915933
/** Emit if a trigger is ready to fire or timers require it, and cleanup state. */
916934
private void emit(
917935
ReduceFn<K, InputT, OutputT, W>.Context directContext,
918-
ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
936+
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
937+
CausedByDrain causedByDrain)
919938
throws Exception {
920939
checkState(
921940
triggerRunner.shouldFire(
@@ -931,7 +950,7 @@ private void emit(
931950
// Run onTrigger to produce the actual pane contents.
932951
// As a side effect it will clear all element holds, but not necessarily any
933952
// end-of-window or garbage collection holds.
934-
onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/);
953+
onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/, causedByDrain);
935954

936955
// Now that we've triggered, the pane is empty.
937956
nonEmptyPanes.clearPane(renamedContext.state());
@@ -989,7 +1008,8 @@ private void prefetchOnTrigger(
9891008
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
9901009
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
9911010
final boolean isFinished,
992-
boolean isEndOfWindow)
1011+
boolean isEndOfWindow,
1012+
CausedByDrain causedByDrain)
9931013
throws Exception {
9941014
// Extract the window hold, and as a side effect clear it.
9951015
final WatermarkHold.OldAndNewHolds pair =
@@ -1061,10 +1081,12 @@ private void prefetchOnTrigger(
10611081
.setValue(KV.of(key, toOutput))
10621082
.setTimestamp(outputTimestamp)
10631083
.setWindows(windows)
1084+
.setCausedByDrain(causedByDrain)
10641085
.setPaneInfo(paneInfo)
10651086
.setReceiver(outputter)
10661087
.output();
1067-
});
1088+
},
1089+
causedByDrain);
10681090

10691091
reduceFn.onTrigger(renamedTriggerContext);
10701092
}

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -599,6 +599,7 @@ public String getErrorContext() {
599599
timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
600600
holdState.add(futureOutputWatermark);
601601
// Set a timer to continue processing this element.
602+
// TODO(radoslws): Decide whether draining should be set on this timer.
602603
timerInternals.setTimer(
603604
TimerInternals.TimerData.of(
604605
stateNamespace,

0 commit comments

Comments
 (0)