Skip to content

Commit

Permalink
Report delta StringSet counters for streaming (#33691)
Browse files Browse the repository at this point in the history
* Report delta StringSet counters from for streaming

* Update unit test
  • Loading branch information
rohitsinha54 authored Jan 30, 2025
1 parent 1742b2e commit bbe6394
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ public StringSetData getCumulative() {
return setValue.get();
}

// Used by Streaming metric container to extract deltas since streaming metrics are
// reported as deltas rather than cumulative as in batch.
// For delta we take the current value then reset the cell to empty so the next call only see
// delta/updates from last call.
public StringSetData getAndReset() {
return setValue.getAndUpdate(unused -> StringSetData.empty());
}

@Override
public MetricName getName() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
.transform(
update ->
MetricsToCounterUpdateConverter.fromStringSet(
update.getKey(), update.getUpdate())));
update.getKey(), true, update.getUpdate())));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,16 @@ public static CounterUpdate fromGauge(
.setIntegerGauge(integerGaugeProto);
}

public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSetData) {
public static CounterUpdate fromStringSet(
MetricKey key, boolean isCumulative, StringSetData stringSetData) {
CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET);

StringList stringList = new StringList();
stringList.setElements(new ArrayList<>(stringSetData.stringSet()));

return new CounterUpdate()
.setStructuredNameAndMetadata(name)
.setCumulative(false)
.setCumulative(isCumulative)
.setStringList(stringList);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.runners.core.metrics.GaugeCell;
import org.apache.beam.runners.core.metrics.MetricsMap;
import org.apache.beam.runners.core.metrics.StringSetCell;
import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
Expand Down Expand Up @@ -73,7 +74,7 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
private final ConcurrentHashMap<MetricName, GaugeCell> perWorkerGauges =
new ConcurrentHashMap<>();

private MetricsMap<MetricName, StringSetCell> stringSet = new MetricsMap<>(StringSetCell::new);
private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);

private MetricsMap<MetricName, DeltaDistributionCell> distributions =
new MetricsMap<>(DeltaDistributionCell::new);
Expand Down Expand Up @@ -182,7 +183,7 @@ public Gauge getPerWorkerGauge(MetricName metricName) {

@Override
public StringSet getStringSet(MetricName metricName) {
return stringSet.get(metricName);
return stringSets.get(metricName);
}

@Override
Expand All @@ -202,9 +203,11 @@ public Histogram getPerWorkerHistogram(
}

public Iterable<CounterUpdate> extractUpdates() {
// Streaming metrics are updated as delta and not cumulative.
return counterUpdates()
.append(distributionUpdates())
.append(gaugeUpdates().append(stringSetUpdates()));
.append(gaugeUpdates())
.append(stringSetUpdates());
}

private FluentIterable<CounterUpdate> counterUpdates() {
Expand Down Expand Up @@ -247,14 +250,18 @@ private FluentIterable<CounterUpdate> gaugeUpdates() {
}

private FluentIterable<CounterUpdate> stringSetUpdates() {
return FluentIterable.from(stringSet.entries())
return FluentIterable.from(stringSets.entries())
.transform(
new Function<Entry<MetricName, StringSetCell>, CounterUpdate>() {
@Override
public @Nullable CounterUpdate apply(
@Nonnull Map.Entry<MetricName, StringSetCell> entry) {
StringSetData value = entry.getValue().getAndReset();
if (value.stringSet().isEmpty()) {
return null;
}
return MetricsToCounterUpdateConverter.fromStringSet(
MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative());
MetricKey.create(stepName, entry.getKey()), false, value);
}
})
.filter(Predicates.notNull());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,14 @@ public void testStringSetUpdateExtraction() {
.setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn")));

updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update, name2Update));
assertThat(updates, containsInAnyOrder(name2Update));

// test deltas
c1.getStringSet(name1).add("op");
name1Update.setStringList(
new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh", "op")));
name1Update.setStringList(new StringList().setElements(Arrays.asList("op")));

updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update, name2Update));
assertThat(updates, containsInAnyOrder(name1Update));
}

@Test
Expand Down

0 comments on commit bbe6394

Please sign in to comment.