From eba9397a2d85d560d9ec4b71b3df7d848bc5472c Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Tue, 14 Jan 2025 17:57:54 -0600 Subject: [PATCH] Add support for external engines --- README.md | 2 +- .../flink/docker-compose-sql-gateway.yaml | 4 +- deploy/samples/demodb.yaml | 5 +- deploy/samples/flinkDeployment.yaml | 2 +- deploy/samples/flinkengine.yaml | 8 + gradle/libs.versions.toml | 1 + .../java/com/linkedin/hoptimator/Engine.java | 2 + hoptimator-cli/build.gradle | 1 + .../linkedin/hoptimator/k8s/K8sEngine.java | 11 +- .../hoptimator/k8s/K8sEngineTable.java | 1 - .../hoptimator/util/DelegatingConnection.java | 311 ++++++++++++++ .../hoptimator/util/DelegatingDataSource.java | 70 ++++ .../hoptimator/util/DelegatingStatement.java | 257 ++++++++++++ .../hoptimator/util/planner/EngineRules.java | 58 ++- .../planner/HoptimatorJdbcConvention.java | 1 + .../hoptimator/util/planner/PipelineRel.java | 13 +- .../util/planner/RemoteConvention.java | 21 + .../hoptimator/util/planner/RemoteRel.java | 9 + .../planner/RemoteToEnumerableConverter.java | 390 ++++++++++++++++++ .../RemoteToEnumerableConverterRule.java | 52 +++ 20 files changed, 1178 insertions(+), 41 deletions(-) create mode 100644 deploy/samples/flinkengine.yaml create mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingConnection.java create mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingDataSource.java create mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingStatement.java create mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteConvention.java create mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteRel.java create mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java create mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverterRule.java 
diff --git a/README.md b/README.md index ec027d6c..c0ed852d 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ Once the Flink deployment pod has STATUS 'Running', you can forward port 8081 an to access the Flink dashboard. ``` - $ kubectl port-forward basic-session-deployment-7b94b98b6b-d6jt5 8081 & + $ kubectl port-forward svc/basic-session-deployment-rest 8081 & ``` See the [Flink SQL Gateway Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql-gateway/overview/) diff --git a/deploy/docker/flink/docker-compose-sql-gateway.yaml b/deploy/docker/flink/docker-compose-sql-gateway.yaml index 177d0037..01cbf977 100644 --- a/deploy/docker/flink/docker-compose-sql-gateway.yaml +++ b/deploy/docker/flink/docker-compose-sql-gateway.yaml @@ -3,10 +3,12 @@ services: image: flink:1.18.1 restart: unless-stopped entrypoint: > - /bin/sh -c "./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=localhost" + /bin/sh -c "./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=localhost -Drest.address=host.docker.internal" ports: - 8083:8083 deploy: resources: limits: memory: 1024M + extra_hosts: + - "host.docker.internal:host-gateway" diff --git a/deploy/samples/demodb.yaml b/deploy/samples/demodb.yaml index 349ffccb..b7c63ccc 100644 --- a/deploy/samples/demodb.yaml +++ b/deploy/samples/demodb.yaml @@ -29,7 +29,6 @@ spec: - profile-database - ads-database connector: | - connector = demo - database = {{database}} - table = {{table}} + connector = datagen + number-of-rows = 10 diff --git a/deploy/samples/flinkDeployment.yaml b/deploy/samples/flinkDeployment.yaml index f2464add..3ac57a75 100644 --- a/deploy/samples/flinkDeployment.yaml +++ b/deploy/samples/flinkDeployment.yaml @@ -7,7 +7,7 @@ spec: imagePullPolicy: Never flinkVersion: v1_18 flinkConfiguration: - taskmanager.numberOfTaskSlots: "3" + taskmanager.numberOfTaskSlots: "6" serviceAccount: flink jobManager: resource: diff --git 
a/deploy/samples/flinkengine.yaml b/deploy/samples/flinkengine.yaml new file mode 100644 index 00000000..4941f975 --- /dev/null +++ b/deploy/samples/flinkengine.yaml @@ -0,0 +1,8 @@ +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: Engine +metadata: + name: flink-engine +spec: + url: jdbc:flink://localhost:8083 + dialect: Flink + diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a1b9d64a..14f2cf88 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -8,6 +8,7 @@ flink-clients = "org.apache.flink:flink-clients:1.18.1" flink-connector-base = "org.apache.flink:flink-connector-base:1.18.1" flink-core = "org.apache.flink:flink-core:1.18.1" flink-csv = "org.apache.flink:flink-csv:1.18.1" +flink-jdbc = "org.apache.flink:flink-sql-jdbc-driver-bundle:1.18.1" flink-streaming-java = "org.apache.flink:flink-streaming-java:1.18.1" flink-table-api-java = "org.apache.flink:flink-table-api-java:1.18.1" flink-table-api-java-bridge = "org.apache.flink:flink-table-api-java-bridge:1.18.1" diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Engine.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Engine.java index 5b3091d4..c6ddc217 100644 --- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Engine.java +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Engine.java @@ -10,4 +10,6 @@ public interface Engine { DataSource dataSource(); SqlDialect dialect(); + + String url(); } diff --git a/hoptimator-cli/build.gradle b/hoptimator-cli/build.gradle index 0702e8ed..713adc3c 100644 --- a/hoptimator-cli/build.gradle +++ b/hoptimator-cli/build.gradle @@ -17,6 +17,7 @@ dependencies { implementation libs.calcite.core implementation libs.sqlline implementation libs.slf4j.simple + implementation libs.flink.jdbc } publishing { diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngine.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngine.java index 6cbdeeab..5ba3df67 100644 
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngine.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngine.java @@ -5,6 +5,8 @@ import com.linkedin.hoptimator.Engine; import com.linkedin.hoptimator.SqlDialect; +import java.util.Objects; + import org.apache.calcite.adapter.jdbc.JdbcSchema; @@ -17,7 +19,7 @@ public class K8sEngine implements Engine { public K8sEngine(String name, String url, SqlDialect dialect, String driver) { this.name = name; - this.url = url; + this.url = Objects.requireNonNull(url, "url"); this.dialect = dialect; this.driver = driver; } @@ -33,8 +35,13 @@ public DataSource dataSource() { return JdbcSchema.dataSource(url, driver, null, null); } + @Override + public String url() { + return url; + } + @Override public SqlDialect dialect() { - return SqlDialect.FLINK; // TODO fix hardcoded dialect + return dialect; } } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngineTable.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngineTable.java index 60263086..0b12d7f8 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngineTable.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sEngineTable.java @@ -5,7 +5,6 @@ import java.util.Locale; import java.util.Optional; import java.util.stream.Collectors; -import javax.sql.DataSource; import org.apache.calcite.adapter.jdbc.JdbcSchema; import org.apache.calcite.schema.Schema; diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingConnection.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingConnection.java new file mode 100644 index 00000000..886b219a --- /dev/null +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingConnection.java @@ -0,0 +1,311 @@ +package com.linkedin.hoptimator.util; + +import java.util.Properties; +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import 
java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.concurrent.Executor; + +class DelegatingConnection implements Connection { + + private final Connection connection; + + DelegatingConnection(Connection connection) { + this.connection = connection; + } + + @Override + public String getSchema() throws SQLException { + return connection.getSchema(); + } + + @Override + public void setSchema(String schema) throws SQLException { + connection.setSchema(schema); + } + + @Override + public String getCatalog() throws SQLException { + return connection.getCatalog(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + connection.setCatalog(catalog); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + // nop + } + + @Override + public boolean getAutoCommit() throws SQLException { + return true; + } + + @Override + public void close() throws SQLException { + connection.close(); + } + + @Override + public boolean isClosed() throws SQLException { + return connection.isClosed(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return connection.getMetaData(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + // nop + } + + @Override + public int getTransactionIsolation() throws SQLException { + return Connection.TRANSACTION_NONE; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + connection.setClientInfo(name, value); 
+ } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + connection.setClientInfo(properties); + } + + @Override + public String getClientInfo(String name) throws SQLException { + return connection.getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return connection.getClientInfo(); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void commit() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void rollback() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public boolean isReadOnly() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void clearWarnings() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Statement createStatement() throws SQLException { + return new DelegatingStatement(connection); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) + throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public PreparedStatement prepareStatement( + String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) + throws SQLException { + throw 
new SQLFeatureNotSupportedException(); + } + + @Override + public Map> getTypeMap() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getHoldability() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Statement createStatement( + int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public PreparedStatement prepareStatement( + String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public CallableStatement prepareCall( + String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) + throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + throw new 
SQLFeatureNotSupportedException(); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) + throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Clob createClob() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Blob createBlob() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public NClob createNClob() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void abort(Executor executor) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getNetworkTimeout() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } +} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingDataSource.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingDataSource.java new file mode 100644 index 00000000..ea06f632 --- /dev/null +++ 
b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingDataSource.java @@ -0,0 +1,70 @@ +package com.linkedin.hoptimator.util; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.logging.Logger; +import javax.sql.DataSource; + + +/** DataSource which loads a driver by URL, and papers over features the driver may lack. */ +public class DelegatingDataSource implements DataSource { + private static final Logger logger = Logger.getLogger("DelegatingDataSource"); + + private String url; + private int loginTimeout = 60; + private PrintWriter printWriter = new PrintWriter(System.out); + + public DelegatingDataSource() { + } + + public void setUrl(String url) { + this.url = url; + } + + @Override + public Connection getConnection() throws SQLException { + return new DelegatingConnection(DriverManager.getConnection(url)); + } + + @Override + public Connection getConnection(String user, String pass) throws SQLException { + return new DelegatingConnection(DriverManager.getConnection(url, user, pass)); + } + + @Override + public int getLoginTimeout() { + return loginTimeout; + } + + @Override + public void setLoginTimeout(int timeout) { + this.loginTimeout = timeout; + } + + @Override + public PrintWriter getLogWriter() { + return printWriter; + } + + @Override + public void setLogWriter(PrintWriter printWriter) { + this.printWriter = printWriter; + } + + @Override + public Logger getParentLogger() { + return logger; + } + + @Override + public boolean isWrapperFor(Class clazz) throws SQLException { + return false; + } + + @Override + public T unwrap(Class clazz) throws SQLException { + return null; + } +} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingStatement.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingStatement.java new file mode 100644 index 00000000..594b8883 --- /dev/null +++ 
b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DelegatingStatement.java @@ -0,0 +1,257 @@ +package com.linkedin.hoptimator.util; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLWarning; +import java.sql.Statement; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +class DelegatingStatement implements Statement { + private static final Logger logger = LoggerFactory.getLogger(DelegatingStatement.class); + + private final Connection connection; + private ResultSet resultSet; + + DelegatingStatement(Connection connection) { + this.connection = connection; + } + + @Override + public boolean execute(String sql) throws SQLException { + executeQuery(sql); + return true; + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException { + // Split multi-statement queries and execute any leading updates + String[] parts = sql.split(";\n"); + int i = 0; + for (; i < parts.length - 1; i++) { + try (Statement stmt = connection.createStatement()) { + logger.info("DDL: " + parts[i]); + stmt.execute(parts[i]); + } + } + Statement stmt = connection.createStatement(); + logger.info("SQL: " + parts[i]); + resultSet = stmt.executeQuery(parts[i]); + return resultSet; + } + + @Override + public ResultSet getResultSet() { + return resultSet; + } + + @Override + public void cancel() throws SQLException { + // nop + } + + @Override + public void close() throws SQLException { + // nop + } + + + @Override + public void clearWarnings() throws SQLException { + // nop + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public int executeUpdate(String sql) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getUpdateCount() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int 
getMaxFieldSize() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Connection getConnection() throws SQLException { + return connection; + } + + @Override + public boolean isClosed() throws SQLException { + return connection.isClosed(); + } + + @Override + public void setMaxFieldSize(int max) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getMaxRows() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setMaxRows(int max) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setEscapeProcessing(boolean enable) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getQueryTimeout() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setQueryTimeout(int seconds) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setCursorName(String name) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getFetchDirection() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setFetchSize(int rows) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getFetchSize() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getResultSetConcurrency() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getResultSetType() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void addBatch(String sql) throws SQLException { + throw new 
SQLFeatureNotSupportedException(); + } + + @Override + public void clearBatch() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int[] executeBatch() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public boolean getMoreResults() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public boolean getMoreResults(int current) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public ResultSet getGeneratedKeys() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int executeUpdate(String sql, String[] columnNames) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public boolean execute(String sql, int[] columnIndexes) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public boolean execute(String sql, String[] columnNames) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getResultSetHoldability() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setPoolable(boolean poolable) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public boolean isPoolable() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + 
@Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } + + @Override + public void closeOnCompletion() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public boolean isCloseOnCompletion() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } +} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java index e3f3399a..d4107fc7 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/EngineRules.java @@ -4,7 +4,6 @@ import org.apache.calcite.adapter.jdbc.JdbcConvention; import org.apache.calcite.adapter.jdbc.JdbcImplementor; -import org.apache.calcite.adapter.jdbc.JdbcRel; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.plan.Convention; @@ -31,9 +30,29 @@ import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.dialect.AnsiSqlDialect; +import org.apache.calcite.sql.dialect.CalciteSqlDialect; import org.apache.calcite.sql.dialect.MysqlSqlDialect; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.plan.ConventionTraitDef; +import 
org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterImpl; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.runtime.Hook; +import org.apache.calcite.util.BuiltInMethod; + import com.linkedin.hoptimator.Engine; @@ -47,18 +66,16 @@ public EngineRules(Engine engine) { } public void register(HoptimatorJdbcConvention inTrait, RelOptPlanner planner) { - SqlDialect dialect = dialect(engine); - String name = engine.engineName() + "-" + inTrait.database(); - JdbcConvention outTrait = JdbcConvention.of(dialect, inTrait.expression, name); - - System.out.println("Registering rules for " + name + " using dialect " + dialect.toString()); + String name = engine.engineName() + "-" + inTrait.database(); + RemoteConvention remote = new RemoteConvention(name, engine); + planner.addRule(RemoteToEnumerableConverterRule.create(remote)); planner.addRule(RemoteJoinRule.Config.INSTANCE - .withConversion(Join.class, Convention.NONE, outTrait, "RemoteJoinRule") + .withConversion(PipelineRules.PipelineJoin.class, PipelineRel.CONVENTION, remote, "RemoteJoinRule") .withRuleFactory(RemoteJoinRule::new) .as(RemoteJoinRule.Config.class) .toRule(RemoteJoinRule.class)); planner.addRule(RemoteTableScanRule.Config.INSTANCE - .withConversion(TableScan.class, Convention.NONE, outTrait, "RemoteTableScan") + .withConversion(PipelineRules.PipelineTableScan.class, PipelineRel.CONVENTION, remote, "RemoteTableScan") .withRuleFactory(RemoteTableScanRule::new) .as(RemoteTableScanRule.Config.class) .toRule(RemoteTableScanRule.class)); @@ -79,20 +96,11 @@ public RelNode convert(RelNode rel) { } } - private class RemoteTableScan extends TableScan implements JdbcRel { + private class RemoteTableScan extends TableScan implements RemoteRel { public RemoteTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable 
table) { super(cluster, traitSet, Collections.emptyList(), table); } - - @Override - public JdbcImplementor.Result implement(JdbcImplementor implementor) { - SqlDialect dialect = dialect(engine); - System.out.println("Generating sql in dialect " + dialect.toString()); - JdbcImplementor.Result res = new JdbcImplementor(dialect, new JavaTypeFactoryImpl()).implement(getInput(0)); - System.out.println("Implemented: " + res.toString()); - return res; - } } private class RemoteJoinRule extends ConverterRule { @@ -109,13 +117,12 @@ public RelNode convert(RelNode rel) { return new RemoteJoin(rel.getCluster(), newTraitSet, join.getLeft(), join.getRight(), join.getCondition(), join.getJoinType()); } catch (InvalidRelException e) { - System.out.println(e); throw new AssertionError(e); } } } - private class RemoteJoin extends Join implements JdbcRel { + private class RemoteJoin extends Join implements RemoteRel { protected RemoteJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, JoinRelType joinType) @@ -130,7 +137,6 @@ public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode try { return new RemoteJoin(getCluster(), traitSet, left, right, condition, joinType); } catch (InvalidRelException e) { - System.out.println(e); throw new AssertionError(e); } } @@ -139,16 +145,6 @@ public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { return super.computeSelfCost(planner, mq).multiplyBy(0); // TODO fix zero cost } - - @Override - public JdbcImplementor.Result implement(JdbcImplementor implementor) { - SqlDialect dialect = dialect(engine); - System.out.println("Generating sql in dialect " + dialect.toString()); - JdbcImplementor.Result res = new JdbcImplementor(dialect, - new JavaTypeFactoryImpl()).implement(getInput(0)); - System.out.println("implemented (2) " + res.toString()); - return res; - } } private static 
SqlDialect dialect(Engine engine) { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcConvention.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcConvention.java index 23ebbd46..4f8e3b40 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcConvention.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcConvention.java @@ -35,6 +35,7 @@ public void register(RelOptPlanner planner) { super.register(planner); planner.addRule(PipelineRules.PipelineTableScanRule.create(this)); planner.addRule(PipelineRules.PipelineTableModifyRule.create(this)); + PipelineRules.rules().forEach(x -> planner.addRule(x)); engines().forEach(x -> new EngineRules(x).register(this, planner)); } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java index 39e0187c..24925d4b 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java @@ -145,12 +145,18 @@ public Pipeline pipeline() throws SQLException { return new Pipeline(deployables); } - public Function sql() throws SQLException { + private ScriptImplementor script() throws SQLException { ScriptImplementor script = ScriptImplementor.empty(); for (Map.Entry source : sources.entrySet()) { Map configs = ConnectionService.configure(source.getKey(), Source.class); script = script.connector(source.getKey().table(), source.getValue(), configs); } + return script; + } + + /** SQL script ending in an INSERT INTO */ + public Function sql() throws SQLException { + ScriptImplementor script = script(); RelDataType targetRowType = sinkRowType; if (targetRowType == null) { targetRowType = query.getRowType(); @@ -162,5 +168,10 @@ public Function 
sql() throws SQLException { RelOptUtil.equal(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW); return script.seal(); } + + /** SQL script ending in a SELECT */ + public Function query() throws SQLException { + return script().query(query).seal(); + } } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteConvention.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteConvention.java new file mode 100644 index 00000000..4de6955e --- /dev/null +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteConvention.java @@ -0,0 +1,21 @@ +package com.linkedin.hoptimator.util.planner; + +import com.linkedin.hoptimator.Engine; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.Convention; + + +class RemoteConvention extends Convention.Impl { + + private final Engine engine; + + RemoteConvention(String name, Engine engine) { + super(name, RemoteRel.class); + this.engine = engine; + } + + Engine engine() { + return engine; + } +} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteRel.java new file mode 100644 index 00000000..b55968b0 --- /dev/null +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteRel.java @@ -0,0 +1,9 @@ +package com.linkedin.hoptimator.util.planner; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.plan.Convention; + + +public interface RemoteRel extends RelNode { + +} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java new file mode 100644 index 00000000..9c3775e4 --- /dev/null +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverter.java @@ -0,0 
+1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * N.B. this file copy-pasted from Apache Calcite with modifications. + */ +package com.linkedin.hoptimator.util.planner; + +import com.linkedin.hoptimator.util.DelegatingDataSource; +import com.linkedin.hoptimator.util.DeploymentService; + +import org.apache.calcite.DataContext; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.adapter.enumerable.RexImpTable; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.adapter.jdbc.JdbcConvention; +import org.apache.calcite.adapter.jdbc.JdbcRel; +import org.apache.calcite.config.CalciteSystemProperty; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.ConstantExpression; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.ParameterExpression; +import 
org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.linq4j.tree.UnaryExpression; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.convert.ConverterImpl; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.runtime.Hook; +import org.apache.calcite.runtime.SqlFunctions; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.dialect.MysqlSqlDialect; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.util.SqlString; +import org.apache.calcite.util.BuiltInMethod; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.TimeZone; +import java.util.stream.Collectors; +import javax.sql.DataSource; + +import static org.apache.calcite.linq4j.Nullness.castNonNull; + +import static java.util.Objects.requireNonNull; + +/** + * Relational expression that executes its input on a remote engine and bridges the results into enumerable convention.
+ */ +public class RemoteToEnumerableConverter + extends ConverterImpl + implements EnumerableRel { + protected RemoteToEnumerableConverter( + RelOptCluster cluster, + RelTraitSet traits, + RelNode input) { + super(cluster, ConventionTraitDef.INSTANCE, traits, input); + } + + /** This method modified from upstream */ + private SqlString generateSql(SqlDialect dialect) { + RelRoot root = RelRoot.of(getInput(), SqlKind.SELECT); + try { + PipelineRel.Implementor plan = DeploymentService.plan(root); + return new SqlString(MysqlSqlDialect.DEFAULT, plan.query().apply(com.linkedin.hoptimator.SqlDialect.FLINK)); // TODO dialect + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override public RelNode copy(RelTraitSet traitSet, List inputs) { + return new RemoteToEnumerableConverter( + getCluster(), traitSet, sole(inputs)); + } + + @Override public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + RelOptCost cost = super.computeSelfCost(planner, mq); + if (cost == null) { + return null; + } + return cost.multiplyBy(.1); + } + + @Override public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + // Generate: + // ResultSetEnumerable.of(schema.getDataSource(), "select ...") + final BlockBuilder builder0 = new BlockBuilder(false); + final RemoteRel child = (RemoteRel) getInput(); + final PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), getRowType(), + pref.prefer(JavaRowFormat.CUSTOM)); + final RemoteConvention convention = + (RemoteConvention) requireNonNull(child.getConvention(), + () -> "child.getConvention() is null for " + child); + SqlString sqlString = generateSql(MysqlSqlDialect.DEFAULT); // TODO hard-coded dialect + String sql = sqlString.getSql(); + if (CalciteSystemProperty.DEBUG.value()) { + System.out.println("[" + sql + "]"); + } + Hook.QUERY_PLAN.run(sql); + final Expression sql_ = + builder0.append("sql", Expressions.constant(sql)); + final int fieldCount = 
getRowType().getFieldCount(); + BlockBuilder builder = new BlockBuilder(); + final ParameterExpression resultSet_ = + Expressions.parameter(Modifier.FINAL, ResultSet.class, + builder.newName("resultSet")); + final SqlDialect.CalendarPolicy calendarPolicy = + MysqlSqlDialect.DEFAULT.getCalendarPolicy(); // TODO hard-coded dialect + final Expression calendar_; + switch (calendarPolicy) { + case LOCAL: + calendar_ = + builder0.append("calendar", + Expressions.call(Calendar.class, "getInstance", + getTimeZoneExpression(implementor))); + break; + default: + calendar_ = null; + } + if (fieldCount == 1) { + final ParameterExpression value_ = + Expressions.parameter(Object.class, builder.newName("value")); + builder.add(Expressions.declare(Modifier.FINAL, value_, null)); + generateGet(implementor, physType, builder, resultSet_, 0, value_, + calendar_, calendarPolicy); + builder.add(Expressions.return_(null, value_)); + } else { + final Expression values_ = + builder.append("values", + Expressions.newArrayBounds(Object.class, 1, + Expressions.constant(fieldCount))); + for (int i = 0; i < fieldCount; i++) { + generateGet(implementor, physType, builder, resultSet_, i, + Expressions.arrayIndex(values_, Expressions.constant(i)), + calendar_, calendarPolicy); + } + builder.add( + Expressions.return_(null, values_)); + } + final ParameterExpression e_ = + Expressions.parameter(SQLException.class, builder.newName("e")); + final Expression rowBuilderFactory_ = + builder0.append("rowBuilderFactory", + Expressions.lambda( + Expressions.block( + Expressions.return_(null, + Expressions.lambda( + Expressions.block( + Expressions.tryCatch( + builder.toBlock(), + Expressions.catch_( + e_, + Expressions.throw_( + Expressions.new_( + RuntimeException.class, + e_)))))))), + resultSet_)); + + String dataSourceUrl = convention.engine().url(); + Expression dataSource = builder0.append("dataSource", + Expressions.new_(DelegatingDataSource.class)); + + builder0.add( + Expressions.statement( + 
Expressions.call(dataSource, "setUrl", Expressions.constant(dataSourceUrl)))); + + final Expression enumerable; + + if (sqlString.getDynamicParameters() != null + && !sqlString.getDynamicParameters().isEmpty()) { + final Expression preparedStatementConsumer_ = + builder0.append("preparedStatementConsumer", + Expressions.call(BuiltInMethod.CREATE_ENRICHER.method, + Expressions.newArrayInit(Integer.class, 1, + toIndexesTableExpression(sqlString)), + DataContext.ROOT)); + + enumerable = + builder0.append("enumerable", + Expressions.call( + BuiltInMethod.RESULT_SET_ENUMERABLE_OF_PREPARED.method, +// Schemas.unwrap(convention.expression(), DataSource.class), + dataSource, + sql_, + rowBuilderFactory_, + preparedStatementConsumer_)); + } else { + enumerable = + builder0.append("enumerable", + Expressions.call( + BuiltInMethod.RESULT_SET_ENUMERABLE_OF.method, +// Schemas.unwrap(convention.expression(), DataSource.class), + dataSource, + sql_, + rowBuilderFactory_)); + } + builder0.add( + Expressions.statement( + Expressions.call(enumerable, + BuiltInMethod.RESULT_SET_ENUMERABLE_SET_TIMEOUT.method, + DataContext.ROOT))); + builder0.add( + Expressions.return_(null, enumerable)); + return implementor.result(physType, builder0.toBlock()); + } + + private static List toIndexesTableExpression(SqlString sqlString) { + return requireNonNull(sqlString.getDynamicParameters(), + () -> "sqlString.getDynamicParameters() is null for " + sqlString).stream() + .map(Expressions::constant) + .collect(Collectors.toList()); + } + + private static UnaryExpression getTimeZoneExpression( + EnumerableRelImplementor implementor) { + return Expressions.convert_( + Expressions.call( + implementor.getRootExpression(), + "get", + Expressions.constant("timeZone")), + TimeZone.class); + } + + private static void generateGet(EnumerableRelImplementor implementor, + PhysType physType, BlockBuilder builder, ParameterExpression resultSet_, + int i, Expression target, @Nullable Expression calendar_, + 
SqlDialect.CalendarPolicy calendarPolicy) { + final Primitive primitive = Primitive.ofBoxOr(physType.fieldClass(i)); + final RelDataType fieldType = + physType.getRowType().getFieldList().get(i).getType(); + final List dateTimeArgs = new ArrayList<>(); + dateTimeArgs.add(Expressions.constant(i + 1)); + SqlTypeName sqlTypeName = fieldType.getSqlTypeName(); + boolean offset = false; + switch (calendarPolicy) { + case LOCAL: + requireNonNull(calendar_, "calendar_"); + dateTimeArgs.add(calendar_); + break; + case NULL: + // We don't specify a calendar at all, so we don't add an argument and + // instead use the version of the getXXX that doesn't take a Calendar + break; + case DIRECT: + sqlTypeName = SqlTypeName.ANY; + break; + case SHIFT: + switch (sqlTypeName) { + case TIMESTAMP: + case DATE: + offset = true; + break; + default: + break; + } + break; + default: + break; + } + final Expression source; + switch (sqlTypeName) { + case DATE: + case TIME: + case TIMESTAMP: + source = + Expressions.call( + getMethod(sqlTypeName, fieldType.isNullable(), offset), + Expressions.list() + .append( + Expressions.call(resultSet_, + getMethod2(sqlTypeName), dateTimeArgs)) + .appendIf(offset, getTimeZoneExpression(implementor))); + break; + case ARRAY: + final Expression x = + Expressions.convert_( + Expressions.call(resultSet_, jdbcGetMethod(primitive), + Expressions.constant(i + 1)), + java.sql.Array.class); + source = Expressions.call(BuiltInMethod.JDBC_ARRAY_TO_LIST.method, x); + break; + case NULL: + source = RexImpTable.NULL_EXPR; + break; + default: + source = + Expressions.call(resultSet_, jdbcGetMethod(primitive), + Expressions.constant(i + 1)); + } + builder.add( + Expressions.statement( + Expressions.assign( + target, source))); + + // [CALCITE-596] If primitive type columns contain null value, returns null + // object + if (primitive != null) { + builder.add( + Expressions.ifThen( + Expressions.call(resultSet_, "wasNull"), + Expressions.statement( + 
Expressions.assign(target, + Expressions.constant(null))))); + } + } + + private static Method getMethod(SqlTypeName sqlTypeName, boolean nullable, + boolean offset) { + switch (sqlTypeName) { + case DATE: + return (nullable + ? (offset + ? BuiltInMethod.DATE_TO_INT_OPTIONAL_OFFSET + : BuiltInMethod.DATE_TO_INT_OPTIONAL) + : (offset + ? BuiltInMethod.DATE_TO_INT_OFFSET + : BuiltInMethod.DATE_TO_INT)).method; + case TIME: + return (nullable + ? BuiltInMethod.TIME_TO_INT_OPTIONAL + : BuiltInMethod.TIME_TO_INT).method; + case TIMESTAMP: + return (nullable + ? (offset + ? BuiltInMethod.TIMESTAMP_TO_LONG_OPTIONAL_OFFSET + : BuiltInMethod.TIMESTAMP_TO_LONG_OPTIONAL) + : (offset + ? BuiltInMethod.TIMESTAMP_TO_LONG_OFFSET + : BuiltInMethod.TIMESTAMP_TO_LONG)).method; + default: + throw new AssertionError(sqlTypeName + ":" + nullable); + } + } + + private static Method getMethod2(SqlTypeName sqlTypeName) { + switch (sqlTypeName) { + case DATE: + return BuiltInMethod.RESULT_SET_GET_DATE2.method; + case TIME: + return BuiltInMethod.RESULT_SET_GET_TIME2.method; + case TIMESTAMP: + return BuiltInMethod.RESULT_SET_GET_TIMESTAMP2.method; + default: + throw new AssertionError(sqlTypeName); + } + } + + /** E.g., {@code jdbcGetMethod(int)} returns "getInt". */ + private static String jdbcGetMethod(@Nullable Primitive primitive) { + return primitive == null + ? "getObject" + : "get" + SqlFunctions.initcap(castNonNull(primitive.primitiveName)); + } +} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverterRule.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverterRule.java new file mode 100644 index 00000000..3804874f --- /dev/null +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/RemoteToEnumerableConverterRule.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * N.B. this file copy-pasted from Apache Calcite with modifications. + */ +package com.linkedin.hoptimator.util.planner; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; + +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Rule to convert a relational expression from + * {@link RemoteConvention} to + * {@link EnumerableConvention}. + */ +public class RemoteToEnumerableConverterRule extends ConverterRule { + /** Creates a RemoteToEnumerableConverterRule. */ + public static RemoteToEnumerableConverterRule create(RemoteConvention inTrait) { + return Config.INSTANCE + .withConversion(RelNode.class, inTrait, EnumerableConvention.INSTANCE, + "RemoteToEnumerableConverterRule") + .withRuleFactory(RemoteToEnumerableConverterRule::new) + .toRule(RemoteToEnumerableConverterRule.class); + } + + /** Called from the Config.
*/ + protected RemoteToEnumerableConverterRule(Config config) { + super(config); + } + + @Override public @Nullable RelNode convert(RelNode rel) { + RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutTrait()); + return new RemoteToEnumerableConverter(rel.getCluster(), newTraitSet, rel); + } +}