Skip to content

Commit

Permalink
Support metrics at Source.split for Direct Runner (#32022)
Browse files Browse the repository at this point in the history
* Added a validate runner test for this scenario
  • Loading branch information
Abacn authored Aug 1, 2024
1 parent 6d3b547 commit f1c72c5
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.direct;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
Expand All @@ -30,11 +31,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.local.ExecutionDriver;
import org.apache.beam.runners.local.ExecutionDriver.DriverState;
import org.apache.beam.runners.local.PipelineMessageReceiver;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -155,13 +158,21 @@ public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry)
ImmutableMap.Builder<AppliedPTransform<?, ?, ?>, Queue<CommittedBundle<?>>> pendingRootBundles =
ImmutableMap.builder();
for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
MetricsContainerImpl metricsContainer = new MetricsContainerImpl(root.getFullName());
Queue<CommittedBundle<?>> pending = Queues.newArrayDeque();
try {
try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
Collection<CommittedBundle<?>> initialInputs =
rootProviderRegistry.getInitialInputs(root, numTargetSplits);
pending.addAll(initialInputs);
} catch (Exception e) {
throw UserCodeException.wrap(e);
} finally {
// Metrics emitted initial split are reported along with the first bundle
if (pending.peek() != null) {
evaluationContext
.getMetrics()
.commitPhysical(pending.peek(), metricsContainer.getCumulative());
}
}
pendingRootBundles.put(root, pending);
}
Expand Down
2 changes: 2 additions & 0 deletions runners/flink/flink_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ def createValidatesRunnerTask(Map m) {
// Extremely flaky: https://github.com/apache/beam/issues/19814
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful'
// TODO(https://github.com/apache/beam/issues/29972) due to runtimeContext initialized after initial split
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit'
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions runners/spark/spark_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
// TODO(https://github.com/apache/beam/issues/31231
it.filter {
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/32021)
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit'
}
}

Expand Down Expand Up @@ -329,6 +331,8 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/31231
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/32021)
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit'
}

// TestStream using processing time is not supported in Spark
Expand Down Expand Up @@ -428,6 +432,8 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testLifecycleMethodsBounded'
// https://github.com/apache/beam/issues/29972
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testHotKeyCombineWithSideInputs'
// TODO(https://github.com/apache/beam/issues/32021)
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testBoundedSourceMetricsInSplit'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.metrics;

import java.util.Objects;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
Expand Down Expand Up @@ -211,17 +212,21 @@ private static class MatchNameAndKey<T> extends TypeSafeMatcher<MetricResult<T>>

private final String namespace;
private final String name;
private final String step;
private final @Nullable String step;

MatchNameAndKey(String namespace, String name, String step) {
MatchNameAndKey(String namespace, String name, @Nullable String step) {
this.namespace = namespace;
this.name = name;
this.step = step;
}

@Override
protected boolean matchesSafely(MetricResult<T> item) {
return MetricFiltering.matches(MetricsFilter.builder().addStep(step).build(), item.getKey())
MetricsFilter.Builder builder = MetricsFilter.builder();
if (step != null) {
builder = builder.addStep(step);
}
return MetricFiltering.matches(builder.build(), item.getKey())
&& Objects.equals(MetricName.named(namespace, name), item.getName());
}

Expand All @@ -231,9 +236,10 @@ public void describeTo(Description description) {
.appendText("MetricResult{inNamespace=")
.appendValue(namespace)
.appendText(", name=")
.appendValue(name)
.appendText(", step=")
.appendValue(step);
.appendValue(name);
if (step != null) {
description.appendText(", step=").appendValue(step);
}
if (this.getClass() == MatchNameAndKey.class) {
description.appendText("}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,25 @@
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.distributionMinMax;
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.metricsResult;
import static org.apache.beam.sdk.testing.SerializableMatchers.greaterThan;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
Expand All @@ -44,6 +53,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.hamcrest.Matcher;
import org.joda.time.Duration;
Expand Down Expand Up @@ -416,6 +426,121 @@ public void testAttemptedStringSetMetrics() {
MetricQueryResults metrics = queryTestMetrics(result);
assertStringSetMetrics(metrics, false);
}

@Test
@Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
public void testBoundedSourceMetricsInSplit() {
pipeline.apply(Read.from(new CountingSourceWithMetrics(0, 10)));
PipelineResult pipelineResult = pipeline.run();
MetricQueryResults metrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(
MetricNameFilter.named(
CountingSourceWithMetrics.class,
CountingSourceWithMetrics.SPLIT_NAME))
.addNameFilter(
MetricNameFilter.named(
CountingSourceWithMetrics.class,
CountingSourceWithMetrics.ADVANCE_NAME))
.build());
assertThat(
metrics.getCounters(),
hasItem(
attemptedMetricsResult(
CountingSourceWithMetrics.class.getName(),
CountingSourceWithMetrics.ADVANCE_NAME,
null, // step name varies depending on the runner
10L)));
assertThat(
metrics.getCounters(),
hasItem(
metricsResult(
CountingSourceWithMetrics.class.getName(),
CountingSourceWithMetrics.SPLIT_NAME,
null, // step name varies depending on the runner
greaterThan(0L),
false)));
}
}

public static class CountingSourceWithMetrics extends BoundedSource<Integer> {
public static final String SPLIT_NAME = "num-split";
public static final String ADVANCE_NAME = "num-advance";
private static Counter splitCounter =
Metrics.counter(CountingSourceWithMetrics.class, SPLIT_NAME);
private static Counter advanceCounter =
Metrics.counter(CountingSourceWithMetrics.class, ADVANCE_NAME);
private final int start;
private final int end;

@Override
public List<? extends BoundedSource<Integer>> split(
long desiredBundleSizeBytes, PipelineOptions options) {
splitCounter.inc();
// simply split the current source into two
if (end - start >= 2) {
int mid = (start + end + 1) / 2;
return ImmutableList.of(
new CountingSourceWithMetrics(start, mid), new CountingSourceWithMetrics(mid, end));
}
return null;
}

@Override
public long getEstimatedSizeBytes(PipelineOptions options) {
return 0;
}

@Override
public BoundedReader<Integer> createReader(PipelineOptions options) {
return new CountingReader();
}

public CountingSourceWithMetrics(int start, int end) {
this.start = start;
this.end = end;
}

@Override
public Coder<Integer> getOutputCoder() {
return VarIntCoder.of();
}

public class CountingReader extends BoundedSource.BoundedReader<Integer> {
private int current;

@Override
public boolean start() throws IOException {
return current < end;
}

@Override
public boolean advance() {
++current;
advanceCounter.inc();
return current < end;
}

@Override
public Integer getCurrent() throws NoSuchElementException {
return current;
}

@Override
public void close() {}

@Override
public BoundedSource<Integer> getCurrentSource() {
return null;
}

public CountingReader() {
current = start;
}
}
}

private static <T> Matcher<MetricResult<T>> metricsResultPatchStep(
Expand Down

0 comments on commit f1c72c5

Please sign in to comment.