Skip to content

Commit

Permalink
Kafka add gauge v1 (#33408)
Browse files Browse the repository at this point in the history
* add counter stuff

* Address John's comments about separting conversion and validation checks

* address Steven's comments

* another round of comments

---------

Co-authored-by: Naireen <[email protected]>
  • Loading branch information
Naireen and Naireen authored Jan 24, 2025
1 parent e7b4d63 commit 2064103
Show file tree
Hide file tree
Showing 12 changed files with 504 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20241222-2.0.0", // [bomupgrader] sets version
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240817-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20241209-$google_clients_version",
google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version",
google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20241206-2.0.0", // [bomupgrader] sets version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public Gauge getGauge(MetricName metricName) {
return getCurrentContainer().getGauge(metricName);
}

@Override
public Gauge getPerWorkerGauge(MetricName metricName) {
Gauge gauge = getCurrentContainer().getPerWorkerGauge(metricName);
return gauge;
}

@Override
public StringSet getStringSet(MetricName metricName) {
return getCurrentContainer().getStringSet(metricName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,44 @@

import com.google.api.services.dataflow.model.Base2Exponent;
import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowGaugeValue;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import com.google.api.services.dataflow.model.Linear;
import com.google.api.services.dataflow.model.MetricValue;
import com.google.api.services.dataflow.model.OutlierStats;
import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Converts metric updates to {@link PerStepNamespaceMetrics} protos. Currently we only support
* converting metrics from {@link BigQuerySinkMetrics} and from {@link KafkaSinkMetrics} with this
* converter.
*/
public class MetricsToPerStepNamespaceMetricsConverter {

private static final Logger LOG =
LoggerFactory.getLogger(MetricsToPerStepNamespaceMetricsConverter.class);

// Avoids to introduce mandatory kafka-io dependency to Dataflow worker
// keep in sync with org.apache.beam.sdk.io.kafka.KafkaSinkMetrics.METRICS_NAMESPACE
public static String KAFKA_SINK_METRICS_NAMESPACE = "KafkaSink";
private static String[] SUPPORTED_NAMESPACES = {
KAFKA_SINK_METRICS_NAMESPACE, BigQuerySinkMetrics.METRICS_NAMESPACE
};

private static Optional<LabeledMetricNameUtils.ParsedMetricName> getParsedMetricName(
MetricName metricName,
Expand All @@ -59,20 +71,35 @@ private static Optional<LabeledMetricNameUtils.ParsedMetricName> getParsedMetric
return parsedMetricName;
}

/**
* @param metricName The {@link MetricName} that represents this metric.
* @return boolean If the metric is from a supported namespace.
*/
private static boolean isNameSpaceSupported(MetricName metricName) {
boolean isValidNameSpace =
Stream.of(SUPPORTED_NAMESPACES).anyMatch(x -> x.equals(metricName.getNamespace()));
if (!isValidNameSpace) {
LOG.warn(
"Dropping metric {} since {} is not one of the supported namespaces: {}",
metricName,
metricName.getNamespace(),
Arrays.toString(SUPPORTED_NAMESPACES));
}
return isValidNameSpace;
}

/**
* @param metricName The {@link MetricName} that represents this counter.
* @param value The counter value.
* @return If the conversion succeeds, {@code MetricValue} that represents this counter. Otherwise
* returns an empty optional
* return an empty optional
*/
private static Optional<MetricValue> convertCounterToMetricValue(
MetricName metricName,
Long value,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {

if (value == 0
|| (!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)
&& !metricName.getNamespace().equals(KAFKA_SINK_METRICS_NAMESPACE))) {
if (value == 0) {
return Optional.empty();
}

Expand All @@ -86,6 +113,33 @@ private static Optional<MetricValue> convertCounterToMetricValue(
.setValueInt64(value));
}

/**
* @param metricName The {@link MetricName} that represents this counter.
* @param value The counter value.
* @return If the conversion succeeds, {@code MetricValue} that represents this counter. Otherwise
* returns an empty optional
*/
private static Optional<MetricValue> convertGaugeToMetricValue(
MetricName metricName,
Long value,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {

Optional<LabeledMetricNameUtils.ParsedMetricName> labeledName =
getParsedMetricName(metricName, parsedPerWorkerMetricsCache);
if (!labeledName.isPresent() || labeledName.get().getBaseName().isEmpty()) {
return Optional.empty();
}

DataflowGaugeValue gauge_value = new DataflowGaugeValue();
gauge_value.setValue(value);

return Optional.of(
new MetricValue()
.setMetric(labeledName.get().getBaseName())
.setMetricLabels(labeledName.get().getMetricLabels())
.setValueGauge64(gauge_value));
}

/**
* Adds {@code outlierStats} to {@code outputHistogram} if {@code inputHistogram} has recorded
* overflow or underflow values.
Expand Down Expand Up @@ -196,13 +250,19 @@ private static Optional<MetricValue> convertHistogramToMetricValue(
public static Collection<PerStepNamespaceMetrics> convert(
String stepName,
Map<MetricName, Long> counters,
Map<MetricName, Long> gauges,
Map<MetricName, LockFreeHistogram.Snapshot> histograms,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {

Map<String, PerStepNamespaceMetrics> metricsByNamespace = new HashMap<>();
for (Entry<MetricName, Long> entry : counters.entrySet()) {
MetricName metricName = entry.getKey();

boolean validNameSpace = isNameSpaceSupported(metricName);
if (!validNameSpace) {
continue;
}

Optional<MetricValue> metricValue =
convertCounterToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache);
if (!metricValue.isPresent()) {
Expand All @@ -225,6 +285,11 @@ public static Collection<PerStepNamespaceMetrics> convert(

for (Entry<MetricName, LockFreeHistogram.Snapshot> entry : histograms.entrySet()) {
MetricName metricName = entry.getKey();

boolean validNameSpace = isNameSpaceSupported(metricName);
if (!validNameSpace) {
continue;
}
Optional<MetricValue> metricValue =
convertHistogramToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache);
if (!metricValue.isPresent()) {
Expand All @@ -245,6 +310,32 @@ public static Collection<PerStepNamespaceMetrics> convert(
stepNamespaceMetrics.getMetricValues().add(metricValue.get());
}

for (Entry<MetricName, Long> entry : gauges.entrySet()) {
MetricName metricName = entry.getKey();

boolean validNameSpace = isNameSpaceSupported(metricName);
if (!validNameSpace) {
continue;
}

Optional<MetricValue> metricValue =
convertGaugeToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache);
if (!metricValue.isPresent()) {
continue;
}

PerStepNamespaceMetrics stepNamespaceMetrics =
metricsByNamespace.get(metricName.getNamespace());
if (stepNamespaceMetrics == null) {
stepNamespaceMetrics =
new PerStepNamespaceMetrics()
.setMetricValues(new ArrayList<>())
.setOriginalStep(stepName)
.setMetricsNamespace(metricName.getNamespace());
metricsByNamespace.put(metricName.getNamespace(), stepNamespaceMetrics);
}
stepNamespaceMetrics.getMetricValues().add(metricValue.get());
}
return metricsByNamespace.values();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class StreamingStepMetricsContainer implements MetricsContainer {

private final String stepName;

private static boolean enablePerWorkerMetrics = false;
Expand All @@ -69,6 +70,9 @@ public class StreamingStepMetricsContainer implements MetricsContainer {

private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);

private final ConcurrentHashMap<MetricName, GaugeCell> perWorkerGauges =
new ConcurrentHashMap<>();

private MetricsMap<MetricName, StringSetCell> stringSet = new MetricsMap<>(StringSetCell::new);

private MetricsMap<MetricName, DeltaDistributionCell> distributions =
Expand Down Expand Up @@ -163,6 +167,19 @@ public Gauge getGauge(MetricName metricName) {
return gauges.get(metricName);
}

@Override
public Gauge getPerWorkerGauge(MetricName metricName) {
if (!enablePerWorkerMetrics) {
return MetricsContainer.super.getPerWorkerGauge(metricName);
}
Gauge val = perWorkerGauges.get(metricName);
if (val != null) {
return val;
}

return perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName));
}

@Override
public StringSet getStringSet(MetricName metricName) {
return stringSet.get(metricName);
Expand Down Expand Up @@ -330,11 +347,10 @@ private void deleteStaleCounters(
@VisibleForTesting
Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
ConcurrentHashMap<MetricName, Long> counters = new ConcurrentHashMap<MetricName, Long>();
ConcurrentHashMap<MetricName, Long> gauges = new ConcurrentHashMap<MetricName, Long>();
ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot> histograms =
new ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot>();
HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();

// Extract metrics updates.
perWorkerCounters.forEach(
(k, v) -> {
Long val = v.getAndSet(0);
Expand All @@ -344,6 +360,13 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
}
counters.put(k, val);
});

perWorkerGauges.forEach(
(k, v) -> {
Long val = v.getCumulative().value();
gauges.put(k, val);
v.reset();
});
perWorkerHistograms.forEach(
(k, v) -> {
v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot));
Expand All @@ -352,7 +375,7 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));

return MetricsToPerStepNamespaceMetricsConverter.convert(
stepName, counters, histograms, parsedPerWorkerMetricsCache);
stepName, counters, gauges, histograms, parsedPerWorkerMetricsCache);
}

/**
Expand Down
Loading

0 comments on commit 2064103

Please sign in to comment.