Skip to content

Commit

Permalink
Add histogram metric to portable runner
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Jan 7, 2025
1 parent 6509e51 commit 94452e6
Show file tree
Hide file tree
Showing 23 changed files with 332 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,16 @@ message MonitoringInfoSpecs {
}
]
}];

USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:per_worker_histogram_int64:v1",
type: "beam:metrics:per_worker_histogram_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report per worker histogram metric."
}]
}];
}
}

Expand Down Expand Up @@ -593,6 +603,12 @@ message MonitoringInfoTypeUrns {
BOUNDED_TRIE_TYPE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:bounded_trie:v1"];

// Represents a histogram.
//
// Encoding: DataflowHistogram proto
HISTOGRAM = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:histogram_int64:v1"];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -42,16 +43,19 @@ public class DefaultMetricResults extends MetricResults {
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;
private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets) {
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
this.perWorkerHistograms = perWorkerHistograms;
}

@Override
Expand All @@ -62,6 +66,9 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())),
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())));
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
Iterables.filter(
perWorkerHistograms,
perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public void update(HistogramCell other) {
dirty.afterModification();
}

@Override
public void update(HistogramData data) {
this.value.update(data);
dirty.afterModification();
}

// TODO(https://github.com/apache/beam/issues/20853): Update this function to allow incrementing
// the infinite buckets as well.
// and remove the incTopBucketCount and incBotBucketCount methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

/** Representation of multiple metric updates. */
Expand All @@ -34,6 +35,7 @@ public abstract class MetricUpdates {
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList());

/**
Expand Down Expand Up @@ -66,21 +68,30 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
/** All the sets updates. */
public abstract Iterable<MetricUpdate<StringSetData>> stringSetUpdates();

/** All the histogram updates. */
public abstract Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates();

/** Create a new {@link MetricUpdates} bundle. */
public static MetricUpdates create(
Iterable<MetricUpdate<Long>> counterUpdates,
Iterable<MetricUpdate<DistributionData>> distributionUpdates,
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates) {
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
return new AutoValue_MetricUpdates(
counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates);
counterUpdates,
distributionUpdates,
gaugeUpdates,
stringSetUpdates,
perWorkerHistogramsUpdates);
}

/** Returns true if there are no updates in this MetricUpdates object. */
public boolean isEmpty() {
return Iterables.isEmpty(counterUpdates())
&& Iterables.isEmpty(distributionUpdates())
&& Iterables.isEmpty(gaugeUpdates())
&& Iterables.isEmpty(stringSetUpdates());
&& Iterables.isEmpty(stringSetUpdates())
&& Iterables.isEmpty(perWorkerHistogramsUpdates());
}
}
Loading

0 comments on commit 94452e6

Please sign in to comment.