From 6f640886b6125f159b42f90503614c329f9ff0b9 Mon Sep 17 00:00:00 2001
From: Jinyang Li
Date: Wed, 15 Jan 2025 00:22:17 -0800
Subject: [PATCH] Fix split generation in Iceberg TableChangesSplitSource

TableChangesSplitSource.isFinished() only consulted the changelog scan
iterator, so the source could report completion while fileTasksIterator
still buffered splits for the current scan task. When splits were handed
out across multiple getNextBatch() calls, the leftover splits were
silently dropped and table_changes returned incomplete results. Require
both iterators to be exhausted, and add a test that forces split
processing to span multiple batches.
---
 .../tablechanges/TableChangesSplitSource.java |  2 +-
 .../plugin/iceberg/IcebergTestUtils.java      | 17 ++++-
 .../TestIcebergParquetConnectorTest.java      | 58 +++++++++++++++++++
 3 files changed, 72 insertions(+), 5 deletions(-)

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java
index 097c0d04919d..6ee9fddd9303 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java
@@ -96,7 +96,7 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
     @Override
     public boolean isFinished()
     {
-        return changelogScanIterator != null && !changelogScanIterator.hasNext();
+        return changelogScanIterator != null && !changelogScanIterator.hasNext() && !fileTasksIterator.hasNext();
     }
 
     @Override
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java
index 35f514551533..3078e25694f1 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java
@@ -138,12 +138,9 @@ private static boolean checkOrcFileSorting(Supplier<OrcDataSource> dataSourceSup
     @SuppressWarnings({"unchecked", "rawtypes"})
     public static boolean checkParquetFileSorting(TrinoInputFile inputFile, String sortColumnName)
     {
-        ParquetMetadata parquetMetadata;
+        ParquetMetadata parquetMetadata = getParquetFileMetadata(inputFile);
         List<BlockMetadata> blocks;
         try {
-            parquetMetadata = MetadataReader.readFooter(
-                    new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()),
-                    Optional.empty());
             blocks = parquetMetadata.getBlocks();
         }
         catch (IOException e) {
@@ -216,4 +213,16 @@ public static Map<String, Long> getMetadataFileAndUpdatedMillis(TrinoFileSystem
         }
         return metadataFiles;
     }
+
+    public static ParquetMetadata getParquetFileMetadata(TrinoInputFile inputFile)
+    {
+        try {
+            return MetadataReader.readFooter(
+                    new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()),
+                    Optional.empty());
+        }
+        catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
 }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java
index 6c6a9023e59e..28481bdc8ebe 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java
@@ -13,9 +13,12 @@
  */
 package io.trino.plugin.iceberg;
 
+import com.google.common.collect.Iterables;
 import io.trino.Session;
+import io.trino.execution.QueryManagerConfig;
 import io.trino.filesystem.Location;
 import io.trino.operator.OperatorStats;
+import io.trino.parquet.metadata.ParquetMetadata;
 import io.trino.testing.MaterializedResult;
 import io.trino.testing.QueryRunner;
 import io.trino.testing.QueryRunner.MaterializedResultWithPlan;
@@ -23,14 +26,19 @@
 import org.intellij.lang.annotations.Language;
 import org.junit.jupiter.api.Test;
 
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
 import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
+import static io.trino.plugin.iceberg.IcebergTestUtils.getParquetFileMetadata;
 import static io.trino.plugin.iceberg.IcebergTestUtils.withSmallRowGroups;
 import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
+import static java.lang.String.format;
+import static java.time.ZoneOffset.UTC;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestIcebergParquetConnectorTest
@@ -146,6 +154,56 @@ public void testPushdownPredicateToParquetAfterColumnRename()
         }
     }
 
+    @Test
+    void testTableChangesOnMultiRowGroups()
+            throws Exception
+    {
+        try (TestTable table = newTrinoTable(
+                "test_table_changes_function_multi_row_groups_",
+                "AS SELECT orderkey, partkey, suppkey FROM tpch.tiny.lineitem WITH NO DATA")) {
+            long initialSnapshot = getMostRecentSnapshotId(table.getName());
+            assertUpdate(
+                    withSmallRowGroups(getSession()),
+                    "INSERT INTO %s SELECT orderkey, partkey, suppkey FROM tpch.tiny.lineitem".formatted(table.getName()),
+                    60175L);
+            long snapshotAfterInsert = getMostRecentSnapshotId(table.getName());
+            DateTimeFormatter instantMillisFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSVV").withZone(UTC);
+            String snapshotAfterInsertTime = getSnapshotTime(table.getName(), snapshotAfterInsert).format(instantMillisFormatter);
+
+            // Make sure splits are processed in more than one batch;
+            // decrease the Parquet row group size or add more columns if this test fails
+            String filePath = getOnlyTableFilePath(table.getName());
+            ParquetMetadata parquetMetadata = getParquetFileMetadata(fileSystem.newInputFile(Location.of(filePath)));
+            int blocksSize = parquetMetadata.getBlocks().size();
+            int splitBatchSize = new QueryManagerConfig().getScheduleSplitBatchSize();
+            assertThat(blocksSize > splitBatchSize && blocksSize % splitBatchSize != 0).isTrue();
+
+            assertQuery(
+                    """
+                    SELECT orderkey, partkey, suppkey, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal
+                    FROM TABLE(system.table_changes(CURRENT_SCHEMA, '%s', %s, %s))
+                    """.formatted(table.getName(), initialSnapshot, snapshotAfterInsert),
+                    "SELECT orderkey, partkey, suppkey, 'insert', %s, '%s', 0 FROM lineitem".formatted(snapshotAfterInsert, snapshotAfterInsertTime));
+        }
+    }
+
+    private String getOnlyTableFilePath(String tableName)
+    {
+        return (String) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumnAsSet());
+    }
+
+    private long getMostRecentSnapshotId(String tableName)
+    {
+        return (long) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName))
+                .getOnlyColumnAsSet());
+    }
+
+    private ZonedDateTime getSnapshotTime(String tableName, long snapshotId)
+    {
+        return (ZonedDateTime) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT committed_at FROM \"%s$snapshots\" WHERE snapshot_id = %s", tableName, snapshotId))
+                .getOnlyColumnAsSet());
+    }
+
     @Override
     protected boolean isFileSorted(String path, String sortColumnName)
     {
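
Note (illustration, not part of the patch): the one-line fix guards against a common hazard in batched split sources that expand an outer iterator of scan tasks into an inner iterator of per-file tasks. The sketch below is a minimal, self-contained model of that pattern; BatchedSplitSource and its methods are invented names, not Trino APIs, and the internal structure is only assumed to resemble TableChangesSplitSource. Its isFinished() shows the corrected predicate: completion requires the buffered inner iterator, not just the outer one, to be exhausted.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class SplitBatchingSketch
{
    // Simplified model: each outer "scan task" expands to several per-file
    // splits, handed out at most maxSize at a time.
    static class BatchedSplitSource
    {
        private final Iterator<List<String>> scanTasks;
        // Inner iterator; may be partially drained when a batch fills up
        private Iterator<String> fileTasks = Collections.emptyIterator();

        BatchedSplitSource(Iterator<List<String>> scanTasks)
        {
            this.scanTasks = scanTasks;
        }

        List<String> getNextBatch(int maxSize)
        {
            List<String> batch = new ArrayList<>();
            while (batch.size() < maxSize) {
                if (fileTasks.hasNext()) {
                    batch.add(fileTasks.next());
                }
                else if (scanTasks.hasNext()) {
                    // Pull the next scan task and keep draining its splits
                    fileTasks = scanTasks.next().iterator();
                }
                else {
                    break;
                }
            }
            // The batch can fill while fileTasks still buffers splits for the
            // current scan task; a later call must return them
            return batch;
        }

        boolean isFinished()
        {
            // Checking only scanTasks reproduces the bug: once the last scan
            // task has been pulled, the source would claim completion even
            // though fileTasks still buffers undelivered splits
            return !scanTasks.hasNext() && !fileTasks.hasNext();
        }
    }

    public static void main(String[] args)
    {
        // Two scan tasks with three splits each, batch size four: the first
        // batch pulls both scan tasks but leaves two splits buffered
        BatchedSplitSource source = new BatchedSplitSource(List.of(
                List.of("a-1", "a-2", "a-3"),
                List.of("b-1", "b-2", "b-3")).iterator());

        int delivered = 0;
        while (!source.isFinished()) {
            List<String> batch = source.getNextBatch(4);
            delivered += batch.size();
            System.out.println("batch: " + batch);
        }
        // Prints 6; with the outer-iterator-only check the loop would stop
        // after the first batch and silently drop "b-2" and "b-3"
        System.out.println("delivered " + delivered + " of 6 splits");
    }
}

The test added by the patch applies the same idea to the real class: by asserting that the file's row-group count is larger than, and not a multiple of, the scheduler's split batch size, it forces splits to be delivered across several getNextBatch() calls with a partially drained buffer at a batch boundary, which is exactly the state the old isFinished() misjudged.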