From bef018308e2cdeb0fb4c942a62b3eac76d810505 Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Wed, 10 Jan 2024 15:31:22 -0600 Subject: [PATCH] Add support for transformations in resource templates --- Makefile | 2 +- deploy/dev/kafka.yaml | 2 +- etc/integration-tests.sql | 17 +++--- .../linkedin/hoptimator/catalog/Names.java | 19 +++++++ .../linkedin/hoptimator/catalog/Resource.java | 52 +++++++++++++++++-- .../hoptimator/catalog/ResourceTest.java | 23 ++++++++ .../hoptimator/catalog/kafka/KafkaTopic.java | 2 - .../catalog/kafka/KafkaTopicAcl.java | 1 - .../main/resources/KafkaTopic.yaml.template | 2 +- .../resources/KafkaTopicAcl.yaml.template | 4 +- .../subscription/SubscriptionReconciler.java | 6 +++ 11 files changed, 111 insertions(+), 19 deletions(-) create mode 100644 hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Names.java diff --git a/Makefile b/Makefile index 85d937a6..2a50c072 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ deploy-dev-environment: kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping" kubectl create namespace kafka || echo "skipping" kubectl create namespace mysql || echo "skipping" - helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/ + helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/ helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io diff --git a/deploy/dev/kafka.yaml b/deploy/dev/kafka.yaml index aad99078..5fd90897 100644 --- a/deploy/dev/kafka.yaml +++ b/deploy/dev/kafka.yaml @@ -23,7 +23,7 @@ metadata: namespace: kafka spec: kafka: - version: 3.4.0 + version: 3.6.1 replicas: 1 listeners: - name: plain diff --git a/etc/integration-tests.sql b/etc/integration-tests.sql index d107fd88..b6844c02 100644 --- a/etc/integration-tests.sql +++ b/etc/integration-tests.sql @@ -7,19 +7,20 @@ SELECT * FROM DATAGEN.PERSON; SELECT * FROM DATAGEN.COMPANY; +-- test mermaid and yaml commands +!mermaid insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON +!yaml insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON + +-- test insert into command +!insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON +SELECT * FROM RAWKAFKA."test-sink" LIMIT 5; + -- MySQL CDC tables SELECT * FROM INVENTORY."products_on_hand" LIMIT 1; -- Test check command !check not empty SELECT * FROM INVENTORY."products_on_hand"; --- MySQL CDC -> Kafka +-- MySQL CDC -> Kafka (via sample subscription "products") SELECT * FROM RAWKAFKA."products" LIMIT 1; --- test insert into command -!insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON -SELECT * FROM RAWKAFKA."test-sink" LIMIT 5; - --- test mermaid and yaml commands -!mermaid insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON -!yaml insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Names.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Names.java new file mode 100644 index 00000000..a6371715 --- /dev/null +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Names.java @@ -0,0 +1,19 @@ +package com.linkedin.hoptimator.catalog; + +import java.util.Locale; + +public final class Names { + + private Names() { + } + + /** Attempt to format s as a K8s object name, or part of one. */ + public static String canonicalize(String s) { + return s.toLowerCase(Locale.ROOT) + .replaceAll("[^a-z0-9\\-]+", "-") + .replaceAll("^[^a-z0-9]*", "") + .replaceAll("[^a-z0-9]*$", "") + .replaceAll("\\-+", "-"); + } + +} diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java index fb5c3af2..bd884325 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java @@ -2,11 +2,13 @@ import java.io.InputStream; import java.io.IOException; +import java.util.Arrays; import java.util.ArrayList; import java.util.Collection; import java.util.Enumeration; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -255,7 +257,21 @@ public interface Template { * Replaces `{{var}}` in a template file with the corresponding variable. * * Resource-scoped variables take precedence over Environment-scoped - * variables. Default values can supplied with `{{var:default}}`. + * variables. + * + * Default values can supplied with `{{var:default}}`. + * + * Built-in transformations can be applied to variables, including: + * + * - `{{var toName}}`, `{{var:default toName}}`: canonicalize the + * variable as a valid K8s object name. + * - `{{var toUpperCase}}`, `{{var:default toUpperCase}}`: render in + * all upper case. + * - `{{var toLowerCase}}`, `{{var:default toLowerCase}}`: render in + * all lower case. + * - `{{var concat}}`, `{{var:default concat}}`: concatinate a multiline + * string into one line + * - `{{var concat toUpperCase}}`: apply both transformations in sequence. * * If `var` contains multiple lines, the behavior depends on context; * specifically, whether the pattern appears within a list or comment @@ -288,7 +304,8 @@ public SimpleTemplate(Environment env, String template) { @Override public String render(Resource resource) { StringBuffer sb = new StringBuffer(); - Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*\\}\\}"); + Pattern p = Pattern.compile( + "([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*((\\w+\\W*)*)\\s*\\}\\}"); Matcher m = p.matcher(template); while (m.find()) { String prefix = m.group(1); @@ -297,18 +314,47 @@ public String render(Resource resource) { } String key = m.group(2); String defaultValue = m.group(4); + String transform = m.group(5); String value = resource.getOrDefault(key, () -> env.getOrDefault(key, () -> defaultValue)); if (value == null) { throw new IllegalArgumentException(template + " has no value for key " + key + "."); } + String transformedValue = applyTransform(value, transform); String quotedPrefix = Matcher.quoteReplacement(prefix); - String quotedValue = Matcher.quoteReplacement(value); + String quotedValue = Matcher.quoteReplacement(transformedValue); String replacement = quotedPrefix + quotedValue.replaceAll("\\n", quotedPrefix); m.appendReplacement(sb, replacement); } m.appendTail(sb); return sb.toString(); } + + private static String applyTransform(String value, String transform) { + String res = value; + String[] funcs = transform.split("\\W+"); + for (String f : funcs) { + switch (f) { + case "toLowerCase": + res = res.toLowerCase(Locale.ROOT); + break; + case "toUpperCase": + res = res.toUpperCase(Locale.ROOT); + break; + case "toName": + res = canonicalizeName(res); + break; + case "concat": + res = res.replace("\n", ""); + break; + } + } + return res; + } + + /** Attempt to format s as a K8s object name, or part of one. */ + protected static String canonicalizeName(String s) { + return Names.canonicalize(s); + } } /** Locates a Template for a given Resource */ diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ResourceTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ResourceTest.java index 5345767d..11df605c 100644 --- a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ResourceTest.java +++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ResourceTest.java @@ -5,6 +5,8 @@ import static org.junit.Assert.assertEquals; import org.junit.Test; +import java.util.function.Function; + public class ResourceTest { @Test @@ -25,4 +27,25 @@ public void handlesChainedEnvironments() { assertEquals("bar", env.getOrDefault("foo", () -> "x")); assertEquals("x", env.getOrDefault("oof", () -> "x")); } + + @Test + public void rendersTemplates() { + Resource.Environment env = new Resource.SimpleEnvironment() {{ + export("one", "1"); + export("foo", "bar"); + }}; + Resource res = new Resource("x") {{ + export("car", "Hyundai Accent"); + export("parts", "wheels\nseats\nbrakes\nwipers"); + }}; + + Function f = x -> new Resource.SimpleTemplate(env, x); + assertEquals("xyz", f.apply("xyz").render(res)); + assertEquals("bar", f.apply("{{foo}}").render(res)); + assertEquals("bar", f.apply("{{ foo }}").render(res)); + assertEquals("abc", f.apply("{{xyz:abc}}").render(res)); + assertEquals("hyundai-accent", f.apply("{{car toName}}").render(res)); + assertEquals("HYUNDAI-ACCENT", f.apply("{{car toName toUpperCase}}").render(res)); + assertEquals("WHEELSSEATSBRAKESWIPERS", f.apply("{{parts concat toUpperCase}}").render(res)); + } } diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java index 7d096b22..848edf64 100644 --- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java +++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java @@ -2,14 +2,12 @@ import com.linkedin.hoptimator.catalog.Resource; -import java.util.Locale; import java.util.Map; class KafkaTopic extends Resource { KafkaTopic(String topicName, Map clientOverrides) { super("KafkaTopic"); export("topicName", topicName); - export("topicNameLowerCase", topicName.toLowerCase(Locale.ROOT)); export("clientOverrides", clientOverrides); } } diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopicAcl.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopicAcl.java index a5b8d553..ba753236 100644 --- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopicAcl.java +++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopicAcl.java @@ -8,7 +8,6 @@ class KafkaTopicAcl extends Resource { public KafkaTopicAcl(String topicName, String principal, String method) { super("KafkaTopicAcl"); export("topicName", topicName); - export("topicNameLowerCase", topicName.toLowerCase(Locale.ROOT)); export("principal", principal); export("method", method); } diff --git a/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template b/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template index 734703fc..fa9b76f9 100644 --- a/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template +++ b/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template @@ -1,7 +1,7 @@ apiVersion: hoptimator.linkedin.com/v1alpha1 kind: KafkaTopic metadata: - name: {{topicNameLowerCase}} + name: {{topicName toName}} namespace: {{pipeline.namespace}} spec: topicName: {{topicName}} diff --git a/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template b/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template index f69c870e..16ff9731 100644 --- a/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template +++ b/hoptimator-kafka-adapter/src/main/resources/KafkaTopicAcl.yaml.template @@ -1,11 +1,11 @@ apiVersion: hoptimator.linkedin.com/v1alpha1 kind: Acl metadata: - name: {{topicNameLowerCase}}-acl-{{id}} + name: {{topicName toName}}-acl-{{id}} namespace: {{pipeline.namespace}} spec: resource: kind: KafkaTopic - name: {{topicNameLowerCase}} + name: {{topicName toName}} method: {{method}} principal: {{principal}} diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java index 4ac3e183..85e5f0be 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java @@ -71,6 +71,8 @@ public Result reconcile(Request request) { String kind = object.getKind(); + object.getMetadata().setNamespace(namespace); + V1alpha1SubscriptionStatus status = object.getStatus(); if (status == null) { status = new V1alpha1SubscriptionStatus(); @@ -198,6 +200,10 @@ private boolean apply(String yaml, V1alpha1Subscription owner) { DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); String namespace = obj.getMetadata().getNamespace(); + if (namespace == null) { + namespace = owner.getMetadata().getNamespace(); + obj.getMetadata().setNamespace(namespace); + } String name = obj.getMetadata().getName(); KubernetesApiResponse existing = operator.apiFor(obj).get(namespace, name); if (existing.isSuccess()) {