3939import org .apache .beam .sdk .metrics .Metrics ;
4040import org .apache .beam .sdk .options .PipelineOptions ;
4141import org .apache .beam .sdk .state .TimeDomain ;
42+ import org .apache .beam .sdk .state .ValueState ;
4243import org .apache .beam .sdk .transforms .windowing .BoundedWindow ;
4344import org .apache .beam .sdk .transforms .windowing .PaneInfo ;
4445import org .apache .beam .sdk .transforms .windowing .PaneInfo .Timing ;
@@ -107,6 +108,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
107108 * <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.
108109 * </ul>
109110 */
111+ static final StateTag <ValueState <CombinedMetadata >> METADATA_TAG =
112+ StateTags .makeSystemTagInternal (
113+ StateTags .value ("combinedMetadata" , CombinedMetadata .Coder .of ()));
114+
110115 private final WindowingStrategy <Object , W > windowingStrategy ;
111116
112117 private final WindowedValueReceiver <KV <K , OutputT >> outputter ;
@@ -376,7 +381,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
376381 emit (
377382 contextFactory .base (window , StateStyle .DIRECT ),
378383 contextFactory .base (window , StateStyle .RENAMED ),
379- CausedByDrain . NORMAL );
384+ CombinedMetadata . createDefault () );
380385 }
381386
382387 // We're all done with merging and emitting elements so can compress the activeWindow state.
@@ -590,6 +595,17 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
590595 value .getTimestamp (),
591596 StateStyle .DIRECT ,
592597 value .causedByDrain ());
598+
599+ ValueState <CombinedMetadata > metadataState = directContext .state ().access (METADATA_TAG );
600+ CombinedMetadata currentMetadata = metadataState .read ();
601+ if (currentMetadata == null ) {
602+ currentMetadata = CombinedMetadata .createDefault ();
603+ }
604+ CombinedMetadata inputMetadata = CombinedMetadata .create (value .causedByDrain ());
605+ CombinedMetadata newMetadata =
606+ CombinedMetadataCombiner .of ().addInput (currentMetadata , inputMetadata );
607+ metadataState .write (newMetadata );
608+
593609 if (triggerRunner .isClosed (directContext .state ())) {
594610 // This window has already been closed.
595611 droppedDueToClosedWindow .inc ();
@@ -792,7 +808,7 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
792808 renamedContext ,
793809 true /* isFinished */ ,
794810 windowActivation .isEndOfWindow ,
795- windowActivation .causedByDrain );
811+ CombinedMetadata . create ( windowActivation .causedByDrain ) );
796812 checkState (newHold == null , "Hold placed at %s despite isFinished being true." , newHold );
797813 }
798814
@@ -810,7 +826,10 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
810826 if (windowActivation .windowIsActiveAndOpen ()
811827 && triggerRunner .shouldFire (
812828 directContext .window (), directContext .timers (), directContext .state ())) {
813- emit (directContext , renamedContext , windowActivation .causedByDrain );
829+ emit (
830+ directContext ,
831+ renamedContext ,
832+ CombinedMetadata .create (windowActivation .causedByDrain ));
814833 }
815834
816835 if (windowActivation .isEndOfWindow ) {
@@ -874,6 +893,7 @@ private void clearAllState(
874893 triggerRunner .clearState (
875894 directContext .window (), directContext .timers (), directContext .state ());
876895 paneInfoTracker .clear (directContext .state ());
896+ directContext .state ().access (METADATA_TAG ).clear ();
877897 } else {
878898 // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
879899 // For (1), if !activeWindows.isActive then the window must be merging and has been
@@ -934,8 +954,9 @@ private void prefetchEmit(
934954 private void emit (
935955 ReduceFn <K , InputT , OutputT , W >.Context directContext ,
936956 ReduceFn <K , InputT , OutputT , W >.Context renamedContext ,
937- CausedByDrain causedByDrain )
957+ CombinedMetadata metadata )
938958 throws Exception {
959+
939960 checkState (
940961 triggerRunner .shouldFire (
941962 directContext .window (), directContext .timers (), directContext .state ()));
@@ -950,13 +971,14 @@ private void emit(
950971 // Run onTrigger to produce the actual pane contents.
951972 // As a side effect it will clear all element holds, but not necessarily any
952973 // end-of-window or garbage collection holds.
953- onTrigger (directContext , renamedContext , isFinished , false /*isEndOfWindow*/ , causedByDrain );
974+ onTrigger (directContext , renamedContext , isFinished , false /*isEndOfWindow*/ , metadata );
954975
955976 // Now that we've triggered, the pane is empty.
956977 nonEmptyPanes .clearPane (renamedContext .state ());
957978
958979 // Cleanup buffered data if appropriate
959980 if (shouldDiscard ) {
981+ directContext .state ().access (METADATA_TAG ).clear ();
960982 // Cleanup flavor C: The user does not want any buffered data to persist between panes.
961983 reduceFn .clearState (renamedContext );
962984 }
@@ -1009,8 +1031,19 @@ private void prefetchOnTrigger(
10091031 ReduceFn <K , InputT , OutputT , W >.Context renamedContext ,
10101032 final boolean isFinished ,
10111033 boolean isEndOfWindow ,
1012- CausedByDrain causedByDrain )
1034+ CombinedMetadata metadata )
10131035 throws Exception {
1036+ ValueState <CombinedMetadata > metadataState = directContext .state ().access (METADATA_TAG );
1037+ CombinedMetadata aggregatedMetadata = metadataState .read ();
1038+ if (aggregatedMetadata == null ) {
1039+ aggregatedMetadata = CombinedMetadata .createDefault ();
1040+ }
1041+ CombinedMetadata fullyAggregatedMetadata =
1042+ CombinedMetadataCombiner .of ().addInput (aggregatedMetadata , metadata );
1043+ final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata .causedByDrain ();
1044+ if (isFinished ) {
1045+ metadataState .clear ();
1046+ }
10141047 // Extract the window hold, and as a side effect clear it.
10151048 final WatermarkHold .OldAndNewHolds pair =
10161049 watermarkHold .extractAndRelease (renamedContext , isFinished ).read ();
@@ -1081,12 +1114,12 @@ private void prefetchOnTrigger(
10811114 .setValue (KV .of (key , toOutput ))
10821115 .setTimestamp (outputTimestamp )
10831116 .setWindows (windows )
1084- .setCausedByDrain (causedByDrain )
1117+ .setCausedByDrain (aggregatedCausedByDrain )
10851118 .setPaneInfo (paneInfo )
10861119 .setReceiver (outputter )
10871120 .output ();
10881121 },
1089- causedByDrain );
1122+ aggregatedCausedByDrain );
10901123
10911124 reduceFn .onTrigger (renamedTriggerContext );
10921125 }
0 commit comments