Skip to content

Commit 4e24a9c

Browse files
authored
[IcebergIO AddFiles] Apply name mapping at commit time (#38232)
* apply name mapping at commit time * trigger ITs
1 parent f300e59 commit 4e24a9c

File tree

2 files changed

+18
-23
lines changed

2 files changed

+18
-23
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 2
3+
"modification": 3
44
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -516,37 +516,22 @@ static <W, T> T transformValue(Transform<W, T> transform, Type type, ByteBuffer
516516

517517
private Table getOrCreateTable(String filePath, FileFormat format) throws IOException {
518518
TableIdentifier tableId = TableIdentifier.parse(identifier);
519-
@Nullable Table t;
520519
try {
521-
t = catalogConfig.catalog().loadTable(tableId);
520+
return catalogConfig.catalog().loadTable(tableId);
522521
} catch (NoSuchTableException e) {
523522
try {
524523
org.apache.iceberg.Schema schema = getSchema(filePath, format);
525524
PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema);
526525

527-
t =
528-
tableProps == null
529-
? catalogConfig
530-
.catalog()
531-
.createTable(TableIdentifier.parse(identifier), schema, spec)
532-
: catalogConfig
533-
.catalog()
534-
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
526+
return tableProps == null
527+
? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec)
528+
: catalogConfig
529+
.catalog()
530+
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
535531
} catch (AlreadyExistsException e2) { // if table already exists, just load it
536-
t = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
532+
return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
537533
}
538534
}
539-
ensureNameMappingPresent(t);
540-
return t;
541-
}
542-
543-
private static void ensureNameMappingPresent(Table table) {
544-
if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) {
545-
// Forces Name based resolution instead of position based resolution
546-
NameMapping mapping = MappingUtil.create(table.schema());
547-
String mappingJson = NameMappingParser.toJson(mapping);
548-
table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
549-
}
550535
}
551536

552537
/**
@@ -743,6 +728,15 @@ public CommitManifestFilesDoFn(IcebergCatalogConfig catalogConfig, String identi
743728
this.identifier = identifier;
744729
}
745730

731+
private static void ensureNameMappingPresent(Table table) {
732+
if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) {
733+
// Forces Name based resolution instead of position based resolution
734+
NameMapping mapping = MappingUtil.create(table.schema());
735+
String mappingJson = NameMappingParser.toJson(mapping);
736+
table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
737+
}
738+
}
739+
746740
@ProcessElement
747741
public void process(
748742
@Element KV<String, Iterable<byte[]>> batch,
@@ -758,6 +752,7 @@ public void process(
758752
table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
759753
}
760754
table.refresh();
755+
ensureNameMappingPresent(table);
761756

762757
if (shouldSkip(commitId, lastCommitTimestamp.read())) {
763758
return;

0 commit comments

Comments
 (0)