Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka add counters v1 uw2 #33503

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20240919-2.0.0", // [bomupgrader] sets version
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240817-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20241209-$google_clients_version",
google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version",
google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20240924-2.0.0", // [bomupgrader] sets version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ message MonitoringInfo {
SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }];
SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }];
SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }];
PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }];
}

// A set of key and value labels which define the scope of the metric. For
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ public MetricUpdates getUpdates() {
.setLabel(MonitoringInfoConstants.Labels.NAME, metricKey.metricName().getName())
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricKey.stepName());
}

// Based on namespace, add per worker metrics label to enable separate runner based sink based
// processing.
if (metricName.getNamespace().equals("BigQuerySink")
|| metricName.getNamespace().equals("KafkaSink")) {
builder.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");
}
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public static final class Labels {
public static final String SPANNER_DATABASE_ID = "SPANNER_DATABASE_ID";
public static final String SPANNER_INSTANCE_ID = "SPANNER_INSTANCE_ID";
public static final String SPANNER_QUERY_NAME = "SPANNER_QUERY_NAME";
public static final String PER_WORKER_METRIC = "PER_WORKER_METRIC";

static {
// Validate that compile time constants match the values stored in the protos.
Expand Down Expand Up @@ -148,6 +149,7 @@ public static final class Labels {
SPANNER_INSTANCE_ID.equals(extractLabel(MonitoringInfoLabels.SPANNER_INSTANCE_ID)));
checkArgument(
SPANNER_QUERY_NAME.equals(extractLabel(MonitoringInfoLabels.SPANNER_QUERY_NAME)));
checkArgument(PER_WORKER_METRIC.equals(extractLabel(MonitoringInfoLabels.PER_WORKER_METRIC)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,54 @@ public void testMonitoringInfosArePopulatedForUserCounters() {
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
}

@Test
public void testMonitoringInfosLabelsArePopulatedForSinkCounter() {
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
CounterCell c1 = testObject.getCounter(MetricName.named("KafkaSink", "name1"));
CounterCell c2 = testObject.getCounter(MetricName.named("BigQuerySink", "name2"));
CounterCell c3 = testObject.getCounter(MetricName.named("PS", "name3"));

c1.inc(2L);
c2.inc(4L);
c3.inc(5L);

SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
builder1
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "KafkaSink")
.setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true")
.setInt64SumValue(2)
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
builder2
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "BigQuerySink")
.setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true")
.setInt64SumValue(4)
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

// Not in an supported namespace, so extra metadata isn't added.
SimpleMonitoringInfoBuilder builder3 = new SimpleMonitoringInfoBuilder();
builder3
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "PS")
.setLabel(MonitoringInfoConstants.Labels.NAME, "name3")
.setInt64SumValue(5)
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

ArrayList<MonitoringInfo> actualMonitoringInfos = new ArrayList<MonitoringInfo>();
for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
actualMonitoringInfos.add(mi);
}

assertThat(
actualMonitoringInfos,
containsInAnyOrder(builder1.build(), builder2.build(), builder3.build()));
}

