Skip to content

Commit eba1ec3

Browse files
authored
Bigtable: skip reading large rows (#37586)
* Bigtable: add experimental option to skip reading large rows in the pipeline
* fix test
* fix test
1 parent 695498f commit eba1ec3

4 files changed

Lines changed: 74 additions & 8 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class BigtableServiceFactory implements Serializable {
5454
private static final String BIGTABLE_ENABLE_CLIENT_SIDE_METRICS =
5555
"bigtable_enable_client_side_metrics";
5656

57+
private static final String BIGTABLE_ENABLE_SKIP_LARGE_ROWS = "bigtable_enable_skip_large_rows";
58+
5759
@AutoValue
5860
abstract static class ConfigId implements Serializable {
5961

@@ -133,7 +135,10 @@ BigtableServiceEntry getServiceForReading(
133135
BigtableDataSettings.enableBuiltinMetrics();
134136
}
135137

136-
BigtableService service = new BigtableServiceImpl(settings);
138+
boolean skipLargeRows =
139+
ExperimentalOptions.hasExperiment(pipelineOptions, BIGTABLE_ENABLE_SKIP_LARGE_ROWS);
140+
141+
BigtableService service = new BigtableServiceImpl(settings, skipLargeRows);
137142
entry = BigtableServiceEntry.create(configId, service);
138143
entries.put(configId.id(), entry);
139144
refCounts.put(configId.id(), new AtomicInteger(1));

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,17 @@ class BigtableServiceImpl implements BigtableService {
104104
private static final double WATERMARK_PERCENTAGE = .1;
105105
private static final long MIN_BYTE_BUFFER_SIZE = 100 * 1024 * 1024; // 100MB
106106

107+
private final boolean skipLargeRows;
108+
107109
/** Creates a service that reads rows with the default behavior (large rows are not skipped). */
BigtableServiceImpl(BigtableDataSettings settings) throws IOException {
  this(settings, false);
}

/**
 * Creates a Bigtable service backed by a {@link BigtableDataClient}.
 *
 * @param settings Bigtable connection settings; supplies the project id, instance id, and the
 *     configuration used to create the underlying data client
 * @param skipLargeRows when {@code true}, readers created by this service route reads through the
 *     client's {@code skipLargeRowsCallable} instead of {@code readRowsCallable}, so oversized
 *     rows are skipped rather than failing the read
 * @throws IOException if the underlying {@link BigtableDataClient} cannot be created
 */
BigtableServiceImpl(BigtableDataSettings settings, boolean skipLargeRows) throws IOException {
  this.projectId = settings.getProjectId();
  this.instanceId = settings.getInstanceId();
  // Opens the underlying client connection; this is the call that may throw IOException.
  this.client = BigtableDataClient.create(settings);
  this.skipLargeRows = skipLargeRows;
  LOG.info("Started Bigtable service with settings {}", settings);
}
113120

@@ -142,6 +149,7 @@ static class BigtableReaderImpl implements Reader {
142149
private ServerStream<Row> stream;
143150

144151
private boolean exhausted;
152+
private final boolean skipLargeRows;
145153

146154
@VisibleForTesting
147155
BigtableReaderImpl(
@@ -150,13 +158,15 @@ static class BigtableReaderImpl implements Reader {
150158
String instanceId,
151159
String tableId,
152160
List<ByteKeyRange> ranges,
153-
@Nullable RowFilter rowFilter) {
161+
@Nullable RowFilter rowFilter,
162+
boolean skipLargeRows) {
154163
this.client = client;
155164
this.projectId = projectId;
156165
this.instanceId = instanceId;
157166
this.tableId = tableId;
158167
this.ranges = ranges;
159168
this.rowFilter = rowFilter;
169+
this.skipLargeRows = skipLargeRows;
160170
}
161171

162172
@Override
@@ -173,11 +183,19 @@ public boolean start() throws IOException {
173183
if (rowFilter != null) {
174184
query.filter(Filters.FILTERS.fromProto(rowFilter));
175185
}
186+
176187
try {
177-
stream =
178-
client
179-
.readRowsCallable(new BigtableRowProtoAdapter())
180-
.call(query, GrpcCallContext.createDefault());
188+
if (skipLargeRows) {
189+
stream =
190+
client
191+
.skipLargeRowsCallable(new BigtableRowProtoAdapter())
192+
.call(query, GrpcCallContext.createDefault());
193+
} else {
194+
stream =
195+
client
196+
.readRowsCallable(new BigtableRowProtoAdapter())
197+
.call(query, GrpcCallContext.createDefault());
198+
}
181199
results = stream.iterator();
182200
serviceCallMetric.call("ok");
183201
} catch (StatusRuntimeException e) {
@@ -667,7 +685,8 @@ public Reader createReader(BigtableSource source) throws IOException {
667685
instanceId,
668686
source.getTableId().get(),
669687
source.getRanges(),
670-
source.getRowFilter());
688+
source.getRowFilter(),
689+
skipLargeRows);
671690
}
672691
}
673692

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
2828
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
2929
import com.google.cloud.bigtable.data.v2.models.RowMutation;
30+
import com.google.cloud.bigtable.data.v2.models.TableId;
3031
import java.io.IOException;
3132
import java.util.Date;
3233
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
3334
import org.apache.beam.sdk.Pipeline;
3435
import org.apache.beam.sdk.PipelineResult;
3536
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
3637
import org.apache.beam.sdk.metrics.Lineage;
38+
import org.apache.beam.sdk.options.ExperimentalOptions;
3739
import org.apache.beam.sdk.options.PipelineOptionsFactory;
3840
import org.apache.beam.sdk.testing.PAssert;
3941
import org.apache.beam.sdk.testing.TestPipeline;
@@ -148,6 +150,45 @@ public void testE2EBigtableSegmentRead() {
148150
checkLineageSourceMetric(r, tableId);
149151
}
150152

153+
@Test
154+
public void testE2EBigtableReadWithSkippingLargeRows() {
155+
tableAdminClient.createTable(CreateTableRequest.of(tableId).addFamily(COLUMN_FAMILY_NAME));
156+
157+
// Write a few rows first
158+
int numRows = 20;
159+
int numLargeRows = 3;
160+
// Each mutation can't exceed 100 MB. Break it down to 3 columns
161+
String value = StringUtils.repeat("v", 90 * 1000 * 1000);
162+
for (int i = 0; i < numLargeRows; i++) {
163+
for (int j = 0; j < 3; j++) {
164+
client.mutateRow(
165+
RowMutation.create(TableId.of(tableId), "large_row-" + i)
166+
.setCell(COLUMN_FAMILY_NAME, "q" + i, value));
167+
}
168+
}
169+
170+
for (int i = 0; i < numRows - numLargeRows; i++) {
171+
client.mutateRow(
172+
RowMutation.create(TableId.of(tableId), "row-" + i)
173+
.setCell(COLUMN_FAMILY_NAME, "q", "value"));
174+
}
175+
176+
ExperimentalOptions.addExperiment(
177+
options.as(ExperimentalOptions.class), "bigtable_enable_skip_large_rows");
178+
179+
Pipeline p = Pipeline.create(options);
180+
PCollection<Long> count =
181+
p.apply(
182+
BigtableIO.read()
183+
.withProjectId(project)
184+
.withInstanceId(options.getInstanceId())
185+
.withTableId(tableId))
186+
.apply(Count.globally());
187+
PAssert.thatSingleton(count).isEqualTo((long) numRows - numLargeRows);
188+
PipelineResult r = p.run();
189+
checkLineageSourceMetric(r, tableId);
190+
}
191+
151192
private void checkLineageSourceMetric(PipelineResult r, String tableId) {
152193
// TODO(https://github.com/apache/beam/issues/32071) test malformed,
153194
// when pipeline.run() is non-blocking, the metrics are not available by the time of query

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ public void testRead() throws IOException {
179179
bigtableDataSettings.getInstanceId(),
180180
mockBigtableSource.getTableId().get(),
181181
mockBigtableSource.getRanges(),
182-
null);
182+
null,
183+
false);
183184

184185
underTest.start();
185186
Assert.assertEquals(expectedRow, underTest.getCurrentRow());

0 commit comments

Comments
 (0)