diff --git a/azure-servicebus/azure-servicebus.pom b/azure-servicebus/azure-servicebus.pom index e1ec47f5..839c4502 100644 --- a/azure-servicebus/azure-servicebus.pom +++ b/azure-servicebus/azure-servicebus.pom @@ -4,7 +4,7 @@ 4.0.0 com.microsoft.azure azure-servicebus - 1.1.0 + 1.1.1 The MIT License (MIT) diff --git a/azure-servicebus/pom.xml b/azure-servicebus/pom.xml index def5c575..60773d38 100644 --- a/azure-servicebus/pom.xml +++ b/azure-servicebus/pom.xml @@ -9,7 +9,7 @@ com.microsoft.azure azure-servicebus-parent - 1.1.0 + 1.1.1 diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java index 1a154691..08db5f1e 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java @@ -381,15 +381,7 @@ private void createReceiveLink() final ReceiveLinkHandler handler = new ReceiveLinkHandler(this); BaseHandler.setHandler(receiver, handler); - this.underlyingFactory.registerForConnectionError(receiver); - receiver.open(); - - if (this.receiveLink != null) - { - this.underlyingFactory.deregisterForConnectionError(this.receiveLink); - } - this.receiveLink = receiver; } @@ -587,7 +579,9 @@ public void onOpenComplete(Exception exception) } if (exception == null) - { + { + this.underlyingFactory.registerForConnectionError(this.receiveLink); + if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) { AsyncUtil.completeFuture(this.linkOpen.getWork(), this); @@ -631,7 +625,7 @@ public void onOpenComplete(Exception exception) this.closeAsync(); } } - + this.lastKnownLinkError = exception; } } @@ -800,6 +794,8 @@ public void onError(Exception exception) } else { + this.underlyingFactory.deregisterForConnectionError(this.receiveLink); + if (exception != null && (!(exception instanceof ServiceBusException) || !((ServiceBusException) exception).getIsTransient())) { diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java index cbd72ec9..5f1c0f9a 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java @@ -364,6 +364,7 @@ public void onOpenComplete(Exception completionException) { if (completionException == null) { + this.underlyingFactory.registerForConnectionError(this.sendLink); this.lastKnownLinkError = null; this.retryPolicy.resetRetryCount(this.getClientId()); @@ -450,6 +451,7 @@ public void onError(Exception completionException) } else { + this.underlyingFactory.deregisterForConnectionError(this.sendLink); this.lastKnownLinkError = completionException; this.lastKnownErrorReportedAt = Instant.now(); @@ -593,16 +595,7 @@ private void createSendLink() SendLinkHandler handler = new SendLinkHandler(CoreMessageSender.this); BaseHandler.setHandler(sender, handler); - - this.underlyingFactory.registerForConnectionError(sender); - sender.open(); - - if (this.sendLink != null) - { - final Sender oldSender = this.sendLink; - this.underlyingFactory.deregisterForConnectionError(oldSender); - } - + sender.open(); this.sendLink = sender; } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java index 71b065b9..a4371fed 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java @@ -311,7 +311,7 @@ private void closeConnection(ErrorCondition error, Exception cause) TRACE_LOGGER.info("Closing connection to host"); // Important to copy the reference of the connection as a call to getConnection might create a new connection while we are still in this method Connection currentConnection = this.connection; - if(connection != null) + if(currentConnection != null) { Link[] links = this.registeredLinks.toArray(new Link[0]); @@ -392,27 +392,28 @@ public void onEvent() } catch (IOException e) { AsyncUtil.completeFutureExceptionally(this.connetionCloseFuture, e); } + + Timer.schedule(new Runnable() + { + @Override + public void run() + { + if (!MessagingFactory.this.connetionCloseFuture.isDone()) + { + String errorMessage = "Closing MessagingFactory timed out."; + TRACE_LOGGER.warn(errorMessage); + MessagingFactory.this.connetionCloseFuture.completeExceptionally(new TimeoutException(errorMessage)); + } + } + }, + this.operationTimeout, TimerType.OneTimeRun); } - else if(this.connection == null || this.connection.getRemoteState() == EndpointState.CLOSED) + else { this.connetionCloseFuture.complete(null); + Timer.unregister(this.getClientId()); } }); - - Timer.schedule(new Runnable() - { - @Override - public void run() - { - if (!MessagingFactory.this.connetionCloseFuture.isDone()) - { - String errorMessage = "Closing MessagingFactory timed out."; - TRACE_LOGGER.warn(errorMessage); - MessagingFactory.this.connetionCloseFuture.completeExceptionally(new TimeoutException(errorMessage)); - } - } - }, - this.operationTimeout, TimerType.OneTimeRun); return this.connetionCloseFuture; } @@ -494,7 +495,10 @@ public void run() @Override public void registerForConnectionError(Link link) { - this.registeredLinks.add(link); + if(link != null) + { + this.registeredLinks.add(link); + } } /** @@ -503,7 +507,10 @@ public void registerForConnectionError(Link link) @Override public void deregisterForConnectionError(Link link) { - this.registeredLinks.remove(link); + if(link != null) + { + this.registeredLinks.remove(link); + } } void scheduleOnReactorThread(final DispatchHandler handler) throws IOException diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java index aab98a9c..0d0be657 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java @@ -480,6 +480,7 @@ public void onOpenComplete(Exception completionException) { if(completionException == null) { TRACE_LOGGER.debug("Opened internal receive link of requestresponselink to {}", parent.linkPath); + this.parent.underlyingFactory.registerForConnectionError(this.receiveLink); this.openFuture.complete(null); // Send unlimited credit @@ -511,6 +512,7 @@ public void onError(Exception exception) { } TRACE_LOGGER.error("Internal receive link of requestresponselink to '{}' encountered error.", this.parent.linkPath, exception); + this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink); this.parent.amqpSender.closeInternals(false); this.parent.amqpSender.setClosed(); this.parent.completeAllPendingRequestsWithException(exception); @@ -547,6 +549,7 @@ public void onClose(ErrorCondition condition) { else { TRACE_LOGGER.error("Internal receive link of requestresponselink to '{}' closed with error.", this.parent.linkPath, exception); + this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink); this.parent.amqpSender.closeInternals(false); this.parent.amqpSender.setClosed(); this.parent.completeAllPendingRequestsWithException(exception); @@ -592,13 +595,6 @@ public void onReceiveComplete(Delivery delivery) } public void setReceiveLink(Receiver receiveLink) { - if (this.receiveLink != null) - { - Receiver oldReceiver = this.receiveLink; - this.parent.underlyingFactory.deregisterForConnectionError(oldReceiver); - } - - this.parent.underlyingFactory.registerForConnectionError(receiveLink); this.receiveLink = receiveLink; } } @@ -685,6 +681,7 @@ public void onOpenComplete(Exception completionException) { if(completionException == null) { TRACE_LOGGER.debug("Opened internal send link of requestresponselink to {}", parent.linkPath); + this.parent.underlyingFactory.registerForConnectionError(this.sendLink); this.openFuture.complete(null); this.runSendLoop(); } @@ -714,6 +711,7 @@ public void onError(Exception exception) { } TRACE_LOGGER.error("Internal send link of requestresponselink to '{}' encountered error.", this.parent.linkPath, exception); + this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink); this.parent.amqpReceiver.closeInternals(false); this.parent.amqpReceiver.setClosed(); this.parent.completeAllPendingRequestsWithException(exception); @@ -750,6 +748,7 @@ public void onClose(ErrorCondition condition) { else { TRACE_LOGGER.error("Internal send link of requestresponselink to '{}' closed with error.", this.parent.linkPath, exception); + this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink); this.parent.amqpReceiver.closeInternals(false); this.parent.amqpReceiver.setClosed(); this.parent.completeAllPendingRequestsWithException(exception); @@ -819,14 +818,7 @@ public void onSendComplete(Delivery delivery) { // Doesn't happen as sends are settled on send } - public void setSendLink(Sender sendLink) { - if (this.sendLink != null) - { - Sender oldSender = this.sendLink; - this.parent.underlyingFactory.deregisterForConnectionError(oldSender); - } - - this.parent.underlyingFactory.registerForConnectionError(sendLink); + public void setSendLink(Sender sendLink) { this.sendLink = sendLink; this.availableCredit = new AtomicInteger(0); } diff --git a/pom.xml b/pom.xml index ca891f80..f8aaa5e6 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 com.microsoft.azure azure-servicebus-parent - 1.1.0 + 1.1.1 pom https://github.com/Azure/azure-service-bus-java @@ -13,7 +13,7 @@ 0.22.0 4.12 1.8.0-alpha2 - 1.1.0 + 1.1.1