diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 59dc736693d0..95cadef7afdb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.direct; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.Map; @@ -30,11 +31,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.local.ExecutionDriver; import org.apache.beam.runners.local.ExecutionDriver.DriverState; import org.apache.beam.runners.local.PipelineMessageReceiver; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PCollection; @@ -155,13 +158,21 @@ public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry) ImmutableMap.Builder, Queue>> pendingRootBundles = ImmutableMap.builder(); for (AppliedPTransform root : graph.getRootTransforms()) { + MetricsContainerImpl metricsContainer = new MetricsContainerImpl(root.getFullName()); Queue> pending = Queues.newArrayDeque(); - try { + try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { Collection> initialInputs = rootProviderRegistry.getInitialInputs(root, numTargetSplits); pending.addAll(initialInputs); } catch (Exception e) { throw UserCodeException.wrap(e); + } finally { + // Metrics emitted initial split are reported along with the first bundle + if (pending.peek() != null) { + evaluationContext + .getMetrics() + .commitPhysical(pending.peek(), metricsContainer.getCumulative()); + } } pendingRootBundles.put(root, pending); } diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index da3dbe08b503..c8f492a901d3 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -335,6 +335,8 @@ def createValidatesRunnerTask(Map m) { // Extremely flaky: https://github.com/apache/beam/issues/19814 excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful' excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful' + // TODO(https://github.com/apache/beam/issues/29972) due to runtimeContext initialized after initial split + excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit' } } } diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle index 8b6f1117019a..f4e6bf740189 100644 --- a/runners/spark/spark_runner.gradle +++ b/runners/spark/spark_runner.gradle @@ -290,6 +290,8 @@ def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) { // TODO(https://github.com/apache/beam/issues/31231 it.filter { excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata' + // TODO(https://github.com/apache/beam/issues/32021) + excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit' } } @@ -329,6 +331,8 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test) excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata' // TODO(https://github.com/apache/beam/issues/31231 excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata' + // TODO(https://github.com/apache/beam/issues/32021) + excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit' } // TestStream using processing time is not supported in Spark @@ -428,6 +432,8 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) { excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testLifecycleMethodsBounded' // https://github.com/apache/beam/issues/29972 excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testHotKeyCombineWithSideInputs' + // TODO(https://github.com/apache/beam/issues/32021) + excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit' } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java index c75d872dc38e..9299ae81fa46 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.metrics; import java.util.Objects; +import org.checkerframework.checker.nullness.qual.Nullable; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; @@ -211,9 +212,9 @@ private static class MatchNameAndKey extends TypeSafeMatcher> private final String namespace; private final String name; - private final String step; + private final @Nullable String step; - MatchNameAndKey(String namespace, String name, String step) { + MatchNameAndKey(String namespace, String name, @Nullable String step) { this.namespace = namespace; this.name = name; this.step = step; @@ -221,7 +222,11 @@ private static class MatchNameAndKey extends TypeSafeMatcher> @Override protected boolean matchesSafely(MetricResult item) { - return MetricFiltering.matches(MetricsFilter.builder().addStep(step).build(), item.getKey()) + MetricsFilter.Builder builder = MetricsFilter.builder(); + if (step != null) { + builder = builder.addStep(step); + } + return MetricFiltering.matches(builder.build(), item.getKey()) && Objects.equals(MetricName.named(namespace, name), item.getName()); } @@ -231,9 +236,10 @@ public void describeTo(Description description) { .appendText("MetricResult{inNamespace=") .appendValue(namespace) .appendText(", name=") - .appendValue(name) - .appendText(", step=") - .appendValue(step); + .appendValue(name); + if (step != null) { + description.appendText(", step=").appendValue(step); + } if (this.getClass() == MatchNameAndKey.class) { description.appendText("}"); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 5d9a68e0d86c..750d43a4f9ae 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; import static org.apache.beam.sdk.metrics.MetricResultsMatchers.distributionMinMax; import static org.apache.beam.sdk.metrics.MetricResultsMatchers.metricsResult; +import static org.apache.beam.sdk.testing.SerializableMatchers.greaterThan; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.hasItem; @@ -27,9 +28,17 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.io.Serializable; +import java.util.List; +import java.util.NoSuchElementException; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesAttemptedMetrics; @@ -44,6 +53,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.hamcrest.Matcher; import org.joda.time.Duration; @@ -416,6 +426,121 @@ public void testAttemptedStringSetMetrics() { MetricQueryResults metrics = queryTestMetrics(result); assertStringSetMetrics(metrics, false); } + + @Test + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) + public void testBoundedSourceMetricsInSplit() { + pipeline.apply(Read.from(new CountingSourceWithMetrics(0, 10))); + PipelineResult pipelineResult = pipeline.run(); + MetricQueryResults metrics = + pipelineResult + .metrics() + .queryMetrics( + MetricsFilter.builder() + .addNameFilter( + MetricNameFilter.named( + CountingSourceWithMetrics.class, + CountingSourceWithMetrics.SPLIT_NAME)) + .addNameFilter( + MetricNameFilter.named( + CountingSourceWithMetrics.class, + CountingSourceWithMetrics.ADVANCE_NAME)) + .build()); + assertThat( + metrics.getCounters(), + hasItem( + attemptedMetricsResult( + CountingSourceWithMetrics.class.getName(), + CountingSourceWithMetrics.ADVANCE_NAME, + null, // step name varies depending on the runner + 10L))); + assertThat( + metrics.getCounters(), + hasItem( + metricsResult( + CountingSourceWithMetrics.class.getName(), + CountingSourceWithMetrics.SPLIT_NAME, + null, // step name varies depending on the runner + greaterThan(0L), + false))); + } + } + + public static class CountingSourceWithMetrics extends BoundedSource { + public static final String SPLIT_NAME = "num-split"; + public static final String ADVANCE_NAME = "num-advance"; + private static Counter splitCounter = + Metrics.counter(CountingSourceWithMetrics.class, SPLIT_NAME); + private static Counter advanceCounter = + Metrics.counter(CountingSourceWithMetrics.class, ADVANCE_NAME); + private final int start; + private final int end; + + @Override + public List> split( + long desiredBundleSizeBytes, PipelineOptions options) { + splitCounter.inc(); + // simply split the current source into two + if (end - start >= 2) { + int mid = (start + end + 1) / 2; + return ImmutableList.of( + new CountingSourceWithMetrics(start, mid), new CountingSourceWithMetrics(mid, end)); + } + return null; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) { + return 0; + } + + @Override + public BoundedReader createReader(PipelineOptions options) { + return new CountingReader(); + } + + public CountingSourceWithMetrics(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public Coder getOutputCoder() { + return VarIntCoder.of(); + } + + public class CountingReader extends BoundedSource.BoundedReader { + private int current; + + @Override + public boolean start() throws IOException { + return current < end; + } + + @Override + public boolean advance() { + ++current; + advanceCounter.inc(); + return current < end; + } + + @Override + public Integer getCurrent() throws NoSuchElementException { + return current; + } + + @Override + public void close() {} + + @Override + public BoundedSource getCurrentSource() { + return null; + } + + public CountingReader() { + current = start; + } + } } private static Matcher> metricsResultPatchStep(