Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for transformations in resource templates #59

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ deploy-dev-environment:
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
kubectl create namespace kafka || echo "skipping"
kubectl create namespace mysql || echo "skipping"
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/
helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
Expand Down
2 changes: 1 addition & 1 deletion deploy/dev/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ metadata:
namespace: kafka
spec:
kafka:
version: 3.4.0
version: 3.6.1
replicas: 1
listeners:
- name: plain
Expand Down
17 changes: 9 additions & 8 deletions etc/integration-tests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@
SELECT * FROM DATAGEN.PERSON;
SELECT * FROM DATAGEN.COMPANY;

-- test mermaid and yaml commands
!mermaid insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
!yaml insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON

-- test insert into command
!insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;

-- MySQL CDC tables
SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;

-- Test check command
!check not empty SELECT * FROM INVENTORY."products_on_hand";

-- MySQL CDC -> Kafka
-- MySQL CDC -> Kafka (via sample subscription "products")
SELECT * FROM RAWKAFKA."products" LIMIT 1;

-- test insert into command
!insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;

-- test mermaid and yaml commands
!mermaid insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
!yaml insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.linkedin.hoptimator.catalog;

import java.util.Locale;

public final class Names {

private Names() {
}

/** Attempt to format s as a K8s object name, or part of one. */
public static String canonicalize(String s) {
return s.toLowerCase(Locale.ROOT)
.replaceAll("[^a-z0-9\\-]+", "-")
.replaceAll("^[^a-z0-9]*", "")
.replaceAll("[^a-z0-9]*$", "")
.replaceAll("\\-+", "-");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import java.io.InputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -255,7 +257,21 @@ public interface Template {
* Replaces `{{var}}` in a template file with the corresponding variable.
*
* Resource-scoped variables take precedence over Environment-scoped
* variables. Default values can supplied with `{{var:default}}`.
* variables.
*
* Default values can supplied with `{{var:default}}`.
*
* Built-in transformations can be applied to variables, including:
*
* - `{{var toName}}`, `{{var:default toName}}`: canonicalize the
* variable as a valid K8s object name.
* - `{{var toUpperCase}}`, `{{var:default toUpperCase}}`: render in
* all upper case.
* - `{{var toLowerCase}}`, `{{var:default toLowerCase}}`: render in
* all lower case.
* - `{{var concat}}`, `{{var:default concat}}`: concatinate a multiline
* string into one line
* - `{{var concat toUpperCase}}`: apply both transformations in sequence.
*
* If `var` contains multiple lines, the behavior depends on context;
* specifically, whether the pattern appears within a list or comment
Expand Down Expand Up @@ -288,7 +304,8 @@ public SimpleTemplate(Environment env, String template) {
@Override
public String render(Resource resource) {
StringBuffer sb = new StringBuffer();
Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*\\}\\}");
Pattern p = Pattern.compile(
"([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*((\\w+\\s*)*)\\s*\\}\\}");
Matcher m = p.matcher(template);
while (m.find()) {
String prefix = m.group(1);
Expand All @@ -297,18 +314,47 @@ public String render(Resource resource) {
}
String key = m.group(2);
String defaultValue = m.group(4);
String transform = m.group(5);
String value = resource.getOrDefault(key, () -> env.getOrDefault(key, () -> defaultValue));
if (value == null) {
throw new IllegalArgumentException(template + " has no value for key " + key + ".");
}
String transformedValue = applyTransform(value, transform);
String quotedPrefix = Matcher.quoteReplacement(prefix);
String quotedValue = Matcher.quoteReplacement(value);
String quotedValue = Matcher.quoteReplacement(transformedValue);
String replacement = quotedPrefix + quotedValue.replaceAll("\\n", quotedPrefix);
m.appendReplacement(sb, replacement);
}
m.appendTail(sb);
return sb.toString();
}

private static String applyTransform(String value, String transform) {
String res = value;
String[] funcs = transform.split("\\W+");
for (String f : funcs) {
switch (f) {
case "toLowerCase":
res = res.toLowerCase(Locale.ROOT);
break;
case "toUpperCase":
res = res.toUpperCase(Locale.ROOT);
break;
case "toName":
res = canonicalizeName(res);
break;
case "concat":
res = res.replace("\n", "");
break;
}
}
return res;
}

/** Attempt to format s as a K8s object name, or part of one. */
protected static String canonicalizeName(String s) {
return Names.canonicalize(s);
}
}

/** Locates a Template for a given Resource */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import static org.junit.Assert.assertEquals;
import org.junit.Test;

import java.util.function.Function;

public class ResourceTest {

@Test
Expand All @@ -25,4 +27,26 @@ public void handlesChainedEnvironments() {
assertEquals("bar", env.getOrDefault("foo", () -> "x"));
assertEquals("x", env.getOrDefault("oof", () -> "x"));
}

@Test
public void rendersTemplates() {
Resource.Environment env = new Resource.SimpleEnvironment() {{
export("one", "1");
export("foo", "bar");
}};
Resource res = new Resource("x") {{
export("car", "Hyundai Accent");
export("parts", "wheels\nseats\nbrakes\nwipers");
}};

Function<String, Resource.Template> f = x -> new Resource.SimpleTemplate(env, x);
assertEquals("xyz", f.apply("xyz").render(res));
assertEquals("bar", f.apply("{{foo}}").render(res));
assertEquals("bar", f.apply("{{ foo }}").render(res));
assertEquals("abc", f.apply("{{xyz:abc}}").render(res));
assertEquals("hyundai-accent", f.apply("{{car toName}}").render(res));
assertEquals("HYUNDAI-ACCENT", f.apply("{{car toName toUpperCase}}").render(res));
assertEquals("WHEELSSEATSBRAKESWIPERS", f.apply("{{parts concat toUpperCase}}").render(res));
assertEquals("BAR\nbar\nxyz", f.apply("{{foo toUpperCase}}\n{{foo toLowerCase}}\n{{x:xyz}}").render(res));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

import com.linkedin.hoptimator.catalog.Resource;

import java.util.Locale;
import java.util.Map;

class KafkaTopic extends Resource {
KafkaTopic(String topicName, Map<String, String> clientOverrides) {
super("KafkaTopic");
export("topicName", topicName);
export("topicNameLowerCase", topicName.toLowerCase(Locale.ROOT));
export("clientOverrides", clientOverrides);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ class KafkaTopicAcl extends Resource {
public KafkaTopicAcl(String topicName, String principal, String method) {
super("KafkaTopicAcl");
export("topicName", topicName);
export("topicNameLowerCase", topicName.toLowerCase(Locale.ROOT));
export("principal", principal);
export("method", method);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: KafkaTopic
metadata:
name: {{topicNameLowerCase}}
name: {{topicName toName}}
namespace: {{pipeline.namespace}}
spec:
topicName: {{topicName}}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Acl
metadata:
name: {{topicNameLowerCase}}-acl-{{id}}
name: {{topicName toName}}-acl-{{id}}
namespace: {{pipeline.namespace}}
spec:
resource:
kind: KafkaTopic
name: {{topicNameLowerCase}}
name: {{topicName toName}}
method: {{method}}
principal: {{principal}}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public Result reconcile(Request request) {

String kind = object.getKind();

object.getMetadata().setNamespace(namespace);

V1alpha1SubscriptionStatus status = object.getStatus();
if (status == null) {
status = new V1alpha1SubscriptionStatus();
Expand Down Expand Up @@ -198,6 +200,10 @@ private boolean apply(String yaml, V1alpha1Subscription owner) {

DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
String namespace = obj.getMetadata().getNamespace();
if (namespace == null) {
namespace = owner.getMetadata().getNamespace();
obj.getMetadata().setNamespace(namespace);
}
String name = obj.getMetadata().getName();
KubernetesApiResponse<DynamicKubernetesObject> existing = operator.apiFor(obj).get(namespace, name);
if (existing.isSuccess()) {
Expand Down
Loading