Skip to content

Commit d1d5469

Browse files
authored
[Java] Add ByteStringOutputStream.isEmpty() and reduce ByteStringOutputStream.size() calls (#37947)
1 parent b05b7f3 commit d1d5469

7 files changed

Lines changed: 32 additions & 19 deletions

File tree

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import static org.apache.beam.runners.dataflow.util.Structs.getBytes;
2121
import static org.apache.beam.runners.dataflow.util.Structs.getString;
22-
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2322

2423
import com.google.auto.service.AutoService;
2524
import java.io.IOException;
@@ -156,10 +155,10 @@ private PubsubWriter(String topic) {
156155

157156
@Override
158157
public long add(WindowedValue<T> data) throws IOException {
159-
checkState(
160-
stream.size() == 0,
161-
"Expected output stream to be empty but had %s",
162-
stream.toByteString());
158+
if (!stream.isEmpty()) {
159+
throw new IllegalStateException(
160+
"Expected output stream to be empty but was of size " + stream.size());
161+
}
163162
ByteString byteString = null;
164163
try {
165164
if (formatFn != null) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ private WindmillStreamWriter(String destinationName) {
196196
}
197197

198198
private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws IOException {
199-
if (stream.size() != 0) {
199+
if (!stream.isEmpty()) {
200200
throw new IllegalStateException(
201201
"Expected output stream to be empty but had " + stream.toByteString());
202202
}

sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public void discard() {
237237
private Elements.Builder convertBufferForTransmission() {
238238
Elements.Builder bufferedElements = Elements.newBuilder();
239239
for (Map.Entry<String, Receiver<?>> entry : outputDataReceivers.entrySet()) {
240-
if (entry.getValue().bufferedSize() == 0) {
240+
if (!entry.getValue().hasBufferedOutput()) {
241241
continue;
242242
}
243243
ByteString bytes = entry.getValue().toByteStringAndResetBuffer();
@@ -248,7 +248,7 @@ private Elements.Builder convertBufferForTransmission() {
248248
.setData(bytes);
249249
}
250250
for (Map.Entry<TimerEndpoint, Receiver<?>> entry : outputTimersReceivers.entrySet()) {
251-
if (entry.getValue().bufferedSize() == 0) {
251+
if (!entry.getValue().hasBufferedOutput()) {
252252
continue;
253253
}
254254
ByteString bytes = entry.getValue().toByteStringAndResetBuffer();
@@ -340,10 +340,11 @@ public Receiver(Coder<T> coder) {
340340
public void accept(T input) throws Exception {
341341
int size = output.size();
342342
coder.encode(input, output);
343-
if (output.size() - size == 0) {
343+
long delta = (long) output.size() - size;
344+
if (delta == 0) {
344345
output.write(0);
346+
delta = 1;
345347
}
346-
final long delta = (long) output.size() - size;
347348
bytesWrittenSinceFlush += delta;
348349
perBundleByteCount += delta;
349350
perBundleElementCount += 1;
@@ -360,8 +361,8 @@ public long getElementCount() {
360361
return perBundleElementCount;
361362
}
362363

363-
public int bufferedSize() {
364-
return output.size();
364+
public boolean hasBufferedOutput() {
365+
return !output.isEmpty();
365366
}
366367

367368
public ByteString toByteStringAndResetBuffer() {

sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,8 @@ public ByteString toByteStringAndReset() {
162162
* Resets the output stream to be re-used possibly re-using any existing buffers.
163163
*/
164164
public void reset() {
165-
if (size() == 0) {
166-
return;
167-
}
168-
toByteStringAndReset();
165+
bufferPos = 0;
166+
result = ByteString.EMPTY;
169167
}
170168

171169
/**
@@ -216,6 +214,12 @@ public int size() {
216214
return result.size() + bufferPos;
217215
}
218216

217+
/** Returns if the output stream is currently empty. */
218+
@SuppressWarnings("ReferenceEquality")
219+
public boolean isEmpty() {
220+
return bufferPos == 0 && result == ByteString.EMPTY;
221+
}
222+
219223
@Override
220224
public Appendable append(@Nullable CharSequence csq) throws IOException {
221225
write(Preconditions.checkNotNull(csq).toString().getBytes(StandardCharsets.UTF_8));

sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.hamcrest.Matchers.empty;
2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
2223
import static org.junit.Assert.fail;
2324

2425
import java.io.IOException;
@@ -146,7 +147,7 @@ public void testConfiguredBufferLimit() throws Exception {
146147
} else {
147148
receiver = Iterables.getOnlyElement(aggregator.outputDataReceivers.values());
148149
}
149-
assertEquals(0L, receiver.bufferedSize());
150+
assertFalse(receiver.hasBufferedOutput());
150151
assertEquals(102L, receiver.getByteCount());
151152
assertEquals(2L, receiver.getElementCount());
152153

@@ -156,7 +157,7 @@ public void testConfiguredBufferLimit() throws Exception {
156157
aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
157158
// Test that receiver stats have been reset after
158159
// sendOrCollectBufferedDataAndFinishOutboundStreams.
159-
assertEquals(0L, receiver.bufferedSize());
160+
assertFalse(receiver.hasBufferedOutput());
160161
assertEquals(0L, receiver.getByteCount());
161162
assertEquals(0L, receiver.getElementCount());
162163

sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertThrows;
22+
import static org.junit.Assert.assertTrue;
2223

2324
import java.io.ByteArrayOutputStream;
2425
import java.io.DataOutputStream;
@@ -145,6 +146,7 @@ public void testWriteBytesWithZeroInitialCapacity() throws Exception {
145146

146147
ByteStringOutputStream out = new ByteStringOutputStream(0);
147148
assertEquals(0, out.size());
149+
assertTrue(out.isEmpty());
148150

149151
for (int pos = 0; pos < testBuffer.length; ) {
150152
if (testBuffer[pos] == 0) {
@@ -157,8 +159,14 @@ public void testWriteBytesWithZeroInitialCapacity() throws Exception {
157159
}
158160
assertEquals(pos, out.size());
159161
}
162+
assertEquals(testBuffer.length == 0, out.isEmpty());
163+
assertEquals(testBuffer.length, out.size());
160164
assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteString());
165+
assertEquals(testBuffer.length == 0, out.isEmpty());
166+
assertEquals(testBuffer.length, out.size());
161167
assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteStringAndReset());
168+
assertTrue(out.isEmpty());
169+
assertEquals(0, out.size());
162170
}
163171
}
164172

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public void asyncClose() throws Exception {
155155
.setAppend(StateAppendRequest.newBuilder().setData(out.toByteStringAndReset())));
156156
}
157157
}
158-
if (out.size() > 0) {
158+
if (!out.isEmpty()) {
159159
beamFnStateClient.handle(
160160
request
161161
.toBuilder()

0 commit comments

Comments
 (0)