Skip to content

Commit d6fd069

Browse files
authored
[Iceberg] Support nested column paths in keep/drop configuration (#37516)
* [Iceberg] Support nested column paths in keep/drop configuration. This change fixes the validation logic in IcebergScanConfig to support nested column paths using dot notation (e.g., "data.name"). Previously, the validation only checked top-level column names, causing nested paths like "colA.colB" to fail with an "unknown field(s)" error. The fix uses Iceberg's Schema.findField(), which natively resolves dot-notation paths for nested fields. Fixes #37486. * [Iceberg] Also fix drop functionality for nested column paths: use TypeUtil.indexByName() to enumerate all field paths; only select leaf fields to prevent a parent struct from including dropped children; add a test for nested drop validation. * Apply Spotless formatting. * Rework nested column pruning to match Beam code style: replace inline fully-qualified names with proper imports; use TypeUtil.indexNameById (canonical paths only) instead of indexByName (which includes short aliases that break drop logic); remove verbose line-by-line comments; consolidate tests into a single method using sameSchema() assertions to match the existing testProjectedSchema() pattern.
1 parent b2e97de commit d6fd069

2 files changed

Lines changed: 43 additions & 7 deletions

File tree

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import com.google.auto.value.AutoValue;
2525
import java.io.Serializable;
2626
import java.util.ArrayList;
27+
import java.util.Collections;
2728
import java.util.List;
2829
import java.util.Set;
29-
import java.util.stream.Collectors;
3030
import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy;
3131
import org.apache.beam.sdk.schemas.Schema;
3232
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -37,7 +37,7 @@
3737
import org.apache.iceberg.catalog.TableIdentifier;
3838
import org.apache.iceberg.expressions.Evaluator;
3939
import org.apache.iceberg.expressions.Expression;
40-
import org.apache.iceberg.types.Types;
40+
import org.apache.iceberg.types.TypeUtil;
4141
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
4242
import org.checkerframework.checker.nullness.qual.Nullable;
4343
import org.checkerframework.dataflow.qual.Pure;
@@ -93,10 +93,16 @@ static org.apache.iceberg.Schema resolveSchema(
9393
if (keep != null && !keep.isEmpty()) {
9494
selectedFieldsBuilder.addAll(keep);
9595
} else if (drop != null && !drop.isEmpty()) {
96-
Set<String> fields =
97-
schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
98-
drop.forEach(fields::remove);
99-
selectedFieldsBuilder.addAll(fields);
96+
List<String> paths = new ArrayList<>(TypeUtil.indexNameById(schema.asStruct()).values());
97+
Collections.sort(paths);
98+
for (int i = 0; i < paths.size(); i++) {
99+
String path = paths.get(i);
100+
boolean isParent = i + 1 < paths.size() && paths.get(i + 1).startsWith(path + ".");
101+
boolean isDrop = drop.stream().anyMatch(d -> path.equals(d) || path.startsWith(d + "."));
102+
if (!isParent && !isDrop) {
103+
selectedFieldsBuilder.add(path);
104+
}
105+
}
100106
} else {
101107
// default: include all columns
102108
return schema;
@@ -327,7 +333,7 @@ void validate(Table table) {
327333
param = "drop";
328334
fieldsSpecified = newHashSet(checkNotNull(drop));
329335
}
330-
table.schema().columns().forEach(nf -> fieldsSpecified.remove(nf.name()));
336+
fieldsSpecified.removeIf(name -> table.schema().findField(name) != null);
331337

332338
checkArgument(
333339
fieldsSpecified.isEmpty(),

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,36 @@ public void testProjectedSchema() {
273273
assertTrue(projectKeep.sameSchema(expectedKeep));
274274
}
275275

276+
@Test
public void testProjectedSchemaWithNestedFields() {
  // Source schema: top-level "id", a nested "data" struct with two leaf
  // fields ("name", "value"), and a top-level "metadata" column.
  org.apache.iceberg.Schema schema =
      new org.apache.iceberg.Schema(
          required(1, "id", StringType.get()),
          required(
              2,
              "data",
              StructType.of(
                  required(3, "name", StringType.get()), required(4, "value", StringType.get()))),
          required(5, "metadata", StringType.get()));

  // test nested keep: keeping "id" and "data.name" should retain the "data"
  // struct containing only its "name" leaf; "data.value" and "metadata" are
  // excluded because they were not listed.
  org.apache.iceberg.Schema projectKeep = resolveSchema(schema, asList("id", "data.name"), null);
  org.apache.iceberg.Schema expectedKeep =
      new org.apache.iceberg.Schema(
          required(1, "id", StringType.get()),
          required(2, "data", StructType.of(required(3, "name", StringType.get()))));
  assertTrue(projectKeep.sameSchema(expectedKeep));

  // test nested drop: dropping only "data.name" should preserve every other
  // field, including the sibling leaf "data.value" and the unrelated
  // top-level "metadata" column.
  org.apache.iceberg.Schema projectDrop = resolveSchema(schema, null, asList("data.name"));
  org.apache.iceberg.Schema expectedDrop =
      new org.apache.iceberg.Schema(
          required(1, "id", StringType.get()),
          required(2, "data", StructType.of(required(4, "value", StringType.get()))),
          required(5, "metadata", StringType.get()));
  assertTrue(projectDrop.sameSchema(expectedDrop));
}
305+
276306
@Test
277307
public void testSimpleScan() throws Exception {
278308
TableIdentifier tableId =

0 commit comments

Comments
 (0)