Skip to content

Commit

Permalink
Make GrpcCommitWorkStream thread-safe as documented by moving batcher…
Browse files Browse the repository at this point in the history
… out of it. (#31304)

Also increase the number of streams in commit cache to number of threads
  • Loading branch information
scwhittle authored May 21, 2024
1 parent fed6489 commit 89795c0
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public class StreamingDataflowWorker {
// Maximum number of threads for processing. Currently each thread processes one key at a time.
static final int MAX_PROCESSING_THREADS = 300;
static final long THREAD_EXPIRATION_TIME_SEC = 60;
static final int NUM_COMMIT_STREAMS = 1;
static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);

Expand Down Expand Up @@ -280,7 +279,7 @@ public class StreamingDataflowWorker {
windmillServiceEnabled
? StreamingEngineWorkCommitter.create(
WindmillStreamPool.create(
NUM_COMMIT_STREAMS, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
::getCloseableStream,
numCommitThreads,
this::onCompleteCommit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.client;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
Expand Down Expand Up @@ -68,21 +70,34 @@ Windmill.KeyedGetDataResponse requestKeyedData(
/** Interface for streaming CommitWorkRequests to Windmill. */
@ThreadSafe
interface CommitWorkStream extends WindmillStream {
/**
* Accumulates commit requests and sends them when flushed. Annotated {@code @NotThreadSafe}: a
* single batcher instance must not be used from several threads at once. Because it is
* {@link Closeable}, a batcher can be managed with try-with-resources.
*/
@NotThreadSafe
interface RequestBatcher extends Closeable {
/**
* Adds a work item commit to this batch and runs {@code onDone} once the commit has been
* processed by the server.
*
* <p>Returns true if the request was accepted. If false is returned the request was not added;
* the batcher should be flushed and the request committed again.
*
* <p>{@code onDone} will be called with the status of the commit.
*/
boolean commitWorkItem(
String computation,
Windmill.WorkItemCommitRequest request,
Consumer<Windmill.CommitStatus> onDone);

/** Flushes any pending work items to the wire. */
void flush();

/**
* Flushes any pending work items, so that leaving a try-with-resources block sends whatever is
* still batched. Declares no checked exception, unlike {@link Closeable#close()}.
*/
@Override
default void close() {
flush();
}
}

/**
* Commits a work item and running onDone when the commit has been processed by the server.
* Returns true if the request was accepted. If false is returned the stream should be flushed
* and the request recommitted.
*
* <p>onDone will be called with the status of the commit.
* Returns a batcher that can be used for sending requests. Each batcher is not thread-safe but
* different batchers for the same stream may be used simultaneously.
*/
boolean commitWorkItem(
String computation,
Windmill.WorkItemCommitRequest request,
Consumer<Windmill.CommitStatus> onDone);

/** Flushes any pending work items to the wire. */
void flush();
RequestBatcher batcher();
}

/** Interface for streaming GetWorkerMetadata requests to Windmill. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ private void commitLoop() {
try {
commit = commitQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
continue;
return;
}
while (commit != null) {
ComputationState computationState = commit.computationState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,15 @@ public long currentActiveCommitBytes() {

/**
* Shuts down {@code commitSenders}, waits up to 10 seconds for it to terminate, and then calls
* {@code drainCommitQueue()} regardless of how the wait ended.
*/
@Override
public void stop() {
if (!commitSenders.isTerminated() || !commitSenders.isShutdown()) {
commitSenders.shutdown();
if (!commitSenders.isTerminated()) {
commitSenders.shutdownNow();
try {
commitSenders.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// NOTE(review): assuming commitSenders is a java.util.concurrent.ExecutorService, this
// handler runs when the waiting thread is interrupted. A 10 second timeout is reported by
// awaitTermination returning false, and that return value is not checked here. The
// interrupt status is also not restored after the catch -- confirm both are intended.
LOG.warn("Could not shut down commitSenders gracefully, forcing shutdown.", e);
LOG.warn(
"Commit senders didn't complete shutdown within 10 seconds, continuing to drain queue",
e);
}
commitSenders.shutdownNow();
}
drainCommitQueue();
}
Expand Down Expand Up @@ -143,9 +144,10 @@ private void streamingCommitLoop() {
// Block until we have a commit or are shutting down.
initialCommit = commitQueue.take();
} catch (InterruptedException e) {
continue;
return;
}
}
Preconditions.checkNotNull(initialCommit);

if (initialCommit.work().isFailed()) {
onCommitComplete.accept(CompleteCommit.forFailedWork(initialCommit));
Expand All @@ -156,15 +158,17 @@ private void streamingCommitLoop() {
try (CloseableStream<CommitWorkStream> closeableCommitStream =
commitWorkStreamFactory.get()) {
CommitWorkStream commitStream = closeableCommitStream.stream();
if (!tryAddToCommitStream(initialCommit, commitStream)) {
throw new AssertionError("Initial commit on flushed stream should always be accepted.");
try (CommitWorkStream.RequestBatcher batcher = commitStream.batcher()) {
if (!tryAddToCommitBatch(initialCommit, batcher)) {
throw new AssertionError(
"Initial commit on flushed stream should always be accepted.");
}
// Batch additional commits to the stream and possibly make an un-batched commit the
// next initial commit.
initialCommit = expandBatch(batcher);
}
// Batch additional commits to the stream and possibly make an un-batched commit the next
// initial commit.
initialCommit = batchCommitsToStream(commitStream);
commitStream.flush();
} catch (Exception e) {
LOG.error("Error occurred fetching a CommitWorkStream.", e);
LOG.error("Error occurred sending commits.", e);
}
}
} finally {
Expand All @@ -174,13 +178,13 @@ private void streamingCommitLoop() {
}
}

/** Adds the commit to the commitStream if it fits, returning true if it is consumed. */
private boolean tryAddToCommitStream(Commit commit, CommitWorkStream commitStream) {
/** Adds the commit to the batch if it fits, returning true if it is consumed. */
private boolean tryAddToCommitBatch(Commit commit, CommitWorkStream.RequestBatcher batcher) {
Preconditions.checkNotNull(commit);
commit.work().setState(Work.State.COMMITTING);
activeCommitBytes.addAndGet(commit.getSize());
boolean isCommitAccepted =
commitStream.commitWorkItem(
batcher.commitWorkItem(
commit.computationId(),
commit.request(),
(commitStatus) -> {
Expand All @@ -197,9 +201,9 @@ private boolean tryAddToCommitStream(Commit commit, CommitWorkStream commitStrea
return isCommitAccepted;
}

// Helper to batch additional commits into the commit stream as long as they fit.
// Helper to batch additional commits into the commit batch as long as they fit.
// Returns a commit that was removed from the queue but not consumed or null.
private Commit batchCommitsToStream(CommitWorkStream commitStream) {
private Commit expandBatch(CommitWorkStream.RequestBatcher batcher) {
int commits = 1;
while (true) {
Commit commit;
Expand All @@ -210,8 +214,7 @@ private Commit batchCommitsToStream(CommitWorkStream commitStream) {
commit = commitQueue.poll();
}
} catch (InterruptedException e) {
// Continue processing until !running.get()
continue;
return null;
}

if (commit == null) {
Expand All @@ -224,7 +227,7 @@ private Commit batchCommitsToStream(CommitWorkStream commitStream) {
continue;
}

if (!tryAddToCommitStream(commit, commitStream)) {
if (!tryAddToCommitBatch(commit, batcher)) {
return commit;
}
commits++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public final class GrpcCommitWorkStream
private static final long HEARTBEAT_REQUEST_ID = Long.MAX_VALUE;

private final Map<Long, PendingRequest> pending;
private final Batcher batcher;
private final AtomicLong idGenerator;
private final JobHeader jobHeader;
private final ThrottleTimer commitWorkThrottleTimer;
Expand All @@ -75,7 +74,6 @@ private GrpcCommitWorkStream(
streamRegistry,
logEveryNStreamFailures);
pending = new ConcurrentHashMap<>();
batcher = new Batcher();
this.idGenerator = idGenerator;
this.jobHeader = jobHeader;
this.commitWorkThrottleTimer = commitWorkThrottleTimer;
Expand Down Expand Up @@ -116,14 +114,23 @@ public void appendSpecificHtml(PrintWriter writer) {
/**
* Sends a request containing only the job header, then resends every entry currently in
* {@code pending} through a local {@code Batcher}. The batcher is flushed whenever it cannot
* accept the next entry, and the remaining entries are flushed at the end (explicitly, or when
* the try-with-resources block closes the batcher). Synchronized on this stream.
*
* <p>Presumably invoked each time the underlying stream is (re)established; the caller is not
* visible here.
*/
@Override
protected synchronized void onNewStream() {
send(StreamingCommitWorkRequest.newBuilder().setHeader(jobHeader).build());
Batcher resendBatcher = new Batcher();
for (Map.Entry<Long, PendingRequest> entry : pending.entrySet()) {
if (!resendBatcher.canAccept(entry.getValue())) {
resendBatcher.flush();
try (Batcher resendBatcher = new Batcher()) {
for (Map.Entry<Long, PendingRequest> entry : pending.entrySet()) {
// Send what has been batched so far to make room for this entry.
if (!resendBatcher.canAccept(entry.getValue().getBytes())) {
resendBatcher.flush();
}
resendBatcher.add(entry.getKey(), entry.getValue());
}
resendBatcher.add(entry.getKey(), entry.getValue());
}
resendBatcher.flush();
}

/**
* Returns a new batcher, bound to this stream, that can be used for sending requests. Every call
* creates a separate {@code Batcher}. Each batcher is not thread-safe but different batchers for
* the same stream may be used simultaneously.
*/
@Override
public CommitWorkStream.RequestBatcher batcher() {
return new Batcher();
}

@Override
Expand Down Expand Up @@ -175,22 +182,6 @@ protected void startThrottleTimer() {
commitWorkThrottleTimer.start();
}

/**
* Wraps the commit in a {@code PendingRequest} and offers it to this stream's {@code batcher}
* field. Returns false, queuing nothing, when the batcher cannot accept the request; otherwise
* adds it under the next id from {@code idGenerator} and returns true.
*/
@Override
public boolean commitWorkItem(
String computation, WorkItemCommitRequest commitRequest, Consumer<CommitStatus> onDone) {
PendingRequest request = new PendingRequest(computation, commitRequest, onDone);
if (!batcher.canAccept(request)) {
return false;
}
batcher.add(idGenerator.incrementAndGet(), request);
return true;
}

/** Flushes the requests queued in this stream's {@code batcher} field. */
@Override
public void flush() {
batcher.flush();
}

private void flushInternal(Map<Long, PendingRequest> requests) {
if (requests.isEmpty()) {
return;
Expand Down Expand Up @@ -305,7 +296,7 @@ long getBytes() {
}
}

private class Batcher {
private class Batcher implements CommitWorkStream.RequestBatcher {

private final Map<Long, PendingRequest> queue;
private long queuedBytes;
Expand All @@ -315,22 +306,35 @@ private Batcher() {
this.queue = new HashMap<>();
}

boolean canAccept(PendingRequest request) {
return queue.isEmpty()
|| (queue.size() < streamingRpcBatchLimit
&& (request.getBytes() + queuedBytes) < AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE);
@Override
public boolean commitWorkItem(
String computation, WorkItemCommitRequest commitRequest, Consumer<CommitStatus> onDone) {
if (!canAccept(commitRequest.getSerializedSize() + computation.length())) {
return false;
}
PendingRequest request = new PendingRequest(computation, commitRequest, onDone);
add(idGenerator.incrementAndGet(), request);
return true;
}

/** Flushes any pending work items to the wire. */
@Override
public void flush() {
flushInternal(queue);
queuedBytes = 0;
queue.clear();
}

void add(long id, PendingRequest request) {
assert (canAccept(request));
assert (canAccept(request.getBytes()));
queuedBytes += request.getBytes();
queue.put(id, request);
}

void flush() {
flushInternal(queue);
queuedBytes = 0;
queue.clear();
private boolean canAccept(long requestBytes) {
return queue.isEmpty()
|| (queue.size() < streamingRpcBatchLimit
&& (requestBytes + queuedBytes) < AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE);
}
}
}
Loading

0 comments on commit 89795c0

Please sign in to comment.