Skip to content

Commit

Permalink
Derive name mapping from schema when ORC iceberg.id is missing
Browse files Browse the repository at this point in the history
  • Loading branch information
weijiii committed Jan 24, 2025
1 parent 76d1e6f commit d5f34c2
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.MappedFields;
+ import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
Expand Down Expand Up @@ -265,7 +266,7 @@ public ConnectorPageSource createPageSource(
split.getFileFormat(),
split.getFileIoProperties(),
split.getDataSequenceNumber(),
- tableHandle.getNameMappingJson().map(NameMappingParser::fromJson));
+ tableHandle.getNameMappingJson().map(NameMappingParser::fromJson).orElseGet(() -> MappingUtil.create(schema)));
}

public ConnectorPageSource createPageSource(
Expand All @@ -287,7 +288,7 @@ public ConnectorPageSource createPageSource(
IcebergFileFormat fileFormat,
Map<String, String> fileIoProperties,
long dataSequenceNumber,
- Optional<NameMapping> nameMapping)
+ NameMapping nameMapping)
{
Set<IcebergColumnHandle> deleteFilterRequiredColumns = requiredColumnsForDeletes(tableSchema, deletes);
Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(partitionData, partitionSpec);
Expand Down Expand Up @@ -491,7 +492,7 @@ private ReaderPageSourceWithRowPositions createDataPageSource(
Schema fileSchema,
List<IcebergColumnHandle> dataColumns,
TupleDomain<IcebergColumnHandle> predicate,
- Optional<NameMapping> nameMapping,
+ NameMapping nameMapping,
Map<Integer, Optional<String>> partitionKeys)
{
return switch (fileFormat) {
Expand Down Expand Up @@ -587,7 +588,7 @@ private static ReaderPageSourceWithRowPositions createOrcPageSource(
OrcReaderOptions options,
FileFormatDataSourceStats stats,
TypeManager typeManager,
- Optional<NameMapping> nameMapping,
+ NameMapping nameMapping,
Map<Integer, Optional<String>> partitionKeys)
{
OrcDataSource orcDataSource = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ public final class OrcIcebergIds
{
private OrcIcebergIds() {}

- public static Map<Integer, OrcColumn> fileColumnsByIcebergId(OrcReader reader, Optional<NameMapping> nameMapping)
+ public static Map<Integer, OrcColumn> fileColumnsByIcebergId(OrcReader reader, NameMapping nameMapping)
{
List<OrcColumn> fileColumns = reader.getRootColumn().getNestedColumns();

- if (nameMapping.isPresent() && !hasIds(reader.getRootColumn())) {
+ if (!hasIds(reader.getRootColumn())) {
fileColumns = fileColumns.stream()
- .map(orcColumn -> setMissingFieldIds(orcColumn, nameMapping.get(), ImmutableList.of(orcColumn.getColumnName())))
+ .map(orcColumn -> setMissingFieldIds(orcColumn, nameMapping, ImmutableList.of(orcColumn.getColumnName())))
.collect(toImmutableList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static Metrics fileMetrics(TrinoInputFile file, MetricsConfig metricsConf
Footer footer = reader.get().getFooter();

// use name mapping to compute missing Iceberg field IDs
- Optional<NameMapping> nameMapping = Optional.of(MappingUtil.create(schema));
+ NameMapping nameMapping = MappingUtil.create(schema);
Map<OrcColumnId, OrcColumn> mappedColumns = fileColumnsByIcebergId(reader.get(), nameMapping)
.values().stream()
.collect(toImmutableMap(OrcColumn::getColumnId, identity()));
Expand Down

0 comments on commit d5f34c2

Please sign in to comment.