diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index 4ec189e4637f..e2eef855ee69 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -367,6 +367,17 @@ message MonitoringInfoSpecs { } ] }]; + + + USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = { + urn: "beam:metric:user:per_worker_histogram_int64:v1", + type: "beam:metrics:per_worker_histogram_int64:v1", + required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"], + annotations: [{ + key: "description", + value: "URN utilized to report user metric." + }] + }]; } } @@ -576,6 +587,10 @@ message MonitoringInfoTypeUrns { SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metrics:set_string:v1"]; + PER_WORKER_HISTOGRAM = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:metrics:per_worker_histogram_int64:v1"]; + + // General monitored state information which contains structured information // which does not fit into a typical metric format. See MonitoringTableData // for more details. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java index ea8a333d397b..f45dd154eb9e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.metrics.StringSetResult; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.checkerframework.checker.nullness.qual.Nullable; @@ -42,16 +43,19 @@ public class DefaultMetricResults extends MetricResults { private final Iterable> distributions; private final Iterable> gauges; private final Iterable> stringSets; + private final Iterable> perWorkerHistograms; public DefaultMetricResults( Iterable> counters, Iterable> distributions, Iterable> gauges, - Iterable> stringSets) { + Iterable> stringSets, + Iterable> perWorkerHistograms) { this.counters = counters; this.distributions = distributions; this.gauges = gauges; this.stringSets = stringSets; + this.perWorkerHistograms = perWorkerHistograms; } @Override @@ -62,6 +66,9 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) { distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())), Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())), Iterables.filter( - stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey()))); + stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())), + Iterables.filter( + perWorkerHistograms, + perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey()))); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java index 2a594401754c..63a9633997a6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java @@ -70,6 +70,12 @@ public void update(HistogramCell other) { dirty.afterModification(); } + @Override + public void update(HistogramData data) { + this.value.update(data); + dirty.afterModification(); + } + // TODO(https://github.com/apache/beam/issues/20853): Update this function to allow incrementing // the infinite buckets as well. // and remove the incTopBucketCount and incBotBucketCount methods. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java index ada5bda4df4a..7e876810c63a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Collections; import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; /** Representation of multiple metric updates. */ @@ -34,6 +35,7 @@ public abstract class MetricUpdates { Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), + Collections.emptyList(), Collections.emptyList()); /** @@ -66,14 +68,22 @@ public static MetricUpdate create(MetricKey key, T update) { /** All the sets updates. */ public abstract Iterable> stringSetUpdates(); + /** All the histogram updates. */ + public abstract Iterable> perWorkerHistogramsUpdates(); + /** Create a new {@link MetricUpdates} bundle. */ public static MetricUpdates create( Iterable> counterUpdates, Iterable> distributionUpdates, Iterable> gaugeUpdates, - Iterable> stringSetUpdates) { + Iterable> stringSetUpdates, + Iterable> perWorkerHistogramsUpdates) { return new AutoValue_MetricUpdates( - counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates); + counterUpdates, + distributionUpdates, + gaugeUpdates, + stringSetUpdates, + perWorkerHistogramsUpdates); } /** Returns true if there are no updates in this MetricUpdates object. */ @@ -81,6 +91,7 @@ public boolean isEmpty() { return Iterables.isEmpty(counterUpdates()) && Iterables.isEmpty(distributionUpdates()) && Iterables.isEmpty(gaugeUpdates()) - && Iterables.isEmpty(stringSetUpdates()); + && Iterables.isEmpty(stringSetUpdates()) + && Iterables.isEmpty(perWorkerHistogramsUpdates()); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 99cf98508505..3faf36219c9c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -19,15 +19,18 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE; +import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; @@ -44,6 +47,7 @@ import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.Metric; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; @@ -90,6 +94,9 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer { private MetricsMap stringSets = new MetricsMap<>(StringSetCell::new); + private MetricsMap, HistogramCell> perWorkerHistograms = + new MetricsMap<>(HistogramCell::new); + private MetricsMap, HistogramCell> histograms = new MetricsMap<>(HistogramCell::new); @@ -216,6 +223,22 @@ public StringSetCell getStringSet(MetricName metricName) { return stringSets.tryGet(metricName); } + /** + * Return the {@link Histogram} that should be used for implementing the given per-worker {@code + * metricName} in this container. + */ + @Override + public HistogramCell getPerWorkerHistogram( + MetricName metricName, HistogramData.BucketType bucketType) { + HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType)); + return val; + } + + public MetricsMap, HistogramCell> + getPerWorkerHistogram() { + return perWorkerHistograms; + } + private > ImmutableList> extractUpdates(MetricsMap cells) { ImmutableList.Builder> updates = ImmutableList.builder(); @@ -229,6 +252,22 @@ ImmutableList> extractUpdates(MetricsMap> + ImmutableList> extractHistogramUpdates( + MetricsMap, CellT> cells) { + ImmutableList.Builder> updates = ImmutableList.builder(); + cells.forEach( + (key, value) -> { + if (value.getDirty().beforeCommit()) { + updates.add( + MetricUpdate.create( + MetricKey.create(stepName, key.getKey()), value.getCumulative())); + } + }); + return updates.build(); + } + /** * Return the cumulative values for any metrics that have changed since the last time updates were * committed. @@ -238,7 +277,8 @@ public MetricUpdates getUpdates() { extractUpdates(counters), extractUpdates(distributions), extractUpdates(gauges), - extractUpdates(stringSets)); + extractUpdates(stringSets), + extractHistogramUpdates(perWorkerHistograms)); } /** @return The MonitoringInfo metadata from the metric. */ @@ -271,6 +311,20 @@ public MetricUpdates getUpdates() { return builder; } + /** + * @param metricUpdate + * @return The MonitoringInfo generated from the histogram metricUpdate. + */ + private @Nullable MonitoringInfo histogramUpdateToMonitoringInfo( + MetricUpdate metricUpdate) { + SimpleMonitoringInfoBuilder builder = histogramToMonitoringMetadata(metricUpdate.getKey()); + if (builder == null) { + return null; + } + builder.setInt64HistogramValue(metricUpdate.getUpdate()); + return builder.build(); + } + /** @return The MonitoringInfo metadata from the counter metric. */ private @Nullable SimpleMonitoringInfoBuilder counterToMonitoringMetadata(MetricKey metricKey) { return metricToMonitoringMetadata( @@ -342,6 +396,14 @@ public MetricUpdates getUpdates() { MonitoringInfoConstants.Urns.USER_SET_STRING); } + /** @return The MonitoringInfo metadata from the histogram metric. */ + private @Nullable SimpleMonitoringInfoBuilder histogramToMonitoringMetadata(MetricKey metricKey) { + return metricToMonitoringMetadata( + metricKey, + MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE, + MonitoringInfoConstants.Urns.USER_PER_WORKER_HISTOGRAM); + } + /** * @param metricUpdate * @return The MonitoringInfo generated from the string set metricUpdate. @@ -390,6 +452,14 @@ public Iterable getMonitoringInfos() { monitoringInfos.add(mi); } } + + for (MetricUpdate metricUpdate : metricUpdates.perWorkerHistogramsUpdates()) { + MonitoringInfo mi = histogramUpdateToMonitoringInfo(metricUpdate); + if (mi != null) { + monitoringInfos.add(mi); + } + } + return monitoringInfos; } @@ -432,6 +502,16 @@ public Map getMonitoringData(ShortIdMap shortIds) { } } }); + perWorkerHistograms.forEach( + (metricName, histogramCell) -> { + if (histogramCell.getDirty().beforeCommit()) { + String shortId = + getShortId(metricName.getKey(), this::histogramToMonitoringMetadata, shortIds); + if (shortId != null) { + builder.put(shortId, encodeInt64Histogram(histogramCell.getCumulative())); + } + } + }); return builder.build(); } @@ -467,6 +547,10 @@ public void commitUpdates() { distributions.forEachValue(distribution -> distribution.getDirty().afterCommit()); gauges.forEachValue(gauge -> gauge.getDirty().afterCommit()); stringSets.forEachValue(sSets -> sSets.getDirty().afterCommit()); + perWorkerHistograms.forEachValue( + histogram -> { + histogram.getDirty().afterCommit(); + }); } private > @@ -480,6 +564,18 @@ ImmutableList> extractCumulatives(MetricsMap> + ImmutableList> extractHistogramCumulatives( + MetricsMap, CellT> cells) { + ImmutableList.Builder> updates = ImmutableList.builder(); + cells.forEach( + (key, value) -> { + UpdateT update = checkNotNull(value.getCumulative()); + updates.add(MetricUpdate.create(MetricKey.create(stepName, key.getKey()), update)); + }); + return updates.build(); + } + /** * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this * container. @@ -489,7 +585,8 @@ public MetricUpdates getCumulative() { extractCumulatives(counters), extractCumulatives(distributions), extractCumulatives(gauges), - extractCumulatives(stringSets)); + extractCumulatives(stringSets), + extractHistogramCumulatives(perWorkerHistograms)); } /** Update values of this {@link MetricsContainerImpl} by merging the value of another cell. */ @@ -510,7 +607,6 @@ private void updateForSumInt64Type(MonitoringInfo monitoringInfo) { private void updateForDistributionInt64Type(MonitoringInfo monitoringInfo) { MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo); Distribution distribution = getDistribution(metricName); - DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload()); distribution.update(data.sum(), data.count(), data.min(), data.max()); } @@ -527,6 +623,14 @@ private void updateForStringSetType(MonitoringInfo monitoringInfo) { stringSet.update(decodeStringSet(monitoringInfo.getPayload())); } + private void updateForPerWorkerHistogramInt64(MonitoringInfo monitoringInfo) { + MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo); + HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17); + Histogram histogram = getPerWorkerHistogram(metricName, buckets); + HistogramData data = decodeInt64Histogram(monitoringInfo.getPayload()); + histogram.update(data); + } + /** Update values of this {@link MetricsContainerImpl} by reading from {@code monitoringInfos}. */ public void update(Iterable monitoringInfos) { for (MonitoringInfo monitoringInfo : monitoringInfos) { @@ -551,6 +655,9 @@ public void update(Iterable monitoringInfos) { updateForStringSetType(monitoringInfo); break; + case PER_WORKER_HISTOGRAM_TYPE: + updateForPerWorkerHistogramInt64(monitoringInfo); // use type, and not urn info + break; default: LOG.warn("Unsupported metric type {}", monitoringInfo.getType()); } @@ -593,14 +700,15 @@ public boolean equals(@Nullable Object object) { && Objects.equals(counters, metricsContainerImpl.counters) && Objects.equals(distributions, metricsContainerImpl.distributions) && Objects.equals(gauges, metricsContainerImpl.gauges) - && Objects.equals(stringSets, metricsContainerImpl.stringSets); + && Objects.equals(stringSets, metricsContainerImpl.stringSets) + && Objects.equals(perWorkerHistograms, metricsContainerImpl.perWorkerHistograms); } return false; } @Override public int hashCode() { - return Objects.hash(stepName, counters, distributions, gauges, stringSets); + return Objects.hash(stepName, counters, distributions, gauges, stringSets, perWorkerHistograms); } /** @@ -722,6 +830,22 @@ public static MetricsContainerImpl deltaContainer( deltaValueCell.incTopBucketCount( currValue.getTopBucketCount() - prevValue.getTopBucketCount()); } + for (Map.Entry, HistogramCell> cell : + curr.perWorkerHistograms.entries()) { + HistogramData.BucketType bt = cell.getKey().getValue(); + HistogramData prevValue = prev.perWorkerHistograms.get(cell.getKey()).getCumulative(); + HistogramData currValue = cell.getValue().getCumulative(); + HistogramCell deltaValueCell = deltaContainer.perWorkerHistograms.get(cell.getKey()); + deltaValueCell.incBottomBucketCount( + currValue.getBottomBucketCount() - prevValue.getBottomBucketCount()); + for (int i = 0; i < bt.getNumBuckets(); i++) { + Long bucketCountDelta = currValue.getCount(i) - prevValue.getCount(i); + deltaValueCell.incBucketCount(i, bucketCountDelta); + } + deltaValueCell.incTopBucketCount( + currValue.getTopBucketCount() - prevValue.getTopBucketCount()); + } + for (Map.Entry cell : curr.stringSets.entries()) { // Simply take the most recent value for stringSets, no need to count deltas. deltaContainer.stringSets.get(cell.getKey()).update(cell.getValue().getCumulative()); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java index 688491184e67..cb74b26ff0bd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.util.JsonFormat; @@ -137,6 +138,7 @@ public static MetricResults asMetricResults( Map> distributions = new HashMap<>(); Map> gauges = new HashMap<>(); Map> sets = new HashMap<>(); + Map> perWorkerHistograms = new HashMap<>(); attemptedMetricsContainers.forEachMetricContainer( container -> { @@ -146,6 +148,8 @@ public static MetricResults asMetricResults( distributions, cumulative.distributionUpdates(), DistributionData::combine); mergeAttemptedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine); mergeAttemptedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine); + mergeAttemptedResults( + perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine); }); committedMetricsContainers.forEachMetricContainer( container -> { @@ -155,6 +159,8 @@ public static MetricResults asMetricResults( distributions, cumulative.distributionUpdates(), DistributionData::combine); mergeCommittedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine); mergeCommittedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine); + mergeCommittedResults( + perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine); }); return new DefaultMetricResults( @@ -167,6 +173,9 @@ public static MetricResults asMetricResults( .collect(toList()), sets.values().stream() .map(result -> result.transform(StringSetData::extractResult)) + .collect(toList()), + perWorkerHistograms.values().stream() + .map(result -> result.transform(HistogramData::extractResult)) .collect(toList())); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java index 2bb935111d38..33833fcb8383 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java @@ -54,6 +54,8 @@ public static final class Urns { extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_DOUBLE); public static final String USER_SET_STRING = extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING); + public static final String USER_PER_WORKER_HISTOGRAM = + extractUrn(MonitoringInfoSpecs.Enum.USER_PER_WORKER_HISTOGRAM); public static final String SAMPLED_BYTE_SIZE = extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE); public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED); @@ -64,7 +66,6 @@ public static final class Urns { extractUrn(MonitoringInfoSpecs.Enum.API_REQUEST_COUNT); public static final String API_REQUEST_LATENCIES = extractUrn(MonitoringInfoSpecs.Enum.API_REQUEST_LATENCIES); - static { // Validate that compile time constants match the values stored in the protos. // Defining these as constants allows for usage in switch case statements and also @@ -165,6 +166,8 @@ public static final class TypeUrns { public static final String BOTTOM_N_DOUBLE_TYPE = "beam:metrics:bottom_n_double:v1"; public static final String PROGRESS_TYPE = "beam:metrics:progress:v1"; public static final String SET_STRING_TYPE = "beam:metrics:set_string:v1"; + public static final String PER_WORKER_HISTOGRAM_TYPE = + "beam:metrics:per_worker_histogram_int64:v1"; static { // Validate that compile time constants match the values stored in the protos. @@ -191,6 +194,9 @@ public static final class TypeUrns { BOTTOM_N_DOUBLE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOTTOM_N_DOUBLE_TYPE))); checkArgument(PROGRESS_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.PROGRESS_TYPE))); checkArgument(SET_STRING_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE))); + checkArgument( + PER_WORKER_HISTOGRAM_TYPE.equals( + getUrn(MonitoringInfoTypeUrns.Enum.PER_WORKER_HISTOGRAM))); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java index e0f5092e6b1f..b15ff242f12b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java @@ -23,6 +23,7 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; @@ -31,6 +32,7 @@ import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpec; import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs; +import org.apache.beam.sdk.util.HistogramData; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -159,6 +161,16 @@ public SimpleMonitoringInfoBuilder setStringSetValue(StringSetData value) { return this; } + /** + * Encodes the value and sets the type to {@link + * MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM_TYPE}. + */ + public SimpleMonitoringInfoBuilder setInt64HistogramValue(HistogramData data) { + this.builder.setPayload(encodeInt64Histogram(data)); + this.builder.setType(MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE); + return this; + } + /** Sets the MonitoringInfo label to the given name and value. */ public SimpleMonitoringInfoBuilder setLabel(String labelName, String labelValue) { this.builder.putLabels(labelName, labelValue); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java index 3f25d6810217..93d87c0bed5f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java @@ -106,4 +106,13 @@ public void testReset() { assertThat(cell.getCumulative(), equalTo(HistogramData.linear(0, 10, 100))); assertThat(cell.getDirty(), equalTo(new DirtyState())); } + + @Test + public void testUpdateWithHistogramData() { + HistogramCell cell = new HistogramCell(KV.of(MetricName.named("hello", "world"), bucketType)); + HistogramData data = HistogramData.linear(0, 10, 100); + data.record(5, 7, 9); + cell.update(data); + assertThat(cell.getCumulative(), equalTo(data)); + } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java index 5b3d71f4873e..f212b54e05f6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java @@ -372,6 +372,7 @@ public void testDeltaCounters() { HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5); MetricName hName = MetricName.named("namespace", "histogram"); MetricName stringSetName = MetricName.named("namespace", "stringset"); + MetricName pwhName = MetricName.named("namespace", "perWorkerHistogram"); MetricsContainerImpl prevContainer = new MetricsContainerImpl(null); prevContainer.getCounter(cName).inc(2L); @@ -383,6 +384,10 @@ public void testDeltaCounters() { prevContainer.getHistogram(hName, bucketType).update(3); prevContainer.getHistogram(hName, bucketType).update(20); + // Set PerWorkerBucketCounts to [0,1,1,0,0,0,0] + prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(1); + prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(3); + MetricsContainerImpl nextContainer = new MetricsContainerImpl(null); nextContainer.getCounter(cName).inc(9L); nextContainer.getGauge(gName).set(8L); @@ -401,6 +406,10 @@ public void testDeltaCounters() { nextContainer.getHistogram(hName, bucketType).update(20); nextContainer.getHistogram(hName, bucketType).update(20); + // Set PerWorkerBucketCounts to [1,0,0,0,0,0,1] + nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(-1); + nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(20); + MetricsContainerImpl deltaContainer = MetricsContainerImpl.deltaContainer(prevContainer, nextContainer); // Expect counter value: 7 = 9 - 2 @@ -426,6 +435,20 @@ public void testDeltaCounters() { } assertEquals( 2, deltaContainer.getHistogram(hName, bucketType).getCumulative().getTopBucketCount()); + + // Expect per worker bucket counts: [1,0,0,0,0,0,1] + assertEquals( + 1, + deltaContainer + .getPerWorkerHistogram(pwhName, bucketType) + .getCumulative() + .getBottomBucketCount()); + assertEquals( + 1, + deltaContainer + .getPerWorkerHistogram(pwhName, bucketType) + .getCumulative() + .getTopBucketCount()); } @Test diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java index 00df20c4ac39..729c22d42fcf 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java @@ -91,8 +91,8 @@ public void testApplyCommittedNoFilter() { MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(15L))), ImmutableList.of( MetricUpdate.create( - MetricKey.create("step1", NAME4), - StringSetData.create(ImmutableSet.of("ab")))))); + MetricKey.create("step1", NAME4), StringSetData.create(ImmutableSet.of("ab")))), + ImmutableList.of())); metrics.commitLogical( bundle1, MetricUpdates.create( @@ -106,9 +106,8 @@ public void testApplyCommittedNoFilter() { MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(27L))), ImmutableList.of( MetricUpdate.create( - MetricKey.create("step1", NAME4), - StringSetData.create(ImmutableSet.of("cd")))))); - + MetricKey.create("step1", NAME4), StringSetData.create(ImmutableSet.of("cd")))), + ImmutableList.of())); MetricQueryResults results = metrics.allMetrics(); assertThat( results.getCounters(), @@ -157,6 +156,7 @@ public void testApplyAttemptedCountersQueryOneNamespace() { MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())); metrics.updatePhysical( bundle1, @@ -166,6 +166,7 @@ public void testApplyAttemptedCountersQueryOneNamespace() { MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())); MetricQueryResults results = @@ -195,6 +196,7 @@ public void testApplyAttemptedQueryCompositeScope() { MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())); metrics.updatePhysical( bundle1, @@ -204,6 +206,7 @@ public void testApplyAttemptedQueryCompositeScope() { MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())); MetricQueryResults results = @@ -233,6 +236,7 @@ public void testPartialScopeMatchingInMetricsQuery() { MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())); metrics.updatePhysical( bundle1, @@ -242,6 +246,7 @@ public void testPartialScopeMatchingInMetricsQuery() { MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())); MetricQueryResults results = diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java index 96c0374067cf..5e3319a0f0df 100644 --- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java +++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricsSink; import org.apache.beam.sdk.metrics.StringSetResult; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.joda.time.Instant; @@ -82,4 +83,9 @@ public Iterable> getStringSets() { StringSetResult.create(ImmutableSet.of("ab")), StringSetResult.create(ImmutableSet.of("cd"))); } + + @Override + public Iterable> getPerWorkerHistograms() { + return Collections.emptyList(); + } } diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java index 10e9481d271b..ed7b408926a3 100644 --- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java +++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java @@ -94,7 +94,7 @@ public void testWriteMetricsWithCommittedSupported() throws Exception { + "\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"committed\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":" - + "\"ns1\"},\"step\":\"s3\"}],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd" + + "\"ns1\"},\"step\":\"s3\"}],\"perWorkerHistograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd" + "\"]},\"committed\":{\"stringSet\":[\"ab\"]},\"name\":{\"name\":\"n3\"," + "\"namespace\":\"ns1\"},\"step\":\"s3\"}]}"; assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size()); @@ -116,7 +116,7 @@ public void testWriteMetricsWithCommittedUnSupported() throws Exception { + "{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\"" + ",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":" - + "\"ns1\"},\"step\":\"s3\"}],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]}," + + "\"ns1\"},\"step\":\"s3\"}],\"perWorkerHistograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]}," + "\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}"; assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size()); assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0)); diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java index 67cf3280a83c..281f903f2c54 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.metrics.StringSetResult; +import org.apache.beam.sdk.util.HistogramData; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -90,6 +91,11 @@ public Iterable> getGauges() { public Iterable> getStringSets() { return Collections.emptyList(); } + + @Override + public Iterable> getPerWorkerHistograms() { + return Collections.emptyList(); + } }; } }; diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java index 44681a626cc0..fdedcc0086be 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.jet.metrics; import com.hazelcast.map.IMap; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import javax.annotation.concurrent.GuardedBy; @@ -35,6 +36,7 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.metrics.StringSetResult; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; import org.checkerframework.checker.nullness.qual.Nullable; @@ -104,6 +106,7 @@ private static class QueryResults extends MetricQueryResults { private final Iterable> distributions; private final Iterable> gauges; private final Iterable> stringSets; + private final Iterable> perWorkerHistograms; private QueryResults( Iterable> counters, @@ -114,6 +117,7 @@ private QueryResults( this.distributions = distributions; this.gauges = gauges; this.stringSets = stringSets; + this.perWorkerHistograms = Collections.emptyList(); // not implemented } @Override @@ -135,6 +139,11 @@ public Iterable> getGauges() { public Iterable> getStringSets() { return stringSets; } + + @Override + public Iterable> getPerWorkerHistograms() { + return perWorkerHistograms; + } } private static class Counters { diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java index 64455d704c9b..09e19219ed9b 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java @@ -21,6 +21,7 @@ import com.hazelcast.jet.core.Processor; import com.hazelcast.map.IMap; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.core.metrics.DistributionData; @@ -34,6 +35,7 @@ import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.StringSet; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; /** Jet specific implementation of {@link MetricsContainer}. */ @@ -152,5 +154,10 @@ public Iterable> gaugeUpdates() { public Iterable> stringSetUpdates() { return stringSets; } + + @Override + public Iterable> perWorkerHistogramsUpdates() { + return Collections.emptyList(); // not implemented + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java index a37625ed05d6..943e00dc11da 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.metrics; +import org.apache.beam.sdk.util.HistogramData; + /** A metric that reports information about the histogram of reported values. */ public interface Histogram extends Metric { /** Add an observation to this histogram. */ @@ -28,4 +30,7 @@ default void update(double... values) { update(value); } } + + /** Add a histogram to this histogram. Requires underlying implementation to implement this */ + default void update(HistogramData data) {} } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java index 9f60ce3d6c07..5e2605b2dc70 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java @@ -18,7 +18,9 @@ package org.apache.beam.sdk.metrics; import com.google.auto.value.AutoValue; +import java.util.Collections; import java.util.List; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; /** The results of a query for metrics. Allows accessing all the metrics that matched the filter. */ @@ -36,6 +38,9 @@ public abstract class MetricQueryResults { /** Return the metric results for the sets that matched the filter. */ public abstract Iterable> getStringSets(); + /** Return the metric results for the sets that matched the filter. */ + public abstract Iterable> getPerWorkerHistograms(); + static void printMetrics(String type, Iterable> metrics, StringBuilder sb) { List> metricsList = ImmutableList.copyOf(metrics); if (!metricsList.isEmpty()) { @@ -74,6 +79,17 @@ public static MetricQueryResults create( Iterable> distributions, Iterable> gauges, Iterable> stringSets) { - return new AutoValue_MetricQueryResults(counters, distributions, gauges, stringSets); + return new AutoValue_MetricQueryResults( + counters, distributions, gauges, stringSets, Collections.emptyList()); + } + + public static MetricQueryResults create( + Iterable> counters, + Iterable> distributions, + Iterable> gauges, + Iterable> stringSets, + Iterable> histogramData) { + return new AutoValue_MetricQueryResults( + counters, distributions, gauges, stringSets, histogramData); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java index 516425108e68..27bb05cd2fa1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java @@ -304,6 +304,17 @@ public synchronized long getTotalCount() { return numBoundedBucketRecords + numTopRecords + numBottomRecords; } + public HistogramData extractResult() { + HistogramData other = new HistogramData(this.getBucketType()); + other.update(this); + return other; + } + + public HistogramData combine(HistogramData value) { + this.update(value); + return this; + } + public synchronized String getPercentileString(String elemType, String unit) { return String.format( "Total number of %s: %s, P99: %.0f %s, P90: %.0f %s, P50: %.0f %s", diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java index f5075a3f2c55..ba5a2cad9d19 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java @@ -52,7 +52,9 @@ public void testGcpCoreApiSurface() throws Exception { classesInPackage("com.google.api.client.http"), classesInPackage("com.google.api.client.json"), classesInPackage("com.google.api.client.util"), + classesInPackage("com.google.api.services.dataflow"), classesInPackage("com.google.api.services.storage"), + classesInPackage("com.google.api.services.dataflow"), classesInPackage("com.google.auth"), classesInPackage("com.fasterxml.jackson.annotation"), classesInPackage("java"), diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java index 8695a445c118..39824b7b2a92 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java @@ -32,10 +32,10 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import org.apache.beam.runners.core.metrics.CounterCell; +import org.apache.beam.runners.core.metrics.HistogramCell; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; @@ -51,10 +51,14 @@ @RunWith(JUnit4.class) public class BigQuerySinkMetricsTest { - public static class TestHistogram implements Histogram { + public static class TestHistogramCell extends HistogramCell { public List values = Lists.newArrayList(); private MetricName metricName = MetricName.named("namespace", "name"); + public TestHistogramCell(KV kv) { + super(kv); + } + @Override public void update(double value) { values.add(value); @@ -68,10 +72,9 @@ public MetricName getName() { public static class TestMetricsContainer extends MetricsContainerImpl { - // public TestHistogram testHistogram = new TestHistogram(); - public ConcurrentHashMap, TestHistogram> + public ConcurrentHashMap, TestHistogramCell> perWorkerHistograms = - new ConcurrentHashMap, TestHistogram>(); + new ConcurrentHashMap, TestHistogramCell>(); public ConcurrentHashMap perWorkerCounters = new ConcurrentHashMap(); @@ -80,11 +83,11 @@ public TestMetricsContainer() { } @Override - public Histogram getPerWorkerHistogram( + public TestHistogramCell getPerWorkerHistogram( MetricName metricName, HistogramData.BucketType bucketType) { - perWorkerHistograms.computeIfAbsent(KV.of(metricName, bucketType), kv -> new TestHistogram()); + perWorkerHistograms.computeIfAbsent( + KV.of(metricName, bucketType), kv -> new TestHistogramCell(kv)); return perWorkerHistograms.get(KV.of(metricName, bucketType)); - // return testHistogram; } @Override @@ -95,7 +98,6 @@ public Counter getPerWorkerCounter(MetricName metricName) { @Override public void reset() { - // testHistogram.values.clear(); perWorkerHistograms.clear(); perWorkerCounters.clear(); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java index b84e143be773..8186edb7f924 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java @@ -24,8 +24,8 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.runners.core.metrics.HistogramCell; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; -import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.HistogramData; @@ -39,10 +39,14 @@ // TODO:Naireen - Refactor to remove duplicate code between the two sinks @RunWith(JUnit4.class) public class KafkaMetricsTest { - public static class TestHistogram implements Histogram { + public static class TestHistogramCell extends HistogramCell { public List values = Lists.newArrayList(); private MetricName metricName = MetricName.named("KafkaSink", "name"); + public TestHistogramCell(KV kv) { + super(kv); + } + @Override public void update(double value) { values.add(value); @@ -55,25 +59,21 @@ public MetricName getName() { } public static class TestMetricsContainer extends MetricsContainerImpl { - public ConcurrentHashMap, TestHistogram> + public ConcurrentHashMap, TestHistogramCell> perWorkerHistograms = - new ConcurrentHashMap, TestHistogram>(); + new ConcurrentHashMap, TestHistogramCell>(); public TestMetricsContainer() { super("TestStep"); } @Override - public Histogram getPerWorkerHistogram( + public TestHistogramCell getPerWorkerHistogram( MetricName metricName, HistogramData.BucketType bucketType) { - perWorkerHistograms.computeIfAbsent(KV.of(metricName, bucketType), kv -> new TestHistogram()); + perWorkerHistograms.computeIfAbsent( + KV.of(metricName, bucketType), kv -> new TestHistogramCell(kv)); return perWorkerHistograms.get(KV.of(metricName, bucketType)); } - - @Override - public void reset() { - perWorkerHistograms.clear(); - } } @Test