From 64b7b7337413dcd5ff07c6fe57863cd93034b945 Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Mon, 23 Dec 2024 15:55:53 -0600 Subject: [PATCH] Support flattening and unflattening structured types (#79) --- Makefile | 5 +- hoptimator-avro/build.gradle | 46 ++++++++ .../linkedin/hoptimator/demodb/AdsSchema.java | 1 + hoptimator-jdbc-driver/build.gradle | 1 - .../hoptimator/jdbc/schema/CatalogTable.java | 2 +- .../jdbc/schema/UtilityCatalog.java | 4 +- .../linkedin/hoptimator/k8s/K8sConnector.java | 9 +- .../hoptimator/k8s/K8sDatabaseTable.java | 2 +- .../src/test/resources/kafka-ddl.id | 4 +- hoptimator-util/build.gradle | 61 +++++++++++ .../hoptimator/util/ConnectionService.java | 4 +- .../hoptimator/util/DataTypeUtils.java | 101 ++++++++++++++++++ .../util/HoptimatorJdbcCatalogSchema.java | 38 ------- .../hoptimator/util/HoptimatorJdbcSchema.java | 25 +++-- .../linkedin/hoptimator/util/Template.java | 4 +- .../hoptimator/util/planner/PipelineRel.java | 4 +- .../util/planner/PipelineRules.java | 21 +++- .../util/planner/ScriptImplementor.java | 35 ++++-- .../hoptimator/util/TestDataTypeUtils.java | 81 ++++++++++++++ 19 files changed, 372 insertions(+), 76 deletions(-) create mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java delete mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcCatalogSchema.java create mode 100644 hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java diff --git a/Makefile b/Makefile index 2610735..772de79 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,9 @@ install: ./gradlew compileJava installDist +test: + ./gradlew test -x spotbugsMain -x spotbugsTest -x spotbugsTestFixtures + build: ./gradlew build docker build . -t hoptimator @@ -77,4 +80,4 @@ release: test -n "$(VERSION)" # MISSING ARG: $$VERSION ./gradlew publish -.PHONY: build clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release +.PHONY: build test install clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release diff --git a/hoptimator-avro/build.gradle b/hoptimator-avro/build.gradle index 8acd2c8..475dac0 100644 --- a/hoptimator-avro/build.gradle +++ b/hoptimator-avro/build.gradle @@ -1,5 +1,6 @@ plugins { id 'java' + id 'maven-publish' } dependencies { @@ -7,3 +8,48 @@ dependencies { implementation libs.avro implementation libs.calcite.core } + +publishing { + repositories { + maven { + name 'GitHubPackages' + url = 'https://maven.pkg.github.com/linkedin/Hoptimator' + credentials { + username = System.getenv('GITHUB_ACTOR') + password = System.getenv('GITHUB_TOKEN') + } + } + maven { + name 'LinkedInJFrog' + url 'https://linkedin.jfrog.io/artifactory/hoptimator' + credentials { + username = System.getenv('JFROG_USERNAME') + password = System.getenv('JFROG_API_KEY') + } + } + } + publications { + maven(MavenPublication) { + groupId = 'com.linkedin.hoptimator' + artifactId = 'hoptimator-avro' + version = System.getenv('VERSION') + from components.java + pom { + name = 'hoptimator-avro' + description = 'Hoptimator plugin for Apache Avro' + url = 'https://github.com/linkedin/Hoptimator' + licenses { + license { + name = 'BSD 2-Clause' + url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE' + } + } + scm { + connection = 'scm:git:git://github.com:linkedin/Hoptimator.git' + developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git' + url = 'https://github.com/linkedin/Hoptimator' + } + } + } + } +} diff --git a/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/AdsSchema.java b/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/AdsSchema.java index 974e3d5..ac52f6c 100644 --- a/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/AdsSchema.java +++ b/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/AdsSchema.java @@ -14,6 +14,7 @@ public class AdsSchema extends AbstractSchema { public AdsSchema() { tableMap.put("PAGE_VIEWS", new PageViewTable()); tableMap.put("AD_CLICKS", new AdClickTable()); + tableMap.put("CAMPAIGNS", new CampaignTable()); } @Override diff --git a/hoptimator-jdbc-driver/build.gradle b/hoptimator-jdbc-driver/build.gradle index d39f757..72fd7c6 100644 --- a/hoptimator-jdbc-driver/build.gradle +++ b/hoptimator-jdbc-driver/build.gradle @@ -5,7 +5,6 @@ plugins { } dependencies { - implementation project(':hoptimator-avro') implementation project(':hoptimator-demodb') implementation project(':hoptimator-jdbc') implementation project(':hoptimator-util') diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/schema/CatalogTable.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/schema/CatalogTable.java index dad8fa9..9268215 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/schema/CatalogTable.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/schema/CatalogTable.java @@ -7,7 +7,7 @@ import com.linkedin.hoptimator.util.RemoteTable; -/** A table populated with all available Catlaogs. */ +/** A table populated with all available Catalogs. */ public class CatalogTable extends RemoteTable { // This and other Row classes are used by generated code, so it is important diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/schema/UtilityCatalog.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/schema/UtilityCatalog.java index b7b07f9..4a24e75 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/schema/UtilityCatalog.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/schema/UtilityCatalog.java @@ -2,7 +2,7 @@ import java.sql.SQLException; import java.sql.Wrapper; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import org.apache.calcite.schema.SchemaPlus; @@ -15,7 +15,7 @@ /** Built-in utility tables. */ public class UtilityCatalog extends AbstractSchema implements Catalog { - private final Map tableMap = new HashMap<>(); + private final Map tableMap = new LinkedHashMap<>(); public UtilityCatalog() { tableMap.put("PRINT", new PrintTable()); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java index 9cd54af..64529bf 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java @@ -3,7 +3,7 @@ import java.io.IOException; import java.io.StringReader; import java.sql.SQLException; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; import java.util.Properties; @@ -47,10 +47,9 @@ public Map configure(Source source) throws SQLException { } catch (IOException e) { throw new SQLException(e); } - Map map = new HashMap<>(); - for (String key : props.stringPropertyNames()) { - map.put(key, props.getProperty(key)); - } + Map map = new LinkedHashMap<>(); + props.stringPropertyNames().stream().sorted().forEach(k -> + map.put(k, props.getProperty(k))); return map; } } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDatabaseTable.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDatabaseTable.java index 38e261d..79541bd 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDatabaseTable.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDatabaseTable.java @@ -47,7 +47,7 @@ public K8sDatabaseTable(K8sContext context) { public void addDatabases(SchemaPlus parentSchema) { for (Row row : rows()) { parentSchema.add(schemaName(row), - HoptimatorJdbcSchema.create(row.NAME, null, row.SCHEMA, dataSource(row), parentSchema, dialect(row))); + HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row), parentSchema, dialect(row))); } } diff --git a/hoptimator-kafka/src/test/resources/kafka-ddl.id b/hoptimator-kafka/src/test/resources/kafka-ddl.id index 16d96e9..3d5bd1a 100644 --- a/hoptimator-kafka/src/test/resources/kafka-ddl.id +++ b/hoptimator-kafka/src/test/resources/kafka-ddl.id @@ -24,8 +24,8 @@ spec: job: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - - CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2', 'connector'='kafka') - - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1', 'connector'='kafka') + - CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2') + - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1') - INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2` jarURI: local:///opt/hoptimator-flink-runner.jar parallelism: 1 diff --git a/hoptimator-util/build.gradle b/hoptimator-util/build.gradle index e70fe77..c55c112 100644 --- a/hoptimator-util/build.gradle +++ b/hoptimator-util/build.gradle @@ -1,8 +1,69 @@ plugins { id 'java' + id 'maven-publish' } dependencies { implementation project(':hoptimator-api') implementation libs.calcite.core + + testImplementation(testFixtures(project(':hoptimator-jdbc'))) + testImplementation(platform('org.junit:junit-bom:5.11.3')) + testImplementation 'org.junit.jupiter:junit-jupiter' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' +} + +test { + useJUnitPlatform { + excludeTags 'integration' + } + testLogging { + events "passed", "skipped", "failed" + } +} + +publishing { + repositories { + maven { + name 'GitHubPackages' + url = 'https://maven.pkg.github.com/linkedin/Hoptimator' + credentials { + username = System.getenv('GITHUB_ACTOR') + password = System.getenv('GITHUB_TOKEN') + } + } + maven { + name 'LinkedInJFrog' + url 'https://linkedin.jfrog.io/artifactory/hoptimator' + credentials { + username = System.getenv('JFROG_USERNAME') + password = System.getenv('JFROG_API_KEY') + } + } + } + publications { + maven(MavenPublication) { + groupId = 'com.linkedin.hoptimator' + artifactId = 'hoptimator-util' + version = System.getenv('VERSION') + from components.java + pom { + name = 'hoptimator-util' + description = 'Utilities to help with extending Hoptimator' + url = 'https://github.com/linkedin/Hoptimator' + licenses { + license { + name = 'BSD 2-Clause' + url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE' + } + } + scm { + connection = 'scm:git:git://github.com:linkedin/Hoptimator.git' + developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git' + url = 'https://github.com/linkedin/Hoptimator' + } + } + } + } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConnectionService.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConnectionService.java index 06e3722..cd81cff 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConnectionService.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/ConnectionService.java @@ -3,7 +3,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.ServiceLoader; @@ -19,7 +19,7 @@ private ConnectionService() { } public static Map configure(T object, Class clazz) throws SQLException { - Map configs = new HashMap<>(); + Map configs = new LinkedHashMap<>(); for (Connector connector : connectors(clazz)) { configs.putAll(connector.configure(object)); } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java new file mode 100644 index 0000000..ba446b9 --- /dev/null +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java @@ -0,0 +1,101 @@ +package com.linkedin.hoptimator.util; + +import java.util.Collections; +import java.util.List; +import java.util.LinkedHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.sql.type.SqlTypeName; + + +public final class DataTypeUtils { + + private DataTypeUtils() { + } + + /** + * Flattens nested structs and complex arrays. + * + * Nested structs like `FOO Row(BAR Row(QUX VARCHAR)))` are promoted to + * top-level fields like `FOO$BAR$QUX VARCHAR`. + * + * Complex arrays are demoted to just `ANY ARRAY`. Primitive arrays are + * unchanged. + * + */ + public static RelDataType flatten(RelDataType dataType, RelDataTypeFactory typeFactory) { + if (!dataType.isStruct()) { + return dataType; + } + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); + flattenInto(typeFactory, dataType, builder, Collections.emptyList()); + return builder.build(); + } + + private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType dataType, + RelDataTypeFactory.Builder builder, List path) { + if (dataType.getComponentType() != null && (dataType.getComponentType().isStruct() + || dataType.getComponentType().getComponentType() != null)) { + // demote complex arrays to just `ANY ARRAY` + builder.add(path.stream().collect(Collectors.joining("$")), typeFactory.createArrayType( + typeFactory.createSqlType(SqlTypeName.ANY), -1)); + } else if (!dataType.isStruct()) { + builder.add(path.stream().collect(Collectors.joining("$")), dataType); + } else { + for (RelDataTypeField field : dataType.getFieldList()) { + flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(), + Stream.of(field.getName())).collect(Collectors.toList())); + } + } + } + + /** Restructures flattened types, from `FOO$BAR VARCHAR` to `FOO Row(BAR VARCHAR...)` */ + public static RelDataType unflatten(RelDataType dataType, RelDataTypeFactory typeFactory) { + if (!dataType.isStruct()) { + throw new IllegalArgumentException("Can only unflatten a struct type."); + } + Node root = new Node(); + for (RelDataTypeField field : dataType.getFieldList()) { + buildNodes(root, field.getName(), field.getType()); + } + return buildRecord(root, typeFactory); + } + + private static void buildNodes(Node pos, String name, RelDataType dataType) { + if (!name.contains("$")) { + pos.children.put(name, new Node(dataType)); + } else { + String[] parts = name.split("\\$", 2); + Node child = pos.children.computeIfAbsent(parts[0], x -> new Node()); + buildNodes(child, parts[1], dataType); + } + } + + private static RelDataType buildRecord(Node node, RelDataTypeFactory typeFactory) { + if (node.dataType != null) { + return node.dataType; + } + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); + for (LinkedHashMap.Entry child : node.children.entrySet()) { + builder.add(child.getKey(), buildRecord(child.getValue(), typeFactory)); + } + return builder.build(); + } + + private static class Node { + RelDataType dataType; + LinkedHashMap children = new LinkedHashMap<>(); + + Node(RelDataType dataType) { + this.dataType = dataType; + } + + Node() { + // nop + } + } +} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcCatalogSchema.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcCatalogSchema.java deleted file mode 100644 index 8cbbb63..0000000 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcCatalogSchema.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.linkedin.hoptimator.util; - -import javax.sql.DataSource; - -import org.apache.calcite.adapter.jdbc.JdbcCatalogSchema; -import org.apache.calcite.adapter.jdbc.JdbcSchema; -import org.apache.calcite.linq4j.tree.Expression; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Schemas; -import org.apache.calcite.sql.SqlDialect; -import org.apache.calcite.sql.SqlDialectFactory; -import org.apache.calcite.sql.SqlDialectFactoryImpl; - -import com.linkedin.hoptimator.util.planner.HoptimatorJdbcConvention; - - -public class HoptimatorJdbcCatalogSchema extends JdbcCatalogSchema { - - public HoptimatorJdbcCatalogSchema(String name, String database, DataSource dataSource, SqlDialect dialect, - Expression expression) { - super(dataSource, dialect, new HoptimatorJdbcConvention(dialect, expression, name), name); - } - - public HoptimatorJdbcCatalogSchema(String name, String database, DataSource dataSource, SchemaPlus parentSchema, - SqlDialect dialect) { - this(name, database, dataSource, dialect, - Schemas.subSchemaExpression(parentSchema, name, HoptimatorJdbcCatalogSchema.class)); - } - - public HoptimatorJdbcCatalogSchema(String name, String database, DataSource dataSource, SchemaPlus parentSchema, - SqlDialectFactory dialectFactory) { - this(name, database, dataSource, parentSchema, JdbcSchema.createDialect(dialectFactory, dataSource)); - } - - public HoptimatorJdbcCatalogSchema(String name, String database, DataSource dataSource, SchemaPlus parentSchema) { - this(name, database, dataSource, parentSchema, SqlDialectFactoryImpl.INSTANCE); - } -} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java index 4ca2236..c29c5fc 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java @@ -2,10 +2,12 @@ import javax.sql.DataSource; +import org.apache.calcite.adapter.jdbc.JdbcConvention; import org.apache.calcite.adapter.jdbc.JdbcSchema; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlDialectFactory; import org.apache.calcite.sql.SqlDialectFactoryImpl; @@ -18,35 +20,36 @@ public class HoptimatorJdbcSchema extends JdbcSchema implements Database { private final String database; - public static HoptimatorJdbcSchema create(String database, String catalog, String schema, DataSource dataSource, + public static HoptimatorJdbcSchema create(String database, String schema, DataSource dataSource, SchemaPlus parentSchema, SqlDialect dialect) { if (dialect == null) { - return new HoptimatorJdbcSchema(database, catalog, schema, dataSource, parentSchema); + return new HoptimatorJdbcSchema(database, schema, dataSource, parentSchema); } else { - return new HoptimatorJdbcSchema(database, catalog, schema, dataSource, parentSchema, dialect); + return new HoptimatorJdbcSchema(database, schema, dataSource, parentSchema, dialect); } } - public HoptimatorJdbcSchema(String database, String catalog, String schema, DataSource dataSource, SqlDialect dialect, + public HoptimatorJdbcSchema(String database, String schema, DataSource dataSource, SqlDialect dialect, Expression expression) { - super(dataSource, dialect, new HoptimatorJdbcConvention(dialect, expression, database), catalog, schema); + super(dataSource, dialect, new HoptimatorJdbcConvention(dialect, expression, database), + null, schema); this.database = database; } - public HoptimatorJdbcSchema(String database, String catalog, String schema, DataSource dataSource, + public HoptimatorJdbcSchema(String database, String schema, DataSource dataSource, SchemaPlus parentSchema, SqlDialect dialect) { - this(database, catalog, schema, dataSource, dialect, + this(database, schema, dataSource, dialect, Schemas.subSchemaExpression(parentSchema, schema, HoptimatorJdbcSchema.class)); } - public HoptimatorJdbcSchema(String database, String catalog, String schema, DataSource dataSource, + public HoptimatorJdbcSchema(String database, String schema, DataSource dataSource, SchemaPlus parentSchema, SqlDialectFactory dialectFactory) { - this(database, catalog, schema, dataSource, parentSchema, createDialect(dialectFactory, dataSource)); + this(database, schema, dataSource, parentSchema, createDialect(dialectFactory, dataSource)); } - public HoptimatorJdbcSchema(String database, String catalog, String schema, DataSource dataSource, + public HoptimatorJdbcSchema(String database, String schema, DataSource dataSource, SchemaPlus parentSchema) { - this(database, catalog, schema, dataSource, parentSchema, SqlDialectFactoryImpl.INSTANCE); + this(database, schema, dataSource, parentSchema, SqlDialectFactoryImpl.INSTANCE); } @Override diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java index f4658c3..26267de 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java @@ -1,6 +1,6 @@ package com.linkedin.hoptimator.util; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; import java.util.function.Supplier; @@ -34,7 +34,7 @@ class SimpleEnvironment implements Environment { private final Map> vars; public SimpleEnvironment() { - this.vars = new HashMap<>(); + this.vars = new LinkedHashMap<>(); } public SimpleEnvironment(Map> vars) { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java index b78859a..bcfe9a7 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java @@ -4,7 +4,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -41,7 +41,7 @@ public interface PipelineRel extends RelNode { /** Implements a deployable Pipeline. */ class Implementor { - private final Map sources = new HashMap<>(); + private final Map sources = new LinkedHashMap<>(); private RelNode query; private String sinkDatabase = "pipeline"; private List sinkPath = Arrays.asList(new String[]{"PIPELINE", "SINK"}); diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java index cd291ac..7ac2c02 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java @@ -8,6 +8,10 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.linkedin.hoptimator.util.DataTypeUtils; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.Convention; @@ -31,9 +35,13 @@ import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexProgram; import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; public final class PipelineRules { @@ -82,7 +90,9 @@ static class PipelineTableScan extends TableScan implements PipelineRel { @Override public void implement(Implementor implementor) throws SQLException { - implementor.addSource(database, table.getQualifiedName(), table.getRowType(), + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + implementor.addSource(database, table.getQualifiedName(), + DataTypeUtils.unflatten(table.getRowType(), typeFactory), Collections.emptyMap()); // TODO pass in table scan hints } } @@ -134,7 +144,14 @@ public RelNode copy(RelTraitSet traitSet, List inputs) { @Override public void implement(Implementor implementor) throws SQLException { - implementor.setSink(database, table.getQualifiedName(), table.getRowType(), Collections.emptyMap()); + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataType flattened = DataTypeUtils.flatten(table.getRowType(), typeFactory); + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); + for (RelDataTypeField field : flattened.getFieldList()) { + // Rewrite FOO$BAR as FOO_BAR in output tables + builder.add(field.getName().replaceAll("\\$", "_"), field.getType()); + } + implementor.setSink(database, table.getQualifiedName(), builder.build(), Collections.emptyMap()); implementor.setQuery(getInput()); } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index 78e96a3..3f91c2d 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -7,13 +7,18 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.linkedin.hoptimator.util.DataTypeUtils; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.rel2sql.RelToSqlConverter; import org.apache.calcite.rel.rel2sql.SqlImplementor; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.sql.SqlBasicTypeNameSpec; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlCollectionTypeNameSpec; @@ -32,6 +37,7 @@ import org.apache.calcite.sql.fun.SqlRowOperator; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.pretty.SqlPrettyWriter; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.util.SqlShuttle; @@ -49,7 +55,7 @@ * * ScriptImplementor.empty() * .database(db) - * .connector(db, name, rowType, configs) + * .connector(name, rowType, configs) * * ... would produce something like * @@ -158,7 +164,7 @@ public void implement(SqlWriter w) { if (select.getSelectList() != null) { select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)); } - select.unparse(w, 0, 0); + select.accept(UNFLATTEN_MEMBER_ACCESS).unparse(w, 0, 0); } // A `ROW(...)` operator which will unparse as just `(...)`. @@ -177,6 +183,18 @@ public SqlNode visit(SqlCall call) { } } }; + + // a shuttle that replaces `FOO$BAR` with `FOO.BAR` + private static final SqlShuttle UNFLATTEN_MEMBER_ACCESS = new SqlShuttle() { + @Override + public SqlNode visit(SqlIdentifier id) { + SqlIdentifier replacement = new SqlIdentifier(id.names.stream() + .flatMap(x -> Stream.of(x.split("\\$"))) + .collect(Collectors.toList()), SqlParserPos.ZERO); + id.assignNamesFrom(replacement); + return id; + } + }; } /** @@ -256,10 +274,8 @@ public InsertImplementor(String name, RelNode relNode) { public void implement(SqlWriter w) { w.keyword("INSERT INTO"); (new IdentifierImplementor(name)).implement(w); - SqlWriter.Frame frame1 = w.startList("(", ")"); RelNode project = dropNullFields(relNode); (new ColumnListImplementor(project.getRowType())).implement(w); - w.endList(frame1); (new QueryImplementor(project)).implement(w); w.literal(";"); } @@ -332,6 +348,7 @@ public void implement(SqlWriter w) { * * N.B. the following magic: * - NULL fields are promoted to BYTES + * - Flattened fields like FOO$BAR are renamed FOO_BAR */ class RowTypeSpecImplementor implements ScriptImplementor { private final RelDataType dataType; @@ -342,13 +359,16 @@ public RowTypeSpecImplementor(RelDataType dataType) { @Override public void implement(SqlWriter w) { - List fieldNames = dataType.getFieldList() + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataType flattened = dataType; + List fieldNames = flattened.getFieldList() .stream() .map(x -> x.getName()) + .map(x -> x.replaceAll("\\$", "_")) // support FOO$BAR columns as FOO_BAR .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO)) .collect(Collectors.toList()); List fieldTypes = - dataType.getFieldList().stream().map(x -> x.getType()).map(x -> toSpec(x)).collect(Collectors.toList()); + flattened.getFieldList().stream().map(x -> x.getType()).map(x -> toSpec(x)).collect(Collectors.toList()); for (int i = 0; i < fieldNames.size(); i++) { w.sep(","); fieldNames.get(i).unparse(w, 0, 0); @@ -408,14 +428,17 @@ public ColumnListImplementor(List fields) { @Override public void implement(SqlWriter w) { + SqlWriter.Frame frame1 = w.startList("(", ")"); List fieldNames = fields.stream() .map(x -> x.getName()) + .map(x -> x.replaceAll("\\$", "_")) // support FOO$BAR columns as FOO_BAR .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO)) .collect(Collectors.toList()); for (int i = 0; i < fieldNames.size(); i++) { w.sep(","); fieldNames.get(i).unparse(w, 0, 0); } + w.endList(frame1); } } diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java new file mode 100644 index 0000000..968273c --- /dev/null +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java @@ -0,0 +1,81 @@ +package com.linkedin.hoptimator.util; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import com.linkedin.hoptimator.util.planner.ScriptImplementor; + +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Litmus; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Assertions; + + +public class TestDataTypeUtils { + + @Test + public void flattenUnflatten() { + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataTypeFactory.Builder builder1 = new RelDataTypeFactory.Builder(typeFactory); + builder1.add("QUX", SqlTypeName.VARCHAR); + builder1.add("QIZ", SqlTypeName.VARCHAR); + RelDataTypeFactory.Builder builder2 = new RelDataTypeFactory.Builder(typeFactory); + builder2.add("BAZ", SqlTypeName.VARCHAR); + RelDataTypeFactory.Builder builder3 = new RelDataTypeFactory.Builder(typeFactory); + builder3.add("FOO", builder1.build()); + builder3.add("BAR", builder2.build()); + RelDataType rowType = builder3.build(); + Assertions.assertEquals(2, rowType.getFieldList().size()); + RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory); + Assertions.assertEquals(3, flattenedType.getFieldList().size()); + List flattenedNames = flattenedType.getFieldList().stream().map(x -> x.getName()) + .collect(Collectors.toList()); + Assertions.assertIterableEquals(Arrays.asList(new String[]{"FOO$QUX", "FOO$QIZ", "BAR$BAZ"}), + flattenedNames); + RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory); + RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW); + String originalConnector = new ScriptImplementor.ConnectorImplementor("T1", + rowType, Collections.emptyMap()).sql(); + String unflattenedConnector = new ScriptImplementor.ConnectorImplementor("T1", + unflattenedType, Collections.emptyMap()).sql(); + Assertions.assertEquals(originalConnector, unflattenedConnector, + "Flattening and unflattening data types should have no impact on connector"); + Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `T1` (`FOO` ROW(`QUX` VARCHAR, " + + "`QIZ` VARCHAR), `BAR` ROW(`BAZ` VARCHAR)) WITH ();", unflattenedConnector, + "Flattened-unflattened connector should be correct"); + } + + @Test + public void flattenNestedArrays() { + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataTypeFactory.Builder builder1 = new RelDataTypeFactory.Builder(typeFactory); + builder1.add("QUX", SqlTypeName.VARCHAR); + builder1.add("QIZ", SqlTypeName.VARCHAR); + RelDataTypeFactory.Builder builder2 = new RelDataTypeFactory.Builder(typeFactory); + builder2.add("BAZ", SqlTypeName.VARCHAR); + RelDataTypeFactory.Builder builder3 = new RelDataTypeFactory.Builder(typeFactory); + builder3.add("FOO", typeFactory.createArrayType(builder1.build(), -1)); + builder3.add("BAR", typeFactory.createArrayType(builder2.build(), -1)); + RelDataType rowType = builder3.build(); + Assertions.assertEquals(2, rowType.getFieldList().size()); + RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory); + Assertions.assertEquals(2, flattenedType.getFieldList().size()); + List flattenedNames = flattenedType.getFieldList().stream().map(x -> x.getName()) + .collect(Collectors.toList()); + Assertions.assertIterableEquals(Arrays.asList(new String[]{"FOO", "BAR"}), + flattenedNames); + String flattenedConnector = new ScriptImplementor.ConnectorImplementor("T1", + flattenedType, Collections.emptyMap()).sql(); + Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `T1` (`FOO` ANY ARRAY, " + + "`BAR` ANY ARRAY) WITH ();", flattenedConnector, + "Flattened connector should have simplified arrays"); + } +}