Skip to content

Commit

Permalink
Cherry-picking from master : Fixing the infinite loop that runs to re…
Browse files Browse the repository at this point in the history
…new SAS token after 20 minutes when a connection string contains the SAS token instead of SAS key. (#342) (#346)

# Conflicts:
#	azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CommonRequestResponseOperations.java
#	azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java
#	azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java
  • Loading branch information
yvgopal authored Mar 15, 2019
1 parent bf53563 commit f3ca020
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ static CompletableFuture<Void> sendCBSTokenAsync(RequestResponseLink requestResp
Message requestMessage = RequestResponseUtils.createRequestMessageFromValueBody(ClientConstants.REQUEST_RESPONSE_PUT_TOKEN_OPERATION, securityToken.getTokenValue(), Util.adjustServerTimeout(operationTimeout));
requestMessage.getApplicationProperties().getValue().put(ClientConstants.REQUEST_RESPONSE_PUT_TOKEN_TYPE, securityToken.getTokenType().toString());
requestMessage.getApplicationProperties().getValue().put(ClientConstants.REQUEST_RESPONSE_PUT_TOKEN_AUDIENCE, securityToken.getTokenAudience());
requestMessage.getApplicationProperties().getValue().put(ClientConstants.REQUEST_RESPONSE_PUT_TOKEN_EXPIRATION, securityToken.getValidUntil().toEpochMilli());
CompletableFuture<Message> responseFuture = requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, operationTimeout);
return responseFuture.thenComposeAsync((responseMessage) -> {
CompletableFuture<Void> returningFuture = new CompletableFuture<Void>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,7 @@ private synchronized CompletableFuture<Void> ensureLinkIsOpen()
Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.receivePath, cause);
this.receiveLinkReopenFuture.completeExceptionally(sendTokenEx);
this.clearAllPendingWorkItems(sendTokenEx);
}
else
{
Expand Down Expand Up @@ -1282,7 +1283,7 @@ private void completePendingUpdateStateWorkItem(Delivery delivery, String delive
}
}

private void clearAllPendingWorkItems(Exception exception)
private void clearAllPendingWorkItems(Throwable exception)
{
TRACE_LOGGER.info("Completeing all pending receive and updateState operation on the receiver to '{}'", this.receivePath);
final boolean isTransientException = exception == null ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ else if (outcome instanceof Released)
}
}

private void clearAllPendingSendsWithException(Exception failureException)
private void clearAllPendingSendsWithException(Throwable failureException)
{
synchronized (this.pendingSendLock)
{
Expand All @@ -623,7 +623,7 @@ private void clearAllPendingSendsWithException(Exception failureException)
}
}

private void cleanupFailedSend(final SendWorkItem<DeliveryState> failedSend, final Exception exception)
private void cleanupFailedSend(final SendWorkItem<DeliveryState> failedSend, final Throwable exception)
{
failedSend.cancelTimeoutTask(false);
ExceptionUtil.completeExceptionally(failedSend.getWork(), exception, this, true);
Expand Down Expand Up @@ -812,6 +812,7 @@ private synchronized CompletableFuture<Void> ensureLinkIsOpen()
Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.sendPath, cause);
this.sendLinkReopenFuture.completeExceptionally(sendTokenEx);
this.clearAllPendingSendsWithException(sendTokenEx);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ else if (errorCondition.getCondition() == AmqpErrorCode.DecodeError)
return new ServiceBusException(ClientConstants.DEFAULT_IS_TRANSIENT, errorCondition.toString());
}

static <T> void completeExceptionally(CompletableFuture<T> future, Exception exception, IErrorContextProvider contextProvider, boolean completeAsynchronously)
static <T> void completeExceptionally(CompletableFuture<T> future, Throwable exception, IErrorContextProvider contextProvider, boolean completeAsynchronously)
{
if (exception != null && exception instanceof ServiceBusException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,17 @@ private CompletableFuture<Instant> generateAndSendSecurityToken(String sasTokenA

private static ScheduledFuture<?> scheduleRenewTimer(Instant currentTokenValidUntil, Runnable validityRenewer)
{
// It will eventually expire. Renew it
int renewInterval = Util.getTokenRenewIntervalInSeconds((int)Duration.between(Instant.now(), currentTokenValidUntil).getSeconds());
return Timer.schedule(validityRenewer, Duration.ofSeconds(renewInterval), TimerType.OneTimeRun);
if(currentTokenValidUntil == Instant.MAX)
{
// User provided token or will never expire
return null;
}
else
{
// It will eventually expire. Renew it
int renewInterval = Util.getTokenRenewIntervalInSeconds((int)Duration.between(Instant.now(), currentTokenValidUntil).getSeconds());
return Timer.schedule(validityRenewer, Duration.ofSeconds(renewInterval), TimerType.OneTimeRun);
}
}

CompletableFuture<RequestResponseLink> obtainRequestResponseLinkAsync(String entityPath, MessagingEntityType entityType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ private CompletableFuture<Void> recreateInternalLinks()
Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx);
TRACE_LOGGER.error("Sending SAS Token failed. RequestResponseLink path:{}", this.linkPath, cause);
recreateInternalLinksFuture.completeExceptionally(cause);
this.completeAllPendingRequestsWithException(cause);
}
else
{
Expand Down Expand Up @@ -439,7 +440,7 @@ public void run()
return recreateInternalLinksFuture;
}

private void completeAllPendingRequestsWithException(Exception exception)
private void completeAllPendingRequestsWithException(Throwable exception)
{
TRACE_LOGGER.warn("Completing all pending requests with exception in request response link to {}", this.linkPath);
for(RequestResponseWorkItem workItem : this.pendingRequests.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ public static ClientSettings getClientSettingsFromConnectionStringBuilder(Connec
}
else
{
tokenProvider = new SharedAccessSignatureTokenProvider(builder.getSharedAccessSignatureToken(), Instant.now().plus(Duration.ofSeconds(SecurityConstants.DEFAULT_SAS_TOKEN_VALIDITY_IN_SECONDS)));
tokenProvider = new SharedAccessSignatureTokenProvider(builder.getSharedAccessSignatureToken(), Instant.MAX); // Max validity as we will not generate another token
}

return new ClientSettings(tokenProvider, builder.getRetryPolicy(), builder.getOperationTimeout(), builder.getTransportType());
Expand Down

0 comments on commit f3ca020

Please sign in to comment.