Skip to content

Commit

Permalink
Add JobTemplates and Flink demo (#74)
Browse files Browse the repository at this point in the history
* Add JobTemplates and Flink demo

* PR fixes h/t @jogrogan
  • Loading branch information
ryannedolan authored Dec 6, 2024
1 parent 159a0e9 commit 11c53f8
Show file tree
Hide file tree
Showing 47 changed files with 1,023 additions and 105 deletions.
36 changes: 36 additions & 0 deletions deploy/samples/flink.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
## This template adds Flink support.

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: JobTemplate
metadata:
name: flink-template
spec:
yaml: |
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: {{name}}
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 0.1
taskManager:
resource:
memory: "2048m"
cpu: 0.1
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- {{sql}}
jarURI: local:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
state: running
1 change: 1 addition & 0 deletions generate-models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ docker run \
ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \
/generate.sh -o "$(pwd)/hoptimator-k8s" -n "" -p "com.linkedin.hoptimator.k8s" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/databases.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/jobtemplates.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/pipelines.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/views.crd.yaml" \
Expand Down
5 changes: 2 additions & 3 deletions hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;

import org.jline.reader.Completer;

Expand Down Expand Up @@ -90,9 +91,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
try {
RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
PipelineRel.Implementor plan = DeploymentService.plan(rel);
Sink sink = new Sink("PIPELINE", Arrays.asList(new String[]{"PIPELINE", "SINK"}), plan.rowType(),
Collections.emptyMap());
sqlline.output(plan.insertInto(sink).sql());
sqlline.output(plan.sql().apply(AnsiSqlDialect.DEFAULT));
} catch (SQLException e) {
sqlline.error(e);
dispatchCallback.setToFailure();
Expand Down
4 changes: 2 additions & 2 deletions hoptimator-jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ test {
}

java {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
import com.linkedin.hoptimator.Database;
import com.linkedin.hoptimator.util.DeploymentService;
import com.linkedin.hoptimator.util.MaterializedView;
import com.linkedin.hoptimator.util.Sink;
import com.linkedin.hoptimator.util.planner.Pipeline;
import com.linkedin.hoptimator.util.planner.PipelineRel;
import com.linkedin.hoptimator.util.planner.ScriptImplementor;

import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.server.DdlExecutor;
Expand All @@ -35,6 +40,7 @@
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.ViewTableMacro;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
Expand Down Expand Up @@ -166,9 +172,18 @@ public void execute(SqlCreateMaterializedView create,
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTableMacro);
RelDataType rowType = materializedViewTable.getRowType(new SqlTypeFactoryImpl(
RelDataTypeSystem.DEFAULT));
MaterializedView hook = new MaterializedView(context, database, viewPath, rowType, sql,
Collections.emptyMap()); // TODO support CREATE ... WITH (options...)

// Plan a pipeline to materialize the view.
RelNode rel = HoptimatorDriver.convert(context, sql).root.rel;
PipelineRel.Implementor plan = DeploymentService.plan(rel);
plan.setSink(database, viewPath, rowType, Collections.emptyMap());
Pipeline pipeline = plan.pipeline();

MaterializedView hook = new MaterializedView(database, viewPath, sql, plan.sql(),
plan.pipeline());
// TODO support CREATE ... WITH (options...)
ValidationService.validateOrThrow(hook, MaterializedView.class);
pipeline.update();
if (create.getReplace()) {
DeploymentService.update(hook, MaterializedView.class);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ public void execute(Context context, boolean execute) throws Exception {
CalciteConnection conn = (CalciteConnection) context.connection();
RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
String specs = DeploymentService.plan(rel).pipeline().specify().stream()
.collect(Collectors.joining("\n---\n"));
context.echo(Collections.singletonList(specs));
.collect(Collectors.joining("---\n"));
String[] lines = specs.replaceAll(";\n","\n").split("\n");
context.echo(Arrays.asList(lines));
} else {
context.echo(content);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.linkedin.hoptimator.k8s.models.V1alpha1Database;
import com.linkedin.hoptimator.k8s.models.V1alpha1DatabaseList;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1View;
Expand Down Expand Up @@ -35,6 +37,9 @@ public final class K8sApiEndpoints {
public static final K8sApiEndpoint<V1alpha1TableTemplate, V1alpha1TableTemplateList> TABLE_TEMPLATES =
new K8sApiEndpoint<>("TableTemplate", "hoptimator.linkedin.com", "v1alpha1", "tabletemplates", false,
V1alpha1TableTemplate.class, V1alpha1TableTemplateList.class);
public static final K8sApiEndpoint<V1alpha1JobTemplate, V1alpha1JobTemplateList> JOB_TEMPLATES =
new K8sApiEndpoint<>("JobTemplate", "hoptimator.linkedin.com", "v1alpha1", "jobtemplates", false,
V1alpha1JobTemplate.class, V1alpha1JobTemplateList.class);

private K8sApiEndpoints() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.hoptimator.k8s;

import com.linkedin.hoptimator.util.Sink;
import com.linkedin.hoptimator.util.Source;
import com.linkedin.hoptimator.util.Job;

import com.linkedin.hoptimator.Deployer;
import com.linkedin.hoptimator.DeployerProvider;
Expand Down Expand Up @@ -28,6 +30,9 @@ public <T> Collection<Deployer<T>> deployers(Class<T> clazz) {
if (Source.class.isAssignableFrom(clazz)) {
list.add((Deployer<T>) new K8sSourceDeployer(K8sContext.currentContext()));
}
if (Job.class.isAssignableFrom(clazz)) {
list.add((Deployer<T>) new K8sJobDeployer(K8sContext.currentContext()));
}
return list;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.linkedin.hoptimator.k8s;

import com.linkedin.hoptimator.util.Job;
import com.linkedin.hoptimator.util.Template;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;

import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
import org.apache.calcite.sql.dialect.CalciteSqlDialect;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.sql.SQLException;

/** Specifies an abstract Job with concrete YAML by applying JobTemplates. */
class K8sJobDeployer extends K8sYamlDeployer<Job> {

private final K8sApi<V1alpha1JobTemplate, V1alpha1JobTemplateList> jobTemplateApi;

K8sJobDeployer(K8sContext context) {
super(context);
this.jobTemplateApi = new K8sApi<>(context, K8sApiEndpoints.JOB_TEMPLATES);
}

@Override
public List<String> specify(Job job) throws SQLException {
Function<SqlDialect, String> sql = job.sql();
Template.Environment env = Template.Environment.EMPTY
.with("name", job.sink().database() + "-" + job.sink().table().toLowerCase(Locale.ROOT))
.with("database", job.sink().database())
.with("schema", job.sink().schema())
.with("table", job.sink().table())
.with("sql", sql.apply(MysqlSqlDialect.DEFAULT))
.with("ansisql", sql.apply(AnsiSqlDialect.DEFAULT))
.with("calcitesql", sql.apply(CalciteSqlDialect.DEFAULT))
.with(job.sink().options());
return jobTemplateApi.list().stream()
.map(x -> x.getSpec())
.filter(x -> x.getDatabases() == null || x.getDatabases().contains(job.sink().database()))
.filter(x -> x.getYaml() != null)
.map(x -> x.getYaml())
.map(x -> new Template.SimpleTemplate(x).render(env))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ protected V1alpha1View toK8sObject(MaterializedView view) {
.metadata(new V1ObjectMeta().name(name))
.spec(new V1alpha1ViewSpec()
.view(q.pollLast()).schema(q.pollLast())
.sql(view.sql()).materialized(true));
.sql(view.viewSql()).materialized(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;

import io.kubernetes.client.openapi.models.V1ObjectMeta;

Expand All @@ -27,12 +28,9 @@ class K8sPipelineDeployer extends K8sDeployer<MaterializedView, V1alpha1Pipeline
@Override
protected V1alpha1Pipeline toK8sObject(MaterializedView view) throws SQLException {
String name = K8sUtils.canonicalizeName(view.path());
RelNode rel = HoptimatorDriver.convert(view.context(), view.sql()).root.rel;
PipelineRel.Implementor plan = DeploymentService.plan(rel);
String sql = plan.insertInto(view).sql();
plan.implement(view, Sink.class); // include the sink in the plan
String yaml = plan.pipeline().specify().stream()
String yaml = view.pipeline().specify().stream()
.collect(Collectors.joining("\n---\n"));
String sql = view.pipelineSql().apply(AnsiSqlDialect.DEFAULT);
return new V1alpha1Pipeline().kind(K8sApiEndpoints.PIPELINES.kind())
.apiVersion(K8sApiEndpoints.PIPELINES.apiVersion())
.metadata(new V1ObjectMeta().name(name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.sql.SQLException;
import java.util.Locale;

/** Specifies an abstract Source/Sink with concrete YAML by applying TableTemplates. */
/** Specifies an abstract Source with concrete YAML by applying TableTemplates. */
class K8sSourceDeployer extends K8sYamlDeployer<Source> {

private final K8sApi<V1alpha1TableTemplate, V1alpha1TableTemplateList> tableTemplateApi;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void update(String yaml) throws SQLException {
private DynamicKubernetesObject objFromYaml(String yaml) {
DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
if (obj.getMetadata().getNamespace() == null) {
obj.getMetadata().namespace(context.namespace());
obj.setMetadata(obj.getMetadata().namespace(context.namespace()));
}
return obj;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Database metadata.
*/
@ApiModel(description = "Database metadata.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]")
public class V1alpha1Database implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* DatabaseList is a list of Database
*/
@ApiModel(description = "DatabaseList is a list of Database")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]")
public class V1alpha1DatabaseList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Database spec.
*/
@ApiModel(description = "Database spec.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]")
public class V1alpha1DatabaseSpec {
/**
* SQL dialect the driver expects.
Expand Down
Loading

0 comments on commit 11c53f8

Please sign in to comment.