From 3af3caea4991569b18b89d178c37a0e05acb1673 Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Tue, 26 Nov 2024 12:39:27 -0600 Subject: [PATCH 1/4] Add new planner, JDBC driver, and DDL machinery --- hoptimator-jdbc/build.gradle | 4 ++-- .../src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java | 1 - .../src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java | 1 + 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hoptimator-jdbc/build.gradle b/hoptimator-jdbc/build.gradle index 0217612c..474145ba 100644 --- a/hoptimator-jdbc/build.gradle +++ b/hoptimator-jdbc/build.gradle @@ -32,6 +32,6 @@ test { } java { - sourceCompatibility = JavaVersion.VERSION_11 - targetCompatibility = JavaVersion.VERSION_11 + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java index 4a799795..fd05c380 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java @@ -116,7 +116,6 @@ public void updateStatus(T obj, Object status) throws SQLException { checkResponse(resp); } - private void checkResponse(KubernetesApiResponse resp) throws SQLException { if (!resp.isSuccess()) { throw new SQLException(resp.getStatus().getMessage()); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java index 437b5fcd..3997cb80 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java @@ -8,6 +8,7 @@ import io.kubernetes.client.openapi.ApiException; import java.util.Collection; +import java.util.Locale; import java.sql.SQLException; public class K8sYamlApi implements Api { From 2599d857e05de713061e0fb89eddf51ec3657161 Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Sun, 1 Dec 2024 18:35:09 -0600 Subject: [PATCH 2/4] Add Pipeline controller and Kafka JDBC driver --- .../src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java | 1 + .../src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java index fd05c380..4a799795 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java @@ -116,6 +116,7 @@ public void updateStatus(T obj, Object status) throws SQLException { checkResponse(resp); } + private void checkResponse(KubernetesApiResponse resp) throws SQLException { if (!resp.isSuccess()) { throw new SQLException(resp.getStatus().getMessage()); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java index 3997cb80..437b5fcd 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java @@ -8,7 +8,6 @@ import io.kubernetes.client.openapi.ApiException; import java.util.Collection; -import java.util.Locale; import java.sql.SQLException; public class K8sYamlApi implements Api { From 5f1343e86967458d60fc7eed64f266ee12821f0e Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Wed, 4 Dec 2024 01:20:19 -0600 Subject: [PATCH 3/4] Add JobTemplates and Flink demo --- deploy/samples/flink.yaml | 36 ++++ generate-models.sh | 1 + .../java/sqlline/HoptimatorAppConfig.java | 5 +- .../jdbc/HoptimatorDdlExecutor.java | 19 +- .../hoptimator/jdbc/QuidemTestBase.java | 5 +- .../hoptimator/k8s/K8sApiEndpoints.java | 5 + .../hoptimator/k8s/K8sDeployerProvider.java | 5 + .../hoptimator/k8s/K8sJobDeployer.java | 51 +++++ .../k8s/K8sMaterializedViewDeployer.java | 2 +- .../hoptimator/k8s/K8sPipelineDeployer.java | 8 +- .../hoptimator/k8s/K8sSourceDeployer.java | 2 +- .../linkedin/hoptimator/k8s/K8sYamlApi.java | 2 +- .../k8s/models/V1alpha1Database.java | 2 +- .../k8s/models/V1alpha1DatabaseList.java | 2 +- .../k8s/models/V1alpha1DatabaseSpec.java | 2 +- .../k8s/models/V1alpha1JobTemplate.java | 189 +++++++++++++++++ .../k8s/models/V1alpha1JobTemplateList.java | 195 ++++++++++++++++++ .../k8s/models/V1alpha1JobTemplateSpec.java | 139 +++++++++++++ .../k8s/models/V1alpha1Pipeline.java | 2 +- .../k8s/models/V1alpha1PipelineList.java | 2 +- .../k8s/models/V1alpha1PipelineSpec.java | 2 +- .../k8s/models/V1alpha1PipelineStatus.java | 2 +- .../k8s/models/V1alpha1Subscription.java | 2 +- .../k8s/models/V1alpha1SubscriptionList.java | 2 +- .../k8s/models/V1alpha1SubscriptionSpec.java | 2 +- .../models/V1alpha1SubscriptionStatus.java | 2 +- .../k8s/models/V1alpha1TableTemplate.java | 2 +- .../k8s/models/V1alpha1TableTemplateList.java | 2 +- .../k8s/models/V1alpha1TableTemplateSpec.java | 2 +- .../hoptimator/k8s/models/V1alpha1View.java | 2 +- .../k8s/models/V1alpha1ViewList.java | 2 +- .../k8s/models/V1alpha1ViewSpec.java | 2 +- .../src/main/resources/jobtemplates.crd.yaml | 43 ++++ .../src/main/resources/sqljobs.crd.yaml | 84 ++++++++ .../main/resources/tabletemplates.crd.yaml | 3 +- .../src/test/resources/kafka-ddl.id | 32 ++- .../operator/pipeline/PipelineReconciler.java | 2 +- .../hoptimator/util/DeploymentService.java | 3 +- .../com/linkedin/hoptimator/util/Job.java | 31 +++ .../hoptimator/util/MaterializedView.java | 63 ++++-- .../com/linkedin/hoptimator/util/Sink.java | 3 +- .../linkedin/hoptimator/util/Template.java | 35 ++-- .../hoptimator/util/planner/Pipeline.java | 10 +- .../hoptimator/util/planner/PipelineRel.java | 100 ++++++--- .../util/planner/PipelineRules.java | 9 +- .../util/planner/ScriptImplementor.java | 9 +- 46 files changed, 1022 insertions(+), 103 deletions(-) create mode 100644 deploy/samples/flink.yaml create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplate.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateList.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateSpec.java create mode 100644 hoptimator-k8s/src/main/resources/jobtemplates.crd.yaml create mode 100644 hoptimator-k8s/src/main/resources/sqljobs.crd.yaml create mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Job.java diff --git a/deploy/samples/flink.yaml b/deploy/samples/flink.yaml new file mode 100644 index 00000000..764475a5 --- /dev/null +++ b/deploy/samples/flink.yaml @@ -0,0 +1,36 @@ +## This template adds Flink support. + +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: JobTemplate +metadata: + name: flink-template +spec: + yaml: | + apiVersion: flink.apache.org/v1beta1 + kind: FlinkDeployment + metadata: + name: {{name}} + spec: + image: docker.io/library/hoptimator-flink-runner + imagePullPolicy: Never + flinkVersion: v1_16 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 0.1 + taskManager: + resource: + memory: "2048m" + cpu: 0.1 + job: + entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner + args: + - {{sql}} + jarURI: local:///opt/hoptimator-flink-runner.jar + parallelism: 1 + upgradeMode: stateless + state: running + diff --git a/generate-models.sh b/generate-models.sh index 3fec7e6a..5f7a6ff7 100755 --- a/generate-models.sh +++ b/generate-models.sh @@ -11,6 +11,7 @@ docker run \ ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \ /generate.sh -o "$(pwd)/hoptimator-k8s" -n "" -p "com.linkedin.hoptimator.k8s" \ -u "$(pwd)/hoptimator-k8s/src/main/resources/databases.crd.yaml" \ + -u "$(pwd)/hoptimator-k8s/src/main/resources/jobtemplates.crd.yaml" \ -u "$(pwd)/hoptimator-k8s/src/main/resources/pipelines.crd.yaml" \ -u "$(pwd)/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml" \ -u "$(pwd)/hoptimator-k8s/src/main/resources/views.crd.yaml" \ diff --git a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java index f751547e..804bf7ea 100644 --- a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java +++ b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java @@ -7,6 +7,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.sql.dialect.AnsiSqlDialect; import org.jline.reader.Completer; @@ -90,9 +91,7 @@ public void execute(String line, DispatchCallback dispatchCallback) { try { RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel; PipelineRel.Implementor plan = DeploymentService.plan(rel); - Sink sink = new Sink("PIPELINE", Arrays.asList(new String[]{"PIPELINE", "SINK"}), plan.rowType(), - Collections.emptyMap()); - sqlline.output(plan.insertInto(sink).sql()); + sqlline.output(plan.sql().apply(AnsiSqlDialect.DEFAULT)); } catch (SQLException e) { sqlline.error(e); dispatchCallback.setToFailure(); diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java index 06eb6326..36943b6d 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java @@ -22,9 +22,14 @@ import com.linkedin.hoptimator.Database; import com.linkedin.hoptimator.util.DeploymentService; import com.linkedin.hoptimator.util.MaterializedView; +import com.linkedin.hoptimator.util.Sink; +import com.linkedin.hoptimator.util.planner.Pipeline; +import com.linkedin.hoptimator.util.planner.PipelineRel; +import com.linkedin.hoptimator.util.planner.ScriptImplementor; import org.apache.calcite.jdbc.CalcitePrepare; import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.server.DdlExecutor; @@ -35,6 +40,7 @@ import org.apache.calcite.schema.impl.ViewTable; import org.apache.calcite.schema.impl.ViewTableMacro; import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; @@ -166,9 +172,18 @@ public void execute(SqlCreateMaterializedView create, MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTableMacro); RelDataType rowType = materializedViewTable.getRowType(new SqlTypeFactoryImpl( RelDataTypeSystem.DEFAULT)); - MaterializedView hook = new MaterializedView(context, database, viewPath, rowType, sql, - Collections.emptyMap()); // TODO support CREATE ... WITH (options...) + + // Plan a pipeline to materialize the view. + RelNode rel = HoptimatorDriver.convert(context, sql).root.rel; + PipelineRel.Implementor plan = DeploymentService.plan(rel); + plan.setSink(database, viewPath, rowType, Collections.emptyMap()); + Pipeline pipeline = plan.pipeline(); + + MaterializedView hook = new MaterializedView(database, viewPath, sql, plan.sql(), + plan.pipeline()); + // TODO support CREATE ... WITH (options...) ValidationService.validateOrThrow(hook, MaterializedView.class); + pipeline.update(); if (create.getReplace()) { DeploymentService.update(hook, MaterializedView.class); } else { diff --git a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java index a9bd2e3b..40628740 100644 --- a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java +++ b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java @@ -83,8 +83,9 @@ public void execute(Context context, boolean execute) throws Exception { CalciteConnection conn = (CalciteConnection) context.connection(); RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel; String specs = DeploymentService.plan(rel).pipeline().specify().stream() - .collect(Collectors.joining("\n---\n")); - context.echo(Collections.singletonList(specs)); + .collect(Collectors.joining("---\n")); + String[] lines = specs.replaceAll(";\n","\n").split("\n"); + context.echo(Arrays.asList(lines)); } else { context.echo(content); } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java index 610f174f..2745e8df 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java @@ -2,6 +2,8 @@ import com.linkedin.hoptimator.k8s.models.V1alpha1Database; import com.linkedin.hoptimator.k8s.models.V1alpha1DatabaseList; +import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate; +import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList; import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; import com.linkedin.hoptimator.k8s.models.V1alpha1View; @@ -35,6 +37,9 @@ public final class K8sApiEndpoints { public static final K8sApiEndpoint TABLE_TEMPLATES = new K8sApiEndpoint<>("TableTemplate", "hoptimator.linkedin.com", "v1alpha1", "tabletemplates", false, V1alpha1TableTemplate.class, V1alpha1TableTemplateList.class); + public static final K8sApiEndpoint JOB_TEMPLATES = + new K8sApiEndpoint<>("JobTemplate", "hoptimator.linkedin.com", "v1alpha1", "jobtemplates", false, + V1alpha1JobTemplate.class, V1alpha1JobTemplateList.class); private K8sApiEndpoints() { } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java index ae72ec48..e1b6621f 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java @@ -1,6 +1,8 @@ package com.linkedin.hoptimator.k8s; +import com.linkedin.hoptimator.util.Sink; import com.linkedin.hoptimator.util.Source; +import com.linkedin.hoptimator.util.Job; import com.linkedin.hoptimator.Deployer; import com.linkedin.hoptimator.DeployerProvider; @@ -28,6 +30,9 @@ public Collection> deployers(Class clazz) { if (Source.class.isAssignableFrom(clazz)) { list.add((Deployer) new K8sSourceDeployer(K8sContext.currentContext())); } + if (Job.class.isAssignableFrom(clazz)) { + list.add((Deployer) new K8sJobDeployer(K8sContext.currentContext())); + } return list; } } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java new file mode 100644 index 00000000..222e0d8b --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java @@ -0,0 +1,51 @@ +package com.linkedin.hoptimator.k8s; + +import com.linkedin.hoptimator.util.Job; +import com.linkedin.hoptimator.util.Template; +import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate; +import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList; + +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.dialect.AnsiSqlDialect; +import org.apache.calcite.sql.dialect.MysqlSqlDialect; +import org.apache.calcite.sql.dialect.CalciteSqlDialect; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.sql.SQLException; + +/** Specifies an abstract Job with concrete YAML by applying JobTemplates. */ +class K8sJobDeployer extends K8sYamlDeployer { + + private final K8sApi jobTemplateApi; + + K8sJobDeployer(K8sContext context) { + super(context); + this.jobTemplateApi = new K8sApi<>(context, K8sApiEndpoints.JOB_TEMPLATES); + } + + @Override + public List specify(Job job) throws SQLException { + Function sql = job.sql(); + Template.Environment env = Template.Environment.EMPTY + .with("name", job.sink().database() + "-" + job.sink().table().toLowerCase(Locale.ROOT)) + .with("database", job.sink().database()) + .with("schema", job.sink().schema()) + .with("table", job.sink().table()) + .with("sql", sql.apply(MysqlSqlDialect.DEFAULT)) + .with("ansisql", sql.apply(AnsiSqlDialect.DEFAULT)) + .with("calcitesql", sql.apply(CalciteSqlDialect.DEFAULT)) + .with(job.sink().options()); + return jobTemplateApi.list().stream() + .map(x -> x.getSpec()) + .filter(x -> x.getDatabases() == null || x.getDatabases().contains(job.sink().database())) + .filter(x -> x.getYaml() != null) + .map(x -> x.getYaml()) + .map(x -> new Template.SimpleTemplate(x).render(env)) + .collect(Collectors.toList()); + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java index 3788422e..b5e2bc14 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java @@ -25,6 +25,6 @@ protected V1alpha1View toK8sObject(MaterializedView view) { .metadata(new V1ObjectMeta().name(name)) .spec(new V1alpha1ViewSpec() .view(q.pollLast()).schema(q.pollLast()) - .sql(view.sql()).materialized(true)); + .sql(view.viewSql()).materialized(true)); } } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java index 32e59358..befc5d69 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java @@ -11,6 +11,7 @@ import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.dialect.AnsiSqlDialect; import io.kubernetes.client.openapi.models.V1ObjectMeta; @@ -27,12 +28,9 @@ class K8sPipelineDeployer extends K8sDeployer { private final K8sApi tableTemplateApi; diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java index 437b5fcd..065979d4 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java @@ -61,7 +61,7 @@ public void update(String yaml) throws SQLException { private DynamicKubernetesObject objFromYaml(String yaml) { DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); if (obj.getMetadata().getNamespace() == null) { - obj.getMetadata().namespace(context.namespace()); + obj.setMetadata(obj.getMetadata().namespace(context.namespace())); } return obj; } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Database.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Database.java index 0ac69b41..2c32aaa9 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Database.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Database.java @@ -30,7 +30,7 @@ * Database metadata. */ @ApiModel(description = "Database metadata.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1Database implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseList.java index 8650d712..bfebbd8d 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseList.java @@ -32,7 +32,7 @@ * DatabaseList is a list of Database */ @ApiModel(description = "DatabaseList is a list of Database") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1DatabaseList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseSpec.java index 3c90c6d4..89230ca7 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1DatabaseSpec.java @@ -28,7 +28,7 @@ * Database spec. */ @ApiModel(description = "Database spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1DatabaseSpec { /** * SQL dialect the driver expects. diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplate.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplate.java new file mode 100644 index 00000000..b1e56897 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplate.java @@ -0,0 +1,189 @@ +/* + * Kubernetes + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: v1.21.1 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + + +package com.linkedin.hoptimator.k8s.models; + +import java.util.Objects; +import java.util.Arrays; +import com.google.gson.TypeAdapter; +import com.google.gson.annotations.JsonAdapter; +import com.google.gson.annotations.SerializedName; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateSpec; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import java.io.IOException; + +/** + * Template to apply to matching jobs. + */ +@ApiModel(description = "Template to apply to matching jobs.") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") +public class V1alpha1JobTemplate implements io.kubernetes.client.common.KubernetesObject { + public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; + @SerializedName(SERIALIZED_NAME_API_VERSION) + private String apiVersion; + + public static final String SERIALIZED_NAME_KIND = "kind"; + @SerializedName(SERIALIZED_NAME_KIND) + private String kind; + + public static final String SERIALIZED_NAME_METADATA = "metadata"; + @SerializedName(SERIALIZED_NAME_METADATA) + private V1ObjectMeta metadata = null; + + public static final String SERIALIZED_NAME_SPEC = "spec"; + @SerializedName(SERIALIZED_NAME_SPEC) + private V1alpha1JobTemplateSpec spec; + + + public V1alpha1JobTemplate apiVersion(String apiVersion) { + + this.apiVersion = apiVersion; + return this; + } + + /** + * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + * @return apiVersion + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources") + + public String getApiVersion() { + return apiVersion; + } + + + public void setApiVersion(String apiVersion) { + this.apiVersion = apiVersion; + } + + + public V1alpha1JobTemplate kind(String kind) { + + this.kind = kind; + return this; + } + + /** + * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + * @return kind + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds") + + public String getKind() { + return kind; + } + + + public void setKind(String kind) { + this.kind = kind; + } + + + public V1alpha1JobTemplate metadata(V1ObjectMeta metadata) { + + this.metadata = metadata; + return this; + } + + /** + * Get metadata + * @return metadata + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "") + + public V1ObjectMeta getMetadata() { + return metadata; + } + + + public void setMetadata(V1ObjectMeta metadata) { + this.metadata = metadata; + } + + + public V1alpha1JobTemplate spec(V1alpha1JobTemplateSpec spec) { + + this.spec = spec; + return this; + } + + /** + * Get spec + * @return spec + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "") + + public V1alpha1JobTemplateSpec getSpec() { + return spec; + } + + + public void setSpec(V1alpha1JobTemplateSpec spec) { + this.spec = spec; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1JobTemplate v1alpha1JobTemplate = (V1alpha1JobTemplate) o; + return Objects.equals(this.apiVersion, v1alpha1JobTemplate.apiVersion) && + Objects.equals(this.kind, v1alpha1JobTemplate.kind) && + Objects.equals(this.metadata, v1alpha1JobTemplate.metadata) && + Objects.equals(this.spec, v1alpha1JobTemplate.spec); + } + + @Override + public int hashCode() { + return Objects.hash(apiVersion, kind, metadata, spec); + } + + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1JobTemplate {\n"); + sb.append(" apiVersion: ").append(toIndentedString(apiVersion)).append("\n"); + sb.append(" kind: ").append(toIndentedString(kind)).append("\n"); + sb.append(" metadata: ").append(toIndentedString(metadata)).append("\n"); + sb.append(" spec: ").append(toIndentedString(spec)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateList.java new file mode 100644 index 00000000..f4f9884a --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateList.java @@ -0,0 +1,195 @@ +/* + * Kubernetes + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: v1.21.1 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + + +package com.linkedin.hoptimator.k8s.models; + +import java.util.Objects; +import java.util.Arrays; +import com.google.gson.TypeAdapter; +import com.google.gson.annotations.JsonAdapter; +import com.google.gson.annotations.SerializedName; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate; +import io.kubernetes.client.openapi.models.V1ListMeta; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * JobTemplateList is a list of JobTemplate + */ +@ApiModel(description = "JobTemplateList is a list of JobTemplate") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") +public class V1alpha1JobTemplateList implements io.kubernetes.client.common.KubernetesListObject { + public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; + @SerializedName(SERIALIZED_NAME_API_VERSION) + private String apiVersion; + + public static final String SERIALIZED_NAME_ITEMS = "items"; + @SerializedName(SERIALIZED_NAME_ITEMS) + private List items = new ArrayList<>(); + + public static final String SERIALIZED_NAME_KIND = "kind"; + @SerializedName(SERIALIZED_NAME_KIND) + private String kind; + + public static final String SERIALIZED_NAME_METADATA = "metadata"; + @SerializedName(SERIALIZED_NAME_METADATA) + private V1ListMeta metadata = null; + + + public V1alpha1JobTemplateList apiVersion(String apiVersion) { + + this.apiVersion = apiVersion; + return this; + } + + /** + * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + * @return apiVersion + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources") + + public String getApiVersion() { + return apiVersion; + } + + + public void setApiVersion(String apiVersion) { + this.apiVersion = apiVersion; + } + + + public V1alpha1JobTemplateList items(List items) { + + this.items = items; + return this; + } + + public V1alpha1JobTemplateList addItemsItem(V1alpha1JobTemplate itemsItem) { + this.items.add(itemsItem); + return this; + } + + /** + * List of jobtemplates. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md + * @return items + **/ + @ApiModelProperty(required = true, value = "List of jobtemplates. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md") + + public List getItems() { + return items; + } + + + public void setItems(List items) { + this.items = items; + } + + + public V1alpha1JobTemplateList kind(String kind) { + + this.kind = kind; + return this; + } + + /** + * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + * @return kind + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds") + + public String getKind() { + return kind; + } + + + public void setKind(String kind) { + this.kind = kind; + } + + + public V1alpha1JobTemplateList metadata(V1ListMeta metadata) { + + this.metadata = metadata; + return this; + } + + /** + * Get metadata + * @return metadata + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "") + + public V1ListMeta getMetadata() { + return metadata; + } + + + public void setMetadata(V1ListMeta metadata) { + this.metadata = metadata; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1JobTemplateList v1alpha1JobTemplateList = (V1alpha1JobTemplateList) o; + return Objects.equals(this.apiVersion, v1alpha1JobTemplateList.apiVersion) && + Objects.equals(this.items, v1alpha1JobTemplateList.items) && + Objects.equals(this.kind, v1alpha1JobTemplateList.kind) && + Objects.equals(this.metadata, v1alpha1JobTemplateList.metadata); + } + + @Override + public int hashCode() { + return Objects.hash(apiVersion, items, kind, metadata); + } + + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1JobTemplateList {\n"); + sb.append(" apiVersion: ").append(toIndentedString(apiVersion)).append("\n"); + sb.append(" items: ").append(toIndentedString(items)).append("\n"); + sb.append(" kind: ").append(toIndentedString(kind)).append("\n"); + sb.append(" metadata: ").append(toIndentedString(metadata)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateSpec.java new file mode 100644 index 00000000..5cdaac4e --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1JobTemplateSpec.java @@ -0,0 +1,139 @@ +/* + * Kubernetes + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: v1.21.1 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + + +package com.linkedin.hoptimator.k8s.models; + +import java.util.Objects; +import java.util.Arrays; +import com.google.gson.TypeAdapter; +import com.google.gson.annotations.JsonAdapter; +import com.google.gson.annotations.SerializedName; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * TableTemplate spec. + */ +@ApiModel(description = "TableTemplate spec.") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") +public class V1alpha1JobTemplateSpec { + public static final String SERIALIZED_NAME_DATABASES = "databases"; + @SerializedName(SERIALIZED_NAME_DATABASES) + private List databases = null; + + public static final String SERIALIZED_NAME_YAML = "yaml"; + @SerializedName(SERIALIZED_NAME_YAML) + private String yaml; + + + public V1alpha1JobTemplateSpec databases(List databases) { + + this.databases = databases; + return this; + } + + public V1alpha1JobTemplateSpec addDatabasesItem(String databasesItem) { + if (this.databases == null) { + this.databases = new ArrayList<>(); + } + this.databases.add(databasesItem); + return this; + } + + /** + * Databases this template matches. If null, matches everything. + * @return databases + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Databases this template matches. If null, matches everything.") + + public List getDatabases() { + return databases; + } + + + public void setDatabases(List databases) { + this.databases = databases; + } + + + public V1alpha1JobTemplateSpec yaml(String yaml) { + + this.yaml = yaml; + return this; + } + + /** + * YAML template used to generate K8s specs. + * @return yaml + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "YAML template used to generate K8s specs.") + + public String getYaml() { + return yaml; + } + + + public void setYaml(String yaml) { + this.yaml = yaml; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1JobTemplateSpec v1alpha1JobTemplateSpec = (V1alpha1JobTemplateSpec) o; + return Objects.equals(this.databases, v1alpha1JobTemplateSpec.databases) && + Objects.equals(this.yaml, v1alpha1JobTemplateSpec.yaml); + } + + @Override + public int hashCode() { + return Objects.hash(databases, yaml); + } + + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1JobTemplateSpec {\n"); + sb.append(" databases: ").append(toIndentedString(databases)).append("\n"); + sb.append(" yaml: ").append(toIndentedString(yaml)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Pipeline.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Pipeline.java index 9c71ee61..f7202187 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Pipeline.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Pipeline.java @@ -31,7 +31,7 @@ * A set of objects that work together to deliver data. */ @ApiModel(description = "A set of objects that work together to deliver data.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1Pipeline implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineList.java index 3f03b2d6..888f1f6e 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineList.java @@ -32,7 +32,7 @@ * PipelineList is a list of Pipeline */ @ApiModel(description = "PipelineList is a list of Pipeline") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1PipelineList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineSpec.java index 42025f91..540fc7bb 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineSpec.java @@ -28,7 +28,7 @@ * Pipeline spec. */ @ApiModel(description = "Pipeline spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1PipelineSpec { public static final String SERIALIZED_NAME_SQL = "sql"; @SerializedName(SERIALIZED_NAME_SQL) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineStatus.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineStatus.java index 93e8133e..f1578b41 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineStatus.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1PipelineStatus.java @@ -28,7 +28,7 @@ * Pipeline status. */ @ApiModel(description = "Pipeline status.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1PipelineStatus { public static final String SERIALIZED_NAME_FAILED = "failed"; @SerializedName(SERIALIZED_NAME_FAILED) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Subscription.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Subscription.java index 71af798d..da4e885c 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Subscription.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1Subscription.java @@ -31,7 +31,7 @@ * Hoptimator Subscription */ @ApiModel(description = "Hoptimator Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionList.java index d9f51a95..662533dd 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionList.java @@ -32,7 +32,7 @@ * SubscriptionList is a list of Subscription */ @ApiModel(description = "SubscriptionList is a list of Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionSpec.java index a2db1c2f..7faba2fd 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionSpec.java @@ -31,7 +31,7 @@ * Subscription spec */ @ApiModel(description = "Subscription spec") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1SubscriptionSpec { public static final String SERIALIZED_NAME_DATABASE = "database"; @SerializedName(SERIALIZED_NAME_DATABASE) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionStatus.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionStatus.java index 9711ec59..fb806ab3 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionStatus.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SubscriptionStatus.java @@ -32,7 +32,7 @@ * Filled in by the operator. */ @ApiModel(description = "Filled in by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1SubscriptionStatus { public static final String SERIALIZED_NAME_ATTRIBUTES = "attributes"; @SerializedName(SERIALIZED_NAME_ATTRIBUTES) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplate.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplate.java index dda57b1d..b3711387 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplate.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplate.java @@ -30,7 +30,7 @@ * Template to apply to matching tables. */ @ApiModel(description = "Template to apply to matching tables.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1TableTemplate implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateList.java index 56aaa685..0f710e92 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateList.java @@ -32,7 +32,7 @@ * TableTemplateList is a list of TableTemplate */ @ApiModel(description = "TableTemplateList is a list of TableTemplate") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1TableTemplateList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateSpec.java index 5065c847..80bca5d6 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTemplateSpec.java @@ -30,7 +30,7 @@ * TableTemplate spec. */ @ApiModel(description = "TableTemplate spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1TableTemplateSpec { public static final String SERIALIZED_NAME_CONNECTOR = "connector"; @SerializedName(SERIALIZED_NAME_CONNECTOR) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1View.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1View.java index b93327f3..d233af35 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1View.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1View.java @@ -30,7 +30,7 @@ * A SQL view. */ @ApiModel(description = "A SQL view.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1View implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewList.java index 2751224c..5ca2aaa2 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewList.java @@ -32,7 +32,7 @@ * ViewList is a list of View */ @ApiModel(description = "ViewList is a list of View") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1ViewList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewSpec.java index 57dacda7..18854a26 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1ViewSpec.java @@ -28,7 +28,7 @@ * View spec. */ @ApiModel(description = "View spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]") public class V1alpha1ViewSpec { public static final String SERIALIZED_NAME_MATERIALIZED = "materialized"; @SerializedName(SERIALIZED_NAME_MATERIALIZED) diff --git a/hoptimator-k8s/src/main/resources/jobtemplates.crd.yaml b/hoptimator-k8s/src/main/resources/jobtemplates.crd.yaml new file mode 100644 index 00000000..4aa64eea --- /dev/null +++ b/hoptimator-k8s/src/main/resources/jobtemplates.crd.yaml @@ -0,0 +1,43 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: jobtemplates.hoptimator.linkedin.com +spec: + group: hoptimator.linkedin.com + names: + kind: JobTemplate + listKind: JobTemplateList + plural: jobtemplates + singular: jobtemplate + shortNames: + - jobt + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + description: Template to apply to matching jobs. + type: object + properties: + apiVersion: + type: string + kind: + type: string + metadata: + type: object + spec: + description: TableTemplate spec. + type: object + properties: + yaml: + description: YAML template used to generate K8s specs. + type: string + databases: + description: Databases this template matches. If null, matches everything. + type: array + items: + type: string + diff --git a/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml b/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml new file mode 100644 index 00000000..2eb9089b --- /dev/null +++ b/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml @@ -0,0 +1,84 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: sqljobs.hoptimator.linkedin.com +spec: + group: hoptimator.linkedin.com + names: + kind: SqlJob + listKind: SqlJobList + plural: sqljobs + singular: sqljob + shortNames: + - sql + - sj + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + description: Hoptimator generic SQL job + type: object + properties: + apiVersion: + type: string + kind: + type: string + metadata: + type: object + spec: + description: SQL job spec + type: object + properties: + sql: + description: SQL script the job should run. + type: array + items: + type: string + dialect: + description: Flink, etc. + type: string + enum: + - Flink + default: Flink + executionMode: + description: Streaming or Batch. + type: string + enum: + - Streaming + - Batch + default: Streaming + required: + - sql + status: + description: Filled in by the operator. + type: object + properties: + ready: + description: Whether the SqlJob is running or completed. + type: boolean + failed: + description: Whether the SqlJob has failed. + type: boolean + message: + description: Error or success message, for information only. + type: string + subresources: + status: {} + additionalPrinterColumns: + - name: DIALECT + type: string + description: SQL dialect. + jsonPath: .spec.dialect + - name: MODE + type: string + description: Execution mode. + jsonPath: .spec.executionMode + - name: STATUS + type: string + description: Job status. + jsonPath: .status.message + diff --git a/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml b/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml index 7865d864..87e338ea 100644 --- a/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml +++ b/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml @@ -10,8 +10,7 @@ spec: plural: tabletemplates singular: tabletemplate shortNames: - - tmpl - - tmpls + - tabt preserveUnknownFields: false scope: Namespaced versions: diff --git a/hoptimator-kafka/src/test/resources/kafka-ddl.id b/hoptimator-kafka/src/test/resources/kafka-ddl.id index b8575af0..349b7177 100644 --- a/hoptimator-kafka/src/test/resources/kafka-ddl.id +++ b/hoptimator-kafka/src/test/resources/kafka-ddl.id @@ -16,7 +16,6 @@ spec: config: retention.ms: 7200000 segment.bytes: 1073741824 - --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic @@ -32,5 +31,34 @@ spec: config: retention.ms: 7200000 segment.bytes: 1073741824 - +--- +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: kafka-database-existing-topic-1 +spec: + image: docker.io/library/hoptimator-flink-runner + imagePullPolicy: Never + flinkVersion: v1_16 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 0.1 + taskManager: + resource: + memory: "2048m" + cpu: 0.1 + job: + entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner + args: + - CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2', 'connector'='kafka') + - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1', 'connector'='kafka') + - INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2` + jarURI: local:///opt/hoptimator-flink-runner.jar + parallelism: 1 + upgradeMode: stateless + state: running !specify diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java index 3d3cbcee..9f595af1 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java @@ -62,7 +62,7 @@ public Result reconcile(Request request) { log.info("Checking status of Pipeline {}...", name); - boolean ready = Arrays.asList(object.getSpec().getYaml().split("^---$")).stream() + boolean ready = Arrays.asList(object.getSpec().getYaml().split("\n---\n")).stream() .filter(x -> x != null && !x.isEmpty()) .allMatch(x -> isReady(x)); diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java index b671f40a..6b303970 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java @@ -97,7 +97,8 @@ public static PipelineRel.Implementor plan(RelNode rel) throws SQLException { // TODO add materializations here (currently empty list) PipelineRel plan = (PipelineRel) program.run(rel.getCluster().getPlanner(), rel, traitSet, Collections.emptyList(), Collections.emptyList()); - PipelineRel.Implementor implementor = new PipelineRel.Implementor(plan); + PipelineRel.Implementor implementor = new PipelineRel.Implementor(); + implementor.visit(plan); return implementor; } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Job.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Job.java new file mode 100644 index 00000000..38367f59 --- /dev/null +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Job.java @@ -0,0 +1,31 @@ +package com.linkedin.hoptimator.util; + +import org.apache.calcite.sql.SqlDialect; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +public class Job { + + private final Sink sink; + private final Function sql; + + public Job(Sink sink, Function sql) { + this.sink = sink; + this.sql = sql; + } + + public Sink sink() { + return sink; + } + + public Function sql() { + return sql; + } + + @Override + public String toString() { + return "Job[" + sink.pathString() + "]"; + } +} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/MaterializedView.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/MaterializedView.java index 6c02f8b4..ef75e344 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/MaterializedView.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/MaterializedView.java @@ -1,32 +1,67 @@ package com.linkedin.hoptimator.util; -import org.apache.calcite.jdbc.CalcitePrepare; +import com.linkedin.hoptimator.util.planner.Pipeline; +import com.linkedin.hoptimator.util.planner.ScriptImplementor; + import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlDialect; import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class MaterializedView { + + private final String database; + private final List path; + private final String viewSql; + private final Function pipelineSql; + private final Pipeline pipeline; + + public MaterializedView(String database, List path, String viewSql, + Function pipelineSql, Pipeline pipeline) { + this.database = database; + this.path = path; + this.viewSql = viewSql; + this.pipelineSql = pipelineSql; + this.pipeline = pipeline; + } + + /** SQL query which defines this view, e.g. SELECT ... FROM ... */ + public String viewSql() { + return viewSql; + } + + public Pipeline pipeline() { + return pipeline; + } -public class MaterializedView extends Sink { + public Function pipelineSql() { + return pipelineSql; + } - private final CalcitePrepare.Context context; - private final String sql; + /** The internal name for the database this table belongs to. Not necessary the same as schema. */ + public String database() { + return database; + } - public MaterializedView(CalcitePrepare.Context context, String database, List path, - RelDataType rowType, String sql, Map options) { - super(database, path, rowType, options); - this.context = context; - this.sql = sql; + public String table() { + return path.get(path.size() - 1); } - /** Context required to evaluate the view */ - public CalcitePrepare.Context context() { - return context; + public String schema() { + return path.get(path.size() - 2); } - public String sql() { - return sql; + public List path() { + return path; } + protected String pathString() { + return path.stream().collect(Collectors.joining(".")); + } + @Override public String toString() { return "MaterializedView[" + pathString() + "]"; diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Sink.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Sink.java index 6e6f9eeb..015a5a6e 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Sink.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Sink.java @@ -7,7 +7,8 @@ public class Sink extends Source { - public Sink(String database, List path, RelDataType rowType, Map options) { + public Sink(String database, List path, RelDataType rowType, + Map options) { super(database, path, rowType, options); } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java index 78109c16..a51768ce 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java @@ -31,53 +31,56 @@ default Environment orIgnore() { /** Basic Environment implementation */ class SimpleEnvironment implements Environment { - private final Map vars = new HashMap<>(); + private final Map> vars; public SimpleEnvironment() { + this.vars = new HashMap<>(); } - public SimpleEnvironment(Properties properties) { - properties.forEach((k, v) -> vars.put(k.toString(), v.toString())); + public SimpleEnvironment(Map> vars) { + this.vars = vars; } - public SimpleEnvironment(Map vars) { - exportAll(vars); + protected void export(String key, String value) { + vars.put(key, () -> value); } - protected void export(String property, String value) { - vars.put(property, value); + protected void export(String key, Supplier supplier) { + vars.put(key, supplier); } protected void exportAll(Map properties) { - vars.putAll(properties); + properties.forEach((k, v) -> vars.put(k, () -> v)); } public SimpleEnvironment with(String key, String value) { - Map thisVars = this.vars; - return new SimpleEnvironment(){{ - exportAll(thisVars); + return new SimpleEnvironment(vars){{ export(key, value); }}; } public SimpleEnvironment with(Map values) { - Map thisVars = this.vars; - return new SimpleEnvironment() {{ - exportAll(thisVars); + return new SimpleEnvironment(vars) {{ exportAll(values); }}; } + public SimpleEnvironment with(String key, Supplier supplier) { + return new SimpleEnvironment(vars){{ + export(key, supplier); + }}; + } + @Override public String getOrDefault(String key, Supplier f) { - if (!vars.containsKey(key)) { + if (!vars.containsKey(key) || vars.get(key).get() == null) { if (f == null || f.get() == null) { throw new IllegalArgumentException("No variable '" + key + "' found in the environment"); } else { return f.get(); } } - return vars.get(key); + return vars.get(key).get(); } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/Pipeline.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/Pipeline.java index a0470b0f..63effa26 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/Pipeline.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/Pipeline.java @@ -2,8 +2,12 @@ import com.linkedin.hoptimator.Deployable; +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.dialect.AnsiSqlDialect; + import java.util.ArrayList; import java.util.List; +import java.util.function.Function; import java.sql.SQLException; /** @@ -11,10 +15,10 @@ */ public class Pipeline implements Deployable { - private List deployables = new ArrayList<>(); + private List deployables; - public void add(Deployable deployable) { - deployables.add(deployable); + public Pipeline(List deployables) { + this.deployables = deployables; } @Override diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java index 0d833c36..f9e929bc 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java @@ -1,17 +1,25 @@ package com.linkedin.hoptimator.util.planner; +import com.linkedin.hoptimator.Deployable; import com.linkedin.hoptimator.util.ConnectionService; import com.linkedin.hoptimator.util.DeploymentService; import com.linkedin.hoptimator.util.Source; import com.linkedin.hoptimator.util.Sink; +import com.linkedin.hoptimator.util.Job; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.util.Litmus; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.function.Function; import java.sql.SQLException; /** @@ -30,49 +38,89 @@ public interface PipelineRel extends RelNode { /** Implements a deployable Pipeline. */ class Implementor { - private final RelNode relNode; - private ScriptImplementor script = ScriptImplementor.empty(); - private final Pipeline pipeline = new Pipeline(); - - public Implementor(RelNode relNode) throws SQLException { - this.relNode = relNode; - visit(relNode); - } + private final List sources = new ArrayList<>(); + private RelNode query; + private String sinkDatabase = "pipeline"; + private List sinkPath = Arrays.asList(new String[]{"PIPELINE", "SINK"}); + private RelDataType sinkRowType = null; + private Map sinkOptions = Collections.emptyMap(); public void visit(RelNode node) throws SQLException { + if (query == null) { + query = node; + } for (RelNode input : node.getInputs()) { visit(input); } ((PipelineRel) node).implement(this); } - public RelDataType rowType() { - return relNode.getRowType(); - } - /** - * Adds a Source/Sink to the pipeline. + * Adds a source to the pipeline. * * This involves deploying any relevant objects, and configuring a * a connector. The connector is configured via `CREATE TABLE...WITH(...)`. */ - public void implement(T source, Class clazz) throws SQLException { - DeploymentService.deployables(source, clazz).forEach(x -> pipeline.add(x)); - Map configs = ConnectionService.configure(source, clazz); - script = script.connector(source.table(), source.rowType(), configs); + public void addSource(String database, List path, RelDataType rowType, + Map options) { + sources.add(new Source(database, path, rowType, options)); } - /** Script ending in INSERT INTO ... */ - public ScriptImplementor insertInto(Sink sink) throws SQLException { - RelOptUtil.eq(sink.table(), sink.rowType(), "pipeline", rowType(), Litmus.THROW); - Map configs = ConnectionService.configure(sink, Sink.class); - return script.connector(sink.table(), sink.rowType(), configs) - .insert(sink.table(), relNode); + /** + * Sets the sink to use for the pipeline. + * + * By default, the sink is `PIPELINE.SINK`. An expected row type is required + * for validation purposes. + */ + public void setSink(String database, List path, RelDataType rowType, + Map options) { + this.sinkDatabase = database; + this.sinkPath = path; + this.sinkRowType = rowType; + this.sinkOptions = options; } - /** Combine SQL and any Deployables into a Pipeline */ - public Pipeline pipeline() { - return pipeline; + public void setQuery(RelNode query) { + this.query = query; + } + + /** Combine Deployables into a Pipeline */ + public Pipeline pipeline() throws SQLException { + List deployables = new ArrayList<>(); + for (Source source : sources) { + DeploymentService.deployables(source, Source.class).forEach(x -> deployables.add(x)); + Map configs = ConnectionService.configure(source, Source.class); + } + RelDataType targetRowType = sinkRowType; + if (targetRowType == null) { + targetRowType = query.getRowType(); + } + Sink sink = new Sink(sinkDatabase, sinkPath, targetRowType, sinkOptions); + Map sinkConfigs = ConnectionService.configure(sink, Sink.class); + Job job = new Job(sink, sql()); + RelOptUtil.eq(sink.table(), sink.rowType(), "pipeline", query.getRowType(), Litmus.THROW); + DeploymentService.deployables(sink, Sink.class).forEach(x -> deployables.add(x)); + DeploymentService.deployables(job, Job.class).forEach(x -> deployables.add(x)); + return new Pipeline(deployables); + } + + public Function sql() throws SQLException { + ScriptImplementor script = ScriptImplementor.empty(); + List deployables = new ArrayList<>(); + for (Source source : sources) { + Map configs = ConnectionService.configure(source, Source.class); + script = script.connector(source.table(), source.rowType(), configs); + } + RelDataType targetRowType = sinkRowType; + if (targetRowType == null) { + targetRowType = query.getRowType(); + } + Sink sink = new Sink(sinkDatabase, sinkPath, targetRowType, sinkOptions); + Map sinkConfigs = ConnectionService.configure(sink, Sink.class); + script = script.connector(sink.table(), sink.rowType(), sinkConfigs); + script = script.insert(sink.table(), query); + RelOptUtil.eq(sink.table(), sink.rowType(), "pipeline", query.getRowType(), Litmus.THROW); + return script.seal(); } } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java index 43de78f0..8bbd6585 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java @@ -93,8 +93,8 @@ static class PipelineTableScan extends TableScan implements PipelineRel { @Override public void implement(Implementor implementor) throws SQLException { - implementor.implement(new Source(database, table.getQualifiedName(), table.getRowType(), - Collections.emptyMap()), Source.class); // TODO pass in table scan hints + implementor.addSource(database, table.getQualifiedName(), table.getRowType(), + Collections.emptyMap()); // TODO pass in table scan hints } } @@ -149,8 +149,9 @@ public RelNode copy(RelTraitSet traitSet, List inputs) { @Override public void implement(Implementor implementor) throws SQLException { - implementor.implement(new Sink(database, table.getQualifiedName(), table.getRowType(), - Collections.emptyMap()), Sink.class); + implementor.setSink(database, table.getQualifiedName(), table.getRowType(), + Collections.emptyMap()); + implementor.setQuery(getInput()); } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index c720ab61..3d8feb24 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Arrays; import java.util.ArrayList; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.Optional; @@ -105,6 +106,11 @@ default String sql(SqlDialect dialect) { .replaceAll("\\n", " ").replaceAll("\\s*;\\s*", ";\n").trim(); } + /** Generate SQL for a given dialect */ + default Function seal() { + return x -> sql(x); + } + /** Implements an arbitrary RelNode as a statement */ class StatementImplementor implements ScriptImplementor { private final RelNode relNode; @@ -137,7 +143,8 @@ public void implement(SqlWriter w) { if (select.getSelectList() != null) { select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)); } - w.literal(select.toSqlString(w.getDialect()).getSql()); +// w.literal(select.toSqlString(w.getDialect()).getSql()); + select.unparse(w, 0, 0); } // A `ROW(...)` operator which will unparse as just `(...)`. From 079c9980898ab3adcfda6a383cc3f7a5a34591af Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Fri, 6 Dec 2024 14:50:06 -0600 Subject: [PATCH 4/4] PR fixes h/t @jogrogan --- .../com/linkedin/hoptimator/util/planner/ScriptImplementor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index 3d8feb24..bfc32714 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -143,7 +143,6 @@ public void implement(SqlWriter w) { if (select.getSelectList() != null) { select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)); } -// w.literal(select.toSqlString(w.getDialect()).getSql()); select.unparse(w, 0, 0); }