diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index ef934efa94be..9938affd0ba2 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin { google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20240919-2.0.0", // [bomupgrader] sets version google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version - google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240817-$google_clients_version", + google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20241209-$google_clients_version", google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version", google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20240924-2.0.0", // [bomupgrader] sets version diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index 33bb5ae729f8..3a7fed923dbe 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -447,6 +447,7 @@ message MonitoringInfo { SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }]; SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }]; SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }]; + PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }]; } // A set of key and value labels which define the scope of the metric. For diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 99cf98508505..8760b59c158e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -268,6 +268,13 @@ public MetricUpdates getUpdates() { .setLabel(MonitoringInfoConstants.Labels.NAME, metricKey.metricName().getName()) .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricKey.stepName()); } + + // Based on namespace, add per worker metrics label to enable separate runner based sink based + // processing. + if (metricName.getNamespace().equals("BigQuerySink") + || metricName.getNamespace().equals("KafkaSink")) { + builder.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true"); + } return builder; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java index 2bb935111d38..fcd5db38fddb 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java @@ -107,6 +107,7 @@ public static final class Labels { public static final String SPANNER_DATABASE_ID = "SPANNER_DATABASE_ID"; public static final String SPANNER_INSTANCE_ID = "SPANNER_INSTANCE_ID"; public static final String SPANNER_QUERY_NAME = "SPANNER_QUERY_NAME"; + public static final String PER_WORKER_METRIC = "PER_WORKER_METRIC"; static { // Validate that compile time constants match the values stored in the protos. @@ -148,6 +149,7 @@ public static final class Labels { SPANNER_INSTANCE_ID.equals(extractLabel(MonitoringInfoLabels.SPANNER_INSTANCE_ID))); checkArgument( SPANNER_QUERY_NAME.equals(extractLabel(MonitoringInfoLabels.SPANNER_QUERY_NAME))); + checkArgument(PER_WORKER_METRIC.equals(extractLabel(MonitoringInfoLabels.PER_WORKER_METRIC))); } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java index 5b3d71f4873e..3feb21784b6e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java @@ -204,6 +204,54 @@ public void testMonitoringInfosArePopulatedForUserCounters() { assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build())); } + @Test + public void testMonitoringInfosLabelsArePopulatedForSinkCounter() { + MetricsContainerImpl testObject = new MetricsContainerImpl("step1"); + CounterCell c1 = testObject.getCounter(MetricName.named("KafkaSink", "name1")); + CounterCell c2 = testObject.getCounter(MetricName.named("BigQuerySink", "name2")); + CounterCell c3 = testObject.getCounter(MetricName.named("PS", "name3")); + + c1.inc(2L); + c2.inc(4L); + c3.inc(5L); + + SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder(); + builder1 + .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64) + .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "KafkaSink") + .setLabel(MonitoringInfoConstants.Labels.NAME, "name1") + .setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true") + .setInt64SumValue(2) + .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1"); + + SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder(); + builder2 + .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64) + .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "BigQuerySink") + .setLabel(MonitoringInfoConstants.Labels.NAME, "name2") + .setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true") + .setInt64SumValue(4) + .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1"); + + // Not in an supported namespace, so extra metadata isn't added. + SimpleMonitoringInfoBuilder builder3 = new SimpleMonitoringInfoBuilder(); + builder3 + .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64) + .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "PS") + .setLabel(MonitoringInfoConstants.Labels.NAME, "name3") + .setInt64SumValue(5) + .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1"); + + ArrayList actualMonitoringInfos = new ArrayList(); + for (MonitoringInfo mi : testObject.getMonitoringInfos()) { + actualMonitoringInfos.add(mi); + } + + assertThat( + actualMonitoringInfos, + containsInAnyOrder(builder1.build(), builder2.build(), builder3.build())); + } + @Test public void testMonitoringInfosArePopulatedForUserDistributions() { MetricsContainerImpl testObject = new MetricsContainerImpl("step1"); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java index f9cd098edaa6..5bc6ecc64ea6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java @@ -74,6 +74,12 @@ public Gauge getGauge(MetricName metricName) { return getCurrentContainer().getGauge(metricName); } + @Override + public Gauge getPerWorkerGauge(MetricName metricName) { + Gauge gauge = getCurrentContainer().getPerWorkerGauge(metricName); + return gauge; + } + @Override public StringSet getStringSet(MetricName metricName) { return getCurrentContainer().getStringSet(metricName); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java index 91baefa0be4c..83ab9434a7d3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java @@ -19,22 +19,27 @@ import com.google.api.services.dataflow.model.Base2Exponent; import com.google.api.services.dataflow.model.BucketOptions; +import com.google.api.services.dataflow.model.DataflowGaugeValue; import com.google.api.services.dataflow.model.DataflowHistogramValue; import com.google.api.services.dataflow.model.Linear; import com.google.api.services.dataflow.model.MetricValue; import com.google.api.services.dataflow.model.OutlierStats; import com.google.api.services.dataflow.model.PerStepNamespaceMetrics; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.stream.Stream; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.util.HistogramData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Converts metric updates to {@link PerStepNamespaceMetrics} protos. Currently we only support @@ -42,9 +47,16 @@ * converter. */ public class MetricsToPerStepNamespaceMetricsConverter { + + private static final Logger LOG = + LoggerFactory.getLogger(MetricsToPerStepNamespaceMetricsConverter.class); + // Avoids to introduce mandatory kafka-io dependency to Dataflow worker // keep in sync with org.apache.beam.sdk.io.kafka.KafkaSinkMetrics.METRICS_NAMESPACE public static String KAFKA_SINK_METRICS_NAMESPACE = "KafkaSink"; + private static String[] SUPPORTED_NAMESPACES = { + KAFKA_SINK_METRICS_NAMESPACE, BigQuerySinkMetrics.METRICS_NAMESPACE + }; private static Optional getParsedMetricName( MetricName metricName, @@ -59,6 +71,23 @@ private static Optional getParsedMetric return parsedMetricName; } + /** + * @param metricName The {@link MetricName} that represents this metric. + * @return boolean If the metric is from a supported namespace. + */ + private static boolean isNameSpaceSupported(MetricName metricName) { + boolean isValidNameSpace = + Stream.of(SUPPORTED_NAMESPACES).anyMatch(x -> x.equals(metricName.getNamespace())); + if (!isValidNameSpace) { + LOG.warn( + "Dropping metric {} since {} is not one of the supported namespaces: {}", + metricName, + metricName.getNamespace(), + Arrays.toString(SUPPORTED_NAMESPACES)); + } + return isValidNameSpace; + } + /** * @param metricName The {@link MetricName} that represents this counter. * @param value The counter value. @@ -70,9 +99,7 @@ private static Optional convertCounterToMetricValue( Long value, Map parsedPerWorkerMetricsCache) { - if (value == 0 - || (!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE) - && !metricName.getNamespace().equals(KAFKA_SINK_METRICS_NAMESPACE))) { + if (value == 0) { return Optional.empty(); } @@ -86,6 +113,33 @@ private static Optional convertCounterToMetricValue( .setValueInt64(value)); } + /** + * @param metricName The {@link MetricName} that represents this counter. + * @param value The counter value. + * @return If the conversion succeeds, {@code MetricValue} that represents this counter. Otherwise + * returns an empty optional + */ + private static Optional convertGaugeToMetricValue( + MetricName metricName, + Long value, + Map parsedPerWorkerMetricsCache) { + + Optional labeledName = + getParsedMetricName(metricName, parsedPerWorkerMetricsCache); + if (!labeledName.isPresent() || labeledName.get().getBaseName().isEmpty()) { + return Optional.empty(); + } + + DataflowGaugeValue gauge_value = new DataflowGaugeValue(); + gauge_value.setValue(value); + + return Optional.of( + new MetricValue() + .setMetric(labeledName.get().getBaseName()) + .setMetricLabels(labeledName.get().getMetricLabels()) + .setValueGauge64(gauge_value)); + } + /** * Adds {@code outlierStats} to {@code outputHistogram} if {@code inputHistogram} has recorded * overflow or underflow values. @@ -196,6 +250,7 @@ private static Optional convertHistogramToMetricValue( public static Collection convert( String stepName, Map counters, + Map gauges, Map histograms, Map parsedPerWorkerMetricsCache) { @@ -203,6 +258,11 @@ public static Collection convert( for (Entry entry : counters.entrySet()) { MetricName metricName = entry.getKey(); + boolean validNameSpace = isNameSpaceSupported(metricName); + if (!validNameSpace) { + continue; + } + Optional metricValue = convertCounterToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache); if (!metricValue.isPresent()) { @@ -225,6 +285,11 @@ public static Collection convert( for (Entry entry : histograms.entrySet()) { MetricName metricName = entry.getKey(); + + boolean validNameSpace = isNameSpaceSupported(metricName); + if (!validNameSpace) { + continue; + } Optional metricValue = convertHistogramToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache); if (!metricValue.isPresent()) { @@ -245,6 +310,32 @@ public static Collection convert( stepNamespaceMetrics.getMetricValues().add(metricValue.get()); } + for (Entry entry : gauges.entrySet()) { + MetricName metricName = entry.getKey(); + + boolean validNameSpace = isNameSpaceSupported(metricName); + if (!validNameSpace) { + continue; + } + + Optional metricValue = + convertGaugeToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache); + if (!metricValue.isPresent()) { + continue; + } + + PerStepNamespaceMetrics stepNamespaceMetrics = + metricsByNamespace.get(metricName.getNamespace()); + if (stepNamespaceMetrics == null) { + stepNamespaceMetrics = + new PerStepNamespaceMetrics() + .setMetricValues(new ArrayList<>()) + .setOriginalStep(stepName) + .setMetricsNamespace(metricName.getNamespace()); + metricsByNamespace.put(metricName.getNamespace(), stepNamespaceMetrics); + } + stepNamespaceMetrics.getMetricValues().add(metricValue.get()); + } return metricsByNamespace.values(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 7cc0dc68f7e7..03ba501941ae 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -58,6 +58,7 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class StreamingStepMetricsContainer implements MetricsContainer { + private final String stepName; private static boolean enablePerWorkerMetrics = false; @@ -69,6 +70,9 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private MetricsMap gauges = new MetricsMap<>(GaugeCell::new); + private final ConcurrentHashMap perWorkerGauges = + new ConcurrentHashMap<>(); + private MetricsMap stringSet = new MetricsMap<>(StringSetCell::new); private MetricsMap distributions = @@ -163,6 +167,19 @@ public Gauge getGauge(MetricName metricName) { return gauges.get(metricName); } + @Override + public Gauge getPerWorkerGauge(MetricName metricName) { + if (!enablePerWorkerMetrics) { + return MetricsContainer.super.getPerWorkerGauge(metricName); + } + Gauge val = perWorkerGauges.get(metricName); + if (val != null) { + return val; + } + + return perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName)); + } + @Override public StringSet getStringSet(MetricName metricName) { return stringSet.get(metricName); @@ -330,11 +347,10 @@ private void deleteStaleCounters( @VisibleForTesting Iterable extractPerWorkerMetricUpdates() { ConcurrentHashMap counters = new ConcurrentHashMap(); + ConcurrentHashMap gauges = new ConcurrentHashMap(); ConcurrentHashMap histograms = new ConcurrentHashMap(); HashSet currentZeroValuedCounters = new HashSet(); - - // Extract metrics updates. perWorkerCounters.forEach( (k, v) -> { Long val = v.getAndSet(0); @@ -344,6 +360,13 @@ Iterable extractPerWorkerMetricUpdates() { } counters.put(k, val); }); + + perWorkerGauges.forEach( + (k, v) -> { + Long val = v.getCumulative().value(); + gauges.put(k, val); + v.reset(); + }); perWorkerHistograms.forEach( (k, v) -> { v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot)); @@ -352,7 +375,7 @@ Iterable extractPerWorkerMetricUpdates() { deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock)); return MetricsToPerStepNamespaceMetricsConverter.convert( - stepName, counters, histograms, parsedPerWorkerMetricsCache); + stepName, counters, gauges, histograms, parsedPerWorkerMetricsCache); } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java index f9a9bb42906b..b393a2696f64 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java @@ -23,6 +23,7 @@ import com.google.api.services.dataflow.model.Base2Exponent; import com.google.api.services.dataflow.model.BucketOptions; +import com.google.api.services.dataflow.model.DataflowGaugeValue; import com.google.api.services.dataflow.model.DataflowHistogramValue; import com.google.api.services.dataflow.model.Linear; import com.google.api.services.dataflow.model.MetricValue; @@ -86,6 +87,7 @@ public void testConvert_successfulyConvertCounters() { String step = "testStepName"; Map emptyHistograms = new HashMap<>(); Map counters = new HashMap(); + Map emptyGauges = new HashMap(); Map parsedMetricNames = new HashMap<>(); MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "metric1"); @@ -99,7 +101,7 @@ public void testConvert_successfulyConvertCounters() { Collection conversionResult = MetricsToPerStepNamespaceMetricsConverter.convert( - step, counters, emptyHistograms, parsedMetricNames); + step, counters, emptyGauges, emptyHistograms, parsedMetricNames); MetricValue expectedVal1 = new MetricValue().setMetric("metric1").setValueInt64(5L).setMetricLabels(new HashMap<>()); @@ -133,6 +135,7 @@ public void testConvert_skipInvalidMetricNames() { Map parsedMetricNames = new HashMap<>(); Map counters = new HashMap<>(); + Map emptyGauges = new HashMap(); MetricName invalidName1 = MetricName.named("BigQuerySink", "**"); counters.put(invalidName1, 5L); @@ -144,15 +147,38 @@ public void testConvert_skipInvalidMetricNames() { Collection conversionResult = MetricsToPerStepNamespaceMetricsConverter.convert( - "testStep", counters, histograms, parsedMetricNames); + "testStep", counters, emptyGauges, histograms, parsedMetricNames); assertThat(conversionResult.size(), equalTo(0)); assertThat(parsedMetricNames.size(), equalTo(0)); } @Test - public void testConvert_successfulConvertHistograms() { + public void testConvert_skipInvalidMetricNameSpaces() { Map parsedMetricNames = new HashMap<>(); + Map counters = new HashMap<>(); + Map emptyGauges = new HashMap(); + MetricName invalidNameSpace1 = MetricName.named("Unsupported", "baseLabel1"); + counters.put(invalidNameSpace1, 5L); + + Map histograms = new HashMap<>(); + MetricName invalidNameSpace2 = MetricName.named("Unsupported", "baseLabel2"); + LockFreeHistogram nonEmptyLinearHistogram = + new LockFreeHistogram(invalidNameSpace2, lienarBuckets); + nonEmptyLinearHistogram.update(-5.0); + histograms.put(invalidNameSpace2, nonEmptyLinearHistogram.getSnapshotAndReset().get()); + + Collection conversionResult = + MetricsToPerStepNamespaceMetricsConverter.convert( + "testStep", counters, emptyGauges, histograms, parsedMetricNames); + assertThat(conversionResult.size(), equalTo(0)); + assertThat(parsedMetricNames.size(), equalTo(0)); + } + + @Test + public void testConvert_successfulConvertHistograms() { + Map parsedMetricNames = new HashMap<>(); + Map emptyGauges = new HashMap(); Map histograms = new HashMap<>(); MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "baseLabel"); MetricName bigQueryMetric2 = @@ -181,7 +207,7 @@ public void testConvert_successfulConvertHistograms() { Map emptyCounters = new HashMap<>(); Collection conversionResult = MetricsToPerStepNamespaceMetricsConverter.convert( - step, emptyCounters, histograms, parsedMetricNames); + step, emptyCounters, emptyGauges, histograms, parsedMetricNames); // Expected value 1 List bucketCounts1 = ImmutableList.of(0L, 1L, 1L, 1L); @@ -271,7 +297,7 @@ public void testConvert_skipUnknownHistogramBucketType() { Collection conversionResult = MetricsToPerStepNamespaceMetricsConverter.convert( - step, emptyCounters, histograms, parsedMetricNames); + step, emptyCounters, emptyCounters, histograms, parsedMetricNames); assertThat(conversionResult.size(), equalTo(0)); assertThat(parsedMetricNames.size(), equalTo(0)); } @@ -280,6 +306,7 @@ public void testConvert_skipUnknownHistogramBucketType() { public void testConvert_convertCountersAndHistograms() { String step = "testStep"; Map counters = new HashMap<>(); + Map emptyGauges = new HashMap<>(); Map histograms = new HashMap<>(); Map parsedMetricNames = new HashMap<>(); @@ -293,7 +320,7 @@ public void testConvert_convertCountersAndHistograms() { Collection conversionResult = MetricsToPerStepNamespaceMetricsConverter.convert( - step, counters, histograms, parsedMetricNames); + step, counters, emptyGauges, histograms, parsedMetricNames); // Expected counter MetricValue Map counterLabelMap = new HashMap<>(); @@ -345,4 +372,76 @@ public void testConvert_convertCountersAndHistograms() { parsedMetricNames, IsMapContaining.hasEntry(histogramMetricName, parsedHistogramMetricName)); } + + @Test + public void testConvert_successfulyConvertGauges() { + String step = "testStepName"; + Map emptyHistograms = new HashMap<>(); + Map counters = new HashMap(); + Map gauges = new HashMap(); + Map parsedMetricNames = new HashMap<>(); + + MetricName KafkaMetric1 = MetricName.named("KafkaSink", "metric1"); + MetricName KafkaMetric2 = MetricName.named("KafkaSink", "metric2*label1:val1;label2:val2;"); + MetricName KafkaMetric3 = MetricName.named("KafkaSink", "metric3"); + + gauges.put(KafkaMetric1, 5L); + gauges.put(KafkaMetric2, 10L); + gauges.put(KafkaMetric3, 0L); + + Collection conversionResult = + MetricsToPerStepNamespaceMetricsConverter.convert( + step, counters, gauges, emptyHistograms, parsedMetricNames); + + DataflowGaugeValue gauge_value1 = new DataflowGaugeValue(); + gauge_value1.setValue(5L); + + DataflowGaugeValue gauge_value2 = new DataflowGaugeValue(); + gauge_value2.setValue(10L); + + DataflowGaugeValue gauge_value3 = new DataflowGaugeValue(); + gauge_value3.setValue(0L); // zero valued + + MetricValue expectedVal1 = + new MetricValue() + .setMetric("metric1") + .setValueGauge64(gauge_value1) + .setMetricLabels(new HashMap<>()); + + Map val2LabelMap = new HashMap<>(); + val2LabelMap.put("label1", "val1"); + val2LabelMap.put("label2", "val2"); + MetricValue expectedVal2 = + new MetricValue() + .setMetric("metric2") + .setValueGauge64(gauge_value2) + .setMetricLabels(val2LabelMap); + + MetricValue expectedVal3 = + new MetricValue() + .setMetric("metric3") + .setValueGauge64(gauge_value3) + .setMetricLabels(new HashMap<>()); + + assertThat(conversionResult.size(), equalTo(1)); + PerStepNamespaceMetrics perStepNamespaceMetrics = conversionResult.iterator().next(); + + assertThat(perStepNamespaceMetrics.getOriginalStep(), equalTo(step)); + assertThat(perStepNamespaceMetrics.getMetricsNamespace(), equalTo("KafkaSink")); + assertThat(perStepNamespaceMetrics.getMetricValues().size(), equalTo(3)); + assertThat( + perStepNamespaceMetrics.getMetricValues(), + containsInAnyOrder(expectedVal1, expectedVal2, expectedVal3)); + + LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric1 = + LabeledMetricNameUtils.parseMetricName(KafkaMetric1.getName()).get(); + LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric2 = + LabeledMetricNameUtils.parseMetricName(KafkaMetric2.getName()).get(); + LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric3 = + LabeledMetricNameUtils.parseMetricName(KafkaMetric3.getName()).get(); + assertThat(parsedMetricNames.size(), equalTo(3)); + assertThat(parsedMetricNames, IsMapContaining.hasEntry(KafkaMetric1, parsedKafkaMetric1)); + assertThat(parsedMetricNames, IsMapContaining.hasEntry(KafkaMetric2, parsedKafkaMetric2)); + assertThat(parsedMetricNames, IsMapContaining.hasEntry(KafkaMetric3, parsedKafkaMetric3)); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingGauge.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingGauge.java new file mode 100644 index 000000000000..5eb19d21d5f3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingGauge.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Internal; + +/** Implementation of {@link Gauge} that delegates to the instance for the current context. */ +@Internal +public class DelegatingGauge implements Metric, Gauge, Serializable { + private final MetricName name; + private final boolean processWideContainer; + private final boolean perWorkerGauge; + + /** + * Create a {@code DelegatingGauge} with {@code perWorkerGauge} and {@code processWideContainer} + * set to false. + * + * @param name Metric name for this metric. + */ + public DelegatingGauge(MetricName name) { + this(name, false, false); + } + + /** + * Create a {@code DelegatingGauge} with {@code perWorkerGauge} set to false. + * + * @param name Metric name for this metric. + * @param processWideContainer Whether this Gauge is stored in the ProcessWide container or the + * current thread's container. + */ + public DelegatingGauge(MetricName name, boolean processWideContainer) { + this(name, processWideContainer, false); + } + + /** + * @param name Metric name for this metric. + * @param processWideContainer Whether this gauge is stored in the ProcessWide container or the + * current thread's container. + * @param perWorkerGauge Whether this gauge refers to a perWorker metric or not. + */ + public DelegatingGauge(MetricName name, boolean processWideContainer, boolean perWorkerGauge) { + this.name = name; + this.processWideContainer = processWideContainer; + this.perWorkerGauge = perWorkerGauge; + } + + /** Set the gauge. */ + @Override + public void set(long n) { + MetricsContainer container = + this.processWideContainer + ? MetricsEnvironment.getProcessWideContainer() + : MetricsEnvironment.getCurrentContainer(); + if (container == null) { + return; + } + if (perWorkerGauge) { + container.getPerWorkerGauge(name).set(n); + } else { + container.getGauge(name).set(n); + } + } + + @Override + public MetricName getName() { + return name; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java index 0c4766bb2c0b..2d07df1d899b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -75,6 +75,14 @@ default Histogram getPerWorkerHistogram( return NoOpHistogram.getInstance(); } + /** + * Return the {@link Gauge} that should be used for implementing the given per-worker {@code + * metricName} in this container. + */ + default Gauge getPerWorkerGauge(MetricName metricName) { + return NoOpGauge.getInstance(); + } + /** Return the cumulative values for any metrics in this container as MonitoringInfos. */ default Iterable getMonitoringInfos() { throw new RuntimeException("getMonitoringInfos is not implemented on this MetricsContainer."); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java new file mode 100644 index 000000000000..39d3186bba66 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +/** + * A no-op implementation of Gauge. This class exists to provide a default if an implementation of + * MetricsContainer does not override a Gauge getter. + */ +public class NoOpGauge implements Gauge { + + private static final NoOpGauge singleton = new NoOpGauge(); + private static final MetricName name = MetricName.named(NoOpGauge.class, "singleton"); + + private NoOpGauge() {} + + @Override + public void set(long n) {} + + @Override + public MetricName getName() { + return name; + } + + public static NoOpGauge getInstance() { + return singleton; + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java index 147a30dcdd1a..398e627b4fda 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java @@ -21,9 +21,13 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Histogram; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +37,12 @@ public interface KafkaMetrics { void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime); + void updateBacklogBytes(String topic, int partitionId, long backlog); + void updateKafkaMetrics(); + void recordBacklogBytes(String topic, int partitionId, long backlog); + /** No-op implementation of {@code KafkaResults}. */ class NoOpKafkaMetrics implements KafkaMetrics { private NoOpKafkaMetrics() {} @@ -42,9 +50,15 @@ private NoOpKafkaMetrics() {} @Override public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {} + @Override + public void updateBacklogBytes(String topic, int partitionId, long elapsedTime) {} + @Override public void updateKafkaMetrics() {} + @Override + public void recordBacklogBytes(String topic, int partitionId, long backlog) {}; + private static NoOpKafkaMetrics singleton = new NoOpKafkaMetrics(); static NoOpKafkaMetrics getInstance() { @@ -71,11 +85,17 @@ abstract class KafkaMetricsImpl implements KafkaMetrics { abstract HashMap> perTopicRpcLatencies(); + static ConcurrentHashMap backlogGauges = new ConcurrentHashMap(); + + abstract HashMap perTopicPartitionBacklogs(); + abstract AtomicBoolean isWritable(); public static KafkaMetricsImpl create() { return new AutoValue_KafkaMetrics_KafkaMetricsImpl( - new HashMap>(), new AtomicBoolean(true)); + new HashMap>(), + new HashMap(), + new AtomicBoolean(true)); } /** Record the rpc status and latency of a successful Kafka poll RPC call. */ @@ -93,6 +113,23 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) { } } + /** + * This is for tracking backlog bytes to be added to the Metric Container at a later time. + * + * @param topicName topicName + * @param partitionId partitionId for the topic Only included in the metric key if + * 'supportsMetricsDeletion' is enabled. + * @param backlog backlog for the topic Only included in the metric key if + * 'supportsMetricsDeletion' is enabled. + */ + @Override + public void updateBacklogBytes(String topicName, int partitionId, long backlog) { + if (isWritable().get()) { + String name = KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId).getName(); + perTopicPartitionBacklogs().put(name, backlog); + } + } + /** Record rpc latency histogram metrics for all recorded topics. */ private void recordRpcLatencyMetrics() { for (Map.Entry> topicLatencies : @@ -114,6 +151,31 @@ private void recordRpcLatencyMetrics() { } } + private void recordBacklogBytesInternal() { + for (Map.Entry backlogs : perTopicPartitionBacklogs().entrySet()) { + Gauge gauge = + KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey())); + gauge.set(backlogs.getValue()); + } + } + + /** + * This is for recording backlog bytes on the current thread. + * + * @param topicName topicName + * @param partitionId partitionId for the topic Only included in the metric key if + * 'supportsMetricsDeletion' is enabled. + * @param backlogBytes backlog for the topic Only included in the metric key if + * 'supportsMetricsDeletion' is enabled. + */ + @Override + public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) { + Gauge perPartion = + Metrics.gauge( + "KafkaSink", KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId).getName()); + perPartion.set(backlogBytes); + } + /** * Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics} * containers. This function will only report metrics once per instance. Subsequent calls to @@ -125,6 +187,7 @@ public void updateKafkaMetrics() { LOG.warn("Updating stale Kafka metrics container"); return; } + recordBacklogBytesInternal(); recordRpcLatencyMetrics(); } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java index f71926f97d27..e0eddcb0beb1 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java @@ -17,7 +17,9 @@ */ package org.apache.beam.sdk.io.kafka; +import org.apache.beam.sdk.metrics.DelegatingGauge; import org.apache.beam.sdk.metrics.DelegatingHistogram; +import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; import org.apache.beam.sdk.metrics.MetricName; @@ -40,6 +42,7 @@ public class KafkaSinkMetrics { // Base Metric names private static final String RPC_LATENCY = "RpcLatency"; + private static final String ESTIAMTED_BACKLOG_SIZE = "EstimatedBacklogSize"; // Kafka Consumer Method names enum RpcMethod { @@ -49,6 +52,7 @@ enum RpcMethod { // Metric labels private static final String TOPIC_LABEL = "topic_name"; private static final String RPC_METHOD = "rpc_method"; + private static final String PARTITION_ID = "partition_id"; /** * Creates an Histogram metric to record RPC latency. Metric will have name. @@ -71,6 +75,48 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic return new DelegatingHistogram(metricName, buckets, false, true); } + /** + * Creates an Gauge metric to record per partition backlog. Metric will have name: + * + *

'EstimatedBacklogSize*topic_name:{topic};partitionId:{partitionId};' + * + * @param topic Kafka topic associated with this metric. + * @param partitionId partition id associated with this metric. + * @return Counter. + */ + public static Gauge createBacklogGauge(String topic, int partitionId) { + return new DelegatingGauge(getMetricGaugeName(topic, partitionId), false, true); + } + + /** + * Creates an Gauge metric to record per partition backlog. Metric will have name: + * + *

'name' + * + * @param name MetricName for the KafkaSink. + * @return Counter. + */ + public static Gauge createBacklogGauge(MetricName name) { + return new DelegatingGauge(name, false, true); + } + + /** + * Creates an MetricName based on topic name and partition id. + * + *

'EstimatedBacklogSize*topic_name:{topic};partition_id:{partitionId};' + * + * @param topic Kafka topic associated with this metric. + * @param partitionId partition id associated with this metric. + * @return MetricName. + */ + public static MetricName getMetricGaugeName(String topic, int partitionId) { + LabeledMetricNameUtils.MetricNameBuilder nameBuilder = + LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(ESTIAMTED_BACKLOG_SIZE); + nameBuilder.addLabel(PARTITION_ID, String.valueOf(partitionId)); + nameBuilder.addLabel(TOPIC_LABEL, topic); + return nameBuilder.build(METRICS_NAMESPACE); + } + /** * Returns a container to store metrics for Kafka metrics in Unbounded Readed. If these metrics * are disabled, then we return a no-op container. diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index ab9e26b3b740..5755e7179e6d 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -231,8 +231,9 @@ public boolean advance() throws IOException { kafkaResults.updateKafkaMetrics(); return true; } else { // -- (b) + kafkaResults = KafkaSinkMetrics.kafkaMetrics(); nextBatch(); - + kafkaResults.updateKafkaMetrics(); if (!curBatch.hasNext()) { return false; } @@ -302,13 +303,11 @@ public Instant getCurrentTimestamp() throws NoSuchElementException { @Override public long getSplitBacklogBytes() { long backlogBytes = 0; - for (PartitionState p : partitionStates) { long pBacklog = p.approxBacklogInBytes(); if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) { return UnboundedReader.BACKLOG_UNKNOWN; } - backlogBytes += pBacklog; } return backlogBytes; @@ -455,6 +454,10 @@ private static class PartitionState { this.timestampPolicy = timestampPolicy; } + public TopicPartition topicPartition() { + return topicPartition; + } + // Update consumedOffset, avgRecordSize, and avgOffsetGap void recordConsumed(long offset, int size, long offsetGap) { nextOffset = offset + 1; @@ -672,6 +675,7 @@ private void nextBatch() throws IOException { partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator()); reportBacklog(); + reportBacklogMetrics(); // cycle through the partitions in order to interleave records from each. curBatch = Iterators.cycle(new ArrayList<>(partitionStates)); @@ -743,6 +747,16 @@ private void reportBacklog() { backlogElementsOfSplit.set(splitBacklogMessages); } + private void reportBacklogMetrics() { + for (PartitionState p : partitionStates) { + long pBacklog = p.approxBacklogInBytes(); + if (pBacklog != UnboundedReader.BACKLOG_UNKNOWN) { + kafkaResults.updateBacklogBytes( + p.topicPartition().topic(), p.topicPartition().partition(), pBacklog); + } + } + } + private long getSplitBacklogMessageCount() { long backlogCount = 0; diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 1cf4aad34e4e..3d6cc910d009 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -549,7 +549,6 @@ public ProcessContinuation processElement( } } } - backlogBytes.set( (long) (BigDecimal.valueOf( @@ -558,6 +557,17 @@ public ProcessContinuation processElement( .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128) .doubleValue() * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio())); + KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics(); + kafkaResults.recordBacklogBytes( + kafkaSourceDescriptor.getTopic(), + kafkaSourceDescriptor.getPartition(), + (long) + (BigDecimal.valueOf( + Preconditions.checkStateNotNull( + offsetEstimatorCache.get(kafkaSourceDescriptor).estimate())) + .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128) + .doubleValue() + * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio())); } } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java index b84e143be773..e9339878d42f 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java @@ -24,7 +24,9 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.runners.core.metrics.GaugeCell; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsEnvironment; @@ -59,6 +61,9 @@ public static class TestMetricsContainer extends MetricsContainerImpl { perWorkerHistograms = new ConcurrentHashMap, TestHistogram>(); + public ConcurrentHashMap perWorkerGauges = + new ConcurrentHashMap(); + public TestMetricsContainer() { super("TestStep"); } @@ -70,9 +75,16 @@ public Histogram getPerWorkerHistogram( return perWorkerHistograms.get(KV.of(metricName, bucketType)); } + @Override + public Gauge getPerWorkerGauge(MetricName metricName) { + perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName)); + return perWorkerGauges.get(metricName); + } + @Override public void reset() { perWorkerHistograms.clear(); + perWorkerGauges.clear(); } } @@ -83,10 +95,11 @@ public void testNoOpKafkaMetrics() throws Exception { KafkaMetrics results = KafkaMetrics.NoOpKafkaMetrics.getInstance(); results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10)); - + results.updateBacklogBytes("test-topic", 0, 10); results.updateKafkaMetrics(); assertThat(testContainer.perWorkerHistograms.size(), equalTo(0)); + assertThat(testContainer.perWorkerGauges.size(), equalTo(0)); } @Test @@ -99,6 +112,7 @@ public void testKafkaRPCLatencyMetrics() throws Exception { KafkaMetrics results = KafkaSinkMetrics.kafkaMetrics(); results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10)); + results.updateBacklogBytes("test-topic", 0, 10); results.updateKafkaMetrics(); // RpcLatency*rpc_method:POLL;topic_name:test-topic @@ -110,6 +124,11 @@ public void testKafkaRPCLatencyMetrics() throws Exception { assertThat( testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values, containsInAnyOrder(Double.valueOf(10.0))); + + MetricName gaugeName = + MetricName.named("KafkaSink", "EstimatedBacklogSize*partition_id:0;topic_name:test-topic;"); + assertThat(testContainer.perWorkerGauges.size(), equalTo(1)); + assertThat(testContainer.perWorkerGauges.get(gaugeName).getCumulative().value(), equalTo(10L)); } @Test