Skip to content

Commit a60a71e

Browse files
authored
[Iceberg AddFiles] Attach name-mapping to table (#38189)
* Attach name-mapping to table * add to tests
1 parent 4ee32cd commit a60a71e

2 files changed

Lines changed: 37 additions & 7 deletions

File tree

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import org.apache.iceberg.PartitionSpec;
9595
import org.apache.iceberg.Snapshot;
9696
import org.apache.iceberg.Table;
97+
import org.apache.iceberg.TableProperties;
9798
import org.apache.iceberg.avro.Avro;
9899
import org.apache.iceberg.catalog.TableIdentifier;
99100
import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -102,6 +103,7 @@
102103
import org.apache.iceberg.io.OutputFile;
103104
import org.apache.iceberg.mapping.MappingUtil;
104105
import org.apache.iceberg.mapping.NameMapping;
106+
import org.apache.iceberg.mapping.NameMappingParser;
105107
import org.apache.iceberg.orc.OrcMetrics;
106108
import org.apache.iceberg.parquet.ParquetSchemaUtil;
107109
import org.apache.iceberg.parquet.ParquetUtil;
@@ -514,22 +516,37 @@ static <W, T> T transformValue(Transform<W, T> transform, Type type, ByteBuffer
514516

515517
/**
 * Returns the Iceberg table named by {@code identifier}, creating it when the catalog does not
 * have it yet.
 *
 * <p>The table is first looked up with {@code loadTable}. On {@link NoSuchTableException} the
 * schema is obtained from {@code getSchema(filePath, format)} and the partition spec from
 * {@code PartitionUtils.toPartitionSpec(partitionFields, schema)}; {@code tableProps} is passed
 * to {@code createTable} only when it is non-null. If creation fails with {@link
 * AlreadyExistsException}, the existing table is loaded instead.
 *
 * <p>In the added ({@code +}) lines of this change, every path assigns the table to {@code t}
 * instead of returning directly, so that {@code ensureNameMappingPresent(t)} runs before the
 * table is returned.
 *
 * @param filePath forwarded to {@code getSchema} when the table has to be created
 * @param format forwarded to {@code getSchema} together with {@code filePath}
 * @return the loaded or newly created table
 * @throws IOException declared by the signature; presumably raised by {@code getSchema} while
 *     reading the file (TODO confirm, since {@code getSchema} is not visible here)
 */
private Table getOrCreateTable(String filePath, FileFormat format) throws IOException {
516518
TableIdentifier tableId = TableIdentifier.parse(identifier);
519+
@Nullable Table t;
517520
try {
518-
return catalogConfig.catalog().loadTable(tableId);
521+
t = catalogConfig.catalog().loadTable(tableId);
519522
} catch (NoSuchTableException e) {
520523
try {
521524
org.apache.iceberg.Schema schema = getSchema(filePath, format);
522525
PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema);
523526

524-
return tableProps == null
525-
? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec)
526-
: catalogConfig
527-
.catalog()
528-
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
527+
t =
528+
tableProps == null
529+
? catalogConfig
530+
.catalog()
531+
.createTable(TableIdentifier.parse(identifier), schema, spec)
532+
: catalogConfig
533+
.catalog()
534+
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
529535
} catch (AlreadyExistsException e2) { // if table already exists, just load it
530-
return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
536+
t = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
531537
}
532538
}
539+
ensureNameMappingPresent(t);
540+
return t;
541+
}
542+
543+
/**
 * Sets the {@code TableProperties.DEFAULT_NAME_MAPPING} property on the table when it is absent.
 *
 * <p>The mapping is built from the table's current schema with {@code MappingUtil.create},
 * serialized with {@code NameMappingParser.toJson}, and committed through {@code
 * table.updateProperties()}. A table that already has the property is left untouched.
 *
 * @param table the table whose properties are inspected and, if needed, updated
 */
private static void ensureNameMappingPresent(Table table) {
544+
if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) {
545+
// Forces name-based resolution instead of position-based resolution
546+
NameMapping mapping = MappingUtil.create(table.schema());
547+
String mappingJson = NameMappingParser.toJson(mapping);
548+
table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
549+
}
533550
}
534551

535552
/**

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.apache.iceberg.Snapshot;
6565
import org.apache.iceberg.StructLike;
6666
import org.apache.iceberg.Table;
67+
import org.apache.iceberg.TableProperties;
6768
import org.apache.iceberg.catalog.TableIdentifier;
6869
import org.apache.iceberg.data.GenericRecord;
6970
import org.apache.iceberg.data.Record;
@@ -72,6 +73,7 @@
7273
import org.apache.iceberg.io.DataWriter;
7374
import org.apache.iceberg.io.InputFile;
7475
import org.apache.iceberg.mapping.MappingUtil;
76+
import org.apache.iceberg.mapping.NameMappingParser;
7577
import org.apache.iceberg.parquet.Parquet;
7678
import org.apache.iceberg.types.Conversions;
7779
import org.apache.iceberg.types.Types;
@@ -219,6 +221,12 @@ public void testAddFilesWithPartitionPath(boolean isPartitioned) throws Exceptio
219221
// check partition metadata is preserved
220222
assertEquals(writtenDf1.partition(), addedDf1.partition());
221223
assertEquals(writtenDf2.partition(), addedDf2.partition());
224+
225+
// check that the default name mapping was attached to the table
226+
assertEquals(
227+
MappingUtil.create(icebergSchema).asMappedFields(),
228+
NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING))
229+
.asMappedFields());
222230
}
223231

224232
@Test
@@ -319,6 +327,11 @@ && checkStateNotNull(errorRow.getString(1))
319327
// check partition metadata is preserved
320328
assertEquals(expectedPartition1, addedDf1.partition());
321329
assertEquals(expectedPartition2, addedDf2.partition());
330+
331+
assertEquals(
332+
MappingUtil.create(icebergSchema).asMappedFields(),
333+
NameMappingParser.fromJson(table.properties().get(TableProperties.DEFAULT_NAME_MAPPING))
334+
.asMappedFields());
322335
}
323336

324337
@Test

0 commit comments

Comments
 (0)