Skip to content

Commit

Permalink
feat: Expose parquet chunk size to swordfish reads (#3714)
Browse files Browse the repository at this point in the history
Parquet reads in swordfish currently do not respect the `_chunk_size`
parameter that users pass in. This change threads an optional `chunk_size`
through `stream_parquet` and `stream_parquet_single` down to
`local_parquet_stream`, falling back to `PARQUET_MORSEL_SIZE` when the
parameter is not provided.

Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
colin-ho and Colin Ho authored Jan 29, 2025
1 parent d00e444 commit 4048816
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/daft-local-execution/src/sources/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ async fn stream_scan_task(
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
field_id_mapping,
chunk_size,
..
}) => {
let inference_options =
Expand Down Expand Up @@ -373,6 +374,7 @@ async fn stream_scan_task(
metadata,
maintain_order,
delete_rows,
*chunk_size,
)
.await?
}
Expand Down
8 changes: 8 additions & 0 deletions src/daft-parquet/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ async fn stream_parquet_single(
metadata: Option<Arc<FileMetaData>>,
delete_rows: Option<Vec<i64>>,
maintain_order: bool,
chunk_size: Option<usize>,
) -> DaftResult<impl Stream<Item = DaftResult<Table>> + Send> {
let field_id_mapping_provided = field_id_mapping.is_some();
let columns_to_return = columns.map(|s| s.iter().map(|s| (*s).to_string()).collect_vec());
Expand Down Expand Up @@ -417,6 +418,7 @@ async fn stream_parquet_single(
metadata,
maintain_order,
io_stats,
chunk_size,
)
.await
} else {
Expand All @@ -427,6 +429,9 @@ async fn stream_parquet_single(
field_id_mapping,
)
.await?;

let builder = builder.set_chunk_size(chunk_size);

let builder = builder.set_infer_schema_options(schema_infer_options);

let builder = if let Some(columns) = &columns_to_read {
Expand Down Expand Up @@ -873,6 +878,7 @@ pub async fn stream_parquet(
metadata: Option<Arc<FileMetaData>>,
maintain_order: bool,
delete_rows: Option<Vec<i64>>,
chunk_size: Option<usize>,
) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
let stream = stream_parquet_single(
uri.to_string(),
Expand All @@ -887,6 +893,7 @@ pub async fn stream_parquet(
metadata,
delete_rows,
maintain_order,
chunk_size,
)
.await?;
Ok(Box::pin(stream))
Expand Down Expand Up @@ -1169,6 +1176,7 @@ mod tests {
None,
false,
None,
None,
)
.await?
.collect::<Vec<_>>()
Expand Down
3 changes: 2 additions & 1 deletion src/daft-parquet/src/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,12 @@ pub async fn local_parquet_stream(
metadata: Option<Arc<parquet2::metadata::FileMetaData>>,
maintain_order: bool,
io_stats: Option<IOStatsRef>,
chunk_size: Option<usize>,
) -> DaftResult<(
Arc<parquet2::metadata::FileMetaData>,
BoxStream<'static, DaftResult<Table>>,
)> {
let chunk_size = PARQUET_MORSEL_SIZE;
let chunk_size = chunk_size.unwrap_or(PARQUET_MORSEL_SIZE);
let (metadata, schema_ref, row_ranges, column_iters) = local_parquet_read_into_column_iters(
uri,
columns.as_deref(),
Expand Down

0 comments on commit 4048816

Please sign in to comment.