Add JobTemplates and Flink demo #74

Merged · 4 commits · Dec 6, 2024
36 changes: 36 additions & 0 deletions deploy/samples/flink.yaml
@@ -0,0 +1,36 @@
## This template adds Flink support.

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: JobTemplate
metadata:
  name: flink-template
spec:
  yaml: |
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: {{name}}
    spec:
      image: docker.io/library/hoptimator-flink-runner
      imagePullPolicy: Never
      flinkVersion: v1_16
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "1"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 0.1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 0.1
      job:
        entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
        args:
        - {{sql}}
        jarURI: local:///opt/hoptimator-flink-runner.jar
        parallelism: 1
        upgradeMode: stateless
        state: running
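
For context, this is roughly what the template above renders to once the new K8sJobDeployer (added later in this PR) substitutes the {{name}} and {{sql}} variables. The sink name and SQL below are made-up placeholders for illustration, not output of this change:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: mydb-mytable                    # rendered from {{name}}
spec:
  image: docker.io/library/hoptimator-flink-runner
  imagePullPolicy: Never
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 0.1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 0.1
  job:
    entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
    args:
    - INSERT INTO `MYDB`.`MYTABLE` SELECT * FROM `MYDB`.`SOURCE`    # rendered from {{sql}}
    jarURI: local:///opt/hoptimator-flink-runner.jar
    parallelism: 1
    upgradeMode: stateless
    state: running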

1 change: 1 addition & 0 deletions generate-models.sh
@@ -11,6 +11,7 @@ docker run \
ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \
/generate.sh -o "$(pwd)/hoptimator-k8s" -n "" -p "com.linkedin.hoptimator.k8s" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/databases.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/jobtemplates.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/pipelines.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/views.crd.yaml" \
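
generate-models.sh now also reads hoptimator-k8s/src/main/resources/jobtemplates.crd.yaml, which is not shown in this diff. A rough sketch of what that CRD presumably contains follows; the group, version, kind, and plural come from K8sApiEndpoints later in this PR, the spec schema mirrors the yaml and databases fields read by K8sJobDeployer, and the scope and remaining details are assumptions:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: jobtemplates.hoptimator.linkedin.com
spec:
  group: hoptimator.linkedin.com
  names:
    kind: JobTemplate
    plural: jobtemplates
    singular: jobtemplate
  scope: Namespaced                     # assumption
  versions:
  - name: v1alpha1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              yaml:
                type: string            # template body containing {{variables}}
              databases:
                type: array             # optional allow-list of databases
                items:
                  type: string
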
5 changes: 2 additions & 3 deletions hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
@@ -7,6 +7,7 @@

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;

import org.jline.reader.Completer;

@@ -90,9 +91,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
try {
RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
PipelineRel.Implementor plan = DeploymentService.plan(rel);
Sink sink = new Sink("PIPELINE", Arrays.asList(new String[]{"PIPELINE", "SINK"}), plan.rowType(),
Collections.emptyMap());
sqlline.output(plan.insertInto(sink).sql());
sqlline.output(plan.sql().apply(AnsiSqlDialect.DEFAULT));
} catch (SQLException e) {
sqlline.error(e);
dispatchCallback.setToFailure();
4 changes: 2 additions & 2 deletions hoptimator-jdbc/build.gradle
@@ -32,6 +32,6 @@ test {
}

java {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}
@@ -22,9 +22,14 @@
import com.linkedin.hoptimator.Database;
import com.linkedin.hoptimator.util.DeploymentService;
import com.linkedin.hoptimator.util.MaterializedView;
import com.linkedin.hoptimator.util.Sink;
import com.linkedin.hoptimator.util.planner.Pipeline;
import com.linkedin.hoptimator.util.planner.PipelineRel;
import com.linkedin.hoptimator.util.planner.ScriptImplementor;

import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.server.DdlExecutor;
@@ -35,6 +40,7 @@
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.ViewTableMacro;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
@@ -166,9 +172,18 @@ public void execute(SqlCreateMaterializedView create,
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTableMacro);
RelDataType rowType = materializedViewTable.getRowType(new SqlTypeFactoryImpl(
RelDataTypeSystem.DEFAULT));
MaterializedView hook = new MaterializedView(context, database, viewPath, rowType, sql,
Collections.emptyMap()); // TODO support CREATE ... WITH (options...)

// Plan a pipeline to materialize the view.
RelNode rel = HoptimatorDriver.convert(context, sql).root.rel;
PipelineRel.Implementor plan = DeploymentService.plan(rel);
plan.setSink(database, viewPath, rowType, Collections.emptyMap());
Pipeline pipeline = plan.pipeline();

MaterializedView hook = new MaterializedView(database, viewPath, sql, plan.sql(),
plan.pipeline());
// TODO support CREATE ... WITH (options...)
ValidationService.validateOrThrow(hook, MaterializedView.class);
pipeline.update();
if (create.getReplace()) {
DeploymentService.update(hook, MaterializedView.class);
} else {
@@ -83,8 +83,9 @@ public void execute(Context context, boolean execute) throws Exception {
CalciteConnection conn = (CalciteConnection) context.connection();
RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
String specs = DeploymentService.plan(rel).pipeline().specify().stream()
.collect(Collectors.joining("\n---\n"));
context.echo(Collections.singletonList(specs));
.collect(Collectors.joining("---\n"));
String[] lines = specs.replaceAll(";\n","\n").split("\n");
context.echo(Arrays.asList(lines));
} else {
context.echo(content);
}
@@ -2,6 +2,8 @@

import com.linkedin.hoptimator.k8s.models.V1alpha1Database;
import com.linkedin.hoptimator.k8s.models.V1alpha1DatabaseList;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1View;
@@ -35,6 +37,9 @@ public final class K8sApiEndpoints {
public static final K8sApiEndpoint<V1alpha1TableTemplate, V1alpha1TableTemplateList> TABLE_TEMPLATES =
new K8sApiEndpoint<>("TableTemplate", "hoptimator.linkedin.com", "v1alpha1", "tabletemplates", false,
V1alpha1TableTemplate.class, V1alpha1TableTemplateList.class);
public static final K8sApiEndpoint<V1alpha1JobTemplate, V1alpha1JobTemplateList> JOB_TEMPLATES =
new K8sApiEndpoint<>("JobTemplate", "hoptimator.linkedin.com", "v1alpha1", "jobtemplates", false,
V1alpha1JobTemplate.class, V1alpha1JobTemplateList.class);

private K8sApiEndpoints() {
}
@@ -1,6 +1,8 @@
package com.linkedin.hoptimator.k8s;

import com.linkedin.hoptimator.util.Sink;
import com.linkedin.hoptimator.util.Source;
import com.linkedin.hoptimator.util.Job;

import com.linkedin.hoptimator.Deployer;
import com.linkedin.hoptimator.DeployerProvider;
@@ -28,6 +30,9 @@ public <T> Collection<Deployer<T>> deployers(Class<T> clazz) {
if (Source.class.isAssignableFrom(clazz)) {
list.add((Deployer<T>) new K8sSourceDeployer(K8sContext.currentContext()));
}
if (Job.class.isAssignableFrom(clazz)) {
list.add((Deployer<T>) new K8sJobDeployer(K8sContext.currentContext()));
}
return list;
}
}
@@ -0,0 +1,51 @@
package com.linkedin.hoptimator.k8s;

import com.linkedin.hoptimator.util.Job;
import com.linkedin.hoptimator.util.Template;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;

import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
import org.apache.calcite.sql.dialect.CalciteSqlDialect;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.sql.SQLException;

/** Specifies an abstract Job with concrete YAML by applying JobTemplates. */
class K8sJobDeployer extends K8sYamlDeployer<Job> {

private final K8sApi<V1alpha1JobTemplate, V1alpha1JobTemplateList> jobTemplateApi;

K8sJobDeployer(K8sContext context) {
super(context);
this.jobTemplateApi = new K8sApi<>(context, K8sApiEndpoints.JOB_TEMPLATES);
}

@Override
public List<String> specify(Job job) throws SQLException {
Function<SqlDialect, String> sql = job.sql();
Template.Environment env = Template.Environment.EMPTY
.with("name", job.sink().database() + "-" + job.sink().table().toLowerCase(Locale.ROOT))
.with("database", job.sink().database())
.with("schema", job.sink().schema())
.with("table", job.sink().table())
.with("sql", sql.apply(MysqlSqlDialect.DEFAULT))
.with("ansisql", sql.apply(AnsiSqlDialect.DEFAULT))
.with("calcitesql", sql.apply(CalciteSqlDialect.DEFAULT))
.with(job.sink().options());
return jobTemplateApi.list().stream()
.map(x -> x.getSpec())
.filter(x -> x.getDatabases() == null || x.getDatabases().contains(job.sink().database()))
.filter(x -> x.getYaml() != null)
.map(x -> x.getYaml())
.map(x -> new Template.SimpleTemplate(x).render(env))
.collect(Collectors.toList());
}
}
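
As a usage sketch of the databases filter above: a JobTemplate can restrict itself to particular databases via spec.databases, and a template that lists none applies to every sink. The template name, database name, and trimmed-down job body below are hypothetical; the variable comments mirror how the environment is built in specify():

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: JobTemplate
metadata:
  name: venice-flink-template           # hypothetical
spec:
  databases:
  - venice                              # only applied to sinks in this database
  yaml: |
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: {{name}}                    # "<sink database>-<lower-cased sink table>"
    spec:
      job:
        entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
        args:
        - {{sql}}                       # pipeline SQL in MySQL dialect; {{ansisql}} and {{calcitesql}} are also available
        jarURI: local:///opt/hoptimator-flink-runner.jar
        parallelism: 1
        upgradeMode: stateless
        state: running
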
@@ -25,6 +25,6 @@ protected V1alpha1View toK8sObject(MaterializedView view) {
.metadata(new V1ObjectMeta().name(name))
.spec(new V1alpha1ViewSpec()
.view(q.pollLast()).schema(q.pollLast())
.sql(view.sql()).materialized(true));
.sql(view.viewSql()).materialized(true));
}
}
@@ -11,6 +11,7 @@
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;

import io.kubernetes.client.openapi.models.V1ObjectMeta;

@@ -27,12 +28,9 @@ class K8sPipelineDeployer extends K8sDeployer<MaterializedView, V1alpha1Pipeline
@Override
protected V1alpha1Pipeline toK8sObject(MaterializedView view) throws SQLException {
String name = K8sUtils.canonicalizeName(view.path());
RelNode rel = HoptimatorDriver.convert(view.context(), view.sql()).root.rel;
PipelineRel.Implementor plan = DeploymentService.plan(rel);
String sql = plan.insertInto(view).sql();
plan.implement(view, Sink.class); // include the sink in the plan
String yaml = plan.pipeline().specify().stream()
String yaml = view.pipeline().specify().stream()
.collect(Collectors.joining("\n---\n"));
String sql = view.pipelineSql().apply(AnsiSqlDialect.DEFAULT);
return new V1alpha1Pipeline().kind(K8sApiEndpoints.PIPELINES.kind())
.apiVersion(K8sApiEndpoints.PIPELINES.apiVersion())
.metadata(new V1ObjectMeta().name(name))
@@ -12,7 +12,7 @@
import java.sql.SQLException;
import java.util.Locale;

/** Specifies an abstract Source/Sink with concrete YAML by applying TableTemplates. */
/** Specifies an abstract Source with concrete YAML by applying TableTemplates. */
class K8sSourceDeployer extends K8sYamlDeployer<Source> {

private final K8sApi<V1alpha1TableTemplate, V1alpha1TableTemplateList> tableTemplateApi;
@@ -61,7 +61,7 @@ public void update(String yaml) throws SQLException {
private DynamicKubernetesObject objFromYaml(String yaml) {
DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
if (obj.getMetadata().getNamespace() == null) {
obj.getMetadata().namespace(context.namespace());
obj.setMetadata(obj.getMetadata().namespace(context.namespace()));
}
return obj;
}
@@ -30,7 +30,7 @@
* Database metadata.
*/
@ApiModel(description = "Database metadata.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]")
public class V1alpha1Database implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
@@ -32,7 +32,7 @@
* DatabaseList is a list of Database
*/
@ApiModel(description = "DatabaseList is a list of Database")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]")
public class V1alpha1DatabaseList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
@@ -28,7 +28,7 @@
* Database spec.
*/
@ApiModel(description = "Database spec.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-11-26T23:21:26.031Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-04T03:00:43.571Z[Etc/UTC]")
public class V1alpha1DatabaseSpec {
/**
* SQL dialect the driver expects.