Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cubesql): Support complex join conditions for grouped joins #9157

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -3327,7 +3327,7 @@ export class BaseQuery {
column_aliased: '{{expr}} {{quoted_alias}}',
query_aliased: '{{ query }} AS {{ quoted_alias }}',
case: 'CASE{% if expr %} {{ expr }}{% endif %}{% for when, then in when_then %} WHEN {{ when }} THEN {{ then }}{% endfor %}{% if else_expr %} ELSE {{ else_expr }}{% endif %} END',
is_null: '{{ expr }} IS {% if negate %}NOT {% endif %}NULL',
is_null: '({{ expr }} IS {% if negate %}NOT {% endif %}NULL)',
binary: '({{ left }} {{ op }} {{ right }})',
sort: '{{ expr }} {% if asc %}ASC{% else %}DESC{% endif %} NULLS {% if nulls_first %}FIRST{% else %}LAST{% endif %}',
order_by: '{% if index %} {{ index }} {% else %} {{ expr }} {% endif %} {% if asc %}ASC{% else %}DESC{% endif %}{% if nulls_first %} NULLS FIRST{% endif %}',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ Array [
]
`;

exports[`SQL API Postgres (Data) join with grouped query on coalesce: join grouped on coalesce 1`] = `
Array [
Object {
"count": "2",
"status": "processed",
},
Object {
"count": "1",
"status": "shipped",
},
]
`;

exports[`SQL API Postgres (Data) join with grouped query: join grouped 1`] = `
Array [
Object {
Expand Down
30 changes: 30 additions & 0 deletions packages/cubejs-testing/test/smoke-cubesql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,36 @@ filter_subq AS (
expect(res.rows).toMatchSnapshot('join grouped with filter');
});

test('join with grouped query on coalesce', async () => {
const query = `
SELECT
"Orders".status AS status,
COUNT(*) AS count
FROM
"Orders"
INNER JOIN
(
SELECT
status,
SUM(totalAmount)
FROM
"Orders"
GROUP BY 1
ORDER BY 2 DESC
LIMIT 2
) top_orders
ON
(COALESCE("Orders".status, '') = COALESCE(top_orders.status, '')) AND
(("Orders".status IS NOT NULL) = (top_orders.status IS NOT NULL))
GROUP BY 1
ORDER BY 1
`;

const res = await connection.query(query);
// Expect only top statuses 2 by total amount: processed and shipped
expect(res.rows).toMatchSnapshot('join grouped on coalesce');
});

test('where segment is false', async () => {
const query =
'SELECT value AS val, * FROM "SegmentTest" WHERE segment_eq_1 IS FALSE ORDER BY value;';
Expand Down
8 changes: 8 additions & 0 deletions rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,14 @@
"Unsupported ungrouped CubeScan as join subquery: {join_cube_scan:?}"
)));
}
} else if let Some(wrapped_select) =
node.as_any().downcast_ref::<WrappedSelectNode>()
{
if wrapped_select.push_to_cube {
return Err(CubeError::internal(format!(
"Unsupported push_to_cube WrappedSelect as join subquery: {wrapped_select:?}"
)));

Check warning on line 1003 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L1001-L1003

Added lines #L1001 - L1003 were not covered by tests
}
} else {
// TODO support more grouped cases here
return Err(CubeError::internal(format!(
Expand Down
17 changes: 12 additions & 5 deletions rust/cubesql/cubesql/src/compile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12188,7 +12188,7 @@ ORDER BY "source"."str0" ASC
}
init_testing_logger();

let logical_plan = convert_select_to_query_plan(
let query_plan = convert_select_to_query_plan(
r#"
WITH "qt_0" AS (
SELECT "ta_1"."customer_gender" "ca_1"
Expand All @@ -12205,13 +12205,20 @@ ORDER BY "source"."str0" ASC
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();
.await;

let physical_plan = query_plan.as_physical_plan().await.unwrap();
println!(
"Physical plan: {}",
displayable(physical_plan.as_ref()).indent()
);

let logical_plan = query_plan.as_logical_plan();

let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;

// check wrapping for `NOT(.. IS NULL OR LOWER(..) IN)`
let re = Regex::new(r"NOT \(.+ IS NULL OR .*LOWER\(.+ IN ").unwrap();
// check wrapping for `NOT((.. IS NULL) OR LOWER(..) IN)`
let re = Regex::new(r"NOT \(\(.+ IS NULL\) OR .*LOWER\(.+ IN ").unwrap();
assert!(re.is_match(&sql));
}

Expand Down
6 changes: 5 additions & 1 deletion rust/cubesql/cubesql/src/compile/rewrite/cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ impl BestCubePlan {
LogicalPlanLanguage::ProjectionSplitPushDownReplacer(_) => 1,
LogicalPlanLanguage::ProjectionSplitPullUpReplacer(_) => 1,
LogicalPlanLanguage::QueryParam(_) => 1,
LogicalPlanLanguage::JoinCheckStage(_) => 1,
LogicalPlanLanguage::JoinCheckPushDown(_) => 1,
LogicalPlanLanguage::JoinCheckPullUp(_) => 1,
// Not really replacers but those should be deemed as mandatory rewrites and as soon as
// there's always rewrite rule it's fine to have replacer cost.
// Needs to be added as alias rewrite always more expensive than original function.
Expand Down Expand Up @@ -234,6 +237,7 @@ impl BestCubePlan {
/// - `empty_wrappers` > `non_detected_cube_scans` - we don't want empty wrapper to hide non detected cube scan errors
/// - `non_detected_cube_scans` > other nodes - minimize cube scans without members
/// - `filters` > `filter_members` - optimize for push down of filters
/// - `zero_members_wrapper` > `filter_members` - prefer CubeScan(filters) to WrappedSelect(CubeScan(*), filters)
/// - `filter_members` > `cube_members` - optimize for `inDateRange` filter push down to time dimension
/// - `member_errors` > `cube_members` - extra cube members may be required (e.g. CASE)
/// - `member_errors` > `wrapper_nodes` - use SQL push down where possible if cube scan can't be detected
Expand All @@ -259,12 +263,12 @@ pub struct CubePlanCost {
wrapped_select_ungrouped_scan: usize,
filters: i64,
structure_points: i64,
filter_members: i64,
// This is separate from both non_detected_cube_scans and cube_members
// Because it's ok to use all members inside wrapper (so non_detected_cube_scans would be zero)
// And we want to select representation with less members
// But only when members are present!
zero_members_wrapper: i64,
filter_members: i64,
cube_members: i64,
errors: i64,
time_dimensions_used_as_dimensions: i64,
Expand Down
25 changes: 25 additions & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,19 @@ crate::plan_to_language! {
QueryParam {
index: usize,
},
JoinCheckStage {
expr: Arc<Expr>,
},
JoinCheckPushDown {
expr: Arc<Expr>,
left_input: Arc<LogicalPlan>,
right_input: Arc<LogicalPlan>,
},
JoinCheckPullUp {
expr: Arc<Expr>,
left_input: Arc<LogicalPlan>,
right_input: Arc<LogicalPlan>,
},
}
}

Expand Down Expand Up @@ -2152,6 +2165,18 @@ fn distinct(input: impl Display) -> String {
format!("(Distinct {})", input)
}

fn join_check_stage(expr: impl Display) -> String {
format!("(JoinCheckStage {expr})")
}

fn join_check_push_down(expr: impl Display, left: impl Display, right: impl Display) -> String {
format!("(JoinCheckPushDown {expr} {left} {right})")
}

fn join_check_pull_up(expr: impl Display, left: impl Display, right: impl Display) -> String {
format!("(JoinCheckPullUp {expr} {left} {right})")
}

pub fn original_expr_name(egraph: &CubeEGraph, id: Id) -> Option<String> {
egraph[id]
.data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ impl MemberRules {
}
}

fn replace_alias(
pub fn replace_alias(
alias_to_cube: &Vec<(String, String)>,
projection_alias: &Option<String>,
) -> Vec<(String, String)> {
Expand Down
Loading
Loading