diff --git a/dataframe-arrow/build.gradle.kts b/dataframe-arrow/build.gradle.kts index a5d8304f2b..0c000431ec 100644 --- a/dataframe-arrow/build.gradle.kts +++ b/dataframe-arrow/build.gradle.kts @@ -15,6 +15,7 @@ dependencies { implementation(libs.arrow.vector) implementation(libs.arrow.format) implementation(libs.arrow.memory) + implementation(libs.arrow.dataset) implementation(libs.commonsCompress) implementation(libs.kotlin.reflect) implementation(libs.kotlin.datetimeJvm) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt index dac1fe680e..bf34f0c83f 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt @@ -1,5 +1,6 @@ package org.jetbrains.kotlinx.dataframe.io +import org.apache.arrow.dataset.file.FileFormat import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.ipc.ArrowReader import org.apache.commons.compress.utils.SeekableInMemoryByteChannel @@ -186,3 +187,11 @@ public fun DataFrame.Companion.readArrow( public fun ArrowReader.toDataFrame( nullability: NullabilityOptions = NullabilityOptions.Infer ): AnyFrame = DataFrame.Companion.readArrowImpl(this, nullability) + +/** + * Read [Parquet](https://parquet.apache.org/) data from existing [url] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html) + */ +public fun DataFrame.Companion.readParquet( + url: URL, + nullability: NullabilityOptions = NullabilityOptions.Infer +): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index 842551d534..308aa2d215 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -1,5 +1,10 @@ package org.jetbrains.kotlinx.dataframe.io +import org.apache.arrow.dataset.file.FileFormat +import org.apache.arrow.dataset.file.FileSystemDatasetFactory +import org.apache.arrow.dataset.jni.DirectReservationListener +import org.apache.arrow.dataset.jni.NativeMemoryPool +import org.apache.arrow.dataset.scanner.ScanOptions import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.BigIntVector import org.apache.arrow.vector.BitVector @@ -330,3 +335,27 @@ internal fun DataFrame.Companion.readArrowImpl( return flattened.concatKeepingSchema() } } + +internal fun DataFrame.Companion.readArrowDataset( + fileUri: String, + fileFormat: FileFormat, + nullability: NullabilityOptions = NullabilityOptions.Infer, +): AnyFrame { + val scanOptions = ScanOptions(32768) + RootAllocator().use { allocator -> + FileSystemDatasetFactory( + allocator, + NativeMemoryPool.createListenable(DirectReservationListener.instance()), + fileFormat, + fileUri + ).use { datasetFactory -> + datasetFactory.finish().use { dataset -> + dataset.newScan(scanOptions).use { scanner -> + scanner.scanBatches().use { reader -> + return readArrow(reader, nullability) + } + } + } + } + } +} diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt index b095b16f52..81d46536c3 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt @@ -33,7 +33,6 @@ import org.jetbrains.kotlinx.dataframe.api.columnOf import org.jetbrains.kotlinx.dataframe.api.convertToBoolean import org.jetbrains.kotlinx.dataframe.api.copy import org.jetbrains.kotlinx.dataframe.api.dataFrameOf -import org.jetbrains.kotlinx.dataframe.api.describe import org.jetbrains.kotlinx.dataframe.api.map import org.jetbrains.kotlinx.dataframe.api.pathOf import org.jetbrains.kotlinx.dataframe.api.remove @@ -613,4 +612,17 @@ internal class ArrowKtTest { DataFrame.readArrow(dbArrowReader) shouldBe expected } } + + @Test + fun testReadParquet(){ + val path = testResource("test.arrow.parquet").path + val dataFrame = DataFrame.readParquet(URL("file:$path")) + dataFrame.rowsCount() shouldBe 300 + assertEstimations( + exampleFrame = dataFrame, + expectedNullable = false, + hasNulls = false, + fromParquet = true + ) + } } diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt index 66a2713518..74b0bcb405 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt @@ -18,7 +18,12 @@ import kotlin.reflect.typeOf * Assert that we have got the same data that was originally saved on example creation. * Example generation project is currently located at https://github.com/Kopilov/arrow_example */ -internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) { +internal fun assertEstimations( + exampleFrame: AnyFrame, + expectedNullable: Boolean, + hasNulls: Boolean, + fromParquet: Boolean = false +) { /** * In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number */ @@ -129,10 +134,19 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30)) } - val datetimeCol = exampleFrame["date64"] as DataColumn - datetimeCol.type() shouldBe typeOf().withNullability(expectedNullable) - datetimeCol.forEachIndexed { i, element -> - assertValueOrNull(iBatch(i), element, LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)) + if (fromParquet){ + //parquet format have only one type of date: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date without time + val datetimeCol = exampleFrame["date64"] as DataColumn + datetimeCol.type() shouldBe typeOf().withNullability(expectedNullable) + datetimeCol.forEachIndexed { i, element -> + assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30)) + } + }else { + val datetimeCol = exampleFrame["date64"] as DataColumn + datetimeCol.type() shouldBe typeOf().withNullability(expectedNullable) + datetimeCol.forEachIndexed { i, element -> + assertValueOrNull(iBatch(i), element, LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)) + } } val timeSecCol = exampleFrame["time32_seconds"] as DataColumn diff --git a/dataframe-arrow/src/test/resources/test.arrow.parquet b/dataframe-arrow/src/test/resources/test.arrow.parquet new file mode 100644 index 0000000000..cf78b1c255 Binary files /dev/null and b/dataframe-arrow/src/test/resources/test.arrow.parquet differ diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 402002d817..76fce000e3 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -47,7 +47,7 @@ junit-platform = "1.10.2" kotestAsserions = "5.5.4" jsoup = "1.17.2" -arrow = "15.0.0" +arrow = "16.0.0" docProcessor = "0.3.5" simpleGit = "2.0.3" dependencyVersions = "0.51.0" @@ -98,6 +98,7 @@ jsoup = { group = "org.jsoup", name = "jsoup", version.ref = "jsoup" } arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" } arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" } arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" } +arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" } arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }