From 4823714d2a7e754ee9c34616277cd36a3118e9fa Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Wed, 22 Jan 2025 11:16:21 +0100 Subject: [PATCH 1/2] Flink: Add null check to writers to prevent resurrecting null values Flink's BinaryRowData uses a magic byte to indicate null values in the backing byte arrays. Flink's internal RowData#createFieldGetter method, which Iceberg uses, only adds a null check when a type is nullable. We map Iceberg's optional attribute to nullable and Iceberg's required attribute to non-nullable. The latter creates an issue when a user mistakenly sets a required field to null. The RowData field is then interpreted as actual data because the null marker is never checked. This resurrects arbitrary values that should have been null and should have produced an error in the writer. The solution is to always check whether a field is null before attempting to read data from it. --- .../org/apache/iceberg/data/DataTest.java | 20 ++++++++ .../iceberg/data/avro/TestGenericData.java | 14 ++++- .../iceberg/data/orc/TestGenericData.java | 12 +++++ .../iceberg/data/parquet/TestGenericData.java | 13 ++++- ...TestParquetEncryptionWithWriteSupport.java | 5 ++ .../flink/data/FlinkParquetWriters.java | 6 +++ .../AbstractTestFlinkAvroReaderWriter.java | 12 ++--- .../flink/data/TestFlinkOrcReaderWriter.java | 14 ++++- .../flink/data/TestFlinkParquetReader.java | 5 ++ .../flink/data/TestFlinkParquetWriter.java | 17 +++++++ .../iceberg/flink/sink/TestIcebergSink.java | 51 +++++++++++++++++++ 11 files changed, 160 insertions(+), 9 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/DataTest.java b/data/src/test/java/org/apache/iceberg/data/DataTest.java index 657fa805e5a6..f052ff33a0c8 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTest.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTest.java @@ -27,6 +27,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.file.Path; +import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -40,6 +41,7 @@ import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.util.DateTimeUtil; import org.assertj.core.api.Assumptions; +import org.junit.Assert; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -50,6 +52,8 @@ public abstract class DataTest { protected abstract void writeAndValidate(Schema schema) throws IOException; + protected abstract void writeAndValidate(Schema schema, List data) throws IOException; + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { throw new UnsupportedEncodingException( "Cannot run test, writeAndValidate(Schema, Schema) is not implemented"); @@ -486,4 +490,20 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Object defau writeAndValidate(writeSchema, readSchema); } + + @Test + public void testWriteNullValueForRequiredType() { + Schema schema = + new Schema( + required(0, "id", LongType.get()), required(1, "string", Types.StringType.get())); + + GenericRecord genericRecord = GenericRecord.create(schema); + genericRecord.set(0, 42L); + genericRecord.set(1, null); + + Assert.assertThrows( + // The actual exception depends on the implementation, e.g. + // NullPointerException or IllegalArgumentException.
+ Exception.class, () -> writeAndValidate(schema, List.of(genericRecord))); + } } diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java index bf5160fd18dc..615f4a4bc082 100644 --- a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java @@ -35,14 +35,26 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; public class TestGenericData extends DataTest { + @Override protected void writeAndValidate(Schema schema) throws IOException { writeAndValidate(schema, schema); } + @Override + protected void writeAndValidate(Schema writeSchema, List expectedData) + throws IOException { + writeAndValidate(writeSchema, writeSchema, expectedData); + } + @Override protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { - List expected = RandomGenericData.generate(writeSchema, 100, 0L); + List data = RandomGenericData.generate(writeSchema, 100, 0L); + writeAndValidate(writeSchema, expectedSchema, data); + } + + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List expected) + throws IOException { File testFile = File.createTempFile("junit", null, temp.toFile()); assertThat(testFile.delete()).isTrue(); diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java index 5147fd377c62..e25a52e2477b 100644 --- a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java @@ -50,6 +50,7 @@ import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; import org.apache.orc.storage.ql.exec.vector.LongColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.junit.Ignore; import org.junit.jupiter.api.Test; public class TestGenericData extends DataTest { @@ -61,6 +62,11 @@ protected void writeAndValidate(Schema schema) throws IOException { writeAndValidateRecords(schema, expected); } + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + writeAndValidateRecords(schema, expectedData); + } + @Test public void writeAndValidateRepeatingRecords() throws IOException { Schema structSchema = @@ -237,4 +243,10 @@ private void writeAndValidateRecords(Schema schema, List expected) throw DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i)); } } + + @Override + @Ignore("ORC file format supports null values even for required fields") + public void testWriteNullValueForRequiredType() { + super.testWriteNullValueForRequiredType(); + } } diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java index 5a63f7a1fc9d..47f00e8dc08c 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java @@ -46,14 +46,25 @@ import org.junit.jupiter.api.Test; public class TestGenericData extends DataTest { + @Override protected void writeAndValidate(Schema schema) throws IOException { writeAndValidate(schema, schema); } + @Override + protected void writeAndValidate(Schema schema, List expected) throws IOException { + writeAndValidate(schema, schema, expected); + } + @Override protected void 
writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { - List expected = RandomGenericData.generate(writeSchema, 100, 12228L); + List data = RandomGenericData.generate(writeSchema, 100, 12228L); + writeAndValidate(writeSchema, expectedSchema, data); + } + + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List expected) + throws IOException { File testFile = File.createTempFile("junit", null, temp.toFile()); assertThat(testFile.delete()).isTrue(); diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java index 4b0a10830221..a401c326dea6 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java @@ -59,6 +59,11 @@ public class TestParquetEncryptionWithWriteSupport extends DataTest { @Override protected void writeAndValidate(Schema schema) throws IOException { List expected = RandomGenericData.generate(schema, 100, 0L); + writeAndValidate(schema, expected); + } + + @Override + protected void writeAndValidate(Schema schema, List expected) throws IOException { File testFile = File.createTempFile("junit", null, temp.toFile()); assertThat(testFile.delete()).isTrue(); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index db4f1730a134..e4d9af699c5b 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -498,6 +498,12 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L); - writeAndValidate(schema, expectedRecords, NUM_RECORDS); + writeAndValidate(schema, expectedRecords); } protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema); - private void writeAndValidate(Schema schema, List expectedRecords, int numRecord) - throws IOException { + @Override + protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); @@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List expectedRecords, int n try (CloseableIterable reader = createAvroReadBuilder(recordsFile, schema).build()) { Iterator expected = expectedRecords.iterator(); Iterator rows = reader.iterator(); - for (int i = 0; i < numRecord; i++) { + for (int i = 0; i < expectedRecords.size(); i++) { assertThat(rows).hasNext(); TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next()); } @@ -120,7 +120,7 @@ private void writeAndValidate(Schema schema, List expectedRecords, int n .build()) { Iterator expected = expectedRows.iterator(); Iterator records = reader.iterator(); - for (int i = 0; i < numRecord; i += 1) { + for (int i = 0; i < expectedRecords.size(); i += 1) { assertThat(records).hasNext(); TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next()); } @@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException { 1643811742000L, 10.24d)); - 
writeAndValidate(SCHEMA_NUM_TYPE, expected, 2); + writeAndValidate(SCHEMA_NUM_TYPE, expected); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java index 72f2ce4f4bce..4772b07c0dc7 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java @@ -39,14 +39,20 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Ignore; public class TestFlinkOrcReaderWriter extends DataTest { private static final int NUM_RECORDS = 100; @Override protected void writeAndValidate(Schema schema) throws IOException { - RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L); + writeAndValidate(schema, expectedRecords); + } + + @Override + protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { + RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); File recordsFile = File.createTempFile("junit", null, temp.toFile()); @@ -104,4 +110,10 @@ protected void writeAndValidate(Schema schema) throws IOException { assertThat(records).isExhausted(); } } + + @Override + @Ignore("ORC file format supports null values even for required fields") + public void testWriteNullValueForRequiredType() { + super.testWriteNullValueForRequiredType(); + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 4cfb24f62921..f366a349e70f 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -236,4 +236,9 @@ protected void writeAndValidate(Schema schema) throws IOException { RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20), schema); } + + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + writeAndValidate(expectedData, schema); + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java index b1e6f5aa00ff..a6d06bf11146 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java @@ -24,7 +24,10 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Iterator; +import java.util.List; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.LogicalType; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; @@ -33,10 +36,12 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.flink.FlinkSchemaUtil; +import 
org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.io.TempDir; public class TestFlinkParquetWriter extends DataTest { @@ -91,4 +96,16 @@ protected void writeAndValidate(Schema schema) throws IOException { schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)), schema); } + + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema)); + List binaryRowList = Lists.newArrayList(); + for (Record record : expectedData) { + RowData rowData = RowDataConverter.convert(schema, record); + BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData); + binaryRowList.add(binaryRow); + } + writeAndValidate(binaryRowList, schema); + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java index 7f355c1e8403..fb2375e798c5 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.sink; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -27,20 +28,24 @@ import java.util.Map; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.types.Row; +import org.apache.flink.util.ExceptionUtils; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.SimpleDataUtil; @@ -51,6 +56,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Assume; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -414,6 +422,49 @@ void testOperatorsUidNameWitUidSuffix() throws Exception { assertThat(secondTransformation.getName()).isEqualTo("data-ingestion"); } + @TestTemplate + void testErrorOnNullForRequiredField() throws Exception { + Assume.assumeFalse( + "ORC file format supports 
null values even for required fields.", format == FileFormat.ORC); + + Schema icebergSchema = + new Schema( + Types.NestedField.required(1, "id2", Types.IntegerType.get()), + Types.NestedField.required(2, "data2", Types.StringType.get())); + Table table2 = + CATALOG_EXTENSION + .catalog() + .createTable( + TableIdentifier.of(DATABASE, "t2"), + icebergSchema, + PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + + // Null out a required field + List rows = List.of(Row.of(42, null)); + + env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream dataStream = + env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId"); + + TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema); + IcebergSink.forRow(dataStream, flinkSchema) + .table(table2) + .tableLoader(tableLoader) + .tableSchema(flinkSchema) + .writeParallelism(parallelism) + .distributionMode(DistributionMode.HASH) + .append(); + + try { + env.execute(); + Assert.fail(); + } catch (JobExecutionException e) { + assertThat(ExceptionUtils.findThrowable(e, t -> t instanceof NullPointerException)).isPresent(); + } + } + private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode) throws Exception { List rows = createRows(""); From 59dacfb6606db94cbf55c89c6e53a29e8528649a Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Wed, 22 Jan 2025 11:16:21 +0100 Subject: [PATCH 2/2] Re-add 1.18 and 1.19 changes because they inherit base class changes --- .../flink/data/FlinkParquetWriters.java | 6 +++ .../AbstractTestFlinkAvroReaderWriter.java | 12 ++--- .../flink/data/TestFlinkOrcReaderWriter.java | 14 ++++- .../flink/data/TestFlinkParquetReader.java | 5 ++ .../flink/data/TestFlinkParquetWriter.java | 17 +++++++ .../iceberg/flink/data/FlinkOrcWriters.java | 3 ++ .../flink/data/FlinkParquetWriters.java | 6 +++ .../AbstractTestFlinkAvroReaderWriter.java | 12 ++--- .../flink/data/TestFlinkOrcReaderWriter.java | 14 ++++- .../flink/data/TestFlinkParquetReader.java | 5 ++ .../flink/data/TestFlinkParquetWriter.java | 17 +++++++ .../iceberg/flink/sink/TestIcebergSink.java | 51 +++++++++++++++++++ 12 files changed, 148 insertions(+), 14 deletions(-) diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index db4f1730a134..e4d9af699c5b 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -498,6 +498,12 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L); - writeAndValidate(schema, expectedRecords, NUM_RECORDS); + writeAndValidate(schema, expectedRecords); } protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema); - private void writeAndValidate(Schema schema, List expectedRecords, int numRecord) - throws IOException { + @Override + protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); @@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List expectedRecords, int n try (CloseableIterable reader = createAvroReadBuilder(recordsFile, schema).build()) { Iterator 
expected = expectedRecords.iterator(); Iterator rows = reader.iterator(); - for (int i = 0; i < numRecord; i++) { + for (int i = 0; i < expectedRecords.size(); i++) { assertThat(rows).hasNext(); TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next()); } @@ -120,7 +120,7 @@ private void writeAndValidate(Schema schema, List expectedRecords, int n .build()) { Iterator expected = expectedRows.iterator(); Iterator records = reader.iterator(); - for (int i = 0; i < numRecord; i += 1) { + for (int i = 0; i < expectedRecords.size(); i += 1) { assertThat(records).hasNext(); TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next()); } @@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException { 1643811742000L, 10.24d)); - writeAndValidate(SCHEMA_NUM_TYPE, expected, 2); + writeAndValidate(SCHEMA_NUM_TYPE, expected); } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java index 72f2ce4f4bce..4772b07c0dc7 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java @@ -39,14 +39,20 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Ignore; public class TestFlinkOrcReaderWriter extends DataTest { private static final int NUM_RECORDS = 100; @Override protected void writeAndValidate(Schema schema) throws IOException { - RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L); + writeAndValidate(schema, expectedRecords); + } + + @Override + protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { + RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); File recordsFile = File.createTempFile("junit", null, temp.toFile()); @@ -104,4 +110,10 @@ protected void writeAndValidate(Schema schema) throws IOException { assertThat(records).isExhausted(); } } + + @Override + @Ignore("ORC file format supports null values even for required fields") + public void testWriteNullValueForRequiredType() { + super.testWriteNullValueForRequiredType(); + } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 4cfb24f62921..f366a349e70f 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -236,4 +236,9 @@ protected void writeAndValidate(Schema schema) throws IOException { RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20), schema); } + + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + writeAndValidate(expectedData, schema); + } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java index b1e6f5aa00ff..a6d06bf11146 100644 --- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java @@ -24,7 +24,10 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Iterator; +import java.util.List; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.LogicalType; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; @@ -33,10 +36,12 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.io.TempDir; public class TestFlinkParquetWriter extends DataTest { @@ -91,4 +96,16 @@ protected void writeAndValidate(Schema schema) throws IOException { schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)), schema); } + + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema)); + List binaryRowList = Lists.newArrayList(); + for (Record record : expectedData) { + RowData rowData = RowDataConverter.convert(schema, record); + BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData); + binaryRowList.add(binaryRow); + } + writeAndValidate(binaryRowList, schema); + } } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java index da2f95cf822f..61a52a2ff641 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java @@ -304,6 +304,9 @@ static class RowDataWriter extends GenericOrcWriters.StructWriter { @Override protected Object get(RowData struct, int index) { + if (struct.isNullAt(index)) { + return null; + } return fieldGetters.get(index).getFieldOrNull(struct); } } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index db4f1730a134..e4d9af699c5b 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -498,6 +498,12 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L); - writeAndValidate(schema, expectedRecords, NUM_RECORDS); + writeAndValidate(schema, expectedRecords); } protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema); - private void writeAndValidate(Schema schema, List expectedRecords, int numRecord) - throws IOException { + @Override + protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRows = 
Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); @@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List expectedRecords, int n try (CloseableIterable reader = createAvroReadBuilder(recordsFile, schema).build()) { Iterator expected = expectedRecords.iterator(); Iterator rows = reader.iterator(); - for (int i = 0; i < numRecord; i++) { + for (int i = 0; i < expectedRecords.size(); i++) { assertThat(rows).hasNext(); TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next()); } @@ -120,7 +120,7 @@ private void writeAndValidate(Schema schema, List expectedRecords, int n .build()) { Iterator expected = expectedRows.iterator(); Iterator records = reader.iterator(); - for (int i = 0; i < numRecord; i += 1) { + for (int i = 0; i < expectedRecords.size(); i += 1) { assertThat(records).hasNext(); TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next()); } @@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException { 1643811742000L, 10.24d)); - writeAndValidate(SCHEMA_NUM_TYPE, expected, 2); + writeAndValidate(SCHEMA_NUM_TYPE, expected); } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java index 72f2ce4f4bce..4772b07c0dc7 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java @@ -39,14 +39,20 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Ignore; public class TestFlinkOrcReaderWriter extends DataTest { private static final int NUM_RECORDS = 100; @Override protected void writeAndValidate(Schema schema) throws IOException { - RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L); + writeAndValidate(schema, expectedRecords); + } + + @Override + protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { + RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); File recordsFile = File.createTempFile("junit", null, temp.toFile()); @@ -104,4 +110,10 @@ protected void writeAndValidate(Schema schema) throws IOException { assertThat(records).isExhausted(); } } + + @Override + @Ignore("ORC file format supports null values even for required fields") + public void testWriteNullValueForRequiredType() { + super.testWriteNullValueForRequiredType(); + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 4cfb24f62921..f366a349e70f 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -236,4 +236,9 @@ protected void writeAndValidate(Schema schema) throws IOException { RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20), schema); } + + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + writeAndValidate(expectedData, schema); + } } diff 
--git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java index b1e6f5aa00ff..a6d06bf11146 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java @@ -24,7 +24,10 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Iterator; +import java.util.List; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.LogicalType; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; @@ -33,10 +36,12 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.io.TempDir; public class TestFlinkParquetWriter extends DataTest { @@ -91,4 +96,16 @@ protected void writeAndValidate(Schema schema) throws IOException { schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)), schema); } + + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema)); + List binaryRowList = Lists.newArrayList(); + for (Record record : expectedData) { + RowData rowData = RowDataConverter.convert(schema, record); + BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData); + binaryRowList.add(binaryRow); + } + writeAndValidate(binaryRowList, schema); + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java index 7f355c1e8403..fb2375e798c5 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.sink; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -27,20 +28,24 @@ import java.util.Map; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.types.Row; +import org.apache.flink.util.ExceptionUtils; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import 
org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.SimpleDataUtil; @@ -51,6 +56,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Assume; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -414,6 +422,49 @@ void testOperatorsUidNameWitUidSuffix() throws Exception { assertThat(secondTransformation.getName()).isEqualTo("data-ingestion"); } + @TestTemplate + void testErrorOnNullForRequiredField() throws Exception { + Assume.assumeFalse( + "ORC file format supports null values even for required fields.", format == FileFormat.ORC); + + Schema icebergSchema = + new Schema( + Types.NestedField.required(1, "id2", Types.IntegerType.get()), + Types.NestedField.required(2, "data2", Types.StringType.get())); + Table table2 = + CATALOG_EXTENSION + .catalog() + .createTable( + TableIdentifier.of(DATABASE, "t2"), + icebergSchema, + PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + + // Null out a required field + List rows = List.of(Row.of(42, null)); + + env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream dataStream = + env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId"); + + TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema); + IcebergSink.forRow(dataStream, flinkSchema) + .table(table2) + .tableLoader(tableLoader) + .tableSchema(flinkSchema) + .writeParallelism(parallelism) + .distributionMode(DistributionMode.HASH) + .append(); + + try { + env.execute(); + Assert.fail(); + } catch (JobExecutionException e) { + assertThat(ExceptionUtils.findThrowable(e, t -> t instanceof NullPointerException)).isPresent(); + } + } + private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode) throws Exception { List rows = createRows("");