Skip to content

Commit

Permalink
Porting the memory leak fix from dev to master (#183)
Browse files Browse the repository at this point in the history
* Fixing a memory leak in RequestResponseLink (#180)


# Conflicts:
#	azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java
#	azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java

* Changing version number.
  • Loading branch information
yvgopal authored Jan 20, 2018
1 parent 009adf5 commit f182602
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 58 deletions.
2 changes: 1 addition & 1 deletion azure-servicebus/azure-servicebus.pom
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.1.0</version>
<version>1.1.1</version>
<licenses>
<license>
<name>The MIT License (MIT)</name>
Expand Down
2 changes: 1 addition & 1 deletion azure-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus-parent</artifactId>
<version>1.1.0</version>
<version>1.1.1</version>
</parent>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,15 +381,7 @@ private void createReceiveLink()

final ReceiveLinkHandler handler = new ReceiveLinkHandler(this);
BaseHandler.setHandler(receiver, handler);
this.underlyingFactory.registerForConnectionError(receiver);

receiver.open();

if (this.receiveLink != null)
{
this.underlyingFactory.deregisterForConnectionError(this.receiveLink);
}

this.receiveLink = receiver;
}

Expand Down Expand Up @@ -587,7 +579,9 @@ public void onOpenComplete(Exception exception)
}

if (exception == null)
{
{
this.underlyingFactory.registerForConnectionError(this.receiveLink);

if (this.linkOpen != null && !this.linkOpen.getWork().isDone())
{
AsyncUtil.completeFuture(this.linkOpen.getWork(), this);
Expand Down Expand Up @@ -631,7 +625,7 @@ public void onOpenComplete(Exception exception)
this.closeAsync();
}
}

this.lastKnownLinkError = exception;
}
}
Expand Down Expand Up @@ -800,6 +794,8 @@ public void onError(Exception exception)
}
else
{
this.underlyingFactory.deregisterForConnectionError(this.receiveLink);

if (exception != null &&
(!(exception instanceof ServiceBusException) || !((ServiceBusException) exception).getIsTransient()))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ public void onOpenComplete(Exception completionException)
{
if (completionException == null)
{
this.underlyingFactory.registerForConnectionError(this.sendLink);
this.lastKnownLinkError = null;
this.retryPolicy.resetRetryCount(this.getClientId());

Expand Down Expand Up @@ -450,6 +451,7 @@ public void onError(Exception completionException)
}
else
{
this.underlyingFactory.deregisterForConnectionError(this.sendLink);
this.lastKnownLinkError = completionException;
this.lastKnownErrorReportedAt = Instant.now();

Expand Down Expand Up @@ -593,16 +595,7 @@ private void createSendLink()

SendLinkHandler handler = new SendLinkHandler(CoreMessageSender.this);
BaseHandler.setHandler(sender, handler);

this.underlyingFactory.registerForConnectionError(sender);
sender.open();

if (this.sendLink != null)
{
final Sender oldSender = this.sendLink;
this.underlyingFactory.deregisterForConnectionError(oldSender);
}

sender.open();
this.sendLink = sender;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private void closeConnection(ErrorCondition error, Exception cause)
TRACE_LOGGER.info("Closing connection to host");
// Important to copy the reference of the connection as a call to getConnection might create a new connection while we are still in this method
Connection currentConnection = this.connection;
if(connection != null)
if(currentConnection != null)
{
Link[] links = this.registeredLinks.toArray(new Link[0]);

Expand Down Expand Up @@ -392,27 +392,28 @@ public void onEvent()
} catch (IOException e) {
AsyncUtil.completeFutureExceptionally(this.connetionCloseFuture, e);
}

Timer.schedule(new Runnable()
{
@Override
public void run()
{
if (!MessagingFactory.this.connetionCloseFuture.isDone())
{
String errorMessage = "Closing MessagingFactory timed out.";
TRACE_LOGGER.warn(errorMessage);
MessagingFactory.this.connetionCloseFuture.completeExceptionally(new TimeoutException(errorMessage));
}
}
},
this.operationTimeout, TimerType.OneTimeRun);
}
else if(this.connection == null || this.connection.getRemoteState() == EndpointState.CLOSED)
else
{
this.connetionCloseFuture.complete(null);
Timer.unregister(this.getClientId());
}
});

Timer.schedule(new Runnable()
{
@Override
public void run()
{
if (!MessagingFactory.this.connetionCloseFuture.isDone())
{
String errorMessage = "Closing MessagingFactory timed out.";
TRACE_LOGGER.warn(errorMessage);
MessagingFactory.this.connetionCloseFuture.completeExceptionally(new TimeoutException(errorMessage));
}
}
},
this.operationTimeout, TimerType.OneTimeRun);

