From 7bed2e7b2be3bda3e7f9fbf63b301cbd2657ebb5 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Thu, 16 Jan 2025 06:56:50 +0000
Subject: [PATCH 1/5] limit max files in merged scan tasks

---
 daft/context.py                          |  3 +++
 daft/daft/__init__.pyi                   |  3 +++
 src/common/daft-config/src/lib.rs        |  2 ++
 src/common/daft-config/src/python.rs     | 10 ++++++++++
 src/daft-dsl/src/expr/mod.rs             |  2 +-
 src/daft-scan/src/scan_task_iters/mod.rs | 17 +++++++++++------
 6 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/daft/context.py b/daft/context.py
index d96400980d..f1892e2575 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -331,6 +331,7 @@ def set_execution_config(
     config: PyDaftExecutionConfig | None = None,
     scan_tasks_min_size_bytes: int | None = None,
     scan_tasks_max_size_bytes: int | None = None,
+    max_files_per_scan_task: int | None = None,
     broadcast_join_size_bytes_threshold: int | None = None,
     parquet_split_row_groups_max_files: int | None = None,
     sort_merge_join_sort_with_aligned_boundaries: bool | None = None,
@@ -368,6 +369,7 @@ def set_execution_config(
         scan_tasks_max_size_bytes: Maximum size in bytes when merging ScanTasks when reading files from storage.
            Increasing this value will increase the upper bound of the size of merged ScanTasks, which leads to bigger but
            fewer partitions. (Defaults to 384 MiB)
+        max_files_per_scan_task: Maximum number of files to read in a single ScanTask. (Defaults to 10)
         broadcast_join_size_bytes_threshold: If one side of a join is smaller than this threshold, a broadcast join will be used.
            Default is 10 MiB.
         parquet_split_row_groups_max_files: Maximum number of files to read in which the row group splitting should happen. (Defaults to 10)
@@ -406,6 +408,7 @@ def set_execution_config(
         new_daft_execution_config = old_daft_execution_config.with_config_values(
             scan_tasks_min_size_bytes=scan_tasks_min_size_bytes,
             scan_tasks_max_size_bytes=scan_tasks_max_size_bytes,
+            max_files_per_scan_task=max_files_per_scan_task,
             broadcast_join_size_bytes_threshold=broadcast_join_size_bytes_threshold,
             parquet_split_row_groups_max_files=parquet_split_row_groups_max_files,
             sort_merge_join_sort_with_aligned_boundaries=sort_merge_join_sort_with_aligned_boundaries,
diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi
index 457bdbb895..5ad1dfb736 100644
--- a/daft/daft/__init__.pyi
+++ b/daft/daft/__init__.pyi
@@ -1703,6 +1703,7 @@ class PyDaftExecutionConfig:
         self,
         scan_tasks_min_size_bytes: int | None = None,
         scan_tasks_max_size_bytes: int | None = None,
+        max_files_per_scan_task: int | None = None,
         broadcast_join_size_bytes_threshold: int | None = None,
         parquet_split_row_groups_max_files: int | None = None,
         sort_merge_join_sort_with_aligned_boundaries: bool | None = None,
@@ -1731,6 +1732,8 @@
     @property
     def scan_tasks_max_size_bytes(self) -> int: ...
     @property
+    def max_files_per_scan_task(self) -> int: ...
+    @property
     def broadcast_join_size_bytes_threshold(self) -> int: ...
     @property
     def sort_merge_join_sort_with_aligned_boundaries(self) -> bool: ...
diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs
index a23090d753..762e8b5f90 100644
--- a/src/common/daft-config/src/lib.rs
+++ b/src/common/daft-config/src/lib.rs
@@ -40,6 +40,7 @@ impl DaftPlanningConfig {
 pub struct DaftExecutionConfig {
     pub scan_tasks_min_size_bytes: usize,
     pub scan_tasks_max_size_bytes: usize,
+    pub max_files_per_scan_task: usize,
     pub broadcast_join_size_bytes_threshold: usize,
     pub sort_merge_join_sort_with_aligned_boundaries: bool,
     pub hash_join_partition_size_leniency: f64,
@@ -69,6 +70,7 @@ impl Default for DaftExecutionConfig {
         Self {
             scan_tasks_min_size_bytes: 96 * 1024 * 1024,   // 96MB
             scan_tasks_max_size_bytes: 384 * 1024 * 1024,  // 384MB
+            max_files_per_scan_task: 10,
             broadcast_join_size_bytes_threshold: 10 * 1024 * 1024, // 10 MiB
             sort_merge_join_sort_with_aligned_boundaries: false,
             hash_join_partition_size_leniency: 0.5,
diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs
index 3371ef349c..449f7d47f8 100644
--- a/src/common/daft-config/src/python.rs
+++ b/src/common/daft-config/src/python.rs
@@ -78,6 +78,7 @@ impl PyDaftExecutionConfig {
     #[pyo3(signature = (
         scan_tasks_min_size_bytes=None,
         scan_tasks_max_size_bytes=None,
+        max_files_per_scan_task=None,
         broadcast_join_size_bytes_threshold=None,
         parquet_split_row_groups_max_files=None,
         sort_merge_join_sort_with_aligned_boundaries=None,
@@ -105,6 +106,7 @@ impl PyDaftExecutionConfig {
         &self,
         scan_tasks_min_size_bytes: Option<usize>,
         scan_tasks_max_size_bytes: Option<usize>,
+        max_files_per_scan_task: Option<usize>,
         broadcast_join_size_bytes_threshold: Option<usize>,
         parquet_split_row_groups_max_files: Option<usize>,
         sort_merge_join_sort_with_aligned_boundaries: Option<bool>,
@@ -136,6 +138,9 @@ impl PyDaftExecutionConfig {
         if let Some(scan_tasks_min_size_bytes) = scan_tasks_min_size_bytes {
             config.scan_tasks_min_size_bytes = scan_tasks_min_size_bytes;
         }
+        if let Some(max_files_per_scan_task) = max_files_per_scan_task {
+            config.max_files_per_scan_task = max_files_per_scan_task;
+        }
         if let Some(broadcast_join_size_bytes_threshold) = broadcast_join_size_bytes_threshold {
             config.broadcast_join_size_bytes_threshold = broadcast_join_size_bytes_threshold;
         }
@@ -236,6 +241,11 @@
         Ok(self.config.scan_tasks_max_size_bytes)
     }

+    #[getter]
+    fn get_max_files_per_scan_task(&self) -> PyResult<usize> {
+        Ok(self.config.max_files_per_scan_task)
+    }
+
     #[getter]
     fn get_broadcast_join_size_bytes_threshold(&self) -> PyResult<usize> {
         Ok(self.config.broadcast_join_size_bytes_threshold)
diff --git a/src/daft-dsl/src/expr/mod.rs b/src/daft-dsl/src/expr/mod.rs
index 876ff26cb4..10d91af86c 100644
--- a/src/daft-dsl/src/expr/mod.rs
+++ b/src/daft-dsl/src/expr/mod.rs
@@ -1406,5 +1406,5 @@ pub fn estimated_selectivity(expr: &Expr, schema: &Schema) -> f64 {
         // Everything else doesn't filter
         Expr::Subquery(_) => 1.0,
         Expr::Agg(_) => panic!("Aggregates are not allowed in WHERE clauses"),
-    }
+    }.max(0.01)
 }
diff --git a/src/daft-scan/src/scan_task_iters/mod.rs b/src/daft-scan/src/scan_task_iters/mod.rs
index 226d4c3ee2..7b6ce15deb 100644
--- a/src/daft-scan/src/scan_task_iters/mod.rs
+++ b/src/daft-scan/src/scan_task_iters/mod.rs
@@ -26,6 +26,7 @@ type BoxScanTaskIter<'a> = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>> + 'a>;
 /// * `scan_tasks`: A Boxed Iterator of ScanTaskRefs to perform merging on
 /// * `min_size_bytes`: Minimum size in bytes of a ScanTask, after which no more merging will be performed
 /// * `max_size_bytes`: Maximum size in bytes of a ScanTask, capping the maximum size of a merged ScanTask
+/// * `max_source_count`: Maximum number of ScanTasks to merge
 #[must_use]
 fn merge_by_sizes<'a>(
     scan_tasks: BoxScanTaskIter<'a>,
@@ -57,6 +58,7 @@ fn merge_by_sizes<'a>(
                     target_upper_bound_size_bytes: (limit_bytes * 1.5) as usize,
                     target_lower_bound_size_bytes: (limit_bytes / 2.) as usize,
                     accumulator: None,
+                    max_source_count: cfg.max_files_per_scan_task,
                 }) as BoxScanTaskIter;
             }
         }
@@ -69,6 +71,7 @@ fn merge_by_sizes<'a>(
             target_upper_bound_size_bytes: cfg.scan_tasks_max_size_bytes,
             target_lower_bound_size_bytes: cfg.scan_tasks_min_size_bytes,
             accumulator: None,
+            max_source_count: cfg.max_files_per_scan_task,
         }) as BoxScanTaskIter
     }
 }
@@ -83,6 +86,9 @@ struct MergeByFileSize<'a> {

     // Current element being accumulated on
     accumulator: Option<ScanTaskRef>,
+
+    // Maximum number of files in a merged ScanTask
+    max_source_count: usize,
 }

 impl<'a> MergeByFileSize<'a> {
@@ -92,11 +98,10 @@ impl<'a> MergeByFileSize<'a> {
     /// in estimated bytes, as well as other factors including any limit pushdowns.
     fn accumulator_ready(&self) -> bool {
         // Emit the accumulator as soon as it is bigger than the specified `target_lower_bound_size_bytes`
-        if let Some(acc) = &self.accumulator
-            && let Some(acc_bytes) = acc.estimate_in_memory_size_bytes(Some(self.cfg))
-            && acc_bytes >= self.target_lower_bound_size_bytes
-        {
-            true
+        if let Some(acc) = &self.accumulator {
+            acc.sources.len() >= self.max_source_count
+                || acc.estimate_in_memory_size_bytes(Some(self.cfg))
+                    .map_or(false, |bytes| bytes >= self.target_lower_bound_size_bytes)
         } else {
             false
         }
@@ -143,7 +148,7 @@ impl<'a> Iterator for MergeByFileSize<'a> {
                 };
             }

-            // Emit accumulator if ready
+            // Emit accumulator if ready or if merge count limit is reached
             if self.accumulator_ready() {
                 return self.accumulator.take().map(Ok);
             }

From 2d3620479295b0d96e7e8451e4dfa27da82398ae Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Thu, 16 Jan 2025 07:00:24 +0000
Subject: [PATCH 2/5] limit max files in merged scan tasks

---
 src/daft-dsl/src/expr/mod.rs             | 3 ++-
 src/daft-scan/src/scan_task_iters/mod.rs | 5 +++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/daft-dsl/src/expr/mod.rs b/src/daft-dsl/src/expr/mod.rs
index 10d91af86c..3e51f57751 100644
--- a/src/daft-dsl/src/expr/mod.rs
+++ b/src/daft-dsl/src/expr/mod.rs
@@ -1406,5 +1406,6 @@ pub fn estimated_selectivity(expr: &Expr, schema: &Schema) -> f64 {
         // Everything else doesn't filter
         Expr::Subquery(_) => 1.0,
         Expr::Agg(_) => panic!("Aggregates are not allowed in WHERE clauses"),
-    }.max(0.01)
+    }
+    .max(0.01)
 }
diff --git a/src/daft-scan/src/scan_task_iters/mod.rs b/src/daft-scan/src/scan_task_iters/mod.rs
index 7b6ce15deb..33924c55fd 100644
--- a/src/daft-scan/src/scan_task_iters/mod.rs
+++ b/src/daft-scan/src/scan_task_iters/mod.rs
@@ -100,8 +100,9 @@ impl<'a> MergeByFileSize<'a> {
         // Emit the accumulator as soon as it is bigger than the specified `target_lower_bound_size_bytes`
         if let Some(acc) = &self.accumulator {
             acc.sources.len() >= self.max_source_count
-                || acc.estimate_in_memory_size_bytes(Some(self.cfg))
-                    .map_or(false, |bytes| bytes >= self.target_lower_bound_size_bytes)
+                || acc
+                    .estimate_in_memory_size_bytes(Some(self.cfg))
+                    .map_or(false, |bytes| bytes >= self.target_lower_bound_size_bytes)
         } else {
             false
         }

From e1297274cc320cf41eca0b601cb1f46444f01539 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Thu, 16 Jan 2025 07:03:20 +0000
Subject: [PATCH 3/5] rename to max sources

---
 daft/context.py                          |  6 +++---
 daft/daft/__init__.pyi                   |  4 ++--
 src/common/daft-config/src/lib.rs        |  4 ++--
 src/common/daft-config/src/python.rs     | 12 ++++++------
 src/daft-scan/src/scan_task_iters/mod.rs |  4 ++--
 5 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/daft/context.py b/daft/context.py
index f1892e2575..76ba82325c 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -331,7 +331,7 @@ def set_execution_config(
     config: PyDaftExecutionConfig | None = None,
     scan_tasks_min_size_bytes: int | None = None,
     scan_tasks_max_size_bytes: int | None = None,
-    max_files_per_scan_task: int | None = None,
+    max_sources_per_scan_task: int | None = None,
     broadcast_join_size_bytes_threshold: int | None = None,
     parquet_split_row_groups_max_files: int | None = None,
     sort_merge_join_sort_with_aligned_boundaries: bool | None = None,
@@ -369,7 +369,7 @@ def set_execution_config(
         scan_tasks_max_size_bytes: Maximum size in bytes when merging ScanTasks when reading files from storage.
            Increasing this value will increase the upper bound of the size of merged ScanTasks, which leads to bigger but
            fewer partitions. (Defaults to 384 MiB)
-        max_files_per_scan_task: Maximum number of files to read in a single ScanTask. (Defaults to 10)
+        max_sources_per_scan_task: Maximum number of sources in a single ScanTask. (Defaults to 10)
         broadcast_join_size_bytes_threshold: If one side of a join is smaller than this threshold, a broadcast join will be used.
            Default is 10 MiB.
         parquet_split_row_groups_max_files: Maximum number of files to read in which the row group splitting should happen. (Defaults to 10)
@@ -408,7 +408,7 @@ def set_execution_config(
         new_daft_execution_config = old_daft_execution_config.with_config_values(
             scan_tasks_min_size_bytes=scan_tasks_min_size_bytes,
             scan_tasks_max_size_bytes=scan_tasks_max_size_bytes,
-            max_files_per_scan_task=max_files_per_scan_task,
+            max_sources_per_scan_task=max_sources_per_scan_task,
             broadcast_join_size_bytes_threshold=broadcast_join_size_bytes_threshold,
             parquet_split_row_groups_max_files=parquet_split_row_groups_max_files,
             sort_merge_join_sort_with_aligned_boundaries=sort_merge_join_sort_with_aligned_boundaries,
diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi
index 5ad1dfb736..3e40c8800f 100644
--- a/daft/daft/__init__.pyi
+++ b/daft/daft/__init__.pyi
@@ -1703,7 +1703,7 @@ class PyDaftExecutionConfig:
         self,
         scan_tasks_min_size_bytes: int | None = None,
         scan_tasks_max_size_bytes: int | None = None,
-        max_files_per_scan_task: int | None = None,
+        max_sources_per_scan_task: int | None = None,
         broadcast_join_size_bytes_threshold: int | None = None,
         parquet_split_row_groups_max_files: int | None = None,
         sort_merge_join_sort_with_aligned_boundaries: bool | None = None,
@@ -1732,7 +1732,7 @@
     @property
     def scan_tasks_max_size_bytes(self) -> int: ...
     @property
-    def max_files_per_scan_task(self) -> int: ...
+    def max_sources_per_scan_task(self) -> int: ...
     @property
     def broadcast_join_size_bytes_threshold(self) -> int: ...
     @property
diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs
index 762e8b5f90..ddd367a6e3 100644
--- a/src/common/daft-config/src/lib.rs
+++ b/src/common/daft-config/src/lib.rs
@@ -40,7 +40,7 @@ impl DaftPlanningConfig {
 pub struct DaftExecutionConfig {
     pub scan_tasks_min_size_bytes: usize,
     pub scan_tasks_max_size_bytes: usize,
-    pub max_files_per_scan_task: usize,
+    pub max_sources_per_scan_task: usize,
     pub broadcast_join_size_bytes_threshold: usize,
     pub sort_merge_join_sort_with_aligned_boundaries: bool,
     pub hash_join_partition_size_leniency: f64,
@@ -70,7 +70,7 @@ impl Default for DaftExecutionConfig {
         Self {
             scan_tasks_min_size_bytes: 96 * 1024 * 1024,   // 96MB
             scan_tasks_max_size_bytes: 384 * 1024 * 1024,  // 384MB
-            max_files_per_scan_task: 10,
+            max_sources_per_scan_task: 10,
             broadcast_join_size_bytes_threshold: 10 * 1024 * 1024, // 10 MiB
             sort_merge_join_sort_with_aligned_boundaries: false,
             hash_join_partition_size_leniency: 0.5,
diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs
index 449f7d47f8..bb6c7f9b4d 100644
--- a/src/common/daft-config/src/python.rs
+++ b/src/common/daft-config/src/python.rs
@@ -78,7 +78,7 @@ impl PyDaftExecutionConfig {
     #[pyo3(signature = (
         scan_tasks_min_size_bytes=None,
         scan_tasks_max_size_bytes=None,
-        max_files_per_scan_task=None,
+        max_sources_per_scan_task=None,
         broadcast_join_size_bytes_threshold=None,
         parquet_split_row_groups_max_files=None,
         sort_merge_join_sort_with_aligned_boundaries=None,
@@ -106,7 +106,7 @@ impl PyDaftExecutionConfig {
         &self,
         scan_tasks_min_size_bytes: Option<usize>,
         scan_tasks_max_size_bytes: Option<usize>,
-        max_files_per_scan_task: Option<usize>,
+        max_sources_per_scan_task: Option<usize>,
         broadcast_join_size_bytes_threshold: Option<usize>,
         parquet_split_row_groups_max_files: Option<usize>,
         sort_merge_join_sort_with_aligned_boundaries: Option<bool>,
@@ -138,8 +138,8 @@ impl PyDaftExecutionConfig {
         if let Some(scan_tasks_min_size_bytes) = scan_tasks_min_size_bytes {
             config.scan_tasks_min_size_bytes = scan_tasks_min_size_bytes;
         }
-        if let Some(max_files_per_scan_task) = max_files_per_scan_task {
-            config.max_files_per_scan_task = max_files_per_scan_task;
+        if let Some(max_sources_per_scan_task) = max_sources_per_scan_task {
+            config.max_sources_per_scan_task = max_sources_per_scan_task;
         }
         if let Some(broadcast_join_size_bytes_threshold) = broadcast_join_size_bytes_threshold {
             config.broadcast_join_size_bytes_threshold = broadcast_join_size_bytes_threshold;
@@ -242,8 +242,8 @@
     }

     #[getter]
-    fn get_max_files_per_scan_task(&self) -> PyResult<usize> {
-        Ok(self.config.max_files_per_scan_task)
+    fn get_max_sources_per_scan_task(&self) -> PyResult<usize> {
+        Ok(self.config.max_sources_per_scan_task)
     }

     #[getter]
diff --git a/src/daft-scan/src/scan_task_iters/mod.rs b/src/daft-scan/src/scan_task_iters/mod.rs
index 33924c55fd..87ab489a13 100644
--- a/src/daft-scan/src/scan_task_iters/mod.rs
+++ b/src/daft-scan/src/scan_task_iters/mod.rs
@@ -58,7 +58,7 @@ fn merge_by_sizes<'a>(
                     target_upper_bound_size_bytes: (limit_bytes * 1.5) as usize,
                     target_lower_bound_size_bytes: (limit_bytes / 2.) as usize,
                     accumulator: None,
-                    max_source_count: cfg.max_files_per_scan_task,
+                    max_source_count: cfg.max_sources_per_scan_task,
                 }) as BoxScanTaskIter;
             }
         }
@@ -71,7 +71,7 @@ fn merge_by_sizes<'a>(
             target_upper_bound_size_bytes: cfg.scan_tasks_max_size_bytes,
             target_lower_bound_size_bytes: cfg.scan_tasks_min_size_bytes,
             accumulator: None,
-            max_source_count: cfg.max_files_per_scan_task,
+            max_source_count: cfg.max_sources_per_scan_task,
         }) as BoxScanTaskIter
     }
 }

From 18ca1381b4c265f0be94f0b312af8f43d9b169d2 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Thu, 16 Jan 2025 18:53:08 +0000
Subject: [PATCH 4/5] clean up

---
 src/daft-dsl/src/expr/mod.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/daft-dsl/src/expr/mod.rs b/src/daft-dsl/src/expr/mod.rs
index 3e51f57751..876ff26cb4 100644
--- a/src/daft-dsl/src/expr/mod.rs
+++ b/src/daft-dsl/src/expr/mod.rs
@@ -1407,5 +1407,4 @@ pub fn estimated_selectivity(expr: &Expr, schema: &Schema) -> f64 {
         Expr::Subquery(_) => 1.0,
         Expr::Agg(_) => panic!("Aggregates are not allowed in WHERE clauses"),
     }
-    .max(0.01)
 }

From a5b9f5ed43428e64c1864c0d32e9637dd0450cd0 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Mon, 27 Jan 2025 10:25:10 -0800
Subject: [PATCH 5/5] test

---
 tests/io/test_merge_scan_tasks.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/tests/io/test_merge_scan_tasks.py b/tests/io/test_merge_scan_tasks.py
index dd7696d8c4..9bd1773917 100644
--- a/tests/io/test_merge_scan_tasks.py
+++ b/tests/io/test_merge_scan_tasks.py
@@ -81,3 +81,15 @@ def test_merge_scan_task_limit_override(csv_files):
     ):
         df = daft.read_csv(str(csv_files)).limit(1)
         assert df.num_partitions() == 3, "Should have 3 partitions [(CSV1, CSV2, CSV3)] since we have a limit 1"
+
+
+def test_merge_scan_task_up_to_max_sources(csv_files):
+    with daft.execution_config_ctx(
+        scan_tasks_min_size_bytes=30,
+        scan_tasks_max_size_bytes=30,
+        max_sources_per_scan_task=2,
+    ):
+        df = daft.read_csv(str(csv_files))
+        assert (
+            df.num_partitions() == 2
+        ), "Should have 2 partitions [(CSV1, CSV2), (CSV3)] since the third CSV is too large to merge with the first two, and max_sources_per_scan_task is set to 2"
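
For reviewers, a quick sketch of how the new knob is exercised from Python once this series is applied. It relies only on APIs already visible in the diffs above (daft.execution_config_ctx, daft.read_csv, DataFrame.num_partitions); the CSV glob path and the specific byte thresholds are placeholders chosen so that the source-count cap, not byte size, is what stops merging.

    import daft

    # Keep the size bounds large so size-based merging would happily combine
    # every small file; the only thing capping a merged ScanTask is then the
    # new max_sources_per_scan_task limit.
    with daft.execution_config_ctx(
        scan_tasks_min_size_bytes=512 * 1024 * 1024,
        scan_tasks_max_size_bytes=512 * 1024 * 1024,
        max_sources_per_scan_task=4,
    ):
        df = daft.read_csv("data/many_small_files/*.csv")  # placeholder path
        # Each merged ScanTask reads at most 4 source files, so with N small
        # input files the plan keeps at least ceil(N / 4) partitions.
        print(df.num_partitions())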