diff --git a/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/DemoDriver.java b/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/DemoDriver.java
index d622b387..5acccfaf 100644
--- a/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/DemoDriver.java
+++ b/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/DemoDriver.java
@@ -13,7 +13,6 @@
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.jdbc.Driver;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.impl.AbstractSchema;
 
 
 /** JDBC driver with fake in-memory data. */
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
index f3bd594c..e65c7a15 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
@@ -27,7 +27,6 @@
 
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
index 62438603..4b8b60ee 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
@@ -1,5 +1,6 @@
 package com.linkedin.hoptimator.util.planner;
 
+import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -10,16 +11,15 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import com.linkedin.hoptimator.util.DataTypeUtils;
-
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
 import org.apache.calcite.rel.rel2sql.SqlImplementor;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlBasicTypeNameSpec;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCollectionTypeNameSpec;
@@ -41,7 +41,9 @@
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.util.SqlShuttle;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
 
@@ -303,6 +305,7 @@ private static RelNode dropFields(String schema, RelNode relNode,
 
         // TODO: Need a better way to dynamically modify the script implementer based on the schema
         if (schema.startsWith("VENICE") && !targetFieldNames.contains(field.getName())) {
+          i++;
           continue;
         }
         if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
@@ -310,10 +313,50 @@ private static RelNode dropFields(String schema, RelNode relNode,
         }
         i++;
       }
-      return RelOptUtil.createProject(relNode, cols);
+      return createForceProject(relNode, cols);
     }
   }
 
+  static RelNode createForceProject(final RelNode child, final List<Integer> posList) {
+    return createForceProject(RelFactories.DEFAULT_PROJECT_FACTORY, child, posList);
+  }
+
+  // By default, "projectNamed" will try to provide an optimization by not creating a new project if the
+  // field types are the same. This is not desirable in the insert case as the field names need to match the sink.
+  //
+  // Example:
+  // INSERT INTO `my-store` (`KEY_id`, `stringField`) SELECT * FROM `KAFKA`.`existing-topic-1`;
+  // Without forced projection this will get optimized to:
+  // INSERT INTO `my-store` (`KEYFIELD`, `VARCHARFIELD`) SELECT * FROM `KAFKA`.`existing-topic-1`;
+  // With forced project this will resolve as:
+  // INSERT INTO `my-store` (`KEY_id`, `stringField`) SELECT `KEYFIELD` AS `KEY_id`, \
+  // `VARCHARFIELD` AS `stringField` FROM `KAFKA`.`existing-topic-1`;
+  //
+  // This implementation is largely a duplicate of RelOptUtil.createProject(relNode, cols); which does not allow
+  // overriding the "force" argument of "projectNamed".
+  static RelNode createForceProject(final RelFactories.ProjectFactory factory,
+      final RelNode child, final List<Integer> posList) {
+    RelDataType rowType = child.getRowType();
+    final List<String> fieldNames = rowType.getFieldNames();
+    final RelBuilder relBuilder =
+        RelBuilder.proto(factory).create(child.getCluster(), null);
+    final List<RexNode> exprs = new AbstractList<RexNode>() {
+      @Override public int size() {
+        return posList.size();
+      }
+
+      @Override public RexNode get(int index) {
+        final int pos = posList.get(index);
+        return relBuilder.getRexBuilder().makeInputRef(child, pos);
+      }
+    };
+    final List<String> names = Util.select(fieldNames, posList);
+    return relBuilder
+        .push(child)
+        .projectNamed(exprs, names, true)
+        .build();
+  }
+
   /** Implements a CREATE DATABASE IF NOT EXISTS statement */
   class DatabaseImplementor implements ScriptImplementor {
     private final String database;
diff --git a/hoptimator-venice/src/test/resources/venice-ddl.id b/hoptimator-venice/src/test/resources/venice-ddl.id
index e2657869..578aa34c 100644
--- a/hoptimator-venice/src/test/resources/venice-ddl.id
+++ b/hoptimator-venice/src/test/resources/venice-ddl.id
@@ -25,9 +25,9 @@ spec:
   job:
     entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
     args:
-    - CREATE TABLE IF NOT EXISTS `test-store` (`KEY_string` VARCHAR, `string` VARCHAR) WITH ('storeName'='test-store', 'connector'='venice', 'key.fields-prefix'='KEY_', 'key.fields'='KEY_string', 'partial-update-mode'='true')
-    - CREATE TABLE IF NOT EXISTS `test-store-1` (`KEY_string` VARCHAR, `string` VARCHAR) WITH ('storeName'='test-store-1', 'connector'='venice', 'key.fields-prefix'='KEY_', 'key.fields'='KEY_string', 'partial-update-mode'='true')
-    - INSERT INTO `test-store-1` (`KEY_string`, `string`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
+    - CREATE TABLE IF NOT EXISTS `test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('value.fields-include'='EXCEPT_KEY', 'storeName'='test-store', 'connector'='venice', 'key.fields-prefix'='KEY_', 'key.fields'='KEY_id', 'partial-update-mode'='true')
+    - CREATE TABLE IF NOT EXISTS `test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('value.fields-include'='EXCEPT_KEY', 'storeName'='test-store-1', 'connector'='venice', 'key.fields-prefix'='KEY_', 'key.fields'='KEY_id', 'partial-update-mode'='true')
+    - INSERT INTO `test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
     jarURI: local:///opt/hoptimator-flink-runner.jar
     parallelism: 1
     upgradeMode: stateless