Skip to content

Commit

Permalink
Change FnApiDoFnRunner to skip trySplit checkpoint requests if not dr…
Browse files Browse the repository at this point in the history
…aining and nothing has yet been claimed by the tracker. (#32044)
  • Loading branch information
scwhittle authored Aug 14, 2024
1 parent c23e603 commit 8fbad48
Show file tree
Hide file tree
Showing 2 changed files with 485 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -118,6 +119,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.util.Durations;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -327,6 +329,11 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
* otherwise.
*/
private RestrictionTracker<RestrictionT, PositionT> currentTracker;
/**
* If non-null, set to true after currentTracker has had a tryClaim issued on it. Used to ignore
* checkpoint split requests if no progress was made.
*/
private @Nullable AtomicBoolean currentTrackerClaimed;

/**
* Only valid during {@link #processTimer} and {@link #processOnWindowExpiration}, null otherwise.
Expand Down Expand Up @@ -877,12 +884,18 @@ private void processElementForSplitRestriction(
currentElement = elem.withValue(elem.getValue().getKey());
currentRestriction = elem.getValue().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
currentTrackerClaimed = new AtomicBoolean(false);
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
new ClaimObserver<PositionT>() {
private final AtomicBoolean claimed =
Preconditions.checkNotNull(currentTrackerClaimed);

@Override
public void onClaimed(PositionT position) {}
public void onClaimed(PositionT position) {
claimed.lazySet(true);
}

@Override
public void onClaimFailed(PositionT position) {}
Expand All @@ -894,6 +907,7 @@ public void onClaimFailed(PositionT position) {}
currentRestriction = null;
currentWatermarkEstimatorState = null;
currentTracker = null;
currentTrackerClaimed = null;
}

this.stateAccessor.finalizeState();
Expand All @@ -909,12 +923,18 @@ private void processElementForWindowObservingSplitRestriction(
(Iterator<BoundedWindow>) elem.getWindows().iterator();
while (windowIterator.hasNext()) {
currentWindow = windowIterator.next();
currentTrackerClaimed = new AtomicBoolean(false);
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
new ClaimObserver<PositionT>() {
private final AtomicBoolean claimed =
Preconditions.checkNotNull(currentTrackerClaimed);

@Override
public void onClaimed(PositionT position) {}
public void onClaimed(PositionT position) {
claimed.lazySet(true);
}

@Override
public void onClaimFailed(PositionT position) {}
Expand All @@ -927,6 +947,7 @@ public void onClaimFailed(PositionT position) {}
currentWatermarkEstimatorState = null;
currentWindow = null;
currentTracker = null;
currentTrackerClaimed = null;
}

this.stateAccessor.finalizeState();
Expand All @@ -937,6 +958,8 @@ private void processElementForTruncateRestriction(
currentElement = elem.withValue(elem.getValue().getKey().getKey());
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
// For truncation, we don't set currentTrackerClaimed so that we enable checkpointing even if no
// progress is made.
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
Expand Down Expand Up @@ -989,6 +1012,8 @@ private void processElementForWindowObservingTruncateRestriction(
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
currentWindow = currentWindows.get(windowCurrentIndex);
// We leave currentTrackerClaimed unset as we want to split regardless of if tryClaim is
// called.
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
Expand Down Expand Up @@ -1081,12 +1106,18 @@ private void processElementForWindowObservingSizedElementAndRestriction(
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
currentWindow = currentWindows.get(windowCurrentIndex);
currentTrackerClaimed = new AtomicBoolean(false);
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
new ClaimObserver<PositionT>() {
private final AtomicBoolean claimed =
Preconditions.checkNotNull(currentTrackerClaimed);

@Override
public void onClaimed(PositionT position) {}
public void onClaimed(PositionT position) {
claimed.lazySet(true);
}

@Override
public void onClaimFailed(PositionT position) {}
Expand All @@ -1107,7 +1138,7 @@ public void onClaimFailed(PositionT position) {}

// Attempt to checkpoint the current restriction.
HandlesSplits.SplitResult splitResult =
trySplitForElementAndRestriction(0, continuation.resumeDelay());
trySplitForElementAndRestriction(0, continuation.resumeDelay(), false);

/**
* After the user has chosen to resume processing later, either the restriction is already
Expand All @@ -1132,7 +1163,7 @@ private abstract class SplittableFnDataReceiver
implements HandlesSplits, FnDataReceiver<WindowedValue> {
@Override
public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
return trySplitForElementAndRestriction(fractionOfRemainder, Duration.ZERO);
return trySplitForElementAndRestriction(fractionOfRemainder, Duration.ZERO, true);
}

@Override
Expand Down Expand Up @@ -1278,6 +1309,13 @@ private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
if (currentWindow == null) {
return null;
}
// We are requesting a checkpoint but have not yet progressed on the restriction, skip
// request.
if (fractionOfRemainder == 0
&& currentTrackerClaimed != null
&& !currentTrackerClaimed.get()) {
return null;
}

SplitResultsWithStopIndex splitResult =
computeSplitForProcessOrTruncate(
Expand Down Expand Up @@ -1620,14 +1658,21 @@ static <WatermarkEstimatorStateT> HandlesSplits.SplitResult constructSplitResult
}

private HandlesSplits.SplitResult trySplitForElementAndRestriction(
double fractionOfRemainder, Duration resumeDelay) {
double fractionOfRemainder, Duration resumeDelay, boolean requireClaimForCheckpoint) {
KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
WindowedSplitResult windowedSplitResult = null;
synchronized (splitLock) {
// There is nothing to split if we are between element and restriction processing calls.
if (currentTracker == null) {
return null;
}
// The tracker has not yet been claimed meaning that a checkpoint won't meaningfully advance.
if (fractionOfRemainder == 0
&& requireClaimForCheckpoint
&& currentTrackerClaimed != null
&& !currentTrackerClaimed.get()) {
return null;
}
// Make sure to get the output watermark before we split to ensure that the lower bound
// applies to the residual.
watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
Expand Down
Loading

0 comments on commit 8fbad48

Please sign in to comment.