Skip to content

Commit

Permalink
Enable pulsar e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Apr 9, 2024
1 parent 9225fc5 commit 801c833
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@
import java.util.concurrent.TimeUnit;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

Expand All @@ -39,44 +42,59 @@
*/
@Tag(TestTags.PERFORMANCE)
@Tag(TestTags.SLOW)
@Disabled
public class EndToEndPayloadPerfTest extends WeldTestBase {

public static final int COUNT = 10_000;
public static String input_topic = UUID.randomUUID().toString();
public static String output_topic = UUID.randomUUID().toString();
public String output_topic = UUID.randomUUID().toString();

@BeforeAll
static void insertRecords() throws PulsarClientException {
send(client.newProducer(Schema.BYTES)
.producerName("consumer-perf")
.topic(input_topic)
.create(), COUNT, (i, p) -> p.newMessage().key("key").value(generateRandomPayload(10000)));
.create(), COUNT, (i, p) -> p.newMessage().key("key-" + i).value(generateRandomPayload(10 * 1024)));

}

@BeforeEach
void outputTopic() {
output_topic = UUID.randomUUID().toString();
}

private MapBasedConfig commonConfig() {
return new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", PulsarConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.in.serviceUrl", serviceUrl)
.with("mp.messaging.incoming.in.topic", input_topic)
// .with("mp.messaging.incoming.in.poolMessages", true)
// .with("mp.messaging.incoming.in.autoScaledReceiverQueueSizeEnabled", true)
.with("mp.messaging.incoming.in.tracing-enabled", false)
.with("mp.messaging.incoming.in.cloud-events", false)
.with("mp.messaging.incoming.in.subscriptionInitialPosition", SubscriptionInitialPosition.Earliest)
.with("mp.messaging.incoming.in.schema", "BYTES")
.with("mp.messaging.outgoing.out.connector", PulsarConnector.CONNECTOR_NAME)
.with("mp.messaging.outgoing.out.topic", output_topic)
.with("mp.messaging.outgoing.out.serviceUrl", serviceUrl)
.with("mp.messaging.outgoing.out.batchingEnabled", false)
// .with("mp.messaging.outgoing.out.memoryLimitBytes", 128 * 1024 * 1024)
// .with("mp.messaging.outgoing.out.maxPendingMessages", 500)
// .with("mp.messaging.outgoing.out.maxPendingMessagesAcrossPartitions", 1000)
.with("mp.messaging.outgoing.out.schema", "BYTES");
}

private void waitForOutMessages() {
List<MessageId> messages = new CopyOnWriteArrayList<>();
try {
receive(client.newConsumer(Schema.BYTES)
.subscriptionName(topic + "-consumer-" + UUID.randomUUID())
.subscriptionName(output_topic + "-consumer-")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.poolMessages(true)
.topic(output_topic)
.subscribe(), COUNT, m -> messages.add(m.getMessageId()));
.subscribe(), COUNT, m -> {
messages.add(m.getMessageId());
m.release();
});
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -138,6 +156,20 @@ public void test_hard_worker_processor() {
waitForOutMessages();
}

@ApplicationScoped
public static class MemoryReporter {

@Inject
@Any
PulsarConnector connector;

public void printMemoryUsage(String channel) {
System.out.println("Memory Usage : "
+ ((PulsarClientImpl) connector.getClient(channel)).getMemoryLimitController().currentUsagePercent());
}

}

private static volatile long consumedCPU = System.nanoTime();

private static final Random RANDOM = new Random();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

Expand All @@ -39,12 +39,11 @@
*/
@Tag(TestTags.PERFORMANCE)
@Tag(TestTags.SLOW)
@Disabled
public class EndToEndPerfTest extends WeldTestBase {

public static final int COUNT = 50_000;
public static final int COUNT = 30_000;
public static String input_topic = UUID.randomUUID().toString();
public static String output_topic = UUID.randomUUID().toString();
public String output_topic = UUID.randomUUID().toString();

@BeforeAll
static void insertRecords() throws PulsarClientException {
Expand All @@ -54,6 +53,11 @@ static void insertRecords() throws PulsarClientException {
.create(), COUNT, i -> Integer.toString(i));
}

@BeforeEach
void outputTopic() {
output_topic = UUID.randomUUID().toString();
}

private MapBasedConfig commonConfig() {
return new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", PulsarConnector.CONNECTOR_NAME)
Expand All @@ -71,19 +75,14 @@ private void waitForOutMessages() {
List<MessageId> messages = new CopyOnWriteArrayList<>();
try {
receive(client.newConsumer(Schema.STRING)
.subscriptionName(output_topic + "-consumer-" + UUID.randomUUID())
.subscriptionName(output_topic + "-consumer-")
.topic(output_topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe(), COUNT, m -> messages.add(m.getMessageId()));
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
await()
.atMost(5, TimeUnit.MINUTES)
.until(() -> {
System.out.println(messages.size());
return messages.size() >= COUNT;
});
await().atMost(5, TimeUnit.MINUTES).until(() -> messages.size() >= COUNT);
}

@ApplicationScoped
Expand Down

0 comments on commit 801c833

Please sign in to comment.