@Test
public void testMonitoringInfosArePopulatedForUserDistributions() {
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public Gauge getGauge(MetricName metricName) {
return getCurrentContainer().getGauge(metricName);
}

@Override
public Gauge getPerWorkerGauge(MetricName metricName) {
Gauge gauge = getCurrentContainer().getPerWorkerGauge(metricName);
return gauge;
}

@Override
public StringSet getStringSet(MetricName metricName) {
return getCurrentContainer().getStringSet(metricName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,44 @@

import com.google.api.services.dataflow.model.Base2Exponent;
import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowGaugeValue;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import com.google.api.services.dataflow.model.Linear;
import com.google.api.services.dataflow.model.MetricValue;
import com.google.api.services.dataflow.model.OutlierStats;
import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Converts metric updates to {@link PerStepNamespaceMetrics} protos. Currently we only support
* converting metrics from {@link BigQuerySinkMetrics} and from {@link KafkaSinkMetrics} with this
* converter.
*/
public class MetricsToPerStepNamespaceMetricsConverter {

private static final Logger LOG =
LoggerFactory.getLogger(MetricsToPerStepNamespaceMetricsConverter.class);

// Avoids to introduce mandatory kafka-io dependency to Dataflow worker
// keep in sync with org.apache.beam.sdk.io.kafka.KafkaSinkMetrics.METRICS_NAMESPACE
public static String KAFKA_SINK_METRICS_NAMESPACE = "KafkaSink";
private static String[] SUPPORTED_NAMESPACES = {
KAFKA_SINK_METRICS_NAMESPACE, BigQuerySinkMetrics.METRICS_NAMESPACE
};

private static Optional<LabeledMetricNameUtils.ParsedMetricName> getParsedMetricName(
MetricName metricName,
Expand All @@ -59,6 +71,23 @@ private static Optional<LabeledMetricNameUtils.ParsedMetricName> getParsedMetric
return parsedMetricName;
}

/**
* @param metricName The {@link MetricName} that represents this metric.
* @return boolean If the metric is from a supported namespace.
*/
private static boolean isNameSpaceSupported(MetricName metricName) {
boolean isValidNameSpace =
Stream.of(SUPPORTED_NAMESPACES).anyMatch(x -> x.equals(metricName.getNamespace()));
if (!isValidNameSpace) {
LOG.warn(
"Dropping metric {} since {} is not one of the supported namespaces: {}",
metricName,
metricName.getNamespace(),
Arrays.toString(SUPPORTED_NAMESPACES));
}
return isValidNameSpace;
}

/**
* @param metricName The {@link MetricName} that represents this counter.
* @param value The counter value.
Expand All @@ -70,9 +99,7 @@ private static Optional<MetricValue> convertCounterToMetricValue(
Long value,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {

if (value == 0
|| (!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)
&& !metricName.getNamespace().equals(KAFKA_SINK_METRICS_NAMESPACE))) {
if (value == 0) {
return Optional.empty();
}

Expand All @@ -86,6 +113,33 @@ private static Optional<MetricValue> convertCounterToMetricValue(
.setValueInt64(value));
}

/**
* @param metricName The {@link MetricName} that represents this counter.
* @param value The counter value.
* @return If the conversion succeeds, {@code MetricValue} that represents this counter. Otherwise
* returns an empty optional
*/
private static Optional<MetricValue> convertGaugeToMetricValue(
MetricName metricName,
Long value,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {

Optional<LabeledMetricNameUtils.ParsedMetricName> labeledName =
getParsedMetricName(metricName, parsedPerWorkerMetricsCache);
if (!labeledName.isPresent() || labeledName.get().getBaseName().isEmpty()) {
return Optional.empty();
}

DataflowGaugeValue gauge_value = new DataflowGaugeValue();
gauge_value.setValue(value);

return Optional.of(
new MetricValue()
.setMetric(labeledName.get().getBaseName())
.setMetricLabels(labeledName.get().getMetricLabels())
.setValueGauge64(gauge_value));
}

/**
* Adds {@code outlierStats} to {@code outputHistogram} if {@code inputHistogram} has recorded
* overflow or underflow values.
Expand Down Expand Up @@ -196,13 +250,19 @@ private static Optional<MetricValue> convertHistogramToMetricValue(
public static Collection<PerStepNamespaceMetrics> convert(
String stepName,
Map<MetricName, Long> counters,
Map<MetricName, Long> gauges,
Map<MetricName, LockFreeHistogram.Snapshot> histograms,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {

Map<String, PerStepNamespaceMetrics> metricsByNamespace = new HashMap<>();
for (Entry<MetricName, Long> entry : counters.entrySet()) {
MetricName metricName = entry.getKey();

boolean validNameSpace = isNameSpaceSupported(metricName);
if (!validNameSpace) {
continue;
}

Optional<MetricValue> metricValue =
convertCounterToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache);
if (!metricValue.isPresent()) {
Expand All @@ -225,6 +285,11 @@ public static Collection<PerStepNamespaceMetrics> convert(

for (Entry<MetricName, LockFreeHistogram.Snapshot> entry : histograms.entrySet()) {
MetricName metricName = entry.getKey();

boolean validNameSpace = isNameSpaceSupported(metricName);
if (!validNameSpace) {
continue;
}
Optional<MetricValue> metricValue =
convertHistogramToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache);
if (!metricValue.isPresent()) {
Expand All @@ -245,6 +310,32 @@ public static Collection<PerStepNamespaceMetrics> convert(
stepNamespaceMetrics.getMetricValues().add(metricValue.get());
}

for (Entry<MetricName, Long> entry : gauges.entrySet()) {
MetricName metricName = entry.getKey();

boolean validNameSpace = isNameSpaceSupported(metricName);
if (!validNameSpace) {
continue;
}

Optional<MetricValue> metricValue =
convertGaugeToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache);
if (!metricValue.isPresent()) {
continue;
}

PerStepNamespaceMetrics stepNamespaceMetrics =
metricsByNamespace.get(metricName.getNamespace());
if (stepNamespaceMetrics == null) {
stepNamespaceMetrics =
new PerStepNamespaceMetrics()
.setMetricValues(new ArrayList<>())
.setOriginalStep(stepName)
.setMetricsNamespace(metricName.getNamespace());
metricsByNamespace.put(metricName.getNamespace(), stepNamespaceMetrics);
}
stepNamespaceMetrics.getMetricValues().add(metricValue.get());
}
return metricsByNamespace.values();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class StreamingStepMetricsContainer implements MetricsContainer {

private final String stepName;

private static boolean enablePerWorkerMetrics = false;
Expand All @@ -69,6 +70,9 @@ public class StreamingStepMetricsContainer implements MetricsContainer {

private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);

private final ConcurrentHashMap<MetricName, GaugeCell> perWorkerGauges =
new ConcurrentHashMap<>();

private MetricsMap<MetricName, StringSetCell> stringSet = new MetricsMap<>(StringSetCell::new);

private MetricsMap<MetricName, DeltaDistributionCell> distributions =
Expand Down Expand Up @@ -163,6 +167,19 @@ public Gauge getGauge(MetricName metricName) {
return gauges.get(metricName);
}

@Override
public Gauge getPerWorkerGauge(MetricName metricName) {
if (!enablePerWorkerMetrics) {
return MetricsContainer.super.getPerWorkerGauge(metricName);
}
Gauge val = perWorkerGauges.get(metricName);
if (val != null) {
return val;
}

return perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName));
}

@Override
public StringSet getStringSet(MetricName metricName) {
return stringSet.get(metricName);
Expand Down Expand Up @@ -330,11 +347,10 @@ private void deleteStaleCounters(
@VisibleForTesting
Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
ConcurrentHashMap<MetricName, Long> counters = new ConcurrentHashMap<MetricName, Long>();
ConcurrentHashMap<MetricName, Long> gauges = new ConcurrentHashMap<MetricName, Long>();
ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot> histograms =
new ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot>();
HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();

// Extract metrics updates.
perWorkerCounters.forEach(
(k, v) -> {
Long val = v.getAndSet(0);
Expand All @@ -344,6 +360,13 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
}
counters.put(k, val);
});

perWorkerGauges.forEach(
(k, v) -> {
Long val = v.getCumulative().value();
gauges.put(k, val);
v.reset();
});
perWorkerHistograms.forEach(
(k, v) -> {
v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot));
Expand All @@ -352,7 +375,7 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));

return MetricsToPerStepNamespaceMetricsConverter.convert(
stepName, counters, histograms, parsedPerWorkerMetricsCache);
stepName, counters, gauges, histograms, parsedPerWorkerMetricsCache);
}

/**
Expand Down
Loading
Loading