From 618aa589b17284c3a4061122b26e36e6e2e52767 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Thu, 19 Dec 2024 00:25:01 +0200 Subject: [PATCH 1/7] fix: Wrap is_null expressions in parens, to avoid operator precedence issues Expression like `(foo IS NOT NULL = bar IS NOT NULL)`` would try to compare `foo IS NOT NULL` with `bar`, not with `bar IS NOT NULL` --- .../src/adapter/BaseQuery.js | 2 +- rust/cubesql/cubesql/src/compile/mod.rs | 17 ++++++++++++----- rust/cubesql/cubesql/src/compile/test/mod.rs | 2 +- .../cubesql/src/compile/test/test_wrapper.rs | 2 +- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index f7154b203e0e6..d7863713f6b2a 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -3327,7 +3327,7 @@ export class BaseQuery { column_aliased: '{{expr}} {{quoted_alias}}', query_aliased: '{{ query }} AS {{ quoted_alias }}', case: 'CASE{% if expr %} {{ expr }}{% endif %}{% for when, then in when_then %} WHEN {{ when }} THEN {{ then }}{% endfor %}{% if else_expr %} ELSE {{ else_expr }}{% endif %} END', - is_null: '{{ expr }} IS {% if negate %}NOT {% endif %}NULL', + is_null: '({{ expr }} IS {% if negate %}NOT {% endif %}NULL)', binary: '({{ left }} {{ op }} {{ right }})', sort: '{{ expr }} {% if asc %}ASC{% else %}DESC{% endif %} NULLS {% if nulls_first %}FIRST{% else %}LAST{% endif %}', order_by: '{% if index %} {{ index }} {% else %} {{ expr }} {% endif %} {% if asc %}ASC{% else %}DESC{% endif %}{% if nulls_first %} NULLS FIRST{% endif %}', diff --git a/rust/cubesql/cubesql/src/compile/mod.rs b/rust/cubesql/cubesql/src/compile/mod.rs index 36ea607b8f932..5d1691ffe5a92 100644 --- a/rust/cubesql/cubesql/src/compile/mod.rs +++ b/rust/cubesql/cubesql/src/compile/mod.rs @@ -12188,7 +12188,7 @@ ORDER BY "source"."str0" ASC } init_testing_logger(); - let logical_plan = convert_select_to_query_plan( + let query_plan = convert_select_to_query_plan( r#" WITH "qt_0" AS ( SELECT "ta_1"."customer_gender" "ca_1" @@ -12205,13 +12205,20 @@ ORDER BY "source"."str0" ASC .to_string(), DatabaseProtocol::PostgreSQL, ) - .await - .as_logical_plan(); + .await; + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + + let logical_plan = query_plan.as_logical_plan(); let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql; - // check wrapping for `NOT(.. IS NULL OR LOWER(..) IN)` - let re = Regex::new(r"NOT \(.+ IS NULL OR .*LOWER\(.+ IN ").unwrap(); + // check wrapping for `NOT((.. IS NULL) OR LOWER(..) IN)` + let re = Regex::new(r"NOT \(\(.+ IS NULL\) OR .*LOWER\(.+ IN ").unwrap(); assert!(re.is_match(&sql)); } diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs index 8c07ac6b7041a..c9b789270b973 100644 --- a/rust/cubesql/cubesql/src/compile/test/mod.rs +++ b/rust/cubesql/cubesql/src/compile/test/mod.rs @@ -578,7 +578,7 @@ OFFSET {{ offset }}{% endif %}"#.to_string(), "{{expr}} {{quoted_alias}}".to_string(), ), ("expressions/binary".to_string(), "({{ left }} {{ op }} {{ right }})".to_string()), - ("expressions/is_null".to_string(), "{{ expr }} IS {% if negate %}NOT {% endif %}NULL".to_string()), + ("expressions/is_null".to_string(), "({{ expr }} IS {% if negate %}NOT {% endif %}NULL)".to_string()), ("expressions/case".to_string(), "CASE{% if expr %} {{ expr }}{% endif %}{% for when, then in when_then %} WHEN {{ when }} THEN {{ then }}{% endfor %}{% if else_expr %} ELSE {{ else_expr }}{% endif %} END".to_string()), ("expressions/sort".to_string(), "{{ expr }} {% if asc %}ASC{% else %}DESC{% endif %}{% if nulls_first %} NULLS FIRST {% endif %}".to_string()), ("expressions/cast".to_string(), "CAST({{ expr }} AS {{ data_type }})".to_string()), diff --git a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs index fc6267d32c44e..abc8e1a8b506e 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs @@ -1174,7 +1174,7 @@ WHERE let last_mod_month_re = Regex::new(r#""logs_alias"."[a-zA-Z0-9_]{1,16}" "last_mod_month""#).unwrap(); assert!(last_mod_month_re.is_match(&sql)); - let sum_price_re = Regex::new(r#"CASE WHEN "logs_alias"."[a-zA-Z0-9_]{1,16}" IS NOT NULL THEN "logs_alias"."[a-zA-Z0-9_]{1,16}" ELSE 0 END "sum_price""#) + let sum_price_re = Regex::new(r#"CASE WHEN \("logs_alias"."[a-zA-Z0-9_]{1,16}" IS NOT NULL\) THEN "logs_alias"."[a-zA-Z0-9_]{1,16}" ELSE 0 END "sum_price""#) .unwrap(); assert!(sum_price_re.is_match(&sql)); let cube_user_re = Regex::new(r#""logs_alias"."[a-zA-Z0-9_]{1,16}" "cube_user""#).unwrap(); From d22c768fc64318d52284a468f8db05c02339e854 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Tue, 28 Jan 2025 19:43:24 +0200 Subject: [PATCH 2/7] fix(cubesql): Replace alias to cube during wrapper pull up --- .../src/compile/rewrite/rules/members.rs | 2 +- .../rewrite/rules/wrapper/wrapper_pull_up.rs | 80 +++++++++++++++++-- 2 files changed, 75 insertions(+), 7 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs index f5f7c809fc428..d23bb3718fb1c 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs @@ -1437,7 +1437,7 @@ impl MemberRules { } } - fn replace_alias( + pub fn replace_alias( alias_to_cube: &Vec<(String, String)>, projection_alias: &Option, ) -> Vec<(String, String)> { diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/wrapper_pull_up.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/wrapper_pull_up.rs index 6030fd7574a05..5c7d698e5e58a 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/wrapper_pull_up.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/wrapper_pull_up.rs @@ -2,14 +2,15 @@ use crate::{ compile::rewrite::{ cube_scan_wrapper, rewriter::{CubeEGraph, CubeRewrite}, - rules::wrapper::WrapperRules, + rules::{members::MemberRules, wrapper::WrapperRules}, transforming_rewrite, wrapped_select, wrapped_select_having_expr_empty_tail, wrapped_select_joins_empty_tail, wrapper_pullup_replacer, wrapper_replacer_context, - WrappedSelectSelectType, WrappedSelectType, + LogicalPlanLanguage, WrappedSelectAlias, WrappedSelectSelectType, WrappedSelectType, + WrapperReplacerContextAliasToCube, }, var, var_iter, var_list_iter, }; -use egg::Subst; +use egg::{Subst, Var}; impl WrapperRules { pub fn wrapper_pull_up_rules(&self, rules: &mut Vec) { @@ -150,7 +151,7 @@ impl WrapperRules { "?select_ungrouped_scan", ), wrapper_replacer_context( - "?alias_to_cube", + "?alias_to_cube_out", // This is fixed to false for any LHS because we should only allow to push to Cube when from is ungrouped CubeScan // And after pulling replacer over this node it will be WrappedSelect(from=CubeScan), so it should not allow to push for whatever LP is on top of it "WrapperReplacerContextPushToCube:false", @@ -162,7 +163,12 @@ impl WrapperRules { ), "CubeScanWrapperFinalized:false", ), - self.transform_pull_up_wrapper_select("?cube_scan_input"), + self.transform_pull_up_wrapper_select( + "?cube_scan_input", + "?alias_to_cube", + "?select_alias", + "?alias_to_cube_out", + ), ), transforming_rewrite( "wrapper-pull-up-to-cube-scan-non-trivial-wrapped-select", @@ -338,7 +344,7 @@ impl WrapperRules { "?select_ungrouped_scan", ), wrapper_replacer_context( - "?alias_to_cube", + "?alias_to_cube_out", // This is fixed to false for any LHS because we should only allow to push to Cube when from is ungrouped CubeSCan // And after pulling replacer over this node it will be WrappedSelect(from=WrappedSelect), so it should not allow to push for whatever LP is on top of it "WrapperReplacerContextPushToCube:false", @@ -359,20 +365,66 @@ impl WrapperRules { "?inner_projection_expr", "?inner_group_expr", "?inner_aggr_expr", + "?alias_to_cube", + "?select_alias", + "?alias_to_cube_out", ), ), ]); } + fn replace_aliases( + egraph: &mut CubeEGraph, + subst: &mut Subst, + alias_to_cube_var: Var, + select_alias_var: Var, + alias_to_cube_out_var: Var, + ) -> bool { + for alias_to_cube in var_iter!( + egraph[subst[alias_to_cube_var]], + WrapperReplacerContextAliasToCube + ) { + for projection_alias in var_iter!(egraph[subst[select_alias_var]], WrappedSelectAlias) { + let replaced_alias_to_cube = + MemberRules::replace_alias(&alias_to_cube, &projection_alias); + let new_alias_to_cube = + egraph.add(LogicalPlanLanguage::WrapperReplacerContextAliasToCube( + WrapperReplacerContextAliasToCube(replaced_alias_to_cube), + )); + subst.insert(alias_to_cube_out_var, new_alias_to_cube); + return true; + } + } + + false + } + fn transform_pull_up_wrapper_select( &self, cube_scan_input_var: &'static str, + alias_to_cube_var: &'static str, + select_alias_var: &'static str, + alias_to_cube_out_var: &'static str, ) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool { let cube_scan_input_var = var!(cube_scan_input_var); + let alias_to_cube_var = var!(alias_to_cube_var); + let select_alias_var = var!(select_alias_var); + let alias_to_cube_out_var = var!(alias_to_cube_out_var); move |egraph, subst| { for _ in var_list_iter!(egraph[subst[cube_scan_input_var]], WrappedSelect).cloned() { return false; } + + if !Self::replace_aliases( + egraph, + subst, + alias_to_cube_var, + select_alias_var, + alias_to_cube_out_var, + ) { + return false; + } + true } } @@ -387,12 +439,28 @@ impl WrapperRules { inner_projection_expr_var: &'static str, _inner_group_expr_var: &'static str, _inner_aggr_expr_var: &'static str, + alias_to_cube_var: &'static str, + select_alias_var: &'static str, + alias_to_cube_out_var: &'static str, ) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool { let select_type_var = var!(select_type_var); let projection_expr_var = var!(projection_expr_var); let inner_select_type_var = var!(inner_select_type_var); let inner_projection_expr_var = var!(inner_projection_expr_var); + let alias_to_cube_var = var!(alias_to_cube_var); + let select_alias_var = var!(select_alias_var); + let alias_to_cube_out_var = var!(alias_to_cube_out_var); move |egraph, subst| { + if !Self::replace_aliases( + egraph, + subst, + alias_to_cube_var, + select_alias_var, + alias_to_cube_out_var, + ) { + return false; + } + for select_type in var_iter!(egraph[subst[select_type_var]], WrappedSelectSelectType).cloned() { From 52c0bb7178b17d5ec6170492cc76fed12f17a7e2 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Tue, 28 Jan 2025 23:45:20 +0200 Subject: [PATCH 3/7] fix(cubesql): Make zero members wrapper more expensive than filter member --- rust/cubesql/cubesql/src/compile/rewrite/cost.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/cubesql/cubesql/src/compile/rewrite/cost.rs b/rust/cubesql/cubesql/src/compile/rewrite/cost.rs index 2d46d8dfc5bff..55fc18125026c 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/cost.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/cost.rs @@ -234,6 +234,7 @@ impl BestCubePlan { /// - `empty_wrappers` > `non_detected_cube_scans` - we don't want empty wrapper to hide non detected cube scan errors /// - `non_detected_cube_scans` > other nodes - minimize cube scans without members /// - `filters` > `filter_members` - optimize for push down of filters +/// - `zero_members_wrapper` > `filter_members` - prefer CubeScan(filters) to WrappedSelect(CubeScan(*), filters) /// - `filter_members` > `cube_members` - optimize for `inDateRange` filter push down to time dimension /// - `member_errors` > `cube_members` - extra cube members may be required (e.g. CASE) /// - `member_errors` > `wrapper_nodes` - use SQL push down where possible if cube scan can't be detected @@ -259,12 +260,12 @@ pub struct CubePlanCost { wrapped_select_ungrouped_scan: usize, filters: i64, structure_points: i64, - filter_members: i64, // This is separate from both non_detected_cube_scans and cube_members // Because it's ok to use all members inside wrapper (so non_detected_cube_scans would be zero) // And we want to select representation with less members // But only when members are present! zero_members_wrapper: i64, + filter_members: i64, cube_members: i64, errors: i64, time_dimensions_used_as_dimensions: i64, From cdbf178a8425625e3487efe58ee8079b0f412035 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Mon, 13 Jan 2025 20:48:18 +0200 Subject: [PATCH 4/7] feat(cubesql): Allow non-push_to_cube WrappedSelect in grouped subquery position in join --- rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index 8ec14fe5e0a61..28ff55e6be59c 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -994,6 +994,14 @@ impl CubeScanWrapperNode { "Unsupported ungrouped CubeScan as join subquery: {join_cube_scan:?}" ))); } + } else if let Some(wrapped_select) = + node.as_any().downcast_ref::() + { + if wrapped_select.push_to_cube { + return Err(CubeError::internal(format!( + "Unsupported push_to_cube WrappedSelect as join subquery: {wrapped_select:?}" + ))); + } } else { // TODO support more grouped cases here return Err(CubeError::internal(format!( From dabbc7f1319cd68b03ef7e9324c3cd2c0e81b260 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Mon, 13 Jan 2025 20:50:11 +0200 Subject: [PATCH 5/7] feat(cubesql): Allow grouped join sides to have different in_projection flag --- .../src/compile/rewrite/rules/wrapper/join.rs | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs index 8bc53d5e66bed..be102182558fb 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs @@ -48,7 +48,7 @@ impl WrapperRules { // Rule would place ?left_cube_scan_input to `from` position of WrappedSelect(WrappedSelectPushToCube:true) // So it need to support push-to-Cube "WrapperReplacerContextPushToCube:true", - "?in_projection", + "?left_in_projection", // Going to use this in RHS of rule // RHS of join is grouped, so it shouldn't have any cubes or members "?left_cube_members", @@ -65,7 +65,7 @@ impl WrapperRules { // Going to ignore this "?right_alias_to_cube", "?right_push_to_cube", - "?in_projection", + "?right_in_projection", // Going to ignore this "?right_cube_members", "?right_grouped_subqueries", @@ -80,6 +80,9 @@ impl WrapperRules { "?join_constraint", "JoinNullEqualsNull:false", ), + // RHS is using WrapperReplacerContextInProjection:false because only part + // that should have push down replacer is join condition, and it should only contain dimensions + // Other way of thinking about it: join condition is more like filter than projection cube_scan_wrapper( wrapped_select( "WrappedSelectSelectType:Projection", @@ -88,7 +91,7 @@ impl WrapperRules { wrapper_replacer_context( "?left_alias_to_cube", "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", // Can use it, because we've checked that left input allows push-to-Cube, @@ -101,7 +104,7 @@ impl WrapperRules { wrapper_replacer_context( "?left_alias_to_cube", "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", "WrapperReplacerContextUngroupedScan:true", @@ -112,7 +115,7 @@ impl WrapperRules { wrapper_replacer_context( "?left_alias_to_cube", "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", "WrapperReplacerContextUngroupedScan:true", @@ -123,7 +126,7 @@ impl WrapperRules { wrapper_replacer_context( "?left_alias_to_cube", "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", "WrapperReplacerContextUngroupedScan:true", @@ -134,7 +137,7 @@ impl WrapperRules { wrapper_replacer_context( "?left_alias_to_cube", "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", "WrapperReplacerContextUngroupedScan:true", @@ -147,7 +150,7 @@ impl WrapperRules { wrapper_replacer_context( "?left_alias_to_cube", "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", "WrapperReplacerContextUngroupedScan:true", @@ -161,7 +164,7 @@ impl WrapperRules { wrapper_replacer_context( "?left_alias_to_cube", "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", "WrapperReplacerContextUngroupedScan:true", @@ -175,7 +178,7 @@ impl WrapperRules { // On other: RHS is grouped, so any column is just a column // Right now, it is relying on grouped_subqueries + PushToCube:true, to allow both dimensions and grouped columns "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", "WrapperReplacerContextUngroupedScan:true", @@ -189,7 +192,7 @@ impl WrapperRules { wrapper_replacer_context( "?left_alias_to_cube", "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", "WrapperReplacerContextUngroupedScan:true", @@ -201,7 +204,7 @@ impl WrapperRules { wrapper_replacer_context( "?left_alias_to_cube", "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", "WrapperReplacerContextUngroupedScan:true", @@ -215,7 +218,7 @@ impl WrapperRules { wrapper_replacer_context( "?left_alias_to_cube", "WrapperReplacerContextPushToCube:true", - "?in_projection", + "WrapperReplacerContextInProjection:false", "?left_cube_members", "?out_grouped_subqueries", "WrapperReplacerContextUngroupedScan:true", From da2c7471e55dbb96c1cc95d14eaf506b9d85dca0 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Thu, 16 Jan 2025 02:06:57 +0200 Subject: [PATCH 6/7] feat(cubesql): Support complex join conditions for grouped joins * Support COALESCE + IS NOT NULL join condition * Support IS NOT DISTINCT join condition * Support expression on top of columns, like CAST --- .../cubesql/src/compile/rewrite/cost.rs | 3 + .../cubesql/src/compile/rewrite/mod.rs | 25 + .../src/compile/rewrite/rules/wrapper/join.rs | 536 +++++++++++++++++- .../compile/test/test_cube_join_grouped.rs | 304 ++++++++++ 4 files changed, 857 insertions(+), 11 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/rewrite/cost.rs b/rust/cubesql/cubesql/src/compile/rewrite/cost.rs index 55fc18125026c..b012588c370c0 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/cost.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/cost.rs @@ -122,6 +122,9 @@ impl BestCubePlan { LogicalPlanLanguage::ProjectionSplitPushDownReplacer(_) => 1, LogicalPlanLanguage::ProjectionSplitPullUpReplacer(_) => 1, LogicalPlanLanguage::QueryParam(_) => 1, + LogicalPlanLanguage::JoinCheckStage(_) => 1, + LogicalPlanLanguage::JoinCheckPushDown(_) => 1, + LogicalPlanLanguage::JoinCheckPullUp(_) => 1, // Not really replacers but those should be deemed as mandatory rewrites and as soon as // there's always rewrite rule it's fine to have replacer cost. // Needs to be added as alias rewrite always more expensive than original function. diff --git a/rust/cubesql/cubesql/src/compile/rewrite/mod.rs b/rust/cubesql/cubesql/src/compile/rewrite/mod.rs index cffb58f07151a..33800d168b710 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/mod.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/mod.rs @@ -514,6 +514,19 @@ crate::plan_to_language! { QueryParam { index: usize, }, + JoinCheckStage { + expr: Arc, + }, + JoinCheckPushDown { + expr: Arc, + left_input: Arc, + right_input: Arc, + }, + JoinCheckPullUp { + expr: Arc, + left_input: Arc, + right_input: Arc, + }, } } @@ -2152,6 +2165,18 @@ fn distinct(input: impl Display) -> String { format!("(Distinct {})", input) } +fn join_check_stage(expr: impl Display) -> String { + format!("(JoinCheckStage {expr})") +} + +fn join_check_push_down(expr: impl Display, left: impl Display, right: impl Display) -> String { + format!("(JoinCheckPushDown {expr} {left} {right})") +} + +fn join_check_pull_up(expr: impl Display, left: impl Display, right: impl Display) -> String { + format!("(JoinCheckPullUp {expr} {left} {right})") +} + pub fn original_expr_name(egraph: &CubeEGraph, id: Id) -> Option { egraph[id] .data diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs index be102182558fb..1332139a72c58 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs @@ -1,21 +1,26 @@ use crate::{ compile::rewrite::{ - cube_scan_wrapper, join, rewrite, rewriter::CubeRewrite, rules::wrapper::WrapperRules, - transforming_rewrite, wrapped_select, wrapped_select_aggr_expr_empty_tail, - wrapped_select_filter_expr_empty_tail, wrapped_select_group_expr_empty_tail, - wrapped_select_having_expr_empty_tail, wrapped_select_join, wrapped_select_joins, - wrapped_select_joins_empty_tail, wrapped_select_order_expr_empty_tail, - wrapped_select_projection_expr_empty_tail, wrapped_select_subqueries_empty_tail, - wrapped_select_window_expr_empty_tail, wrapper_pullup_replacer, wrapper_pushdown_replacer, - wrapper_replacer_context, BinaryExprOp, ColumnExprColumn, CubeEGraph, JoinLeftOn, - JoinRightOn, LogicalPlanLanguage, WrappedSelectJoinJoinType, + analysis::Member, binary_expr, cross_join, cube_scan_wrapper, filter, fun_expr, + is_not_null_expr, join, join_check_pull_up, join_check_push_down, join_check_stage, + rewrite, rewriter::CubeRewrite, rules::wrapper::WrapperRules, transforming_rewrite, + wrapped_select, wrapped_select_aggr_expr_empty_tail, wrapped_select_filter_expr_empty_tail, + wrapped_select_group_expr_empty_tail, wrapped_select_having_expr_empty_tail, + wrapped_select_join, wrapped_select_joins, wrapped_select_joins_empty_tail, + wrapped_select_order_expr_empty_tail, wrapped_select_projection_expr_empty_tail, + wrapped_select_subqueries_empty_tail, wrapped_select_window_expr_empty_tail, + wrapper_pullup_replacer, wrapper_pushdown_replacer, wrapper_replacer_context, BinaryExprOp, + ColumnExprColumn, CubeEGraph, JoinLeftOn, JoinRightOn, LogicalPlanLanguage, + WrappedSelectJoinJoinType, WrapperReplacerContextAliasToCube, WrapperReplacerContextGroupedSubqueries, }, var, var_iter, var_list_iter, }; -use crate::compile::rewrite::analysis::Member; -use datafusion::{logical_expr::Operator, logical_plan::Column}; +use datafusion::{ + logical_expr::{Expr, Operator}, + logical_plan::Column, + prelude::JoinType, +}; use egg::{Id, Subst}; impl WrapperRules { @@ -247,6 +252,371 @@ impl WrapperRules { ), ]); + // DataFusion plans complex join conditions as Filter(?join_condition, CrossJoin(...)) + // Handling each and every condition in here is not that easy, so for now + // it just handles several special cases of conditions actually generated by BI tools + // Each condition is defined for a single pair of joined columns, like a special equals operator + // Join condition can join on multiple columns, and per-column conditions will be joined with AND + // Because AND is binary, we can have arbitrary binary tree, with single column condition in leaves + // To process outer ANDs join_check_stage in introduced: + // 1. Push down over ANDs + // 2. Turn push down to pull up on proper condition for a single column + // 3. Pull up results over ANDs + // 4. Start regular wrapper replacer for join expression + // Each side in single column condition should contain single reference to column + // But it can contain other expressions. Most notably, it can contain CAST(column AS TEXT) + // referenced_expr analysis is used to pick up column references during check + // Different sides of single expression should reference different sides of CROSS JOIN, but + // it's tricky to do without a proper name resolution, so for now it handles only qualified column expressions + + rules.extend([ + rewrite( + "wrapper-push-down-ungrouped-join-grouped-start-condition-check", + filter( + "?filter_expr", + cross_join( + cube_scan_wrapper("?left", "CubeScanWrapperFinalized:false"), + cube_scan_wrapper("?right", "CubeScanWrapperFinalized:false"), + ), + ), + join_check_stage(join_check_push_down( + "?filter_expr", + cube_scan_wrapper("?left", "CubeScanWrapperFinalized:false"), + cube_scan_wrapper("?right", "CubeScanWrapperFinalized:false"), + )), + ), + rewrite( + "ungrouped-join-grouped-condition-check-pushdown-and", + join_check_push_down( + binary_expr("?left_expr", "AND", "?right_expr"), + "?left_input", + "?right_input", + ), + binary_expr( + join_check_push_down("?left_expr", "?left_input", "?right_input"), + "AND", + join_check_push_down("?right_expr", "?left_input", "?right_input"), + ), + ), + rewrite( + "ungrouped-join-grouped-condition-check-pull-up-and", + binary_expr( + join_check_pull_up("?left_expr", "?left_input", "?right_input"), + "AND", + join_check_pull_up("?right_expr", "?left_input", "?right_input"), + ), + join_check_pull_up( + binary_expr("?left_expr", "AND", "?right_expr"), + "?left_input", + "?right_input", + ), + ), + transforming_rewrite( + "wrapper-push-down-ungrouped-join-grouped-finish-condition-check", + join_check_stage(join_check_pull_up( + "?join_expr", + cube_scan_wrapper( + wrapper_pullup_replacer( + "?left_cube_scan_input", + wrapper_replacer_context( + // Going to use this in RHS of rule + // RHS of join is grouped, so it shouldn't have any cubes or members + "?left_alias_to_cube", + // This check is important + // Rule would place ?left_cube_scan_input to `from` position of WrappedSelect(WrappedSelectPushToCube:true) + // So it need to support push-to-Cube + "WrapperReplacerContextPushToCube:true", + "?left_in_projection", + // Going to use this in RHS of rule + // RHS of join is grouped, so it shouldn't have any cubes or members + "?left_cube_members", + "?left_grouped_subqueries", + "?left_ungrouped_scan", + ), + ), + "CubeScanWrapperFinalized:false", + ), + cube_scan_wrapper( + wrapper_pullup_replacer( + "?right_input", + wrapper_replacer_context( + // Going to ignore this in RHS + "?right_alias_to_cube", + "?right_push_to_cube", + "?right_in_projection", + // Going to ignore this + "?right_cube_members", + "?right_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + ), + ), + "CubeScanWrapperFinalized:false", + ), + )), + cube_scan_wrapper( + wrapped_select( + "WrappedSelectSelectType:Projection", + wrapper_pullup_replacer( + wrapped_select_projection_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + wrapper_pullup_replacer( + wrapped_select_subqueries_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + wrapper_pullup_replacer( + wrapped_select_group_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + wrapper_pullup_replacer( + wrapped_select_aggr_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + wrapper_pullup_replacer( + wrapped_select_window_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + wrapper_pullup_replacer( + // Can move left_cube_scan_input here without checking if it's actually CubeScan + // Check for WrapperReplacerContextPushToCube:true should be enough + "?left_cube_scan_input", + wrapper_replacer_context( + "?left_alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + // We don't want to use list rules here, because ?right_input is already done + wrapped_select_joins( + wrapped_select_join( + wrapper_pullup_replacer( + "?right_input", + wrapper_replacer_context( + "?left_alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + wrapper_pushdown_replacer( + "?join_expr", + wrapper_replacer_context( + "?left_alias_to_cube", + // On one hand, this should be PushToCube:true, so we would only join on dimensions + // On other: RHS is grouped, so any column is just a column + // Right now, it is relying on grouped_subqueries + PushToCube:true, to allow both dimensions and grouped columns + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + "?out_join_type", + ), + // pullup(tail) just so it could be easily picked up by pullup rules + wrapper_pullup_replacer( + wrapped_select_joins_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + ), + wrapper_pullup_replacer( + wrapped_select_filter_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + wrapped_select_having_expr_empty_tail(), + "WrappedSelectLimit:None", + "WrappedSelectOffset:None", + wrapper_pullup_replacer( + wrapped_select_order_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:true", + ), + ), + "WrappedSelectAlias:None", + "WrappedSelectDistinct:false", + // left input has WrapperReplacerContextPushToCube:true + // Meaning that left input itself is ungrouped CubeScan + // Keep it in result, rely on pull-up rules to drop it, and on flattening rules to pick it up + "WrappedSelectPushToCube:true", + // left input is WrapperReplacerContextPushToCube:true, so result must be ungrouped + "WrappedSelectUngroupedScan:true", + ), + "CubeScanWrapperFinalized:false", + ), + self.transform_ungrouped_join_grouped_after_check( + "?right_alias_to_cube", + "?out_join_type", + "?out_grouped_subqueries", + ), + ), + ]); + + let complex_join_conditions = [ + // This variant is necessary to allow rewrites when join condition is something like this: + // CAST(left AS TEXT) = right + // DF will plan those as Filter(CrossJoin) as well, but joining operator is just `=` + ("equal", binary_expr("?left_expr", "=", "?right_expr")), + ( + "coalesce", + Self::coalesce_join_condition("?left_expr", "?right_expr", "?coalesce_value"), + ), + ( + "distinct", + Self::distinct_join_condition("?left_expr", "?right_expr"), + ), + ]; + + for (name, pattern) in complex_join_conditions { + rules.push(transforming_rewrite( + &format!("ungrouped-join-grouped-condition-check-condition-{name}"), + join_check_push_down( + &pattern, + cube_scan_wrapper( + wrapper_pullup_replacer( + "?left_cube_scan_input", + wrapper_replacer_context( + // Going to use this in RHS of rule + // RHS of join is grouped, so it shouldn't have any cubes or members + "?left_alias_to_cube", + // This check is important + // Rule would place ?left_cube_scan_input to `from` position of WrappedSelect(WrappedSelectPushToCube:true) + // So it need to support push-to-Cube + "WrapperReplacerContextPushToCube:true", + "?left_in_projection", + // Going to use this in RHS of rule + // RHS of join is grouped, so it shouldn't have any cubes or members + "?left_cube_members", + "?left_grouped_subqueries", + "?left_ungrouped_scan", + ), + ), + "CubeScanWrapperFinalized:false", + ), + cube_scan_wrapper( + wrapper_pullup_replacer( + "?right_input", + wrapper_replacer_context( + // Going to ignore this + "?right_alias_to_cube", + "?right_push_to_cube", + "?right_in_projection", + // Going to ignore this + "?right_cube_members", + "?right_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + ), + ), + "CubeScanWrapperFinalized:false", + ), + ), + join_check_pull_up( + &pattern, + cube_scan_wrapper( + wrapper_pullup_replacer( + "?left_cube_scan_input", + wrapper_replacer_context( + // Going to use this in RHS of rule + // RHS of join is grouped, so it shouldn't have any cubes or members + "?left_alias_to_cube", + // This check is important + // Rule would place ?left_cube_scan_input to `from` position of WrappedSelect(WrappedSelectPushToCube:true) + // So it need to support push-to-Cube + "WrapperReplacerContextPushToCube:true", + "?left_in_projection", + // Going to use this in RHS of rule + // RHS of join is grouped, so it shouldn't have any cubes or members + "?left_cube_members", + "?left_grouped_subqueries", + "?left_ungrouped_scan", + ), + ), + "CubeScanWrapperFinalized:false", + ), + cube_scan_wrapper( + wrapper_pullup_replacer( + "?right_input", + wrapper_replacer_context( + // Going to ignore this + "?right_alias_to_cube", + "?right_push_to_cube", + "?right_in_projection", + // Going to ignore this + "?right_cube_members", + "?right_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + ), + ), + "CubeScanWrapperFinalized:false", + ), + ), + self.transform_ungrouped_join_grouped_check_condition( + "?left_cube_members", + "?left_expr", + "?right_expr", + ), + )); + } + // TODO only pullup is necessary here Self::list_pushdown_pullup_rules( rules, @@ -256,6 +626,28 @@ impl WrapperRules { ); } + // JOIN ... ON (coalesce(left.foo, '') = coalesce(right.foo, '')) and ((left.foo is not null) = (right.foo is not null)) + fn coalesce_join_condition(left_expr: &str, right_expr: &str, coalesce_value: &str) -> String { + binary_expr( + binary_expr( + fun_expr("Coalesce", vec![left_expr, coalesce_value], true), + "=", + fun_expr("Coalesce", vec![right_expr, coalesce_value], true), + ), + "AND", + binary_expr( + is_not_null_expr(left_expr), + "=", + is_not_null_expr(right_expr), + ), + ) + } + + // JOIN ... ON left.foo IS NOT DISTINCT FROM right.foo + fn distinct_join_condition(left_expr: &str, right_expr: &str) -> String { + binary_expr(left_expr, "IS_NOT_DISTINCT_FROM", right_expr) + } + fn are_join_members_supported<'egraph, 'columns>( egraph: &'egraph mut CubeEGraph, members: Id, @@ -418,4 +810,126 @@ impl WrapperRules { return false; } } + + fn transform_ungrouped_join_grouped_check_condition( + &self, + left_members_var: &'static str, + left_expr_var: &'static str, + right_expr_var: &'static str, + ) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool { + let left_members_var = var!(left_members_var); + let left_expr_var = var!(left_expr_var); + + let right_expr_var = var!(right_expr_var); + + // Only left is allowed to be ungrouped query, so right would be a subquery join for left ungrouped CubeScan + // It means we don't care about just a "single cube" in LHS, and there's essentially no cubes by this moment in RHS + + move |egraph, subst| { + // We are going to generate join with grouped subquery + // TODO Do we have to check stuff like `transform_check_subquery_allowed` is checking: + // * Both inputs depend on a single data source + // * SQL generator for that data source have `expressions/subquery` template + // It could be checked later, in WrappedSelect as well + + let left_columns = egraph[subst[left_expr_var]].data.referenced_expr.as_ref(); + let Some(left_columns) = left_columns else { + return false; + }; + if left_columns.len() != 1 { + return false; + } + let left_column = &left_columns[0]; + + let right_columns = egraph[subst[right_expr_var]].data.referenced_expr.as_ref(); + let Some(right_columns) = right_columns else { + return false; + }; + if right_columns.len() != 1 { + return false; + } + let right_column = &right_columns[0]; + + let left_column = match left_column { + Expr::Column(column) => column, + _ => return false, + }; + let right_column = match right_column { + Expr::Column(column) => column, + _ => return false, + }; + + // Simple check that column expressions reference different join sides + let Some(left_relation) = left_column.relation.as_ref() else { + return false; + }; + let Some(right_relation) = right_column.relation.as_ref() else { + return false; + }; + if left_relation == right_relation { + return false; + } + + let left_column = left_column.clone(); + + // Don't check right, as it is already grouped + + if !Self::are_join_members_supported(egraph, subst[left_members_var], [&left_column]) { + return false; + } + + // TODO check that right column is coming from right crossjoin input + + return true; + } + } + + fn transform_ungrouped_join_grouped_after_check( + &self, + right_alias_to_cube_var: &'static str, + out_join_type_var: &'static str, + out_grouped_subqueries_var: &'static str, + ) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool { + let right_alias_to_cube_var = var!(right_alias_to_cube_var); + let out_join_type_var = var!(out_join_type_var); + let out_grouped_subqueries_var = var!(out_grouped_subqueries_var); + + move |egraph, subst| { + for right_alias_to_cube in var_iter!( + egraph[subst[right_alias_to_cube_var]], + WrapperReplacerContextAliasToCube + ) { + if right_alias_to_cube.len() != 1 { + return false; + } + + let right_alias = &right_alias_to_cube[0].0; + // LHS is ungrouped, RHS is grouped + // Don't pass ungrouped queries from below, their qualifiers should not be accessible during join condition rewrite + let out_grouped_subqueries = vec![right_alias.clone()]; + + // TODO why fixed to inner? Check how left join in input is planned + let out_join_type = JoinType::Inner; + + subst.insert( + out_join_type_var, + egraph.add(LogicalPlanLanguage::WrappedSelectJoinJoinType( + WrappedSelectJoinJoinType(out_join_type), + )), + ); + subst.insert( + out_grouped_subqueries_var, + egraph.add( + LogicalPlanLanguage::WrapperReplacerContextGroupedSubqueries( + WrapperReplacerContextGroupedSubqueries(out_grouped_subqueries), + ), + ), + ); + + return true; + } + + return false; + } + } } diff --git a/rust/cubesql/cubesql/src/compile/test/test_cube_join_grouped.rs b/rust/cubesql/cubesql/src/compile/test/test_cube_join_grouped.rs index 7d363e58168f0..8595774977059 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_cube_join_grouped.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_cube_join_grouped.rs @@ -462,3 +462,307 @@ LIMIT 1000 // Outer limit assert!(wrapped_sql_node.wrapped_sql.sql.contains("LIMIT 1000")); } + +#[tokio::test] +async fn test_tableau_topk() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" +SELECT + COUNT(DISTINCT "MultiTypeCube"."countDistinct") AS "measure_count_distinct", + CAST("MultiTypeCube"."dim_str0" AS TEXT) AS "dim_str0" +FROM "MultiTypeCube" +INNER JOIN ( + SELECT + CAST("MultiTypeCube"."dim_str0" AS TEXT) AS "dim_str0", + COUNT(DISTINCT "MultiTypeCube"."countDistinct") AS "$__alias__0" + FROM "MultiTypeCube" + WHERE + ( + CAST("MultiTypeCube"."dim_str0" AS TEXT) IN + ( + 'foo', + 'bar', + 'baz' + ) + ) + GROUP BY + 1 + ORDER BY + 2 DESC NULLS LAST, + 1 ASC NULLS FIRST + LIMIT 15 +) "t0" +ON ( + CAST("MultiTypeCube"."dim_str0" AS TEXT) = "t0"."dim_str0" +) +WHERE + ( + CAST("MultiTypeCube"."dim_str1" AS TEXT) = 'active' + ) +GROUP BY + 2 +; + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + + let wrapped_sql_node = query_plan.as_logical_plan().find_cube_scan_wrapped_sql(); + + assert_eq!(wrapped_sql_node.request.ungrouped, None); + + assert_eq!( + wrapped_sql_node + .request + .subquery_joins + .as_ref() + .unwrap() + .len(), + 1 + ); + + let subquery = &wrapped_sql_node.request.subquery_joins.unwrap()[0]; + + assert!(!subquery.sql.contains("ungrouped")); + // Inner order + let re = + Regex::new(r#"ORDER BY "MultiTypeCube"\.".+" DESC, "MultiTypeCube"\.".+" ASC NULLS FIRST"#) + .unwrap(); + assert!(re.is_match(&subquery.sql)); + assert!(subquery.sql.contains(r#"LIMIT 15"#)); + assert_eq!(subquery.join_type, "INNER"); + assert!(subquery + .on + .contains(r#"(CAST(${MultiTypeCube.dim_str0} AS STRING) = \"t0\".\"dim_str0\")"#)); + + // Outer filter + assert_eq!(wrapped_sql_node.request.segments.as_ref().unwrap().len(), 1); + assert!(wrapped_sql_node + .wrapped_sql + .sql + .contains(r#"\"expr\":\"(CAST(${MultiTypeCube.dim_str1} AS STRING) = $1)\""#)); + + // Dimension from top aggregation + + assert_eq!( + wrapped_sql_node.request.dimensions.as_ref().unwrap().len(), + 1 + ); + assert!(wrapped_sql_node + .wrapped_sql + .sql + .contains(r#"\"expr\":\"CAST(${MultiTypeCube.dim_str0} AS STRING)\""#)); + + // Measure from top aggregation + assert_eq!(wrapped_sql_node.request.measures.as_ref().unwrap().len(), 1); + assert!(wrapped_sql_node + .wrapped_sql + .sql + .contains(r#"\"expr\":\"${MultiTypeCube.countDistinct}\""#)); +} + +/// Ungrouped-grouped join with complex condition should plan as push-to-Cube query +/// with all the complex expressions inside join condition +#[tokio::test] +async fn test_join_ungrouped_with_grouped_on_complex_condition() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" +SELECT + MultiTypeCube.dim_str0, + MultiTypeCube.dim_str1, + MultiTypeCube.dim_str2, + AVG(MultiTypeCube.avgPrice) AS price +FROM + MultiTypeCube +INNER JOIN ( + SELECT + dim_str0, + dim_str1, + dim_str2 + FROM + MultiTypeCube + GROUP BY 1, 2, 3 + LIMIT 10 +) grouped +ON ( + (CAST(MultiTypeCube.dim_str0 AS TEXT) = grouped.dim_str0) + AND + (MultiTypeCube.dim_str1 IS NOT DISTINCT FROM grouped.dim_str1) + AND + ( + (COALESCE(MultiTypeCube.dim_str2, '') = COALESCE(grouped.dim_str2, '')) AND + ((MultiTypeCube.dim_str2 IS NOT NULL) = (grouped.dim_str2 IS NOT NULL)) + ) +) +GROUP BY + 1, + 2, + 3 +; + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + + let request = query_plan + .as_logical_plan() + .find_cube_scan_wrapped_sql() + .request; + + assert_eq!(request.ungrouped, None); + + assert_eq!(request.subquery_joins.as_ref().unwrap().len(), 1); + + let subquery = &request.subquery_joins.unwrap()[0]; + + dbg!(&subquery.on); + + assert!(!subquery.sql.contains("ungrouped")); + assert_eq!(subquery.join_type, "INNER"); + assert!(subquery + .on + .contains(r#"CAST(${MultiTypeCube.dim_str0} AS STRING) = \"grouped\".\"dim_str0\""#)); + assert!(subquery + .on + .contains(r#"${MultiTypeCube.dim_str1} IS NOT DISTINCT FROM \"grouped\".\"dim_str1\""#)); + assert!(subquery.on.contains( + r#"COALESCE(${MultiTypeCube.dim_str2}, $0$) = COALESCE(\"grouped\".\"dim_str2\", $0$)"# + )); + assert!(subquery.on.contains( + r#"(${MultiTypeCube.dim_str2} IS NOT NULL) = (\"grouped\".\"dim_str2\" IS NOT NULL)"# + )); + + // Measure from top aggregation + assert!(query_plan + .as_logical_plan() + .find_cube_scan_wrapped_sql() + .wrapped_sql + .sql + .contains(r#"\"expr\":\"${MultiTypeCube.avgPrice}\""#)); + // Dimension from ungrouped side + assert!(query_plan + .as_logical_plan() + .find_cube_scan_wrapped_sql() + .wrapped_sql + .sql + .contains(r#"\"expr\":\"${MultiTypeCube.dim_str0}\""#)); +} + +#[tokio::test] +async fn test_tableau_topk_2() { + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" +SELECT + KibanaSampleDataEcommerce.customer_gender, + SUM(KibanaSampleDataEcommerce."sumPrice") AS "sum:int2:ok" +FROM KibanaSampleDataEcommerce +INNER JOIN ( + SELECT + KibanaSampleDataEcommerce.customer_gender AS customer_gender, + SUM(KibanaSampleDataEcommerce."sumPrice") AS "$__alias__0" + FROM KibanaSampleDataEcommerce + GROUP BY 1 + ORDER BY 2 DESC NULLS LAST + LIMIT 2 +) "t0" +ON (KibanaSampleDataEcommerce.customer_gender = "t0".customer_gender) +WHERE + (((KibanaSampleDataEcommerce."notes" >= 'foo1') AND (KibanaSampleDataEcommerce."notes" <= 'foo2')) OR + ((KibanaSampleDataEcommerce."notes" >= 'bar1') AND (KibanaSampleDataEcommerce."notes" <= 'bar2'))) +GROUP BY + 1 + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + + let wrapped_sql_node = query_plan.as_logical_plan().find_cube_scan_wrapped_sql(); + + assert_eq!(wrapped_sql_node.request.ungrouped, None); + + assert_eq!( + wrapped_sql_node + .request + .subquery_joins + .as_ref() + .unwrap() + .len(), + 1 + ); + + let subquery = &wrapped_sql_node.request.subquery_joins.unwrap()[0]; + + assert!(!subquery.sql.contains("ungrouped")); + // Inner order + let re = Regex::new( + r#""order":\s*\[\s*\[\s*"KibanaSampleDataEcommerce.sumPrice",\s*"desc"\s*\]\s*\]"#, + ) + .unwrap(); + assert!(re.is_match(&subquery.sql)); + assert!(subquery.sql.contains(r#""limit": 2"#)); + assert_eq!(subquery.join_type, "INNER"); + + assert!(subquery.on.contains( + r#"(${KibanaSampleDataEcommerce.customer_gender} = \"t0\".\"customer_gender\")"# + )); + + // Outer filter + assert_eq!(wrapped_sql_node.request.segments.as_ref().unwrap().len(), 1); + assert!(wrapped_sql_node + .wrapped_sql + .sql + .contains(r#"\"expr\":\"(((${KibanaSampleDataEcommerce.notes} >= $1) AND (${KibanaSampleDataEcommerce.notes} <= $2)) OR ((${KibanaSampleDataEcommerce.notes} >= $3) AND (${KibanaSampleDataEcommerce.notes} <= $4)))\""#)); + + // Dimension from top aggregation + assert_eq!( + wrapped_sql_node.request.dimensions.as_ref().unwrap().len(), + 1 + ); + assert!(wrapped_sql_node + .wrapped_sql + .sql + .contains(r#"\"expr\":\"${KibanaSampleDataEcommerce.customer_gender}\""#)); + + // Measure from top aggregation + assert_eq!(wrapped_sql_node.request.measures.as_ref().unwrap().len(), 1); + assert!(wrapped_sql_node + .wrapped_sql + .sql + .contains(r#"\"expr\":\"${KibanaSampleDataEcommerce.sumPrice}\""#)); +} From db43682d7bd5c58fa6cb98394664c6b35c7d9c8a Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Thu, 19 Dec 2024 00:14:56 +0200 Subject: [PATCH 7/7] test: Add COALESCE + IS NOT NULL join pushdown smoke test --- .../__snapshots__/smoke-cubesql.test.ts.snap | 13 ++++++++ .../cubejs-testing/test/smoke-cubesql.test.ts | 30 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap b/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap index fe01e2340efa8..969d2d139e00c 100644 --- a/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap +++ b/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap @@ -70,6 +70,19 @@ Array [ ] `; +exports[`SQL API Postgres (Data) join with grouped query on coalesce: join grouped on coalesce 1`] = ` +Array [ + Object { + "count": "2", + "status": "processed", + }, + Object { + "count": "1", + "status": "shipped", + }, +] +`; + exports[`SQL API Postgres (Data) join with grouped query: join grouped 1`] = ` Array [ Object { diff --git a/packages/cubejs-testing/test/smoke-cubesql.test.ts b/packages/cubejs-testing/test/smoke-cubesql.test.ts index 0d1595956bdf0..514686806c444 100644 --- a/packages/cubejs-testing/test/smoke-cubesql.test.ts +++ b/packages/cubejs-testing/test/smoke-cubesql.test.ts @@ -535,6 +535,36 @@ filter_subq AS ( expect(res.rows).toMatchSnapshot('join grouped with filter'); }); + test('join with grouped query on coalesce', async () => { + const query = ` + SELECT + "Orders".status AS status, + COUNT(*) AS count + FROM + "Orders" + INNER JOIN + ( + SELECT + status, + SUM(totalAmount) + FROM + "Orders" + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 2 + ) top_orders + ON + (COALESCE("Orders".status, '') = COALESCE(top_orders.status, '')) AND + (("Orders".status IS NOT NULL) = (top_orders.status IS NOT NULL)) + GROUP BY 1 + ORDER BY 1 + `; + + const res = await connection.query(query); + // Expect only top statuses 2 by total amount: processed and shipped + expect(res.rows).toMatchSnapshot('join grouped on coalesce'); + }); + test('where segment is false', async () => { const query = 'SELECT value AS val, * FROM "SegmentTest" WHERE segment_eq_1 IS FALSE ORDER BY value;';