diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java index 4884bb61e628..0a9ee4618b1e 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java @@ -17,14 +17,10 @@ */ package org.apache.beam.sdk.io.solace.broker; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.http.HttpRequestFactory; import com.solacesystems.jcsmp.JCSMPFactory; import java.io.IOException; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.io.solace.data.Semp.Queue; import org.apache.beam.sdk.util.SerializableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,8 +36,6 @@ @Internal public class BasicAuthSempClient implements SempClient { private static final Logger LOG = LoggerFactory.getLogger(BasicAuthSempClient.class); - private final ObjectMapper objectMapper = - new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); private final SempBasicAuthClientExecutor sempBasicAuthClientExecutor; @@ -58,13 +52,12 @@ public BasicAuthSempClient( @Override public boolean isQueueNonExclusive(String queueName) throws IOException { - LOG.info("SolaceIO.Read: SempOperations: query SEMP if queue {} is nonExclusive", queueName); - BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName); - if (response.content == null) { - throw new IOException("SolaceIO: response from SEMP is empty!"); - } - Queue q = mapJsonToClass(response.content, Queue.class); - return q.data().accessType().equals("non-exclusive"); + boolean queueNonExclusive = sempBasicAuthClientExecutor.isQueueNonExclusive(queueName); + LOG.info( + "SolaceIO.Read: SempOperations: queried SEMP if queue {} is non-exclusive: {}", + queueName, + queueNonExclusive); + return queueNonExclusive; } @Override @@ -77,12 +70,7 @@ public com.solacesystems.jcsmp.Queue createQueueForTopic(String queueName, Strin @Override public long getBacklogBytes(String queueName) throws IOException { - BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName); - if (response.content == null) { - throw new IOException("SolaceIO: response from SEMP is empty!"); - } - Queue q = mapJsonToClass(response.content, Queue.class); - return q.data().msgSpoolUsage(); + return sempBasicAuthClientExecutor.getBacklogBytes(queueName); } private void createQueue(String queueName) throws IOException { @@ -94,9 +82,4 @@ private void createSubscription(String queueName, String topicName) throws IOExc LOG.info("SolaceIO.Read: Creating new subscription {} for topic {}.", queueName, topicName); sempBasicAuthClientExecutor.createSubscriptionResponse(queueName, topicName); } - - private T mapJsonToClass(String content, Class mapSuccessToClass) - throws JsonProcessingException { - return objectMapper.readValue(content, mapSuccessToClass); - } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java index 99a81f716435..965fc8741374 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java @@ -19,6 +19,9 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.http.GenericUrl; import com.google.api.client.http.HttpContent; import com.google.api.client.http.HttpHeaders; @@ -40,6 +43,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import org.apache.beam.sdk.io.solace.data.Semp.Queue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; @@ -52,7 +56,7 @@ * response is 401 Unauthorized, the client will execute an additional request with Basic Auth * header to refresh the token. */ -class SempBasicAuthClientExecutor implements Serializable { +public class SempBasicAuthClientExecutor implements Serializable { // Every request will be repeated 2 times in case of abnormal connection failures. private static final int REQUEST_NUM_RETRIES = 2; private static final Map COOKIE_MANAGER_MAP = @@ -65,8 +69,10 @@ class SempBasicAuthClientExecutor implements Serializable { private final String password; private final CookieManagerKey cookieManagerKey; private final transient HttpRequestFactory requestFactory; + private final ObjectMapper objectMapper = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - SempBasicAuthClientExecutor( + public SempBasicAuthClientExecutor( String host, String username, String password, @@ -78,7 +84,16 @@ class SempBasicAuthClientExecutor implements Serializable { this.password = password; this.requestFactory = httpRequestFactory; this.cookieManagerKey = new CookieManagerKey(this.baseUrl, this.username); - COOKIE_MANAGER_MAP.putIfAbsent(this.cookieManagerKey, new CookieManager()); + COOKIE_MANAGER_MAP.computeIfAbsent(this.cookieManagerKey, key -> new CookieManager()); + } + + public boolean isQueueNonExclusive(String queueName) throws IOException { + BrokerResponse response = getQueueResponse(queueName); + if (response.content == null) { + throw new IOException("SolaceIO: response from SEMP is empty!"); + } + Queue q = mapJsonToClass(response.content, Queue.class); + return q.data().accessType().equals("non-exclusive"); } private static String getQueueEndpoint(String messageVpn, String queueName) @@ -199,6 +214,20 @@ private static String urlEncode(String queueName) throws UnsupportedEncodingExce return URLEncoder.encode(queueName, StandardCharsets.UTF_8.name()); } + private T mapJsonToClass(String content, Class mapSuccessToClass) + throws JsonProcessingException { + return objectMapper.readValue(content, mapSuccessToClass); + } + + public long getBacklogBytes(String queueName) throws IOException { + BrokerResponse response = getQueueResponse(queueName); + if (response.content == null) { + throw new IOException("SolaceIO: response from SEMP is empty!"); + } + Queue q = mapJsonToClass(response.content, Queue.class); + return q.data().msgSpoolUsage(); + } + private static class CookieManagerKey implements Serializable { private final String baseUrl; private final String username; diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java index 12d8a8507d8a..c55d37942c72 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java @@ -23,8 +23,9 @@ import org.apache.beam.sdk.values.KV; /** - * This class a pseudo-key with a given cardinality. The downstream steps will use state {@literal - * &} timers to distribute the data and control for the number of parallel workers used for writing. + * This class adds pseudo-key with a given cardinality. The downstream steps will use state + * {@literal &} timers to distribute the data and control for the number of parallel workers used + * for writing. */ @Internal public class AddShardKeyDoFn extends DoFn> { diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/BasicAuthMultipleSempClient.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/BasicAuthMultipleSempClient.java new file mode 100644 index 000000000000..637cecdcfd15 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/BasicAuthMultipleSempClient.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.it; + +import com.google.api.client.http.HttpRequestFactory; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClient; +import org.apache.beam.sdk.io.solace.broker.SempBasicAuthClientExecutor; +import org.apache.beam.sdk.util.SerializableSupplier; + +/** + * Example class showing how the {@link BasicAuthSempClient} can be extended or have functionalities + * overridden. In this case, the modified method is {@link + * BasicAuthSempClient#getBacklogBytes(String)}, which queries multiple SEMP endpoints to collect + * accurate backlog metrics. For usage, see {@link SolaceIOMultipleSempIT}. + */ +public class BasicAuthMultipleSempClient extends BasicAuthSempClient { + private final List sempBacklogBasicAuthClientExecutors; + + public BasicAuthMultipleSempClient( + String mainHost, + List backlogHosts, + String username, + String password, + String vpnName, + SerializableSupplier httpRequestFactorySupplier) { + super(mainHost, username, password, vpnName, httpRequestFactorySupplier); + sempBacklogBasicAuthClientExecutors = + backlogHosts.stream() + .map( + host -> + new SempBasicAuthClientExecutor( + host, username, password, vpnName, httpRequestFactorySupplier.get())) + .collect(Collectors.toList()); + } + + @Override + public long getBacklogBytes(String queueName) throws IOException { + long backlog = 0; + for (SempBasicAuthClientExecutor client : sempBacklogBasicAuthClientExecutors) { + backlog += client.getBacklogBytes(queueName); + } + return backlog; + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/BasicAuthMultipleSempClientFactory.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/BasicAuthMultipleSempClientFactory.java new file mode 100644 index 000000000000..0a548c10555c --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/BasicAuthMultipleSempClientFactory.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.it; + +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.auto.value.AutoValue; +import java.util.List; +import org.apache.beam.sdk.io.solace.broker.SempClient; +import org.apache.beam.sdk.io.solace.broker.SempClientFactory; +import org.apache.beam.sdk.util.SerializableSupplier; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Example class showing how to implement a custom {@link SempClientFactory} with custom client. For + * usage, see {@link SolaceIOMultipleSempIT}. + */ +@AutoValue +public abstract class BasicAuthMultipleSempClientFactory implements SempClientFactory { + + public abstract String mainHost(); + + public abstract List backlogHosts(); + + public abstract String username(); + + public abstract String password(); + + public abstract String vpnName(); + + public abstract @Nullable SerializableSupplier httpRequestFactorySupplier(); + + public static Builder builder() { + return new AutoValue_BasicAuthMultipleSempClientFactory.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + /** Set Solace host, format: [Protocol://]Host[:Port]. */ + public abstract Builder mainHost(String host); + + public abstract Builder backlogHosts(List hosts); + + /** Set Solace username. */ + public abstract Builder username(String username); + /** Set Solace password. */ + public abstract Builder password(String password); + + /** Set Solace vpn name. */ + public abstract Builder vpnName(String vpnName); + + abstract Builder httpRequestFactorySupplier( + SerializableSupplier httpRequestFactorySupplier); + + public abstract BasicAuthMultipleSempClientFactory build(); + } + + @Override + public SempClient create() { + return new BasicAuthMultipleSempClient( + mainHost(), + backlogHosts(), + username(), + password(), + vpnName(), + getHttpRequestFactorySupplier()); + } + + @SuppressWarnings("return") + private @NonNull SerializableSupplier getHttpRequestFactorySupplier() { + SerializableSupplier httpRequestSupplier = httpRequestFactorySupplier(); + return httpRequestSupplier != null + ? httpRequestSupplier + : () -> new NetHttpTransport().createRequestFactory(); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java new file mode 100644 index 000000000000..77d00b4e41ec --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.it; + +import static org.apache.beam.sdk.io.solace.it.SolaceContainerManager.TOPIC_NAME; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; +import static org.junit.Assert.assertEquals; + +import com.solacesystems.jcsmp.DeliveryMode; +import java.io.IOException; +import java.util.Arrays; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.SolaceIO.WriterType; +import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory; +import org.apache.beam.sdk.io.solace.broker.SempClientFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.Queue; +import org.apache.beam.sdk.io.solace.data.SolaceDataUtils; +import org.apache.beam.sdk.io.solace.write.SolaceOutput; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testutils.metrics.MetricsReader; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +public class SolaceIOMultipleSempIT { + private static final String NAMESPACE = SolaceIOMultipleSempIT.class.getName(); + private static final String READ_COUNT = "read_count"; + private static final String QUEUE_NAME = "test_queue"; + private static final long PUBLISH_MESSAGE_COUNT = 20; + private static final TestPipelineOptions pipelineOptions; + private static SolaceContainerManager solaceContainerManager; + + static { + pipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class); + pipelineOptions.as(StreamingOptions.class).setStreaming(true); + // For the read connector tests, we need to make sure that p.run() does not block + pipelineOptions.setBlockOnRun(false); + pipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false); + } + + @Rule public final TestPipeline pipeline = TestPipeline.fromOptions(pipelineOptions); + + @BeforeClass + public static void setup() throws IOException { + solaceContainerManager = new SolaceContainerManager(); + solaceContainerManager.start(); + solaceContainerManager.createQueueWithSubscriptionTopic(QUEUE_NAME); + } + + @AfterClass + public static void afterClass() { + if (solaceContainerManager != null) { + solaceContainerManager.stop(); + } + } + + /** + * This test verifies the functionality of reading data from a Solace queue using the + * SolaceIO.read() transform. This test does not actually test functionalities of {@link + * BasicAuthMultipleSempClientFactory}, but it demonstrates how to integrate a custom + * implementation of {@link SempClientFactory}, in this case, {@link + * BasicAuthMultipleSempClientFactory}, to handle authentication and configuration interactions + * with the Solace message broker. + */ + @Test + public void test01writeAndReadWithMultipleSempClientFactory() { + Pipeline writerPipeline = + createWriterPipeline(WriterType.BATCHED, solaceContainerManager.jcsmpPortMapped); + writerPipeline + .apply( + "Read from Solace", + SolaceIO.read() + .from(Queue.fromName(QUEUE_NAME)) + .withMaxNumConnections(1) + .withDeduplicateRecords(true) + .withSempClientFactory( + BasicAuthMultipleSempClientFactory.builder() + .backlogHosts( + Arrays.asList( + "http://localhost:" + solaceContainerManager.sempPortMapped, + "http://localhost:" + solaceContainerManager.sempPortMapped)) + .mainHost("http://localhost:" + solaceContainerManager.sempPortMapped) + .username("admin") + .password("admin") + .vpnName(SolaceContainerManager.VPN_NAME) + .build()) + .withSessionServiceFactory( + BasicAuthJcsmpSessionServiceFactory.builder() + .host("localhost:" + solaceContainerManager.jcsmpPortMapped) + .username(SolaceContainerManager.USERNAME) + .password(SolaceContainerManager.PASSWORD) + .vpnName(SolaceContainerManager.VPN_NAME) + .build())) + .apply("Count", ParDo.of(new CountingFn<>(NAMESPACE, READ_COUNT))); + + PipelineResult pipelineResult = writerPipeline.run(); + // We need enough time for Beam to pull all messages from the queue, but we need a timeout too, + // as the Read connector will keep attempting to read forever. + pipelineResult.waitUntilFinish(Duration.standardSeconds(15)); + + MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); + long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT); + assertEquals(PUBLISH_MESSAGE_COUNT, actualRecordsCount); + } + + private Pipeline createWriterPipeline( + SolaceIO.WriterType writerType, int solaceContainerJcsmpPort) { + TestStream.Builder> kvBuilder = + TestStream.create(KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class))) + .advanceWatermarkTo(Instant.EPOCH); + + for (int i = 0; i < PUBLISH_MESSAGE_COUNT; i++) { + String key = "Solace-Message-ID:m" + solaceContainerJcsmpPort + i; + String payload = String.format("{\"field_str\":\"value\",\"field_int\":123%d}", i); + kvBuilder = + kvBuilder + .addElements(KV.of(key, payload)) + .advanceProcessingTime(Duration.standardSeconds(60)); + } + + TestStream> testStream = kvBuilder.advanceWatermarkToInfinity(); + + PCollection> kvs = + pipeline.apply(String.format("Test stream %s", writerType), testStream); + + PCollection records = + kvs.apply( + String.format("To Record %s", writerType), + MapElements.into(TypeDescriptor.of(Solace.Record.class)) + .via(kv -> SolaceDataUtils.getSolaceRecord(kv.getValue(), kv.getKey()))); + + SolaceOutput result = + records.apply( + String.format("Write to Solace %s", writerType), + SolaceIO.write() + .to(Solace.Topic.fromName(TOPIC_NAME)) + .withSubmissionMode(SolaceIO.SubmissionMode.TESTING) + .withWriterType(writerType) + .withDeliveryMode(DeliveryMode.PERSISTENT) + .withNumberOfClientsPerWorker(1) + .withNumShards(1) + .withSessionServiceFactory( + BasicAuthJcsmpSessionServiceFactory.builder() + .host("localhost:" + solaceContainerJcsmpPort) + .username(SolaceContainerManager.USERNAME) + .password(SolaceContainerManager.PASSWORD) + .vpnName(SolaceContainerManager.VPN_NAME) + .build())); + result + .getSuccessfulPublish() + .apply( + String.format("Get ids %s", writerType), + MapElements.into(strings()).via(Solace.PublishResult::getMessageId)); + + return pipeline; + } + + private static class CountingFn extends DoFn { + + private final Counter elementCounter; + + CountingFn(String namespace, String name) { + elementCounter = Metrics.counter(namespace, name); + } + + @ProcessElement + public void processElement(@Element T record, OutputReceiver c) { + elementCounter.inc(1L); + c.output(record); + } + } +}