From 63bfe2d47b8ab0a48d614511f6d7228d1f284139 Mon Sep 17 00:00:00 2001 From: Amit Kumar Mondal Date: Thu, 18 Jul 2024 10:33:24 +0200 Subject: [PATCH] Added support of custom executor --- .../mqtt5/provider/MessageClientProvider.java | 69 ++++++++++++++++++- .../mqtt5/provider/helper/MessageHelper.java | 37 ++++++++++ 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageClientProvider.java b/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageClientProvider.java index 65f0021..d09b9bd 100644 --- a/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageClientProvider.java +++ b/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/MessageClientProvider.java @@ -20,6 +20,7 @@ import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.MQTT_CONNECTION_READY_SERVICE_PROPERTY; import static in.bytehue.messaging.mqtt5.api.MqttMessageConstants.ConfigurationPid.CLIENT; import static in.bytehue.messaging.mqtt5.provider.helper.MessageHelper.getOptionalService; +import static in.bytehue.messaging.mqtt5.provider.helper.MessageHelper.getOptionalServiceWithoutType; import static java.util.concurrent.TimeUnit.SECONDS; import static org.osgi.service.condition.Condition.CONDITION_ID; import static org.osgi.service.condition.Condition.CONDITION_ID_TRUE; @@ -32,6 +33,11 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.function.Supplier; import javax.net.ssl.HostnameVerifier; @@ -54,6 +60,7 @@ import org.osgi.service.metatype.annotations.Designate; import org.osgi.service.metatype.annotations.ObjectClassDefinition; +import com.hivemq.client.internal.netty.NettyEventLoopProvider; import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext; @@ -75,6 +82,7 @@ import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode; import in.bytehue.messaging.mqtt5.provider.MessageClientProvider.Config; +import in.bytehue.messaging.mqtt5.provider.helper.ThreadFactoryBuilder; @ProvideMessagingFeature @Designate(ocd = Config.class) @@ -143,6 +151,27 @@ public final class MessageClientProvider { @AttributeDefinition(name = "Simple Authentication Password", type = PASSWORD) String password() default ""; + @AttributeDefinition(name = "Custom Executor Configuration") + boolean useCustomExecutor() default false; + + @AttributeDefinition(name = "Custom Executor Number of Threads") + int numberOfThreads() default 5; + + @AttributeDefinition(name = "Custom Executor Prefix of the thread name") + String threadNamePrefix() default "osgi-mqtt"; + + @AttributeDefinition(name = "Custom Executor Suffix of the thread name (supports only {@code %d} format specifier)") + String threadNameSuffix() default "-%d"; + + @AttributeDefinition(name = "Flag to set if the threads will be daemon threads") + boolean isDaemon() default true; + + @AttributeDefinition(name = "Custom Thread Executor Service Class Name (Note that, the service should be an instance of Java Executor)") + String executorTargetClass() default ""; + + @AttributeDefinition(name = "Custom Thread Executor Service Target Filter") + String executorTargetFilter() default ""; + @AttributeDefinition(name = "SSL Configuration") boolean useSSL() default false; @@ -257,6 +286,7 @@ public final class MessageClientProvider { private BundleContext bundleContext; public volatile Config config; + private ScheduledExecutorService customExecutor; private ServiceRegistration readyServiceReg; @Activate @@ -281,6 +311,7 @@ public synchronized Config config() { } private void init(final Config config) { + logger.info("Performing connection"); this.config = config; try { connect(); @@ -290,7 +321,7 @@ private void init(final Config config) { } private void disconnect(final boolean isNormalDisconnection) { - logger.info("Performing disconection"); + logger.info("Performing disconnection"); Mqtt5DisconnectReasonCode reasonCode; String reasonDescription; if (isNormalDisconnection) { @@ -314,6 +345,12 @@ private void disconnect(final boolean isNormalDisconnection) { disconnectParams.noSessionExpiry(); } disconnectParams.send(); + // shutdown the custom executor if used + if (customExecutor != null) { + customExecutor.shutdownNow(); + NettyEventLoopProvider.INSTANCE.releaseEventLoop(customExecutor); + customExecutor = null; + } } private void connect() { @@ -424,6 +461,36 @@ private void connect() { .orElse(null)) .applySslConfig(); } + if (config.useCustomExecutor()) { + logger.debug("Applying Custom Executor Configuration"); + final String clazz = config.executorTargetClass().trim(); + if (clazz.isEmpty()) { + logger.debug("Applying Executor as Non-Service Configuration"); + final ThreadFactory threadFactory = + new ThreadFactoryBuilder() + .setThreadFactoryName(config.threadNamePrefix()) + .setThreadNameFormat(config.threadNameSuffix()) + .setDaemon(config.isDaemon()) + .build(); + customExecutor = Executors.newScheduledThreadPool(config.numberOfThreads(), threadFactory); + ((ScheduledThreadPoolExecutor) customExecutor).setRemoveOnCancelPolicy(true); + clientBuilder.executorConfig() + .nettyExecutor(customExecutor) + .applyExecutorConfig(); + } else { + logger.debug("Applying Executor as Service Configuration"); + String filter = config.executorTargetFilter().trim(); + Optional service = + getOptionalServiceWithoutType( + clazz, + filter, + bundleContext, + logger); + service.ifPresent(executor -> clientBuilder.executorConfig() + .nettyExecutor((Executor) executor) + .applyExecutorConfig()); + } + } if (config.useEnhancedAuthentication()) { logger.debug("Applying Enhanced Authentication Configuration"); clientBuilder.enhancedAuth( diff --git a/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/helper/MessageHelper.java b/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/helper/MessageHelper.java index 38a015f..7e057c3 100644 --- a/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/helper/MessageHelper.java +++ b/in.bytehue.messaging.mqtt5.provider/src/main/java/in/bytehue/messaging/mqtt5/provider/helper/MessageHelper.java @@ -35,6 +35,7 @@ import java.lang.reflect.Array; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -80,6 +81,42 @@ public final class MessageHelper { private MessageHelper() { throw new IllegalAccessError("Non-instantiable"); } + + public static Object getServiceWithoutType(final String clazz, final String filter, final BundleContext context) { + try { + ServiceReference[] references = context.getServiceReferences(clazz, filter); + if (references == null || references.length == 0) { + throw new RuntimeException("'" + clazz + "' service instance cannot be found"); + } + + final ToLongFunction> srFunc = + sr -> Optional.ofNullable(((ServiceReference) sr).getProperty(SERVICE_RANKING)) + .filter(Number.class::isInstance) + .map(Number.class::cast) + .map(Number::longValue) + .orElse(0L); + + return Arrays.stream(references) + .max(comparingLong(srFunc)) // Finds the reference with the highest ranking + .map(context::getService) + .orElseThrow(() -> new RuntimeException("'" + clazz + "' service instance cannot be found")); + } catch (Exception e) { + throw new RuntimeException("Service '" + clazz + "' cannot be retrieved", e); + } + } + + public static Optional getOptionalServiceWithoutType(final String clazz, String filter, final BundleContext context, final Logger logger) { + try { + if (filter.trim().isEmpty()) { + filter = null; + } + final Object service = getServiceWithoutType(clazz, filter, context); + return Optional.ofNullable(service); + } catch (final Exception e) { + logger.warn("Service '{}' cannot be retrieved", clazz); + return Optional.empty(); + } + } public static T getService(final Class clazz, final String filter, final BundleContext context) { try {