|
29 | 29 | import org.junit.runner.RunWith; |
30 | 30 | import org.junit.runners.JUnit4; |
31 | 31 |
|
| 32 | +@RunWith(JUnit4.class) |
32 | 33 | public class MetadataPropagationTest { |
33 | 34 |
|
34 | | - @RunWith(JUnit4.class) |
35 | | - public static class MiscTest { |
| 35 | + /** Tests for metadata propagation. */ |
| 36 | + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); |
36 | 37 |
|
37 | | - /** Tests for metadata propagation. */ |
38 | | - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); |
39 | | - |
40 | | - static class CausedByDrainSettingDoFn extends DoFn<Integer, String> { |
41 | | - @ProcessElement |
42 | | - public void process(OutputReceiver<String> r) { |
43 | | - r.builder("value").setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output(); |
44 | | - } |
| 38 | + static class CausedByDrainSettingDoFn extends DoFn<Integer, String> { |
| 39 | + @ProcessElement |
| 40 | + public void process(OutputReceiver<String> r) { |
| 41 | + r.builder("value").setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output(); |
45 | 42 | } |
| 43 | + } |
46 | 44 |
|
47 | | - static class CausedByDrainExtractingDoFn extends DoFn<String, String> { |
48 | | - @ProcessElement |
49 | | - public void process(ProcessContext pc, OutputReceiver<String> r) { |
50 | | - r.output(pc.causedByDrain().toString()); |
51 | | - } |
| 45 | + static class CausedByDrainExtractingDoFn extends DoFn<String, String> { |
| 46 | + @ProcessElement |
| 47 | + public void process(ProcessContext pc, OutputReceiver<String> r) { |
| 48 | + r.output(pc.causedByDrain().toString()); |
52 | 49 | } |
| 50 | + } |
53 | 51 |
|
54 | | - @Test |
55 | | - @Category(NeedsRunner.class) |
56 | | - public void testMetadataPropagationAcrossShuffleParameter() { |
57 | | - WindowedValues.WindowedValueCoder.setMetadataSupported(); |
58 | | - PCollection<String> results = |
59 | | - pipeline |
60 | | - .apply(Create.of(1)) |
61 | | - .apply(ParDo.of(new CausedByDrainSettingDoFn())) |
62 | | - .apply(Redistribute.arbitrarily()) |
63 | | - .apply(ParDo.of(new CausedByDrainExtractingDoFn())); |
| 52 | + @Test |
| 53 | + @Category(NeedsRunner.class) |
| 54 | + public void testMetadataPropagationAcrossShuffleParameter() { |
| 55 | + WindowedValues.WindowedValueCoder.setMetadataSupported(); |
| 56 | + PCollection<String> results = |
| 57 | + pipeline |
| 58 | + .apply(Create.of(1)) |
| 59 | + .apply(ParDo.of(new CausedByDrainSettingDoFn())) |
| 60 | + .apply(Redistribute.arbitrarily()) |
| 61 | + .apply(ParDo.of(new CausedByDrainExtractingDoFn())); |
64 | 62 |
|
65 | | - PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN"); |
| 63 | + PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN"); |
66 | 64 |
|
67 | | - pipeline.run(); |
68 | | - } |
| 65 | + pipeline.run(); |
| 66 | + } |
69 | 67 |
|
70 | | - @Test |
71 | | - @Category(NeedsRunner.class) |
72 | | - public void testMetadataPropagationParameter() { |
73 | | - PCollection<String> results = |
74 | | - pipeline |
75 | | - .apply(Create.of(1)) |
76 | | - .apply(ParDo.of(new CausedByDrainSettingDoFn())) |
77 | | - .apply(ParDo.of(new CausedByDrainExtractingDoFn())); |
| 68 | + @Test |
| 69 | + @Category(NeedsRunner.class) |
| 70 | + public void testMetadataPropagationParameter() { |
| 71 | + PCollection<String> results = |
| 72 | + pipeline |
| 73 | + .apply(Create.of(1)) |
| 74 | + .apply(ParDo.of(new CausedByDrainSettingDoFn())) |
| 75 | + .apply(ParDo.of(new CausedByDrainExtractingDoFn())); |
78 | 76 |
|
79 | | - PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN"); |
| 77 | + PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN"); |
80 | 78 |
|
81 | | - pipeline.run(); |
82 | | - } |
| 79 | + pipeline.run(); |
83 | 80 | } |
84 | 81 | } |
0 commit comments