Skip to content

Commit b1dfea2

Browse files
committed
Fix StreamingDataflowWorkerTest, new tag is added
1 parent f49c628 commit b1dfea2

File tree

1 file changed

+22
-2
lines changed

1 file changed

+22
-2
lines changed

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1807,6 +1807,7 @@ public void testMergeWindows() throws Exception {
18071807
String timerTagPrefix = "/s" + window + "+0";
18081808
ByteString bufferTag = ByteString.copyFromUtf8(window + "+ubuf");
18091809
ByteString paneInfoTag = ByteString.copyFromUtf8(window + "+upaneInfo");
1810+
ByteString pipelineMetadataTag = ByteString.copyFromUtf8(window + "+upipelineMetadata");
18101811
String watermarkDataHoldTag = window + "+uhold";
18111812
String watermarkExtraHoldTag = window + "+uextra";
18121813
String stateFamily = "MergeWindows";
@@ -1946,11 +1947,20 @@ public void testMergeWindows() throws Exception {
19461947
assertThat(
19471948
"" + actualOutput.getValueUpdatesList(),
19481949
actualOutput.getValueUpdatesList(),
1949-
Matchers.contains(
1950+
Matchers.containsInAnyOrder(
19501951
Matchers.equalTo(
19511952
Windmill.TagValue.newBuilder()
19521953
.setTag(paneInfoTag)
19531954
.setStateFamily(stateFamily)
1955+
.setValue(
1956+
Windmill.Value.newBuilder()
1957+
.setTimestamp(Long.MAX_VALUE)
1958+
.setData(ByteString.EMPTY))
1959+
.build()),
1960+
Matchers.equalTo(
1961+
Windmill.TagValue.newBuilder()
1962+
.setTag(pipelineMetadataTag)
1963+
.setStateFamily(stateFamily)
19541964
.setValue(
19551965
Windmill.Value.newBuilder()
19561966
.setTimestamp(Long.MAX_VALUE)
@@ -2097,6 +2107,7 @@ public void testMergeWindowsCaching() throws Exception {
20972107
String timerTagPrefix = "/s" + window + "+0";
20982108
ByteString bufferTag = ByteString.copyFromUtf8(window + "+ubuf");
20992109
ByteString paneInfoTag = ByteString.copyFromUtf8(window + "+upaneInfo");
2110+
ByteString pipelineMetadataTag = ByteString.copyFromUtf8(window + "+upipelineMetadata");
21002111
String watermarkDataHoldTag = window + "+uhold";
21012112
String watermarkExtraHoldTag = window + "+uextra";
21022113
String stateFamily = "MergeWindows";
@@ -2236,11 +2247,20 @@ public void testMergeWindowsCaching() throws Exception {
22362247
assertThat(
22372248
"" + actualOutput.getValueUpdatesList(),
22382249
actualOutput.getValueUpdatesList(),
2239-
Matchers.contains(
2250+
Matchers.containsInAnyOrder(
22402251
Matchers.equalTo(
22412252
Windmill.TagValue.newBuilder()
22422253
.setTag(paneInfoTag)
22432254
.setStateFamily(stateFamily)
2255+
.setValue(
2256+
Windmill.Value.newBuilder()
2257+
.setTimestamp(Long.MAX_VALUE)
2258+
.setData(ByteString.EMPTY))
2259+
.build()),
2260+
Matchers.equalTo(
2261+
Windmill.TagValue.newBuilder()
2262+
.setTag(pipelineMetadataTag)
2263+
.setStateFamily(stateFamily)
22442264
.setValue(
22452265
Windmill.Value.newBuilder()
22462266
.setTimestamp(Long.MAX_VALUE)

0 commit comments

Comments
 (0)