Skip to content

Commit

Permalink
[Dataflow Streaming] Reduce contention on work submission (#33687)
Browse files Browse the repository at this point in the history
* [Dataflow Streaming] Reduce contention on work submission
  • Loading branch information
arunpandianp authored Jan 24, 2025
1 parent 6b52518 commit e8aea3f
Showing 1 changed file with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.beam.runners.dataflow.worker.util;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
Expand All @@ -36,6 +38,9 @@ public class BoundedQueueExecutor {

// Used to guard elementsOutstanding and bytesOutstanding.
private final Monitor monitor = new Monitor();
private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();
private final Object decrementQueueDrainLock = new Object();
private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false);
private int elementsOutstanding = 0;
private long bytesOutstanding = 0;

Expand Down Expand Up @@ -236,10 +241,44 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
}

private void decrementCounters(long workBytes) {
monitor.enter();
--elementsOutstanding;
bytesOutstanding -= workBytes;
monitor.leave();
// All threads queue decrements and one thread grabs the monitor and updates
// counters. We do this to reduce contention on monitor which is locked by
// GetWork thread
decrementQueue.add(workBytes);
boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true);
if (submittedToExistingBatch) {
// There is already a thread about to drain the decrement queue
// Current thread does not need to drain.
return;
}
synchronized (decrementQueueDrainLock) {
// By setting false here, we may allow another decrement to claim submission of the next batch
// and start waiting on the decrementQueueDrainLock.
//
// However this prevents races that would leave decrements in the queue and unclaimed and we
// are ensured there is at most one additional thread blocked. This helps prevent the executor
// from creating threads over the limit if many were contending on the lock while their
// decrements were already applied.
isDecrementBatchPending.set(false);
long bytesToDecrement = 0;
int elementsToDecrement = 0;
while (true) {
Long pollResult = decrementQueue.poll();
if (pollResult == null) {
break;
}
bytesToDecrement += pollResult;
++elementsToDecrement;
}
if (elementsToDecrement == 0) {
return;
}

monitor.enter();
elementsOutstanding -= elementsToDecrement;
bytesOutstanding -= bytesToDecrement;
monitor.leave();
}
}

private long bytesAvailable() {
Expand Down

0 comments on commit e8aea3f

Please sign in to comment.