From 9e128f951d37ef37be305280689520328cc5415f Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Tue, 2 Jul 2024 22:01:13 +0000 Subject: [PATCH 01/11] Grabs operational limits from streaming config and plumbs them to WindmillSink, which can then throw an exception if outputs are too large. --- .../worker/StreamingDataflowWorker.java | 45 ++++++++- .../worker/StreamingModeExecutionContext.java | 14 +++ .../runners/dataflow/worker/WindmillSink.java | 6 ++ .../streaming/ComputationWorkExecutor.java | 7 +- ...reamingEngineComputationConfigFetcher.java | 23 ++++- .../config/StreamingEnginePipelineConfig.java | 10 ++ .../processing/StreamingWorkScheduler.java | 17 +++- .../worker/StreamingDataflowWorkerTest.java | 98 ++++++++++++++++++- .../StreamingModeExecutionContextTest.java | 4 + .../worker/WorkerCustomSourcesTest.java | 4 + 10 files changed, 214 insertions(+), 14 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 59819db88a07..b3d251ae1db0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -186,6 +186,8 @@ private StreamingDataflowWorker( StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, AtomicInteger maxWorkItemCommitBytes, + AtomicInteger maxOutputKeyBytes, + AtomicInteger maxOutputValueBytes, GrpcWindmillStreamFactory windmillStreamFactory, Function executorSupplier, ConcurrentMap stageInfoMap) { @@ -305,6 +307,8 @@ private StreamingDataflowWorker( hotKeyLogger, sampler, maxWorkItemCommitBytes, + maxOutputKeyBytes, + maxOutputValueBytes, ID_GENERATOR, stageInfoMap); @@ -323,6 +327,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(Integer.MAX_VALUE); + AtomicInteger maxOutputKeyBytes = new AtomicInteger(Integer.MAX_VALUE); + AtomicInteger maxOutputValueBytes = new AtomicInteger(Integer.MAX_VALUE); WindmillStateCache windmillStateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); Function executorSupplier = @@ -338,6 +344,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o options, dataflowServiceClient, maxWorkItemCommitBytes, + maxOutputKeyBytes, + maxOutputValueBytes, windmillStreamFactoryBuilder, configFetcher -> ComputationStateCache.create( @@ -396,6 +404,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o streamingCounters, memoryMonitor, maxWorkItemCommitBytes, + maxOutputKeyBytes, + maxOutputValueBytes, configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), executorSupplier, stageInfo); @@ -412,6 +422,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o DataflowWorkerHarnessOptions options, WorkUnitClient dataflowServiceClient, AtomicInteger maxWorkItemCommitBytes, + AtomicInteger maxOutputKeyBytes, + AtomicInteger maxOutputValueBytes, GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, Function computationStateCacheFactory) { 
ComputationConfig.Fetcher configFetcher; @@ -424,11 +436,14 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o StreamingEngineComputationConfigFetcher.create( options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient, + DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output"), config -> onPipelineConfig( config, dispatcherClient::consumeWindmillDispatcherEndpoints, - maxWorkItemCommitBytes)); + maxWorkItemCommitBytes, + maxOutputKeyBytes, + maxOutputValueBytes)); computationStateCache = computationStateCacheFactory.apply(configFetcher); windmillStreamFactory = windmillStreamFactoryBuilder @@ -474,9 +489,13 @@ static StreamingDataflowWorker forTesting( Supplier clock, Function executorSupplier, int localRetryTimeoutMs, - int maxWorkItemCommitBytesOverrides) { + int maxWorkItemCommitBytesOverrides, + int maxOutputKeyBytesOverride, + int maxOutputValueBytesOverride) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(maxWorkItemCommitBytesOverrides); + AtomicInteger maxOutputKeyBytes = new AtomicInteger(maxOutputKeyBytesOverride); + AtomicInteger maxOutputValueBytes = new AtomicInteger(maxOutputValueBytesOverride); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); ComputationConfig.Fetcher configFetcher = @@ -490,7 +509,9 @@ static StreamingDataflowWorker forTesting( onPipelineConfig( config, windmillServer::setWindmillServiceEndpoints, - maxWorkItemCommitBytes)) + maxWorkItemCommitBytes, + maxOutputKeyBytes, + maxOutputValueBytes)) : new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); ConcurrentMap stateNameMap = new ConcurrentHashMap<>(prePopulatedStateNameMappings); @@ -559,6 +580,8 @@ static StreamingDataflowWorker forTesting( streamingCounters, memoryMonitor, maxWorkItemCommitBytes, + maxOutputKeyBytes, + maxOutputValueBytes, options.isEnableStreamingEngine() ? 
windmillStreamFactory .setHealthCheckIntervalMillis( @@ -572,12 +595,24 @@ static StreamingDataflowWorker forTesting( private static void onPipelineConfig( StreamingEnginePipelineConfig config, Consumer> consumeWindmillServiceEndpoints, - AtomicInteger maxWorkItemCommitBytes) { + AtomicInteger maxWorkItemCommitBytes, + AtomicInteger maxOutputKeyBytes, + AtomicInteger maxOutputValueBytes) { if (config.maxWorkItemCommitBytes() != maxWorkItemCommitBytes.get()) { - LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes); + LOG.info("Setting maxWorkItemCommitBytes to {}", config.maxWorkItemCommitBytes()); maxWorkItemCommitBytes.set((int) config.maxWorkItemCommitBytes()); } + if (config.maxOutputKeyBytes() != maxOutputKeyBytes.get()) { + LOG.info("Setting maxOutputKeyBytes to {}", config.maxOutputKeyBytes()); + maxOutputKeyBytes.set((int) config.maxOutputKeyBytes()); + } + + if (config.maxOutputValueBytes() != maxOutputValueBytes.get()) { + LOG.info("Setting maxOutputValueBytes to {}", config.maxOutputValueBytes()); + maxOutputValueBytes.set((int) config.maxOutputValueBytes()); + } + if (!config.windmillServiceEndpoints().isEmpty()) { consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index dd6353060abc..5b20cc73c9df 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -129,6 +129,11 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext data) throws IOException { key = context.getSerializedKey(); value = encode(valueCoder, data.getValue()); } + if (key.size() > context.getMaxOutputKeyBytes()) { + throw new IOException("Key too large: " + key.size()); + } + if (value.size() > context.getMaxOutputValueBytes()) { + throw new IOException("Value too large: " + value.size()); + } Windmill.KeyedMessageBundle.Builder keyedOutput = productionMap.get(key); if (keyedOutput == null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java index dd34e85bc93c..9304e72968fc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java @@ -45,7 +45,7 @@ * @implNote Once closed, it cannot be reused. */ // TODO(m-trieu): See if this can be combined/cleaned up with StreamingModeExecutionContext as the -// seperation of responsibilities are unclear. +// separation of responsibilities are unclear. 
@AutoValue @Internal @NotThreadSafe @@ -72,9 +72,12 @@ public final void executeWork( Work work, WindmillStateReader stateReader, SideInputStateFetcher sideInputStateFetcher, + int maxOutputKeyBytes, + int maxOutputValueBytes, Windmill.WorkItemCommitRequest.Builder outputBuilder) throws Exception { - context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder); + context().start(key, work, stateReader, sideInputStateFetcher, maxOutputKeyBytes, + maxOutputValueBytes, outputBuilder); workExecutor().execute(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 51d1507af5fe..809ff902cd7a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -72,6 +72,7 @@ public final class StreamingEngineComputationConfigFetcher implements Computatio private final long globalConfigRefreshPeriodMillis; private final WorkUnitClient dataflowServiceClient; private final ScheduledExecutorService globalConfigRefresher; + private final boolean shouldPerformOutputSizeChecks; private final Consumer onStreamingConfig; private final AtomicBoolean hasReceivedGlobalConfig; @@ -80,10 +81,12 @@ private StreamingEngineComputationConfigFetcher( long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient, ScheduledExecutorService globalConfigRefresher, + boolean shouldPerformOutputSizeChecks, Consumer onStreamingConfig) { this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis; this.dataflowServiceClient = dataflowServiceClient; this.globalConfigRefresher = globalConfigRefresher; + this.shouldPerformOutputSizeChecks = shouldPerformOutputSizeChecks; this.onStreamingConfig = onStreamingConfig; this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig); } @@ -91,6 +94,7 @@ private StreamingEngineComputationConfigFetcher( public static StreamingEngineComputationConfigFetcher create( long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient, + boolean shouldPerformOutputSizeChecks, Consumer onStreamingConfig) { return new StreamingEngineComputationConfigFetcher( /* hasReceivedGlobalConfig= */ false, @@ -98,6 +102,7 @@ public static StreamingEngineComputationConfigFetcher create( dataflowServiceClient, Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()), + shouldPerformOutputSizeChecks, onStreamingConfig); } @@ -113,6 +118,7 @@ public static StreamingEngineComputationConfigFetcher forTesting( globalConfigRefreshPeriodMillis, dataflowServiceClient, executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME), + /*shouldPerformOutputSizeChecks=*/true, onStreamingConfig); } @@ -157,7 +163,8 @@ private static Optional fetchConfigWithRetry( } } - private static StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask config) { + private StreamingEnginePipelineConfig createPipelineConfig( + StreamingConfigTask config, boolean shouldPerformOutputSizeChecks) { StreamingEnginePipelineConfig.Builder pipelineConfig = 
StreamingEnginePipelineConfig.builder(); if (config.getUserStepToStateFamilyNameMap() != null) { pipelineConfig.setUserStepToStateFamilyNameMap(config.getUserStepToStateFamilyNameMap()); @@ -187,6 +194,18 @@ private static StreamingEnginePipelineConfig createPipelineConfig(StreamingConfi pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue()); } + if (shouldPerformOutputSizeChecks && config.getOperationalLimits() != null) { + if (config.getOperationalLimits().getMaxKeyBytes() > 0 + && config.getOperationalLimits().getMaxKeyBytes() <= Integer.MAX_VALUE) { + pipelineConfig.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes()); + } + if (config.getOperationalLimits().getMaxProductionOutputBytes() > 0 + && config.getOperationalLimits().getMaxProductionOutputBytes() <= Integer.MAX_VALUE) { + pipelineConfig.setMaxOutputValueBytes( + config.getOperationalLimits().getMaxProductionOutputBytes()); + } + } + return pipelineConfig.build(); } @@ -273,7 +292,7 @@ private synchronized void fetchInitialPipelineGlobalConfig() { private Optional fetchGlobalConfig() { return fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem) - .map(StreamingEngineComputationConfigFetcher::createPipelineConfig); + .map(config -> createPipelineConfig(config, shouldPerformOutputSizeChecks)); } @FunctionalInterface diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java index b5b761ada703..8f1ff93f6a49 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java @@ -34,12 +34,18 @@ public abstract class StreamingEnginePipelineConfig { public static StreamingEnginePipelineConfig.Builder builder() { return new AutoValue_StreamingEnginePipelineConfig.Builder() .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES) + .setMaxOutputKeyBytes(Long.MAX_VALUE) + .setMaxOutputValueBytes(Long.MAX_VALUE) .setUserStepToStateFamilyNameMap(new HashMap<>()) .setWindmillServiceEndpoints(ImmutableSet.of()); } public abstract long maxWorkItemCommitBytes(); + public abstract long maxOutputKeyBytes(); + + public abstract long maxOutputValueBytes(); + public abstract Map userStepToStateFamilyNameMap(); public abstract ImmutableSet windmillServiceEndpoints(); @@ -48,6 +54,10 @@ public static StreamingEnginePipelineConfig.Builder builder() { public abstract static class Builder { public abstract Builder setMaxWorkItemCommitBytes(long value); + public abstract Builder setMaxOutputKeyBytes(long value); + + public abstract Builder setMaxOutputValueBytes(long value); + public abstract Builder setUserStepToStateFamilyNameMap(Map value); public abstract Builder setWindmillServiceEndpoints(ImmutableSet value); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 334ab8efeae2..ad8cd3a1cb3b 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -82,6 +82,8 @@ public final class StreamingWorkScheduler { private final ConcurrentMap stageInfoMap; private final DataflowExecutionStateSampler sampler; private final AtomicInteger maxWorkItemCommitBytes; + private final AtomicInteger maxOutputKeyBytes; + private final AtomicInteger maxOutputValueBytes; public StreamingWorkScheduler( DataflowWorkerHarnessOptions options, @@ -95,7 +97,9 @@ public StreamingWorkScheduler( HotKeyLogger hotKeyLogger, ConcurrentMap stageInfoMap, DataflowExecutionStateSampler sampler, - AtomicInteger maxWorkItemCommitBytes) { + AtomicInteger maxWorkItemCommitBytes, + AtomicInteger maxOutputKeyBytes, + AtomicInteger maxOutputValueBytes) { this.options = options; this.clock = clock; this.computationWorkExecutorFactory = computationWorkExecutorFactory; @@ -108,6 +112,8 @@ public StreamingWorkScheduler( this.stageInfoMap = stageInfoMap; this.sampler = sampler; this.maxWorkItemCommitBytes = maxWorkItemCommitBytes; + this.maxOutputKeyBytes = maxOutputKeyBytes; + this.maxOutputValueBytes = maxOutputValueBytes; } public static StreamingWorkScheduler create( @@ -124,6 +130,8 @@ public static StreamingWorkScheduler create( HotKeyLogger hotKeyLogger, DataflowExecutionStateSampler sampler, AtomicInteger maxWorkItemCommitBytes, + AtomicInteger maxOutputKeyBytes, + AtomicInteger maxOutputValueBytes, IdGenerator idGenerator, ConcurrentMap stageInfoMap) { ComputationWorkExecutorFactory computationWorkExecutorFactory = @@ -148,7 +156,9 @@ public static StreamingWorkScheduler create( hotKeyLogger, stageInfoMap, sampler, - maxWorkItemCommitBytes); + maxWorkItemCommitBytes, + maxOutputKeyBytes, + maxOutputValueBytes); } private static long computeShuffleBytesRead(Windmill.WorkItem workItem) { @@ -375,7 +385,8 @@ private ExecuteWorkResult executeWork( // Blocks while executing work. 
computationWorkExecutor.executeWork( - executionKey, work, stateReader, localSideInputStateFetcher, outputBuilder); + executionKey, work, stateReader, localSideInputStateFetcher, maxOutputKeyBytes.get(), + maxOutputValueBytes.get(), outputBuilder); if (work.isFailed()) { throw new WorkItemCancelledException(workItem.getShardingKey()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index e3aee23e511e..43262ddf5002 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -849,7 +849,9 @@ private StreamingDataflowWorker makeWorker( streamingDataflowWorkerTestParams.clock(), streamingDataflowWorkerTestParams.executorSupplier(), streamingDataflowWorkerTestParams.localRetryTimeoutMs(), - streamingDataflowWorkerTestParams.maxWorkItemCommitBytes()); + streamingDataflowWorkerTestParams.maxWorkItemCommitBytes(), + streamingDataflowWorkerTestParams.maxOutputKeyBytes(), + streamingDataflowWorkerTestParams.maxOutputValueBytes()); this.computationStateCache = worker.getComputationStateCache(); return worker; } @@ -1271,6 +1273,76 @@ public void testKeyCommitTooLargeException() throws Exception { assertTrue(foundErrors); } + @Test + public void testOutputKeyTooLargeException() throws Exception { + KvCoder kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); + + List instructions = + Arrays.asList( + makeSourceInstruction(kvCoder), + makeDoFnInstruction(new ExceptionCatchingFn(), 0, kvCoder), + makeSinkInstruction(kvCoder, 1)); + + server.setExpectedExceptionCount(1); + + StreamingDataflowWorker worker = + makeWorker( + defaultWorkerParams() + .setInstructions(instructions) + .setMaxOutputKeyBytes(15) + .build()); + worker.start(); + + // This large key will cause the ExceptionCatchingFn to throw an exception, which will then + // cause it to output a smaller key. + String bigKey = "some_much_too_large_output_key"; + server + .whenGetWorkCalled() + .thenReturn(makeInput(1, 0, bigKey, DEFAULT_SHARDING_KEY)); + server.waitForEmptyWorkQueue(); + + Map result = server.waitForAndGetCommits(1); + assertEquals(1, result.size()); + assertEquals( + makeExpectedOutput(1, 0, bigKey, DEFAULT_SHARDING_KEY, "smaller_key").build(), + removeDynamicFields(result.get(1L))); + } + + @Test + public void testOutputValueTooLargeException() throws Exception { + KvCoder kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); + + List instructions = + Arrays.asList( + makeSourceInstruction(kvCoder), + makeDoFnInstruction(new ExceptionCatchingFn(), 0, kvCoder), + makeSinkInstruction(kvCoder, 1)); + + server.setExpectedExceptionCount(1); + + StreamingDataflowWorker worker = + makeWorker( + defaultWorkerParams() + .setInstructions(instructions) + .setMaxOutputValueBytes(15) + .build()); + worker.start(); + + // The first time processing will have value "data1_a_bunch_more_data_output", which is above + // the limit. After throwing the exception, the output should be just "data1", which is small + // enough. 
+ server + .whenGetWorkCalled() + .thenReturn(makeInput(1, 0, "key", DEFAULT_SHARDING_KEY)); + server.waitForEmptyWorkQueue(); + + Map result = server.waitForAndGetCommits(1); + assertEquals(1, result.size()); + assertEquals( + makeExpectedOutput(1, 0, "key", DEFAULT_SHARDING_KEY, "smaller_key").build(), + removeDynamicFields(result.get(1L))); + } + @Test public void testKeyChange() throws Exception { KvCoder kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); @@ -4021,6 +4093,18 @@ public void processElement(ProcessContext c) { } } + static class ExceptionCatchingFn extends DoFn, KV> { + + @ProcessElement + public void processElement(ProcessContext c) { + try { + c.output(KV.of(c.element().getKey(), c.element().getValue() + "_a_bunch_more_data_output")); + } catch (Exception e) { + c.output(KV.of("smaller_key", c.element().getValue())); + } + } + } + static class ChangeKeysFn extends DoFn, KV> { @ProcessElement @@ -4433,7 +4517,9 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { .setLocalRetryTimeoutMs(-1) .setPublishCounters(false) .setClock(Instant::now) - .setMaxWorkItemCommitBytes(Integer.MAX_VALUE); + .setMaxWorkItemCommitBytes(Integer.MAX_VALUE) + .setMaxOutputKeyBytes(Integer.MAX_VALUE) + .setMaxOutputValueBytes(Integer.MAX_VALUE); } abstract ImmutableMap stateNameMappings(); @@ -4452,6 +4538,10 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { abstract int maxWorkItemCommitBytes(); + abstract int maxOutputKeyBytes(); + + abstract int maxOutputValueBytes(); + @AutoValue.Builder abstract static class Builder { abstract Builder setStateNameMappings(ImmutableMap value); @@ -4486,6 +4576,10 @@ final Builder publishCounters() { abstract Builder setMaxWorkItemCommitBytes(int maxWorkItemCommitBytes); + abstract Builder setMaxOutputKeyBytes(int maxOutputKeyBytes); + + abstract Builder setMaxOutputValueBytes(int maxOutputValueBytes); + abstract StreamingDataflowWorkerTestParams build(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 2193f20f3fe3..38c5333895ac 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -154,6 +154,8 @@ public void testTimerInternalsSetTimer() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, + /*maxOutputKeyBytes=*/Integer.MAX_VALUE, + /*maxOutputValueBytes=*/Integer.MAX_VALUE, outputBuilder); TimerInternals timerInternals = stepContext.timerInternals(); @@ -203,6 +205,8 @@ public void testTimerInternalsProcessingTimeSkew() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, + /*maxOutputKeyBytes=*/Integer.MAX_VALUE, + /*maxOutputValueBytes=*/Integer.MAX_VALUE, outputBuilder); TimerInternals timerInternals = stepContext.timerInternals(); assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime())); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 9f97c9835ddc..5335c7ad108f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -634,6 +634,8 @@ public void testReadUnboundedReader() throws Exception { Watermarks.builder().setInputDataWatermark(new Instant(0)).build()), mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), + /*maxOutputKeyBytes=*/Integer.MAX_VALUE, + /*maxOutputValueBytes=*/Integer.MAX_VALUE, Windmill.WorkItemCommitRequest.newBuilder()); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -1005,6 +1007,8 @@ public void testFailedWorkItemsAbort() throws Exception { dummyWork, mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), + /*maxOutputKeyBytes=*/Integer.MAX_VALUE, + /*maxOutputValueBytes=*/Integer.MAX_VALUE, Windmill.WorkItemCommitRequest.newBuilder()); @SuppressWarnings({"unchecked", "rawtypes"}) From 700eebe3d5507a175751d8eee4f22e4fbd8f44eb Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Wed, 10 Jul 2024 01:19:27 +0000 Subject: [PATCH 02/11] Replaces 3 AtomicIntegers with an AtomicReference to a new struct containing those ints. Adds a custom exception for too large of outputs. --- .../worker/OutputTooLargeException.java | 39 ++++++++ .../worker/StreamingDataflowWorker.java | 91 ++++++++++--------- .../runners/dataflow/worker/WindmillSink.java | 4 +- .../processing/StreamingWorkScheduler.java | 29 +++--- 4 files changed, 102 insertions(+), 61 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java new file mode 100644 index 000000000000..0b763e773226 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +/** Indicates that an output element was too large. 
*/ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class OutputTooLargeException extends RuntimeException { + public OutputTooLargeException(String reason) { + super(reason); + } + + /** Returns whether an exception was caused by a {@link OutputTooLargeException}. */ + public static boolean isOutputTooLargeException(Throwable t) { + while (t != null) { + if (t instanceof OutputTooLargeException) { + return true; + } + t = t.getCause(); + } + return false; + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index b3d251ae1db0..34356a14120f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -35,7 +35,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -185,9 +185,7 @@ private StreamingDataflowWorker( WorkFailureProcessor workFailureProcessor, StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, - AtomicInteger maxWorkItemCommitBytes, - AtomicInteger maxOutputKeyBytes, - AtomicInteger maxOutputValueBytes, + AtomicReference operationalLimits, GrpcWindmillStreamFactory windmillStreamFactory, Function executorSupplier, ConcurrentMap stageInfoMap) { @@ -306,9 +304,7 @@ private StreamingDataflowWorker( streamingCounters, hotKeyLogger, sampler, - maxWorkItemCommitBytes, - maxOutputKeyBytes, - maxOutputValueBytes, + operationalLimits, ID_GENERATOR, stageInfoMap); @@ -316,7 +312,6 @@ private StreamingDataflowWorker( LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint()); LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort()); LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); - LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes.get()); } public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) { @@ -326,9 +321,14 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o StreamingCounters streamingCounters = StreamingCounters.create(); WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); - AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(Integer.MAX_VALUE); - AtomicInteger maxOutputKeyBytes = new AtomicInteger(Integer.MAX_VALUE); - AtomicInteger maxOutputValueBytes = new AtomicInteger(Integer.MAX_VALUE); + AtomicReference operationalLimits = + new AtomicReference<>(new OperationalLimits()); + operationalLimits.getAndUpdate((limits) -> { + limits.maxWorkItemCommitBytes = Integer.MAX_VALUE; + limits.maxOutputKeyBytes = Integer.MAX_VALUE; + limits.maxOutputValueBytes = Integer.MAX_VALUE; + return limits; + }); WindmillStateCache windmillStateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); Function executorSupplier = @@ -343,9 +343,7 @@ public static 
StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o createConfigFetcherComputationStateCacheAndWindmillClient( options, dataflowServiceClient, - maxWorkItemCommitBytes, - maxOutputKeyBytes, - maxOutputValueBytes, + operationalLimits, windmillStreamFactoryBuilder, configFetcher -> ComputationStateCache.create( @@ -403,14 +401,21 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o workFailureProcessor, streamingCounters, memoryMonitor, - maxWorkItemCommitBytes, - maxOutputKeyBytes, - maxOutputValueBytes, + operationalLimits, configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), executorSupplier, stageInfo); } + public static class OperationalLimits { + // Maximum size of a commit from a single work item. + public int maxWorkItemCommitBytes; + // Maximum size of a single output element's serialized key. + public int maxOutputKeyBytes; + // Maximum size of a single output element's serialized value. + public int maxOutputValueBytes; + } + /** * {@link ComputationConfig.Fetcher}, {@link ComputationStateCache}, and {@link * WindmillServerStub} are constructed in different orders due to cyclic dependencies depending on @@ -421,9 +426,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o createConfigFetcherComputationStateCacheAndWindmillClient( DataflowWorkerHarnessOptions options, WorkUnitClient dataflowServiceClient, - AtomicInteger maxWorkItemCommitBytes, - AtomicInteger maxOutputKeyBytes, - AtomicInteger maxOutputValueBytes, + AtomicReference operationalLimits, GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, Function computationStateCacheFactory) { ComputationConfig.Fetcher configFetcher; @@ -441,9 +444,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o onPipelineConfig( config, dispatcherClient::consumeWindmillDispatcherEndpoints, - maxWorkItemCommitBytes, - maxOutputKeyBytes, - maxOutputValueBytes)); + operationalLimits)); computationStateCache = computationStateCacheFactory.apply(configFetcher); windmillStreamFactory = windmillStreamFactoryBuilder @@ -493,9 +494,14 @@ static StreamingDataflowWorker forTesting( int maxOutputKeyBytesOverride, int maxOutputValueBytesOverride) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); - AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(maxWorkItemCommitBytesOverrides); - AtomicInteger maxOutputKeyBytes = new AtomicInteger(maxOutputKeyBytesOverride); - AtomicInteger maxOutputValueBytes = new AtomicInteger(maxOutputValueBytesOverride); + AtomicReference operationalLimits = + new AtomicReference<>(new OperationalLimits()); + operationalLimits.getAndUpdate((limits) -> { + limits.maxWorkItemCommitBytes = maxWorkItemCommitBytesOverrides; + limits.maxOutputKeyBytes = maxOutputKeyBytesOverride; + limits.maxOutputValueBytes = maxOutputValueBytesOverride; + return limits; + }); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); ComputationConfig.Fetcher configFetcher = @@ -509,9 +515,7 @@ static StreamingDataflowWorker forTesting( onPipelineConfig( config, windmillServer::setWindmillServiceEndpoints, - maxWorkItemCommitBytes, - maxOutputKeyBytes, - maxOutputValueBytes)) + operationalLimits)) : new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); ConcurrentMap stateNameMap = new ConcurrentHashMap<>(prePopulatedStateNameMappings); @@ -579,9 +583,7 @@ static 
StreamingDataflowWorker forTesting( workFailureProcessor, streamingCounters, memoryMonitor, - maxWorkItemCommitBytes, - maxOutputKeyBytes, - maxOutputValueBytes, + operationalLimits, options.isEnableStreamingEngine() ? windmillStreamFactory .setHealthCheckIntervalMillis( @@ -595,22 +597,29 @@ static StreamingDataflowWorker forTesting( private static void onPipelineConfig( StreamingEnginePipelineConfig config, Consumer> consumeWindmillServiceEndpoints, - AtomicInteger maxWorkItemCommitBytes, - AtomicInteger maxOutputKeyBytes, - AtomicInteger maxOutputValueBytes) { - if (config.maxWorkItemCommitBytes() != maxWorkItemCommitBytes.get()) { + AtomicReference operationalLimits) { + if (config.maxWorkItemCommitBytes() != operationalLimits.get().maxWorkItemCommitBytes) { LOG.info("Setting maxWorkItemCommitBytes to {}", config.maxWorkItemCommitBytes()); - maxWorkItemCommitBytes.set((int) config.maxWorkItemCommitBytes()); + operationalLimits.getAndUpdate((limits) -> { + limits.maxWorkItemCommitBytes = (int)config.maxWorkItemCommitBytes(); + return limits; + }); } - if (config.maxOutputKeyBytes() != maxOutputKeyBytes.get()) { + if (config.maxOutputKeyBytes() != operationalLimits.get().maxOutputKeyBytes) { LOG.info("Setting maxOutputKeyBytes to {}", config.maxOutputKeyBytes()); - maxOutputKeyBytes.set((int) config.maxOutputKeyBytes()); + operationalLimits.getAndUpdate((limits) -> { + limits.maxOutputKeyBytes = (int)config.maxOutputKeyBytes(); + return limits; + }); } - if (config.maxOutputValueBytes() != maxOutputValueBytes.get()) { + if (config.maxOutputValueBytes() != operationalLimits.get().maxOutputValueBytes) { LOG.info("Setting maxOutputValueBytes to {}", config.maxOutputValueBytes()); - maxOutputValueBytes.set((int) config.maxOutputValueBytes()); + operationalLimits.getAndUpdate((limits) -> { + limits.maxOutputValueBytes = (int)config.maxOutputValueBytes(); + return limits; + }); } if (!config.windmillServiceEndpoints().isEmpty()) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index e5ce442732cf..606ae65bb8df 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -173,10 +173,10 @@ public long add(WindowedValue data) throws IOException { value = encode(valueCoder, data.getValue()); } if (key.size() > context.getMaxOutputKeyBytes()) { - throw new IOException("Key too large: " + key.size()); + throw new OutputTooLargeException("Key too large: " + key.size()); } if (value.size() > context.getMaxOutputValueBytes()) { - throw new IOException("Value too large: " + value.size()); + throw new OutputTooLargeException("Value too large: " + value.size()); } Windmill.KeyedMessageBundle.Builder keyedOutput = productionMap.get(key); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index ad8cd3a1cb3b..789343793225 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; @@ -32,6 +33,7 @@ import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory; import org.apache.beam.runners.dataflow.worker.HotKeyLogger; import org.apache.beam.runners.dataflow.worker.ReaderCache; +import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker; import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException; import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; @@ -81,9 +83,7 @@ public final class StreamingWorkScheduler { private final HotKeyLogger hotKeyLogger; private final ConcurrentMap stageInfoMap; private final DataflowExecutionStateSampler sampler; - private final AtomicInteger maxWorkItemCommitBytes; - private final AtomicInteger maxOutputKeyBytes; - private final AtomicInteger maxOutputValueBytes; + private final AtomicReference operationalLimits; public StreamingWorkScheduler( DataflowWorkerHarnessOptions options, @@ -97,9 +97,7 @@ public StreamingWorkScheduler( HotKeyLogger hotKeyLogger, ConcurrentMap stageInfoMap, DataflowExecutionStateSampler sampler, - AtomicInteger maxWorkItemCommitBytes, - AtomicInteger maxOutputKeyBytes, - AtomicInteger maxOutputValueBytes) { + AtomicReference operationalLimits) { this.options = options; this.clock = clock; this.computationWorkExecutorFactory = computationWorkExecutorFactory; @@ -111,9 +109,7 @@ public StreamingWorkScheduler( this.hotKeyLogger = hotKeyLogger; this.stageInfoMap = stageInfoMap; this.sampler = sampler; - this.maxWorkItemCommitBytes = maxWorkItemCommitBytes; - this.maxOutputKeyBytes = maxOutputKeyBytes; - this.maxOutputValueBytes = maxOutputValueBytes; + this.operationalLimits = operationalLimits; } public static StreamingWorkScheduler create( @@ -129,9 +125,7 @@ public static StreamingWorkScheduler create( StreamingCounters streamingCounters, HotKeyLogger hotKeyLogger, DataflowExecutionStateSampler sampler, - AtomicInteger maxWorkItemCommitBytes, - AtomicInteger maxOutputKeyBytes, - AtomicInteger maxOutputValueBytes, + AtomicReference operationalLimits, IdGenerator idGenerator, ConcurrentMap stageInfoMap) { ComputationWorkExecutorFactory computationWorkExecutorFactory = @@ -156,9 +150,7 @@ public static StreamingWorkScheduler create( hotKeyLogger, stageInfoMap, sampler, - maxWorkItemCommitBytes, - maxOutputKeyBytes, - maxOutputValueBytes); + operationalLimits); } private static long computeShuffleBytesRead(Windmill.WorkItem workItem) { @@ -302,7 +294,7 @@ private Windmill.WorkItemCommitRequest validateCommitRequestSize( Windmill.WorkItemCommitRequest commitRequest, String computationId, Windmill.WorkItem workItem) { - int byteLimit = maxWorkItemCommitBytes.get(); + int byteLimit = operationalLimits.get().maxWorkItemCommitBytes; int commitSize = commitRequest.getSerializedSize(); int estimatedCommitSize = commitSize < 0 ? 
Integer.MAX_VALUE : commitSize; @@ -385,8 +377,9 @@ private ExecuteWorkResult executeWork( // Blocks while executing work. computationWorkExecutor.executeWork( - executionKey, work, stateReader, localSideInputStateFetcher, maxOutputKeyBytes.get(), - maxOutputValueBytes.get(), outputBuilder); + executionKey, work, stateReader, localSideInputStateFetcher, + operationalLimits.get().maxOutputKeyBytes, operationalLimits.get().maxOutputValueBytes, + outputBuilder); if (work.isFailed()) { throw new WorkItemCancelledException(workItem.getShardingKey()); From 85fe38c96700708cbbfb6566ee9c8f91f72b6047 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Wed, 10 Jul 2024 03:55:22 +0000 Subject: [PATCH 03/11] Run spotlessApply. --- .../worker/StreamingDataflowWorker.java | 57 ++++++++++--------- .../worker/StreamingModeExecutionContext.java | 8 ++- .../streaming/ComputationWorkExecutor.java | 11 +++- ...reamingEngineComputationConfigFetcher.java | 2 +- .../processing/StreamingWorkScheduler.java | 9 ++- .../worker/StreamingDataflowWorkerTest.java | 18 ++---- .../StreamingModeExecutionContextTest.java | 8 +-- .../worker/WorkerCustomSourcesTest.java | 8 +-- 8 files changed, 64 insertions(+), 57 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 34356a14120f..9b7b7ca259d2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -323,12 +323,13 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); AtomicReference operationalLimits = new AtomicReference<>(new OperationalLimits()); - operationalLimits.getAndUpdate((limits) -> { - limits.maxWorkItemCommitBytes = Integer.MAX_VALUE; - limits.maxOutputKeyBytes = Integer.MAX_VALUE; - limits.maxOutputValueBytes = Integer.MAX_VALUE; - return limits; - }); + operationalLimits.getAndUpdate( + (limits) -> { + limits.maxWorkItemCommitBytes = Integer.MAX_VALUE; + limits.maxOutputKeyBytes = Integer.MAX_VALUE; + limits.maxOutputValueBytes = Integer.MAX_VALUE; + return limits; + }); WindmillStateCache windmillStateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); Function executorSupplier = @@ -496,12 +497,13 @@ static StreamingDataflowWorker forTesting( ConcurrentMap stageInfo = new ConcurrentHashMap<>(); AtomicReference operationalLimits = new AtomicReference<>(new OperationalLimits()); - operationalLimits.getAndUpdate((limits) -> { - limits.maxWorkItemCommitBytes = maxWorkItemCommitBytesOverrides; - limits.maxOutputKeyBytes = maxOutputKeyBytesOverride; - limits.maxOutputValueBytes = maxOutputValueBytesOverride; - return limits; - }); + operationalLimits.getAndUpdate( + (limits) -> { + limits.maxWorkItemCommitBytes = maxWorkItemCommitBytesOverrides; + limits.maxOutputKeyBytes = maxOutputKeyBytesOverride; + limits.maxOutputValueBytes = maxOutputValueBytesOverride; + return limits; + }); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); ComputationConfig.Fetcher configFetcher = @@ -513,9 +515,7 @@ static 
StreamingDataflowWorker forTesting( executorSupplier, config -> onPipelineConfig( - config, - windmillServer::setWindmillServiceEndpoints, - operationalLimits)) + config, windmillServer::setWindmillServiceEndpoints, operationalLimits)) : new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); ConcurrentMap stateNameMap = new ConcurrentHashMap<>(prePopulatedStateNameMappings); @@ -600,26 +600,29 @@ private static void onPipelineConfig( AtomicReference operationalLimits) { if (config.maxWorkItemCommitBytes() != operationalLimits.get().maxWorkItemCommitBytes) { LOG.info("Setting maxWorkItemCommitBytes to {}", config.maxWorkItemCommitBytes()); - operationalLimits.getAndUpdate((limits) -> { - limits.maxWorkItemCommitBytes = (int)config.maxWorkItemCommitBytes(); - return limits; - }); + operationalLimits.getAndUpdate( + (limits) -> { + limits.maxWorkItemCommitBytes = (int) config.maxWorkItemCommitBytes(); + return limits; + }); } if (config.maxOutputKeyBytes() != operationalLimits.get().maxOutputKeyBytes) { LOG.info("Setting maxOutputKeyBytes to {}", config.maxOutputKeyBytes()); - operationalLimits.getAndUpdate((limits) -> { - limits.maxOutputKeyBytes = (int)config.maxOutputKeyBytes(); - return limits; - }); + operationalLimits.getAndUpdate( + (limits) -> { + limits.maxOutputKeyBytes = (int) config.maxOutputKeyBytes(); + return limits; + }); } if (config.maxOutputValueBytes() != operationalLimits.get().maxOutputValueBytes) { LOG.info("Setting maxOutputValueBytes to {}", config.maxOutputValueBytes()); - operationalLimits.getAndUpdate((limits) -> { - limits.maxOutputValueBytes = (int)config.maxOutputValueBytes(); - return limits; - }); + operationalLimits.getAndUpdate( + (limits) -> { + limits.maxOutputValueBytes = (int) config.maxOutputValueBytes(); + return limits; + }); } if (!config.windmillServiceEndpoints().isEmpty()) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 5b20cc73c9df..1bbc49d1a532 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -174,9 +174,13 @@ public final long getBacklogBytes() { } // Use long instead? 
- public int getMaxOutputKeyBytes() { return maxOutputKeyBytes; } + public int getMaxOutputKeyBytes() { + return maxOutputKeyBytes; + } - public int getMaxOutputValueBytes() { return maxOutputValueBytes; } + public int getMaxOutputValueBytes() { + return maxOutputValueBytes; + } public boolean workIsFailed() { return Optional.ofNullable(work).map(Work::isFailed).orElse(false); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java index 9304e72968fc..adbcbd3a42f7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java @@ -76,8 +76,15 @@ public final void executeWork( int maxOutputValueBytes, Windmill.WorkItemCommitRequest.Builder outputBuilder) throws Exception { - context().start(key, work, stateReader, sideInputStateFetcher, maxOutputKeyBytes, - maxOutputValueBytes, outputBuilder); + context() + .start( + key, + work, + stateReader, + sideInputStateFetcher, + maxOutputKeyBytes, + maxOutputValueBytes, + outputBuilder); workExecutor().execute(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 809ff902cd7a..f5baac699961 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -118,7 +118,7 @@ public static StreamingEngineComputationConfigFetcher forTesting( globalConfigRefreshPeriodMillis, dataflowServiceClient, executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME), - /*shouldPerformOutputSizeChecks=*/true, + /*shouldPerformOutputSizeChecks=*/ true, onStreamingConfig); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 789343793225..0fdd47a27385 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -23,7 +23,6 @@ import java.util.Optional; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -377,8 +376,12 @@ private ExecuteWorkResult executeWork( // Blocks while executing work. 
computationWorkExecutor.executeWork( - executionKey, work, stateReader, localSideInputStateFetcher, - operationalLimits.get().maxOutputKeyBytes, operationalLimits.get().maxOutputValueBytes, + executionKey, + work, + stateReader, + localSideInputStateFetcher, + operationalLimits.get().maxOutputKeyBytes, + operationalLimits.get().maxOutputValueBytes, outputBuilder); if (work.isFailed()) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 43262ddf5002..59f8b0630ca6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -1287,18 +1287,13 @@ public void testOutputKeyTooLargeException() throws Exception { StreamingDataflowWorker worker = makeWorker( - defaultWorkerParams() - .setInstructions(instructions) - .setMaxOutputKeyBytes(15) - .build()); + defaultWorkerParams().setInstructions(instructions).setMaxOutputKeyBytes(15).build()); worker.start(); // This large key will cause the ExceptionCatchingFn to throw an exception, which will then // cause it to output a smaller key. String bigKey = "some_much_too_large_output_key"; - server - .whenGetWorkCalled() - .thenReturn(makeInput(1, 0, bigKey, DEFAULT_SHARDING_KEY)); + server.whenGetWorkCalled().thenReturn(makeInput(1, 0, bigKey, DEFAULT_SHARDING_KEY)); server.waitForEmptyWorkQueue(); Map result = server.waitForAndGetCommits(1); @@ -1322,18 +1317,13 @@ public void testOutputValueTooLargeException() throws Exception { StreamingDataflowWorker worker = makeWorker( - defaultWorkerParams() - .setInstructions(instructions) - .setMaxOutputValueBytes(15) - .build()); + defaultWorkerParams().setInstructions(instructions).setMaxOutputValueBytes(15).build()); worker.start(); // The first time processing will have value "data1_a_bunch_more_data_output", which is above // the limit. After throwing the exception, the output should be just "data1", which is small // enough. 
- server - .whenGetWorkCalled() - .thenReturn(makeInput(1, 0, "key", DEFAULT_SHARDING_KEY)); + server.whenGetWorkCalled().thenReturn(makeInput(1, 0, "key", DEFAULT_SHARDING_KEY)); server.waitForEmptyWorkQueue(); Map result = server.waitForAndGetCommits(1); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 38c5333895ac..d6ab4f09233a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -154,8 +154,8 @@ public void testTimerInternalsSetTimer() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - /*maxOutputKeyBytes=*/Integer.MAX_VALUE, - /*maxOutputValueBytes=*/Integer.MAX_VALUE, + /*maxOutputKeyBytes=*/ Integer.MAX_VALUE, + /*maxOutputValueBytes=*/ Integer.MAX_VALUE, outputBuilder); TimerInternals timerInternals = stepContext.timerInternals(); @@ -205,8 +205,8 @@ public void testTimerInternalsProcessingTimeSkew() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - /*maxOutputKeyBytes=*/Integer.MAX_VALUE, - /*maxOutputValueBytes=*/Integer.MAX_VALUE, + /*maxOutputKeyBytes=*/ Integer.MAX_VALUE, + /*maxOutputValueBytes=*/ Integer.MAX_VALUE, outputBuilder); TimerInternals timerInternals = stepContext.timerInternals(); assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime())); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 5335c7ad108f..8cfdda5c4e5b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -634,8 +634,8 @@ public void testReadUnboundedReader() throws Exception { Watermarks.builder().setInputDataWatermark(new Instant(0)).build()), mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), - /*maxOutputKeyBytes=*/Integer.MAX_VALUE, - /*maxOutputValueBytes=*/Integer.MAX_VALUE, + /*maxOutputKeyBytes=*/ Integer.MAX_VALUE, + /*maxOutputValueBytes=*/ Integer.MAX_VALUE, Windmill.WorkItemCommitRequest.newBuilder()); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -1007,8 +1007,8 @@ public void testFailedWorkItemsAbort() throws Exception { dummyWork, mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), - /*maxOutputKeyBytes=*/Integer.MAX_VALUE, - /*maxOutputValueBytes=*/Integer.MAX_VALUE, + /*maxOutputKeyBytes=*/ Integer.MAX_VALUE, + /*maxOutputValueBytes=*/ Integer.MAX_VALUE, Windmill.WorkItemCommitRequest.newBuilder()); @SuppressWarnings({"unchecked", "rawtypes"}) From 6edf1cb828d1a1d1c0b15a17e1ddad5c4313cfb5 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Wed, 10 Jul 2024 15:15:38 +0000 Subject: [PATCH 04/11] Changed operational limits to long to avoid possible truncation. 
--- .../worker/StreamingDataflowWorker.java | 24 +++++++++---------- .../worker/StreamingModeExecutionContext.java | 13 +++++----- .../streaming/ComputationWorkExecutor.java | 4 ++-- .../processing/StreamingWorkScheduler.java | 2 +- 4 files changed, 21 insertions(+), 22 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 9b7b7ca259d2..f463d13acc76 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -325,9 +325,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o new AtomicReference<>(new OperationalLimits()); operationalLimits.getAndUpdate( (limits) -> { - limits.maxWorkItemCommitBytes = Integer.MAX_VALUE; - limits.maxOutputKeyBytes = Integer.MAX_VALUE; - limits.maxOutputValueBytes = Integer.MAX_VALUE; + limits.maxWorkItemCommitBytes = Long.MAX_VALUE; + limits.maxOutputKeyBytes = Long.MAX_VALUE; + limits.maxOutputValueBytes = Long.MAX_VALUE; return limits; }); WindmillStateCache windmillStateCache = @@ -410,11 +410,11 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o public static class OperationalLimits { // Maximum size of a commit from a single work item. - public int maxWorkItemCommitBytes; + public long maxWorkItemCommitBytes; // Maximum size of a single output element's serialized key. - public int maxOutputKeyBytes; + public long maxOutputKeyBytes; // Maximum size of a single output element's serialized value. 
- public int maxOutputValueBytes; + public long maxOutputValueBytes; } /** @@ -491,9 +491,9 @@ static StreamingDataflowWorker forTesting( Supplier clock, Function executorSupplier, int localRetryTimeoutMs, - int maxWorkItemCommitBytesOverrides, - int maxOutputKeyBytesOverride, - int maxOutputValueBytesOverride) { + long maxWorkItemCommitBytesOverrides, + long maxOutputKeyBytesOverride, + long maxOutputValueBytesOverride) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); AtomicReference operationalLimits = new AtomicReference<>(new OperationalLimits()); @@ -602,7 +602,7 @@ private static void onPipelineConfig( LOG.info("Setting maxWorkItemCommitBytes to {}", config.maxWorkItemCommitBytes()); operationalLimits.getAndUpdate( (limits) -> { - limits.maxWorkItemCommitBytes = (int) config.maxWorkItemCommitBytes(); + limits.maxWorkItemCommitBytes = config.maxWorkItemCommitBytes(); return limits; }); } @@ -611,7 +611,7 @@ private static void onPipelineConfig( LOG.info("Setting maxOutputKeyBytes to {}", config.maxOutputKeyBytes()); operationalLimits.getAndUpdate( (limits) -> { - limits.maxOutputKeyBytes = (int) config.maxOutputKeyBytes(); + limits.maxOutputKeyBytes = config.maxOutputKeyBytes(); return limits; }); } @@ -620,7 +620,7 @@ private static void onPipelineConfig( LOG.info("Setting maxOutputValueBytes to {}", config.maxOutputValueBytes()); operationalLimits.getAndUpdate( (limits) -> { - limits.maxOutputValueBytes = (int) config.maxOutputValueBytes(); + limits.maxOutputValueBytes = config.maxOutputValueBytes(); return limits; }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 1bbc49d1a532..a1db4a93cf4b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -132,8 +132,8 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext Date: Wed, 10 Jul 2024 15:23:11 +0000 Subject: [PATCH 05/11] Also updates a couple test case uses of operational limits to be longs to avoid possible truncation. 
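The test parameters follow the same convention as the worker defaults: Long.MAX_VALUE stands for "effectively unlimited", so each test overrides only the limit it exercises. A stripped-down sketch of that builder pattern (hand-rolled here for illustration; the real class is AutoValue-generated):

    public final class TestLimits {
      public final long maxOutputKeyBytes;
      public final long maxOutputValueBytes;

      private TestLimits(long maxOutputKeyBytes, long maxOutputValueBytes) {
        this.maxOutputKeyBytes = maxOutputKeyBytes;
        this.maxOutputValueBytes = maxOutputValueBytes;
      }

      public static final class Builder {
        // Long.MAX_VALUE means "no limit"; tests override only what they exercise.
        private long keyBytes = Long.MAX_VALUE;
        private long valueBytes = Long.MAX_VALUE;

        public Builder setMaxOutputKeyBytes(long bytes) {
          this.keyBytes = bytes;
          return this;
        }

        public Builder setMaxOutputValueBytes(long bytes) {
          this.valueBytes = bytes;
          return this;
        }

        public TestLimits build() {
          return new TestLimits(keyBytes, valueBytes);
        }
      }
    }

With AutoValue the abstract accessors and the builder setters must be widened together, which is why this patch touches both.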
--- .../worker/StreamingDataflowWorkerTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 59f8b0630ca6..1eb20b6b1db5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -4507,9 +4507,9 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { .setLocalRetryTimeoutMs(-1) .setPublishCounters(false) .setClock(Instant::now) - .setMaxWorkItemCommitBytes(Integer.MAX_VALUE) - .setMaxOutputKeyBytes(Integer.MAX_VALUE) - .setMaxOutputValueBytes(Integer.MAX_VALUE); + .setMaxWorkItemCommitBytes(Long.MAX_VALUE) + .setMaxOutputKeyBytes(Long.MAX_VALUE) + .setMaxOutputValueBytes(Long.MAX_VALUE); } abstract ImmutableMap stateNameMappings(); @@ -4526,11 +4526,11 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { abstract int localRetryTimeoutMs(); - abstract int maxWorkItemCommitBytes(); + abstract long maxWorkItemCommitBytes(); - abstract int maxOutputKeyBytes(); + abstract long maxOutputKeyBytes(); - abstract int maxOutputValueBytes(); + abstract long maxOutputValueBytes(); @AutoValue.Builder abstract static class Builder { @@ -4564,11 +4564,11 @@ final Builder publishCounters() { abstract Builder setLocalRetryTimeoutMs(int value); - abstract Builder setMaxWorkItemCommitBytes(int maxWorkItemCommitBytes); + abstract Builder setMaxWorkItemCommitBytes(long maxWorkItemCommitBytes); - abstract Builder setMaxOutputKeyBytes(int maxOutputKeyBytes); + abstract Builder setMaxOutputKeyBytes(long maxOutputKeyBytes); - abstract Builder setMaxOutputValueBytes(int maxOutputValueBytes); + abstract Builder setMaxOutputValueBytes(long maxOutputValueBytes); abstract StreamingDataflowWorkerTestParams build(); } From c7c42b014f77a73994214e405209cbd18d15e347 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Wed, 24 Jul 2024 21:40:04 +0000 Subject: [PATCH 06/11] Run spotlessApply. --- .../apache/beam/runners/dataflow/worker/OperationalLimits.java | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java new file mode 100644 index 000000000000..7508100c5ae3 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java @@ -0,0 +1,2 @@ +package org.apache.beam.runners.dataflow.worker;public class OperationalLimits { +} From c879a97c4c784022f74042c416b24ef4bced9ffe Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Wed, 24 Jul 2024 21:43:33 +0000 Subject: [PATCH 07/11] Moved OperationalLimits to its own class and passed it around instead of the individual fields. Also addressed minor PR comments including making the class immutable. 
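Immutability is what makes the AtomicReference swap safe: the config-refresh thread publishes a complete replacement object with a single set(), and a worker thread's single get() then sees an internally consistent snapshot, instead of racing the per-field mutations the earlier getAndUpdate lambdas performed. A minimal sketch of the pattern (simplified names, two limits instead of three):

    import java.util.concurrent.atomic.AtomicReference;

    public class SnapshotDemo {
      static final class Limits {
        final long maxKeyBytes;
        final long maxValueBytes;

        Limits(long maxKeyBytes, long maxValueBytes) {
          this.maxKeyBytes = maxKeyBytes;
          this.maxValueBytes = maxValueBytes;
        }
      }

      private static final AtomicReference<Limits> LIMITS =
          new AtomicReference<>(new Limits(Long.MAX_VALUE, Long.MAX_VALUE));

      // Config-refresh thread: publish a whole new object atomically.
      static void onNewConfig(long keyBytes, long valueBytes) {
        LIMITS.set(new Limits(keyBytes, valueBytes));
      }

      // Worker thread: one read yields a consistent pair of limits.
      static boolean fits(long keySize, long valueSize) {
        Limits snapshot = LIMITS.get();
        return keySize <= snapshot.maxKeyBytes && valueSize <= snapshot.maxValueBytes;
      }

      public static void main(String[] args) {
        onNewConfig(1 << 20, 1 << 25); // 1 MiB keys, 32 MiB values
        System.out.println(fits(100, 100));     // true
        System.out.println(fits(2 << 20, 100)); // false: key exceeds the 1 MiB limit
      }
    }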
--- .../dataflow/worker/OperationalLimits.java | 54 +++++++++++++- .../worker/OutputTooLargeException.java | 2 +- .../worker/StreamingDataflowWorker.java | 71 +++++-------------- .../worker/StreamingModeExecutionContext.java | 15 ++-- .../streaming/ComputationWorkExecutor.java | 13 +--- .../processing/StreamingWorkScheduler.java | 11 ++- .../worker/StreamingDataflowWorkerTest.java | 1 - .../StreamingModeExecutionContextTest.java | 6 +- .../worker/WorkerCustomSourcesTest.java | 8 +-- 9 files changed, 91 insertions(+), 90 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java index 7508100c5ae3..47e36e498507 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java @@ -1,2 +1,54 @@ -package org.apache.beam.runners.dataflow.worker;public class OperationalLimits { +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import com.google.auto.value.AutoBuilder; + +/** Keep track of any operational limits required by the backend. */ +public class OperationalLimits { + // Maximum size of a commit from a single work item. + public final long maxWorkItemCommitBytes; + // Maximum size of a single output element's serialized key. + public final long maxOutputKeyBytes; + // Maximum size of a single output element's serialized value. 
+ public final long maxOutputValueBytes; + + OperationalLimits(long maxWorkItemCommitBytes, long maxOutputKeyBytes, long maxOutputValueBytes) { + this.maxWorkItemCommitBytes = maxWorkItemCommitBytes; + this.maxOutputKeyBytes = maxOutputKeyBytes; + this.maxOutputValueBytes = maxOutputValueBytes; + } + + @AutoBuilder(ofClass = OperationalLimits.class) + public interface Builder { + Builder setMaxWorkItemCommitBytes(long bytes); + + Builder setMaxOutputKeyBytes(long bytes); + + Builder setMaxOutputValueBytes(long bytes); + + OperationalLimits build(); + } + + public static Builder builder() { + return new AutoBuilder_OperationalLimits_Builder() + .setMaxWorkItemCommitBytes(Long.MAX_VALUE) + .setMaxOutputKeyBytes(Long.MAX_VALUE) + .setMaxOutputValueBytes(Long.MAX_VALUE); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java index 0b763e773226..da80f087e55e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java @@ -27,7 +27,7 @@ public OutputTooLargeException(String reason) { } /** Returns whether an exception was caused by a {@link OutputTooLargeException}. */ - public static boolean isOutputTooLargeException(Throwable t) { + public static boolean isCausedByOutputTooLargeException(Throwable t) { while (t != null) { if (t instanceof OutputTooLargeException) { return true; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f463d13acc76..95213407d9e1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -322,14 +322,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); AtomicReference operationalLimits = - new AtomicReference<>(new OperationalLimits()); - operationalLimits.getAndUpdate( - (limits) -> { - limits.maxWorkItemCommitBytes = Long.MAX_VALUE; - limits.maxOutputKeyBytes = Long.MAX_VALUE; - limits.maxOutputValueBytes = Long.MAX_VALUE; - return limits; - }); + new AtomicReference<>(OperationalLimits.builder().build()); WindmillStateCache windmillStateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); Function executorSupplier = @@ -408,15 +401,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o stageInfo); } - public static class OperationalLimits { - // Maximum size of a commit from a single work item. - public long maxWorkItemCommitBytes; - // Maximum size of a single output element's serialized key. - public long maxOutputKeyBytes; - // Maximum size of a single output element's serialized value. 
- public long maxOutputValueBytes; - } - /** * {@link ComputationConfig.Fetcher}, {@link ComputationStateCache}, and {@link * WindmillServerStub} are constructed in different orders due to cyclic dependencies depending on @@ -445,7 +429,7 @@ public static class OperationalLimits { onPipelineConfig( config, dispatcherClient::consumeWindmillDispatcherEndpoints, - operationalLimits)); + operationalLimits::set)); computationStateCache = computationStateCacheFactory.apply(configFetcher); windmillStreamFactory = windmillStreamFactoryBuilder @@ -496,14 +480,12 @@ static StreamingDataflowWorker forTesting( long maxOutputValueBytesOverride) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); AtomicReference operationalLimits = - new AtomicReference<>(new OperationalLimits()); - operationalLimits.getAndUpdate( - (limits) -> { - limits.maxWorkItemCommitBytes = maxWorkItemCommitBytesOverrides; - limits.maxOutputKeyBytes = maxOutputKeyBytesOverride; - limits.maxOutputValueBytes = maxOutputValueBytesOverride; - return limits; - }); + new AtomicReference<>( + OperationalLimits.builder() + .setMaxWorkItemCommitBytes(maxWorkItemCommitBytesOverrides) + .setMaxOutputKeyBytes(maxOutputKeyBytesOverride) + .setMaxOutputValueBytes(maxOutputValueBytesOverride) + .build()); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); ComputationConfig.Fetcher configFetcher = @@ -515,7 +497,9 @@ static StreamingDataflowWorker forTesting( executorSupplier, config -> onPipelineConfig( - config, windmillServer::setWindmillServiceEndpoints, operationalLimits)) + config, + windmillServer::setWindmillServiceEndpoints, + operationalLimits::set)) : new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); ConcurrentMap stateNameMap = new ConcurrentHashMap<>(prePopulatedStateNameMappings); @@ -597,33 +581,14 @@ static StreamingDataflowWorker forTesting( private static void onPipelineConfig( StreamingEnginePipelineConfig config, Consumer> consumeWindmillServiceEndpoints, - AtomicReference operationalLimits) { - if (config.maxWorkItemCommitBytes() != operationalLimits.get().maxWorkItemCommitBytes) { - LOG.info("Setting maxWorkItemCommitBytes to {}", config.maxWorkItemCommitBytes()); - operationalLimits.getAndUpdate( - (limits) -> { - limits.maxWorkItemCommitBytes = config.maxWorkItemCommitBytes(); - return limits; - }); - } + Consumer operationalLimits) { - if (config.maxOutputKeyBytes() != operationalLimits.get().maxOutputKeyBytes) { - LOG.info("Setting maxOutputKeyBytes to {}", config.maxOutputKeyBytes()); - operationalLimits.getAndUpdate( - (limits) -> { - limits.maxOutputKeyBytes = config.maxOutputKeyBytes(); - return limits; - }); - } - - if (config.maxOutputValueBytes() != operationalLimits.get().maxOutputValueBytes) { - LOG.info("Setting maxOutputValueBytes to {}", config.maxOutputValueBytes()); - operationalLimits.getAndUpdate( - (limits) -> { - limits.maxOutputValueBytes = config.maxOutputValueBytes(); - return limits; - }); - } + operationalLimits.accept( + OperationalLimits.builder() + .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes()) + .setMaxOutputKeyBytes(config.maxOutputKeyBytes()) + .setMaxOutputValueBytes(config.maxOutputValueBytes()) + .build()); if (!config.windmillServiceEndpoints().isEmpty()) { consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints()); diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index a1db4a93cf4b..a29c0e2913db 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -129,11 +129,10 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext stageInfoMap; private final DataflowExecutionStateSampler sampler; - private final AtomicReference operationalLimits; + private final AtomicReference operationalLimits; public StreamingWorkScheduler( DataflowWorkerHarnessOptions options, @@ -96,7 +96,7 @@ public StreamingWorkScheduler( HotKeyLogger hotKeyLogger, ConcurrentMap stageInfoMap, DataflowExecutionStateSampler sampler, - AtomicReference operationalLimits) { + AtomicReference operationalLimits) { this.options = options; this.clock = clock; this.computationWorkExecutorFactory = computationWorkExecutorFactory; @@ -124,7 +124,7 @@ public static StreamingWorkScheduler create( StreamingCounters streamingCounters, HotKeyLogger hotKeyLogger, DataflowExecutionStateSampler sampler, - AtomicReference operationalLimits, + AtomicReference operationalLimits, IdGenerator idGenerator, ConcurrentMap stageInfoMap) { ComputationWorkExecutorFactory computationWorkExecutorFactory = @@ -380,8 +380,7 @@ private ExecuteWorkResult executeWork( work, stateReader, localSideInputStateFetcher, - operationalLimits.get().maxOutputKeyBytes, - operationalLimits.get().maxOutputValueBytes, + operationalLimits.get(), outputBuilder); if (work.isFailed()) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 1eb20b6b1db5..c1e259377df4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -549,7 +549,6 @@ private Windmill.GetWorkResponse buildSessionInput( List inputs, List timers) throws Exception { - // Windmill.GetWorkResponse.Builder builder = Windmill.GetWorkResponse.newBuilder(); Windmill.WorkItem.Builder builder = Windmill.WorkItem.newBuilder(); builder.setKey(DEFAULT_KEY_BYTES); builder.setShardingKey(DEFAULT_SHARDING_KEY); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index d6ab4f09233a..a293c693ceaa 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -154,8 +154,7 @@ public void testTimerInternalsSetTimer() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), 
stateReader, sideInputStateFetcher, - /*maxOutputKeyBytes=*/ Integer.MAX_VALUE, - /*maxOutputValueBytes=*/ Integer.MAX_VALUE, + OperationalLimits.builder().build(), outputBuilder); TimerInternals timerInternals = stepContext.timerInternals(); @@ -205,8 +204,7 @@ public void testTimerInternalsProcessingTimeSkew() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - /*maxOutputKeyBytes=*/ Integer.MAX_VALUE, - /*maxOutputValueBytes=*/ Integer.MAX_VALUE, + OperationalLimits.builder().build(), outputBuilder); TimerInternals timerInternals = stepContext.timerInternals(); assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime())); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 8cfdda5c4e5b..4179d31ef176 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -634,8 +634,7 @@ public void testReadUnboundedReader() throws Exception { Watermarks.builder().setInputDataWatermark(new Instant(0)).build()), mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), - /*maxOutputKeyBytes=*/ Integer.MAX_VALUE, - /*maxOutputValueBytes=*/ Integer.MAX_VALUE, + OperationalLimits.builder().build(), Windmill.WorkItemCommitRequest.newBuilder()); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -999,7 +998,7 @@ public void testFailedWorkItemsAbort() throws Exception { Work.createProcessingContext( COMPUTATION_ID, (a, b) -> Windmill.KeyedGetDataResponse.getDefaultInstance(), - gnored -> {}), + ignored -> {}), Instant::now, Collections.emptyList()); context.start( @@ -1007,8 +1006,7 @@ public void testFailedWorkItemsAbort() throws Exception { dummyWork, mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), - /*maxOutputKeyBytes=*/ Integer.MAX_VALUE, - /*maxOutputValueBytes=*/ Integer.MAX_VALUE, + OperationalLimits.builder().build(), Windmill.WorkItemCommitRequest.newBuilder()); @SuppressWarnings({"unchecked", "rawtypes"}) From e274142ce2931e3e85db2feaf05c2f0f44642bcc Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Mon, 29 Jul 2024 20:32:38 +0000 Subject: [PATCH 08/11] Marks OutputTooLargeException's Throwable input as Nullable so we can remove SuppressWarnings. --- .../runners/dataflow/worker/OutputTooLargeException.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java index da80f087e55e..9f4b413841c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java @@ -17,17 +17,16 @@ */ package org.apache.beam.runners.dataflow.worker; +import org.checkerframework.checker.nullness.qual.Nullable; + /** Indicates that an output element was too large. 
*/ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class OutputTooLargeException extends RuntimeException { public OutputTooLargeException(String reason) { super(reason); } /** Returns whether an exception was caused by a {@link OutputTooLargeException}. */ - public static boolean isCausedByOutputTooLargeException(Throwable t) { + public static boolean isCausedByOutputTooLargeException(@Nullable Throwable t) { while (t != null) { if (t instanceof OutputTooLargeException) { return true; From 58eb4cf4c8cb67d4c7b1f9b24296a7ef56fe7dd8 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Mon, 29 Jul 2024 20:52:25 +0000 Subject: [PATCH 09/11] Replaces more instances of passing individual operational limits with an OperationalLimits object. --- .../worker/StreamingDataflowWorker.java | 12 ++----- ...reamingEngineComputationConfigFetcher.java | 18 +++++----- .../worker/StreamingDataflowWorkerTest.java | 34 ++++++++----------- 3 files changed, 26 insertions(+), 38 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 95213407d9e1..511f63e69cb8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -475,17 +475,9 @@ static StreamingDataflowWorker forTesting( Supplier clock, Function executorSupplier, int localRetryTimeoutMs, - long maxWorkItemCommitBytesOverrides, - long maxOutputKeyBytesOverride, - long maxOutputValueBytesOverride) { + OperationalLimits limits) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); - AtomicReference operationalLimits = - new AtomicReference<>( - OperationalLimits.builder() - .setMaxWorkItemCommitBytes(maxWorkItemCommitBytesOverrides) - .setMaxOutputKeyBytes(maxOutputKeyBytesOverride) - .setMaxOutputValueBytes(maxOutputValueBytesOverride) - .build()); + AtomicReference operationalLimits = new AtomicReference<>(limits); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); ComputationConfig.Fetcher configFetcher = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index f5baac699961..402a6e6fde6d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -72,7 +72,7 @@ public final class StreamingEngineComputationConfigFetcher implements Computatio private final long globalConfigRefreshPeriodMillis; private final WorkUnitClient dataflowServiceClient; private final ScheduledExecutorService globalConfigRefresher; - private final boolean shouldPerformOutputSizeChecks; + private final boolean shouldThrowExceptionsOnLargeOutput; 
private final Consumer onStreamingConfig; private final AtomicBoolean hasReceivedGlobalConfig; @@ -81,12 +81,12 @@ private StreamingEngineComputationConfigFetcher( long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient, ScheduledExecutorService globalConfigRefresher, - boolean shouldPerformOutputSizeChecks, + boolean shouldThrowExceptionsOnLargeOutput, Consumer onStreamingConfig) { this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis; this.dataflowServiceClient = dataflowServiceClient; this.globalConfigRefresher = globalConfigRefresher; - this.shouldPerformOutputSizeChecks = shouldPerformOutputSizeChecks; + this.shouldThrowExceptionsOnLargeOutput = shouldThrowExceptionsOnLargeOutput; this.onStreamingConfig = onStreamingConfig; this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig); } @@ -94,7 +94,7 @@ private StreamingEngineComputationConfigFetcher( public static StreamingEngineComputationConfigFetcher create( long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient, - boolean shouldPerformOutputSizeChecks, + boolean shouldThrowExceptionsOnLargeOutput, Consumer onStreamingConfig) { return new StreamingEngineComputationConfigFetcher( /* hasReceivedGlobalConfig= */ false, @@ -102,7 +102,7 @@ public static StreamingEngineComputationConfigFetcher create( dataflowServiceClient, Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()), - shouldPerformOutputSizeChecks, + shouldThrowExceptionsOnLargeOutput, onStreamingConfig); } @@ -118,7 +118,7 @@ public static StreamingEngineComputationConfigFetcher forTesting( globalConfigRefreshPeriodMillis, dataflowServiceClient, executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME), - /*shouldPerformOutputSizeChecks=*/ true, + /*shouldThrowExceptionsOnLargeOutput=*/ true, onStreamingConfig); } @@ -164,7 +164,7 @@ private static Optional fetchConfigWithRetry( } private StreamingEnginePipelineConfig createPipelineConfig( - StreamingConfigTask config, boolean shouldPerformOutputSizeChecks) { + StreamingConfigTask config, boolean shouldThrowExceptionsOnLargeOutput) { StreamingEnginePipelineConfig.Builder pipelineConfig = StreamingEnginePipelineConfig.builder(); if (config.getUserStepToStateFamilyNameMap() != null) { pipelineConfig.setUserStepToStateFamilyNameMap(config.getUserStepToStateFamilyNameMap()); @@ -194,7 +194,7 @@ private StreamingEnginePipelineConfig createPipelineConfig( pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue()); } - if (shouldPerformOutputSizeChecks && config.getOperationalLimits() != null) { + if (shouldThrowExceptionsOnLargeOutput && config.getOperationalLimits() != null) { if (config.getOperationalLimits().getMaxKeyBytes() > 0 && config.getOperationalLimits().getMaxKeyBytes() <= Integer.MAX_VALUE) { pipelineConfig.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes()); @@ -292,7 +292,7 @@ private synchronized void fetchInitialPipelineGlobalConfig() { private Optional fetchGlobalConfig() { return fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem) - .map(config -> createPipelineConfig(config, shouldPerformOutputSizeChecks)); + .map(config -> createPipelineConfig(config, shouldThrowExceptionsOnLargeOutput)); } @FunctionalInterface diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index c1e259377df4..05e116582d9b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -848,9 +848,7 @@ private StreamingDataflowWorker makeWorker( streamingDataflowWorkerTestParams.clock(), streamingDataflowWorkerTestParams.executorSupplier(), streamingDataflowWorkerTestParams.localRetryTimeoutMs(), - streamingDataflowWorkerTestParams.maxWorkItemCommitBytes(), - streamingDataflowWorkerTestParams.maxOutputKeyBytes(), - streamingDataflowWorkerTestParams.maxOutputValueBytes()); + streamingDataflowWorkerTestParams.operationalLimits()); this.computationStateCache = worker.getComputationStateCache(); return worker; } @@ -1217,7 +1215,8 @@ public void testKeyCommitTooLargeException() throws Exception { makeWorker( defaultWorkerParams() .setInstructions(instructions) - .setMaxWorkItemCommitBytes(1000) + .setOperationalLimits( + OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build()) .publishCounters() .build()); worker.start(); @@ -1286,7 +1285,10 @@ public void testOutputKeyTooLargeException() throws Exception { StreamingDataflowWorker worker = makeWorker( - defaultWorkerParams().setInstructions(instructions).setMaxOutputKeyBytes(15).build()); + defaultWorkerParams() + .setInstructions(instructions) + .setOperationalLimits(OperationalLimits.builder().setMaxOutputKeyBytes(15).build()) + .build()); worker.start(); // This large key will cause the ExceptionCatchingFn to throw an exception, which will then @@ -1316,7 +1318,11 @@ public void testOutputValueTooLargeException() throws Exception { StreamingDataflowWorker worker = makeWorker( - defaultWorkerParams().setInstructions(instructions).setMaxOutputValueBytes(15).build()); + defaultWorkerParams() + .setInstructions(instructions) + .setOperationalLimits( + OperationalLimits.builder().setMaxOutputValueBytes(15).build()) + .build()); worker.start(); // The first time processing will have value "data1_a_bunch_more_data_output", which is above @@ -4506,9 +4512,7 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { .setLocalRetryTimeoutMs(-1) .setPublishCounters(false) .setClock(Instant::now) - .setMaxWorkItemCommitBytes(Long.MAX_VALUE) - .setMaxOutputKeyBytes(Long.MAX_VALUE) - .setMaxOutputValueBytes(Long.MAX_VALUE); + .setOperationalLimits(OperationalLimits.builder().build()); } abstract ImmutableMap stateNameMappings(); @@ -4525,11 +4529,7 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { abstract int localRetryTimeoutMs(); - abstract long maxWorkItemCommitBytes(); - - abstract long maxOutputKeyBytes(); - - abstract long maxOutputValueBytes(); + abstract OperationalLimits operationalLimits(); @AutoValue.Builder abstract static class Builder { @@ -4563,11 +4563,7 @@ final Builder publishCounters() { abstract Builder setLocalRetryTimeoutMs(int value); - abstract Builder setMaxWorkItemCommitBytes(long maxWorkItemCommitBytes); - - abstract Builder setMaxOutputKeyBytes(long maxOutputKeyBytes); - - abstract Builder setMaxOutputValueBytes(long maxOutputValueBytes); + abstract Builder setOperationalLimits(OperationalLimits operationalLimits); abstract StreamingDataflowWorkerTestParams build(); } From db62cf9c16a4e742bf0150d2b0dd28558dbea34d Mon Sep 17 
00:00:00 2001 From: Andrew Crites Date: Mon, 29 Jul 2024 22:27:51 +0000 Subject: [PATCH 10/11] Moves shouldThrowExceptionsOnLargeOutput into OperationalLimits. Now the WindmillSink can decide to throw an exception or just log a warning if the limits are exceeded. --- .../dataflow/worker/OperationalLimits.java | 14 +++++++++-- .../worker/StreamingDataflowWorker.java | 6 ++++- .../worker/StreamingModeExecutionContext.java | 4 ++++ .../runners/dataflow/worker/WindmillSink.java | 23 +++++++++++++++++-- ...reamingEngineComputationConfigFetcher.java | 13 +++-------- .../worker/StreamingDataflowWorkerTest.java | 11 +++++++-- 6 files changed, 54 insertions(+), 17 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java index 47e36e498507..e9ee8f39cba4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java @@ -27,11 +27,18 @@ public class OperationalLimits { public final long maxOutputKeyBytes; // Maximum size of a single output element's serialized value. public final long maxOutputValueBytes; + // Whether to throw an exception when processing output that violates any of the given limits. + public final boolean throwExceptionOnLargeOutput; - OperationalLimits(long maxWorkItemCommitBytes, long maxOutputKeyBytes, long maxOutputValueBytes) { + OperationalLimits( + long maxWorkItemCommitBytes, + long maxOutputKeyBytes, + long maxOutputValueBytes, + boolean throwExceptionOnLargeOutput) { this.maxWorkItemCommitBytes = maxWorkItemCommitBytes; this.maxOutputKeyBytes = maxOutputKeyBytes; this.maxOutputValueBytes = maxOutputValueBytes; + this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput; } @AutoBuilder(ofClass = OperationalLimits.class) @@ -42,6 +49,8 @@ public interface Builder { Builder setMaxOutputValueBytes(long bytes); + Builder setThrowExceptionOnLargeOutput(boolean shouldThrow); + OperationalLimits build(); } @@ -49,6 +58,7 @@ public static Builder builder() { return new AutoBuilder_OperationalLimits_Builder() .setMaxWorkItemCommitBytes(Long.MAX_VALUE) .setMaxOutputKeyBytes(Long.MAX_VALUE) - .setMaxOutputValueBytes(Long.MAX_VALUE); + .setMaxOutputValueBytes(Long.MAX_VALUE) + .setThrowExceptionOnLargeOutput(false); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 511f63e69cb8..d7c0662ae804 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -424,10 +424,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o StreamingEngineComputationConfigFetcher.create( options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient, - DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output"), config -> onPipelineConfig( config, + options, dispatcherClient::consumeWindmillDispatcherEndpoints, 
operationalLimits::set)); computationStateCache = computationStateCacheFactory.apply(configFetcher); @@ -490,6 +490,7 @@ static StreamingDataflowWorker forTesting( config -> onPipelineConfig( config, + options, windmillServer::setWindmillServiceEndpoints, operationalLimits::set)) : new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); @@ -572,6 +573,7 @@ static StreamingDataflowWorker forTesting( private static void onPipelineConfig( StreamingEnginePipelineConfig config, + DataflowWorkerHarnessOptions options, Consumer> consumeWindmillServiceEndpoints, Consumer operationalLimits) { @@ -580,6 +582,8 @@ private static void onPipelineConfig( .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes()) .setMaxOutputKeyBytes(config.maxOutputKeyBytes()) .setMaxOutputValueBytes(config.maxOutputValueBytes()) + .setThrowExceptionOnLargeOutput( + DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output")) .build()); if (!config.windmillServiceEndpoints().isEmpty()) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index a29c0e2913db..a594dbb1e0f7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -180,6 +180,10 @@ public long getMaxOutputValueBytes() { return operationalLimits.maxOutputValueBytes; } + public boolean throwExceptionsForLargeOutput() { + return operationalLimits.throwExceptionOnLargeOutput; + } + public boolean workIsFailed() { return Optional.ofNullable(work).map(Work::isFailed).orElse(false); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index 606ae65bb8df..78d0c6b4550a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -44,6 +44,8 @@ import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings({ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) @@ -54,6 +56,7 @@ class WindmillSink extends Sink> { private final Coder valueCoder; private final Coder> windowsCoder; private StreamingModeExecutionContext context; + private static final Logger LOG = LoggerFactory.getLogger(WindmillSink.class); WindmillSink( String destinationName, @@ -173,10 +176,26 @@ public long add(WindowedValue data) throws IOException { value = encode(valueCoder, data.getValue()); } if (key.size() > context.getMaxOutputKeyBytes()) { - throw new OutputTooLargeException("Key too large: " + key.size()); + if (context.throwExceptionsForLargeOutput()) { + throw new OutputTooLargeException("Key too large: " + key.size()); + } else { + LOG.error( + "Trying to output too large key with size " + + 
key.size() + + ". Limit is " + + context.getMaxOutputKeyBytes()); + } } if (value.size() > context.getMaxOutputValueBytes()) { - throw new OutputTooLargeException("Value too large: " + value.size()); + if (context.throwExceptionsForLargeOutput()) { + throw new OutputTooLargeException("Value too large: " + value.size()); + } else { + LOG.error( + "Trying to output too large value with size " + + value.size() + + ". Limit is " + + context.getMaxOutputValueBytes()); + } } Windmill.KeyedMessageBundle.Builder keyedOutput = productionMap.get(key); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 402a6e6fde6d..850e8c3f24bd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -72,7 +72,6 @@ public final class StreamingEngineComputationConfigFetcher implements Computatio private final long globalConfigRefreshPeriodMillis; private final WorkUnitClient dataflowServiceClient; private final ScheduledExecutorService globalConfigRefresher; - private final boolean shouldThrowExceptionsOnLargeOutput; private final Consumer onStreamingConfig; private final AtomicBoolean hasReceivedGlobalConfig; @@ -81,12 +80,10 @@ private StreamingEngineComputationConfigFetcher( long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient, ScheduledExecutorService globalConfigRefresher, - boolean shouldThrowExceptionsOnLargeOutput, Consumer onStreamingConfig) { this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis; this.dataflowServiceClient = dataflowServiceClient; this.globalConfigRefresher = globalConfigRefresher; - this.shouldThrowExceptionsOnLargeOutput = shouldThrowExceptionsOnLargeOutput; this.onStreamingConfig = onStreamingConfig; this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig); } @@ -94,7 +91,6 @@ private StreamingEngineComputationConfigFetcher( public static StreamingEngineComputationConfigFetcher create( long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient, - boolean shouldThrowExceptionsOnLargeOutput, Consumer onStreamingConfig) { return new StreamingEngineComputationConfigFetcher( /* hasReceivedGlobalConfig= */ false, @@ -102,7 +98,6 @@ public static StreamingEngineComputationConfigFetcher create( dataflowServiceClient, Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()), - shouldThrowExceptionsOnLargeOutput, onStreamingConfig); } @@ -118,7 +113,6 @@ public static StreamingEngineComputationConfigFetcher forTesting( globalConfigRefreshPeriodMillis, dataflowServiceClient, executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME), - /*shouldThrowExceptionsOnLargeOutput=*/ true, onStreamingConfig); } @@ -163,8 +157,7 @@ private static Optional fetchConfigWithRetry( } } - private StreamingEnginePipelineConfig createPipelineConfig( - StreamingConfigTask config, boolean shouldThrowExceptionsOnLargeOutput) { + private StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask config) { 
StreamingEnginePipelineConfig.Builder pipelineConfig = StreamingEnginePipelineConfig.builder(); if (config.getUserStepToStateFamilyNameMap() != null) { pipelineConfig.setUserStepToStateFamilyNameMap(config.getUserStepToStateFamilyNameMap()); @@ -194,7 +187,7 @@ private StreamingEnginePipelineConfig createPipelineConfig( pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue()); } - if (shouldThrowExceptionsOnLargeOutput && config.getOperationalLimits() != null) { + if (config.getOperationalLimits() != null) { if (config.getOperationalLimits().getMaxKeyBytes() > 0 && config.getOperationalLimits().getMaxKeyBytes() <= Integer.MAX_VALUE) { pipelineConfig.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes()); @@ -292,7 +285,7 @@ private synchronized void fetchInitialPipelineGlobalConfig() { private Optional fetchGlobalConfig() { return fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem) - .map(config -> createPipelineConfig(config, shouldThrowExceptionsOnLargeOutput)); + .map(config -> createPipelineConfig(config)); } @FunctionalInterface diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 05e116582d9b..eb0cb1df6305 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -1287,7 +1287,11 @@ public void testOutputKeyTooLargeException() throws Exception { makeWorker( defaultWorkerParams() .setInstructions(instructions) - .setOperationalLimits(OperationalLimits.builder().setMaxOutputKeyBytes(15).build()) + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputKeyBytes(15) + .setThrowExceptionOnLargeOutput(true) + .build()) .build()); worker.start(); @@ -1321,7 +1325,10 @@ public void testOutputValueTooLargeException() throws Exception { defaultWorkerParams() .setInstructions(instructions) .setOperationalLimits( - OperationalLimits.builder().setMaxOutputValueBytes(15).build()) + OperationalLimits.builder() + .setMaxOutputValueBytes(15) + .setThrowExceptionOnLargeOutput(true) + .build()) .build()); worker.start(); From 0456fdabfd7565500cc0aaae9e5d3a85880a1203 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Thu, 1 Aug 2024 19:55:48 +0000 Subject: [PATCH 11/11] Adds null checks before trying to access OperationalLimits fields.
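The guard being added is the standard null-before-range test for boxed getters on a service model object: getMaxKeyBytes() and getMaxProductionOutputBytes() return Long, which is null when the field is absent, and unboxing null in a comparison throws NullPointerException. A compact sketch of the pattern (ServiceLimits is a hypothetical stand-in for the service-side model class):

    public class NullGuardDemo {
      /** Hypothetical model object whose boxed getters may return null. */
      static final class ServiceLimits {
        private final Long maxKeyBytes;

        ServiceLimits(Long maxKeyBytes) {
          this.maxKeyBytes = maxKeyBytes;
        }

        Long getMaxKeyBytes() {
          return maxKeyBytes;
        }
      }

      /** Accept only present, positive values that fit the int-sized plumbing. */
      static boolean isUsable(Long limit) {
        // Short-circuiting on the null check prevents the NPE that unboxing would throw.
        return limit != null && limit > 0 && limit <= Integer.MAX_VALUE;
      }

      public static void main(String[] args) {
        System.out.println(isUsable(new ServiceLimits(null).getMaxKeyBytes()));       // false, no NPE
        System.out.println(isUsable(new ServiceLimits(0L).getMaxKeyBytes()));         // false: non-positive
        System.out.println(isUsable(new ServiceLimits(1_000_000L).getMaxKeyBytes())); // true
      }
    }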
--- .../config/StreamingEngineComputationConfigFetcher.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 850e8c3f24bd..d230aac54c63 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -188,11 +188,13 @@ private StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask c } if (config.getOperationalLimits() != null) { - if (config.getOperationalLimits().getMaxKeyBytes() > 0 + if (config.getOperationalLimits().getMaxKeyBytes() != null + && config.getOperationalLimits().getMaxKeyBytes() > 0 && config.getOperationalLimits().getMaxKeyBytes() <= Integer.MAX_VALUE) { pipelineConfig.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes()); } - if (config.getOperationalLimits().getMaxProductionOutputBytes() > 0 + if (config.getOperationalLimits().getMaxProductionOutputBytes() != null + && config.getOperationalLimits().getMaxProductionOutputBytes() > 0 && config.getOperationalLimits().getMaxProductionOutputBytes() <= Integer.MAX_VALUE) { pipelineConfig.setMaxOutputValueBytes( config.getOperationalLimits().getMaxProductionOutputBytes());
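Taken end to end, the series leaves WindmillSink with one decision per oversized element, introduced in patch 10: throw OutputTooLargeException when the throw_exceptions_on_large_output experiment is set, otherwise log and keep processing. A condensed sketch of that control flow (simplified; the real sink also encodes windows and builds commit bundles, and it assembles its log message by concatenation rather than SLF4J placeholders):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class OutputSizeGate {
      private static final Logger LOG = LoggerFactory.getLogger(OutputSizeGate.class);

      static class OutputTooLargeException extends RuntimeException {
        OutputTooLargeException(String reason) {
          super(reason);
        }
      }

      /** Enforce one limit: throw if configured to, otherwise log an error and continue. */
      static void checkSize(String what, long actual, long limit, boolean throwOnLargeOutput) {
        if (actual <= limit) {
          return;
        }
        if (throwOnLargeOutput) {
          throw new OutputTooLargeException(what + " too large: " + actual);
        }
        LOG.error("Trying to output too large {} with size {}. Limit is {}.", what, actual, limit);
      }

      public static void main(String[] args) {
        checkSize("key", 10, 15, true);    // under the limit: no-op
        checkSize("value", 99, 15, false); // over the limit: logs and returns
        try {
          checkSize("key", 99, 15, true);  // over the limit: throws
        } catch (OutputTooLargeException e) {
          System.out.println("caught: " + e.getMessage());
        }
      }
    }

Logging instead of throwing keeps the checks observable while enforcement stays opt-in behind the experiment flag.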