diff --git a/reactor-core/build.gradle b/reactor-core/build.gradle index 7f696c4aff..760c560191 100644 --- a/reactor-core/build.gradle +++ b/reactor-core/build.gradle @@ -146,8 +146,41 @@ task japicmp(type: JapicmpTask) { //TODO after a release, bump the gradle.properties baseline //TODO after a release, remove the reactor-core exclusions below if any - classExcludes = ["reactor.core.publisher.TopicProcessor", "reactor.core.publisher.WorkQueueProcessor", 'reactor.core.publisher.EventLoopProcessor$Slot'] -// methodExcludes = ["reactor.core.Scannable#operatorName()"] + classExcludes = [ + "reactor.core.publisher.TopicProcessor", + "reactor.core.publisher.WorkQueueProcessor", + 'reactor.core.publisher.EventLoopProcessor$Slot', + 'reactor.util.concurrent.WaitStrategy', + ] + methodExcludes = [ + 'reactor.core.publisher.Flux#onLastAssembly(reactor.core.publisher.Flux)', + 'reactor.core.publisher.Flux#compose(java.util.function.Function)', + 'reactor.core.publisher.Flux#retry(java.util.function.Predicate)', + 'reactor.core.publisher.Flux#retry(long, java.util.function.Predicate)', + 'reactor.core.publisher.Flux#retryBackoff(long, java.time.Duration)', + 'reactor.core.publisher.Flux#retryBackoff(long, java.time.Duration, java.time.Duration)', + 'reactor.core.publisher.Flux#retryBackoff(long, java.time.Duration, java.time.Duration, reactor.core.scheduler.Scheduler)', + 'reactor.core.publisher.Flux#retryBackoff(long, java.time.Duration, java.time.Duration, double)', + 'reactor.core.publisher.Flux#retryBackoff(long, java.time.Duration, java.time.Duration, double, reactor.core.scheduler.Scheduler)', + 'reactor.core.publisher.Flux#retryWhen(java.util.function.Function)', + 'reactor.core.publisher.Flux#usingWhen(org.reactivestreams.Publisher, java.util.function.Function, java.util.function.Function, java.util.function.Function)', + 'reactor.core.publisher.Flux#usingWhen(org.reactivestreams.Publisher, java.util.function.Function, java.util.function.Function, java.util.function.Function, java.util.function.Function)', + + 'reactor.core.publisher.Mono#onLastAssembly(reactor.core.publisher.Mono)', + 'reactor.core.publisher.Mono#compose(java.util.function.Function)', + 'reactor.core.publisher.Mono#retry(java.util.function.Predicate)', + 'reactor.core.publisher.Mono#retry(long, java.util.function.Predicate)', + 'reactor.core.publisher.Mono#retryBackoff(long, java.time.Duration)', + 'reactor.core.publisher.Mono#retryBackoff(long, java.time.Duration, java.time.Duration)', + 'reactor.core.publisher.Mono#retryBackoff(long, java.time.Duration, java.time.Duration, reactor.core.scheduler.Scheduler)', + 'reactor.core.publisher.Mono#retryBackoff(long, java.time.Duration, java.time.Duration, double)', + 'reactor.core.publisher.Mono#retryBackoff(long, java.time.Duration, java.time.Duration, double, reactor.core.scheduler.Scheduler)', + 'reactor.core.publisher.Mono#retryWhen(java.util.function.Function)', + 'reactor.core.publisher.Mono#usingWhen(org.reactivestreams.Publisher, java.util.function.Function, java.util.function.Function, java.util.function.Function)', + 'reactor.core.publisher.Mono#usingWhen(org.reactivestreams.Publisher, java.util.function.Function, java.util.function.Function, java.util.function.Function, java.util.function.Function)', + + 'reactor.core.publisher.ParallelFlux#composeGroup(java.util.function.Function)', + ] } //complements the javadoc.gradle common configuration diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java 
b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 6556171e1e..27f507bac0 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -1787,124 +1787,6 @@ public static Flux usingWhen(Publisher resourceSupplier, return usingWhen(resourceSupplier, resourceClosure, asyncCleanup, (resource, error) -> asyncCleanup.apply(resource), asyncCleanup); } - /** - * Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber}, - * while streaming the values from a {@link Publisher} derived from the same resource. - * Note that all steps of the operator chain that would need the resource to be in an open - * stable state need to be described inside the {@code resourceClosure} {@link Function}. - *

- * Whenever the resulting sequence terminates, the relevant {@link Function} generates - * a "cleanup" {@link Publisher} that is invoked but doesn't change the content of the - * main sequence. Instead it just defers the termination (unless it errors, in which case - * the error suppresses the original termination signal). - *

- * - *

- * Individual cleanups can also be associated with main sequence cancellation and - * error terminations: - *

- * - * - *

- * Note that if the resource supplying {@link Publisher} emits more than one resource, the - * subsequent resources are dropped ({@link Operators#onNextDropped(Object, Context)}). If - * the publisher errors AFTER having emitted one resource, the error is also silently dropped - * ({@link Operators#onErrorDropped(Throwable, Context)}). - * An empty completion or error without at least one onNext signal triggers a short-circuit - * of the main sequence with the same terminal signal (no resource is established, no - * cleanup is invoked). - *

- * Additionally, the terminal signal is replaced by any error that might have happened - * in the terminating {@link Publisher}: - *

- * - *

- * Finally, early cancellations will cancel the resource supplying {@link Publisher}: - *
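For reference, the removed four-argument usingWhen maps directly onto the five-argument overload kept above, whose error cleanup is a BiFunction that also receives the failure. A minimal migration sketch, assuming a hypothetical Connection resource (its query/commit/rollback/close methods are illustrative, not part of Reactor):

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical transactional resource, purely for illustration.
interface Connection {
    Flux<String> query(String sql);
    Mono<Void> commit();
    Mono<Void> rollback();
    Mono<Void> close();
}

class UsingWhenMigration {
    static Flux<String> select(Publisher<Connection> connectionSource) {
        // Removed overload: usingWhen(source, closure, asyncComplete, asyncError).
        // Replacement: the error cleanup becomes a BiFunction that also sees the error,
        // and cancellation gets its own explicit handler.
        return Flux.usingWhen(
                connectionSource,
                connection -> connection.query("SELECT name FROM users"), // resource closure
                Connection::commit,                                       // asyncComplete
                (connection, error) -> connection.rollback(),             // asyncError
                Connection::close);                                       // asyncCancel
    }
}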

- * - * - * @param resourceSupplier a {@link Publisher} that "generates" the resource, - * subscribed for each subscription to the main sequence - * @param resourceClosure a factory to derive a {@link Publisher} from the supplied resource - * @param asyncComplete an asynchronous resource cleanup invoked if the resource closure terminates with onComplete or is cancelled - * @param asyncError an asynchronous resource cleanup invoked if the resource closure terminates with onError. - * @param the type of elements emitted by the resource closure, and thus the main sequence - * @param the type of the resource object - * @return a new {@link Flux} built around a "transactional" resource, with several - * termination path triggering asynchronous cleanup sequences - * @deprecated prefer using the {@link #usingWhen(Publisher, Function, Function, BiFunction, Function)} version which is more explicit about all termination cases, - * will be removed in 3.4.0 - */ - @Deprecated - public static Flux usingWhen(Publisher resourceSupplier, - Function> resourceClosure, - Function> asyncComplete, - Function> asyncError) { - //null asyncCancel translates to using the `asyncComplete` function in the operator - return onAssembly(new FluxUsingWhen<>(resourceSupplier, resourceClosure, - asyncComplete, (res, err) -> asyncError.apply(res), null)); - } - - /** - * Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber}, - * while streaming the values from a {@link Publisher} derived from the same resource. - * Note that all steps of the operator chain that would need the resource to be in an open - * stable state need to be described inside the {@code resourceClosure} {@link Function}. - *

- * Whenever the resulting sequence terminates, the relevant {@link Function} generates - * a "cleanup" {@link Publisher} that is invoked but doesn't change the content of the - * main sequence. Instead it just defers the termination (unless it errors, in which case - * the error suppresses the original termination signal). - *

- * - *

- * Individual cleanups can also be associated with main sequence cancellation and - * error terminations: - *

- * - *

- * Note that if the resource supplying {@link Publisher} emits more than one resource, the - * subsequent resources are dropped ({@link Operators#onNextDropped(Object, Context)}). If - * the publisher errors AFTER having emitted one resource, the error is also silently dropped - * ({@link Operators#onErrorDropped(Throwable, Context)}). - * An empty completion or error without at least one onNext signal triggers a short-circuit - * of the main sequence with the same terminal signal (no resource is established, no - * cleanup is invoked). - *

- * Additionally, the terminal signal is replaced by any error that might have happened - * in the terminating {@link Publisher}: - *

- * - *

- * Finally, early cancellations will cancel the resource supplying {@link Publisher}: - *

- * - * - * @param resourceSupplier a {@link Publisher} that "generates" the resource, - * subscribed for each subscription to the main sequence - * @param resourceClosure a factory to derive a {@link Publisher} from the supplied resource - * @param asyncComplete an asynchronous resource cleanup invoked if the resource closure terminates with onComplete - * @param asyncError an asynchronous resource cleanup invoked if the resource closure terminates with onError. - * @param asyncCancel an asynchronous resource cleanup invoked if the resource closure is cancelled. - * When {@code null}, the {@code asyncComplete} path is used instead. - * @param the type of elements emitted by the resource closure, and thus the main sequence - * @param the type of the resource object - * @return a new {@link Flux} built around a "transactional" resource, with several - * termination path triggering asynchronous cleanup sequences - * @deprecated prefer using the {@link #usingWhen(Publisher, Function, Function, BiFunction, Function)} version which is more explicit about all termination cases, - * will be removed in 3.4.0 - */ - @Deprecated - public static Flux usingWhen(Publisher resourceSupplier, - Function> resourceClosure, - Function> asyncComplete, - Function> asyncError, - //the operator itself accepts null for asyncCancel, but we won't in the public API - Function> asyncCancel) { - return onAssembly(new FluxUsingWhen<>(resourceSupplier, resourceClosure, - asyncComplete, (res, err) -> asyncError.apply(res), asyncCancel)); - } - /** * Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber}, * while streaming the values from a {@link Publisher} derived from the same resource. @@ -3569,29 +3451,6 @@ public final Mono> collectSortedList(@Nullable Comparator com }); } - /** - * Defer the transformation of this {@link Flux} in order to generate a target {@link Flux} type. - * A transformation will occur for each {@link Subscriber}. For instance: - *

-	 * flux.compose(original -> original.log());
-	 * 
- *
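The removed compose operator deferred the transformation until subscription time, once per subscriber; transformDeferred keeps exactly that behaviour. A small sketch of the migration, reusing the logging transformer from the snippet above:

import java.util.function.Function;
import reactor.core.publisher.Flux;

class ComposeMigration {
    static Flux<String> decorate(Flux<String> flux) {
        Function<Flux<String>, Flux<String>> transformer = original -> original.log();

        // Before (removed in this change): flux.compose(transformer)
        // After: same semantics, the transformer is applied lazily for each new subscriber.
        return flux.transformDeferred(transformer);
    }
}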

- * - * - * @param transformer the {@link Function} to lazily map this {@link Flux} into a target {@link Publisher} - * instance for each new subscriber - * @param the item type in the returned {@link Publisher} - * - * @return a new {@link Flux} - * @see #transform transform() for immmediate transformation of {@link Flux} - * @see #as as() for a loose conversion to an arbitrary type - * @deprecated will be removed in 3.4.0, use {@link #transformDeferred(Function)} instead - */ - @Deprecated - public final Flux compose(Function, ? extends Publisher> transformer) { - return defer(() -> transformer.apply(this)); - } - /** * Transform the elements emitted by this {@link Flux} asynchronously into Publishers, * then flatten these inner publishers into a single {@link Flux}, sequentially and @@ -7223,92 +7082,6 @@ public final Flux retry(long numRetries) { return onAssembly(new FluxRetry<>(this, numRetries)); } - /** - * Re-subscribes to this {@link Flux} sequence if it signals any error - * that matches the given {@link Predicate}, otherwise push the error downstream. - * - *

- * - * - * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal - * - * @return a {@link Flux} that retries on onError if the predicates matches. - * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4 - */ - @Deprecated - public final Flux retry(Predicate retryMatcher) { - return onAssembly(new FluxRetryPredicate<>(this, retryMatcher)); - } - - /** - * Re-subscribes to this {@link Flux} sequence up to the specified number of retries if it signals any - * error that match the given {@link Predicate}, otherwise push the error downstream. - * - *

- * - * - * @param numRetries the number of times to tolerate an error - * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal - * - * @return a {@link Flux} that retries on onError up to the specified number of retry - * attempts, only if the predicate matches. - * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4 - */ - @Deprecated - public final Flux retry(long numRetries, Predicate retryMatcher) { - return defer(() -> retry(countingPredicate(retryMatcher, numRetries))); - } - - /** - * Retries this {@link Flux} when a companion sequence signals - * an item in response to this {@link Flux} error signal - *

If the companion sequence signals when the {@link Flux} is active, the retry - * attempt is suppressed and any terminal signal will terminate the {@link Flux} source with the same signal - * immediately. - * - *

- * - *

- * Note that if the companion {@link Publisher} created by the {@code whenFactory} - * emits {@link Context} as trigger objects, these {@link Context} will be merged with - * the previous Context: - *

- *
-	 * <pre>{@code
-	 * Function<Flux<Throwable>, Publisher<?>> customFunction = errorCurrentAttempt -> errorCurrentAttempt.handle((lastError, sink) -> {
-	 * 	    Context ctx = sink.currentContext();
-	 * 	    int rl = ctx.getOrDefault("retriesLeft", 0);
-	 * 	    if (rl > 0) {
-	 *		    sink.next(Context.of(
-	 *		        "retriesLeft", rl - 1,
-	 *		        "lastError", lastError
-	 *		    ));
-	 * 	    } else {
-	 * 	        sink.error(Exceptions.retryExhausted("retries exhausted", lastError));
-	 * 	    }
-	 * });
-	 * Flux retried = originalFlux.retryWhen(customFunction);
-	 * }
- *
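As the deprecation note below spells out, a legacy whenFactory like the one above can be wrapped with Retry.from by mapping each RetrySignal back to its failure, and the predicate-based retry overloads map onto a filtered Retry spec. A sketch under those assumptions (the retry count of 3 is an arbitrary placeholder):

import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

class RetryWhenMigration {

    // Old: flux.retryWhen(whenFactory) with a Function<Flux<Throwable>, Publisher<?>>.
    // New: wrap the factory with Retry.from and feed it only the failures of each RetrySignal.
    static <T> Flux<T> retryWithLegacyFactory(Flux<T> originalFlux,
            Function<Flux<Throwable>, Publisher<?>> whenFactory) {
        return originalFlux.retryWhen(
                Retry.from(signals -> signals.map(Retry.RetrySignal::failure).as(whenFactory)));
    }

    // Old: flux.retry(3, retryMatcher). New: a bounded Retry spec with a filter.
    static <T> Flux<T> retryOnMatchingErrors(Flux<T> originalFlux, Predicate<Throwable> retryMatcher) {
        return originalFlux.retryWhen(Retry.max(3).filter(retryMatcher));
    }
}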
- * - * @param whenFactory the {@link Function} that returns the associated {@link Publisher} - * companion, given a {@link Flux} that signals each onError as a {@link Throwable}. - * - * @return a {@link Flux} that retries on onError when the companion {@link Publisher} produces an - * onNext signal - * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4. Lambda Functions that don't make - * use of the error can simply be converted by wrapping via {@link Retry#from(Function)}. - * Functions that do use the error will additionally need to map the {@link reactor.util.retry.Retry.RetrySignal} - * emitted by the companion to its {@link Retry.RetrySignal#failure()}. - */ - @Deprecated - public final Flux retryWhen(Function, ? extends Publisher> whenFactory) { - Objects.requireNonNull(whenFactory, "whenFactory"); - return onAssembly(new FluxRetryWhen<>(this, Retry.from(fluxRetryWhenState -> fluxRetryWhenState - .map(Retry.RetrySignal::failure) - .as(whenFactory))) - ); - } - /** * Retries this {@link Flux} in response to signals emitted by a companion {@link Publisher}. * The companion is generated by the provided {@link Retry} instance, see {@link Retry#max(long)}, {@link Retry#maxInARow(long)} @@ -7363,215 +7136,6 @@ public final Flux retryWhen(Retry retrySpec) { return onAssembly(new FluxRetryWhen<>(this, retrySpec)); } - /** - * In case of error, retry this {@link Flux} up to {@code numRetries} times using a - * randomized exponential backoff strategy (jitter). The jitter factor is {@code 50%} - * but the effective backoff delay cannot be less than {@code firstBackoff}. - *

- * The randomized exponential backoff is good at preventing two typical issues with - * other simpler backoff strategies, namely: - *

    - *
- * <ul>
- * <li>having an exponentially growing backoff delay with a small initial delay gives
- * the best tradeoff between not overwhelming the server and serving the client as
- * fast as possible</li>
- * <li>having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
- * where eg. numerous clients would hit the server at the same time, causing it to
- * display transient failures which would cause all clients to retry at the same
- * backoff times, ultimately sparing no load on the server.</li>
- * </ul>
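Each of the retryBackoff overloads removed in this hunk maps onto the Retry.backoff builder, as the deprecation notes indicate. A migration sketch with placeholder durations, jitter factor and scheduler:

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

class RetryBackoffMigration {
    static <T> Flux<T> withBackoff(Flux<T> source) {
        // Old: source.retryBackoff(5, Duration.ofMillis(100), Duration.ofSeconds(5), 0.5, Schedulers.parallel())
        return source.retryWhen(Retry
                .backoff(5, Duration.ofMillis(100))   // numRetries, firstBackoff
                .maxBackoff(Duration.ofSeconds(5))    // maxBackoff
                .jitter(0.5)                          // jitterFactor
                .scheduler(Schedulers.parallel()));   // backoffScheduler
    }
}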
- * - *

- * - * - * @param numRetries the maximum number of attempts before an {@link IllegalStateException} - * is raised (having the original retry-triggering exception as cause). - * @param firstBackoff the first backoff delay to apply then grow exponentially. Also - * minimum delay even taking jitter into account. - * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. - * @deprecated use {@link #retryWhen(Retry)} with a {@link Retry#backoff(long, Duration)} base, to be removed in 3.4 - */ - @Deprecated - public final Flux retryBackoff(long numRetries, Duration firstBackoff) { - return retryWhen(Retry.backoff(numRetries, firstBackoff)); - } - - /** - * In case of error, retry this {@link Flux} up to {@code numRetries} times using a - * randomized exponential backoff strategy. The jitter factor is {@code 50%} - * but the effective backoff delay cannot be less than {@code firstBackoff} nor more - * than {@code maxBackoff}. -

- * The randomized exponential backoff is good at preventing two typical issues with - * other simpler backoff strategies, namely: - *

    - *
- * <ul>
- * <li>having an exponentially growing backoff delay with a small initial delay gives
- * the best tradeoff between not overwhelming the server and serving the client as
- * fast as possible</li>
- * <li>having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
- * where eg. numerous clients would hit the server at the same time, causing it to
- * display transient failures which would cause all clients to retry at the same
- * backoff times, ultimately sparing no load on the server.</li>
- * </ul>
- * - *

- * - * - * @param numRetries the maximum number of attempts before an {@link IllegalStateException} - * is raised (having the original retry-triggering exception as cause). - * @param firstBackoff the first backoff delay to apply then grow exponentially. Also - * minimum delay even taking jitter into account. - * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. - * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. - * @deprecated use {@link #retryWhen(Retry)} with a {@link Retry#backoff(long, Duration)} base, to be removed in 3.4 - */ - @Deprecated - public final Flux retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff) { - return retryWhen(Retry - .backoff(numRetries, firstBackoff) - .maxBackoff(maxBackoff)); - } - - /** - * In case of error, retry this {@link Flux} up to {@code numRetries} times using a - * randomized exponential backoff strategy. The jitter factor is {@code 50%} - * but the effective backoff delay cannot be less than {@code firstBackoff} nor more - * than {@code maxBackoff}. The delays and subsequent attempts are materialized on the - * provided backoff {@link Scheduler} (see {@link Mono#delay(Duration, Scheduler)}). -

- * The randomized exponential backoff is good at preventing two typical issues with - * other simpler backoff strategies, namely: - *

    - *
- * <ul>
- * <li>having an exponentially growing backoff delay with a small initial delay gives
- * the best tradeoff between not overwhelming the server and serving the client as
- * fast as possible</li>
- * <li>having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
- * where eg. numerous clients would hit the server at the same time, causing it to
- * display transient failures which would cause all clients to retry at the same
- * backoff times, ultimately sparing no load on the server.</li>
- * </ul>
- * - *

- * - * - * @param numRetries the maximum number of attempts before an {@link IllegalStateException} - * is raised (having the original retry-triggering exception as cause). - * @param firstBackoff the first backoff delay to apply then grow exponentially. Also - * minimum delay even taking jitter into account. - * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. - * @param backoffScheduler the {@link Scheduler} on which the delays and subsequent attempts are executed. - * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. - * @deprecated use {@link #retryWhen(Retry)} with a {@link Retry#backoff(long, Duration)} base, to be removed in 3.4 - */ - @Deprecated - public final Flux retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, Scheduler backoffScheduler) { - return retryWhen(Retry - .backoff(numRetries, firstBackoff) - .maxBackoff(maxBackoff) - .scheduler(backoffScheduler)); - } - - /** - * In case of error, retry this {@link Flux} up to {@code numRetries} times using a - * randomized exponential backoff strategy, randomized with a user-provided jitter - * factor between {@code 0.d} (no jitter) and {@code 1.0} (default is {@code 0.5}). - * Even with the jitter, the effective backoff delay cannot be less than - * {@code firstBackoff} nor more than {@code maxBackoff}. -

- * The randomized exponential backoff is good at preventing two typical issues with - * other simpler backoff strategies, namely: - *

    - *
- * <ul>
- * <li>having an exponentially growing backoff delay with a small initial delay gives
- * the best tradeoff between not overwhelming the server and serving the client as
- * fast as possible</li>
- * <li>having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
- * where eg. numerous clients would hit the server at the same time, causing it to
- * display transient failures which would cause all clients to retry at the same
- * backoff times, ultimately sparing no load on the server.</li>
- * </ul>
- * - *

- * - * - * @param numRetries the maximum number of attempts before an {@link IllegalStateException} - * is raised (having the original retry-triggering exception as cause). - * @param firstBackoff the first backoff delay to apply then grow exponentially. Also - * minimum delay even taking jitter into account. - * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. - * @param jitterFactor the jitter percentage (as a double between 0.0 and 1.0). - * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. - * @deprecated use {@link #retryWhen(Retry)} with a {@link Retry#backoff(long, Duration)} base - */ - @Deprecated - public final Flux retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor) { - return retryWhen(Retry - .backoff(numRetries, firstBackoff) - .maxBackoff(maxBackoff) - .jitter(jitterFactor)); - } - - /** - * In case of error, retry this {@link Flux} up to {@code numRetries} times using a - * randomized exponential backoff strategy, randomized with a user-provided jitter - * factor between {@code 0.d} (no jitter) and {@code 1.0} (default is {@code 0.5}). - * Even with the jitter, the effective backoff delay cannot be less than - * {@code firstBackoff} nor more than {@code maxBackoff}. The delays and subsequent - * attempts are executed on the provided backoff {@link Scheduler} (see - * {@link Mono#delay(Duration, Scheduler)}). -

- * The randomized exponential backoff is good at preventing two typical issues with - * other simpler backoff strategies, namely: - *

    - *
- * <ul>
- * <li>having an exponentially growing backoff delay with a small initial delay gives
- * the best tradeoff between not overwhelming the server and serving the client as
- * fast as possible</li>
- * <li>having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
- * where eg. numerous clients would hit the server at the same time, causing it to
- * display transient failures which would cause all clients to retry at the same
- * backoff times, ultimately sparing no load on the server.</li>
- * </ul>
- * - *

- * - * - * @param numRetries the maximum number of attempts before an {@link IllegalStateException} - * is raised (having the original retry-triggering exception as cause). - * @param firstBackoff the first backoff delay to apply then grow exponentially. Also - * minimum delay even taking jitter into account. - * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. - * @param backoffScheduler the {@link Scheduler} on which the delays and subsequent attempts are executed. - * @param jitterFactor the jitter percentage (as a double between 0.0 and 1.0). - * @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries. - * @deprecated use {@link #retryWhen(Retry)} with a {@link Retry#backoff(long, Duration)} base - */ - @Deprecated - public final Flux retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor, Scheduler backoffScheduler) { - return retryWhen(Retry - .backoff(numRetries, firstBackoff) - .maxBackoff(maxBackoff) - .jitter(jitterFactor) - .scheduler(backoffScheduler)); - } - /** * Sample this {@link Flux} by periodically emitting an item corresponding to that * {@link Flux} latest emitted value within the periodical time window. @@ -9873,28 +9437,6 @@ protected static Flux onAssembly(Flux source) { return source; } - /** - * To be used by custom operators: invokes assembly {@link Hooks} pointcut given a - * {@link Flux}, potentially returning a new {@link Flux}. This is for example useful - * to activate cross-cutting concerns at assembly time, eg. a generalized - * {@link #checkpoint()}. - * - * @param the value type - * @param source the source to apply assembly hooks onto - * - * @return the source, potentially wrapped with assembly time cross-cutting behavior - * @deprecated use {@link Operators#onLastAssembly(CorePublisher)} - */ - @SuppressWarnings("unchecked") - @Deprecated - protected static Flux onLastAssembly(Flux source) { - Function hook = Hooks.onLastOperatorHook; - if(hook == null) { - return source; - } - return (Flux)Objects.requireNonNull(hook.apply(source), "LastOperator hook returned null"); - } - /** * To be used by custom operators: invokes assembly {@link Hooks} pointcut given a * {@link ConnectableFlux}, potentially returning a new {@link ConnectableFlux}. This diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxRetryPredicate.java b/reactor-core/src/main/java/reactor/core/publisher/FluxRetryPredicate.java deleted file mode 100644 index 197c54d217..0000000000 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxRetryPredicate.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package reactor.core.publisher; - -import java.util.Objects; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.function.Predicate; - -import reactor.core.CorePublisher; -import reactor.core.CoreSubscriber; -import reactor.core.Exceptions; - -/** - * Repeatedly subscribes to the source if the predicate returns true after - * completion of the previous subscription. - * - * @param the value type - * @see Reactive-Streams-Commons - */ -final class FluxRetryPredicate extends InternalFluxOperator { - - final Predicate predicate; - - FluxRetryPredicate(Flux source, Predicate predicate) { - super(source); - this.predicate = Objects.requireNonNull(predicate, "predicate"); - } - - @Override - public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { - - RetryPredicateSubscriber parent = new RetryPredicateSubscriber<>(source, - actual, - predicate); - - actual.onSubscribe(parent); - - if (!parent.isCancelled()) { - parent.resubscribe(); - } - return null; - } - - @Override - public Object scanUnsafe(Attr key) { - if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; - return super.scanUnsafe(key); - } - - static final class RetryPredicateSubscriber - extends Operators.MultiSubscriptionSubscriber { - - final CorePublisher source; - - final Predicate predicate; - - volatile int wip; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(RetryPredicateSubscriber.class, "wip"); - - long produced; - - RetryPredicateSubscriber(CorePublisher source, - CoreSubscriber actual, Predicate predicate) { - super(actual); - this.source = source; - this.predicate = predicate; - } - - @Override - public void onNext(T t) { - produced++; - - actual.onNext(t); - } - - @Override - public void onError(Throwable t) { - boolean b; - - try { - b = predicate.test(t); - } catch (Throwable e) { - Throwable _t = Operators.onOperatorError(e, actual.currentContext()); - _t = Exceptions.addSuppressed(_t, t); - actual.onError(_t); - return; - } - - if (b) { - resubscribe(); - } else { - actual.onError(t); - } - } - - @Override - public void onComplete() { - - actual.onComplete(); - } - - void resubscribe() { - if (WIP.getAndIncrement(this) == 0) { - do { - if (isCancelled()) { - return; - } - - long c = produced; - if (c != 0L) { - produced = 0L; - produced(c); - } - - source.subscribe(this); - - } while (WIP.decrementAndGet(this) != 0); - } - } - - @Override - public Object scanUnsafe(Attr key) { - if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; - return super.scanUnsafe(key); - } - } -} diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index 1c55fb451a..1816b4ae5c 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -29,7 +29,6 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.BiPredicate; @@ -40,7 +39,6 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.logging.Level; -import java.util.stream.LongStream; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -799,125 +797,6 @@ public static Mono usingWhen(Publisher resourceSupplier, asyncCleanup); } - /** 
- * Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber}, - * to derive a {@link Mono}. Note that all steps of the operator chain that would need the - * resource to be in an open stable state need to be described inside the {@code resourceClosure} - * {@link Function}. - *

- * Unlike in {@link Flux#usingWhen(Publisher, Function, Function, Function) the Flux counterpart}, - * ALL signals are deferred until the {@link Mono} terminates and the relevant {@link Function} - * generates and invokes a "cleanup" {@link Publisher}. This is because a failure in the cleanup Publisher - * must result in a lone {@code onError} signal in the downstream {@link Mono} (any potential value in the - * derived {@link Mono} is discarded). Here are the various scenarios that can play out: - *

    - *
- * <ul>
- * <li>empty Mono, asyncComplete ends with {@code onComplete()}: downstream receives {@code onComplete()}</li>
- * <li>empty Mono, asyncComplete ends with {@code onError(t)}: downstream receives {@code onError(t)}</li>
- * <li>valued Mono, asyncComplete ends with {@code onComplete()}: downstream receives {@code onNext(value),onComplete()}</li>
- * <li>valued Mono, asyncComplete ends with {@code onError(t)}: downstream receives {@code onError(t)}, {@code value} is discarded</li>
- * <li>error(e) Mono, errorComplete ends with {@code onComplete()}: downstream receives {@code onError(e)}</li>
- * <li>error(e) Mono, errorComplete ends with {@code onError(t)}: downstream receives {@code onError(t)}, t suppressing e</li>
- * </ul>
- *
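A self-contained sketch of the "valued Mono, asyncComplete ends with onError(t)" row above, using the non-deprecated three-argument usingWhen, where a failing cleanup suppresses the value and only the cleanup error reaches the subscriber:

import reactor.core.publisher.Mono;

class MonoUsingWhenDeferral {
    public static void main(String[] args) {
        // Valued Mono + a cleanup that fails: the value is discarded and a lone onError propagates.
        Mono<String> mono = Mono.usingWhen(
                Mono.just("resource"),
                resource -> Mono.just(resource + "-value"),
                resource -> Mono.error(new IllegalStateException("cleanup failed")));

        mono.subscribe(
                value -> System.out.println("onNext: " + value),   // not called
                error -> System.out.println("onError: " + error)); // IllegalStateException
    }
}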

- * - *

- * A dedicated cleanup can also be associated with mono error termination: - *

- * - *

- * Note that if the resource supplying {@link Publisher} emits more than one resource, the - * subsequent resources are dropped ({@link Operators#onNextDropped(Object, Context)}). If - * the publisher errors AFTER having emitted one resource, the error is also silently dropped - * ({@link Operators#onErrorDropped(Throwable, Context)}). - * An empty completion or error without at least one onNext signal (no resource supplied) - * triggers a short-circuit of the main sequence with the same terminal signal - * (no cleanup is invoked). - * - * @reactor.discard This operator discards the element if the {@code asyncComplete} handler fails. - * - * @param resourceSupplier a {@link Publisher} that "generates" the resource, - * subscribed for each subscription to the main sequence - * @param resourceClosure a factory to derive a {@link Mono} from the supplied resource - * @param asyncComplete an asynchronous resource cleanup invoked if the resource closure terminates with onComplete - * @param asyncError an asynchronous resource cleanup invoked if the resource closure terminates with onError - * @param the type of elements emitted by the resource closure, and thus the main sequence - * @param the type of the resource object - * @return a new {@link Mono} built around a "transactional" resource, with deferred emission until the - * asynchronous cleanup sequence relevant to the termination signal completes - * @deprecated prefer using the {@link #usingWhen(Publisher, Function, Function, BiFunction, Function)} version which is more explicit about all termination cases, - * will be removed in 3.4.0 - */ - @Deprecated - public static Mono usingWhen(Publisher resourceSupplier, - Function> resourceClosure, - Function> asyncComplete, - Function> asyncError) { - return onAssembly(new MonoUsingWhen<>(resourceSupplier, resourceClosure, - asyncComplete, (res, err) -> asyncError.apply(res), null)); - } - - /** - * Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber}, - * to derive a {@link Mono}.Note that all steps of the operator chain that would need the - * resource to be in an open stable state need to be described inside the {@code resourceClosure} - * {@link Function}. - *

- * Unlike in {@link Flux#usingWhen(Publisher, Function, Function, Function, Function) the Flux counterpart}, - * ALL signals are deferred until the {@link Mono} terminates and the relevant {@link Function} - * generates and invokes a "cleanup" {@link Publisher}. This is because a failure in the cleanup Publisher - * must result in a lone {@code onError} signal in the downstream {@link Mono} (any potential value in the - * derived {@link Mono} is discarded). Here are the various scenarios that can play out: - *

    - *
- * <ul>
- * <li>empty Mono, asyncComplete ends with {@code onComplete()}: downstream receives {@code onComplete()}</li>
- * <li>empty Mono, asyncComplete ends with {@code onError(t)}: downstream receives {@code onError(t)}</li>
- * <li>valued Mono, asyncComplete ends with {@code onComplete()}: downstream receives {@code onNext(value),onComplete()}</li>
- * <li>valued Mono, asyncComplete ends with {@code onError(t)}: downstream receives {@code onError(t)}, {@code value} is discarded</li>
- * <li>error(e) Mono, errorComplete ends with {@code onComplete()}: downstream receives {@code onError(e)}</li>
- * <li>error(e) Mono, errorComplete ends with {@code onError(t)}: downstream receives {@code onError(t)}, t suppressing e</li>
- * </ul>
- *

- * - *

- * Individual cleanups can also be associated with mono cancellation and - * error terminations: - *

- * - *

- * Note that if the resource supplying {@link Publisher} emits more than one resource, the - * subsequent resources are dropped ({@link Operators#onNextDropped(Object, Context)}). If - * the publisher errors AFTER having emitted one resource, the error is also silently dropped - * ({@link Operators#onErrorDropped(Throwable, Context)}). - * An empty completion or error without at least one onNext signal (no resource supplied) - * triggers a short-circuit of the main sequence with the same terminal signal - * (no cleanup is invoked). - * - * @reactor.discard This operator discards the element if the {@code asyncComplete} handler fails. - * - * @param resourceSupplier a {@link Publisher} that "generates" the resource, - * subscribed for each subscription to the main sequence - * @param resourceClosure a factory to derive a {@link Mono} from the supplied resource - * @param asyncComplete an asynchronous resource cleanup invoked if the resource closure terminates with onComplete - * @param asyncError an asynchronous resource cleanup invoked if the resource closure terminates with onError - * @param asyncCancel an asynchronous resource cleanup invoked if the resource closure is cancelled. - * By default the {@code asyncComplete} path is used. - * @param the type of elements emitted by the resource closure, and thus the main sequence - * @param the type of the resource object - * @return a new {@link Mono} built around a "transactional" resource, with deferred emission until the - * asynchronous cleanup sequence relevant to the termination signal completes - * @deprecated prefer using the {@link #usingWhen(Publisher, Function, Function, BiFunction, Function)} version which is more explicit about all termination cases, - * will be removed in 3.4.0 - */ - @Deprecated - public static Mono usingWhen(Publisher resourceSupplier, - Function> resourceClosure, - Function> asyncComplete, - Function> asyncError, - //the operator itself accepts null for asyncCancel, but we won't in the public API - Function> asyncCancel) { - return onAssembly(new MonoUsingWhen<>(resourceSupplier, resourceClosure, - asyncComplete, (res, err) -> asyncError.apply(res), asyncCancel)); - } - /** * Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber}, * to derive a {@link Mono}.Note that all steps of the operator chain that would need the @@ -1947,31 +1826,6 @@ public final Mono checkpoint(@Nullable String description, boolean forceStack return new MonoOnAssembly<>(this, stacktrace); } - /** - * Defer the given transformation to this {@link Mono} in order to generate a - * target {@link Mono} type. A transformation will occur for each - * {@link Subscriber}. For instance: - * - *

-	 * mono.compose(original -> original.log());
-	 * 
- *

- * - * - * @param transformer the {@link Function} to lazily map this {@link Mono} into a target {@link Mono} - * instance upon subscription. - * @param the item type in the returned {@link Publisher} - * - * @return a new {@link Mono} - * @see #as as() for a loose conversion to an arbitrary type - * @see #transform(Function) - * @deprecated will be removed in 3.4.0, use {@link #transformDeferred(Function)} instead - */ - @Deprecated - public final Mono compose(Function, ? extends Publisher> transformer) { - return transformDeferred(transformer); - } - /** * Concatenate emissions of this {@link Mono} with the provided {@link Publisher} * (no interleave). @@ -3645,91 +3499,6 @@ public final Mono retry(long numRetries) { return onAssembly(new MonoRetry<>(this, numRetries)); } - /** - * Re-subscribes to this {@link Mono} sequence if it signals any error - * that matches the given {@link Predicate}, otherwise push the error downstream. - * - *

- * - * - * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal - * - * @return a {@link Mono} that retries on onError if the predicates matches. - * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4 - */ - @Deprecated - public final Mono retry(Predicate retryMatcher) { - return onAssembly(new MonoRetryPredicate<>(this, retryMatcher)); - } - - /** - * Re-subscribes to this {@link Mono} sequence up to the specified number of retries if it signals any - * error that match the given {@link Predicate}, otherwise push the error downstream. - * - *

- * - * - * @param numRetries the number of times to tolerate an error - * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal - * - * @return a {@link Mono} that retries on onError up to the specified number of retry - * attempts, only if the predicate matches. - * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4 - */ - @Deprecated - public final Mono retry(long numRetries, Predicate retryMatcher) { - return defer(() -> retry(Flux.countingPredicate(retryMatcher, numRetries))); - } - - /** - * Retries this {@link Mono} when a companion sequence signals - * an item in response to this {@link Mono} error signal - *

If the companion sequence signals when the {@link Mono} is active, the retry - * attempt is suppressed and any terminal signal will terminate the {@link Mono} source with the same signal - * immediately. - * - *

- * - *

- * Note that if the companion {@link Publisher} created by the {@code whenFactory} - * emits {@link Context} as trigger objects, the content of these Context will be added - * to the operator's own {@link Context}: - *

- *
-	 * {@code
-	 * Function, Publisher> customFunction = errorCurrentAttempt -> errorCurrentAttempt.handle((lastError, sink) -> {
-	 * 	    Context ctx = sink.currentContext();
-	 * 	    int rl = ctx.getOrDefault("retriesLeft", 0);
-	 * 	    if (rl > 0) {
-	 *		    sink.next(Context.of(
-	 *		        "retriesLeft", rl - 1,
-	 *		        "lastError", lastError
-	 *		    ));
-	 * 	    } else {
-	 * 	        sink.error(Exceptions.retryExhausted("retries exhausted", lastError));
-	 * 	    }
-	 * });
-	 * Mono retried = originalMono.retryWhen(customFunction);
-	 * }
- *
- * - * @param whenFactory the {@link Function} that returns the associated {@link Publisher} - * companion, given a {@link Flux} that signals each onError as a {@link Throwable}. - * - * @return a {@link Mono} that retries on onError when the companion {@link Publisher} produces an - * onNext signal - * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4. Lambda Functions that don't make - * use of the error can simply be converted by wrapping via {@link Retry#from(Function)}. - * Functions that do use the error will additionally need to map the {@link reactor.util.retry.Retry.RetrySignal} - * emitted by the companion to its {@link Retry.RetrySignal#failure()}. - */ - @Deprecated - public final Mono retryWhen(Function, ? extends Publisher> whenFactory) { - Objects.requireNonNull(whenFactory, "whenFactory"); - return onAssembly(new MonoRetryWhen<>(this, Retry.from(rws -> whenFactory.apply(rws.map( - Retry.RetrySignal::failure))))); - } - /** * Retries this {@link Mono} in response to signals emitted by a companion {@link Publisher}. * The companion is generated by the provided {@link Retry} instance, see {@link Retry#max(long)}, {@link Retry#maxInARow(long)} @@ -3785,208 +3554,6 @@ public final Mono retryWhen(Retry retrySpec) { return onAssembly(new MonoRetryWhen<>(this, retrySpec)); } - /** - * In case of error, retry this {@link Mono} up to {@code numRetries} times using a - * randomized exponential backoff strategy (jitter). The jitter factor is {@code 50%} - * but the effective backoff delay cannot be less than {@code firstBackoff}. - *

- * The randomized exponential backoff is good at preventing two typical issues with - * other simpler backoff strategies, namely: - *

    - *
- * <ul>
- * <li>having an exponentially growing backoff delay with a small initial delay gives
- * the best tradeoff between not overwhelming the server and serving the client as
- * fast as possible</li>
- * <li>having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
- * where eg. numerous clients would hit the server at the same time, causing it to
- * display transient failures which would cause all clients to retry at the same
- * backoff times, ultimately sparing no load on the server.</li>
- * </ul>
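The Mono retryBackoff overloads migrate the same way as the Flux ones; note that the removed implementation further down also pins transientErrors(false), so a verbatim migration keeps it. A sketch with placeholder values:

import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

class MonoRetryBackoffMigration {
    static <T> Mono<T> withBackoff(Mono<T> source) {
        // Equivalent of the removed mono.retryBackoff(...) overloads; the removed code below
        // additionally sets transientErrors(false), preserved here for a like-for-like migration.
        return source.retryWhen(Retry
                .backoff(5, Duration.ofMillis(100))
                .maxBackoff(Duration.ofSeconds(5))
                .jitter(0.5)
                .scheduler(Schedulers.parallel())
                .transientErrors(false));
    }
}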
- * - *

- * - * - * @param numRetries the maximum number of attempts before an {@link IllegalStateException} - * is raised (having the original retry-triggering exception as cause). - * @param firstBackoff the first backoff delay to apply then grow exponentially. Also - * minimum delay even taking jitter into account. - * @return a {@link Mono} that retries on onError with exponentially growing randomized delays between retries. - * @deprecated use {@link #retryWhen(Retry)} with a {@link Retry#backoff(long, Duration)} base, to be removed in 3.4 - */ - @Deprecated - public final Mono retryBackoff(long numRetries, Duration firstBackoff) { - return retryWhen(Retry.backoff(numRetries, firstBackoff)); - } - - /** - * In case of error, retry this {@link Mono} up to {@code numRetries} times using a - * randomized exponential backoff strategy. The jitter factor is {@code 50%} - * but the effective backoff delay cannot be less than {@code firstBackoff} nor more - * than {@code maxBackoff}. - *

- * The randomized exponential backoff is good at preventing two typical issues with - * other simpler backoff strategies, namely: - *

    - *
- * <ul>
- * <li>having an exponentially growing backoff delay with a small initial delay gives
- * the best tradeoff between not overwhelming the server and serving the client as
- * fast as possible</li>
- * <li>having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
- * where eg. numerous clients would hit the server at the same time, causing it to
- * display transient failures which would cause all clients to retry at the same
- * backoff times, ultimately sparing no load on the server.</li>
- * </ul>
- * - *

- * - * - * @param numRetries the maximum number of attempts before an {@link IllegalStateException} - * is raised (having the original retry-triggering exception as cause). - * @param firstBackoff the first backoff delay to apply then grow exponentially. Also - * minimum delay even taking jitter into account. - * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. - * @return a {@link Mono} that retries on onError with exponentially growing randomized delays between retries. - * @deprecated use {@link #retryWhen(Retry)} with a {@link Retry#backoff(long, Duration)} base, to be removed in 3.4 - */ - @Deprecated - public final Mono retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff) { - return retryBackoff(numRetries, firstBackoff, maxBackoff, 0.5d); - } - - /** - * In case of error, retry this {@link Mono} up to {@code numRetries} times using a - * randomized exponential backoff strategy. The jitter factor is {@code 50%} - * but the effective backoff delay cannot be less than {@code firstBackoff} nor more - * than {@code maxBackoff}. The delays and subsequent attempts are materialized on the - * provided backoff {@link Scheduler} (see {@link Mono#delay(Duration, Scheduler)}). -

- * The randomized exponential backoff is good at preventing two typical issues with - * other simpler backoff strategies, namely: - *

    - *
- * <ul>
- * <li>having an exponentially growing backoff delay with a small initial delay gives
- * the best tradeoff between not overwhelming the server and serving the client as
- * fast as possible</li>
- * <li>having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
- * where eg. numerous clients would hit the server at the same time, causing it to
- * display transient failures which would cause all clients to retry at the same
- * backoff times, ultimately sparing no load on the server.</li>
- * </ul>
- * - *

- * - * - * @param numRetries the maximum number of attempts before an {@link IllegalStateException} - * is raised (having the original retry-triggering exception as cause). - * @param firstBackoff the first backoff delay to apply then grow exponentially. Also - * minimum delay even taking jitter into account. - * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. - * @param backoffScheduler the {@link Scheduler} on which the delays and subsequent attempts are executed. - * @return a {@link Mono} that retries on onError with exponentially growing randomized delays between retries. - * @deprecated use {@link #retryWhen(Retry)} with a {@link Retry#backoff(long, Duration)} base, to be removed in 3.4 - */ - @Deprecated - public final Mono retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, Scheduler backoffScheduler) { - return retryBackoff(numRetries, firstBackoff, maxBackoff, 0.5d, backoffScheduler); - } - - /** - * In case of error, retry this {@link Mono} up to {@code numRetries} times using a - * randomized exponential backoff strategy, randomized with a user-provided jitter - * factor between {@code 0.d} (no jitter) and {@code 1.0} (default is {@code 0.5}). - * Even with the jitter, the effective backoff delay cannot be less than - * {@code firstBackoff} nor more than {@code maxBackoff}. - *

- * The randomized exponential backoff is good at preventing two typical issues with - * other simpler backoff strategies, namely: - *

    - *
- * <ul>
- * <li>having an exponentially growing backoff delay with a small initial delay gives
- * the best tradeoff between not overwhelming the server and serving the client as
- * fast as possible</li>
- * <li>having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
- * where eg. numerous clients would hit the server at the same time, causing it to
- * display transient failures which would cause all clients to retry at the same
- * backoff times, ultimately sparing no load on the server.</li>
- * </ul>
- * - *

- * - * - * @param numRetries the maximum number of attempts before an {@link IllegalStateException} - * is raised (having the original retry-triggering exception as cause). - * @param firstBackoff the first backoff delay to apply then grow exponentially. Also - * minimum delay even taking jitter into account. - * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. - * @param jitterFactor the jitter percentage (as a double between 0.0 and 1.0). - * @return a {@link Mono} that retries on onError with exponentially growing randomized delays between retries. - * @deprecated use {@link #retryWhen(Retry)} with a {@link Retry#backoff(long, Duration)} base, to be removed in 3.4 - */ - @Deprecated - public final Mono retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor) { - return retryBackoff(numRetries, firstBackoff, maxBackoff, jitterFactor, Schedulers.parallel()); - } - - /** - * In case of error, retry this {@link Mono} up to {@code numRetries} times using a - * randomized exponential backoff strategy, randomized with a user-provided jitter - * factor between {@code 0.d} (no jitter) and {@code 1.0} (default is {@code 0.5}). - * Even with the jitter, the effective backoff delay cannot be less than - * {@code firstBackoff} nor more than {@code maxBackoff}. The delays and subsequent - * attempts are executed on the provided backoff {@link Scheduler} (see - * {@link Mono#delay(Duration, Scheduler)}). -

- * The randomized exponential backoff is good at preventing two typical issues with - * other simpler backoff strategies, namely: - *

    - *
- * <ul>
- * <li>having an exponentially growing backoff delay with a small initial delay gives
- * the best tradeoff between not overwhelming the server and serving the client as
- * fast as possible</li>
- * <li>having a jitter, or randomized backoff delay, is beneficial in avoiding "retry-storms"
- * where eg. numerous clients would hit the server at the same time, causing it to
- * display transient failures which would cause all clients to retry at the same
- * backoff times, ultimately sparing no load on the server.</li>
- * </ul>
- * - *

- * - * - * @param numRetries the maximum number of attempts before an {@link IllegalStateException} - * is raised (having the original retry-triggering exception as cause). - * @param firstBackoff the first backoff delay to apply then grow exponentially. Also - * minimum delay even taking jitter into account. - * @param maxBackoff the maximum delay to apply despite exponential growth and jitter. - * @param backoffScheduler the {@link Scheduler} on which the delays and subsequent attempts are executed. - * @param jitterFactor the jitter percentage (as a double between 0.0 and 1.0). - * @return a {@link Mono} that retries on onError with exponentially growing randomized delays between retries. - * @deprecated use {@link #retryWhen(Retry)} with a {@link Retry#backoff(long, Duration)} base, to be removed in 3.4 - */ - @Deprecated - public final Mono retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor, Scheduler backoffScheduler) { - return retryWhen(Retry - .backoff(numRetries, firstBackoff) - .maxBackoff(maxBackoff) - .jitter(jitterFactor) - .scheduler(backoffScheduler) - .transientErrors(false)); - } - /** * Expect exactly one item from this {@link Mono} source or signal * {@link java.util.NoSuchElementException} for an empty source. @@ -4850,28 +4417,6 @@ protected static Mono onAssembly(Mono source) { return source; } - /** - * To be used by custom operators: invokes assembly {@link Hooks} pointcut given a - * {@link Mono}, potentially returning a new {@link Mono}. This is for example useful - * to activate cross-cutting concerns at assembly time, eg. a generalized - * {@link #checkpoint()}. - * - * @param the value type - * @param source the source to apply assembly hooks onto - * - * @return the source, potentially wrapped with assembly time cross-cutting behavior - * @deprecated use {@link Operators#onLastAssembly(CorePublisher)} - */ - @SuppressWarnings("unchecked") - @Deprecated - protected static Mono onLastAssembly(Mono source) { - Function hook = Hooks.onLastOperatorHook; - if(hook == null) { - return source; - } - return (Mono)Objects.requireNonNull(hook.apply(source), "LastOperator hook returned null"); - } - @Override public String toString() { return getClass().getSimpleName(); diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoRetryPredicate.java b/reactor-core/src/main/java/reactor/core/publisher/MonoRetryPredicate.java deleted file mode 100644 index 3ac72a0c4e..0000000000 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoRetryPredicate.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.core.publisher; - -import java.util.Objects; -import java.util.function.Predicate; - -import reactor.core.CoreSubscriber; - -/** - * Repeatedly subscribes to the source if the predicate returns true after completion of - * the previous subscription. 
- * - * @param the value type - * @see Reactive-Streams-Commons - */ -final class MonoRetryPredicate extends InternalMonoOperator { - - final Predicate predicate; - - MonoRetryPredicate(Mono source, - Predicate predicate) { - super(source); - this.predicate = Objects.requireNonNull(predicate, "predicate"); - } - - @Override - public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { - - FluxRetryPredicate.RetryPredicateSubscriber parent = - new FluxRetryPredicate.RetryPredicateSubscriber<>(source, - actual, predicate); - - actual.onSubscribe(parent); - - if (!parent.isCancelled()) { - parent.resubscribe(); - } - return null; - } - - @Override - public Object scanUnsafe(Attr key) { - if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; - return super.scanUnsafe(key); - } -} diff --git a/reactor-core/src/main/java/reactor/core/publisher/ParallelFlux.java b/reactor-core/src/main/java/reactor/core/publisher/ParallelFlux.java index 38d18409d1..ebdda84ea1 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ParallelFlux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ParallelFlux.java @@ -307,33 +307,6 @@ public final Mono> collectSortedList(Comparator comparator, return merged; } - - /** - * Allows composing operators off the 'rails', as individual {@link GroupedFlux} instances keyed by - * the zero based rail's index. The transformed groups are {@link Flux#parallel parallelized} back - * once the transformation has been applied. - *
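The deprecation note that follows points to transformGroups as the replacement for composeGroup; a minimal sketch of the per-rail composition (the tagging composer is purely illustrative):

import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.ParallelFlux;

class ComposeGroupMigration {
    static ParallelFlux<String> tagRails(ParallelFlux<String> parallel) {
        // Old: parallel.composeGroup(composer). New: transformGroups applies the same
        // composition to each GroupedFlux rail, keyed by its zero-based index.
        Function<GroupedFlux<Integer, String>, Flux<String>> composer =
                rail -> rail.map(value -> "rail-" + rail.key() + ":" + value);
        return parallel.transformGroups(composer);
    }
}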

- * Note that like in {@link #groups()}, requests and cancellation compose through, and - * cancelling only one rail may result in undefined behavior. - * - * @param composer the composition function to apply on each {@link GroupedFlux rail} - * @param the type of the resulting parallelized flux - * @return a {@link ParallelFlux} of the composed groups - * @deprecated will be removed in 3.4.0. Use {@link #transformGroups(Function)} instead - */ - @Deprecated - public final ParallelFlux composeGroup(Function, - ? extends Publisher> composer) { - if (getPrefetch() > -1) { - return from(groups().flatMap(composer::apply), - parallelism(), getPrefetch(), - Queues.small()); - } - else { - return from(groups().flatMap(composer::apply), parallelism()); - } - } - /** * Generates and concatenates Publishers on each 'rail', signalling errors immediately * and generating 2 publishers upfront. @@ -1022,7 +995,7 @@ public final Disposable subscribe( @Nullable Consumer onNext, @Nullable Consumer onError, @Nullable Runnable onComplete) { - return subscribe(onNext, onError, onComplete, (Context) null); + return this.subscribe(onNext, onError, onComplete, null, (Context) null); } @Override @@ -1062,7 +1035,6 @@ public final Disposable subscribe( * @param onComplete callback on completion signal * @param initialContext {@link Context} for the rails */ - @Deprecated public final Disposable subscribe( @Nullable Consumer onNext, @Nullable Consumer onError, @@ -1299,27 +1271,6 @@ protected static ParallelFlux onAssembly(ParallelFlux source) { return source; } - /** - * Invoke {@link Hooks} pointcut given a {@link ParallelFlux} and returning an - * eventually new {@link ParallelFlux} - * - * @param the value type - * @param source the source to wrap - * - * @return the potentially wrapped source - * @deprecated use {@link Operators#onLastAssembly(CorePublisher)} - */ - @SuppressWarnings("unchecked") - @Deprecated - protected static ParallelFlux onLastAssembly(ParallelFlux source) { - Function hook = Hooks.onLastOperatorHook; - if (hook == null) { - return source; - } - return (ParallelFlux) Objects.requireNonNull(hook.apply(source), - "LastOperator hook returned null"); - } - @SuppressWarnings("unchecked") static ParallelFlux doOnSignal(ParallelFlux source, @Nullable Consumer onNext, diff --git a/reactor-core/src/main/java/reactor/core/publisher/RingBuffer.java b/reactor-core/src/main/java/reactor/core/publisher/RingBuffer.java deleted file mode 100644 index 8b6448bc72..0000000000 --- a/reactor-core/src/main/java/reactor/core/publisher/RingBuffer.java +++ /dev/null @@ -1,1476 +0,0 @@ -/* - * Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package reactor.core.publisher; - -import java.lang.reflect.Field; -import java.nio.Buffer; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.locks.LockSupport; -import java.util.function.LongSupplier; -import java.util.function.Supplier; - -import reactor.util.Logger; -import reactor.util.Loggers; -import reactor.util.annotation.Nullable; -import reactor.util.concurrent.Queues; -import reactor.util.concurrent.WaitStrategy; -import sun.misc.Unsafe; - -import static java.util.Arrays.copyOf; - -/** - * Ring based store of reusable entries containing the data representing an event being exchanged between event producer - * and ringbuffer consumers. - * @param implementation storing the data for sharing during exchange or parallel coordination of an event. - * - * This is an adaption of the original LMAX Disruptor RingBuffer code from - * https://lmax-exchange.github.io/disruptor/. - */ -@SuppressWarnings("deprecation") -abstract class RingBuffer implements LongSupplier { - - static void addSequence(final T holder, - final AtomicReferenceFieldUpdater updater, - final Sequence sequence) { - - Sequence[] updatedSequences; - Sequence[] currentSequences; - - do { - currentSequences = updater.get(holder); - updatedSequences = copyOf(currentSequences, currentSequences.length + 1); - - updatedSequences[currentSequences.length] = sequence; - } - while (!updater.compareAndSet(holder, currentSequences, updatedSequences)); - } - - private static int countMatching(T[] values, final T toMatch) { - int numToRemove = 0; - for (T value : values) { - if (value == toMatch) // Specifically uses identity - { - numToRemove++; - } - } - return numToRemove; - } - - static boolean removeSequence(final T holder, - final AtomicReferenceFieldUpdater sequenceUpdater, - final Sequence sequence) { - int numToRemove; - Sequence[] oldSequences; - Sequence[] newSequences; - - do { - oldSequences = sequenceUpdater.get(holder); - - numToRemove = countMatching(oldSequences, sequence); - - if (0 == numToRemove) { - break; - } - - final int oldSize = oldSequences.length; - newSequences = new Sequence[oldSize - numToRemove]; - - for (int i = 0, pos = 0; i < oldSize; i++) { - final Sequence testSequence = oldSequences[i]; - if (sequence != testSequence) { - newSequences[pos++] = testSequence; - } - } - } - while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences)); - - return numToRemove != 0; - } - - /** - * Set to -1 as sequence starting point - */ - static final long INITIAL_CURSOR_VALUE = -1L; - - /** - * Create a new multiple producer RingBuffer with the specified wait strategy. - *

See {@code MultiProducerRingBuffer}. - * @param the element type - * @param factory used to create the events within the ring buffer. - * @param bufferSize number of elements to create within the ring buffer. - * @param waitStrategy used to determine how to wait for new elements to become available. - * @param spinObserver the Runnable to call on a spin loop wait - * @return the new RingBuffer instance - */ - static RingBuffer createMultiProducer(Supplier factory, - int bufferSize, - WaitStrategy waitStrategy, Runnable spinObserver) { - - if (hasUnsafe()) { - MultiProducerRingBuffer - sequencer = new MultiProducerRingBuffer(bufferSize, waitStrategy, spinObserver); - - return new UnsafeRingBuffer<>(factory, sequencer); - } - else { - throw new IllegalStateException("This JVM does not support sun.misc.Unsafe"); - } - } - - /** - * Create a new single producer RingBuffer with the specified wait strategy. - *

See {@code MultiProducerRingBuffer}. - * @param the element type - * @param factory used to create the events within the ring buffer. - * @param bufferSize number of elements to create within the ring buffer. - * @param waitStrategy used to determine how to wait for new elements to become available. - * @return the new RingBuffer instance - */ - static RingBuffer createSingleProducer(Supplier factory, - int bufferSize, - WaitStrategy waitStrategy) { - return createSingleProducer(factory, bufferSize, waitStrategy, null); - } - - /** - * Create a new single producer RingBuffer with the specified wait strategy. - *

See {@code MultiProducerRingBuffer}. - * @param the element type - * @param factory used to create the events within the ring buffer. - * @param bufferSize number of elements to create within the ring buffer. - * @param waitStrategy used to determine how to wait for new elements to become available. - * @param spinObserver called each time the next claim is spinning and waiting for a slot - * @return the new RingBuffer instance - */ - static RingBuffer createSingleProducer(Supplier factory, - int bufferSize, - WaitStrategy waitStrategy, - @Nullable Runnable spinObserver) { - SingleProducerSequencer - sequencer = new SingleProducerSequencer(bufferSize, waitStrategy, spinObserver); - - if (hasUnsafe() && Queues.isPowerOfTwo(bufferSize)) { - return new UnsafeRingBuffer<>(factory, sequencer); - } - else { - return new NotFunRingBuffer<>(factory, sequencer); - } - } - - /** - * Get the minimum sequence from an array of {@link Sequence}s. - * - * @param sequences to compare. - * @param minimum an initial default minimum. If the array is empty this value will be returned. - * - * @return the minimum sequence found or Long.MAX_VALUE if the array is empty. - */ - static long getMinimumSequence(final Sequence[] sequences, long minimum) { - for (int i = 0, n = sequences.length; i < n; i++) { - long value = sequences[i].getAsLong(); - minimum = Math.min(minimum, value); - } - - return minimum; - } - - /** - * Get the minimum sequence from an array of {@link Sequence}s. - * - * @param excludeSequence to exclude from search. - * @param sequences to compare. - * @param minimum an initial default minimum. If the array is empty this value will be returned. - * - * @return the minimum sequence found or Long.MAX_VALUE if the array is empty. - */ - static long getMinimumSequence(@Nullable Sequence excludeSequence, final Sequence[] sequences, long minimum) { - for (int i = 0, n = sequences.length; i < n; i++) { - if (excludeSequence == null || sequences[i] != excludeSequence) { - long value = sequences[i].getAsLong(); - minimum = Math.min(minimum, value); - } - } - - return minimum; - } - - /** - * Return the {@code sun.misc.Unsafe} instance if found on the classpath and can be used for acclerated - * direct memory access. - * - * @param the Unsafe type - * @return the Unsafe instance - */ - @SuppressWarnings("unchecked") - static T getUnsafe() { - return (T) UnsafeSupport.getUnsafe(); - } - - /** - * Calculate the log base 2 of the supplied integer, essentially reports the location of the highest bit. - * - * @param i Value to calculate log2 for. - * - * @return The log2 value - */ - static int log2(int i) { - int r = 0; - while ((i >>= 1) != 0) { - ++r; - } - return r; - } - - /** - * @param init the initial value - * - * @return a safe or unsafe sequence set to the passed init value - */ - static Sequence newSequence(long init) { - if (hasUnsafe()) { - return new UnsafeSequence(init); - } - else { - return new AtomicSequence(init); - } - } - - /** - * Add the specified gating sequence to this instance of the Disruptor. It will safely and atomically be added to - * the list of gating sequences and not RESET to the current ringbuffer cursor. - * @param gatingSequence The sequences to add. - */ - abstract void addGatingSequence(Sequence gatingSequence); - - /** - * @return the fixed buffer size - */ - abstract int bufferSize(); - - /** - *

Get the event for a given sequence in the RingBuffer.

- * - *

This call has 2 uses. Firstly use this call when publishing to a ring buffer. After calling {@link - * RingBuffer#next()} use this call to get hold of the preallocated event to fill with data before calling {@link - * RingBuffer#publish(long)}.

- * - *

Secondly use this call when consuming data from the ring buffer. After calling {@link - * Reader#waitFor(long, Runnable)} call this method with any value greater than that of your - * current consumer sequence - * and less than or equal to the value returned from the {@link Reader#waitFor(long, Runnable)} method.
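A hedged sketch of that consuming pattern, assuming code in the same package as these package-private types; the Reader comes from newReader(), and Event, ringBuffer, reader and the last processed sequence are placeholder names.

// drain everything currently readable, starting after the last processed sequence
static void drain(RingBuffer<Event> ringBuffer, RingBuffer.Reader reader, long lastProcessed)
		throws InterruptedException {
	long next = lastProcessed + 1;
	long available = reader.waitFor(next, () -> {}); // may return a value lower than 'next'
	for (long s = next; s <= available; s++) {
		Event e = ringBuffer.get(s); // any sequence in (lastProcessed, available] is safe to read
		// process e
	}
}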

- * @param sequence for the event - * @return the event for the given sequence - */ - abstract E get(long sequence); - - @Override - public long getAsLong() { - return getCursor(); - } - - /** - * Get the current cursor value for the ring buffer. The actual value received will depend on the type of {@code - * RingBufferProducer} that is being used. - *

- * See {@code MultiProducerRingBuffer}. - * See {@code SingleProducerSequencer} - * @return the current cursor value - */ - abstract long getCursor(); - - /** - * Get the minimum sequence value from all of the gating sequences added to this ringBuffer. - * @return The minimum gating sequence or the cursor sequence if no sequences have been added. - */ - abstract long getMinimumGatingSequence(); - - /** - * Get the minimum sequence value from all of the gating sequences added to this ringBuffer. - * @param sequence the target sequence - * @return The minimum gating sequence or the cursor sequence if no sequences have been added. - */ - abstract long getMinimumGatingSequence(Sequence sequence); - - /** - * Get the buffered count - * @return the buffered count - */ - abstract int getPending(); - - /** - * - * @return the current list of read cursors - */ - Sequence[] getSequenceReceivers() { - return getSequencer().getGatingSequences(); - } - - /** - * Create a new {@link Reader} to track - * which - * messages are available to be read - * from the ring buffer given a list of sequences to track. - * @return A sequence barrier that will track the ringbuffer. - * @see Reader - */ - abstract Reader newReader(); - - /** - * Increment and return the next sequence for the ring buffer. Calls of this method should ensure that they always - * publish the sequence afterward. E.g. - *

-	 * long sequence = ringBuffer.next();
-	 * try {
-	 *     Event e = ringBuffer.get(sequence);
-	 *     // Do some work with the event.
-	 * } finally {
-	 *     ringBuffer.publish(sequence);
-	 * }
-	 * 
- * @return The next sequence to publish to. - * @see RingBuffer#publish(long) - * @see RingBuffer#get(long) - */ - abstract long next(); - - /** - * The same functionality as {@link RingBuffer#next()}, but allows the caller to claim the next n sequences. - *

- * See {@code RingBufferProducer.next(int)} - * @param n number of slots to claim - * @return sequence number of the highest slot claimed - */ - abstract long next(int n); - - /** - * Publish the specified sequence. This action marks this particular message as being available to be read. - * @param sequence the sequence to publish. - */ - abstract void publish(long sequence); - /** - * Remove the specified sequence from this ringBuffer. - * @param sequence to be removed. - * @return true if this sequence was found, false otherwise. - */ - abstract boolean removeGatingSequence(Sequence sequence); - - abstract RingBufferProducer getSequencer();/* - - - /** - * Return {@code true} if {@code sun.misc.Unsafe} was found on the classpath and can be used for accelerated - * direct memory access. - * @return true if unsafe is present - */ - static boolean hasUnsafe() { - return HAS_UNSAFE; - } - - static boolean hasUnsafe0() { - return UnsafeSupport.hasUnsafe(); - } - - private static final boolean HAS_UNSAFE = hasUnsafe0(); - - /** - *

Concurrent sequence class used for tracking the progress of - * the ring buffer and event processors. Supports a number - * of concurrent operations including CAS and ordered writes. - * - *

Also attempts to be more efficient with regards to false - * sharing by adding padding around the volatile field. - */ - interface Sequence extends LongSupplier - { - long INITIAL_VALUE = INITIAL_CURSOR_VALUE; - - /** - * Perform an ordered write of this sequence. The intent is - * a Store/Store barrier between this write and any previous - * store. - * - * @param value The new value for the sequence. - */ - void set(long value); - - /** - * Perform a compare and set operation on the sequence. - * - * @param expectedValue The expected current value. - * @param newValue The value to update to. - * @return true if the operation succeeds, false otherwise. - */ - boolean compareAndSet(long expectedValue, long newValue); - - } - - /** - * Used for Gating ringbuffer consumers on a cursor sequence and optional dependent ringbuffer consumer(s), - * using the given WaitStrategy. - */ - static final class Reader { - private final WaitStrategy waitStrategy; - private volatile boolean alerted = false; - private final Sequence cursorSequence; - private final RingBufferProducer sequenceProducer; - - Reader(final RingBufferProducer sequenceProducer, - final WaitStrategy waitStrategy, - final Sequence cursorSequence) { - this.sequenceProducer = sequenceProducer; - this.waitStrategy = waitStrategy; - this.cursorSequence = cursorSequence; - } - - /** - * Wait for the given sequence to be available for consumption. - * - * @param consumer a spin observer to invoke when nothing is available to read - * @param sequence to wait for - * @return the sequence up to which is available - * @throws InterruptedException if the thread needs awaking on a condition variable. - */ - long waitFor(final long sequence, Runnable consumer) - throws InterruptedException { - if (alerted) - { - WaitStrategy.alert(); - } - - long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, consumer); - - if (availableSequence < sequence) { - return availableSequence; - } - - return sequenceProducer.getHighestPublishedSequence(sequence, availableSequence); - } - /** - * The current alert status for the barrier. - * - * @return true if in alert otherwise false. - */ - boolean isAlerted() { - return alerted; - } - - /** - * Alert the ringbuffer consumers of a status change and stay in this status until cleared. - */ - void alert() { - alerted = true; - waitStrategy.signalAllWhenBlocking(); - } - - /** - * Signal the ringbuffer consumers. - */ - void signal() { - waitStrategy.signalAllWhenBlocking(); - } - - /** - * Clear the current alert status. - */ - void clearAlert() { - alerted = false; - } - } -} - -// UnsafeSupport static initialization is derived from Netty's PlatformDependent0 -// static initialization, the original licence of which is included below, verbatim. -// Modifications to the source material are: -// - a shorter amount of checks and fields (focusing on Unsafe and buffer address) -// - modifications to the logging messages and their level -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations - * under the License. - */ -enum UnsafeSupport { - ; - - static final Logger logger = Loggers.getLogger(UnsafeSupport.class); - - static { - String javaSpecVersion = System.getProperty("java.specification.version"); - logger.debug("Starting UnsafeSupport init in Java " + javaSpecVersion); - - ByteBuffer direct = ByteBuffer.allocateDirect(1); - Unsafe unsafe; - - // Get an Unsafe instance first, via the (still legit as of Java 9) - // deep reflection trick on theUnsafe Field - Object maybeUnsafe; - try { - final Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe"); - unsafeField.setAccessible(true); - // the unsafe instance - maybeUnsafe = unsafeField.get(null); - } catch (NoSuchFieldException | SecurityException | IllegalAccessException e) { - maybeUnsafe = e; - } - - // the conditional check here can not be replaced with checking that maybeUnsafe - // is an instanceof Unsafe and reversing the if and else blocks; this is because an - // instanceof check against Unsafe will trigger a class load and we might not have - // the runtime permission accessClassInPackage.sun.misc - if (maybeUnsafe instanceof Throwable) { - unsafe = null; - logger.debug("Unsafe unavailable - Could not get it via Field sun.misc.Unsafe.theUnsafe", (Throwable) maybeUnsafe); - } else { - unsafe = (Unsafe) maybeUnsafe; - logger.trace("sun.misc.Unsafe.theUnsafe ok"); - } - - // ensure the unsafe supports all necessary methods to work around the mistake in the latest OpenJDK - // https://github.com/netty/netty/issues/1061 - // https://www.mail-archive.com/jdk6-dev@openjdk.java.net/msg00698.html - if (unsafe != null) { - final Unsafe finalUnsafe = unsafe; - Object maybeException; - try { - finalUnsafe.getClass().getDeclaredMethod( - "copyMemory", Object.class, long.class, Object.class, long.class, long.class); - maybeException = null; - } catch (NoSuchMethodException | SecurityException e) { - maybeException = e; - } - - if (maybeException == null) { - logger.trace("sun.misc.Unsafe.copyMemory ok"); - } else { - unsafe = null; - logger.debug("Unsafe unavailable - failed on sun.misc.Unsafe.copyMemory", (Throwable) maybeException); - } - } - - // finally check the Buffer#address - if (unsafe != null) { - final Unsafe finalUnsafe = unsafe; - Object maybeAddressField; - try { - final Field field = Buffer.class.getDeclaredField("address"); - - // Use Unsafe to read value of the address field. - // This way it will not fail on JDK9+ which forbids changing the - // access level via reflection. 
- final long offset = finalUnsafe.objectFieldOffset(field); - final long heapAddress = finalUnsafe.getLong(ByteBuffer.allocate(1), offset); - final long directAddress = finalUnsafe.getLong(direct, offset); - - if (heapAddress != 0 && "1.8".equals(javaSpecVersion)) { - maybeAddressField = new IllegalStateException("A heap buffer must have 0 address in Java 8, got " + heapAddress); - } - else if (heapAddress == 0 && !"1.8".equals(javaSpecVersion)) { - maybeAddressField = new IllegalStateException("A heap buffer must have non-zero address in Java " + javaSpecVersion); - } - else if (directAddress == 0) { - maybeAddressField = new IllegalStateException("A direct buffer must have non-zero address"); - } - else { - maybeAddressField = field; - } - - } catch (NoSuchFieldException | SecurityException e) { - maybeAddressField = e; - } - - if (maybeAddressField instanceof Throwable) { - logger.debug("Unsafe unavailable - failed on java.nio.Buffer.address", (Throwable) maybeAddressField); - - // If we cannot access the address of a direct buffer, there's no point in using unsafe. - // Let's just pretend unsafe is unavailable for overall simplicity. - unsafe = null; - } - else { - logger.trace("java.nio.Buffer.address ok"); - logger.debug("Unsafe is available"); - } - } - - UNSAFE = unsafe; - } - - static Unsafe getUnsafe(){ - return UNSAFE; - } - - static boolean hasUnsafe() { - return UNSAFE != null; - } - - private static final Unsafe UNSAFE; -} - -/** - * Base class for the various sequencer types (single/multi). Provides common functionality like the management of - * gating sequences (add/remove) and ownership of the current cursor. - */ -@SuppressWarnings("deprecation") -abstract class RingBufferProducer { - - static final AtomicReferenceFieldUpdater - SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(RingBufferProducer.class, RingBuffer.Sequence[].class, - "gatingSequences"); - - final Runnable spinObserver; - final int bufferSize; - final WaitStrategy waitStrategy; - final RingBuffer.Sequence cursor = RingBuffer.newSequence( - RingBuffer.INITIAL_CURSOR_VALUE); - volatile RingBuffer.Sequence[] gatingSequences = new RingBuffer.Sequence[0]; - - /** - * Create with the specified buffer size and wait strategy. - * - * @param bufferSize The total number of entries, must be a positive power of 2. - * @param waitStrategy The {@link WaitStrategy} to use. - * @param spinObserver an iteration observer - */ - RingBufferProducer(int bufferSize, WaitStrategy waitStrategy, @Nullable Runnable spinObserver) { - this.spinObserver = spinObserver; - this.bufferSize = bufferSize; - this.waitStrategy = waitStrategy; - } - - /** - * Get the current cursor value. - * - * @return current cursor value - */ - final long getCursor() { - return cursor.getAsLong(); - } - - /** - * The capacity of the data structure to hold entries. - * - * @return the size of the RingBuffer. - */ - final int getBufferSize() { - return bufferSize; - } - - /** - * Add the specified gating sequences to this instance of the Disruptor. They will - * safely and atomically added to the list of gating sequences. - * - * @param gatingSequence The sequences to add. - */ - final void addGatingSequence(RingBuffer.Sequence gatingSequence) { - RingBuffer.addSequence(this, SEQUENCE_UPDATER, gatingSequence); - } - - /** - * Remove the specified sequence from this sequencer. - * - * @param sequence to be removed. - * @return true if this sequence was found, false otherwise. 
- */ - boolean removeGatingSequence(RingBuffer.Sequence sequence) { - return RingBuffer.removeSequence(this, SEQUENCE_UPDATER, sequence); - } - - /** - * Get the minimum sequence value from all of the gating sequences - * added to this ringBuffer. - * - * @param excludeSequence to exclude from search - * @return The minimum gating sequence or the cursor sequence if - * no sequences have been added. - */ - long getMinimumSequence(@Nullable RingBuffer.Sequence excludeSequence) { - return RingBuffer.getMinimumSequence(excludeSequence, gatingSequences, cursor.getAsLong()); - } - - /** - * Create a new {@link RingBuffer.Reader} to be used by an EventProcessor to track which messages - * are available to be read from the ring buffer - * - * @see RingBuffer.Reader - * @return A sequence barrier that will track the specified sequences. - */ - RingBuffer.Reader newBarrier() { - return new RingBuffer.Reader(this, waitStrategy, cursor); - } - - /** - * Get the highest sequence number that can be safely read from the ring buffer. Depending - * on the implementation of the Sequencer this call may need to get a number of values - * in the Sequencer. The get will range from nextSequence to availableSequence. If - * there are no available values >= nextSequence the return value will be - * nextSequence - 1. To work correctly a consumer should pass a value that - * it 1 higher than the last sequence that was successfully processed. - * - * @param nextSequence The sequence to start scanning from. - * @param availableSequence The sequence to get to. - * @return The highest value that can be safely read, will be at least nextSequence - 1. - */ - abstract long getHighestPublishedSequence(long nextSequence, long availableSequence); - - /** - * Get the pending capacity for this sequencer. - * @return The number of slots pending consuming. - */ - abstract long getPending(); - - /** - * Claim the next event in sequence for publishing. - * @return the claimed sequence value - */ - abstract long next(); - - /** - * Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing - * requires a little care and some math. - *

-	 * int n = 10;
-	 * long hi = sequencer.next(n);
-	 * long lo = hi - (n - 1);
-	 * for (long sequence = lo; sequence <= hi; sequence++) {
-	 *     // Do work.
-	 * }
-	 * sequencer.publish(lo, hi);
-	 * 
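Since the publish variant declared on this producer takes a single sequence, a hedged reading of the batch pattern above publishes each claimed slot in turn once its entry is filled ('sequencer' as in the snippet above):

int n = 10;
long hi = sequencer.next(n);
long lo = hi - (n - 1);
for (long sequence = lo; sequence <= hi; sequence++) {
	// fill the entry for 'sequence', then mark it readable
	sequencer.publish(sequence);
}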
- * - * @param n the number of sequences to claim - * @return the highest claimed sequence value - */ - abstract long next(int n); - - /** - * Publishes a sequence. Call when the event has been filled. - * - * @param sequence the sequence number to be published - */ - abstract void publish(long sequence); - - /** - * - * @return the gating sequences array - */ - RingBuffer.Sequence[] getGatingSequences() { - return gatingSequences; - } -} - -abstract class SingleProducerSequencerPad extends RingBufferProducer -{ - protected long p1, p2, p3, p4, p5, p6, p7; - @SuppressWarnings("deprecation") - SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy, @Nullable Runnable spinObserver) - { - super(bufferSize, waitStrategy, spinObserver); - } -} - -abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad -{ - @SuppressWarnings("deprecation") - SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy, @Nullable Runnable spinObserver) - { - super(bufferSize, waitStrategy, spinObserver); - } - - /** Set to -1 as sequence starting point */ - protected long nextValue = RingBuffer.Sequence.INITIAL_VALUE; - protected long cachedValue = RingBuffer.Sequence.INITIAL_VALUE; -} - -/** - *

Coordinator for claiming sequences for access to a data structure while tracking dependent {@link RingBuffer.Sequence}s. - * Not safe for use from multiple threads as it does not implement any barriers.

- * - *

Note on {@code RingBufferProducer.getCursor()}: With this sequencer the cursor value is updated after the call - * to {@code RingBufferProducer.publish(long)} is made. - */ - -final class SingleProducerSequencer extends SingleProducerSequencerFields { - protected long p1, p2, p3, p4, p5, p6, p7; - - /** - * Construct a Sequencer with the selected wait strategy and buffer size. - * - * @param bufferSize the size of the buffer that this will sequence over. - * @param waitStrategy for those waiting on sequences. - * @param spinObserver the runnable to call on a spin-wait - */ - @SuppressWarnings("deprecation") - SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy, @Nullable Runnable spinObserver) { - super(bufferSize, waitStrategy, spinObserver); - } - - /** - * See {@code RingBufferProducer.next()}. - */ - @Override - long next() { - return next(1); - } - - /** - * See {@code RingBufferProducer.next(int)}. - */ - @Override - long next(int n) { - long nextValue = this.nextValue; - - long nextSequence = nextValue + n; - long wrapPoint = nextSequence - bufferSize; - long cachedGatingSequence = this.cachedValue; - - if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) - { - long minSequence; - while (wrapPoint > (minSequence = RingBuffer.getMinimumSequence(gatingSequences, nextValue))) - { - if(spinObserver != null) { - spinObserver.run(); - } - LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? - } - - this.cachedValue = minSequence; - } - - this.nextValue = nextSequence; - - return nextSequence; - } - - /** - * See {@code RingBufferProducer.producerCapacity()}. - */ - @Override - public long getPending() { - long nextValue = this.nextValue; - - long consumed = RingBuffer.getMinimumSequence(gatingSequences, nextValue); - long produced = nextValue; - return produced - consumed; - } - - /** - * See {@code RingBufferProducer.publish(long)}. - */ - @Override - void publish(long sequence) { - cursor.set(sequence); - waitStrategy.signalAllWhenBlocking(); - } - - @Override - long getHighestPublishedSequence(long lowerBound, long availableSequence) { - return availableSequence; - } -} - -abstract class NotFunRingBufferFields extends RingBuffer -{ - private final long indexMask; - private final Object[] entries; - final int bufferSize; - final RingBufferProducer sequenceProducer; - - NotFunRingBufferFields(Supplier eventFactory, - RingBufferProducer sequenceProducer) - { - this.sequenceProducer = sequenceProducer; - this.bufferSize = sequenceProducer.getBufferSize(); - this.indexMask = bufferSize - 1; - this.entries = new Object[sequenceProducer.getBufferSize()]; - fill(eventFactory); - } - - private void fill(Supplier eventFactory) - { - for (int i = 0; i < bufferSize; i++) - { - entries[i] = eventFactory.get(); - } - } - - @SuppressWarnings("unchecked") - final E elementAt(long sequence) - { - return (E) entries[(int) (sequence & indexMask)]; - } -} - -/** - * Ring based store of reusable entries containing the data representing - * an event being exchanged between event producer and ringbuffer consumers. - * - * @param implementation storing the data for sharing during exchange or parallel coordination of an event. - */ -final class NotFunRingBuffer extends NotFunRingBufferFields -{ - /** - * Construct a RingBuffer with the full option set. - * - * @param eventFactory to newInstance entries for filling the RingBuffer - * @param sequenceProducer sequencer to handle the ordering of events moving through the RingBuffer. 
- * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2 - */ - NotFunRingBuffer(Supplier eventFactory, - RingBufferProducer sequenceProducer) - { - super(eventFactory, sequenceProducer); - } - - @Override - E get(long sequence) - { - return elementAt(sequence); - } - - @Override - long next() - { - return next(1); - } - - @Override - long next(int n) - { - return sequenceProducer.next(n); - } - - @Override - void addGatingSequence(Sequence gatingSequence) - { - sequenceProducer.addGatingSequence(gatingSequence); - } - - @Override - long getMinimumGatingSequence() - { - return getMinimumGatingSequence(null); - } - - @Override - long getMinimumGatingSequence(@Nullable Sequence sequence) - { - return sequenceProducer.getMinimumSequence(sequence); - } - - @Override - boolean removeGatingSequence(Sequence sequence) - { - return sequenceProducer.removeGatingSequence(sequence); - } - - @Override - Reader newReader() - { - return sequenceProducer.newBarrier(); - } - - @Override - long getCursor() - { - return sequenceProducer.getCursor(); - } - - @Override - int bufferSize() - { - return bufferSize; - } - - @Override - void publish(long sequence) - { - sequenceProducer.publish(sequence); - } - - @Override - int getPending() { - return (int)sequenceProducer.getPending(); - } - - @Override - RingBufferProducer getSequencer() { - return sequenceProducer; - } -} - -final class AtomicSequence extends RhsPadding - implements LongSupplier, RingBuffer.Sequence -{ - - private static final AtomicLongFieldUpdater UPDATER = - AtomicLongFieldUpdater.newUpdater(Value.class, "value"); - - /** - * Create a sequence with a specified initial value. - * - * @param initialValue The initial value for this sequence. - */ - AtomicSequence(final long initialValue) - { - UPDATER.lazySet(this, initialValue); - } - - @Override - public long getAsLong() - { - return value; - } - - @Override - public void set(final long value) - { - UPDATER.set(this, value); - } - - @Override - public boolean compareAndSet(final long expectedValue, final long newValue) - { - return UPDATER.compareAndSet(this, expectedValue, newValue); - } -} - -abstract class RingBufferPad extends RingBuffer -{ - protected long p1, p2, p3, p4, p5, p6, p7; -} - -abstract class RingBufferFields extends RingBufferPad -{ - private static final int BUFFER_PAD; - private static final long REF_ARRAY_BASE; - private static final int REF_ELEMENT_SHIFT; - private static final Unsafe UNSAFE = RingBuffer.getUnsafe(); - - static { - final int scale = UNSAFE.arrayIndexScale(Object[].class); - if (4 == scale) { - REF_ELEMENT_SHIFT = 2; - } else if (8 == scale) { - REF_ELEMENT_SHIFT = 3; - } else { - throw new IllegalStateException("Unknown pointer size"); - } - BUFFER_PAD = 128 / scale; - // Including the buffer pad in the array base offset - REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT); - } - - private final long indexMask; - private final Object[] entries; - protected final int bufferSize; - protected final RingBufferProducer sequenceProducer; - - RingBufferFields(Supplier eventFactory, - RingBufferProducer sequenceProducer) { - this.sequenceProducer = sequenceProducer; - this.bufferSize = sequenceProducer.getBufferSize(); - this.indexMask = bufferSize - 1; - this.entries = new Object[sequenceProducer.getBufferSize() + 2 * BUFFER_PAD]; - fill(eventFactory); - } - - private void fill(Supplier eventFactory) - { - for (int i = 0; i < bufferSize; i++) - { - entries[BUFFER_PAD + i] = 
eventFactory.get(); - } - } - - @SuppressWarnings("unchecked") - final E elementAt(long sequence) - { - return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); - } -} - -/** - * Ring based store of reusable entries containing the data representing - * an event being exchanged between event producer and ringbuffer consumers. - * - * @param implementation storing the data for sharing during exchange or parallel coordination of an event. - */ -final class UnsafeRingBuffer extends RingBufferFields -{ - protected long p1, p2, p3, p4, p5, p6, p7; - - /** - * Construct a RingBuffer with the full option set. - * - * @param eventFactory to newInstance entries for filling the RingBuffer - * @param sequenceProducer sequencer to handle the ordering of events moving through the RingBuffer. - * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2 - */ - UnsafeRingBuffer(Supplier eventFactory, - RingBufferProducer sequenceProducer) - { - super(eventFactory, sequenceProducer); - } - - @Override - E get(long sequence) - { - return elementAt(sequence); - } - - @Override - long next() - { - return sequenceProducer.next(); - } - - @Override - long next(int n) - { - return sequenceProducer.next(n); - } - - @Override - void addGatingSequence(Sequence gatingSequence) - { - sequenceProducer.addGatingSequence(gatingSequence); - } - - @Override - long getMinimumGatingSequence() - { - return getMinimumGatingSequence(null); - } - - @Override - long getMinimumGatingSequence(@Nullable Sequence sequence) - { - return sequenceProducer.getMinimumSequence(sequence); - } - - @Override - boolean removeGatingSequence(Sequence sequence) - { - return sequenceProducer.removeGatingSequence(sequence); - } - - @Override - Reader newReader() - { - return sequenceProducer.newBarrier(); - } - - @Override - long getCursor() - { - return sequenceProducer.getCursor(); - } - - @Override - int bufferSize() - { - return bufferSize; - } - - @Override - void publish(long sequence) - { - sequenceProducer.publish(sequence); - } - - @Override - int getPending() { - return (int)sequenceProducer.getPending(); - } - - @Override - RingBufferProducer getSequencer() { - return sequenceProducer; - } - - -} - -class LhsPadding -{ - protected long p1, p2, p3, p4, p5, p6, p7; -} - -class Value extends LhsPadding -{ - protected volatile long value; -} - -class RhsPadding extends Value -{ - protected long p9, p10, p11, p12, p13, p14, p15; -} - -/** - *

Concurrent sequence class used for tracking the progress of - * the ring buffer and event processors. Supports a number - * of concurrent operations including CAS and ordered writes. - * - *

Also attempts to be more efficient with regards to false - * sharing by adding padding around the volatile field. - */ -final class UnsafeSequence extends RhsPadding - implements RingBuffer.Sequence, LongSupplier -{ - private static final Unsafe UNSAFE; - private static final long VALUE_OFFSET; - - static - { - UNSAFE = RingBuffer.getUnsafe(); - try - { - VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value")); - } - catch (final Exception e) - { - throw new RuntimeException(e); - } - } - - /** - * Create a sequence with a specified initial value. - * - * @param initialValue The initial value for this sequence. - */ - UnsafeSequence(final long initialValue) - { - UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue); - } - - @Override - public long getAsLong() - { - return value; - } - - @Override - public void set(final long value) - { - UNSAFE.putOrderedLong(this, VALUE_OFFSET, value); - } - - @Override - public boolean compareAndSet(final long expectedValue, final long newValue) - { - return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue); - } - -} - -/** - *

Coordinator for claiming sequences for access to a data structure while tracking dependent {@link RingBuffer.Sequence}s. - * Suitable for use for sequencing across multiple publisher threads.

- * - *

- *

Note on {@code RingBufferProducer.getCursor()}: With this sequencer the cursor value is updated after the call - * to {@code RingBufferProducer.next()}, to determine the highest available sequence that can be read, then - * {@code RingBufferProducer.getHighestPublishedSequence(long, long)} should be used. - */ -final class MultiProducerRingBuffer extends RingBufferProducer -{ - private static final Unsafe UNSAFE = RingBuffer.getUnsafe(); - private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); - private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); - - private final RingBuffer.Sequence gatingSequenceCache = new UnsafeSequence( - RingBuffer.INITIAL_CURSOR_VALUE); - - // availableBuffer tracks the state of each ringbuffer slot - // see below for more details on the approach - private final int[] availableBuffer; - private final int indexMask; - private final int indexShift; - - /** - * Construct a Sequencer with the selected wait strategy and buffer size. - * - * @param bufferSize the size of the buffer that this will sequence over. - * @param waitStrategy for those waiting on sequences. - */ - @SuppressWarnings("deprecation") - MultiProducerRingBuffer(int bufferSize, final WaitStrategy waitStrategy, Runnable spinObserver) { - super(bufferSize, waitStrategy, spinObserver); - availableBuffer = new int[bufferSize]; - indexMask = bufferSize - 1; - indexShift = RingBuffer.log2(bufferSize); - initialiseAvailableBuffer(); - } - - /** - * See {@code RingBufferProducer.next()}. - */ - @Override - long next() - { - return next(1); - } - - /** - * See {@code RingBufferProducer.next(int)}. - */ - @Override - long next(int n) - { - long current; - long next; - - do - { - current = cursor.getAsLong(); - next = current + n; - - long wrapPoint = next - bufferSize; - long cachedGatingSequence = gatingSequenceCache.getAsLong(); - - if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) - { - long gatingSequence = RingBuffer.getMinimumSequence(gatingSequences, current); - - if (wrapPoint > gatingSequence) - { - if(spinObserver != null) { - spinObserver.run(); - } - LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? - continue; - } - - gatingSequenceCache.set(gatingSequence); - } - else if (cursor.compareAndSet(current, next)) - { - break; - } - } - while (true); - - return next; - } - - /** - * See {@code RingBufferProducer.producerCapacity()}. - */ - @Override - long getPending() - { - long consumed = RingBuffer.getMinimumSequence(gatingSequences, cursor.getAsLong()); - long produced = cursor.getAsLong(); - return produced - consumed; - } - - private void initialiseAvailableBuffer() - { - for (int i = availableBuffer.length - 1; i != 0; i--) - { - setAvailableBufferValue(i, -1); - } - - setAvailableBufferValue(0, -1); - } - - /** - * See {@code RingBufferProducer.publish(long)}. - */ - @Override - void publish(final long sequence) - { - setAvailable(sequence); - waitStrategy.signalAllWhenBlocking(); - } - - /** - * The below methods work on the availableBuffer flag. - * - * The prime reason is to avoid a shared sequence object between publisher threads. - * (Keeping single pointers tracking start and end would require coordination - * between the threads). - * - * -- Firstly we have the constraint that the delta between the cursor and minimum - * gating sequence will never be larger than the buffer size (the code in - * next/tryNext in the Sequence takes care of that). 
- * -- Given that; take the sequence value and mask off the lower portion of the - * sequence as the index into the buffer (indexMask). (aka modulo operator) - * -- The upper portion of the sequence becomes the value to check for availability. - * ie: it tells us how many times around the ring buffer we've been (aka division) - * -- Because we can't wrap without the gating sequences moving forward (i.e. the - * minimum gating sequence is effectively our last available position in the - * buffer), when we have new data and successfully claimed a slot we can simply - * write over the top. - */ - private void setAvailable(final long sequence) - { - setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); - } - - private void setAvailableBufferValue(int index, int flag) - { - long bufferAddress = (index * SCALE) + BASE; - UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); - } - - /** - * See {@code RingBufferProducer.isAvailable(long)} - */ - boolean isAvailable(long sequence) - { - int index = calculateIndex(sequence); - int flag = calculateAvailabilityFlag(sequence); - long bufferAddress = (index * SCALE) + BASE; - return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag; - } - - @Override - long getHighestPublishedSequence(long lowerBound, long availableSequence) - { - for (long sequence = lowerBound; sequence <= availableSequence; sequence++) - { - if (!isAvailable(sequence)) - { - return sequence - 1; - } - } - - return availableSequence; - } - - private int calculateAvailabilityFlag(final long sequence) - { - return (int) (sequence >>> indexShift); - } - - private int calculateIndex(final long sequence) - { - return ((int) sequence) & indexMask; - } -} \ No newline at end of file diff --git a/reactor-core/src/main/java/reactor/util/concurrent/WaitStrategy.java b/reactor-core/src/main/java/reactor/util/concurrent/WaitStrategy.java deleted file mode 100644 index 80b5353c7d..0000000000 --- a/reactor-core/src/main/java/reactor/util/concurrent/WaitStrategy.java +++ /dev/null @@ -1,537 +0,0 @@ -/* - * Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.util.concurrent; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.LongSupplier; - -/** - * Strategy employed to wait for specific {@link LongSupplier} values with various spinning strategies. - * @deprecated Has been moved to io.projectreactor.addons:reactor-extra:3.3.0+ and will be removed in 3.4.0 - */ -@Deprecated -public abstract class WaitStrategy { - - /** - * Blocking strategy that uses a lock and condition variable for consumer waiting on a barrier. 
- * - * This strategy can be used when throughput and low-latency are not as important as CPU resource. - * @return the wait strategy - */ - public static WaitStrategy blocking() { - return new Blocking(); - } - - /** - * Busy Spin strategy that uses a busy spin loop for consumers waiting on a barrier. - * - * This strategy will use CPU resource to avoid syscalls which can introduce latency jitter. It is best - * used when threads can be bound to specific CPU cores. - * @return the wait strategy - */ - public static WaitStrategy busySpin() { - return BusySpin.INSTANCE; - } - - /** - * Test if exception is alert - * @param t exception checked - * @return true if this is an alert signal - */ - public static boolean isAlert(Throwable t){ - return t == AlertException.INSTANCE; - } - - /** - * Variation of the {@link #blocking()} that attempts to elide conditional wake-ups when the lock is uncontended. - * Shows performance improvements on microbenchmarks. However this wait strategy should be considered experimental - * as I have not full proved the correctness of the lock elision code. - * @return the wait strategy - */ - public static WaitStrategy liteBlocking() { - return new LiteBlocking(); - } - - /** - * Parking strategy that initially spins, then uses a Thread.yield(), and eventually sleep - * (LockSupport.parkNanos(1)) for the minimum number of nanos the OS and JVM will allow while the - * consumers are waiting on a barrier. - *

- * This strategy is a good compromise between performance and CPU resource. Latency spikes can occur after quiet - * periods. - * @return the wait strategy - */ - public static WaitStrategy parking() { - return Parking.INSTANCE; - } - - /** - * Parking strategy that initially spins, then uses a Thread.yield(), and eventually - * sleep (LockSupport.parkNanos(1)) for the minimum number of nanos the - * OS and JVM will allow while the consumers are waiting on a barrier. - *

- * This strategy is a good compromise between performance and CPU resource. Latency - * spikes can occur after quiet periods. - * - * @param retries the spin cycle count before parking - * @return the wait strategy - */ - public static WaitStrategy parking(int retries) { - return new Parking(retries); - } - - /** - *

Phased wait strategy for waiting consumers on a barrier.

- *

- *

This strategy can be used when throughput and low-latency are not as important as CPU resource. Spins, then - * yields, then waits using the configured fallback WaitStrategy.
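As a rough usage sketch, the phased strategy composes with the other factories in this class; the spin/yield timeouts below are arbitrary assumptions.

import java.util.concurrent.TimeUnit;
import reactor.util.concurrent.WaitStrategy;

class WaitStrategySketch {

	// spin first, then yield, then fall back to the lite-blocking strategy
	static final WaitStrategy PHASED =
			WaitStrategy.phasedOff(100, 1_000, TimeUnit.MICROSECONDS, WaitStrategy.liteBlocking());

	// the phasedOffLiteLock convenience factory below builds the same combination
	static final WaitStrategy SAME =
			WaitStrategy.phasedOffLiteLock(100, 1_000, TimeUnit.MICROSECONDS);
}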

- * - * @param spinTimeout the spin timeout - * @param yieldTimeout the yield timeout - * @param units the time unit - * @param delegate the target wait strategy to fallback on - * @return the wait strategy - */ - public static WaitStrategy phasedOff(long spinTimeout, long yieldTimeout, TimeUnit units, WaitStrategy delegate) { - return new PhasedOff(spinTimeout, yieldTimeout, units, delegate); - } - - /** - * Block with wait/notifyAll semantics - * @param spinTimeout the spin timeout - * @param yieldTimeout the yield timeout - * @param units the time unit - * @return the wait strategy - */ - public static WaitStrategy phasedOffLiteLock(long spinTimeout, long yieldTimeout, TimeUnit units) { - return phasedOff(spinTimeout, yieldTimeout, units, liteBlocking()); - } - - /** - * Block with wait/notifyAll semantics - * @param spinTimeout the spin timeout - * @param yieldTimeout the yield timeout - * @param units the time unit - * @return the wait strategy - */ - public static WaitStrategy phasedOffLock(long spinTimeout, long yieldTimeout, TimeUnit units) { - return phasedOff(spinTimeout, yieldTimeout, units, blocking()); - } - - /** - * Block by parking in a loop - * @param spinTimeout the spin timeout - * @param yieldTimeout the yield timeout - * @param units the time unit - * @return the wait strategy - */ - public static WaitStrategy phasedOffSleep(long spinTimeout, long yieldTimeout, TimeUnit units) { - return phasedOff(spinTimeout, yieldTimeout, units, parking(0)); - } - - /** - * Yielding strategy that uses a Thread.sleep(1) for consumers waiting on a - * barrier - * after an initially spinning. - * - * This strategy will incur up a latency of 1ms and save a maximum CPU resources. - * @return the wait strategy - */ - public static WaitStrategy sleeping() { - return Sleeping.INSTANCE; - } - - /** - * Yielding strategy that uses a Thread.yield() for consumers waiting on a - * barrier - * after an initially spinning. - * - * This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes. - * @return the wait strategy - */ - public static WaitStrategy yielding() { - return Yielding.INSTANCE; - } - - /** - * Implementations should signal the waiting consumers that the cursor has advanced. - */ - public void signalAllWhenBlocking() { - } - - /** - * Wait for the given sequence to be available. It is possible for this method to return a value - * less than the sequence number supplied depending on the implementation of the WaitStrategy. A common - * use for this is to signal a timeout. Any EventProcessor that is using a WaitStrategy to get notifications - * about message becoming available should remember to handle this case. - * - * @param sequence to be waited on. - * @param cursor the main sequence from ringbuffer. Wait/notify strategies will - * need this as is notified upon update. - * @param spinObserver Spin observer - * @return the sequence that is available which may be greater than the requested sequence. - * @throws InterruptedException if the thread is interrupted. - */ - public abstract long waitFor(long sequence, LongSupplier cursor, Runnable spinObserver) - throws InterruptedException; - - /** - * Throw an Alert signal exception (singleton) that can be checked against {@link #isAlert(Throwable)} - */ - public static void alert(){ - throw AlertException.INSTANCE; - } - - /** - * Used to alert consumers waiting with a {@link WaitStrategy} for status changes. - *

- * It does not fill in a stack trace for performance reasons. - */ - @SuppressWarnings("serial") - static final class AlertException extends RuntimeException { - /** Pre-allocated exception to avoid garbage generation */ - public static final AlertException INSTANCE = new AlertException(); - - /** - * Private constructor so only a single instance any. - */ - private AlertException() { - } - - /** - * Overridden so the stack trace is not filled in for this exception for performance reasons. - * - * @return this instance. - */ - @Override - public Throwable fillInStackTrace() { - return this; - } - - } - - final static class Blocking extends WaitStrategy { - - private final Lock lock = new ReentrantLock(); - private final Condition processorNotifyCondition = lock.newCondition(); - - @Override - public void signalAllWhenBlocking() - { - lock.lock(); - try - { - processorNotifyCondition.signalAll(); - } - finally - { - lock.unlock(); - } - } - - @SuppressWarnings("UnusedAssignment") //for availableSequence - @Override - public long waitFor(long sequence, LongSupplier cursorSequence, Runnable barrier) - throws InterruptedException - { - long availableSequence; - if ((availableSequence = cursorSequence.getAsLong()) < sequence) - { - lock.lock(); - try - { - while ((availableSequence = cursorSequence.getAsLong()) < sequence) - { - barrier.run(); - processorNotifyCondition.await(); - } - } - finally - { - lock.unlock(); - } - } - - while ((availableSequence = cursorSequence.getAsLong()) < sequence) - { - barrier.run(); - } - - return availableSequence; - } - } - - final static class BusySpin extends WaitStrategy { - - static final WaitStrategy.BusySpin - INSTANCE = new WaitStrategy.BusySpin(); - - @Override - public long waitFor(final long sequence, LongSupplier cursor, final Runnable barrier) - throws InterruptedException - { - long availableSequence; - - while ((availableSequence = cursor.getAsLong()) < sequence) - { - barrier.run(); - } - - return availableSequence; - } - } - - final static class Sleeping extends WaitStrategy { - - static final WaitStrategy.Sleeping - INSTANCE = new WaitStrategy.Sleeping(); - - @Override - public long waitFor(final long sequence, - LongSupplier cursor, - final Runnable barrier) - throws InterruptedException { - long availableSequence; - - while ((availableSequence = cursor.getAsLong()) < sequence) { - barrier.run(); - Thread.sleep(1); - } - - return availableSequence; - } - } - - final static class LiteBlocking extends WaitStrategy { - - private final Lock lock = new ReentrantLock(); - private final Condition processorNotifyCondition = lock.newCondition(); - private final AtomicBoolean signalNeeded = new AtomicBoolean(false); - - @Override - public void signalAllWhenBlocking() - { - if (signalNeeded.getAndSet(false)) - { - lock.lock(); - try - { - processorNotifyCondition.signalAll(); - } - finally - { - lock.unlock(); - } - } - } - - @SuppressWarnings("UnusedAssignment") //for availableSequence - @Override - public long waitFor(long sequence, LongSupplier cursorSequence, Runnable barrier) - throws InterruptedException - { - long availableSequence; - if ((availableSequence = cursorSequence.getAsLong()) < sequence) - { - lock.lock(); - - try - { - do - { - signalNeeded.getAndSet(true); - - if ((availableSequence = cursorSequence.getAsLong()) >= sequence) - { - break; - } - - barrier.run(); - processorNotifyCondition.await(); - } - while ((availableSequence = cursorSequence.getAsLong()) < sequence); - } - finally - { - lock.unlock(); - } - } - - while 
((availableSequence = cursorSequence.getAsLong()) < sequence) - { - barrier.run(); - } - - return availableSequence; - } - } - - final static class PhasedOff extends WaitStrategy { - - private final long spinTimeoutNanos; - private final long yieldTimeoutNanos; - private final WaitStrategy fallbackStrategy; - PhasedOff(long spinTimeout, long yieldTimeout, - TimeUnit units, - WaitStrategy fallbackStrategy) - { - this.spinTimeoutNanos = units.toNanos(spinTimeout); - this.yieldTimeoutNanos = spinTimeoutNanos + units.toNanos(yieldTimeout); - this.fallbackStrategy = fallbackStrategy; - } - - @Override - public void signalAllWhenBlocking() - { - fallbackStrategy.signalAllWhenBlocking(); - } - - @Override - public long waitFor(long sequence, LongSupplier cursor, Runnable barrier) - throws InterruptedException - { - long availableSequence; - long startTime = 0; - int counter = SPIN_TRIES; - - do - { - if ((availableSequence = cursor.getAsLong()) >= sequence) - { - return availableSequence; - } - - if (0 == --counter) - { - if (0 == startTime) - { - startTime = System.nanoTime(); - } - else - { - long timeDelta = System.nanoTime() - startTime; - if (timeDelta > yieldTimeoutNanos) - { - return fallbackStrategy.waitFor(sequence, cursor, barrier); - } - else if (timeDelta > spinTimeoutNanos) - { - Thread.yield(); - } - } - counter = SPIN_TRIES; - } - barrier.run(); - } - while (true); - } - private static final int SPIN_TRIES = 10000; - } - - final static class Parking extends WaitStrategy { - - static final WaitStrategy.Parking - INSTANCE = new WaitStrategy.Parking(); - - private final int retries; - - Parking() { - this(DEFAULT_RETRIES); - } - - Parking(int retries) { - this.retries = retries; - } - - @Override - public long waitFor(final long sequence, LongSupplier cursor, final Runnable barrier) - throws InterruptedException - { - long availableSequence; - int counter = retries; - - while ((availableSequence = cursor.getAsLong()) < sequence) - { - counter = applyWaitMethod(barrier, counter); - } - - return availableSequence; - } - - private int applyWaitMethod(final Runnable barrier, int counter) - throws WaitStrategy.AlertException - { - barrier.run(); - - if (counter > 100) - { - --counter; - } - else if (counter > 0) - { - --counter; - Thread.yield(); - } - else - { - LockSupport.parkNanos(1L); - } - - return counter; - } - private static final int DEFAULT_RETRIES = 200; - } - - final static class Yielding extends WaitStrategy { - - static final WaitStrategy.Yielding - INSTANCE = new WaitStrategy.Yielding(); - - @Override - public long waitFor(final long sequence, LongSupplier cursor, final Runnable barrier) - throws InterruptedException { - long availableSequence; - int counter = SPIN_TRIES; - - while ((availableSequence = cursor.getAsLong()) < sequence) { - counter = applyWaitMethod(barrier, counter); - } - - return availableSequence; - } - - private int applyWaitMethod(final Runnable barrier, int counter) - throws WaitStrategy.AlertException { - barrier.run(); - - if (0 == counter) { - Thread.yield(); - } - else { - --counter; - } - - return counter; - } - - private static final int SPIN_TRIES = 100; - } -} diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryPredicateTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryPredicateTest.java deleted file mode 100644 index 43110b8e55..0000000000 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryPredicateTest.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright (c) 2011-2017 Pivotal Software 
Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.core.publisher; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; - -import org.junit.Test; - -import org.reactivestreams.Subscription; -import reactor.core.CoreSubscriber; -import reactor.core.Scannable; -import reactor.test.StepVerifier; -import reactor.test.subscriber.AssertSubscriber; - -import static org.assertj.core.api.Assertions.assertThat; - -@SuppressWarnings("deprecation") //retry with Predicate are deprecated, the underlying implementation is going to be removed ultimately -public class FluxRetryPredicateTest { - - final Flux source = Flux.concat(Flux.range(1, 5), - Flux.error(new RuntimeException("forced failure 0"))); - - @Test(expected = NullPointerException.class) - public void sourceNull() { - new FluxRetryPredicate<>(null, e -> true); - } - - @Test(expected = NullPointerException.class) - public void predicateNull() { - Flux.never() - .retry((Predicate) null); - } - - @Test - public void normal() { - int[] times = {1}; - - AssertSubscriber ts = AssertSubscriber.create(); - - source.retry(e -> times[0]-- > 0) - .subscribe(ts); - - ts.assertValues(1, 2, 3, 4, 5, 1, 2, 3, 4, 5) - .assertError(RuntimeException.class) - .assertErrorMessage("forced failure 0") - .assertNotComplete(); - } - - @Test - public void normalBackpressured() { - int[] times = {1}; - - AssertSubscriber ts = AssertSubscriber.create(0); - - source.retry(e -> times[0]-- > 0) - .subscribe(ts); - - ts.assertNoValues() - .assertNoError() - .assertNotComplete(); - - ts.request(2); - - ts.assertValues(1, 2) - .assertNoError() - .assertNotComplete(); - - ts.request(5); - - ts.assertValues(1, 2, 3, 4, 5, 1, 2) - .assertNoError() - .assertNotComplete(); - - ts.request(10); - - ts.assertValues(1, 2, 3, 4, 5, 1, 2, 3, 4, 5) - .assertError(RuntimeException.class) - .assertErrorMessage("forced failure 0") - .assertNotComplete(); - } - - @Test - public void dontRepeat() { - AssertSubscriber ts = AssertSubscriber.create(); - - source.retry(e -> false) - .subscribe(ts); - - ts.assertValues(1, 2, 3, 4, 5) - .assertError(RuntimeException.class) - .assertErrorMessage("forced failure 0") - .assertNotComplete(); - } - - @Test - public void predicateThrows() { - AssertSubscriber ts = AssertSubscriber.create(); - - source.retry(e -> { - throw new RuntimeException("forced failure"); - }) - .subscribe(ts); - - ts.assertValues(1, 2, 3, 4, 5) - .assertError(RuntimeException.class) - .assertErrorMessage("forced failure") - .assertNotComplete(); - } - - @Test - public void twoRetryNormal() { - AtomicInteger i = new AtomicInteger(); - - StepVerifier.create(Flux.just("test", "test2", "test3") - .doOnNext(d -> { - if (i.getAndIncrement() < 2) { - throw new RuntimeException("test"); - } - }) - .retry(e -> i.get() <= 2) - .count()) - .expectNext(3L) - .expectComplete() - .verify(); - } - - - @Test - public void twoRetryNormalSupplier() { - 
AtomicInteger i = new AtomicInteger(); - AtomicBoolean bool = new AtomicBoolean(true); - - StepVerifier.create(Flux.defer(() -> Flux.just(i.incrementAndGet())) - .doOnNext(v -> { - if(v < 4) { - throw new RuntimeException("test"); - } - else { - bool.set(false); - } - }) - .retry(3, e -> bool.get())) - .expectNext(4) - .expectComplete() - .verify(); - } - - @Test - public void twoRetryErrorSupplier() { - AtomicInteger i = new AtomicInteger(); - AtomicBoolean bool = new AtomicBoolean(true); - - StepVerifier.create(Flux.defer(() -> Flux.just(i.incrementAndGet())) - .doOnNext(v -> { - if(v < 4) { - if( v > 2){ - bool.set(false); - } - throw new RuntimeException("test"); - } - }) - .retry(3, e -> bool.get())) - .verifyErrorMessage("test"); - } - - @Test - public void twoRetryNormalSupplier3() { - AtomicInteger i = new AtomicInteger(); - AtomicBoolean bool = new AtomicBoolean(true); - - StepVerifier.create(Flux.defer(() -> Flux.just(i.incrementAndGet())) - .doOnNext(v -> { - if(v < 4) { - throw new RuntimeException("test"); - } - else { - bool.set(false); - } - }) - .retry(2, e -> bool.get())) - .verifyErrorMessage("test"); - } - - @Test - public void twoRetryNormalSupplier2() { - AtomicInteger i = new AtomicInteger(); - AtomicBoolean bool = new AtomicBoolean(true); - - StepVerifier.create(Flux.defer(() -> Flux.just(i.incrementAndGet())) - .doOnNext(v -> { - if(v < 4) { - throw new RuntimeException("test"); - } - else { - bool.set(false); - } - }) - .retry(0, e -> bool.get())) - .expectNext(4) - .expectComplete() - .verify(); - } - - @Test - public void twoRetryErrorSupplier2() { - AtomicInteger i = new AtomicInteger(); - AtomicBoolean bool = new AtomicBoolean(true); - - StepVerifier.create(Flux.defer(() -> Flux.just(i.incrementAndGet())) - .doOnNext(v -> { - if(v < 4) { - if( v > 2){ - bool.set(false); - } - throw new RuntimeException("test"); - } - }) - .retry(0, e -> bool.get())) - .verifyErrorMessage("test"); - } - - @Test - public void scanOperator(){ - Flux parent = Flux.just(1); - FluxRetryPredicate test = new FluxRetryPredicate<>(parent, p -> true); - - assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); - assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); - } - - @Test - public void scanSubscriber(){ - CoreSubscriber actual = new LambdaSubscriber<>(null, e -> {}, null, null); - Flux source = Flux.just(1); - FluxRetryPredicate.RetryPredicateSubscriber test = - new FluxRetryPredicate.RetryPredicateSubscriber<>(source, actual, p -> true); - - Subscription parent = Operators.emptySubscription(); - test.onSubscribe(parent); - - assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); - assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual); - assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); - } -} diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java index 54a9b230b4..dd9beae028 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java @@ -31,6 +31,7 @@ import org.assertj.core.api.LongAssert; import org.assertj.core.data.Percentage; import org.junit.Test; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -56,16 +57,176 @@ public class FluxRetryWhenTest { Flux rangeError = Flux.concat(Flux.range(1, 2), 
Flux.error(new RuntimeException("forced failure 0"))); - @Test(expected = NullPointerException.class) - public void sourceNull() { - new FluxRetryWhen<>(null, Retry.from(v -> v)); + @Test + public void dontRepeat() { + AssertSubscriber ts = AssertSubscriber.create(); + + rangeError.retryWhen(Retry.indefinitely().filter(e -> false)) + .subscribe(ts); + + ts.assertValues(1, 2) + .assertError(RuntimeException.class) + .assertErrorMessage("forced failure 0") + .assertNotComplete(); + } + + @Test + public void predicateThrows() { + AssertSubscriber ts = AssertSubscriber.create(); + + rangeError + .retryWhen( + Retry.indefinitely() + .filter(e -> { + throw new RuntimeException("forced failure"); + }) + ) + .subscribe(ts); + + ts.assertValues(1, 2) + .assertError(RuntimeException.class) + .assertErrorMessage("forced failure") + .assertNotComplete(); + } + + @Test + public void twoRetryNormal() { + AtomicInteger i = new AtomicInteger(); + + Mono source = Flux + .just("test", "test2", "test3") + .doOnNext(d -> { + if (i.getAndIncrement() < 2) { + throw new RuntimeException("test"); + } + }) + .retryWhen(Retry.indefinitely().filter(e -> i.get() <= 2)) + .count(); + + StepVerifier.create(source) + .expectNext(3L) + .expectComplete() + .verify(); + } + + + @Test + public void twoRetryNormalSupplier() { + AtomicInteger i = new AtomicInteger(); + AtomicBoolean bool = new AtomicBoolean(true); + + Flux source = Flux.defer(() -> { + return Flux.defer(() -> Flux.just(i.incrementAndGet())) + .doOnNext(v -> { + if (v < 4) { + throw new RuntimeException("test"); + } + else { + bool.set(false); + } + }) + .retryWhen(Retry.indefinitely().filter(Flux.countingPredicate(e -> bool.get(), 3))); + }); + + StepVerifier.create(source) + .expectNext(4) + .expectComplete() + .verify(); + } + + @Test + public void twoRetryErrorSupplier() { + AtomicInteger i = new AtomicInteger(); + AtomicBoolean bool = new AtomicBoolean(true); + + Flux source = Flux.defer(() -> { + return Flux.defer(() -> Flux.just(i.incrementAndGet())) + .doOnNext(v -> { + if (v < 4) { + if (v > 2) { + bool.set(false); + } + throw new RuntimeException("test"); + } + }) + .retryWhen(Retry.indefinitely().filter(Flux.countingPredicate(e -> bool.get(), 3))); + }); + + StepVerifier.create(source) + .verifyErrorMessage("test"); + } + + @Test + public void twoRetryNormalSupplier3() { + AtomicInteger i = new AtomicInteger(); + AtomicBoolean bool = new AtomicBoolean(true); + + Flux source = Flux.defer(() -> { + return Flux.defer(() -> Flux.just(i.incrementAndGet())) + .doOnNext(v -> { + if (v < 4) { + throw new RuntimeException("test"); + } + else { + bool.set(false); + } + }) + .retryWhen(Retry.indefinitely().filter(Flux.countingPredicate(e -> bool.get(), 2))); + }); + + StepVerifier.create(source) + .verifyErrorMessage("test"); + } + + @Test + public void twoRetryNormalSupplier2() { + AtomicInteger i = new AtomicInteger(); + AtomicBoolean bool = new AtomicBoolean(true); + + Flux source = Flux.defer(() -> { + return Flux.defer(() -> Flux.just(i.incrementAndGet())) + .doOnNext(v -> { + if (v < 4) { + throw new RuntimeException("test"); + } + else { + bool.set(false); + } + }) + .retryWhen(Retry.indefinitely().filter(Flux.countingPredicate(e -> bool.get(), 0))); + }); + + StepVerifier.create(source) + .expectNext(4) + .expectComplete() + .verify(); + } + + @Test + public void twoRetryErrorSupplier2() { + AtomicInteger i = new AtomicInteger(); + AtomicBoolean bool = new AtomicBoolean(true); + + Flux source = Flux.defer(() -> { + return Flux.defer(() -> 
Flux.just(i.incrementAndGet())) + .doOnNext(v -> { + if (v < 4) { + if (v > 2) { + bool.set(false); + } + throw new RuntimeException("test"); + } + }) + .retryWhen(Retry.indefinitely().filter(Flux.countingPredicate(e -> bool.get(), 0))); + }); + + StepVerifier.create(source) + .verifyErrorMessage("test"); } - @SuppressWarnings({"deprecation", "unchecked", "rawtypes", "ConstantConditions"}) @Test(expected = NullPointerException.class) - public void whenThrowableFactoryNull() { - Flux.never() - .retryWhen((Function) null); + public void sourceNull() { + new FluxRetryWhen<>(null, Retry.from(v -> v)); } @SuppressWarnings("ConstantConditions") @@ -889,7 +1050,7 @@ public void retryWhenThrowableCompanionIsComparableToRetryWhenRetryFromFunction( } }); - StepVerifier.withVirtualTime(() -> source.retryWhen(companion -> companion.delayElements(Duration.ofSeconds(3)))) + StepVerifier.withVirtualTime(() -> source.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(3)))) .expectSubscription() .expectNoEvent(Duration.ofSeconds(3 * 3)) .expectNext(1, 2, 3) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxUsingWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxUsingWhenTest.java index 32af8bf806..4e7349be42 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxUsingWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxUsingWhenTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; import java.util.logging.Level; import org.awaitility.Awaitility; @@ -463,10 +464,13 @@ public void cancelWithoutHandlerAppliesCommit(Flux source) { TestResource testResource = new TestResource(); Flux test = Flux - .usingWhen(Mono.just(testResource).hide(), + .usingWhen( + Mono.just(testResource).hide(), tr -> source, TestResource::commit, - tr -> tr.rollback(new RuntimeException("placeholder rollback exception"))) + (tr, e) -> tr.rollback(new RuntimeException("placeholder rollback exception")), + TestResource::commit + ) .take(2); StepVerifier.create(test) @@ -487,14 +491,20 @@ public void cancelDefaultHandlerFailure(Flux source) { Loggers.useCustomLoggers(name -> tl); try { - Flux test = Flux.usingWhen(Mono.just(testResource), - tr -> source, - r -> r.commit() - //immediate error to trigger the logging within the test - .concatWith(Mono.error(new IllegalStateException("commit error"))), - r -> r.rollback(new RuntimeException("placeholder ignored rollback exception")) - ) - .take(2); + Function> completeOrCancel = r -> { + return r.commit() + //immediate error to trigger the logging within the test + .concatWith(Mono.error(new IllegalStateException("commit error"))); + }; + Flux test = Flux + .usingWhen( + Mono.just(testResource), + tr -> source, + completeOrCancel, + (r, e) -> r.rollback(new RuntimeException("placeholder ignored rollback exception")), + completeOrCancel + ) + .take(2); StepVerifier.create(test) .expectNext("0", "1") diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoRetryPredicateTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoRetryPredicateTest.java deleted file mode 100644 index a2a7818ed2..0000000000 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoRetryPredicateTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved. 
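The rewritten FluxRetryWhenTest cases above (and the MonoRetryWhenTest additions further down) all apply the same translation: the deprecated predicate-based retry variants become a Retry spec passed to retryWhen. A minimal sketch of that migration, assuming the reactor.util.retry.Retry entry point; the class name, source Flux and exception type are illustrative only and not part of this diff:

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

class RetryMigrationSketch {

	// Illustrative source that always fails.
	Flux<String> source = Flux.error(new IllegalStateException("boom"));

	// Deprecated forms exercised by the deleted FluxRetryPredicateTest/MonoRetryPredicateTest:
	//   source.retry(e -> e instanceof IllegalStateException)
	//   source.retry(3, e -> e instanceof IllegalStateException)

	// Retry-spec equivalents used by the rewritten tests:
	Flux<String> unbounded =
			source.retryWhen(Retry.indefinitely().filter(e -> e instanceof IllegalStateException));

	Flux<String> bounded =
			source.retryWhen(Retry.max(3).filter(e -> e instanceof IllegalStateException));

	// A retryWhen(Function) companion that merely delayed its elements maps onto a fixed-delay
	// spec, as in retryWhenThrowableCompanionIsComparableToRetryWhenRetryFromFunction:
	Flux<String> fixedDelay =
			source.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(3)));
}
```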
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.core.publisher; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; -import reactor.core.Scannable; -import reactor.test.StepVerifier; - -import static org.assertj.core.api.Assertions.assertThat; - -public class MonoRetryPredicateTest { - - @SuppressWarnings("deprecation") - @Test - public void twoRetryNormalSupplier() { - AtomicInteger i = new AtomicInteger(); - AtomicBoolean bool = new AtomicBoolean(true); - - StepVerifier.create(Mono.fromCallable(i::incrementAndGet) - .doOnNext(v -> { - if(v < 4) { - throw new RuntimeException("test"); - } - else { - bool.set(false); - } - }) - .retry(3, e -> bool.get())) - .expectNext(4) - .expectComplete() - .verify(); - } - - @SuppressWarnings("deprecation") - @Test - public void twoRetryErrorSupplier() { - AtomicInteger i = new AtomicInteger(); - AtomicBoolean bool = new AtomicBoolean(true); - - StepVerifier.create(Mono.fromCallable(i::incrementAndGet) - .doOnNext(v -> { - if(v < 4) { - if( v > 2){ - bool.set(false); - } - throw new RuntimeException("test"); - } - }) - .retry(3, e -> bool.get())) - .verifyErrorMessage("test"); - } - - @Test - public void scanOperator(){ - MonoRetryPredicate test = new MonoRetryPredicate<>(Mono.just("foo"), e -> true); - - assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); - } -} \ No newline at end of file diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoRetryWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoRetryWhenTest.java index b537874d31..76d9d3560a 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoRetryWhenTest.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.assertj.core.data.Percentage; @@ -36,6 +37,49 @@ public class MonoRetryWhenTest { + @Test + public void twoRetryNormalSupplier() { + AtomicInteger i = new AtomicInteger(); + AtomicBoolean bool = new AtomicBoolean(true); + + Mono source = Mono + .fromCallable(i::incrementAndGet) + .doOnNext(v -> { + if (v < 4) { + throw new RuntimeException("test"); + } + else { + bool.set(false); + } + }) + .retryWhen(Retry.max(3).filter(e -> bool.get())); + + StepVerifier.create(source) + .expectNext(4) + .expectComplete() + .verify(); + } + + @Test + public void twoRetryErrorSupplier() { + AtomicInteger i = new AtomicInteger(); + AtomicBoolean bool = new AtomicBoolean(true); + + Mono source = Mono + .fromCallable(i::incrementAndGet) + .doOnNext(v -> { + if (v < 4) { + if (v > 2) { + bool.set(false); + } + throw new RuntimeException("test"); + } + }) + .retryWhen(Retry.max(3).filter(e -> bool.get())); + + StepVerifier.create(source) + .verifyErrorMessage("test"); + } Mono 
exponentialRetryScenario() { AtomicInteger i = new AtomicInteger(); diff --git a/reactor-test/build.gradle b/reactor-test/build.gradle index 0954c4408f..094f55a07c 100644 --- a/reactor-test/build.gradle +++ b/reactor-test/build.gradle @@ -86,6 +86,9 @@ task japicmp(type: JapicmpTask) { includeSynthetic = true //TODO after a release, remove the reactor-test exclusions below if any + methodExcludes = [ + 'reactor.test.StepVerifier$FirstStep#expectNoEvent(java.time.Duration)', + ] } tasks.withType(Test).all { diff --git a/reactor-test/src/main/java/reactor/test/StepVerifier.java b/reactor-test/src/main/java/reactor/test/StepVerifier.java index 0a5b40d988..a7d2cd2294 100644 --- a/reactor-test/src/main/java/reactor/test/StepVerifier.java +++ b/reactor-test/src/main/java/reactor/test/StepVerifier.java @@ -1021,28 +1021,6 @@ interface FirstStep extends Step { */ Step expectNoFusionSupport(); - /** - * Expect no event and no Subscription has been observed by the verifier for the - * length of the provided {@link Duration}. If virtual time is used, this duration - * is verified using the virtual clock. - *
<p>
- * Note that you should only use this method as the first expectation if you - * actually don't expect a subscription to happen. Use - * {@link FirstStep#expectSubscription()} combined with {@link Step#expectNoEvent(Duration)} - * to work around that. - *
<p>
- * Also avoid using this method at the end of the set of expectations: - * prefer {@link #expectTimeout(Duration)} rather than {@code expectNoEvent(...).thenCancel()}. - * - * @param duration the duration for which to observe no event has been received - * - * @return this builder - * @deprecated should probably always first use {@link #expectSubscription()} or equivalent - */ - @Override - @Deprecated - FirstStep expectNoEvent(Duration duration); - /** * Expect a {@link Subscription}. * Effectively behave as the default implicit {@link Subscription} expectation.
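With FirstStep#expectNoEvent(Duration) removed here (it is also listed in the reactor-test japicmp methodExcludes above), the two alternatives its javadoc recommended are the remaining patterns. A minimal sketch of both, assuming an illustrative delayed Mono under virtual time and a never-emitting Flux; these snippets are not part of this diff:

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class ExpectNoEventSketch {

	// Expect the subscription explicitly, then use Step#expectNoEvent for the quiet period.
	void noEventAfterSubscription() {
		StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofSeconds(2)))
		            .expectSubscription()
		            .expectNoEvent(Duration.ofSeconds(1))
		            .thenAwait(Duration.ofSeconds(1))
		            .expectNext(0L)
		            .verifyComplete();
	}

	// Prefer expectTimeout over ending the scenario with expectNoEvent(...).thenCancel().
	void timeoutInsteadOfNoEventThenCancel() {
		StepVerifier.create(Flux.never())
		            .expectSubscription()
		            .expectTimeout(Duration.ofMillis(100))
		            .verify();
	}
}
```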