Skip to content

Commit

Permalink
Change FnApiDoFnRunner to skip trySplit checkpoint requests if not dr…
Browse files Browse the repository at this point in the history
…aining and nothing has yet been claimed by the tracker.
  • Loading branch information
scwhittle committed Aug 1, 2024
1 parent 5d13894 commit 20dc4dc
Show file tree
Hide file tree
Showing 2 changed files with 313 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -327,6 +328,11 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
* otherwise.
*/
private RestrictionTracker<RestrictionT, PositionT> currentTracker;
/**
* If non-null, set to true once a tryClaim on currentTracker has succeeded. Used to ignore
* checkpoint split requests when nothing has been claimed yet (i.e. no progress was made).
*/
private AtomicBoolean currentTrackerClaimed;

/**
* Only valid during {@link #processTimer} and {@link #processOnWindowExpiration}, null otherwise.
Expand Down Expand Up @@ -877,12 +883,15 @@ private void processElementForSplitRestriction(
currentElement = elem.withValue(elem.getValue().getKey());
currentRestriction = elem.getValue().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
currentTrackerClaimed = new AtomicBoolean(false);
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
new ClaimObserver<PositionT>() {
@Override
public void onClaimed(PositionT position) {}
public void onClaimed(PositionT position) {
currentTrackerClaimed.lazySet(true);
}

@Override
public void onClaimFailed(PositionT position) {}
Expand All @@ -894,6 +903,7 @@ public void onClaimFailed(PositionT position) {}
currentRestriction = null;
currentWatermarkEstimatorState = null;
currentTracker = null;
currentTrackerClaimed = null;
}

this.stateAccessor.finalizeState();
Expand All @@ -909,12 +919,15 @@ private void processElementForWindowObservingSplitRestriction(
(Iterator<BoundedWindow>) elem.getWindows().iterator();
while (windowIterator.hasNext()) {
currentWindow = windowIterator.next();
currentTrackerClaimed = new AtomicBoolean(false);
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
new ClaimObserver<PositionT>() {
@Override
public void onClaimed(PositionT position) {}
public void onClaimed(PositionT position) {
currentTrackerClaimed.lazySet(true);
}

@Override
public void onClaimFailed(PositionT position) {}
Expand All @@ -927,6 +940,7 @@ public void onClaimFailed(PositionT position) {}
currentWatermarkEstimatorState = null;
currentWindow = null;
currentTracker = null;
currentTrackerClaimed = null;
}

this.stateAccessor.finalizeState();
Expand All @@ -937,6 +951,8 @@ private void processElementForTruncateRestriction(
currentElement = elem.withValue(elem.getValue().getKey().getKey());
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
// For truncation, we don't set currentTrackerClaimed so that we enable checkpointing even if no
// progress is made.
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
Expand Down Expand Up @@ -989,6 +1005,8 @@ private void processElementForWindowObservingTruncateRestriction(
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
currentWindow = currentWindows.get(windowCurrentIndex);
// We leave currentTrackerClaimed unset because we want to split regardless of whether tryClaim
// has been called.
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
Expand Down Expand Up @@ -1081,12 +1099,15 @@ private void processElementForWindowObservingSizedElementAndRestriction(
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
currentWindow = currentWindows.get(windowCurrentIndex);
currentTrackerClaimed = new AtomicBoolean(false);
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
new ClaimObserver<PositionT>() {
@Override
public void onClaimed(PositionT position) {}
public void onClaimed(PositionT position) {
currentTrackerClaimed.lazySet(true);
}

@Override
public void onClaimFailed(PositionT position) {}
Expand Down Expand Up @@ -1278,6 +1299,13 @@ private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
if (currentWindow == null) {
return null;
}
// A checkpoint is being requested (fractionOfRemainder == 0) but nothing has been claimed on
// the restriction yet, so skip the request.
if (fractionOfRemainder == 0
&& currentTrackerClaimed != null
&& !currentTrackerClaimed.get()) {
return null;
}

SplitResultsWithStopIndex splitResult =
computeSplitForProcessOrTruncate(
Expand Down Expand Up @@ -1628,6 +1656,12 @@ private HandlesSplits.SplitResult trySplitForElementAndRestriction(
if (currentTracker == null) {
return null;
}
// Nothing has been claimed on the tracker yet, so a checkpoint would not make meaningful progress.
if (fractionOfRemainder == 0
&& currentTrackerClaimed != null
&& !currentTrackerClaimed.get()) {
return null;
}
// Make sure to get the output watermark before we split to ensure that the lower bound
// applies to the residual.
watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
Expand Down
Loading

0 comments on commit 20dc4dc

Please sign in to comment.