Skip to content

Commit

Permalink
Add toxiproxy for kafka (#849)
Browse files Browse the repository at this point in the history
  • Loading branch information
akovalov-playtika authored Mar 31, 2022
1 parent a04aaf5 commit bc9a342
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 46 deletions.
8 changes: 8 additions & 0 deletions embedded-kafka/README.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
=== embedded-kafka

TIP: This module provides integration with https://github.com/Shopify/toxiproxy[ToxiProxy] out of the box.
ToxiProxy is a great tool for simulating network conditions, meaning that you can test your application's resiliency.

==== Maven dependency

.pom.xml
Expand Down Expand Up @@ -28,6 +31,7 @@
* `embedded.kafka.schema-registry.port` `(default is 8081)`
* `embedded.kafka.schema-registry.avroCompatibilityLevel` `(NONE|BACKWARD|BACKWARD_TRANSITIVE|FORWARD|FORWARD_TRANSITIVE|FULL|FULL_TRANSITIVE, default is BACKWARD)`
* `embedded.kafka.schema-registry.authentication` `(NONE|BASIC, default is NONE)`
* `embedded.toxiproxy.proxies.kafka.enabled` Enables creation of the container with ToxiProxy TCP proxy and a proxy to the `embedded-kafka` container.

==== Filesystem bindings

Expand All @@ -50,6 +54,10 @@ By default, to your projects `target` folder. You can configure binding using pr
* `embedded.kafka.schema-registry.port`
* `embedded.kafka.schema-registry.username`
* `embedded.kafka.schema-registry.password`
* `embedded.kafka.toxiproxy.brokerList`
* `embedded.kafka.toxiproxy.saslPlaintext.brokerList`
* Bean `ToxiproxyContainer.ContainerProxy kafkaPlainTextContainerProxy`
* Bean `ToxiproxyContainer.ContainerProxy kafkaSaslContainerProxy`

==== Using `SASL_PLAINTEXT`

Expand Down
4 changes: 4 additions & 0 deletions embedded-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.playtika.testcontainers</groupId>
<artifactId>embedded-toxiproxy</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import com.playtika.test.kafka.checks.KafkaStatusCheck;
import com.playtika.test.kafka.properties.KafkaConfigurationProperties;
import com.playtika.test.kafka.properties.ZookeeperConfigurationProperties;
import com.playtika.test.toxiproxy.EmbeddedToxiProxyBootstrapConfiguration;
import com.playtika.test.toxiproxy.condition.ConditionalOnToxiProxyEnabled;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
Expand All @@ -17,6 +20,7 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.utility.MountableFile;

import java.io.IOException;
Expand All @@ -30,16 +34,19 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import static com.playtika.test.common.utils.ContainerUtils.configureCommonsAndStart;
import static com.playtika.test.kafka.properties.KafkaConfigurationProperties.KAFKA_BEAN_NAME;
import static com.playtika.test.kafka.properties.KafkaConfigurationProperties.KAFKA_PLAIN_TEXT_TOXI_PROXY_BEAN_NAME;
import static com.playtika.test.kafka.properties.KafkaConfigurationProperties.KAFKA_SASL_TOXI_PROXY_BEAN_NAME;
import static java.lang.String.format;
import static org.testcontainers.containers.KafkaContainer.KAFKA_PORT;

@Slf4j
@Configuration
@AutoConfigureAfter(EmbeddedToxiProxyBootstrapConfiguration.class)
@ConditionalOnProperty(value = "embedded.kafka.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(value = {KafkaConfigurationProperties.class, ZookeeperConfigurationProperties.class})
public class KafkaContainerConfiguration {
Expand All @@ -60,6 +67,52 @@ public KafkaStatusCheck kafkaStartupCheckStrategy(KafkaConfigurationProperties k
return new KafkaStatusCheck(kafkaProperties);
}

@Bean(name = KAFKA_PLAIN_TEXT_TOXI_PROXY_BEAN_NAME)
@ConditionalOnToxiProxyEnabled(module = "kafka")
ToxiproxyContainer.ContainerProxy kafkaContainerPlainTextProxy(ToxiproxyContainer toxiproxyContainer,
GenericContainer kafka,
KafkaConfigurationProperties properties,
ConfigurableEnvironment environment) {
ToxiproxyContainer.ContainerProxy plainTextProxy =
toxiproxyContainer.getProxy(kafka, properties.getBrokerPort());

Map<String, Object> map = new LinkedHashMap<>();

String plaintextToxiProxyBrokerList =
format("%s:%d", plainTextProxy.getContainerIpAddress(), plainTextProxy.getProxyPort());
map.put("embedded.kafka.toxiproxy.brokerList", plaintextToxiProxyBrokerList);
map.put("embedded.kafka.toxiproxy.proxyName", plainTextProxy.getName());

MapPropertySource propertySource = new MapPropertySource("embeddedKafkaPlainToxiProxyInfo", map);
environment.getPropertySources().addFirst(propertySource);
log.info("Kafka ToxiProxy plain-text connection details {}", map);

return plainTextProxy;
}

@Bean(name = KAFKA_SASL_TOXI_PROXY_BEAN_NAME)
@ConditionalOnToxiProxyEnabled(module = "kafka")
ToxiproxyContainer.ContainerProxy kafkaContainerSaslProxy(ToxiproxyContainer toxiproxyContainer,
GenericContainer kafka,
KafkaConfigurationProperties properties,
ConfigurableEnvironment environment) {
ToxiproxyContainer.ContainerProxy saslProxy =
toxiproxyContainer.getProxy(kafka, properties.getSaslPlaintextBrokerPort());

Map<String, Object> map = new LinkedHashMap<>();

String saslToxiProxyBrokerList =
format("%s:%d", saslProxy.getContainerIpAddress(), saslProxy.getProxyPort());
map.put("embedded.kafka.toxiproxy.saslPlaintext.brokerList", saslToxiProxyBrokerList);
map.put("embedded.kafka.toxiproxy.saslPlaintext.proxyName", saslProxy.getName());

MapPropertySource propertySource = new MapPropertySource("embeddedKafkaSaslToxiProxyInfo", map);
environment.getPropertySources().addFirst(propertySource);
log.info("Kafka ToxiProxy SASL connection details {}", map);

return saslProxy;
}

@Bean(name = KAFKA_BEAN_NAME, destroyMethod = "stop")
public GenericContainer kafka(
KafkaStatusCheck kafkaStatusCheck,
Expand All @@ -79,7 +132,6 @@ public GenericContainer kafka(
KafkaContainer kafka = new KafkaContainer(ContainerUtils.getDockerImageName(kafkaProperties)) {
@Override
public String getBootstrapServers() {
super.getBootstrapServers();
return "EXTERNAL_PLAINTEXT://" + getHost() + ":" + getMappedPort(kafkaExternalPort) + "," +
"EXTERNAL_SASL_PLAINTEXT://" + getHost() + ":" + getMappedPort(saslPlaintextKafkaExternalPort) + "," +
"INTERNAL_PLAINTEXT://" + KAFKA_HOST_NAME + ":" + kafkaInternalPort;
Expand Down Expand Up @@ -117,7 +169,7 @@ public String getBootstrapServers() {
.withCopyFileToContainer(MountableFile.forClasspathResource("kafka_server_jaas.conf"), "/etc/kafka/kafka_server_jaas.conf")
.withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf")
.withEnv("KAFKA_GC_LOG_OPTS", "-Dnogclog")
.withExposedPorts(kafkaInternalPort, kafkaExternalPort, saslPlaintextKafkaExternalPort, KAFKA_PORT)
.withExposedPorts(kafkaInternalPort, kafkaExternalPort, saslPlaintextKafkaExternalPort)
.withNetwork(network)
.withNetworkAliases(KAFKA_HOST_NAME)
.withExtraHost(KAFKA_HOST_NAME, "127.0.0.1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@

import com.github.dockerjava.api.model.Capability;
import com.playtika.test.common.properties.CommonContainerProperties;
import com.playtika.test.common.utils.ContainerUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.With;
import org.springframework.boot.context.properties.ConfigurationProperties;

import javax.annotation.PostConstruct;
import javax.validation.constraints.AssertTrue;

import java.util.Arrays;
Expand All @@ -23,14 +21,16 @@
public class KafkaConfigurationProperties extends CommonContainerProperties {

public static final String KAFKA_BEAN_NAME = "kafka";
public static final String KAFKA_PLAIN_TEXT_TOXI_PROXY_BEAN_NAME = "kafkaPlainTextContainerProxy";
public static final String KAFKA_SASL_TOXI_PROXY_BEAN_NAME = "kafkaSaslContainerProxy";
public static final String KAFKA_USER = "alice";
public static final String KAFKA_PASSWORD = "alice-secret";

protected String brokerList;
protected String containerBrokerList;
protected int brokerPort = 0;
protected int containerBrokerPort = 0;
protected int saslPlaintextBrokerPort = 0;
protected int brokerPort = 9093;
protected int containerBrokerPort = 9094;
protected int saslPlaintextBrokerPort = 9095;
protected int socketTimeoutMs = 5_000;
protected int bufferSize = 64 * 1024;

Expand Down Expand Up @@ -63,25 +63,6 @@ public KafkaConfigurationProperties() {
this.setCapabilities(Arrays.asList(Capability.NET_ADMIN));
}

/**
* Kafka container port will be assigned automatically if free port is available.
* Override this only if you are sure that specified port is free.
*/
@PostConstruct
private void init() {
if (this.brokerPort == 0) {
this.brokerPort = ContainerUtils.getAvailableMappingPort();
}

if (this.containerBrokerPort == 0) {
this.containerBrokerPort = ContainerUtils.getAvailableMappingPort();
}

if (this.saslPlaintextBrokerPort == 0) {
this.saslPlaintextBrokerPort = ContainerUtils.getAvailableMappingPort();
}
}

// https://hub.docker.com/r/confluentinc/cp-kafka
// https://docs.confluent.io/platform/current/installation/versions-interoperability.html
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class EmbeddedKafkaTest extends AbstractEmbeddedKafkaTest {

@Autowired
protected KafkaTopicsConfigurer kafkaTopicsConfigurer;
// @Autowired
// protected NetworkTestOperations kafkaNetworkTestOperations;

@Test
@DisplayName("creates topics on startup")
Expand Down Expand Up @@ -79,23 +77,6 @@ public void shouldSendAndConsumeTransactionalMessage() throws Exception {
.isEqualTo(MESSAGE);
}

// @Disabled("RHEL image doesn't support to simply install tc")
// @Test
// @DisplayName("allows to emulate latency on send")
// public void shouldEmulateLatencyOnSend() throws Exception {
// kafkaNetworkTestOperations
// .withNetworkLatency(
// ofMillis(1000),
// () -> assertThat(durationOf(() -> sendMessage("topic3", "abc0")))
// .isGreaterThan(1000L));
//
// assertThat(durationOf(() -> sendMessage("topic3", "abc1")))
// .isLessThan(200L);
//
// assertThat(consumeMessages("topic3"))
// .containsExactly("abc0", "abc1");
// }

@AfterAll
public static void afterAll(@Autowired KafkaConfigurationProperties kafkaProperties, @Autowired ZookeeperConfigurationProperties zookeeperProperties, TestInfo testInfo) throws Exception {
// JUnit invokes static afterAll() even when child class is running and TestInstance.Lifecycle.PER_CLASS is being used
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.playtika.test.kafka;

import eu.rekawek.toxiproxy.model.ToxicDirection;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.test.context.TestPropertySource;
import org.testcontainers.containers.ToxiproxyContainer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.playtika.test.kafka.properties.KafkaConfigurationProperties.KAFKA_PLAIN_TEXT_TOXI_PROXY_BEAN_NAME;
import static com.playtika.test.kafka.properties.KafkaConfigurationProperties.KAFKA_SASL_TOXI_PROXY_BEAN_NAME;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@Order(7)
@TestPropertySource(properties = {
"embedded.toxiproxy.proxies.kafka.enabled=true"
})
@TestInstance(PER_CLASS)
@DisplayName("ToxiProxy embedded-kafka test")
public class ToxiProxyEmbeddedKafkaTest extends AbstractEmbeddedKafkaTest {
private static final String SECURE_TOPIC = "secureTopic";

@Autowired
@Qualifier(KAFKA_PLAIN_TEXT_TOXI_PROXY_BEAN_NAME)
protected ToxiproxyContainer.ContainerProxy kafkaPlainTextProxy;
@Autowired
@Qualifier(KAFKA_SASL_TOXI_PROXY_BEAN_NAME)
protected ToxiproxyContainer.ContainerProxy kafkaSaslProxy;

@Value("${embedded.kafka.toxiproxy.brokerList}")
protected List<String> toxiproxyBrokerList;
@Value("${embedded.kafka.toxiproxy.saslPlaintext.brokerList}")
protected List<String> toxiproxySaslBrokerList;
@Value("${embedded.kafka.saslPlaintext.user}")
protected String kafkaUser;
@Value("${embedded.kafka.saslPlaintext.password}")
protected String kafkaPassword;

@Override
protected Map<String, Object> getKafkaProducerConfiguration() {
Map<String, Object> conf = super.getKafkaProducerConfiguration();
conf.put(BOOTSTRAP_SERVERS_CONFIG, toxiproxyBrokerList);
return conf;
}

@Test
@DisplayName("allows to emulate latency on send")
public void shouldEmulateLatencyOnSend() throws Exception {
kafkaPlainTextProxy.toxics()
.latency("latency", ToxicDirection.DOWNSTREAM, 1_100)
.setJitter(50);

try {
assertThat(durationOf(() -> sendMessage("topic3", "abc0")))
.isGreaterThan(1000L);
} finally {
kafkaPlainTextProxy.toxics().get("latency").remove();
}

assertThat(durationOf(() -> sendMessage("topic3", "abc1")))
.isLessThan(200L);

assertThat(consumeMessages("topic3"))
.containsExactly("abc0", "abc1");
}

@Test
@DisplayName("allows to emulate latency on send to SASL-Plaintext secure topic")
public void shouldEmulateLatencyOnSendToSecureTopic() throws Exception {
kafkaSaslProxy.toxics()
.latency("latency", ToxicDirection.DOWNSTREAM, 1_100)
.setJitter(50);

try {
assertThat(durationOf(() -> sendMessage(SECURE_TOPIC, "abc0", saslKafkaProducerConfiguration())))
.isGreaterThan(1000L);
} finally {
kafkaSaslProxy.toxics().get("latency").remove();
}

assertThat(durationOf(() -> sendMessage(SECURE_TOPIC, "abc1", saslKafkaProducerConfiguration())))
.isLessThan(200L);

assertThat(consumeMessages(SECURE_TOPIC, saslKafkaConsumerConfiguration()))
.containsExactly("abc0", "abc1");
}

protected Map<String, Object> saslKafkaProducerConfiguration() {
Map<String, Object> conf = getKafkaProducerConfiguration();
conf.putAll(createSaslPlaintextKafkaConfigurationProperties());
return conf;
}

protected Map<String, Object> saslKafkaConsumerConfiguration() {
Map<String, Object> conf = getKafkaConsumerConfiguration();
conf.putAll(createSaslPlaintextKafkaConfigurationProperties());
return conf;
}

private Map<String, Object> createSaslPlaintextKafkaConfigurationProperties() {
Map<String, Object> conf = new HashMap<>();
conf.put(BOOTSTRAP_SERVERS_CONFIG, toxiproxySaslBrokerList);
conf.put(SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
conf.put(SASL_MECHANISM, "PLAIN");
conf.put(SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + kafkaUser + "\" " +
"password=\"" + kafkaPassword + "\";"
);
return conf;
}
}

0 comments on commit bc9a342

Please sign in to comment.