Adds null checks when reading operational limits config. #32051

Closed
wants to merge 11 commits into from
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;

import com.google.auto.value.AutoBuilder;

/** Keep track of any operational limits required by the backend. */
public class OperationalLimits {
// Maximum size of a commit from a single work item.
public final long maxWorkItemCommitBytes;
// Maximum size of a single output element's serialized key.
public final long maxOutputKeyBytes;
// Maximum size of a single output element's serialized value.
public final long maxOutputValueBytes;
// Whether to throw an exception when processing output that violates any of the given limits.
public final boolean throwExceptionOnLargeOutput;

OperationalLimits(
long maxWorkItemCommitBytes,
long maxOutputKeyBytes,
long maxOutputValueBytes,
boolean throwExceptionOnLargeOutput) {
this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
this.maxOutputKeyBytes = maxOutputKeyBytes;
this.maxOutputValueBytes = maxOutputValueBytes;
this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
}

@AutoBuilder(ofClass = OperationalLimits.class)
public interface Builder {
Builder setMaxWorkItemCommitBytes(long bytes);

Builder setMaxOutputKeyBytes(long bytes);

Builder setMaxOutputValueBytes(long bytes);

Builder setThrowExceptionOnLargeOutput(boolean shouldThrow);

OperationalLimits build();
}

public static Builder builder() {
return new AutoBuilder_OperationalLimits_Builder()
.setMaxWorkItemCommitBytes(Long.MAX_VALUE)
.setMaxOutputKeyBytes(Long.MAX_VALUE)
.setMaxOutputValueBytes(Long.MAX_VALUE)
.setThrowExceptionOnLargeOutput(false);
}
}
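
As a usage sketch (not part of the PR): callers obtain an instance through builder(), and any field left unset keeps the Long.MAX_VALUE / false defaults installed above. The 10 MiB figure below is purely illustrative, not a value taken from this change.

// Sketch only: one override, all other fields keep the builder() defaults.
OperationalLimits limits =
    OperationalLimits.builder()
        .setMaxOutputKeyBytes(10L * 1024 * 1024) // illustrative 10 MiB key limit
        .build();
// limits.maxOutputValueBytes == Long.MAX_VALUE
// limits.maxWorkItemCommitBytes == Long.MAX_VALUE
// limits.throwExceptionOnLargeOutput == false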
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;

import org.checkerframework.checker.nullness.qual.Nullable;

/** Indicates that an output element was too large. */
public class OutputTooLargeException extends RuntimeException {
public OutputTooLargeException(String reason) {
super(reason);
}

/** Returns whether an exception was caused by an {@link OutputTooLargeException}. */
public static boolean isCausedByOutputTooLargeException(@Nullable Throwable t) {
while (t != null) {
if (t instanceof OutputTooLargeException) {
return true;
}
t = t.getCause();
}
return false;
}
}
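
A minimal, self-contained sketch (not from the PR) of why the helper walks the cause chain: by the time failure handling sees the error, the sink's exception is typically wrapped in another RuntimeException. Assumes the OutputTooLargeException class above is importable.

// Demo only: the helper detects the exception even when it is wrapped.
public class OutputTooLargeExceptionDemo {
  public static void main(String[] args) {
    Throwable wrapped =
        new RuntimeException(
            "work item failed", new OutputTooLargeException("Key too large: 123456"));
    // true: the cause chain contains an OutputTooLargeException
    System.out.println(OutputTooLargeException.isCausedByOutputTooLargeException(wrapped));
    // false: a null input short-circuits the loop
    System.out.println(OutputTooLargeException.isCausedByOutputTooLargeException(null));
  }
}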
@@ -35,7 +35,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -185,7 +185,7 @@ private StreamingDataflowWorker(
WorkFailureProcessor workFailureProcessor,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
AtomicInteger maxWorkItemCommitBytes,
AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory windmillStreamFactory,
Function<String, ScheduledExecutorService> executorSupplier,
ConcurrentMap<String, StageInfo> stageInfoMap) {
@@ -304,15 +304,14 @@ private StreamingDataflowWorker(
streamingCounters,
hotKeyLogger,
sampler,
maxWorkItemCommitBytes,
operationalLimits,
ID_GENERATOR,
stageInfoMap);

LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes.get());
}

public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
@@ -322,7 +321,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
StreamingCounters streamingCounters = StreamingCounters.create();
WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(Integer.MAX_VALUE);
AtomicReference<OperationalLimits> operationalLimits =
new AtomicReference<>(OperationalLimits.builder().build());
WindmillStateCache windmillStateCache =
WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb());
Function<String, ScheduledExecutorService> executorSupplier =
@@ -337,7 +337,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
createConfigFetcherComputationStateCacheAndWindmillClient(
options,
dataflowServiceClient,
maxWorkItemCommitBytes,
operationalLimits,
windmillStreamFactoryBuilder,
configFetcher ->
ComputationStateCache.create(
@@ -395,7 +395,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
workFailureProcessor,
streamingCounters,
memoryMonitor,
maxWorkItemCommitBytes,
operationalLimits,
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
executorSupplier,
stageInfo);
@@ -411,7 +411,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
createConfigFetcherComputationStateCacheAndWindmillClient(
DataflowWorkerHarnessOptions options,
WorkUnitClient dataflowServiceClient,
AtomicInteger maxWorkItemCommitBytes,
AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
Function<ComputationConfig.Fetcher, ComputationStateCache> computationStateCacheFactory) {
ComputationConfig.Fetcher configFetcher;
@@ -427,8 +427,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
config ->
onPipelineConfig(
config,
options,
dispatcherClient::consumeWindmillDispatcherEndpoints,
maxWorkItemCommitBytes));
operationalLimits::set));
computationStateCache = computationStateCacheFactory.apply(configFetcher);
windmillStreamFactory =
windmillStreamFactoryBuilder
@@ -474,9 +475,9 @@ static StreamingDataflowWorker forTesting(
Supplier<Instant> clock,
Function<String, ScheduledExecutorService> executorSupplier,
int localRetryTimeoutMs,
int maxWorkItemCommitBytesOverrides) {
OperationalLimits limits) {
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(maxWorkItemCommitBytesOverrides);
AtomicReference<OperationalLimits> operationalLimits = new AtomicReference<>(limits);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
WindmillStateCache stateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb());
ComputationConfig.Fetcher configFetcher =
@@ -489,8 +490,9 @@ static StreamingDataflowWorker forTesting(
config ->
onPipelineConfig(
config,
options,
windmillServer::setWindmillServiceEndpoints,
maxWorkItemCommitBytes))
operationalLimits::set))
: new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
ConcurrentMap<String, String> stateNameMap =
new ConcurrentHashMap<>(prePopulatedStateNameMappings);
@@ -558,7 +560,7 @@ static StreamingDataflowWorker forTesting(
workFailureProcessor,
streamingCounters,
memoryMonitor,
maxWorkItemCommitBytes,
operationalLimits,
options.isEnableStreamingEngine()
? windmillStreamFactory
.setHealthCheckIntervalMillis(
@@ -571,12 +573,18 @@ static StreamingDataflowWorker forTesting(

private static void onPipelineConfig(
StreamingEnginePipelineConfig config,
DataflowWorkerHarnessOptions options,
Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
AtomicInteger maxWorkItemCommitBytes) {
if (config.maxWorkItemCommitBytes() != maxWorkItemCommitBytes.get()) {
LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes);
maxWorkItemCommitBytes.set((int) config.maxWorkItemCommitBytes());
}
Consumer<OperationalLimits> operationalLimits) {

operationalLimits.accept(
OperationalLimits.builder()
.setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
.setMaxOutputKeyBytes(config.maxOutputKeyBytes())
.setMaxOutputValueBytes(config.maxOutputValueBytes())
.setThrowExceptionOnLargeOutput(
DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output"))
.build());

if (!config.windmillServiceEndpoints().isEmpty()) {
consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints());
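
Conceptually, the worker now holds a single AtomicReference&lt;OperationalLimits&gt; that the config fetcher overwrites wholesale on each pipeline-config update, and readers take one consistent snapshot instead of reading separate atomics. A simplified sketch of that pattern follows; the callback wiring is reduced to a plain Consumer, and every name other than OperationalLimits is illustrative, not the worker's actual structure.

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Simplified sketch of the update/read pattern used above; not the worker's wiring.
class LimitsHolder {
  private final AtomicReference<OperationalLimits> limits =
      new AtomicReference<>(OperationalLimits.builder().build());

  // Handed to the config fetcher as a callback, mirroring `operationalLimits::set`.
  Consumer<OperationalLimits> updater() {
    return limits::set;
  }

  // Each work item reads one immutable snapshot, so all four limit fields stay
  // consistent with each other even if an update races with processing.
  OperationalLimits snapshot() {
    return limits.get();
  }
}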
@@ -129,6 +129,10 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
private Work work;
private WindmillComputationKey computationKey;
private SideInputStateFetcher sideInputStateFetcher;
// OperationalLimits is updated in start() because a StreamingModeExecutionContext can
// be used for processing many work items and these values can change during the context's
// lifetime. start() is called for each work item.
private OperationalLimits operationalLimits;
private Windmill.WorkItemCommitRequest.Builder outputBuilder;

/**
@@ -168,6 +172,18 @@ public final long getBacklogBytes() {
return backlogBytes;
}

public long getMaxOutputKeyBytes() {
return operationalLimits.maxOutputKeyBytes;
}

public long getMaxOutputValueBytes() {
return operationalLimits.maxOutputValueBytes;
}

public boolean throwExceptionsForLargeOutput() {
return operationalLimits.throwExceptionOnLargeOutput;
}

public boolean workIsFailed() {
return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
}
@@ -177,11 +193,13 @@ public void start(
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
OperationalLimits operationalLimits,
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
this.key = key;
this.work = work;
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
this.operationalLimits = operationalLimits;
this.outputBuilder = outputBuilder;
this.sideInputCache.clear();
clearSinkFullHint();
@@ -44,6 +44,8 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
@@ -54,6 +56,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
private final Coder<T> valueCoder;
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
private StreamingModeExecutionContext context;
private static final Logger LOG = LoggerFactory.getLogger(WindmillSink.class);

WindmillSink(
String destinationName,
@@ -172,6 +175,28 @@ public long add(WindowedValue<T> data) throws IOException {
key = context.getSerializedKey();
value = encode(valueCoder, data.getValue());
}
if (key.size() > context.getMaxOutputKeyBytes()) {
if (context.throwExceptionsForLargeOutput()) {
throw new OutputTooLargeException("Key too large: " + key.size());
} else {
LOG.error(
"Trying to output too large key with size "
+ key.size()
+ ". Limit is "
+ context.getMaxOutputKeyBytes());
}
}
if (value.size() > context.getMaxOutputValueBytes()) {
if (context.throwExceptionsForLargeOutput()) {
throw new OutputTooLargeException("Value too large: " + value.size());
} else {
LOG.error(
"Trying to output too large value with size "
+ value.size()
+ ". Limit is "
+ context.getMaxOutputValueBytes());
}
}

Windmill.KeyedMessageBundle.Builder keyedOutput = productionMap.get(key);
if (keyedOutput == null) {
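
The enforcement added above reduces to: compare the serialized size to the limit, then either throw OutputTooLargeException or report and continue, depending on the throw flag. A standalone distillation of that decision is sketched below; the class, method, and parameter names are illustrative, not the sink's API, and logging is stood in for by System.err.

// Illustrative only: the check WindmillSink.add() applies to both the
// serialized key and the serialized value, with LOG.error replaced here.
final class OutputSizeCheck {
  static void check(String what, long actualBytes, long limitBytes, boolean throwOnLarge) {
    if (actualBytes <= limitBytes) {
      return; // within the configured limit
    }
    if (throwOnLarge) {
      // Matches the path taken when throwExceptionsForLargeOutput() is true.
      throw new OutputTooLargeException(what + " too large: " + actualBytes);
    }
    // Otherwise the violation is only reported and the oversized element is still written.
    System.err.printf("%s too large: %d bytes, limit %d%n", what, actualBytes, limitBytes);
  }
}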
@@ -24,6 +24,7 @@
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
@@ -45,7 +46,7 @@
* @implNote Once closed, it cannot be reused.
*/
// TODO(m-trieu): See if this can be combined/cleaned up with StreamingModeExecutionContext as the
// seperation of responsibilities are unclear.
// separation of responsibilities are unclear.
@AutoValue
@Internal
@NotThreadSafe
@@ -72,9 +73,11 @@ public final void executeWork(
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
OperationalLimits operationalLimits,
Windmill.WorkItemCommitRequest.Builder outputBuilder)
throws Exception {
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder);
context()
.start(key, work, stateReader, sideInputStateFetcher, operationalLimits, outputBuilder);
workExecutor().execute();
}