return this.connetionCloseFuture;
}
Expand Down Expand Up @@ -494,7 +495,10 @@ public void run()
@Override
public void registerForConnectionError(Link link)
{
this.registeredLinks.add(link);
if(link != null)
{
this.registeredLinks.add(link);
}
}

/**
Expand All @@ -503,7 +507,10 @@ public void registerForConnectionError(Link link)
@Override
public void deregisterForConnectionError(Link link)
{
this.registeredLinks.remove(link);
if(link != null)
{
this.registeredLinks.remove(link);
}
}

void scheduleOnReactorThread(final DispatchHandler handler) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ public void onOpenComplete(Exception completionException) {
if(completionException == null)
{
TRACE_LOGGER.debug("Opened internal receive link of requestresponselink to {}", parent.linkPath);
this.parent.underlyingFactory.registerForConnectionError(this.receiveLink);
this.openFuture.complete(null);

// Send unlimited credit
Expand Down Expand Up @@ -511,6 +512,7 @@ public void onError(Exception exception) {
}

TRACE_LOGGER.error("Internal receive link of requestresponselink to '{}' encountered error.", this.parent.linkPath, exception);
this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink);
this.parent.amqpSender.closeInternals(false);
this.parent.amqpSender.setClosed();
this.parent.completeAllPendingRequestsWithException(exception);
Expand Down Expand Up @@ -547,6 +549,7 @@ public void onClose(ErrorCondition condition) {
else
{
TRACE_LOGGER.error("Internal receive link of requestresponselink to '{}' closed with error.", this.parent.linkPath, exception);
this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink);
this.parent.amqpSender.closeInternals(false);
this.parent.amqpSender.setClosed();
this.parent.completeAllPendingRequestsWithException(exception);
Expand Down Expand Up @@ -592,13 +595,6 @@ public void onReceiveComplete(Delivery delivery)
}

public void setReceiveLink(Receiver receiveLink) {
if (this.receiveLink != null)
{
Receiver oldReceiver = this.receiveLink;
this.parent.underlyingFactory.deregisterForConnectionError(oldReceiver);
}

this.parent.underlyingFactory.registerForConnectionError(receiveLink);
this.receiveLink = receiveLink;
}
}
Expand Down Expand Up @@ -685,6 +681,7 @@ public void onOpenComplete(Exception completionException) {
if(completionException == null)
{
TRACE_LOGGER.debug("Opened internal send link of requestresponselink to {}", parent.linkPath);
this.parent.underlyingFactory.registerForConnectionError(this.sendLink);
this.openFuture.complete(null);
this.runSendLoop();
}
Expand Down Expand Up @@ -714,6 +711,7 @@ public void onError(Exception exception) {
}

TRACE_LOGGER.error("Internal send link of requestresponselink to '{}' encountered error.", this.parent.linkPath, exception);
this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink);
this.parent.amqpReceiver.closeInternals(false);
this.parent.amqpReceiver.setClosed();
this.parent.completeAllPendingRequestsWithException(exception);
Expand Down Expand Up @@ -750,6 +748,7 @@ public void onClose(ErrorCondition condition) {
else
{
TRACE_LOGGER.error("Internal send link of requestresponselink to '{}' closed with error.", this.parent.linkPath, exception);
this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink);
this.parent.amqpReceiver.closeInternals(false);
this.parent.amqpReceiver.setClosed();
this.parent.completeAllPendingRequestsWithException(exception);
Expand Down Expand Up @@ -819,14 +818,7 @@ public void onSendComplete(Delivery delivery) {
// Doesn't happen as sends are settled on send
}

public void setSendLink(Sender sendLink) {
if (this.sendLink != null)
{
Sender oldSender = this.sendLink;
this.parent.underlyingFactory.deregisterForConnectionError(oldSender);
}

this.parent.underlyingFactory.registerForConnectionError(sendLink);
public void setSendLink(Sender sendLink) {
this.sendLink = sendLink;
this.availableCredit = new AtomicInteger(0);
}
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus-parent</artifactId>
<version>1.1.0</version>
<version>1.1.1</version>
<packaging>pom</packaging>

<url>https://github.com/Azure/azure-service-bus-java</url>
Expand All @@ -13,7 +13,7 @@
<proton-j-version>0.22.0</proton-j-version>
<junit-version>4.12</junit-version>
<slf4j-version>1.8.0-alpha2</slf4j-version>
<client-current-version>1.1.0</client-current-version>
<client-current-version>1.1.1</client-current-version>
</properties>

<modules>
Expand Down

0 comments on commit f182602

Please sign in to comment.